# Source code for pdsc.table

"""
Parses PDS cumulative index files into an internal table representation
"""
import re
import numpy as np
from datetime import datetime

from .util import registerer, standard_progress_bar

# Registry of table classes keyed by PDSC instrument name. parse_table()
# looks an instrument up here and calls the stored object with
# (label_file, table_file) to build the table.
# NOTE(review): registerer comes from .util and is not visible in this file;
# presumably it returns a decorator factory that stores the decorated object
# in the given dict under the instrument name -- confirm against .util.
INSTRUMENT_TABLES = {}
register_table = registerer(INSTRUMENT_TABLES)
"""
A decorator that can be used to register a :py:class:`PdsTable` subclass to a
particular instrument.

:param instrument: PDSC instrument name
:return: decorator that registers target to given instrument

See :ref:`Extending PDSC` for more details.
"""

# Registry of "determiner" callables keyed by PDSC instrument name.
# determine_instrument() iterates over this dict in sorted key order and
# calls each stored callable with the label file contents; the first one that
# returns a truthy value selects the instrument.
# NOTE(review): registerer comes from .util and is not visible in this file;
# presumably it returns a decorator factory that stores the decorated object
# in the given dict under the instrument name -- confirm against .util.
INSTRUMENT_DETERMINERS = {}
register_determiner = registerer(INSTRUMENT_DETERMINERS)
"""
A decorator that can be used to register a function that determines whether a
cumulative index file is for a particular instrument.

:param instrument: PDSC instrument name
:return: decorator that registers target to given instrument

See :ref:`Extending PDSC` for more details.
"""

class PdsColumnType(object):
    """
    Thin callable wrapper around a column type function whose ``repr`` does
    not contain a memory address, so that the representation is identical
    from run to run. This is a workaround for an issue in Sphinx.

    >>> f = PdsColumnType(themis_datetime)
    >>> repr(f)
    '<function themis_datetime>'
    >>> f('1985-10-26T01:20:00.000')
    datetime.datetime(1985, 10, 26, 1, 20)
    """

    # Matches the " at 0x..." suffix that default reprs append
    _ADDRESS_SUFFIX = re.compile(' at 0x[0-9A-Fa-f]*')

    def __init__(self, f):
        """
        :param f: type function to wrap
        """
        self._f = f

    def __repr__(self):
        # Delegate to the wrapped object's repr, minus any memory address
        return self._ADDRESS_SUFFIX.sub('', repr(self._f))

    def __call__(self, *args, **kwargs):
        # Forward the call unchanged to the wrapped type function
        wrapped = self._f
        return wrapped(*args, **kwargs)
def themis_datetime(s):
    """
    Parses date/time format found in THEMIS cumulative index files

    :param s: datetime string; leading and trailing whitespace is ignored
    :return: :py:class:`datetime.datetime` object
    :raises ValueError: if the string does not match the expected format

    >>> themis_datetime('1985-10-26T01:20:00.000')
    datetime.datetime(1985, 10, 26, 1, 20)
    >>> themis_datetime(' 1985-10-26T01:20:00.000 ')
    datetime.datetime(1985, 10, 26, 1, 20)
    """
    # Strip padding first (as hirise_datetime does): strptime rejects any
    # unconverted trailing characters, including blanks from a padded field.
    return datetime.strptime(s.strip(), '%Y-%m-%dT%H:%M:%S.%f')
