Skip to content

Commit

Permalink
Integrate Sentry for performance and errors tracking. (airbytehq#8248)
Browse files Browse the repository at this point in the history
* Integrate Sentry for performance and errors tracking.

* Add sentry sensitive data scrubbing.

* updated cdk version and changelog

* Integrate Sentry for performance and errors tracking

Add `SENTRY_DSN` environment variable

* Integrate Sentry for performance and errors tracking

Add `sentry_sdk` to install requirements

* format cdk

* enable Sentry for google-search-console

* updated connector version

* update spec and source yamls

Co-authored-by: auganbay <[email protected]>
Co-authored-by: Sergei Solonitcyn <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2021
1 parent f0a9945 commit fe954c1
Show file tree
Hide file tree
Showing 15 changed files with 568 additions and 50 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,6 @@ STATE_STORAGE_MINIO_BUCKET_NAME=
STATE_STORAGE_MINIO_ENDPOINT=

STATE_STORAGE_GCS_BUCKET_NAME=

# Sentry
SENTRY_DSN="https://[email protected]/6102835"
4 changes: 4 additions & 0 deletions .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
SYNC_JOB_MAX_ATTEMPTS=3
SYNC_JOB_MAX_TIMEOUT_DAYS=3

# Sentry
SENTRY_DSN=""

3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.45
Integrate Sentry for performance and errors tracking.

## 0.1.44
Log http response status code and its content.

Expand Down
19 changes: 15 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import os.path
import sys
import tempfile
from typing import Iterable, List
from typing import Any, Dict, Iterable, List

from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets

logger = init_logger("airbyte")
Expand Down Expand Up @@ -59,14 +61,23 @@ def parse_args(args: List[str]) -> argparse.Namespace:

return main_parser.parse_args(args)

def configure_sentry(self, spec_schema: Dict[str, Any], parsed_args: argparse.Namespace):
secret_values = []
if "config" in parsed_args:
config = self.source.read_config(parsed_args.config)
secret_values = get_secret_values(spec_schema, config)
source_name = self.source.__module__.split(".")[0]
source_name = source_name.split("_", 1)[-1]
AirbyteSentry.init(source_tag=source_name, secret_values=secret_values)

def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
cmd = parsed_args.command
if not cmd:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(self.logger)

source_spec: ConnectorSpecification = self.source.spec(self.logger)
self.configure_sentry(source_spec.connectionSpecification, parsed_args)
with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
Expand Down
70 changes: 40 additions & 30 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#


import logging
import os
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
Expand All @@ -13,6 +14,7 @@
import vcr.cassette as Cassette
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from requests.auth import AuthBase

from .auth.core import HttpAuthenticator, NoAuth
Expand All @@ -22,6 +24,8 @@
# list of all possible HTTP methods which can be used for sending of request bodies
BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH")

logging.getLogger("vcr").setLevel(logging.ERROR)


class HttpStream(Stream, ABC):
"""
Expand Down Expand Up @@ -272,7 +276,9 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
Unexpected transient exceptions use the default backoff parameters.
Unexpected persistent exceptions are not handled and will cause the sync to fail.
"""
response: requests.Response = self._session.send(request, **request_kwargs)
AirbyteSentry.add_breadcrumb(message=f"Issue {request.url}", data=request_kwargs)
with AirbyteSentry.start_transaction_span(op="_send", description=request.url):
response: requests.Response = self._session.send(request, **request_kwargs)

if self.should_retry(response):
custom_backoff_time = self.backoff_time(response)
Expand Down Expand Up @@ -313,10 +319,12 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
"""
if max_tries is not None:
max_tries = max(0, max_tries) + 1
AirbyteSentry.set_context("request", {"url": request.url, "headers": request.headers, "args": request_kwargs})

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)
with AirbyteSentry.start_transaction_span(op="_send_request"):
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)

def read_records(
self,
Expand All @@ -329,36 +337,38 @@ def read_records(
pagination_complete = False

next_page_token = None
while not pagination_complete:
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

if self.use_cache:
# use context manager to handle and store cassette metadata
with self.cache_file as cass:
self.cassete = cass
# vcr tries to find records based on the request, if such records exist, return from cache file
# else make a request and save record in cache file
response = self._send_request(request, request_kwargs)
with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"):
while not pagination_complete:
request_headers = self.request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

else:
response = self._send_request(request, request_kwargs)
if self.use_cache:
# use context manager to handle and store cassette metadata
with self.cache_file as cass:
self.cassete = cass
# vcr tries to find records based on the request, if such records exist, return from cache file
# else make a request and save record in cache file
response = self._send_request(request, request_kwargs)

yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
else:
response = self._send_request(request, request_kwargs)
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True
next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []
# Always return an empty generator just in case no records were ever yielded
yield from []


class HttpSubStream(HttpStream, ABC):
Expand Down
32 changes: 31 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Set, Tuple, Union

import dpath.util
import jsonref
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
Expand Down Expand Up @@ -191,3 +192,32 @@ def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
else:
main_config[k] = v
return main_config, InternalConfig.parse_obj(internal_config)


def get_secret_values(schema: Mapping[str, Any], config: Mapping[str, Any]) -> List[str]:
def get_secret_pathes(schema: Mapping[str, Any]) -> Set[str]:
pathes = set()

def traverse_schema(schema: Any, path: List[str]):
if isinstance(schema, dict):
for k, v in schema.items():
traverse_schema(v, [*path, k])
elif isinstance(schema, list):
for i in schema:
traverse_schema(i, path)
else:
if path[-1] == "airbyte_secret" and schema is True:
path = "/".join([p for p in path[:-1] if p not in ["properties", "oneOf"]])
pathes.add(path)

traverse_schema(schema, [])
return pathes

secret_pathes = get_secret_pathes(schema)
result = []
for path in secret_pathes:
try:
result.append(dpath.util.get(config, path))
except KeyError:
pass
return result
Loading

0 comments on commit fe954c1

Please sign in to comment.