import subprocess import re import csv import os import time # Define your shell command command = ["top", "-n", "1"] headerRow = [ "timestamp", "name", "loadaverage_1", "loadaverage_5", "loadaverage_15", "total_processes", "running", "sleeping", "zombie", "cpu_user", "cpu_nice", "cpu_system", "cpu_interrupt", "cpu_idle", "mem_active", "mem_inactive", "mem_wired", "mem_free", # "arc_total", # "arc_MFU", # "arc_MRU", # "arc_anon", # "arc_header", # "arc_other", # "swap_total", # "swap_free", ] filename = "topStat.csv" # Check if the file exists if not os.path.exists(filename): # Create the file in write mode ('w') if it doesn't exist with open(filename, "w", newline="") as csvfile: csv_writer = csv.writer(csvfile) # Write the header row (optional) csv_writer.writerow(headerRow) def convert_to_megabytes(data_size): """ Converts a string representing data size (number + unit) to size in megabytes. Args: data_size (str): String representing data size (e.g., "10G", "2M"). Returns: int: Size of data in megabytes, or None if the format is invalid. """ try: unit = data_size[-1].upper() size = data_size[:-1] size = float(size) except (ValueError, AttributeError): return None unit_map = { "K": 1 / (1024**1), "M": 1, "G": 1024, "T": 1024**2, } if unit not in unit_map: return None return int(size * unit_map[unit]) def runtopstat(): # Run the command and capture output result = subprocess.run(command, capture_output=True, text=True) timestamp = time.time() local_time = time.localtime(timestamp) # Format the time components for a more readable output timestamp = time.strftime("%Y-%m-%d %H:%M:%S", local_time) # Check the return code (0 for success) if result.returncode == 0: # Access the captured output as a string output = result.stdout loadaverages = re.search("^last.*", output, re.MULTILINE).group() loadaverages = re.sub("up.*", "", loadaverages.strip()) loadaverages = re.sub("[a-z:;,]", "", loadaverages.strip()).split() loadaverage1 = loadaverages[0] loadaverage5 = loadaverages[1] loadaverage15 = loadaverages[2] processes = re.search(".*processes.*", output).group() processes = re.sub("[a-z,:]", "", processes.strip()).split() total_processes = processes[0] running = processes[1] try: sleeping = processes[2] except IndexError: print("no sleeping processes") try: sleeping = processes[3] except IndexError: print("no zombie processes") cpu = re.search("CPU:.*", output).group() cpu = re.sub("CPU.", "", cpu.strip()) cpu = re.sub("[a-z,%]", "", cpu.strip()).split() cpu_user = cpu[0] cpu_nice = cpu[1] cpu_system = cpu[2] cpu_interrupt = cpu[3] cpu_idle = cpu[4] mem = re.search("Mem:.*", output).group() mem = re.sub("Mem.", "", mem.strip()) mem = re.sub("[Active|Inact|Wired|Free|,]", "", mem.strip()).split() meminmeg = [] for x in mem: meminmeg.append(convert_to_megabytes(x)) mem = meminmeg mem_active = mem[0] mem_inactive = mem[1] mem_wired = mem[2] mem_free = mem[3] lineData = [ timestamp, "top", loadaverage1, loadaverage5, loadaverage15, total_processes, running, sleeping, zombie, cpu_user, cpu_nice, cpu_system, cpu_interrupt, cpu_idle, mem_active, mem_inactive, mem_wired, mem_free, ] if lineData: with open(filename, "a", newline="") as csvfile: csv_writer = csv.writer(csvfile) csv_writer.writerow(lineData) else: print(f"Error running command: {result.stderr}") def main(): runtopstat() if __name__ == "__main__": main()