diff --git a/dxgb_bench/datasets/generated.py b/dxgb_bench/datasets/generated.py
index 56be947..151dde8 100644
--- a/dxgb_bench/datasets/generated.py
+++ b/dxgb_bench/datasets/generated.py
@@ -165,13 +165,15 @@ def save_Xy(X: np.ndarray, y: np.ndarray, i: int, saveto: list[str]) -> None:
     prev = 0
     for b in range(n_dirs):
-        output = saveto[b]
+        output = os.path.abspath(saveto[b])
+        assert os.path.exists(output), output
         end = min(X.shape[0], prev + n_samples_per_batch)
         assert end - prev > 0, "Empty partition is not supported yet."
         X_d = X[prev:end]
         y_d = y[prev:end]
 
         path = os.path.join(output, f"X_{X_d.shape[0]}_{X_d.shape[1]}-{i}-{b}.npa")
+        print("saveto:", os.path.abspath(path), flush=True)
         with kvikio.CuFile(path, "w") as fd:
             n_bytes = fd.write(X_d)
             assert n_bytes == X_d.nbytes
diff --git a/dxgb_bench/dsk/__init__.py b/dxgb_bench/dsk/__init__.py
index 0705b8b..830efd5 100644
--- a/dxgb_bench/dsk/__init__.py
+++ b/dxgb_bench/dsk/__init__.py
@@ -7,10 +7,12 @@
 import numpy as np
 from dask import array as da
 from dask import dataframe as dd
-from dask.distributed import Client, wait
+from dask.distributed import Client, Future, wait
+from distributed.comm import parse_host_port
 
 from ..datasets.generated import make_dense_regression as mdr
 from ..datasets.generated import save_Xy
+from ..utils import fprint
 
 
 def make_dense_regression(
@@ -41,10 +43,40 @@ def make_dense_regression_scatter(
     saveto: str,
     local_test: bool,
 ) -> None:
-    saveto = os.path.expanduser(saveto)
+    saveto = os.path.abspath(os.path.expanduser(saveto))
+    client.restart()
     if not os.path.exists(saveto):
         os.mkdir(saveto)
 
+    workers = list(client.scheduler_info()["workers"].keys())
+    n_workers = len(workers)
+
+    def rmtree(h: str) -> None:
+        fprint(f"rmtree: {saveto}")
+        if os.path.exists(saveto):
+            import shutil
+
+            fprint(f"removing: {saveto}")
+            shutil.rmtree(saveto)
+        os.mkdir(saveto)
+
+    futures: list[Future] = []
+
+    hosts = set()
+    for w in workers:
+        host, _ = parse_host_port(w)
+        hosts.add(host)
+
+    for i in range(n_workers):
+        host, _ = parse_host_port(workers[i])
+        if host in hosts:
+            print("submit:", host, workers[i])
+            fut = client.submit(rmtree, host, workers=workers[i])
+            futures.append(fut)
+            hosts.remove(host)
+
+    client.gather(futures)
+
     def make(n_samples: int, batch_idx: int, seed: int) -> int:
         X, y = mdr(
             device=device,
@@ -53,30 +85,21 @@ def make_dense_regression_scatter(
             sparsity=0.0,
             random_state=seed,
         )
-        if local_test:
-            path = os.path.join(saveto, str(batch_idx))
-        else:
-            path = saveto
-        if os.path.exists(path):
-            import shutil
-            print(f"removing: {path}")
-            shutil.rmtree(path)
-        if not os.path.exists(path):
-            os.mkdir(path)
-
-        save_Xy(X, y, 0, [path])
+        save_Xy(X, y, batch_idx, [saveto])
         return n_samples
 
-    workers = client.scheduler_info()["workers"]
-    n_workers = len(workers)
+    if n_samples > n_workers * 8:
+        n_tasks = n_workers * 8
+    else:
+        n_tasks = n_workers
 
-    n_samples_per_worker = int(math.ceil(n_samples / n_workers))
+    n_samples_per_task = int(math.ceil(n_samples / n_tasks))
     last = 0
     futures = []
-    for i in range(n_workers):
-        batch_size = min(n_samples_per_worker, n_samples - last)
-        fut = client.submit(make, batch_size, i, last, workers=[workers[i]])
+    for i in range(n_tasks):
+        batch_size = min(n_samples_per_task, n_samples - last)
+        fut = client.submit(make, batch_size, i, last, workers=[workers[i % n_workers]])
         last += batch_size
         futures.append(fut)
@@ -90,7 +113,7 @@ def load_dense_gather(
     loadfrom = os.path.expanduser(loadfrom)
 
-    def get_shape(batch_idx: int) -> tuple[int, int]:
+    def get_shape(batch_idx: int) -> list[tuple[int, int]]:
         if local_test:
             path = os.path.join(loadfrom, str(batch_idx))
         else:
@@ -98,8 +121,11 @@ def get_shape(batch_idx: int) -> tuple[int, int]:
         X, y = get_file_paths_local(path)
         print(X, y)
-        x, n_samples, n_features, batch_idx, shard_idx = get_pinfo(X[0])
-        return n_samples, n_features
+        shapes = []
+        for xp in X:
+            _, n_samples, n_features, _, _ = get_pinfo(xp)
+            shapes.append((n_samples, n_features))
+        return shapes
 
     def load(batch_idx: int) -> np.ndarray:
         if local_test:
@@ -120,7 +146,7 @@ def load(batch_idx: int) -> np.ndarray:
         assert Xy.shape[1] == X.shape[1] + 1
         return Xy
 
-    workers = client.scheduler_info()["workers"]
+    workers = list(client.scheduler_info()["workers"].keys())
     n_workers = len(workers)
     print(f"n_workers: {n_workers}")
@@ -128,8 +154,12 @@ def load(batch_idx: int) -> np.ndarray:
     for i in range(n_workers):
         fut = client.submit(get_shape, i, workers=[workers[i]])
         futures.append(fut)
-    shapes = client.gather(futures)
-    print(shapes)
+    shapes_local = client.gather(futures)
+    print("shapes_local:", shapes_local)
+    shapes = []
+    for s in shapes_local:
+        shapes.extend(s)
+
     arrays = []
     for i in range(n_workers):
         fut = client.submit(load, i, workers=[workers[i]])
diff --git a/dxgb_bench/dsk/algorithm.py b/dxgb_bench/dsk/algorithm.py
index 0850921..c817b73 100644
--- a/dxgb_bench/dsk/algorithm.py
+++ b/dxgb_bench/dsk/algorithm.py
@@ -7,6 +7,7 @@
 from distributed import Client
 from typing_extensions import TypeAlias
 from xgboost import dask as dxgb
+from xgboost.collective import Config as CollCfg
 
 from ..utils import DC, ID, Progress, Timer, fprint
@@ -127,7 +128,7 @@ def fit(self, X: DC, y: DC, weight: Optional[DC] = None) -> EvalsLog:
         callbacks = [Progress(self.num_boost_round)]
         evals = []
 
-        with xgb.config_context(use_rmm=True):
+        with xgb.config_context(use_rmm=True, verbosity=2):
            with Timer(self.name, "train"):
                 output = dxgb.train(
                     client=self.client,
@@ -136,6 +137,7 @@
                     evals=evals,
                     num_boost_round=self.num_boost_round,
                     callbacks=callbacks,
+                    coll_cfg=CollCfg(timeout=60),
                 )
             self.booster = output["booster"]
         return output["history"]
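
Note on the task fan-out above: make_dense_regression_scatter now creates at most eight generation tasks per worker and pins them to workers round-robin, instead of one oversized task per worker. A minimal standalone sketch of the partitioning arithmetic, assuming the same ceil/min scheme as the patch; partition_tasks, max_tasks_per_worker, and the zero-batch guard are illustrative additions, not code from this change:

import math


def partition_tasks(
    n_samples: int, workers: list[str], max_tasks_per_worker: int = 8
) -> list[tuple[str, int, int]]:
    """Return (worker, batch_size, offset) triples for scattered generation."""
    n_workers = len(workers)
    # Oversubscribe each worker so individual batches stay small, but never
    # create more than max_tasks_per_worker tasks per worker.
    if n_samples > n_workers * max_tasks_per_worker:
        n_tasks = n_workers * max_tasks_per_worker
    else:
        n_tasks = n_workers

    n_samples_per_task = int(math.ceil(n_samples / n_tasks))
    last = 0
    plan = []
    for i in range(n_tasks):
        batch_size = min(n_samples_per_task, n_samples - last)
        if batch_size <= 0:
            # ceil() can exhaust n_samples before i reaches n_tasks; stop
            # early so no empty partition is ever handed to save_Xy.
            break
        # Round-robin pinning, matching workers=[workers[i % n_workers]].
        plan.append((workers[i % n_workers], batch_size, last))
        last += batch_size
    return plan


if __name__ == "__main__":
    # Ten samples over two workers -> two tasks of five samples each.
    print(partition_tasks(10, ["tcp://10.0.0.1:40001", "tcp://10.0.0.2:40001"]))

The third element of each triple is the running sample offset, which the patch reuses as the per-task random seed, keeping task seeds distinct.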
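Note on the per-host cleanup: rmtree is submitted once per physical host, not once per worker process, since workers on one machine share the output directory. A sketch of that deduplication, assuming worker keys of the form "tcp://host:port" from client.scheduler_info(); one_task_per_host is a hypothetical name, and it strips the scheme with parse_address where the patch passes the raw key straight to parse_host_port:

from distributed.comm import parse_address, parse_host_port


def one_task_per_host(workers: list[str]) -> list[str]:
    """Pick one worker address per physical host."""
    seen: set[str] = set()
    chosen: list[str] = []
    for addr in workers:
        # Worker keys look like "tcp://10.0.0.1:40001": strip the scheme,
        # then split host and port, so co-located workers collapse together.
        _, loc = parse_address(addr)
        host, _ = parse_host_port(loc)
        if host not in seen:
            seen.add(host)
            chosen.append(addr)
    return chosen

Pinning each cleanup future to one such address (workers=..., with the default allow_other_workers=False) ensures it runs on the intended host, so no two workers race to delete the same shared directory.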