############################################################### # SSDObject.py # ssd object class function created for ssd monitor # functions for handling database and shell commands # class definitions and other class helper functions ############################################################### import sqlite3 import json, redis import subprocess db_path = '/opt/ssd_health/drive_records.db' debug_output = False show_records = False suppress_errors = False push_redis = False ############################################################### # Class Definition # # Most of the heavy lifting is done on object instantiation # Object is instantiated with just the dev_id and flavor # Drive data is collected on instantition and stored to database ############################################################### class SSDObject: def __str__(self) -> str: return f"""Drive at /dev/{self.dev_id} is a '{self.model}' {self.capacity} {self.flavor}, SN: {self.serial}""" def __init__(self, dev_id: str): self.dev_id = dev_id if not check_serial_attached(self.dev_id): raise TypeError(f"No device at /dev/{self.dev_id}") self._smart_data = return_smartctl(dev_id) self.flavor = self._get_flavor() if self.flavor == "HDD": raise TypeError("Unable to instantiate HDD") if self.flavor == "Error": raise TypeError("Unable to instantiate storage device") self.serial = self._smart_data['serial_number'] self.model = self._smart_data['model_name'] self.capacity_bytes = self._smart_data['user_capacity']['bytes'] self.smart_status = self._smart_data['smart_status']['passed'] self.capacity = self._get_human_capacity() self.sector_size = return_sector_size(self.dev_id) self.gb_written = self._get_gbw() self._update_db() def _get_flavor(self) -> str: if "rotation_rate" in json.dumps(self._smart_data): if int(self._smart_data['rotation_rate'] == 0): if not suppress_errors: print(f"Warning - /dev/{self.dev_id} is a weird SSD with a rotation rate of 0") return "SSD" else: return "HDD" elif "NVMe" in json.dumps(self._smart_data): return "NVMe" elif "Solid State" in json.dumps(self._smart_data): return "SSD" elif "Unknown USB bridge" in json.dumps(self._smart_data): return "Error" def _get_human_capacity(self) -> str: size = self.capacity_bytes factor = 1024 units = [ (factor ** 4, "TiB"), (factor ** 3, "GiB"), (factor ** 2, "MiB"), (factor ** 1, "KiB"), ] for thresh, suffix in units: if size > thresh: value = size / thresh return f"{value:.{0}f} {suffix}" def _get_gbw(self) -> str: result = '' gib_factor = 2 ** 30 if self.flavor == "SSD": data_units_written = return_ls_written(self._smart_data) result = round(data_units_written * self.sector_size / gib_factor, 2) elif self.flavor == "NVMe": data_units_written = float(self._smart_data['nvme_smart_health_information_log']['data_units_written']) result = round(data_units_written * self.sector_size / gib_factor, 2) return result def _update_db(self): if push_redis: update_disk_redis() if self.exists(): drive_query = f""" UPDATE drive_records SET gb_written = '{self.gb_written}', smart = '{self.smart_status}' WHERE serial = '{self.serial}'; """ else: drive_query = f""" INSERT INTO drive_records (serial, model, flavor, capacity, gb_written, smart) VALUES ('{self.serial}', '{self.model}', '{self.flavor}', '{self.capacity}', '{self.gb_written}', '{self.smart_status}'); """ query_db(drive_query) def exists(self) -> bool: return check_serial_exists(self.serial) def attached(self) -> bool: return(check_serial_attached(self.serial)) ######################################## # Other Helper Functions ######################################## # subroutine to run a command, return stdout as array unless zero_only then return [0] def run_command(cmd, zero_only=False): # Run the command and capture the output result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Decode the byte output to a string output = result.stdout.decode('utf-8') # Split the output into lines and store it in an array output_lines = [line for line in output.split('\n') if line] # Return result try: return output_lines[0] if zero_only else output_lines except: return output_lines def return_smartctl(drive_id): smartctl_string = f"/usr/sbin/smartctl --json -x /dev/{drive_id} || true" smartctl_result = subprocess.run(smartctl_string, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) smartctl_data = json.loads(smartctl_result.stdout.decode("utf-8")) return smartctl_data def return_sector_size(drive_id): sector_size_command = f"fdisk -l /dev/{drive_id} | grep 'Sector size' | awk '{{print $4}}'" sector_size_result = subprocess.run(sector_size_command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return int(sector_size_result.stdout.decode("utf-8")) def return_ls_written(data): pages = data.get("ata_device_statistics", {}).get("pages", []) for page in pages: for entry in page.get("table", []): if entry.get("name") == "Logical Sectors Written": return entry.get("value") # Function to return all drive records in database def get_all_drive_records(): get_all_drives = "SELECT * FROM drive_records" rows = query_db(get_all_drives) drives = [] for row in rows: drive = { 'id': row[0], 'serial': row[1], 'model': row[2], 'flavor': row[3], 'capacity': row[4], 'gb_written': round(float(row[5]),2), 'smart': row[6] } drives.append(drive) return json.dumps(drives) # return attached disks def list_disk_and_serial(): # Init blank devices array devices = [] # get the devices cmd = "lsblk -o NAME,SERIAL,SIZE,TYPE | grep sd | grep disk | awk '{print $1 \",\" $2. \",\" $3}'" # try to run the command, should not fail try: while 'disk' in run_command(cmd, zero_only=False): time.sleep(0.5) devices = run_command(cmd, zero_only=False) except subprocess.CalledProcessError as e: print(f"An error occurred: {e.stderr.decode('utf-8')}") drives = [] for device in devices: if debug_output: print(device) drive = { "dev_id": device.split(',')[0], "serial": device.split(',')[1], "capacity": device.split(',')[2] } drives.append(drive) # return the devices as an array if debug_output: print(drives) return drives # Function to check if a serial number exists in the database def check_serial_exists(serial): serial_check = f"SELECT * FROM drive_records WHERE serial='{serial}'" if debug_output: print(serial_check) return bool(query_db(serial_check)) def check_serial_attached(serial): serial_check = f"lsblk -o NAME,SERIAL,SIZE,TYPE | grep {serial} || true" if run_command(serial_check, zero_only=False): return True else: return False # Function to run SQL Query def query_db(sql_query): try: with sqlite3.connect(db_path) as conn: cursor = conn.cursor() if debug_output: print("Executing SQL query:", sql_query) cursor.execute(sql_query) rows = cursor.fetchall() if debug_output: print("Query Result:", rows) conn.commit() conn.close() return rows except sqlite3.Error as e: if not suppress_errors: print("An error occurred:", e) return [] def init_db(): print("Checking Database...") db_check = "SELECT name FROM sqlite_master WHERE type='table' AND name='drive_records';" create_table_command = """ CREATE TABLE drive_records ( id INTEGER PRIMARY KEY, serial TEXT NOT NULL, model TEXT NOT NULL, flavor TEXT NOT NULL, capacity TEXT NOT NULL, gb_written TEXT NOT NULL, smart TEXT NOT NULL ); """ # check for drives try: result = bool(query_db(db_check)) if debug_output: print(f"Database exists: {result}") # Check if any tables were found if result: print("drive_records exists, skipping db init") if debug_output or show_records: all_drives = json.loads(get_all_drive_records()) print("--- Drive Records ---") for drive in all_drives: print(f"{drive['model']} - SN: {drive['serial']}") print("--- End Records ---") print() else: print("drive_records does not exist, creating") try: result_init = query_db(create_table_command) if debug_output: print(result_init) print("Database created - 201") except sqlite3.Error as e: if not suppress_errors: print(f"error during table initialization: {e}") return jsonify({'error during table initialization - 401': e}), 401 except sqlite3.Error as e: if not suppress_errors: print(f"error during table check: {e}") return jsonify({'error during table check - 400': e}), 400 #################################################### ### Redis Functions #################################################### r = redis.Redis(host='172.17.0.1', port=6379) def update_disk_redis(): active = list_disk_and_serial() all_rec = json.loads(get_all_drive_records()) enriched = merge_active_with_details(active, all_rec) r.publish('attached_disks', json.dumps(enriched)) if debug_output: print("=== Active drives sent to Redis ===") print(json.dumps(enriched, indent=2)) def merge_active_with_details(active, all_records): # Build a quick lookup dictionary keyed by serial record_by_serial = {rec['serial']: rec for rec in all_records} # Add the extra fields to each active drive for drive in active: rec = record_by_serial.get(drive['serial']) if rec: extra = {k: v for k, v in rec.items() if k not in ('id', 'serial')} drive.update(extra) return active ######################################## # Run init_db when Class file is imported ######################################## init_db()