# Source code for rook.provenance

import json
import uuid
from datetime import datetime
from pathlib import Path

import prov.model as prov
from prov.dot import prov_to_dot
from prov.identifier import Namespace

from pywps import configuration

# prov namespace
# Core PROV type identifiers, used as ``prov:type`` values for the agents
# declared in Provenance.start().
PROV_ORGANISATION = prov.PROV["Organization"]
PROV_PROVIDER = prov.PROV["Agent"]
PROV_SOFTWARE_AGENT = prov.PROV["SoftwareAgent"]

# provone namespace
# ProvONE extension vocabulary: Workflow types the workflow entity, Data types
# the input/output entities and Execution types the operator activities.
PROVONE = Namespace(
    "provone", uri="https://purl.dataone.org/provone/2015/01/15/ontology#"
)
PROVONE_WORKFLOW = PROVONE["Workflow"]
PROVONE_DATA = PROVONE["Data"]
PROVONE_EXECUTION = PROVONE["Execution"]

# dcterms namespace
# ``dcterms:source`` carries the URL associated with each agent.
DCTERMS = Namespace("dcterms", uri="http://purl.org/dc/terms/")
DCTERMS_SOURCE = DCTERMS["source"]

# roocs namespace
# Project-local URN namespace for all identifiers minted by this module
# (agents, workflow, activities, data entities, operator parameters).
ROOCS = Namespace("roocs", uri="urn:roocs:")


