Support lazy schema retrieval in IO Plugins #18638

Closed
tmct opened this issue Sep 9, 2024 · 15 comments · Fixed by #20982
Labels: accepted (Ready for implementation), enhancement (New feature or an improvement of an existing feature)


@tmct

tmct commented Sep 9, 2024

Description

Hi,

I just wrote my first Python IO Plugin, for fetching data from a company-internal data source - very pleased that you can create LazyFrames of completely custom origin now, thanks! This brings us one step closer to expressing entire data processing pipelines declaratively without breaks using the Lazy API.

What is missing from the example, and would really help me to put my scan_custom_data_source into production, is the ability to collect the schema lazily, as has been recently-ish made possible in LazyFrames.

Is there already a way to do this, or are changes needed to register_io_source?

Many thanks,
Tom

@tmct tmct added the enhancement New feature or an improvement of an existing feature label Sep 9, 2024
@tmct
Author

tmct commented Sep 10, 2024

Let me know if this is worth another Issue, but I am also interested in the scenario where I am able to leverage existing built-in LazyFrame scans, but want to do some lazily-defined work beforehand.

Motivating example: I would like to write my own scan_encrypted_parquet method. I can of course write a register_io_source callable which decrypts files and yields DataFrames, but this means that any compute graph defined within that method is disconnected from later compute. And the onus is then on me to pass through the with_columns args etc appropriately. Imagine for example a similar function to register_io_source, whose callable argument returns LazyFrames.
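
To make the pass-through burden concrete, here is a minimal, library-agnostic sketch of what an IO source callable has to honour. It is not Polars code: plain dict rows stand in for DataFrames, a plain Python function stands in for a `pl.Expr` predicate, and the names `ROWS` and `rows_source` are hypothetical.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator

Row = dict[str, object]

# Hypothetical in-memory stand-in for the decrypted file contents.
ROWS: list[Row] = [
    {"id": 1, "name": "a", "value": 10},
    {"id": 2, "name": "b", "value": 20},
    {"id": 3, "name": "c", "value": 30},
    {"id": 4, "name": "d", "value": 40},
]


def rows_source(
    with_columns: list[str] | None,
    predicate: Callable[[Row], bool] | None,
    n_rows: int | None,
    batch_size: int | None,
) -> Iterator[list[Row]]:
    """Yield batches of rows, applying each pushed-down argument by hand."""
    rows = ROWS
    # In this sketch we filter first, so the predicate can still see
    # columns that the projection below would drop.
    if predicate is not None:
        rows = [r for r in rows if predicate(r)]
    if n_rows is not None:
        rows = rows[:n_rows]
    if with_columns is not None:
        rows = [{c: r[c] for c in with_columns} for r in rows]
    # A missing batch size means "one batch"; max() avoids a zero step.
    step = batch_size if batch_size is not None else max(len(rows), 1)
    for start in range(0, len(rows), step):
        yield rows[start : start + step]


batches = list(rows_source(["id"], lambda r: r["value"] > 10, 2, 1))
```

Every argument that is ignored here would silently produce wrong or oversized results downstream, which is the motivation for wanting the inner work to stay part of the lazy plan instead.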

@tmct
Author

tmct commented Sep 10, 2024

Here is my work in progress, illustrating both questions I have:

def scan_parquet_sc(
    f: Path,
    schema: SchemaDict,  # ideally we would collect this lazily (and maybe cache the result if collect_schema is called before collect.) But until then, don't get this schema wrong!
) -> pl.LazyFrame:
    """Decrypts a parquet file then scans.
    
    Don't ask why we're encrypting the whole file instead of using the Parquet columnar standard..."""

    def pq_source(
        with_columns: list[str] | None,
        predicate: pl.Expr | None,
        _n_rows: int | None,
        _batch_size: int | None,
    ) -> Iterator[pl.DataFrame]:
        res = _decrypt_and_scan_parquet_sc(f, with_columns, predicate, _n_rows, _batch_size)
        if _batch_size is not None:
            logger.warning(f"Was passed {_batch_size=} but will ignore for now - maybe we should collect row group by row group then check if that's big enough to batch")
        yield res.collect()  # if batch=None I would perhaps like a way to return a LazyFrame?

    return register_io_source(pq_source, schema=schema)


def _decrypt_and_scan_parquet_sc(
    f: Path,
    with_columns: list[str] | None,
    predicate: pl.Expr | None,
    n_rows: int | None,
    _batch_size: int | None,
) -> pl.LazyFrame:
    pq_bytes = BytesIO(crypt.decrypt_bytes(f.read_bytes()))
    df = pl.scan_parquet(pq_bytes)  # once #10413 is released...
    if with_columns is not None:
        df = df.select(with_columns)
    if predicate is not None:
        df = df.filter(predicate)
    if n_rows is not None:
        df = df.head(n_rows)  # I'm pretty sure you want head and not fetch here
    return df


def _collect_schema_parquet_sc(f: Path):
    # TODO implement and hook in when supported
    ...

@ritchie46
Member

Hey @tmct, great that you made an IO plugin! That's what they are for.

This example shows the ability to create the schema lazily:

https://github.com/pola-rs/pyo3-polars/tree/main/example/io_plugin

@tmct
Author

tmct commented Sep 11, 2024

Many thanks Ritchie, I'll give it a go.

@tmct
Author

tmct commented Sep 11, 2024

Does 'src.schema()' in that example not realise the schema at the time of scan_... though, rather than lazily? I do not understand the significance of that last instance of the Source, so likely I've misunderstood how it works

@tmct
Author

tmct commented Sep 20, 2024

I suppose another way of describing what I want is this:

Currently I can provide a custom Python function that can be used as the basis for a LazyFrame collect() - this is the IO plugins feature. To do so, I must currently provide a fixed schema up front. (And - I am pleased that this extensibility point exists!)

But - a general LazyFrame returned from e.g. scan_parquet followed by subsequent LazyFrame operations does not necessarily know its schema up front. (Indeed, I believe the case where this is non-trivial motivated the introduction of collect_schema()?)

So - I wish to implement such "fully lazy" LazyFrames using custom Python functions please, so that I can use the Polars Lazy API as a fully lazy description of my intended tabular processing. I imagine this could take the form of me passing a python callable returning the schema.

Do you think this would be possible? Desirable?

Thank you
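
A minimal sketch of the idea described in this comment, assuming nothing about how Polars would implement it: the schema is supplied either as a value or as a zero-argument callable, resolved only when first requested, and cached afterwards. `LazySource`, `fetch_schema` and the string dtype names are hypothetical and exist only for illustration.

```python
from __future__ import annotations

from collections.abc import Callable

Schema = dict[str, str]


class LazySource:
    """Hypothetical source whose schema is a value or a zero-argument callable."""

    def __init__(self, schema: Schema | Callable[[], Schema]) -> None:
        self._schema = schema
        self._resolved: Schema | None = None

    def collect_schema(self) -> Schema:
        # Resolve on first use only, then reuse the cached result.
        if self._resolved is None:
            self._resolved = self._schema() if callable(self._schema) else self._schema
        return self._resolved


calls = 0


def fetch_schema() -> Schema:
    """Stand-in for an expensive lookup, such as reading file metadata."""
    global calls
    calls += 1
    return {"id": "Int64", "name": "String"}


source = LazySource(fetch_schema)  # no lookup happens at "scan" time
calls_after_scan = calls
first = source.collect_schema()    # the lookup happens here
second = source.collect_schema()   # served from the cache
```

Constructing the source costs nothing, and the lookup runs at most once regardless of how many times the schema is requested.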

@tmct
Author

tmct commented Oct 14, 2024

I've made an attempt at starting the public-facing side of this feature in #19232.

Any feedback or help would be much welcomed, thanks.

@tmct
Author

tmct commented Oct 30, 2024

Thanks to some help from @itamarst I now have a branch of my fork with this feature on it! It's not prod-ready at this point but I will have a play with it.

Example (passing) test from poc branch: https://github.com/tmct/polars/blob/8773d9c/py-polars/tests/unit/io/test_plugins.py#L39

@ritchie46
Member

ritchie46 commented Jan 29, 2025

@tmct your function can register the callable on every scan_io_function call, and you provide the schema at that registration.

In pseudo code

def scan_foo(file_name) -> pl.LazyFrame:
    def source_generator(
        with_columns: list[str] | None,
        predicate: pl.Expr | None,
        n_rows: int | None,
        batch_size: int | None,
    ) -> Iterator[pl.DataFrame]:
        """
        Generator function that creates the source.
        This function will be registered as IO source.
        """

        new_size = size
        if n_rows is not None and n_rows < size:
            new_size = n_rows

        src = MySource()
        if with_columns is not None:
            src.set_with_columns(with_columns)

        # Set the predicate.
        predicate_set = True
        if predicate is not None:
            try:
                src.try_set_predicate(predicate)
            except pl.exceptions.ComputeError:
                predicate_set = False

        while (out := src.next()) is not None:
            # If the source could not apply the predicate
            # (because it wasn't able to deserialize it), we do it here.
            if not predicate_set and predicate is not None:
                out = out.filter(predicate)

            yield out

    # create src again to compute the schema
    src = MySource(samplers, 0, 0)
    return register_io_source(source_generator, schema=src.schema())
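
The predicate-fallback pattern in the pseudo code above can be exercised without Polars. The following is only a sketch under stated assumptions: `RangeSource`, `UnsupportedPredicate` and `generate` are hypothetical names, rows are plain dicts, and the source always refuses the predicate so that the fallback branch runs.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator

Row = dict[str, int]
Predicate = Callable[[Row], bool]


class UnsupportedPredicate(Exception):
    """Raised by the hypothetical source when it cannot evaluate a predicate."""


class RangeSource:
    """Hypothetical source yielding rows {"x": 0}, {"x": 1}, ... in batches."""

    def __init__(self, total: int, batch_size: int) -> None:
        self._total = total
        self._batch_size = batch_size
        self._pos = 0

    def try_set_predicate(self, predicate: Predicate) -> None:
        # This sketch never supports pushdown, so the fallback always runs.
        raise UnsupportedPredicate

    def next(self) -> list[Row] | None:
        if self._pos >= self._total:
            return None
        end = min(self._pos + self._batch_size, self._total)
        batch = [{"x": i} for i in range(self._pos, end)]
        self._pos = end
        return batch


def generate(predicate: Predicate | None) -> Iterator[list[Row]]:
    src = RangeSource(total=5, batch_size=2)
    predicate_set = True
    if predicate is not None:
        try:
            src.try_set_predicate(predicate)
        except UnsupportedPredicate:
            predicate_set = False
    while (out := src.next()) is not None:
        # The source could not apply the predicate, so filter each batch here.
        if not predicate_set and predicate is not None:
            out = [row for row in out if predicate(row)]
        yield out


result = list(generate(lambda row: row["x"] % 2 == 0))
```

Filtering per batch keeps the result correct when pushdown fails, at the cost of reading rows that are later discarded.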

@adamreeve
Contributor

Hi @ritchie46, your example doesn't solve this issue. What we want is to not have to know the schema until collect, collect_schema or explain etc are called. In your example, you still need to do some IO to determine the schema when scan_foo is called (where you call src.schema()).

The built-in scan_parquet etc. methods don't have this behaviour: scan_parquet doesn't immediately read the Parquet file when it is called. The read is deferred until the schema is needed when creating an execution plan.

@c-peters c-peters added the accepted Ready for implementation label Feb 3, 2025
@c-peters c-peters moved this to Done in Backlog Feb 3, 2025
@c-peters c-peters added this to Backlog Feb 3, 2025
@ritchie46
Member

Right, I agree. It differs from the other scans. It is something that would be problematic for cloud. Is it problematic for your use cases though?

Maybe the schema should accept a callable that returns the schema.

@adamreeve
Contributor

> Is it problematic for your use cases though?

I'm not 100% sure of the details, I'll let @tmct answer that.

> Maybe the schema should accept a callable that returns the schema.

Yes, this is the approach being taken in #19232 (see https://github.com/pola-rs/polars/pull/19232/files#diff-5cab423b4448a3a63943ca166f0129ffd7425c114acc8f05835e76b5f9046de3R43)

@ritchie46
Member

Will make it a callable. 👍

@tmct
Author

tmct commented Feb 3, 2025

Yes, the lack of this has been problematic. For example, we have some abstractions that wrap LazyFrames, and I think these can and should become pure LazyFrames, accompanied by domain-specific extensions and IO sources. We have in-house Python libraries that retrieve data of varying schema, and it is going to be hugely faster to get these working as IO Plugins (or at least prove the concept...) than full Rust rewrites.

Passing a callable sounds great to me - thank you very much.

@ritchie46
Member

That should still work. All scans had the schema in the DSL a year ago and that worked fine for local execution. I am adding the callable option, but I do think it should be workable as is already.
