################################################################# ################################################################# ### Cosmostat Classes ### The Cosmostat Server is a Class for the API running on the ### dashboard. This keeps track of all active systems that are ### actively reporting back to the Cosmostat Server Dashboard ### The static and active data is maintained in a single ### Cosmostat Server Object, and this Object contains a List ### of Cosmostat Client Objects. This is where the data actually ### lives ################################################################# ################################################################# import subprocess import json import time import weakref import ipaddress import base64, hashlib from typing import Dict, Any, List # Import Cosmos Settings from Cosmos_Settings import * ################################################################# ### Cosmostat Server Class ### This Class is for maintaining a list of Client Objects. ### Each Object is a remote System reporting back ### The Class Functions are for the main application to interact ### with client Object data. ################################################################# class CosmostatServer: ############################################################ # instantiate new Cosmostat server ############################################################ def __init__(self, name: str, hostname: str): # the system needs a name, should be equal to the uuid of the client self.name = name self.short_id = self.short_uuid(self.name) self.hostname = hostname log_data(log_output = f"Cosmostat Server {self.short_id} initializing", log_level = "log_output") # system contains an array of CosmostatClient Objects self.systems = [] def __str__(self): self_string = f"Cosmostat Server {self.short_id}" return self_string def __repr__(self): self_string = f"Cosmostat Server {self.short_id}" def add_system(self, system_dictionary: dict): if not self.check_uuid(system_dictionary["uuid"]): print(f"Adding Cosmostat Host: {system_dictionary['hostname']}") new_cosmostat_clilent = CosmostatClient( name = system_dictionary["short_id"], uuid = system_dictionary["uuid"], hostname = system_dictionary["hostname"], active_ip = system_dictionary["active_interface"], is_server = system_dictionary["is_server"], data_timestamp = time.time(), client_properties = system_dictionary["client_properties"], redis_data = {} ) print(f"New Cosmostat Server Object - IP {system_dictionary['active_interface']}") self.systems.append(new_cosmostat_clilent) log_data(log_output = f'Client system {system_dictionary["short_id"]} added', log_level = "log_output") return new_cosmostat_clilent.data_timestamp def update_system(self, system_dictionary: {}, system_uuid: str): this_system = self.get_system(system_uuid) this_system.redis_data = system_dictionary this_system.data_timestamp = time.time() log_data(log_output = f"Client system {this_system.name} update requested, {this_system.uuid}", log_level = "log_output") data_age = time.time() - this_system.data_timestamp if int(data_age) > 60: self.systems.remove(this_system) return this_system.data_timestamp def get_system(self, system_uuid: str): log_data(log_output = f'Cosmostat - get_system - {system_uuid}', log_level = "debug_output") result = None for system in self.systems: if system.uuid == system_uuid: result = system break return result def short_uuid(self, value: str, length=8): hasher = hashlib.md5() hasher.update(value.encode('utf-8')) full_hex = hasher.hexdigest() return full_hex[:length] def check_uuid(self, uuid: str): uuid_exists = False for system in self.systems: if system.uuid == uuid: uuid_exists = True return uuid_exists def get_client_hostname(self, system_uuid: str): client = self.get_system(system_uuid) return client.hostname def get_client_timestamp(self, system_hostname: str): client = self.get_system(get_uuid_from_hostname(system_hostname)) return client.data_timestamp def get_uuid_from_hostname(self, system_hostname): result = "" for system in self.systems: if system.hostname == system_hostname: result = system.uuid return result def get_client_hostnames(self, send_age = False): self.purge_stale_hostnames() result = [] for system in self.systems: if send_age: result.append({"hostname": system.hostname, "data_age": age}) else: result.append(system.hostname) return result def get_metrics_from_ip(self, ip): this_metrics = "" for system in self.systems: if system.active_ip == ip: this_metrics = system.redis_data return this_metrics def purge_stale_hostnames(self): now = time.time() fresh_systems = [] for system in self.systems: age = now - system.data_timestamp if age <= 60: # keep only fresh servers fresh_systems.append(system) self.systems = fresh_systems # replace the old list # return the VPN IP if present, if just_check then it returns true/false if th def get_vpn_ip(self, remote_ip, just_check = False): cosmos_vpn_subnet = "10.200.26.0/24" vpn_ip = None this_client_metrics = self.get_metrics_from_ip(remote_ip) for metric in this_client_metrics: # if the metric is from VPN, is an IP address, and it belongs to the Jenkins VPN subnet if metric["Metric"] == "IP Address" and "VPN" in metric["Source"] and is_ip_in_subnets(metric["Data"].split("/")[0], cosmos_vpn_subnet): vpn_ip = metric["Data"].split("/")[0] if just_check and vpn_ip is not None: vpn_ip = False elif just_check and vpn_ip is None: vpn_ip = True return vpn_ip ################################################################# ### Cosmostat Client Class ### Each Class Object contains static and active data as well as ### the hostname and uuid/short_id. ### The timestamp is for removing stale Clients ################################################################# class CosmostatClient: ############################################################ # instantiate new Cosmostat server ############################################################ def __init__(self, name: str, uuid: str, hostname: str, active_ip: str, is_server: str, data_timestamp: float, client_properties: dict, redis_data: dict): self.name = name self.uuid = uuid self.hostname = hostname self.active_ip = active_ip self.is_server = is_server self.data_timestamp = data_timestamp self.client_properties = client_properties self.redis_data = redis_data def __str__(self): self_string = f'Cosmostat Client {self.name} - Hostname {self.hostname}' return self_string def __repr__(self): self_string = f'Cosmostat Client {self.name} - Hostname {self.hostname}' return self_string def get_properties(self): return self.client.properties def get_redis(self): return self.redis_data # subnet helper app def is_ip_in_subnets(ip, subnet): try: ip_obj = ipaddress.IPv4Address(ip) subnet_obj = ipaddress.IPv4Network(subnet) if ip_obj in subnet_obj: return True return False except ValueError as e: # If the IP address is not valid, raise an error return False