import redis
import subprocess
import re
import json

####################################################
### Redis Functions
####################################################

# When True, every stats payload is also printed to stdout.
debug_output = False

# Shared Redis client used to publish host stats.
r = redis.Redis(host='172.17.0.1', port=6379)


def update_stats_redis():
    """Collect the current host stats and publish them on Redis.

    The stats are serialized to JSON and published on the 'host_stats'
    channel (the channel the WS server is listening on).

    Returns:
        True once the publish call has completed.

    Raises:
        subprocess.CalledProcessError: propagated from run_shell() when one
            of the stat commands exits with a non-zero status.
    """
    data = get_host_stats(as_json=False)

    # Publish to the Redis channel that the WS server is listening on
    r.publish('host_stats', json.dumps(data))

    if debug_output:
        print("=== Stats Redis Update ===")
        print(json.dumps(data, indent=2))

    return True


####################################################
### Host Stats Function
####################################################

def get_host_stats(as_json=False):
    """Gather memory, CPU, network, time and (if present) battery stats.

    Every value comes from the first line of output of a shell pipeline.
    A pipeline that prints nothing yields an empty list for that field
    (see run_shell()).

    Args:
        as_json: When True, return the stats serialized as a JSON string
            instead of a Python object.

    Returns:
        A single-element list holding one dict of stats, or its JSON string
        form when as_json is True. Key order is significant for the
        consumer, so "battery_level" (when present) precedes "time".

    Raises:
        subprocess.CalledProcessError: propagated from run_shell() when a
            command exits with a non-zero status.
    """
    total_memory_command = "free -h | grep 'Mem:' | awk '{print $2}'"
    total_memory = run_shell(total_memory_command, zero_only=True)

    used_memory_command = "free -h | grep 'Mem:' | awk '{print $3}'"
    used_memory = run_shell(used_memory_command, zero_only=True)

    free_memory_command = "free -h | grep 'Mem:' | awk '{print $4}'"
    free_memory = run_shell(free_memory_command, zero_only=True)

    cpu_load_command = "uptime | grep -oP '(?<=age: ).*'"
    cpu_load = run_shell(cpu_load_command, zero_only=True)

    # nano pi command
    # cpu_temp_command = "sensors | grep 'temp1:' | cut -d+ -f 2 | awk '{print $1}'"
    cpu_temp_command = "sensors | grep -e Sensor -e Package | cut -d+ -f 2 | awk '{print $1}'"
    cpu_temp = run_shell(cpu_temp_command, zero_only=True)
    if isinstance(cpu_temp, str):
        # Drop the degree sign so the value renders as e.g. "45.0 C".
        cpu_temp_stripped = re.sub(r'\u00b0C', '', cpu_temp)
        cpu_temp_fixed = f"{cpu_temp_stripped} C"
    else:
        # No matching sensor line: run_shell() returned an empty list, which
        # re.sub() cannot process. Pass the empty value through unchanged,
        # the same way the other fields report missing output.
        cpu_temp_fixed = cpu_temp

    # NOTE: zero_only=True keeps only the first matching interface line.
    ip_address_command = "ip -o -4 ad | grep -v -e docker -e 127.0.0.1 | awk '{print $2\": \" $4}'"
    ip_addresses = run_shell(ip_address_command, zero_only=True)

    time_now_command = "date +%r"
    time_now = run_shell(time_now_command, zero_only=True)

    # Redis stores in this order, or at least the html renders it in this
    # order, so the dict is filled in the exact order it should be shown.
    host_stats = {
        "memory_total": total_memory,
        "memory_used": used_memory,
        "memory_free": free_memory,
        "cpu_load": cpu_load,
        "cpu_temp": cpu_temp_fixed,
        "ip_addresses": ip_addresses,
    }

    if check_for_battery():
        battery_level_command = "acpi | grep Battery | awk {print'$3 \" \" $4'}"
        host_stats["battery_level"] = run_shell(battery_level_command, zero_only=True)

    # "time" always goes last, after the optional battery entry.
    host_stats["time"] = time_now

    stats = [host_stats]

    if debug_output:
        print("=== Current Host Stats ===")
        print(json.dumps(stats, indent=2))

    # This module never imports a `jsonify` helper, so serialize with the
    # json module that is already in scope.
    return json.dumps(stats) if as_json else stats


def check_for_battery():
    """Return True when `acpi` reports at least one battery line.

    The first field of the first acpi line containing "Battery" is compared
    against the literal 'Battery'; no output at all means no battery.
    """
    battery_check_command = "acpi | grep Battery | awk {print'$1'}"
    battery_check = run_shell(battery_check_command, zero_only=True)
    return battery_check == 'Battery'


def run_shell(cmd, zero_only=False):
    """Run a shell command line and return its non-empty stdout lines.

    Args:
        cmd: Command line passed to the shell (shell=True). Only pass
            trusted, program-defined strings.
        zero_only: When True, return only the first non-empty line as a
            string instead of the full list of lines.

    Returns:
        A list of non-empty stdout lines; or, when zero_only is True and
        there is output, the first line as a string. When the command
        prints nothing the empty list is returned even if zero_only is True.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero
            (for a pipeline, that is the status of its last command).
    """
    # Run the command and capture the output
    result = subprocess.run(cmd, shell=True, check=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Decode the byte output to a string
    output = result.stdout.decode('utf-8')

    # Split the output into lines, dropping empty ones
    output_lines = [line for line in output.split('\n') if line]

    # Explicit emptiness check instead of catching IndexError with a bare
    # except; the no-output fallback is still the empty list.
    if zero_only and output_lines:
        return output_lines[0]
    return output_lines