Source code for egglib.io._vcfcodon

"""
    Copyright 2025 Stephane De Mita, Mathieu Siol

    This file is part of EggLib.

    EggLib is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EggLib is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with EggLib.  If not, see <http://www.gnu.org/licenses/>.
"""

from ._vcftools import VCF
from ._gff3 import GFF3, Gff3Feature
from .._interface import Container
from .._site import Site
from .._freq import Freq
from ..tools import ReadingFrame, Translator
from ..alphabets import codons

[docs] class CodonVCF: """ Analysis of coding sites from VCF data. This class takes a VCF, which is required to be indexed, and the GFF annotation (in GFF3 format) of the reference sequence. It allows to iterate over coding sites of a given CDS feature (using :meth:`.iter_codons`) or extract the coding site which includes a random position (using :meth:`.from_position`). .. note:: Codons with at least one missing nucleotide are ignored altogether. :param vcf_name: name of an indexed VCF file. :param gff_name: name of a GFF3-fomatted annotation file corresponding to the reference of the VCF file. :param use_ref: assume positions not referred to in the VCF file are identical to the reference. :param ref: only if *use_ref* is ``True``: reference genome as a :class:`.Container` instance. By default, use the reference genome included in the GFF3. If no reference genome is provided and *use_ref* if ``True``, a ``ValueError`` is raised. .. versionadded:: 3.5 """ _complement = { 'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A', 'R': 'Y', 'Y': 'R', 'S': 'S', 'W': 'W', 'K': 'M', 'M': 'K', 'B': 'V', 'D': 'H', 'H': 'D', 'V': 'B', 'N': 'N', '-': '-', '?': '?'} def __init__(self, vcf_name, gff_name, fill_with_ref=False, ref=None): # open requested files self._vcf = VCF(vcf_name) self._gff = GFF3(gff_name, strict=True) if not self._vcf.has_index: raise ValueError('VCF file must be indexed') # make a database of CDS features self._db = {} for feat in self._gff.iter_features(all_features=True): if feat.type == 'CDS': self._db[feat.ID] = feat # get the reference if fill_with_ref: if ref: if not isinstance(ref, Container): raise TypeError('`ref` must be a Container instance') self._ref = ref else: if self._gff.sequences is None: raise ValueError('a reference genome must be provided') self._ref = self._gff.sequences else: self._ref = None # initialize placeholders for reading frame information self._rf = None self._seqid = None self._ref_sample = None # utilities self._frq = Freq() self._tr = Translator(code=1)
[docs] def set_cds(self, ID, subset=None): """ Specify the CDS feature to consider. :param ID: name of a CDS feature of the annotation. :param subset: optional list of indexes of samples to include (by default, all samples are included). If the sample indexes are reshuffled or repeated, the exported sites will be affected accordingly. Samples must be understood as per the VCF file (e.g. individuals if the VCF contains diploid individuals). .. note:: The provided annotation ID must correspond to a *CDS* feature (and not a *gene* or a *mRNA*). """ if ID not in self._db: raise ValueError(f'cannot find CDS with ID: {ID}') feat = self._db[ID] segs = feat.segments if feat.strand == '-': # reverse exons and count positions from the end segs = [(feat.end-b, feat.end-a, c) for (a,b,c) in reversed(segs)] self._anchor = feat.end-1 else: segs = [(a,b,c) for (a,b,c) in segs] # deep copy in case the feature is used again self._anchor = None # validate list of samples self._subset = subset if subset is not None: for i in subset: if i < 0 or i >= self._vcf.num_samples: raise ValueError(f'sample index of out range: {i}') # ensure that exons are continuous (no partial exons expect 5' end of first and 3' end of last) codon_start = [1, 3, 2] cur = codon_start[segs[0][2]] - 1 + (segs[0][1] - segs[0][0]) segs[0] = (segs[0][0], segs[0][1], codon_start[segs[0][2]]) for i in range(1, len(segs)): if cur%3 != codon_start[segs[i][2]] - 1: raise ValueError(f'Mismatch between expected phase and value in GFF3. Exon {i} or {i+1} seems to be partial. Cannot proceed.') segs[i] = segs[i][:2] cur += segs[i][1] - segs[i][0] # create ReadingFrame self._rf = ReadingFrame(segs, keep_truncated=False) self._seqid = feat.seqid # detect reference if self._ref is None: self._ref_sample = None else: self._ref_sample = self._ref.find(feat.seqid) if self._ref_sample is None: raise ValueError(f'seqid {feat.seqid} not found in reference genome')
[docs] def iter_codons(self): """ Iterate over coding sites. Return an iterator over :class:`.CodingSite` objects representing all codon positions defined by the current CDS annotation. Codons positions which are incomplete in the feature due to missing 5' or 3' end are not included in the iteration. """ if self._rf is None: raise ValueError('a CDS feature must be specified first') for idx, bases in enumerate(self._rf.iter_codons()): yield self._get_site(bases, idx)
[docs] def from_position(self, pos): """ Return the :class:`.CodingSite` corresponding to the codon to which the passed position belongs to. """ if self._rf is None: raise ValueError('a CDS feature must be specified first') idx = self._rf.codon_index(pos if self._anchor is None else self._anchor-pos) if idx is None: return CodingSite(CodingSite.NCOD, None, None, None) bases = self._rf.codon_bases(idx) if None in bases: return CodingSite(CodingSite.NCOD, None, None, None) return self._get_site(bases, idx)
def _get_site(self, pos, codon_index): # revert positions if needed if self._anchor is not None: pos = [self._anchor-p for p in pos] # reverse the codon flag = 0 # flag telling which site(s) are available sites = [None] * 3 ns = set() for idx in range(3): if not self._vcf.goto(self._seqid, pos[idx]): continue # move forward until a type-0 variant is found, if any while self._vcf.get_allele_type() != 0 or self._vcf.get_pos() < pos[idx]: if not self._vcf.read(): break # if valid site if self._vcf.get_pos() == pos[idx] and self._vcf.get_allele_type() == 0: flag |= 1 << idx genotypes = self._vcf.get_genotypes() if self._subset is None: genotypes = [j for i in genotypes for j in i] else: genotypes = [j for i in self._subset for j in genotypes[i]] if self._anchor: sites[idx] = ['?' if i is None else self._complement[i] for i in genotypes] else: sites[idx] = ['?' if i is None else i for i in genotypes] ns.add(len(sites[idx])) # mismatch in sample size if len(ns) > 1: return CodingSite(CodingSite.MISM, self._seqid, codon_index, pos[1]) # unavailable positions if flag != 7: if flag == 0 or self._ref_sample is None: return CodingSite(CodingSite.UNAVAIL, self._seqid, codon_index, pos[1]) ns = ns.pop() if flag&1 == 0: try: b = self._ref_sample.sequence[pos[0]] except IndexError: raise ValueError('cannot find site in reference while filling a gap in VCF') sites[0] = [b] * ns if flag&2 == 0: try: b = self._ref_sample.sequence[pos[1]] except IndexError: raise ValueError('cannot find site in reference while filling a gap in VCF') sites[1] = [b] * ns if flag&4 == 0: try: b = self._ref_sample.sequence[pos[2]] except IndexError: raise ValueError('cannot find site in reference while filling a gap in VCF') sites[2] = [b] * ns # build site site = CodingSite(0, self._seqid, codon_index, pos[1]) site.extend(list(map(''.join, zip(sites[0], sites[1], sites[2])))) # analyse site self._frq.from_site(site) if self._frq.num_alleles > 2: site.set_bit(CodingSite.MMUT) aas = [self._tr.translate_codon(self._frq.allele(idx)) for idx in range(self._frq.num_alleles)] for i in range(self._frq.num_alleles): a1 = self._frq.allele(i) if aas[i] == '*': site.set_bit(CodingSite.STOP) else: for j in range(i+1, self._frq.num_alleles): a2 = self._frq.allele(j) if a1 != a2: site.set_bit(CodingSite.VAR) if (a1[0] != a2[0] and (a1[1] != a2[1] or a1[2] != a2[2])) or (a1[1] != a1[1] and a1[2] != a2[2]): site.set_bit(CodingSite.MHIT) if aas[j] == '*': continue if aas[i] == aas[j]: site.set_bit(CodingSite.SYN) else: site.set_bit(CodingSite.NSYN) # return site return site
[docs] class CodingSite(Site): """ Represent a coding site extracted from VCF data. Instances of this class are normally generated by :class:`CodonVCF` . .. versionadded:: 3.5 :ivar NCOD: requested position is not in coding region. Only if site is obtained from :meth:`.from_position`. If this flag is on, all the others are off. :ivar MISM: there is a mismatch in sample size (including mismatch in ploidy) between sites of the codon. If this flag is on, all the others are off. :ivar UNAVAIL: at least one of the three codon position is unavailable and there is not available reference genome. If this flag is on, all the others are off. :ivar STOP: a stop codon has been found in the site. The stop codon is not counted as missing data but is not considered for synonymous / nonsynonymous analysis either. However it is considered to determine if there is polymorphism, multiple hits and multiple mutations. :ivar MHIT: more than one of the three codon positions is variable. :ivar MMUT: there are more than two different codons. :ivar SYN: there is at least one synonymous change (excluding stop codons). :ivar NSYN: there is at least one non-synonymous change (excluding stop codons). :ivar VAR: there is at least one change (including stop codons). """ NCOD = 1 MISM = 2 UNAVAIL = 4 STOP = 8 MHIT = 16 MMUT = 32 SYN = 64 NSYN = 128 VAR = 256 def __init__(self, flag, chrom, idx, pos): super().__init__(alphabet=codons) self._flag = flag self._codon_index = idx if chrom is not None: self.chrom = chrom if pos is not None: self.position = pos
[docs] def set_bit(self, flag): """ Turn a flag on. For example, to specify the the site has a stop codon, use: ``cdn.set_bit(cdn.STOP)`` """ self._flag |= flag
@property def codon_index(self): """ Codon index. Index of the codon in the CDS specification. .. note:: Only complete codons are included in the numeration. """ return self._codon_index @property def flag(self): """ Flag value. """ return self._flag
[docs] def is_valid(self): """ Check if site is valid. ``True`` if the site is in coding region, has no mismatch, has all available position and doesn't contain a stop codon. """ return self._flag&(self.NCOD|self.MISM|self.UNAVAIL|self.STOP) == 0