Skip to content

Commit

Permalink
Make BeakerExecutor more robust to all recoverable errors types (co…
Browse files Browse the repository at this point in the history
…nnection, HTTP, SSL, timeout, etc) (#397)
  • Loading branch information
epwalsh authored Sep 9, 2022
1 parent 5f63a27 commit 230d78e
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed

- Made `BeakerExecutor` more robust to connection, timeout, SSL, and other recoverable HTTP errors.

## [v0.13.0](https://github.com/allenai/tango/releases/tag/v0.13.0) - 2022-09-07

### Added
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pytorch-lightning>=1.6,<1.8 # needed by: pytorch_lightning
transformers>=4.12.3 # needed by: transformers
sentencepiece==0.1.97 # needed by: transformers
fairscale==0.4.9 # needed by: fairscale
beaker-py>=1.7.3,<2.0.0 # needed by: beaker
beaker-py>=1.8.0,<2.0.0 # needed by: beaker

# sacremoses should be a dependency of transformers, but it is missing, so we add it manually.
sacremoses # needed by: transformers
Expand Down
8 changes: 3 additions & 5 deletions tango/integrations/beaker/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ def step_info(self, step_or_unique_id: Union[Step, str]) -> StepInfo:
if file_info.digest in self._step_info_cache:
step_info = self._step_info_cache.pop(file_info.digest)
else:
step_info_bytes = b"".join(
self.beaker.dataset.stream_file(dataset, file_info, quiet=True)
)
step_info_bytes = self.beaker.dataset.get_file(dataset, file_info, quiet=True)
step_info = StepInfo.from_json_dict(json.loads(step_info_bytes))
self._step_info_cache[file_info.digest] = step_info
while len(self._step_info_cache) > self.STEP_INFO_CACHE_SIZE:
Expand Down Expand Up @@ -291,8 +289,8 @@ def _get_run_from_dataset(self, dataset: Dataset) -> Optional[Run]:
try:
run_name = dataset.name[len(Constants.RUN_DATASET_PREFIX) :]
steps: Dict[str, StepInfo] = {}
steps_info_bytes = b"".join(
list(self.beaker.dataset.stream_file(dataset, Constants.RUN_DATA_FNAME, quiet=True))
steps_info_bytes = self.beaker.dataset.get_file(
dataset, Constants.RUN_DATA_FNAME, quiet=True
)
steps_info = json.loads(steps_info_bytes)
except (DatasetNotFound, FileNotFoundError):
Expand Down
2 changes: 1 addition & 1 deletion tango/integrations/wandb/step_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def __setitem__(self, step: Step, value: Any) -> None:
def __len__(self) -> int:
completed_cacheable_step_runs = self.wandb_client.runs(
f"{self.entity}/{self.project}",
filters={
filters={ # type: ignore
"config.job_type": "step",
"config.cacheable": True,
"state": "finished",
Expand Down
8 changes: 4 additions & 4 deletions tango/integrations/wandb/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ def registered_runs(self) -> Dict[str, Run]:
matching_runs = list(
self.wandb_client.runs(
f"{self.entity}/{self.project}",
filters={"config.job_type": RunKind.TANGO_RUN.value},
filters={"config.job_type": RunKind.TANGO_RUN.value}, # type: ignore
)
)
for wandb_run in matching_runs:
Expand All @@ -370,7 +370,7 @@ def registered_run(self, name: str) -> Run:
matching_runs = list(
self.wandb_client.runs(
f"{self.entity}/{self.project}",
filters={"display_name": name, "config.job_type": RunKind.TANGO_RUN.value},
filters={"display_name": name, "config.job_type": RunKind.TANGO_RUN.value}, # type: ignore
)
)
if not matching_runs:
Expand Down Expand Up @@ -414,7 +414,7 @@ def _get_updated_step_info(
filters["display_name"] = step_name
for wandb_run in self.wandb_client.runs(
f"{self.entity}/{self.project}",
filters=filters,
filters=filters, # type: ignore
):
step_info = StepInfo.from_json_dict(wandb_run.config["step_info"])
# Might need to fix the step info the step failed and we failed to update the config.
Expand All @@ -441,7 +441,7 @@ def _get_updated_step_info(
filters[f"config.steps.{step_name}.unique_id"] = step_id
for wandb_run in self.wandb_client.runs(
f"{self.entity}/{self.project}",
filters=filters,
filters=filters, # type: ignore
):
if step_name is not None:
step_info_data = wandb_run.config["steps"][step_name]
Expand Down

0 comments on commit 230d78e

Please sign in to comment.