Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
⚡ use native parquet reader (#41)
Browse files: browse the repository at this point in the history
* ⚡ use native parquet reader

* fix CI to install polars and Dagster combinations properly
  • Loading branch information
danielgafni authored Oct 31, 2023
1 parent 293b7d6 commit 3223d53
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 50 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:

jobs:
test:
name: test ${{ matrix.py }} - ${{ matrix.os }} - polars=${{ matrix.polars_version }} - dagster=${{ matrix.dagster_version }}
name: test polars=${{ matrix.polars_version }} dagster=${{ matrix.dagster_version }} py=${{ matrix.py }} ${{ matrix.os }}
runs-on: ${{ matrix.os }}-latest
strategy:
fail-fast: false
Expand Down Expand Up @@ -51,7 +51,7 @@ jobs:
virtualenvs-in-project: false
installer-parallel: true
- name: Install dependencies
run: poetry install --all-extras --sync && pip install --ignore-installed --force-reinstall polars~=${{ matrix.polars_version }} dagster~=${{ matrix.dagster_version }}
run: poetry install --all-extras --sync && pip install --force-reinstall polars~=${{ matrix.polars_version }} dagster~=${{ matrix.dagster_version }}
- name: Print polars info
run: python -c 'import polars; print(polars.show_versions())'
- name: Print dagster info
Expand All @@ -60,7 +60,7 @@ jobs:
run: pytest -v .

lint:
name: lint ${{ matrix.py }} - ${{ matrix.os }} - polars=${{ matrix.polars_version }} - dagster=${{ matrix.dagster_version }}
name: lint polars=${{ matrix.polars_version }} dagster=${{ matrix.dagster_version }} py=${{ matrix.py }} ${{ matrix.os }}
runs-on: ${{ matrix.os }}-latest
strategy:
fail-fast: false
Expand Down Expand Up @@ -93,7 +93,7 @@ jobs:
virtualenvs-in-project: false
installer-parallel: true
- name: Install dependencies
run: poetry install --all-extras --sync && pip install --ignore-installed --force-reinstall polars~=${{ matrix.polars_version }} dagster~=${{ matrix.dagster_version }}
run: poetry install --all-extras --sync && pip install --force-reinstall polars~=${{ matrix.polars_version }} dagster~=${{ matrix.dagster_version }}
- name: Run pre-commit hooks
run: pre-commit run --all-files

Expand Down
105 changes: 83 additions & 22 deletions dagster_polars/io_managers/parquet.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import json
from typing import Optional, Union
from typing import Any, Optional, Union

import fsspec
import polars as pl
import pyarrow as pa
import pyarrow.dataset
import pyarrow.parquet
from dagster import InputContext, OutputContext
from packaging.version import Version
from pyarrow import Table
from upath import UPath

Expand All @@ -15,8 +16,84 @@
from dagster_polars.types import LazyFrameWithMetadata, StorageMetadata


def get_pyarrow_dataset(path: UPath, context: InputContext) -> pyarrow.dataset.Dataset:
    """
    Build a pyarrow dataset for ``path``, configured from the input metadata.

    The filesystem is read from the path's private ``_accessor._fs`` attribute
    when that attribute chain exists; otherwise ``None`` is passed as the filesystem.

    :param path: location of the dataset
    :param context: input context; ``context.metadata`` must not be None. The metadata keys
        ``format``, ``partitioning``, ``partition_base_dir``, ``exclude_invalid_files`` and
        ``ignore_prefixes`` override the defaults used below.
    :return: the dataset created by ``pyarrow.dataset.dataset``
    """
    metadata = context.metadata
    assert metadata is not None

    # Not every path object exposes `_accessor._fs`; a missing attribute at
    # either level simply leaves the filesystem unset.
    accessor = getattr(path, "_accessor", None)
    filesystem: Union[fsspec.AbstractFileSystem, None] = getattr(accessor, "_fs", None)

    return pyarrow.dataset.dataset(
        str(path),
        filesystem=filesystem,
        format=metadata.get("format", "parquet"),
        partitioning=metadata.get("partitioning"),
        partition_base_dir=metadata.get("partition_base_dir"),
        exclude_invalid_files=metadata.get("exclude_invalid_files", True),
        ignore_prefixes=metadata.get("ignore_prefixes", [".", "_"]),
    )


def scan_parquet_legacy(path: UPath, context: InputContext) -> pl.LazyFrame:
    """
    Lazily scan parquet data through a pyarrow dataset.

    :param path: location of the parquet data
    :param context: input context; ``context.metadata`` must not be None. The
        ``allow_pyarrow_filter`` metadata key (default ``True``) is forwarded to
        ``pl.scan_pyarrow_dataset``.
    :return: lazy frame produced by ``pl.scan_pyarrow_dataset``
    """
    assert context.metadata is not None

    dataset = get_pyarrow_dataset(path, context)
    allow_filter = context.metadata.get("allow_pyarrow_filter", True)

    return pl.scan_pyarrow_dataset(dataset, allow_pyarrow_filter=allow_filter)


def scan_parquet(path: UPath, context: InputContext) -> pl.LazyFrame:
    """
    Lazily scan parquet data with the native polars reader.

    Reader options are taken from the input metadata, falling back to the
    defaults listed below. ``hive_partitioning`` and ``retries`` are only
    forwarded when the installed polars version is newer than 0.19.4.

    :param path: location of the parquet data; its ``storage_options`` attribute,
        when present, is passed to polars
    :param context: input context; ``context.metadata`` must not be None
    :return: lazy frame produced by ``pl.scan_parquet``
    """
    metadata = context.metadata
    assert metadata is not None

    # TODO: explore removing this fallback as universal-pathlib should always provide storage_options in newer versions
    storage_options: Optional[dict[str, Any]] = getattr(path, "storage_options", None)

    # Metadata key -> default used when the key is absent.
    defaults = {
        "n_rows": None,
        "cache": True,
        "parallel": "auto",
        "rechunk": True,
        "row_count_name": None,
        "row_count_offset": 0,
        "low_memory": False,
        "use_statistics": True,
    }
    scan_options = {key: metadata.get(key, default) for key, default in defaults.items()}

    polars_version = Version(pl.__version__)
    if polars_version > Version("0.19.4"):
        scan_options["hive_partitioning"] = metadata.get("hive_partitioning", True)
        scan_options["retries"] = metadata.get("retries", 0)

    return pl.scan_parquet(str(path), storage_options=storage_options, **scan_options)  # type: ignore


class PolarsParquetIOManager(BasePolarsUPathIOManager):
extension: str = ".parquet"
use_legacy_reader: bool = False

assert BasePolarsUPathIOManager.__doc__ is not None
__doc__ = (
Expand Down Expand Up @@ -74,31 +151,15 @@ def scan_df_from_path(
) -> Union[pl.LazyFrame, LazyFrameWithMetadata]:
assert context.metadata is not None

fs: Union[fsspec.AbstractFileSystem, None] = None

try:
fs = path._accessor._fs
except AttributeError:
pass

ds = pyarrow.dataset.dataset(
str(path),
filesystem=fs,
format=context.metadata.get("format", "parquet"),
partitioning=context.metadata.get("partitioning"),
partition_base_dir=context.metadata.get("partition_base_dir"),
exclude_invalid_files=context.metadata.get("exclude_invalid_files", True),
ignore_prefixes=context.metadata.get("ignore_prefixes", [".", "_"]),
)

ldf = pl.scan_pyarrow_dataset(
ds,
allow_pyarrow_filter=context.metadata.get("allow_pyarrow_filter", True),
)
if self.use_legacy_reader or Version(pl.__version__) < Version("0.19.4"):
ldf = scan_parquet_legacy(path, context)
else:
ldf = scan_parquet(path, context)

if not with_metadata:
return ldf
else:
ds = get_pyarrow_dataset(path, context)
dagster_polars_metadata = (
ds.schema.metadata.get(DAGSTER_POLARS_STORAGE_METADATA_KEY.encode("utf-8"))
if ds.schema.metadata is not None
Expand Down
53 changes: 29 additions & 24 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typing-extensions = "^4.7.1"

deltalake = { version = ">=0.10.0", optional = true }
dagster-gcp = { version = ">=0.19.5", optional = true }
universal-pathlib = "^0.1.4"


[tool.poetry.extras]
Expand Down

0 comments on commit 3223d53

Please sign in to comment.