Source code for wiggelen.index

"""
Index regions/chromosomes in wiggle tracks for random access.

Indexing a wiggle track results in a mapping of regions to summaries. The
summaries are dictionaries including start and stop positions and some
statistical metrics. A summary for the entire wiggle track is included as the
special region ``_all``.

This data can be written to a file next to the wiggle track file (in case this
is a regular file). Example of the serialization we use::

    region=_all,start=0,stop=12453,sum=4544353,count=63343
    region=1,start=47,stop=3433,sum=4353,count=643
    region=X,start=3433,stop=8743,sum=454,count=343
    region=Y,start=8743,stop=10362,sum=7353,count=343
    region=MT,start=10362,stop=12453,sum=353,count=143

Note that we do not impose a certain order on the lines in the index nor on
the fields on a line.

Additional custom fields can be added to the index by providing custom field
definitions. Such a definition is created with the `Field` constructor and the
following arguments:

* The name of the field.
* A function casting a field value from `string`.
* Initial value.
* Aggregate function used as the function argument in a reduce- or fold-like
  operation to construct the field value. This function takes as inputs the
  accumulated field value, the current value and the current span, and returns
  a new accumulated field value.

As an example, the standard `sum` field could be defined as the
following tuple::

    Field('sum', float, 0, lambda acc, value, span: acc + value * span)

In practice, choose unique names for custom fields, not clashing with the
standard fields such as `sum`.

.. todo:: Add some other metrics to the index (standard deviation, min, max).

.. todo:: A custom field that is missing from an existing index file causes
   the entire index to be rebuilt, throwing away any other custom field values
   that were already there.

.. moduleauthor:: Martijn Vermaat <martijn@vermaat.name>

.. Licensed under the MIT license, see the LICENSE file.
"""


from collections import defaultdict, namedtuple
import sys

from .parse import LineType, create_state, parse


#: Whether or not indices are written to a file.
WRITE_INDEX = True

#: Suffix used for index files.
INDEX_SUFFIX = '.idx'

#: Whether or not indices are cached in memory during execution.
CACHE_INDEX = True


# Cache store of indices, indexed by index filename.
_cache = {}


#: Type for custom index field definitions.
Field = namedtuple('Field', 'name caster init func')


[docs]class ReadError(Exception): """ Raised if a wiggle track does not provide random access. Reading with random access is needed for using and creating an index. """ pass # Cast index values to their respective types.
def _cast(field, value, fields=None): fields = fields or [] casters = defaultdict(lambda: str, start=int, stop=int, sum=float, min=float, posmin=float, max=float, count=int) casters.update(dict((field.name, field.caster) for field in fields)) return casters[field](value) # Try to create a filename for the index file. def _index_filename(track=sys.stdin): filename = getattr(track, 'name', None) if filename is not None and not filename.startswith('<'): # Note that this does not play nice if we have changed our current # working dir and the file was opened using a relative path. return filename + INDEX_SUFFIX
[docs]def clear_cache(): """ Clear the in-memory cache of index objects. """ _cache.clear()
[docs]def write_index(idx, track=sys.stdout): """ Try to write the index to a file and return its filename. :arg idx: Wiggle track index. :type idx: dict(str, dict(str, _)) :arg track: Wiggle track the index belongs to. :type track: file :return: Filename for the written index, or `None` if the index could not be written. :rtype: str """ filename = _index_filename(track) if filename is None: return if CACHE_INDEX: _cache[filename] = idx if not WRITE_INDEX: return try: with open(filename, 'w') as f: f.write('\n'.join(','.join('%s=%s' % d for d in s.items()) for s in idx.values()) + '\n') return filename except IOError: pass
[docs]def read_index(track=sys.stdin, fields=None): """ Try to read the index from a file. :arg track: Wiggle track the index belongs to. :type track: file :arg fields: List of custom index field definitions. :type fields: list :return: Wiggle track index, or `None` if the index could not be read. :rtype: dict(str, dict(str, _)) .. todo:: Only accept if index is newer than wiggle track? """ fields = fields or [] filename = _index_filename(track) if filename is None: return if CACHE_INDEX and filename in _cache: return _cache[filename] try: idx = {} with open(filename) as f: for line in f: summary = {} for d in line.rstrip().split(','): k, v = d.split('=') summary[k] = _cast(k, v, fields=fields) idx[summary['region']] = summary # Todo: Here we check if all the required custom fields are in the # index for `_all`, but we should really do a better check if all # required data is there. if all(field.name in idx['_all'] for field in fields): return idx except IOError: pass
[docs]def index(track=sys.stdin, force=False, fields=None): """ Return index of region positions in track. :arg track: Wiggle track. :type track: file :arg force: Force creating an index if it does not yet exist. :type force: bool :arg fields: List of custom index field definitions. :type fields: list :return: Wiggle track index and index filename. :rtype: dict(str, dict(str, _)), str .. todo:: It is not possible to force the index to be rewritten if it already exists. .. todo:: Handle non-writable index, corrupt index, etc. """ fields = fields or [] idx = read_index(track, fields=fields) if idx is not None or not force: return idx, _index_filename(track) try: track.tell() except (AttributeError, IOError): raise ReadError('Could not index track (needs random access)') region = None idx = {'_all': {'region': '_all', 'start': 0, 'stop': track.tell(), 'sum': 0, 'min': sys.float_info.max, 'posmin': sys.float_info.max, 'max': 0, 'count': 0}} idx['_all'].update(dict((field.name, field.init) for field in fields)) state = create_state() while True: line = track.readline() if not line: break line_type, data = parse(line, state) if line_type == LineType.REGION: region = data idx[region] = { 'region': region, 'start': track.tell() - len(line), 'stop': track.tell(), 'sum': 0, 'min': sys.float_info.max, 'posmin': sys.float_info.max, 'max': 0, 'count': 0} idx[region].update(dict((field.name, field.init) for field in fields)) elif line_type == LineType.DATA: for r in region, '_all': idx[r]['stop'] = track.tell() idx[r]['sum'] += data.value * data.span idx[r]['min'] = min(data.value, idx[r]['min']) if data.value > 0: idx[r]['posmin'] = min(data.value, idx[r]['posmin']) idx[r]['max'] = max(data.value, idx[r]['max']) idx[r]['count'] += data.span for field in fields: idx[r][field.name] = field.func(idx[r][field.name], data.value, data.span) return idx, write_index(idx, track)