Skip to content

Commit

Permalink
fixes for python apis for consume.
Browse files Browse the repository at this point in the history
  • Loading branch information
hariharan-devarajan committed Jan 25, 2024
1 parent 9f9e45b commit 778f02f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
16 changes: 16 additions & 0 deletions pydyad/pydyad/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,22 @@ def consume(self, fname):
if int(res) != 0:
raise RuntimeError("Cannot consume data with DYAD!")

@dlio_log.log
def consume_w_metadata(self, fname, metadata_wrapper):
if self.dyad_consume is None:
warnings.warn(
"Trying to consunme with metadata with DYAD when libdyad_core.so was not found",
RuntimeWarning
)
return
res = self.dyad_consume_w_metadata(
self.ctx,
fname.encode(),
metadata_wrapper
)
if int(res) != 0:
raise RuntimeError("Cannot consume data with metadata with DYAD!")

@dlio_log.log
def finalize(self):
if not self.initialized:
Expand Down
9 changes: 6 additions & 3 deletions pydyad/pydyad/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# https://peps.python.org/pep-0343/
@contextmanager
@dlio_log.log
def dyad_open(*args, dyad_ctx=None, register_dyad_ctx=False, **kwargs):
def dyad_open(*args, dyad_ctx=None, metadata_wrapper=None, register_dyad_ctx=False, **kwargs):
global DYAD_IO
local_dyad_io = dyad_ctx
if dyad_ctx is None:
Expand All @@ -38,7 +38,10 @@ def dyad_open(*args, dyad_ctx=None, register_dyad_ctx=False, **kwargs):
if mode in ("r", "rb", "rt"):
if (local_dyad_io.cons_path is not None and
local_dyad_io.cons_path in fname.parents):
local_dyad_io.consume(str(fname))
if metadata_wrapper:
local_dyad_io.consume_w_metadata(str(fname), metadata_wrapper)
else:
local_dyad_io.consume(str(fname))
file_obj = io.open(*args, **kwargs)
try:
yield file_obj
Expand All @@ -47,4 +50,4 @@ def dyad_open(*args, dyad_ctx=None, register_dyad_ctx=False, **kwargs):
if mode in ("w", "wb", "wt"):
if (local_dyad_io.prod_path is not None and
local_dyad_io.prod_path in fname.parents):
local_dyad_io.produce(str(fname))
local_dyad_io.produce(str(fname))
12 changes: 7 additions & 5 deletions tests/integration/dlio_benchmark/dyad_torch_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,25 @@ def __getitem__(self, image_idx):
dlp.update(args={"fname":filename})
dlp.update(args={"image_idx":image_idx})
if self.dyad_managed_directory != "":
logging.info(f"{utcnow()} Rank {DLIOMPI.get_instance().rank()} reading metadata")
base_fname = os.path.join(self.dyad_managed_directory, os.path.basename(filename))
file_obj = self.dyad_io.get_metadata(fname=base_fname, should_wait=False)
file_obj = self.dyad_io.get_metadata(fname=base_fname, should_wait=False, raw=True)
logging.debug(f"Using managed directory {self.dyad_managed_directory} {base_fname} {file_obj}")
is_present = True
if file_obj:
access_mode = "remote"
file_node_index = int(file_obj.owner_rank*1.0 / self.broker_per_node)
file_node_index = int(file_obj.contents.owner_rank*1.0 / self.broker_per_node)
if self.my_node_index == file_node_index:
access_mode = "local"
dlp.update(args={"owner_rank":str(file_obj.owner_rank)})
dlp.update(args={"owner_rank":str(file_obj.contents.owner_rank)})
dlp.update(args={"my_broker":str(self.broker_rank)})
dlp.update(args={"mode":"dyad"})
dlp.update(args={"access":access_mode})
logging.info(f"{utcnow()} Rank {DLIOMPI.get_instance().rank()} reading {image_idx} sample from {access_mode} dyad {file_obj.owner_rank}")
logging.info(f"{utcnow()} Rank {DLIOMPI.get_instance().rank()} reading {image_idx} sample from {access_mode} dyad {file_obj.contents.owner_rank}")
logging.debug(f"Reading from managed directory {base_fname}")
with dyad_open(base_fname, "rb", dyad_ctx=self.dyad_io) as f:
with dyad_open(base_fname, "rb", dyad_ctx=self.dyad_io, metadata_wrapper=file_obj) as f:
data = np.load(f, allow_pickle=True)["x"]
self.dyad_io.free_metadata(file_obj)
else:
dlp.update(args={"mode":"pfs"})
dlp.update(args={"access":"remote"})
Expand Down

0 comments on commit 778f02f

Please sign in to comment.