Files
cosmoserver/files/api/Components.py

563 lines
22 KiB
Python

# this class file is for the cosmostat service
import subprocess
import json
import time
from typing import Dict, Any, List
# Global Class Vars
global_max_length = 500
# when True, System emits extra diagnostic print() output
debug_output = False

# import the component descriptor
# The descriptor file is resolved relative to the current working directory.
try:
    with open("component_descriptors.json", encoding="utf-8") as f:
        component_class_tree: List[Dict] = json.load(f)
except FileNotFoundError as exc:
    raise RuntimeError("Descriptor file not found") from exc

# One {"name", "multi_check"} record per descriptor entry. "multi_check" is
# True when the descriptor marks the type as multi-device, either with the
# string "True" or with a real JSON boolean true.
component_types = [
    {
        "name": entry["name"],
        "multi_check": entry["multi_check"] is True or entry["multi_check"] == "True",
    }
    for entry in component_class_tree
]
#################################################################
# Component Class
#################################################################
class Component:
    """One monitored component built from an entry of ``component_class_tree``.

    The descriptor entry whose ``name`` equals the component type supplies
    shell commands for static ``properties`` (run once in ``__init__``) and
    live ``metrics`` (re-run by ``update_metrics``), an optional
    ``description`` template, and optional ``virt_ignore`` / ``multi_metrics``
    lists.
    """

    ############################################################
    # instantiate new component
    ############################################################
    def __init__(self, name: str, comp_type: str, this_device="None", is_virtual = "True"):
        """Collect the static properties and first metrics of a component.

        Args:
            name: Display name, used as "Source" in every returned record.
            comp_type: Descriptor name to look up in ``component_class_tree``.
            this_device: Device identifier substituted for ``{this_device}``
                in the descriptor commands; the string "None" means the
                commands are run without templating.
            is_virtual: Whether the host is virtual. Accepts a bool or the
                strings "True"/"False" (case-insensitive).

        Raises:
            ValueError: If no descriptor is named ``comp_type``.
        """
        self.name = name
        self.type = comp_type
        # this variable is set when the device can have multiples
        # it indicates that the commands in the descriptor might need templating
        self.this_device = this_device
        self.is_virtual = is_virtual
        print(f"This device - {self.this_device}")

        # build the component descriptor dictionary (the last matching entry wins)
        descriptor = None
        for entry in component_class_tree:
            if entry["name"] == self.type:
                descriptor = entry
        if descriptor is None:
            raise ValueError(
                f"Component type '{comp_type}' is not defined in the "
                f"component descriptor tree."
            )
        self._descriptor = descriptor

        # store static properties
        self.multi_check = self.is_multi()
        self.virt_ignore = self._descriptor.get('virt_ignore', [])
        self.multi_metrics = self._descriptor.get('multi_metrics', [])
        # NOTE: the descriptor's optional 'precheck' command is not evaluated here.

        # The ignore list only applies on virtual hosts. The flag usually
        # arrives as the string "True"/"False", so it cannot be tested for
        # truthiness directly ("False" is a non-empty, truthy string).
        if not self._flag_is_set(self.is_virtual):
            self.virt_ignore = []

        self._properties: Dict[str, str | list[str]] = {}
        for key, command in descriptor.get('properties', {}).items():
            # keys listed in multi_metrics keep every output line, others only the first
            first_line_only = key not in self.multi_metrics
            if self.this_device != "None":
                # this component type is a multi and the commands need templating for each device
                command = command.format(this_device=self.this_device)
            self._properties[key] = run_command(command, zero_only = first_line_only)
            print(self._properties[key])

        # build the description string (empty when the descriptor has no template)
        self._description_template: str | None = descriptor.get("description")
        if self._description_template is None:
            self.description = ""
        else:
            self.description = self._description_template.format(**self._properties)

        # initialize metrics
        self._metrics: Dict[str, str] = {}
        self.update_metrics()

    @staticmethod
    def _flag_is_set(flag) -> bool:
        """Interpret a bool-like flag; strings are true only when they spell "true"."""
        if isinstance(flag, str):
            return flag.strip().lower() == "true"
        return bool(flag)

    def __str__(self):
        return (f"Component name: {self.name}, type: {self.type} - "
                f"{self.description}")

    def __repr__(self):
        return (f"Component name: {self.name}, type {self.type} - "
                f"{self.description}")

    ############################################################
    # Class Functions
    ############################################################
    def update_metrics(self):
        """Re-run every descriptor 'metrics' command and store its first output line."""
        for key, command in self._descriptor.get('metrics', {}).items():
            if self.this_device != "None":
                command = command.format(this_device=self.this_device)
            this_metric = run_command(command, zero_only = True)
            if this_metric is not None:
                self._metrics[key] = this_metric

    def get_property(self, type = None):
        """Return all properties, or the value stored under the key ``type``.

        Raises:
            KeyError: If ``type`` is given but is not a known property.
        """
        if type is None:
            return self._properties
        return self._properties[type]

    def is_multi(self):
        """Return the ``multi_check`` flag recorded for this type in ``component_types``."""
        for component_type in component_types:
            if self.type == component_type["name"]:
                return component_type["multi_check"]
        return False

    ########################################################
    # redis data functions
    ########################################################
    def get_properties_keys(self, component = None):
        """Return one {"Source", "Property", "Value"} record per property value.

        List-valued properties yield one record per element; properties named
        in ``virt_ignore`` are skipped.

        Args:
            component: Optional property name; when given only that property
                is reported.

        Raises:
            KeyError: If ``component`` is given but is not a known property.
        """
        if component is None:
            selected = self._properties.items()
        else:
            # a single property: wrap it so it iterates like dict.items()
            selected = [(component, self.get_property(component))]
        result = []
        for name, values in selected:
            if name in self.virt_ignore:
                continue
            for value in (values if isinstance(values, list) else [values]):
                result.append({
                    "Source": self.name,
                    "Property": name,
                    "Value": value
                })
        return result

    def get_properties_strings(self, return_simple = False):
        """Return properties rendered as "name: value".

        With ``return_simple`` the plain strings are returned; otherwise each
        one is wrapped in a {"Source", "Property"} record. Properties named
        in ``virt_ignore`` are skipped.
        """
        result = []
        for name, values in self._properties.items():
            if name in self.virt_ignore:
                continue
            for value in (values if isinstance(values, list) else [values]):
                simple_property = f"{name}: {value}"
                if return_simple:
                    result.append(simple_property)
                else:
                    result.append({
                        "Source": self.name,
                        "Property": simple_property
                    })
        return result

    def get_metrics_keys(self):
        """Return {"Source", "Metric", "Data"} records for non-empty, non-ignored metrics."""
        empty_value = ["", "null", None, []]
        return [
            {"Source": self.name, "Metric": name, "Data": value}
            for name, value in self._metrics.items()
            if value not in empty_value and name not in self.virt_ignore
        ]

    def get_metrics_strings(self):
        """Return {"Source", "Metric"} records with the metric rendered as "name:value"."""
        empty_value = ["", "null", None, []]
        return [
            {"Source": self.name, "Metric": f"{name}:{value}"}
            for name, value in self._metrics.items()
            if value not in empty_value and name not in self.virt_ignore
        ]

    ########################################################
    # random data functions
    ########################################################
    # complex data type return
    def get_metrics(self, type = None):
        """Return a summary dict of all metrics, or only the metric named ``type``."""
        these_metrics = [
            {"Metric": name, "Data": value}
            for name, value in self._metrics.items()
            if type is None or name == type
        ]
        return {
            "Source": self.name,
            "Component Type": self.type,
            "Metrics": these_metrics
        }

    # complex data type return
    def get_property_summary(self, type = None):
        """Return a summary dict of all properties, or only the property named ``type``."""
        these_properties = [
            {"Property": name, "Value": value}
            for name, value in self._properties.items()
            if type is None or name == type
        ]
        return {
            "Source": self.name,
            "Component Type": self.type,
            "Properties": these_properties
        }

    # full data return
    def get_description(self):
        """Return name, type and every property and metric in one dict."""
        these_properties = [
            {"Property": name, "Value": value}
            for name, value in self._properties.items()
        ]
        these_metrics = [
            {"Metric": name, "Data": value}
            for name, value in self._metrics.items()
        ]
        return {
            "Source": self.name,
            "Type": self.type,
            "Properties": these_properties,
            "Metrics": these_metrics
        }
