# this class file is for the cosmostat service
"""Component and System classes for the cosmostat service.

A ``System`` owns a list of ``Component`` objects.  Both collect *properties*
(read once at construction) and *metrics* (re-read on demand) by running shell
commands.  Component commands come from ``component_descriptors.json``, which
is loaded when this module is imported.
"""
import json
import subprocess
import time
from typing import Any, Dict, List, Optional

# Global Class Vars
global_max_length = 500
debug_output = False

# import the component descriptor
try:
    with open("component_descriptors.json", encoding="utf-8") as f:
        component_class_tree: List[Dict] = json.load(f)
except FileNotFoundError as exc:
    raise RuntimeError("Descriptor file not found") from exc

# One entry per descriptor; the JSON stores multi_check as the string "True".
component_types = [
    {"name": entry["name"], "multi_check": entry["multi_check"] == "True"}
    for entry in component_class_tree
]

# Metric values that the redis-facing getters treat as "no data".
# run_command(..., True) yields [] when a command prints nothing.
_EMPTY_METRIC_VALUES = ("", "null", None, [])


def _is_multi_type(type_name) -> bool:
    """Return the ``multi_check`` flag of the first matching component type.

    Returns ``False`` when ``type_name`` is not listed in ``component_types``.
    """
    for component_type in component_types:
        if component_type["name"] == type_name:
            return component_type["multi_check"]
    return False


def _component_letter(index: int) -> str:
    """Map a zero-based index to A..Z, then AA, AB, ... (spreadsheet style)."""
    label = ""
    index += 1
    while index:
        index, remainder = divmod(index - 1, 26)
        label = chr(ord("A") + remainder) + label
    return label


