Skip to content
This repository has been archived by the owner on Oct 18, 2019. It is now read-only.

Commit

Permalink
Merge pull request #462 from lukecampbell/harvesting
Browse files Browse the repository at this point in the history
Improves harvesting
  • Loading branch information
lukecampbell authored Apr 28, 2017
2 parents 2b1d9a5 + 370c072 commit b492592
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 114 deletions.
2 changes: 1 addition & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ COMMON: &common
# Logging and Debug
DEBUG: False
TESTING: False
LOG_FILE: True
LOG_FILE: logs/ioos_catalog.txt
JSONIFY_PRETTYPRINT_REGULAR: False
# Host configuration, only used by the app.py not used by gunicorn or WSGI
# service
Expand Down
7 changes: 4 additions & 3 deletions ioos_catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
RQDashboard(app)

# Create logging
if app.config.get('LOG_FILE') == True:
if app.config.get('LOG_FILE'):
import logging
from logging import FileHandler
file_handler = FileHandler('logs/ioos_catalog.txt')
file_handler = FileHandler(app.config['LOG_FILE'])
formatter = logging.Formatter('%(asctime)s - %(process)d - %(name)s - %(module)s:%(lineno)d - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)
config_logging_level = app.config.get('LOGGING_LEVEL', 'INFO')
file_handler.setLevel(getattr(logging, config_logging_level))
app.logger.addHandler(file_handler)
app.logger.info('Application Process Started')

Expand Down
192 changes: 153 additions & 39 deletions ioos_catalog/harvesters/sos_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,67 +26,171 @@

from pyoos.parsers.ioos.describe_sensor import IoosDescribeSensor
from petulantbear import netcdf2ncml

from urllib import urlencode
import geojson
import json


IOOS_SENSORML = 'text/xml;subtype="sensorML/1.0.1/profiles/ioos_sos/1.0"'
IOOS_SWE = 'text/xml;subtype="om/1.0.0/profiles/ioos_sos/1.0"'
SENSORML = 'text/xml;subtype="sensorML/1.0.1"'


class SosHarvestError(Exception):
def __init__(self, message):
self.message = message
self.messages = [message]

def append(self, message):
self.messages.append(message)

def __repr__(self):
return '\n'.join(['SosHarvestError'] + self.messages)


class DescribeSensorError(SosHarvestError):
pass


class SosHarvester(Harvester):

def __init__(self, service):
Harvester.__init__(self, service)
self.output_format = IOOS_SENSORML

def _handle_ows_exception(self, **kwargs):
try:
return self.sos.describe_sensor(**kwargs)
except ows.ExceptionReport as e:
if e.code == 'InvalidParameterValue':
# TODO: use SOS getCaps to determine valid formats
# some only work with plain SensorML as the format

# see if O&M will work instead
try:
kwargs[
'outputFormat'] = 'text/xml;subtype="om/1.0.0/profiles/ioos_sos/1.0"'
return self.sos.describe_sensor(**kwargs)

# see if plain sensorml wll work
except ows.ExceptionReport as e:
# if this fails, just raise the exception without handling
# here
kwargs['outputFormat'] = 'text/xml;subtype="sensorML/1.0.1"'
return self.sos.describe_sensor(**kwargs)
elif e.msg == 'No data found for this station':
# Put the current output format first, this will prevent us from trying
# subsequent calls with different formats.
formats = [IOOS_SENSORML, IOOS_SWE, SENSORML]
formats.pop(formats.index(self.output_format))
formats.insert(0, self.output_format)
for output_format in formats:
try:
self.output_format = output_format
kwargs['outputFormat'] = output_format
return self.sos.describe_sensor(**kwargs)
except ows.ExceptionReport as e:
if e.code == 'InvalidParameterValue':
continue
e.msg = e.msg + '\n' + self.format_url(kwargs['procedure'])
raise e
else:
raise SosHarvestError('No valid outputFormat found for DescribeSensor')

def _describe_sensor(self, uid, timeout=120,
outputFormat='text/xml;subtype="sensorML/1.0.1/profiles/ioos_sos/1.0"'):
def _describe_sensor(self, uid, timeout=120):
"""
Issues a DescribeSensor request with fallback behavior for oddly-acting SOS servers.
"""
kwargs = {
'outputFormat': outputFormat,
'procedure': uid,
'timeout': timeout
}

return self._handle_ows_exception(**kwargs)

def format_url(self, procedure, outputFormat=None):
'''
Returns the full SOS GET URL for the offering/procedure.
'''
if outputFormat is None:
outputFormat = self.output_format
try:
base_url = next((m.get('url') for m in self.sos.getOperationByName('DescribeSensor').methods if m.get('type').lower() == 'get'))
except StopIteration:
base_url = self.sos.url

while base_url.endswith('?'):
base_url = base_url[:-1]

request = {'service': 'SOS', 'version': self.sos.version, 'request': 'DescribeSensor'}
if isinstance(outputFormat, str):
request['outputFormat'] = outputFormat
if isinstance(procedure, str):
request['procedure'] = procedure
data = urlencode(request)
url = base_url + '?' + data
return url

def update_service_metadata(self):
metamap = self.metamap_service()
metadata = db.Metadata.find_one({"ref_id": self.service._id})
if metadata is None:
metadata = db.Metadata()
metadata.ref_id = self.service._id
metadata.ref_type = u'service'

update = {
'cc_score': {
'score': 0.,
'max_score': 0.,
'pct': 0.
},
'cc_results': [],
'metamap': metamap
}
for record in metadata.metadata:
if record['service_id'] == self.service._id:
record.update(update)
break
else:
record = {
'service_id': self.service._id,
'checker': None
}
record.update(update)
metadata.metadata.append(record)

metadata.updated = datetime.utcnow()
metadata.save()
return metadata

def update_dataset_metadata(self, dataset_id, sensor_ml, describe_sensor_url=None):
metamap = self.metamap_station(sensor_ml)
if describe_sensor_url:
metamap['Describe Sensor URL'] = describe_sensor_url
metadata = db.Metadata.find_one({"ref_id": dataset_id})
if metadata is None:
metadata = db.Metadata()
metadata.ref_id = dataset_id
metadata.ref_type = u'dataset'

update = {
'cc_score': {
'score': 0.,
'max_score': 0.,
'pct': 0.
},
'cc_results': [],
'metamap': metamap
}
for record in metadata.metadata:
if record['service_id'] == self.service._id:
record.update(update)
break
else:
record = {
'service_id': self.service._id,
'checker': None
}
record.update(update)
metadata.metadata.append(record)

metadata.updated = datetime.utcnow()
metadata.save()
return metadata

def harvest(self):
self.sos = SensorObservationService(self.service.get('url'))

scores = self.ccheck_service()
metamap = self.metamap_service()
try:
self.save_ccheck_service('ioos', scores, metamap)
finally:
pass
self.update_service_metadata()

# List storing the stations that have already been processed in this SOS server.
# This is kept and checked later to avoid servers that have the same
# stations in many offerings.
processed = []

exception = None

# handle network:all by increasing max timeout
net_len = len(self.sos.offerings)
net_timeout = 120 if net_len <= 36 else 5 * net_len
Expand All @@ -105,7 +209,14 @@ def harvest(self):
if len(sp_uid) > 2 and sp_uid[2] == "network": # Network Offering
if uid[-3:].lower() == 'all':
continue # Skip the all
net = self._describe_sensor(uid, timeout=net_timeout)
try:
net = self._describe_sensor(uid, timeout=net_timeout)
except Exception as e:
message = '\n'.join(['DescribeSensor failed for {}'.format(uid), e.message])
if exception is None:
exception = DescribeSensorError(message)
else:
exception.append(message)

network_ds = IoosDescribeSensor(net)
# Iterate over stations in the network and process them
Expand All @@ -126,6 +237,9 @@ def harvest(self):
self.process_station(uid, offering)
processed.append(uid)

if exception is not None:
raise exception

def process_station(self, uid, offering):
""" Makes a DescribeSensor request based on a 'uid' parameter being a
station procedure. Also pass along an offering with
Expand All @@ -135,7 +249,6 @@ def process_station(self, uid, offering):

with app.app_context():

app.logger.info("process_station: %s", uid)
desc_sens = self._describe_sensor(uid, timeout=1200)
# FIXME: add some kind of notice saying the station failed
if desc_sens is None:
Expand Down Expand Up @@ -167,10 +280,10 @@ def process_station(self, uid, offering):

# Find service reference in Dataset.services and remove (to replace
# it)
tmp = dataset.services[:]
for d in tmp:
if d['service_id'] == self.service.get('_id'):
dataset.services.remove(d)
dataset_services = dataset.services[:]
for service in dataset_services:
if service['url'] == self.service.get('url'):
dataset.services.remove(service)

# Parsing messages
messages = []
Expand Down Expand Up @@ -246,6 +359,7 @@ def process_station(self, uid, offering):
'service_type': self.service.get('service_type'),
'service_id': ObjectId(self.service.get('_id')),
'data_provider': self.service.get('data_provider'),
'url': self.service.url,
'metadata_type': u'sensorml',
'metadata_value': u'',
'time_min': getattr(offering, 'begin_position', None),
Expand All @@ -257,17 +371,17 @@ def process_station(self, uid, offering):
'geojson': gj,
'updated': datetime.utcnow()
}
dataset.service_url = self.service.url

dataset.services.append(service)
dataset.updated = datetime.utcnow()
dataset.save()

# do compliance checker / metadata now
scores = self.ccheck_station(sensor_ml)
metamap = self.metamap_station(sensor_ml)

try:
self.save_ccheck_station('ioos', dataset._id, scores, metamap)
describe_sensor_url = self.format_url(uid)
self.update_dataset_metadata(dataset._id, sensor_ml, describe_sensor_url)
except Exception as e:
app.logger.warn(
"could not save compliancecheck/metamap information: %s", e)
Expand Down
Loading

0 comments on commit b492592

Please sign in to comment.