rohmu: Support uploading stream of data
Stream size does not need to be known beforehand and it can be
arbitrarily large (up to the maximum size supported by the storage
provider, i.e. in practice 5 terabytes).
rikonen committed Jan 15, 2019
1 parent dba9a9f commit 82697df
Showing 6 changed files with 305 additions and 63 deletions.
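The new entry point this commit adds is store_file_object(key, fd, ...) on BaseTransfer: fd only needs to support read(), so the total size never has to be known up front. A minimal usage sketch, assuming rohmu's get_transfer() factory and an illustrative local-backend configuration (neither is part of this diff):

import subprocess

from pghoard.rohmu import get_transfer  # assumed factory; backend construction details vary

# Hypothetical configuration; any backend implementing store_file_object works the same way.
transfer = get_transfer({"storage_type": "local", "directory": "/tmp/rohmu-demo"})

def progress(bytes_sent):
    print("uploaded", bytes_sent, "bytes so far")

# stdout of a subprocess is a non-seekable stream whose size is unknown up front.
proc = subprocess.Popen(["tar", "-C", "/var/lib/data", "-c", "."], stdout=subprocess.PIPE)
transfer.store_file_object("backups/data.tar", proc.stdout,
                           metadata={"source": "example"},
                           upload_progress_fn=progress)
proc.wait()
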
41 changes: 40 additions & 1 deletion pghoard/rohmu/object_storage/azure.py
@@ -7,7 +7,7 @@
# pylint: disable=import-error, no-name-in-module
from azure.storage.blob import BlockBlobService, ContentSettings
from azure.storage.blob.models import BlobPrefix
from .base import BaseTransfer, KEY_TYPE_PREFIX, KEY_TYPE_OBJECT, IterKeyItem
from .base import BaseTransfer, get_total_memory, KEY_TYPE_PREFIX, KEY_TYPE_OBJECT, IterKeyItem
from ..errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError
import azure.common
import time
@@ -20,6 +20,19 @@
}


def calculate_max_block_size():
total_mem_mib = get_total_memory() or 0
# At least 4 MiB, at most 100 MiB. Max block size used for hosts with ~100+ GB of memory
return max(min(int(total_mem_mib / 1000), 100), 4) * 1024 * 1024


# Increase block size based on host memory. Azure supports up to 50k blocks and up to 5 TiB individual
# files. The default block size of 4 MiB means only ~200 GB files can be uploaded. To get close to the
# 5 TiB limit, scale the block size with host memory; we don't use the 100 MiB maximum on all hosts
# because the uploader will allocate (with default settings) 3 x block size of memory.
BlockBlobService.MAX_BLOCK_SIZE = calculate_max_block_size()


class AzureTransfer(BaseTransfer):
def __init__(self, account_name, account_key, bucket_name, prefix=None,
azure_cloud=None):
@@ -199,6 +212,32 @@ def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cac
self.conn.create_blob_from_path(self.container_name, key, filepath, content_settings=content_settings,
metadata=self.sanitize_metadata(metadata, replace_hyphen_with="_"))

def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
if cache_control is not None:
raise NotImplementedError("AzureTransfer: cache_control support not implemented")
key = self.format_key_for_backend(key, remove_slash_prefix=True)
content_settings = None
if mimetype:
content_settings = ContentSettings(content_type=mimetype)

def progress_callback(bytes_sent, _):
if upload_progress_fn:
upload_progress_fn(bytes_sent)

# Azure _BlobChunkUploader calls `tell()` on the stream even though it doesn't use the result.
# We expect the input stream not to support `tell()`, so install a dummy implementation for it
original_tell = getattr(fd, "tell", None)
fd.tell = lambda: None
try:
self.conn.create_blob_from_stream(self.container_name, key, fd, content_settings=content_settings,
metadata=self.sanitize_metadata(metadata, replace_hyphen_with="_"),
progress_callback=progress_callback)
finally:
if original_tell:
fd.tell = original_tell
else:
delattr(fd, "tell")

def get_or_create_container(self, container_name):
start_time = time.monotonic()
self.conn.create_container(container_name)
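To make the block-size comment above concrete, a small sketch reproducing calculate_max_block_size() for a few representative hosts (the memory figures are illustrative):

# ~2 GiB RAM:   2048 / 1000 = 2    -> clamped to the 4 MiB minimum    (max object ~50k * 4 MiB   ≈ 195 GiB)
# ~16 GiB RAM:  16384 / 1000 = 16  -> 16 MiB blocks                   (max object ~50k * 16 MiB  ≈ 780 GiB)
# ~128 GiB RAM: 131072 / 1000 = 131 -> clamped to the 100 MiB maximum (max object ~50k * 100 MiB ≈ 4.8 TiB)
# Peak upload buffer memory with default settings is roughly 3 x block size: ~12 MiB, ~48 MiB and ~300 MiB.
for total_mem_mib in (2048, 16384, 131072):
    block_size = max(min(int(total_mem_mib / 1000), 100), 4) * 1024 * 1024
    print(total_mem_mib, "MiB of RAM ->", block_size // (1024 * 1024), "MiB block size")
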
19 changes: 19 additions & 0 deletions pghoard/rohmu/object_storage/base.py
@@ -8,6 +8,7 @@

from ..errors import StorageError
import logging
import platform


KEY_TYPE_OBJECT = "object"
@@ -105,3 +106,21 @@ def store_file_from_memory(self, key, memstring, metadata=None, cache_control=No

def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cache_control=None, mimetype=None):
raise NotImplementedError

def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
raise NotImplementedError


def get_total_memory():
"""return total system memory in mebibytes (or None if parsing meminfo fails)"""
if platform.system() != "Linux":
return None

with open("/proc/meminfo", "r") as in_file:
for line in in_file:
info = line.split()
if info[0] == "MemTotal:" and info[-1] == "kB":
memory_mb = int(int(info[1]) / 1024)
return memory_mb

return None
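get_total_memory() above only looks at the MemTotal line of /proc/meminfo and converts the kernel's kB figure to MiB; a tiny sketch of the same parsing against a canned line (the value is made up):

# The kernel reports MemTotal in kB (really KiB); the helper converts it to MiB.
line = "MemTotal:       16330132 kB"
info = line.split()
assert info[0] == "MemTotal:" and info[-1] == "kB"
print(int(int(info[1]) / 1024))  # -> 15947, the MiB value get_total_memory() would return
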
106 changes: 96 additions & 10 deletions pghoard/rohmu/object_storage/google.py
@@ -26,7 +26,7 @@

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload
from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload, MediaIoBaseDownload, MediaUpload
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client.client import GoogleCredentials

@@ -273,11 +273,9 @@ def get_file_size(self, key):
obj = self._retry_on_reset(req, req.execute)
return int(obj["size"])

def _upload(self, upload_type, local_object, key, metadata, extra_props, cache_control, mimetype=None):
def _upload(self, upload, key, metadata, extra_props, cache_control, upload_progress_fn=None):
key = self.format_key_for_backend(key)
self.log.debug("Starting to upload %r", key)
upload = upload_type(local_object, mimetype=mimetype or "application/octet-stream",
resumable=True, chunksize=UPLOAD_CHUNK_SIZE)
body = {"metadata": metadata}
if extra_props:
body.update(extra_props)
@@ -290,19 +288,35 @@ def _upload(self, upload_type, local_object, key, metadata, extra_props, cache_c
while response is None:
status, response = self._retry_on_reset(req, req.next_chunk)
if status:
self.log.debug("Upload of %r to %r: %d%%", local_object, key, status.progress() * 100)
self.log.debug("Upload of %r to %r: %d%%, %s bytes", upload, key, status.progress() * 100,
status.resumable_progress)
if upload_progress_fn:
upload_progress_fn(status.resumable_progress)

def store_file_from_memory(self, key, memstring, metadata=None, extra_props=None, # pylint: disable=arguments-differ
cache_control=None, mimetype=None):
return self._upload(MediaIoBaseUpload, BytesIO(memstring), key,
self.sanitize_metadata(metadata), extra_props,
cache_control=cache_control, mimetype=mimetype)
upload = MediaIoBaseUpload(
BytesIO(memstring), mimetype or "application/octet-stream", chunksize=UPLOAD_CHUNK_SIZE, resumable=True
)
return self._upload(upload, key, self.sanitize_metadata(metadata), extra_props, cache_control=cache_control)

def store_file_from_disk(self, key, filepath, metadata=None, # pylint: disable=arguments-differ, unused-variable
*, multipart=None, extra_props=None, # pylint: disable=arguments-differ, unused-variable
cache_control=None, mimetype=None):
return self._upload(MediaFileUpload, filepath, key, self.sanitize_metadata(metadata), extra_props,
cache_control=cache_control, mimetype=mimetype)
mimetype = mimetype or "application/octet-stream"
upload = MediaFileUpload(filepath, mimetype, chunksize=UPLOAD_CHUNK_SIZE, resumable=True)
return self._upload(upload, key, self.sanitize_metadata(metadata), extra_props, cache_control=cache_control)

def store_file_object(self, key, fd, *, cache_control=None, metadata=None, mimetype=None, upload_progress_fn=None):
mimetype = mimetype or "application/octet-stream"
return self._upload(
MediaStreamUpload(fd, chunk_size=UPLOAD_CHUNK_SIZE, mime_type=mimetype, name=key),
key,
self.sanitize_metadata(metadata),
None,
cache_control=cache_control,
upload_progress_fn=upload_progress_fn,
)

def get_or_create_bucket(self, bucket_name):
"""Look up the bucket if it already exists and try to create the
@@ -347,3 +361,75 @@
raise

return bucket_name


class MediaStreamUpload(MediaUpload):
"""Support streaming arbitrary amount of data from non-seekable object supporting read method."""

def __init__(self, fd, *, chunk_size, mime_type, name):
self._data = []
self._chunk_size = chunk_size
self._fd = fd
self._mime_type = mime_type
self._name = name
self._position = None

def chunksize(self):
return self._chunk_size

def mimetype(self):
return self._mime_type

def size(self):
return None

def resumable(self):
return True

def getbytes(self, begin, end):
length = end
if begin < (self._position or 0):
msg = "Requested position {} for {!r} preceeds already fulfilled position {}".format(
begin, self._name, self._position
)
raise IndexError(msg)
elif begin > (self._position or 0) + len(self._data):
msg = "Requested position {} for {!r} has gap from previous position {} and {} byte chunk".format(
begin, self._name, self._position, len(self._data)
)
raise IndexError(msg)

if self._position is None or begin == self._position + len(self._data):
self._data = self._read_bytes(length)
elif begin != self._position:
retain_chunk = self._data[begin - self._position:]
self._data = self._read_bytes(length - len(retain_chunk), initial_data=retain_chunk)

self._position = begin
return self._data

def has_stream(self):
return False

def stream(self):
raise NotImplementedError

def _read_bytes(self, length, *, initial_data=None):
bytes_remaining = length
read_results = []
if initial_data:
read_results.append(initial_data)
while bytes_remaining > 0:
data = self._fd.read(bytes_remaining)
if data:
read_results.append(data)
bytes_remaining -= len(data)
else:
break

if not read_results:
return None
elif len(read_results) == 1:
return read_results[0]
else:
return b"".join(read_results)
17 changes: 17 additions & 0 deletions pghoard/rohmu/object_storage/local.py
@@ -172,3 +172,20 @@ def store_file_from_disk(self, key, filepath, metadata=None, multipart=None, cac
makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.copyfile(filepath, target_path)
self._save_metadata(target_path, metadata)

def store_file_object(self, key, fd, *, cache_control=None, # pylint: disable=unused-argument
metadata=None, mimetype=None, upload_progress_fn=None): # pylint: disable=unused-argument
target_path = self.format_key_for_backend(key.strip("/"))
makedirs(os.path.dirname(target_path), exist_ok=True)
bytes_written = 0
with open(target_path, "wb") as output_fp:
while True:
data = fd.read(1024 * 1024)
if not data:
break
output_fp.write(data)
bytes_written += len(data)
if upload_progress_fn:
upload_progress_fn(bytes_written)

self._save_metadata(target_path, metadata)