"""
Copyright 2008-2023 Stephane De Mita, Mathieu Siol
This file is part of EggLib.
EggLib is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EggLib is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with EggLib. If not, see <http://www.gnu.org/licenses/>.
"""
import re, sys
from io import StringIO
_staden_table = str.maketrans(' -*', '?N-')
from .. import eggwrapper as _eggwrapper
from .. import _interface, alphabets
special_DNA = alphabets.Alphabet('char', alphabets.DNA.get_alleles()[0], alphabets.DNA.get_alleles()[1], case_insensitive=True, name='DNA with dot')
special_DNA.add_missing('.')
special_DOT = special_DNA._obj.get_code('.')
special_MISSING = special_DNA._obj.get_code('?')
MISSING = alphabets.DNA._obj.get_code('?')
[docs]def from_clustal(string, alphabet):
"""
Import a clustal-formatted alignment. The input format is the one
generated and used by `ClustalW <http://www.clustal.org/clustal2/>`_.
:param string: clustal-formatted sequence alignment.
:param alphabet: an :class:`.Alphabet` instance.
:return: A new :class:`.Align` instance.
"""
if not isinstance(alphabet._obj, _eggwrapper.CharAlphabet):
raise ValueError('invalid alphabet for parsing fasta data: {0}'.format(alphabet.name))
stream = StringIO(string)
# the first line must start by CLUSTAL W or CLUSTALW
line = stream.readline()
c = 1
if (line[:8] != 'CLUSTALW' and
line[:9] != 'CLUSTAL W' and
line.strip() != 'CLUSTAL'): raise ValueError('invalid clustalw string (line: {0})'.format(c))
# start reading blocks
cnt = _eggwrapper.DataHolder(False)
line = stream.readline()
# initialize names list
names = []
while True:
# skip empty lines
while line != '' and line.strip() == '':
line = stream.readline()
c += 1
# detect end of file
if line == '': break
# read a block
block_length = None
# read sequences
while True:
# conservation line
if line[0] == ' ':
if block_length is None: raise ValueError('invalid clustalw string (line: {0})'.format(c))
line = stream.readline()
c += 1
if line == '': break # end of file
if line.strip() != '': raise ValueError('invalid clustalw string (line: {0})'.format(c))
break
# reads sequence
bits = line.split()
# sequence line
if len(bits) < 2 or len(bits) > 3:
raise ValueError('invalid clustalw string (line: {0})'.format(c))
name = bits[0]
seq = bits[1]
if block_length is None:
block_length = len(seq)
elif block_length != len(seq):
raise ValueError('invalid clustalw string (line: {0})'.format(c))
# adds the sequence to the container (new sequence)
if name not in names:
pos = len(names)
cnt.set_nsam(pos + 1)
cnt.set_name(pos, name)
cnt.set_nsit_sample(pos, len(seq))
for i, v in enumerate(seq):
cnt.set_sample(pos, i, alphabet._obj.get_code(v))
names.append(name)
# adds the sequence (continuing old sequence)
else:
pos = names.index(name)
cur = cnt.get_nsit_sample(pos)
cnt.set_nsit_sample(pos, cur + len(seq))
for i, v in enumerate(seq):
cnt.set_sample(pos, cur+i, alphabet._obj.get_code(v))
if len(bits) == 3:
try:
i = int(bits[2])
except ValueError:
raise ValueError('invalid clustalw string (line: {0})'.format(c))
if cnt.get_nsit_sample(pos) != i:
raise ValueError('invalid clustalw string (line: {0})'.format(c))
# checks next line
line = stream.readline()
c += 1
# empty (or absent) conservation line is caught by this line
if line.strip() == '': break
if not cnt.is_equal(): raise ValueError('invalid clustalw string (unequal sequences)')
return _interface.Align._create_from_data_holder(cnt, alphabet)
[docs]def from_staden(string, keep_consensus=False):
"""
Import data from the Staden assembly package.
Process the output file of the GAP4 program of the
`Staden <http://staden.sourceforge.net/>`_ package.
:param string: input string.
:param keep_consensus: don't delete consensus sequence.
:return: An :class:`.Align` instance.
The input string should have been generated from a contig alignment by
the GAP4 contig editor, using the command "dump contig to file". The
sequence named ``CONSENSUS``, if present, is automatically removed
unless the option *keep_consensus* is used.
Staden's default convention is followed:
* ``-`` codes for an unknown base and is replaced by ``N``.
* ``*`` codes for an alignment gap and is replaced by ``-``.
* ``.`` represents the same sequence than the consensus at that
position.
* White space represents missing data and is replaced by ``?``.
"""
# get shift from the first CONSENSUS line
mo = re.search('( CONSENSUS +)[A-Za-z\-\*]', string)
if mo is None: raise ValueError('invalid staden contig dump file')
shift = len(mo.group(1))
# split lines and identify blocks (based on empty lines)
lines = string.splitlines()
empty_lines = [i for i, v in enumerate(lines) if v == '']
empty_lines.insert(0, -1) # emulate a white line immediately before first line
empty_lines.append(len(lines))
blocks = [lines[empty_lines[i]+2 : empty_lines[i+1]] for i in range(len(empty_lines) - 1)]
# +1 to read after each blank line
# +2 to skip the first line of each block
# initialize variables
align = _eggwrapper.DataHolder(False)
ns = 0
currpos = 0
IDs = []
# process all blocks
for block in blocks:
for line in block:
ID = line[:7].strip()
name = line[8:shift].strip()
sequence = line[shift:]
sequence = sequence.translate(_staden_table)
block_length = len(sequence)
if ID not in IDs:
ns += 1
align.set_nsam(ns)
align.set_name(ns-1, name)
align.set_nsit_sample(ns-1, currpos)
for i in range(currpos): align.set_sample(ns-1, i, special_MISSING)
pos = ns - 1
IDs.append(ID)
else:
pos = IDs.index(ID)
align.set_nsit_sample(pos, currpos + len(sequence))
for i, v in enumerate(sequence):
align.set_sample(pos, currpos + i, special_DNA._obj.get_code(v))
currpos += block_length
# equalize
for i in range(ns):
n = align.get_nsit_sample(i)
align.set_nsit_sample(i, currpos)
for j in range(n, currpos): align.set_sample(i, j, special_MISSING)
align.set_is_matrix(True)
# undot
idx = IDs.index('')
for i in range(ns):
if i != idx:
for j in range(currpos):
if align.get_sample(i, j) == special_DOT:
align.set_sample(i, j, align.get_sample(idx, j))
# remove consensus
if not keep_consensus:
align.del_sample(idx)
# return
return _interface.Align._create_from_data_holder(align, alphabets.DNA)
[docs]def from_genalys(string):
"""
Import Genalys data.
Genalys was a proprietary program to process sequencing reads, perform
assembly and detect polymorphism.
Convert Genalys-formatted sequence alignment files to fasta. This
function imports files generated through the option "Save SNPs" of
Genalys 2.8.
:param string: input data as a Genalys-formatted string.
:return: An :class:`.Align` instance.
"""
stream = StringIO(string)
insertions = []
flag = False
for line in stream:
line = line.split("\t")
if len(line) > 1 and line[0] == "Polymorphism":
flag = True
if len(line) > 1 and line[0] == "IN" and flag:
insertions.extend(line[1].split("/"))
if len(insertions) > 0:
tp = insertions[0].split("_")
if len(tp) == 1:
tp = tp[0].split(".")
if len(tp) == 1:
tp.append("1")
finsertions = [tp]
for i in insertions:
i = i.split("_")
if len(i) == 1:
tp = tp[0].split(".")
if len(tp) == 1:
i.append("1")
if i[0] != finsertions[-1][0]:
finsertions.append(i)
finsertions[-1][1] = i[1]
if len(insertions) > 0:
insertions = finsertions
stream.close()
stream = StringIO(string)
names = []
sequences = []
maxlen = 0
for line in stream:
line = line.split("\t")
if len(line) > 1:
bidon = re.match(".+\.ab1$", line[1])
if bidon != None:
names.append(line[1])
sequences.append("")
index = 6
for i in range(10):
if line[i] == "F" or line[i] == "R":
index = i+1
break
if line[index] != "":
beginning = int(line[index]) - 1
for i in insertions:
if int(i[0]) <= beginning:
beginning = beginning + int(i[1])
else:
break
for i in range(beginning):
sequences[-1]= sequences[-1] + "?"
sequences[-1] = sequences[-1] + line[-1].rstrip("\n")
if len(sequences[-1]) > maxlen:
maxlen = len(sequences[-1])
data = _eggwrapper.DataHolder(True)
data.set_nsam(len(sequences))
data.set_nsit_all(maxlen)
for i in range(len(sequences)):
data.set_name(i, names[i])
sequences[i] = sequences[i].replace("_", "-")
for j, v in enumerate(sequences[i]): data.set_sample(i, j, alphabets.DNA._obj.get_code(v))
for j in range(len(sequences[i]), maxlen): data.set_sample(i, j, MISSING)
return _interface.Align._create_from_data_holder(data, alphabets.DNA)
[docs]def get_fgenesh(string, locus='locus'):
"""
Import fgenesh data.
:param fname: a string containing fgenesh ouput.
:parma locus: locus name.
:return: A list of gene and CDS features
represented by dictionaries. Note that 5' partial features
might not be in the appropriate frame and that it can be
necessary to add a ``codon_start`` qualifier.
"""
# supports for mac/windows files
string = '\n'.join(string.splitlines())
# gets the feature table
try:
data_sub = string.split(' G Str Feature Start End Score ORF Len\n')[1].split('Predicted protein(s):\n')[0]
except IndexError:
raise ValueError('invalid fgenesh format')
data_sub = data_sub.split('\n\n')
# edit
del data_sub[-1]
data_sub[0]= '\n'.join(data_sub[0].split('\n')[1:])
# iteratively grabs the features
features = {}
for i in data_sub:
pos = []
start = 1
rank = '---'
strand = '---'
for j in i.split('\n'):
a = re.search(' ?[0-9]+ ([+|-]) (TSS|PolA) +([0-9]+)', j)
b = re.search(' ?([0-9]+) ([+|-]) + ([0-9])+ CDS(o|f|i|l) +([0-9]+) - +([0-9]+) +[-\.0-9]+ + ([0-9]+)', j)
if b:
if b.group(3) == "1":
if int(b.group(5)) == int(b.group(7)): start= 1
elif int(b.group(5)) == (int(b.group(7))-1): start= 2
elif int(b.group(5)) == (int(b.group(7))-2): start= 3
else: raise ValueError('invalid fgenesh format')
pos.append( [int(b.group(5))-1, int(b.group(6))-1 ] )
rank = b.group(1)
if b.group(2) == '+': strand = 'plus'
else: strand = 'minus'
features['cds'+rank] ={
'gene': locus+'_'+rank,
'strand': strand,
'pos': pos,
'type': 'CDS',
'note': 'fgenesh prediction'
}
features['gene'+rank] ={
'gene': locus+'_'+rank,
'strand': strand,
'pos': [[ pos[0][0], pos[-1][1] ]],
'type': 'gene',
'note': 'fgenesh prediction'
}
# gets the sequence section
try:
data_sub = string.split(' G Str Feature Start End Score ORF Len\n')[1].split('Predicted protein(s):\n')[1].split('>')
except IndexError:
raise ValueError('invalid fgenesh format')
del data_sub[0]
if ( (2*len(data_sub) != len(features)) and
(len(data_sub) != len(features)) ) : raise ValueError('invalid fgenesh format')
# returns the sequences as a table
return list([features[i] for i in features])