# Contains the business logic for aggregates.
from logging import getLogger
from pprint import pformat, pprint
from typing import Dict, List, Optional

from fastapi import Request

from src.aggregate.aggregate_schema import AggregateSchema, MetricEnum
from src.utils import round_bytes, get_data_from_ontap

logger = getLogger("uvicorn")
logger.setLevel("DEBUG")

# TODO: draft mapping from request tags to ONTAP REST query parameters.
# Not wired in yet -- tag filtering is not implemented.
# TAG2REST = {
#     'worm_compliance': { 'snaplock_type': 'compliance' },
#     'worm_enterprise': { 'snaplock_type': 'enterprise' },
#     'flash': { 'block_storage.storage_type': 'ssd' },
#     'hdd': { 'block_storage.storage_type': 'hdd' },
#     'mcc': { 'block_storage.mirror.enabled': 'true' }
# }
# Example of an incoming tags payload:
# {
#     "flash": "production",
#     "performance": "gold",
#     "worm": "compliance"
# }


def _block_storage_value(record: dict, field: str, missing: float) -> float:
    """Return ``record["space"]["block_storage"][field]``, or *missing* if absent/None."""
    value = record["space"]["block_storage"].get(field)
    return missing if value is None else value


async def get_aggregates(
    request: Request,
    metric: str = "relative",
    tags: Optional[Dict[str, str]] = None,
) -> List[AggregateSchema]:
    """Fetch the aggregates from ONTAP and return them ordered by *metric*.

    Args:
        request: Incoming FastAPI request; passed through unchanged to
            ``get_data_from_ontap``.
        metric: Ordering to apply. ``MetricEnum.relative`` sorts by
            ``used_percent`` descending; ``MetricEnum.absolute`` sorts by
            ``available`` ascending. Any other value keeps the order in which
            the records were returned.
        tags: Optional tag filter. Currently only logged -- filtering by tags
            is not implemented yet (see the TODO below).

    Returns:
        One ``AggregateSchema`` per aggregate record, carrying the aggregate
        name, the owning node name, the available block-storage space and its
        ``round_bytes`` string form.

    Raises:
        KeyError: If a record lacks ``name``, ``node.name`` or
            ``space.block_storage.available``.
    """
    logger.debug("Metric used: %s", metric)
    logger.debug("Tags used: %s", tags)

    # TODO: convert tags to an ONTAP filter. Draft kept for reference:
    # filter_str = ""
    # if tags:
    #     str_filter_parts = [f"tag.{key} eq '{value}'" for key, value in tags.items()]
    #     param_str = "&".join([f"{TAG2REST[key]}" for key, value in tags.items()])

    # NOTE(review): presumably returns a list of aggregate dicts -- confirm
    # against get_data_from_ontap.
    raw_aggregates = await get_data_from_ontap(
        request, logger, "storage/aggregates", "fields=*"
    )
    # Route the payload dump through the logger rather than printing to stdout.
    logger.debug("ONTAP aggregates response:\n%s", pformat(raw_aggregates))

    # A record without the sort field would yield None, which cannot be
    # compared with numbers; substitute an infinity so such records sort last
    # instead of raising TypeError.
    # NOTE(review): assumes metric compares equal to MetricEnum members
    # (e.g. a str-based enum) -- confirm in aggregate_schema.
    if metric == MetricEnum.relative:
        raw_aggregates = sorted(
            raw_aggregates,
            key=lambda r: _block_storage_value(r, "used_percent", float("-inf")),
            reverse=True,
        )
    elif metric == MetricEnum.absolute:
        raw_aggregates = sorted(
            raw_aggregates,
            key=lambda r: _block_storage_value(r, "available", float("inf")),
        )

    return [
        AggregateSchema(
            aggregate=a["name"],
            node=a["node"]["name"],
            available=a["space"]["block_storage"]["available"],
            available_str=round_bytes(a["space"]["block_storage"]["available"]),
        )
        for a in raw_aggregates
    ]