feat: add functionality to GET /aggregates

Magel, Denis
2025-09-18 12:16:30 +02:00
parent 1592333ef8
commit e8efde9892
4 changed files with 54 additions and 18 deletions


@@ -1,16 +1,10 @@
 # contains the router for the aggregates endpoint
 from fastapi import APIRouter, Query
-from enum import Enum
 from typing import List
-from .aggregate_schema import AggregateSchema
+from .aggregate_schema import AggregateSchema, MetricEnum
 from .aggregate_service import get_aggregates
-
-
-class MetricEnum(str, Enum):
-    relative = "relative"
-    absolute = "absolute"
-
 
 router = APIRouter(tags=["aggregates"])
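
A minimal sketch of how the route handler might consume the shared MetricEnum as a query parameter; the path, decorator, and response_model are assumptions, since this hunk only shows the imports and router setup:

@router.get("/aggregates", response_model=List[AggregateSchema])
async def read_aggregates(
    metric: MetricEnum = Query(
        MetricEnum.relative,
        description="relative = sort by used %, absolute = sort by free bytes (ascending)",
    ),
) -> List[AggregateSchema]:
    # FastAPI validates the query value against the enum and returns 422 for anything else;
    # MetricEnum subclasses str, so the value can be passed straight to the service layer.
    return await get_aggregates(metric)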


@@ -1,8 +1,15 @@
 # contains the schema definitions for aggregates
 from pydantic import BaseModel
+from enum import Enum
 
 
 class AggregateSchema(BaseModel):
     aggregate: str
     node: str
-    available: str
+    available: int
+    available_str: str
+
+
+class MetricEnum(str, Enum):
+    relative = "relative"
+    absolute = "absolute"
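
A quick usage sketch of the updated schema (field values are illustrative only; assumes pydantic v2, hence model_dump):

item = AggregateSchema(
    aggregate="aggr1_node01",          # illustrative aggregate name
    node="cluster01-01",
    available=109951162777600,         # raw bytes as reported by ONTAP
    available_str="100.00TiB",         # human-readable string produced by round_bytes()
)
print(item.model_dump())               # on pydantic v1 use item.dict() instead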


@@ -1,24 +1,36 @@
 # contains the business logic for aggregates
 from typing import List
-from .aggregate_schema import AggregateSchema
+from .aggregate_schema import AggregateSchema, MetricEnum
+from logging import getLogger
+from ..utils import round_bytes, get_data_from_ontap
+
+logger = getLogger("uvicorn")
+logger.setLevel("DEBUG")
 
 
 async def get_aggregates(metric: str = "relative") -> List[AggregateSchema]:
     # Dummy data for demonstration
     # You can use the metric parameter to filter or modify results as needed
     # For now, just return the same data and show metric usage
-    print(f"Metric used: {metric}")
+    logger.debug(f"Metric used: {metric}")
+    __aggregates = get_data_from_ontap(logger, "172.16.57.2", "admin", "Netapp12", "storage/aggregates", "fields=name,uuid,space,node,home_node")
+    logger.debug(__aggregates)
+    __aggregates = __aggregates.get("records")
+    if metric == MetricEnum.relative:
+        __aggregates = sorted(__aggregates, key=lambda r: r["space"]["block_storage"].get("used_percent"), reverse=True)
+    elif metric == MetricEnum.absolute:
+        __aggregates = sorted(__aggregates, key=lambda r: r["space"]["block_storage"].get("available"), reverse=False)
     aggregates: list = [
         AggregateSchema(
-            aggregate="Aggregate A", node="cluster01-01", available="100.0TB"
-        ),
-        AggregateSchema(
-            aggregate="Aggregate B", node="cluster01-01", available="200.5GB"
-        ),
-        AggregateSchema(
-            aggregate="Aggregate C", node="cluster01-02", available="300.75MB"
-        ),
+            aggregate=a["name"],
+            node=a["node"]["name"],
+            available=a["space"]["block_storage"]["available"],
+            available_str=round_bytes(a["space"]["block_storage"]["available"]),
+        )
+        for a in __aggregates
     ]
     return aggregates
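
For reference, a small standalone illustration of the two sort orders used above, run against mocked ONTAP records (all values invented):

records = [
    {"name": "aggr_a", "space": {"block_storage": {"used_percent": 85, "available": 2_000_000_000}}},
    {"name": "aggr_b", "space": {"block_storage": {"used_percent": 40, "available": 500_000_000}}},
]

# metric == MetricEnum.relative: fullest aggregate first (descending used_percent)
by_relative = sorted(records, key=lambda r: r["space"]["block_storage"].get("used_percent"), reverse=True)
print([r["name"] for r in by_relative])   # ['aggr_a', 'aggr_b']

# metric == MetricEnum.absolute: least free space first (ascending available bytes)
by_absolute = sorted(records, key=lambda r: r["space"]["block_storage"].get("available"))
print([r["name"] for r in by_absolute])   # ['aggr_b', 'aggr_a']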

src/utils.py (new file)

@@ -0,0 +1,23 @@
+import httpx
+
+def round_bytes(size_in_bytes: int) -> str:
+    # Helper function to convert bytes to a human-readable format
+    for unit in ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]:
+        if size_in_bytes < 1024:
+            return f"{size_in_bytes:.2f}{unit}"
+        size_in_bytes /= 1024
+    return f"{size_in_bytes:.2f}EB"
+
+
+def get_data_from_ontap(logger, hostname: str, username: str, password: str, endpoint: str, query_string: str = ""):
+    url = f"https://{hostname}/api/{endpoint}"
+    if query_string:
+        url += f"?{query_string}"
+    try:
+        logger.debug(f"Fetching data from ONTAP: {url}")
+        response = httpx.get(url, auth=(username, password), verify=False)
+        response.raise_for_status()
+        return response.json()
+    except httpx.HTTPError as e:
+        logger.error(f"HTTP error occurred: {e}")
+        return None
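
A short sanity check for the new helpers; the round_bytes outputs follow from the code above, and the commented get_data_from_ontap call only illustrates the argument order with placeholder host and credentials:

print(round_bytes(512))            # 512.00B
print(round_bytes(1536))           # 1.50KiB
print(round_bytes(100 * 1024**4))  # 100.00TiB

# data = get_data_from_ontap(logger, "cluster.example.com", "admin", "<password>",
#                            "storage/aggregates", "fields=name,space,node")
# Note: verify=False skips TLS certificate validation, which is convenient against a
# lab cluster's self-signed certificate but should not be left in place for production use.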