# Class definitions for storage summary
# Classes needed:
### DriveHealthServer - the server object holding the list of remote clients and the functions for interacting with them
### DriveHealthClient - the remote client class where all the drives are

from typing import List, Mapping, Any, Sequence, Dict
from Helpers import *

print("Importing Storage Class")


#################################################################
### DriveHealthServer Class
### This is the server object
#################################################################
class DriveHealthServer:
    """Local cache of remote DriveHealthClient objects, looked up by short id."""

    def __init__(self, hostname: str):
        """Create an empty server cache named after ``hostname``.

        Args:
            hostname: Name of this server; also the input to ``short_uuid``
                (expected to be provided by the ``Helpers`` star import).
        """
        self.name = hostname
        self.short_id = short_uuid(self.name)
        self.hostname = hostname
        # DriveHealthClient objects currently known to this server
        self.clients = []

    def __str__(self):
        return f"DriveHealthServer {self.name} - {self.short_id}"

    def __repr__(self):
        # Must return the string: a __repr__ that returns None makes repr() raise TypeError.
        return f"DriveHealthServer {self.name} - {self.short_id}"

    def __del__(self):
        print("Deleting Server")

    def process_client_data(self, client_data: dict) -> dict:
        """Add the client described by ``client_data``, or update it if already cached.

        Args:
            client_data: Client payload; must contain the keys read by
                ``calculate_uuid`` and by ``DriveHealthClient``.

        Returns:
            A dict with a ``"message"`` describing the action taken and the
            client's ``"summary"`` (see ``DriveHealthClient.get_summary``).
        """
        if self.check_for_uuid(self.calculate_uuid(client_data)):
            return {
                "message": "Updating client.",
                "summary": self.update_client(client_data),
            }
        return {
            "message": "Creating new client",
            "summary": self.add_client(client_data),
        }

    def add_client(self, client_data: dict) -> dict:
        """Create a new client from ``client_data``, cache it, and return its summary.

        No duplicate check is done here; ``process_client_data`` performs it.
        """
        new_client = DriveHealthClient(client_data)
        self.clients.append(new_client)
        return new_client.get_summary()

    def update_client(self, client_data) -> dict:
        """Refresh the cached client matching ``client_data`` and return its summary.

        Raises:
            AttributeError: If no cached client matches (``get_client`` returned None).
        """
        client = self.get_client(self.calculate_uuid(client_data))
        client.update_client(client_data)
        return client.get_summary()

    def get_client(self, client_uuid: str) -> "DriveHealthClient | None":
        """Return the cached client whose ``short_id`` equals ``client_uuid``, else None.

        If several clients share the id, the last one in the list is returned.
        """
        match = None
        for client in self.clients:
            if client.short_id == client_uuid:
                match = client
        return match

    def remove_client(self, client_uuid: str | list[str]) -> dict:
        """Remove every cached client whose ``short_id`` is in ``client_uuid``.

        Args:
            client_uuid: A single short id, or a list of short ids.

        Returns:
            A dict with a ``"message"`` confirming the removal pass completed.
        """
        # A lone string is wrapped so that `in` is an exact-id test; applying
        # `in` to the string itself would be a substring match and could
        # remove clients whose id merely appears inside the given id.
        targets = [client_uuid] if isinstance(client_uuid, str) else client_uuid
        self.clients = [
            client for client in self.clients if client.short_id not in targets
        ]
        return {"message": "client removal complete"}

    def check_for_uuid(self, uuid: str) -> bool:
        """Return True if any cached client has ``short_id`` equal to ``uuid``."""
        return any(client.short_id == uuid for client in self.clients)

    def calculate_uuid(self, client_data):
        """Compute a client's short id from its ``"hostname"`` and ``"IP Address"``.

        Uses the same "<hostname> - <ip>" string that ``DriveHealthClient.__init__``
        builds, so both produce the same id for the same payload.
        """
        # Single-quoted keys: reusing the outer quote inside an f-string
        # replacement field only parses on Python 3.12+.
        unique_string = f"{client_data['hostname']} - {client_data['IP Address']}"
        return short_uuid(unique_string)


#################################################################
### DriveHealthClient Class
### These are the actual remote clients
#################################################################
class DriveHealthClient:
    """A remote client and the drive records it last reported."""

    ############################################################
    # instantiate new DriveHealthClient
    ############################################################
    def __init__(self, client_data: dict):
        """Build a client from its reported payload.

        Args:
            client_data: Dict containing ``"IP Address"``, ``"hostname"``,
                ``"processed_at"`` and ``"drives"``.

        Raises:
            KeyError: If any of those keys is missing.
        """
        self.client_data = client_data
        self.ip = self.client_data["IP Address"]
        self.name = self.client_data["hostname"]
        self.data_timestamp = self.client_data["processed_at"]
        # Same "<hostname> - <ip>" format as DriveHealthServer.calculate_uuid
        self._unique_string = f"{self.name} - {self.ip}"
        self.short_id = short_uuid(self._unique_string)
        self.drives = self.client_data["drives"]

    def __str__(self):
        return f"DriveHealthClient Server {self.name} - {self.short_id}"

    def __repr__(self):
        # Must return the string: a __repr__ that returns None makes repr() raise TypeError.
        return f"DriveHealthClient Server {self.name} - {self.short_id}"

    def __del__(self):
        print("Deleting Client")

    def get_summary(self) -> dict:
        """Return a brief view of the client: name, address, uuid and per-drive basics.

        Each drive entry carries ``"serial"``, ``"model"`` and ``"capacity"``.
        """
        drives_brief = [
            {
                "serial": drive["Serial Number"],
                "model": drive["Model"],
                "capacity": drive["Disk Size"],
            }
            for drive in self.drives
        ]
        return {
            "name": self.name,
            # NOTE(review): the "hostname" key carries the IP address while
            # "name" carries the hostname; kept as-is for existing consumers.
            "hostname": self.ip,
            "uuid": self.short_id,
            "drives": drives_brief,
        }

    def get_details(self) -> dict:
        """Return the full view of the client, including every reported drive field.

        Raises:
            KeyError: If a drive record lacks one of the expected fields.
        """
        drive_details = [
            {
                "disk_id": drive["Disk ID"],
                "serial": drive["Serial Number"],
                "health_status": drive["Health Status"],
                "model": drive["Model"],
                "capacity": drive["Disk Size"],
                "power_on_hours": drive["Power On Hours"],
                "power_on_count": drive["Power On Count"],
                "host_writes": drive["Host Writes"],
                "wear_level": drive["Wear Level Count"],
                "drive_letter": drive["Drive Letter"],
                "drive_interface": drive["Interface"],
                "transfer_mode": drive["Transfer Mode"],
            }
            for drive in self.drives
        ]
        return {
            "name": self.name,
            "ip": self.ip,
            "uuid": self.short_id,
            "timestamp": self.data_timestamp,
            "drives": drive_details,
        }

    def update_client(self, client_data: dict) -> None:
        """Replace the stored timestamp and drive list with those in ``client_data``.

        Always returns None.
        """
        # NOTE(review): self.client_data is not refreshed here, so it keeps
        # the payload from construction time - confirm whether that is intended.
        self.data_timestamp = client_data["processed_at"]
        self.drives = client_data["drives"]
        return None