145 lines
4.9 KiB
Python
145 lines
4.9 KiB
Python
from flask import Flask, jsonify, request
|
|
import sqlite3
|
|
import json
|
|
import os
|
|
|
|
# Location of the SQLite file that stores the drive records.
db_path = '/opt/ssd_health/drive_records.db'

# Flask application object; the route decorators below register on it.
app = Flask(__name__)
|
|
|
|
# init db function
def init_db():
    """Make sure the database at ``db_path`` contains a ``drive_records`` table.

    Steps:
      1. If the database file exists but is 0 bytes, delete it.
      2. Look up ``drive_records`` in ``sqlite_master``.
      3. Create the table when the lookup finds nothing.

    The statements are executed directly here instead of through
    ``query_db``: ``query_db`` swallows ``sqlite3.Error`` and returns ``[]``,
    which made a failed check look like "table missing" and a failed
    ``CREATE TABLE`` look like success.

    Returns:
        ``None`` on success. On failure, a ``(response, status)`` tuple where
        ``response`` is a JSON response describing the error and ``status``
        is 405 (file deletion), 400 (table check) or 401 (table creation).
    """
    print("Initializing DB")
    db_check = "SELECT name FROM sqlite_master WHERE type='table' AND name='drive_records';"
    create_table_command = """
    CREATE TABLE drive_records (
        id INTEGER PRIMARY KEY,
        serial TEXT NOT NULL,
        model TEXT NOT NULL,
        flavor TEXT NOT NULL,
        capacity TEXT NOT NULL,
        TBW TEXT NOT NULL,
        smart TEXT NOT NULL
    );
    """

    def error_response(label, error, status):
        # jsonify needs an application context (init_db runs at startup,
        # outside any request), and exception objects are not JSON
        # serialisable, so the message is sent as text.
        with app.app_context():
            return jsonify({label: str(error)}), status

    def run(sql):
        # Execute one statement and return its rows; sqlite3.Error propagates
        # to the caller so each step can report its own failure.
        print("Executing SQL query:", sql)
        conn = sqlite3.connect(db_path)
        try:
            with conn:  # commits on success, rolls back on error
                return conn.execute(sql).fetchall()
        finally:
            conn.close()

    # this code deletes the db file if 0 bytes
    if os.path.exists(db_path) and os.path.getsize(db_path) == 0:
        try:
            print("Database is 0 bytes, deleting.")
            os.remove(db_path)
        except OSError as e:
            print(f"error during file deletion - 405: {e}")
            return error_response('error during file deletion', e, 405)

    try:
        result = bool(run(db_check))
    except sqlite3.Error as e:
        print(f"error during table check: {e}")
        return error_response('error during table check - 400', e, 400)

    # Check if any tables were found
    if result:
        print(result)
        print("drive_records exists - 205")
        return None

    print("drive_records does not exist, creating")
    try:
        result_init = run(create_table_command)
    except sqlite3.Error as e:
        print(f"error during table initialization: {e}")
        return error_response('error during table initialization - 401', e, 401)

    print(result_init)
    print("Database created - 201")
    return None
|
|
|
|
# sqlite query function with lots of protection
def query_db(sql_query, params=()):
    """Execute one SQL statement against the database at ``db_path``.

    Args:
        sql_query: SQL text. Values should be written as ``?`` placeholders
            and supplied through ``params`` rather than formatted in.
        params: Sequence of values bound to the placeholders. Defaults to an
            empty tuple, so existing single-argument calls keep working.

    Returns:
        The list of rows from ``fetchall()``. An empty list is returned both
        for statements that produce no rows and when a ``sqlite3.Error``
        occurs (the error is printed, not raised).
    """
    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.Error as e:
        print("An error occurred:", e)
        return []

    try:
        # `with conn` only commits or rolls back the transaction; the
        # connection itself is closed in the `finally` below.
        with conn:
            if params:
                print("Executing SQL query:", sql_query, "with params:", params)
            else:
                print("Executing SQL query:", sql_query)
            return conn.execute(sql_query, params).fetchall()
    except sqlite3.Error as e:
        print("An error occurred:", e)
        return []
    finally:
        conn.close()


# Column names of drive_records, in the order used for the JSON objects.
_DRIVE_COLUMNS = ('id', 'serial', 'model', 'flavor', 'capacity', 'TBW', 'smart')


# Function to return all drive records in database
def get_all_drive_records():
    """Return every row of ``drive_records`` as a JSON array of objects.

    Columns are selected by name so the key/value pairing does not depend on
    the physical column order of the table.
    """
    get_all_drives = f"SELECT {', '.join(_DRIVE_COLUMNS)} FROM drive_records"
    rows = query_db(get_all_drives)
    return jsonify([dict(zip(_DRIVE_COLUMNS, row)) for row in rows])


