feat: add distribute add columns by ray #3369

Open
wants to merge 2 commits into
base: main

Conversation

Jay-ju
Contributor

@Jay-ju Jay-ju commented Jan 11, 2025

@github-actions bot added the enhancement and python labels Jan 11, 2025
@Jay-ju force-pushed the merge_column_distribute_ray branch 19 times, most recently from 1812181 to 5500c36, January 13, 2025 11:33
Contributor

@LuQQiu LuQQiu left a comment

Thanks for adding this great capability!

python/python/lance/ray/distribute_task.py (outdated, resolved)
python/python/lance/ray/distribute_task.py (resolved)
@Jay-ju force-pushed the merge_column_distribute_ray branch 3 times, most recently from 613bbdd to 7cd0508, January 14, 2025 03:32
@LuQQiu LuQQiu requested a review from westonpace January 14, 2025 18:42
Contributor

@LuQQiu LuQQiu left a comment

Looks good to me! I'll see if @westonpace has time to give it a second pass.

Contributor

@westonpace westonpace left a comment

Thanks for getting this started. We'll want more user-facing documentation at some point. Is your goal to expand on this?


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
@pytest.mark.skip(
reason="This test local can run, but not in CI." "it's blocked by ray env"
Contributor

What error do we get? The CI should be able to test Ray.

Contributor Author

The error is that the test_ray module is not found.

Contributor

I'm not sure why we would get that error. I can confirm that Ray tests are running in the latest CI runs:

[image: screenshot from a recent CI run]

Can you remove the skip so we can see the failure in the CI and work through it?

Contributor Author

@Jay-ju Jay-ju Jan 20, 2025

@westonpace https://github.com/lancedb/lance/actions/runs/12859405923/job/35849807699?pr=3369
How can I log in to this environment to inspect what is actually running?

python/python/tests/test_ray.py (outdated, resolved)
python/python/lance/ray/in_place_api.py (outdated, resolved)
@Jay-ju force-pushed the merge_column_distribute_ray branch from bab57fb to 827272f, January 15, 2025 01:45
@SaintBacchus
Collaborator

Hi @Jay-ju, can you add some documentation for this PR?

self.partition = partition


class CustomTask:
Collaborator

CustomTask may be hard to understand. How about using RayBaseTask for these functions?

from .distribute_task import DistributeCustomTasks


def custom_inplace(
Collaborator

What does in_place mean in this file's name?

.write_lance(tmp_path, schema=schema)
)
lance_ds = LanceDatasource(uri=tmp_path)
add_columns(DistributeCustomTasks(lance_ds), generate_label, ["height"])
Collaborator

Can we register the add_columns function on the Lance data source, like write_lance?
[image]

Contributor Author

I will sync with the Ray team later. We need to look at how this code should be maintained jointly.
@SaintBacchus


@richardliaw richardliaw Jan 22, 2025

Yeah this should go into a datasink (which allows you to do write_X)

Contributor Author

@Jay-ju Jay-ju Jan 23, 2025

> Yeah this should go into a datasink (which allows you to do write_X)

@richardliaw
Doesn't putting add-columns into the datasink create a bit of a semantic conflict?
I would rather have a custom task. This is the PR I originally proposed in the Ray community. Currently the datasource only supports read/write, but a multimodal data format like Lance would benefit from supporting more semantics, such as update, add_column, compaction, and so on.
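
To make the add_column case concrete, here is a minimal sketch of the pattern: split a dataset into fragments, compute the new column for each fragment in a separate task, then attach the results without rewriting the existing columns. It is an illustration under stated assumptions, not this PR's implementation. `Fragment` and `add_columns_distributed` are hypothetical names, `generate_label` is a made-up body for the function named in the test hunk, a thread pool stands in for Ray tasks, and plain dicts of lists stand in for Lance fragments.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

# Hypothetical stand-in for a Lance fragment: column name -> list of values.
Fragment = Dict[str, List]


def add_columns_distributed(
    fragments: List[Fragment],
    value_fn: Callable[[Fragment], Fragment],
    read_columns: List[str],
    max_workers: int = 4,
) -> None:
    """Compute new columns per fragment in parallel and attach them in place."""

    def run_one(fragment: Fragment) -> Fragment:
        # Each task only sees the columns it asked to read.
        projected = {name: fragment[name] for name in read_columns}
        new_columns = value_fn(projected)
        num_rows = len(next(iter(fragment.values())))
        for name, values in new_columns.items():
            if len(values) != num_rows:
                raise ValueError(f"column {name!r} has the wrong row count")
        return new_columns

    # The thread pool plays the role of the distributed scheduler here.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(run_one, fragments))

    # "Commit" step: attach the new columns to each fragment.
    for fragment, new_columns in zip(fragments, results):
        fragment.update(new_columns)


def generate_label(batch: Fragment) -> Fragment:
    # Hypothetical value function: derive a label from the "height" column.
    return {"label": ["tall" if h >= 180 else "short" for h in batch["height"]]}


fragments = [
    {"name": ["a", "b"], "height": [170, 185]},
    {"name": ["c"], "height": [190]},
]
add_columns_distributed(fragments, generate_label, ["height"])
```

The point of the sketch is that the per-fragment work and the final commit are separate steps, which is why this operation does not map cleanly onto a plain read or write.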

Contributor Author

However, I still hope that sinks can be uniformly maintained in the Ray community because there has been a situation where Lance cannot write through Ray due to interface changes in Ray sinks.

We chatted with @westonpace yesterday -- I think starting a discussion on evolving datasinks to take care of state updates of the underlying storage makes sense (as compared to your previous PR, which put it into the read/datasource API).

Contributor

> However, I still hope that sinks can be uniformly maintained in the Ray community because there has been a situation where Lance cannot write through Ray due to interface changes in Ray sinks.

We also plan on upstreaming our datasink into Ray soon :)

Then yes, I agree with richardliaw. We can make a datasink (or add flags to the datasink) that sinks data into additional columns instead of creating a brand new dataset.
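
As an illustration of that idea, the sketch below shows a sink with a mode flag: in `create` mode a write produces a new dataset, and in `add_columns` mode the written blocks are merged into an existing dataset as extra columns. `InMemorySink` and its methods are hypothetical and operate on plain Python dicts. This is not the Ray datasink interface or the Lance sink, only the rough shape of the proposal.

```python
from typing import Dict, List, Optional

Table = Dict[str, List]  # hypothetical in-memory table: column name -> values


class InMemorySink:
    """Hypothetical sink that either creates a dataset or adds columns to one."""

    def __init__(self, mode: str = "create", existing: Optional[Table] = None):
        if mode not in ("create", "add_columns"):
            raise ValueError(f"unknown mode: {mode}")
        if mode == "add_columns" and existing is None:
            raise ValueError("add_columns mode needs an existing dataset")
        self.mode = mode
        self.dataset: Table = existing if existing is not None else {}
        self.rows_written = 0

    def write(self, blocks: List[Table]) -> Table:
        # Concatenate the incoming blocks column by column.
        incoming: Table = {}
        for block in blocks:
            for name, values in block.items():
                incoming.setdefault(name, []).extend(values)
        num_rows = len(next(iter(incoming.values()), []))

        if self.mode == "create":
            # Brand new dataset: replace whatever was there.
            self.dataset = incoming
        else:
            # Add columns: row counts must line up with the existing data.
            existing_rows = len(next(iter(self.dataset.values()), []))
            if num_rows != existing_rows:
                raise ValueError("new columns must match the existing row count")
            clash = set(incoming) & set(self.dataset)
            if clash:
                raise ValueError(f"columns already exist: {sorted(clash)}")
            self.dataset.update(incoming)
        self.rows_written += num_rows
        return self.dataset


base = {"height": [170, 185, 190]}
sink = InMemorySink(mode="add_columns", existing=base)
result = sink.write([{"label": ["short", "tall"]}, {"label": ["tall"]}])
```

A real implementation would also need a way to match incoming rows to existing rows (for example by fragment and row offset). The sketch sidesteps this by assuming the blocks arrive in order.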

Contributor Author

@Jay-ju Jay-ju Jan 31, 2025

> We chatted with @westonpace yesterday -- I think starting a discussion on evolving datasinks to take care of state updates of the underlying storage makes sense (as compared to your previous PR, which put it into the read/datasource API).

@richardliaw How should this be understood? Something like this:

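from typing import Callable, List

import pyarrow as pa
import ray

# `ctx` is assumed to come from the surrounding sink code (not shown here).
@ray.remote(scheduling_strategy=ctx.scheduling_strategy)
class DataSink:
    def __init__(self):
        self.rows_written = 0
        self.enabled = True

    # Proposed new method for adding columns?
    def add_columns(
        self,
        value_fn: Callable[[pa.RecordBatch], pa.RecordBatch],
        read_columns: List[str],
    ) -> None:
        ...  # body elided in the original comment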

Contributor Author

@Jay-ju Jay-ju Jan 31, 2025

> > However, I still hope that sinks can be uniformly maintained in the Ray community because there has been a situation where Lance cannot write through Ray due to interface changes in Ray sinks.
>
> We also plan on upstreaming our datasink into Ray soon :)
>
> Then yes, I agree with richardliaw. We can make a datasink (or add flags to the datasink) that sinks data into additional columns instead of creating a brand new dataset.

@westonpace
When is the Ray Lance datasink expected to be merged into Ray? I want to see how this code should be handled.

@LuQQiu
Contributor

LuQQiu commented Jan 24, 2025

error_linux_39_x86.log
