"""
Copyright 2015-2023 Stephane De Mita, Mathieu Siol, Thomas Coudoux
This file is part of EggLib.
EggLib is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EggLib is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with EggLib. If not, see <http://www.gnu.org/licenses/>.
"""
import os
from .. import eggwrapper as _eggwrapper
from .. import alphabets
from .. import _site
FIRST = _eggwrapper.FIRST
LAST = _eggwrapper.LAST
### NOTE: it is necessary to replace the position() method of _eggwrapper.VcfParser!!!
class _VcfParserBase(object):
def _f_position(self):
x = _eggwrapper.VcfParser.position(self._parser)
return -1 if x == _eggwrapper.BEFORE else x
def _position(self):
raise NotImplementedError
def __init__(self):
raise NotImplementedError
@staticmethod
def _mk_index_fname(fname):
basename, ext = os.path.splitext(fname)
if ext == '.vcf': return basename + '.vcfi'
else: return fname + '.vcfi'
def _set_params(self, threshold_PL, threshold_GL):
if threshold_PL is None and threshold_GL is None:
self._parser.set_threshold_PL(_eggwrapper.UNKNOWN)
self._parser.set_threshold_GL(_eggwrapper.UNKNOWN)
elif threshold_PL is not None and threshold_GL is None :
if threshold_PL < 1: raise ValueError('threshold_PL must be at least 1')
else: self._parser.set_threshold_PL(threshold_PL)
elif threshold_GL is not None and threshold_PL is None :
if threshold_GL < 1: raise ValueError('threshold_GL must be at least 1')
else: self._parser.set_threshold_GL(threshold_GL)
else:
raise ValueError('cannot use threshold_PL and threshold_GL at the same time')
file_format = property(lambda self: self._parser.file_format(), doc='File format specified in the header.')
num_info = property(lambda self: self._parser.num_info(), doc='Number of INFO definitions in the header.')
num_format = property(lambda self: self._parser.num_format(), doc='Number of FORMAT definitions in the header.')
num_filter = property(lambda self: self._parser.num_filter(), doc='Number of FILTER definitions in the header.')
num_alt = property(lambda self: self._parser.num_alt(), doc='Number of ALT definitions in the header.')
num_meta = property(lambda self: self._parser.num_meta(), doc='Number of META definitions in the header.')
num_samples = property(lambda self: self._parser.num_samples(), doc='Number of samples defined in the header.')
def get_sample(self, idx):
"""
Get the name of a sample read from the header. The index
must be smaller than :py:obj:`~.io.VcfParser.num_samples`.
"""
if idx < 0 or idx >= self._parser.num_samples(): raise IndexError('sample index out of range')
return self._parser.get_sample(idx)
def get_info(self, idx):
"""
Get an INFO definition from the header. The
index must be smaller than :py:obj:`.num_info`. Return a
dictionary containing the following data:
* ``"id"``: identifier string.
* ``"type"``: one of ``"Integer"``, ``"Float"``, ``"Flag"``,
``"Character"``, and ``"String"``.
* ``"description"``: description string.
* ``"number"``: expected number of items. Special values are
``None`` (if undefined), ``"NUM_GENOTYPES"`` (number matching
the number of genotypes for any particular variant),
``"NUM_ALTERNATE"`` (number matching the number of alternate
alleles for any particular variant), and ``"NUM_ALLELES"``
(number matching the number of alleles--including the
reference--for any particular variant).
* ``"extra"``: all extra qualifiers, presented as a list
of ``(key, value)`` :class:`tuple` instances.
"""
if idx < 0 or idx >= self._parser.num_info(): raise IndexError('INFO index out of range')
info = self._parser.get_info(idx)
n = info.get_number()
if n == _eggwrapper.UNKNOWN: n = None
elif n == _eggwrapper.NUM_ALTERNATE: n = 'NUM_ALTERNATE'
elif n == _eggwrapper.NUM_GENOTYPES: n = 'NUM_GENOTYPES'
elif n == _eggwrapper.NUM_POSSIBLE_ALLELES: n = 'NUM_ALLELES'
t = info.get_type()
if t == _eggwrapper.Info.Integer: t = 'Integer'
elif t == _eggwrapper.Info.Float: t = 'Float'
elif t == _eggwrapper.Info.Flag: t = 'Flag'
elif t == _eggwrapper.Info.Character: t = 'Character'
elif t == _eggwrapper.Info.String: t = 'String'
else: raise ValueError('invalid VCF metainformation type found')
return {
'id': info.get_ID(),
'description': info.get_description(),
'number': n,
'extra': [(info.get_extra_key(j), info.get_extra_value(j))
for j in range(info.get_num_extra())],
'type': t
}
def get_format(self, idx):
"""
Get a FORMAT definition from the header. The
index must be smaller than :py:obj:`~.io.VcfParser.num_format`. Return a
dictionary containing the following data:
* ``"id"``: identifier string.
* ``"type"``: one of ``"Integer"``, ``"Float"``, ``"Character"``,
and ``"String"``.
* ``"description"``: description string.
* ``"number"``: expected number of items. Special values are
``None`` (if undefined), ``"NUM_GENOTYPES"`` (number matching
the number of genotypes for any particular variant),
``"NUM_ALTERNATE"`` (number matching the number of alternate
alleles for any particular variant), and ``"NUM_ALLELES"``
(number matching the number of alleles--including the
reference--for any particular variant).
* ``"extra"``: all extra qualifiers, presented as a list
of ``(key, value)`` :class:`tuple` instances.
"""
if idx < 0 or idx >= self._parser.num_format(): raise IndexError('FORMAT index out of range')
format_ = self._parser.get_format(idx)
n = format_.get_number()
if n == _eggwrapper.UNKNOWN: n = None
elif n == _eggwrapper.NUM_ALTERNATE: n = 'NUM_ALTERNATE'
elif n == _eggwrapper.NUM_GENOTYPES: n = 'NUM_GENOTYPES'
elif n == _eggwrapper.NUM_POSSIBLE_ALLELES: n = 'NUM_ALLELES'
t = format_.get_type()
if t == _eggwrapper.Info.Integer: t = 'Integer'
elif t == _eggwrapper.Info.Float: t = 'Float'
elif t == _eggwrapper.Info.Character: t = 'Character'
elif t == _eggwrapper.Info.String: t = 'String'
else: raise ValueError('invalid VCF metainformation type found')
return {
'id': format_.get_ID(),
'description': format_.get_description(),
'number': n,
'extra': [(format_.get_extra_key(j), format_.get_extra_value(j))
for j in range(format_.get_num_extra())],
'type': t
}
def get_filter(self, idx):
"""
Get a FILTER definition from the header. The
index must be smaller than :py:obj:`~.io.VcfParser.num_filter`. Return a
dictionary containing the following data:
* ``"id"``: identifier string.
* ``"description"``: description string.
* ``"extra"``: all extra qualifiers, presented as a list
of ``(key, value)`` :class:`tuple` instances.
"""
if idx < 0 or idx >= self._parser.num_filter(): raise IndexError('FILTER index out of range')
filter_ = self._parser.get_filter(idx)
return {
'id': filter_.get_ID(),
'description': filter_.get_description(),
'extra': [(filter_.get_extra_key(j), filter_.get_extra_value(j))
for j in range(filter_.get_num_extra())]
}
def get_alt(self, idx):
"""
Get an ALT definition from the header. The
index must be smaller than :py:obj:`.num_alt`. Return a
dictionary containing the following data:
* ``"id"``: identifier string.
* ``"description"``: description string.
* ``"extra"s``: all extra qualifiers, presented as list
of ``(key, value)`` :class:`tuple` instances.
"""
if idx < 0 or idx >= self._parser.num_alt(): raise IndexError('ALT index out of range')
alt = self._parser.get_alt(idx)
return {
'id': alt.get_ID(),
'description': alt.get_description(),
'extra': [(alt.get_extra_key(j), alt.get_extra_value(j))
for j in range(alt.get_num_extra())]
}
def get_meta(self, idx):
"""
Get data for a given META field defined in the VCF header. The
index must be smaller than :py:obj:`~.io.VcfParser.num_meta`. Return a
:class:`tuple` containing the key and the value of the META
field.
"""
if idx < 0 or idx >= self._parser.num_meta(): raise IndexError('META index out of range')
meta = self._parser.get_meta(idx)
return ( meta.get_key(), meta.get_value() )
def get_variant(self):
"""
Get all data for the current variant.
Return an :class:`.io.VcfVariant` instance containing all data available
for the last variant processed by this instance. It is required
that a variant has been effectively processed.
"""
return VcfVariant._make(self)
def get_genotypes(self, start=0, stop=None, dest=None):
"""
Extract genotype data for the current site.
It is required that a variant has been effectively processed.
:param start: index of the first sample to process (at least
0 and smaller than the number of samples).
:param stop: sample index at which processing must be
stopped (this sample is not processed; at least equal
to *start* and smaller than the number of samples).
:param dest: a :class:`.Site`
instance that will be recycled and used to place results.
:return: A :class:`.Site` instance by default, or
``None`` if *dest* was specified.
"""
# check parser
if not self._parser.has_data(): raise ValueError('data must have been read from VCF parser')
if not self._parser.has_GT(): raise ValueError('`VcfParser` instance must have `GT` data')
pl = self._parser.ploidy()
if pl < 1: raise ValueError('GT ploidy is 0')
# check indexes
n = self._parser.num_samples()
if start < 0 or start >= n: raise IndexError('invalid start index')
if stop is None: stop = n
elif stop < start or stop > n: raise IndexError('invalid stop index')
# create a new site instance or recycle if needed
if dest is None: site = _site.Site()
else: site = dest
siteobj = site._obj
siteobj.reset()
site._alphabet = self._mk_alphabet()
site._position = self._parser.position()
# process
siteobj.process_vcf(self._parser, start, stop)
# create/recycle provided instance
if dest is None:
return site
def _mk_alphabet(self):
if self._parser.type_alleles() == 0:
return alphabets.DNA
if self._parser.type_alleles() == 1:
alph = alphabets.Alphabet('string', [], [], case_insensitive=True, name=None)
self._parser.set_alleles(alph._obj)
return alph
if self._parser.type_alleles() == 2 or self._parser.type_alleles() == 3:
alph = alphabets.Alphabet('custom', [], [], case_insensitive=False, name=None)
self._parser.set_alleles(alph._obj)
return alph
raise RuntimeError('unexpected value for `type_alleles` (internal error)')
[docs]class VcfStringParser(_VcfParserBase):
"""
Import Variant Call Format data from a string.
Alias of :class:`.io.VcfParser` processing strings instead of a file.
The constructor takes a string containing a VCF header (the first
line being the file format specification and the last line being the
header line ,starting with ``#CHROM``).
"""
def __init__(self, header, threshold_PL=None, threshold_GL=None):
string = header.strip()
self._parser = _eggwrapper.VcfParser()
self._position = self._f_position # substitution function (replace BEFORE by -1)
self._parser.read_header(string)
self._set_params(threshold_PL, threshold_GL)
[docs] def readline(self, string):
"""
Read one variant from a user-provided single line. The string
should contain a single line of VCF-formatted data (no header).
All field specifications and sample information should be
consistent with the information contained in the header that
has been provided when creating this instance.
:return: A :class:`tuple` with the chromosome name, the position, and
the number of alleles at the variant (as indicated in the VCF
line).
"""
self._parser.readline(string)
return self._parser.chromosome(), self._position(), self._parser.num_alternate() + 1
[docs]class VcfParser(_VcfParserBase):
"""
Import Variant Call Format data from a file. The VCF format
is designed to store data describing genomic variation in an
extremely flexible way.
See the `description of the VCF format. <https://github.com/samtools/hts-specs>`_
To parse VCF data stored in string, use :class:`.io.VcfStringParser`.
:param fname: name of a properly formatted VCF file. The header
section will be processed upon instance creation, and lines will
be read later, when the user iterates over the instance (or call
:meth:`~.io.VcfParser.readline`).
:param threshold_PL: call genotypes parameter.
This parameter controls how genotype calling (GT field) is
performed from the PL (phred-scaled genotype likelihood) field.
By default (``None``), this conversion is never performed. If
*threshold_PL* is specified, genotype calling is
only performed if GT is not available and PL is available. The
genotype with the lowest PL value is selected. The parameter gives the
minimum acceptable gap between the best genotype and the second one.
If the second genotype has a too good score, based on this parameter,
the genotype is called unknown. The parameter must be at least 1.
:class:`.io.VcfParser` instances are iterable. Everly loop
yields a ``(chromosome, position, num_all)``
:class:`tuple` that allows the user to determines if the variant is of
interest. Note that the position is considered as an index and therefore
has been decremented compared with the value found in the file.
If the variant is of interest, :class:`.io.VcfParser` instances provide methods to
extract all data for this variant. Iterating over VCF lines can be
performed manually using the method :meth:`~.io.VcfParser.readline`.
"""
def __init__(self, fname, threshold_PL=None, threshold_GL=None):
self._parser = _eggwrapper.VcfParser()
self._position = self._f_position # substitution function (replace BEFORE by -1)
if not isinstance(fname, str): raise TypeError('invalid fname: invalid type')
self._parser.open_file(fname)
self._fname = fname
self._set_params(threshold_PL, threshold_GL)
[docs] def close(self):
"""
Close file (if it is open)
"""
self._parser.reset()
@property
def good(self):
"""
Tell if the file is good for reading. ``False`` is the end of
the file has been reached.
"""
return self._parser.good()
@property
def currline(self):
""" Index of the current line of the VCF file. """
return self._parser.get_currline()
def __iter__(self):
return self
def __next__(self):
if not self._parser.good(): raise StopIteration
self._parser.read()
return self._parser.chromosome(), self._position(), self._parser.num_alternate() + 1
[docs] def readline(self):
"""
Read one variant.
Return a :class:`tuple` with the chromosome name, the position, and
the number of alleles at the variant (as indicated in the VCF
file).
Raise a :exc:`ValueError` if file is finished.
"""
if not self._parser.good(): raise ValueError('cannot read line')
self._parser.read()
return self._parser.chromosome(), self._position(), self._parser.num_alternate() + 1
[docs] def load_index(self, fname=None):
"""
Load an index file allowing fast navigation.
Index files allow fast navigation in VCF file regardless of
their size, allowing to move instantly to a determined variant
using its position or its index in file. Index files can be
created by :func:`.io.make_vcf_index`.
:param fname: name of the index file. By default, use the default
index file name, which is the name of the VCF file with the
".vcfi" extension (removing the original extension only if it
is ".vcf").
"""
if fname is None:
fname = self._mk_index_fname(self._fname)
if not os.path.exists(fname):
raise IOError('index file does not exist')
self._parser.get_index().load_data(self._parser, fname)
@property
def num_index(self):
"""
Number of indexed lines if the file index.
"""
return self._parser.get_index().num()
@property
def has_index(self):
"""
``True`` if an index file has been loaded.
"""
return self._parser.get_index().has_data()
[docs] def goto(self, contig, position=FIRST):
"""goto(self, contig, position=egglib.io.FIRST)
Move to an arbitrary position of the VCF file.
Requires an index (see :meth:`~.io.VcfParser.load_index`).
A :exc:`ValueError` is raised if the position cannot be found.
:param contig: contig name.
:param position: contig position. Use :py:obj:`.io.FIRST` for the
first available variant of the contig, :py:obj:`.io.LAST` for
the last, and -1 for the position immediately before the
first position (position 0 in VCF files).
"""
self._parser.get_index().go_to(contig, _eggwrapper.BEFORE if position == -1 else position)
[docs] def unread(self):
"""
Go back to the previous variant. No index is required, but only
allowed after reading one line (not allowed immediately after
creating instance or calling :meth:`~.io.VcfParser.rewind`).
"""
self._parser.unread()
[docs] def rewind(self):
"""
Reset the parser. Move back to the first variant.
No index is required.
"""
self._parser.rewind()
[docs] def slider(self, size, step, as_variants=False, start=0, stop=None, max_missing=0, allow_indel=False, allow_custom=False):
"""
Start a sliding window from the current position.
:param size: size of the sliding window (by default, in base pairs).
:param step: increment of the sliding window (by default, in base pairs).
:param as_variants: express size and step in number of variants instead of base pairs.
:param start: start position of the sliding window.
:param stop: stop position of the sliding window.
:param max_missing: maximum number of missing alleles (variants
exceeding this threshold are ignored).
.. warning::
Here, *max_missing* must be an absolute number of
samples.
:param allow_indel: include variants with alleles of variable size
:param allow_custom: include variants with custom alleles
:return: An :class:`.io.VcfSlidingWindow` instance.
.. versionchanged:: 3.2.0
*max_missing* is required to be an integer.
"""
if not self._parser.good(): raise ValueError('parser reached end of file')
if max_missing > self._parser.num_samples(): raise ValueError('invalid value for `max_missing`')
if not isinstance(max_missing, int): raise TypeError('`max_missing\' must be an integer')
obj = VcfSlidingWindow.__new__(VcfSlidingWindow)
obj._sld = _eggwrapper.VcfWindowSliderPerSite() if as_variants else _eggwrapper.VcfWindowSlider()
obj._wdw = VcfWindow.__new__(VcfWindow)
obj._wdw._sld = obj._sld
obj._wdw._parser = self
obj._parser = self._parser
if stop is None: stop = _eggwrapper.UNKNOWN
if start < 0: raise ValueError('invalid start value')
if stop < start: raise ValueError('invalid stop value')
ns = self._parser.num_samples()
mask = 3
if allow_indel: mask &= 2
if allow_custom: mask &= 1
obj._sld.setup(self._parser, size, step, start, stop, max_missing, mask)
return obj
[docs] def bed_slider(self, bed, max_missing=0, allow_indel=False, allow_custom=False):
"""
Perform a sliding window based on BED coordinates.
:param bed: an :class:`.io.BED` instance.
:param max_missing: maximum number of missing alleles.
.. warning::
Here, *max_missing* must be an absolute number of
samples.
:param allow_indel: include variants with alleles of varying size.
:param allow_custom: include variants with custom alleles.
:return: An :class:`.io.VcfWindow` instance.
.. versionchanged:: 3.2.0
*max_missing* is required to be an integer.
"""
if not self._parser.good(): raise ValueError('parser reached end of file')
if not isinstance(max_missing, int): raise TypeError('`max_missing\' must be an integer')
if max_missing > self._parser.num_samples(): raise ValueError('invalid value for `max_missing`')
obj = VcfSlidingWindow.__new__(VcfSlidingWindow)
obj._sld = _eggwrapper.VcfWindowBED()
obj._wdw = VcfWindow.__new__(VcfWindow)
obj._wdw._sld = obj._sld
obj._wdw._parser = self
obj._parser = self._parser
ns = self._parser.num_samples()
mask = 3
if allow_indel: mask &= 2
if allow_custom: mask &= 1
obj._sld.setup(self._parser, bed._obj, max_missing, mask)
obj._bed_reference = bed # keep a reference to BED slider to avoid intempestive garbage collection
return obj
[docs]def make_vcf_index(fname, outname=None):
"""
Create the index for a VCF file.
:param fname: name of a VCF file.
:param outname: name of the index file. By default, use a default
name. The default name is based on the VCF file name, stripping
the extension only if it is vcf and appending the vcfi
extension. Index files bearing this default name are
automatically loaded if the corresponding VCF file is opened.
"""
_parser = _eggwrapper.VcfParser()
_parser.open_file(fname)
if outname is None:
outname = VcfParser._mk_index_fname(fname)
_eggwrapper.make_vcf_index(_parser, outname)
del _parser
return outname
[docs]class VcfVariant(object):
"""
Represent a single VCF variant.
The user cannot create instances of this class himself
(instances are generated by :class:`.io.VcfParser` or :class:`.io.VcfStringParser`).
.. note::
The ``AA`` (ancestral allele), ``AN`` (allele number),
``AC`` (allele count), and ``AF`` (allele frequency) INFO fields
as well as the ``GT`` (deduced genotype) FORMAT are
automatically extracted if they are present in the the file and
if their definition matches the format specification (meaning
that they were not re-defined with different number/type) in
the header. If present, they are available through the dedicated
attributes :py:obj:`~.io.VcfVariant.AN`, :py:obj:`~.io.VcfVariant.AA`,
:py:obj:`~.io.VcfVariant.AC`, :py:obj:`~.io.VcfVariant.AF`,
:py:obj:`~.io.VcfVariant.GT`, :py:obj:`~.io.VcfVariant.ploidy`
and :py:obj:`~.io.VcfVariant.GT_phased`. However,
they are also available in the respective
:py:obj:`~.io.VcfVariant.info` and
:py:obj:`~.io.VcfVariant.samples` (sub)-dictionaries.
"""
# the following are provided for comparison to content of variant.alternate_types
alt_type_default = _eggwrapper.Default #: Explicit alternate allele (the string represents the nucleotide sequence of the allele).
alt_type_referred = _eggwrapper.Referred #: Alternate allele referring to a pre-defined allele (the string provides the ID of the allele).
alt_type_breakend = _eggwrapper.Breakend #: Alternate allele symbolizing a breakend (see VCF description for more details).
def __init__(self):
raise NotImplementedError('cannot create `VcfVariant` instance')
@staticmethod
def _filter_missing(v, type_):
if ((type_==int and v==_eggwrapper.MISSINGDATA) or
(type_==float and v==_eggwrapper.UNDEF) or
(type_==str and v[0]==_eggwrapper.MAXCHAR)): return None
else: return v
@classmethod
def _make(cls, parser):
if parser._parser.len_reference() == 0: raise ValueError('cannot generate `VcfVariant` instance: no VCF line has been parsed (or the length of the reference allele is null)')
obj = cls.__new__(cls)
# get chromosome
obj._chrom = parser._parser.chromosome()
if obj._chrom == '': obj._chrom = None
# get position
obj._pos = parser._position()
if obj._pos == _eggwrapper.MISSING: obj._pos = None
elif obj._pos == _eggwrapper.UNKNOWN: obj._pos = -1
# get IDs
obj._id = tuple(parser._parser.ID(i) for i in range(parser._parser.num_ID()))
# get reference+alternate alleles
obj._alleles = [parser._parser.reference()]
if obj._alleles[0] == '': obj._alleles[0] = None
obj._alternate_types = []
obj._num_alternate = parser._parser.num_alternate()
obj._num_alleles = obj._num_alternate + 1
for i in range(obj._num_alternate):
obj._alleles.append(parser._parser.alternate(i))
obj._alternate_types.append(parser._parser.alternate_type(i))
obj._alleles = tuple(obj._alleles)
obj._alternate_types = tuple(obj._alternate_types)
# get quality
obj._quality = parser._parser.quality()
if obj._quality == _eggwrapper.UNDEF: obj._quality = None
# get test info
n = parser._parser.num_failed_tests()
if n == _eggwrapper.UNKNOWN: obj._failed_tests = None
else: obj._failed_tests = tuple(parser._parser.failed_test(i) for i in range(n))
# get info for the whole variant (site)
obj._info = {}
for i in range(parser._parser.num_FlagInfo()):
item = parser._parser.FlagInfo(i)
obj._info[item.get_ID()] = ()
for i in range(parser._parser.num_IntegerInfo()):
item = parser._parser.IntegerInfo(i)
if item.get_expected_number() == 1:
obj._info[item.get_ID()] = cls._filter_missing(item.item(0), int)
else:
obj._info[item.get_ID()] = tuple(cls._filter_missing(item.item(j), int) for j in range(item.num_items()))
for i in range(parser._parser.num_FloatInfo()):
item = parser._parser.FloatInfo(i)
if item.get_expected_number() == 1:
obj._info[item.get_ID()] = cls._filter_missing(item.item(0), float)
else:
obj._info[item.get_ID()] = tuple(cls._filter_missing(item.item(j), float) for j in range(item.num_items()))
for i in range(parser._parser.num_CharacterInfo()):
item = parser._parser.CharacterInfo(i)
if item.get_expected_number() == 1:
obj._info[item.get_ID()] = cls._filter_missing(item.item(0), str)
else:
obj._info[item.get_ID()] = tuple(cls._filter_missing(item.item(j), str) for j in range(item.num_items()))
for i in range(parser._parser.num_StringInfo()):
item = parser._parser.StringInfo(i)
if item.get_expected_number() == 1:
obj._info[item.get_ID()] = cls._filter_missing(item.item(0), str)
else:
obj._info[item.get_ID()] = tuple(cls._filter_missing(item.item(j), str) for j in range(item.num_items()))
# get predefined AN/AA/AC/AF info fields if they are matching definition
if parser._parser.has_AN(): obj._AN = parser._parser.AN()
else: obj._AN = None
if parser._parser.has_AA(): obj._AA = parser._parser.AA_string()
else: obj._AA = None
if parser._parser.has_AC(): obj._AC = tuple(parser._parser.AC(i) for i in range(parser._parser.num_AC()))
else: obj._AC = None
if parser._parser.has_AF(): obj._AF = tuple(parser._parser.AF(i) for i in range(parser._parser.num_AF()))
else: obj._AF = None
# pre-process the format column to allow match the ID's to the proper accessor method of SampleInfo
fields = []
for i in range(parser._parser.num_fields()):
format_ = parser._parser.field(i)
type_ = format_.get_type()
if type_ == _eggwrapper.Info.String:
f1 = _eggwrapper.SampleInfo.num_StringItems
f2 = _eggwrapper.SampleInfo.StringItem
t = str
elif type_ == _eggwrapper.Info.Float:
f1 = _eggwrapper.SampleInfo.num_FloatItems
f2 = _eggwrapper.SampleInfo.FloatItem
t = float
elif type_ == _eggwrapper.Info.Integer:
f1 = _eggwrapper.SampleInfo.num_IntegerItems
f2 = _eggwrapper.SampleInfo.IntegerItem
t = int
elif type_ == _eggwrapper.Info.Character:
f1 = _eggwrapper.SampleInfo.num_CharacterItems
f2 = _eggwrapper.SampleInfo.CharacterItem
t = str
fields.append((
format_.get_ID(), # the ID
parser._parser.field_rank(i), # the index within range
f1, # SampleInfo's method to get number of items
f2, # SampleInfo's method to get a given item
t # type (required to convert missing data to None)
))
obj._format_fields = frozenset(i[0] for i in fields)
# get all SampleInfo data
obj._num_samples = parser._parser.num_samples()
obj._samples = []
for sam in range(obj._num_samples):
sample_info = parser._parser.sample_info(sam)
sample_data = {}
for ID, idx, f1, f2, t in fields:
sample_data[ID] = (
None if f1(sample_info, idx)==0 else
tuple(cls._filter_missing(f2(sample_info, idx, i), t) for i in range(f1(sample_info, idx))))
# the above line get all items for a given FORMAT (using method pointers)
obj._samples.append(sample_data)
# get ploidy/num genotpes
obj._ploidy = parser._parser.ploidy()
obj._num_genotypes = parser._parser.num_genotypes()
# get genotypes
if parser._parser.has_GT():
obj._gt = tuple(
tuple((None if parser._parser.GT(i,j) == _eggwrapper.UNKNOWN
else obj._alleles[parser._parser.GT(i,j)])
for j in range(obj._ploidy))
for i in range(obj._num_samples))
obj._gt_phased = tuple(parser._parser.GT_phased(i) for i in range(obj._num_samples))
obj._gt_field = []
gt_f = None
for i in range(obj._num_samples):
aa_f = []
for j in range(obj._ploidy):
if parser._parser.GT(i,j) == _eggwrapper.UNKNOWN:
aa_f = None
else:
aa_f.append(parser._parser.GT(i,j))
if aa_f is not None:
aa_fs = tuple(map(str, aa_f))
if not parser._parser.GT_phased(i):
gt_f = '/'.join(aa_fs)
else : gt_f = '|'.join(aa_fs)
else:
gt_f = '.'
obj._gt_field.append(gt_f)
obj._gt_field= tuple(obj._gt_field)
else:
obj._gt = None
obj._gt_phased = None
# get PL
if parser._parser.has_PL():
obj._pl = tuple(
tuple(parser._parser.PL(i, j) for j in range(obj._num_genotypes))
for i in range(obj._num_samples))
else:
obj._pl = None
# get GL
if parser._parser.has_GL():
obj._gl = tuple(
tuple(parser._parser.GL(i, j) for j in range(obj._num_genotypes))
for i in range(obj._num_samples))
else:
obj._gl = None
return obj
# block of accessors below: they don't DO anything (it's only doc who makes is cumbersome)
chromosome = property(lambda self: self._chrom, doc='Chromosome name. ``None`` if missing.')
position = property(lambda self: self._pos, doc='Position. Given as an index; first value is 0. ``None`` if missing.')
ID = property(lambda self: self._id, doc='Tuple containing all IDs.')
num_alleles = property(lambda self: len(self._alleles), doc='Number of alleles. The reference is always included.')
num_alternate = property(lambda self: self._num_alternate, doc='Number of alternate alleles. Equal to :py:obj:`~.io.VcfVariant.num_alleles` minus 1.')
alleles = property(lambda self: self._alleles, doc='Tuple of variant alleles. The first is the reference, which is not guaranteed to be present in samples.')
alternate_types = property(lambda self: self._alternate_types, doc="""Alternate alleles types, as a tuple.
One value is provided for each alternate allele. The provided values are
integers whose values should always be compared to class attributes
:py:obj:`~.io.VcfVariant.alt_type_default`, :py:obj:`~.io.VcfVariant.alt_type_referred` and
:py:obj:`~.io.VcfVariant.alt_type_breakend`, as in (for the type of the first alternate
allele)::
>>> type_ = variant.alternate_types[0]
>>> if type_ == variant.alt_type_default:
... allele = variant.allele(0)""")
quality = property(lambda self: self._quality, doc='Variant quality. ``None`` if missing.')
failed_tests = property(lambda self: self._failed_tests, doc='Filters at which this variant failed. Provided as a tuple or names, or ``None`` if no filters have been applied.')
AA = property(lambda self: self._AA, doc='Value of the AA info field. ``None`` if missing.')
AN = property(lambda self: self._AN, doc='Value of the AN info field. ``None`` if missing.')
AC = property(lambda self: self._AC, doc='Value of the AC info field. Provided as a tuple. ``None`` if missing.')
AF = property(lambda self: self._AF, doc='Value of the AF info field. Provided as a tuple. ``None`` if missing.')
info = property(lambda self: self._info, doc="""Dictionary of INFO
fields for this variant. Keys are ID of INFO fields available for this
variant, and values are always a :class:`tuple` of items, even if there is only
one item. For flag INFO types,
the value is always an empty :class:`!tuple`.""")
format_fields = property(lambda self: self._format_fields, doc='Frozenset of FORMAT fields ID\'s. The frozenset is empty if no sample data are available.')
num_samples = property(lambda self: self._num_samples, doc='Number of samples. Equivalent to ``len(variant.samples)``.')
samples = property(lambda self: self._samples, doc="""Information
for each sample. Empty list if no samples are
defined. Otherwise, the list contains one dictionary for each sample: keys of
these dictionaries are FORMAT fields identifiers (the keys are always the same as
the content of :py:obj:`~.io.VcfVariant.format_fields`), and their values are :class:`tuple` instances in
all cases.""")
ploidy = property(lambda self: self._ploidy, doc='Ploidy among genotypes. Always 2 if GT is not available.')
num_genotypes = property(lambda self: self._num_genotypes, doc='Number of genotypes.')
GT_phased = property(lambda self: self._gt_phased, doc='Tell if the genotype for each sample is phased. ``None`` if GT is not available.')
GT = property(lambda self: self._gt, doc="""Genotypes from GT
fields. Only if this format field is available. Provided as a :class:`tuple` of
sub-tuples. The number of sub-tuples is equal to the number of samples
(:py:obj:`~.io.VcfVariant.num_samples`). The number of items within each
sub-tuples is equal to the ploidy (:py:obj:`~.io.VcfVariant.ploidy`). These items are
allele expression (as found in :py:obj:`~.io.VcfVariant.alleles`), or ``None`` (for
missing values). This attribute is ``None`` if GT is not available.""")
GT_vcf = property(lambda self: self._gt_field, doc = """GT field as written in the VCF file""")
PL = property(lambda self: self._pl, doc = """PL values for all samples""")
GL = property(lambda self: self._gl, doc = """GL values for all samples""")
[docs]class VcfSlidingWindow(object):
"""
This class manages a sliding window on a VCF file.
This class cannot be instanciated directly: instances are
returned by the methods :meth:`~.io.VcfParser.slider` and
:meth:`~.io.VcfParser.bed_slider` of :class:`.io.VcfParser`
instances.
:class:`.io.VcfSlidingWindow` instances are iterable:
iteration steps return a common :class:`.io.VcfWindow` instance
which is updated at each iteration round.
"""
def __init__(self):
raise NotImplementedError('cannot create `VcfSlidingWindow` instance')
@property
def good(self):
"""
``True`` if there is still data to be processed.
"""
return self._sld.good()
def __iter__(self):
return self
def __next__(self):
if self._sld.good():
self._sld.next_window()
return self._wdw
else: raise StopIteration
next = __next__ # for python2
[docs]class VcfWindow(object):
"""
Provide access to sites of a window from a VCF file. The following
operations are supported by :class:`.io.VcfWindow` instances:
+-----------------------------+------------------------------------------------+
| Operation | Result |
+=============================+================================================+
| ``len(win)`` | number of sites in the window |
+-----------------------------+------------------------------------------------+
| ``win[i]`` | get a site as a :class:`.Site` instance |
+-----------------------------+------------------------------------------------+
| ``for site in win:`` | iterate over sites as :class:`.Site` instances |
+-----------------------------+------------------------------------------------+
"""
def __init__(self):
raise NotImplementedError('cannot create `VcfWindow` instance')
num_sites = property(lambda self: self._sld.num_sites(), doc='Number of sites in window.')
chromosome = property(lambda self: self._sld.chromosome(), doc='Name of the chromosome or contig.')
@property
def bounds(self):
"""Bounds of the window. The second bound is not included in the window."""
return self._sld.win_start(), self._sld.win_stop()
def __len__(self):
return self._sld.num_sites()
def __iter__(self):
site = self._sld.first_site()
while site is not None:
yield _site.Site._from_site_holder(site.site(), alphabets.Alphabet._make(site.alphabet()) if site.alphabet() else alphabets.DNA)
site = site.next()
def __getitem__(self, idx):
if idx < 0:
idx = self._sld.num_sites() + idx
if idx >= self._sld.num_sites() or idx < 0:
raise ValueError('invalid site index')
else:
site = self._sld.get_site(idx)
return _site.Site._from_site_holder(site.site(), alphabets.Alphabet._make(site.alphabet()) if site.alphabet() else alphabets.DNA)
[docs]class BED(object):
"""
Class holding BED (Browser Extensible Data) data.
:param fname: name of the BED-formatted input file. By default,
create an empty instance.
BED is a flexible format representing
genomic annotation. In the current implementation, only the
chromosome/scaffold, start, and end positions (the first three
fields of each lines, which are the only ones to be required) are
processed. If incorrectly formed fields are provided besides the
first three fields, the behaviour is undefined (that is, it is not
guaranteed that are exception is raised).
Supported operations:
+-------------------+----------------------------+-------+
| Operation | Result | Notes |
+===================+============================+=======+
| ``len(bed)`` | Number of annotation items | |
+-------------------+----------------------------+-------+
| ``bed[i]`` | Get an annotation | (1) |
+-------------------+----------------------------+-------+
| ``for i in bed:`` | Iterate over annotations | (1) |
+-------------------+----------------------------+-------+
Notes:
#. Annotation items are represented by dictionaries containing the
following keys: ``"chrom"``, ``"start"``, and ``"end"``.
"""
def __init__(self, fname=None):
self._obj = _eggwrapper.BedParser()
if fname is not None:
self._obj.get_bed_file(fname)
def __len__(self):
return self._obj.n_bed_data()
def __getitem__(self, i):
if i < 0:
i = self._obj.n_bed_data() + i
if i < 0 or i >= self._obj.n_bed_data():
raise IndexError('invalid BED index')
return {'chrom': self._obj.get_chrom(i),
'start': self._obj.get_start(i),
'end': self._obj.get_end(i)}
[docs] def extend(self, values):
"""
Append items from an iterable. All items of *values*
must be a sequence containing a
value for *chrom*, *start*, and *end* (in that order).
"""
for item in values:
self.append(* item)
[docs] def append(self, chrom, start, end):
"""
Add an item at the end of the BED data.
"""
self._obj.append(chrom, start, end)