#################################################################
# Component Class
#################################################################
class Component:
    """A single component whose data is gathered via descriptor commands.

    Attributes set by ``__init__``: ``name``, ``type``, ``this_device``,
    ``multi_check``, ``virt_ignore`` and ``description``.
    """

    ############################################################
    # instantiate new component
    ############################################################
    def __init__(self, name: str, comp_type: str, this_device="None"):
        """Look up the descriptor for ``comp_type`` and collect its data.

        Args:
            name: Display name of this component.
            comp_type: Descriptor name to look up in ``component_class_tree``.
            this_device: Value substituted for ``{this_device}`` in the
                descriptor commands.  The string ``"None"`` (the default)
                means the commands are run unformatted.

        Raises:
            ValueError: If no descriptor is named ``comp_type``.
        """
        self.name = name
        self.type = comp_type
        self.this_device = this_device
        print(f"This device - {self.this_device}")

        # build the component descriptor dictionary
        # (no break: if several descriptors share a name, the last one wins)
        descriptor = None
        for component in component_class_tree:
            if component["name"] == self.type:
                descriptor = component
        if descriptor is None:
            raise ValueError(
                f"Component type '{comp_type}' is not defined in the "
                f"component descriptor tree."
            )
        self._descriptor = descriptor

        # store static properties
        self.multi_check = self.is_multi()
        self.virt_ignore = self._descriptor.get('virt_ignore', [])
        self._properties: Dict[str, str] = {}
        for key, command in descriptor.get('properties', {}).items():
            self._properties[key] = run_command(self._device_command(command), True)

        # build the description string; a descriptor without a "description"
        # entry yields an empty description instead of crashing on None
        self._description_template: Optional[str] = descriptor.get("description")
        if self._description_template is None:
            self.description = ""
        else:
            self.description = self._description_template.format(**self._properties)

        # initialize metrics
        self._metrics: Dict[str, str] = {}
        self.update_metrics()

    def __str__(self):
        return (f"Component name: {self.name}, type: {self.type} - "
                f"{self.description}")

    def __repr__(self):
        return (f"Component name: {self.name}, type {self.type} - "
                f"{self.description}")

    ############################################################
    # Class Functions
    ############################################################
    def _device_command(self, command: str) -> str:
        """Fill ``{this_device}`` into ``command`` when a device is set."""
        if self.this_device != "None":
            return command.format(this_device=self.this_device)
        return command

    def update_metrics(self):
        """Re-run every metric command of the descriptor and store the output."""
        for key, command in self._descriptor.get('metrics', {}).items():
            value = run_command(self._device_command(command), True)
            # For device-bound components a None result keeps the old value.
            if value is None and self.this_device != "None":
                continue
            self._metrics[key] = value

    def get_property(self, type):
        """Return the stored property named ``type`` (KeyError if absent)."""
        return self._properties[type]

    def is_multi(self):
        """Return whether this component's type is flagged ``multi_check``."""
        return _is_multi_type(self.type)

    ########################################################
    # redis data functions
    ########################################################
    def get_properties_keys(self):
        """Return properties as Source/Property/Value dicts, minus virt_ignore."""
        return [
            {"Source": self.name, "Property": name, "Value": value}
            for name, value in self._properties.items()
            if name not in self.virt_ignore
        ]

    def get_properties_strings(self):
        """Return properties as Source/"name: value" dicts, minus virt_ignore."""
        return [
            {"Source": self.name, "Property": f"{name}: {value}"}
            for name, value in self._properties.items()
            if name not in self.virt_ignore
        ]

    def get_metrics_keys(self):
        """Return non-empty metrics as Source/Metric/Data dicts, minus virt_ignore."""
        return [
            {"Source": self.name, "Metric": name, "Data": value}
            for name, value in self._metrics.items()
            if value not in _EMPTY_METRIC_VALUES and name not in self.virt_ignore
        ]

    def get_metrics_strings(self):
        """Return non-empty metrics as Source/"name:value" dicts, minus virt_ignore."""
        return [
            {"Source": self.name, "Metric": f"{name}:{value}"}
            for name, value in self._metrics.items()
            if value not in _EMPTY_METRIC_VALUES and name not in self.virt_ignore
        ]

    ########################################################
    # random data functions
    ########################################################
    # complex data type return
    def get_metrics(self, type=None):
        """Return this component's metrics, optionally only the one named ``type``.

        Returns:
            Dict with "Source", "Component Type" and a "Metrics" list of
            ``{"Metric": name, "Data": value}`` entries.
        """
        # .items() is required: iterating the dict alone yields only keys.
        these_metrics = [
            {"Metric": name, "Data": value}
            for name, value in self._metrics.items()
            if type is None or name == type
        ]
        return {
            "Source": self.name,
            "Component Type": self.type,
            "Metrics": these_metrics,
        }

    # complex data type return
    def get_properties(self, type=None):
        """Return this component's properties, optionally only the one named ``type``.

        Returns:
            Dict with "Source", "Component Type" and a "Properties" list of
            ``{"Property": name, "Value": value}`` entries.
        """
        these_properties = [
            {"Property": name, "Value": value}
            for name, value in self._properties.items()
            if type is None or name == type
        ]
        return {
            "Source": self.name,
            "Component Type": self.type,
            "Properties": these_properties,
        }

    # full data return
    def get_description(self):
        """Return name, type, all properties and all metrics in one dict."""
        these_properties = [
            {"Property": name, "Value": value}
            for name, value in self._properties.items()
        ]
        these_metrics = [
            {"Metric": name, "Data": value}
            for name, value in self._metrics.items()
        ]
        return {
            "Source": self.name,
            "Type": self.type,
            "Properties": these_properties,
            "Metrics": these_metrics,
        }


