360 lines
12 KiB
Python
360 lines
12 KiB
Python
# this class file is for the cosmostat service
|
|
import subprocess
|
|
import json
|
|
from typing import Dict, Any, List
|
|
|
|
# Module-wide settings shared by the classes below.
global_max_length = 500
debug_output = False

# Load the component descriptor tree at import time.  Each entry of the
# JSON list describes one component type; the entries are looked up by
# their "name" key when a Component is created.
try:
    with open("component_descriptors.json", encoding="utf-8") as f:
        component_class_tree: List[Dict] = json.load(f)
except FileNotFoundError as exc:
    # Surface a missing descriptor file as a RuntimeError, keeping the
    # original exception chained for debugging.
    raise RuntimeError("Descriptor file not found") from exc

# Condensed view of the tree: one {"name", "multi_check"} dict per type.
# "multi_check" is True only when the descriptor stores the string "True".
component_types = [
    {
        "name": entry["name"],
        "multi_check": entry["multi_check"] == "True",
    }
    for entry in component_class_tree
]
|
|
|
|
class Component:
    """One component whose data is gathered by running descriptor commands.

    The descriptor for the component's type is looked up by name in the
    module-level ``component_class_tree``.  Its ``properties`` commands are
    run once at construction, its ``metrics`` commands are re-run on every
    ``update_metrics`` call, and its ``description`` template is formatted
    with the collected properties.
    """

    def __init__(self, name: str, comp_type: str):
        """Build the component and collect its properties and metrics.

        Args:
            name: Display name of this component instance.
            comp_type: Descriptor name to look up in ``component_class_tree``.

        Raises:
            ValueError: If no descriptor named ``comp_type`` exists.
        """
        self.name = name
        self.type = comp_type

        # Load component type descriptor from class tree.  There is no early
        # exit on purpose: when several entries share a name, the last wins.
        descriptor = None
        for candidate in component_class_tree:
            if candidate["name"] == self.type:
                descriptor = candidate
        if descriptor is None:
            raise ValueError(
                f"Component type '{comp_type}' is not defined in the "
                f"component descriptor tree."
            )
        self._descriptor = descriptor

        # store static properties
        self.multi_check = self.is_multi()
        self._properties: Dict[str, str] = {}
        for key, command in descriptor.get('properties', {}).items():
            self._properties[key] = run_command(command, True)

        # build the description string; a descriptor without a template
        # yields an empty description instead of failing on None.format()
        self._description_template: str | None = descriptor.get("description")
        if self._description_template is None:
            self.description = ""
        else:
            self.description = self._description_template.format(**self._properties)

        # initialize metrics
        self._metrics: Dict[str, str] = {}
        self.update_metrics()

    def __str__(self):
        """Return a one-line human-readable summary of the component."""
        return (f"Component name: {self.name}, type: {self.type} - "
                f"{self.description}")

    def __repr__(self):
        """Return the summary line used for debugging output."""
        return (f"Component name: {self.name}, type {self.type} - "
                f"{self.description}")

    def update_metrics(self):
        """Re-run every metric command of the descriptor and store the results."""
        for key, command in self._descriptor.get('metrics', {}).items():
            self._metrics[key] = run_command(command, True)

    # complex data type return
    def get_metrics(self, type=None):
        """Return the stored metrics, optionally restricted to one metric name.

        Args:
            type: Metric name to select; ``None`` selects every metric.

        Returns:
            Dict with ``name``, ``type`` and ``metrics`` (a list of
            ``{"name", "value"}`` dicts, empty when nothing matches).
        """
        these_metrics = [
            {"name": name, "value": value}
            for name, value in self._metrics.items()
            if type is None or name == type
        ]
        return {
            "name": self.name,
            "type": self.type,
            "metrics": these_metrics,
        }

    # complex data type return
    def get_properties(self, type=None):
        """Return the stored properties, optionally restricted to one name.

        Args:
            type: Property name to select; ``None`` selects every property.

        Returns:
            Dict with ``name``, ``type`` and ``properties`` (a list of
            ``{"name", "value"}`` dicts, empty when nothing matches).
        """
        these_properties = [
            {"name": name, "value": value}
            for name, value in self._properties.items()
            if type is None or name == type
        ]
        return {
            "name": self.name,
            "type": self.type,
            "properties": these_properties,
        }

    # this gets the value of a specified property, type required
    def get_property(self, type):
        """Return the value of the property named ``type``.

        Raises:
            KeyError: If the property was never collected.
        """
        return self._properties[type]

    # returns array of dicts for redis
    def get_metrics_keys(self):
        """Return one ``{"name", "type", "metric"}`` dict per stored metric."""
        return [
            {"name": self.name, "type": name, "metric": value}
            for name, value in self._metrics.items()
        ]

    def get_properties_keys(self):
        """Return one ``{"name", "property", "value"}`` dict per stored property."""
        return [
            {"name": self.name, "property": name, "value": value}
            for name, value in self._properties.items()
        ]

    # full data return
    def get_description(self):
        """Return name, type, all properties and all metrics in one dict."""
        these_properties = [
            {"name": name, "value": value}
            for name, value in self._properties.items()
        ]
        these_metrics = [
            {"name": name, "value": value}
            for name, value in self._metrics.items()
        ]
        return {
            "name": self.name,
            "type": self.type,
            "properties": these_properties,
            "metrics": these_metrics,
        }

    def is_multi(self):
        """Return the ``multi_check`` flag of this component's type.

        Falls back to ``False`` when the type is not listed in
        ``component_types``.
        """
        for component_type in component_types:
            if self.type == component_type["name"]:
                return component_type["multi_check"]
        return False
