Files
ansible-windows/roles/storage_api/templates/disk_service.py

157 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, jsonify
from flask_apscheduler import APScheduler
import psutil
import os
import requests, json
from subprocess import check_output
# Flask application object; the @app.route handlers in this file register on it.
app = Flask(__name__)
# Scheduler instance; a job is added to it and it is started in the __main__ block.
scheduler = APScheduler()
# Request indented (pretty-printed) JSON from jsonify.
# NOTE(review): this config key was deprecated in Flask 2.2 and removed in
# Flask 2.3 -- confirm the Flask version this service is deployed with.
app.config['JSONIFY_PRETTYPRINT_REGULAR'] = True
# Convert a raw byte count into a human-readable size string
def bytes_to_human_readable(bytes):
    """Format a byte count as a string with a 1024-based unit.

    The value is divided by 1024 until it is below 1024 or the unit list is
    exhausted, then rendered with two decimal places.

    Args:
        bytes: Size in bytes (int or float). The name shadows the builtin,
            but it is kept so existing keyword callers keep working.

    Returns:
        A string such as ``"1.50 KB"``. Values of 1024 TB or more are
        reported in PB rather than falling off the end of the unit list
        (which previously produced ``None``).
    """
    size = bytes
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if size < 1024.0:
            return f"{size:.2f} {unit}"
        size /= 1024.0
    # Anything that is still >= 1024 after the TB step is expressed in PB.
    return f"{size:.2f} PB"
# Parse the cached DiskInfo.txt report and return the per-drive results
# Line of dashes that delimits sections in DiskInfo.txt.
_DISKINFO_SECTION_SEPARATOR = '----------------------------------------------------------------------------'

# Labels searched for in each report line. Each label is also the key the
# value is stored under. Order matters: the first label found in a line wins.
_DISKINFO_LABELS = (
    "Model",
    "Serial Number",
    "Firmware",
    "Temperature",
    "Health Status",
    "Power On Hours",
    "Power On Count",
    "Host Writes",
    "Wear Level Count",
    "Drive Letter",
    "Disk Size",
    "Interface",
)


def get_crystal_disk_info():
    """Parse ``DiskInfo.txt`` from the working directory into per-drive dicts.

    The file is split on the dashed separator line. Within each section,
    every line that contains one of the known labels and a ``:`` has the
    text after the first ``:`` stored under that label (a later matching
    line overwrites an earlier one). Sections that yield no values are
    dropped.

    Returns:
        ``{"drives": [...]}`` with one dict per section that produced data,
        each also carrying the templated ``Hostname`` value; or
        ``{"error": "<message>"}`` if the file cannot be read, parsing
        fails, or no drive data is found. This function does not raise.
    """
    try:
        # Read the generated DiskInfo.txt file
        with open("DiskInfo.txt", "r") as file:
            output = file.read()

        drives = []
        for section in output.split(_DISKINFO_SECTION_SEPARATOR):
            data = {
                "Hostname": None,
                "Disk Size": None,
                "Model": None,
                "Serial Number": None,
                "Firmware": None,
                "Temperature": None,
                "Health Status": None,
                "Power On Hours": None,
                "Power On Count": None,
                "Host Writes": None,
                "Wear Level Count": None,
                "Drive Letter": None,
                "Interface": None
            }
            for line in section.strip().splitlines():
                label = next(
                    (name for name in _DISKINFO_LABELS if name in line), None
                )
                # A labelled line without a colon carries no value; skip it.
                if label is not None and ":" in line:
                    # Each value goes under its own label. Previously the
                    # "Interface" value was written into "Disk Size".
                    data[label] = line.split(":", 1)[1].strip()
            if any(value is not None for value in data.values()):
                data["Hostname"] = "{{ hostname_output.stdout_lines[0] }}"
                drives.append(data)

        if not drives:
            raise ValueError("No drive data found")
        return {"drives": drives}
    except Exception as e:
        # Broad catch kept on purpose: callers serialize whatever comes back,
        # so failures are reported as data instead of propagating.
        return {"error": str(e)}
# Disk Info Function
def get_disk_info():
    """Collect usage figures for every partition psutil reports.

    Returns:
        A list with one dict per readable partition containing ``device``
        (with doubled backslashes collapsed and trailing backslashes
        removed), human-readable ``total`` / ``used`` / ``free`` strings,
        and the numeric ``percent`` used. Mountpoint and filesystem type
        are not included.

    Partitions whose usage cannot be queried are skipped, so that one
    unready volume does not fail the whole report.
    """
    disk_info = []
    for partition in psutil.disk_partitions():
        try:
            usage = psutil.disk_usage(partition.mountpoint)
        except OSError:
            # Raised for volumes that are listed but not ready, e.g. an
            # optical drive with no disc inserted.
            continue
        disk_info.append({
            'device': partition.device.replace('\\\\', '\\').rstrip('\\'),
            'total': bytes_to_human_readable(usage.total),
            'used': bytes_to_human_readable(usage.used),
            'free': bytes_to_human_readable(usage.free),
            'percent': usage.percent
        })
    return disk_info
# Flask endpoints
@app.route('/disk', methods=['GET'])
def disk():
    """GET /disk: respond with the partition usage list as JSON."""
    partition_usage = get_disk_info()
    return jsonify(partition_usage)
@app.route('/health', methods=['GET'])
def drive_health():
    """GET /health: respond with the parsed DiskInfo.txt drive report as JSON."""
    drive_report = get_crystal_disk_info()
    return jsonify(drive_report)
def server_reporter(base_url="http://172.25.1.18:5001/client_update", timeout=10):
    """POST the current drive report to the collection server.

    Args:
        base_url: Base URL of the collector; ``/process`` is appended to it.
            Defaults to the previously hard-coded address.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout an unresponsive server would block the caller forever.

    Returns:
        The decoded JSON body of the server's response.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or a non-2xx status (via ``raise_for_status``).
    """
    url = f"{base_url}/process"
    data_dict = get_crystal_disk_info()
    response = requests.post(url, json=data_dict, timeout=timeout)
    # Raise an exception for non-2xx status codes
    response.raise_for_status()
    # Return the JSON payload
    return response.json()
if __name__ == '__main__':
    # Service entry point. The `{{ ... }}` markers in this block are template
    # placeholders; presumably they are substituted when the template is
    # rendered at deploy time -- confirm against the role that renders it.

    # Background loop function: this is the periodic service job.
    def background_loop():
        """Run DiskInfo64.exe, print its captured output, then report to the server.

        Returns the bytes captured from the command. Raises if the command
        exits non-zero (check_output) or if server_reporter() raises.
        """
        # NOTE(review): this is a non-raw string, so after the placeholder is
        # substituted any backslash sequences in the inserted path (e.g. "\t")
        # are interpreted as escapes -- confirm the value is safe for that.
        # presumably /CopyExit makes the tool write DiskInfo.txt and exit,
        # since get_crystal_disk_info() reads that file -- TODO confirm.
        diskinfo_command = f"{{ storage_api_root }}\\dist\\DiskInfo64.exe /CopyExit"
        result = check_output(diskinfo_command, shell=True)
        print(result)
        server_reporter()
        return result
    # Register the job to run every 60 seconds.
    scheduler.add_job(id='background_loop',
                      func=background_loop,
                      trigger='interval',
                      seconds=60)
    scheduler.init_app(app)
    scheduler.start()
    # Run once immediately; an exception here propagates before app.run()
    # is reached.
    background_loop()
    # Listen on all interfaces; the port comes from the template placeholder.
    app.run(host='0.0.0.0', port={{ api_service_port }})