#######################################################################
### app.py
### cosmostat service handler
#######################################################################
"""Flask service handler for Cosmostat.

The module wires together three roles that are switched on by settings:

* client  - exposes the local ``System`` object's metrics over HTTP and
  publishes them to Redis;
* reporter - POSTs the local metrics to a remote Cosmostat server;
* server  - accepts client submissions and serves summaries of them.

``cosmostat_client`` and ``cosmostat_server`` are module globals that are
created in the ``__main__`` section at the bottom of the file.
"""

import json
import logging
import secrets
import string
import time
from typing import Dict, Union

import redis
import requests
import yaml
from flask import Flask, Response, abort, jsonify, make_response, request
from flask_apscheduler import APScheduler
# NOTE: this rebinds `Response` to the requests class and shadows the flask
# one imported above; the import order is kept so that binding is unchanged.
from requests import RequestException, Response

# Import Cosmos Settings
from Cosmos_Settings import *
# System and Component Classes
from Components import *
# Cosmostat server Classes
from Cosmostat import *

# declare flask apps
app = Flask(__name__)
scheduler = APScheduler()

#######################################################################
### Redis Functions
#######################################################################

# Redis client - will publish updates
r = redis.Redis(host=redis_gateway_ip(), port=6379)


def update_redis_channel(redis_channel, data):
    """Publish ``data`` as JSON on ``redis_channel`` and log what was sent."""
    r.publish(redis_channel, json.dumps(data))
    log_data(log_output=data, log_level="noisy_test")


def update_redis_server():
    """Publish the client metrics and, on a server node, the client summary.

    NOTE(review): the two conditions are treated as independent; the
    original indentation was lost, confirm the second is not nested.
    """
    # Client Redis Tree
    if cosmostat_client.check_system_timer():
        update_redis_channel(
            "host_metrics", get_client_redis_data(human_readable=False)
        )
    if run_cosmostat_server():
        update_redis_channel("client_summary", get_server_redis_data())
        # update_redis_channel("client_hostnames", get_server_hostnames())
    # History Redis Tree
    # Update history_stats Redis Channel
    # update_redis_channel("history_stats", get_component_list())


def get_client_redis_data(human_readable=False):
    """Return the local client's live metrics as a new list."""
    result = list(get_dynamic_data(human_readable))
    # for metric in get_static_data(human_readable):
    #     result.append(metric)
    return result


def get_server_redis_data():
    """Return one dict per system known to the server, redis data included."""
    return [
        {
            "hostname": client.hostname,
            "data_timestamp": client.data_timestamp,
            "uuid": client.uuid,
            "short_id": client.name,
            "redis_data": client.redis_data,
        }
        for client in cosmostat_server.systems
    ]


def get_server_hostnames():
    """Return the hostnames reported by the Cosmostat server object."""
    return cosmostat_server.get_client_hostnames()


#######################################################################
### Client Flask Routes
#######################################################################

# dynamic data
# this will go to the redis server
@app.route('/dynamic_data', methods=['GET'])
def dynamic_data():
    """Serve the live metrics of the local system."""
    return jsonify(get_dynamic_data())


# static data
@app.route('/static_data', methods=['GET'])
def static_data():
    """Serve the static metrics of the local system."""
    return jsonify(get_static_data())


# redis data
@app.route('/redis_data', methods=['GET'])
def redis_data():
    """Serve the same data that is published to Redis (raw values)."""
    return jsonify(get_client_redis_data(human_readable=False))


# redis strings
@app.route('/redis_strings', methods=['GET'])
def redis_strings():
    """Serve the Redis data in its human-readable form."""
    return jsonify(get_client_redis_data(human_readable=True))


# php summary
@app.route('/php_summary', methods=['GET'])
def php_summary():
    """Serve the system/component summary built by get_php_summary()."""
    return jsonify(get_php_summary())


# return full descriptor
@app.route('/descriptor', methods=['GET'])
def descriptor():
    """Serve the component class tree of the local system."""
    return jsonify(get_descriptor())


