forked from Aiven-Open/pghoard
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_transferagent.py
142 lines (125 loc) · 5.3 KB
/
test_transferagent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
pghoard
Copyright (c) 2015 Ohmu Ltd
See LICENSE for details
"""
# pylint: disable=attribute-defined-outside-init
from .base import PGHoardTestCase
from pghoard import statsd
from pghoard.rohmu.errors import StorageError
from pghoard.transfer import TransferAgent
from queue import Empty, Queue
from unittest.mock import Mock
import os
import pytest
class MockStorage(Mock):
def get_contents_to_string(self, key): # pylint: disable=unused-argument
return b"joo", {"key": "value"}
def store_file_from_disk(self, key, local_path, metadata, multipart=None): # pylint: disable=unused-argument
pass
class MockStorageRaising(Mock):
def get_contents_to_string(self, key): # pylint: disable=unused-argument
return b"joo", {"key": "value"}
def store_file_from_disk(self, key, local_path, metadata, multipart=None): # pylint: disable=unused-argument
raise StorageError("foo")
class TestTransferAgent(PGHoardTestCase):
def setup_method(self, method):
super().setup_method(method)
self.config = self.config_template()
self.config["backup_sites"][self.test_site]["object_storage"] = {"storage_type": "s3"}
os.makedirs(self.config["alert_file_dir"], exist_ok=True)
self.foo_path = os.path.join(self.temp_dir, self.test_site, "xlog", "00000001000000000000000C")
os.makedirs(os.path.join(self.temp_dir, self.test_site, "xlog"))
with open(self.foo_path, "w") as out:
out.write("foo")
self.foo_basebackup_path = os.path.join(self.temp_dir, self.test_site, "basebackup", "2015-04-15_0", "base.tar.xz")
os.makedirs(os.path.join(self.temp_dir, self.test_site, "basebackup", "2015-04-15_0"))
with open(self.foo_basebackup_path, "w") as out:
out.write("foo")
self.compression_queue = Queue()
self.transfer_queue = Queue()
self.transfer_agent = TransferAgent(
config=self.config,
compression_queue=self.compression_queue,
transfer_queue=self.transfer_queue,
stats=statsd.StatsClient(host=None),
shared_state_dict={})
self.transfer_agent.start()
def teardown_method(self, method):
self.transfer_agent.running = False
self.transfer_queue.put({"type": "QUIT"})
self.transfer_agent.join()
super().teardown_method(method)
def test_handle_download(self):
callback_queue = Queue()
self.transfer_agent.get_object_storage = MockStorage()
self.transfer_queue.put({
"callback_queue": callback_queue,
"filetype": "xlog",
"local_path": self.temp_dir,
"opaque": 42,
"site": self.test_site,
"target_path": self.temp_dir,
"type": "DOWNLOAD",
})
expected_event = {
"blob": b"joo",
"callback_queue": callback_queue,
"local_path": self.temp_dir,
"metadata": {"key": "value"},
"opaque": 42,
"site": self.test_site,
"type": "DECOMPRESSION",
}
assert self.compression_queue.get(timeout=1.0) == expected_event
def test_handle_upload_xlog(self):
callback_queue = Queue()
storage = MockStorage()
self.transfer_agent.get_object_storage = storage
assert os.path.exists(self.foo_path) is True
self.transfer_queue.put({
"callback_queue": callback_queue,
"file_size": 3,
"filetype": "xlog",
"local_path": self.foo_path,
"metadata": {"start-wal-segment": "00000001000000000000000C"},
"site": self.test_site,
"type": "UPLOAD",
})
assert callback_queue.get(timeout=1.0) == {"success": True, "opaque": None}
assert os.path.exists(self.foo_path) is False
def test_handle_upload_basebackup(self):
callback_queue = Queue()
storage = MockStorage()
self.transfer_agent.get_object_storage = storage
assert os.path.exists(self.foo_path) is True
self.transfer_queue.put({
"callback_queue": callback_queue,
"file_size": 3,
"filetype": "basebackup",
"local_path": self.foo_basebackup_path,
"metadata": {"start-wal-segment": "00000001000000000000000C"},
"site": self.test_site,
"type": "UPLOAD",
})
assert callback_queue.get(timeout=1.0) == {"success": True, "opaque": None}
assert os.path.exists(self.foo_basebackup_path) is False
def test_handle_failing_upload_xlog(self):
callback_queue = Queue()
storage = MockStorageRaising()
self.transfer_agent.get_object_storage = storage
assert os.path.exists(self.foo_path) is True
self.transfer_queue.put({
"callback_queue": callback_queue,
"file_size": 3,
"filetype": "xlog",
"local_path": self.foo_path,
"metadata": {"start-wal-segment": "00000001000000000000000C"},
"site": self.test_site,
"type": "UPLOAD",
})
with pytest.raises(Empty):
callback_queue.get(timeout=3.0)
alert_file_path = os.path.join(self.config["alert_file_dir"], "upload_retries_warning")
assert os.path.exists(alert_file_path) is True
os.unlink(alert_file_path)