Skip to content

Commit

Permalink
fix: fix process sampling failures in cyber_performance
Browse files Browse the repository at this point in the history
  • Loading branch information
WildBeast114514 committed Aug 27, 2024
1 parent 9bc1c30 commit bde4e20
Showing 1 changed file with 87 additions and 79 deletions.
166 changes: 87 additions & 79 deletions cyber/tools/cyber_performance/cyber_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
process_network_io = {}
network_data_lock = threading.Lock()

# Probe once at start-up whether `sudo` can be used without a password
# prompt: `-n` makes sudo fail immediately instead of asking, so the exit
# status of `sudo -n true` tells us whether privileged helpers can be spawned.
# If the `sudo` binary is missing or not executable, subprocess.run() raises
# OSError; treat that the same as "no sudo privilege" instead of crashing the
# whole tool at import time.
try:
    sudo_prv = subprocess.run(
        ['sudo', '-n', 'true'], stdout=None, stderr=None).returncode == 0
except OSError:
    sudo_prv = False

remove_list = []
for i in os.listdir(DIRECTORY_TO_WATCH):
if i.startswith("performance_dumps"):
Expand Down Expand Up @@ -217,68 +220,71 @@ def on_any_event(event):
process_name = file_name.replace(".system.data", "")
pid_file_name = event.src_path.replace(".system.data", ".data")
latency_file_name = event.src_path.replace(".system.data", ".latency.data")
with open(event.src_path, 'r') as f:
current_data = {}
current_data["autodrive"] = autodrive
contents = f.read().split("\n")
if len(contents) == 0:
return None
elif len(contents) == 1 and contents[0] == "":
return None
for line in contents:
if line == "":
while True:
with open(event.src_path, 'r') as f:
current_data = {}
current_data["autodrive"] = autodrive
contents = f.read().split("\n")
if len(contents) == 0:
continue
instance = line.strip().split(" : ")
if len(instance) == 1:
elif len(contents) == 1 and contents[0] == "":
continue

if instance[0].endswith("cpu_usage"):
current_data["BASIC - cpu_usage(%, single-core)"] = float(instance[1]) * 100
elif instance[0].endswith("memory_resident"):
current_data["BASIC - memory(MB)"] = int(instance[1]) / 1024 / 1024
elif instance[0].endswith("disk_read_bytes_second"):
current_data["BLOCK_DEVICE_IO - block_device_io_read(rkB/s)"] = int(instance[1]) / 1024
elif instance[0].endswith("disk_write_bytes_second"):
current_data["BLOCK_DEVICE_IO - block_device_io_write(wkB/s)"] = int(instance[1]) / 1024
else:
continue
pid = -1
with open(pid_file_name, "r") as pf:
contents = pf.readlines()
for line in contents:
if line == "":
continue
instance = line.strip().split(" : ")
if len(instance) == 1:
continue
if instance[0].endswith("_pid"):
pid = int(instance[1])
break
gpu_mem = gpu_memory_usage(pid)

with open(latency_file_name, "r") as lf:
contents = lf.readlines()
has_proc_info = False
for line in contents:
if "proc_latency :" in line:
instance = line.split(" : ")
latency_name = instance[0].replace("mainboard_", "")
latency_val = int(instance[1]) / 1000

if instance[0].endswith("cpu_usage"):
current_data["BASIC - cpu_usage(%, single-core)"] = float(instance[1]) * 100
elif instance[0].endswith("memory_resident"):
current_data["BASIC - memory(MB)"] = int(instance[1]) / 1024 / 1024
elif instance[0].endswith("disk_read_bytes_second"):
current_data["BLOCK_DEVICE_IO - block_device_io_read(rkB/s)"] = int(instance[1]) / 1024
elif instance[0].endswith("disk_write_bytes_second"):
current_data["BLOCK_DEVICE_IO - block_device_io_write(wkB/s)"] = int(instance[1]) / 1024
else:
continue
has_proc_info = True
current_data[f"E2E_LATENCY - {latency_name}(ms)"] = latency_val
if not has_proc_info:
return
with network_data_lock:
if pid in process_network_io:
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_write(wkB/s)"] = process_network_io[pid]["tx"]
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_read(rkB/s)"] = process_network_io[pid]["rx"]
break
pid = -1

with open(pid_file_name, "r") as pf:
contents = pf.readlines()
for line in contents:
if line == "":
continue
instance = line.strip().split(" : ")
if len(instance) == 1:
continue
if instance[0].endswith("_pid"):
pid = int(instance[1])
break
gpu_mem = gpu_memory_usage(pid)

with open(latency_file_name, "r") as lf:
contents = lf.readlines()
has_proc_info = False
for line in contents:
if "proc_latency :" in line:
instance = line.split(" : ")
latency_name = instance[0].replace("mainboard_", "")
latency_val = float((instance[1].strip())) / 1000
else:
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_write(wkB/s)"] = 0
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_read(rkB/s)"] = 0

