# this class file is for the cosmostat service import subprocess from LinkedList import * global_max_length = 500 class Component: ########################################################################################## # Base class for all system components. All instantiated objects need a child class # Class data: ### name - name of the type of component, declared in the parent class ### status ### model_string - string with device info, declared in parent class ### metric_name - name of the value being measured ### current_value ### historical_data - This will be a linked list used to generate a json when calling get_historical_data ### for this to work, the function using these classes needs to update the values periodically #### historical_data = [ #### { #### "timestamp": timestamp, # seconds since epoch #### "value": value #### }, #### { #### "timestamp": timestamp, #### "value": value #### } #### ] def __init__(self, name: str, model_string: str = None): # fail instantiation if critical data is missing if self.model_string is None: raise TypeError("Error - missing component model_string") if self.metric_name is None: raise TypeError("Error - missing component metric_name") if self.metric_value_command is None: raise TypeError("Error - missing component metric_value_command") if self.type is None: raise TypeError("Error - missing component type") if self.has_temp is None: raise TypeError("Error - missing temp data check") # set up history list self.history_max_length = global_max_length self.historical_data = ValueHistory(self.history_max_length) self.history_start = self.historical_data.get_first_timestamp() self.update_value() if self.current_value is None: raise TypeError("Error - failed to read value") # if temp data exists, handle it if self.has_temp: self.temp_history_data = ValueHistory(self.history_max_length) self.temp_history_start = self.temp_history_data.get_first_timestamp() self.current_temp = self.temp_history_data.get_current_value() else: self.temp_history_data = None # instantiate other shared class variables self.name = name self.current_value = self.historical_data.get_current_value() if self.has_temp: self.current_temp = self.temp_history_data.get_current_value() else: self.current_temp = None self.comment = f"This is a {self.type}, so we are measuring {self.metric_name}, currently at {self.current_value}" # if nothing failed, the object is ready self.status = "ready" def __str__(self): return (f"{self.__class__.__name__}: {self.name} " f"{self.model_string}") def __del__(self): print(f"Deleting {self.type} component - {self.model_string}") def get_info_key(self): result = { "name": self.name, "type": self.type, "model_string": self.model_string, "metric_name": self.metric_name } return result def get_summary_key(self): result = { "type": self.type, "current_value": self.current_value, "metric_name": self.metric_name, "model_string": self.model_string } return result def update_value(self): #try: self.current_value = run_command(self.metric_value_command, True) self.historical_data.add(self.current_value) #except: def update_temp_value(self): if has_temp: #try: self.current_temp = run_command(self.temp_value_command, True) self.temp_history_data.add(self.current_value) #except: else: return None def get_history(self, count: int = global_max_length): if self.has_temp: result = { "value_metric": self.metric_name, "history_count": count, "history_data": self.historical_data.get_history(count), # reminder this is a LinkedList get_history "history_temp_data": self.temp_history_data.get_history(count) } else: result = { "value_metric": self.metric_name, "history_count": count, "history_data": self.historical_data.get_history(count) # same reminder here } return result ############################################################ # Component Class Types # There needs to be one of these for each monitored thing ############################################################ # Need to add: ### temperatures ### network + VPN ### storage + ZFS ### video cards ### virtual machines # CPU component class. class CPU(Component): def __init__(self, name: str, is_virtual: bool = False): # Declare component type self.type = "CPU" # deal with temp later self.has_temp = False # no temp if VM #self.has_temp = not is_virtual #self.temp_value_command = "acpi -V | jc --acpi -p | jq '.[] | select(.type==\"Thermal\") | .temperature '" self.model_string = self.get_model_string() # Initialize value self.metric_name = "1m_load" self.metric_value_command = "cat /proc/loadavg | awk '{print $1}'" self.current_value = run_command(self.metric_value_command, True) # Complete instantiation super().__init__(name, self.model_string) def get_model_string(self): # Get CPU Info model_string_command = "lscpu --json | jq -r '.lscpu[] | select(.field==\"Model name:\") | .data'" return run_command(model_string_command, True) # RAM component class. class RAM(Component): def __init__(self, name: str): # Declare component type self.type = "RAM" self.has_temp = False self.model_string = self.get_model_string() # Initialize Value self.metric_name = "used_capacity_mb" self.metric_value_command = "free -m | grep Mem | awk '{print $3}'" self.current_value = run_command(self.metric_value_command, True) # Complete instantiation super().__init__(name, self.model_string) def get_model_string(self): # Check total system RAM bytes_total_command = "sudo lshw -json -c memory | jq -r '.[] | select(.description==\"System Memory\").size' " bytes_total = float(run_command(bytes_total_command, True)) gb_total = round(bytes_total / 1073741824, 2) return f"Total Capacity: {gb_total}GB" ############################################################ # System Class # A system is build from components ############################################################ class System: # instantiate new system def __init__(self, name: str): # the system needs a name self.name = name # system is built of other component objects self.components = [] # other system properties self.sysvars = {} # either i do it here or i do it twice self.sysvars["is_virtual"] = self.check_for_virtual() # Let's build a system self.add_component(CPU("CPU", self.sysvars["is_virtual"])) self.add_component(RAM("RAM")) # let's build system values self.check_values() # Add a component to the system def add_component(self, component: Component): self.components.append(component) # Get all components, optionally filtered by type def get_components(self, component_type: type = None): if component_type is None: return self.components return [c for c in self.components if isinstance(c, component_type)] # get component count def get_component_count(self): result = int(len(self.components)) return result def __str__(self): components_str = "\n".join(f" - {c}" for c in self.components) return f"System: {self.name}\n{components_str}" # update metrics for all components def update_values(self): self.check_values() for component in self.components: component.update_value() def check_for_virtual(self): check_if_vm_command = "systemd-detect-virt" check_if_vm = run_command(check_if_vm_command, True) if check_if_vm != "none": return True else: return False def check_uptime(self): check_uptime_command = "uptime -p" system_uptime = run_command(check_uptime_command, True) return system_uptime def check_timestamp(self): check_timestamp_command = "date '+%D %r'" system_timestamp = run_command(check_timestamp_command, True) return system_timestamp def check_values(self): self.sysvars["uptime"] = self.check_uptime() self.sysvars["name"] = self.name self.sysvars["component_count"] = self.get_component_count() self.sysvars["timestamp"] = self.check_timestamp() def get_sysvars(self): result = {} for sysvar in self.sysvars: result[f"{sysvar}"] = self.sysvars[f"{sysvar}"] return result def get_sysvars_summary_keys(self): result = [] for sysvar in self.sysvars: system_type_string = f"sysvar['{sysvar}']" thisvar = { "type": "System Class Variable", "current_value": sysvar, "metric_name": system_type_string, "model_string": self.sysvars[sysvar] } result.append(thisvar) return result ############################################################ # Helper Functions ############################################################ # subroutine to run a command, return stdout as array unless zero_only then return [0] def run_command(cmd, zero_only=False): # Run the command and capture the output result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Decode the byte output to a string output = result.stdout.decode('utf-8') # Split the output into lines and store it in an array output_lines = [line for line in output.split('\n') if line] # Return result try: return output_lines[0] if zero_only else output_lines except: return output_lines