# Source listing metadata (not part of the program):
#   path:     ssd_health/files/scripts/app.py
#   modified: 2025-11-30 22:17:14 -08:00
#   size:     376 lines, 14 KiB
#   language: Python

from flask import Flask, jsonify, request
import sqlite3
import redis, json, time
import os
import subprocess
import re
# Flask application object; the route decorators further down register on it.
app = Flask(__name__)
# Path of the SQLite database file opened by query_db / query_database.
db_path = '/opt/ssd_health/drive_records.db'
# When True, helpers print queries, results and published payloads to stdout.
debug_output = False
# When True, the server binds to 172.17.0.1 instead of 0.0.0.0 (see the __main__ block).
secure_api = True
####################################################
### Redis Functions
####################################################
# Shared Redis client; used to publish on the 'attached_disks' and 'host_stats' channels.
r = redis.Redis(host='172.17.0.1', port=6379)
def update_disk_redis():
    """Publish the currently active drives, enriched with stored details, to Redis.

    Reads the rows of ``active_disks`` and ``drive_records``, merges them by
    serial and publishes the JSON-encoded result on the ``attached_disks``
    channel. Returns nothing.
    """
    payload = merge_active_with_details(
        get_active_drive_records(as_json=False),
        get_all_drive_records(as_json=False),
    )
    message = json.dumps(payload)
    r.publish('attached_disks', message)
    if not debug_output:
        return
    print("=== Active drives sent to Redis ===")
    print(json.dumps(payload, indent=2))
def update_stats_redis():
    """Collect the host statistics and publish them on the ``host_stats`` channel.

    Returns:
        Always ``True``.
    """
    # Gather the stats as plain Python data (not a Flask response).
    host_data = get_host_stats(as_json=False)
    encoded = json.dumps(host_data)
    # Publish to the Redis channel that the WS server is listening on.
    r.publish('host_stats', encoded)
    if debug_output:
        print("=== Stats Redis Update ===")
        print(json.dumps(host_data, indent=2))
    return True
def merge_active_with_details(active, all_records):
    """Copy stored drive details onto the matching active-drive dicts.

    Args:
        active: list of dicts, each with at least a ``'serial'`` key.
        all_records: list of dicts, each with at least a ``'serial'`` key.

    Returns:
        The same ``active`` list; its dicts are updated in place with every
        field of the record sharing their serial, except ``'id'`` and
        ``'serial'``. Drives without a matching record are left unchanged.
    """
    # Index the stored records by serial; a later duplicate replaces an earlier one.
    details = {}
    for record in all_records:
        details[record['serial']] = record

    skipped = ('id', 'serial')
    for entry in active:
        match = details.get(entry['serial'])
        if not match:
            continue
        for field, value in match.items():
            if field not in skipped:
                entry[field] = value
    return active
####################################################
### Host Stats Function
####################################################
def get_host_stats(as_json=False):
    """Gather memory, CPU, network and clock readings from the host.

    Each value is the first output line of a shell pipeline executed through
    ``run_command``.

    Args:
        as_json: when true, return ``jsonify(stats)`` instead of the raw list.

    Returns:
        A one-element list holding the stats dict, or its ``jsonify`` response.
    """
    def first_line(shell_cmd):
        # Every reading only needs the first line of the command output.
        return run_command(shell_cmd, zero_only=True)

    mem_total = first_line("free -h | grep 'Mem:' | awk '{print $2}'")
    mem_used = first_line("free -h | grep 'Mem:' | awk '{print $3}'")
    mem_free = first_line("free -h | grep 'Mem:' | awk '{print $4}'")
    load = first_line("uptime | grep -oP '(?<=age: ).*'")
    # nano pi command
    # "sensors | grep 'temp1:' | cut -d+ -f 2 | awk '{print $1}'"
    raw_temp = first_line("sensors | grep Package | cut -d+ -f 2 | awk '{print $1}'")
    # Drop the degree-sign unit and append a plain " C" instead.
    temp_label = re.sub(r'\u00b0C', '', raw_temp) + " C"
    addresses = first_line("ip -o -4 ad | grep -e eth -e tun | awk '{print $2\": \" $4}'")
    clock = first_line("date +%r")

    # Redis stores in this order, or at least the html renders it in this order
    stats = [
        {
            "memory_total": mem_total,
            "memory_used": mem_used,
            "memory_free": mem_free,
            "cpu_load": load,
            "cpu_temp": temp_label,
            "ip_addresses": addresses,
            "time": clock,
        }
    ]
    if debug_output:
        print("=== Current Host Stats ===")
        print(json.dumps(stats, indent=2))
    if as_json:
        return jsonify(stats)
    return stats
