Files
ansible-windows/roles/storage_api/templates/disk_service.py
2026-04-26 14:58:35 -07:00

307 lines
10 KiB
Python

# this got big...
from flask import Flask, jsonify
from flask_apscheduler import APScheduler
import psutil
import os
import requests
from requests import RequestException, Response
import json
from subprocess import check_output
import win32api
from datetime import datetime, timedelta
import sys
from pathlib import Path
import wmi
import pythoncom
import ctypes
from ctypes import wintypes
# Flask application object; the route decorators further down attach to it.
app = Flask(__name__)
# Background job runner; jobs are registered in the __main__ section.
scheduler = APScheduler()
# Ask jsonify() for indented output so the endpoints are readable by hand.
# NOTE(review): recent Flask releases may ignore this config key - confirm
# against the Flask version installed on the target hosts.
app.config.update(JSONIFY_PRETTYPRINT_REGULAR=True)
# human readable bytes
def bytes_to_human_readable(bytes):
    """Format a byte count as a string with two decimals and a binary unit.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit is reached, so very large values are reported in PB instead
    of falling off the end of the loop and returning None.

    Args:
        bytes: Size in bytes (int or float). The parameter name shadows the
            builtin but is kept so keyword callers continue to work.

    Returns:
        A string such as "1.50 KB" or "931.51 GB".
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    value = bytes
    # Every unit except the last is used only while the value is too large
    # for it; the last unit is the catch-all.
    for unit in units[:-1]:
        if value < 1024.0:
            return f"{value:.2f} {unit}"
        value /= 1024.0
    return f"{value:.2f} {units[-1]}"
# Parse cache and return results
def get_crystal_disk_info():
    """Parse the cached DiskInfo.txt report into one dictionary per drive.

    The report is read from "DiskInfo.txt" in the current working directory
    and split on the long dashed separator line. Within each section every
    "label : value" line whose label contains one of the known field names
    is recorded. Field names are matched against the label only (the text
    before the first colon), so a value that happens to mention another
    field name (for example a firmware string containing "Model") is not
    filed under the wrong key.

    Returns:
        {"drives": [...]} on success, where each entry holds the parsed
        fields (None when absent) plus a zero-based "Disk ID";
        {"error": "<message>"} when the file cannot be read or no section
        yields any data. Errors are reported this way rather than raised
        because the /health endpoint returns this value directly.
    """
    section_separator = '----------------------------------------------------------------------------'
    # Checked in this order; the first name found in a label wins.
    field_names = (
        "Model",
        "Serial Number",
        "Firmware",
        "Temperature",
        "Health Status",
        "Power On Hours",
        "Power On Count",
        "Host Writes",
        "Wear Level Count",
        "Drive Letter",
        "Disk Size",
        "Interface",
        "Transfer Mode",
    )
    try:
        # Read the generated DiskInfo.txt file
        with open("DiskInfo.txt", "r") as file:
            output = file.read()
        drives = []
        for section in output.split(section_separator):
            data = {
                "Hostname": None,
                "Disk Size": None,
                "Model": None,
                "Serial Number": None,
                "Firmware": None,
                "Temperature": None,
                "Health Status": None,
                "Power On Hours": None,
                "Power On Count": None,
                "Host Writes": None,
                "Wear Level Count": None,
                "Drive Letter": None,
                "Interface": None,
                "Transfer Mode": None
            }
            for line in section.strip().splitlines():
                if ":" not in line:
                    continue
                label, raw_value = line.split(":", 1)
                value = raw_value.strip()
                for field in field_names:
                    if field not in label:
                        continue
                    if field == "Disk Size":
                        # Keep only the leading figure, dropping whatever
                        # follows the first "GB" on the line.
                        value = value.split('GB')[0].strip() + ' GB'
                    data[field] = value
                    break
            # Only sections that produced at least one field count as drives.
            if any(value is not None for value in data.values()):
                data["Disk ID"] = len(drives)
                drives.append(data)
        if not drives:
            raise ValueError("No drive data found")
        return {"drives": drives}
    except Exception as e:
        return {"error": str(e)}
# Disk Info Function
def get_disk_info():
    """Collect usage figures for every partition psutil reports.

    Partitions whose usage cannot be read (psutil raises OSError, as can
    happen for a drive with no media inserted) are skipped with a debug
    message instead of aborting the whole listing.

    Returns:
        A list of dictionaries with the keys 'device', 'label', 'fstype',
        'total', 'used', 'free' (human readable strings) and 'percent'.
    """
    disk_info = []
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except OSError as exc:
            print(f"[DEBUG] Skipping '{partition.device}': {exc}")
            continue
        # Collapse doubled backslashes and drop the trailing one so the
        # device reads like "C:".
        drive_letter = partition.device.replace('\\\\', '\\').rstrip('\\')
        disk_info.append({
            'device': drive_letter,
            'label': get_drive_label(drive_letter),
            'fstype': partition.fstype,
            'total': bytes_to_human_readable(usage.total),
            'used': bytes_to_human_readable(usage.used),
            'free': bytes_to_human_readable(usage.free),
            'percent': usage.percent
        })
    return disk_info
# drive label function
def get_drive_label(drive_letter: str) -> str:
    """Return the volume label of a drive via GetVolumeInformationW.

    Args:
        drive_letter: Drive designation such as "C", "C:" or "C:\\"; it is
            normalised to a root path ending in ":\\" before use.

    Returns:
        The volume label, or one of the marker strings
        "drive does not exist 0_o" (root path not found),
        "label error" (the Win32 call failed) or
        "no_label" (the volume has an empty label).
    """
    # Drop any trailing backslash first so "C:\" does not turn into "C:\:\".
    root = drive_letter.strip().rstrip('\\')
    if not root.endswith(':'):
        root += ':'
    root += '\\'

    # Make sure the drive actually exists; return straight away so the
    # marker is not overwritten by the failing API call below.
    if not os.path.exists(root):
        print(f"[DEBUG] Drive '{root}' does not exist.")
        return "drive does not exist 0_o"

    # Prepare buffers for the Win32 API call (MAX_PATH + 1 characters).
    volume_name_buf = ctypes.create_unicode_buffer(261)
    fs_name_buf = ctypes.create_unicode_buffer(261)
    serial_number = wintypes.DWORD()
    max_component_len = wintypes.DWORD()
    file_system_flags = wintypes.DWORD()

    # use_last_error=True makes ctypes capture the thread's last-error value
    # right after the call, which is what get_last_error() reads back.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
    res = kernel32.GetVolumeInformationW(
        ctypes.c_wchar_p(root),             # lpRootPathName
        volume_name_buf,                    # lpVolumeNameBuffer
        len(volume_name_buf),               # nVolumeNameSize (characters, not bytes)
        ctypes.byref(serial_number),        # lpVolumeSerialNumber
        ctypes.byref(max_component_len),    # lpMaximumComponentLength
        ctypes.byref(file_system_flags),    # lpFileSystemFlags
        fs_name_buf,                        # lpFileSystemNameBuffer
        len(fs_name_buf)                    # nFileSystemNameSize (characters)
    )
    if res == 0:  # The call failed
        err = ctypes.get_last_error()
        print(f"[ERROR] GetVolumeInformationW failed for '{root}'. "
              f"Win32 error code: {err}")
        return "label error"
    return volume_name_buf.value or "no_label"
# os info function
def get_os_info() -> str:
    """Build a one-line description of the operating system from WMI.

    Queries Win32_OperatingSystem and combines the caption, the build
    number and the install date.

    Returns:
        A string of the form
        "Windows <major edition> - Build <build> - Installed <m-d-yyyy>".

    Raises:
        Whatever the WMI query or the date parsing raises; nothing is
        caught here, so callers see the failure.
    """
    # NOTE(review): CoInitialize is called on every invocation and never
    # balanced with CoUninitialize - confirm this is acceptable for the
    # request and scheduler threads that call this function.
    pythoncom.CoInitialize()
    wmi_data = wmi.WMI()
    os_info = wmi_data.Win32_OperatingSystem()[0]

    # 1. Major version + edition, i.e. the third and fourth words of the
    #    caption ("Microsoft Windows 10 Pro" -> "10 Pro"). Slicing instead
    #    of indexing keeps short captions from raising IndexError.
    caption_words = os_info.Caption.split()
    major_edition = " ".join(caption_words[2:4])

    # 2. Build number
    build = os_info.BuildNumber

    # 3. Install date. WMI returns a string like
    #    "20210930142300.000000+000"; the first 14 characters are
    #    year, month, day, hour, minute, second.
    install_ts = os_info.InstallDate[:14]
    dt = datetime.strptime(install_ts, "%Y%m%d%H%M%S")
    install_date = f"{dt.month}-{dt.day}-{dt.year}"

    return f"Windows {major_edition} - Build {build} - Installed {install_date}"
# server reporter info
def get_server_info() -> dict:
    """Assemble the full report for this host.

    Returns:
        A dictionary with the keys "hostname", "os_string", "drives",
        "API_KEY" and "storage_summary". "drives" is an empty list when the
        drive report could not be parsed.
    """
    drives_dict = get_crystal_disk_info()
    return {
        # Placeholder substituted when this template is rendered.
        "hostname": "{{ hostname_output.stdout_lines[0] }}",
        "os_string": get_os_info(),
        # The parser returns an "error" key instead of "drives" on failure;
        # .get() keeps that case from raising KeyError here.
        "drives": drives_dict.get("drives", []),
        # NOTE(review): hard-coded key sent with every report - consider
        # supplying it from configuration instead.
        "API_KEY": "deadbeef",
        "storage_summary": get_disk_info()
    }
# Flask endpoints
@app.route('/disk', methods=['GET'])
def disk():
    """GET /disk: partition usage list as JSON."""
    partitions = get_disk_info()
    return jsonify(partitions)
@app.route('/health', methods=['GET'])
def drive_health():
    """GET /health: parsed drive report (or its error dictionary) as JSON."""
    report = get_crystal_disk_info()
    return jsonify(report)
@app.route('/full', methods=['GET'])
def full_summary():
    """GET /full: the complete host report as JSON."""
    summary = get_server_info()
    return jsonify(summary)
@app.route('/test', methods=['GET'])
def test_flask():
    """GET /test: fixed greeting plus the OS description string."""
    payload = {"message": "hello world"}
    payload["os_string"] = get_os_info()
    return jsonify(payload)
def server_reporter(base_url="http://10.200.27.20:5001", timeout=10):
    """POST the host report to the collection server.

    Args:
        base_url: Scheme, host and port of the collection server; the
            report is sent to "<base_url>/storage_client_update".
        timeout: Seconds to wait for the server before giving up, so a
            stalled server cannot block the scheduler job indefinitely.

    Returns:
        The server's decoded JSON reply, or {"message": "error"} when the
        request fails, returns a non-2xx status or the reply is not JSON.
    """
    #base_url="https://cosmostat.matt-cloud.com"
    url = f"{base_url}/storage_client_update"
    data_dict = get_server_info()
    try:
        response = requests.post(url, json=data_dict, timeout=timeout)
        # Raise an exception for non-2xx status codes
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        # Best effort: the job runs again on the next interval. Catching
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit
        # through.
        print(f"[ERROR] Report to '{url}' failed: {exc}")
        return {
            "message": "error"
        }
if __name__ == '__main__':
    # disk info Loop Function
    def update_disk_info():
        """Run the DiskInfo executable, print its output and send a report.

        Returns the raw output captured from the command.
        NOTE(review): presumably the /CopyExit run is what refreshes
        DiskInfo.txt - confirm against the tool's documentation.
        """
        diskinfo_command = f"{{ storage_api_root }}\\dist\\DiskInfo64.exe /CopyExit"
        command_output = check_output(diskinfo_command, shell=True)
        print(command_output)
        server_reporter()
        return command_output

    # gonna try something wild
    # One-shot refresh 15 seconds after start-up.
    first_run_at = datetime.now() + timedelta(seconds=15)
    scheduler.add_job(
        id='init_disk_info',
        func=update_disk_info,
        trigger='date',
        run_date=first_run_at,
    )
    # Periodic refresh of the drive report.
    scheduler.add_job(
        id='update_disk_info',
        func=update_disk_info,
        trigger='interval',
        seconds=10000,
    )
    # Frequent report to the collection server.
    scheduler.add_job(
        id='server_reporter',
        func=server_reporter,
        trigger='interval',
        seconds=15,
    )
    scheduler.init_app(app)
    scheduler.start()
    #update_disk_info()
    # The port is a placeholder substituted when this template is rendered.
    app.run(host='0.0.0.0', port={{ api_service_port }})