# socket timer handler
@app.route('/start_timer', methods=['GET'])
def start_timer():
    """Reset the client's ``recent_check`` timestamp to the current time."""
    cosmostat_client.recent_check = int(time.time())
    log_data(
        log_output=f"Timestamp updated to {cosmostat_client.recent_check}",
        log_level="noisy_test",
    )
    return jsonify(
        {
            "message": "websocket timer reset",
            "new_timestamp": cosmostat_client.recent_check,
        }
    )


# socket timer data
@app.route('/timer_data', methods=['GET'])
def timer_data():
    """Report the time elapsed since ``recent_check`` and the timer state."""
    time_now = time.time()
    last_update = float(cosmostat_client.recent_check)
    result = {
        "Time Lapsed": time_now - last_update,
        "Current Time Value": time_now,
        "Last Update Value": last_update,
        "System Updating": cosmostat_client.check_system_timer(),
    }
    return jsonify(result)


# test route
@app.route('/test', methods=['GET'])
def test():
    """Smoke-test route: component count, configured user/host, CPU model."""
    cpus = cosmostat_client.get_components(component_type="CPU")
    return jsonify(
        {
            "component_count:": len(cosmostat_client.components),
            "user": jenkins_user_settings(),
            "hostname": jenkins_hostname_settings(),
            # a system without a CPU component must not crash the route
            "cpu_model": cpus[0].description if cpus else None,
        }
    )


#######################################################################
### Client Flask Helpers
#######################################################################

# needs to return array of {name: name, type: type, metrics: metrics}
# for redis table generation, includes system and component metrics
def get_dynamic_data(human_readable=False):
    """Return the live metrics of the local client system."""
    return cosmostat_client.get_live_metrics(human_readable)


def get_static_data(human_readable=False):
    """Return the static metrics of the local client system."""
    return cosmostat_client.get_static_metrics(human_readable)


def get_php_summary():
    """Build the summary consumed by the ``/php_summary`` route.

    Returns:
        A one-element list holding a dict with ``system_properties`` and
        ``system_components``.  On a server node an extra "Data Timestamp"
        component is appended.
    """
    system_properties = cosmostat_client.get_system_properties(
        human_readable=True, php_extra=True
    )
    system_components = [
        {
            "component_name": component.name,
            "info_strings": component.get_properties_strings(return_simple=True),
        }
        for component in cosmostat_client.get_components()
    ]
    if run_cosmostat_server():
        print(cosmostat_client.name)
        client_uuid = cosmostat_server.get_uuid_from_hostname(cosmostat_client.name)
        print(client_uuid)
        # NOTE(review): the message below treats get_system()'s return value
        # as an age in seconds - confirm that is what it returns.
        data_timestamp = cosmostat_server.get_system(client_uuid)
        print(data_timestamp)
        system_components.append(
            {
                "component_name": "Data Timestamp",
                "info_strings": f"Data is {data_timestamp} seconds old",
            }
        )
    return [
        {
            "system_properties": system_properties,
            "system_components": system_components,
        }
    ]


def get_descriptor():
    """Return the component class tree of the local client system."""
    return cosmostat_client.get_component_class_tree()


def generate_state_definition():
    """Return the client's uuid together with its PHP summary."""
    return {
        "uuid": cosmostat_client.uuid,
        "state_definition": get_php_summary(),
    }


#######################################################################
### Server Flask Routes
#######################################################################

def _server_only(fetch):
    """Return ``fetch()`` on a server node, else the not-running message."""
    if run_cosmostat_server():
        return fetch()
    return {"message": "server not running on this endpoint"}


# update client on server
@app.route('/update_client', methods=['POST'])
def update_client():
    """Accept a client's redis data and store it on the server."""
    # cosmostat_server only exists on server nodes
    if not run_cosmostat_server():
        return jsonify({"message": "server not running on this endpoint"}), 503
    # validates the request; aborts with a 400 response when it is bad
    payload = client_submit_check(request=request, dict_name="redis_data")
    return jsonify(run_update_client(payload)), 200


