Source code for egglib.tools._concat

"""
    Copyright 2008-2023 Stephane De Mita, Mathieu Siol

    This file is part of EggLib.

    EggLib is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    EggLib is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with EggLib.  If not, see <http://www.gnu.org/licenses/>.
"""

from .. import _interface

def concat(*aligns, **kwargs):
    """
    egglib.tools.concat(align1, align2, ..., spacer=0, ch='?', group_check=True, no_missing=False, ignore_names=False, dest=None)

    Concatenates sequence alignments. A unique :class:`.Align` is
    generated. All different sequences from all passed alignments are
    represented in the final alignment. Sequences whose names match are
    concatenated. In case several sequences have the same name in a
    given segment, the first one is considered and others are discarded.
    In case a sequence is missing for a particular segment, a stretch of
    non-varying characters is inserted to replace the unknown sequence.

    All options (excluding the alignments to be concatenated) must be
    specified as keyword arguments, otherwise they will be treated as
    part of the list of alignments.

    :param align1:
    :param align2: one or more :class:`!Align` instances (their order is
        used for concatenation). It is not allowed to specify them using
        the keyword syntax. All instances must be configured to use the
        same alphabet.

    :param spacer: length of unsequenced stretches (represented by
        non-varying characters) between concatenated alignments. If
        *spacer* is a non-negative integer, the length of all stretches
        will be identical. If *spacer* is an iterable containing
        integers, each specifies the interval between two consecutive
        alignments (if there are ``n`` alignments, *spacer* must be of
        length ``n-1``). The passed iterable is never modified.

    :param ch: character to use for conserved stretches and for missing
        segments. This character must be valid for the alphabet
        considered.

    :param group_check: if ``True``, an exception will be raised in case
        of a mismatch between group labels of different sequence
        segments bearing the same name. Otherwise, the group labels of
        the first segment found will be used as group labels of the
        final sequence.

    :param no_missing: if ``True``, an exception will be raised in case
        the list of samples differs between :class:`!Align` instances.
        Then, the number of samples must always be the same and all
        samples must always be present (although it is possible that
        they consist in missing data only). Ignored if *ignore_names* is
        ``True``.

    :param ignore_names: don't consider sample names and concatenate
        sequences based on their order in the instance. If used, the
        value of the option *no_missing* is ignored and the number of
        samples is required to be constant over alignments.

    :param dest: an optional :class:`!Align` instance to recycle and to
        use to store the result. This instance is automatically reset,
        ignoring all data previously loaded. If this argument is not
        ``None``, the function returns nothing and the passed instance
        is modified. Allows to recycle the same object in intensive
        applications.

    :return: If *dest* is ``None``, a new :class:`!Align` instance.
        If *dest* is not ``None``, this function returns ``None``.

    :raises ValueError: on an unknown keyword argument, an empty list of
        alignments, mismatching alphabets, an invalid *spacer* or *ch*,
        an inconsistent list of samples or inconsistent labels.
    :raises TypeError: if a positional argument is not an
        :class:`!Align` instance.
    """

    # reject unknown options, then import options with their defaults
    allowed = ('spacer', 'ch', 'group_check', 'no_missing',
               'ignore_names', 'dest')
    for key in kwargs:
        if key not in allowed:
            raise ValueError('invalid argument: `{0}`'.format(key))
    spacer = kwargs.get('spacer', 0)
    ch = kwargs.get('ch', '?')
    group_check = kwargs.get('group_check', True)
    no_missing = kwargs.get('no_missing', False)
    ignore_names = kwargs.get('ignore_names', False)
    dest = kwargs.get('dest', None)

    # process arguments
    if len(aligns) == 0:
        raise ValueError('there must be at least one alignment')
    for aln in aligns:
        if not isinstance(aln, _interface.Align):
            raise TypeError('expect an Align instance')
    for aln in aligns[1:]:
        if aln._alphabet._obj != aligns[0]._alphabet._obj:
            raise ValueError('all alignments must have the same alphabet')
    nloc = len(aligns)

    if isinstance(spacer, int):
        if spacer < 0:
            raise ValueError('`spacer` argument must not be negative')
        spacer = [spacer] * (nloc - 1)
    else:
        # work on a private copy: the caller's object must not be altered
        # by the trailing 0 appended below, and any iterable is accepted
        spacer = list(spacer)
        # any() is safe on an empty list (valid when there is one alignment)
        if any(gap < 0 for gap in spacer):
            raise ValueError('`spacer` argument must not be negative')
        if len(spacer) != nloc - 1:
            raise ValueError('`spacer` does not have the right number of items')
    spacer.append(0)  # convenience to avoid having an "if" in the main loop

    if len(ch) != 1:
        raise ValueError('`ch` must be a single character')

    # get the total length
    total_ls = sum(aln.ls for aln in aligns) + sum(spacer)

    # get the list of samples: maps each sample key to a list giving, for
    # each alignment, the index of that sample (None if it is missing)
    if not ignore_names:
        # get the list of names of each alignment
        names_per_align = [_interface.Align.names(aln) for aln in aligns]

        # index of the first occurrence of each name in each alignment
        # (duplicates after the first one are discarded)
        first_index = []
        for lnames in names_per_align:
            lookup = {}
            for idx, name in enumerate(lnames):
                lookup.setdefault(name, idx)
            first_index.append(lookup)

        # all distinct names over all alignments
        all_names = set().union(*names_per_align)
        ns = len(all_names)

        # check that list is constant if requested
        if no_missing:
            for aln in aligns:
                if aln.ns != ns:
                    raise ValueError('inconsistent list of samples')

        samples = {name: [lookup.get(name) for lookup in first_index]
                   for name in all_names}
    else:
        sizes = {aln.ns for aln in aligns}
        if len(sizes) != 1:
            raise ValueError('inconsistent list of samples')
        ns = sizes.pop()
        samples = {i: [i] * nloc for i in range(ns)}

    # final order of samples in the result
    names = sorted(samples)

    # create or reset destination
    if dest is None:
        conc = _interface.Align(nsam=ns, nsit=0, alphabet=aligns[0]._alphabet)
        if not ignore_names:
            for idx, name in enumerate(names):
                conc.set_name(idx, name)
    else:
        dest.reset()
        dest._alphabet = aligns[0]._alphabet
        conc = dest
        for name in names:
            conc.add_sample('' if ignore_names else name, [])
    conc._obj.set_nsit_all(total_ls)  # this doesn't initialize new values

    # process groups: labels are taken from the first segment where the
    # sample is present; other segments are only used for checking
    group_mapping = {}
    for name in samples:
        groups = [aligns[i].get_sample(j).labels
                  for (i, j) in enumerate(samples[name]) if j is not None]
        if len(groups) > 0:
            if group_check:
                for other in groups[1:]:
                    if len(groups[0]) != len(other):
                        raise ValueError('inconsistent labels')
                    for a, b in zip(groups[0], other):
                        if a != b:
                            raise ValueError('inconsistent labels')
            group_mapping[name] = groups[0]
        else:
            group_mapping[name] = []
    for idx, name in enumerate(names):
        conc._obj.set_nlabels(idx, len(group_mapping[name]))
        for i, g in enumerate(group_mapping[name]):
            conc._obj.set_label(idx, i, g)

    # process the sequences themselves
    curr = 0
    for align_idx, (align, spc) in enumerate(zip(aligns, spacer)):
        # add sequence of this align + spacer
        seg_ls = align.ls
        for main_idx, name in enumerate(names):
            sample_idx = samples[name][align_idx]
            target = conc.get_sample(main_idx).sequence
            if sample_idx is not None:
                target[curr:curr+seg_ls] = align.get_sample(sample_idx).sequence
                target[curr+seg_ls:curr+seg_ls+spc] = ch * spc
            else:
                # sample missing in this segment: fill segment and spacer
                target[curr:curr+seg_ls+spc] = ch * (seg_ls + spc)
        curr += seg_ls + spc

    # return if needed
    if dest is None:
        return conc