Files
ansible-windows/roles/storage_api/templates/disk_service.py
2026-04-26 14:36:25 -07:00

307 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# this got big...
from flask import Flask, jsonify
from flask_apscheduler import APScheduler
import psutil
import os
import requests
from requests import RequestException, Response
import json
from subprocess import check_output
import win32api
from datetime import datetime, timedelta
import sys
from pathlib import Path
import wmi
import pythoncom
import ctypes
from ctypes import wintypes
app = Flask(__name__)
scheduler = APScheduler()
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True
# human readable bytes
def bytes_to_human_readable(bytes):
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes < 1024.0:
return f"{bytes:.2f} {unit}"
bytes /= 1024.0
# Parse cache and return results
def get_crystal_disk_info():
try:
# Read the generated DiskInfo.txt file
with open("DiskInfo.txt", "r") as file:
output = file.read()
# Initialize a list to hold each drive's data
drives = []
# Split the file content into sections for each drive
drive_sections = output.split('----------------------------------------------------------------------------')
disk_id = 0
for section in drive_sections:
lines = section.strip().splitlines()
data = {
"Hostname": None,
"Disk Size": None,
"Model": None,
"Serial Number": None,
"Firmware": None,
"Temperature": None,
"Health Status": None,
"Power On Hours": None,
"Power On Count": None,
"Host Writes": None,
"Wear Level Count": None,
"Drive Letter": None,
"Interface": None,
"Transfer Mode": None
}
for line in lines:
if "Model" in line:
if ":" in line:
data["Model"] = line.split(":", 1)[1].strip()
elif "Serial Number" in line:
if ":" in line:
data["Serial Number"] = line.split(":", 1)[1].strip()
elif "Firmware" in line:
if ":" in line:
data["Firmware"] = line.split(":", 1)[1].strip()
elif "Temperature" in line:
if ":" in line:
data["Temperature"] = line.split(":", 1)[1].strip()
elif "Health Status" in line:
if ":" in line:
data["Health Status"] = line.split(":", 1)[1].strip()
elif "Power On Hours" in line:
if ":" in line:
data["Power On Hours"] = line.split(":", 1)[1].strip()
elif "Power On Count" in line:
if ":" in line:
data["Power On Count"] = line.split(":", 1)[1].strip()
elif "Host Writes" in line:
if ":" in line:
data["Host Writes"] = line.split(":", 1)[1].strip()
elif "Wear Level Count" in line:
if ":" in line:
data["Wear Level Count"] = line.split(":", 1)[1].strip()
elif "Drive Letter" in line:
if ":" in line:
data["Drive Letter"] = line.split(":", 1)[1].strip()
elif "Disk Size" in line:
if ":" in line:
raw = line.split(":", 1)[1].strip()
data["Disk Size"] = raw.split('GB')[0].strip() + ' GB'
elif "Interface" in line:
if ":" in line:
data["Interface"] = line.split(":", 1)[1].strip()
elif "Transfer Mode" in line:
if ":" in line:
data["Transfer Mode"] = line.split(":", 1)[1].strip()
# This makes sure something was changed,
if any(value is not None for value in data.values()):
data["Disk ID"] = disk_id
drives.append(data)
disk_id = disk_id + 1
#data["Hostname"] = "{{ hostname_output.stdout_lines[0] }}"
if not drives:
raise ValueError("No drive data found")
return {"drives": drives}
except Exception as e:
return {"error": str(e)}
# Disk Info Function
def get_disk_info():
disk_info = []
partitions = psutil.disk_partitions()
for partition in partitions:
usage = psutil.disk_usage(partition.mountpoint)
drive_letter = partition.device.replace('\\\\', '\\').rstrip('\\')
disk_info.append({
'device': drive_letter,
'label': get_drive_label(drive_letter),
#'mountpoint': partition.mountpoint,
'fstype': partition.fstype,
'total': bytes_to_human_readable(usage.total),
'used': bytes_to_human_readable(usage.used),
'free': bytes_to_human_readable(usage.free),
'percent': usage.percent
})
return disk_info
# drive label function
def get_drive_label(drive_letter: str) -> str:
result = "none"
root = drive_letter.strip()
if not root.endswith(':'):
root += ':'
if not root.endswith('\\'):
root += '\\'
# Make sure the drive actually exists
if not os.path.exists(root):
# Not a valid drive letter, return None so the caller can decide what to do.
print(f"[DEBUG] Drive '{root}' does not exist.")
result = "drive does not exist 0_o"
# Prepare buffers for the Win32 API call
volume_name_buf = ctypes.create_unicode_buffer(260) # MAX_PATH
fs_name_buf = ctypes.create_unicode_buffer(260)
serial_number = wintypes.DWORD()
max_component_len = wintypes.DWORD()
file_system_flags = wintypes.DWORD()
# Call GetVolumeInformationW
res = ctypes.windll.kernel32.GetVolumeInformationW(
ctypes.c_wchar_p(root), # lpRootPathName
volume_name_buf, # lpVolumeNameBuffer
ctypes.sizeof(volume_name_buf), # nVolumeNameSize
ctypes.byref(serial_number), # lpVolumeSerialNumber
ctypes.byref(max_component_len), # lpMaximumComponentLength
ctypes.byref(file_system_flags), # lpFileSystemFlags
fs_name_buf, # lpFileSystemNameBuffer
ctypes.sizeof(fs_name_buf) # nFileSystemNameSize
)
if res == 0: # The call failed
err = ctypes.get_last_error()
print(f"[ERROR] GetVolumeInformationW failed for '{root}'. "
f"Win32 error code: {err}")
result = "label error"
else:
result = volume_name_buf.value
if volume_name_buf.value == '':
result = "no_label"
return result
# os info function
def get_os_info() -> str:
result = "windows"
pythoncom.CoInitialize()
#try:
wmi_data = wmi.WMI()
os_info = wmi_data.Win32_OperatingSystem()[0]
#return {
# "Name" : os_info.Name,
# "Version" : os_info.Version,
# "BuildNumber": os_info.BuildNumber,
# "InstallDate": os_info.InstallDate,
# "ProductType": int(os_info.ProductType)
#}
# 1. Major version + edition (e.g. "10 Pro")
# os_info.Caption → "Microsoft Windows 10 Pro"
parts = os_info.Caption.split()
major = parts[2] # "10"
edition = parts[3] # "Pro" (for server: "2019", etc.)
major_edition = f"{major} {edition}"
# 2. Build number
build = os_info.BuildNumber
# 3. Install date (WMI gives an ISO8601 string)
# e.g. "20210930142300.000000+000"
install_ts = os_info.InstallDate[:14] # "20210930142300"
dt = datetime.strptime(install_ts, "%Y%m%d%H%M%S")
install_date = f"{dt.month}-{dt.day}-{dt.year}"
result = f"Windows {major_edition} - Build {build} - Installed {install_date}"
#except Exception as e:
# print(e)
# result = "wmi_error"
return result
# server reporter info
def get_server_info() -> dict:
result = {}
drives_dict = get_crystal_disk_info()
data_dict = {
"hostname": "{{ hostname_output.stdout_lines[0] }}",
"os_string": get_os_info(),
"drives": drives_dict["drives"],
"API_KEY": "deadbeef",
"storage_summary": get_disk_info()
}
result = data_dict
return result
# Flask endpoints
@app.route('/disk', methods=['GET'])
def disk():
return jsonify(get_disk_info())
@app.route('/health', methods=['GET'])
def drive_health():
return jsonify(get_crystal_disk_info())
@app.route('/full', methods=['GET'])
def full_summary():
return jsonify(get_server_info())
@app.route('/test', methods=['GET'])
def test_flask():
return jsonify({
"message": "hello world",
"os_string": get_os_info()
})
def server_reporter():
#base_url="https://cosmostat.matt-cloud.com"
base_url="http://10.200.27.20:5001"
url = f"{base_url}/storage_client_update"
data_dict = get_server_info()
result = []
try:
response = requests.post(url, json=data_dict)
# Raise an exception for non2xx status codes
response.raise_for_status()
result = response.json()
except:
result = {
"message": "error"
}
# Return the JSON payload
return result
if __name__ == '__main__':
# disk info Loop Function
def update_disk_info():
diskinfo_command = f"{{ storage_api_root }}\\dist\\DiskInfo64.exe /CopyExit"
result = check_output(diskinfo_command, shell=True)
print(result)
server_reporter()
return result
# gonna try something wild
run_date = datetime.now() + timedelta(seconds=15)
scheduler.add_job(id='init_disk_info',
func=update_disk_info,
trigger='date',
run_date=run_date)
scheduler.add_job(id='update_disk_info',
func=update_disk_info,
trigger='interval',
seconds=10000)
scheduler.add_job(id='server_reporter',
func=server_reporter,
trigger='interval',
seconds=15)
scheduler.init_app(app)
scheduler.start()
#update_disk_info()
app.run(host='0.0.0.0', port={{ api_service_port }})