Source code for wiggelen.distance

"""
Calculate the distance between two wiggle tracks using a metric designed for
multisets.

This module can be used to assess similarity of next generation sequencing
datasets. A multiset distance measure is used for pairwise comparison of
genomic information as provided by wiggle tracks.

The algorithm can be parameterized by a pairwise distance metric. Four of
these metrics are predefined in :attr:`metrics`:

Metric ``a``: :math:`\\frac{|x - y|}{(x + 1) (y + 1)}`

Metric ``b``: :math:`\\frac{|x - y|}{x + y + 1}`

Metric ``c``: :math:`\\frac{\\text{max}(x, y) \, |x - y|}{(x^2 + 1) (y^2 + 1)}`

Metric ``d``: :math:`\\frac{|x - y|}{\\text{max}(x, y) + 1}`

.. note:: These metrics are ill-defined on the interval (0, 1) so we scale all
       values if necessary.

.. moduleauthor:: Martijn Vermaat <martijn@vermaat.name>
.. moduleauthor:: Jeroen Laros <j.f.j.laros@lumc.nl>

.. Licensed under the MIT license, see the LICENSE file.
"""


from __future__ import division

from collections import defaultdict
import itertools
import sys

from .wiggle import walk
from .index import Field, index
from .merge import merge


# Todo: Give these metrics a name.
_metric_a = lambda x, y: abs(x - y) / ((x + 1) * (y + 1))
_metric_b = lambda x, y: abs(x - y) / (x + y + 1)
_metric_c = lambda x, y: ((max(x, y) * abs(x - y))
                          / (((x * x) + 1) * ((y * y) + 1)))
_metric_d = lambda x, y: abs(x - y) / (max(x, y) + 1)


#: Predefined pairwise distance metrics. See :mod:`wiggelen.distance` for
#: their definition.
metrics = {'a': _metric_a,
           'b': _metric_b,
           'c': _metric_c,
           'd': _metric_d}


[docs]def normalize(*values): """ Normalize values relative to the smallest value. :arg values: List of values. :type values: list(float) :return: Scale the values such that the minimum is 1. :rtype: list(float) """ min_value = min(*values) if not min_value: return [1 for _ in values] return [value / min_value for value in values]
[docs]def matrix(size, reflexive=False, symmetric=False): """ Create all coordinates in a square matrix. With the default `False` value for `reflexive` and `symmetric`, include only the coordinates below the diagonal. :arg size: Width and height of the matrix. :type size: int :arg reflexive: Include coordinates on (`x`, `x`) diagonal. :type reflexive: bool :arg symmetric: Include coordinates (`x`, `y`) above the diagonal (where `x < y`). :type symmetric: bool :return: All coordinates in the matrix as tuples. :rtype: list(int, int) Examples:: >>> matrix(5) [(1, 0), (2, 0), (2, 1), (3, 0), (3, 1), (3, 2), (4, 0), (4, 1), (4, 2), (4, 3)] >>> matrix(5, reflexive=True) [(0, 0), (1, 0), (1, 1), (2, 0), (2, 1), (2, 2), (3, 0), (3, 1), (3, 2), (3, 3), (4, 0), (4, 1), (4, 2), (4, 3), (4, 4)] >>> matrix(5, symmetric=True) [ (0, 1), (0, 2), (0, 3), (0, 4), (1, 0), (1, 2), (1, 3), (1, 4), (2, 0), (2, 1), (2, 3), (2, 4), (3, 0), (3, 1), (3, 2), (3, 4), (4, 0), (4, 1), (4, 2), (4, 3) ] """ return [(i, j) for i in range(0, size) for j in itertools.chain(range(i + 1 if reflexive else i), range(i + 1, size if symmetric else 0))]
[docs]def distance(*tracks, **options): """ Calculate the pairwise distances between wiggle tracks. :arg tracks: List of wiggle tracks. :type walkers: list(file) :arg metric: Pairwise distance metric (default: a). :type merger: function(float, float -> float) :arg threshold: Threshold for noise filter (default: no noise filter) :type threshold: float :return: Pairwise distances between `tracks` as a mapping from coordinates in the distance matrix to their values. :rtype: dict((int, int), float) .. todo:: Check where this goes wrong if we cannot .seek() the tracks. .. todo:: Calculate weights per region instead of over the entire track. """ metric = options.get('metric', metrics['a']) threshold = options.get('threshold') # We construct a list of comparisons for the merger, where each comparison # is a tuple of (left, right, weight_left, weight_right). comparisons = [] if threshold: field_suffix = '-threshold-%s' % str(threshold) noise_filter = lambda value: max(value - threshold, 0) def sum_func(acc, value, span): return acc + noise_filter(value) * span def min_func(acc, value, span): filtered = noise_filter(value) if filtered > 0: return min(acc, noise_filter(value)) return acc fields = [Field('sum' + field_suffix, float, 0, sum_func), Field('posmin' + field_suffix, float, sys.float_info.max, min_func)] else: field_suffix = '' noise_filter = lambda value: value fields = [] summaries = [index(track, force=True, fields=fields)[0]['_all'] for track in tracks] # Our metrics are undifined on the (0, 1) interval, so if the positive # minimum over all tracks is < 1 we upscale everything. min_value = min(summary['posmin' + field_suffix] for summary in summaries) scale = 1 / min_value if 0 < min_value < 1 else 1 # Based on the sums of all values in each track we define weights. sums = [summary['sum' + field_suffix] for summary in summaries] for left, right in matrix(len(tracks)): weight_right, weight_left = normalize(sums[left], sums[right]) comparisons.append( (left, right, weight_left, weight_right) ) # The merger returns a matrix of values, one for each pairwise comparison. def merger(values): results = {} for left, right, weight_left, weight_right in comparisons: value_left, value_right = values[left], values[right] if value_left is None and value_right is None: result = None else: x = (weight_left * scale * noise_filter(value_left) if value_left else 0) y = (weight_right * scale * noise_filter(value_right) if value_right else 0) result = metric(x, y) if x or y else None results[left, right] = result return results # Indexed walkers. walkers = [walk(track, force_index=True) for track in tracks] # Aggregate results. totals = defaultdict(lambda: 0) counts = defaultdict(lambda: 1) for _, _, values in merge(*walkers, merger=merger): for comparison, value in values.items(): if value is not None: totals[comparison] += value counts[comparison] += 1 # Create the distance matrix by taking the averages. distances = {} for comparison in counts: distances[comparison] = totals[comparison] / counts[comparison] return distances