547 lines
21 KiB
Python
547 lines
21 KiB
Python
# this class file is for the cosmostat service
|
|
import subprocess
|
|
import json
|
|
import time
|
|
from typing import Dict, Any, List
|
|
|
|
# Module-wide settings shared by the classes and helpers below
debug_output = False
global_max_length = 500

# Read the component descriptor tree from the JSON file in the current
# working directory; a missing file is reported as a RuntimeError.
try:
    with open("component_descriptors.json", encoding="utf-8") as f:
        component_class_tree: List[Dict] = json.load(f)
except FileNotFoundError as exc:
    raise RuntimeError("Descriptor file not found") from exc

# One summary record per descriptor: its name plus a real boolean derived
# from the descriptor's string-valued "multi_check" field.
component_types = [
    {
        "name": descriptor_entry["name"],
        "multi_check": descriptor_entry["multi_check"] == "True",
    }
    for descriptor_entry in component_class_tree
]
|
|
|
|
#################################################################
|
|
# Component Class
|
|
#################################################################
|
|
|
|
class Component:
    """A single component built from one entry of ``component_class_tree``.

    The descriptor entry whose ``"name"`` equals ``comp_type`` supplies shell
    commands for static ``properties`` and live ``metrics``, an optional
    ``description`` format string, and an optional ``virt_ignore`` list of
    names to leave out of the property/metric listings.
    """

    ############################################################
    # instantiate new component
    ############################################################

    def __init__(self, name: str, comp_type: str, this_device="None", is_virtual = "True"):
        """Look up the descriptor, collect properties and take a first metrics sample.

        Args:
            name: display name, used as "Source" in every returned record.
            comp_type: descriptor name to look up in ``component_class_tree``.
            this_device: device identifier substituted for ``{this_device}``
                in descriptor commands; the string "None" disables templating.
            is_virtual: whether the host is virtual. Accepts a bool or the
                strings "True"/"False" (the form ``System.is_virtual`` yields).

        Raises:
            ValueError: if ``comp_type`` is not present in the descriptor tree.
        """
        self.name = name
        self.type = comp_type
        # this variable is set when the device can have multiples
        # it indicates that the commands in the descriptor might need templating
        self.this_device = this_device
        self.is_virtual = is_virtual
        if debug_output:
            print(f"This device - {self.this_device}")

        # Find the descriptor; start from None so an unknown type reaches the
        # ValueError below instead of failing on an unbound local.
        descriptor = None
        for entry in component_class_tree:
            if entry["name"] == self.type:
                descriptor = entry
        self._descriptor = descriptor
        if descriptor is None:
            raise ValueError(
                f"Component type '{comp_type}' is not defined in the "
                f"component descriptor tree."
            )

        # store static properties
        self.multi_check = self.is_multi()
        self.virt_ignore = descriptor.get('virt_ignore', [])
        # The ignore list only applies on virtual hosts. "False" is a truthy
        # string, so the flag must be parsed rather than tested directly.
        if not self._as_bool(is_virtual):
            self.virt_ignore = []

        self._properties: Dict[str, str] = {}
        for key, command in descriptor.get('properties', {}).items():
            self._properties[key] = run_command(self._render(command), zero_only = True)
            if debug_output:
                print(self._properties[key])

        # build the description string (empty when the descriptor has none)
        self._description_template: str | None = descriptor.get("description")
        if self._description_template is None:
            self.description = ""
        else:
            self.description = self._description_template.format(**self._properties)

        # initialize metrics
        self._metrics: Dict[str, str] = {}
        self.update_metrics()

    def __str__(self):
        return (f"Component name: {self.name}, type: {self.type} - "
                f"{self.description}")

    def __repr__(self):
        return (f"Component name: {self.name}, type {self.type} - "
                f"{self.description}")

    ############################################################
    # Class Functions
    ############################################################

    @staticmethod
    def _as_bool(value):
        """Interpret a bool-like value; strings are true only when they read "true"."""
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    def _render(self, command):
        """Fill ``{this_device}`` into a command when a device is set.

        Commands are left untouched when ``this_device`` is "None", so
        literal braces in them are never passed through ``str.format``.
        """
        if self.this_device != "None":
            return command.format(this_device=self.this_device)
        return command

    def update_metrics(self):
        """Re-run every metric command of the descriptor and store the first output line."""
        for key, command in self._descriptor.get('metrics', {}).items():
            self._metrics[key] = run_command(self._render(command), zero_only = True)

    def get_property(self, type = None):
        """Return all properties as a dict, or the value of the property named ``type``.

        Raises:
            KeyError: if ``type`` is given and no such property exists.
        """
        if type is None:
            return self._properties
        return self._properties[type]

    def is_multi(self):
        """Return the ``multi_check`` flag of this type from ``component_types`` (False if absent)."""
        for component_type in component_types:
            if self.type == component_type["name"]:
                return component_type["multi_check"]
        return False

    ########################################################
    # redis data functions
    ########################################################

    def get_properties_keys(self, component = None):
        """Return properties as ``{"Source", "Property", "Value"}`` records.

        Args:
            component: optional property name; when given only that property
                is considered (KeyError if it does not exist).

        Names listed in ``virt_ignore`` are skipped.
        """
        if component is None:
            pairs = self._properties.items()
        else:
            # get_property returns a bare value; wrap it so it iterates as a pair
            pairs = [(component, self.get_property(component))]
        return [
            {"Source": self.name, "Property": name, "Value": value}
            for name, value in pairs
            if name not in self.virt_ignore
        ]

    def get_properties_strings(self, return_simple = False):
        """Return properties as "name: value" text.

        Args:
            return_simple: when true return plain strings, otherwise
                ``{"Source", "Property"}`` records wrapping the same text.

        Names listed in ``virt_ignore`` are skipped.
        """
        result = []
        component_properties = self._properties.items()
        if debug_output:
            print(component_properties)
        for name, value in component_properties:
            if name in self.virt_ignore:
                continue
            simple_property = f"{name}: {value}"
            if return_simple:
                result.append(simple_property)
            else:
                result.append({
                    "Source": self.name,
                    "Property": simple_property
                })
        return result

    def get_metrics_keys(self):
        """Return non-empty metrics as ``{"Source", "Metric", "Data"}`` records."""
        # [] is what run_command yields for a command without output
        empty_value = ["", "null", None, []]
        return [
            {"Source": self.name, "Metric": name, "Data": value}
            for name, value in self._metrics.items()
            if value not in empty_value and name not in self.virt_ignore
        ]

    def get_metrics_strings(self):
        """Return non-empty metrics as ``{"Source", "Metric": "name:value"}`` records."""
        empty_value = ["", "null", None, []]
        return [
            {"Source": self.name, "Metric": f"{name}:{value}"}
            for name, value in self._metrics.items()
            if value not in empty_value and name not in self.virt_ignore
        ]

    ########################################################
    # random data functions
    ########################################################

    # complex data type return
    def get_metrics(self, type = None):
        """Return a summary dict of all metrics, or only the metric named ``type``."""
        these_metrics = [
            {"Metric": name, "Data": value}
            for name, value in self._metrics.items()
            if type is None or name == type
        ]
        return {
            "Source": self.name,
            "Component Type": self.type,
            "Metrics": these_metrics
        }

    # complex data type return
    def get_property_summary(self, type = None):
        """Return a summary dict of all properties, or only the property named ``type``."""
        these_properties = [
            {"Property": name, "Value": value}
            for name, value in self._properties.items()
            if type is None or name == type
        ]
        return {
            "Source": self.name,
            "Component Type": self.type,
            "Properties": these_properties
        }

    # full data return
    def get_description(self):
        """Return name, type and every property and metric in one dict."""
        these_properties = [
            {"Property": name, "Value": value}
            for name, value in self._properties.items()
        ]
        these_metrics = [
            {"Metric": name, "Data": value}
            for name, value in self._metrics.items()
        ]
        return {
            "Source": self.name,
            "Type": self.type,
            "Properties": these_properties,
            "Metrics": these_metrics
        }
