Skip to content

Commit

Permalink
prevent async process from writing alive file until the main process …
Browse files Browse the repository at this point in the history
…has created the cache root (tensorflow#5614)
  • Loading branch information
Taylor Robie authored Oct 25, 2018
1 parent 31d3890 commit 2644707
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions official/recommendation/data_async_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,21 @@ def _generation_loop(num_workers, # type: int
gc.collect()


def _parse_flagfile(flagfile):
"""Fill flags with flagfile written by the main process."""
tf.logging.info("Waiting for flagfile to appear at {}..."
.format(flagfile))
def wait_for_path(fpath):
  """Block until `fpath` exists, exiting the process if it never appears.

  Polls `tf.gfile.Exists(fpath)` roughly once per second. If more than
  `rconst.TIMEOUT_SECONDS` seconds elapse without the path appearing, a
  message is logged and the process terminates through `sys.exit()`.

  Args:
    fpath: Path to wait for; handed directly to `tf.gfile.Exists`.
  """
  start_time = time.time()
  while not tf.gfile.Exists(fpath):
    if time.time() - start_time > rconst.TIMEOUT_SECONDS:
      # Nothing created the path within the timeout: treat this process as
      # orphaned and stop instead of polling forever.
      log_msg("Waited more than {} seconds. Concluding that this "
              "process is orphaned and exiting gracefully."
              .format(rconst.TIMEOUT_SECONDS))
      sys.exit()
    time.sleep(1)

def _parse_flagfile(flagfile):
"""Fill flags with flagfile written by the main process."""
tf.logging.info("Waiting for flagfile to appear at {}..."
.format(flagfile))
wait_for_path(flagfile)
tf.logging.info("flagfile found.")

# `flags` module opens `flagfile` with `open`, which does not work on
Expand All @@ -504,6 +507,8 @@ def _parse_flagfile(flagfile):

def write_alive_file(cache_paths):
"""Write file to signal that generation process started correctly."""
wait_for_path(cache_paths.cache_root)

log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
Expand Down

0 comments on commit 2644707

Please sign in to comment.