current_data["BASIC - gpu_memory(MB)"] = gpu_mem / 1024
continue
has_proc_info = True
current_data[f"E2E_LATENCY - {latency_name}(ms)"] = latency_val
if not has_proc_info:
return
with network_data_lock:
if pid in process_network_io:
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_write(wkB/s)"] = process_network_io[pid]["tx"]
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_read(rkB/s)"] = process_network_io[pid]["rx"]
else:
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_write(wkB/s)"] = 0
current_data["ETHERNET_DEVICE_IO - ethernet_device_io_read(rkB/s)"] = 0

current_data["BASIC - gpu_memory(MB)"] = gpu_mem / 1024

with data_lock:
if process_name not in data_history["data"]:
Expand Down Expand Up @@ -420,7 +426,7 @@ def sample_task():

if dev["tran"] == "nvme" or dev["tran"] == "sata":
blk_monitor_devices.append(dev["name"])
elif dev["trans"] == "null" and "children" in dev:
elif dev["tran"] == "null" and "children" in dev:
blk_monitor_devices.append(dev["name"])
blk_monitor_devices = list(filter(
lambda x: os.path.exists(f"/dev/{x}"), blk_monitor_devices))
Expand Down Expand Up @@ -737,34 +743,36 @@ def index():
watchdog_thread.daemon = True
watchdog_thread.start()

nethogs_process = subprocess.Popen(
['sudo', 'nethogs', '-t'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=1, universal_newlines=True)

def parse_nethogs_output():
global nethogs_buf, process_network_io, network_data_lock
for line in iter(nethogs_process.stdout.readline, ''):
if line.startswith("Refreshing:"):
with network_data_lock:
process_network_io = {}
nethogs_buf_list = list(filter(lambda x: x!= "", nethogs_buf.split("\n")))
for i in nethogs_buf_list:
raw_line = list(filter(lambda x: x != "", i.split(" ")))
raw_process_info = raw_line[0].split("\t")
process_info = raw_process_info[0].split("/")
if len(process_info) < 3 or int(process_info[-2]) == 0:
continue
process_network_io[int(process_info[-2])] = {}
process_network_io[int(process_info[-2])]["rx"] = float(raw_process_info[2])
process_network_io[int(process_info[-2])]["tx"] = float(raw_process_info[1])
nethogs_buf = ""
else:
nethogs_buf = nethogs_buf + line

network_sample_thread = threading.Thread(target=parse_nethogs_output)
network_sample_thread.daemon = True
network_sample_thread.start()
if sudo_prv:
nethogs_process = subprocess.Popen(
['sudo', 'nethogs', '-t'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=1, universal_newlines=True)

def parse_nethogs_output():
    """Consume nethogs trace output and publish per-process network rates.

    Reads ``nethogs_process.stdout`` line by line until EOF. Lines are
    accumulated in the global ``nethogs_buf``; every line starting with
    ``"Refreshing:"`` marks the end of one sample, at which point the
    buffered records are parsed and the global ``process_network_io`` is
    replaced (under ``network_data_lock``) by a fresh ``{pid: {"rx", "tx"}}``
    table, and the buffer is cleared.

    Each record is expected to look like
    ``<program>/<pid>/<uid>\\t<sent>\\t<received>`` (assumed from the way the
    fields are consumed -- confirm against the installed nethogs version).
    Records that do not match, and records whose pid is 0, are skipped.
    """
    global nethogs_buf, process_network_io, network_data_lock
    for line in iter(nethogs_process.stdout.readline, ''):
        if not line.startswith("Refreshing:"):
            nethogs_buf = nethogs_buf + line
            continue

        # Build the new table outside the lock so readers are blocked only
        # for the duration of the final swap.
        snapshot = {}
        for entry in nethogs_buf.split("\n"):
            # Split from the right: the program name may itself contain
            # spaces or slashes, which must not shift the pid / rate fields.
            fields = entry.strip().rsplit("\t", 2)
            if len(fields) != 3:
                continue
            ident = fields[0].rsplit("/", 2)
            if len(ident) != 3:
                continue
            try:
                pid = int(ident[1])
                sent = float(fields[1])
                received = float(fields[2])
            except ValueError:
                # A malformed record must not kill the sampling thread.
                continue
            if pid == 0:
                continue
            snapshot[pid] = {"rx": received, "tx": sent}

        with network_data_lock:
            process_network_io = snapshot
        nethogs_buf = ""

network_sample_thread = threading.Thread(target=parse_nethogs_output)
network_sample_thread.daemon = True
network_sample_thread.start()

cyber_task()

app.run(host="0.0.0.0", threaded=True)

0 comments on commit bde4e20

Please sign in to comment.