Skip to content

Commit

Permalink
[data/preprocessors] feat: allow simple imputer to execute on append …
Browse files Browse the repository at this point in the history
…mode (ray-project#50713)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

This is part of ray-project#48133.
Continuing the approach taken in
ray-project#49426, this change makes the
simple imputer work in append mode.

## Related issue number

ray-project#48133

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

Signed-off-by: Martin Bomio <[email protected]>
  • Loading branch information
martinbomio authored Feb 20, 2025
1 parent 39ff2fa commit 6bfe146
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 10 deletions.
34 changes: 29 additions & 5 deletions python/ray/data/preprocessors/imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ class SimpleImputer(Preprocessor):
2 3.0 c
3 3.0 c
:class:`SimpleImputer` can also be used in append mode by providing the
names of the ``output_columns`` that should hold the imputed values.
>>> preprocessor = SimpleImputer(columns=["X"], output_columns=["X_imputed"], strategy="mean")
>>> preprocessor.fit_transform(ds).to_pandas() # doctest: +SKIP
X Y X_imputed
0 0.0 None 0.0
1 NaN b 2.0
2 3.0 c 3.0
3 3.0 c 3.0
Args:
columns: The columns to apply imputation to.
strategy: How imputed values are chosen.
Expand All @@ -75,6 +86,10 @@ class SimpleImputer(Preprocessor):
* ``"constant"``: The value passed to ``fill_value``.
fill_value: The value to use when ``strategy`` is ``"constant"``.
output_columns: The names of the transformed columns. If None, the transformed
columns will be the same as the input columns. If not None, the length of
``output_columns`` must match the length of ``columns``, otherwise an error
will be raised.
Raises:
ValueError: if ``strategy`` is not ``"mean"``, ``"most_frequent"``, or
Expand All @@ -88,6 +103,8 @@ def __init__(
columns: List[str],
strategy: str = "mean",
fill_value: Optional[Union[str, Number]] = None,
*,
output_columns: Optional[List[str]] = None,
):
self.columns = columns
self.strategy = strategy
Expand All @@ -107,6 +124,10 @@ def __init__(
'`fill_value` must be set when using "constant" strategy.'
)

self.output_columns = Preprocessor._derive_and_validate_output_columns(
columns, output_columns
)

def _fit(self, dataset: Dataset) -> Preprocessor:
if self.strategy == "mean":
aggregates = [Mean(col) for col in self.columns]
Expand All @@ -117,7 +138,7 @@ def _fit(self, dataset: Dataset) -> Preprocessor:
return self

def _transform_pandas(self, df: pd.DataFrame):
for column in self.columns:
for column, output_column in zip(self.columns, self.output_columns):
value = self._get_fill_value(column)

if value is None:
Expand All @@ -128,11 +149,13 @@ def _transform_pandas(self, df: pd.DataFrame):

if column not in df.columns:
# Create the column with the fill_value if it doesn't exist
df[column] = value
df[output_column] = value
else:
if is_categorical_dtype(df.dtypes[column]):
df[column] = df[column].cat.add_categories([value])
df[column].fillna(value, inplace=True)
df[output_column] = df[column].cat.add_categories([value])
if output_column != column:
df[output_column] = df[column].copy(deep=True)
df[output_column].fillna(value, inplace=True)

return df

Expand All @@ -152,7 +175,8 @@ def _get_fill_value(self, column):
def __repr__(self):
return (
f"{self.__class__.__name__}(columns={self.columns!r}, "
f"strategy={self.strategy!r}, fill_value={self.fill_value!r})"
f"strategy={self.strategy!r}, fill_value={self.fill_value!r}, "
f"output_columns={self.output_columns!r})"
)


Expand Down
46 changes: 41 additions & 5 deletions python/ray/data/tests/preprocessors/test_imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_simple_imputer():
{"A": processed_col_a, "B": processed_col_b, "C": processed_col_c}
)

assert out_df.equals(expected_df)
pd.testing.assert_frame_equal(out_df, expected_df)

# Transform batch.
pred_col_a = [1, 2, np.nan]
Expand All @@ -59,7 +59,7 @@ def test_simple_imputer():
}
)

assert pred_out_df.equals(pred_expected_df)
pd.testing.assert_frame_equal(pred_out_df, pred_expected_df, check_like=True)

# with missing column
pred_in_df = pd.DataFrame.from_dict({"A": pred_col_a, "B": pred_col_b})
Expand All @@ -71,7 +71,39 @@ def test_simple_imputer():
"C": pred_processed_col_c,
}
)
assert pred_out_df.equals(pred_expected_df)
pd.testing.assert_frame_equal(pred_out_df, pred_expected_df, check_like=True)

# append mode
with pytest.raises(ValueError):
SimpleImputer(columns=["B", "C"], output_columns=["B_encoded"])

imputer = SimpleImputer(
columns=["B", "C"],
output_columns=["B_imputed", "C_imputed"],
)
imputer.fit(ds)

pred_col_a = [1, 2, np.nan]
pred_col_b = [1, 2, np.nan]
pred_col_c = [None, None, None]
pred_in_df = pd.DataFrame.from_dict(
{"A": pred_col_a, "B": pred_col_b, "C": pred_col_c}
)
pred_out_df = imputer.transform_batch(pred_in_df)

pred_processed_col_b = [1.0, 2.0, 2.0]
pred_processed_col_c = [1.0, 1.0, 1.0]
pred_expected_df = pd.DataFrame.from_dict(
{
"A": pred_col_a,
"B": pred_col_b,
"C": pred_col_c,
"B_imputed": pred_processed_col_b,
"C_imputed": pred_processed_col_c,
}
)

pd.testing.assert_frame_equal(pred_out_df, pred_expected_df, check_like=True)

# Test "most_frequent" strategy.
most_frequent_col_a = [1, 2, 2, None, None, None]
Expand All @@ -97,7 +129,9 @@ def test_simple_imputer():
{"A": most_frequent_processed_col_a, "B": most_frequent_processed_col_b}
)

assert most_frequent_out_df.equals(most_frequent_expected_df)
pd.testing.assert_frame_equal(
most_frequent_out_df, most_frequent_expected_df, check_like=True
)

# Test "constant" strategy.
constant_col_a = ["apple", None]
Expand All @@ -123,7 +157,9 @@ def test_simple_imputer():
)
constant_expected_df["B"] = constant_expected_df["B"].astype("category")

assert constant_out_df.equals(constant_expected_df)
pd.testing.assert_frame_equal(
constant_out_df, constant_expected_df, check_like=True
)


def test_imputer_all_nan_raise_error():
Expand Down

0 comments on commit 6bfe146

Please sign in to comment.