🎉 Source S3: support of Parquet format (airbytehq#5305)
* add parquet parser

* add integration tests for parquet formats

* add unit tests for parquet

* update docs and secrets

* fix incorrect import for tests

* add lib pandas for unit tests

* revert changes of foreign connectors

* update secret settings

* fix config values

* Update airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_spec.py

Co-authored-by: George Claireaux <[email protected]>

* Update airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/parquet_spec.py

Co-authored-by: George Claireaux <[email protected]>

* remove some unused default options

* update tests

* update docs

* bump connector version

* fix expected test

Co-authored-by: Maksym Pavlenok <[email protected]>
Co-authored-by: George Claireaux <[email protected]>
3 people authored Sep 4, 2021
1 parent aad2161 commit e5c44e6
Showing 30 changed files with 1,081 additions and 374 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -143,6 +143,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -143,6 +143,7 @@ jobs:
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
SOURCE_S3_TEST_CREDS: ${{ secrets.SOURCE_S3_TEST_CREDS }}
SOURCE_S3_PARQUET_CREDS: ${{ secrets.SOURCE_S3_PARQUET_CREDS }}
SOURCE_SHORTIO_TEST_CREDS: ${{ secrets.SOURCE_SHORTIO_TEST_CREDS }}
SOURCE_STRIPE_CREDS: ${{ secrets.SOURCE_STRIPE_CREDS }}
STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_CONNECTED_ACCOUNT_TEST_CREDS }}
@@ -2,6 +2,6 @@
"sourceDefinitionId": "69589781-7828-43c5-9f63-8925b1c1ccc2",
"name": "S3",
"dockerRepository": "airbyte/source-s3",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/s3"
}
@@ -78,7 +78,7 @@
- sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
name: S3
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://docs.airbyte.io/integrations/sources/s3
- sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
name: Sendgrid
22 changes: 15 additions & 7 deletions airbyte-integrations/connectors/source-s3/Dockerfile
@@ -1,16 +1,24 @@
FROM python:3.7-slim
FROM python:3.7-slim as base
FROM base as builder

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
RUN apt-get update
WORKDIR /airbyte/integration_code
COPY setup.py ./
RUN pip install --prefix=/install .

FROM base
WORKDIR /airbyte/integration_code
COPY source_s3 ./source_s3
COPY --from=builder /install /usr/local

COPY main.py ./
COPY setup.py ./
RUN pip install .
COPY source_s3 ./source_s3


ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-s3



@@ -5,23 +5,48 @@ tests:
spec:
- spec_path: "integration_tests/spec.json"
connection:
# for CSV format
- config_path: "secrets/config.json"
status: "succeed"
# for Parquet format
- config_path: "secrets/parquet_config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
# for CSV format
- config_path: "secrets/config.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
basic_read:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
- expect_records:
path: "integration_tests/expected_records.txt"
expect_records:
path: "integration_tests/expected_records.txt"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
expect_records:
path: "integration_tests/parquet_expected_records.txt"
incremental:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
cursor_paths:
test: ["_ab_source_file_last_modified"]
future_state_path: "integration_tests/abnormal_state.json"

full_refresh:
# for CSV format
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
# for Parquet format
- config_path: "secrets/parquet_config.json"
configured_catalog_path: "integration_tests/parquet_configured_catalog.json"
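
Note: the Parquet entries above point at secrets/parquet_config.json, which is not part of the commit. Based on the connector spec shown further down (filetype, buffer_size, columns, batch_size), a minimal config for these acceptance tests would look roughly like the sketch below; the dataset, path_pattern, and provider field names are assumptions drawn from the existing CSV config shape, not copied from the secret file.

# Illustrative only -- field and bucket names outside "format" are assumptions, not the real secret file.
parquet_config = {
    "dataset": "test",                      # stream name used in the configured catalogs
    "path_pattern": "simple_test.parquet",  # glob matched against object keys in the bucket
    "format": {
        "filetype": "parquet",              # selects the Parquet branch of the spec's oneOf
        "buffer_size": 0,
        "columns": [],                      # empty -> sync all columns
        "batch_size": 65536,
    },
    "provider": {
        "bucket": "my-test-bucket",         # hypothetical bucket
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
    },
}
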
@@ -1,4 +1,13 @@
#!/usr/bin/env sh

# Build latest connector image
source_image=$(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)
echo "try to build the source image: ${source_image}"
docker build . -t ${source_image}

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest

docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
@@ -4,7 +4,7 @@
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
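
The configured catalog above declares _ab_source_file_last_modified as the source-defined cursor. As a rough sketch of how incremental syncs use it (not the connector's actual implementation), file-level filtering against the saved stream state looks something like this:

from datetime import datetime

CURSOR_FIELD = "_ab_source_file_last_modified"
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"  # format assumed from the expected-records fixtures

def files_to_sync(all_files, stream_state):
    """Yield only files modified after the cursor stored in the stream state (illustrative sketch).

    all_files: iterable of dicts with a timezone-aware "last_modified" datetime.
    """
    cursor = stream_state.get(CURSOR_FIELD)
    state_dt = datetime.strptime(cursor, DATETIME_FORMAT) if cursor else None
    for f in sorted(all_files, key=lambda f: f["last_modified"]):
        if state_dt is None or f["last_modified"] > state_dt:
            yield f
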
@@ -5,4 +5,4 @@
{"stream": "test", "data": {"id": 5, "name": "77h4aiMP", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 6, "name": "Le35Wyic", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 7, "name": "xZhh1Kyl", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
{"stream": "test", "data": {"id": 8, "name": "M2t286iJ", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000}
@@ -32,7 +32,7 @@
import pytest
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import SyncMode
from source_s3.source_files_abstract.fileformatparser import CsvParser
from source_s3.source_files_abstract.formats.csv_parser import CsvParser
from source_s3.source_files_abstract.stream import FileStream

HERE = Path(__file__).resolve().parent
@@ -136,15 +136,17 @@ def _stream_records_test_logic(
fs = self.stream_class("dataset", provider, format, path_pattern, str_user_schema)
LOGGER.info(f"Testing stream_records() in SyncMode:{sync_mode.value}")

assert fs._get_schema_map() == full_expected_schema # check we return correct schema from get_json_schema()
# check we return correct schema from get_json_schema()
assert fs._get_schema_map() == full_expected_schema

records = []
for stream_slice in fs.stream_slices(sync_mode=sync_mode, stream_state=current_state):
if stream_slice is not None:
# we need to do this in order to work out which extra columns (if any) we expect in this stream_slice
expected_columns = []
for file_dict in stream_slice:
file_reader = CsvParser(format) # TODO: if we ever test other filetypes in these tests this will need fixing
# TODO: if we ever test other filetypes in these tests this will need fixing
file_reader = CsvParser(format)
with file_dict["storagefile"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
expected_columns = set(expected_columns) # de-dupe
@@ -550,7 +552,8 @@ def test_stream_records(
state=latest_state,
)
LOGGER.info(f"incremental state: {latest_state}")
time.sleep(1) # small delay to ensure next file gets later last_modified timestamp
# small delay to ensure next file gets later last_modified timestamp
time.sleep(1)
self.teardown_infra(cloud_bucket_name, self.credentials)

except Exception as e:
@@ -0,0 +1,15 @@
{
"streams": [
{
"stream": {
"name": "test",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["_ab_source_file_last_modified"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append"
}
]
}
@@ -0,0 +1,4 @@
{"stream": "test", "data": {"number": 1.0, "name": "foo", "flag": true, "delta": -1.0, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
{"stream": "test", "data": {"number": 2.0, "name": null, "flag": false, "delta": 2.5, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
{"stream": "test", "data": {"number": 3.0, "name": "bar", "flag": null, "delta": 0.1, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
{"stream": "test", "data": {"number": null, "name": "baz", "flag": true, "delta": null, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-08-30T15:46:17+0000", "_ab_source_file_url": "simple_test.parquet"}, "emitted_at": 1630795278000}
@@ -32,15 +32,16 @@
"format": {
"title": "Format",
"default": "csv",
"type": "object",
"oneOf": [
{
"title": "csv",
"description": "This connector utilises <a href=\"https: // arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for CSV parsing.",
"type": "object",
"properties": {
"filetype": {
"title": "CsvFiletype",
"description": "This connector utilises <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for CSV parsing.",
"enum": ["csv"],
"title": "Filetype",
"const": "csv",
"type": "string"
},
"delimiter": {
@@ -93,24 +94,41 @@
],
"type": "string"
}
},
"required": ["filetype"]
}
},
{
"title": "Coming Soon...",
"title": "parquet",
"description": "This connector utilises <a href=\"https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html\" target=\"_blank\">PyArrow (Apache Arrow)</a> for Parquet parsing.",
"type": "object",
"properties": {
"filetype": {
"title": "ParquetFiletype",
"description": "An enumeration.",
"enum": ["parquet"],
"title": "Filetype",
"const": "parquet",
"type": "string"
},
"buffer_size": {
"title": "Buffer Size",
"description": "Perform read buffering when deserializing individual column chunks. By default every group column will be loaded fully to memory. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
"default": 0,
"type": "integer"
},
"columns": {
"title": "Columns",
"description": "If you only want to sync a subset of the columns from the file(s), add the columns you want here. Leave it empty to sync all columns.",
"type": "array",
"items": {
"type": "string"
}
},
"batch_size": {
"title": "Batch Size",
"description": "Maximum number of records per batch. Batches may be smaller if there aren’t enough rows in the file. This option can help to optimize a work with memory if your data is particularly wide or failing during detection of OOM errors.",
"default": 65536,
"type": "integer"
}
},
"required": ["filetype"]
}
}
],
"type": "object"
]
},
"provider": {
"title": "S3: Amazon Web Services",
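
The three Parquet options above (buffer_size, columns, batch_size) map naturally onto pyarrow.parquet.ParquetFile, which the spec describes as the parsing engine. A minimal sketch of that mapping, assuming nothing about the connector's internal class layout:

import pyarrow.parquet as pq

def parquet_schema(fp):
    """Infer column names and Arrow types from the file footer (sketch)."""
    pf = pq.ParquetFile(fp)
    return {name: str(dtype) for name, dtype in zip(pf.schema_arrow.names, pf.schema_arrow.types)}

def parquet_records(fp, columns=None, batch_size=65536, buffer_size=0):
    """Stream records from an open binary file using the spec's three reader options (sketch)."""
    pf = pq.ParquetFile(fp, buffer_size=buffer_size)  # buffer_size=0 disables column-chunk read buffering
    for batch in pf.iter_batches(batch_size=batch_size, columns=columns or None):
        yield from batch.to_pandas().to_dict(orient="records")
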
9 changes: 8 additions & 1 deletion airbyte-integrations/connectors/source-s3/setup.py
@@ -25,11 +25,18 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "pyarrow==4.0.1", "smart-open[s3]==5.1.0", "wcmatch==8.2", "dill==0.3.4"]
MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1.7",
"pyarrow==4.0.1",
"smart-open[s3]==5.1.0",
"wcmatch==8.2",
"dill==0.3.4",
]

TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
"pandas==1.3.1",
]

setup(
@@ -75,7 +75,8 @@ def last_modified(self) -> datetime:
return obj.last_modified
# For some reason, this standard method above doesn't work for public files with no credentials so fall back on below
except NoCredentialsError as nce:
if self.use_aws_account(self._provider): # we don't expect this error if using credentials so throw it
# we don't expect this error if using credentials so throw it
if self.use_aws_account(self._provider):
raise nce
else:
return boto3.client("s3", config=ClientConfig(signature_version=UNSIGNED)).head_object(Bucket=bucket, Key=self.url)[
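
The diff above is truncated, but the pattern it implements is: try an authenticated HeadObject first, and for public buckets accessed without credentials fall back to an unsigned client. A self-contained sketch of that fallback (bucket and key names are placeholders, not the connector's code):

import boto3
from botocore import UNSIGNED
from botocore.client import Config
from botocore.exceptions import NoCredentialsError

def last_modified(bucket: str, key: str):
    """Return the object's LastModified, falling back to anonymous access for public objects (sketch)."""
    try:
        return boto3.client("s3").head_object(Bucket=bucket, Key=key)["LastModified"]
    except NoCredentialsError:
        anon = boto3.client("s3", config=Config(signature_version=UNSIGNED))
        return anon.head_object(Bucket=bucket, Key=key)["LastModified"]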