# create client on server
@app.route('/create_client', methods=['POST'])
def create_client():
    """Register a new client on the server unless its uuid already exists."""
    # cosmostat_server only exists on server nodes
    if not run_cosmostat_server():
        return jsonify({"message": "server not running on this endpoint"}), 503
    # validates the request; aborts with a 400 response when it is bad
    payload = client_submit_check(request=request, dict_name="client_properties")
    if cosmostat_server.check_uuid(payload["uuid"]):
        result = {"message": "object already exists, skipping creation"}
    else:
        result = run_create_client(payload)
    return jsonify(result), 200


# api to validate Cosmostat Class
@app.route('/client_summary', methods=['GET'])
def client_summary():
    """Serve the short per-client summary (server nodes only)."""
    return jsonify(_server_only(get_client_summary))


# api to pull all data
@app.route('/client_details', methods=['GET'])
def client_details():
    """Serve the per-client properties (server nodes only)."""
    return jsonify(_server_only(get_client_details))


# api to get all hostnames
@app.route('/client_hostnames', methods=['GET'])
def client_hostnames():
    """Serve the client hostnames with their age (server nodes only)."""
    return jsonify(
        _server_only(lambda: cosmostat_server.get_client_hostnames(send_age=True))
    )


# api to get server redis data
@app.route('/get_server_redis', methods=['GET'])
def get_server_redis():
    """Serve the data published on the ``client_summary`` Redis channel."""
    return jsonify(_server_only(get_server_redis_data))


#######################################################################
### Server Flask Helpers
#######################################################################

# update client on server
def run_update_client(this_client):
    """Apply a validated ``redis_data`` submission to the server.

    Returns:
        A status dict; ``{"status": "api failure"}`` when the API key check
        fails, ``{"message": "client not found"}`` for an unknown uuid.
    """
    if not public_api_check(this_client):
        return {"status": "api failure"}
    if not cosmostat_server.check_uuid(this_client["uuid"]):
        return {"message": "client not found"}
    timestamp_update = cosmostat_server.update_system(
        system_dictionary=this_client["redis_data"],
        system_uuid=this_client["uuid"],
    )
    return {
        "status": f'updated client {this_client["short_id"]}',
        "uuid": this_client["uuid"],
        # NOTE(review): this echoes the whole submission (API_KEY included),
        # not just its "redis_data" entry - confirm that is intended.
        "redis_data": this_client,
        "timestamp_update": timestamp_update,
    }


# create client on server
def run_create_client(this_client):
    """Add a validated ``client_properties`` submission as a new system.

    Returns:
        A status dict, or ``{"status": "api failure"}`` when the API key
        check fails.
    """
    if not public_api_check(this_client):
        return {"status": "api failure"}
    timestamp_update = cosmostat_server.add_system(system_dictionary=this_client)
    return {
        "status": f'created client {this_client["short_id"]}',
        "uuid": this_client["uuid"],
        "client_properties": this_client,
        "timestamp_update": timestamp_update,
    }


# marks "no API_KEY supplied"; a submission without a key never authenticates
_NO_API_KEY = object()


def public_api_check(this_client):
    """Return True when the submission carries the configured API key.

    String keys are compared with ``secrets.compare_digest`` so the check
    does not leak how many leading characters matched.
    """
    supplied = this_client.get('API_KEY', _NO_API_KEY)
    if supplied is _NO_API_KEY:
        return False
    expected = app_settings["REAL_API_KEY"]
    if isinstance(supplied, str) and isinstance(expected, str):
        # encode first: compare_digest rejects non-ASCII str arguments
        return secrets.compare_digest(
            supplied.encode("utf-8"), expected.encode("utf-8")
        )
    return supplied == expected


def _abort_bad_request(message):
    """Log ``message`` and abort the current request with a JSON 400."""
    logging.warning(message)
    abort(make_response(jsonify({"error": message}), 400))


