# Source code for rook.usage.downloads

import ipaddress
import logging
import subprocess  # noqa: S404
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse

import pandas as pd
from pywps import configuration as config

from .base import Usage

# Module-named logger: records still propagate to the root handlers, but can be
# filtered or levelled for this module specifically.
LOGGER = logging.getLogger(__name__)


class NotFoundError(ValueError):
    """Signal that a log entry is missing or could not be parsed.

    Derives from ``ValueError``, so handlers catching ``ValueError`` also
    catch it.
    """
# NOTE(review): this is a separate class from ipaddress.AddressValueError,
# which is the exception dot2longip actually catches.
class AddressValueError(ValueError):
    """Signal that an IP address string could not be parsed."""
def dot2longip(ip):
    """Return the integer ("IP number") form of a dotted-quad IPv4 address.

    Any value that ``ipaddress.IPv4Address`` rejects (malformed strings,
    IPv6 addresses, ...) is logged at DEBUG level and mapped to ``0``.
    """
    try:
        address = ipaddress.IPv4Address(ip)
    except ipaddress.AddressValueError:
        LOGGER.debug(f"Could not convert IP address to an IP number: {ip}. Skipping...")
        return 0
    return int(address)
def parse_record(line):
    """Parse one Apache-style access-log line into a dictionary.

    The line is split on whitespace and at least 12 tokens are required.
    Tokens are read positionally: 0 = client IP, 3 = ``[date``,
    4 = ``zone]``, 5 = ``"method``, 6 = request path, 7 = ``protocol"``,
    8 = status, 9 = size, 10 = referer, 11+ = user agent (tokens 1-2 are
    ignored). NOTE(review): this presumably targets the Apache "combined"
    log format.

    :param line: a single raw log line.
    :returns: dict with keys ``remote_host_ip``, ``ip_number``,
        ``datetime`` (naive :class:`datetime.datetime`), ``timezone``
        (the raw offset token without ``]``), ``request_type``, ``request``,
        ``protocol``, ``status_code``, ``size`` (``0`` when logged as
        ``-``), ``referer`` and ``user_agent``.
    :raises NotFoundError: if the line has too few tokens, the client IP
        does not convert to a non-zero IPv4 number, or the timestamp,
        status or size fields cannot be parsed. Parse failures are chained
        as ``__cause__``.
    """
    tokens = line.strip().split()
    MIN_EXPECTED_TOKENS = 12  # noqa: N806
    if len(tokens) < MIN_EXPECTED_TOKENS:
        LOGGER.warning("Line does not contain the expected apache record format")
        raise NotFoundError("Invalid log line format")

    ip_number = dot2longip(tokens[0])
    # dot2longip returns 0 for anything it cannot convert (including IPv6).
    if ip_number == 0:
        raise NotFoundError("Invalid IP address")

    raw_time = tokens[3].lstrip("[")
    try:
        record_time = datetime.strptime(raw_time, "%d/%b/%Y:%H:%M:%S")
    except ValueError as err:
        LOGGER.warning("Invalid datetime format: %s", raw_time)
        raise NotFoundError("Invalid datetime format") from err

    try:
        status_code = int(tokens[8])
        # A size logged as "-" is treated as 0 bytes.
        size = int(tokens[9]) if tokens[9] != "-" else 0
    except ValueError as err:
        LOGGER.warning("Invalid status code or size: %s, %s", tokens[8], tokens[9])
        raise NotFoundError("Invalid status code or size") from err

    return {
        "remote_host_ip": tokens[0],
        "ip_number": ip_number,
        "datetime": record_time,
        "timezone": tokens[4].rstrip("]"),
        "request_type": tokens[5].lstrip('"'),
        "request": tokens[6],
        "protocol": tokens[7].rstrip('"'),
        "status_code": status_code,
        "size": size,
        "referer": tokens[10].replace('"', ""),
        # The user agent may contain spaces, so re-join everything after the referer.
        "user_agent": " ".join(tokens[11:]).strip('"'),
    }
