🎉 Source Github: break point added for workflows_runs stream (airbytehq#13926)

Signed-off-by: Sergey Chvalyuk <[email protected]>
grubberr authored Jun 22, 2022
1 parent 7cd02b0 commit f69a78c
Showing 6 changed files with 162 additions and 8 deletions.
@@ -303,7 +303,7 @@
 - name: GitHub
   sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
   dockerRepository: airbyte/source-github
-  dockerImageTag: 0.2.35
+  dockerImageTag: 0.2.36
   documentationUrl: https://docs.airbyte.io/integrations/sources/github
   icon: github.svg
   sourceType: api
@@ -2620,7 +2620,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-github:0.2.35"
+- dockerImage: "airbyte/source-github:0.2.36"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/github"
     connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.2.35
+LABEL io.airbyte.version=0.2.36
 LABEL io.airbyte.name=airbyte/source-github
@@ -1106,13 +1106,16 @@ def convert_cursor_value(self, value):

 class WorkflowRuns(SemiIncrementalMixin, GithubStream):
     """
-    Get all workflows of a GitHub repository
+    Get all workflow runs for a GitHub repository
     API documentation: https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository
     """

     # key for accessing slice value from record
     record_slice_key = ["repository", "full_name"]

+    # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs
+    re_run_period = 32  # days
+
     def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
         return f"repos/{stream_slice['repository']}/actions/runs"

@@ -1121,6 +1124,31 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
         for record in response:
             yield record

+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        # Records in the workflows_runs stream are naturally descending sorted by `created_at` field.
+        # On first sight this is not big deal because cursor_field is `updated_at`.
+        # But we still can use `created_at` as a breakpoint because after 30 days period
+        # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs
+        # workflows_runs records cannot be updated. It means if we initially fully synced stream on subsequent incremental sync we need
+        # only to look behind on 30 days to find all records which were updated.
+        start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
+        break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
+        for record in super(SemiIncrementalMixin, self).read_records(
+            sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
+        ):
+            cursor_value = record[self.cursor_field]
+            created_at = record["created_at"]
+            if cursor_value > start_point:
+                yield record
+            if created_at < break_point:
+                break
+

 class TeamMembers(GithubStream):
     """
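Note: the early-exit logic added to `read_records` can be sketched in isolation. This is a minimal sketch under stated assumptions, not the connector's code: it uses the standard library's `datetime` instead of `pendulum`, compares parsed timestamps rather than ISO strings, and `read_with_break_point`, `parse_ts` and the plain-dict records are hypothetical stand-ins for the stream and the API responses. It relies on the same premise as the comment in the diff, namely that records arrive sorted by `created_at`, newest first.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, Iterator

RE_RUN_PERIOD_DAYS = 32  # mirrors `re_run_period` in the diff


def parse_ts(value: str) -> datetime:
    """Parse a timestamp such as 2022-02-05T00:00:00Z (hypothetical helper)."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")


def read_with_break_point(records: Iterable[Dict[str, Any]], start_point: str) -> Iterator[Dict[str, Any]]:
    """Yield records updated after `start_point`, stopping at the break point.

    Assumes `records` is sorted by `created_at` in descending order.
    """
    start = parse_ts(start_point)
    break_point = start - timedelta(days=RE_RUN_PERIOD_DAYS)
    for record in records:
        if parse_ts(record["updated_at"]) > start:
            yield record
        if parse_ts(record["created_at"]) < break_point:
            # Everything after this record was created even earlier, so stop reading.
            break


runs = [
    {"id": 3, "created_at": "2022-03-01T00:00:00Z", "updated_at": "2022-03-01T00:00:00Z"},
    {"id": 2, "created_at": "2022-02-01T00:00:00Z", "updated_at": "2022-03-02T00:00:00Z"},
    {"id": 1, "created_at": "2022-01-01T00:00:00Z", "updated_at": "2022-01-01T00:00:00Z"},
    {"id": 0, "created_at": "2021-12-01T00:00:00Z", "updated_at": "2021-12-01T00:00:00Z"},
]
source = iter(runs)
new_ids = [r["id"] for r in read_with_break_point(source, start_point="2022-02-20T00:00:00Z")]
unread = [r["id"] for r in source]  # records the reader never looked at
```

In this example run 2 is older than the saved state but was updated after it, so it is still emitted; run 1 falls before the break point and ends the loop, and run 0 is never read.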
@@ -10,6 +10,7 @@
 import responses
 from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException
 from responses import matchers
+from source_github import streams
 from source_github.streams import (
     Branches,
     Collaborators,
@@ -37,6 +38,7 @@
     TeamMemberships,
     Teams,
     Users,
+    WorkflowRuns,
 )

 from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental, urlbase
@@ -949,3 +951,122 @@ def test_stream_commit_comment_reactions_incremental_read():
         {"id": 154935432, "comment_id": 55538826, "created_at": "2022-02-01T16:00:00Z", "repository": "airbytehq/integration-test"},
         {"id": 154935433, "comment_id": 55538827, "created_at": "2022-02-01T17:00:00Z", "repository": "airbytehq/integration-test"},
     ]
+
+
+@responses.activate
+def test_stream_workflow_runs_read_incremental(monkeypatch):
+
+    repository_args_with_start_date = {
+        "repositories": ["org/repos"],
+        "page_size_for_large_streams": 30,
+        "start_date": "2022-01-01T00:00:00Z",
+    }
+
+    monkeypatch.setattr(streams, "DEFAULT_PAGE_SIZE", 1)
+    stream = WorkflowRuns(**repository_args_with_start_date)
+
+    data = [
+        {"id": 4, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z", "repository": {"full_name": "org/repos"}},
+        {"id": 3, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z", "repository": {"full_name": "org/repos"}},
+        {"id": 2, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z", "repository": {"full_name": "org/repos"}},
+        {"id": 1, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z", "repository": {"full_name": "org/repos"}},
+    ]
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[0:1]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=2>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[1:2]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=3>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[2:3]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=4>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[3:4]},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)],
+    )
+
+    state = {}
+    records = read_incremental(stream, state)
+    assert state == {"org/repos": {"updated_at": "2022-02-05T00:00:00Z"}}
+
+    assert records == [
+        {"id": 4, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z"},
+        {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z"},
+        {"id": 2, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z"},
+        {"id": 1, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z"},
+    ]
+
+    assert len(responses.calls) == 4
+
+    data.insert(
+        0,
+        {
+            "id": 5,
+            "created_at": "2022-02-07T00:00:00Z",
+            "updated_at": "2022-02-07T00:00:00Z",
+            "repository": {"full_name": "org/repos"},
+        },
+    )
+
+    data[2]["updated_at"] = "2022-02-08T00:00:00Z"
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[0:1]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=2>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[1:2]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=3>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[2:3]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=4>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[3:4]},
+        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=5>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)],
+    )
+
+    responses.calls.reset()
+    records = read_incremental(stream, state)
+
+    assert state == {"org/repos": {"updated_at": "2022-02-08T00:00:00Z"}}
+    assert records == [
+        {"id": 5, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-07T00:00:00Z", "updated_at": "2022-02-07T00:00:00Z"},
+        {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-02-08T00:00:00Z"},
+    ]
+
+    assert len(responses.calls) == 4
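Note: the second sync in the test expects exactly four requests even though page 4 advertises a fifth page. The sketch below replays that scenario without HTTP to show where the count comes from. It is an illustration under stated assumptions, not the test's mechanism: `fetch_pages` and `sync` are hypothetical stand-ins for lazy pagination with a page size of 1 and for the break-point check in the diff, and `datetime` replaces `pendulum`.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Iterator, List

FMT = "%Y-%m-%dT%H:%M:%SZ"


def fetch_pages(data: List[Dict[str, Any]], calls: List[int]) -> Iterator[Dict[str, Any]]:
    """Hypothetical pager: one record per page, fetched lazily; logs each page number."""
    for page, record in enumerate(data, start=1):
        calls.append(page)
        yield record


def sync(data: List[Dict[str, Any]], state_updated_at: str, calls: List[int], re_run_period: int = 32) -> List[int]:
    """Return ids of records updated after the saved state, stopping at the break point."""
    start = datetime.strptime(state_updated_at, FMT)
    break_point = start - timedelta(days=re_run_period)
    emitted = []
    for record in fetch_pages(data, calls):
        if datetime.strptime(record["updated_at"], FMT) > start:
            emitted.append(record["id"])
        if datetime.strptime(record["created_at"], FMT) < break_point:
            break  # no further pages are requested
    return emitted


# Data as it stands before the second sync: run 5 is new, run 3 was updated.
data = [
    {"id": 5, "created_at": "2022-02-07T00:00:00Z", "updated_at": "2022-02-07T00:00:00Z"},
    {"id": 4, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z"},
    {"id": 3, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-02-08T00:00:00Z"},
    {"id": 2, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z"},
    {"id": 1, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z"},
]
calls: List[int] = []
synced_ids = sync(data, state_updated_at="2022-02-05T00:00:00Z", calls=calls)
```

With the state at 2022-02-05 and a 32-day look-back, the break point is 2022-01-04. Run 2 (created 2022-01-03) is the first record older than that, so the loop stops on page 4 and the page holding run 1 is never requested.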
13 changes: 9 additions & 4 deletions docs/integrations/sources/github.md
@@ -90,7 +90,7 @@ This connector outputs the following incremental streams:
 * [Review comments](https://docs.github.com/en/rest/reference/pulls#list-review-comments-in-a-repository)
 * [Reviews](https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request)
 * [Stargazers](https://docs.github.com/en/rest/reference/activity#list-stargazers)
-* [WorkflowRuns](https://docs.github.com/en/rest/reference/actions#list-workflow-runs-for-a-repository)
+* [WorkflowRuns](https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository)
 * [Workflows](https://docs.github.com/en/rest/reference/actions#workflows)

 ### Notes
@@ -99,12 +99,16 @@ This connector outputs the following incremental streams:
    * read only new records;
    * output only new records.

-2. Other 20 incremental streams are also incremental but with one difference, they:
+2. Stream `workflow_runs` is almost pure incremental:
+   * read new records and some portion of old records (in past 30 days) [docs](https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs);
+   * output only new records.
+
+3. Other 19 incremental streams are also incremental but with one difference, they:
    * read all records;
    * output only new records.
-   Please, consider this behaviour when using those 20 incremental streams because it may affect you API call limits.
+   Please, consider this behaviour when using those 19 incremental streams because it may affect you API call limits.

-3. We are passing few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records and sometimes for large streams specifying very distant `start_date` in the past may result in keep on getting error from GitHub instead of records \(respective `WARN` log message will be outputted\). In this case Specifying more recent `start_date` may help.
+4. We are passing few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records and sometimes for large streams specifying very distant `start_date` in the past may result in keep on getting error from GitHub instead of records \(respective `WARN` log message will be outputted\). In this case Specifying more recent `start_date` may help.
    **The "Start date" configuration option does not apply to the streams below, because the GitHub API does not include dates which can be used for filtering:**

 * `assignees`
@@ -137,6 +141,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa

 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------|
+| 0.2.36 | 2022-06-20 | [13926](https://github.com/airbytehq/airbyte/pull/13926) | Break point added for `workflows_runs` stream |
 | 0.2.35 | 2022-06-16 | [13763](https://github.com/airbytehq/airbyte/pull/13763) | Use GraphQL for `pull_request_stats` stream |
 | 0.2.34 | 2022-06-14 | [13707](https://github.com/airbytehq/airbyte/pull/13707) | Fix API sorting, fix `get_starting_point` caching |
 | 0.2.33 | 2022-06-08 | [13558](https://github.com/airbytehq/airbyte/pull/13558) | Enable caching only for parent streams |