# flask submission check function
def client_submit_check(request, dict_name: str):
    """Validate a client submission and return its JSON payload.

    Args:
        request: the Flask request to validate.
        dict_name: name of the data key that must be present next to
            ``uuid``, ``short_id`` and ``hostname``.

    Returns:
        The decoded JSON object (a dict).

    Raises:
        werkzeug HTTPException carrying a JSON 400 response when the request
        is not JSON, is malformed, is not an object, or lacks a required key.
    """
    required_keys = {"uuid", "short_id", "hostname", dict_name}
    if not request.is_json:
        _abort_bad_request("Content-type must be application/json")
    payload = request.get_json(silent=True)
    if payload is None:
        _abort_bad_request("Malformed JSON")
    if not isinstance(payload, dict):
        _abort_bad_request("JSON body must be an object")
    missing = required_keys - payload.keys()
    if missing:
        _abort_bad_request(f"Missing required keys: {', '.join(sorted(missing))}")
    return payload


# generate cosmostat server summary
def get_client_summary():
    """Return uuid, short id, data age and hostname for every client.

    Returns a message dict instead of a list when no client is reporting.
    """
    now = time.time()
    result = [
        {
            "uuid": client.uuid,
            "short_id": client.name,
            "data_age": now - client.data_timestamp,
            "hostname": client.hostname,
        }
        for client in cosmostat_server.systems
    ]
    if not result:
        return {"message": "no clients reporting"}
    return result


# no redis data needed here
def get_client_details():
    """Return uuid, short id, properties and hostname for every client.

    Returns a message dict instead of a list when no client is reporting.
    """
    result = [
        {
            "uuid": client.uuid,
            "short_id": client.name,
            "client_properties": client.client_properties,
            # "redis_data": client.redis_data,
            "hostname": client.hostname,
        }
        for client in cosmostat_server.systems
    ]
    if not result:
        return {"message": "no clients reporting"}
    return result


#######################################################################
### Cosmostat Client Subroutines
#######################################################################

# since the API isn't running
# def local_client_update():

# Cosmostat Client Reporter
def client_update():
    """POST the local redis data to the server, then (re)register the client.

    Returns:
        The decoded reply of the ``update_client`` call, or None on failure.
    """
    api_url = f"{cosmostat_server_api()}update_client"
    print(api_url)
    payload = get_client_payload(
        get_client_redis_data(human_readable=False), "redis_data"
    )
    log_data(
        log_output="client_update - redis data from local client:",
        log_level="noisy_test",
    )
    log_data(log_output=payload, log_level="noisy_test")
    # execute API call
    result = client_submission_handler(api_url, payload)
    # registration is repeated on every cycle; the server's create_client
    # route skips it when the uuid is already known
    client_api_initialize()
    return result


# Cosmostat Client Initializer
def client_api_initialize():
    """POST the local PHP summary to the server's ``create_client`` route."""
    api_url = f"{cosmostat_server_api()}create_client"
    # generate payload
    payload = get_client_payload(get_php_summary(), "client_properties")
    # execute API call
    return client_submission_handler(api_url, payload)


# Cosmostat Client API Reporting Handler
def client_submission_handler(api_url: str, payload: dict):
    """POST ``payload`` as JSON to ``api_url`` and return the decoded reply.

    Failures are logged rather than raised.

    Returns:
        The reply's JSON body, or None when no response was received or the
        body is not JSON.  An HTTP error response with a JSON body is still
        decoded and returned.
    """
    response = None
    try:
        # `json=` automatically sets Content-Type to application/json
        response = requests.post(api_url, json=payload, timeout=4)
        response.raise_for_status()  # raise HTTPError for 4xx/5xx
    except RequestException as exc:
        log_data(
            log_output=f"Failed to POST to {api_url!r}: {exc}",
            log_level="log_output",
        )
    # the request never produced a response (connection error, timeout, ...)
    if response is None:
        return None
    # process reply from API
    try:
        return response.json()
    except ValueError:
        log_data(
            log_output=f"Server responded with non-JSON payload: {response.text!r}",
            log_level="log_output",
        )
        return None


def get_client_payload(system_dictionary: dict, dictionary_name: str):
    """Wrap ``system_dictionary`` in the envelope the server routes expect.

    The envelope carries the client's uuid, short id, hostname and the
    configured API key; the data is stored under ``dictionary_name``.
    """
    return {
        "uuid": cosmostat_client.uuid,
        "short_id": cosmostat_client.short_id,
        "hostname": cosmostat_client.name,
        dictionary_name: system_dictionary,
        "API_KEY": app_settings["REAL_API_KEY"],
    }


