expanding profiler runtimes support (#631)
* profiler spark implementation

* fixed pyproject

* fixed Makefile

* add Spark

* completed PR

* completed PR
blublinsky authored Sep 30, 2024
1 parent 5f9757f commit fdabeaf
Showing 42 changed files with 2,410 additions and 577 deletions.
2 changes: 2 additions & 0 deletions .make.versions
@@ -59,7 +59,9 @@ NOOP_PYTHON_VERSION=$(DPK_VERSION)
NOOP_RAY_VERSION=$(DPK_VERSION)
NOOP_SPARK_VERSION=$(DPK_VERSION)

PROFILER_PYTHON_VERSION=$(DPK_VERSION)
PROFILER_RAY_VERSION=$(DPK_VERSION)
PROFILER_SPARK_VERSION=$(DPK_VERSION)

RESIZE_PYTHON_VERSION=$(DPK_VERSION)
RESIZE_RAY_VERSION=$(DPK_VERSION)
44 changes: 35 additions & 9 deletions README.md
@@ -126,33 +126,59 @@ Now that you have run a single transform, the next step is to explore how to put
The matrix below shows the combination of modules and supported runtimes. All the modules can be accessed [here](transforms) and can be combined to form data processing pipelines, as shown in the [examples](examples) folder.


| Modules                          | Python-only      | Ray              | Spark            | KFP on Ray             |
|----------------------------------|------------------|------------------|------------------|------------------------|
| **Data Ingestion**               |                  |                  |                  |                        |
| [Code (from zip) to Parquet](transforms/code/code2parquet/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [PDF to Parquet](transforms/language/pdf2parquet/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [HTML to Parquet](transforms/universal/html2parquet/python/README.md) | :white_check_mark: | | | |
| **Universal (Code & Language)**  |                  |                  |                  |                        |
| [Exact dedup filter](transforms/universal/ededup/ray/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Fuzzy dedup filter](transforms/universal/fdedup/ray/README.md) | |:white_check_mark:| |:white_check_mark: |
| [Unique ID annotation](transforms/universal/doc_id/ray/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: |
| [Filter on annotations](transforms/universal/filter/python/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: |
| [Profiler](transforms/universal/profiler/ray/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: |
| [Resize](transforms/universal/resize/python/README.md) | :white_check_mark: |:white_check_mark:|:white_check_mark:|:white_check_mark: |
| [Tokenizer](transforms/universal/tokenization/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| **Language-only**                |                  |                  |                  |                        |
| [Language identification](transforms/language/lang_id/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Document quality](transforms/language/doc_quality/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Document chunking for RAG](transforms/language/doc_chunk/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Text encoder](transforms/language/text_encoder/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [PII Annotator/Redactor](transforms/language/pii_redactor/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| **Code-only**                    |                  |                  |                  |                        |
| [Programming language annotation](transforms/code/proglang_select/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Code quality annotation](transforms/code/code_quality/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Malware annotation](transforms/code/malware/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Header cleanser](transforms/code/header_cleanser/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Semantic file ordering](transforms/code/repo_level_ordering/ray/README.md) | |:white_check_mark:| | |
| [License Select Annotation](transforms/code/license_select/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |


Contributors are welcome to add new modules to expand to other data modalities as well as add runtime support for existing modules!
25 changes: 17 additions & 8 deletions transforms/universal/profiler/README.md
@@ -1,10 +1,19 @@
# Profiler Transform

Per the set of
[transform project conventions](../../README.md#transform-project-conventions)
the following runtimes are available:

* [ray](ray/README.md) - enables the running of the base python transformation
in a Ray runtime
* [kfp_ray](kfp_ray/README.md) - enables running the ray docker image for
the transformer in a kubernetes cluster using a generated `yaml` file.
The profiler implements a word count. The typical implementation of a word count is done using map-reduce, which
* has O(N²) complexity, and
* requires shuffling, with lots of data movement.

The implementation here instead uses “streaming” aggregation, based on a central cache:

* At the heart of the implementation is a cache of partial word counts, implemented as a set of Ray actors, which holds the
word counts processed so far.
* Individual data processors are responsible for:
  * reading data from the data plane,
  * tokenizing documents (using a pluggable tokenizer), and
  * coordinating with the distributed cache to collect the overall word counts.

The complication in mapping this model to the transform model is that the implementation requires an aggregator cache,
which the transform model knows nothing about. The solution here is to use the transform runtime to create the cache
and pass it to the transforms as a parameter. A minimal sketch of this pattern is shown below.
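The following is a minimal, self-contained sketch of this pattern (not the actual implementation; the actor and method names are hypothetical). Words are routed to cache actors by a stable hash, so counts are aggregated incrementally, without a shuffle:

```python
# Illustrative sketch of streaming word-count aggregation; all names
# (WordCountCache, add_words, ...) are hypothetical, not the actual code.
import mmh3  # stable hash across processes (a declared dependency of the transform)
import ray


@ray.remote
class WordCountCache:
    """One shard of the distributed cache of partial word counts."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = {}

    def add_words(self, words: dict[str, int]) -> None:
        for word, n in words.items():
            self.counts[word] = self.counts.get(word, 0) + n

    def get_counts(self) -> dict[str, int]:
        return self.counts


ray.init()
caches = [WordCountCache.remote() for _ in range(4)]  # created by the runtime


def process_document(text: str) -> None:
    """Tokenize a document (a pluggable tokenizer in reality) and stream
    partial counts to the cache actors, routing each word by hash."""
    shards: list[dict[str, int]] = [{} for _ in caches]
    for word in text.lower().split():
        shard = shards[mmh3.hash(word, signed=False) % len(caches)]
        shard[word] = shard.get(word, 0) + 1
    ray.get([caches[i].add_words.remote(s) for i, s in enumerate(shards) if s])


process_document("a rose is a rose is a rose")
totals: dict[str, int] = {}
for partial in ray.get([c.get_counts.remote() for c in caches]):
    totals.update(partial)
print(totals)  # a=3, rose=3, is=2
```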

1 change: 1 addition & 0 deletions transforms/universal/profiler/python/.dockerignore
@@ -0,0 +1 @@
venv/
39 changes: 39 additions & 0 deletions transforms/universal/profiler/python/Dockerfile
@@ -0,0 +1,39 @@
FROM docker.io/python:3.10.14-slim-bullseye

RUN pip install --upgrade --no-cache-dir pip

# install pytest
RUN pip install --no-cache-dir pytest

# Create a user and use it to run the transform
RUN useradd -ms /bin/bash dpk
USER dpk
WORKDIR /home/dpk

# Copy and install data processing libraries
# These are expected to be placed in the docker context before this is run (see the make image).
COPY --chown=dpk:root data-processing-lib-python/ data-processing-lib-python/
RUN cd data-processing-lib-python && pip install --no-cache-dir -e .

COPY --chown=dpk:root src/ src/
COPY --chown=dpk:root pyproject.toml pyproject.toml
COPY --chown=dpk:root README.md README.md

RUN pip install --no-cache-dir -e .

# copy the transform and local sample scripts
COPY ./src/profiler_transform_python.py .
COPY ./src/profiler_local.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/

# Set environment
ENV PYTHONPATH /home/dpk

# Put these at the end since they seem to upset the docker cache.
ARG BUILD_DATE
ARG GIT_COMMIT
LABEL build-date=$BUILD_DATE
LABEL git-commit=$GIT_COMMIT
58 changes: 58 additions & 0 deletions transforms/universal/profiler/python/Makefile
@@ -0,0 +1,58 @@
# Define the root of the local git clone so that the common rules
# know where they are running from.
REPOROOT=../../../..
# Include a library of common .transform.* targets which most
# transforms should be able to reuse. However, feel free
# to override/redefine the rules below.

# $(REPOROOT)/.make.versions file contains the versions

TRANSFORM_NAME=profiler

include $(REPOROOT)/transforms/.make.transforms

venv:: .transforms.python-venv

test:: .transforms.python-test

clean:: .transforms.clean

image:: .transforms.python-image

test-src:: .transforms.test-src

setup:: .transforms.setup

build:: build-dist image

publish: publish-image

publish-image:: .transforms.publish-image-python

# The distribution version is the same as the image version.
set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=$(PROFILER_PYTHON_VERSION) TOML_VERSION=$(PROFILER_PYTHON_VERSION) .transforms.set-versions

build-dist:: .defaults.build-dist

publish-dist:: .defaults.publish-dist

test-image:: .transforms.python-test-image

run-cli-sample: .transforms.run-cli-python-sample

run-local-sample: .transforms.run-local-sample

run-local-python-sample: .transforms.run-local-python-sample

#run-s3-ray-sample: .transforms.run-s3-ray-sample

minio-start: .minio-start

kind-load-image:: .transforms.kind-load-image

docker-load-image: .defaults.docker-load-image

docker-save-image: .defaults.docker-save-image
82 changes: 82 additions & 0 deletions transforms/universal/profiler/python/README.md
@@ -0,0 +1,82 @@
# Profiler

Please see the set of
[transform project conventions](../../../README.md)
for details on general project conventions, transform configuration,
testing and IDE set up.

## Summary

The profiler implements a word count. The typical implementation of a word count is done using map-reduce, which
* has O(N²) complexity, and
* requires shuffling, with lots of data movement.

The implementation here instead uses “streaming” aggregation, based on a central cache:

* At the heart of the implementation is a cache of partial word counts, implemented as a set of Ray actors, which holds the
word counts processed so far.
* Individual data processors are responsible for:
  * reading data from the data plane,
  * tokenizing documents (using a pluggable tokenizer), and
  * coordinating with the distributed cache to collect the overall word counts.

The complication in mapping this model to the transform model is that the implementation requires an aggregator cache,
which the transform model knows nothing about. The solution here is to use the transform runtime to create the cache
and pass it to the transforms as a parameter (see the next section).

## Transform runtime

The [transform runtime](src/profiler_transform_python.py) is responsible for creating the cache actors and sending their
handles to the transforms themselves.
Additionally, it writes the collected word counts to data storage (as `.csv` files) and enhances the statistics with information about cache size and utilization. A minimal sketch of this wiring is shown below.
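
As a pure-Python illustration of this wiring (class, key, and method names here are hypothetical, not the actual DPK API), the runtime creates the aggregator cache and every transform instance receives it through its configuration:

```python
# Minimal sketch of "runtime creates the cache, transforms receive it as a
# parameter"; names are illustrative, not the actual implementation.
from collections import Counter

import pyarrow as pa


class ProfilerTransform:
    def __init__(self, config: dict):
        self.doc_column = config["doc_column"]
        # The aggregator cache is injected by the transform runtime.
        self.cache: Counter = config["aggregator_cache"]

    def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict]:
        for doc in table.column(self.doc_column).to_pylist():
            self.cache.update(doc.lower().split())  # pluggable tokenizer in reality
        # The profiler only accumulates counts; it emits no output tables.
        return [], {"documents_profiled": table.num_rows}


# The runtime creates the cache, passes it to each transform instance, and,
# once processing completes, writes the aggregated counts out (e.g., as .csv).
cache: Counter = Counter()
transform = ProfilerTransform({"doc_column": "contents", "aggregator_cache": cache})
table = pa.table({"contents": ["a rose is a rose", "war is war"]})
transform.transform(table)
print(cache.most_common(2))  # e.g., [('a', 2), ('rose', 2)]
```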

## Configuration and command line options

The set of dictionary keys holding the [ProfilerTransform](src/profiler_transform_python.py)
configuration values is as follows:

* _doc_column_ - specifies the name of the column containing the documents

## Running

### Launched Command Line Options
When running the transform with the Python launcher (i.e. TransformLauncher),
the following command line arguments are available in addition to
[the options provided by the launcher](../../../../data-processing-lib/doc/python-launcher-options.md).

```shell
--profiler_doc_column PROFILER_DOC_COLUMN
key for accessing data
```

These correspond to the configuration keys described above.
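
For example, a direct local run might look like this (the folder paths and column name are illustrative; `--data_local_config` is the launcher's standard local data-access option):

```shell
python src/profiler_transform_python.py \
    --data_local_config "{'input_folder': 'test-data/input', 'output_folder': 'output'}" \
    --profiler_doc_column contents
```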

### Running the samples
To run the samples, use the following `make` targets:

* `run-cli-sample` - runs src/profiler_transform_python.py using command line args
* `run-local-sample` - runs src/profiler_local.py
* `run-s3-sample` - runs the S3-based sample
  * Requires prior installation of minio, depending on your platform (e.g., from [here](https://min.io/docs/minio/macos/index.html)
    or [here](https://min.io/docs/minio/linux/index.html)), and invocation of `make minio-start` to load data into local minio for S3 access.

These targets will activate the virtual environment and set up any configuration needed.
Use the `-n` option of `make` to see the details of what is done to run the sample.

For example,
```shell
make run-cli-sample
...
```
Then
```shell
ls output
```
to see the results of the transform.

### Transforming data using the transform image

To use the transform image to transform your data, please refer to the
[running images quickstart](../../../../doc/quick-start/run-transform-image.md),
substituting the name of this transform image and runtime as appropriate.
46 changes: 46 additions & 0 deletions transforms/universal/profiler/python/pyproject.toml
@@ -0,0 +1,46 @@
[project]
name = "dpk_profiler_transform_python"
version = "0.2.2.dev0"
requires-python = ">=3.10"
description = "profiler Python Transform"
license = {text = "Apache-2.0"}
readme = {file = "README.md", content-type = "text/markdown"}
authors = [
{ name = "Boris Lublinsky", email = "[email protected]" },
]
dependencies = [
"data-prep-toolkit==0.2.2.dev0",
"mmh3==4.1.0",
"xxhash==3.4.1",
]

[build-system]
requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"]
build-backend = "setuptools.build_meta"

[project.optional-dependencies]
dev = [
"twine",
"pytest>=7.3.2",
"pytest-dotenv>=0.5.2",
"pytest-env>=1.0.0",
"pre-commit>=3.3.2",
"pytest-cov>=4.1.0",
"pytest-mock>=3.10.0",
"moto==5.0.5",
"markupsafe==2.0.1",
]

[options]
package_dir = ["src","test"]

[options.packages.find]
where = ["src/"]

[tool.pytest.ini_options]
# Currently we use low coverage since we have to run tests separately (see makefile)
#addopts = "--cov --cov-report term-missing --cov-fail-under 25"
markers = ["unit: unit tests", "integration: integration tests"]

[tool.coverage.run]
include = ["src/*"]
