Skip to content

Commit

Permalink
CDK: emit AirbyteTraceMessage with exception trace information (air…
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroslopez authored May 6, 2022
1 parent 7024731 commit 73c7fad
Show file tree
Hide file tree
Showing 17 changed files with 320 additions and 72 deletions.
5 changes: 5 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 0.1.56
- Update protocol models to include `AirbyteTraceMessage`
- Emit an `AirbyteTraceMessage` on uncaught exceptions
- Add `AirbyteTracedException`

## 0.1.55
Add support for reading the spec from a YAML file (`spec.yaml`)

Expand Down
8 changes: 5 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
import tempfile
from typing import Any, Dict, Iterable, List

from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import init_logger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets, update_secrets

logger = init_logger("airbyte")
init_uncaught_exception_handler(logger)


class AirbyteEntrypoint(object):
Expand Down Expand Up @@ -89,7 +91,7 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
# Now that we have the config, we can use it to get a list of ai airbyte_secrets
# that we should filter in logging to avoid leaking secrets
config_secrets = get_secrets(self.source, config, self.logger)
AirbyteLogFormatter.update_secrets(config_secrets)
update_secrets(config_secrets)

# Remove internal flags from config before validating so
# jsonschema's additionalProperties flag wont fail the validation
Expand Down
34 changes: 34 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/exception_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
import sys

from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def init_uncaught_exception_handler(logger: logging.Logger) -> None:
"""
Handles uncaught exceptions by emitting an AirbyteTraceMessage and making sure they are not
printed to the console without having secrets removed.
"""

def hook_fn(exception_type, exception_value, traceback_):
# For developer ergonomics, we want to see the stack trace in the logs when we do a ctrl-c
if issubclass(exception_type, KeyboardInterrupt):
sys.__excepthook__(exception_type, exception_value, traceback_)
return

logger.fatal(exception_value, exc_info=exception_value)

# emit an AirbyteTraceMessage for any exception that gets to this spot
traced_exc = (
exception_value
if issubclass(exception_type, AirbyteTracedException)
else AirbyteTracedException.from_exception(exception_value)
)

traced_exc.emit_message()

sys.excepthook = hook_fn
31 changes: 3 additions & 28 deletions airbyte-cdk/python/airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

import logging
import logging.config
import sys
import traceback
from typing import List, Tuple
from typing import Tuple

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from deprecated import deprecated

TRACE_LEVEL_NUM = 5
Expand All @@ -32,42 +32,18 @@
}


def init_unhandled_exception_output_filtering(logger: logging.Logger) -> None:
"""
Make sure unhandled exceptions are not printed to the console without passing through the Airbyte logger and having
secrets removed.
"""

def hook_fn(exception_type, exception_value, traceback_):
# For developer ergonomics, we want to see the stack trace in the logs when we do a ctrl-c
if issubclass(exception_type, KeyboardInterrupt):
sys.__excepthook__(exception_type, exception_value, traceback_)
else:
logger.critical(exception_value, exc_info=exception_value)

sys.excepthook = hook_fn


def init_logger(name: str = None):
"""Initial set up of logger"""
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")
logger = logging.getLogger(name)
logger.setLevel(TRACE_LEVEL_NUM)
logging.config.dictConfig(LOGGING_CONFIG)
init_unhandled_exception_output_filtering(logger)
return logger


class AirbyteLogFormatter(logging.Formatter):
"""Output log records using AirbyteMessage"""

_secrets: List[str] = []

@classmethod
def update_secrets(cls, secrets: List[str]):
"""Update the list of secrets to be replaced in the log message"""
cls._secrets = secrets

# Transforming Python log levels to Airbyte protocol log levels
level_mapping = {
logging.FATAL: "FATAL",
Expand All @@ -82,8 +58,7 @@ def format(self, record: logging.LogRecord) -> str:
"""Return a JSON representation of the log message"""
message = super().format(record)
airbyte_level = self.level_mapping.get(record.levelno, "INFO")
for secret in AirbyteLogFormatter._secrets:
message = message.replace(secret, "****")
message = filter_secrets(message)
log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message))
return log_message.json(exclude_unset=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import logging
import sys
import time
from typing import Optional

import backoff
from airbyte_cdk.logger import AirbyteLogger
from requests import codes, exceptions

from .exceptions import DefaultBackoffException, UserDefinedBackoffException

TRANSIENT_EXCEPTIONS = (DefaultBackoffException, exceptions.ConnectTimeout, exceptions.ReadTimeout, exceptions.ConnectionError)

# TODO inject singleton logger?
logger = AirbyteLogger()
logger = logging.getLogger("airbyte")


def default_backoff_handler(max_tries: Optional[int], factor: float, **kwargs):
Expand Down
6 changes: 3 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from distutils.util import strtobool
from enum import Flag, auto
from typing import Any, Callable, Dict, Mapping, Optional

from airbyte_cdk.logger import AirbyteLogger
from jsonschema import Draft7Validator, validators

logger = AirbyteLogger()
logger = logging.getLogger("airbyte")


class TransformConfig(Flag):
Expand Down Expand Up @@ -174,4 +174,4 @@ def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]):
just calling normalizer.validate() would throw an exception on
first validation occurences and stop processing rest of schema.
"""
logger.warn(e.message)
logger.warning(e.message)
16 changes: 16 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/airbyte_secrets_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,19 @@ def get_secrets(source: Source, config: Mapping[str, Any], logger: logging.Logge
".".join(key.split(".")[:1]) for key, value in flattened_key_values.items() if value and key.endswith("airbyte_secret")
]
return [str(get_value_by_dot_notation(config, key)) for key in secret_key_names if config.get(key)]


__SECRETS_FROM_CONFIG: List[str] = []


def update_secrets(secrets: List[str]):
"""Update the list of secrets to be replaced"""
global __SECRETS_FROM_CONFIG
__SECRETS_FROM_CONFIG = secrets


def filter_secrets(string: str) -> str:
"""Filter secrets from a string by replacing them with ****"""
for secret in __SECRETS_FROM_CONFIG:
string = string.replace(secret, "****")
return string
7 changes: 3 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/utils/event_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
#

import datetime
import logging
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional

from airbyte_cdk.logger import AirbyteLogger

logger = AirbyteLogger()
logger = logging.getLogger("airbyte")


class EventTimer:
Expand Down Expand Up @@ -42,7 +41,7 @@ def finish_event(self):
event = self.stack.pop(0)
event.finish()
else:
logger.warn(f"{self.name} finish_event called without start_event")
logger.warning(f"{self.name} finish_event called without start_event")

def report(self, order_by="name"):
"""
Expand Down
74 changes: 74 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import traceback
from datetime import datetime

