cosmostat has working drive health dashboard
This commit is contained in:
66
files/docker/apis/StorageSummary/Helpers.py
Normal file
66
files/docker/apis/StorageSummary/Helpers.py
Normal file
@ -0,0 +1,66 @@
|
||||
|
||||
import base64, hashlib
|
||||
import subprocess
|
||||
import ipaddress
|
||||
from typing import Dict, Any, List
|
||||
|
||||
# pickle subroutines
|
||||
import pickle
|
||||
from pathlib import Path
|
||||
|
||||
print("Importing Helpers")
|
||||
# subnet helper app
|
||||
def is_ip_in_subnets(ip, subnet):
|
||||
try:
|
||||
ip_obj = ipaddress.IPv4Address(ip)
|
||||
subnet_obj = ipaddress.IPv4Network(subnet)
|
||||
if ip_obj in subnet_obj:
|
||||
return True
|
||||
return False
|
||||
except ValueError as e:
|
||||
# If the IP address is not valid, raise an error
|
||||
return False
|
||||
|
||||
# subroutine to run a command, return stdout as array unless zero_only then return [0]
|
||||
def run_command(cmd, zero_only=False, use_shell=True, req_check = True):
|
||||
# Run the command and capture the output
|
||||
result = subprocess.run(cmd, shell=use_shell, check=req_check, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
# Decode the byte output to a string
|
||||
output = result.stdout.decode('utf-8')
|
||||
# Split the output into lines and store it in an array
|
||||
output_lines = [line for line in output.split('\n') if line]
|
||||
# Return result
|
||||
try:
|
||||
return output_lines[0] if zero_only else output_lines
|
||||
except:
|
||||
return output_lines
|
||||
|
||||
def short_uuid(value: str, length=8):
|
||||
hasher = hashlib.md5()
|
||||
hasher.update(value.encode('utf-8'))
|
||||
full_hex = hasher.hexdigest()
|
||||
return full_hex[:length]
|
||||
|
||||
# test subroutine
|
||||
def get_hostname():
|
||||
hostname_command = "hostname"
|
||||
return run_command(hostname_command, zero_only = True)
|
||||
|
||||
|
||||
# pickle helpers
|
||||
# Where the pickled state will live
|
||||
STATE_FILE = Path(__file__).parent / "storage_api_state.pkl"
|
||||
|
||||
def save_state(obj: object, path: Path | str = STATE_FILE) -> None:
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with path.open("wb") as f:
|
||||
pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
|
||||
print("Pickle saved")
|
||||
|
||||
def load_state(path: Path | str = STATE_FILE) -> object | None:
|
||||
path = Path(path)
|
||||
if path.is_file():
|
||||
with path.open("rb") as f:
|
||||
return pickle.load(f)
|
||||
return None
|
||||
Reference in New Issue
Block a user