####################################################
### db functions
####################################################
def _init_db_error(label, exc, status):
    """Build the ``(response, status)`` pair that init_db returns on failure."""
    # init_db runs from the __main__ block before app.run(), so there is no
    # application context yet and jsonify would raise without one. Exception
    # objects are not JSON-serializable either, so their text is sent instead.
    with app.app_context():
        return jsonify({label: str(exc)}), status


def init_db():
    """Create the ``drive_records`` and ``active_disks`` tables when missing.

    A zero-byte database file is deleted first so it can be recreated. The
    presence of ``drive_records`` decides whether both tables are created.

    Returns:
        ``None`` on success, otherwise a ``(json_response, status_code)``
        tuple describing the failure (405 file deletion, 401 table creation,
        400 table check).
    """
    print("Checking Database...")
    db_check = "SELECT name FROM sqlite_master WHERE type='table' AND name='drive_records';"
    create_table_command = """
        CREATE TABLE drive_records (
            id INTEGER PRIMARY KEY,
            serial TEXT NOT NULL,
            model TEXT NOT NULL,
            flavor TEXT NOT NULL,
            capacity TEXT NOT NULL,
            TBW TEXT NOT NULL,
            smart TEXT NOT NULL
        );
    """
    active_disks_command = """
        CREATE TABLE active_disks (
            id INTEGER PRIMARY KEY,
            name TEXT,
            serial TEXT,
            size TEXT
        );
    """
    # this code deletes the db file if 0 bytes
    if os.path.exists(db_path) and os.path.getsize(db_path) == 0:
        try:
            print("Database file exists and is 0 bytes, deleting.")
            os.remove(db_path)
        except Exception as e:
            print(f"error during file deletion - 405: {e}")
            return _init_db_error('error during file deletion', e, 405)
    # NOTE: query_db already catches sqlite3.Error and returns [], so the
    # handlers below only fire if that helper's behaviour changes.
    try:
        result = bool(query_db(db_check))
        print(result)
        # Check if any tables were found
        if result:
            print("drive_records exists - 205")
        else:
            print("drive_records does not exist, creating")
            try:
                result_init = query_db(create_table_command)
                result_active = query_db(active_disks_command)
                print(result_init)
                print(result_active)
                print("Database created - 201")
            except sqlite3.Error as e:
                print(f"error during table initialization: {e}")
                return _init_db_error('error during table initialization - 401', e, 401)
    except sqlite3.Error as e:
        print(f"error during table check: {e}")
        return _init_db_error('error during table check - 400', e, 400)
