Enhanced the Simulator client end_run solution to run the END_RUN while the client is in a running session. (NVIDIA#2270)
yhwen authored Jan 9, 2024
1 parent af7f76b commit 030b3d2
Showing 2 changed files with 26 additions and 8 deletions.
25 changes: 19 additions & 6 deletions nvflare/private/fed/app/simulator/simulator_runner.py
@@ -509,6 +509,8 @@ def __init__(self, args, clients: [], client_config, deploy_args, build_ctx):
         self.build_ctx = build_ctx
         self.kv_list = parse_vars(args.set)
 
+        self.end_run_clients = []
+
     def run(self, gpu):
         try:
             # self.create_clients()
@@ -521,6 +523,13 @@ def run(self, gpu):
 
             # wait for the server and client running thread to finish.
             executor.shutdown()
+
+            for client in self.federated_clients:
+                if client.client_name not in self.end_run_clients:
+                    self.do_one_task(
+                        client, self.args.threads, gpu, lock, timeout=timeout, task_name=RunnerTask.END_RUN
+                    )
+
         except Exception as e:
             self.logger.error(f"SimulatorClientRunner run error: {secure_format_exception(e)}")
         finally:
@@ -554,15 +563,16 @@ def run_client_thread(self, num_of_threads, gpu, lock, rank, timeout=60):
                         client = client_to_run
 
                     client.simulate_running = True
-                stop_run, client_to_run = self.do_one_task(client, num_of_threads, gpu, lock, timeout=timeout)
+                stop_run, client_to_run, end_run_client = self.do_one_task(
+                    client, num_of_threads, gpu, lock, timeout=timeout
+                )
+                if end_run_client:
+                    with lock:
+                        self.end_run_clients.append(end_run_client)
 
                 client.simulate_running = False
         except Exception as e:
             self.logger.error(f"run_client_thread error: {secure_format_exception(e)}")
-        finally:
-            if rank == 0:
-                for client in self.federated_clients:
-                    self.do_one_task(client, num_of_threads, gpu, lock, timeout=timeout, task_name=RunnerTask.END_RUN)
 
     def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name=RunnerTask.TASK_EXEC):
         open_port = get_open_ports(1)[0]
@@ -607,8 +617,11 @@ def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name
         }
         conn.send(data)
 
+        end_run_client = None
         while True:
             stop_run = conn.recv()
+            if stop_run:
+                end_run_client = conn.recv()
 
             with lock:
                 if num_of_threads != len(self.federated_clients):
@@ -621,7 +634,7 @@ def do_one_task(self, client, num_of_threads, gpu, lock, timeout=60.0, task_name
                 conn.send(False)
                 break
 
-        return stop_run, next_client
+        return stop_run, next_client, end_run_client
 
     def _create_connection(self, open_port, timeout=60.0):
         conn = None
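
For readers skimming the diff, here is a minimal, self-contained sketch of the bookkeeping the simulator_runner.py change introduces; SimClient, Runner, and this do_one_task() stand-in are made up for illustration and are not NVFlare APIs. Each worker run reports back which client, if any, already executed END_RUN, the runner records those names in end_run_clients, and once the worker threads have finished it dispatches END_RUN only to the clients that have not yet run it.

# Hypothetical stand-ins, not NVFlare code: they only illustrate the
# end_run_clients bookkeeping shown in the diff above.
import random
from dataclasses import dataclass
from threading import Lock
from typing import List, Optional, Tuple


@dataclass
class SimClient:
    client_name: str


def do_one_task(client: SimClient, task_name: str = "TASK_EXEC") -> Tuple[bool, Optional[str]]:
    # Stand-in for SimulatorClientRunner.do_one_task(): returns (stop_run, end_run_client),
    # where end_run_client is the client's name if its worker already executed END_RUN.
    if task_name == "END_RUN":
        print(f"dispatching END_RUN to {client.client_name}")
        return True, client.client_name
    ran_end_run = random.random() < 0.5  # pretend some workers hit END_RUN on their own
    return True, (client.client_name if ran_end_run else None)


class Runner:
    def __init__(self, clients: List[SimClient]):
        self.federated_clients = clients
        self.end_run_clients: List[str] = []  # names of clients that already ran END_RUN
        self.lock = Lock()

    def run_client_thread(self, client: SimClient) -> None:
        stop_run, end_run_client = do_one_task(client)
        if end_run_client:
            with self.lock:
                self.end_run_clients.append(end_run_client)

    def run(self) -> None:
        for client in self.federated_clients:
            self.run_client_thread(client)
        # After the worker runs are done, END_RUN goes only to the clients
        # that did not receive it while their session was still running.
        for client in self.federated_clients:
            if client.client_name not in self.end_run_clients:
                do_one_task(client, task_name="END_RUN")


if __name__ == "__main__":
    Runner([SimClient("site-1"), SimClient("site-2")]).run()

Recording the names under a lock mirrors run_client_thread() in the real change, where several client threads may append to end_run_clients concurrently.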
9 changes: 7 additions & 2 deletions nvflare/private/fed/app/simulator/simulator_worker.py
@@ -79,6 +79,7 @@ def create_client_runner(self, client):
     def do_one_task(self, client, args):
         interval = 1.0
         stop_run = False
+        end_run_client = None
         # Create the ClientRunManager and ClientRunner for the new client to run
         try:
             if client.run_manager is None:
@@ -105,6 +106,8 @@ def do_one_task(self, client, args):
 
                 # if any client got the END_RUN event, stop the simulator run.
                 if client_runner.run_abort_signal.triggered:
+                    client_runner.end_run_events_sequence()
+                    end_run_client = client.client_name
                     stop_run = True
                     self.logger.info("End the Simulator run.")
                     break
@@ -121,7 +124,7 @@ def do_one_task(self, client, args):
             secure_log_traceback()
             stop_run = True
 
-        return interval, stop_run
+        return interval, stop_run, end_run_client
 
     def release_resources(self, client):
         if client.run_manager:
@@ -150,8 +153,10 @@ def run(self, args, conn):
             self.create_client_engine(client, deploy_args)
 
             while True:
-                interval, stop_run = self.do_one_task(client, args)
+                interval, stop_run, end_run_client = self.do_one_task(client, args)
                 conn.send(stop_run)
+                if stop_run:
+                    conn.send(end_run_client)
 
                 continue_run = conn.recv()
                 if not continue_run:
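
A second minimal sketch of the two-step handshake added in simulator_worker.py and mirrored in simulator_runner.do_one_task(): the worker always sends stop_run, and only when stop_run is True does it also send the name of the client that already executed END_RUN, so both sides issue the extra send/recv under the same condition. A multiprocessing Pipe stands in here for the connection the simulator actually creates, and the "site-1" name is made up.

# Sketch of the stop_run / end_run_client handshake; the Pipe and the
# "site-1" name are assumptions for illustration, not the simulator's setup.
from multiprocessing import Pipe, Process


def worker_side(conn):
    stop_run = True
    end_run_client = "site-1"  # this worker already executed END_RUN for site-1
    conn.send(stop_run)
    if stop_run:
        # Only when stopping does the worker also report which client already
        # ran END_RUN, as in the simulator_worker run() change above.
        conn.send(end_run_client)
    conn.close()


def runner_side(conn):
    end_run_client = None
    stop_run = conn.recv()
    if stop_run:
        # The second recv() happens only when stop_run is True, so the two
        # sides agree on how many messages are exchanged.
        end_run_client = conn.recv()
    print(f"stop_run={stop_run}, end_run_client={end_run_client}")


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=worker_side, args=(child_conn,))
    p.start()
    runner_side(parent_conn)
    p.join()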
