forked from blue-yonder/turbodbc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhelpers.py
168 lines (126 loc) · 4.49 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import json
import os
from contextlib import contextmanager
from typing import Optional
import pytest
import turbodbc
def generate_microseconds_with_precision(digits):
    """
    Build a microsecond value whose leading `digits` decimal places are
    1, 2, 3, ... and whose remaining places (up to six) are zero.

    For example, 3 digits yields 123000 and 6 digits yields 123456.
    """
    value = 0
    # Append the digits 1..digits one decimal place at a time.
    for position in range(1, digits + 1):
        value = value * 10 + position
    # Pad with trailing zeros up to a width of six places.
    padding = 6 - digits
    if padding > 0:
        value *= 10 ** padding
    return value
def _get_config_files():
    """
    Return the configuration file names listed in the environment.

    The environment variable TURBODBC_TEST_CONFIGURATION_FILES is read as a
    comma-separated list. Whitespace around each entry is stripped, and
    entries that are empty after stripping (for example those produced by a
    trailing or doubled comma) are skipped.

    Returns:
        A list of file names in the order they appear in the variable.

    Raises:
        KeyError: if the environment variable is not set.
    """
    variable = "TURBODBC_TEST_CONFIGURATION_FILES"
    try:
        raw = os.environ[variable]
    except KeyError:
        # "from None" suppresses the bare lookup failure so that only the
        # explanatory message below appears in the traceback.
        raise KeyError(
            "Please set the environment variable {} to specify the configuration files as a comma-separated list".format(
                variable
            )
        ) from None
    stripped_names = (file_name.strip() for file_name in raw.split(","))
    return [file_name for file_name in stripped_names if file_name]
def get_credentials(configuration):
    """
    Build the keyword arguments carrying the login credentials.

    When the configuration has a "user" entry, the result maps the option
    names found under configuration["capabilities"] to the configured user
    and password. Without a "user" entry the result is an empty dict.
    """
    if "user" not in configuration:
        return {}

    capabilities = configuration["capabilities"]
    return {
        capabilities["connection_user_option"]: configuration["user"],
        capabilities["connection_password_option"]: configuration["password"],
    }
def _load_configuration(file_name):
    """
    Parse the JSON configuration file `file_name` and return its content.

    The file is decoded as UTF-8 explicitly, so the result does not depend
    on the platform's default text encoding.
    """
    with open(file_name, encoding="utf-8") as f:
        return json.load(f)
def _get_configuration(file_name):
    """
    Load the configuration stored in `file_name` and pair it with its
    data source name.

    Returns a tuple (data_source_name, configuration).
    """
    configuration = _load_configuration(file_name)
    data_source_name = configuration["data_source_name"]
    return data_source_name, configuration
def _get_configurations():
    """
    Return one (data_source_name, configuration) pair for every configured
    file, in the order the files are listed.
    """
    pairs = []
    for file_name in _get_config_files():
        pairs.append(_get_configuration(file_name))
    return pairs
"""
Use this decorator to execute a test function once for each database configuration.
Please note the test function *must* take the parameters `dsn` and `configuration`,
and in that order.
Example:
@for_each_database
def test_important_stuff(dsn, configuration):
assert 1 == 2
"""
# NOTE: this mark is built when the module is imported, so
# _get_configurations() runs at import time: the environment variable
# TURBODBC_TEST_CONFIGURATION_FILES must already be set (otherwise a KeyError
# is raised) and every listed configuration file is opened and parsed.
for_each_database = pytest.mark.parametrize("dsn,configuration", _get_configurations())
"""
Use this decorator to execute a test function once for each database configuration
except for those database configurations listed in the parameter.
Please note the test function *must* take the parameters `dsn` and `configuration`,
and in that order.
Example:
@for_each_database_except(["MySQL"])
def test_important_stuff(dsn, configuration):
assert 1 == 2
"""
def for_each_database_except(exceptions):
    """Return a parametrize mark for every configured database not named in `exceptions`."""
    selected = []
    for entry in _get_configurations():
        # entry is a (data_source_name, configuration) pair; filter on the name.
        data_source_name = entry[0]
        if data_source_name in exceptions:
            continue
        selected.append(entry)
    return pytest.mark.parametrize("dsn,configuration", selected)
