Copyright 2015-2023 Stephane De Mita, Mathieu Siol
This file is part of EggLib.
EggLib is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EggLib is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with EggLib. If not, see <http://www.gnu.org/licenses/>.
import re, operator, os, sys
from io import StringIO
from .. import eggwrapper as _eggwrapper
from .. import alphabets, _interface
_SOFA = {
'SO:0000704': 'gene',
'SO:0000234': 'mRNA',
'SO:0000147': 'exon',
'SO:0000316': 'cds',
'SO:0000188': 'intron',
'SO:0000610': 'polyA_sequence',
'SO:0000553': 'polyA_site',
'SO:0000204': 'five_prime_UTR',
'SO:0000205': 'three_prime_UTR'
[docs]class GFF3(object):
Import GFF3 genome annotation data.
Read General Feature Format (GFF)-formatted (version 3) genome
annotation data from a file specified by name or from a provided
See the description of the GFF3 format at `<http://www.sequenceontology.org/gff3.shtml>`_.
All features are loaded into memory and can be processed interactively.
:param source: name of a GFF3-formatted file. To pass a string,
use the factory method :meth:`from_string`.
:param strict: by default, apply strictly GFF3 requirements. See
.. _gff3-strict:
The ``strict`` argument can be set to ``False`` to support CDS
features lacking a phase.
Some observations regarding this implementation:
* The validity of parent-child relationship with respect to the
Sequence Ontology is not enforced, meaning that this parser
allows any type of feature to be parent to any feature of any
* The Sequence Ontology accession number of types of features related
to genes are automatically mapped to the type name (such as
"SO:0000704", which is translated as "gene").
* Discontinuous features are required to have matching attributes
(except start, stop, and phase for CDS features) but subsequent
segments are allowed to skip attributes if the first
segment defines them.
* fasta-formatted sequences are imported but are currently not
checked for consistency with annotations and defined sequence
:class:`.io.GFF3` instances are iterable. The expression:
.. code-block:: python
>>> for feat in GFF3:
... ...
is equivalent to:
.. code-block:: python
>>> for feat in GFF3.iter_features():
... ...
[docs] @classmethod
def from_string(cls, string, strict=True):
Import data from a GFF3-formatted string. This is a class method,
which can be called as shown below to create a new object:
.. code-block:: python
>>> gff3 = egglib.io.GFF3.from_string(gff3_string)
:param string: GFF3-formatted string.
:param strict: apply strict GFF3 specifications (see :ref:`here <gff3-strict>`).
:return: A new :class:`.io.GFF3` instance.
obj = cls.__new__(cls)
obj._strict = strict
return obj
def __init__(self, fname, strict=True):
self._strict = strict
f = open(fname, mode='r')
def _parse(self, f): # read a file or file-like object
# initialize parameters
self._regions = {}
self._feature_ontology = []
self._attribute_ontology = []
self._source_ontology = []
self._species = None
self._genome_build = None
self._sequences = None
self._seqid = []
self._mapping = {}
self._num_tot = 0
self._num_top = 0
self._IDs = set() # IDs of all features
self._open_features = {} # only those with an ID
# check first line
mo = re.match('##gff-version[ \t]+(3(\.\d+(\.\d+)?)?)$', f.readline().strip())
if mo is None: raise IOError('invalid first line of GFF3 file (invalid gff-version directive or invalid GFF version)')
self._version = mo.group(1)
# read all lines
self._lineno = 1 # offset because of header line
self._f = f
self._pos = self._f.tell()
while True:
self._line = f.readline()
if self._line == '': break
self._lineno += 1
# sort features within seqid entries
for key in self._mapping:
self._mapping[key].sort(key=lambda feat: (feat.start, feat.end))
for feat in self._mapping[key]:
def _readline(self, line): # process a line
if line.rstrip() == '###': self._close_objects()
elif line[0] == '>': self._get_sequences()
elif line.rstrip() == '##FASTA':
self._pos = self._f.tell()
elif line[:2] == '##': self._get_directive()
elif line[0] == '#': pass
else: self._get_annotation()
self._pos = self._f.tell()
def _close_objects(self): # close all open features
for ID, feat in self._open_features.items():
def _process_text(self, text): # expand escape expressions
for hit in set(re.findall('(%[0-9A-Fa-f]{2})', text)):
text = text.replace(hit, chr(int('0x'+hit[1:], 16)))
return text
def _get_directive(self): # import a directive
args = self._line.split()
if args[0] == '##sequence-region':
if len(args) != 4: raise IOError('invalid sequence-region directive [line: {0}]'.format(self._lineno))
seqid, start, end = args[1:]
seqid = self._process_text(seqid)
start = int(start) - 1
end = int(end) - 1
except ValueError: raise IOError('invalid sequence-region directive [line: {0}]'.format(self._lineno))
if start < 0 or end < start: raise IOError('invalid sequence-region directive [line: {0}]'.format(self._lineno))
if seqid in self._regions: raise IOError('invalid sequence-region directive (seqid already defined) [line: {0}]'.format(self._lineno))
self._regions[seqid] = start, end
elif args[0] == '##feature-ontology':
if len(args) != 2: raise IOError('invalid feature-ontology directive [line: {0}]'.format(self._lineno))
elif args[0] == '##attribute-ontology':
if len(args) != 2: raise IOError('invalid attribute-ontology directive [line: {0}]'.format(self._lineno))
elif args[0] == '##source-ontology':
if len(args) != 2: raise IOError('invalid source-ontology directive [line: {0}]'.format(self._lineno))
elif args[0] == '##species':
if self._species is not None: raise IOError('species directive repeateError(d [line: {0}]'.format(self._lineno))
self._species = ' '.join(map(self._process_text, args[1:]))
elif args[0] == '##genome-build':
if len(args) != 3: raise IOError('invalid genome-build directive [line: {0}]'.format(self._lineno))
if self._genome_build is not None: raise IOError('genome-build directive repeated [line: {0}]'.format(self._lineno))
self._genome_build = tuple(map(self._process_text, args[1:]))
raise IOError('invalid directive: {0} [line: {1}]'.format(args[0], self._lineno))
def _get_annotation(self): # import an annotation feature
# separate the 9 columns
bits = self._line.rstrip().split('\t')
if len(bits) != 9: raise IOError('invalid annotation line [line: {0}]'.format(self._lineno))
# seqid
if bits[0] == '.': raise IOError('invalid annotation line (seqid cannot be missing) [line: {0}]'.format(self._lineno))
else: seqid = self._process_text(bits[0])
# source
if bits[1] == '.': source = None
else: source = self._process_text(bits[1])
# type
if bits[2] == '.': raise IOError('invalid annotation line: type must be defined [line: {0}]'.format(self._lineno))
type_ = self._process_text(bits[2])
type_ = _SOFA.get(type_, type_)
# start and stop
if bits[3] == '.': raise IOError('invalid annotation line: start must be defined [line: {0}]'.format(self._lineno))
if bits[4] == '.': raise IOError('invalid annotation line: stop must be defined [line: {0}]'.format(self._lineno))
start = int(bits[3]) - 1
stop = int(bits[4]) - 1
except ValueError: raise IOError('invalid feature positions [line: {0}]'.format(self._lineno))
# positions are checked later (after Is_circular can be detected)
# score
if bits[5] == '.': score = None
try: score = float(bits[5])
except ValueError: raise IOError('invalid feature score [line: {0}]'.format(self._lineno))
# strand
strand = bits[6]
if strand not in '+-.?': raise IOError('invalid feature strand [line: {0}]'.format(self._lineno))
if strand == '.': strand = None
# phase
phase = bits[7]
if phase not in '.012': raise IOError('invalid feature phase [line: {0}]'.format(self._lineno))
if type_ == 'CDS':
if phase == '.':
if self._strict: raise IOError('invalid feature: phase must be defined for CDS [line: {0}]'.format(self._lineno))
phase = int(phase)
if phase != '.' and type_ not in ['start_codon', 'stop_codon']:
raise IOError('invalid feature: phase must only be defined for CDS [line: {0}]'.format(self._lineno))
if phase == '.': phase = None
# attributes
ID = None
Name = None
Alias = None
Parent = None
Target = None
Gap = None
Note = None
Dbxref = None
Ontology_term = None
Derives_from = None
Is_circular = None
attributes = {}
attrs = bits[8].split(';')
for attr in attrs:
if attr.count('=') != 1: raise IOError('invalid attribute: {0} [line: {1}]'.format(attr, self._lineno))
key, value = attr.split('=')
if key[0].isupper():
if key == 'ID':
if ID is not None: raise IOError('invalid attribute: {0} (ID defined more than once) [line: {1}]'.format(attr, self._lineno))
ID = self._process_text(value)
elif key == 'Name':
if Name is not None: raise IOError('invalid attribute: {0} (Name defined more than once) [line: {1}]'.format(attr, self._lineno))
Name = self._process_text(value)
elif key == 'Alias':
if Alias is not None: raise IOError('invalid attribute: {0} (Alias defined more than once) [line: {1}]'.format(attr, self._lineno))
Alias = tuple(map(self._process_text, value.split(',')))
elif key == 'Parent':
if Parent is not None: raise IOError('invalid attribute: {0} (Parent defined more than once) [line: {1}]'.format(attr, self._lineno))
Parent = tuple(map(self._process_text, value.split(',')))
elif key == 'Target':
if Target is not None: raise IOError('invalid attribute: {0} (Target defined more than once) [line: {1}]'.format(attr, self._lineno))
bits = value.split()
if len(bits) < 3 or len(bits) > 4:
raise IOError('invalid attribute: {0} [line: {1}]'.format(attr, self._lineno))
target_id = self._process_text(bits[0])
target_start = int(bits[1]) - 1
target_end = int(bits[2]) - 1
except ValueError: raise IOError('invalid attribute: {0} (invalid bounds) [line: {1}]'.format(attr, self._lineno))
if target_start < 0 or target_end < target_start:
raise IOError('invalid attribute: {0} (invalid bounds) [line: {1}]'.format(attr, self._lineno))
if len(bits) == 3:
target_strand = '.'
if bits[3] not in '+-': raise IOError('invalid attribute: {0} [line: {1}]'.format(attr, self._lineno))
target_strand = bits[3]
Target = (target_id, target_start, target_end, target_strand)
elif key == 'Gap':
if Gap is not None: raise IOError('invalid attribute: {0} (Gap defined more than once) [line: {1}]'.format(attr, self._lineno))
bits = value.split()
for bit in bits:
if re.match('[MIDFR]\d+$', bit) is None:
raise IOError('invalid attribute: {0} (invalid CIGAR string) [line: {1}]'.format(attr, self._lineno))
Gap = value
elif key == 'Derives_from':
if Derives_from is not None: raise IOError('invalid attribute: {0} (Derives_from defined more than once) [line: {1}]'.format(attr, self._lineno))
Derives_from = self._process_text(value)
elif key == 'Note':
if Note is not None: raise IOError('invalid attribute: {0} (Note defined more than once) [line: {1}]'.format(attr, self._lineno))
Note = tuple(map(self._process_text, value.split(',')))
elif key == 'Dbxref':
if Dbxref is not None: raise IOError('invalid attribute: {0} (Dbxref defined more than once) [line: {1}]'.format(attr, self._lineno))
Dbxref = []
for v in value.split(','):
v = self._process_text(v)
if v.count(':') != 1: raise IOError('invalid attribute: {0} [line: {1}]'.format(attr, self._lineno))
elif key == 'Ontology_term':
if Ontology_term is not None: raise IOError('invalid attribute: {0} (Ontology_term defined more than once) [line: {1}]'.format(attr, self._lineno))
Ontology_term = []
for v in value.split(','):
v = self._process_text(v)
if v.count(':') != 1: raise IOError('invalid attribute: {0} [line: {1}]'.format(attr, self._lineno))
elif key == 'Is_circular':
if Is_circular is not None: raise IOError('invalid attribute: {0} (Is_circular defined more than once) [line: {1}]'.format(attr, self._lineno))
if value.lower() == 'true': Is_circular = True
elif value.lower() == 'false': Is_circular = False
else: raise IOError('invalid attribute: {0} [line: {1}]'.format(key, self._lineno))
raise IOError('invalid attribute name: {0} [line: {1}]'.format(key, self._lineno))
if key in attributes: raise IOError('invalid attribute: {0} ({1} defined more than once) [line: {1}]'.format(attr, key, self._lineno))
attributes[key] = self._process_text(value)
# default value Is_circular
if Is_circular is None: Is_circular = False
# check bounds
if stop < start: raise IOError('invalid feature positions [line: {0}]'.format(self._lineno))
if seqid in self._regions:
if start < self._regions[seqid][0]: raise IOError('invalid feature positions [line: {0}]'.format(self._lineno))
if stop > self._regions[seqid][1] and not Is_circular: raise IOError('invalid feature positions [line: {0}]'.format(self._lineno))
if ID is not None:
# check if new segment of an open feature
if ID in self._open_features:
f = self._open_features[ID]
if seqid is not None and seqid != f._seqid: raise IOError('invalid feature: mismatch of `seqid` between feature segments [line: {0}]'.format(self._lineno))
if type_ is not None and type_ != f._type: raise IOError('invalid feature: mismatch of `type` between feature segments [line: {0}]'.format(self._lineno))
if score is not None and score != f._score: raise IOError('invalid feature: mismatch of `score` between feature segments [line: {0}]'.format(self._lineno))
if strand is not None and strand != f._strand: raise IOError('invalid feature: mismatch of `strand` between feature segments [line: {0}]'.format(self._lineno))
for k in attributes:
if k not in f.attributes or attributes[k] != f._attributes[k]:
if k in ['exon_number', 'exon_id']:
pass # ignore exon_number qualifier at the level of segment
raise IOError('invalid feature: mismatch of `{0}` between feature segments [line: {1}]'.format(k, self._lineno))
f._add_segment(start, stop, phase)
# check that ID not duplicated
if ID in self._IDs: raise IOError('invalid feature: duplicated ID: {0} [line: {1}]'.format(ID, self._lineno))
# create new feature
f = Gff3Feature._make(seqid=seqid, source=source, type_=type_,
score=score, strand=strand, ID=ID, Name=Name, Alias=Alias,
Parent=Parent, Target=Target, Gap=Gap,
Derives_from=Derives_from, Note=Note, Dbxref=Dbxref,
Ontology_term=Ontology_term, Is_circular=Is_circular,
f._add_segment(start, stop, phase)
# connect to parents
parents = []
if Parent is not None:
for p in Parent:
if p not in self._open_features: raise IOError('invalid feature: invalid parent ID: {0} (non-existing or closed feature) [line: {1}]'.format(p, self._lineno))
derives = []
if Derives_from is not None:
for d in Derives_from:
if d not in self._open_features: raise IOError('invalid feature: invalid `Derives_from` ID: {0} (non-existing or closed feature) [line: {1}]'.format(d, self._lineno))
f._connect(parents, derives)
ultimate = []
for parent in parents: parent._ultimate_parents(ultimate)
for p in ultimate: p._all_parts.append(f)
# add feature
if ID is None: f._close()
else: self._open_features[ID] = f
self._num_tot += 1
if len(parents) == 0:
self._num_top += 1
if seqid not in self._seqid:
self._mapping[seqid] = []
def _get_sequences(self): # import fasta-formatted sequences
fp = _eggwrapper.FastaParser()
fp.open_file(self._f.name, alphabets.DNA._obj, self._pos)
self._f.seek(0, os.SEEK_END)
except AttributeError:
fp.set_string(self._f.read(), alphabets.DNA._obj)
obj = _eggwrapper.DataHolder()
fp.read_all(False, obj)
self._sequences = _interface.Container._create_from_data_holder(obj, alphabets.DNA)
def version(self):
"""GFF version."""
return self._version
def regions(self):
"""Dictionary with all sequence regions defined by directives."""
return self._regions
def feature_ontology(self):
"""Content of all feature-ontology directives."""
return self._feature_ontology
def attribute_ontology(self):
"""Content of all attribute-ontology directives."""
return self._attribute_ontology
def source_ontology(self):
"""Content of all source-ontology directives."""
return self._source_ontology
def species(self):
"""Species (if specified by a directive)."""
return self._species
def genome_build(self):
"""Genome build if specified by a directive."""
return self._genome_build
def sequences(self):
If specified, sequences as a :class:`.Container`. By default,
this value is ``None``.
return self._sequences
def num_seqid(self):
"""Number of seqid of features present in instance."""
return len(self._seqid)
def list_seqid(self):
"""List of seqid."""
return self._seqid
def num_top_features(self):
"""Number of features without parents."""
return self._num_top
def num_tot_features(self):
"""Total number of features."""
return self._num_tot
def __iter__(self):
return Iterator(self, None, None, None, False)
[docs] def iter_features(self, seqid=None, start=None, end=None, all_features=False):
Iterate over features.
:param seqid: only iterates overs features of this contig (by
default, consider all contigs). If the specified seqid is not
present in the GFF3 file, iteration is empty (without error).
:param start: start iteration at this position. By default,
start with the first position.
:param end: stop iteration at this position (does not include
features whose end position is larger than this value). By
default, process all features.
:param all_features: whether iteration should also include
features that are part of another (by default, only include
features that don't have a parent). A given feature may be
repeated if it has several unconnected parents.
.. note::
If *start* and/or *end* are specified, *seqid* is required.
return Iterator(self, seqid, start, end, all_features)
class Iterator(object):
def __init__(self, obj, seqid, start, end, all_features):
# check that bounds are used only for one seqid
if (start is not None or end is not None) and seqid is None:
raise ValueError('if start/end are specified, seqid must be specified')
if start is None: start = 0
if end is None: end = _eggwrapper.MAX
self._obj = obj
self._end = end
# get list of seqid to process
if seqid is None: self._seqid = obj._seqid
elif seqid not in obj._seqid: self._seqid = []
else: self._seqid = [seqid]
self._idx_seq = 0
if len(self._seqid) == 0:
self._next = self._nextD
# set to the start position
if start > 0:
if all_features:
for self._idx_top in range(len(obj._mapping[self._seqid[0]])):
for self._idx_part in range(len(obj._mapping[self._seqid[0]][self._idx_top].all_parts)):
if obj._mapping[self._seqid[0]][self._idx_top].all_parts[self._idx_part].start >= start: break
else: continue # no part has been found: check next top feature
break # a part has been found
else: # no features found after the start position
self._next = self._nextD
for self._idx_top in range(len(obj._mapping[self._seqid[0]])):
if obj._mapping[self._seqid[0]][self._idx_top].start >= start: break
else: # no features found after the start position
self._next = self._nextD
self._idx_top = 0
self._idx_part = 0
if all_features:
self._next = self._nextAF
self._next = self._nextTF
def __iter__(self):
return self
def __next__(self):
return self._next()
next = __next__
def _nextD(self):
raise StopIteration
def _nextAF(self):
if self._idx_seq == len(self._seqid): raise StopIteration
feats = self._obj._mapping[self._seqid[self._idx_seq]]
if self._idx_top == len(feats):
self._idx_seq += 1
self._idx_top = 0
return self._nextAF()
if feats[self._idx_top].start > self._end: raise StopIteration
if self._idx_part == len(feats[self._idx_top].all_parts):
self._idx_top += 1
self._idx_part = 0
return self._nextAF()
self._idx_part += 1
if feats[self._idx_top].all_parts[self._idx_part-1].end <= self._end:
return feats[self._idx_top].all_parts[self._idx_part-1]
return self._nextAF()
def _nextTF(self):
if self._idx_seq == len(self._seqid): raise StopIteration
feats = self._obj._mapping[self._seqid[self._idx_seq]]
if self._idx_top == len(feats):
self._idx_seq += 1
self._idx_top = 0
return self._nextTF()
if feats[self._idx_top].end > self._end: raise StopIteration # necessarily finished because there must be only one seqid
self._idx_top += 1
return feats[self._idx_top-1]
[docs]class Gff3Feature(object):
Provide access to data of a feature. This class cannot be instanciated
by the user.
def __init__(self):
raise ValueError('Gff3Feature instances cannot be created directly')
def _make(cls, seqid, source, type_, score, strand, ID, Name,
Alias, Parent, Target, Gap, Derives_from, Note,
Dbxref, Ontology_term, Is_circular, **attrs):
obj = Gff3Feature.__new__(cls)
obj._seqid = seqid
obj._type = type_
obj._score = score
obj._source = source
obj._strand = strand
obj._ID = ID
obj._attributes = {'ID': ID, 'Name': Name, 'Alias': Alias,
'Parent': Parent, 'Target': Target, 'Gap': Gap,
'Derives_from': Derives_from, 'Note': Note,
'Dbxref': Dbxref, 'Ontology_term': Ontology_term,
'Is_circular': Is_circular}
obj._start = None
obj._end = None
obj._segments = []
obj._descendants = []
obj._all_parts = [obj]
obj._derivers = []
return obj
def seqid(self):
"""Seqid on which this feature is located."""
return self._seqid
def type(self):
"""Feature type."""
return self._type
def score(self):
"""Feature score."""
return self._score
def source(self):
"""Feature source."""
return self._source
def strand(self):
"""Strand on which the feature is located."""
return self._strand
def ID(self):
"""Feature identifier."""
return self._ID
def attributes(self):
Dictionary of attributes attached to the feature.
The list of attributes is:
* ID
* Name
* Alias
* Parent
* Target
* Gap
* Derives_from
* Note
* Dbxref
* Ontology_term
* Is_circular
* Other attributes as defined in the GFF3 file.
return self._attributes
def start(self):
"""Feature start position."""
return self._start
def end(self):
"""Feature end position."""
return self._end
def segments(self):
"""List of segments of the feature."""
return self._segments
def parents(self):
"""List of features parent to this one."""
return self._parents
def descendants(self):
"""List of features descending from this one."""
return self._descendants
def all_parts(self):
"""All parts of this feature."""
return self._all_parts
def derivers(self):
"""List of features deriving from this one."""
return self._derivers
def _add_segment(self, start, end, phase):
# Add a segment to the feature. Assume that start <= end. The
# segments can be loaded in any order. Segments are not allowed to
# overlap. They are sorted when _close() is called.
self._segments.append((start, end, phase))
def _connect(self, parents, derives):
# Connect this feature to parents or other features from which it
# derives.
self._parents = parents
self._derives_from = derives
for i in self._parents: i._descendants.append(self)
for i in self._derives_from: i._derivers.append(self)
def _ultimate_parents(self, dest):
# Return the list of ultimate parents.
if len(self._parents) == 0: dest.append(self)
for p in self._parents: p._ultimate_parents(dest)
def _close(self):
# Terminate loading of segments in the instance.
if len(self._segments) == 0: raise ValueError('feature without segments')
for i in range(1, len(self._segments)):
if self._segments[i-1][1] >= self._segments[i][0]: raise ValueError('feature segments are overlapping')
self._start = self._segments[0][0]
self._end = self._segments[-1][1]