"""
Copyright 2015-2021 Stephane De Mita, Mathieu Siol
This file is part of EggLib.
EggLib is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
EggLib is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with EggLib. If not, see <http://www.gnu.org/licenses/>.
"""
from .. import eggwrapper as _eggwrapper
from .. import stats, _interface
[docs]class ParamDict(object):
"""
:class:`dict`-like class managing parameters. Do most of what a
dictionary does except for adding and removing keys. The order of
parameters is fixed and consistent.
In addition to the methods documented below, :class:`.ParamDict`
instances support the following operations (where ``params`` in one
instance):
+------------------------+--------------------------------------------------------+
|Expression | Action |
+========================+========================================================+
|``len(params)`` | Number of parameters |
+------------------------+--------------------------------------------------------+
|``params[key]`` | Get the value of parameter ``key`` |
+------------------------+--------------------------------------------------------+
|``params[key] = value`` | Assign ``value`` to parameter ``key`` (see note below) |
+------------------------+--------------------------------------------------------+
|``for key in params`` | Same as ``for key in params.keys()`` |
+------------------------+--------------------------------------------------------+
|``reversed(params)`` | Reversed iterator |
+------------------------+--------------------------------------------------------+
|``key in params`` | Check if ``key`` in a parameter name |
+------------------------+--------------------------------------------------------+
|``str(params)`` | Representation of the instance as a :class:`dict` |
+------------------------+--------------------------------------------------------+
Note that the ``params[key] = value`` expression is straightforward
only for parameters that have a single value. For parameters
represented by a :class:`.ParamList` instance, this expression is
only supported if the right-hand operand is a sequence of matching
length (in order to set all values at once). For parameters
represented by a :class:`.ParamMatrix` instance, this expression is
only supported if the right-hand operand is another
:class:`.ParamMatrix` instance of matching dimension. For
``events``, this expression is not supported at all. In all cases
where ``params[key]`` returns :class:`.ParamList`,
:class:`.ParamMatrix` or a :class:`.EventList` instance, the
returned value can be modified using its own methods.
"""
_keys = ['num_pop', 'num_sites', 'recomb', 'theta', 'num_mut',
'mut_model', 'TPM_proba', 'TPM_param', 'num_alleles',
'rand_start', 'num_chrom', 'num_indiv', 'N', 'G', 's',
'site_pos', 'site_weight', 'migr_matrix', 'trans_matrix',
'events', 'max_iter']
# sub-lists with only parameters that can be changed between
# simulations
_keys_scalar = ['num_sites', 'recomb', 'theta', 'num_mut',
'mut_model', 'TPM_proba', 'TPM_param', 'num_alleles',
'rand_start']
_keys_list = ['num_chrom', 'num_indiv', 'N', 'G', 's', 'site_pos',
'site_weight']
_keys_matrix = ['migr_matrix', 'trans_matrix']
def __init__(self, npop):
if not isinstance(npop, int): raise TypeError('invalid type for `npop`')
if npop < 1: raise ValueError('invalid value for `npop`')
self._params = _eggwrapper.Params(npop, 0.0)
self._migr = self._params.M()
self._params.set_L(0)
self._npop = npop
self._num_chrom = ParamList(getter=self._params.get_n1,
setter=self._params.set_n1,
num_values=npop,
check_item=lambda x: x>=0)
self._num_indiv = ParamList(getter=self._params.get_n2,
setter=self._params.set_n2,
num_values=npop,
check_item=lambda x: x>=0)
self._N = ParamList(getter=self._params.get_N,
setter=self._params.set_N,
num_values=npop,
check_item=lambda x: x>0.0)
self._G = ParamList(getter=self._params.get_G,
setter=self._params.set_G,
num_values=npop,
check_item=lambda x: True)
self._s = ParamList(getter=self._params.get_s,
setter=self._params.set_s,
num_values=npop,
check_item=lambda x: x>=0.0 and x<=1.0)
self._site_pos = ParamList(getter=self._params.get_sitePos,
setter=self._params.set_sitePos,
num_values=0,
check_item=lambda x: x>=0.0 and x<=1.0)
self._site_weight = ParamList(getter=self._params.get_siteW,
setter=self._params.set_siteW,
num_values=0,
check_item=lambda x: x>0.0)
self._migr_matrix = ParamMatrix(getter=self._migr.get_pair,
setter=self._migr.set_pair,
num_values=npop,
check_item=lambda x:x>=0.0,
check_active=lambda: True,
set_active=None)
self._trans_matrix = ParamMatrix(getter=self._params.get_transW_pair,
setter=self._params.set_transW_pair,
num_values=2,
check_item=lambda x:x>0.0,
check_active=lambda: self._params.get_transW_matrix(),
set_active=self._params.set_transW_matrix)
self._events = EventList(npop=npop,
add=self._params.addChange,
clear=self._params.clearChanges)
[docs] def disable_trans_matrix(self):
"""
Disable transition weight matrix.
Disable the matrix of weights of transition between alleles (it
is disabled by default, and automatically activated i any value
is set). This sets all weights to 1.0.
"""
self._params.set_transW_matrix(False)
[docs] def get_values(self, other):
"""
Import parameter values.
Update the dictionary with values from another
:class:`.ParamDict` instance (with the same numbers of
populations and sites).
"""
if not isinstance(other, ParamDict): raise TypeError('`other` should be a `ParamDict` instance')
self['num_sites'] = other['num_sites']
self['num_alleles'] = other['num_alleles']
for k in self._keys:
if k not in ['num_pop', 'num_sites', 'num_alleles', 'events']: self[k] = other[k]
self._events.replace(other._events)
[docs] def summary(self):
"""
Parameter summary.
Return a string displaying all current values of parameters at
the level of the C++ simulator. Use for debugging only, as the
format is not guaranteed to be stable.
"""
return self._params.summary()
def _set(self, key, values): # alternative setter
if key in self._keys_scalar: self[key] = values
elif key in self._keys_list: self[key][values[0]] = values[1]
elif key in self._keys_matrix: self[key][values[0], values[1]] = values[2]
else: raise KeyError('this parameter cannot be modified between simulations')
[docs] def set_migr(self, value):
"""
Set migration rates.
Set all pairwise migration rates to ``value/(num_pop-1)``.
"""
if value < 0.0: raise ValueError('invalid value for `migr`')
self._migr.set_all(value)
[docs] def mk_structure(self, skip_indiv=False, outgroup_label=None):
"""
Create structure object.
Export a :class:`.Structure` instance containing the
structure information corresponding to currently loaded
simulation parameters.
:param skip_indiv: this argument determines whether the individuals
level should be skipped (if ``True``, alleles from each given
individual are treated as belonging to separate haploid individuals.
:param outgroup_label: this argument indicates the label of the outgroup
population. It must be passed as a string. Default value is None.
.. warning::
The individual level cannot be processed if the ploidy is
not constant (mixing sampled chromosomes and sampled
individuals).
:return: A new :class:`.Structure` instance.
"""
# process populations
struct_ing = {None: {}}
struct_out = {}
cnt_sam = 0
cnt_idv = 0
for k in range(self._npop):
# get list of sample indexes (as a list of tuples/per individual)
sam_start = cnt_sam
idv = [(sam_start+i*2, sam_start+i*2+1) for i in range(self._params.get_n2(k))]
sam_start += 2*len(idv)
idv.extend([(sam_start+i,) for i in range(self._params.get_n1(k))])
cnt_sam += 2*self._params.get_n2(k) + self._params.get_n1(k)
# process population
if skip_indiv:
idv = [(j,) for i in idv for j in i]
idv = dict(zip(map(str, range(cnt_idv, cnt_idv+len(idv))), idv))
if outgroup_label != None and k == int(outgroup_label):
struct_out = idv
else:
struct_ing[None][str(k)] = idv
cnt_idv += len(idv)
# process delayed samples
if self._params.nDSChanges() > 0:
for event in self._events:
if event['cat'] == 'sample':
k = event['label']
idv = [(cnt_sam+i*2, cnt_sam+i*2+1) for i in range(event['num_indiv'])]
idv.extend([(cnt_sam+i,) for i in range(event['num_chrom'])])
cnt_sam += 2 * event['num_indiv'] + event['num_chrom']
if skip_indiv: idv = [(j,) for i in idv for j in i]
idv = dict(zip(map(str, range(cnt_idv, cnt_idv+len(idv))), idv))
if outgroup_label != None and k == int(outgroup_label):
struct_out = idv
else:
if str(k) not in struct_ing[None]: struct_ing[None][str(k)] = {}
struct_ing[None][str(k)].update(idv)
cnt_idv += len(idv)
if len(struct_ing[None]) == 0: raise ValueError('cannot generate structure')
return _interface.struct_from_dict(struct_ing, struct_out)
####################################################################
# below: dict-like methods
####################################################################
[docs] def has_key(self, key):
"""
Tell if a parameter exists.
Return a boolean indicating if the passed name is one of the
parameters.
"""
return key in self._keys
[docs] def get(self, key, default=None):
"""
Get a parameter value.
Return the value for *key* if *key* is one of the parameters,
else return the value passed as *default* (by default,
``None``). This method therefore never raises a
:exc:`KeyError`.
"""
if key in self._keys: return self[key]
return default
def __iter__(self):
for key in self._keys: yield key
[docs] def keys(self):
"""
Iterator over the parameter names.
"""
for key in self._keys: yield key
[docs] def values(self):
"""
Iterator over the parameter values.
"""
for key in self._keys: yield self[key]
[docs] def items(self):
"""
Iterator over content of the instance. Each iteration round
yield a ``(key, value)`` parameter name and value pair.
"""
for key in self._keys: yield key, self[key]
[docs] def copy(self):
"""
Shallow copy.
Return a shallow copy of this instance. All modifications of the
values of either copy will modify the other one (even
modification of integer, float and string parameters).
"""
ret = ParamDict(self._npop)
ret._params = self._params
return ret
[docs] def update(self, other=None, **kwargs):
"""
Import parameter values.
Update the dictionary with the ``(key, value)`` parameter name
and value pairs from the object passed as *other*, overwriting
existing keys and raising a :exc:`KeyError`
exception if an unknown parameter name is met.
Accepts :class:`dict` instances, or else any iterable of
``(key, value)`` pairs. If keyword arguments are specified, the
instance is then updated with those, as in
``param_dict.update(theta=5.0, recomb=2.5)``.
"""
if isinstance(other, ParamDict):
self.get_values(other)
elif other is not None:
if 'num_sites' in other: self['num_sites'] = other['num_sites']
if 'num_alleles' in other: self['num_alleles'] = other['num_alleles']
try:
iter_ = other.items()
except AttributeError: iter_ = iter(other)
for k, v in iter_: self[k] = v
if 'num_sites' in kwargs:
self['num_sites'] = kwargs['num_sites']
del kwargs['num_sites']
if 'num_alleles' in kwargs:
self['num_alleles'] = kwargs['num_alleles']
del kwargs['num_alleles']
for k, v in kwargs.items(): self[k] = v
def __len__(self):
return len(self._keys)
def __reversed__(self):
for key in reversed(self._keys):
yield key
def __contains__(self, key):
return key in self._keys
def __str__(self):
return str(dict(self.items()))
def __getitem__(self, key):
if key == 'num_pop': return self._npop
elif key == 'num_sites': return self._params.get_L()
elif key == 'recomb': return self._params.get_R()
elif key == 'theta': return self._params.get_theta()
elif key == 'num_mut': return self._params.get_fixed()
elif key == 'mut_model':
model = self._params.get_mutmodel()
if model == _eggwrapper.Params.KAM: return 'KAM'
elif model == _eggwrapper.Params.IAM: return 'IAM'
elif model == _eggwrapper.Params.SMM: return 'SMM'
elif model == _eggwrapper.Params.TPM: return 'TPM'
elif key == 'TPM_proba': return self._params.get_TPMproba()
elif key == 'TPM_param': return self._params.get_TPMparam()
elif key == 'num_alleles': return self._params.get_K()
elif key == 'rand_start': return self._params.get_random_start_allele()
elif key == 'num_chrom': return self._num_chrom
elif key == 'num_indiv': return self._num_indiv
elif key == 'N': return self._N
elif key == 'G': return self._G
elif key == 's': return self._s
elif key == 'site_pos': return self._site_pos
elif key == 'site_weight': return self._site_weight
elif key == 'migr_matrix': return self._migr_matrix
elif key == 'trans_matrix': return self._trans_matrix
elif key == 'events': return self._events
elif key == 'max_iter': return self._params.get_max_iter()
else: raise KeyError('invalid parameter name: {0}'.format(key))
def __setitem__(self, key, value):
if key == 'num_pop': raise ValueError('the number of populations is read-only (it must be set at construction time)')
elif key == 'num_sites':
if value < 0: raise ValueError('invalid value for `{0}`'.format(key))
self._params.set_L(value)
self._site_pos._num_values = value
self._site_weight._num_values = value
if value > 0: self._params.autoSitePos()
elif key == 'recomb':
if value < 0.0: raise ValueError('invalid value for `{0}`'.format(key))
self._params.set_R(value)
elif key == 'theta':
if value < 0: raise ValueError('invalid value for `{0}`'.format(key))
if value > 0.0 and self._params.get_fixed() > 0.0: raise ValueError('it is not allowed to set both theta and the number of mutations to non-zero')
self._params.set_theta(value)
elif key == 'num_mut':
if value < 0: raise ValueError('invalid value for `{0}`'.format(key))
if value > 0.0 and self._params.get_theta() > 0.0: raise ValueError('it is not allowed to set both theta and the number of mutations to non-zero')
self._params.set_fixed(value)
elif key == 'mut_model':
if value == 'KAM': self._params.set_mutmodel(_eggwrapper.Params.KAM)
elif value == 'IAM': self._params.set_mutmodel(_eggwrapper.Params.IAM)
elif value == 'SMM': self._params.set_mutmodel(_eggwrapper.Params.SMM)
elif value == 'TPM': self._params.set_mutmodel(_eggwrapper.Params.TPM)
else: raise ValueError('invalid value for `{0}`'.format(key))
elif key == 'TPM_proba':
if value < 0 or value > 1: raise ValueError('invalid value for `{0}`'.format(key))
self._params.set_TPMproba(value)
elif key == 'TPM_param':
if value < 0 or value > 1: raise ValueError('invalid value for `{0}`'.format(key))
self._params.set_TPMparam(value)
elif key == 'num_alleles':
if value < 2: raise ValueError('invalid value for `{0}`'.format(key))
self._params.set_K(value)
self._trans_matrix._num_values = value
elif key == 'rand_start':
self._params.set_random_start_allele(value)
elif key == 'max_iter':
if value < 0: raise ValueError('maximum number of iterations cannot be negative')
self._params.set_max_iter(value)
elif key == 'num_chrom': self._num_chrom[:] = value
elif key == 'num_indiv': self._num_indiv[:] = value
elif key == 'N': self._N[:] = value
elif key == 'G': self._G[:] = value
elif key == 's': self._s[:] = value
elif key == 'site_pos': self._site_pos[:] = value
elif key == 'site_weight': self._site_weight[:] = value
elif key == 'migr_matrix': self._migr_matrix.get_values(value)
elif key == 'trans_matrix': self._trans_matrix.get_values(value)
elif key == 'events': raise ValueError('cannot replace the list of events as a whole')
else: raise KeyError('invalid parameter name: `{0}`'.format(key))
[docs] def add_event(self, cat, T, **params):
"""
Add an event to the list. See the documentation for the class
:class:`.Simulator` for more details on what is expected as
arguments.
:param cat: category of the event.
:param T: date of the event.
:param params: all needed parameters for this category of
events.
"""
self._events.add(cat, T, **params)
[docs]class ParamList(object):
"""
:class:`list`-like class managing values for a parameter. Do
most of what a list does except for adding and removing values. No
methods :meth:`!reverse` and :meth:`!sort` are provided either
as they make little sense. Expressions involving the ``+``
and ``*`` arithmetic operators are supported, but return genuine
:class:`list` instances that are disconnected from the original
parameter holder.
In addition to the methods documented below, :class:`.ParamList`
instances support the following operations (where ``items`` is a
:class:`.ParamList` instance):
+--------------------------+----------------------------------------------------------------+
|Expression | Action |
+==========================+================================================================+
|``len(items)`` | Number of items |
+--------------------------+----------------------------------------------------------------+
|``items[i]`` | Get *i*\ th item |
+--------------------------+----------------------------------------------------------------+
|``items[i:j]`` | Slice from *i* to *j* |
+--------------------------+----------------------------------------------------------------+
|``items[i:j:k]`` | Slice from *i* to *j* with step *k* |
+--------------------------+----------------------------------------------------------------+
|``items[i] = value`` | Assign ``value`` to parameter ``key`` |
+--------------------------+----------------------------------------------------------------+
|``items[i:j] = values`` | Replace a slice of values (with a sequence of the same length) |
+--------------------------+----------------------------------------------------------------+
|``items[i:j:k] = values`` | Replace a slice of values (with a sequence of the same length) |
+--------------------------+----------------------------------------------------------------+
|``for item in items`` | Iterates over items |
+--------------------------+----------------------------------------------------------------+
|``reversed(items)`` | Reversed iterator |
+--------------------------+----------------------------------------------------------------+
|``item in items`` | Check if ``item`` is present among items |
+--------------------------+----------------------------------------------------------------+
|``items + other`` | Same as ``list(items) + other`` (returns a :class:`list`) |
+--------------------------+----------------------------------------------------------------+
|``items * n`` | Same as ``list(items) * n`` (returns a :class:`list`) |
+--------------------------+----------------------------------------------------------------+
|``str(items)`` | Representation of the instance as a list |
+--------------------------+----------------------------------------------------------------+
The indexing operator (``[]``) supports negative indexes (to count
from the end) and slices operators, just as for the built-in type
:class:`list`. However, all operators (such as ``del``) or methods
(such as :meth:`!append`, :meth:`!extend`,
:meth:`!remove`) that would change the length of the list are
not available.
"""
def __init__(self, getter, setter, num_values, check_item):
self._getter = getter
self._setter = setter
self._num_values = num_values
self._check_item = check_item
def __getitem__(self, index):
if isinstance(index, slice):
return tuple(map(self.__getitem__, range(*index.indices(self._num_values))))
else:
if index < 0: index = self._num_values + index
if index >= self._num_values: raise ValueError('list index out of range')
return self._getter(index)
def __setitem__(self, index, value):
if isinstance(index, slice):
rng = list(range(*index.indices(self._num_values)))
if len(rng) != len(value): raise ValueError('number of values is required to match number of indexes for a slice assignment')
for i, v in zip(rng, value):
if self._check_item(v) == False: raise ValueError('value out of range')
self._setter(i, v)
else:
if index < 0: index = self._num_values + index
if index >= self._num_values: raise ValueError('list index out of range')
if self._check_item(value) == False: raise ValueError('value out of range')
self._setter(index, value)
def __iter__(self):
for i in range(self._num_values):
yield self._getter(i)
[docs] def count(self, value):
"""
Number of items that are equal to *value*.
"""
cnt = 0
for i in range(self._num_values):
if self._getter(i) == value: cnt += 1
return cnt
[docs] def index(self, value, imin=0, imax=None):
"""
Index of the first value matching *value*. The
returned value is ``>=imin`` and ``<imax`` if they are provided.
Raise a :exc:`ValueError` if the value is not found.
"""
if imax is None: imax = self._num_values
for i in range(imin, imax):
if self._getter(i) == value: return i
else:
raise ValueError('{0} is not in list'.format(value))
def __add__(self, other):
return list(self) + other
def __radd__(self, other):
return other + list(self)
def __mul__(self, num):
return list(self) * num
def __rmul__(self, num):
return num * list(self)
def __contains__(self, value):
for i in range(self._num_values):
if self._getter(i) == value: return True
else:
return False
def __len__(self):
return self._num_values
def __str__(self):
return str(list(self))
def __repr__(self):
if self._num_values <= 10: return str(list(self))
else: return '<list of {0} values>'.format(self._num_values)
def __reversed__(self):
for i in reversed(range(self._num_values)):
yield self._getter(i)
[docs]class ParamMatrix(object):
"""
:class:`list`-like class managing values for a parameter as a
matrix. Similar to :class:`.ParamList` except that it holds a
matrix. It therefore implements less methods (no support for ``+``
and ``*`` operators, no slices) and use double indexing system as in
``matrix[i, j] = x``, allowing both setting and getting values. The
iterator, such as in ``for i in matrix``, flattens the matrix,
returning all values from the first row, then those from the second
tow, and so on (replacing the diagonal values by ``None``).
``len(matrix)`` returns the dimension (number of rows, which is the
same as the number of columns).
In addition to the methods documented below, :class:`.ParamMatrix`
instances support the following operations (where ``matrix`` is a
:class:`.ParamMatrix` instance):
+--------------------------+----------------------------------------------------------------------+
|Expression | Action |
+==========================+======================================================================+
|``len(matrix)`` | Size of the matrix (as number of rows/colums) |
+--------------------------+----------------------------------------------------------------------+
|``matrix[i,j]`` | Get *j*\ th item of the *i*\ th row (``None`` for diagonal) |
+--------------------------+----------------------------------------------------------------------+
|``matrix[i,j] = value`` | Assign ``value`` to *(i,j)*\ th item (no diagonal) |
+--------------------------+----------------------------------------------------------------------+
|``for item in matrix`` | Iterates over items (row-first *flat* iteration) |
+--------------------------+----------------------------------------------------------------------+
|``reversed(matrix)`` | Reversed iterator |
+--------------------------+----------------------------------------------------------------------+
|``item in matrix`` | Check if ``item`` is present among items |
+--------------------------+----------------------------------------------------------------------+
|``str(matrix)`` | Representation of the instance as a nested list (including diagonal) |
+--------------------------+----------------------------------------------------------------------+
It is not possible to slice-set ranges of values in the matrix, but
one can set all values at once from a compatible source with
:meth:`~.ParamMatrix.get_values`.
"""
def __init__(self, getter, setter, num_values, check_item, check_active, set_active):
self._getter = getter
self._setter = setter
self._num_values = num_values
self._check_item = check_item
self._check_active = check_active
self._set_active = set_active
[docs] def get_values(self, other):
"""
Import parameter values.
Get all values from *other*, which must be another
:class:`.ParamMatrix` or a nested sequence (such as a
:class:`list` or :class:`list`). The dimension of *other* must
be the same as the current instance. Not that in the input
object is not a :class:`.ParamMatrix`, any value it can have on
the diagonal will be ignored (but they must be present to avoid
shifting indexes).
"""
if isinstance(other, ParamMatrix):
if other._num_values != self._num_values: raise ValueError('`other` must have the same dimension')
if not other._check_active():
if self._check_active(): self._set_active(False)
return
if not self._check_active():
self._set_active(True)
for i in range(self._num_values):
for j in range(self._num_values):
if i != j: self._setter(i, j, other._getter(i, j))
# use of other._getter requires that number has been checked
else:
if not self._check_active(): self._set_active(True)
if len(other) != self._num_values: raise ValueError('`other` must have the same dimension')
for i, row in enumerate(other):
if len(row) != self._num_values: raise ValueError('`other` must have the same dimension')
for j, value in enumerate(row):
if i != j:
if self._check_item(value) == False: raise ValueError('value out of range')
self._setter(i, j, value)
def __getitem__(self, idx):
if len(idx) != 2: raise ValueError('expect a tuple of two values')
if idx[0] < 0: idx[0] = self._num_values + idx[0]
if idx[1] < 0: idx[1] = self._num_values + idx[1]
if idx[0] >= self._num_values: raise ValueError('matrix index out of range')
if idx[1] >= self._num_values: raise ValueError('matrix index out of range')
if idx[0] == idx[1]: return None
return self._getter(idx[0], idx[1])
def __setitem__(self, idx, value):
if len(idx) != 2: raise ValueError('expect a tuple of two values')
if idx[0] < 0: idx[0] = self._num_values + idx[0]
if idx[1] < 0: idx[1] = self._num_values + idx[1]
if idx[0] >= self._num_values: raise ValueError('matrix index out of range')
if idx[1] >= self._num_values: raise ValueError('matrix index out of range')
if idx[0] == idx[1]:
if value is not None: raise ValueError('cannot set matrix diagonal to a value different than `None`')
else: return
if self._check_item(value) == False: raise ValueError('value out of range')
if not self._check_active(): self._set_active(True)
self._setter(idx[0], idx[1], value)
def __iter__(self):
for i in range(self._num_values):
for j in range(self._num_values):
if i == j: yield None
else: yield self._getter(i, j)
[docs] def count(self, value):
"""
Number of items that are equal to *value*.
"""
cnt = 0
for i in range(self._num_values):
for j in range(self._num_values):
if i!=j and self._getter(i, j) == value: cnt += 1
return cnt
[docs] def index(self, value, imin=0, imax=None, jmin=0, jmax=None):
"""
Get the index of a given value.
Return the ``(i, j)`` tuple of indexes of the first value
(iterating first over rows and then over columns) matching
*value*. The returned row index is >=imin and <imax and the
returns column index is >=jmin and <jmax if they are provided.
Raise a :exc:`ValueError` if the value is not found.
The diagonal is not considered.
"""
if imax is None: imax = self._num_values
if jmax is None: jmax = self._num_values
for i in range(imin, imax):
for j in range(jmin, jmax):
if i!=j and self._getter(i, j) == value: return (i, j)
else:
raise ValueError('{0} is not in matrix'.format(value))
def __contains__(self, value):
for i in range(self._num_values):
for j in range(self._num_values):
if i!=j and self._getter(i, j) == value: return True
else:
return False
def __len__(self):
return self._num_values
def __str__(self):
return str([[self[i,j] for j in range(self._num_values)] for i in range(self._num_values)])
def __repr__(self):
if self._num_values <= 4: return str([[self[i,j] for j in range(self._num_values)] for i in range(self._num_values)])
else: return '<matrix of {0}*{0} values>'.format(self._num_values)
def __reversed__(self):
for i in reversed(range(self._num_values)):
for j in reversed(range(self._num_values)):
if i == j: yield None
else: yield self._getter(i, j)
[docs]class EventList(object):
"""
Class storing the list of demographic events. Even if events appear
as unsorted, they are internally sorted so that the user does not
have to care about their order. This class has limited
functionality: add events (but not removing them), accessing and
modifying parameters.
In addition to the methods documented below, :class:`.EventList`
instances support the following operations (where ``events`` is an
:class:`.EventList` instance):
+--------------------------+----------------------------------------------------------------------+
|Expression | Action |
+==========================+======================================================================+
| ``len(events)`` | Number of events currently loaded |
+--------------------------+----------------------------------------------------------------------+
| ``events[i]`` | Dctionary with parameters of the *i*\ th event (see note) |
+--------------------------+----------------------------------------------------------------------+
| ``for event in events`` | Same as ``for i in range(len(events)): event = events[i]`` |
+--------------------------+----------------------------------------------------------------------+
| ``str(events)``` | Representation of the instance, roughly as a list |
+--------------------------+----------------------------------------------------------------------+
The string representation is such as a string at the first level,
but each event is represented as an angle-bracketed delimited string
containing comma-separated ``key=value`` pairs with the parameter as
``key`` and its value as ``value`` (with an additional key,
``event_index``, providing the index of the event in the list).
.. note::
There is no way to modify content of the instance using the
indexing operator ``events[i]``. One must use
:meth:`~.EventList.update`.
More information on the list of events and their parameters is
available in the documentation for class :class:`.Simulator`.
"""
_map_enum = {
'size': _eggwrapper.Event.change_N,
'migr': _eggwrapper.Event.change_M,
'pair_migr': _eggwrapper.Event.change_Mp,
'growth': _eggwrapper.Event.change_G,
'selfing': _eggwrapper.Event.change_s,
'recombination': _eggwrapper.Event.change_R,
'bottleneck': _eggwrapper.Event.bottleneck,
'admixture': _eggwrapper.Event.admixture,
'sample': _eggwrapper.Event.delayed,
'merge': None
}
_required_parameters = { # not: T is enforced in add's signature
'size': set(['N']),
'migr': set(['M']),
'pair_migr': set(['src', 'dst', 'M']),
'growth': set(['G']),
'selfing': set(['s']),
'recombination': set(['R']),
'bottleneck': set(['S']),
'admixture': set(['src', 'dst', 'proba']),
'sample': set(['idx', 'label', 'num_chrom', 'num_indiv']),
'merge': set(['src', 'dst'])
}
_optional_parameters = {
'size': set(['idx']),
'growth': set(['idx']),
'selfing': set(['idx']),
'bottleneck': set(['idx'])
}
_all_parameters = {}
for i in _required_parameters:
_all_parameters[i] = _required_parameters[i] | _optional_parameters.get(i, set())
_all_parameters[i].add('T')
def __init__(self, npop, add, clear):
self._npop = npop
self._add = add
self._clear = clear
self._events = []
[docs] def replace(self, other):
"""
Replace list of events.
Replace own list of events with the one in the
:class:`.EventList` instance passed as *other*. The current list
of events is dropped.
"""
self.clear() # let self.clear() call self._clear() for security
for event in other: self.add(** event)
[docs] def clear(self):
"""
Clear list of events.
"""
self._clear() # do this first or the Event objects might be garbage-collected!
del self._events[:]
def __len__(self):
return len(self._events)
def __iter__(self):
for params, changes in self._events:
yield dict(params)
def __str__(self):
return ('[' + ', '.join([
'<event_index={0};'.format(i)
+ ';'.join(['{0}={1}'.format(k,v) for (k,v) in d.items()]) + '>'
for i, (d, changes) in enumerate(self._events)]) + ']')
def __repr__(self):
return str(self)
def __getitem__(self, i):
if i >= len(self._events): raise ValueError('invalid event index')
return dict(self._events[i][0])
[docs] def add(self, cat, T, **params):
"""
Add an event to the list. See the documentation for the class
:class:`.Simulator` for more details on what is expected as
arguments.
:param cat: category of the event.
:param T: date of the event.
:param params: all needed parameters for this category of
events.
"""
# check that category is valid
if cat not in self._all_parameters:
raise ValueError('invalid event category: `{0}`'.format(cat))
# check that all required parameters are present
if not self._required_parameters[cat].issubset(params):
raise ValueError('event `{0}` requires: {1}'.format(cat, ', '.join(self._required_parameters[cat] - set(params.keys()))))
# create the required backend objects
if cat == 'merge':
changes = [_eggwrapper.Event(self._map_enum['admixture'], T)]
if (params['src'] < 0 or params['src'] >= self._npop or
params['dst'] < 0 or params['dst'] >= self._npop or
params['src'] == params['dst']):
raise ValueError('invalid population indexes provided for `merge` event')
changes[0].set_index(params['src'])
changes[0].set_dest(params['dst'])
changes[0].set_param(1.0)
for i in range(self._npop):
if i != params['src']:
changes.append(_eggwrapper.Event(self._map_enum['pair_migr'], T))
changes[-1].set_param(0.0)
changes[-1].set_index(i)
changes[-1].set_dest(params['src'])
else:
changes = [_eggwrapper.Event(self._map_enum[cat], T)]
# add the event to the internal list
params['cat'] = cat
params['T'] = T
self._events.append((params, changes))
params_update = dict(params)
# use the update method to set parameters (delete the event in case of an error)
del params_update['cat']
del params_update['T']
try:
self.update(len(self._events) - 1, **params_update)
except ValueError:
del self._events[-1]
raise
# actually add changes to the lower-level Params
for change in changes: self._add(change)
[docs] def update(self, event_index, **params):
"""
Modify any parameter from one of the event of the list. If an
event's date is modified, sorting will be updated automatically.
:param event_index: index of the event to modify (based on the
order in which events have been specified with :meth:`.add`,
which is the same order as events appear when representing
the instance or iterating).
:param params: keyword arguments specifying what parameters to
modify. Only parameters that have to be changed should be
specified.
"""
# check index, get category
if event_index < 0 or event_index >= len(self._events): raise IndexError('invalid event index')
cat = self._events[event_index][0]['cat']
change0 = self._events[event_index][1][0] # not used if complex change
# initialize bool to perform ad-hoc tests
test1 = False
test2 = False
# process complex events
if cat == 'merge':
if 'T' in params:
for change in self._events[event_index][1]:
change.move(params['T'])
if 'src' in params:
test2 = True
if params['src'] < 0 or params['src'] > self._npop: raise ValueError('event `{0}`, population index out of range'.format(cat))
self._events[event_index][1][0].set_index(params['src'])
for change in self._events[event_index][1][1:]:
change.set_dest(params['src'])
if 'dst' in params:
test2 = True
if params['dst'] < 0 or params['dst'] > self._npop: raise ValueError('event `{0}`, population index out of range'.format(cat))
self._events[event_index][1][0].set_dest(params['dst'])
# process simple events
else:
# process all parameters and check+set them
for key, value in params.items():
if key not in self._all_parameters[cat]:
raise ValueError('event `{0}`: unknown parameter: {1}'.format(cat, key))
if key == 'T':
if value < 0.0: raise ValueError('event `{0}`: date must be positive'.format(cat))
change0.move(value)
elif key == 'N':
if value <= 0.0: raise ValueError('event `{0}`: size must be strictly positive'.format(cat))
change0.set_param(value)
elif key == 'M':
if value < 0.0: raise ValueError('event `{0}`: size cannot be negative'.format(cat))
change0.set_param(value)
elif key == 'G':
change0.set_param(value)
elif key == 's':
if value < 0.0 or value > 1.0: raise ValueError('event `{0}`: selfing rate must be between 0 and 1'.format(cat))
change0.set_param(value)
elif key == 'R':
if value < 0.0: raise ValueError('event `{0}`: recombination rate must be positive'.format(cat))
change0.set_param(value)
elif key == 'S':
if value < 0.0: raise ValueError('event `{0}`: size must be positive'.format(cat))
change0.set_param(value)
elif key == 'proba':
if value < 0.0 or value > 1.0: raise ValueError('event `{0}`: probability must be between 0 and 1'.format(cat))
change0.set_param(value)
elif key == 'idx':
if value < 0 or value > self._npop: raise ValueError('event `{0}`: population index out of range'.format(cat))
change0.set_index(value)
elif key == 'src':
test2 = True
if value < 0 or value > self._npop: raise ValueError('event `{0}`: population index out of range'.format(cat))
change0.set_index(value)
elif key == 'dst':
test2 = True
if value < 0 or value > self._npop: raise ValueError('event `{0}`: population index out of range'.format(cat))
change0.set_dest(value)
elif key == 'label':
if not isinstance(value, str): raise TypeError('label must be a string')
change0.set_label(value)
elif key == 'num_chrom':
test1 = True
if value < 0: raise ValueError('event `{0}`: number of samples must be positive'.format(cat))
change0.set_number1(value)
elif key == 'num_indiv':
test1 = True
if value < 0: raise ValueError('event `{0}`: number of samples must be positive'.format(cat))
change0.set_number2(value)
else:
raise RuntimeError('unexpected error')
# perform ad-hoc tests
if test1 and change0.get_number1() + change0.get_number2() == 0:
raise ValueError('event `{0}`: there must be at least one sample'.format(cat))
if test2 and change0.get_index() == change0.get_dest():
raise ValueError('event `{0}`: cannot source/destination populations cannot be the same'.format(cat))
# copy all parameters
self._events[event_index][0].update(params)