################################################################# ################################################################# ### Cosmostat Component and System Class ################################################################# ################################################################# import subprocess import json import time import weakref import base64, hashlib import ipaddress from typing import Dict, Any, List # Import Cosmos Settings from Cosmos_Settings import * from Helpers import * # Global Class Vars global_max_length = 500 null_result = [ "", "null", None, [], "Unknown", "To Be Filled By O.E.M." ] # import the component descriptor try: with open("descriptors.json", encoding="utf-8") as f: component_class_tree: List[Dict] = json.load(f) except FileNotFoundError as exc: raise RuntimeError("Descriptor file not found") from exc component_types = [] for entry in component_class_tree: if entry["name"] != "System": component_types.append({"name": entry["name"], "multi_check": entry["multi_check"] == "True"}) ################################################################# ################################################################# ### Component Class ### Each Component type is defined by the descriptor and built ### as part of the System Class Instantiation ### Each Component Object contains static and dynamic data ### The static data is declared once at instantiation ### THe dynamic data is periodically updated by the application ### where the System Class Object is instantiated ################################################################# ################################################################# class Component: ############################################################ # instantiate new component # this_device is set when the component has multiple instances ############################################################ def __init__(self, name: str, comp_type: str, parent_system, this_device=None): # begin init self.name = name self.type = comp_type self.parent_system = weakref.ref(parent_system) # this variable is set when the device can have multiples # it indicates that the commands in the descriptor might need templating self.this_device = this_device self.is_virtual = parent_system.is_virtual() self.cpu_arch = parent_system.get_system_arch() if self.this_device is None: log_data(log_output = f"This device - {self.name}", log_level = "log_output") else: log_data(log_output = f"This device - {self.this_device}", log_level = "log_output") # build the component descriptor dictionary self._descriptor = self._parse_descriptor() # store static properties self.multi_check = self.is_multi() self.virt_ignore = self._descriptor.get('virt_ignore', []) self.multi_metrics = self._descriptor.get('multi_metrics', []) self.arch_check = self._descriptor.get('arch_check', []) self.php_extra_list = self._descriptor.get('php_extra', []) if self.is_virtual: self.virt_ignore = [] # initialize properties self._properties: Dict[str, str | list[str]] = {} self._process_properties() # build the description string, requires the properties first self._description_template: str | None = self._descriptor.get("description") self.description = self._description_template.format(**self._properties) # initialize metrics self._metrics: Dict[str, str] = {} self.update_metrics() def __str__(self): self_string = (f"Component name: {self.name}, type: {self.type} - " f"{self.description}") return self_string def __repr__(self): self_string = (f"Component name: {self.name}, type {self.type} - " f"{self.description}") return self_string ############################################################ # Class Functions ############################################################ def update_metrics(self): for key, command in self._descriptor.get('metrics', {}).items(): log_data(log_output = f"Key: {key} - Command: {command}", log_level = "noisy_test") formatted_command = command if self.arch_check is not None: arch_variance = self._descriptor.get('arch_variance', {}) if key in arch_variance: if self.cpu_arch in formatted_command: formatted_command = command[self.cpu_arch] else: formatted_command = f"echo Missing {self.cpu_arch} command" if self.this_device is not None: formatted_command = formatted_command.format(this_device=self.this_device) if formatted_command is not None: result = run_command(formatted_command, zero_only = True) if result not in null_result: self._metrics[key] = result def get_property(self, type = None): if type == None: return self._properties else: return self._properties[type] def is_multi(self): for component_type in component_types: if self.type == component_type["name"]: return component_type["multi_check"] return False # return descriptor for this device type def _parse_descriptor(self): for component in component_class_tree: if component["name"] == self.type: COMPONENT_DESCRIPTORS = component descriptor = COMPONENT_DESCRIPTORS if descriptor is None: raise ValueError( f"Component type '{comp_type}' is not defined in the " f"component descriptor tree." ) return descriptor # iterate over all properties to process descriptor def _process_properties(self): for key, command in self._descriptor.get('properties', {}).items(): # if this is a component that can be multi, then it might need to return a list return_string = True if key in self.multi_metrics: return_string = False formatted_command = self._parse_command(key, command, return_string) log_data(log_output = f"Property {key} - command: {formatted_command}", log_level = "debug_output") result = run_command(formatted_command, zero_only = return_string) if result not in null_result: self._properties[key] = result # helper function to parse command key # this is for substituting variables in descriptor commands when there are multi things # think needing to grep eth0 or grep eth1 to get the same metric for a different component def _parse_command(self, key: str, command: str | list[str], return_string = True): result_command = command log_data(log_output = f"_parse_command - {command}", log_level = "debug_output") if self.arch_check: # since the keys are stored with the arch variable this can be concise arch_variance = self._descriptor.get('arch_variance', {}) if key in arch_variance: if self.cpu_arch in result_command: log_data(log_output = f"arch_variance - {key} - {result_command}", log_level = "debug_output") result_command = result_command[self.cpu_arch] else: result_command = f"echo Missing {self.cpu_arch} command" if self.this_device is not None: # template the key if the component type can have multiples result_command = command.format(this_device=self.this_device) log_data(log_output = f"result - {result_command}", log_level = "debug_output") return result_command # check if this property should show in the System Properties box # these are intended to provide a summary of critical things that all computers have # like memory and cpu and cores def check_php_extra(self, property_name): result = False if property_name in self.php_extra_list: result = True return result ######################################################## # keyed data functions # various ways to query the component data ######################################################## def get_properties_keys(self, component = None): result = [] component_properties = [] if component == None: component_properties = self._properties.items() else: component_properties = self.get_property(component) for name, values in component_properties: for value in (values if isinstance(values, list) else [values]): this_property = { "Source": self.name, "Property": name, "Value": value } if name not in self.virt_ignore: result.append(this_property) return result def get_properties_strings(self, return_simple = False): result = [] component_properties = self._properties.items() for name, values in component_properties: for value in (values if isinstance(values, list) else [values]): simple_property = f"{name}: {value}" complex_property = { "Source": self.name, "Property": simple_property } if name not in self.virt_ignore: if return_simple: result.append(simple_property) else: result.append(complex_property) return result def get_metrics_keys(self): result = [] empty_value = ["", "null", None, []] for name, value in self._metrics.items(): this_metric = { "Source": self.name, "Metric": name, "Data": value } if value not in empty_value and name not in self.virt_ignore: result.append(this_metric) return result def get_metrics_strings(self): result = [] empty_value = ["", "null", None, []] for name, value in self._metrics.items(): this_metric = { "Source": self.name, "Metric": f"{name}:{value}" } if value not in empty_value and name not in self.virt_ignore: result.append(this_metric) return result # simple data value return def get_metrics_value(self, type = None): result = [] print(f"Metric type: {type}") for name, value in self._metrics.items(): print(f"Metric Property: {name}") if type in name: result.append(value) if len(result) == 1: return result[0] else: return result ############################################################ ############################################################ ### System Class ### The System Class uses the Descriptor to build a List ### of Components and interact with the data in a ### useful manner. The System Object is similar to a ### Component Object in that it has Static and Dynamic ### properties, which are populated in a similar manner ### to the Components. In fact, this is designed for the ### System object to update Component Dymanic Metrics ### as part of the same subroutine that updates its own ############################################################ ############################################################ class System: ######################################################## # system variable declarations ######################################################## for component in component_class_tree: if component["name"] == "System": SYSTEM_DESCRIPTOR = component descriptor = SYSTEM_DESCRIPTOR if descriptor is None: raise ValueError( f"Component type 'System' is not defined in the " f"component descriptor tree." ) static_key_variables = descriptor["static_key_variables"] dynamic_key_variables = descriptor["dynamic_key_variables"] virt_ignore = descriptor["virt_ignore"] ######################################################## # instantiate new system ######################################################## def __init__(self, name: str): # the system needs a name self.name = name log_data(log_output = f"System initializing, name {self.name}", log_level = "debug_output") self.uuid = run_command(cmd = "cat /etc/machine-id", zero_only = True) self.short_id = self.short_uuid(self.uuid) # system contains an array of component objects self.components = [] self.component_class_tree = component_class_tree # initialize system properties and metrics dicts self._properties: Dict[str, str] = {} self._metrics: Dict[str, str] = {} self._virt_string = run_command('systemd-detect-virt', zero_only = True, req_check = False) self.default_gateway = run_command("ip route show | grep def | awk '{print $3}'", zero_only = True, req_check = False) self._virt_ignore = self.virt_ignore if self._virt_string == "none": self._virt_ignore = [] # timekeeping for websocket self.recent_check = int(time.time()) # load static keys for static_key in self.static_key_variables: if static_key["name"] not in self._virt_ignore: self._process_property(static_key = static_key) # initialize live keys self.update_live_keys() # initialze components for component in component_types: self.create_component(component) self.primary_ip = self.get_primary_ip() print(f"Cosmostat System IP: {self.primary_ip}") def __str__(self): components_str = "\n".join(f" - {c}" for c in self.components) return f"System hostname: {self.name}\nComponent Count: {self.get_component_count()}\n{components_str}" def __repr__(self): self_string = f"Cosmostat Client {self.short_id}" def short_uuid(self, value: str, length=8): hasher = hashlib.md5() hasher.update(value.encode('utf-8')) full_hex = hasher.hexdigest() return full_hex[:length] ######################################################## # critical class functions ######################################################## # process static properties, done once at instantiation def _process_property(self, static_key): command = static_key["command"] if "arch_check" in static_key: arch_string = run_command("lscpu --json | jq -r '.lscpu[] | select(.field==\"Architecture:\") | .data'", zero_only = True) if arch_string in command: command = command[arch_string] else: command = f"echo Missing {arch_string} command" if "req_check" in static_key: result = run_command(command, zero_only = True, req_check = static_key["req_check"]) else: result = run_command(command, zero_only = True) log_data(log_output = f'Static key [{static_key["name"]}] - command [{command}] - output [{result}]', log_level = "debug_output") if result not in null_result: self._properties[static_key["name"]] = result # update only system dynamic keys, done periodically def update_live_keys(self): for live_key in self.dynamic_key_variables: if live_key['command'] is not None: command = live_key['command'] result = run_command(command, zero_only = True) if result not in null_result: self._metrics[live_key['name']] = result log_data(log_output = f'Command {live_key["name"]} - [{command}] Result - [{result}]', log_level = "noisy_test") # update all dynamic keys, including components def update_system_state(self): self.update_live_keys() for component in self.components: component.update_metrics() # component creation helper def create_component(self, component): # this is the type, i.e. CPU MEM LAN etc component_name = component["name"] # if there can be multiples, i.e.LAN STOR GPU etc multi_check = component["multi_check"] # if multi, note that the command in device_list creates the list of things to pipe into this_device if multi_check: log_data(log_output = f"Creating one component of type {component_name} for each one found", log_level = "log_output") # build list of components of this type to initialize component_type_device_list = self.get_device_list(component_name) for this_device in component_type_device_list: this_component_ID = component_type_device_list.index(this_device) this_component_name = f"{component_name} {this_component_ID}" log_data(log_output = f"{this_component_name} - {component_name} - {this_device}", log_level = "debug_output") new_component = Component(name = this_component_name, comp_type = component_name, this_device = this_device, parent_system = self) # add new component to system self.components.append(new_component) # if it's just a single thing like CPU RAM then just do one device else: log_data(log_output = f'Creating component {component["name"]}', log_level = "debug_output") new_component = Component(name = component_name, comp_type = component_name, parent_system = self) # add new component to system self.components.append(new_component) # helper function - component type list builder for multi check items def get_device_list(self, device_type_name: str): result = [] for component in component_class_tree: # pre-set to 1 for true value by default precheck_value = 1 # the precheck is critical, i.e there can be 0 or 1 or 2 GPU if "precheck" in component: precheck_command = component["precheck"] precheck_value_output = run_command(precheck_command, zero_only = True) precheck_value = int(precheck_value_output) log_data(log_output = f"Precheck found - {precheck_command} - {precheck_value}", log_level = "log_output") # build a list of devices if precheck passes, this can be a list of 1 like a single GPU system if component["name"] == device_type_name and precheck_value != 0: device_list_command = component["device_list"] device_list_result = run_command(device_list_command) result = device_list_result return result # return the IP of the interface with the gateway # remember the IP is stored like this 192.168.1.0/24 # this data is pulled from the system properties class data # this was built for the update inventory generation # but is not the complete picture def get_primary_ip(self): primary_ip = None interfaces = self.get_components(component_type = "LAN") for interface in interfaces: interface_ip = interface.get_metrics_value(type = "IP Address") interface_subnet = None if "/" in interface_ip: interface_subnet = ipaddress.ip_network(interface_ip, strict=False) print(f"interface IP: {interface_ip} - Default Gateway: {self.default_gateway}") if is_ip_in_subnets(self.default_gateway ,interface_subnet): primary_ip = str(interface_ip).split("/")[0] print(f"Primary IP: {primary_ip}") return primary_ip ######################################################## # helper class functions ######################################################## # Get all components, optionally filtered by type def get_components(self, component_type: type = None): if component_type is None: return self.components else: result = [] for component in self.components: if component_type in component.type: result.append(component) if component.is_multi(): return result else: return result[0] def get_component_count(self): result = int(len(self.components)) return result def get_property(self, property = None): if property == None: return None else: return self._properties.get(property, {}) def is_virtual(self): vm_check = self.get_property('Virtual Machine') log_data(log_output = f'vm_check: {vm_check}', log_level = "debug_output") return vm_check def check_system_timer(self): time_lapsed = time.time() - float(self.recent_check) return time_lapsed < 30.0 def get_component_class_tree(self): return self.component_class_tree def get_system_arch(self): return self.get_property("CPU Architecture") ######################################################## # static metrics redis data functions ######################################################## # return list of all static metrics from system and properties def get_static_metrics(self, human_readable = False): result = [] for component_property in self.get_component_properties(human_readable): result.append(component_property) for system_property in self.get_system_properties(human_readable): result.append(system_property) return result def get_component_properties(self, human_readable = False, component = None): result = [] for component in self.components: if human_readable: for metric in component.get_properties_strings(component = component): result.append(metric) else: for metric in component.get_properties_keys(component = component): result.append(metric) return result def get_system_properties(self, human_readable = False, php_extra = False): result = [] for name, value in self._properties.items(): if human_readable: result.append({ "Source": "System", "Property": f"{name}: {value}" }) else: result.append({ "Source": "System", "Property": name, "Value": value }) if php_extra and human_readable: for component_result in self.php_component_data(): result.append(component_result) return result # helper function for system properties for dashboard rendering def php_component_data(self): result = [] for component in self.components: for this_property in component._properties: if component.check_php_extra(this_property): result_string = f"{this_property}: {component._properties[this_property]}" result.append({ "Source": "System", "Property": result_string }) return result ######################################################## # live metrics redis data functions ######################################################## # return list of all live metrics from system and properties def get_live_metrics(self, human_readable = False): result = [] for component_metric in self.get_component_metrics(human_readable): result.append(component_metric) for system_metric in self.get_system_metrics(human_readable): result.append(system_metric) return result def get_component_metrics(self, human_readable = False): result = [] for component in self.components: if human_readable: metrics_keys = component.get_metrics_strings() else: metrics_keys = component.get_metrics_keys() for metric in metrics_keys: result.append(metric) return result def get_system_metrics(self, human_readable = False): if human_readable: return self.get_system_metric_strings() else: return self.get_system_metric_keys() def get_system_metric_keys(self): result = [] for name, value in self._metrics.items(): thisvar = { "Source": "System", "Metric": name, "Data": value } result.append(thisvar) # add internal dynamic metrics result.append({ "Source": "System", "Metric": "component_count", "Data": self.get_component_count() }) return result def get_system_metric_strings(self): result = [] for name, value in self._metrics.items(): thisvar = { "Source": "System", "Metric": f"{name}: {value}" } result.append(thisvar) # add internal dynamic metrics result.append({ "Source": "System", "Metric": f"component_count: {self.get_component_count()}" }) return result