Skip to content

Commit

Permalink
🐛 Source Cart: fixing of testing for a lot of data (airbytehq#5465)
Browse files Browse the repository at this point in the history
* add the end_date spec field

* bump version

* Update cart.md

* fix descriptions

* fix descriptions

* handle a data parsing

* correction after merging

* remove end_date from spec

Co-authored-by: Maksym Pavlenok <[email protected]>
  • Loading branch information
antixar and gl-pix authored Aug 26, 2021
1 parent 3630cde commit b0be38f
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "bb1a6d31-6879-4819-a2bd-3eed299ea8e2",
"name": "Cart.com",
"dockerRepository": "airbyte/source-cart",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/cart"
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@
- sourceDefinitionId: bb1a6d31-6879-4819-a2bd-3eed299ea8e2
name: Cart.com
dockerRepository: airbyte/source-cart
dockerImageTag: 0.1.1
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/cart
- sourceDefinitionId: d60a46d4-709f-4092-a6b7-2457f7d455f5
name: Prestashop
Expand Down
23 changes: 16 additions & 7 deletions airbyte-integrations/connectors/source-cart/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
FROM python:3.7-slim
FROM python:3.7.11-alpine3.14 as base
FROM base as builder

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

RUN apk --no-cache upgrade \
&& pip install --upgrade pip

WORKDIR /airbyte/integration_code
COPY source_cart ./source_cart
COPY main.py ./
COPY setup.py ./
RUN pip install .
RUN pip install --prefix=/install .


FROM base
COPY --from=builder /install /usr/local

WORKDIR /airbyte/integration_code
COPY main.py ./
COPY source_cart ./source_cart


ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-cart
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-cart/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk",
"airbyte-cdk~=0.1.7",
]

TEST_REQUIREMENTS = [
Expand Down
37 changes: 34 additions & 3 deletions airbyte-integrations/connectors/source-cart/source_cart/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
# SOFTWARE.
#

from functools import wraps
from typing import Any, List, Mapping, Tuple

import pendulum
import requests
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from pendulum.parsing.exceptions import ParserError

from .streams import CustomersCart, OrderItems, OrderPayments, Orders, Products

Expand All @@ -44,10 +46,33 @@ def get_auth_header(self) -> Mapping[str, Any]:


class SourceCart(AbstractSource):
def validate_config_values(func):
"""Check input config values for check_connection and stream functions. It will raise an exception if there is an parsing error"""

@wraps(func)
def decorator(self_, *args, **kwargs):
for arg in args:
if isinstance(arg, Mapping):
try:
# parse date strings by the pendulum library. It will raise the exception ParserError if it is some format mistakes.
pendulum.parse(arg["start_date"])
# try to check an end_date value. It can be ussed for different CI tests
end_date = arg.get("end_date")
if end_date:
pendulum.parse(end_date)
except ParserError as e:
raise Exception(f"{str(e)}. Example: 2021-01-01T00:00:00Z")
break

return func(self_, *args, **kwargs)

return decorator

@validate_config_values
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
authenticator = CustomHeaderAuthenticator(access_token=config["access_token"])
pendulum.parse(config["start_date"])

stream = Products(authenticator=authenticator, start_date=config["start_date"], store_name=config["store_name"])
records = stream.read_records(sync_mode=SyncMode.full_refresh)
next(records)
Expand All @@ -60,7 +85,13 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
return False, err_message
return False, repr(e)

@validate_config_values
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = CustomHeaderAuthenticator(access_token=config["access_token"])
args = {"authenticator": authenticator, "start_date": config["start_date"], "store_name": config["store_name"]}
args = {
"authenticator": authenticator,
"start_date": config["start_date"],
"store_name": config["store_name"],
"end_date": config.get("end_date"),
}
return [CustomersCart(**args), Orders(**args), OrderPayments(**args), OrderItems(**args), Products(**args)]
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
"airbyte_secret": true,
"description": "API Key. See the <a href=\"https://docs.airbyte.io/integrations/sources/mailchimp\">docs</a> for information on how to generate this key."
},
"store_name": {
"type": "string",
"description": "Store name. All API URLs start with https://[mystorename.com]/api/v1/, where [mystorename.com] is the domain name of your store."
},
"start_date": {
"title": "Start Date",
"type": "string",
"description": "The date from which you'd like to replicate the data",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2021-01-01T00:00:00Z"]
},
"store_name": {
"type": "string",
"description": "Store name. All API URLs start with https://[mystorename.com]/api/v1/, where [mystorename.com] is the domain name of your store."
}
}
}
Expand Down
50 changes: 26 additions & 24 deletions airbyte-integrations/connectors/source-cart/source_cart/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#

