From b05ab175200ad790bed5f64f9358012f9f80e06c Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 20 Jul 2021 01:26:59 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Instagram:=20Read=20pre?= =?UTF-8?q?vious=20state=20format=20and=20upgrade=20it=20(#4805)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * few fixes for user_insights state * support old state format * format * bump Co-authored-by: Eugene Kulak --- .../6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../connectors/source-instagram/Dockerfile | 2 +- .../source_instagram/source.py | 16 +++++++-- .../source_instagram/streams.py | 34 +++++++++++++++---- docs/integrations/sources/instagram.md | 1 + 6 files changed, 45 insertions(+), 12 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json index e7f417554cd8..fd0329ee9269 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/6acf6b55-4f1e-4fca-944e-1a3caef8aba8.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "6acf6b55-4f1e-4fca-944e-1a3caef8aba8", "name": "Instagram", "dockerRepository": "airbyte/source-instagram", - "dockerImageTag": "0.1.6", + "dockerImageTag": "0.1.7", "documentationUrl": "https://hub.docker.com/r/airbyte/source-instagram" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7c2fa0bdfd33..815295fd9d2f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -255,7 +255,7 @@ - 
sourceDefinitionId: 6acf6b55-4f1e-4fca-944e-1a3caef8aba8 name: Instagram dockerRepository: airbyte/source-instagram - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://hub.docker.com/r/airbyte/source-instagram - sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80 name: Gitlab diff --git a/airbyte-integrations/connectors/source-instagram/Dockerfile b/airbyte-integrations/connectors/source-instagram/Dockerfile index 567da436c800..ed96dda5c727 100644 --- a/airbyte-integrations/connectors/source-instagram/Dockerfile +++ b/airbyte-integrations/connectors/source-instagram/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-instagram diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/source.py b/airbyte-integrations/connectors/source-instagram/source_instagram/source.py index 87883faf1d60..c3cd24fbacea 100644 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/source.py +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/source.py @@ -23,9 +23,10 @@ # from datetime import datetime -from typing import Any, List, Mapping, Tuple, Type +from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple -from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, DestinationSyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from pydantic import BaseModel, Field @@ -70,7 +71,16 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any return ok, error_msg - def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: + def read( + 
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + ) -> Iterator[AirbyteMessage]: + for stream in self.streams(config): + state_key = str(stream.name) + if state_key in state and hasattr(stream, "upgrade_state_to_latest_format"): + state[state_key] = stream.upgrade_state_to_latest_format(state[state_key]) + return super().read(logger, config, catalog, state) + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: """Discovery method, returns available streams :param config: A Mapping of the user input configuration as defined in the connector spec. diff --git a/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py b/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py index 8a402a035320..a97f0846e97b 100644 --- a/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py +++ b/airbyte-integrations/connectors/source-instagram/source_instagram/streams.py @@ -22,6 +22,7 @@ # SOFTWARE. 
# +import copy from abc import ABC from datetime import datetime from typing import Any, Iterable, List, Mapping, MutableMapping, Optional @@ -54,6 +55,10 @@ def fields(self) -> List[str]: fields = list(self.get_json_schema().get("properties", {}).keys()) return list(set(fields) - set(non_object_fields)) + def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + """Upgrade state to latest format and return new state object""" + return copy.deepcopy(state) + def request_params( self, stream_slice: Mapping[str, Any] = None, @@ -238,7 +243,7 @@ def stream_slices( start_date = pendulum.parse(state_value) if state_value else self._start_date start_date = max(start_date, self._start_date, pendulum.now().subtract(days=self.buffer_days)) for since in pendulum.period(start_date, self._end_date).range("days", self.days_increment): - until = min(since.add(days=self.days_increment), self._end_date) + until = since.add(days=self.days_increment) self.logger.info(f"Reading insights between {since.date()} and {until.date()}") yield { **stream_slice, @@ -259,17 +264,34 @@ def request_params( "until": stream_slice["until"], } + def _state_has_legacy_format(self, state: Mapping[str, Any]) -> bool: + """Tell if the format of state is outdated""" + for value in state.values(): + if not isinstance(value, Mapping): + return True + return False + + def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + """Upgrade state to latest format and return new state object""" + if self._state_has_legacy_format(state): + self.logger.info(f"The {self.name} state has old format, converting...") + return {account_id: {self.cursor_field: str(cursor_value)} for account_id, cursor_value in state.items()} + + return super().upgrade_state_to_latest_format(state) + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): """Update stream state from latest 
record""" record_value = latest_record[self.cursor_field] - state_value = current_stream_state.get("business_account_id", {}).get(self.cursor_field) or record_value + account_id = latest_record.get("business_account_id") + state_value = current_stream_state.get(account_id, {}).get(self.cursor_field) or record_value max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value)) - current_stream_state[latest_record["business_account_id"]] = { + new_stream_state = copy.deepcopy(current_stream_state) + new_stream_state[account_id] = { self.cursor_field: str(max_cursor), } - return current_stream_state + return new_stream_state class Media(InstagramStream): @@ -356,8 +378,8 @@ def _get_insights(self, item, account_id) -> Optional[MutableMapping[str, Any]]: # An error might occur if the media was posted before the most recent time that # the user's account was converted to a business account from a personal account if error.api_error_subcode() == 2108006: - self.logger.error(f"Insights error for business_account_id {account_id}: {error.api_error_message()}") - + details = error.body().get("error", {}).get("error_user_title") or error.api_error_message() + self.logger.error(f"Insights error for business_account_id {account_id}: {details}") # We receive all Media starting from the last one, and if on the next Media we get an Insight error, # then no reason to make inquiries for each Media further, since they were published even earlier. 
return None diff --git a/docs/integrations/sources/instagram.md b/docs/integrations/sources/instagram.md index ff08ee3f9a2a..456e67c61ce5 100644 --- a/docs/integrations/sources/instagram.md +++ b/docs/integrations/sources/instagram.md @@ -83,4 +83,5 @@ With the Instagram Account ID and API access token, you should be ready to start | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.7 | 2021-07-19 | [4805](https://github.com/airbytehq/airbyte/pull/4805) | Add support for previous format of STATE.| | 0.1.6 | 2021-07-07 | [4210](https://github.com/airbytehq/airbyte/pull/4210) | Refactor connector to use CDK:<br/>- improve error handling.<br/>- fix sync fail with HTTP status 400.<br/>- integrate SAT.|