Source code for rook.usage.combine
import concurrent.futures
import time as time_
from pathlib import Path
import pandas as pd
from owslib.wps import ASYNC, WebProcessingService
from pywps import configuration as config
from .base import Usage
URLS = {
"local": config.get_config_value("server", "url"),
"ipsl": "http://copernicus-wps.ipsl.fr/wps",
"dkrz": "http://rook8.cloud.dkrz.de/wps",
}
[docs]
def get_usage(site, time):
wps = WebProcessingService(url=URLS[site])
resp = wps.execute(
identifier="usage",
inputs=[("time", time)],
mode=ASYNC,
output=[("wpsusage", True), ("downloads", True)],
)
while resp.isComplete() is False:
time_.sleep(10)
resp.checkStatus()
if not resp.isSucceded():
raise Exception("usage collection failed.")
# requests
df = pd.read_csv(
resp.processOutputs[0].reference, parse_dates=["time_start", "time_end"]
)
df["site"] = site
df["URL"] = URLS[site]
# downloads
df_downloads = pd.read_csv(
resp.processOutputs[1].reference, parse_dates=["datetime"]
)
df_downloads["site"] = site
df_downloads["URL"] = URLS[site]
return df, df_downloads
[docs]
class Combine(Usage):
def __init__(self, site=None):
site = site or "local"
if site == "all":
self.sites = ["ipsl", "dkrz"]
else:
self.sites = [site]
[docs]
def collect(self, time_start=None, time_end=None, outdir=None):
time = format_time(time_start, time_end)
df_list = []
df_downloads_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
jobs = {executor.submit(get_usage, site, time): site for site in self.sites}
for future in concurrent.futures.as_completed(jobs):
site = jobs[future]
try:
df, df_downloads = future.result()
except Exception:
raise Exception(f"usage collection for site={site} failed.")
else:
df_list.append(df)
df_downloads_list.append(df_downloads)
# dump usage
cdf = pd.concat(df_list, ignore_index=True)
fusage = Path(outdir).joinpath("usage.csv").as_posix()
cdf.to_csv(fusage, index=False)
# dump downloads
cdf = pd.concat(df_downloads_list, ignore_index=True)
fdownloads = Path(outdir).joinpath("downloads.csv").as_posix()
cdf.to_csv(fdownloads, index=False)
return fusage, fdownloads