|
|
|
|
############################################################
|
|
# System Class
|
|
# this is a big one...
|
|
############################################################
|
|
|
|
class System:
    """The host itself: system-level properties/metrics plus its components.

    Static properties are collected once at construction from
    ``static_key_variables``; live metrics come from ``dynamic_key_variables``
    and are refreshed by ``update_live_keys``. Components are created from the
    module-level ``component_types`` list.
    """

    ########################################################
    # system variable declarations
    ########################################################

    static_key_variables = [
        {"name": "Hostname", "command": "hostname"},
        {"name": "Virtual Machine", "command": 'echo $( [ "$(systemd-detect-virt)" = none ] && echo False || echo True )', "req_check": "False"},
        {"name": "CPU Architecture:", "command": "lscpu --json | jq -r '.lscpu[] | select(.field==\"Architecture:\") | .data'"},
        {"name": "OS Kernel", "command": "uname -r"},
        {"name": "OS Name", "command": "cat /etc/os-release | grep PRETTY | cut -d\\\" -f2"},
        {"name": "Manufacturer", "command": "sudo dmidecode --type 1 | grep Manufacturer: | cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
        {"name": "Product Name", "command": "sudo dmidecode --type 2 | grep 'Product Name:' | cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
        {"name": "Serial Number", "command": "sudo dmidecode --type 2 | grep 'Serial Number: '| cut -d: -f2 | sed -e 's/^[ \\t]*//'"},
    ]
    dynamic_key_variables = [
        {"name": "System Uptime", "command": "uptime -p"},
        {"name": "Current Date", "command": "date '+%D %r'"},
    ]

    # static keys that are not collected unless systemd-detect-virt prints "none"
    virt_ignore = [
        "Product Name",
        "Serial Number"
    ]

    ########################################################
    # instantiate new system
    ########################################################

    def __init__(self, name: str):
        """Collect static properties, take a first metrics sample and load components.

        Args:
            name: name of the system, used in the string representation.
        """
        # the system needs a name
        self.name = name
        if debug_output:
            print(f"System initializing, name {self.name}")
        # system contains an array of component objects
        self.components = []
        # initialize system properties and metrics dicts
        self._properties: Dict[str, str] = {}
        self._metrics: Dict[str, str] = {}
        self._virt_string = run_command('systemd-detect-virt', zero_only = True, req_check = False)
        self._virt_ignore = self.virt_ignore
        if self._virt_string == "none":
            self._virt_ignore = []
        # timekeeping for websocket
        self.recent_check = int(time.time())
        # load static keys
        for static_key in self.static_key_variables:
            key_name = static_key["name"]
            if key_name in self._virt_ignore:
                continue
            command = static_key["command"]
            # "req_check" is stored as a string; "False" is truthy, so parse it
            # before handing it to run_command as the check flag.
            req_check = self._as_bool(static_key.get("req_check", True))
            result = run_command(command, zero_only = True, req_check = req_check)
            if debug_output:
                print(f'Static key [{key_name}] - command [{command}] - output [{result}]')
            self._properties[key_name] = result
        # initialize live keys
        self.update_live_keys()
        # initialize components
        self.load_components()

    def __str__(self):
        components_str = "\n".join(f" - {c}" for c in self.components)
        return f"System hostname: {self.name}\nComponent Count: {self.get_component_count()}\n{components_str}"

    ########################################################
    # critical class functions
    ########################################################

    @staticmethod
    def _as_bool(value):
        """Interpret a bool-like value; strings are true only when they read "true"."""
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    @staticmethod
    def _device_suffix(index):
        """Map 0, 1, ... 25, 26, ... to "A", "B", ... "Z", "AA", ... ."""
        suffix = ""
        remaining = index + 1
        while remaining > 0:
            remaining, offset = divmod(remaining - 1, 26)
            suffix = chr(ord('A') + offset) + suffix
        return suffix

    # update only system dynamic keys
    def update_live_keys(self):
        """Re-run every dynamic key command and store the first output line."""
        for live_key in self.dynamic_key_variables:
            if live_key['command'] is not None:
                command = live_key['command']
                result = run_command(command, zero_only = True)
                self._metrics[live_key['name']] = result
                if debug_output:
                    print(f'Command {live_key["name"]} - [{command}] Result - [{result}]')

    # update all dynamic keys, including components
    def update_system_state(self):
        """Refresh the system metrics and the metrics of every component."""
        self.update_live_keys()
        for component in self.components:
            component.update_metrics()

    # check for components
    def load_components(self):
        """Create a Component for every entry of ``component_types``.

        Multi-instance types get one component per device reported by
        ``get_device_list``, named "<type> A", "<type> B", and so on.
        """
        virtual = self.is_virtual()
        for component in component_types:
            component_name = component["name"]
            # if multi, note that the command in device_list creates the list of things to pipe into this_device
            if component["multi_check"]:
                if debug_output:
                    print(f"Creating one component of type {component_name} for each one found")
                # enumerate keeps duplicate device names apart and has no 26-device limit
                for index, this_device in enumerate(get_device_list(component_name)):
                    this_component_name = f"{component_name} {self._device_suffix(index)}"
                    if debug_output:
                        print(f"{this_component_name} - {component_name} - {this_device}")
                    self.add_components(Component(name = this_component_name, comp_type = component_name, this_device = this_device, is_virtual = virtual))
            else:
                if debug_output:
                    print(f'Creating component {component["name"]}')
                self.add_components(Component(name = component_name, comp_type = component_name, is_virtual = virtual))

    # Add a component to the system
    def add_components(self, component: "Component",):
        """Append one component to ``self.components``."""
        if debug_output:
            print(f"Component description: {component.description}")
        self.components.append(component)

    ########################################################
    # helper class functions
    ########################################################

    # Get all components, optionally filtered by type
    def get_components(self, component_type: "str | None" = None):
        """Return components, optionally restricted to one type.

        Returns:
            With no ``component_type``: the full component list.
            Otherwise: a list when the matching components are multi-instance,
            the single matching component when they are not, and an empty
            list when nothing matches.
        """
        if component_type is None:
            return self.components
        matches = [c for c in self.components if c.type == component_type]
        if not matches:
            return []
        # decide on a component of the requested type, not on whichever
        # component happens to be last in the list
        return matches if matches[0].is_multi() else matches[0]

    def get_component_count(self):
        """Return the number of loaded components."""
        return len(self.components)

    def get_property(self, property = None):
        """Return one system property; None without a name, ``{}`` for an unknown name."""
        if property is None:
            return None
        return self._properties.get(property, {})

    def is_virtual(self):
        """Return the stored "Virtual Machine" property unchanged.

        The value is the command's first output line, i.e. normally the
        string "True" or "False" - not a bool, so do not test its truthiness.
        """
        vm_check = self.get_property('Virtual Machine')
        if debug_output:
            print(f'vm_check: {vm_check}')
        return vm_check

    def check_system_timer(self):
        """Return True while fewer than 30 seconds have passed since ``recent_check``."""
        time_lapsed = time.time() - float(self.recent_check)
        return time_lapsed < 30.0

    ########################################################
    # static metrics redis data functions
    ########################################################

    # return list of all static metrics from system and properties
    def get_static_metrics(self, human_readable = False):
        """Return component properties followed by system properties."""
        result = list(self.get_component_properties(human_readable))
        result.extend(self.get_system_properties(human_readable))
        return result

    def get_component_properties(self, human_readable = False, component = None):
        """Return the property records of every component.

        Args:
            human_readable: use "name: value" text records instead of
                separate "Property"/"Value" fields.
            component: accepted for interface compatibility; not used.
        """
        result = []
        for each_component in self.components:
            if human_readable:
                result.extend(each_component.get_properties_strings())
            else:
                result.extend(each_component.get_properties_keys())
        return result

    def get_system_properties(self, human_readable = False):
        """Return the system properties as records with "Source" set to "System"."""
        result = []
        for name, value in self._properties.items():
            if human_readable:
                result.append({
                    "Source": "System",
                    "Property": f"{name}: {value}"
                })
            else:
                result.append({
                    "Source": "System",
                    "Property": name,
                    "Value": value
                })
        return result

    ########################################################
    # live metrics redis data functions
    ########################################################

    # return list of all live metrics from system and properties
    def get_live_metrics(self, human_readable = False):
        """Return component metrics followed by system metrics."""
        result = list(self.get_component_metrics(human_readable))
        result.extend(self.get_system_metrics(human_readable))
        return result

    def get_component_metrics(self, human_readable = False):
        """Return the metric records of every component."""
        result = []
        for component in self.components:
            if human_readable:
                result.extend(component.get_metrics_strings())
            else:
                result.extend(component.get_metrics_keys())
        return result

    def get_system_metrics(self, human_readable = False):
        """Return system metrics as text records or as key/data records."""
        if human_readable:
            return self.get_system_metric_strings()
        return self.get_system_metric_keys()

    def get_system_metric_keys(self):
        """Return system metrics as ``{"Source", "Metric", "Data"}`` records."""
        result = [
            {"Source": "System", "Metric": name, "Data": value}
            for name, value in self._metrics.items()
        ]
        # add internal dynamic metrics
        result.append({
            "Source": "System",
            "Metric": "component_count",
            "Data": self.get_component_count()
        })
        return result

    def get_system_metric_strings(self):
        """Return system metrics as ``{"Source", "Metric": "name: value"}`` records."""
        result = [
            {"Source": "System", "Metric": f"{name}: {value}"}
            for name, value in self._metrics.items()
        ]
        # add internal dynamic metrics
        result.append({
            "Source": "System",
            "Metric": f"component_count: {self.get_component_count()}"
        })
        return result

    # straggler functions, might cut them

    # return both static and dynamic data
    def get_sysvars_summary_keys(self):
        """Return every system property and metric as ``{"name", "type", "value"}`` records."""
        result = [
            {"name": "System Class Property", "type": name, "value": value}
            for name, value in self._properties.items()
        ]
        result.extend(
            {"name": "System Class Metric", "type": name, "value": value}
            for name, value in self._metrics.items()
        )
        return result

    def get_component_strings(self, component_type: "str | None" = None):
        """Return component descriptions, optionally restricted to one type.

        Returns:
            With no ``component_type``: a list of all descriptions.
            Otherwise: a list when the matching components are multi-instance,
            a single description string when they are not, and an empty list
            when nothing matches.
        """
        if component_type is None:
            return [component.description for component in self.components]
        matches = [c for c in self.components if c.type == component_type]
        if not matches:
            return []
        descriptions = [c.description for c in matches]
        return descriptions if matches[0].is_multi() else descriptions[0]
|
|
|
|
############################################################
|
|
# Non-class Helper Functions
|
|
############################################################
|
|
|
|
# subroutine to run a command, return stdout as array unless zero_only then return [0]
def run_command(cmd, zero_only=False, use_shell=True, req_check = True):
    """Run ``cmd`` and return the non-empty lines of its standard output.

    Args:
        cmd: the command; a shell command line when ``use_shell`` is true.
        zero_only: return only the first non-empty line instead of the list.
        use_shell: passed to ``subprocess.run`` as ``shell``.
        req_check: passed to ``subprocess.run`` as ``check``.

    Returns:
        A list of non-empty output lines, or the first such line as a string
        when ``zero_only`` is true. A command without output yields an empty
        list in both modes.

    Raises:
        subprocess.CalledProcessError: when ``req_check`` is true and the
            command exits with a non-zero status.
    """
    # NOTE(security): with use_shell=True the command text is interpreted by
    # the shell, so it must never be assembled from untrusted input.
    completed = subprocess.run(cmd, shell=use_shell, check=req_check, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Undecodable bytes are replaced rather than aborting the whole call
    output = completed.stdout.decode('utf-8', errors='replace')
    # Split the output into lines and drop the empty ones
    output_lines = [line for line in output.split('\n') if line]
    if zero_only and output_lines:
        return output_lines[0]
    return output_lines
|
|
|
|
def get_device_list(device_type_name: str):
    """Return the output lines of the ``device_list`` command for a component type.

    Every descriptor in ``component_class_tree`` whose name equals
    ``device_type_name`` has its command run; the output of the last one is
    returned. An empty list is returned when no descriptor matches.
    """
    devices = []
    matching_entries = (
        entry for entry in component_class_tree
        if entry["name"] == device_type_name
    )
    for entry in matching_entries:
        devices = run_command(entry["device_list"])
    return devices
|