"""
Use this decorator to execute a test function once for a single database configuration.
Please note the test function *must* take the parameters `dsn` and `configuration`,
and in that order.
Example:
@for_one_database
def test_important_stuff(dsn, configuration):
assert 1 == 2
"""
# NOTE: built at import time. Only the first file listed in
# TURBODBC_TEST_CONFIGURATION_FILES is used; that file is opened and parsed
# when this module is imported.
for_one_database = pytest.mark.parametrize(
"dsn,configuration", [_get_configuration(_get_config_files()[0])]
)
def for_specific_databases(db_filter: Optional[str] = None):
    """
    Use this decorator to execute a test function once for each database
    configuration whose configuration *file name* contains the given filter.

    Please note the test function *must* take the parameters `dsn` and
    `configuration`, and in that order.

    Args:
        db_filter: substring that a configuration file name must contain to
            be selected. With the default of None no filtering is applied
            and every configured database is used.

    Returns:
        A `pytest.mark.parametrize` mark over the selected
        (dsn, configuration) pairs.

    Example:

        @for_specific_databases("mssql")
        def test_important_stuff_only_for_mssql(dsn, configuration):
            assert 1 == 2
    """
    config_files = _get_config_files()
    # `None in "some string"` raises TypeError, so the default value must
    # bypass the substring test instead of being fed into it.
    if db_filter is not None:
        config_files = [fn for fn in config_files if db_filter in fn]
    return pytest.mark.parametrize(
        "dsn,configuration", [_get_configuration(cf) for cf in config_files]
    )
@contextmanager
def open_connection(
    configuration, rows_to_buffer=None, parameter_sets_to_buffer=100, **turbodbc_options
):
    """
    Context manager that yields a turbodbc connection for `configuration`
    and closes it when the `with` block is left, including when the block
    raises.

    Args:
        configuration: mapping with at least "data_source_name"; an optional
            "prefer_unicode" entry (default False) and the credential entries
            consumed by `get_credentials` are honoured as well.
        rows_to_buffer: when truthy, the read buffer is sized as
            `turbodbc.Rows(rows_to_buffer)`; otherwise
            `turbodbc.Megabytes(1)` is used.
        parameter_sets_to_buffer: forwarded to `turbodbc.make_options`.
        **turbodbc_options: additional keyword arguments forwarded to
            `turbodbc.make_options`.
    """
    dsn = configuration["data_source_name"]
    prefer_unicode = configuration.get("prefer_unicode", False)
    read_buffer_size = (
        turbodbc.Rows(rows_to_buffer) if rows_to_buffer else turbodbc.Megabytes(1)
    )
    options = turbodbc.make_options(
        read_buffer_size=read_buffer_size,
        parameter_sets_to_buffer=parameter_sets_to_buffer,
        prefer_unicode=prefer_unicode,
        **turbodbc_options,
    )
    connection = turbodbc.connect(
        dsn, turbodbc_options=options, **get_credentials(configuration)
    )
    try:
        yield connection
    finally:
        # Without the finally clause, an exception raised inside the caller's
        # `with` block (e.g. a failing assertion) would skip the close and
        # leak the connection.
        connection.close()
@contextmanager
def open_cursor(
    configuration, rows_to_buffer=None, parameter_sets_to_buffer=100, **turbodbc_options
):
    """
    Context manager that opens a connection via `open_connection`, yields a
    cursor created on it, and closes the cursor when the `with` block is
    left, including when the block raises.

    All arguments are forwarded unchanged to `open_connection`.
    """
    with open_connection(
        configuration, rows_to_buffer, parameter_sets_to_buffer, **turbodbc_options
    ) as connection:
        cursor = connection.cursor()
        try:
            yield cursor
        finally:
            # Close the cursor even if the caller's block raised, so a
            # failing test does not leave the cursor open.
            cursor.close()