# this class file is for the cosmostat service import subprocess import json import time import weakref import base64, hashlib from typing import Dict, Any, List from Cosmos_Settings import * # Global Class Vars global_max_length = 500 null_result = [ "", "null", None, [], "Unknown", "To Be Filled By O.E.M." ] # import the component descriptor try: with open("descriptors.json", encoding="utf-8") as f: component_class_tree: List[Dict] = json.load(f) except FileNotFoundError as exc: raise RuntimeError("Descriptor file not found") from exc component_types = [] for entry in component_class_tree: if entry["name"] != "System": component_types.append({"name": entry["name"], "multi_check": entry["multi_check"] == "True"}) ################################################################# ################################################################# # Component Class ################################################################# ################################################################# class Component: ############################################################ # instantiate new component # this_device is set when the component has multiple instances ############################################################ def __init__(self, name: str, comp_type: str, parent_system, this_device=None): # begin init self.name = name self.type = comp_type self.parent_system = weakref.ref(parent_system) # this variable is set when the device can have multiples # it indicates that the commands in the descriptor might need templating self.this_device = this_device self.is_virtual = parent_system.is_virtual() self.cpu_arch = parent_system.get_system_arch() if self.this_device is None: log_data(log_output = f"This device - {self.name}", log_level = "log_output") else: log_data(log_output = f"This device - {self.this_device}", log_level = "log_output") # build the component descriptor dictionary self._descriptor = self._parse_descriptor() # store static properties self.multi_check = self.is_multi() self.virt_ignore = self._descriptor.get('virt_ignore', []) self.multi_metrics = self._descriptor.get('multi_metrics', []) self.arch_check = self._descriptor.get('arch_check', []) if self.is_virtual: self.virt_ignore = [] # initialize properties self._properties: Dict[str, str | list[str]] = {} self._process_properties() # build the description string, requires the properties first self._description_template: str | None = self._descriptor.get("description") self.description = self._description_template.format(**self._properties) # initialize metrics self._metrics: Dict[str, str] = {} self.update_metrics() def __str__(self): self_string = (f"Component name: {self.name}, type: {self.type} - " f"{self.description}") return self_string def __repr__(self): self_string = (f"Component name: {self.name}, type {self.type} - " f"{self.description}") return self_string ############################################################ # Class Functions ############################################################ def update_metrics(self): for key, command in self._descriptor.get('metrics', {}).items(): log_data(log_output = f"Key: {key} - Command: {command}", log_level = "noisy_test") formatted_command = command if self.arch_check is not None: arch_variance = self._descriptor.get('arch_variance', {}) if key in arch_variance: if self.cpu_arch in formatted_command: formatted_command = command[self.cpu_arch] else: formatted_command = f"echo Missing {self.cpu_arch} command" if self.this_device is not None: formatted_command = formatted_command.format(this_device=self.this_device) if formatted_command is not None: result = run_command(formatted_command, zero_only = True) if result not in null_result: self._metrics[key] = result def get_property(self, type = None): if type == None: return self._properties else: return self._properties[type] def is_multi(self): for component_type in component_types: if self.type == component_type["name"]: return component_type["multi_check"] return False # return descriptor for this device type def _parse_descriptor(self): for component in component_class_tree: if component["name"] == self.type: COMPONENT_DESCRIPTORS = component descriptor = COMPONENT_DESCRIPTORS if descriptor is None: raise ValueError( f"Component type '{comp_type}' is not defined in the " f"component descriptor tree." ) return descriptor # iterate over all properties to process descriptor def _process_properties(self): for key, command in self._descriptor.get('properties', {}).items(): return_string = True if key in self.multi_metrics: return_string = False formatted_command = self._parse_command(key, command, return_string) log_data(log_output = f"Property {key} - command: {formatted_command}", log_level = "debug_output") result = run_command(formatted_command, zero_only = return_string) if result not in null_result: self._properties[key] = result # helper function to parse command key def _parse_command(self, key: str, command: str | list[str], return_string = True): result_command = command log_data(log_output = f"_parse_command - {command}", log_level = "debug_output") if self.arch_check: # since the keys are stored with the arch variable this can be concise arch_variance = self._descriptor.get('arch_variance', {}) if key in arch_variance: if self.cpu_arch in result_command: log_data(log_output = f"arch_variance - {key} - {result_command}", log_level = "debug_output") result_command = result_command[self.cpu_arch] else: result_command = f"echo Missing {self.cpu_arch} command" if self.this_device is not None: # template the key if the component type can have multiples result_command = command.format(this_device=self.this_device) log_data(log_output = f"result - {result_command}", log_level = "debug_output") return result_command ######################################################## # keyed data functions ######################################################## def get_properties_keys(self, component = None): result = [] component_properties = [] if component == None: component_properties = self._properties.items() else: component_properties = self.get_property(component) for name, values in component_properties: for value in (values if isinstance(values, list) else [values]): this_property = { "Source": self.name, "Property": name, "Value": value } if name not in self.virt_ignore: result.append(this_property) return result def get_properties_strings(self, return_simple = False): result = [] component_properties = self._properties.items() for name, values in component_properties: for value in (values if isinstance(values, list) else [values]): simple_property = f"{name}: {value}" complex_property = { "Source": self.name, "Property": simple_property } if name not in self.virt_ignore: if return_simple: result.append(simple_property) else: result.append(complex_property) return result def get_metrics_keys(self): result = [] empty_value = ["", "null", None, []] for name, value in self._metrics.items(): this_metric = { "Source": self.name, "Metric": name, "Data": value } if value not in empty_value and name not in self.virt_ignore: result.append(this_metric) return result def get_metrics_strings(self): result = [] empty_value = ["", "null", None, []] for name, value in self._metrics.items(): this_metric = { "Source": self.name, "Metric": f"{name}:{value}" } if value not in empty_value and name not in self.virt_ignore: result.append(this_metric) return result ######################################################## # random data functions ######################################################## # complex data type return def get_metrics(self, type = None): these_metrics = [] if type == None: for name, value in self._metrics: these_metrics.append({"Metric": name, "Data": value}) else: for name, value in self._metrics: if name == type: these_metrics.append({"Metric": name, "Data": value}) result = { "Source": self.name, "Component Type": self.type, "Metrics": these_metrics } return result # complex data type return def get_property_summary(self, type = None): these_properties = [] if type == None: for name, value in self._properties.items(): these_properties.append({"Property": name, "Value": value}) else: for name, value in self._properties.items(): if name == type: these_properties.append({"Property": name, "Value": value}) result = { "Source": self.name, "Component Type": self.type, "Properties": these_properties } return result # full data return def get_description(self): these_properties = [] for name, value in self._properties.items(): these_properties.append({"Property": name, "Value": value}) these_metrics = [] for name, value in self._metrics.items(): these_metrics.append({"Metric": name, "Data": value}) result = { "Source": self.name, "Type": self.type, "Properties": these_properties, "Metrics": these_metrics } return result ############################################################ ############################################################ # System Class ############################################################ ############################################################ class System: ######################################################## # system variable declarations ######################################################## for component in component_class_tree: if component["name"] == "System": SYSTEM_DESCRIPTOR = component descriptor = SYSTEM_DESCRIPTOR if descriptor is None: raise ValueError( f"Component type 'System' is not defined in the " f"component descriptor tree." ) static_key_variables = descriptor["static_key_variables"] dynamic_key_variables = descriptor["dynamic_key_variables"] virt_ignore = descriptor["virt_ignore"] ######################################################## # instantiate new system ######################################################## def __init__(self, name: str): # the system needs a name self.name = name log_data(log_output = f"System initializing, name {self.name}", log_level = "debug_output") self.uuid = run_command(cmd = "cat /etc/machine-id", zero_only = True) self.short_id = self.short_uuid(self.uuid) # system contains an array of component objects self.components = [] self.component_class_tree = component_class_tree # initialize system properties and metrics dicts self._properties: Dict[str, str] = {} self._metrics: Dict[str, str] = {} self._virt_string = run_command('systemd-detect-virt', zero_only = True, req_check = False) self._virt_ignore = self.virt_ignore if self._virt_string == "none": self._virt_ignore = [] # timekeeping for websocket self.recent_check = int(time.time()) # load static keys for static_key in self.static_key_variables: if static_key["name"] not in self._virt_ignore: self.process_property(static_key = static_key) # initialize live keys self.update_live_keys() # initialze components for component in component_types: self.create_component(component) def __str__(self): components_str = "\n".join(f" - {c}" for c in self.components) return f"System hostname: {self.name}\nComponent Count: {self.get_component_count()}\n{components_str}" def __repr__(self): self_string = f"Cosmostat Client {self.short_id}" def short_uuid(self, value: str, length=8): hasher = hashlib.md5() hasher.update(value.encode('utf-8')) full_hex = hasher.hexdigest() return full_hex[:length] ######################################################## # critical class functions ######################################################## # process static keys def process_property(self, static_key): command = static_key["command"] if "arch_check" in static_key: arch_string = run_command("lscpu --json | jq -r '.lscpu[] | select(.field==\"Architecture:\") | .data'", zero_only = True) if arch_string in command: command = command[arch_string] else: command = f"echo Missing {arch_string} command" if "req_check" in static_key: result = run_command(command, zero_only = True, req_check = static_key["req_check"]) else: result = run_command(command, zero_only = True) log_data(log_output = f'Static key [{static_key["name"]}] - command [{command}] - output [{result}]', log_level = "debug_output") if result not in null_result: self._properties[static_key["name"]] = result # update only system dynamic keys def update_live_keys(self): for live_key in self.dynamic_key_variables: if live_key['command'] is not None: command = live_key['command'] result = run_command(command, zero_only = True) if result not in null_result: self._metrics[live_key['name']] = result log_data(log_output = f'Command {live_key["name"]} - [{command}] Result - [{result}]', log_level = "noisy_test") # update all dynamic keys, including components def update_system_state(self): self.update_live_keys() for component in self.components: component.update_metrics() # component creation helper def create_component(self, component): component_name = component["name"] multi_check = component["multi_check"] # if multi, note that the command in device_list creates the list of things to pipe into this_device if multi_check: log_data(log_output = f"Creating one component of type {component_name} for each one found", log_level = "log_output") component_type_device_list = get_device_list(component_name) component_id = 0 for this_device in component_type_device_list: this_component_ID = component_type_device_list.index(this_device) this_component_name = f"{component_name} {this_component_ID}" log_data(log_output = f"{this_component_name} - {component_name} - {this_device}", log_level = "debug_output") new_component = Component(name = this_component_name, comp_type = component_name, this_device = this_device, parent_system = self) self.components.append(new_component) else: log_data(log_output = f'Creating component {component["name"]}', log_level = "debug_output") new_component = Component(name = component_name, comp_type = component_name, parent_system = self) self.components.append(new_component) ######################################################## # helper class functions ######################################################## # Get all components, optionally filtered by type def get_components(self, component_type: type = None): if component_type is None: return self.components else: result = [] for component in self.components: if component.type == component_type: result.append(component) if component.is_multi(): return result else: return result[0] def get_component_count(self): result = int(len(self.components)) return result def get_property(self, property = None): if property == None: return None else: return self._properties.get(property, {}) def is_virtual(self): vm_check = self.get_property('Virtual Machine') log_data(log_output = f'vm_check: {vm_check}', log_level = "debug_output") return vm_check def check_system_timer(self): time_lapsed = time.time() - float(self.recent_check) return time_lapsed < 30.0 def get_component_class_tree(self): return self.component_class_tree def get_system_arch(self): return self.get_property("CPU Architecture") ######################################################## # static metrics redis data functions ######################################################## # return list of all static metrics from system and properties def get_static_metrics(self, human_readable = False): result = [] for component_property in self.get_component_properties(human_readable): result.append(component_property) for system_property in self.get_system_properties(human_readable): result.append(system_property) return result def get_component_properties(self, human_readable = False, component = None): result = [] for component in self.components: if human_readable: for metric in component.get_properties_strings(component = component): result.append(metric) else: for metric in component.get_properties_keys(component = component): result.append(metric) return result def get_system_properties(self, human_readable = False): result = [] for name, value in self._properties.items(): if human_readable: result.append({ "Source": "System", "Property": f"{name}: {value}" }) else: result.append({ "Source": "System", "Property": name, "Value": value }) return result ######################################################## # live metrics redis data functions ######################################################## # return list of all live metrics from system and properties def get_live_metrics(self, human_readable = False): result = [] for component_metric in self.get_component_metrics(human_readable): result.append(component_metric) for system_metric in self.get_system_metrics(human_readable): result.append(system_metric) return result def get_component_metrics(self, human_readable = False): result = [] for component in self.components: if human_readable: metrics_keys = component.get_metrics_strings() else: metrics_keys = component.get_metrics_keys() for metric in metrics_keys: result.append(metric) return result def get_system_metrics(self, human_readable = False): if human_readable: return self.get_system_metric_strings() else: return self.get_system_metric_keys() def get_system_metric_keys(self): result = [] for name, value in self._metrics.items(): thisvar = { "Source": "System", "Metric": name, "Data": value } result.append(thisvar) # add internal dynamic metrics result.append({ "Source": "System", "Metric": "component_count", "Data": self.get_component_count() }) return result def get_system_metric_strings(self): result = [] for name, value in self._metrics.items(): thisvar = { "Source": "System", "Metric": f"{name}: {value}" } result.append(thisvar) # add internal dynamic metrics result.append({ "Source": "System", "Metric": f"component_count: {self.get_component_count()}" }) return result ############################################################ # Non-class Helper Functions ############################################################ # subroutine to run a command, return stdout as array unless zero_only then return [0] def run_command(cmd, zero_only=False, use_shell=True, req_check = True): # Run the command and capture the output result = subprocess.run(cmd, shell=use_shell, check=req_check, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Decode the byte output to a string output = result.stdout.decode('utf-8') # Split the output into lines and store it in an array output_lines = [line for line in output.split('\n') if line] # Return result try: return output_lines[0] if zero_only else output_lines except: return output_lines def get_device_list(device_type_name: str): result = [] for component in component_class_tree: precheck_value = 1 if "precheck" in component: precheck_command = component["precheck"] precheck_value_output = run_command(precheck_command, zero_only = True) precheck_value = int(precheck_value_output) log_data(log_output = f"Precheck found - {precheck_command} - {precheck_value}", log_level = "log_output") if component["name"] == device_type_name and precheck_value != 0: device_list_command = component["device_list"] device_list_result = run_command(device_list_command) result = device_list_result return result