############################################################
# System Class
# this is a big one...
############################################################
class System:
    """The host system: its own properties/metrics plus its components."""

    ########################################################
    # system variable declarations
    ########################################################
    # NOTE(review): the "Virtual Machine" command uses the bash-only `[[ ]]`
    # test, while run_command goes through subprocess with shell=True
    # (/bin/sh on POSIX) - confirm /bin/sh is bash on the target hosts.
    static_key_variables = [
        {"name": "Hostname", "command": "hostname"},
        {"name": "Virtual Machine", "command": "echo $([[ \"$(systemd-detect-virt)\" == none ]] && echo False || echo True)"},
        {"name": "CPU Architecture:", "command": "lscpu --json | jq -r '.lscpu[] | select(.field==\"Architecture:\") | .data'"},
        {"name": "OS Kernel", "command": "uname -r"},
        {"name": "OS Name", "command": "cat /etc/os-release | grep PRETTY | cut -d\\\" -f2"},
        {"name": "Manufacturer", "command": "sudo dmidecode --type 1 | grep Manufacturer: | cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
        {"name": "Product Name", "command": "sudo dmidecode --type 2 | grep 'Product Name:' | cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
        {"name": "Serial Number", "command": "sudo dmidecode --type 2 | grep 'Serial Number: '| cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
    ]

    dynamic_key_variables = [
        {"name": "System Uptime", "command": "uptime -p"},
        {"name": "Current Date", "command": "date '+%D %r'"},
    ]

    # Static keys listed here are never collected by __init__.
    virt_ignore = [
        "Product Name",
        "Serial Number"
    ]

    ########################################################
    # instantiate new system
    ########################################################
    def __init__(self, name: str):
        """Collect static keys, live keys and all components.

        Args:
            name: Name of the system.
        """
        # the system needs a name
        self.name = name
        if debug_output:
            print(f"System initializing, name {self.name}")

        # system contains an array of component objects
        self.components = []

        # initialize system properties and metrics dicts
        self._properties: Dict[str, str] = {}
        self._metrics: Dict[str, str] = {}

        # timekeeping for websocket
        self.recent_check = int(time.time())

        # load static keys
        for static_key in self.static_key_variables:
            if static_key["name"] in self.virt_ignore:
                continue
            command = static_key["command"]
            result = run_command(command, True)
            if debug_output:
                print(f'Static key [{static_key["name"]}] - command [{command}] - output [{result}]')
            self._properties[static_key["name"]] = result

        # initialize live keys
        self.update_live_keys()

        # initialize components
        self.load_components()

    def __str__(self):
        components_str = "\n".join(f" - {c}" for c in self.components)
        return f"System hostname: {self.name}\nComponent Count: {self.get_component_count()}\n{components_str}"

    ########################################################
    # critical class functions
    ########################################################
    # update only system dynamic keys
    def update_live_keys(self):
        """Re-run every dynamic key command and store the output as a metric."""
        for live_key in self.dynamic_key_variables:
            command = live_key['command']
            if command is None:
                continue
            result = run_command(command, True)
            self._metrics[live_key['name']] = result
            if debug_output:
                print(f'Command {live_key["name"]} - [{command}] Result - [{result}]')

    # update all dynamic keys, including components
    def update_system_state(self):
        """Refresh the system's live keys and every component's metrics."""
        self.update_live_keys()
        for component in self.components:
            component.update_metrics()

    # check for components
    def load_components(self):
        """Create and register components for every known component type.

        Multi types get one component per entry returned by
        ``get_device_list``, named "<type> A", "<type> B", ...; other types
        get a single component named after the type.
        """
        for component in component_types:
            component_name = component["name"]
            multi_check = component["multi_check"]
            # if multi, note that the command in device_list creates the list
            # of things to pipe into this_device
            if multi_check:
                print(f"Creating one component of type {component_name} for each one found")
                component_type_device_list = get_device_list(component_name)
                # enumerate keeps letters unique even for duplicate device
                # names and does not run out after 26 devices
                for position, this_device in enumerate(component_type_device_list):
                    this_component_letter = _component_letter(position)
                    this_component_name = f"{component_name} {this_component_letter}"
                    print(f"{this_component_name} - {component_name} - {this_device}")
                    self.add_components(Component(this_component_name, component_name, this_device))
            else:
                if debug_output:
                    print(f'Creating component {component["name"]}')
                self.add_components(Component(component_name, component_name))

    # Add a component to the system
    def add_components(self, component: Component,):
        """Append ``component`` to this system's component list."""
        if debug_output:
            print(f"Component description: {component.description}")
        self.components.append(component)

    ########################################################
    # helper class functions
    ########################################################
    # Get all components, optionally filtered by type
    def get_components(self, component_type: Optional[str] = None):
        """Return components, optionally filtered by type name.

        Args:
            component_type: Type name to filter on, or ``None`` for all.

        Returns:
            All components when ``component_type`` is ``None``.  For a multi
            type, the (possibly empty) list of matches.  Otherwise the first
            match, or ``None`` when nothing matches.
        """
        if component_type is None:
            return self.components
        matches = [c for c in self.components if c.type == component_type]
        # Decide on the requested type itself, not on whichever component
        # happened to be last in the list.
        if _is_multi_type(component_type):
            return matches
        return matches[0] if matches else None

    def get_component_count(self):
        """Return the number of registered components."""
        return int(len(self.components))

    def is_virtual(self):
        """Return True when the "Virtual Machine" property is the string "True".

        That property is filled by a static key whose command echoes
        ``True`` or ``False``.
        """
        return self._properties.get("Virtual Machine") == "True"

    def check_system_timer(self):
        """Return True while less than 30 seconds passed since ``recent_check``."""
        time_lapsed = time.time() - float(self.recent_check)
        return time_lapsed < 30.0

    ########################################################
    # static metrics redis data functions
    ########################################################
    # return list of all static metrics from system and properties
    def get_static_metrics(self, human_readable=False):
        """Return component properties followed by system properties."""
        return (list(self.get_component_properties(human_readable))
                + list(self.get_system_properties(human_readable)))

    def get_component_properties(self, human_readable=False):
        """Return the property entries of every component, flattened."""
        result = []
        for component in self.components:
            if human_readable:
                result.extend(component.get_properties_strings())
            else:
                result.extend(component.get_properties_keys())
        return result

    def get_system_properties(self, human_readable=False):
        """Return system properties as dicts with Source "System"."""
        if human_readable:
            return [
                {"Source": "System", "Property": f"{name}: {value}"}
                for name, value in self._properties.items()
            ]
        return [
            {"Source": "System", "Property": name, "Value": value}
            for name, value in self._properties.items()
        ]

    ########################################################
    # live metrics redis data functions
    ########################################################
    # return list of all live metrics from system and properties
    def get_live_metrics(self, human_readable=False):
        """Return component metrics followed by system metrics."""
        return (list(self.get_component_metrics(human_readable))
                + list(self.get_system_metrics(human_readable)))

    def get_component_metrics(self, human_readable=False):
        """Return the metric entries of every component, flattened."""
        result = []
        for component in self.components:
            if human_readable:
                result.extend(component.get_metrics_strings())
            else:
                result.extend(component.get_metrics_keys())
        return result

    def get_system_metrics(self, human_readable=False):
        """Return system metrics in string or key form."""
        if human_readable:
            return self.get_system_metric_strings()
        return self.get_system_metric_keys()

    def get_system_metric_keys(self):
        """Return system metrics as Source/Metric/Data dicts plus component_count."""
        result = [
            {"Source": "System", "Metric": name, "Data": value}
            for name, value in self._metrics.items()
        ]
        # add internal dynamic metrics
        result.append({
            "Source": "System",
            "Metric": "component_count",
            "Data": self.get_component_count()
        })
        return result

    def get_system_metric_strings(self):
        """Return system metrics as Source/"name: value" dicts plus component_count."""
        result = [
            {"Source": "System", "Metric": f"{name}: {value}"}
            for name, value in self._metrics.items()
        ]
        # add internal dynamic metrics
        result.append({
            "Source": "System",
            "Metric": f"component_count: {self.get_component_count()}"
        })
        return result

    # straggler functions, might cut them
    # return both static and dynamic data
    def get_sysvars_summary_keys(self):
        """Return every system property and metric as name/type/value dicts."""
        result = [
            {"name": "System Class Property", "type": name, "value": value}
            for name, value in self._properties.items()
        ]
        result.extend(
            {"name": "System Class Metric", "type": name, "value": value}
            for name, value in self._metrics.items()
        )
        return result

    def get_component_strings(self, component_type: Optional[str] = None):
        """Return component descriptions, optionally filtered by type name.

        Args:
            component_type: Type name to filter on, or ``None`` for all.

        Returns:
            All descriptions when ``component_type`` is ``None``.  For a
            multi type, the (possibly empty) list of matching descriptions.
            Otherwise the first matching description, or ``None`` when
            nothing matches.
        """
        if component_type is None:
            return [component.description for component in self.components]
        descriptions = [
            component.description
            for component in self.components
            if component.type == component_type
        ]
        if _is_multi_type(component_type):
            return descriptions
        return descriptions[0] if descriptions else None


############################################################
# Non-class Helper Functions
############################################################
# subroutine to run a command, return stdout as array unless zero_only then return [0]
def run_command(cmd, zero_only=False):
    """Run ``cmd`` through the shell and return its non-empty stdout lines.

    Args:
        cmd: Shell command line.
        zero_only: When true, return only the first non-empty line.

    Returns:
        A list of non-empty output lines, or the first line as a string when
        ``zero_only`` is set.  If ``zero_only`` is set but the command
        printed nothing, the empty list is returned.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    # NOTE(review): shell=True executes the string as-is; commands come from
    # this file and the descriptor JSON, with device names formatted in.
    # Presumably both are trusted - confirm before accepting external input.
    result = subprocess.run(cmd, shell=True, check=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Decode the byte output to a string
    output = result.stdout.decode('utf-8')
    # Split the output into lines, dropping empty ones
    output_lines = [line for line in output.split('\n') if line]
    if zero_only and output_lines:
        return output_lines[0]
    return output_lines


def get_device_list(device_type_name: str):
    """Run the ``device_list`` command of the named component descriptor.

    Returns:
        The command's output lines, or ``[]`` when no descriptor is named
        ``device_type_name``.  If several descriptors match, each command is
        run and the last result is returned.
    """
    result = []
    for component in component_class_tree:
        if component["name"] == device_type_name:
            result = run_command(component["device_list"])
    return result