Skip to content

Commit

Permalink
Merge pull request Azure#750 from Azure/hpc_monitoring_node_meta
Browse files Browse the repository at this point in the history
hpc monitoring (Added support to report node metadata metrics)
  • Loading branch information
garvct authored Jul 16, 2024
2 parents 9efb458 + e57cf65 commit 7e6d1fc
Showing 1 changed file with 78 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import struct
import time
import argparse
import pickle
from urllib.request import urlopen, Request


# Some useful DCGM field ID's (for GPU monitoring)
Expand Down Expand Up @@ -64,6 +66,9 @@
]
# Other useful CPU memory counters are Bufferes and Cached (there are many other useful counters see /proc/meminfo

# The location of the VM metadata pickle file
VM_METADATA_FILE_PATH = "./vm_metadata_pickle"


# Build the API signature
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
Expand Down Expand Up @@ -120,7 +125,7 @@ def num(s):
return float(s)


def create_data_records(gpu_l, ib_rates_l, eth_rates_l, nfs_rates_l, disk_l, df_l, cpu_mem_l, cpu_l, event_l):
def create_data_records(gpu_l, ib_rates_l, eth_rates_l, nfs_rates_l, disk_l, df_l, cpu_mem_l, cpu_l, event_l, node_meta_l):
data_l = []
if gpu_l:
data_l = data_l + gpu_l
Expand All @@ -140,6 +145,8 @@ def create_data_records(gpu_l, ib_rates_l, eth_rates_l, nfs_rates_l, disk_l, df_
data_l = data_l + cpu_l
if event_l:
data_l = data_l + event_l
if node_meta_l:
data_l = data_l + node_meta_l

return data_l

Expand Down Expand Up @@ -170,6 +177,48 @@ def get_physicalhostname():
return str(value[0])[2:][:-1]


def get_vm_metadata_from_url():
metadata_url = "http://169.254.169.254/metadata/instance?api-version=2021-02-01"
metadata_req = Request(metadata_url, headers={"Metadata": True})

for _ in range(30):
metadata_response = urlopen(metadata_req, timeout=2)

try:
return json.load(metadata_response)
except ValueError as e:
print("Failed to get metadata %s" % e)
print(" Retrying")
sleep(2)
continue
except:
print("Unable to obtain metadata after 30 tries")
raise


def save_json_data_as_pickle(json_data, pickle_file):
try:
with open(pickle_file, 'wb') as f:
pickle.dump(json_data, f)
print(f"Data saved as {pickle_file}")
except Exception as e:
print(f"Error saving json data as pickle file: {e}")


def get_vm_metadata():
try:
if os.path.exists(VM_METADATA_FILE_PATH):
with open(VM_METADATA_FILE_PATH, 'rb') as pickle_file:
data = pickle.load(pickle_file)
json_data = json.loads(json.dumps(data))
else:
json_data = get_vm_metadata_from_url()
save_json_data_as_pickle(json_data, VM_METADATA_FILE_PATH)
except Exception as e:
print(f"Error getting VM metadata or creating pickle file: {str(e)}")
return json_data


def get_counter_value(file_path):
file = open(file_path, "r")
value = file.read()
Expand Down Expand Up @@ -317,6 +366,18 @@ def get_df_data(hostname, physicalhostname_val, have_jobid, slurm_jobid):
return df_l


def get_node_metadata(hostname, physicalhostname_val, node_resourceId, have_jobid, slurm_jobid):
nm_l = []
nm_data_d = {}
nm_data_d['hostname'] = hostname
nm_data_d['physicalhostname'] = physicalhostname_val
nm_data_d['resourceid'] = node_resourceId
if have_jobid:
nm_data_d['slurm_jobid'] = slurm_jobid
nm_l.append(nm_data_d)
return nm_l


def get_nfs_rates(nfs_counters, time_interval_seconds, hostname, physicalhostname_val, have_jobid, slurm_jobid):
nfs_rates_l = []
current_nfs_counters = get_nfs_data()
Expand Down Expand Up @@ -565,6 +626,7 @@ def parse_args():
parser.add_argument("-cpum", "--cpu_metrics", action="store_true", help="Collects CPU metrics (e.g. user, sys, idle & iowait time)")
parser.add_argument("-cpu_memm", "--cpu_mem_metrics", action="store_true", help="Collects CPU memory metrics (Default: MemTotal, MemFree)")
parser.add_argument("-eventm", "--scheduled_event_metrics", action="store_true", help="Collects Azure/user scheduled events metrics")
parser.add_argument("-node_metam", "--node_metadata_metrics", action="store_true", help="Collects Node identification metadata metrics")
parser.add_argument("-uc", "--use_crontab", action="store_true", help="This script will be started by the system contab and the time interval between each data collection will be decided by the system crontab (if crontab is selected then the -tis argument will be ignored).")
parser.add_argument("-tis", "--time_interval_seconds", dest="time_interval_seconds", type=int, default=10, help="The time interval in seconds between each data collection (This option cannot be used with the -uc argument)")
args = parser.parse_args()
Expand Down Expand Up @@ -609,16 +671,20 @@ def parse_args():
scheduled_event_metrics = True
else:
scheduled_event_metrics = False
if args.node_metadata_metrics:
node_metadata_metrics = True
else:
node_metadata_metrics = False
time_interval_seconds = args.time_interval_seconds
dcgm_field_ids = args.dcgm_field_ids
force_hpc_monitoring = args.force_hpc_monitoring
name_log_event = args.name_log_event

return (gpu_metrics,use_crontab,time_interval_seconds,dcgm_field_ids,force_hpc_monitoring,ib_metrics,eth_metrics,nfs_metrics,disk_metrics,df_metrics,cpu_metrics,cpu_mem_metrics,scheduled_event_metrics,name_log_event)
return (gpu_metrics,use_crontab,time_interval_seconds,dcgm_field_ids,force_hpc_monitoring,ib_metrics,eth_metrics,nfs_metrics,disk_metrics,df_metrics,cpu_metrics,cpu_mem_metrics,scheduled_event_metrics,node_metadata_metrics,name_log_event)


def main():
(gpu_metrics,use_crontab,time_interval_seconds,dcgm_field_ids,force_hpc_monitoring,ib_metrics,eth_metrics,nfs_metrics,disk_metrics,df_metrics,cpu_metrics,cpu_mem_metrics,scheduled_event_metrics,name_log_event) = parse_args()
(gpu_metrics,use_crontab,time_interval_seconds,dcgm_field_ids,force_hpc_monitoring,ib_metrics,eth_metrics,nfs_metrics,disk_metrics,df_metrics,cpu_metrics,cpu_mem_metrics,scheduled_event_metrics,node_metadata_metrics,name_log_event) = parse_args()
(customer_id,shared_key) = read_env_vars()
ib_counters = {}
cpu_counters = {}
Expand All @@ -634,16 +700,19 @@ def main():
cpu_l = []
gpu_l = []
event_l = []
node_meta_l = []
dcgm_dmon_fields_out = []
dcgm_dmon_list_out = []
last_DocumentIncarnation = -1

hostname = socket.gethostname()
physicalhostname_val = get_physicalhostname()
node_metadata_d = get_vm_metadata()

while True:
(have_jobid, slurm_jobid) = get_slurm_jobid()

if have_jobid or force_hpc_monitoring:
hostname = socket.gethostname()
physicalhostname_val = get_physicalhostname()
if gpu_metrics:
gpu_l = get_gpu_data(dcgm_field_ids, hostname, physicalhostname_val, have_jobid, slurm_jobid)
if ib_metrics:
Expand All @@ -662,7 +731,10 @@ def main():
df_l = get_df_data(hostname, physicalhostname_val, have_jobid, slurm_jobid)
if scheduled_event_metrics:
event_l,last_DocumentIncarnation = get_scheduled_events_data(last_DocumentIncarnation)
data_l = create_data_records(gpu_l, ib_rates_l, eth_rates_l, nfs_rates_l, disk_l, df_l, cpu_mem_l, cpu_l, event_l)
if node_metadata_metrics:
node_resourceid = node_metadata_d["compute"]["resourceId"]
node_meta_l = get_node_metadata(hostname, physicalhostname_val, node_resourceid, have_jobid, slurm_jobid)
data_l = create_data_records(gpu_l, ib_rates_l, eth_rates_l, nfs_rates_l, disk_l, df_l, cpu_mem_l, cpu_l, event_l, node_meta_l)
print(data_l)
if data_l:
body = json.dumps(data_l)
Expand Down

0 comments on commit 7e6d1fc

Please sign in to comment.