Files
ssd_health/files/scripts/app.py
2025-11-02 20:31:51 -08:00

154 lines
5.4 KiB
Python

from flask import Flask, jsonify, request
import sqlite3
import json
import os
app = Flask(__name__)
db_path = '/opt/ssd_health/drive_records.db'
# init db function
def init_db():
print("Initializing DB")
db_check = "SELECT name FROM sqlite_master WHERE type='table' AND name='drive_records';"
create_table_command = """
CREATE TABLE drive_records (
id INTEGER PRIMARY KEY,
serial TEXT NOT NULL,
model TEXT NOT NULL,
flavor TEXT NOT NULL,
capacity TEXT NOT NULL,
TBW TEXT NOT NULL,
smart TEXT NOT NULL
);
"""
# this code deletes the db file if 0 bytes
if os.path.exists(db_path) and os.path.getsize(db_path) == 0:
try:
os.remove(db_path)
print("Database is 0 bytes, deleting.")
except Exception as e:
print(f"error during file deletion - 405: {e}")
return jsonify({'error during file deletion': e}), 405
try:
result = bool(query_db(db_check))
print(result)
# Check if any tables were found
if result:
print("drive_records exists - 205")
#return jsonify({'drive_records exists - 205': result}), 205
else:
print("drive_records does not exist, creating")
try:
result_init = query_db(create_table_command)
print("Database created - 201")
#return jsonify({'Database created': True, 'Result': result_init}), 201
except sqlite3.Error as e:
print(f"error during table initialization: {e}")
return jsonify({'error during table initialization - 401': e}), 401
#return len(result) > 0
except sqlite3.Error as e:
print(f"error during table check: {e}")
return jsonify({'error during table check - 400': e}), 400
# sqlite query function
def query_db(sql_query):
try:
# Establish a connection to the SQLite database using a context manager
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
print("Executing SQL query:", sql_query)
cursor.execute(sql_query)
rows = cursor.fetchall()
return rows
except sqlite3.Error as e:
# Handle any potential errors that might occur during database operations
print("An error occurred:", e)
return []
#conn = sqlite3.connect(db_path)
#cursor = conn.cursor()
#cursor.execute(sql_query)
#rows = cursor.fetchall()
#conn.close()
#return rows
# Function to return all drive records in database
def get_all_drive_records():
get_all_drives = "SELECT * FROM drive_records"
rows = query_db(get_all_drives)
drives = []
for row in rows:
drive = {
'id': row[0],
'serial': row[1],
'model': row[2],
'flavor': row[3],
'capacity': row[4],
'TBW': row[5],
'smart': row[6]
}
drives.append(drive)
return jsonify(drives)
# Function to check if a serial number exists in the database
def check_serial_exists(serial):
serial_check = f"SELECT * FROM drive_records WHERE serial='{serial}'"
print(serial_check)
return bool(query_db(serial_check))
# Route to check if a serial number exists in the database
@app.route('/check', methods=['GET'])
def check():
serial_lookup = request.args.get('serial_lookup')
print(f"Serial to check: {serial_lookup}")
if not serial_lookup:
return jsonify({'error': 'No serial number provided'}), 400
exists = check_serial_exists(serial_lookup)
return jsonify({'serial_number_exists': exists})
# Route to get all drive records in JSON format
@app.route('/drives', methods=['GET'])
def index():
return get_all_drive_records()
# Route to add drive in database
# serial,model,flavor,capacity,TBW,smart
@app.route('/add_drive', methods=['GET'])
def add_drive():
serial = request.args.get('serial')
model = request.args.get('model')
flavor = request.args.get('flavor')
capacity = request.args.get('capacity')
TBW = request.args.get('TBW')
smart = request.args.get('smart')
if None in [serial, model, flavor, capacity, TBW, smart]:
return jsonify({'error': 'Missing required query parameter(s)'}), 400
add_drive_query = f"INSERT INTO drive_records (serial, model, flavor, capacity, TBW, smart) VALUES ('{serial}', '{model}', '{flavor}', '{capacity}', '{TBW}', '{smart}'); "
print(add_drive_query)
return jsonify(query_db(add_drive_query))
# Route to update drive in database
# serial,TBW,smart
@app.route('/update_drive', methods=['GET'])
def update_drive():
serial = request.args.get('serial')
TBW = request.args.get('TBW')
smart = request.args.get('smart')
if None in [serial, TBW, smart]:
return jsonify({'error': 'Missing required query parameter(s)'}), 400
update_drive_query = f"UPDATE drive_records SET TBW = '{TBW}', smart = '{smart}' WHERE serial = '{serial}';"
print(update_drive_query)
return jsonify(query_db(update_drive_query))
# test route
@app.route('/test', methods=['GET'])
def test():
db_check = "SELECT name FROM sqlite_master WHERE type='table' AND name='drive_records';"
return query_db(db_check)
if __name__ == '__main__':
result=init_db()
print(result)
app.run(debug=True, host='0.0.0.0', port=5000)