class Downloads(Usage):
    """Collect ``.nc`` download records from HTTP access logs.

    Configuration is read from pywps: the URL path of ``server.outputurl``
    (see :attr:`output_path`) and ``logging.http_log_path`` (the directory
    searched for ``access.log*`` files).
    """

    def __init__(self):
        self._output_path = urlparse(
            config.get_config_value("server", "outputurl")
        ).path
        self._http_log_path = config.get_config_value("logging", "http_log_path")

    @property
    def output_path(self):
        """Path component of the configured ``server.outputurl``."""
        return self._output_path

    @property
    def http_log_path(self):
        """Directory configured as ``logging.http_log_path``."""
        return self._http_log_path

    def collect(self, time_start=None, time_end=None, outdir=None):
        """Parse every ``access.log*`` file found in :attr:`http_log_path`.

        Arguments, return value and errors are those of :meth:`parse`.
        """
        log_files = sorted(Path(self.http_log_path).glob("access.log*"))
        return self.parse(log_files, time_start, time_end, outdir)

    def parse(self, log_files, time_start=None, time_end=None, outdir=None):
        """Extract ``.nc`` download requests from log files into a CSV.

        Each file is pre-filtered with ``zgrep`` for lines matching
        ``GET <output_path>/.*/.*\\.nc`` (zgrep presumably so that compressed
        rotated logs are searched too). Files are processed concurrently in a
        thread pool. Matching lines are parsed with :func:`parse_record`, and
        lines it rejects are skipped. A file with no matching lines (zgrep
        exit status 1) contributes nothing. Any other zgrep failure is logged
        at ERROR level and that file is skipped.

        :param log_files: iterable of log file paths (``str`` or ``Path``).
        :param time_start: optional inclusive lower bound on the parsed
            (naive) ``datetime`` column; a falsy value disables it.
        :param time_end: optional inclusive upper bound, same semantics.
        :param outdir: directory to write ``downloads.csv`` into. Despite
            the ``None`` default it must be supplied; ``Path(None)`` raises
            ``TypeError``.
        :returns: POSIX path string of the written CSV file.
        :raises NotFoundError: if no parseable record was found in any file
            (checked before the request/time filters are applied).
        """
        # One path regex shared by the zgrep pre-filter and the pandas filter.
        # Only ".*" and "\." are used, which mean the same in both syntaxes.
        path_pattern = rf"{self.output_path}/.*/.*\.nc"
        search_pattern = f"GET {path_pattern}"

        def process_file(log_file):
            """Return the records parse_record accepts among one file's matching lines."""
            # NOTE(review): the arguments are passed as an argv list with no
            # shell, so Python does not shell-interpret search_pattern or
            # log_file. search_pattern comes from server configuration, and
            # log_file comes from the caller (collect() globs http_log_path).
            # zgrep itself may be a shell script, so keep untrusted file names
            # out of log_files.
            try:
                result = subprocess.run(  # noqa: S603
                    ["zgrep", search_pattern, log_file],  # noqa: S607
                    stdout=subprocess.PIPE,
                    text=True,
                    # An undecodable byte must not abort the whole collection.
                    errors="replace",
                    check=True,
                )
            except subprocess.CalledProcessError as err:
                if err.returncode == 1:
                    # grep's "no lines selected" status: a log without any
                    # downloads, which is not an error.
                    return []
                LOGGER.error("Failed to process log file %s: %s", log_file, err)
                return []

            records = []
            for line in result.stdout.splitlines():
                try:
                    records.append(parse_record(line))
                except NotFoundError:
                    continue  # skip lines parse_record rejects
            return records

        all_records = []
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(process_file, log_file) for log_file in log_files
            ]
            # Gather in submission order so record order follows log_files.
            for future in futures:
                all_records.extend(future.result())

        if not all_records:
            raise NotFoundError("Could not find any records")

        df = pd.DataFrame(all_records)
        # zgrep matched anywhere on the line; re-check the request field only.
        df = df[df["request"].str.contains(path_pattern, regex=True)]
        if time_start:
            df = df[df["datetime"] >= time_start]
        if time_end:
            df = df[df["datetime"] <= time_end]

        fname = Path(outdir).joinpath("downloads.csv").as_posix()
        df.to_csv(fname, index=False)
        return fname