class Provenance:
    """Build a PROV document (via the ``prov`` package) describing a rook run.

    The document records the agents involved (Copernicus CDS, the configured
    provider, rook and clisops) and, for every operator call, a ProvONE
    ``Execution`` activity linking input and output ``Data`` entities.
    It can be exported as PROV-JSON (file or string), PROV-N, or a PNG graph.
    """

    # Operator arguments that are copied as ``roocs:<name>`` attributes onto
    # each execution activity recorded by ``add_operator`` (when present).
    _OPERATOR_PARAMETERS = (
        "time",
        "time_components",
        "area",
        "level",
        "dims",
        "freq",
        "method",
        "grid",
        "adaptive_masking_threshold",
        "apply_fixes",
        "apply_average",
    )

    def __init__(self, output_dir):
        """Prepare an empty recorder; call ``start()`` before recording.

        :param output_dir: directory (str or Path) where ``write_json`` and
            ``write_png`` place their files.
        """
        # Path() accepts both str and Path instances.
        self.output_dir = Path(output_dir)
        self.doc = None
        self._identifier = None
        self._workflow = None
        # Software agents; populated by start().
        self.sw_rook = None
        self.sw_clisops = None

    @property
    def identifier(self):
        """UUID of the current run, or ``None`` before ``start()``."""
        return self._identifier

    def start(self, workflow=False):
        """Begin a new provenance document.

        Creates a fresh ``prov.ProvDocument`` with a new random UUID, declares
        the roocs, provone and dcterms namespaces and registers the agents:
        the Copernicus Climate Data Store organisation, the configured
        provider, and the versioned rook and clisops software agents.

        :param workflow: when exactly ``True``, also record a ProvONE workflow
            entity and an ``orchestrate`` execution activity (stamped with its
            start time) associated with rook and planned by that workflow.
        """
        from clisops import __version__ as clisops_version
        from rook import __version__ as rook_version

        self.doc = prov.ProvDocument()
        self._identifier = uuid.uuid4()
        self._workflow = None
        # Declaring namespaces for various prefixes
        self.doc.add_namespace(ROOCS)
        self.doc.add_namespace(PROVONE)
        self.doc.add_namespace(DCTERMS)
        # Define agents
        project_cds = self.doc.agent(
            ROOCS["C3S_CDS"],
            {
                prov.PROV_TYPE: PROV_ORGANISATION,
                prov.PROV_LABEL: "Copernicus Climate Data Store",
                DCTERMS_SOURCE: "https://cds.climate.copernicus.eu",
            },
        )
        provider = self.doc.agent(
            ROOCS["Provider"],
            {
                prov.PROV_TYPE: PROV_PROVIDER,
                prov.PROV_LABEL: "Provider",
                DCTERMS_SOURCE: configuration.get_config_value(
                    "metadata:main", "provider", "Rook"
                ),
            },
        )
        self.sw_rook = self.doc.agent(
            ROOCS[f"rook_v{rook_version}"],
            {
                prov.PROV_TYPE: PROV_SOFTWARE_AGENT,
                prov.PROV_LABEL: "Rook",
                DCTERMS_SOURCE: f"https://github.com/roocs/rook/releases/tag/v{rook_version}",
            },
        )
        self.doc.wasAttributedTo(self.sw_rook, project_cds)
        self.doc.wasAttributedTo(self.sw_rook, provider)
        self.sw_clisops = self.doc.agent(
            ROOCS[f"clisops_v{clisops_version}"],
            {
                prov.PROV_TYPE: PROV_SOFTWARE_AGENT,
                prov.PROV_LABEL: "clisops",
                DCTERMS_SOURCE: f"https://github.com/roocs/clisops/releases/tag/v{clisops_version}",
            },
        )
        # workflow
        if workflow is True:
            self._workflow = self.doc.entity(
                ROOCS[f"workflow_{self.identifier}"], {prov.PROV_TYPE: PROVONE_WORKFLOW}
            )
            orchestrate = self._execution_activity(
                identifier=ROOCS[f"orchestrate_{self.identifier}"],
                label="orchestrate",
                attributes={
                    prov.PROV_ATTR_STARTTIME: datetime.now().isoformat(
                        timespec="seconds"
                    )
                },
            )
            self.doc.wasAssociatedWith(
                orchestrate, agent=self.sw_rook, plan=self._workflow
            )

    def stop(self):
        """Finish the run: stamp the ``orchestrate`` activity's end time.

        Only has an effect when ``start(workflow=True)`` was used.
        """
        if self._workflow:
            # The activity already exists, so only the end time is added.
            self._execution_activity(
                identifier=ROOCS[f"orchestrate_{self.identifier}"],
                attributes={
                    prov.PROV_ATTR_ENDTIME: datetime.now().isoformat(timespec="seconds")
                },
            )

    @staticmethod
    def _format_parameter(value):
        """Flatten a list/tuple parameter value into a comma-separated string.

        Items are passed through ``str`` so non-string sequences (e.g. a
        numeric bounding box) do not make ``str.join`` raise ``TypeError``.
        Any other value is returned unchanged.
        """
        if isinstance(value, (list, tuple)):
            return ",".join(str(item) for item in value)
        return value

    def add_operator(self, operator, parameters, collection, output):
        """Record one operator call as an execution activity.

        :param operator: operator name; used as the activity label and as the
            prefix of its (uuid-suffixed) identifier.
        :param parameters: mapping of operator arguments; only names listed in
            ``_OPERATOR_PARAMETERS`` are recorded.
        :param collection: sequence of inputs; only the first item is recorded,
            identified by its file name. Assumed non-empty.
        :param output: iterable of output paths/URIs; each is recorded by file
            name as derived from the input via this activity.
        """
        attributes = {
            ROOCS[param]: self._format_parameter(parameters[param])
            for param in self._OPERATOR_PARAMETERS
            if param in parameters
        }
        op = self._execution_activity(
            identifier=ROOCS[f"{operator}_{uuid.uuid4()}"],
            label=operator,
            attributes=attributes,
        )
        # input data
        ds_in = Path(collection[0]).name
        op_input = self._data_entity(identifier=ROOCS[ds_in], label=ds_in)
        # operator started by clisops
        if self._workflow:
            self.doc.wasAssociatedWith(op, agent=self.sw_clisops, plan=self._workflow)
        else:
            # NOTE(review): PROV-DM expects the trigger to be an entity and
            # the starter an activity; both are agents here. Kept unchanged
            # to preserve the existing document output.
            self.doc.start(op, starter=self.sw_clisops, trigger=self.sw_rook)
        # Generated output file
        for out in output:
            ds_out = Path(out).name
            op_output = self._data_entity(identifier=ROOCS[ds_out], label=ds_out)
            self.doc.wasDerivedFrom(op_output, op_input, activity=op)

    def _data_entity(self, identifier, label=None):
        """Return the entity with ``identifier``, creating it if needed.

        New entities get ProvONE ``Data`` type and ``label`` (default
        ``"data"``); an existing record is returned unchanged.
        """
        records = self.doc.get_record(identifier)
        if records:
            return records[0]
        entity = self.doc.entity(identifier)
        entity.add_attributes(
            {
                prov.PROV_TYPE: PROVONE_DATA,
                prov.PROV_LABEL: label or "data",
            }
        )
        return entity

    # Backward-compatible alias for the previous (misspelled) helper name.
    _data_entitiy = _data_entity

    def _execution_activity(self, identifier, label=None, attributes=None):
        """Return the activity with ``identifier``, creating it if needed.

        New activities get ProvONE ``Execution`` type and ``label`` (default
        ``"operator"``). ``attributes``, if given, are added in both cases,
        which lets ``stop()`` append an end time to an existing activity.
        """
        records = self.doc.get_record(identifier)
        if records:
            activity = records[0]
        else:
            activity = self.doc.activity(identifier)
            activity.add_attributes(
                {
                    prov.PROV_TYPE: PROVONE_EXECUTION,
                    prov.PROV_LABEL: label or "operator",
                }
            )
        if attributes:
            activity.add_attributes(attributes)
        return activity

    def write_json(self):
        """Serialise the document to ``<output_dir>/provenance.json``; return the path."""
        outfile = self.output_dir / "provenance.json"
        self.doc.serialize(outfile.as_posix(), format="json")
        return outfile

    def write_png(self):
        """Render the document graph to ``<output_dir>/provenance.png``; return the path."""
        outfile = self.output_dir / "provenance.png"
        figure = prov_to_dot(self.doc)
        figure.write_png(outfile.as_posix())
        return outfile

    def get_provn(self):
        """Return the document in PROV-N notation."""
        return self.doc.get_provn()

    def dump_json(self):
        """Return the document serialised as an indented JSON string."""
        return self.doc.serialize(indent=2)

    def json(self):
        """Return the document's JSON serialisation parsed into Python objects."""
        return json.loads(self.dump_json())