Files
cosmoserver/files/api/Cosmostat.py
2026-03-22 18:44:07 -07:00

134 lines
4.8 KiB
Python

# Class definitions for the Cosmostat server.
# On the server there is a CosmostatServer object.
# It holds a list of CosmostatClient objects, one per remote system.
# These are created from API input sent by the remote systems:
# each remote system submits a JSON description of its state to a private API,
# and that payload defines the corresponding CosmostatClient.
import subprocess
import json
import time
import weakref
import base64, hashlib
from typing import Dict, Any, List
from Cosmos_Settings import *
#################################################################
#################################################################
# Cosmostat Class
#################################################################
#################################################################
class CosmostatServer:
    """Server-side registry of the client systems reporting to Cosmostat.

    Registered clients are kept as ``CosmostatClient`` objects in
    ``self.systems``. Clients whose data has not been refreshed for more
    than ``STALE_AFTER_SECONDS`` are dropped whenever the hostname list is
    requested through ``get_client_hostnames``.
    """

    # Seconds a client's data may go without an update before the client is
    # pruned from the registry.
    STALE_AFTER_SECONDS = 60

    ############################################################
    # instantiate new Cosmostat server
    ############################################################
    def __init__(self, name: str):
        """Create an empty registry identified by *name*."""
        # the system needs a name, should be equal to the uuid of the client
        self.name = name
        self.short_id = self.short_uuid(self.name)
        log_data(log_output=f"Cosmostat Server {self.short_id} initializing", log_level="log_output")
        # systems contains a list of CosmostatClient objects
        self.systems = []

    def __str__(self) -> str:
        return f"Cosmostat Server {self.short_id}"

    def __repr__(self) -> str:
        # Must return a string; returning None makes repr() raise TypeError.
        return f"Cosmostat Server {self.short_id}"

    def add_system(self, system_dictionary: dict) -> "float | None":
        """Register a new client described by *system_dictionary*.

        The dictionary is read for the keys ``uuid``, ``short_id``,
        ``hostname`` and ``client_properties``; a missing key raises
        ``KeyError``.

        Returns the new client's ``data_timestamp``, or ``None`` when a
        client with the same uuid is already registered (nothing is added).
        """
        if self.check_uuid(system_dictionary["uuid"]):
            return None
        new_client = CosmostatClient(
            name=system_dictionary["short_id"],
            uuid=system_dictionary["uuid"],
            hostname=system_dictionary["hostname"],
            data_timestamp=time.time(),
            client_properties=system_dictionary["client_properties"],
            redis_data={},
        )
        self.systems.append(new_client)
        log_data(log_output=f'Client system {system_dictionary["short_id"]} added', log_level="log_output")
        return new_client.data_timestamp

    def update_system(self, system_dictionary: dict, system_uuid: str) -> "float | None":
        """Store fresh data for the client identified by *system_uuid*.

        Replaces the client's ``redis_data`` with *system_dictionary* and
        refreshes its ``data_timestamp``.

        Returns the new timestamp, or ``None`` when no client with that
        uuid is registered.
        """
        this_system = self.get_system(system_uuid)
        if this_system is None:
            log_data(log_output=f"Client system update requested for unknown uuid {system_uuid}", log_level="log_output")
            return None
        this_system.redis_data = system_dictionary
        this_system.data_timestamp = time.time()
        log_data(log_output=f"Client system {this_system.name} update requested, {this_system.uuid}", log_level="log_output")
        # No staleness check here: the timestamp was refreshed just above,
        # so the data can never be older than the threshold at this point.
        return this_system.data_timestamp

    def get_system(self, system_uuid: str) -> "CosmostatClient | None":
        """Return the registered client with *system_uuid*, or ``None``."""
        log_data(log_output=f'Cosmostat - get_system - {system_uuid}', log_level="debug_output")
        return next(
            (system for system in self.systems if system.uuid == system_uuid),
            None,
        )

    def short_uuid(self, value: str, length=8) -> str:
        """Return the first *length* hex characters of the MD5 of *value*.

        Used only to derive a short display identifier.
        """
        return hashlib.md5(value.encode("utf-8")).hexdigest()[:length]

    def check_uuid(self, uuid: str) -> bool:
        """Return True if a client with this *uuid* is already registered."""
        return any(system.uuid == uuid for system in self.systems)

    def get_client_hostname(self, system_uuid: str):
        """Return the hostname of the client with *system_uuid*.

        Returns ``None`` when no such client is registered.
        """
        client = self.get_system(system_uuid)
        if client is None:
            return None
        return client.hostname

    def get_client_hostnames(self, send_age=False) -> list:
        """Prune stale clients and return the hostnames of the remaining ones.

        A client is stale when its data is more than ``STALE_AFTER_SECONDS``
        whole seconds old. *send_age* is accepted for interface
        compatibility and currently has no effect.
        """
        now = time.time()
        # Build the surviving list first instead of removing entries while
        # iterating, which would skip the element following each removal.
        fresh = [
            system
            for system in self.systems
            if int(now - system.data_timestamp) <= self.STALE_AFTER_SECONDS
        ]
        # Slice-assign so the existing list object is updated in place.
        self.systems[:] = fresh
        return [system.hostname for system in fresh]
class CosmostatClient:
    """State held by the server for one remote client system."""

    ############################################################
    # instantiate new Cosmostat client
    ############################################################
    def __init__(self, name: str, uuid: str, hostname: str, data_timestamp: float, client_properties: dict, redis_data: dict):
        """Store the identifying fields and the latest data for a client.

        *data_timestamp* is the time (seconds, as produced by
        ``time.time()``) at which the client's data was last set.
        """
        self.name = name
        self.uuid = uuid
        self.hostname = hostname
        self.data_timestamp = data_timestamp
        self.client_properties = client_properties
        self.redis_data = redis_data

    def __str__(self) -> str:
        return f'Cosmostat Client {self.name} - Hostname {self.hostname}'

    def __repr__(self) -> str:
        return f'Cosmostat Client {self.name} - Hostname {self.hostname}'

    def get_properties(self) -> dict:
        """Return the properties dictionary supplied at registration."""
        # The attribute is ``client_properties``; ``self.client.properties``
        # does not exist and raised AttributeError.
        return self.client_properties

    def get_redis(self) -> dict:
        """Return the most recently stored data dictionary for this client."""
        return self.redis_data