Skip to content

Commit

Permalink
🐛 Source Instagram: Read previous state format and upgrade it (airbyt…
Browse files Browse the repository at this point in the history
…ehq#4805)

* few fixes for user_insights state

* support old state format

* format

* bump

Co-authored-by: Eugene Kulak <[email protected]>
  • Loading branch information
keu and eugene-kulak authored Jul 19, 2021
1 parent 84aad35 commit b05ab17
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "6acf6b55-4f1e-4fca-944e-1a3caef8aba8",
"name": "Instagram",
"dockerRepository": "airbyte/source-instagram",
"dockerImageTag": "0.1.6",
"dockerImageTag": "0.1.7",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-instagram"
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@
- sourceDefinitionId: 6acf6b55-4f1e-4fca-944e-1a3caef8aba8
name: Instagram
dockerRepository: airbyte/source-instagram
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://hub.docker.com/r/airbyte/source-instagram
- sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80
name: Gitlab
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-instagram
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
#

from datetime import datetime
from typing import Any, List, Mapping, Tuple, Type
from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple

from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, DestinationSyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -70,7 +71,16 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any

return ok, error_msg

def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
def read(
self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
) -> Iterator[AirbyteMessage]:
for stream in self.streams(config):
state_key = str(stream.name)
if state_key in state and hasattr(stream, "upgrade_state_to_latest_format"):
state[state_key] = stream.upgrade_state_to_latest_format(state[state_key])
return super().read(logger, config, catalog, state)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""Discovery method, returns available streams
:param config: A Mapping of the user input configuration as defined in the connector spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
# SOFTWARE.
#

import copy
from abc import ABC
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
Expand Down Expand Up @@ -54,6 +55,10 @@ def fields(self) -> List[str]:
fields = list(self.get_json_schema().get("properties", {}).keys())
return list(set(fields) - set(non_object_fields))

def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Upgrade state to latest format and return new state object"""
return copy.deepcopy(state)

def request_params(
self,
stream_slice: Mapping[str, Any] = None,
Expand Down Expand Up @@ -238,7 +243,7 @@ def stream_slices(
start_date = pendulum.parse(state_value) if state_value else self._start_date
start_date = max(start_date, self._start_date, pendulum.now().subtract(days=self.buffer_days))
for since in pendulum.period(start_date, self._end_date).range("days", self.days_increment):
until = min(since.add(days=self.days_increment), self._end_date)
until = since.add(days=self.days_increment)
self.logger.info(f"Reading insights between {since.date()} and {until.date()}")
yield {
**stream_slice,
Expand All @@ -259,17 +264,34 @@ def request_params(
"until": stream_slice["until"],
}

def _state_has_legacy_format(self, state: Mapping[str, Any]) -> bool:
"""Tell if the format of state is outdated"""
for value in state.values():
if not isinstance(value, Mapping):
return True
return False

def upgrade_state_to_latest_format(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""Upgrade state to latest format and return new state object"""
if self._state_has_legacy_format(state):
self.logger.info(f"The {self.name} state has old format, converting...")
return {account_id: {self.cursor_field: str(cursor_value)} for account_id, cursor_value in state.items()}

return super().upgrade_state_to_latest_format(state)

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""Update stream state from latest record"""
record_value = latest_record[self.cursor_field]
state_value = current_stream_state.get("business_account_id", {}).get(self.cursor_field) or record_value
account_id = latest_record.get("business_account_id")
state_value = current_stream_state.get(account_id, {}).get(self.cursor_field) or record_value
max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value))

current_stream_state[latest_record["business_account_id"]] = {
new_stream_state = copy.deepcopy(current_stream_state)
new_stream_state[account_id] = {
self.cursor_field: str(max_cursor),
}

return current_stream_state
return new_stream_state


class Media(InstagramStream):
Expand Down Expand Up @@ -356,8 +378,8 @@ def _get_insights(self, item, account_id) -> Optional[MutableMapping[str, Any]]:
# An error might occur if the media was posted before the most recent time that
# the user's account was converted to a business account from a personal account
if error.api_error_subcode() == 2108006:
self.logger.error(f"Insights error for business_account_id {account_id}: {error.api_error_message()}")

details = error.body().get("error", {}).get("error_user_title") or error.api_error_message()
self.logger.error(f"Insights error for business_account_id {account_id}: {details}")
# We receive all Media starting from the last one, and if on the next Media we get an Insight error,
# then no reason to make inquiries for each Media further, since they were published even earlier.
return None
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/instagram.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,5 @@ With the Instagram Account ID and API access token, you should be ready to start

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.7 | 2021-07-19 | [4805](https://github.com/airbytehq/airbyte/pull/4805) | Add support for previous format of STATE.|
| 0.1.6 | 2021-07-07 | [4210](https://github.com/airbytehq/airbyte/pull/4210) | Refactor connector to use CDK:<br>- improve error handling.<br>- fix sync fail with HTTP status 400.<br>- integrate SAT.|

0 comments on commit b05ab17

Please sign in to comment.