From 22a11039273f97f9d065052299c5e4d1971ada27 Mon Sep 17 00:00:00 2001 From: Dataliberate Date: Mon, 30 Jul 2018 12:29:57 +0100 Subject: [PATCH] First pass at introducung Cloud Storage as caching mechanisum --- api.py | 113 +++- app.yaml | 19 +- lib/cloudstorage/__init__.py | 29 + lib/cloudstorage/api_utils.py | 364 +++++++++++ lib/cloudstorage/cloudstorage_api.py | 673 +++++++++++++++++++ lib/cloudstorage/common.py | 429 ++++++++++++ lib/cloudstorage/errors.py | 143 ++++ lib/cloudstorage/rest_api.py | 280 ++++++++ lib/cloudstorage/storage_api.py | 933 +++++++++++++++++++++++++++ lib/cloudstorage/test_utils.py | 25 + sdoapp.py | 269 +++++--- sdocloudstore.py | 453 +++++++++++++ 12 files changed, 3605 insertions(+), 125 deletions(-) create mode 100755 lib/cloudstorage/__init__.py create mode 100755 lib/cloudstorage/api_utils.py create mode 100755 lib/cloudstorage/cloudstorage_api.py create mode 100755 lib/cloudstorage/common.py create mode 100755 lib/cloudstorage/errors.py create mode 100755 lib/cloudstorage/rest_api.py create mode 100755 lib/cloudstorage/storage_api.py create mode 100755 lib/cloudstorage/test_utils.py create mode 100644 sdocloudstore.py diff --git a/api.py b/api.py index 83db46f1bb..144e337676 100755 --- a/api.py +++ b/api.py @@ -45,6 +45,7 @@ def getInstanceId(short=False): log.info("IN TESTHARNESS %s" % getInTestHarness()) if not getInTestHarness(): from google.appengine.api import memcache + from sdocloudstore import SdoCloud AllLayersList = [] def setAllLayersList(val): @@ -67,7 +68,10 @@ def getAllLayersList(): # loader=jinja2.FileSystemLoader(os.path.join(os.path.dirname(__file__), 'templates')), # extensions=['jinja2.ext.autoescape'], autoescape=True) -NDBPAGESTORE = True #True - uses NDB shared (accross instances) store for page cache - False uses in memory local cache +#PAGESTOREMODE = "NDBSHARED" #INMEM (In instance memory) +PAGESTOREMODE = "CLOUDSTORE" #INMEM (In instance memory) + #NDBSHARED (NDB shared - accross instances) + #CLOUDSTORE - (Cloudstorage files) debugging = False def getMasterStore(): @@ -181,7 +185,7 @@ def keys(self): class PageEntity(ndb.Model): content = ndb.TextProperty() -class PageStoreTool(): +class NDBPageStoreTool(): def __init__ (self): self.tlocal = threading.local() self.tlocal.CurrentStoreSet = "core" @@ -243,6 +247,60 @@ def remove(self, key,cache=None): #log.info("PageStore '%s' not found" % fullKey) return None +class CloudPageStoreTool(): + def __init__ (self): + self.init() + + def init(self): + log.info("CloudPageStoreTool.init") + self.tlocal = threading.local() + self.tlocal.CurrentStoreSet = "core" + log.info("CloudPageStoreTool.CurrentStoreSet: %s" % self.tlocal.CurrentStoreSet) + + def _getTypeFromKey(self,key): + name = key + typ = None + split = key.split(':') + if len(split) > 1: + name = split[1] + typ = split[0] + if typ[0] == '.': + typ = typ[1:] + log.info("%s > %s %s" % (key,name,typ)) + return name,typ + + def initialise(self): + return {"CloudPageStore":SdoCloud.delete_files_in_bucket()} + + def getCurrent(self): + try: + if not self.tlocal.CurrentStoreSet: + self.tlocal.CurrentStoreSet = "core" + except Exception: + self.tlocal.CurrentStoreSet = "core" + ret = self.tlocal.CurrentStoreSet + return ret + + def setCurrent(self,current): + self.tlocal.CurrentStoreSet = current + log.debug("CloudPageStore setting CurrentStoreSet: %s",current) + + def put(self, key, val,cache=None,extrameta=None): + fname, ftype = self._getTypeFromKey(key) + if not ftype: + ftype = "html" + 
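+ # Keys take the form 'ftype:name' (a leading '.' on the type is stripped by _getTypeFromKey); entries with no type prefix default to HTML before being handed to the SdoCloud wrapper.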
SdoCloud.writeFormattedFile(fname,ftype=ftype,content=val,extrameta=extrameta) + + def get(self, key,cache=None): + fname, ftype = self._getTypeFromKey(key) + if not ftype: + ftype = "html" + return SdoCloud.readFormattedFile(fname,ftype=ftype) + + def remove(self, key,cache=None): + SdoCloud.deleteFormattedFile(key) + + class HeaderEntity(ndb.Model): content = ndb.PickleProperty() @@ -375,33 +433,57 @@ def remove(self, key,cache=None): PageStore = None HeaderStore = None DataCache = None -log.info("[%s] NDB PageStore & HeaderStore available: %s" % (getInstanceId(short=True),NDBPAGESTORE)) +log.info("[%s] PageStore mode: %s" % (getInstanceId(short=True),PAGESTOREMODE)) -def enablePageStore(state): +def enablePageStore(mode): global PageStore,HeaderStore,DataCache - log.info("enablePageStore(%s)" % state) - if state: + log.info("enablePageStore(%s)" % mode) + if(mode == "NDBSHARED"): log.info("[%s] Enabling NDB" % getInstanceId(short=True)) - PageStore = PageStoreTool() + PageStore = NDBPageStoreTool() log.info("[%s] Created PageStore" % getInstanceId(short=True)) HeaderStore = HeaderStoreTool() log.info("[%s] Created HeaderStore" % getInstanceId(short=True)) DataCache = DataStoreTool() log.info("[%s] Created DataStore" % getInstanceId(short=True)) - else: + + elif(mode == "INMEM"): log.info("[%s] Disabling NDB" % getInstanceId(short=True)) PageStore = DataCacheTool() HeaderStore = DataCacheTool() DataCache = DataCacheTool() + + elif(mode == "CLOUDSTORE"): + log.info("[%s] Enabling CloudStore" % getInstanceId(short=True)) + PageStore = CloudPageStoreTool() + log.info("[%s] Created PageStore" % getInstanceId(short=True)) + HeaderStore = HeaderStoreTool() + log.info("[%s] Created HeaderStore" % getInstanceId(short=True)) + DataCache = DataStoreTool() + log.info("[%s] Created DataStore" % getInstanceId(short=True)) + else: + log.error("Invalid storage mode: %s" % mode) if getInTestHarness(): #Override pagestore decision if in testharness - enablePageStore(False) + enablePageStore("INMEM") else: - if NDBPAGESTORE: - enablePageStore(True) - else: - enablePageStore(False) + enablePageStore(PAGESTOREMODE) + +def prepareCloudstoreDocs(): + for root, dirs, files in os.walk("docs"): + for f in files: + fname = os.path.join(root, f) + log.info("%s" %( fname )) + try: + with open(fname, 'r') as f: + content = f.read() + SdoCloud.writeFormattedFile(fname,content=content, location="html", raw=True) + f.close() + except Exception as e: + log.info("ERROR reading: %s" % e) + pass + class Unit (): """ @@ -1380,7 +1462,7 @@ class CacheControl(): def clean(pagesonly=False): ret = {} - if not NDBPAGESTORE: + if PAGESTOREMODE == "INMEM": ret["PageStore"] = PageStore.initialise() ret["HeaderStore"] = HeaderStore.initialise() ret["DataCache"] = DataCache.initialise() @@ -1388,6 +1470,9 @@ def clean(pagesonly=False): ndbret = CacheControl.ndbClean() ret.update(ndbret) + if PAGESTOREMODE == "CLOUDSTORE": + prepareCloudstoreDocs() + return ret @staticmethod diff --git a/app.yaml b/app.yaml index eb6925c5d4..d23f290328 100644 --- a/app.yaml +++ b/app.yaml @@ -2,16 +2,18 @@ #application: webschemas #version: 1 -module: default +#module: default runtime: python27 api_version: 1 threadsafe: true -#automatic_scaling: #Only applicable for appengine accounts with billing enabled -# min_idle_instances: 2 +automatic_scaling: #Only applicable for appengine accounts with billing enabled + min_idle_instances: 1 + +instance_class: F2 #basic_scaling: -# max_instances: 10 +# max_instances: 3 # idle_timeout: 10m inbound_services: @@ 
-20,23 +22,27 @@ inbound_services: env_variables: PRODSITEDEBUG: 'False' WARMUPSTATE: 'Auto' # 'Off', 'On', 'Auto' - Off for localhost, On elsewhere + STAYINEXTENTION: 'False' handlers: - url: /favicon.ico static_files: docs/favicon.ico upload: docs/favicon.ico + application_readable: True mime_type: image/x-icon - url: /robots.txt static_files: docs/robots-blockall.txt upload: docs/robots-blockall.txt + application_readable: True mime_type: text/plain # To avoid: "Could not guess mimetype for docs/schemaorg.owl. Using application/octet-stream." - url: /docs/schemaorg.owl static_files: docs/schemaorg.owl upload: docs/schemaorg.owl + application_readable: True mime_type: application/rdf+xml - url: /docs/schema_org_rdfa.html @@ -62,6 +68,11 @@ handlers: - url: /docs static_dir: docs + application_readable: True + +- url: /admin + static_dir: admin + application_readable: True #- url: / # static_files: static/index.html diff --git a/lib/cloudstorage/__init__.py b/lib/cloudstorage/__init__.py new file mode 100755 index 0000000000..349a021a55 --- /dev/null +++ b/lib/cloudstorage/__init__.py @@ -0,0 +1,29 @@ +# Copyright 2014 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Client Library for Google Cloud Storage.""" + + + + +from .api_utils import RetryParams +from .api_utils import set_default_retry_params +from cloudstorage_api import * +from .common import CSFileStat +from .common import GCSFileStat +from .common import validate_bucket_name +from .common import validate_bucket_path +from .common import validate_file_path +from errors import * +from storage_api import * diff --git a/lib/cloudstorage/api_utils.py b/lib/cloudstorage/api_utils.py new file mode 100755 index 0000000000..33e4b6e207 --- /dev/null +++ b/lib/cloudstorage/api_utils.py @@ -0,0 +1,364 @@ +# Copyright 2013 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+ +"""Util functions and classes for cloudstorage_api.""" + + + +__all__ = ['set_default_retry_params', + 'RetryParams', + ] + +import copy +import httplib +import logging +import math +import os +import threading +import time +import urllib + + +try: + from google.appengine.api import app_identity + from google.appengine.api import urlfetch + from google.appengine.api import urlfetch_errors + from google.appengine.datastore import datastore_rpc + from google.appengine.ext import ndb + from google.appengine.ext.ndb import eventloop + from google.appengine.ext.ndb import tasklets + from google.appengine.ext.ndb import utils + from google.appengine import runtime + from google.appengine.runtime import apiproxy_errors +except ImportError: + from google.appengine.api import app_identity + from google.appengine.api import urlfetch + from google.appengine.api import urlfetch_errors + from google.appengine.datastore import datastore_rpc + from google.appengine import runtime + from google.appengine.runtime import apiproxy_errors + from google.appengine.ext import ndb + from google.appengine.ext.ndb import eventloop + from google.appengine.ext.ndb import tasklets + from google.appengine.ext.ndb import utils + + +_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError, + urlfetch_errors.InternalTransientError, + apiproxy_errors.Error, + app_identity.InternalError, + app_identity.BackendDeadlineExceeded) + +_thread_local_settings = threading.local() +_thread_local_settings.default_retry_params = None + + +def set_default_retry_params(retry_params): + """Set a default RetryParams for current thread current request.""" + _thread_local_settings.default_retry_params = copy.copy(retry_params) + + +def _get_default_retry_params(): + """Get default RetryParams for current request and current thread. + + Returns: + A new instance of the default RetryParams. + """ + default = getattr(_thread_local_settings, 'default_retry_params', None) + if default is None or not default.belong_to_current_request(): + return RetryParams() + else: + return copy.copy(default) + + +def _quote_filename(filename): + """Quotes filename to use as a valid URI path. + + Args: + filename: user provided filename. /bucket/filename. + + Returns: + The filename properly quoted to use as URI's path component. + """ + return urllib.quote(filename) + + +def _unquote_filename(filename): + """Unquotes a valid URI path back to its filename. + + This is the opposite of _quote_filename. + + Args: + filename: a quoted filename. /bucket/some%20filename. + + Returns: + The filename unquoted. + """ + return urllib.unquote(filename) + + +def _should_retry(resp): + """Given a urlfetch response, decide whether to retry that request.""" + return (resp.status_code == httplib.REQUEST_TIMEOUT or + (resp.status_code >= 500 and + resp.status_code < 600)) + + +class _RetryWrapper(object): + """A wrapper that wraps retry logic around any tasklet.""" + + def __init__(self, + retry_params, + retriable_exceptions=_RETRIABLE_EXCEPTIONS, + should_retry=lambda r: False): + """Init. + + Args: + retry_params: an RetryParams instance. + retriable_exceptions: a list of exception classes that are retriable. + should_retry: a function that takes a result from the tasklet and returns + a boolean. True if the result should be retried. + """ + self.retry_params = retry_params + self.retriable_exceptions = retriable_exceptions + self.should_retry = should_retry + + @ndb.tasklet + def run(self, tasklet, **kwds): + """Run a tasklet with retry. 
+ + The retry should be transparent to the caller: if no results + are successful, the exception or result from the last retry is returned + to the caller. + + Args: + tasklet: the tasklet to run. + **kwds: keywords arguments to run the tasklet. + + Raises: + The exception from running the tasklet. + + Returns: + The result from running the tasklet. + """ + start_time = time.time() + n = 1 + + while True: + e = None + result = None + got_result = False + + try: + result = yield tasklet(**kwds) + got_result = True + if not self.should_retry(result): + raise ndb.Return(result) + except runtime.DeadlineExceededError: + logging.debug( + 'Tasklet has exceeded request deadline after %s seconds total', + time.time() - start_time) + raise + except self.retriable_exceptions as e: + pass + + if n == 1: + logging.debug('Tasklet is %r', tasklet) + + delay = self.retry_params.delay(n, start_time) + + if delay <= 0: + logging.debug( + 'Tasklet failed after %s attempts and %s seconds in total', + n, time.time() - start_time) + if got_result: + raise ndb.Return(result) + elif e is not None: + raise e + else: + assert False, 'Should never reach here.' + + if got_result: + logging.debug( + 'Got result %r from tasklet.', result) + else: + logging.debug( + 'Got exception "%r" from tasklet.', e) + logging.debug('Retry in %s seconds.', delay) + n += 1 + yield tasklets.sleep(delay) + + +class RetryParams(object): + """Retry configuration parameters.""" + + _DEFAULT_USER_AGENT = 'AppEngine-Python-GCS' + + @datastore_rpc._positional(1) + def __init__(self, + backoff_factor=2.0, + initial_delay=0.1, + max_delay=10.0, + min_retries=3, + max_retries=6, + max_retry_period=30.0, + urlfetch_timeout=None, + save_access_token=False, + _user_agent=None, + memcache_access_token=True): + """Init. + + This object is unique per request per thread. + + Library will retry according to this setting when App Engine Server + can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or + 500-600 response. + + Args: + backoff_factor: exponential backoff multiplier. + initial_delay: seconds to delay for the first retry. + max_delay: max seconds to delay for every retry. + min_retries: min number of times to retry. This value is automatically + capped by max_retries. + max_retries: max number of times to retry. Set this to 0 for no retry. + max_retry_period: max total seconds spent on retry. Retry stops when + this period passed AND min_retries has been attempted. + urlfetch_timeout: timeout for urlfetch in seconds. Could be None, + in which case the value will be chosen by urlfetch module. + save_access_token: persist access token to datastore to avoid + excessive usage of GetAccessToken API. In addition to this, the token + will be cached in process, and may also be cached in memcache (see + memcache_access_token param). However, storing in Datastore can still + be useful in the event that memcache is unavailable. + _user_agent: The user agent string that you want to use in your requests. + memcache_access_token: cache access token in memcache to avoid excessive + usage of GetAccessToken API. 
+ """ + self.backoff_factor = self._check('backoff_factor', backoff_factor) + self.initial_delay = self._check('initial_delay', initial_delay) + self.max_delay = self._check('max_delay', max_delay) + self.max_retry_period = self._check('max_retry_period', max_retry_period) + self.max_retries = self._check('max_retries', max_retries, True, int) + self.min_retries = self._check('min_retries', min_retries, True, int) + if self.min_retries > self.max_retries: + self.min_retries = self.max_retries + + self.urlfetch_timeout = None + if urlfetch_timeout is not None: + self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout) + self.save_access_token = self._check('save_access_token', save_access_token, + True, bool) + self.memcache_access_token = self._check('memcache_access_token', + memcache_access_token, + True, + bool) + self._user_agent = _user_agent or self._DEFAULT_USER_AGENT + + self._request_id = os.getenv('REQUEST_LOG_ID') + + def __eq__(self, other): + if not isinstance(other, self.__class__): + return False + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self.__eq__(other) + + @classmethod + def _check(cls, name, val, can_be_zero=False, val_type=float): + """Check init arguments. + + Args: + name: name of the argument. For logging purpose. + val: value. Value has to be non negative number. + can_be_zero: whether value can be zero. + val_type: Python type of the value. + + Returns: + The value. + + Raises: + ValueError: when invalid value is passed in. + TypeError: when invalid value type is passed in. + """ + valid_types = [val_type] + if val_type is float: + valid_types.append(int) + + if type(val) not in valid_types: + raise TypeError( + 'Expect type %s for parameter %s' % (val_type.__name__, name)) + if val < 0: + raise ValueError( + 'Value for parameter %s has to be greater than 0' % name) + if not can_be_zero and val == 0: + raise ValueError( + 'Value for parameter %s can not be 0' % name) + return val + + def belong_to_current_request(self): + return os.getenv('REQUEST_LOG_ID') == self._request_id + + def delay(self, n, start_time): + """Calculate delay before the next retry. + + Args: + n: the number of current attempt. The first attempt should be 1. + start_time: the time when retry started in unix time. + + Returns: + Number of seconds to wait before next retry. -1 if retry should give up. + """ + if (n > self.max_retries or + (n > self.min_retries and + time.time() - start_time > self.max_retry_period)): + return -1 + return min( + math.pow(self.backoff_factor, n-1) * self.initial_delay, + self.max_delay) + + +def _run_until_rpc(): + """Eagerly evaluate tasklets until it is blocking on some RPC. + + Usually ndb eventloop el isn't run until some code calls future.get_result(). + + When an async tasklet is called, the tasklet wrapper evaluates the tasklet + code into a generator, enqueues a callback _help_tasklet_along onto + the el.current queue, and returns a future. + + _help_tasklet_along, when called by the el, will + get one yielded value from the generator. If the value if another future, + set up a callback _on_future_complete to invoke _help_tasklet_along + when the dependent future fulfills. If the value if a RPC, set up a + callback _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills. + Thus _help_tasklet_along drills down + the chain of futures until some future is blocked by RPC. El runs + all callbacks and constantly check pending RPC status. 
+ """ + el = eventloop.get_event_loop() + while el.current: + el.run0() + + +def _eager_tasklet(tasklet): + """Decorator to turn tasklet to run eagerly.""" + + @utils.wrapping(tasklet) + def eager_wrapper(*args, **kwds): + fut = tasklet(*args, **kwds) + _run_until_rpc() + return fut + + return eager_wrapper diff --git a/lib/cloudstorage/cloudstorage_api.py b/lib/cloudstorage/cloudstorage_api.py new file mode 100755 index 0000000000..1c186d4c37 --- /dev/null +++ b/lib/cloudstorage/cloudstorage_api.py @@ -0,0 +1,673 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""File Interface for Google Cloud Storage.""" + + + +from __future__ import with_statement + + + +__all__ = ['copy2', + 'delete', + 'listbucket', + 'open', + 'stat', + 'compose', + 'get_location', + 'get_storage_class', + ] + +import logging +import StringIO +import urllib +import os +import itertools +import types +import xml.etree.cElementTree as ET +from . import api_utils +from . import common +from . import errors +from . import storage_api + + + +def open(filename, + mode='r', + content_type=None, + options=None, + read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE, + retry_params=None, + _account_id=None, + offset=0): + """Opens a Google Cloud Storage file and returns it as a File-like object. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + mode: 'r' for reading mode. 'w' for writing mode. + In reading mode, the file must exist. In writing mode, a file will + be created or be overrode. + content_type: The MIME type of the file. str. Only valid in writing mode. + options: A str->basestring dict to specify additional headers to pass to + GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. + Supported options are x-goog-acl, x-goog-meta-, cache-control, + content-disposition, and content-encoding. + Only valid in writing mode. + See https://developers.google.com/storage/docs/reference-headers + for details. + read_buffer_size: The buffer size for read. Read keeps a buffer + and prefetches another one. To minimize blocking for large files, + always read by buffer size. To minimize number of RPC requests for + small files, set a large buffer size. Max is 30MB. + retry_params: An instance of api_utils.RetryParams for subsequent calls + to GCS from this file handle. If None, the default one is used. + _account_id: Internal-use only. + offset: Number of bytes to skip at the start of the file. If None, 0 is + used. + + Returns: + A reading or writing buffer that supports File-like interface. Buffer + must be closed after operations are done. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + ValueError: invalid open mode or if content_type or options are specified + in reading mode. 
+ """ + common.validate_file_path(filename) + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + filename = api_utils._quote_filename(filename) + + if mode == 'w': + common.validate_options(options) + return storage_api.StreamingBuffer(api, filename, content_type, options) + elif mode == 'r': + if content_type or options: + raise ValueError('Options and content_type can only be specified ' + 'for writing mode.') + return storage_api.ReadBuffer(api, + filename, + buffer_size=read_buffer_size, + offset=offset) + else: + raise ValueError('Invalid mode %s.' % mode) + + +def delete(filename, retry_params=None, _account_id=None): + """Delete a Google Cloud Storage file. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Raises: + errors.NotFoundError: if the file doesn't exist prior to deletion. + """ + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + common.validate_file_path(filename) + filename = api_utils._quote_filename(filename) + status, resp_headers, content = api.delete_object(filename) + errors.check_status(status, [204], filename, resp_headers=resp_headers, + body=content) + + +def get_location(bucket, retry_params=None, _account_id=None): + """Returns the location for the given bucket. + + https://cloud.google.com/storage/docs/bucket-locations + + Args: + bucket: A Google Cloud Storage bucket of form '/bucket'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Returns: + The location as a string. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if the bucket does not exist. + """ + + return _get_bucket_attribute(bucket, + 'location', + 'LocationConstraint', + retry_params=retry_params, + _account_id=_account_id) + + +def get_storage_class(bucket, retry_params=None, _account_id=None): + """Returns the storage class for the given bucket. + + https://cloud.google.com/storage/docs/storage-classes + + Args: + bucket: A Google Cloud Storage bucket of form '/bucket'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Returns: + The storage class as a string. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if the bucket does not exist. + """ + + return _get_bucket_attribute(bucket, + 'storageClass', + 'StorageClass', + retry_params=retry_params, + _account_id=_account_id) + + +def _get_bucket_attribute(bucket, + query_param, + xml_response_tag, + retry_params=None, + _account_id=None): + """Helper method to request a bucket parameter and parse the response. + + Args: + bucket: A Google Cloud Storage bucket of form '/bucket'. + query_param: The query parameter to include in the get bucket request. + xml_response_tag: The expected tag in the xml response. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Returns: + The xml value as a string. None if the returned xml does not match expected + format. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if the bucket does not exist. 
+ """ + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + common.validate_bucket_path(bucket) + status, headers, content = api.get_bucket('%s?%s' % (bucket, query_param)) + + errors.check_status(status, [200], bucket, resp_headers=headers, body=content) + + root = ET.fromstring(content) + if root.tag == xml_response_tag and root.text: + return root.text + return None + + +def stat(filename, retry_params=None, _account_id=None): + """Get GCSFileStat of a Google Cloud storage file. + + Args: + filename: A Google Cloud Storage filename of form '/bucket/filename'. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Returns: + a GCSFileStat object containing info about this file. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + """ + common.validate_file_path(filename) + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + status, headers, content = api.head_object( + api_utils._quote_filename(filename)) + errors.check_status(status, [200], filename, resp_headers=headers, + body=content) + file_stat = common.GCSFileStat( + filename=filename, + st_size=common.get_stored_content_length(headers), + st_ctime=common.http_time_to_posix(headers.get('last-modified')), + etag=headers.get('etag'), + content_type=headers.get('content-type'), + metadata=common.get_metadata(headers)) + + return file_stat + + +def copy2(src, dst, metadata=None, retry_params=None): + """Copy the file content from src to dst. + + Args: + src: /bucket/filename + dst: /bucket/filename + metadata: a dict of metadata for this copy. If None, old metadata is copied. + For example, {'x-goog-meta-foo': 'bar'}. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + + Raises: + errors.AuthorizationError: if authorization failed. + errors.NotFoundError: if an object that's expected to exist doesn't. + """ + common.validate_file_path(src) + common.validate_file_path(dst) + + if metadata is None: + metadata = {} + copy_meta = 'COPY' + else: + copy_meta = 'REPLACE' + metadata.update({'x-goog-copy-source': src, + 'x-goog-metadata-directive': copy_meta}) + + api = storage_api._get_storage_api(retry_params=retry_params) + status, resp_headers, content = api.put_object( + api_utils._quote_filename(dst), headers=metadata) + errors.check_status(status, [200], src, metadata, resp_headers, body=content) + + +def listbucket(path_prefix, marker=None, prefix=None, max_keys=None, + delimiter=None, retry_params=None, _account_id=None): + """Returns a GCSFileStat iterator over a bucket. + + Optional arguments can limit the result to a subset of files under bucket. + + This function has two modes: + 1. List bucket mode: Lists all files in the bucket without any concept of + hierarchy. GCS doesn't have real directory hierarchies. + 2. Directory emulation mode: If you specify the 'delimiter' argument, + it is used as a path separator to emulate a hierarchy of directories. + In this mode, the "path_prefix" argument should end in the delimiter + specified (thus designates a logical directory). The logical directory's + contents, both files and subdirectories, are listed. The names of + subdirectories returned will end with the delimiter. So listbucket + can be called with the subdirectory name to list the subdirectory's + contents. 
+ + Args: + path_prefix: A Google Cloud Storage path of format "/bucket" or + "/bucket/prefix". Only objects whose fullpath starts with the + path_prefix will be returned. + marker: Another path prefix. Only objects whose fullpath starts + lexicographically after marker will be returned (exclusive). + prefix: Deprecated. Use path_prefix. + max_keys: The limit on the number of objects to return. int. + For best performance, specify max_keys only if you know how many objects + you want. Otherwise, this method requests large batches and handles + pagination for you. + delimiter: Use to turn on directory mode. str of one or multiple chars + that your bucket uses as its directory separator. + retry_params: An api_utils.RetryParams for this call to GCS. If None, + the default one is used. + _account_id: Internal-use only. + + Examples: + For files "/bucket/a", + "/bucket/bar/1" + "/bucket/foo", + "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1", + + Regular mode: + listbucket("/bucket/f", marker="/bucket/foo/1") + will match "/bucket/foo/2/1", "/bucket/foo/3/1". + + Directory mode: + listbucket("/bucket/", delimiter="/") + will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/". + listbucket("/bucket/foo/", delimiter="/") + will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/" + + Returns: + Regular mode: + A GCSFileStat iterator over matched files ordered by filename. + The iterator returns GCSFileStat objects. filename, etag, st_size, + st_ctime, and is_dir are set. + + Directory emulation mode: + A GCSFileStat iterator over matched files and directories ordered by + name. The iterator returns GCSFileStat objects. For directories, + only the filename and is_dir fields are set. + + The last name yielded can be used as next call's marker. + """ + if prefix: + common.validate_bucket_path(path_prefix) + bucket = path_prefix + else: + bucket, prefix = common._process_path_prefix(path_prefix) + + if marker and marker.startswith(bucket): + marker = marker[len(bucket) + 1:] + + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + options = {} + if marker: + options['marker'] = marker + if max_keys: + options['max-keys'] = max_keys + if prefix: + options['prefix'] = prefix + if delimiter: + options['delimiter'] = delimiter + + return _Bucket(api, bucket, options) + +def compose(list_of_files, destination_file, files_metadata=None, + content_type=None, retry_params=None, _account_id=None): + """Runs the GCS Compose on the given files. + + Merges between 2 and 32 files into one file. Composite files may even + be built from other existing composites, provided that the total + component count does not exceed 1024. See here for details: + https://cloud.google.com/storage/docs/composite-objects + + Args: + list_of_files: List of file name strings with no leading slashes or bucket. + destination_file: Path to the output file. Must have the bucket in the path. + files_metadata: Optional, file metadata, order must match list_of_files, + see link for available options: + https://cloud.google.com/storage/docs/composite-objects#_Xml + content_type: Optional, used to specify content-header of the output file. + retry_params: Optional, an api_utils.RetryParams for this call to GCS. + If None,the default one is used. + _account_id: Internal-use only. + + Raises: + ValueError: If the number of files is outside the range of 2-32. 
+ """ + api = storage_api._get_storage_api(retry_params=retry_params, + account_id=_account_id) + + + if os.getenv('SERVER_SOFTWARE').startswith('Dev'): + def _temp_func(file_list, destination_file, content_type): + bucket = '/' + destination_file.split('/')[1] + '/' + with open(destination_file, 'w', content_type=content_type) as gcs_merge: + for source_file in file_list: + with open(bucket + source_file['Name'], 'r') as gcs_source: + gcs_merge.write(gcs_source.read()) + + compose_object = _temp_func + else: + compose_object = api.compose_object + file_list, _ = _validate_compose_list(destination_file, + list_of_files, + files_metadata, 32) + compose_object(file_list, destination_file, content_type) + + +def _file_exists(destination): + """Checks if a file exists. + + Tries to open the file. + If it succeeds returns True otherwise False. + + Args: + destination: Full path to the file (ie. /bucket/object) with leading slash. + + Returns: + True if the file is accessible otherwise False. + """ + try: + with open(destination, "r"): + return True + except errors.NotFoundError: + return False + + +def _validate_compose_list(destination_file, file_list, + files_metadata=None, number_of_files=32): + """Validates the file_list and merges the file_list, files_metadata. + + Args: + destination: Path to the file (ie. /destination_bucket/destination_file). + file_list: List of files to compose, see compose for details. + files_metadata: Meta details for each file in the file_list. + number_of_files: Maximum number of files allowed in the list. + + Returns: + A tuple (list_of_files, bucket): + list_of_files: Ready to use dict version of the list. + bucket: bucket name extracted from the file paths. + """ + common.validate_file_path(destination_file) + bucket = destination_file[0:(destination_file.index('/', 1) + 1)] + try: + if isinstance(file_list, types.StringTypes): + raise TypeError + list_len = len(file_list) + except TypeError: + raise TypeError('file_list must be a list') + + if list_len > number_of_files: + raise ValueError( + 'Compose attempted to create composite with too many' + '(%i) components; limit is (%i).' % (list_len, number_of_files)) + if list_len <= 0: + raise ValueError('Compose operation requires at' + ' least one component; 0 provided.') + + if files_metadata is None: + files_metadata = [] + elif len(files_metadata) > list_len: + raise ValueError('files_metadata contains more entries(%i)' + ' than file_list(%i)' + % (len(files_metadata), list_len)) + list_of_files = [] + for source_file, meta_data in itertools.izip_longest(file_list, + files_metadata): + if not isinstance(source_file, str): + raise TypeError('Each item of file_list must be a string') + if source_file.startswith('/'): + logging.warn('Detected a "/" at the start of the file, ' + 'Unless the file name contains a "/" it ' + ' may cause files to be misread') + if source_file.startswith(bucket): + logging.warn('Detected bucket name at the start of the file, ' + 'must not specify the bucket when listing file_names.' + ' May cause files to be misread') + common.validate_file_path(bucket + source_file) + + list_entry = {} + + if meta_data is not None: + list_entry.update(meta_data) + list_entry['Name'] = source_file + list_of_files.append(list_entry) + + return list_of_files, bucket + + +class _Bucket(object): + """A wrapper for a GCS bucket as the return value of listbucket.""" + + def __init__(self, api, path, options): + """Initialize. + + Args: + api: storage_api instance. + path: bucket path of form '/bucket'. 
+ options: a dict of listbucket options. Please see listbucket doc. + """ + self._init(api, path, options) + + def _init(self, api, path, options): + self._api = api + self._path = path + self._options = options.copy() + self._get_bucket_fut = self._api.get_bucket_async( + self._path + '?' + urllib.urlencode(self._options)) + self._last_yield = None + self._new_max_keys = self._options.get('max-keys') + + def __getstate__(self): + options = self._options + if self._last_yield: + options['marker'] = self._last_yield.filename[len(self._path) + 1:] + if self._new_max_keys is not None: + options['max-keys'] = self._new_max_keys + return {'api': self._api, + 'path': self._path, + 'options': options} + + def __setstate__(self, state): + self._init(state['api'], state['path'], state['options']) + + def __iter__(self): + """Iter over the bucket. + + Yields: + GCSFileStat: a GCSFileStat for an object in the bucket. + They are ordered by GCSFileStat.filename. + """ + total = 0 + max_keys = self._options.get('max-keys') + + while self._get_bucket_fut: + status, resp_headers, content = self._get_bucket_fut.get_result() + errors.check_status(status, [200], self._path, resp_headers=resp_headers, + body=content, extras=self._options) + + if self._should_get_another_batch(content): + self._get_bucket_fut = self._api.get_bucket_async( + self._path + '?' + urllib.urlencode(self._options)) + else: + self._get_bucket_fut = None + + root = ET.fromstring(content) + dirs = self._next_dir_gen(root) + files = self._next_file_gen(root) + next_file = files.next() + next_dir = dirs.next() + + while ((max_keys is None or total < max_keys) and + not (next_file is None and next_dir is None)): + total += 1 + if next_file is None: + self._last_yield = next_dir + next_dir = dirs.next() + elif next_dir is None: + self._last_yield = next_file + next_file = files.next() + elif next_dir < next_file: + self._last_yield = next_dir + next_dir = dirs.next() + elif next_file < next_dir: + self._last_yield = next_file + next_file = files.next() + else: + logging.error( + 'Should never reach. next file is %r. next dir is %r.', + next_file, next_dir) + if self._new_max_keys: + self._new_max_keys -= 1 + yield self._last_yield + + def _next_file_gen(self, root): + """Generator for next file element in the document. + + Args: + root: root element of the XML tree. + + Yields: + GCSFileStat for the next file. + """ + for e in root.getiterator(common._T_CONTENTS): + st_ctime, size, etag, key = None, None, None, None + for child in e.getiterator('*'): + if child.tag == common._T_LAST_MODIFIED: + st_ctime = common.dt_str_to_posix(child.text) + elif child.tag == common._T_ETAG: + etag = child.text + elif child.tag == common._T_SIZE: + size = child.text + elif child.tag == common._T_KEY: + key = child.text + yield common.GCSFileStat(self._path + '/' + key, + size, etag, st_ctime) + e.clear() + yield None + + def _next_dir_gen(self, root): + """Generator for next directory element in the document. + + Args: + root: root element in the XML tree. + + Yields: + GCSFileStat for the next directory. + """ + for e in root.getiterator(common._T_COMMON_PREFIXES): + yield common.GCSFileStat( + self._path + '/' + e.find(common._T_PREFIX).text, + st_size=None, etag=None, st_ctime=None, is_dir=True) + e.clear() + yield None + + def _should_get_another_batch(self, content): + """Whether to issue another GET bucket call. + + Args: + content: response XML. + + Returns: + True if should, also update self._options for the next request. + False otherwise. 
+ """ + if ('max-keys' in self._options and + self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT): + return False + + elements = self._find_elements( + content, set([common._T_IS_TRUNCATED, + common._T_NEXT_MARKER])) + if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true': + return False + + next_marker = elements.get(common._T_NEXT_MARKER) + if next_marker is None: + self._options.pop('marker', None) + return False + self._options['marker'] = next_marker + return True + + def _find_elements(self, result, elements): + """Find interesting elements from XML. + + This function tries to only look for specified elements + without parsing the entire XML. The specified elements is better + located near the beginning. + + Args: + result: response XML. + elements: a set of interesting element tags. + + Returns: + A dict from element tag to element value. + """ + element_mapping = {} + result = StringIO.StringIO(result) + for _, e in ET.iterparse(result, events=('end',)): + if not elements: + break + if e.tag in elements: + element_mapping[e.tag] = e.text + elements.remove(e.tag) + return element_mapping diff --git a/lib/cloudstorage/common.py b/lib/cloudstorage/common.py new file mode 100755 index 0000000000..ab9c8df358 --- /dev/null +++ b/lib/cloudstorage/common.py @@ -0,0 +1,429 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Helpers shared by cloudstorage_stub and cloudstorage_api.""" + + + + + +__all__ = ['CS_XML_NS', + 'CSFileStat', + 'dt_str_to_posix', + 'local_api_url', + 'LOCAL_GCS_ENDPOINT', + 'local_run', + 'get_access_token', + 'get_stored_content_length', + 'get_metadata', + 'GCSFileStat', + 'http_time_to_posix', + 'memory_usage', + 'posix_time_to_http', + 'posix_to_dt_str', + 'set_access_token', + 'validate_options', + 'validate_bucket_name', + 'validate_bucket_path', + 'validate_file_path', + ] + + +import calendar +import datetime +from email import utils as email_utils +import logging +import os +import re + +try: + from google.appengine.api import runtime +except ImportError: + from google.appengine.api import runtime + + +_GCS_BUCKET_REGEX_BASE = r'[a-z0-9\.\-_]{3,63}' +_GCS_BUCKET_REGEX = re.compile(_GCS_BUCKET_REGEX_BASE + r'$') +_GCS_BUCKET_PATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'$') +_GCS_PATH_PREFIX_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'.*') +_GCS_FULLPATH_REGEX = re.compile(r'/' + _GCS_BUCKET_REGEX_BASE + r'/.*') +_GCS_METADATA = ['x-goog-meta-', + 'content-disposition', + 'cache-control', + 'content-encoding'] +_GCS_OPTIONS = _GCS_METADATA + ['x-goog-acl'] +CS_XML_NS = 'http://doc.s3.amazonaws.com/2006-03-01' +LOCAL_GCS_ENDPOINT = '/_ah/gcs' +_access_token = '' + + +_MAX_GET_BUCKET_RESULT = 1000 + + +def set_access_token(access_token): + """Set the shared access token to authenticate with Google Cloud Storage. + + When set, the library will always attempt to communicate with the + real Google Cloud Storage with this token even when running on dev appserver. 
+ Note the token could expire so it's up to you to renew it. + + When absent, the library will automatically request and refresh a token + on appserver, or when on dev appserver, talk to a Google Cloud Storage + stub. + + Args: + access_token: you can get one by run 'gsutil -d ls' and copy the + str after 'Bearer'. + """ + global _access_token + _access_token = access_token + + +def get_access_token(): + """Returns the shared access token.""" + return _access_token + + +class GCSFileStat(object): + """Container for GCS file stat.""" + + def __init__(self, + filename, + st_size, + etag, + st_ctime, + content_type=None, + metadata=None, + is_dir=False): + """Initialize. + + For files, the non optional arguments are always set. + For directories, only filename and is_dir is set. + + Args: + filename: a Google Cloud Storage filename of form '/bucket/filename'. + st_size: file size in bytes. long compatible. + etag: hex digest of the md5 hash of the file's content. str. + st_ctime: posix file creation time. float compatible. + content_type: content type. str. + metadata: a str->str dict of user specified options when creating + the file. Possible keys are x-goog-meta-, content-disposition, + content-encoding, and cache-control. + is_dir: True if this represents a directory. False if this is a real file. + """ + self.filename = filename + self.is_dir = is_dir + self.st_size = None + self.st_ctime = None + self.etag = None + self.content_type = content_type + self.metadata = metadata + + if not is_dir: + self.st_size = long(st_size) + self.st_ctime = float(st_ctime) + if etag[0] == '"' and etag[-1] == '"': + etag = etag[1:-1] + self.etag = etag + + def __repr__(self): + if self.is_dir: + return '(directory: %s)' % self.filename + + return ( + '(filename: %(filename)s, st_size: %(st_size)s, ' + 'st_ctime: %(st_ctime)s, etag: %(etag)s, ' + 'content_type: %(content_type)s, ' + 'metadata: %(metadata)s)' % + dict(filename=self.filename, + st_size=self.st_size, + st_ctime=self.st_ctime, + etag=self.etag, + content_type=self.content_type, + metadata=self.metadata)) + + def __cmp__(self, other): + if not isinstance(other, self.__class__): + raise ValueError('Argument to cmp must have the same type. ' + 'Expect %s, got %s', self.__class__.__name__, + other.__class__.__name__) + if self.filename > other.filename: + return 1 + elif self.filename < other.filename: + return -1 + return 0 + + def __hash__(self): + if self.etag: + return hash(self.etag) + return hash(self.filename) + + +CSFileStat = GCSFileStat + + +def get_stored_content_length(headers): + """Return the content length (in bytes) of the object as stored in GCS. + + x-goog-stored-content-length should always be present except when called via + the local dev_appserver. Therefore if it is not present we default to the + standard content-length header. + + Args: + headers: a dict of headers from the http response. + + Returns: + the stored content length. + """ + length = headers.get('x-goog-stored-content-length') + if length is None: + length = headers.get('content-length') + return length + + +def get_metadata(headers): + """Get user defined options from HTTP response headers.""" + return dict((k, v) for k, v in headers.iteritems() + if any(k.lower().startswith(valid) for valid in _GCS_METADATA)) + + +def validate_bucket_name(name): + """Validate a Google Storage bucket name. + + Args: + name: a Google Storage bucket name with no prefix or suffix. + + Raises: + ValueError: if name is invalid. 
+ """ + _validate_path(name) + if not _GCS_BUCKET_REGEX.match(name): + raise ValueError('Bucket should be 3-63 characters long using only a-z,' + '0-9, underscore, dash or dot but got %s' % name) + + +def validate_bucket_path(path): + """Validate a Google Cloud Storage bucket path. + + Args: + path: a Google Storage bucket path. It should have form '/bucket'. + + Raises: + ValueError: if path is invalid. + """ + _validate_path(path) + if not _GCS_BUCKET_PATH_REGEX.match(path): + raise ValueError('Bucket should have format /bucket ' + 'but got %s' % path) + + +def validate_file_path(path): + """Validate a Google Cloud Storage file path. + + Args: + path: a Google Storage file path. It should have form '/bucket/filename'. + + Raises: + ValueError: if path is invalid. + """ + _validate_path(path) + if not _GCS_FULLPATH_REGEX.match(path): + raise ValueError('Path should have format /bucket/filename ' + 'but got %s' % path) + + +def _process_path_prefix(path_prefix): + """Validate and process a Google Cloud Stoarge path prefix. + + Args: + path_prefix: a Google Cloud Storage path prefix of format '/bucket/prefix' + or '/bucket/' or '/bucket'. + + Raises: + ValueError: if path is invalid. + + Returns: + a tuple of /bucket and prefix. prefix can be None. + """ + _validate_path(path_prefix) + if not _GCS_PATH_PREFIX_REGEX.match(path_prefix): + raise ValueError('Path prefix should have format /bucket, /bucket/, ' + 'or /bucket/prefix but got %s.' % path_prefix) + bucket_name_end = path_prefix.find('/', 1) + bucket = path_prefix + prefix = None + if bucket_name_end != -1: + bucket = path_prefix[:bucket_name_end] + prefix = path_prefix[bucket_name_end + 1:] or None + return bucket, prefix + + +def _validate_path(path): + """Basic validation of Google Storage paths. + + Args: + path: a Google Storage path. It should have form '/bucket/filename' + or '/bucket'. + + Raises: + ValueError: if path is invalid. + TypeError: if path is not of type basestring. + """ + if not path: + raise ValueError('Path is empty') + if not isinstance(path, basestring): + raise TypeError('Path should be a string but is %s (%s).' % + (path.__class__, path)) + + +def validate_options(options): + """Validate Google Cloud Storage options. + + Args: + options: a str->basestring dict of options to pass to Google Cloud Storage. + + Raises: + ValueError: if option is not supported. + TypeError: if option is not of type str or value of an option + is not of type basestring. + """ + if not options: + return + + for k, v in options.iteritems(): + if not isinstance(k, str): + raise TypeError('option %r should be a str.' % k) + if not any(k.lower().startswith(valid) for valid in _GCS_OPTIONS): + raise ValueError('option %s is not supported.' % k) + if not isinstance(v, basestring): + raise TypeError('value %r for option %s should be of type basestring.' % + (v, k)) + + +def http_time_to_posix(http_time): + """Convert HTTP time format to posix time. + + See http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.3.1 + for http time format. + + Args: + http_time: time in RFC 2616 format. e.g. + "Mon, 20 Nov 1995 19:12:08 GMT". + + Returns: + A float of secs from unix epoch. + """ + if http_time is not None: + return email_utils.mktime_tz(email_utils.parsedate_tz(http_time)) + + +def posix_time_to_http(posix_time): + """Convert posix time to HTML header time format. + + Args: + posix_time: unix time. + + Returns: + A datatime str in RFC 2616 format. 
+ """ + if posix_time: + return email_utils.formatdate(posix_time, usegmt=True) + + +_DT_FORMAT = '%Y-%m-%dT%H:%M:%S' + + +def dt_str_to_posix(dt_str): + """format str to posix. + + datetime str is of format %Y-%m-%dT%H:%M:%S.%fZ, + e.g. 2013-04-12T00:22:27.978Z. According to ISO 8601, T is a separator + between date and time when they are on the same line. + Z indicates UTC (zero meridian). + + A pointer: http://www.cl.cam.ac.uk/~mgk25/iso-time.html + + This is used to parse LastModified node from GCS's GET bucket XML response. + + Args: + dt_str: A datetime str. + + Returns: + A float of secs from unix epoch. By posix definition, epoch is midnight + 1970/1/1 UTC. + """ + parsable, _ = dt_str.split('.') + dt = datetime.datetime.strptime(parsable, _DT_FORMAT) + return calendar.timegm(dt.utctimetuple()) + + +def posix_to_dt_str(posix): + """Reverse of str_to_datetime. + + This is used by GCS stub to generate GET bucket XML response. + + Args: + posix: A float of secs from unix epoch. + + Returns: + A datetime str. + """ + dt = datetime.datetime.utcfromtimestamp(posix) + dt_str = dt.strftime(_DT_FORMAT) + return dt_str + '.000Z' + + +def local_run(): + """Whether we should hit GCS dev appserver stub.""" + server_software = os.environ.get('SERVER_SOFTWARE') + if server_software is None: + return True + if 'remote_api' in server_software: + return False + if server_software.startswith(('Development', 'testutil')): + return True + return False + + +def local_api_url(): + """Return URL for GCS emulation on dev appserver.""" + return 'http://%s%s' % (os.environ.get('HTTP_HOST'), LOCAL_GCS_ENDPOINT) + + +def memory_usage(method): + """Log memory usage before and after a method.""" + def wrapper(*args, **kwargs): + logging.info('Memory before method %s is %s.', + method.__name__, runtime.memory_usage().current()) + result = method(*args, **kwargs) + logging.info('Memory after method %s is %s', + method.__name__, runtime.memory_usage().current()) + return result + return wrapper + + +def _add_ns(tagname): + return '{%(ns)s}%(tag)s' % {'ns': CS_XML_NS, + 'tag': tagname} + + +_T_CONTENTS = _add_ns('Contents') +_T_LAST_MODIFIED = _add_ns('LastModified') +_T_ETAG = _add_ns('ETag') +_T_KEY = _add_ns('Key') +_T_SIZE = _add_ns('Size') +_T_PREFIX = _add_ns('Prefix') +_T_COMMON_PREFIXES = _add_ns('CommonPrefixes') +_T_NEXT_MARKER = _add_ns('NextMarker') +_T_IS_TRUNCATED = _add_ns('IsTruncated') diff --git a/lib/cloudstorage/errors.py b/lib/cloudstorage/errors.py new file mode 100755 index 0000000000..21743806d4 --- /dev/null +++ b/lib/cloudstorage/errors.py @@ -0,0 +1,143 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+ +"""Google Cloud Storage specific Files API calls.""" + + + + + +__all__ = ['AuthorizationError', + 'check_status', + 'Error', + 'FatalError', + 'FileClosedError', + 'ForbiddenError', + 'InvalidRange', + 'NotFoundError', + 'ServerError', + 'TimeoutError', + 'TransientError', + ] + +import httplib + + +class Error(Exception): + """Base error for all gcs operations. + + Error can happen on GAE side or GCS server side. + For details on a particular GCS HTTP response code, see + https://developers.google.com/storage/docs/reference-status#standardcodes + """ + + +class TransientError(Error): + """TransientError could be retried.""" + + +class TimeoutError(TransientError): + """HTTP 408 timeout.""" + + +class FatalError(Error): + """FatalError shouldn't be retried.""" + + +class FileClosedError(FatalError): + """File is already closed. + + This can happen when the upload has finished but 'write' is called on + a stale upload handle. + """ + + +class NotFoundError(FatalError): + """HTTP 404 resource not found.""" + + +class ForbiddenError(FatalError): + """HTTP 403 Forbidden. + + While GCS replies with a 403 error for many reasons, the most common one + is due to bucket permission not correctly setup for your app to access. + """ + + +class AuthorizationError(FatalError): + """HTTP 401 authentication required. + + Unauthorized request has been received by GCS. + + This error is mostly handled by GCS client. GCS client will request + a new access token and retry the request. + """ + + +class InvalidRange(FatalError): + """HTTP 416 RequestRangeNotSatifiable.""" + + +class ServerError(TransientError): + """HTTP >= 500 server side error.""" + + +def check_status(status, expected, path, headers=None, + resp_headers=None, body=None, extras=None): + """Check HTTP response status is expected. + + Args: + status: HTTP response status. int. + expected: a list of expected statuses. A list of ints. + path: filename or a path prefix. + headers: HTTP request headers. + resp_headers: HTTP response headers. + body: HTTP response body. + extras: extra info to be logged verbatim if error occurs. + + Raises: + AuthorizationError: if authorization failed. + NotFoundError: if an object that's expected to exist doesn't. + TimeoutError: if HTTP request timed out. + ServerError: if server experienced some errors. + FatalError: if any other unexpected errors occurred. + """ + if status in expected: + return + + msg = ('Expect status %r from Google Storage. But got status %d.\n' + 'Path: %r.\n' + 'Request headers: %r.\n' + 'Response headers: %r.\n' + 'Body: %r.\n' + 'Extra info: %r.\n' % + (expected, status, path, headers, resp_headers, body, extras)) + + if status == httplib.UNAUTHORIZED: + raise AuthorizationError(msg) + elif status == httplib.FORBIDDEN: + raise ForbiddenError(msg) + elif status == httplib.NOT_FOUND: + raise NotFoundError(msg) + elif status == httplib.REQUEST_TIMEOUT: + raise TimeoutError(msg) + elif status == httplib.REQUESTED_RANGE_NOT_SATISFIABLE: + raise InvalidRange(msg) + elif (status == httplib.OK and 308 in expected and + httplib.OK not in expected): + raise FileClosedError(msg) + elif status >= 500: + raise ServerError(msg) + else: + raise FatalError(msg) diff --git a/lib/cloudstorage/rest_api.py b/lib/cloudstorage/rest_api.py new file mode 100755 index 0000000000..ff02819282 --- /dev/null +++ b/lib/cloudstorage/rest_api.py @@ -0,0 +1,280 @@ +# Copyright 2012 Google Inc. All Rights Reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Base and helper classes for Google RESTful APIs.""" + + + + + +__all__ = ['add_sync_methods'] + +import logging +import os +import random +import time + +from . import api_utils + +try: + from google.appengine.api import app_identity + from google.appengine.api import lib_config + from google.appengine.ext import ndb +except ImportError: + from google.appengine.api import app_identity + from google.appengine.api import lib_config + from google.appengine.ext import ndb + + + +@ndb.tasklet +def _make_token_async(scopes, service_account_id): + """Get a fresh authentication token. + + Args: + scopes: A list of scopes. + service_account_id: Internal-use only. + + Raises: + An ndb.Return with a tuple (token, expiration_time) where expiration_time is + seconds since the epoch. + """ + rpc = app_identity.create_rpc() + app_identity.make_get_access_token_call(rpc, scopes, service_account_id) + token, expires_at = yield rpc + raise ndb.Return((token, expires_at)) + + +class _ConfigDefaults(object): + TOKEN_MAKER = _make_token_async + +_config = lib_config.register('cloudstorage', _ConfigDefaults.__dict__) + + +def _make_sync_method(name): + """Helper to synthesize a synchronous method from an async method name. + + Used by the @add_sync_methods class decorator below. + + Args: + name: The name of the synchronous method. + + Returns: + A method (with first argument 'self') that retrieves and calls + self., passing its own arguments, expects it to return a + Future, and then waits for and returns that Future's result. + """ + + def sync_wrapper(self, *args, **kwds): + method = getattr(self, name) + future = method(*args, **kwds) + return future.get_result() + + return sync_wrapper + + +def add_sync_methods(cls): + """Class decorator to add synchronous methods corresponding to async methods. + + This modifies the class in place, adding additional methods to it. + If a synchronous method of a given name already exists it is not + replaced. + + Args: + cls: A class. + + Returns: + The same class, modified in place. + """ + for name in cls.__dict__.keys(): + if name.endswith('_async'): + sync_name = name[:-6] + if not hasattr(cls, sync_name): + setattr(cls, sync_name, _make_sync_method(name)) + return cls + + +class _AE_TokenStorage_(ndb.Model): + """Entity to store app_identity tokens in memcache.""" + + token = ndb.StringProperty() + expires = ndb.FloatProperty() + + +class _RestApi(object): + """Base class for REST-based API wrapper classes. + + This class manages authentication tokens and request retries. All + APIs are available as synchronous and async methods; synchronous + methods are synthesized from async ones by the add_sync_methods() + function in this module. + + WARNING: Do NOT directly use this api. It's an implementation detail + and is subject to change at any release. + """ + + def __init__(self, scopes, service_account_id=None, token_maker=None, + retry_params=None): + """Constructor. + + Args: + scopes: A scope or a list of scopes. 
+ service_account_id: Internal use only. + token_maker: An asynchronous function of the form + (scopes, service_account_id) -> (token, expires). + retry_params: An instance of api_utils.RetryParams. If None, the + default for current thread will be used. + """ + + if isinstance(scopes, basestring): + scopes = [scopes] + self.scopes = scopes + self.service_account_id = service_account_id + self.make_token_async = token_maker or _config.TOKEN_MAKER + if not retry_params: + retry_params = api_utils._get_default_retry_params() + self.retry_params = retry_params + self.user_agent = {'User-Agent': retry_params._user_agent} + self.expiration_headroom = random.randint(60, 240) + + def __getstate__(self): + """Store state as part of serialization/pickling.""" + return {'scopes': self.scopes, + 'id': self.service_account_id, + 'a_maker': (None if self.make_token_async == _make_token_async + else self.make_token_async), + 'retry_params': self.retry_params, + 'expiration_headroom': self.expiration_headroom} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling.""" + self.__init__(state['scopes'], + service_account_id=state['id'], + token_maker=state['a_maker'], + retry_params=state['retry_params']) + self.expiration_headroom = state['expiration_headroom'] + + @ndb.tasklet + def do_request_async(self, url, method='GET', headers=None, payload=None, + deadline=None, callback=None): + """Issue one HTTP request. + + It performs async retries using tasklets. + + Args: + url: the url to fetch. + method: the method in which to fetch. + headers: the http headers. + payload: the data to submit in the fetch. + deadline: the deadline in which to make the call. + callback: the call to make once completed. + + Yields: + The async fetch of the url. + """ + retry_wrapper = api_utils._RetryWrapper( + self.retry_params, + retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS, + should_retry=api_utils._should_retry) + resp = yield retry_wrapper.run( + self.urlfetch_async, + url=url, + method=method, + headers=headers, + payload=payload, + deadline=deadline, + callback=callback, + follow_redirects=False) + raise ndb.Return((resp.status_code, resp.headers, resp.content)) + + @ndb.tasklet + def get_token_async(self, refresh=False): + """Get an authentication token. + + The token is cached in memcache, keyed by the scopes argument. + Uses a random token expiration headroom value generated in the constructor + to eliminate a burst of GET_ACCESS_TOKEN API requests. + + Args: + refresh: If True, ignore a cached token; default False. + + Yields: + An authentication token. This token is guaranteed to be non-expired. 
+ """ + key = '%s,%s' % (self.service_account_id, ','.join(self.scopes)) + ts = yield _AE_TokenStorage_.get_by_id_async( + key, + use_cache=True, + use_memcache=self.retry_params.memcache_access_token, + use_datastore=self.retry_params.save_access_token) + if refresh or ts is None or ts.expires < ( + time.time() + self.expiration_headroom): + token, expires_at = yield self.make_token_async( + self.scopes, self.service_account_id) + timeout = int(expires_at - time.time()) + ts = _AE_TokenStorage_(id=key, token=token, expires=expires_at) + if timeout > 0: + yield ts.put_async(memcache_timeout=timeout, + use_datastore=self.retry_params.save_access_token, + force_writes=True, + use_cache=True, + use_memcache=self.retry_params.memcache_access_token) + raise ndb.Return(ts.token) + + @ndb.tasklet + def urlfetch_async(self, url, method='GET', headers=None, + payload=None, deadline=None, callback=None, + follow_redirects=False): + """Make an async urlfetch() call. + + This is an async wrapper around urlfetch(). It adds an authentication + header. + + Args: + url: the url to fetch. + method: the method in which to fetch. + headers: the http headers. + payload: the data to submit in the fetch. + deadline: the deadline in which to make the call. + callback: the call to make once completed. + follow_redirects: whether or not to follow redirects. + + Yields: + This returns a Future despite not being decorated with @ndb.tasklet! + """ + headers = {} if headers is None else dict(headers) + headers.update(self.user_agent) + try: + self.token = yield self.get_token_async() + except app_identity.InternalError, e: + if os.environ.get('DATACENTER', '').endswith('sandman'): + self.token = None + logging.warning('Could not fetch an authentication token in sandman ' + 'based Appengine devel setup; proceeding without one.') + else: + raise e + if self.token: + headers['authorization'] = 'OAuth ' + self.token + + deadline = deadline or self.retry_params.urlfetch_timeout + + ctx = ndb.get_context() + resp = yield ctx.urlfetch( + url, payload=payload, method=method, + headers=headers, follow_redirects=follow_redirects, + deadline=deadline, callback=callback) + raise ndb.Return(resp) + + +_RestApi = add_sync_methods(_RestApi) diff --git a/lib/cloudstorage/storage_api.py b/lib/cloudstorage/storage_api.py new file mode 100755 index 0000000000..26254fdade --- /dev/null +++ b/lib/cloudstorage/storage_api.py @@ -0,0 +1,933 @@ +# Copyright 2012 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +"""Python wrappers for the Google Storage RESTful API.""" + + + + + +__all__ = ['ReadBuffer', + 'StreamingBuffer', + ] + +import collections +import os +import urlparse + +from . import api_utils +from . import common +from . import errors +from . 
import rest_api + +try: + from google.appengine.api import urlfetch + from google.appengine.ext import ndb +except ImportError: + from google.appengine.api import urlfetch + from google.appengine.ext import ndb + +from google.appengine.api import app_identity + + +def _get_storage_api(retry_params, account_id=None): + """Returns storage_api instance for API methods. + + Args: + retry_params: An instance of api_utils.RetryParams. If none, + thread's default will be used. + account_id: Internal-use only. + + Returns: + A storage_api instance to handle urlfetch work to GCS. + On dev appserver, this instance will talk to a local stub by default. + However, if you pass the arguments --appidentity_email_address and + --appidentity_private_key_path to dev_appserver.py it will attempt to use + the real GCS with these credentials. Alternatively, you can set a specific + access token with common.set_access_token. You can also pass + --default_gcs_bucket_name to set the default bucket. + """ + + + api = _StorageApi(_StorageApi.full_control_scope, + service_account_id=account_id, + retry_params=retry_params) + + # when running local unit tests, the service account is test@localhost + # from google.appengine.api.app_identity.app_identity_stub.APP_SERVICE_ACCOUNT_NAME + service_account = app_identity.get_service_account_name() + if (common.local_run() and not common.get_access_token() + and (not service_account or service_account.endswith('@localhost'))): + api.api_url = common.local_api_url() + if common.get_access_token(): + api.token = common.get_access_token() + return api + + +class _StorageApi(rest_api._RestApi): + """A simple wrapper for the Google Storage RESTful API. + + WARNING: Do NOT directly use this api. It's an implementation detail + and is subject to change at any release. + + All async methods have similar args and returns. + + Args: + path: The path to the Google Storage object or bucket, e.g. + '/mybucket/myfile' or '/mybucket'. + **kwd: Options for urlfetch. e.g. + headers={'content-type': 'text/plain'}, payload='blah'. + + Returns: + A ndb Future. When fulfilled, future.get_result() should return + a tuple of (status, headers, content) that represents a HTTP response + of Google Cloud Storage XML API. + """ + + api_url = 'https://storage.googleapis.com' + read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only' + read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write' + full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control' + + def __getstate__(self): + """Store state as part of serialization/pickling. + + Returns: + A tuple (of dictionaries) with the state of this object + """ + return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url}) + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the tuple from a __getstate__ call + """ + superstate, localstate = state + super(_StorageApi, self).__setstate__(superstate) + self.api_url = localstate['api_url'] + + @api_utils._eager_tasklet + @ndb.tasklet + def do_request_async(self, url, method='GET', headers=None, payload=None, + deadline=None, callback=None): + """Inherit docs. + + This method translates urlfetch exceptions to more service specific ones. 
+ """ + if headers is None: + headers = {} + if 'x-goog-api-version' not in headers: + headers['x-goog-api-version'] = '2' + headers['accept-encoding'] = 'gzip, *' + try: + resp_tuple = yield super(_StorageApi, self).do_request_async( + url, method=method, headers=headers, payload=payload, + deadline=deadline, callback=callback) + except urlfetch.DownloadError as e: + raise errors.TimeoutError( + 'Request to Google Cloud Storage timed out.', e) + + raise ndb.Return(resp_tuple) + + + def post_object_async(self, path, **kwds): + """POST to an object.""" + return self.do_request_async(self.api_url + path, 'POST', **kwds) + + def put_object_async(self, path, **kwds): + """PUT an object.""" + return self.do_request_async(self.api_url + path, 'PUT', **kwds) + + def get_object_async(self, path, **kwds): + """GET an object. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'GET', **kwds) + + def delete_object_async(self, path, **kwds): + """DELETE an object. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'DELETE', **kwds) + + def head_object_async(self, path, **kwds): + """HEAD an object. + + Depending on request headers, HEAD returns various object properties, + e.g. Content-Length, Last-Modified, and ETag. + + Note: No payload argument is supported. + """ + return self.do_request_async(self.api_url + path, 'HEAD', **kwds) + + def get_bucket_async(self, path, **kwds): + """GET a bucket.""" + return self.do_request_async(self.api_url + path, 'GET', **kwds) + + def compose_object(self, file_list, destination_file, content_type): + """COMPOSE multiple objects together. + + Using the given list of files, calls the put object with the compose flag. + This call merges all the files into the destination file. + + Args: + file_list: list of dicts with the file name. + destination_file: Path to the destination file. + content_type: Content type for the destination file. + """ + + xml_setting_list = [''] + + for meta_data in file_list: + xml_setting_list.append('') + for key, val in meta_data.iteritems(): + xml_setting_list.append('<%s>%s' % (key, val, key)) + xml_setting_list.append('') + xml_setting_list.append('') + xml = ''.join(xml_setting_list) + + if content_type is not None: + headers = {'Content-Type': content_type} + else: + headers = None + status, resp_headers, content = self.put_object( + api_utils._quote_filename(destination_file) + '?compose', + payload=xml, + headers=headers) + errors.check_status(status, [200], destination_file, resp_headers, + body=content) + + +_StorageApi = rest_api.add_sync_methods(_StorageApi) + + +class ReadBuffer(object): + """A class for reading Google storage files.""" + + DEFAULT_BUFFER_SIZE = 1024 * 1024 + MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE + + def __init__(self, + api, + path, + buffer_size=DEFAULT_BUFFER_SIZE, + max_request_size=MAX_REQUEST_SIZE, + offset=0): + """Constructor. + + Args: + api: A StorageApi instance. + path: Quoted/escaped path to the object, e.g. /mybucket/myfile + buffer_size: buffer size. The ReadBuffer keeps + one buffer. But there may be a pending future that contains + a second buffer. This size must be less than max_request_size. + max_request_size: Max bytes to request in one urlfetch. + offset: Number of bytes to skip at the start of the file. If None, 0 is + used. 
+ """ + self._api = api + self._path = path + self.name = api_utils._unquote_filename(path) + self.closed = False + + assert buffer_size <= max_request_size + self._buffer_size = buffer_size + self._max_request_size = max_request_size + self._offset = offset + + self._buffer = _Buffer() + self._etag = None + + get_future = self._get_segment(offset, self._buffer_size, check_response=False) + + status, headers, content = self._api.head_object(path) + errors.check_status(status, [200], path, resp_headers=headers, body=content) + self._file_size = long(common.get_stored_content_length(headers)) + self._check_etag(headers.get('etag')) + + self._buffer_future = None + + if self._file_size != 0: + content, check_response_closure = get_future.get_result() + check_response_closure() + self._buffer.reset(content) + self._request_next_buffer() + + def __getstate__(self): + """Store state as part of serialization/pickling. + + The contents of the read buffer are not stored, only the current offset for + data read by the client. A new read buffer is established at unpickling. + The head information for the object (file size and etag) are stored to + reduce startup and ensure the file has not changed. + + Returns: + A dictionary with the state of this object + """ + return {'api': self._api, + 'path': self._path, + 'buffer_size': self._buffer_size, + 'request_size': self._max_request_size, + 'etag': self._etag, + 'size': self._file_size, + 'offset': self._offset, + 'closed': self.closed} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. + + Args: + state: the dictionary from a __getstate__ call + + Along with restoring the state, pre-fetch the next read buffer. + """ + self._api = state['api'] + self._path = state['path'] + self.name = api_utils._unquote_filename(self._path) + self._buffer_size = state['buffer_size'] + self._max_request_size = state['request_size'] + self._etag = state['etag'] + self._file_size = state['size'] + self._offset = state['offset'] + self._buffer = _Buffer() + self.closed = state['closed'] + self._buffer_future = None + if self._remaining() and not self.closed: + self._request_next_buffer() + + def __iter__(self): + """Iterator interface. + + Note the ReadBuffer container itself is the iterator. It's + (quote PEP0234) + 'destructive: they consumes all the values and a second iterator + cannot easily be created that iterates independently over the same values. + You could open the file for the second time, or seek() to the beginning.' + + Returns: + Self. + """ + return self + + def next(self): + line = self.readline() + if not line: + raise StopIteration() + return line + + def readline(self, size=-1): + """Read one line delimited by '\n' from the file. + + A trailing newline character is kept in the string. It may be absent when a + file ends with an incomplete line. If the size argument is non-negative, + it specifies the maximum string size (counting the newline) to return. + A negative size is the same as unspecified. Empty string is returned + only when EOF is encountered immediately. + + Args: + size: Maximum number of bytes to read. If not specified, readline stops + only on '\n' or EOF. + + Returns: + The data read as a string. + + Raises: + IOError: When this buffer is closed. 
+ """ + self._check_open() + if size == 0 or not self._remaining(): + return '' + + data_list = [] + newline_offset = self._buffer.find_newline(size) + while newline_offset < 0: + data = self._buffer.read(size) + size -= len(data) + self._offset += len(data) + data_list.append(data) + if size == 0 or not self._remaining(): + return ''.join(data_list) + self._buffer.reset(self._buffer_future.get_result()) + self._request_next_buffer() + newline_offset = self._buffer.find_newline(size) + + data = self._buffer.read_to_offset(newline_offset + 1) + self._offset += len(data) + data_list.append(data) + + return ''.join(data_list) + + def read(self, size=-1): + """Read data from RAW file. + + Args: + size: Number of bytes to read as integer. Actual number of bytes + read is always equal to size unless EOF is reached. If size is + negative or unspecified, read the entire file. + + Returns: + data read as str. + + Raises: + IOError: When this buffer is closed. + """ + self._check_open() + if not self._remaining(): + return '' + + data_list = [] + while True: + remaining = self._buffer.remaining() + if size >= 0 and size < remaining: + data_list.append(self._buffer.read(size)) + self._offset += size + break + else: + size -= remaining + self._offset += remaining + data_list.append(self._buffer.read()) + + if self._buffer_future is None: + if size < 0 or size >= self._remaining(): + needs = self._remaining() + else: + needs = size + data_list.extend(self._get_segments(self._offset, needs)) + self._offset += needs + break + + if self._buffer_future: + self._buffer.reset(self._buffer_future.get_result()) + self._buffer_future = None + + if self._buffer_future is None: + self._request_next_buffer() + return ''.join(data_list) + + def _remaining(self): + return self._file_size - self._offset + + def _request_next_buffer(self): + """Request next buffer. + + Requires self._offset and self._buffer are in consistent state. + """ + self._buffer_future = None + next_offset = self._offset + self._buffer.remaining() + if next_offset != self._file_size: + self._buffer_future = self._get_segment(next_offset, + self._buffer_size) + + def _get_segments(self, start, request_size): + """Get segments of the file from Google Storage as a list. + + A large request is broken into segments to avoid hitting urlfetch + response size limit. Each segment is returned from a separate urlfetch. + + Args: + start: start offset to request. Inclusive. Have to be within the + range of the file. + request_size: number of bytes to request. + + Returns: + A list of file segments in order + """ + if not request_size: + return [] + + end = start + request_size + futures = [] + + while request_size > self._max_request_size: + futures.append(self._get_segment(start, self._max_request_size)) + request_size -= self._max_request_size + start += self._max_request_size + if start < end: + futures.append(self._get_segment(start, end - start)) + return [fut.get_result() for fut in futures] + + @ndb.tasklet + def _get_segment(self, start, request_size, check_response=True): + """Get a segment of the file from Google Storage. + + Args: + start: start offset of the segment. Inclusive. Have to be within the + range of the file. + request_size: number of bytes to request. Have to be small enough + for a single urlfetch request. May go over the logical range of the + file. + check_response: True to check the validity of GCS response automatically + before the future returns. False otherwise. See Yields section. 
+ + Yields: + If check_response is True, the segment [start, start + request_size) + of the file. + Otherwise, a tuple. The first element is the unverified file segment. + The second element is a closure that checks response. Caller should + first invoke the closure before consuing the file segment. + + Raises: + ValueError: if the file has changed while reading. + """ + end = start + request_size - 1 + content_range = '%d-%d' % (start, end) + headers = {'Range': 'bytes=' + content_range} + status, resp_headers, content = yield self._api.get_object_async( + self._path, headers=headers) + def _checker(): + errors.check_status(status, [200, 206], self._path, headers, + resp_headers, body=content) + self._check_etag(resp_headers.get('etag')) + if check_response: + _checker() + raise ndb.Return(content) + raise ndb.Return(content, _checker) + + def _check_etag(self, etag): + """Check if etag is the same across requests to GCS. + + If self._etag is None, set it. If etag is set, check that the new + etag equals the old one. + + In the __init__ method, we fire one HEAD and one GET request using + ndb tasklet. One of them would return first and set the first value. + + Args: + etag: etag from a GCS HTTP response. None if etag is not part of the + response header. It could be None for example in the case of GCS + composite file. + + Raises: + ValueError: if two etags are not equal. + """ + if etag is None: + return + elif self._etag is None: + self._etag = etag + elif self._etag != etag: + raise ValueError('File on GCS has changed while reading.') + + def close(self): + self.closed = True + self._buffer = None + self._buffer_future = None + + def __enter__(self): + return self + + def __exit__(self, atype, value, traceback): + self.close() + return False + + def seek(self, offset, whence=os.SEEK_SET): + """Set the file's current offset. + + Note if the new offset is out of bound, it is adjusted to either 0 or EOF. + + Args: + offset: seek offset as number. + whence: seek mode. Supported modes are os.SEEK_SET (absolute seek), + os.SEEK_CUR (seek relative to the current position), and os.SEEK_END + (seek relative to the end, offset should be negative). + + Raises: + IOError: When this buffer is closed. + ValueError: When whence is invalid. + """ + self._check_open() + + self._buffer.reset() + self._buffer_future = None + + if whence == os.SEEK_SET: + self._offset = offset + elif whence == os.SEEK_CUR: + self._offset += offset + elif whence == os.SEEK_END: + self._offset = self._file_size + offset + else: + raise ValueError('Whence mode %s is invalid.' % str(whence)) + + self._offset = min(self._offset, self._file_size) + self._offset = max(self._offset, 0) + if self._remaining(): + self._request_next_buffer() + + def tell(self): + """Tell the file's current offset. + + Returns: + current offset in reading this file. + + Raises: + IOError: When this buffer is closed. + """ + self._check_open() + return self._offset + + def _check_open(self): + if self.closed: + raise IOError('Buffer is closed.') + + def seekable(self): + return True + + def readable(self): + return True + + def writable(self): + return False + + +class _Buffer(object): + """In memory buffer.""" + + def __init__(self): + self.reset() + + def reset(self, content='', offset=0): + self._buffer = content + self._offset = offset + + def read(self, size=-1): + """Returns bytes from self._buffer and update related offsets. + + Args: + size: number of bytes to read starting from current offset. + Read the entire buffer if negative. 
+ + Returns: + Requested bytes from buffer. + """ + if size < 0: + offset = len(self._buffer) + else: + offset = self._offset + size + return self.read_to_offset(offset) + + def read_to_offset(self, offset): + """Returns bytes from self._buffer and update related offsets. + + Args: + offset: read from current offset to this offset, exclusive. + + Returns: + Requested bytes from buffer. + """ + assert offset >= self._offset + result = self._buffer[self._offset: offset] + self._offset += len(result) + return result + + def remaining(self): + return len(self._buffer) - self._offset + + def find_newline(self, size=-1): + """Search for newline char in buffer starting from current offset. + + Args: + size: number of bytes to search. -1 means all. + + Returns: + offset of newline char in buffer. -1 if doesn't exist. + """ + if size < 0: + return self._buffer.find('\n', self._offset) + return self._buffer.find('\n', self._offset, self._offset + size) + + +class StreamingBuffer(object): + """A class for creating large objects using the 'resumable' API. + + The API is a subset of the Python writable stream API sufficient to + support writing zip files using the zipfile module. + + The exact sequence of calls and use of headers is documented at + https://developers.google.com/storage/docs/developer-guide#unknownresumables + """ + + _blocksize = 256 * 1024 + + _flushsize = 8 * _blocksize + + _maxrequestsize = 9 * 4 * _blocksize + + def __init__(self, + api, + path, + content_type=None, + gcs_headers=None): + """Constructor. + + Args: + api: A StorageApi instance. + path: Quoted/escaped path to the object, e.g. /mybucket/myfile + content_type: Optional content-type; Default value is + delegate to Google Cloud Storage. + gcs_headers: additional gs headers as a str->str dict, e.g + {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. + Raises: + IOError: When this location can not be found. + """ + assert self._maxrequestsize > self._blocksize + assert self._maxrequestsize % self._blocksize == 0 + assert self._maxrequestsize >= self._flushsize + + self._api = api + self._path = path + + self.name = api_utils._unquote_filename(path) + self.closed = False + + self._buffer = collections.deque() + self._buffered = 0 + self._written = 0 + self._offset = 0 + + headers = {'x-goog-resumable': 'start'} + if content_type: + headers['content-type'] = content_type + if gcs_headers: + headers.update(gcs_headers) + status, resp_headers, content = self._api.post_object(path, headers=headers) + errors.check_status(status, [201], path, headers, resp_headers, + body=content) + loc = resp_headers.get('location') + if not loc: + raise IOError('No location header found in 201 response') + parsed = urlparse.urlparse(loc) + self._path_with_token = '%s?%s' % (self._path, parsed.query) + + def __getstate__(self): + """Store state as part of serialization/pickling. + + The contents of the write buffer are stored. Writes to the underlying + storage are required to be on block boundaries (_blocksize) except for the + last write. In the worst case the pickled version of this object may be + slightly larger than the blocksize. + + Returns: + A dictionary with the state of this object + + """ + return {'api': self._api, + 'path': self._path, + 'path_token': self._path_with_token, + 'buffer': self._buffer, + 'buffered': self._buffered, + 'written': self._written, + 'offset': self._offset, + 'closed': self.closed} + + def __setstate__(self, state): + """Restore state as part of deserialization/unpickling. 
+ + Args: + state: the dictionary from a __getstate__ call + """ + self._api = state['api'] + self._path_with_token = state['path_token'] + self._buffer = state['buffer'] + self._buffered = state['buffered'] + self._written = state['written'] + self._offset = state['offset'] + self.closed = state['closed'] + self._path = state['path'] + self.name = api_utils._unquote_filename(self._path) + + def write(self, data): + """Write some bytes. + + Args: + data: data to write. str. + + Raises: + TypeError: if data is not of type str. + """ + self._check_open() + if not isinstance(data, str): + raise TypeError('Expected str but got %s.' % type(data)) + if not data: + return + self._buffer.append(data) + self._buffered += len(data) + self._offset += len(data) + if self._buffered >= self._flushsize: + self._flush() + + def flush(self): + """Flush as much as possible to GCS. + + GCS *requires* that all writes except for the final one align on + 256KB boundaries. So the internal buffer may still have < 256KB bytes left + after flush. + """ + self._check_open() + self._flush(finish=False) + + def tell(self): + """Return the total number of bytes passed to write() so far. + + (There is no seek() method.) + """ + return self._offset + + def close(self): + """Flush the buffer and finalize the file. + + When this returns the new file is available for reading. + """ + if not self.closed: + self.closed = True + self._flush(finish=True) + self._buffer = None + + def __enter__(self): + return self + + def __exit__(self, atype, value, traceback): + self.close() + return False + + def _flush(self, finish=False): + """Internal API to flush. + + Buffer is flushed to GCS only when the total amount of buffered data is at + least self._blocksize, or to flush the final (incomplete) block of + the file with finish=True. + """ + while ((finish and self._buffered >= 0) or + (not finish and self._buffered >= self._blocksize)): + tmp_buffer = [] + tmp_buffer_len = 0 + + excess = 0 + while self._buffer: + buf = self._buffer.popleft() + size = len(buf) + self._buffered -= size + tmp_buffer.append(buf) + tmp_buffer_len += size + if tmp_buffer_len >= self._maxrequestsize: + excess = tmp_buffer_len - self._maxrequestsize + break + if not finish and ( + tmp_buffer_len % self._blocksize + self._buffered < + self._blocksize): + excess = tmp_buffer_len % self._blocksize + break + + if excess: + over = tmp_buffer.pop() + size = len(over) + assert size >= excess + tmp_buffer_len -= size + head, tail = over[:-excess], over[-excess:] + self._buffer.appendleft(tail) + self._buffered += len(tail) + if head: + tmp_buffer.append(head) + tmp_buffer_len += len(head) + + data = ''.join(tmp_buffer) + file_len = '*' + if finish and not self._buffered: + file_len = self._written + len(data) + self._send_data(data, self._written, file_len) + self._written += len(data) + if file_len != '*': + break + + def _send_data(self, data, start_offset, file_len): + """Send the block to the storage service. + + This is a utility method that does not modify self. + + Args: + data: data to send in str. + start_offset: start offset of the data in relation to the file. + file_len: an int if this is the last data to append to the file. + Otherwise '*'. 
+ """ + headers = {} + end_offset = start_offset + len(data) - 1 + + if data: + headers['content-range'] = ('bytes %d-%d/%s' % + (start_offset, end_offset, file_len)) + else: + headers['content-range'] = ('bytes */%s' % file_len) + + status, response_headers, content = self._api.put_object( + self._path_with_token, payload=data, headers=headers) + if file_len == '*': + expected = 308 + else: + expected = 200 + errors.check_status(status, [expected], self._path, headers, + response_headers, content, + {'upload_path': self._path_with_token}) + + def _get_offset_from_gcs(self): + """Get the last offset that has been written to GCS. + + This is a utility method that does not modify self. + + Returns: + an int of the last offset written to GCS by this upload, inclusive. + -1 means nothing has been written. + """ + headers = {'content-range': 'bytes */*'} + status, response_headers, content = self._api.put_object( + self._path_with_token, headers=headers) + errors.check_status(status, [308], self._path, headers, + response_headers, content, + {'upload_path': self._path_with_token}) + val = response_headers.get('range') + if val is None: + return -1 + _, offset = val.rsplit('-', 1) + return int(offset) + + def _force_close(self, file_length=None): + """Close this buffer on file_length. + + Finalize this upload immediately on file_length. + Contents that are still in memory will not be uploaded. + + This is a utility method that does not modify self. + + Args: + file_length: file length. Must match what has been uploaded. If None, + it will be queried from GCS. + """ + if file_length is None: + file_length = self._get_offset_from_gcs() + 1 + self._send_data('', 0, file_length) + + def _check_open(self): + if self.closed: + raise IOError('Buffer is closed.') + + def seekable(self): + return False + + def readable(self): + return False + + def writable(self): + return True diff --git a/lib/cloudstorage/test_utils.py b/lib/cloudstorage/test_utils.py new file mode 100755 index 0000000000..e4d82477dc --- /dev/null +++ b/lib/cloudstorage/test_utils.py @@ -0,0 +1,25 @@ +# Copyright 2013 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +# either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
+ +"""Utils for testing.""" + + +class MockUrlFetchResult(object): + + def __init__(self, status, headers, body): + self.status_code = status + self.headers = headers + self.content = body + self.content_was_truncated = False + self.final_url = None diff --git a/sdoapp.py b/sdoapp.py index 3ca14c154b..bf35c5402e 100755 --- a/sdoapp.py +++ b/sdoapp.py @@ -34,6 +34,7 @@ #Testharness Used to indicate we are being called from tests - use setInTestHarness() & getInTestHarness() to manage value - defauluts to False (we are not in tests) from testharness import * +from sdoutil import * from api import * from apirdflib import load_graph, getNss, getRevNss, buildSingleTermGraph, serializeSingleTermGrapth from apirdflib import countTypes, countProperties, countEnums @@ -181,10 +182,29 @@ def tick(): #Keep memcache values fresh so they don't expire if getInTestHarness(): load_examples_data(ENABLED_EXTENSIONS) + #prepareCloudstoreDocs() else: #Ensure clean start for any memcached or ndb store values... - if memcache.get("static-version") != appver: #We are a new instance of the app + dep = None + try: + fpath = os.path.join(os.path.split(__file__)[0], 'admin/deploy_timestamp.txt') + log.info("fpath: %s" % fpath) + with open(fpath, 'r') as f: + dep = f.read() + f.close() + except Exception as e: + log.info("ERROR reading: %s" % e) + pass + + log.info("Dep: %s mem:%s" % (dep, memcache.get("deployed-timestamp"))) +# if memcache.get("static-version") != appver: #We are a new instance of the app + + if memcache.get("deployed-timestamp") != dep: #We are a new instance of the app + msg = "New app instance detected - flushing caches. (mem='%s' deploy_timestamp='%s')" % (memcache.get("deployed-timestamp"), dep) memcache.flush_all() + memcache.set(key="deployed-timestamp", value=dep) + #sdo_send_mail(to="rjw@dataliberate.com",subject="[SCHEMAINFO] from 'sdoapp'", msg=msg) + log.info(">>>> %s " % msg) load_start = datetime.datetime.now() systarttime = datetime.datetime.utcnow() @@ -272,11 +292,11 @@ def toHTML(self): def toJSON(self): return self.txt - def traverseForHTML(self, node, depth = 1, hashorslash="/", layers='core', idprefix="", traverseAllLayers=False, buff=None): + def traverseForHTML(self, node, depth = 1, hashorslash="/", layers='core', idprefix="", urlprefix="", traverseAllLayers=False, buff=None): """Generate a hierarchical tree view of the types. 
hashorslash is used for relative link prefixing.""" - log.debug("traverseForHTML: node=%s hashorslash=%s" % ( node.id, hashorslash )) + #log.info("traverseForHTML: node=%s hashorslash=%s" % ( node.id, hashorslash )) if node.superseded(layers=layers): return False @@ -286,7 +306,6 @@ def traverseForHTML(self, node, depth = 1, hashorslash="/", layers='core', idpre localBuff = True buff = StringIO.StringIO() - urlprefix = "" home = node.getHomeLayer() gotOutput = False @@ -297,7 +316,7 @@ def traverseForHTML(self, node, depth = 1, hashorslash="/", layers='core', idpre if home in ENABLED_EXTENSIONS and home != getHostExt(): urlprefix = makeUrl(home) - + extclass = "" extflag = "" tooltip="" @@ -318,7 +337,7 @@ def traverseForHTML(self, node, depth = 1, hashorslash="/", layers='core', idpre # handle our subtypes for item in subTypes: subBuff = StringIO.StringIO() - got = self.traverseForHTML(item, depth + 1, hashorslash=hashorslash, layers=layers, idprefix=idprefix, traverseAllLayers=traverseAllLayers,buff=subBuff) + got = self.traverseForHTML(item, depth + 1, hashorslash=hashorslash, layers=layers, idprefix=idprefix, urlprefix=urlprefix, traverseAllLayers=traverseAllLayers,buff=subBuff) if got: self.emit2buff(buff,subBuff.getvalue()) subBuff.close() @@ -866,7 +885,7 @@ def _ClassExtensionProperties (self, out, cl, layers="core"): if inLayer("meta",prop): #Suppress mentioning properties from the 'meta' extension. continue ext = prop.getHomeLayer() - log.debug("ClassExtensionFound %s from %s" % (prop, ext)) + if not ext in exts.keys(): exts[ext] = [] exts[ext].append(prop) @@ -1133,7 +1152,8 @@ def handleHomepage(self, node): # Serve a homepage from template # the .tpl has responsibility for extension homepages # TODO: pass in extension, base_domain etc. - sitekeyedhomepage = "homepage %s" % getSiteName() + #sitekeyedhomepage = "homepage %s" % getSiteName() + sitekeyedhomepage = "%sindex" % getHostExt() hp = getPageFromStore(sitekeyedhomepage) self.response.headers['Content-Type'] = "text/html" self.emitCacheHeaders() @@ -1141,16 +1161,15 @@ def handleHomepage(self, node): self.response.out.write( hp ) #log.info("Served datacache homepage.tpl key: %s" % sitekeyedhomepage) else: - - template_values = { 'ext_contents': self.handleExtensionContents(getHostExt()), 'home_page': "True", } - page = templateRender('homepage.tpl',template_values) + page = templateRender('homepage.tpl', node, template_values) self.response.out.write( page ) log.debug("Served and cached fresh homepage.tpl key: %s " % sitekeyedhomepage) - #log.info("Served and cached fresh homepage.tpl key: %s " % sitekeyedhomepage) + + setAppVar(CLOUDEXTRAMETA,{'x-goog-meta-sdotermlayer': getHostExt()}) PageStore.put(sitekeyedhomepage, page) # self.response.out.write( open("static/index.html", 'r').read() ) return False # - Not caching homepage @@ -1166,11 +1185,16 @@ def getExtendedSiteName(self, layers): return (getHostExt() + ".schema.org") def emitSchemaorgHeaders(self, node, ext_mappings='', sitemode="default", sitename="schema.org", layers="core"): + self.response.out.write(self.buildSchemaorgHeaders(node, ext_mappings, sitemode, sitename, layers)) + + + def buildSchemaorgHeaders(self, node, ext_mappings='', sitemode="default", sitename="schema.org", layers="core"): """ Generates, caches and emits HTML headers for class, property and enumeration pages. Leaves open. 
* entry = name of the class or property """ + buff = sdoStringIO() rdfs_type = 'rdfs:Property' anode = True @@ -1197,31 +1221,24 @@ def emitSchemaorgHeaders(self, node, ext_mappings='', sitemode="default", sitena elif node.isAttribute(): rdfs_type = 'rdfs:Property' - generated_page_id = "GTPH-%s-%s" % ( str(entry), getSiteName() ) - gtp = DataCache.get( generated_page_id ) - - if gtp: - self.response.out.write( gtp ) - log.info("Served recycled genericTermPageHeader.tpl for %s" % generated_page_id ) - else: - - desc = entry - if anode: - desc = self.getMetaDescription(node, layers=layers, lengthHint=200) + desc = entry + if anode: + desc = self.getMetaDescription(node, layers=layers, lengthHint=200) + + template_values = { + 'entry': str(entry), + 'desc' : desc, + 'menu_sel': "Schemas", + 'rdfs_type': rdfs_type, + 'ext_mappings': ext_mappings, + 'noindexpage': noindexpages + } + out = templateRender('genericTermPageHeader.tpl', node, template_values) + buff.write(out) - template_values = { - 'entry': str(entry), - 'desc' : desc, - 'menu_sel': "Schemas", - 'rdfs_type': rdfs_type, - 'ext_mappings': ext_mappings, - 'noindexpage': noindexpages - } - out = templateRender('genericTermPageHeader.tpl',template_values) - DataCache.put(generated_page_id, out) - log.info("Served and cached fresh genericTermPageHeader.tpl for %s" % generated_page_id ) - - self.response.write(out) + ret = buff.getvalue() + buff.close() + return ret def getMetaDescription(self, node, layers="core",lengthHint=250): ins = "" @@ -1259,7 +1276,7 @@ def emitExactTermPage(self, node, layers="core"): if ("schema.org" not in self.request.host and sitemode == "mainsite"): sitemode = "mainsite testsite" - self.emitSchemaorgHeaders(node, ext_mappings, sitemode, getSiteName(), layers) +# self.emitSchemaorgHeaders(node, ext_mappings, sitemode, getSiteName(), layers) cached = getPageFromStore(node.id) @@ -1267,6 +1284,9 @@ def emitExactTermPage(self, node, layers="core"): log.info("GOT CACHED page for %s" % node.id) self.response.write(cached) return + + self.write(self.buildSchemaorgHeaders(node, ext_mappings, sitemode, getSiteName(), layers)) + self.parentStack = [] self.GetParentStack(node, layers=self.appropriateLayers(layers=layers)) @@ -1431,9 +1451,11 @@ def emitExactTermPage(self, node, layers="core"): ga('create', 'UA-52672119-1', 'auto');ga('send', 'pageview');""") - self.write(" \n\n\n\n\n" % getAppEngineVersion()) + self.write(" \n\n\n\n\n" % (getAppEngineVersion(),appver)) + log.info("outputStrings len: %s" % len(self.outputStrings)) page = "".join(self.outputStrings) + setAppVar(CLOUDEXTRAMETA,{'x-goog-meta-sdotermlayer': node.home}) PageStore.put(node.id,page) # self.response.write(self.AddCachedText(node, self.outputStrings, layers)) @@ -1501,66 +1523,57 @@ def handleJSONContext(self, node): # see also handleHomepage for conneg'd version. 
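The handlers touched in this hunk all follow the same store-backed pattern: serve a cached copy from PageStore if one exists, otherwise render the page, tag it with its originating layer via CLOUDEXTRAMETA so the Cloud Storage object carries that metadata, and store it. A condensed sketch of that pattern, using the helper names already present in this patch (the handler itself is illustrative):

    # Illustrative handler following the caching pattern used above; getPageFromStore,
    # templateRender, setAppVar, CLOUDEXTRAMETA, getHostExt and PageStore are the
    # helpers this patch already relies on.
    def serveCachedPage(self, key, node, template, template_values):
        cached = getPageFromStore(key)
        if cached:
            self.response.out.write(cached)    # recycled copy
            return
        page = templateRender(template, node, template_values)
        # Recorded as object metadata when the page is written to Cloud Storage.
        setAppVar(CLOUDEXTRAMETA, {'x-goog-meta-sdotermlayer': getHostExt()})
        PageStore.put(key, page)
        self.response.out.write(page)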
def handleSchemasPage(self, node, layerlist='core'): - self.response.headers['Content-Type'] = "text/html" - self.emitCacheHeaders() - - if getPageFromStore('SchemasPage'): + if getPageFromStore(node): self.response.out.write( getPageFromStore('SchemasPage') ) log.debug("Serving recycled SchemasPage.") return True else: + self.response.headers['Content-Type'] = "text/html" + self.emitCacheHeaders() + extensions = [] for ex in sorted(ENABLED_EXTENSIONS): if ex != ATTIC: extensions.append("%s.schema.org" % (makeUrl(ex,"",full=True),ex)) - page = templateRender('schemas.tpl',{'counts': self.getCounts(), + page = templateRender('schemas.tpl', node, {'counts': self.getCounts(), 'extensions': extensions, 'attic': "%s.schema.org" % (makeUrl(ATTIC,""),ATTIC), 'menu_sel': "Schemas"}) self.response.out.write( page ) log.debug("Serving fresh SchemasPage.") - PageStore.put("SchemasPage",page) + PageStore.put(node,page) return True def handleDumpsPage(self, node, layerlist='core'): self.response.headers['Content-Type'] = "text/html" self.emitCacheHeaders() + + page = getPageFromStore(node) - if getPageFromStore('DumpsPage'): - self.response.out.write( getPageFromStore('DumpsPage') ) + if page: + self.response.out.write( page) log.debug("Serving recycled DumpsPage.") return True else: extensions = sorted(ENABLED_EXTENSIONS) - page = templateRender('developers.tpl',{'extensions': extensions, + page = templateRender('developers.tpl', node, {'extensions': extensions, 'version': SCHEMA_VERSION, 'menu_sel': "Schemas"}) self.response.out.write( page ) log.debug("Serving fresh DumpsPage.") - PageStore.put("DumpsPage",page) + PageStore.put(node,page) return True def getCounts(self): - typesCount = getPageFromStore('typesCount-core') - if not typesCount: - typesCount = str(countTypes(extension="core")) - PageStore.put('typesCount-core',typesCount) - - propsCount = getPageFromStore('propsCount-core') - if not propsCount: - propsCount = str(countProperties(extension="core")) - PageStore.put('propsCount-core',propsCount) - - enumCount = getPageFromStore('enumCount-core') - if not enumCount: - enumCount = str(countEnums(extension="core")) - PageStore.put('enumCount-core',enumCount) + typesCount = str(countTypes(extension="core")) + propsCount = str(countProperties(extension="core")) + enumCount = str(countEnums(extension="core")) text = "" text += "The core vocabulary currently consists of %s Types, " % typesCount @@ -1570,15 +1583,20 @@ def getCounts(self): def handleFullHierarchyPage(self, node, layerlist='core'): - self.response.headers['Content-Type'] = "text/html" - self.emitCacheHeaders() - label = 'FullTreePage - %s' % getHostExt() + #label = 'FullTreePage - %s' % getHostExt() + #label = 'FullTreePage' + urlprefix = '' + label = node + if label.startswith('docs/'): + urlprefix = '..' if getPageFromStore(label): self.response.out.write( getPageFromStore(label) ) log.debug("Serving recycled %s." % label) return True else: + self.response.headers['Content-Type'] = "text/html" + self.emitCacheHeaders() template = JINJA_ENVIRONMENT.get_template('full.tpl') extlist="" @@ -1612,26 +1630,26 @@ def handleFullHierarchyPage(self, node, layerlist='core'): uDataType = Unit.GetUnit("DataType") mainroot = TypeHierarchyTree(local_label) - mainroot.traverseForHTML(uThing, layers=layerlist, idprefix="C.") + mainroot.traverseForHTML(uThing, layers=layerlist, idprefix="C.", urlprefix=urlprefix) thing_tree = mainroot.toHTML() fullmainroot = TypeHierarchyTree("

Core plus all extension vocabularies
") - fullmainroot.traverseForHTML(uThing, layers=ALL_LAYERS_NO_ATTIC, idprefix="CE.") + fullmainroot.traverseForHTML(uThing, layers=ALL_LAYERS_NO_ATTIC, idprefix="CE.", urlprefix=urlprefix) full_thing_tree = fullmainroot.toHTML() ext_thing_tree = None if len(extonlylist) > 0: extroot = TypeHierarchyTree("

Extension: %s
" % extlist) - extroot.traverseForHTML(uThing, layers=extonlylist, traverseAllLayers=True, idprefix="E.") + extroot.traverseForHTML(uThing, layers=extonlylist, traverseAllLayers=True, idprefix="E.", urlprefix=urlprefix) ext_thing_tree = extroot.toHTML() dtroot = TypeHierarchyTree("

Data Types
") - dtroot.traverseForHTML(uDataType, layers=layerlist, idprefix="D.") + dtroot.traverseForHTML(uDataType, layers=layerlist, idprefix="D.", urlprefix=urlprefix) datatype_tree = dtroot.toHTML() full_button = "Core plus all extension vocabularies" - page = templateRender('full.tpl',{ 'thing_tree': thing_tree, + page = templateRender('full.tpl', node, { 'thing_tree': thing_tree, 'full_thing_tree': full_thing_tree, 'ext_thing_tree': ext_thing_tree, 'datatype_tree': datatype_tree, @@ -1648,12 +1666,16 @@ def handleFullHierarchyPage(self, node, layerlist='core'): def handleJSONSchemaTree(self, node, layerlist='core'): """Handle a request for a JSON-LD tree representation of the schemas (RDFS-based).""" - + + if isinstance(node, Unit): + node = node.id + self.response.headers['Content-Type'] = "application/ld+json" self.emitCacheHeaders() - if getPageFromStore('JSONLDThingTree'): - self.response.out.write( getPageFromStore('JSONLDThingTree') ) + page = getPageFromStore(node) + if page: + self.response.out.write( page ) log.debug("Serving recycled JSONLDThingTree.") return True else: @@ -1663,7 +1685,7 @@ def handleJSONSchemaTree(self, node, layerlist='core'): thing_tree = mainroot.toJSON() self.response.out.write( thing_tree ) log.debug("Serving fresh JSONLDThingTree.") - PageStore.put("JSONLDThingTree",thing_tree) + PageStore.put(node,thing_tree) return True return False @@ -1712,9 +1734,14 @@ def handleExactTermPage(self, node, layers='core'): if self.checkConneg(node): return True + schema_node = Unit.GetUnit(node) # e.g. "Person", "CreativeWork". + if not self.checkNodeExt(schema_node): + return False + self.response.headers['Content-Type'] = "text/html" self.emitCacheHeaders() - schema_node = Unit.GetUnit(node) # e.g. "Person", "CreativeWork". + + if inLayer(layers, schema_node): self.emitExactTermPage(schema_node, layers=layers) return True @@ -1734,7 +1761,7 @@ def handleExactTermPage(self, node, layers='core'): #self.response.out.write("
  • %s
  • " % (makeUrl(x,schema_node.id), x) ) template = JINJA_ENVIRONMENT.get_template('wrongExt.tpl') - page = templateRender('wrongExt.tpl', + page = templateRender('wrongExt.tpl', node, {'target': schema_node.id, 'targetext': schema_node.getHomeLayer(), 'extensions': extensions, @@ -1749,6 +1776,27 @@ def handleExactTermPage(self, node, layers='core'): log.info("Should not have reached here!!") + def checkNodeExt(self,node): + if os.environ['STAYINEXTENTION'] and os.environ['STAYINEXTENTION'] == "True": + return True + + home = node.home + ext = getHostExt() + log.info("node: '%s' home: '%s' ext: '%s'" % (node,home,ext)) + if home == CORE and ext == '': + return True + if home == ext: + return True + + if home == CORE: + log.info("Redirecting to core entity") + self.redirectToBase(node.id,full=True) + else: + log.info("Redirecting to '%s' entity" % home) + self.redirectToBase(node.id,ext=home, full=True) + return False + + def handleExactTermDataOutput(self, node=None, outputtype=None): log.info("handleExactTermDataOutput Node: '%s' Outputtype: '%s'" % (node, outputtype)) ret = False @@ -1861,7 +1909,7 @@ def handleFullReleasePage(self, node, layerlist='core'): return True else: log.debug("Serving tocversionPage from cache.") - page = templateRender('tocVersionPage.tpl', + page = templateRender('tocVersionPage.tpl', node, {"releases": sorted(releaselog.iterkeys()), "menu_sel": "Schemas"}) @@ -1966,7 +2014,7 @@ def handleFullReleasePage(self, node, layerlist='core'): else: releasedate = releaselog[str(SCHEMA_VERSION)] - page = templateRender('fullReleasePage.tpl', + page = templateRender('fullReleasePage.tpl', node, {"base_href": base_href, 'thing_tree': thing_tree, 'liveversion': SCHEMA_VERSION, @@ -2220,7 +2268,6 @@ def get(self, node): self.response.set_status(304,"Not Modified") else: enableCaching = self._get(node) #Go build the page - tagsuff = "" if ( "content-type" in self.response.headers and "json" in self.response.headers["content-type"] ): @@ -2228,8 +2275,16 @@ def get(self, node): if enableCaching: if self.response.status.startswith("200"): - self.response.headers.add_header("ETag", etag + tagsuff) - self.response.headers['Last-Modified'] = getmodiftime().strftime("%a, %d %b %Y %H:%M:%S UTC") + stat = getAppVar(CLOUDSTAT) + log.info("CLOUDSTAT %s" % stat) + + if stat: #Use values from cloud storage + self.response.headers.add_header("ETag", stat.etag) + self.response.headers['Last-Modified'] = time.strftime("%a, %d %b %Y %H:%M:%S UTC",time.gmtime(stat.st_ctime)) + else: + self.response.headers.add_header("ETag", etag + tagsuff) + self.response.headers['Last-Modified'] = getmodiftime().strftime("%a, %d %b %Y %H:%M:%S UTC") + retHdrs = self.response.headers.copy() HeaderStore.putIfNewKey(etag + tagsuff,retHdrs) #Cache these headers for a future 304 return @@ -2286,7 +2341,7 @@ def _get(self, node, doWarm=True): log.info("[%s] Warmup dissabled for localhost instance" % getInstanceId(short=True)) if DISABLE_NDB_FOR_LOCALHOST: log.info("[%s] NDB dissabled for localhost instance" % getInstanceId(short=True)) - enablePageStore(False) + enablePageStore("INMEM") else: if not memcache.get("warmedup"): memcache.set("warmedup", value=True) @@ -2323,16 +2378,17 @@ def _get(self, node, doWarm=True): memcache.flush_all() return False - if not getPageFromStore(node): #Not stored this page before - #log.info("Not stored %s" % node) - if not LOADEDSOURCES: - log.info("Instance[%s] received request for not stored page: %s" % (getInstanceId(short=True), node) ) - log.info("Instance[%s] needs to 
load sources to create it" % (getInstanceId(short=True)) ) - load_sources() #Get Examples files and schema definitions + if not LOADEDSOURCES: + log.info("Instance[%s] received request for not stored page: %s" % (getInstanceId(short=True), node) ) + log.info("Instance[%s] needs to load sources to create it" % (getInstanceId(short=True)) ) + load_sources() #Get Examples files and schema definitions if (node in ["", "/"]): return self.handleHomepage(node) - + + if (node.startswith("docs/") and getHostExt() != ""): #All docs should operate in core + self.redirectToBase(node,True) + if node in ["docs/jsonldcontext.json.txt", "docs/jsonldcontext.json"]: if self.handleJSONContext(node): return True @@ -2360,8 +2416,6 @@ def _get(self, node, doWarm=True): log.info("Error handling developers.html : %s " % node) return False - - if (node == "docs/tree.jsonld" or node == "docs/tree.json"): if self.handleJSONSchemaTree(node, layerlist=ALL_LAYERS): return True @@ -2421,7 +2475,7 @@ def _get(self, node, doWarm=True): def siteDebug(self): global STATS - page = templateRender('siteDebug.tpl') + page = templateRender('siteDebug.tpl', "_siteDebug" ) self.response.out.write( page ) ext = getHostExt() @@ -2525,7 +2579,7 @@ class WarmupTool(): def __init__(self): #self.pageList = ["docs/schemas.html"] - self.pageList = ["/","docs/schemas.html","docs/full.html","docs/tree.jsonld"] + self.pageList = ["/","docs/schemas.html","docs/full.html","docs/tree.jsonld","docs/developers.html"] self.warmPages = {} for l in ALL_LAYERS: self.warmPages[l] = [] @@ -2575,8 +2629,12 @@ def warmAll(self,unit): Warmer = WarmupTool() -def templateRender(templateName,values=None): +def templateRender(templateName, node, values=None): global sitemode #,sitename + + if isinstance(node, Unit): + node = node.id + extDef = Unit.GetUnit(getNss(getHostExt()),True) extComment = "" extVers = "" @@ -2604,19 +2662,26 @@ def templateRender(templateName,values=None): nms = GetTargets(Unit.GetUnit("name", True), extDef, layers=ALL_LAYERS ) if len(nms) > 0: extName = nms[0] - + if node.startswith("docs/"): + docsdir = "./" + homedir = ".." + else: + docsdir = "docs/" + homedir = "." defvars = { 'ENABLE_HOSTED_EXTENSIONS': ENABLE_HOSTED_EXTENSIONS, 'SCHEMA_VERSION': SCHEMA_VERSION, 'sitemode': sitemode, 'sitename': getSiteName(), - 'staticPath': makeUrl("",""), - 'extensionPath': makeUrl(getHostExt(),"",True), + 'staticPath': makeUrl("","",full=True), + 'extensionPath': makeUrl(getHostExt(),"",full=True), 'myhost': getHost(), 'myport': getHostPort(), 'mybasehost': getBaseHost(), 'host_ext': getHostExt(), 'extComment': extComment, + 'docsdir': docsdir, + 'homedir': homedir, 'extDD': extDD, 'extVers': extVers, 'extName': extName, @@ -2644,16 +2709,6 @@ def my_shutdown_hook(): runtime.set_shutdown_hook(my_shutdown_hook) -ThreadVars = threading.local() -def getAppVar(var): - ret = getattr(ThreadVars, var, None) - #log.debug("got var %s as %s" % (var,ret)) - return ret - -def setAppVar(var,val): - #log.debug("Setting var %s to %s" % (var,val)) - setattr(ThreadVars,var,val) - def setHttpScheme(val): setAppVar('httpScheme',val) diff --git a/sdocloudstore.py b/sdocloudstore.py new file mode 100644 index 0000000000..3932c8245a --- /dev/null +++ b/sdocloudstore.py @@ -0,0 +1,453 @@ +# [START imports] +import logging +logging.basicConfig(level=logging.INFO) # dev_appserver.py --log_level debug . 
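sdoutil.py itself is not part of this patch, but both sdoapp.py and this module import it for setAppVar/getAppVar and the CLOUDSTAT/CLOUDEXTRAMETA keys. A sketch of the assumed shape of those helpers, mirroring the thread-local code this patch removes from sdoapp.py above:

    # Assumed shape of the sdoutil helpers used here; mirrors the ThreadVars code
    # removed from sdoapp.py in this patch.
    import threading

    ThreadVars = threading.local()

    def getAppVar(var):
        return getattr(ThreadVars, var, None)   # None if the variable was never set

    def setAppVar(var, val):
        setattr(ThreadVars, var, val)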
+log = logging.getLogger(__name__) + +import os +import datetime, time +import cloudstorage +import mimetypes +import StringIO +import json + +from google.appengine.ext import blobstore +from google.appengine.ext.webapp import blobstore_handlers + +from testharness import * +from sdoutil import * + +if not getInTestHarness(): + from google.appengine.api import app_identity + +# [START retries] +cloudstorage.set_default_retry_params( + cloudstorage.RetryParams( + initial_delay=0.2, max_delay=5.0, backoff_factor=2, max_retry_period=15 + )) +# [END retries] +BUCKETROOT = "schemaorg" +DEFAULTCURRENT = "TestData" +CLOUDCACHEENABLE = False + +class StoreLocation(): + def __init__(self): + self.intitialise() + + def intitialise(self): + self.root = BUCKETROOT + self.current = self._findCurrent() + + def _findCurrent(self): + + if not getInTestHarness(): + from google.appengine.api.modules.modules import get_current_version_name + ret = get_current_version_name() + + if ret: + return ret + + return DEFAULTCURRENT + + def getRoot(self): + return self.root + + def getCurrentLoc(self): + return self.current + + def locate(self,path=""): + ret = self.getRoot() + "/" + self.getCurrentLoc() + if path and len(path): + ret += '/' + path + return ret + + def checkConfig(self): + changed = False + prev = self.current + new = self._findCurrent() + if new != prev: + changed = True + self.current = new + return changed + + + + +STORELOC = StoreLocation() + +class bucketCacheItem(): + def __init__(self,stat=None,content=None): + self.stat = stat + self.content = content + +class SdoCloudStore(): + def __init__(self): + log.info("InTestHarness: %s" % getInTestHarness()) + if getInTestHarness(): + self.bucket_name = os.environ.get('BUCKET_NAME',"app_default_bucket") + else: + self.bucket_name = os.environ.get('BUCKET_NAME', app_identity.get_default_gcs_bucket_name()) + + log.info("Storage Bucket: %s" % self.bucket_name) + self.cleanCache() +# self.cleanCache() +# self.bucket_name = "sdo-rjwtest.appspot.com" +# self.storageClient = cloudstorage.Client() +# self.bucket = self.storageClient.bucket(self.bucket_name) + + + def buildNameType(self,filename,ftype): + dataext = os.path.splitext(filename) + ext = dataext[1] + if ext and len(ext) and ext.startswith('.'): + ext = ext[1:] + if ftype and ext != ftype: + filename = filename + '.' 
+ ftype + + log.info("buildNameType: filename:%s ftype: %s ext: %s" % (filename, ftype, ext)) + + return filename + + def buildBucketFile(self,filename,ftype,location): + log.info("buildBucketFile( %s %s %s )" % (filename,ftype,location)) + + filename = self.buildNameType(filename,ftype) + + if not location: + if ftype: + location = ftype + if location: + bucketFile = location + "/" + filename + else: + bucketFile = filename + + mimetype, contentType = mimetypes.guess_type(bucketFile) + + log.info("buildBucketFile: %s %s (%s)" % (bucketFile,mimetype,contentType)) + + return bucketFile, mimetype + + def getPath(self,bucketFile): + bucketFile = "/" + self.bucket_name + "/" + STORELOC.locate(bucketFile) + return bucketFile + + +# [START write] + def writeFormattedFile(self, filename, ftype=None, location=None, content="", raw=False, private=False, extrameta=None): + """Create a file.""" + bucketFile, mtype = self.buildBucketFile(filename,ftype,location) + self.write_file(bucketFile, mtype, content, raw=raw, private=private, extrameta=extrameta) + + def write_file(self, bucketFile, mtype=None, content="", raw=False, private=False, extrameta=None): + """Create a file.""" + + log.info('Creating file {} ({})'.format(bucketFile,mtype)) + bucketFile = self.getPath(bucketFile) + return self._write_file(bucketFile=bucketFile, mtype=mtype, content=content, raw=raw, private=private, extrameta=extrameta) + + def _write_file(self, bucketFile, mtype=None, content="", raw=False, private=False, extrameta=None): + log.info("Attempting to write: %s %s %s" % (bucketFile, mtype, raw)) + # The retry_params specified in the open call will override the default + # retry params for this particular file handle. + + setAppVar(CLOUDSTAT,None) + moremeta = getAppVar(CLOUDEXTRAMETA) + setAppVar(CLOUDEXTRAMETA,None) #clear out now potentially stale values + + if extrameta and moremeta: + extrameta.update(moremeta) + else: + extrameta = moremeta + + try: + write_retry_params = cloudstorage.RetryParams(backoff_factor=1.1) + write_options = {} + if not private: + write_options.update({'x-goog-acl': 'public-read'}) + if extrameta: + write_options.update(extrameta) + if private: + write_options="" + if not raw: + log.info("Encoding to utf8") + content = content.encode('utf-8') + with cloudstorage.open( + bucketFile, 'w', + content_type=mtype, + options=write_options, + retry_params=write_retry_params) as cloudstorage_file: + cloudstorage_file.write(content) + except Exception as e: + log.error("File write error: (%s): %s" % (bucketFile,e)) + return False + + setAppVar(CLOUDSTAT,self._stat_file(bucketFile,cache=False)) + return True + + def write_json_file(self, bucketFile, mtype="application/json", data={}, private=False): + """Create a file.""" + + bucketFile = self.getPath(bucketFile) + return self._write_json_file(bucketFile=bucketFile, mtype=mtype, data=data, private=private) + + + def _write_json_file(self, bucketFile, mtype="application/json", data={}, private=False): + # The retry_params specified in the open call will override the default + # retry params for this particular file handle. 
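+        # Objects are written world-readable ('x-goog-acl': 'public-read') unless the
+        # caller passes private=True, which suppresses that ACL option.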
+ log.info("Attempting to write: %s" % bucketFile) + try: + write_retry_params = cloudstorage.RetryParams(backoff_factor=1.1) + write_options = {'x-goog-acl': 'public-read'} + if private: + write_options="" + with cloudstorage.open( + bucketFile, 'w', + content_type=mtype, + options=write_options, + retry_params=write_retry_params) as cloudstorage_file: + json.dump(data,cloudstorage_file) + except Exception as e: + log.info("File write error: (%s): %s" % (bucketFile,e)) + return False + return True + +# [END write] + +# [START stat] + def statFormattedFile(self, filename, ftype="html", location=None, cache=True): + log.info("statFormattedFile(%s,%s,%s,%s)" % (filename, ftype, location, cache)) + bucketFile, mtype = self.buildBucketFile(filename,ftype,location) + return self.stat_file(bucketFile, ftype, cache) + + def stat_file(self, bucketFile, ftype=None, cache=True): + bucketFile = self.getPath(bucketFile) + return self._stat_file(bucketFile, ftype=ftype, cache=cache) + + def _stat_file(self, bucketFile, ftype=None, cache=True): + log.info("stat_file(%s,%s,%s)" % (bucketFile, ftype, cache)) + ret = None + if cache: + item = self.readCache(bucketFile,ftype) + if item: + ret = item.stat + log.info("Got from readCache") + + if not ret: + log.info('Stating file {}'.format(bucketFile)) + try: + ret = cloudstorage.stat(bucketFile) + except cloudstorage.NotFoundError: + log.info("File not found: %s" % bucketFile) + except Exception as e: + log.info("Stat error(%s): %s" % (bucketFile,e)) + + if ret: + log.info("Stat {}".format(ret)) + itm = bucketCacheItem(ret,None) + self.writeCache(bucketFile, itm, ftype) + return ret + +# [END stat] + +# [START read] + def readFormattedFile(self, filename, ftype="html", location=None, cache=True): + log.info("readFormattedFile(%s,%s,%s,%s)" % (filename,ftype,location,cache)) + stat, content = self.getFormattedFile(filename, ftype, location, cache) + return content + + def read_file(self, bucketFile, cache=True, stat=None): + log.info("read_file(%s,%s,%s)" % (bucketFile,cache,stat)) + stat, content = self.get_file(bucketFile,cache,stat) + return content + + + def getFormattedFile(self, filename, ftype="html", location=None, cache=True): + log.info("getFormattedFile(%s,%s,%s,%s)" % (filename,ftype,location,cache)) + + bucketFile, mtype = self.buildBucketFile(filename,ftype,location) + stat, content = self.get_file(bucketFile,reqtype=ftype, cache=cache) + return stat, content + + def get_file(self, bucketFile, reqtype=None, cache=True): + bucketFile = self.getPath(bucketFile) + return self._get_file(bucketFile=bucketFile, reqtype=reqtype, cache=cache) + + + def _get_file(self, bucketFile, reqtype=None, cache=True): + log.info("get_file(%s,%s)" % (bucketFile,cache)) + + setAppVar(CLOUDSTAT,None) + + stat = None + content = None + cached = False + if cache: + item = self.readCache(bucketFile,reqtype) + if item: + content = item.content + stat = item.stat + if content: + cached = True + log.info("Got from readCache") + + if not stat: + stat = self._stat_file(bucketFile,cache=False) + if stat: + log.info('Opening file {}'.format(bucketFile)) + try: + with cloudstorage.open(bucketFile) as cloudstorage_file: + content = cloudstorage_file.read() + cloudstorage_file.close() + + except cloudstorage.NotFoundError: + log.info("File not found: %s" % bucketFile) + except Exception as e: + log.info("File read error (%s): %s" % (bucketFile,e)) + + if not cached and content: + log.info("Adding to cache: %s" % bucketFile) + val = bucketCacheItem(stat=stat,content=content) + 
self.writeCache(bucketFile,val, reqtype) + + setAppVar(CLOUDSTAT,stat) + return stat, content + + def delete_file(self, bucketFile, ftype=None): + bucketFile = self.getPath(bucketFile) + return _delete_file(bucketFile=bucketFile, ftype=ftype) + + def _delete_file(self, bucketFile, ftype=None): + log.info("Deleting: %s" % bucketFile) + self.delCache(bucketFile, ftype) + try: + cloudstorage.delete(bucketFile) + except cloudstorage.NotFoundError: + pass + + def get_json_file(self, bucketFile): + log.info("get_json_file(%s)" % (bucketFile)) + data = None + stat = self.stat_file(bucketFile,cache=False) + if stat: + log.info('Opening file {}'.format(self.getPath(bucketFile))) + try: + with cloudstorage.open(self.getPath(bucketFile)) as cloudstorage_file: + data = json.load(cloudstorage_file) + cloudstorage_file.close() + except cloudstorage.NotFoundError: + log.info("File not found: %s" % self.getPath(bucketFile)) + except Exception as e: + log.info("File read error (%s): %s" % (bucketFile,e)) + return data + + def cleanCache(self,reqtype=None): + self.cache = {} + + def emptyCache(self,reqtype): + log.info("Emptying cache for %s" % reqtype) + if not reqtype: + self.cache = {} + else: + self.cache[reqtype] = {} + + def readCache(self, index, reqtype=None): + if not CLOUDCACHEENABLE: + return None + + if not reqtype: + reqtype = "unknown" + + ctype = self.cache.get(reqtype) + if ctype: + return ctype.get(index) + return None + + def writeCache(self, index, value, reqtype=None): + if not CLOUDCACHEENABLE: + return + + if not reqtype: + reqtype = "unknown" + + ctype = self.cache.get(reqtype) + if not ctype: + ctype = {} + self.cache[reqtype] = ctype + ctype[index] = value + + def delCache(self, index, reqtype=None): + if not CLOUDCACHEENABLE: + return + + if not reqtype: + reqtype = "unknown" + ctype = self.cache.get(reqtype) + if ctype: + ctype.pop(index,None) + + +# [END read] + +# [START delete_file] + def deleteFormattedFile(self, filename, ftype="html", location=None ): + bucketFile, mtype = self.buildBucketFile(filename,ftype,location) + self.delete_file(bucketFile,ftype) + + def delete_file(self,bucketFile,ftype=None): + log.info('Deleting file {}'.format(bucketFile)) + self.delCache(bucketFile,reqtype=ftype) + self._delete_file(self.getPath(bucketFile)) + + def _delete_file(self, file): + try: + cloudstorage.delete(file) + except cloudstorage.NotFoundError: + log.info("File not found: %s" % file) + pass + +# [END delete_file] + +# [START delete_files] + + def delete_files_in_bucket(self): + return self.delete_files_in_folder("") + + def delete_files_in_folder(self, folder): + bucketFolder = self.getPath(folder) + log.info("bucketFolder %s" % bucketFolder) + startdel = datetime.datetime.now() + delcount = 0 + page_size = 100 + stats = cloudstorage.listbucket(bucketFolder, max_keys=page_size) + files = [] + while True: + count = 0 + for stat in stats: + count += 1 + files.append(stat.filename) + + for f in files: + delcount += 1 + self._delete_file(f) + + if count != page_size or count == 0: + break + stats = cloudstorage.listbucket(bucketFolder, max_keys=page_size,marker=stat.filename) + files = [] + + log.info("Cloudstorage: deleted %s files in %s seconds" % (delcount, (datetime.datetime.now() - startdel))) + return delcount + +# [END delete_files] + + +SdoCloud = SdoCloudStore() + + + + + + + + + +
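
A minimal, self-contained sketch of the key-to-object-path mapping used by CloudPageStoreTool and SdoCloudStore above. The bucket name, version label, and the example key are placeholders chosen for illustration; in the patch itself the bucket comes from app_identity.get_default_gcs_bucket_name() and the version segment from get_current_version_name(), so treat this as an approximation of the logic rather than code taken from it.

    import mimetypes
    import os

    BUCKET_NAME = "example-app.appspot.com"   # placeholder; real code asks app_identity
    BUCKET_ROOT = "schemaorg"                 # matches BUCKETROOT above
    CURRENT = "TestData"                      # placeholder; real code uses the module version

    def get_type_from_key(key):
        """Split a PageStore key such as '.html:docs/schemas.html' into (name, type)."""
        name, typ = key, None
        split = key.split(':')
        if len(split) > 1:
            typ, name = split[0], split[1]
            if typ.startswith('.'):
                typ = typ[1:]
        return name, typ

    def build_name_type(filename, ftype):
        """Append the type as an extension if the filename does not already carry it."""
        ext = os.path.splitext(filename)[1]
        if ext.startswith('.'):
            ext = ext[1:]
        if ftype and ext != ftype:
            filename = filename + '.' + ftype
        return filename

    def bucket_path(key):
        """Full Cloud Storage object path (and guessed MIME type) for a PageStore key."""
        name, ftype = get_type_from_key(key)
        ftype = ftype or "html"               # CloudPageStoreTool defaults untyped keys to html
        filename = build_name_type(name, ftype)
        location = ftype                      # files are grouped by type when no location is given
        bucket_file = location + "/" + filename
        mimetype, _ = mimetypes.guess_type(bucket_file)
        full_path = "/" + BUCKET_NAME + "/" + BUCKET_ROOT + "/" + CURRENT + "/" + bucket_file
        return full_path, mimetype

    print(bucket_path(".html:docs/schemas.html"))
    # ('/example-app.appspot.com/schemaorg/TestData/html/docs/schemas.html', 'text/html')

The versioned CURRENT segment is what lets CacheControl.clean() and delete_files_in_folder() drop a whole generation of cached pages in one sweep without touching objects written by other deployed versions.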