Skip to content

Commit

Permalink
Add functionality that directly writes variables to a temporary zarr …
Browse files Browse the repository at this point in the history
…store (OSOceanAcoustics#774)

* add initial code to grab the appropriate parsed data

* establish initial structure to go from parsed to zarr

* modify open_raw routines for EK60/80 so that we can load in zarr arrays directly into the EchoData object and the temp zarr directory persists until EchoData object is completely destroyed

* change distribution of times to a round-robin like distribution

* take first step towards generalizing the parsed_to_zarr module

* generalize write_df_to_zarr so it can handle columns without arrays, begin documenting parsed_to_zarr, and add the padding of range_sample in get_np_chunk

* finish cleaning up the code in parsed_to_zarr

* improve chunking in parsed_to_zarr and change num_mb to max_mb

* make a preliminary attempt at writing complex data

* start the restructuring of parsed_to_zarr into a class

* finish parsed to zarr reorganization for EK60 and EK80

* document and clean up the code associated with set_groups_ek60

* clean up parse_base and make it so that we do not store zarr variables twice

* move get_power_dataarray and get_angle_dataarrays to set_groups_base, modify the inputs of these functions, and begin working on set_groups_ek80 for straight to zarr

* obtain partially working version of Beam_group2 for EK80

* finish constructing ds_beam_power when zarr variables are present

* add method to get complex data arrays from zarr in set_groups_base

* generalize parsed_to_zarr so we can have column elements with multi-dimensional arrays

* finish get_ds_complex_zarr in set_groups_ek80

* add open_raw zarr variables to api and create a routine that automatically determines if large variables should be written to a temporary zarr store

* modify the condition for when we should write directly to a temporary zarr store

* only store zarr variables when we do not have receive data, add structure for direct to zarr unit tests, run pre-commit on all files

* change all occurrences of parser2zarr to parsed2zarr

* correct zarr typo in _append_channel_ping_data

* correct kwarg in rectangularize_data

* replace hardwired time dtype in parsed to zarr with times.dtype.str

* add return docstrings and types to a couple of functions in parsed_to_zarr.py

* add back the EK60 file description in the test_data readme

* add pytest.mark.skip to unit test

Co-authored-by: Don Setiawan <[email protected]>

* remove xfail and add pass to unit test

Co-authored-by: Don Setiawan <[email protected]>

* remove pandas from requirements file

* Add simple test for noaa file

* remove the auto option in open_raw

* remove Union typing import

* add test_data/README.md lines back in

* add spaces in test_data/README.md

* remove optional typing for `offload_to_zarr`

Co-authored-by: Don Setiawan <[email protected]>

* remove auto description in notes and add beta statement in open_raw

Co-authored-by: Don Setiawan <[email protected]>
  • Loading branch information
b-reyes and lsetiawan authored Aug 11, 2022
1 parent ad6dbc7 commit a099ee5
Show file tree
Hide file tree
Showing 17 changed files with 1,987 additions and 276 deletions.
111 changes: 86 additions & 25 deletions echopype/convert/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# fmt: off
# black and isort have conflicting ideas about how this should be formatted
from ..core import SONAR_MODELS
from .parsed_to_zarr import Parsed2Zarr

if TYPE_CHECKING:
from ..core import EngineHint, PathHint, SonarModelsHint
Expand Down Expand Up @@ -111,9 +112,10 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
# Environment group
if "time1" in echodata["Environment"]:
io.save_file(
echodata["Environment"].chunk(
{"time1": DEFAULT_CHUNK_SIZE["ping_time"]}
), # TODO: chunking necessary?
# echodata["Environment"].chunk(
# {"time1": DEFAULT_CHUNK_SIZE["ping_time"]}
# ), # TODO: chunking necessary?
echodata["Environment"],
path=output_path,
mode="a",
engine=engine,
Expand Down Expand Up @@ -171,11 +173,12 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
if echodata.sonar_model == "AD2CP":
for i in range(1, len(echodata["Sonar"]["beam_group"]) + 1):
io.save_file(
echodata[f"Sonar/Beam_group{i}"].chunk(
{
"ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
}
),
# echodata[f"Sonar/Beam_group{i}"].chunk(
# {
# "ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
# }
# ),
echodata[f"Sonar/Beam_group{i}"],
path=output_path,
mode="a",
engine=engine,
Expand All @@ -184,12 +187,13 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
)
else:
io.save_file(
echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"].chunk(
{
"range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
"ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
}
),
# echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"].chunk(
# {
# "range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
# "ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
# }
# ),
echodata[f"Sonar/{BEAM_SUBGROUP_DEFAULT}"],
path=output_path,
mode="a",
engine=engine,
Expand All @@ -199,12 +203,13 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
if echodata["Sonar/Beam_group2"] is not None:
# some sonar model does not produce Sonar/Beam_group2
io.save_file(
echodata["Sonar/Beam_group2"].chunk(
{
"range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
"ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
}
),
# echodata["Sonar/Beam_group2"].chunk(
# {
# "range_sample": DEFAULT_CHUNK_SIZE["range_sample"],
# "ping_time": DEFAULT_CHUNK_SIZE["ping_time"],
# }
# ),
echodata["Sonar/Beam_group2"],
path=output_path,
mode="a",
engine=engine,
Expand All @@ -215,9 +220,10 @@ def _save_groups_to_file(echodata, output_path, engine, compress=True):
# Vendor_specific group
if "ping_time" in echodata["Vendor_specific"]:
io.save_file(
echodata["Vendor_specific"].chunk(
{"ping_time": DEFAULT_CHUNK_SIZE["ping_time"]}
), # TODO: chunking necessary?
# echodata["Vendor_specific"].chunk(
# {"ping_time": DEFAULT_CHUNK_SIZE["ping_time"]}
# ), # TODO: chunking necessary?
echodata["Vendor_specific"],
path=output_path,
mode="a",
engine=engine,
Expand Down Expand Up @@ -331,6 +337,8 @@ def open_raw(
xml_path: Optional["PathHint"] = None,
convert_params: Optional[Dict[str, str]] = None,
storage_options: Optional[Dict[str, str]] = None,
offload_to_zarr: bool = False,
max_zarr_mb: int = 100,
) -> Optional[EchoData]:
"""Create an EchoData object containing parsed data from a single raw data file.
Expand Down Expand Up @@ -359,10 +367,22 @@ def open_raw(
and need to be added to the converted file
storage_options : dict
options for cloud storage
offload_to_zarr: bool
If True, variables with a large memory footprint will be
written to a temporary zarr store.
max_zarr_mb : int
maximum MB that each zarr chunk should hold, when offloading
variables with a large memory footprint to a temporary zarr store
Returns
-------
EchoData object
Notes
-----
``offload_to_zarr=True`` is only available for the following
echosounders: EK60, ES70, EK80, ES80, EA640. Additionally, this feature
is currently in beta.
"""
if (sonar_model is None) and (raw_file is None):
logger.warning("Please specify the path to the raw data file and the sonar model.")
Expand Down Expand Up @@ -418,24 +438,63 @@ def open_raw(
# Check file extension and existence
file_chk, xml_chk = _check_file(raw_file, sonar_model, xml_path, storage_options)

# TODO: remove once 'auto' option is added
if not isinstance(offload_to_zarr, bool):
raise ValueError("offload_to_zarr must be of type bool.")

# Ensure offload_to_zarr is 'auto', if it is a string
# TODO: use the following when we allow for 'auto' option
# if isinstance(offload_to_zarr, str) and offload_to_zarr != "auto":
# raise ValueError("offload_to_zarr must be a bool or equal to 'auto'.")

# TODO: the if-else below only works for the AZFP vs EK contrast,
# but is brittle since it is abusing params by using it implicitly
if SONAR_MODELS[sonar_model]["xml"]:
params = xml_chk
else:
params = "ALL" # reserved to control if only wants to parse a certain type of datagram

# obtain dict associated with directly writing to zarr
dgram_zarr_vars = SONAR_MODELS[sonar_model]["dgram_zarr_vars"]

# Parse raw file and organize data into groups
parser = SONAR_MODELS[sonar_model]["parser"](
file_chk, params=params, storage_options=storage_options
file_chk, params=params, storage_options=storage_options, dgram_zarr_vars=dgram_zarr_vars
)

parser.parse_raw()

# code block corresponding to directly writing parsed data to zarr
if offload_to_zarr and (sonar_model in ["EK60", "ES70", "EK80", "ES80", "EA640"]):

# Determines if writing to zarr is necessary and writes to zarr
p2z = SONAR_MODELS[sonar_model]["parsed2zarr"](parser)

# TODO: perform more robust tests for the 'auto' heuristic value
if offload_to_zarr == "auto" and p2z.write_to_zarr(mem_mult=0.4):
p2z.datagram_to_zarr(max_mb=max_zarr_mb)
elif offload_to_zarr is True:
p2z.datagram_to_zarr(max_mb=max_zarr_mb)
else:
del p2z
p2z = Parsed2Zarr(parser)
if "ALL" in parser.data_type:
parser.rectangularize_data()

else:
p2z = Parsed2Zarr(parser)
if (sonar_model in ["EK60", "ES70", "EK80", "ES80", "EA640"]) and (
"ALL" in parser.data_type
):
parser.rectangularize_data()

setgrouper = SONAR_MODELS[sonar_model]["set_groups"](
parser,
input_file=file_chk,
output_path=None,
sonar_model=sonar_model,
params=_set_convert_params(convert_params),
parsed2zarr_obj=p2z,
)

# Setup tree dictionary
Expand Down Expand Up @@ -482,7 +541,9 @@ def open_raw(
# Create tree and echodata
# TODO: make the creation of tree dynamically generated from yaml
tree = DataTree.from_dict(tree_dict, name="root")
echodata = EchoData(source_file=file_chk, xml_path=xml_chk, sonar_model=sonar_model)
echodata = EchoData(
source_file=file_chk, xml_path=xml_chk, sonar_model=sonar_model, parsed2zarr_obj=p2z
)
echodata._set_tree(tree)
echodata._load_tree()

Expand Down
9 changes: 2 additions & 7 deletions echopype/convert/parse_ad2cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,8 @@ class NoMorePackets(Exception):


class ParseAd2cp(ParseBase):
def __init__(
self,
*args,
params,
**kwargs,
):
super().__init__(*args, **kwargs)
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, storage_options)
self.config = None
self.packets: List[Ad2cpDataPacket] = []

Expand Down
2 changes: 1 addition & 1 deletion echopype/convert/parse_azfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
class ParseAZFP(ParseBase):
"""Class for converting data from ASL Environmental Sciences AZFP echosounder."""

def __init__(self, file, params, storage_options={}):
def __init__(self, file, params, storage_options={}, dgram_zarr_vars={}):
super().__init__(file, storage_options)
# Parent class attributes
# regex pattern used to grab datetime embedded in filename
Expand Down
Loading

0 comments on commit a099ee5

Please sign in to comment.