# Function to check if a serial number exists in the database
def check_serial_exists(serial):
    """Return True when at least one drive record has the given serial.

    The serial is bound as a query parameter, never formatted into the SQL.
    Because ``query_db`` returns ``[]`` on database errors, an error is
    reported as False.
    """
    serial_check = "SELECT 1 FROM drive_records WHERE serial = ? LIMIT 1"
    return bool(query_db(serial_check, (serial,)))


# Route to check if a serial number exists in the database
@app.route('/check', methods=['GET'])
def check():
    """Handle ``GET /check?serial_lookup=<serial>``.

    Responds with ``{"serial_number_exists": <bool>}``, or with a 400 error
    object when ``serial_lookup`` is missing or empty.
    """
    serial_lookup = request.args.get('serial_lookup')
    print(f"Serial to check: {serial_lookup}")
    if not serial_lookup:
        return jsonify({'error': 'No serial number provided'}), 400

    exists = check_serial_exists(serial_lookup)
    return jsonify({'serial_number_exists': exists})


# Route to get all drive records in JSON format
@app.route('/drives', methods=['GET'])
def index():
    """Handle ``GET /drives`` by returning all drive records as JSON."""
    return get_all_drive_records()


# Route to add drive in database
# serial,model,flavor,capacity,TBW,smart
@app.route('/add_drive', methods=['GET'])
def add_drive():
    """Handle ``GET /add_drive`` by inserting one drive record.

    Requires the query parameters serial, model, flavor, capacity, TBW and
    smart; responds with a 400 error object when any of them is absent.
    Otherwise responds with the JSON form of what ``query_db`` returns for
    the INSERT. That is an empty list on success and also on a database
    error, so callers cannot tell the two apart from the body alone.
    """
    field_names = ('serial', 'model', 'flavor', 'capacity', 'TBW', 'smart')
    values = tuple(request.args.get(name) for name in field_names)
    if None in values:
        return jsonify({'error': 'Missing required query parameter(s)'}), 400

    # Values are bound as parameters so request input cannot alter the SQL.
    add_drive_query = (
        "INSERT INTO drive_records (serial, model, flavor, capacity, TBW, smart) "
        "VALUES (?, ?, ?, ?, ?, ?);"
    )
    return jsonify(query_db(add_drive_query, values))


# Route to update drive in database
# serial,TBW,smart
@app.route('/update_drive', methods=['GET'])
def update_drive():
    """Handle ``GET /update_drive`` by updating TBW and smart for a serial.

    Requires the query parameters serial, TBW and smart; responds with a 400
    error object when any of them is absent. Otherwise responds with the
    JSON form of what ``query_db`` returns for the UPDATE (an empty list).
    """
    serial = request.args.get('serial')
    TBW = request.args.get('TBW')
    smart = request.args.get('smart')
    if None in (serial, TBW, smart):
        return jsonify({'error': 'Missing required query parameter(s)'}), 400

    # Values are bound as parameters so request input cannot alter the SQL.
    update_drive_query = "UPDATE drive_records SET TBW = ?, smart = ? WHERE serial = ?;"
    return jsonify(query_db(update_drive_query, (TBW, smart, serial)))
|
|
|
|
# test route
@app.route('/test', methods=['GET'])
def test():
    """Handle ``GET /test`` by reporting whether ``drive_records`` exists.

    Responds with the JSON form of the ``sqlite_master`` lookup result: a
    list holding one row when the table exists, otherwise an empty list.
    The rows are passed through ``jsonify`` like the other routes, rather
    than returned as a bare list, which not every Flask release accepts as
    a view return value.
    """
    db_check = "SELECT name FROM sqlite_master WHERE type='table' AND name='drive_records';"
    return jsonify(query_db(db_check))
|
|
|
|
if __name__ == '__main__':
    # Prepare the database before serving; init_db returns None on success
    # or an (error response, status) tuple, which is printed for the operator.
    result = init_db()
    print(result)

    # The server binds to a network-reachable address, so the interactive
    # debugger must not be on by default: it allows code execution from the
    # browser. Debug mode is opt-in through the FLASK_DEBUG environment
    # variable (any value other than empty, 0, false or no enables it).
    debug_flag = os.environ.get('FLASK_DEBUG', '')
    debug_enabled = debug_flag.lower() not in ('', '0', 'false', 'no')
    app.run(debug=debug_enabled, host='172.17.0.1', port=5000)
|
|
|
|
|