|
|
|
|
|
|
############################################################
|
|
# System Class
|
|
############################################################
|
|
|
|
class System:
|
|
# system variable declarations
|
|
# keys to add: model and serial number
|
|
static_key_variables = [
|
|
{"name": "hostname", "command": "hostname"},
|
|
{"name": "virt_string", "command": "systemd-detect-virt"}
|
|
]
|
|
dynamic_key_variables = [
|
|
{"name": "uptime", "command": "uptime -p"},
|
|
{"name": "timestamp", "command": "date '+%D %r'"},
|
|
]
|
|
# add components based on the class tree
|
|
# component_types = [{"name": entry["name"], "multi_check": entry["multi_check"] == "True"} for entry in component_class_tree]
|
|
|
|
# instantiate new system
|
|
def __init__(self, name: str):
|
|
# the system needs a name
|
|
self.name = name
|
|
if debug_output:
|
|
print(f"System initializing, name {self.name}")
|
|
# system contains an array of component objects
|
|
self.components = []
|
|
# initialize system properties and metrics dicts
|
|
self._properties: Dict[str, str] = {}
|
|
self._metrics: Dict[str, str] = {}
|
|
# load static keys
|
|
for static_key in self.static_key_variables:
|
|
command = static_key["command"]
|
|
result = run_command(command, True)
|
|
if debug_output:
|
|
print(f"Static key [{static_key["name"]}] - command [{command}] - output [{result}]")
|
|
self._properties[static_key["name"]] = result
|
|
# initialize live keys
|
|
self.update_live_keys()
|
|
# initialze components
|
|
self.load_components()
|
|
|
|
# update only system dynamic keys
|
|
def update_live_keys(self):
|
|
for live_key in self.dynamic_key_variables:
|
|
if live_key['command'] is not None:
|
|
command = live_key['command']
|
|
result = run_command(command, True)
|
|
self._metrics[live_key['name']] = result
|
|
if debug_output:
|
|
print(f"Command {live_key["name"]} - [{command}] Result - [{result}]")
|
|
|
|
# update all dynamic keys, including components
|
|
def update_system_state(self):
|
|
self.update_live_keys()
|
|
for component in self.components:
|
|
component.update_metrics()
|
|
|
|
# check for components
|
|
def load_components(self):
|
|
for component in component_types:
|
|
component_name = component["name"]
|
|
multi_check = component["multi_check"]
|
|
if multi_check:
|
|
print("placeholder...")
|
|
else:
|
|
if debug_output:
|
|
print(f"Creating component {component["name"]}")
|
|
self.add_components(Component(component_name, component_name))
|
|
|
|
# Add a component to the system
|
|
def add_components(self, component: Component):
|
|
if debug_output:
|
|
print(f"Component description: {component.description}")
|
|
self.components.append(component)
|
|
|
|
# Get all components, optionally filtered by type
|
|
def get_components(self, component_type: type = None):
|
|
if component_type is None:
|
|
return self.components
|
|
else:
|
|
result = []
|
|
for component in self.components:
|
|
if component.type == component_type:
|
|
result.append(component)
|
|
if component.is_multi():
|
|
return result
|
|
else:
|
|
return result[0]
|
|
|
|
def get_component_strings(self, component_type: type = None):
|
|
if component_type is None:
|
|
result = []
|
|
for component in self.components:
|
|
result.append(component.description)
|
|
return result
|
|
else:
|
|
result = []
|
|
for component in self.components:
|
|
if component.type == component_type:
|
|
result.append(component.description)
|
|
if component.is_multi():
|
|
return result
|
|
else:
|
|
return result[0]
|
|
|
|
# get component count
|
|
def get_component_count(self):
|
|
result = int(len(self.components))
|
|
return result
|
|
|
|
def __str__(self):
|
|
components_str = "\n".join(f" - {c}" for c in self.components)
|
|
return f"System hostname: {self.name}\nComponent Count: {self.get_component_count()}\n{components_str}"
|
|
|
|
# return both static and dynamic data
|
|
def get_sysvars_summary_keys(self):
|
|
result = []
|
|
for name, value in self._properties.items():
|
|
thisvar = {
|
|
"name": "System Class Property",
|
|
"type": name,
|
|
"value": value
|
|
}
|
|
result.append(thisvar)
|
|
for name, value in self._metrics.items():
|
|
thisvar = {
|
|
"name": "System Class Metric",
|
|
"type": name,
|
|
"value": value
|
|
}
|
|
result.append(thisvar)
|
|
return result
|
|
|
|
# return list of all live metrics from system and properties
|
|
def get_live_metrics(self):
|
|
result = []
|
|
for component_metric in self.get_component_metrics():
|
|
result.append(component_metric)
|
|
for system_metric in self.get_system_metrics():
|
|
result.append(system_metric)
|
|
return result
|
|
|
|
# return array of all component metrics
|
|
def get_component_metrics(self):
|
|
result = []
|
|
for component in self.components:
|
|
for metric in component.get_metrics_keys():
|
|
result.append(metric)
|
|
return result
|
|
|
|
# return array of all component metrics
|
|
def get_component_properties(self):
|
|
result = []
|
|
for component in self.components:
|
|
for metric in component.get_properties_keys():
|
|
result.append(metric)
|
|
return result
|
|
|
|
# return array of all system metrics
|
|
def get_system_metrics(self):
|
|
result = []
|
|
for name, value in self._metrics.items():
|
|
thisvar = {
|
|
"name": "System",
|
|
"type": name,
|
|
"metric": value
|
|
}
|
|
result.append(thisvar)
|
|
# add component count
|
|
result.append({
|
|
"name": "System",
|
|
"type": "component_count",
|
|
"metric": self.get_component_count()
|
|
})
|
|
return result
|
|
|
|
def get_system_properties(self):
|
|
result = []
|
|
for name, value in self._properties.items():
|
|
if name == "virt_string":
|
|
thisvar = {
|
|
"name": "System",
|
|
"property": name,
|
|
"value": value == "none"
|
|
}
|
|
else:
|
|
thisvar = {
|
|
"name": "System",
|
|
"property": name,
|
|
"value": value
|
|
}
|
|
result.append(thisvar)
|
|
return result
|
|
|
|
|
|
############################################################
|
|
# Helper Functions
|
|
############################################################
|
|
|
|
# subroutine to run a shell command; returns the non-empty stdout lines as a list, or only the first line when zero_only is set
|
|
def run_command(cmd, zero_only=False):
    """Run a shell command and return its non-empty stdout lines.

    Args:
        cmd: Command line passed to the shell.
        zero_only: When true, return only the first non-empty line.

    Returns:
        A list of the non-empty stdout lines, or the first such line as a
        string when ``zero_only`` is set.  If the command printed nothing,
        the empty list is returned in both modes.

    Raises:
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    # NOTE(review): shell=True executes cmd verbatim; the callers visible in
    # this file pass commands from constants and the descriptor file, which
    # must therefore be trusted.
    completed = subprocess.run(
        cmd,
        shell=True,
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Decode the byte output and keep only the non-empty lines
    output_lines = [
        line for line in completed.stdout.decode('utf-8').split('\n') if line
    ]
    # Explicit emptiness check instead of a bare except around [0]
    if zero_only and output_lines:
        return output_lines[0]
    return output_lines
|