webserver: Do proper concurrent WAL/history downloads
Earlier logic tried to prefetch WAL/history files that PostgreSQL was
expected to request, but because it waited for the prefetch operations
to complete before serving the original request it wasn't very
effective. Also, due to the GIL the processing often ended up being
CPU bound.

To speed up the processing, make all file fetch operations run in the
background as identical jobs and poll a queue of completed operations
to finalize them; whenever one of the completed operations is the one
the current request is waiting for, fulfil that request. New prefetch
operations are always scheduled, even if the current request could be
fulfilled without downloading anything, so that there is always a
backlog of prefetch operations.
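
The webserver-side changes aren't shown in full in this diff, so the
following is only a rough sketch of the idea; every name in it
(schedule_prefetch, completed_queue, finished_files) is hypothetical
rather than an actual pghoard identifier.

def serve_wal_request(requested_name, schedule_prefetch, completed_queue, finished_files):
    # Always schedule more background fetches for this and upcoming files,
    # even if the requested file may already be available, so that a
    # backlog of prefetch jobs exists at all times.
    schedule_prefetch(requested_name)
    while requested_name not in finished_files:
        # Finalize completed background jobs one by one until the job for
        # the currently requested file is among them.
        name, result = completed_queue.get()
        finished_files[name] = result
    return finished_files.pop(requested_name)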

In addition to improving the prefetch logic, the file download,
decrypt and decompress operation was changed to be executed in a
separate process so that the full CPU capacity of the host can be
utilized. To avoid having a lot of background processes running when
initial WAL playback isn't ongoing, custom process management logic is
used and idle processes are killed after 10 minutes.
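
This is handled by the new FileFetchManager class added in
pghoard/fetcher.py below. A minimal usage sketch, assuming a
pghoard-style configuration dict; the site name and target path are
made-up example values, and the periodic check_state() call is what
eventually reaps an idle worker process:

import multiprocessing
from pghoard.fetcher import FileFetchManager

def example_fetch(app_config, transfer_provider):
    mp_manager = multiprocessing.Manager()  # enables the subprocess fetch path
    fetch_manager = FileFetchManager(app_config, mp_manager, transfer_provider)
    try:
        # Download, decrypt and decompress one stored WAL segment in the worker process.
        size = fetch_manager.fetch_file(
            "default", "default/xlog/000000010000000000000001",
            "/tmp/000000010000000000000001")
        print("restored", size, "bytes")
        fetch_manager.check_state()  # kills the worker once it has been idle 10 minutes
    finally:
        fetch_manager.stop()
        mp_manager.shutdown()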

The new implementation was tested with 21 gigabytes of non-trivial
(100% non-compressible inserts) WAL files on a 16-core host with a
maximum disk write speed of about 400 megabytes per second. Processing
was limited either by PostgreSQL not requesting/handling files fast
enough or by disk I/O, varying slightly during the run. Of the 1310
file requests made during the run, 1306 were served immediately and
the remaining 4 had a total delay of less than 3 seconds. Total
restoration time for the run was 220 seconds.

The old version restored the 21 gigabytes of WAL in 340 seconds, so
the new implementation is "only" 35% faster, but the bottleneck has
now moved entirely to the PostgreSQL and I/O side. With simpler WAL
files, faster I/O will make the new version compare even more
favorably, and if PostgreSQL is ever changed to support some kind of
concurrency, the new implementation will be able to serve files fast
enough.
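
As a quick sanity check on the quoted 35% figure (the percentage
refers to wall-clock time saved, not to throughput gained):

# Back-of-the-envelope check of the restore times quoted above.
old_seconds, new_seconds = 340, 220
print((old_seconds - new_seconds) / old_seconds)   # ~0.35, i.e. 35% less wall-clock time
print(21_000 / new_seconds, 21_000 / old_seconds)  # roughly 95 vs 62 MB/s average write rate
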
rikonen committed May 7, 2018
1 parent 50db342 commit 8fdf13c
Showing 7 changed files with 326 additions and 150 deletions.
116 changes: 116 additions & 0 deletions pghoard/fetcher.py
@@ -0,0 +1,116 @@
from pghoard.common import get_object_storage_config
from pghoard.config import key_lookup_for_site
from pghoard.rohmu import get_transfer
from pghoard.rohmu.rohmufile import create_sink_pipeline
import multiprocessing
import os
import queue
import signal
import time
import threading


class FileFetchManager:
"""Manages (potentially) multiprocessing related assets for fetching file contents from
object storage. If a multiprocess.Manager instance is provided, the fetch is performed
in a subprocess to avoid GIL related performance constraints, otherwise file is fetched
in current process."""

def __init__(self, app_config, mp_manager, transfer_provider):
self.config = app_config
self.last_activity = time.monotonic()
self.lock = threading.RLock()
self.max_idle_age = 10 * 60
self.mp_manager = mp_manager
self.process = None
self.result_queue = None
self.task_queue = None
self.transfer_provider = transfer_provider

def check_state(self):
if self.process and time.monotonic() - self.last_activity > self.max_idle_age:
self.stop()

def fetch_file(self, site, key, target_path):
self.last_activity = time.monotonic()
self._start_process()
if self.mp_manager:
self.task_queue.put((site, key, target_path))
result = self.result_queue.get()
if result is None:
# Should only happen if the process is terminated while we're waiting for
# a result, which is pretty much the same as timeout
raise queue.Empty
elif isinstance(result[1], Exception):
raise result[1]
return result[1]
else:
transfer = self.transfer_provider(site)
return FileFetcher(self.config, transfer).fetch(site, key, target_path)

def stop(self):
with self.lock:
if not self.process:
return
self.task_queue.put(None)
self.result_queue.put(None)
process = self.process
self.process = None
self.task_queue = None
self.result_queue = None
process.join(timeout=0.1)
if process.exitcode is None:
os.kill(process.pid, signal.SIGKILL)
process.join()

def _start_process(self):
with self.lock:
if not self.mp_manager or self.process:
return
self.result_queue = self.mp_manager.Queue()
self.task_queue = self.mp_manager.Queue()
self.process = multiprocessing.Process(target=_remote_file_fetch_loop,
args=(self.config, self.task_queue, self.result_queue))
self.process.start()


class FileFetcher:
"""Fetches a file from object storage and strips possible encryption and/or compression away."""
def __init__(self, app_config, transfer):
self.config = app_config
self.transfer = transfer

def fetch(self, site, key, target_path):
try:
lookup = key_lookup_for_site(self.config, site)
data, metadata = self.transfer.get_contents_to_string(key)
if isinstance(data, str):
data = data.encode("latin1")
file_size = len(data)
with open(target_path, "wb") as target_file:
output = create_sink_pipeline(
output=target_file, file_size=file_size, metadata=metadata, key_lookup=lookup, throttle_time=0)
output.write(data)
return file_size
except Exception:
if os.path.isfile(target_path):
os.unlink(target_path)
raise


def _remote_file_fetch_loop(app_config, task_queue, result_queue):
transfers = {}
while True:
task = task_queue.get()
if not task:
return
try:
site, key, target_path = task
transfer = transfers.get(site)
if not transfer:
transfer = get_transfer(get_object_storage_config(app_config, site))
transfers[site] = transfer
file_size = FileFetcher(app_config, transfer).fetch(site, key, target_path)
result_queue.put((task, file_size))
except Exception as e: # pylint: disable=broad-except
result_queue.put((task, e))
10 changes: 10 additions & 0 deletions pghoard/pghoard.py
@@ -28,6 +28,7 @@
import io
import json
import logging
import multiprocessing
import os
import psycopg2
import random
@@ -55,12 +56,15 @@ def __init__(self, config_path):
self.transfer_queue = Queue()
self.syslog_handler = None
self.config = {}
self.mp_manager = None
self.site_transfers = {}
self.state = {
"backup_sites": {},
"startup_time": datetime.datetime.utcnow().isoformat(),
}
self.load_config()
if self.config["transfer"]["thread_count"] > 1:
self.mp_manager = multiprocessing.Manager()

if not os.path.exists(self.config["backup_location"]):
os.makedirs(self.config["backup_location"])
@@ -98,6 +102,7 @@ def __init__(self, config_path):
ta = TransferAgent(
config=self.config,
compression_queue=self.compression_queue,
mp_manager=self.mp_manager,
transfer_queue=self.transfer_queue,
stats=self.stats,
shared_state_dict=compressor_state)
@@ -564,6 +569,9 @@ def quit(self, _signal=None, _frame=None): # pylint: disable=unused-argument
for t in all_threads:
if t.is_alive():
t.join()
if self.mp_manager:
self.mp_manager.shutdown()
self.mp_manager = None


def main(args=None):
@@ -593,6 +601,8 @@ def main(args=None):

logutil.configure_logging(short_log=arg.short_log, level=logging.DEBUG if arg.debug else logging.INFO)

multiprocessing.set_start_method("forkserver")

try:
pghoard = PGHoard(config_path)
except InvalidConfigurationError as ex:
25 changes: 9 additions & 16 deletions pghoard/transfer.py
@@ -5,6 +5,7 @@
See LICENSE for details
"""
from pghoard.common import create_alert_file, get_object_storage_config
from pghoard.fetcher import FileFetchManager
from pghoard.rohmu.compat import suppress
from pghoard.rohmu.errors import (
FileNotFoundFromStorageError,
@@ -22,13 +23,15 @@


class TransferAgent(Thread):
def __init__(self, config, compression_queue, transfer_queue, stats,
def __init__(self, config, compression_queue, mp_manager, transfer_queue, stats,
shared_state_dict):
super().__init__()
self.log = logging.getLogger("TransferAgent")
self.config = config
self.stats = stats
self.compression_queue = compression_queue
self.mp_manager = mp_manager
self.fetch_manager = FileFetchManager(self.config, self.mp_manager, self.get_object_storage)
self.transfer_queue = transfer_queue
self.running = True
self.sleep = time.sleep
@@ -100,6 +103,7 @@ def transmit_statsd_metrics(self):
def run(self):
while self.running:
self.transmit_statsd_metrics()
self.fetch_manager.check_state()
try:
file_to_transfer = self.transfer_queue.get(timeout=1.0)
except Empty:
@@ -174,6 +178,7 @@ def run(self):
"FAILED " if not result["success"] else "",
key, oper_size, time.time() - start_time)

self.fetch_manager.stop()
self.log.debug("Quitting TransferAgent")

def handle_list(self, site, key, file_to_transfer):
Expand Down Expand Up @@ -206,21 +211,9 @@ def handle_metadata(self, site, key, file_to_transfer):

def handle_download(self, site, key, file_to_transfer):
try:
storage = self.get_object_storage(site)

content, metadata = storage.get_contents_to_string(key)
file_to_transfer["file_size"] = len(content)
# Note that here we flip the local_path to mean the target_path
self.compression_queue.put({
"blob": content,
"callback_queue": file_to_transfer["callback_queue"],
"local_path": file_to_transfer["target_path"],
"metadata": metadata,
"opaque": file_to_transfer.get("opaque"),
"site": site,
"type": "DECOMPRESSION",
})
return {"success": True, "call_callback": False}
file_size = self.fetch_manager.fetch_file(site, key, file_to_transfer["target_path"])
file_to_transfer["file_size"] = file_size
return {"success": True, "opaque": file_to_transfer.get("opaque")}
except FileNotFoundFromStorageError as ex:
self.log.warning("%r not found from storage", key)
return {"success": False, "exception": ex, "opaque": file_to_transfer.get("opaque")}