############################################################
# System Class
# this is a big one...
############################################################
class System:
    """The host itself: system-level properties/metrics plus its components.

    Static properties are collected once in ``__init__`` by running the
    commands in ``static_key_variables``; live metrics come from
    ``dynamic_key_variables`` and are refreshed by ``update_live_keys``.
    Components are created by ``load_components`` from ``component_types``.
    """

    ########################################################
    # system variable declarations
    ########################################################
    static_key_variables = [
        {"name": "Hostname", "command": "hostname"},
        {"name": "Virtual Machine", "command": 'echo $( [ "$(systemd-detect-virt)" = none ] && echo False || echo True )', "req_check": "False"},
        {"name": "CPU Architecture:", "command": "lscpu --json | jq -r '.lscpu[] | select(.field==\"Architecture:\") | .data'"},
        {"name": "OS Kernel", "command": "uname -r"},
        {"name": "OS Name", "command": "cat /etc/os-release | grep PRETTY | cut -d\\\" -f2"},
        {"name": "Manufacturer", "command": "sudo dmidecode --type 1 | grep Manufacturer: | cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
        {"name": "Product Name", "command": "sudo dmidecode --type 2 | grep 'Product Name:' | cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
        {"name": "Serial Number", "command": "sudo dmidecode --type 2 | grep 'Serial Number: '| cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
    ]
    dynamic_key_variables = [
        {"name": "System Uptime", "command": "uptime -p"},
        {"name": "Current Date", "command": "date '+%D %r'"},
    ]
    # static keys that are not collected unless systemd-detect-virt reports "none"
    virt_ignore = [
        "Product Name",
        "Serial Number"
    ]

    ########################################################
    # instantiate new system
    ########################################################
    def __init__(self, name: str):
        """Collect static properties, first live metrics and all components.

        Args:
            name: Name of the system, used in ``__str__`` and debug output.
        """
        # the system needs a name
        self.name = name
        if debug_output:
            print(f"System initializing, name {self.name}")
        # system contains an array of component objects
        self.components = []
        # initialize system properties and metrics dicts
        self._properties: Dict[str, str] = {}
        self._metrics: Dict[str, str] = {}
        # req_check is off because the exit status of this command is not an error signal here
        self._virt_string = run_command('systemd-detect-virt', zero_only = True, req_check = False)
        self._virt_ignore = self.virt_ignore
        if self._virt_string == "none":
            self._virt_ignore = []
        # timekeeping for websocket
        self.recent_check = int(time.time())
        # load static keys
        for static_key in self.static_key_variables:
            if static_key["name"] in self._virt_ignore:
                continue
            command = static_key["command"]
            req_check = static_key.get("req_check", True)
            if isinstance(req_check, str):
                # the table stores the flag as text; "False" would otherwise be truthy
                req_check = req_check.strip().lower() != "false"
            result = run_command(command, zero_only = True, req_check = req_check)
            if debug_output:
                print(f'Static key [{static_key["name"]}] - command [{command}] - output [{result}]')
            self._properties[static_key["name"]] = result
        # initialize live keys
        self.update_live_keys()
        # initialize components
        self.load_components()

    def __str__(self):
        components_str = "\n".join(f" - {c}" for c in self.components)
        return f"System hostname: {self.name}\nComponent Count: {self.get_component_count()}\n{components_str}"

    ########################################################
    # critical class functions
    ########################################################
    # update only system dynamic keys
    def update_live_keys(self):
        """Re-run every ``dynamic_key_variables`` command and store its first output line."""
        for live_key in self.dynamic_key_variables:
            command = live_key['command']
            if command is None:
                continue
            result = run_command(command, zero_only = True)
            self._metrics[live_key['name']] = result
            if debug_output:
                print(f'Command {live_key["name"]} - [{command}] Result - [{result}]')

    # update all dynamic keys, including components
    def update_system_state(self):
        """Refresh the system's live metrics and the metrics of every component."""
        self.update_live_keys()
        for component in self.components:
            component.update_metrics()

    # check for components
    def load_components(self):
        """Create the components for every entry in ``component_types``.

        Multi-device types get one component per device returned by
        ``get_device_list``, named "<type> <index>"; other types get a single
        component named after the type.
        """
        # looked up once so every component receives the same value
        virtual = self.is_virtual()
        for component in component_types:
            component_name = component["name"]
            # if multi, note that the command in device_list creates the list of things to pipe into this_device
            if component["multi_check"]:
                print(f"Creating one component of type {component_name} for each one found")
                # enumerate keeps indices unique even if a device is listed twice
                for device_index, this_device in enumerate(get_device_list(component_name)):
                    this_component_name = f"{component_name} {device_index}"
                    print(f"{this_component_name} - {component_name} - {this_device}")
                    self.add_components(Component(
                        name = this_component_name,
                        comp_type = component_name,
                        this_device = this_device,
                        is_virtual = virtual,
                    ))
            else:
                if debug_output:
                    print(f'Creating component {component["name"]}')
                self.add_components(Component(name = component_name, comp_type = component_name, is_virtual = virtual))

    # Add a component to the system
    def add_components(self, component: "Component",):
        """Append one component to ``self.components``."""
        if debug_output:
            print(f"Component description: {component.description}")
        self.components.append(component)

    ########################################################
    # helper class functions
    ########################################################
    # Get all components, optionally filtered by type
    def get_components(self, component_type: "str | None" = None):
        """Return all components, or those whose ``type`` equals ``component_type``.

        Returns:
            The full list when ``component_type`` is None. Otherwise the list
            of matches when the matching components are multi-device, the
            single matching component when they are not, and an empty list
            when nothing matches.
        """
        if component_type is None:
            return self.components
        matches = [c for c in self.components if c.type == component_type]
        if not matches:
            return []
        # ask a matching component, not whichever one happened to be last in the list
        if matches[0].is_multi():
            return matches
        return matches[0]

    def get_component_count(self):
        """Return the number of components."""
        return len(self.components)

    def get_property(self, property = None):
        """Return the stored system property, ``{}`` if unknown, None if no name is given."""
        if property is None:
            return None
        return self._properties.get(property, {})

    def is_virtual(self):
        """Return the stored 'Virtual Machine' property unchanged.

        The value is whatever the static-key command printed, so it is
        normally the string "True" or "False", not a bool.
        """
        vm_check = self.get_property('Virtual Machine')
        print(f'vm_check: {vm_check}')
        return vm_check

    def check_system_timer(self):
        """Return True while fewer than 30 seconds have passed since ``recent_check``."""
        time_lapsed = time.time() - float(self.recent_check)
        return time_lapsed < 30.0

    ########################################################
    # static metrics redis data functions
    ########################################################
    # return list of all static metrics from system and properties
    def get_static_metrics(self, human_readable = False):
        """Return component properties followed by system properties."""
        result = list(self.get_component_properties(human_readable))
        result.extend(self.get_system_properties(human_readable))
        return result

    def get_component_properties(self, human_readable = False, component = None):
        """Return the property records of every component.

        Args:
            human_readable: Use ``get_properties_strings`` instead of
                ``get_properties_keys``.
            component: Accepted for backward compatibility; it has no effect.
        """
        result = []
        for member in self.components:
            if human_readable:
                result.extend(member.get_properties_strings())
            else:
                result.extend(member.get_properties_keys())
        return result

    def get_system_properties(self, human_readable = False):
        """Return the system properties as records with "Source" set to "System"."""
        result = []
        for name, value in self._properties.items():
            if human_readable:
                result.append({
                    "Source": "System",
                    "Property": f"{name}: {value}"
                })
            else:
                result.append({
                    "Source": "System",
                    "Property": name,
                    "Value": value
                })
        return result

    ########################################################
    # live metrics redis data functions
    ########################################################
    # return list of all live metrics from system and properties
    def get_live_metrics(self, human_readable = False):
        """Return component metrics followed by system metrics."""
        result = list(self.get_component_metrics(human_readable))
        result.extend(self.get_system_metrics(human_readable))
        return result

    def get_component_metrics(self, human_readable = False):
        """Return the metric records of every component."""
        result = []
        for component in self.components:
            if human_readable:
                result.extend(component.get_metrics_strings())
            else:
                result.extend(component.get_metrics_keys())
        return result

    def get_system_metrics(self, human_readable = False):
        """Return system metrics as strings (``human_readable``) or key/data records."""
        if human_readable:
            return self.get_system_metric_strings()
        return self.get_system_metric_keys()

    def get_system_metric_keys(self):
        """Return {"Source", "Metric", "Data"} records plus a trailing component_count."""
        result = [
            {"Source": "System", "Metric": name, "Data": value}
            for name, value in self._metrics.items()
        ]
        # add internal dynamic metrics
        result.append({
            "Source": "System",
            "Metric": "component_count",
            "Data": self.get_component_count()
        })
        return result

    def get_system_metric_strings(self):
        """Return {"Source", "Metric"} records rendered as "name: value" plus component_count."""
        result = [
            {"Source": "System", "Metric": f"{name}: {value}"}
            for name, value in self._metrics.items()
        ]
        # add internal dynamic metrics
        result.append({
            "Source": "System",
            "Metric": f"component_count: {self.get_component_count()}"
        })
        return result

    # straggler functions, might cut them
    # return both static and dynamic data
    def get_sysvars_summary_keys(self):
        """Return every system property, then every system metric, as name/type/value records."""
        result = [
            {"name": "System Class Property", "type": name, "value": value}
            for name, value in self._properties.items()
        ]
        result.extend(
            {"name": "System Class Metric", "type": name, "value": value}
            for name, value in self._metrics.items()
        )
        return result

    def get_component_strings(self, component_type: "str | None" = None):
        """Return component descriptions, optionally restricted to one type.

        Returns:
            All descriptions when ``component_type`` is None. Otherwise the
            list of matching descriptions when the matching components are
            multi-device, the single description when they are not, and an
            empty list when nothing matches.
        """
        if component_type is None:
            return [component.description for component in self.components]
        matches = [c for c in self.components if c.type == component_type]
        if not matches:
            return []
        if matches[0].is_multi():
            return [c.description for c in matches]
        return matches[0].description
############################################################
# Non-class Helper Functions
############################################################
# subroutine to run a command; returns stdout as a list of non-empty lines, or only the first line when zero_only is set
def run_command(cmd, zero_only=False, use_shell=True, req_check = True):
    """Run a command and return its standard output split into lines.

    Args:
        cmd: Command passed to ``subprocess.run`` (a shell string when
            ``use_shell`` is true).
        zero_only: Return only the first non-empty output line.
        use_shell: Forwarded as ``shell=``.
        req_check: Forwarded as ``check=``; when true a non-zero exit status
            raises ``subprocess.CalledProcessError``.

    Returns:
        The list of non-empty stdout lines, or the first such line as a
        string when ``zero_only`` is set and there is any output. With
        ``zero_only`` and no output the empty list is returned.
    """
    # Run the command and capture the output
    completed = subprocess.run(cmd, shell=use_shell, check=req_check, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Decode the byte output to a string and keep only the non-empty lines
    output_lines = [line for line in completed.stdout.decode('utf-8').split('\n') if line]
    # An explicit emptiness test replaces the former catch-all except clause
    if zero_only and output_lines:
        return output_lines[0]
    return output_lines
def get_device_list(device_type_name: str):
    """Return the device identifiers for one descriptor type.

    Only descriptors named ``device_type_name`` are considered. If such a
    descriptor has a "precheck" command, its first output line is read as an
    integer and the descriptor is skipped when that value is 0; empty or
    non-numeric precheck output is treated as 0. Otherwise the descriptor's
    "device_list" command is run and its output lines are returned.

    Args:
        device_type_name: The descriptor "name" to look up.

    Returns:
        The output lines of the "device_list" command, or an empty list when
        no descriptor matches or the precheck reports no devices. If several
        descriptors share the name, the last usable one wins.
    """
    result = []
    for component in component_class_tree:
        # prechecks of unrelated component types are never run
        if component["name"] != device_type_name:
            continue
        if "precheck" in component:
            precheck_command = component["precheck"]
            precheck_value_output = run_command(precheck_command, zero_only = True)
            try:
                precheck_value = int(precheck_value_output)
            except (TypeError, ValueError):
                # run_command yields [] when the command printed nothing
                precheck_value = 0
            print(f"Precheck found - {precheck_command} - {precheck_value}")
            if precheck_value == 0:
                continue
        result = run_command(component["device_list"])
    return result