# sqlite query function with lots of protection
def query_db(sql_query):
    """Run one SQL statement against the database and return all rows.

    Args:
        sql_query: complete SQL text; no parameter binding is done here.

    Returns:
        The list from ``fetchall()``, or ``[]`` when sqlite3 raises (the
        error is printed, not propagated).
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            # `with conn` commits on success and rolls back on error, but it
            # does not close the connection; the finally clause does that.
            with conn:
                cursor = conn.cursor()
                if debug_output:
                    print("Executing SQL query:", sql_query)
                cursor.execute(sql_query)
                rows = cursor.fetchall()
                if debug_output:
                    print("Query Result:", rows)
                return rows
        finally:
            conn.close()
    except sqlite3.Error as e:
        print("An error occurred:", e)
        return []
# is this redundant? oh my, yes
# does it save me time? also, big yes
# note how the one above doesn't have the query params
# i don't want to re-write the subroutine i took from the VM party
def query_database(query_string, query_params=None):
    """Run one SQL statement, optionally with bound parameters, and commit.

    Args:
        query_string: SQL text, using ``?`` placeholders when parameters are given.
        query_params: sequence of values for the placeholders, or ``None``.

    Returns:
        The list from ``fetchall()``.

    Raises:
        sqlite3.Error: propagated unchanged; the connection is still closed.
    """
    if debug_output:
        print(query_string, query_params)
    # Connect to the SQLite database (or create it if it doesn't exist)
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        if query_params is not None:
            cursor.execute(query_string, query_params)
        else:
            cursor.execute(query_string)
        result = cursor.fetchall()
        if debug_output:
            print(result)
        conn.commit()
        return result
    finally:
        # Close even when execute() raises so a failed query does not leak
        # the connection.
        conn.close()
####################################################
### Other Helper Functions
####################################################
# subroutine to run a command, return stdout as array unless zero_only then return [0]
def run_command(cmd, zero_only=False):
    """Run a shell command and return its non-empty stdout lines.

    Args:
        cmd: command line handed to the shell (``shell=True``).
        zero_only: when true, return only the first non-empty line.

    Returns:
        A list of non-empty lines, or with ``zero_only`` the first such line
        as a string (``''`` when the command printed nothing).

    Raises:
        subprocess.CalledProcessError: when the command exits non-zero.
    """
    # Run the command and capture the output
    result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Decode the byte output to a string
    output = result.stdout.decode('utf-8')
    # Split the output into lines, dropping blanks
    output_lines = [line for line in output.split('\n') if line]
    if not zero_only:
        return output_lines
    # A pipeline whose filter matches nothing still exits 0 with no output;
    # return an empty string instead of raising IndexError.
    return output_lines[0] if output_lines else ''
# Function to return all drive records in database
def get_all_drive_records(as_json=True):
    """Return every row of ``drive_records`` as a list of dicts.

    Args:
        as_json: when true (the default) wrap the list with ``jsonify``.

    Returns:
        A list of dicts keyed by column name, or its ``jsonify`` response.
    """
    # Keys in the positional order the rows are read in.
    columns = ('id', 'serial', 'model', 'flavor', 'capacity', 'TBW', 'smart')
    drives = [
        {column: row[position] for position, column in enumerate(columns)}
        for row in query_db("SELECT * FROM drive_records")
    ]
    if as_json:
        return jsonify(drives)
    return drives
# Function to return all active drives in database
def get_active_drive_records(as_json=True):
    """Return every row of ``active_disks`` as a list of dicts.

    Args:
        as_json: when true (the default) wrap the list with ``jsonify``.

    Returns:
        A list of dicts keyed by column name, or its ``jsonify`` response.
    """
    # Keys in the positional order the rows are read in.
    columns = ('id', 'name', 'serial', 'size')
    drives = [
        {column: row[position] for position, column in enumerate(columns)}
        for row in query_db("SELECT * FROM active_disks")
    ]
    if as_json:
        return jsonify(drives)
    return drives
# Function to check if a serial number exists in the database
def check_serial_exists(serial):
    """Report whether ``drive_records`` holds a row with the given serial.

    Args:
        serial: serial number to look up; it comes from request input.

    Returns:
        ``True`` when at least one row matches, ``False`` otherwise or when
        the query fails (the error is printed).
    """
    # Bind the value instead of formatting it into the SQL text, so a crafted
    # serial cannot alter the statement.
    serial_check = "SELECT * FROM drive_records WHERE serial = ?"
    try:
        # query_database prints the statement and parameters when debug_output is set.
        return bool(query_database(serial_check, (serial,)))
    except sqlite3.Error as e:
        print("An error occurred:", e)
        return False
####################################################
### Flask Routes
####################################################
# Route to check if a serial number exists in the database
@app.route('/check', methods=['GET'])
def check():
    """Answer whether the ``serial_lookup`` query argument is a known serial.

    Returns a 400 JSON error when the argument is missing or empty.
    """
    serial_lookup = request.args.get('serial_lookup')
    if debug_output:
        print(f"Serial to check: {serial_lookup}")
    if serial_lookup:
        found = check_serial_exists(serial_lookup)
        return jsonify({'serial_number_exists': found, 'serial_lookup': serial_lookup})
    return jsonify({'error': 'No serial number provided'}), 400
# Route to get all drive records in JSON format
@app.route('/drives', methods=['GET'])
def index():
    """Serve all rows of ``drive_records`` as JSON."""
    response = get_all_drive_records(as_json=True)
    return response
# Route to add drive in database
# serial,model,flavor,capacity,TBW,smart
@app.route('/add_drive', methods=['GET'])
def add_drive():
    """Insert a drive record built from the request's query arguments.

    Required arguments: serial, model, flavor, capacity, TBW, smart.

    Returns:
        400 with a JSON error when any argument is missing; otherwise the
        JSON-encoded query result (an empty list, also when the insert fails
        and the error is printed).
    """
    fields = ('serial', 'model', 'flavor', 'capacity', 'TBW', 'smart')
    values = tuple(request.args.get(field) for field in fields)
    if None in values:
        return jsonify({'error': 'Missing required query parameter(s)'}), 400
    # Placeholders keep request-supplied text out of the SQL statement itself.
    add_drive_query = (
        "INSERT INTO drive_records (serial, model, flavor, capacity, TBW, smart) "
        "VALUES (?, ?, ?, ?, ?, ?)"
    )
    try:
        # query_database prints the statement and parameters when debug_output is set.
        result = query_database(add_drive_query, values)
    except sqlite3.Error as e:
        # Keep the previous contract: report the error and answer with [].
        print("An error occurred:", e)
        result = []
    return jsonify(result)
# Route to update drive in database
# serial,TBW,smart
@app.route('/update_drive', methods=['GET'])
def update_drive():
    """Update TBW and smart for the drive whose serial is given in the request.

    Required arguments: serial, TBW, smart.

    Returns:
        400 with a JSON error when any argument is missing; otherwise the
        JSON-encoded query result (an empty list, also when the update fails
        and the error is printed).
    """
    serial = request.args.get('serial')
    TBW = request.args.get('TBW')
    smart = request.args.get('smart')
    if None in [serial, TBW, smart]:
        return jsonify({'error': 'Missing required query parameter(s)'}), 400
    # Placeholders keep request-supplied text out of the SQL statement itself.
    update_drive_query = "UPDATE drive_records SET TBW = ?, smart = ? WHERE serial = ?"
    try:
        # query_database prints the statement and parameters when debug_output is set.
        result = query_database(update_drive_query, (TBW, smart, serial))
    except sqlite3.Error as e:
        # Keep the previous contract: report the error and answer with [].
        print("An error occurred:", e)
        result = []
    return jsonify(result)
# Route to return active drives
@app.route('/list_active_drives', methods=['GET'])
def list_active_drives():
    """Serve all rows of ``active_disks`` as JSON."""
    response = get_active_drive_records(as_json=True)
    return response
# list disks as sda,serial
def list_disk_and_serial():
    """List attached ``sd*`` disks as sorted comma-joined strings.

    Each string is built by awk from the first three whitespace-separated
    fields of an ``lsblk -o NAME,SERIAL,SIZE,TYPE`` line.

    Returns:
        A sorted list of those strings; empty when the command fails (the
        captured stderr is printed).
    """
    lsblk_cmd = "lsblk -o NAME,SERIAL,SIZE,TYPE | grep sd | grep disk | awk '{print $1 \",\" $2. \",\" $3}'"
    try:
        found = run_command(lsblk_cmd)
    except subprocess.CalledProcessError as err:
        # Not expected to fail; report it and fall back to no devices.
        print(f"An error occurred: {err.stderr.decode('utf-8')}")
        found = []
    # Drop empty entries before sorting.
    return sorted(filter(None, found))
# Route to refresh active drives
@app.route('/refresh_active_drives', methods=['GET'])
def refresh_active_drives():
    """Synchronise ``active_disks`` with the disks currently attached.

    Disks reported by ``list_disk_and_serial`` that are not in the table are
    inserted, with the serial re-read through ``hdparm``. Table rows whose
    disk name is no longer reported are deleted. Redis is updated after every
    change and once more at the end, together with the host stats.

    Returns:
        A JSON response ``{"function": "update_disk_database"}``.
    """
    # Each entry becomes a [name, serial, size] list.
    current_items = [line.split(',') for line in list_disk_and_serial()]
    # Set of attached names for the removal pass below.
    attached_names = {fields[0] for fields in current_items}

    # Insert disks that are attached but not yet recorded (matched by name).
    for fields in current_items:
        name = fields[0]
        existing_item = query_database('SELECT * FROM active_disks WHERE name = ?', (name,))
        if existing_item:
            continue
        if debug_output:
            print(f"Disk /dev/{name} inserted, updating database")
        # The serial from lsblk is replaced by the one hdparm reports.
        verified_serial = run_command(
            f"hdparm -I /dev/{name} | grep 'Serial Number' | cut -d: -f2 | awk '{{print $1}}' ",
            zero_only=True,
        )
        if debug_output:
            print(f"Verified serial number through hdparm: {verified_serial}")
        fields[1] = verified_serial
        query_database('INSERT INTO active_disks (name, serial, size) VALUES (?, ?, ?)', fields)
        update_disk_redis()

    # Remove recorded disks that are no longer attached.
    for name, serial in query_database('SELECT name, serial FROM active_disks'):
        if name in attached_names:
            continue
        if debug_output:
            # Report the row being removed, taken from the database record.
            print(f"Deleting disk /dev/{name} - serial {serial}")
        query_database('DELETE FROM active_disks WHERE name = ?', (name,))
        update_disk_redis()

    update_disk_redis()
    update_stats_redis()
    return jsonify({"function": "update_disk_database"})
# host stats
@app.route('/host_stats', methods=['GET'])
def host_stats():
    """Publish the host stats to Redis, then serve a fresh reading as JSON."""
    update_stats_redis()
    # The stats are collected a second time here for the HTTP response.
    current = get_host_stats()
    return jsonify(current)
# test route
@app.route('/test', methods=['GET'])
def test():
    """Serve the names of all tables in the database as JSON."""
    db_check = "SELECT name FROM sqlite_master WHERE type='table';"
    # Wrap the rows with jsonify like every other route does; a bare list is
    # not a valid view return value on Flask versions before 2.2.
    return jsonify(query_db(db_check))
if __name__ == '__main__':
    # Make sure the tables exist before serving, and show what init_db returned.
    result = init_db()
    print(result)
    # secure_api selects the 172.17.0.1 address; otherwise listen on all interfaces.
    bind_host = '172.17.0.1' if secure_api else '0.0.0.0'
    # NOTE(review): debug=True enables the interactive debugger; confirm this
    # is intended, especially when binding to 0.0.0.0.
    app.run(debug=True, host=bind_host, port=5000)