import urllib.parse
from abc import ABC, abstractmethod
from abc import ABC
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
Expand All @@ -33,8 +33,9 @@
class CartStream(HttpStream, ABC):
primary_key = "id"

def __init__(self, start_date: str, store_name: str, **kwargs):
def __init__(self, start_date: str, store_name: str, end_date: str = None, **kwargs):
self._start_date = start_date
self._end_date = end_date
self.store_name = store_name
super().__init__(**kwargs)

Expand All @@ -43,9 +44,15 @@ def url_base(self) -> str:
return f"https://{self.store_name}/api/v1/"

@property
@abstractmethod
def data_field() -> str:
"""Field of the response containing data"""
def data_field(self) -> str:
"""
Field of the response containing data.
By default the value self.name will be used if this property is empty or None
"""
return None

def path(self, **kwargs) -> str:
return self.name

def backoff_time(self, response: requests.Response) -> Optional[float]:
"""
Expand All @@ -57,6 +64,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
response_json = response.json()

if response_json.get("next_page"):
next_query_string = urllib.parse.urlsplit(response_json.get("next_page")).query
params = dict(urllib.parse.parse_qsl(next_query_string))
Expand All @@ -67,7 +75,7 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]:

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
response_json = response.json()
result = response_json.get(self.data_field, [])
result = response_json.get(self.data_field or self.name, [])
yield from result

def request_params(
Expand All @@ -85,10 +93,20 @@ class IncrementalCartStream(CartStream, ABC):
cursor_field = "updated_at"

def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
"""
Generates a query for incremental logic
Docs: https://developers.cart.com/docs/rest-api/docs/query_syntax.md
"""
params = super().request_params(stream_state=stream_state, **kwargs)
cursor_value = stream_state.get(self.cursor_field) or self._start_date
params["sort"] = self.cursor_field
params[self.cursor_field] = f"gt:{max(cursor_value, self._start_date)}"
start_date = max(cursor_value, self._start_date)
query = f"gt:{start_date}"
if self._end_date and self._end_date > start_date:
query += f" AND lt:{self._end_date}"

params[self.cursor_field] = query
return params

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -111,19 +129,14 @@ class CustomersCart(IncrementalCartStream):
data_field = "customers"

def path(self, **kwargs) -> str:
return "customers"
return self.data_field


class Orders(IncrementalCartStream):
"""
Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1orders/get
"""

data_field = "orders"

def path(self, **kwargs) -> str:
return "orders"


class OrderPayments(IncrementalCartStream):
"""
Expand All @@ -132,9 +145,6 @@ class OrderPayments(IncrementalCartStream):

data_field = "payments"

def path(self, **kwargs) -> str:
return "order_payments"


class OrderItems(IncrementalCartStream):
"""
Expand All @@ -143,16 +153,8 @@ class OrderItems(IncrementalCartStream):

data_field = "items"

def path(self, **kwargs) -> str:
return "order_items"


class Products(IncrementalCartStream):
"""
Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1products/get
"""

data_field = "products"

def path(self, **kwargs) -> str:
return "products"
1 change: 1 addition & 0 deletions docs/integrations/sources/cart.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ Please follow these [steps](https://developers.cart.com/docs/rest-api/docs/READM

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.3 | 2021-08-26 | [5465](https://github.com/airbytehq/airbyte/pull/5465) | Add the end_date option for limitation of the amount of synced data|
| 0.1.2 | 2021-08-23 | [1111](https://github.com/airbytehq/airbyte/pull/1111) | Add `order_items` stream |
| 0.1.0 | 2021-06-08 | [4574](https://github.com/airbytehq/airbyte/pull/4574) | Initial Release |

0 comments on commit b0be38f

Please sign in to comment.