forked from airbytehq/airbyte
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnector.py
88 lines (68 loc) · 3.02 KB
/
connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
import json
import logging
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional
import yaml
from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification
def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
    """Fetch a resource bundled with a package, or None when the file is absent.

    :param package: name of the package that should contain the resource
    :param filename: resource path, relative to the package
    :return: whatever ``pkgutil.get_data`` yields for the resource, or None if
        looking it up raised ``FileNotFoundError``
    """
    try:
        payload = pkgutil.get_data(package, filename)
    except FileNotFoundError:
        # A missing resource is an expected outcome here, not an error.
        payload = None
    return payload
class AirbyteSpec(object):
    """Thin holder for the raw text of a connector specification."""

    @classmethod
    def from_file(cls, file_name: str) -> "AirbyteSpec":
        """Build a spec from the contents of a text file.

        The file is decoded as UTF-8 explicitly, so the result does not depend
        on the platform's default locale encoding.

        :param file_name: path of the file holding the spec text
        :return: an instance of the class this is called on, wrapping the text
        :raises OSError: if the file cannot be opened
        """
        with open(file_name, encoding="utf-8") as file:
            spec_text = file.read()
        # Instantiate `cls` rather than the base class so subclasses get their own type back.
        return cls(spec_text)

    def __init__(self, spec_string: str) -> None:
        """
        :param spec_string: raw spec text, stored as-is without parsing
        """
        self.spec_string = spec_string
class Connector(ABC):
    """Abstract base class bundling config-file helpers, spec loading and a connection check.

    Subclasses must implement :meth:`check`.
    """

    # configure whether the `check_config_against_spec_or_exit()` needs to be called
    check_config_against_spec: bool = True

    # can be overridden to change an input config
    def configure(self, config: Mapping[str, Any], temp_dir: str) -> Mapping[str, Any]:
        """
        Persist config in temporary directory to run the Source job

        The config is written as JSON to ``config.json`` inside ``temp_dir`` and the
        same config object is returned unchanged.
        """
        config_path = os.path.join(temp_dir, "config.json")
        self.write_config(config, config_path)
        return config

    @staticmethod
    def read_config(config_path: str) -> Mapping[str, Any]:
        """
        Load and parse the JSON document stored at ``config_path``.

        The file is decoded as UTF-8 explicitly so non-ASCII values are read
        correctly regardless of the platform's default locale encoding.

        :raises OSError: if the file cannot be opened
        :raises json.JSONDecodeError: if the contents are not valid JSON
        """
        with open(config_path, "r", encoding="utf-8") as file:
            return json.load(file)

    @staticmethod
    def write_config(config: Mapping[str, Any], config_path: str):
        """
        Serialize ``config`` as JSON into ``config_path``, replacing any existing file.
        """
        with open(config_path, "w", encoding="utf-8") as fh:
            json.dump(config, fh)

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """
        Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
        required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.

        :raises RuntimeError: if both spec.yaml and spec.json are found
        :raises FileNotFoundError: if neither file is found
        """
        # The top-level package of the concrete class's module is where the spec file is looked up.
        package = self.__class__.__module__.split(".")[0]

        yaml_spec = load_optional_package_file(package, "spec.yaml")
        json_spec = load_optional_package_file(package, "spec.json")

        if yaml_spec and json_spec:
            raise RuntimeError("Found multiple spec files in the package. Only one of spec.yaml or spec.json should be provided.")

        if yaml_spec:
            # SafeLoader restricts YAML parsing to plain data (no arbitrary object construction).
            spec_obj = yaml.load(yaml_spec, Loader=yaml.SafeLoader)
        elif json_spec:
            spec_obj = json.loads(json_spec)
        else:
            raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")

        return ConnectorSpecification.parse_obj(spec_obj)

    @abstractmethod
    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """
        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
        to the Stripe API.
        """