#######################################################################
#######################################################################
### Main Subroutine
#######################################################################
#######################################################################
if __name__ == '__main__':
    ######################################
    ### Main Functions
    ######################################

    # instantiate and return the Client System object
    def new_cosmos_client():
        """Create the local System object and log its components."""
        new_client = System(f"{jenkins_hostname_settings()}")
        log_data(
            log_output=(
                f"New System object name: {new_client.name} - "
                f"{new_client.get_component_count()} components:"
            ),
            log_level="log_output",
        )
        for component in new_client.components:
            log_data(log_output=component.description, log_level="log_output")
        return new_client

    # instantiate and return the Cosmoserver System object
    def new_cosmostat_server():
        """Create the CosmostatServer object for this node."""
        new_server = CosmostatServer(cosmostat_client.uuid)
        log_data(
            log_output=f"New Cosmostat serverobject name: {new_server.name}",
            log_level="log_output",
        )
        return new_server

    # Background Loop Function
    def background_loop():
        """Scheduler job: refresh local state, publish and report it.

        NOTE(review): block nesting was reconstructed from a source whose
        indentation was lost - confirm against the original file.
        """
        # Update all data on the local System object
        if cosmostat_client.check_system_timer() or run_cosmostat_server():
            cosmostat_client.update_system_state()
        # publish to redis if the web dashboard is active locally
        if app_settings["push_redis"] and not app_settings["disable_local_api"]:
            update_redis_server()
        # report data to the server if configured
        if run_cosmostat_reporter():
            if int(time.time()) % 5 == 0 and not cosmostat_client.check_system_timer():
                cosmostat_client.update_system_state()
                client_update()
        # if this is the server, do this stuff
        if run_cosmostat_server():
            # purge stale client systems
            cosmostat_server.purge_stale_hostnames()
            # report the server's own client object to itself
            payload = get_client_payload(
                get_client_redis_data(human_readable=False), "redis_data"
            )
            run_update_client(payload)
            # log the payload just submitted, not the startup-time global
            log_data(log_output=f"{payload}", log_level="noisy_test")
        time.sleep(0.5)

    ######################################
    # instantiate client
    ######################################
    # local client System Class Object
    cosmostat_client = new_cosmos_client()

    # remote client reporter
    if app_settings["cosmostat_server_reporter"] and not app_settings["cosmostat_server"]:
        client_api_initialize()

    ######################################
    # instantiate server
    ######################################
    cosmostat_server = None
    if run_cosmostat_server():
        cosmostat_server = new_cosmostat_server()
        this_client = get_client_payload(get_php_summary(), "client_properties")
        timestamp_update = cosmostat_server.add_system(system_dictionary=this_client)

    ######################################
    # send initial stats update to redis
    ######################################
    if app_settings["push_redis"] and not app_settings["disable_local_api"]:
        update_redis_server()

    ######################################
    # Flask scheduler for scanner
    ######################################
    if app_settings["run_background"] and not app_settings["disable_local_api"]:
        log_data(
            log_output="Loading flask background subroutine...",
            log_level="log_output",
        )
        scheduler.add_job(
            id='background_loop',
            func=background_loop,
            trigger='interval',
            seconds=app_settings["update_frequency"],
        )
        scheduler.init_app(app)
        scheduler.start()
        log_data(log_output="...Done", log_level="log_output")
    else:
        log_data(log_output="Skipping flask background task", log_level="log_output")

    ######################################
    # Flask API
    ######################################
    print(f"gateway: {service_gateway_ip()} - port: {service_api_port()}")
    if not app_settings["disable_local_api"]:
        app.run(debug=False, host=service_gateway_ip(), port=service_api_port())
    else:
        # if local API disabled, phone home if configured
        print("Internal API Disabled.")
        while True:
            # NOTE(review): client_update() is assumed to sit inside this
            # condition; the original indentation was lost.
            if int(time.time()) % 5 == 0 and not cosmostat_client.check_system_timer():
                cosmostat_client.update_system_state()
                client_update()
            time.sleep(0.5)