Source code for pdsc.client

"""
This module contains code to support various types of clients that can be used
to query PDSC for metadata or to find observations coincident with a point on
the surface or with another observation.
"""
import os
import json
import sqlite3
import requests
from glob import glob

from .metadata import PdsMetadata, METADATA_DB_SUFFIX, json_loads

from .segment import (SegmentTree, PointQuery, TriSegment,
    SEGMENT_DB_SUFFIX, SEGMENT_TREE_SUFFIX)

DATABASE_DIRECTORY_VAR = 'PDSC_DATABASE_DIR'
"""
Environment variable that holds the PDSC ingested database directory

See :ref:`Environment Variables` for details.
"""

SERVER_VAR = 'PDSC_SERVER_HOST'
"""
Environment variable that holds the PDSC server host name

See :ref:`Environment Variables` for details.
"""

PORT_VAR = 'PDSC_SERVER_PORT'
"""
Environment variable that holds the PDSC server port

See :ref:`Environment Variables` for details.
"""

class PdsClient(object):
    """
    The :py:class:`PdsClient` class handles queries to a local PDSC database
    for looking up observations based on id or location
    """

    def __init__(self, database_directory=None):
        """
        :param database_directory:
            location of the PDSC databases; if ``None``, the
            ``PDSC_DATABASE_DIR`` environment variable is used to determine
            the database directory
        """
        if database_directory is None:
            database_directory = os.environ.get(DATABASE_DIRECTORY_VAR, None)
        if database_directory is None:
            raise ValueError(
                'Must specify database directory '
                'or set "%s" environment variable'
                % DATABASE_DIRECTORY_VAR
            )
        if not os.path.exists(database_directory):
            raise ValueError(
                'Database directory "%s" does not exist'
                % database_directory
            )

        db_files = glob(os.path.join(
            database_directory, '*%s' % METADATA_DB_SUFFIX))
        self.instruments = [
            os.path.basename(db)[:-len(METADATA_DB_SUFFIX)]
            for db in db_files
        ]
        self._db_files = dict([
            (i, f) for i, f in zip(self.instruments, db_files)])

        self._seg_files = {}
        self._seg_tree_files = {}
        self._seg_trees = {}
        for i in self.instruments:
            segfile = os.path.join(
                database_directory, '%s%s' % (i, SEGMENT_DB_SUFFIX))
            treefile = os.path.join(
                database_directory, '%s%s' % (i, SEGMENT_TREE_SUFFIX))
            if not (os.path.exists(segfile) and os.path.exists(treefile)):
                continue
            self._seg_files[i] = segfile
            self._seg_tree_files[i] = treefile
            self._seg_trees[i] = None

    def _get_seg_tree(self, instrument):
        if instrument not in self._seg_trees:
            raise ValueError(
                'Localization index not available for %s' % instrument)
        if self._seg_trees[instrument] is None:
            self._seg_trees[instrument] = SegmentTree.load(
                self._seg_tree_files[instrument])
        return self._seg_trees[instrument]

    def _query(self, instrument, conditions=None):
        """
        instrument: instrument name
        conditions: list of tuples (variable name, >/=/<, value)
        """
        if instrument not in self.instruments:
            raise ValueError('Instrument "%s" not found' % instrument)

        query_str = 'SELECT * FROM metadata'
        if conditions is None or len(conditions) == 0:
            query_tup = (query_str,)
        else:
            for t in conditions:
                if len(t) != 3:
                    raise ValueError('Invalid condition "%s"' % str(t))
                if t[1] not in ('<', '=', '>', '>=', '<='):
                    raise ValueError('Invalid comparator "%s"' % t[1])
            parts, values = zip(*[
                ('%s%s?' % (col, comp), val)
                for col, comp, val in conditions
            ])
            query_str += ' WHERE %s' % (' and '.join(parts))
            query_tup = (query_str, values)

        db_file = self._db_files[instrument]
        params = {'detect_types': sqlite3.PARSE_DECLTYPES}
        with sqlite3.connect(db_file, **params) as conn:
            cur = conn.cursor()
            cur.execute(*query_tup)
            names = [description[0] for description in cur.description]
            while True:
                row = cur.fetchone()
                if row is None:
                    break
                valdict = dict(zip(names, row))
                yield PdsMetadata(instrument, **valdict)
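
    # Usage sketch for constructing a client against an ingested database; the
    # directory path is a placeholder assumption, and the set of available
    # instruments depends on which index tables were ingested.
    #
    # >>> client = PdsClient('/data/pdsc')  # or PdsClient() with PDSC_DATABASE_DIR set
    # >>> client.instruments
    # ['ctx', 'hirise_rdr', ...]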

    def query(self, instrument, conditions=None):
        """
        Supports a generic query of observations based on metadata

        :param instrument: PDSC instrument name
        :param conditions:
            a collection of tuples indicating query constraints; each tuple
            should contain three entries:

              - metadata variable name
              - comparator (``'='``, ``'<'``, ``'>'``, ``'<='``, ``'>='``)
              - value

            A SQL-like query will be performed with a logical AND of the
            specified conditions

        :return: a list of :py:class:`~pdsc.metadata.PdsMetadata` objects
            corresponding to observations matching the specified query
            conditions

        >>> import pdsc
        >>> client = pdsc.PdsClient()
        >>> metadata = client.query('hirise_rdr', [
        ...     ('corner1_latitude', '>', -0.5),
        ...     ('corner1_latitude', '<', 0.5)
        ... ])

        .. Warning::
            This function currently assumes non-adversarial inputs; the
            current implementation allows a potential SQL injection attack.
        """
        return list(self._query(instrument, conditions))

    def query_by_observation_id(self, instrument, observation_ids):
        """
        Query observations matching any of the specified ``observation_ids``

        :param instrument: PDSC instrument name
        :param observation_ids:
            either a collection of observation ids, or a single observation id

        :return: a list of :py:class:`~pdsc.metadata.PdsMetadata` objects
            corresponding to observations matching the specified
            ``observation_ids``

        >>> import pdsc
        >>> client = pdsc.PdsClient()
        >>> metadata = client.query_by_observation_id(
        ...     'hirise_rdr', 'PSP_005423_1780'
        ... )
        >>> len(metadata)
        2
        >>> metadata[0].product_id
        u'PSP_005423_1780_COLOR'
        >>> metadata[1].product_id
        u'PSP_005423_1780_RED'
        >>> metadata = client.query_by_observation_id(
        ...     'hirise_rdr', ['PSP_010341_1775', 'PSP_010486_1775']
        ... )
        >>> len(metadata)
        4

        .. Note::
            Some instruments generate multiple data products per observation,
            so a single observation id might correspond to multiple
            :py:class:`~pdsc.metadata.PdsMetadata` objects, one for each data
            product.
        """
        if instrument not in self.instruments:
            raise ValueError('Instrument "%s" not found' % instrument)

        single_id = (type(observation_ids) == str)
        if single_id:
            observation_ids = [observation_ids]

        db_file = self._db_files[instrument]
        params = {'detect_types': sqlite3.PARSE_DECLTYPES}
        with sqlite3.connect(db_file, **params) as conn:
            cur = conn.cursor()
            values = set([])
            for oid in observation_ids:
                cur.execute(
                    'SELECT * FROM metadata WHERE observation_id=?',
                    (oid,)
                )
                rows = cur.fetchall()
                values |= set(rows)
            names = [description[0] for description in cur.description]

        metadata = [
            PdsMetadata(instrument, **dict(zip(names, v)))
            for v in sorted(values)
        ]
        return metadata

    def _query_segments(self, instrument, segment_ids):
        db_file = self._seg_files[instrument]
        with sqlite3.connect(db_file) as conn:
            cur = conn.cursor()
            values = []
            for sid in segment_ids.tolist():
                cur.execute(
                    'SELECT * FROM segments WHERE segment_id=?',
                    (sid,)
                )
                row = cur.fetchone()
                assert(row is not None)
                values.append(row)

        segments = [
            (v[1], TriSegment(v[2:4], v[4:6], v[6:8]))
            for v in values
        ]
        return segments

    def _get_observation_segments(self, instrument, observation_id):
        db_file = self._seg_files[instrument]
        with sqlite3.connect(db_file) as conn:
            cur = conn.cursor()
            cur.execute(
                'SELECT * FROM segments WHERE observation_id=?',
                (observation_id,)
            )
            values = cur.fetchall()

        segments = [
            TriSegment(v[2:4], v[4:6], v[6:8])
            for v in values
        ]
        return segments
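
    # The helpers above assume a ``segments`` table laid out as (segment_id,
    # observation_id, followed by the lat/lon of the three triangle vertices);
    # this column order is inferred from the slicing above rather than from an
    # explicit schema. Illustrative mapping of one such row onto a TriSegment
    # (values are made up):
    #
    # >>> row = (0, 'PSP_005423_1780', -4.6, 137.3, -4.5, 137.4, -4.4, 137.3)
    # >>> seg = TriSegment(row[2:4], row[4:6], row[6:8])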

    def find_observations_of_latlon(self, instrument, lat, lon, radius=0):
        """
        Find observations from a particular instrument that fall within a
        radius of the given location.

        :param instrument: PDSC instrument name
        :param lat: degrees latitude
        :param lon: degrees east longitude
        :param radius: query tolerance in meters

        :return: a list of observation ids corresponding to observations
            within ``radius`` of the given location

        >>> import pdsc
        >>> client = pdsc.PdsClient()
        >>> observation_ids = client.find_observations_of_latlon(
        ...     'hirise_rdr', -4.5, 137.4, radius=0
        ... )
        >>> observation_ids # doctest: +ELLIPSIS
        [u'ESP_018854_1755', u'ESP_018920_1755', ..., u'PSP_010639_1755']
        """
        assert(instrument in self._seg_files)
        tree = self._get_seg_tree(instrument)
        point = PointQuery(lat, lon, radius)
        idx = tree.query_point(point)
        segments = self._query_segments(instrument, idx)

        overlapping_observations = set([])
        for observation_id, seg in segments:
            if observation_id in overlapping_observations:
                continue
            if seg.includes_point(point):
                overlapping_observations.add(observation_id)

        return sorted(overlapping_observations)

    def find_overlapping_observations(self, instrument, observation_id,
            other_instrument):
        """
        Find observations from ``other_instrument`` that overlap observations
        with the given ``observation_id`` from ``instrument``.

        :param instrument: PDSC instrument name for query observation
        :param observation_id: query observation id
        :param other_instrument: PDSC instrument name for target instrument

        :return: a list of observation ids corresponding to observations
            overlapping the given observation

        >>> import pdsc
        >>> client = pdsc.PdsClient()
        >>> observation_ids = client.find_overlapping_observations(
        ...     'ctx', 'P09_004477_1906_XN_10N100W', 'hirise_rdr'
        ... )
        >>> observation_ids # doctest: +ELLIPSIS
        [u'ESP_015909_1890', u'ESP_016832_1885', ..., u'PSP_007246_1890']
        """
        for i in (instrument, other_instrument):
            assert(i in self._seg_files)

        tree = self._get_seg_tree(other_instrument)

        overlapping_observations = set([])
        for seg in self._get_observation_segments(instrument, observation_id):
            idx = tree.query_segment(seg)
            other_segments = self._query_segments(other_instrument, idx)
            for other_oid, other_seg in other_segments:
                if other_oid in overlapping_observations:
                    continue
                if seg.overlaps_segment(other_seg):
                    overlapping_observations.add(other_oid)

        return sorted(overlapping_observations)

class PdsHttpClient(object):
    """
    The :py:class:`PdsHttpClient` class handles queries to a remote PDSC
    database over HTTP for looking up observations based on id or location.
    The interface for :py:class:`PdsHttpClient` is the same as for
    :py:class:`PdsClient`.
    """

    def __init__(self, host=None, port=None):
        """
        :param host:
            the hostname of the :py:class:`~pdsc.server.PdsServer` to query
        :param port: the port to use for queries
        """
        if port is None:
            port = os.environ.get(PORT_VAR, None)
        if port is not None:
            try:
                port = int(port)
            except ValueError:
                raise ValueError(
                    'Port must be integer (got "%s")' % port
                )

        if host is None:
            host = os.environ.get(SERVER_VAR, None)
        if host is None:
            raise ValueError(
                'Must specify server hostname '
                'or set "%s" environment variable'
                % SERVER_VAR
            )

        self.base_url = 'http://%s%s/' % (
            host, '' if port is None else (':%d' % port)
        )

    def query(self, instrument, conditions=None):
        url = self.base_url + 'query'
        params = {'instrument': instrument}
        if conditions is not None:
            params['conditions'] = json.dumps(conditions)
        response = requests.post(url, data=params)
        response.raise_for_status()
        return json_loads(response.text)

    def query_by_observation_id(self, instrument, observation_ids):
        url = self.base_url + 'queryByObservationId'
        if type(observation_ids) != str:
            observation_ids = list(observation_ids)
        params = {
            'instrument': instrument,
            'observation_ids': json.dumps(observation_ids),
        }
        response = requests.post(url, data=params)
        response.raise_for_status()
        return json_loads(response.text)

    def find_observations_of_latlon(self, instrument, lat, lon, radius=0):
        url = self.base_url + 'queryByLatLon'
        params = {
            'instrument': instrument,
            'lat': lat,
            'lon': lon,
            'radius': radius,
        }
        response = requests.get(url, params=params)
        response.raise_for_status()
        return list(map(str, response.json()))

    def find_overlapping_observations(self, instrument, observation_id,
            other_instrument):
        url = self.base_url + 'queryByOverlap'
        params = {
            'instrument': instrument,
            'observation_id': observation_id,
            'other_instrument': other_instrument,
        }
        response = requests.get(url, params=params)
        response.raise_for_status()
        return list(map(str, response.json()))
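
# Usage sketch for PdsHttpClient, mirroring the PdsClient examples above; the
# host and port below are placeholder assumptions for a running pdsc server.
#
# >>> client = PdsHttpClient(host='pdsc.example.org', port=7372)
# >>> metadata = client.query_by_observation_id('hirise_rdr', 'PSP_005423_1780')
# >>> observation_ids = client.find_observations_of_latlon(
# ...     'hirise_rdr', -4.5, 137.4, radius=0
# ... )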