from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, FailureType, TraceType
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets


class AirbyteTracedException(Exception):
"""
An exception that should be emitted as an AirbyteTraceMessage
"""

def __init__(
self,
internal_message: str = None,
message: str = None,
failure_type: FailureType = FailureType.system_error,
exception: BaseException = None,
):
"""
:param internal_message: the internal error that caused the failure
:param message: a user-friendly message that indicates the cause of the error
:param failure_type: the type of error
:param exception: the exception that caused the error, from which the stack trace should be retrieved
"""
self.internal_message = internal_message
self.message = message
self.failure_type = failure_type
self._exception = exception
super().__init__(internal_message)

def as_airbyte_message(self) -> AirbyteMessage:
"""
Builds an AirbyteTraceMessage from the exception
"""
now_millis = datetime.now().timestamp() * 1000.0

trace_exc = self._exception or self
stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

trace_message = AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=now_millis,
error=AirbyteErrorTraceMessage(
message=self.message or "Something went wrong in the connector. See the logs for more details.",
internal_message=self.internal_message,
failure_type=self.failure_type,
stack_trace=stack_trace_str,
),
)

return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

def emit_message(self):
"""
Prints the exception as an AirbyteTraceMessage.
Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
"""
message = self.as_airbyte_message().json(exclude_unset=True)
filtered_message = filter_secrets(message)
print(filtered_message)

@classmethod
def from_exception(cls, exc: Exception, *args, **kwargs) -> "AirbyteTracedException":
"""
Helper to create an AirbyteTracedException from an existing exception
:param exc: the exception that caused the error
"""
return cls(internal_message=str(exc), exception=exc, *args, **kwargs)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.55",
version="0.1.56",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
10 changes: 6 additions & 4 deletions airbyte-cdk/python/unit_tests/sources/utils/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,17 @@
),
],
)
def test_transform(schema, actual, expected, expected_warns, capsys):
def test_transform(schema, actual, expected, expected_warns, caplog):
t = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
t.transform(actual, schema)
assert json.dumps(actual) == json.dumps(expected)
stdout = capsys.readouterr().out
if expected_warns:
assert expected_warns in stdout
record = caplog.records[0]
assert record.name == "airbyte"
assert record.levelname == "WARNING"
assert record.message == expected_warns
else:
assert not stdout
assert len(caplog.records) == 0


def test_transform_wrong_config():
Expand Down
57 changes: 57 additions & 0 deletions airbyte-cdk/python/unit_tests/test_exception_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


import json
import subprocess
import sys

import pytest
from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteLogMessage, AirbyteMessage, AirbyteTraceMessage


def test_uncaught_exception_handler():
cmd = "from airbyte_cdk.logger import init_logger; from airbyte_cdk.exception_handler import init_uncaught_exception_handler; logger = init_logger('airbyte'); init_uncaught_exception_handler(logger); raise 1"
exception_message = "exceptions must derive from BaseException"
exception_trace = (
"Traceback (most recent call last):\n"
' File "<string>", line 1, in <module>\n'
"TypeError: exceptions must derive from BaseException"
)

expected_log_message = AirbyteMessage(
type="LOG", log=AirbyteLogMessage(level="FATAL", message=f"{exception_message}\n{exception_trace}")
)

expected_trace_message = AirbyteMessage(
type="TRACE",
trace=AirbyteTraceMessage(
type="ERROR",
emitted_at=0.0,
error=AirbyteErrorTraceMessage(
failure_type="system_error",
message="Something went wrong in the connector. See the logs for more details.",
internal_message=exception_message,
stack_trace=f"{exception_trace}\n",
),
),
)

with pytest.raises(subprocess.CalledProcessError) as err:
subprocess.check_output([sys.executable, "-c", cmd], stderr=subprocess.STDOUT)

assert not err.value.stderr, "nothing on the stderr"

stdout_lines = err.value.output.decode("utf-8").strip().split("\n")
assert len(stdout_lines) == 2

log_output, trace_output = stdout_lines

out_log_message = AirbyteMessage.parse_obj(json.loads(log_output))
assert out_log_message == expected_log_message, "Log message should be emitted in expected form"

out_trace_message = AirbyteMessage.parse_obj(json.loads(trace_output))
assert out_trace_message.trace.emitted_at > 0
out_trace_message.trace.emitted_at = 0.0 # set a specific emitted_at value for testing
assert out_trace_message == expected_trace_message, "Trace message should be emitted in expected form"
Loading

0 comments on commit 73c7fad

Please sign in to comment.