def hirise_datetime(s):
    """
    Converts a HiRISE cumulative index timestamp (whole seconds, no
    fractional part) into a datetime; surrounding whitespace is ignored

    :param s: datetime string
    :return: :py:class:`datetime.datetime` object

    >>> hirise_datetime('1985-10-26T01:20:00')
    datetime.datetime(1985, 10, 26, 1, 20)
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S'
    trimmed = s.strip()
    return datetime.strptime(trimmed, timestamp_format)
def ctx_sclk(s):
    '''
    Turns a CTX SCLK string, which separates the fractional part with a
    colon, into a float by treating every colon as a decimal point

    :param s: CTX SCLK string
    :return: floating-point fractional second

    >>> ctx_sclk('10:1')
    10.1
    '''
    # Splitting on ':' and re-joining with '.' swaps every colon for a point
    decimal_text = '.'.join(s.split(':'))
    return float(decimal_text)
def moc_observation_id(s):
    """
    Drops every forward slash from a MOC observation id

    :param s: MOC observation id
    :return: reformatted id

    >>> moc_observation_id('FHA/00469')
    'FHA00469'
    """
    pieces = s.split('/')
    return ''.join(pieces)
@register_determiner('hirise_edr')
def hirise_edr_determiner(label_contents):
    """
    Checks whether a cumulative index label describes HiRISE EDR products by
    looking for both the instrument name and the EDR table object name

    :param label_contents: PDS cumulative index LBL file contents
    :return: ``True`` iff this label file is for HiRISE EDR products
    """
    required_markers = ('HiRISE', 'EDR_INDEX_TABLE')
    return all(marker in label_contents for marker in required_markers)
@register_determiner('hirise_rdr')
def hirise_rdr_determiner(label_contents):
    """
    Checks whether a cumulative index label describes HiRISE RDR products by
    looking for both the instrument name and the RDR table object name

    :param label_contents: PDS cumulative index LBL file contents
    :return: ``True`` iff this label file is for HiRISE RDR products
    """
    required_markers = ('HiRISE', 'RDR_INDEX_TABLE')
    return all(marker in label_contents for marker in required_markers)
def themis_determiner(label_contents, detector_name):
    """
    Shared check used by the THEMIS determiners: the label must carry both an
    ``INSTRUMENT_NAME`` naming THEMIS and a ``DETECTOR_ID`` containing the
    requested detector

    :param label_contents: PDS cumulative index LBL file contents
    :param detector_name: THEMIS detector name (either ``'VIS'`` or ``'IR'``)
    :return: ``True`` iff this label file is for THEMIS products with the
        specified detector
    """
    instrument = parse_simple_label(label_contents, 'INSTRUMENT_NAME')
    detector = parse_simple_label(label_contents, 'DETECTOR_ID')

    # Either header missing means this cannot be a THEMIS label
    if instrument is None or detector is None:
        return False

    return (
        'THERMAL EMISSION IMAGING SYSTEM' in instrument
        and detector_name in detector
    )
def parse_simple_label(label_contents, key):
    """
    Looks up a "simple" PDS header entry, i.e. a ``KEY = value`` or
    ``KEY = "value"`` entry that fits on a single line, and returns the value
    of the first entry whose key equals ``key``.

    :param label_contents: string contents of the PDS LBL file
    :param key: entry key to search for in PDS label
    :return: entry value string or ``None`` if not found
    """
    entry_pattern = re.compile(r'^\s*(\w+)\s*=\s*"?([^"]+)"?\s*$')

    for text_line in label_contents.splitlines(False):
        found = entry_pattern.match(text_line)
        if found is None:
            # Not a single-line KEY = value entry
            continue
        entry_key, entry_value = found.groups()
        if entry_key == key:
            return entry_value

    return None
def generic_determiner(label_contents, instrument_name):
    """
    Shared check for cumulative index labels that carry an explicit
    ``INSTRUMENT_NAME`` header: the header must exist and contain the given
    instrument name

    :param label_contents: PDS cumulative index LBL file contents
    :param instrument_name: instrument name as reported in the cumulative
        index ``INSTRUMENT_NAME`` header
    :return: ``True`` iff this label file is for the specified instrument
    """
    instrument = parse_simple_label(label_contents, 'INSTRUMENT_NAME')
    if instrument is None:
        return False
    return instrument_name in instrument
@register_determiner('themis_vis')
def themis_vis_determiner(label_contents):
    """
    Checks whether a cumulative index label describes THEMIS VIS products

    :param label_contents: PDS cumulative index LBL file contents
    :return: ``True`` iff this label file is for THEMIS VIS products
    """
    is_vis = themis_determiner(label_contents, detector_name='VIS')
    return is_vis
@register_determiner('themis_ir')
def themis_ir_determiner(label_contents):
    """
    Checks whether a cumulative index label describes THEMIS IR products

    :param label_contents: PDS cumulative index LBL file contents
    :return: ``True`` iff this label file is for THEMIS IR products
    """
    is_ir = themis_determiner(label_contents, detector_name='IR')
    return is_ir
@register_determiner('ctx')
def ctx_determiner(label_contents):
    """
    Checks whether a cumulative index label describes CTX products

    :param label_contents: PDS cumulative index LBL file contents
    :return: ``True`` iff this label file is for CTX products
    """
    is_ctx = generic_determiner(
        label_contents, instrument_name='CONTEXT CAMERA')
    return is_ctx
@register_determiner('moc')
def moc_determiner(label_contents):
    """
    Checks whether a cumulative index label describes MOC products

    :param label_contents: PDS cumulative index LBL file contents
    :return: ``True`` iff this label file is for MOC products
    """
    is_moc = generic_determiner(
        label_contents, instrument_name='MARS ORBITER CAMERA')
    return is_moc
def determine_instrument(label_contents):
    """
    Finds the PDSC instrument name for a PDS cumulative index LBL file by
    trying every registered determiner in alphabetical order of instrument
    name

    :param label_contents: contents of the PDS cumulative index LBL file
    :return: the instrument name corresponding to the first registered
        "determiner" function that returns ``True``
    :raises ValueError: if no registered determiner accepts the label
    """
    for instrument_name in sorted(INSTRUMENT_DETERMINERS):
        accepts = INSTRUMENT_DETERMINERS[instrument_name]
        if accepts(label_contents):
            return instrument_name

    raise ValueError('Could not determine instrument')
class PdsTableColumn(object):
    """
    Represents one COLUMN object of a PDS cumulative index table and parses
    its metadata out of the label file
    """

    PARSE_TABLE = {
        'NAME': ('name', str),
        'COLUMN_NUMBER': ('number', int),
        'DATA_TYPE': ('dtype', str),
        'START_BYTE': ('start_byte', int),
        'BYTES': ('length', int),
        'NOT_APPLICABLE_CONSTANT': ('unknown_constant', str),
    }
    """
    Describes which label entries of a column are kept. Each key is a
    metadata identifier from the label (column number, width in bytes, fill
    value, ...) and each value is a ``(attribute name, type)`` tuple: the
    entry's value is converted with the type and stored on the
    :py:class:`PdsTableColumn` under the attribute name.
    """

    TYPE_TABLE = {
        'ASCII_REAL': float,
        'ASCII_INTEGER': int,
    }
    """
    Maps standard column data types to the associated Python types; any data
    type not listed here is treated as ``str``
    """

    SPECIAL_TYPES = {}
    """
    Maps names of columns with non-standard types to the associated Python
    types; sub-classes should use this attribute to define custom column
    types for a particular instrument
    """

    def __init__(self, fpointer):
        """
        :param fpointer: an open file object, pointing to the start of the
            column within the PDS index LBL file
        :raises ValueError: if the end of the file is reached before the
            column object is closed
        """
        self.name = self.dtype = self.number = None
        self.start_byte = self.length = self.unknown_constant = None

        if not self._parse(fpointer):
            raise ValueError('Column was not successfully parsed!')

        # Resolve the Python type: a per-name override wins, otherwise the
        # declared DATA_TYPE is looked up, falling back to str
        overrides = self.SPECIAL_TYPES
        if self.name in overrides:
            self.dtype = overrides[self.name]
        else:
            self.dtype = self.TYPE_TABLE.get(self.dtype, str)

        # The fill value is read as text; convert it to the column's type
        if self.unknown_constant is not None:
            self.unknown_constant = self.dtype(self.unknown_constant)

    def _parse(self, fpointer):
        # Consumes lines from fpointer up to and including the line that
        # closes the column object. Returns True if the column was closed,
        # False if the file ended first.
        entry_pattern = re.compile(r'\s*(\w+)\s*=\s*(\w+)\s*')

        while True:
            line = fpointer.readline()
            if not line:
                # readline() returns an empty string only at end of file
                return False

            found = entry_pattern.match(line)
            if found is None:
                if 'END_OBJECT' in line:
                    return True
                continue  # pragma: no cover

            key, val = found.groups()
            if (key, val) == ('END_OBJECT', 'COLUMN'):
                return True

            action = self.PARSE_TABLE.get(key, None)
            if action is not None:
                attribute, caster = action
                setattr(self, attribute, caster(val))
class PdsTable(object):
    """
    Class for representing and parsing a PDS cumulative index table

    The table layout (columns, row count, row width) is read from the LBL
    file when the object is constructed; column values are read from the TAB
    file on demand by :py:meth:`get_column`.
    """

    PARSE_TABLE = {
        'COLUMNS': ('n_columns', int),
        'ROWS': ('n_rows', int),
        'ROW_BYTES': ('row_bytes', int),
    }
    """
    Information for parsing the table object out of the PDS cumulative index
    label file

    This dictionary maps the metadata identifier to a tuple containing the
    name and type of the :py:class:`PdsTable` attribute that will be set when
    parsing this entry.
    """

    TABLE_OBJECT_NAME = 'TABLE'
    """
    The name of a TABLE object in the PDS cumulative index label
    """

    COLUMN_OBJECT_NAME = 'COLUMN'
    """
    The name of a COLUMN object in the PDS cumulative index label
    """

    COLUMN_CLASS = PdsTableColumn
    """
    The table column class used to parse columns in this table
    """

    CHECK_COLUMN_COUNT = True
    """
    Whether to check the number of columns parsed against the number of
    columns reported in the table metadata; for most instruments, these
    numbers match, but other instruments have columns with multiple fields so
    there is sometimes a discrepancy between the effective number of columns
    and the reported column count.
    """

    def __init__(self, label_file, table_file):
        """
        :param label_file: path to a PDS cumulative index LBL file
        :param table_file: path to a PDS cumulative index TAB file
        :raises RuntimeError: if the label file ends before a table object
            has been opened and closed
        :raises ValueError: if ``CHECK_COLUMN_COUNT`` is set and the number
            of parsed columns differs from the ``COLUMNS`` label entry
        """
        self.label_file = label_file
        self.table_file = table_file
        self._data_cache = {}

        # Make sure every table attribute exists even if the label omits it
        for attr, _ in self.PARSE_TABLE.values():
            setattr(self, attr, None)

        with open(label_file, 'r') as f:
            columns = self._parse(f)

        if columns is None:
            raise RuntimeError('Error parsing table')

        if self.CHECK_COLUMN_COUNT and (len(columns) != self.n_columns):
            # %s rather than %d: n_columns is None when the label has no
            # COLUMNS entry, and %d would raise TypeError instead of the
            # intended ValueError
            raise ValueError(
                'Expected %s columns; got %d' % (self.n_columns, len(columns))
            )

        # Columns ordered by column number; building the tuple directly also
        # works for a table without columns, where unpacking zip() fails
        self.columns = tuple(columns[number] for number in sorted(columns))

    def _parse(self, fpointer):
        # Reads the label line by line. Returns a dict mapping column number
        # to column object once the table object is closed, or None if the
        # file ends first. readline() is used throughout because the column
        # class continues reading from the same file object.
        columns = {}
        in_table = False
        while True:
            line = fpointer.readline()
            if len(line) == 0:
                break

            match = re.match(r'\s*(\w+)\s*=\s*(\w+)\s*', line)
            if match is None:
                if in_table and 'END_OBJECT' in line:
                    return columns
                continue  # pragma: no cover

            key = match.group(1)
            val = match.group(2)

            if not in_table:
                # Skip everything until the table object is opened
                if key == 'OBJECT' and val == self.TABLE_OBJECT_NAME:
                    in_table = True
                continue

            if key == 'END_OBJECT' and val == self.TABLE_OBJECT_NAME:
                return columns

            if key == 'OBJECT' and val == self.COLUMN_OBJECT_NAME:
                # The column class consumes the lines of its own object
                column = self.COLUMN_CLASS(fpointer)
                if column.number is None:
                    # No explicit COLUMN_NUMBER: number by order of appearance
                    column.number = len(columns)
                columns[column.number] = column
                continue

            action = self.PARSE_TABLE.get(key, None)
            if action is None:
                continue
            vdest, vtype = action
            setattr(self, vdest, vtype(val))

        return None

    def get_column_idx(self, column_name):
        """
        Get numerical column index given column name

        :param column_name: PDS table column name
        :return: index of column within table
        :raises IndexError: if the column is not found
        """
        for i, c in enumerate(self.columns):
            if c.name == column_name:
                return i
        raise IndexError('Column name "%s" not found' % str(column_name))

    def get_column(self, column_name_or_idx, progress=True, cache=True):
        """
        Parses all column values out of a PDS cumulative index table

        :param column_name_or_idx: either an integer column index (a Python
            or NumPy integer), or its name as given in the PDS label file
        :param progress: if ``True``, displays a progress bar as the column
            is being read
        :param cache: if ``True``, caches the result in memory so that
            subsequent calls do not have to read from the file
        :return: a :py:class:`numpy.array` containing values for every row of
            the specified column
        :raises IndexError: if a column name is given that does not exist
        """
        # isinstance() instead of an exact type comparison so that NumPy
        # integers are accepted as indices rather than treated as names
        if isinstance(column_name_or_idx, (int, np.integer)):
            cidx = column_name_or_idx
        else:
            cidx = self.get_column_idx(column_name_or_idx)

        if cidx in self._data_cache:
            return self._data_cache[cidx]

        column = self.columns[cidx]
        # The code treats START_BYTE as 1-based within a row
        offset = column.start_byte - 1

        values = []
        pbar = standard_progress_bar('Reading column %d' % cidx, progress)
        with open(self.table_file, 'r') as f:
            for r in pbar(range(self.n_rows)):
                f.seek(r*self.row_bytes + offset)
                values.append(f.read(column.length))

        try:
            data_column = np.array(values, dtype=column.dtype)
        except TypeError:
            # column.dtype is not something NumPy accepts as a dtype (e.g. a
            # custom parsing function), so convert the values one at a time
            pbar = standard_progress_bar(
                'Converting column %d' % cidx, progress)
            data_column = np.array([column.dtype(v) for v in pbar(values)])

        if column.unknown_constant is not None:
            # NOTE(review): assigning NaN only yields a real NaN for float
            # arrays; behaviour for other dtypes is left as it was
            data_column[data_column == column.unknown_constant] = np.nan

        # Strip padding from string columns. Text read on Python 3 produces
        # unicode ('U') arrays, which the previous check for 'S' alone missed
        if data_column.dtype.kind in ('S', 'U'):
            data_column = np.char.strip(data_column)

        if cache:
            self._data_cache[cidx] = data_column

        return data_column
# ****************************************************************************
# CTX
# ****************************************************************************
class CtxTableColumn(PdsTableColumn):
    """
    Column class for the CTX instrument; identical to
    :py:class:`PdsTableColumn` except for the special column types below
    """

    SPECIAL_TYPES = dict(
        IMAGE_TIME=PdsColumnType(themis_datetime),
        SPACECRAFT_CLOCK_START_COUNT=PdsColumnType(ctx_sclk),
    )
    """
    Special types used to parse the CTX observation time and spacecraft clock
    columns
    """
@register_table('ctx')
class CtxTable(PdsTable):
    """
    Table class registered for the CTX instrument; identical to
    :py:class:`PdsTable` except that columns are parsed with
    :py:class:`CtxTableColumn`
    """

    COLUMN_CLASS = CtxTableColumn
    """
    Columns of a :py:class:`CtxTable` are parsed with
    :py:class:`CtxTableColumn`
    """
# ****************************************************************************
# THEMIS
# ****************************************************************************
class ThemisTableColumn(PdsTableColumn):
    """
    Column class for the THEMIS instrument; replaces the column metadata
    table and defines special column types
    """

    PARSE_TABLE = dict(
        NAME=('name', str),
        COLUMN_NUMBER=('number', int),
        DATA_TYPE=('dtype', str),
        START_BYTE=('start_byte', int),
        BYTES=('length', int),
        UNKNOWN_CONSTANT=('unknown_constant', str),
    )
    """
    Column metadata for THEMIS, which reads the fill value from the
    ``UNKNOWN_CONSTANT`` entry instead of ``NOT_APPLICABLE_CONSTANT``
    """

    SPECIAL_TYPES = dict(
        START_TIME=PdsColumnType(themis_datetime),
        STOP_TIME=PdsColumnType(themis_datetime),
        SPACECRAFT_CLOCK_START_COUNT=float,
        SPACECRAFT_CLOCK_STOP_COUNT=float,
        START_TIME_ET=float,
        STOP_TIME_ET=float,
        UNCORRECTED_SCLK_START_COUNT=float,
        BAND_NUMBER=int,
        LOCAL_TIME=float,
    )
    """
    Special types used to parse the THEMIS observation metadata
    """
@register_table('themis_vis')
@register_table('themis_ir')
class ThemisTable(PdsTable):
    """
    Table class registered for both THEMIS instruments (VIS and IR);
    identical to :py:class:`PdsTable` except that columns are parsed with
    :py:class:`ThemisTableColumn`
    """

    COLUMN_CLASS = ThemisTableColumn
    """
    Columns of a :py:class:`ThemisTable` are parsed with
    :py:class:`ThemisTableColumn`
    """
# ****************************************************************************
# HiRISE
# ****************************************************************************
class HiRiseTableColumn(PdsTableColumn):
    """
    Column class for the HiRISE instrument; identical to
    :py:class:`PdsTableColumn` except for the special column types below
    """

    SPECIAL_TYPES = dict(
        OBSERVATION_START_TIME=PdsColumnType(hirise_datetime),
        START_TIME=PdsColumnType(hirise_datetime),
        OBSERVATION_START_COUNT=PdsColumnType(ctx_sclk),
        STOP_TIME=PdsColumnType(hirise_datetime),
        SPACECRAFT_CLOCK_START_COUNT=PdsColumnType(ctx_sclk),
        SPACECRAFT_CLOCK_STOP_COUNT=PdsColumnType(ctx_sclk),
        ADC_CONVERSION_SETTINGS=str,
    )
    """
    Special types used to parse the HiRISE observation metadata
    """
@register_table('hirise_edr')
class HiRiseEdrTable(PdsTable):
    """
    Table class registered for HiRISE EDR products; columns are parsed with
    :py:class:`HiRiseTableColumn`, the table object has a custom name, and
    the column count check is disabled
    """

    COLUMN_CLASS = HiRiseTableColumn
    """
    Columns of a :py:class:`HiRiseEdrTable` are parsed with
    :py:class:`HiRiseTableColumn`
    """

    TABLE_OBJECT_NAME = 'EDR_INDEX_TABLE'
    """
    Name of the table object in the HiRISE EDR label
    """

    CHECK_COLUMN_COUNT = False
    """
    The column count discrepancy is ignored for the HiRISE EDR table
    """
# ****************************************************************************
# HiRISE RDR
# ****************************************************************************
@register_table('hirise_rdr')
class HiRiseRdrTable(PdsTable):
    """
    Table class registered for HiRISE RDR products; columns are parsed with
    :py:class:`HiRiseTableColumn` and the table object has a custom name
    """

    COLUMN_CLASS = HiRiseTableColumn
    """
    Columns of a :py:class:`HiRiseRdrTable` are parsed with
    :py:class:`HiRiseTableColumn`
    """

    TABLE_OBJECT_NAME = 'RDR_INDEX_TABLE'
    """
    Name of the table object in the HiRISE RDR label
    """
# ****************************************************************************
# MOC
# ****************************************************************************
class MocTableColumn(PdsTableColumn):
    """
    Column class for the MOC instrument; identical to
    :py:class:`PdsTableColumn` except for the special column types below
    """

    SPECIAL_TYPES = dict(
        IMAGE_TIME=PdsColumnType(themis_datetime),
        SPACECRAFT_CLOCK_START_COUNT=PdsColumnType(ctx_sclk),
        PRODUCT_ID=PdsColumnType(moc_observation_id),
    )
    """
    Special types used to parse the MOC observation metadata
    """
@register_table('moc')
class MocTable(PdsTable):
    """
    Table class registered for the MOC instrument; identical to
    :py:class:`PdsTable` except that columns are parsed with
    :py:class:`MocTableColumn`
    """

    COLUMN_CLASS = MocTableColumn
    """
    Columns of a :py:class:`MocTable` are parsed with
    :py:class:`MocTableColumn`
    """
def parse_table(label_file, table_file):
    """
    Parses a PDS cumulative index table

    :param label_file: path to the PDS LBL file associated with the
        cumulative index
    :param table_file: path to the PDS TAB file associated with the
        cumulative index
    :return: a tuple ``(instrument, table)`` holding the determined PDSC
        instrument name and the :py:class:`PdsTable` object containing the
        parsed table metadata
    :raises ValueError: if the instrument cannot be determined, or if no
        table class has been registered for it

    The instrument name is first determined from the contents of
    ``label_file`` with :py:meth:`determine_instrument`. The
    :py:class:`PdsTable` subclass registered to that instrument is then
    looked up and used to parse the table.

    See :ref:`Extending PDSC` for more details.
    """
    with open(label_file, 'r') as label:
        label_contents = label.read()
    instrument = determine_instrument(label_contents)

    if instrument not in INSTRUMENT_TABLES:
        raise ValueError('Table parsing not implemented for %s' % instrument)

    table_class = INSTRUMENT_TABLES[instrument]
    table = table_class(label_file, table_file)
    return instrument, table