Source code for sfftk.notes.modify

"""
``sfftk.notes.modify``
=======================

Add, edit and delete terms in EMDB-SFF files
"""
import json
import os
import re
import shutil
import sys

import numpy
import requests
import sfftkrw.schema.adapter_v0_8_0_dev1 as schema
from sfftkrw.core import _str, _decode
from sfftkrw.core.parser import parse_args
from sfftkrw.core.print_tools import print_date
from styled import Styled

from . import RESOURCE_LIST_NAMES
from ..notes.view import HeaderView, NoteView
from ..sff import handle_convert

__author__ = "Paul K. Korir, PhD"
__email__ = "pkorir@ebi.ac.uk, paul.korir@gmail.com"
__date__ = "2017-04-07"


# todo: allow user to modify/view hierarchy through segmentation annotation toolkit


[docs] class ExternalReference(object): """Class definition for a :py:class:`ExternalReference` object""" def __init__(self, resource=None, url=None, accession=None): """Initialise an object of class :py:class:`ExternalReference` :param type_: the name of the resource :param otherType: the IRI at which the resource may be reached :param value: the external reference accession code """ self.resource = resource self.url = url self.accession = accession self.label, self.description = self._get_text() @property def iri(self): """The IRI value should be *double* url-encoded""" if sys.version_info[0] > 2: from urllib.parse import urlencode else: from urllib import urlencode urlenc = urlencode({u'iri': self.url.encode('idna')}) urlenc2 = urlencode({u'iri': urlenc.split('=')[1]}) return _decode(urlenc2.split('=')[1], 'utf-8') # fixme: perhaps the text should exist already instead of being searched for? # this seems to be a special case for OLS # the user provides the name of the resource, the IRI/URL and the accession and this # method obtains the text meaning that its implementation would have to depend on # the name of the resource i.e. the field from which to extract the text # will vary by resource def _get_text(self): """Get the label and description if they exist""" label = None description = None # only search for label and description if from OLS if self.resource not in RESOURCE_LIST_NAMES: url = "https://www.ebi.ac.uk/ols/api/ontologies/{ontology}/terms/{iri}".format( ontology=self.resource, iri=self.iri, ) response = requests.get(url) if response.status_code == 200: self._result = json.loads(response.text) # label try: label = self._result['label'] except KeyError: label = '' # description try: description = self._result['description'][0] if self._result['description'] else None except KeyError: description = '' else: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) elif self.resource == 'EMDB': url = "https://www.ebi.ac.uk/emdb/api/entry/{}".format(self.accession) response = requests.get(url) if response.status_code == 200: self._result = json.loads(response.text) # label label = self._result['emdb_id'] # description description = self._result['admin']['title'] else: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) elif self.resource == "PDB": url = "https://www.ebi.ac.uk/pdbe/search/pdb/select?q={}&wt=json".format(self.accession) response = requests.get(url) if response.status_code == 200: self._result = json.loads(response.text) try: # label label = self._result['response']['docs'][0]['title'] # description description = "; ".join(self._result['response']['docs'][0]['organism_scientific_name']) except IndexError: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) else: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) elif self.resource == "UniProt": url = ( "https://rest.uniprot.org/uniprotkb/search" "?query=accession:{search_term}&format=tsv&size=1&fields=accession,id," "protein_name,organism_name" ).format( search_term=self.accession, ) response = requests.get(url) if response.status_code == 200: self._result = response.text try: # split rows; split columns; dump first and last rows _structured_results = list(map(lambda r: r.split('\t'), self._result.split('\n')))[1:-1] # make a list of dicts with the given ids structured_results = list(map(lambda r: dict(zip(['id', 'name', 'proteins', 'organism'], r)), _structured_results))[0] # label label = structured_results['name'] # description description = "{} (Organism: {})".format(structured_results['proteins'], structured_results['organism']) except ValueError as v: print_date("Unknown exception: {}".format(str(v))) except IndexError: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) else: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) elif self.resource == 'Europe PMC': url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=30932919&format=json" response = requests.get(url) if response.status_code == 200: self._result = json.loads(response.text) try: # label label = self._result["resultList"]["result"][0]["authorString"] # description description = self._result["resultList"]["result"][0]["title"] except IndexError: print_date( "Could not find label and description for external reference {}:{}".format( self.resource, self.accession ) ) else: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) elif self.resource == 'EMPIAR': url = "https://www.ebi.ac.uk/empiar/api/entry/{}".format(self.accession) response = requests.get(url) if response.status_code == 200: self._result = json.loads(response.text) try: # label label = self._result[self.accession]["title"] # description description = self._result[self.accession]["experiment_type"] except IndexError: print_date( "Could not find label and description for external reference {}:{}".format( self.resource, self.accession ) ) else: print_date( "Could not find label and description for external reference {}:{}".format(self.resource, self.accession)) return label, description
[docs] class NoteAttr(object): """Descriptor class for note attributes :param initval: the initial value of the attribute (default: :py:class:`None`) :param str name: the name of the variable (default: 'var') """ def __init__(self, initval=None, name='var'): self.val = initval self.name = name def __get__(self, obj, _): return self.val def __set__(self, obj, val): self.val = val
# todo: attribute type checking
[docs] class BaseNote(object): """Note base class""" def __init__(self): self._ext_ref_list = list() @property def external_references(self): return self._ext_ref_list @external_references.setter def external_references(self, value): """Assigne directly; must all be ExternalReference objects or lists of strings""" self._ext_ref_list = list() # empty the present list for v in value: if isinstance(v, ExternalReference): self._ext_ref_list.append(v) elif isinstance(v, list) or isinstance(v, tuple): self._ext_ref_list.append(ExternalReference(*v))
[docs] class AbstractGlobalNote(BaseNote): """Abstract class definition for global annotations which defines attributes of global annotation. Also defines three methods that effect the annotation to a segmentation object: - :py:func:`add_to_segmentation` - :py:func:`edit_in_segmentation` - :py:func:`del_from_segmentation` """ name = NoteAttr('name') software_name = NoteAttr('software_name') software_version = NoteAttr('sofware_version') software_processing_details = NoteAttr('software_processing_details') transform = NoteAttr('transform') details = NoteAttr('details') external_reference_id = NoteAttr('external_reference_id')
[docs] def add_to_segmentation(self, segmentation): """Adds this note to the given segmentation :param segmentation: an EMDB-SFF segmentation object :type segmentation: :py:class:`sfftkrw.SFFSegmentation` :return segmentation: the EMDB-SFF segmentation with the annotation added """ # name if self.name is not None: segmentation.name = self.name # to ensure we don't have any id collisions if self.software_name or self.software_version or self.software_processing_details: if segmentation.software_list: max_id = max(list(segmentation.software_list.get_ids())) software = schema.SFFSoftware(id=max_id + 1) else: segmentation.software_list = schema.SFFSoftwareList() software = schema.SFFSoftware() # software name if self.software_name is not None: software.name = self.software_name # software version if self.software_version is not None: software.version = self.software_version # software processing details if self.software_processing_details is not None: software.processing_details = self.software_processing_details segmentation.software_list.append(software) # transform if self.transform is not None: if segmentation.transform_list: max_id = max(list(segmentation.transform_list.get_ids())) transform = schema.SFFTransformationMatrix( id=max_id + 1, rows=3, cols=4, data=" ".join(map(str, self.transform)) ) segmentation.transform_list.append(transform) else: # create and populate the transform list immediately segmentation.transform_list = schema.SFFTransformList() segmentation.transform_list.append( schema.SFFTransformationMatrix.from_array(numpy.array(self.transform).reshape(3, 4))) if self.details is not None: segmentation.details = self.details # global external references if self.external_references: if not segmentation.global_external_references: segmentation.global_external_references = schema.SFFGlobalExternalReferenceList() for g_ext_ref in self.external_references: segmentation.global_external_references.append( schema.SFFExternalReference( resource=g_ext_ref.resource, url=g_ext_ref.url, accession=g_ext_ref.accession, label=g_ext_ref.label, description=g_ext_ref.description ) ) return segmentation
[docs] def edit_in_segmentation(self, segmentation): """Modify the global annotation of the given segmentation :param segmentation: an EMDB-SFF segmentation object :type segmentation: :py:class:`sfftkrw.SFFSegmentation` :return segmentation: the EMDB-SFF segmentation with annotated edited :rtype segmentation: :py:class:`sfftkrw.SFFSegmentation` """ # name if self.name is not None: segmentation.name = self.name # get the software to be edited try: software = segmentation.software_list.get_by_id(self.software_id) except KeyError: software = None if software is not None: # software name if self.software_name is not None: software.name = self.software_name # software version if self.software_version is not None: software.version = self.software_version # software processing details if self.software_processing_details is not None: software.processing_details = self.software_processing_details # get the transform to be edited try: transform = segmentation.transform_list.get_by_id(self.transform_id) except KeyError: transform = None if transform is not None: if self.transform is not None: transform.data = " ".join(map(str, self.transform)) # details if self.details is not None: segmentation.details = self.details # external references start_index = self.external_reference_id # editing global_external_references starting at index 'start_index' # this will result in all subsequent global_external_references being replaced # once it gets to the end of the list of gER any additional ones will be appended # e.g. if we have 5 gERs and we want to add another 5 but starting at index 4 (the fifth) # then we will replace index 4 then keep adding the other new 4 gERs # to add new ones use 'notes add -E <extref>' for g_ext_ref in self.external_references: ext_ref = schema.SFFExternalReference( resource=g_ext_ref.resource, url=g_ext_ref.url, accession=g_ext_ref.accession, label=g_ext_ref.label, description=g_ext_ref.description ) try: segmentation.global_external_references.insert( start_index, ext_ref, ) except IndexError: segmentation.global_external_references.append( ext_ref ) start_index += 1 return segmentation
[docs] def del_from_segmentation(self, segmentation): """Delete attributes from a segmentation :param segmentation: an EMDB-SFF segmentation object :type segmentation: :py:class:`sfftkrw.SFFSegmentation` :return segmentation: the EMDB-SFF segmentation with annotation deleted :rtype segmentation: :py:class:`sfftkrw.SFFSegmentation` """ # name if self.name: segmentation.name = None # software if self.software_id is not None: for software_id in self.software_id: try: software = segmentation.software_list.get_by_id(software_id) except KeyError: software = None if software is not None: segmentation.software_list.remove(software) # transform if self.transform_id is not None: for transform_id in self.transform_id: try: transform = segmentation.transform_list.get_by_id(transform_id) except KeyError: software = None if transform is not None: segmentation.transform_list.remove(transform) # details if self.details: segmentation.details = None # external references if self.external_reference_id: if segmentation.global_external_references: # current globalExtRefs # fixme: should not cast!!! # extract the items to be removed to_remove = list() for i in self.external_reference_id: try: to_remove.append(segmentation.global_external_references[i]) except IndexError: print_date(_str( Styled("[[ '{}'|fg-red ]]", "Failed to delete global external reference ID {}".format(i)))) # now remove them for t in to_remove: segmentation.global_external_references.remove(t) # segmentation.global_external_references = schema.SFFGlobalExternalReferenceList(refs) else: print_date("No global external references to delete from!") return segmentation
def __repr__(self): return ( "{class_name}(name={name}, software_name={software_name}, software_version={software_version}, " "software_processing_details={software_processing_details}, details={details}, " "external_ref_id={external_ref_id})" ).format( class_name=self.__class__, name=self.name, software_name=self.software_name, software_version=self.software_version, software_processing_details=self.software_processing_details, details=self.details, external_ref_id=self.external_reference_id, )
[docs] class GlobalArgsNote(AbstractGlobalNote): """Class defining segmentation (global) annotation based on command-line arguments :param args: an :py:class:`argparse.Namespace` object :param configs: persistent configurations for ``sfftk`` """ def __init__(self, args, configs, *args_, **kwargs_): super(GlobalArgsNote, self).__init__(*args_, **kwargs_) self.name = args.name if hasattr(args, 'name') else None self.configs = configs self.software_id = args.software_id if hasattr(args, 'software_id') else None self.software_name = args.software_name self.software_version = args.software_version self.software_processing_details = args.software_processing_details self.transform_id = args.transform_id if hasattr(args, 'transform_id') else None self.transform = args.transform if hasattr(args, 'transform') else None self.details = args.details if hasattr(args, 'external_ref_id'): self.external_reference_id = args.external_ref_id if hasattr(args, 'external_ref'): if args.external_ref: for resource, url, accession in args.external_ref: self._ext_ref_list.append( ExternalReference( resource=resource, url=url, accession=accession ) )
[docs] class GlobalSimpleNote(AbstractGlobalNote): """Class definition for global segmentation notes""" def __init__(self, name=None, software_id=None, software_name=None, software_version=None, software_processing_details=None, transform_id=None, transform=None, details=None, external_reference_id=None, external_references=None, *args, **kwargs): """Initialise a :py:class:`GlobalSimpleNote` object :param str name: name of the whole segmentation :param int software_id: ID of the software to be edited or deleted :param str software_name: name of the software used :param str software_version: the version name :param str software_processing_details: a brief description of how the segmentation was generated using the corresponding software :param int transform_id: ID of the transform to be edited or deleted :param list transform: a sequence of 12 numbers used for the 3x4 transformation matrix :param str details: a description of this segmentation :param int external_reference_id: ID of the external reference to be edited or deleted :param list external_references: iterable of a tuple with the ``resource``, ``url`` and ``accession`` """ super().__init__(*args, **kwargs) self.name = name self.software_id = software_id self.software_name = software_name self.software_version = software_version self.software_processing_details = software_processing_details self.transform_id = transform_id self.transform = transform self.details = details self.external_reference_id = external_reference_id # external references if external_references: for resource, url, accession in external_references: self._ext_ref_list.append( ExternalReference( resource=resource, url=url, accession=accession, ) )
[docs] class AbstractNote(BaseNote): """Note 'abstact' class that defines private attributes and main methods""" name = NoteAttr('name') description = NoteAttr('description') number_of_instances = NoteAttr('number_of_instances') external_reference_id = NoteAttr('external_reference_id')
[docs] def add_to_segment(self, segment): """Add the annotations found in this ``Note`` object to the :py:class:`sfftkrw.SFFSegment` object :param segment: single segment in EMDB-SFF :type segment: :py:class:`sfftkrw.SFFSegment` """ # biological_annotation if not segment.biological_annotation: segment.biological_annotation = schema.SFFBiologicalAnnotation() bA = segment.biological_annotation if self.name is not None: bA.name = self.name if self.description is not None: bA.description = self.description # else: # bA.description = segment.biological_annotation.description if self.number_of_instances: bA.number_of_instances = self.number_of_instances # else: # bA.number_of_instances = segment.biological_annotation.number_of_instances # copy current external references bA.external_references = segment.biological_annotation.external_references if self.external_references: if not bA.external_references: bA.external_references = schema.SFFExternalReferenceList() for ext_ref in self.external_references: ext_ref._get_text() bA.external_references.append( schema.SFFExternalReference( resource=ext_ref.resource, url=ext_ref.url, accession=ext_ref.accession, label=ext_ref.label, description=ext_ref.description ) ) segment.biological_annotation = bA return segment
[docs] def edit_in_segment(self, segment): """Edit the annotations found in this ``Note`` object to the :py:class:`sfftkrw.SFFSegment` object :param segment: single segment in EMDB-SFF :type segment: :py:class:`sfftkrw.SFFSegment` """ # biological_annotation if not segment.biological_annotation: print_date("Note: no biological annotation was found. You may edit only after adding with 'sff notes add'.") else: bA = segment.biological_annotation # name if self.name is not None: bA.name = self.name # description if self.description is not None: bA.description = self.description # number of instances if self.number_of_instances: bA.number_of_instances = self.number_of_instances # external references # editing external_references starting at index 'start_index' # this will result in all subsequent external_references being replaced # once it gets to the end of the list of eR any additional ones will be appended # e.g. if we have 5 eRs and we want to add another 5 but starting at index 4 (the fifth) # then we will replace index 4 then keep adding the other new 4 eRs # to add new ones use 'notes add -i <segment_id> -E <extref>' start_index = self.external_reference_id for ext_ref in self.external_references: if not bA.external_references: bA.external_references = schema.SFFExternalReferenceList() reference = schema.SFFExternalReference( resource=ext_ref.resource, url=ext_ref.url, accession=ext_ref.accession, label=ext_ref.label, description=ext_ref.description ) try: bA.external_references.insert( start_index, reference, ) except IndexError: bA.external_references.append( reference ) start_index += 1 segment.biological_annotation = bA return segment
[docs] def del_from_segment(self, segment): """Delete the annotations found in this ``Note`` object to the :py:class:`sfftkrw.SFFSegment` object :param segment: single segment in EMDB-SFF :type segment: :py:class:`sfftkrw.SFFSegment` """ # biological_annotation if not segment.biological_annotation: print_date("No biological anotation found! Use 'add' to first add a new annotation.") else: bA = segment.biological_annotation if self.name: bA.name = None if self.description: bA.description = None if self.number_of_instances: bA.number_of_instances = None if self.external_reference_id: if bA.external_references: # current extRefs refs = list(bA.external_references) # extract the items to be removed to_remove = list() for i in self.external_reference_id: try: to_remove.append(refs[i]) except IndexError: print_date( _str(Styled("[[ '{}|fg-red ]]", "Failed to delete external reference ID {}".format(i)))) # now remove them for t in to_remove: refs.remove(t) reference_list = schema.SFFExternalReferenceList() if refs: # if there is anything left # we have to manually add the remaining refs for ref in refs: ref.id = 0 # reset the ids reference_list.append(ref) bA.external_references = reference_list else: print_date("No external references to delete from!") segment.biological_annotation = bA return segment
[docs] class ArgsNote(AbstractNote): """Class definition for an ArgsNote object""" def __init__(self, args, configs, *args_, **kwargs_): """Initialise an :py:class:`ArgsNote` object :param args: an :py:class:`argparse.Namespace` object :params configs: ``sfftk`` persistent configs (see :py:mod:`sfftk.config` documentation) """ super(ArgsNote, self).__init__(*args_, **kwargs_) self.name = args.segment_name self.description = args.description self.number_of_instances = args.number_of_instances if hasattr(args, 'external_ref_id'): self.external_reference_id = args.external_ref_id # external_references if hasattr(args, 'external_ref'): # sff notes del has no -E arg if args.external_ref: for resource, url, accession in args.external_ref: self._ext_ref_list.append( ExternalReference( resource=resource, url=url, accession=accession ) )
[docs] class SimpleNote(AbstractNote): """Class definition for a :py:class:`SimpleNote` object""" def __init__( self, name=None, description=None, number_of_instances=None, external_reference_id=None, external_references=None, *args, **kwargs ): """Initialise an :py:class:`SimpleNote` object :param str description: the description string of the segment :param int number_of_instances: the number of instances of this segment :param int external_reference_id: ID of an external reference :param external_references: iterable of external references """ super(SimpleNote, self).__init__(*args, **kwargs) self.name = name self.description = description self.number_of_instances = number_of_instances self.external_reference_id = external_reference_id # external_references if external_references: for resource, url, accession in external_references: self._ext_ref_list.append( ExternalReference( resource=resource, url=url, accession=accession ) )
[docs] def add_note(args, configs): """Add annotation to a segment specified in args :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ sff_seg = schema.SFFSegmentation.from_file(args.sff_file) # global changes if args.segment_id is None: # create a GlobalArgsNote object global_note = GlobalArgsNote(args, configs) # add notes to segmentation sff_seg = global_note.add_to_segmentation(sff_seg) # show the updated header string = Styled("[[ ''|fg-green:no-end ]]") string += _str(HeaderView(sff_seg)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) else: found_segment = False for segment in sff_seg.segments: if segment.id in args.segment_id: note = ArgsNote(args, configs) sff_seg.segment = note.add_to_segment(segment) string = Styled("[[ ''|fg-green:no-end ]]") string += _str(NoteView(sff_seg.segment, _long=True)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(string) found_segment = True if not found_segment: print_date("Segment of ID(s) {} not found".format(", ".join(map(str, args.segment_id)))) # export sff_seg.export(args.sff_file) return 0
[docs] def edit_note(args, configs): """Edit annotation to a segment specified in args :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ sff_seg = schema.SFFSegmentation.from_file(args.sff_file) # global changes if args.segment_id is None: # create a GlobalArgsNote object global_note = GlobalArgsNote(args, configs) # edit the notes in the segmentation # editing name, software, filePath, details are exactly the same as adding # editing external references is different: # the external_reference_id refers to the extRef to edit # any additionally specified external references (-E a b -E x y) are inserted after the edited index sff_seg = global_note.edit_in_segmentation(sff_seg) # show the updated header string = Styled("[[ ''|fg-green:no-end ]]") string += _str(HeaderView(sff_seg)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) else: found_segment = False for segment in sff_seg.segments: if segment.id in args.segment_id: note = ArgsNote(args, configs) sff_seg.segment = note.edit_in_segment(segment) string = Styled("[[ ''|fg-green:no-end ]]") string += _str(NoteView(sff_seg.segment, _long=True)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) found_segment = True if not found_segment: print_date("Segment of ID(s) {} not found".format(", ".join(map(str, args.segment_id)))) # export sff_seg.export(args.sff_file) return 0
[docs] def del_note(args, configs): """Delete annotation to a segment specified in args :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ sff_seg = schema.SFFSegmentation.from_file(args.sff_file) # global changes if args.segment_id is None: # create a GlobalArgsNote object global_note = GlobalArgsNote(args, configs) # delete the notes from segmentation sff_seg = global_note.del_from_segmentation(sff_seg) # show the updated header string = Styled("[[ ''|fg-green:no-end ]]") string += _str(HeaderView(sff_seg)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) else: found_segment = False for segment in sff_seg.segments: if segment.id in args.segment_id: note = ArgsNote(args, configs) sff_seg.segment = note.del_from_segment(segment) string = Styled("[[ ''|fg-green:no-end ]]") string += _str(NoteView(sff_seg.segment, _long=True)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) found_segment = True if not found_segment: print_date("Segment of ID(s) {} not found".format(", ".join(map(str, args.segment_id)))) # export sff_seg.export(args.sff_file) return 0
[docs] def copy_notes(args, configs): """Copy notes across segments One or more segments can be chosen for either or both source and destination :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ sff_seg = schema.SFFSegmentation.from_file(args.sff_file) # from segment from_segment = list() if args.segment_id is not None: from_segment = args.segment_id if args.from_global: try: from_segment.append(-1) except NameError: from_segment = [-1] # to_segment to_segment = list() if args.to_segment is not None: to_segment = args.to_segment elif args.to_all: all_segment_ids = set(sff_seg.segments.get_ids()) to_segment = list(all_segment_ids.difference(set(from_segment))) if args.to_global: try: to_segment.append(-1) except NameError: to_segment = [-1] for f in from_segment: for t in to_segment: sff_seg.copy_annotation(f, t) string = Styled("[[ ''|fg-green:no-end ]]") if t == -1: string += _str(HeaderView(sff_seg)) else: string += _str(NoteView(sff_seg.segments.get_by_id(t), _long=True)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) # export sff_seg.export(args.sff_file) return 0
[docs] def clear_notes(args, configs): """Clear notes from segments :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ sff_seg = schema.SFFSegmentation.from_file(args.sff_file) from_segment = list() if args.segment_id is not None: from_segment = args.segment_id elif args.from_all_segments: from_segment = list(sff_seg.segments.get_ids()) if args.from_global: try: from_segment.append(-1) except NameError: from_segment = [-1] # fixme: sff notes clear emd_5625.sff raises UnboundLocalError: local variable # 'from_segment' referenced before assignment for f in from_segment: sff_seg.clear_annotation(f) if args.from_global: string = Styled("[[ ''|fg-green:no-end ]]") string += _str(HeaderView(sff_seg)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) if args.segment_id is not None: for segment in sff_seg.segments: if segment.id in args.segment_id: string = Styled("[[ ''|fg-green:no-end ]]") string += _str(NoteView(segment, _long=True)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) elif args.from_all_segments: for segment in sff_seg.segments: string = Styled("[[ ''|fg-green:no-end ]]") string += _str(NoteView(segment, _long=True)) string += Styled("[[ ''|reset ]]") # fixme: use print_date print(_str(string)) # export sff_seg.export(args.sff_file) return 0
[docs] def merge(args, configs): """Merge notes from two EMDB-SFF files :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ # source if args.verbose: print_date("Reading in source: {}...".format(args.source)) source = schema.SFFSegmentation.from_file(args.source) # destination if args.verbose: print_date("Reading in destination: {}...".format(args.other)) other = schema.SFFSegmentation.from_file(args.other) if args.verbose: print_date("Merging annotations...") other.merge_annotation(source, include_colour=args.include_colour) # include transforms instead of modifying and releasing a new sfftk-rw package other.transform_list = source.transform_list # export if args.verbose: print_date("Writing output to {}".format(args.output)) other.export(args.output) if args.verbose: print_date("Done") return 0
[docs] def save(args, configs): """Save changes made :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ temp_file = configs['__TEMP_FILE'] if os.path.exists(temp_file): # temp_file: file.sff; args.sff_file: file.sff copy # temp_file: file.hff; args.sff_file: file.hff copy # temp_file: file.json; args.sff_file: file.json copy if (re.match(r'.*\.sff$', temp_file, re.IGNORECASE) and re.match(r'.*\.sff$', args.sff_file, re.IGNORECASE)) or \ (re.match(r'.*\.hff$', temp_file, re.IGNORECASE) and re.match(r'.*\.hff$', args.sff_file, re.IGNORECASE)) or \ (re.match(r'.*\.json$', temp_file, re.IGNORECASE) and re.match(r'.*\.json$', args.sff_file, re.IGNORECASE)): print_date("Copying temp file {} to {}...".format(temp_file, args.sff_file)) shutil.copy(temp_file, args.sff_file) print_date("Deleting temp file {}...".format(temp_file)) os.remove(temp_file) assert not os.path.exists(temp_file) # temp_file: file.sff; args.sff_file: file.hff convert # temp_file: file.sff; args.sff_file: file.json convert # temp_file: file.hff; args.sff_file: file.sff convert # temp_file: file.hff; args.sff_file: file.json convert elif (re.match(r'.*\.sff$', temp_file, re.IGNORECASE) and ( re.match(r'.*\.hff$', args.sff_file, re.IGNORECASE) or re.match(r'.*\.json$', args.sff_file, re.IGNORECASE))) or \ (re.match(r'.*\.hff$', temp_file, re.IGNORECASE) and ( re.match(r'.*\.json$', args.sff_file, re.IGNORECASE) or re.match(r'.*\.sff$', args.sff_file, re.IGNORECASE))): cmd = "convert -v {} -o {}".format(temp_file, args.sff_file) _args = parse_args(cmd, use_shlex=True) handle_convert(_args) # convert print_date("Deleting temp file {}...".format(temp_file)) os.remove(temp_file) assert not os.path.exists(temp_file) # temp_file: file.json; args.sff_file: file.sff merge # temp_file: file.json; args.sff_file: file.hff merge elif re.match(r'.*\.json$', temp_file, re.IGNORECASE) and ( re.match(r'.*\.sff$', args.sff_file, re.IGNORECASE) or re.match(r'.*\.hff$', args.sff_file, re.IGNORECASE)): json_seg = schema.SFFSegmentation.from_file(temp_file) seg = schema.SFFSegmentation.from_file(args.sff_file) # merge seg.merge_annotation(json_seg) seg.transform_list = json_seg.transform_list seg.export(args.sff_file) print_date("Deleting temp file {}...".format(temp_file)) os.remove(temp_file) assert not os.path.exists(temp_file) else: print_date("Unknown file type: {}".format(args.sff_file)) return 64 return 0 else: print_date( "Missing temp file {}. First perform some edit actions ('add', 'edit', 'del') before trying to save.".format( temp_file)) return 64
[docs] def trash(args, configs): """Trash changes made :param args: parsed arguments :type args: :py:class:`argparse.Namespace` :param configs: configurations object :type configs: :py:class:`sfftk.core.configs.Configs` :return status: status :rtype status: int """ temp_file = configs['__TEMP_FILE'] if os.path.exists(temp_file): print_date("Discarding all changes made in temp file {}...".format(temp_file), newline=False) os.remove(temp_file) assert not os.path.exists(temp_file) print_date("Done", incl_date=False) return 0 else: print_date("Unable to discard with missing temp file {}. No changes made.".format(temp_file)) return 65