Skip to content

Commit

Permalink
GCS support with testing
Browse files Browse the repository at this point in the history
  • Loading branch information
amol- committed May 3, 2023
1 parent 654dedb commit 35577ec
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 53 deletions.
88 changes: 69 additions & 19 deletions depot/io/gcs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List
import uuid
from datetime import datetime
from google.cloud import storage
from depot.io import utils
from depot.io.interfaces import FileStorage, StoredFile
Expand All @@ -13,24 +14,36 @@

class GCSStoredFile(StoredFile):
def __init__(self, file_id, blob):
    """Wrap a GCS ``blob`` as a depot ``StoredFile``.

    Reads the depot bookkeeping metadata stored on the blob
    (``x-depot-filename``, ``x-depot-content-type``, ``x-depot-modified``)
    and initializes the read cursor at position 0.

    Raises:
        ValueError: if ``file_id`` is not a valid UUID string.
    """
    _check_file_id(file_id)

    self.blob = blob
    self._fileid = file_id
    self._closed = False

    metadata = blob.metadata or {}
    filename = metadata.get('x-depot-filename')
    content_type = metadata.get('x-depot-content-type')
    content_length = blob.size

    # x-depot-modified is written by __save_file via utils.timestamp();
    # format assumed to be '%Y-%m-%d %H:%M:%S' — a missing or malformed
    # value degrades to last_modified = None instead of failing.
    last_modified = None
    raw_modified = metadata.get('x-depot-modified')
    if raw_modified:
        try:
            last_modified = datetime.strptime(raw_modified, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Narrowed from a bare ``except``: only a malformed timestamp
            # should be tolerated here, not arbitrary errors.
            last_modified = None

    super(GCSStoredFile, self).__init__(file_id, filename, content_type, last_modified, content_length)
    self._pos = 0

def read(self, n=-1):
    """Read up to ``n`` bytes from the stored file.

    With ``n == -1`` (the default) everything from the current
    position to the end of the blob is returned.

    Raises:
        ValueError: if the file has already been closed.
    """
    if self.closed:
        raise ValueError("I/O operation on closed file")

    if self._pos == 0:
        # A fresh read is starting: re-create the blob handle so the
        # latest generation of the object is fetched.
        self.blob = storage.Blob(self.blob.name, self.blob.bucket)

    if n == -1:
        end_offset = None
    else:
        end_offset = self._pos + n - 1
    data = self.blob.download_as_bytes(start=self._pos, end=end_offset)
    self._pos += len(data)
    return data
Expand All @@ -49,13 +62,21 @@ def public_url(self):

class GCSStorage(FileStorage):
def __init__(self, project_id=None, credentials=None, bucket=None, policy=None, storage_class=None, prefix=''):

if not credentials:
if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
raise ValueError("GOOGLE_APPLICATION_CREDENTIALS environment variable not set")
credentials = service_account.Credentials.from_service_account_file(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
credentials = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]

if isinstance(credentials, dict):
credentials = service_account.Credentials.from_service_account_info(credentials)
elif isinstance(credentials, str):
credentials = service_account.Credentials.from_service_account_file(credentials)
else:
credentials = credentials

if not project_id:
project_id = credentials.project_id
project_id = credentials.project_id

policy = policy or CANNED_ACL_PUBLIC_READ
assert policy in [CANNED_ACL_PUBLIC_READ, CANNED_ACL_PRIVATE], (
"Key policy must be %s or %s" % (CANNED_ACL_PUBLIC_READ, CANNED_ACL_PRIVATE))
Expand All @@ -77,12 +98,17 @@ def __init__(self, project_id=None, credentials=None, bucket=None, policy=None,

def get(self, file_or_id):
    """Return a ``GCSStoredFile`` for the given stored file or file id.

    Raises:
        ValueError: if the id is not a valid UUID string.
        IOError: if no file with that id is stored.
    """
    file_id = self.fileid(file_or_id)
    _check_file_id(file_id)

    blob = self.bucket.blob(self._prefix + file_id)
    if not blob.exists():
        # TODO: Can we get rid of this extra exists check?
        raise IOError('File %s not existing' % file_id)

    # Populate size/metadata attributes so GCSStoredFile sees fresh values.
    blob.reload()
    return GCSStoredFile(file_id, blob)

def set_bucket_public_iam(self,bucket,members: List[str] = ["allUsers"]):
def set_bucket_public_iam(self, bucket, members=("allUsers", )):
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append(
{"role": "roles/storage.objectViewer", "members": members}
Expand All @@ -97,14 +123,25 @@ def __save_file(self,file_id, content, filename, content_type=None):
#blob.content_encoding = 'utf-8'
#blob.cache_control = 'no-cache'
blob.content_disposition = make_content_disposition('inline', filename)

blob.metadata = {
'x-depot-modified': utils.timestamp(),
'x-depot-filename': filename,
'x-depot-content-type': content_type
}

blob.upload_from_file(content, content_type=content_type)

if hasattr(content, 'read'):
try:
pos = content.tell()
content.seek(pos)
blob.upload_from_file(content, content_type=content_type)
except:
content = content.read()
blob.upload_from_string(content, content_type=content_type)
else:
if isinstance(content, unicode_text):
raise TypeError('Only bytes can be stored, not unicode')
blob.upload_from_string(content, content_type=content_type)

if self._policy == CANNED_ACL_PUBLIC_READ:
blob.make_public()
Expand All @@ -117,27 +154,40 @@ def create(self, content, filename=None, content_type=None):

def replace(self, file_or_id, content, filename=None, content_type=None):
    """Replace the content of an existing stored file, keeping its id.

    The file must already exist; ``filename`` and ``content_type``
    default to the values of the file being replaced.

    Raises:
        ValueError: if the id is not a valid UUID string.
        IOError: if no file with that id is stored (raised by ``get``).
    """
    file_id = self.fileid(file_or_id)
    _check_file_id(file_id)

    # get() raises when the file is missing and supplies the existing
    # filename/content_type as defaults for fileinfo().
    existing_file = self.get(file_id)
    content, filename, content_type = self.fileinfo(content, filename, content_type, existing_file)

    # Overwrite in place: the id is preserved so references held by
    # callers stay valid.
    self.__save_file(file_id, content, filename, content_type)

def delete(self, file_or_id):
    """Delete the stored file; deleting a missing file is a no-op.

    Raises:
        ValueError: if the id is not a valid UUID string.
    """
    file_id = self.fileid(file_or_id)
    _check_file_id(file_id)

    blob = self.bucket.blob(self._prefix + file_id)
    try:
        # Learn the current generation, then delete only if the object
        # is still at that generation, so a concurrent rewrite is not
        # silently removed.
        blob.reload()
        blob.delete(if_generation_match=blob.generation)
    except NotFound:
        # Already gone: delete is idempotent by contract.
        pass

def exists(self, file_or_id):
    """Report whether a file with the given id is currently stored."""
    fileid = self.fileid(file_or_id)
    _check_file_id(fileid)
    return self.bucket.blob(self._prefix + fileid).exists()

def list(self):
    """Return the name of every blob currently stored in the bucket."""
    names = []
    for stored_blob in self.bucket.list_blobs():
        names.append(stored_blob.name)
    return names


def _check_file_id(file_id):
# Check that the given file id is valid, this also
# prevents unsafe paths.
try:
uuid.UUID('{%s}' % file_id)
except:
raise ValueError('Invalid file id %s' % file_id)
30 changes: 11 additions & 19 deletions tests/test_gcs_storage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import io
import os
import uuid
import json
import requests
from flaky import flaky
import unittest
from unittest import SkipTest
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account

from depot._compat import PY2, unicode_text
from depot.io.gcs import GCSStorage
Expand All @@ -14,7 +15,7 @@


@flaky
class TestGCSStorage(object):
class TestGCSStorage(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.run_id = '%s-%s' % (uuid.uuid1().hex, os.getpid())
Expand All @@ -28,11 +29,14 @@ def setUpClass(cls):

cls._bucket = 'filedepot-testfs-%s' % cls.run_id
env = os.environ
if not env.get('GOOGLE_APPLICATION_CREDENTIALS'):
raise SkipTest('GOOGLE_APPLICATION_CREDENTIALS environment variable not set')
cls._gcs_credentials = service_account.Credentials.from_service_account_file(env.get('GOOGLE_APPLICATION_CREDENTIALS'))
cls._project_id = cls._gcs_credentials .project_id
cls.fs = GCSStorage(project_id=cls._project_id,credentials=cls._gcs_credentials,bucket=cls._bucket, prefix=cls._prefix)
google_credentials = env.get('GOOGLE_SERVICE_CREDENTIALS')
if not google_credentials:
raise SkipTest('GOOGLE_SERVICE_CREDENTIALS environment variable not set')
google_credentials = json.loads(google_credentials)
cls.fs = GCSStorage(project_id=google_credentials["project_id"],
credentials=google_credentials,
bucket=cls._bucket,
prefix=cls._prefix)

@classmethod
def tearDownClass(cls):
Expand All @@ -52,16 +56,6 @@ def test_fileoutside_depot(self):
f = self.fs.get(fid)
assert f.read() == FILE_CONTENT


def test_get_key_failure(self):
non_existent_key = str(uuid.uuid1())
try:
self.fs.get(non_existent_key)
except NotFound:
pass
else:
assert False, 'Should have raised NotFound'

def test_public_url(self):
fid = str(uuid.uuid1())
blob = self.fs.bucket.blob(self._prefix+fid)
Expand All @@ -76,10 +70,8 @@ def test_content_disposition(self):
response = requests.get(test_file.public_url)
assert response.headers['Content-Disposition'] == "inline;filename=\"test.txt\";filename*=utf-8''test.txt"


def test_storage_non_ascii_filenames(self):
filename = u'ملف.pdf'
#storage = GCSStorage(project_id=self._project_id,credentials=self._gcs_credentials, bucket=self._bucket)
new_file_id = self.fs.create(
io.BytesIO(FILE_CONTENT),
filename=filename,
Expand Down
11 changes: 8 additions & 3 deletions tests/test_gridfs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@


class TestGridFSFileStorage(unittest.TestCase):
def setUp(self):
@classmethod
def setUpClass(cls):
try:
import pymongo.errors
self.fs = GridFSStorage('mongodb://localhost/gridfs_example?serverSelectionTimeoutMS=1', 'testfs')
self.fs._gridfs.exists("") # Any operation to test that mongodb is up.
cls.fs = GridFSStorage('mongodb://localhost/gridfs_example?serverSelectionTimeoutMS=1', 'testfs')
cls.fs._gridfs.exists("") # Any operation to test that mongodb is up.
except pymongo.errors.ConnectionFailure:
cls.fs = None

def setUp(self):
if self.fs is None:
self.skipTest('MongoDB not running')

def tearDown(self):
Expand Down
35 changes: 23 additions & 12 deletions tests/test_storage_interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import uuid
import unittest
import json
from flaky import flaky
import shutil
import mock
Expand Down Expand Up @@ -119,7 +120,7 @@ def test_another_storage(self):
f2 = self.fs.get(file_id)
assert f2.filename == f.filename
assert f2.content_type == f.content_type
assert f2.filename == 'file.txt'
assert f2.filename == 'file.txt', (f.filename, f2.filename)
assert f2.content_type == 'text/plain'

def test_repr(self):
Expand Down Expand Up @@ -151,7 +152,7 @@ def test_replace_keeps_filename(self):
f2 = self.fs.get(f.file_id)

assert f2.file_id == f.file_id
assert f.filename == f2.filename, (f.filename, f2.filename)
assert f.filename == f2.filename
assert f.read() == b'NEW CONTENT'
assert f2.content_type == 'text/plain'

Expand Down Expand Up @@ -262,17 +263,22 @@ def tearDown(self):


class TestGridFSFileStorage(unittest.TestCase, BaseStorageTestFixture):
def setUp(self):
@classmethod
def setUpClass(cls):
try:
from depot.io.gridfs import GridFSStorage
except ImportError:
self.skipTest('PyMongo not installed')
cls.skipTest('PyMongo not installed')

import pymongo.errors
try:
self.fs = GridFSStorage('mongodb://localhost/gridfs_example?serverSelectionTimeoutMS=1', 'testfs')
self.fs._gridfs.exists("") # Any operation to test that mongodb is up.
cls.fs = GridFSStorage('mongodb://localhost/gridfs_example?serverSelectionTimeoutMS=1', 'testfs')
cls.fs._gridfs.exists("") # Any operation to test that mongodb is up.
except pymongo.errors.ConnectionFailure:
cls.fs = None

def setUp(self):
if self.fs is None:
self.skipTest('MongoDB not running')

def teardown(self):
Expand Down Expand Up @@ -373,14 +379,15 @@ def tearDownClass(cls):
cls.fs._bucket_driver.bucket.delete()
except:
pass



@flaky
class TestGCSFileStorage(unittest.TestCase, BaseStorageTestFixture):

@classmethod
def get_storage(cls,bucket_name,project_id=None,credentials=None):
def get_storage(cls, bucket_name, project_id, credentials):
from depot.io.gcs import GCSStorage
return GCSStorage(project_id=project_id,credentials=credentials, bucket=bucket_name)
return GCSStorage(project_id=project_id, credentials=credentials, bucket=bucket_name)

@classmethod
def setUpClass(cls):
Expand All @@ -390,11 +397,15 @@ def setUpClass(cls):
raise unittest.SkipTest('Google Cloud Storage not installed')

env = os.environ
if not env.get('GOOGLE_APPLICATION_CREDENTIALS'):
raise unittest.SkipTest('GOOGLE_APPLICATION_CREDENTIALS environment variable not set')
google_credentials = env.get('GOOGLE_SERVICE_CREDENTIALS')
if not google_credentials:
raise SkipTest('GOOGLE_SERVICE_CREDENTIALS environment variable not set')
google_credentials = json.loads(google_credentials)

BUCKET_NAME = 'fdtest-%s-%s' % (uuid.uuid1(), os.getpid())
cls.fs = cls.get_storage(bucket_name=BUCKET_NAME)
cls.fs = cls.get_storage(bucket_name=BUCKET_NAME,
project_id=google_credentials['project_id'],
credentials=google_credentials)

def tearDown(self):
for blob in self.fs.bucket.list_blobs():
Expand Down

0 comments on commit 35577ec

Please sign in to comment.