Files
cosmoserver/files/api/Cosmostat.py
2026-03-21 21:20:00 -07:00

76 lines
2.8 KiB
Python

# This will be a class definition for the Cosmostat server.
# On the server, there will be a Cosmostat class object.
# It will hold an array of System class objects.
# These will be created based on API input from remote systems.
# The remote systems will submit a JSON document of their state to a private API;
# that document will define the System class.
import subprocess
import json
import time
import weakref
import base64, hashlib
from typing import Dict, Any, List
from Cosmos_Settings import *
#################################################################
#################################################################
# Cosmostat Class
#################################################################
#################################################################
class Cosmostat:
    """Server-side registry of the client systems known to this Cosmostat server.

    Each registered system is kept as a plain dict in ``self.systems`` with
    the keys ``data_timestamp``, ``uuid``, ``short_id``,
    ``client_properties`` and ``redis_data`` (see ``add_system``).
    """

    ############################################################
    # instantiate new Cosmostat server
    ############################################################
    def __init__(self, name: str):
        """Create the server object.

        Args:
            name: Name of this server. Per the original author's note it
                should be equal to the uuid of the client.
        """
        self.name = name
        # Short, log-friendly identifier derived from the full name.
        self.short_id = self.short_uuid(self.name)
        log_data(log_output=f"Cosmostat Server {self.short_id} initializing", log_level="log_output")
        # One record (dict) per registered client system.
        self.systems: List[Dict[str, Any]] = []

    def __str__(self) -> str:
        """Return the human-readable label ``Cosmostat Server <short_id>``."""
        return f"Cosmostat Server {self.short_id}"

    def __repr__(self) -> str:
        """Return the same label as ``__str__``.

        The string must be returned explicitly: a ``__repr__`` that falls
        off the end returns ``None`` and makes ``repr(obj)`` raise TypeError.
        """
        return f"Cosmostat Server {self.short_id}"

    def add_system(self, system_dictionary: dict):
        """Register a client system.

        Args:
            system_dictionary: Must contain the keys ``uuid``, ``short_id``
                and ``client_properties``.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        # NOTE(review): no check for an already-registered uuid; a second
        # call with the same uuid appends a second record, and get_system
        # will keep returning the first one. Confirm whether that is intended.
        short_id = system_dictionary["short_id"]
        new_system_key = {
            "data_timestamp": time.time(),
            "uuid": system_dictionary["uuid"],
            "short_id": short_id,
            "client_properties": system_dictionary["client_properties"],
            "redis_data": {},
        }
        # short_id is hoisted into a local so the f-string does not need
        # nested double quotes (a SyntaxError before Python 3.12).
        log_data(log_output=f"Client system {short_id} added", log_level="log_output")
        self.systems.append(new_system_key)

    def update_system(self, system_state: Dict[str, Any], system_uuid: str) -> float:
        """Store the latest reported state for a registered system.

        Args:
            system_state: New state; replaces the record's ``redis_data``.
            system_uuid: ``uuid`` of the system to update.

        Returns:
            The ``time.time()`` timestamp written to ``data_timestamp``.

        Raises:
            KeyError: If no system with ``system_uuid`` is registered.
        """
        this_system = self.get_system(system_uuid)
        # get_system returns an empty dict for "not found"; records built by
        # add_system are never empty. Fail before mutating anything instead
        # of writing into a throwaway dict.
        if not this_system:
            raise KeyError(f"No registered system with uuid {system_uuid!r}")
        timestamp = time.time()
        this_system["redis_data"] = system_state
        this_system["data_timestamp"] = timestamp
        short_id = this_system["short_id"]
        log_data(log_output=f"Client system {short_id} updated", log_level="log_output")
        return timestamp

    def get_system(self, system_uuid: str) -> dict:
        """Return the stored record whose ``uuid`` equals ``system_uuid``.

        The returned dict is the stored object itself (not a copy). If no
        record matches, a new empty dict is returned.
        """
        return next(
            (system for system in self.systems if system["uuid"] == system_uuid),
            {},
        )

    def short_uuid(self, value: str, length=8):
        """Return the first ``length`` hex characters of the MD5 digest of ``value``.

        MD5 is used here only to derive a short identifier from the UTF-8
        encoded input.
        """
        return hashlib.md5(value.encode("utf-8")).hexdigest()[:length]