From 552ccb3735b9bcfe153977ca78fa842b05ad59bf Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Wed, 15 Sep 2021 21:42:00 +0300 Subject: [PATCH] core: implement callbacks for put_file/get_file (#416) --- gcsfs/core.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 1e9b9360..d2a40747 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -884,6 +884,7 @@ async def _put_file( consistency=None, content_type="application/octet-stream", chunksize=50 * 2 ** 20, + callback=None, **kwargs, ): # enforce blocksize should be a multiple of 2**18 @@ -895,8 +896,11 @@ async def _put_file( with open(lpath, "rb") as f0: size = f0.seek(0, 2) f0.seek(0) + if callback is not None: + callback.set_size(size) + if size < 5 * 2 ** 20: - return await simple_upload( + await simple_upload( self, bucket, key, @@ -905,6 +909,8 @@ async def _put_file( metadatain=metadata, content_type=content_type, ) + callback.absolute_update(size) + return None else: location = await initiate_upload( self, bucket, key, content_type, metadata @@ -918,6 +924,8 @@ async def _put_file( self, location, bit, offset, size, content_type ) offset += len(bit) + if callback is not None: + callback.absolute_update(offset) checker.update(bit) checker.validate_json_response(out) @@ -981,7 +989,9 @@ async def _find(self, path, withdirs=False, detail=False, prefix="", **kwargs): return [o["name"] for o in out] @retry_request(retries=retries) - async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs): + async def _get_file_request( + self, rpath, lpath, *args, headers=None, callback=None, **kwargs + ): consistency = kwargs.pop("consistency", self.consistency) async with self.session.get( @@ -991,8 +1001,14 @@ async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs): timeout=self.requests_timeout, ) as r: r.raise_for_status() - checker = get_consistency_checker(consistency) + if callback is not None: + try: + size = int(r.headers["content-length"]) + except (KeyError, ValueError): + size = None + callback.set_size(size) + checker = get_consistency_checker(consistency) os.makedirs(os.path.dirname(lpath), exist_ok=True) with open(lpath, "wb") as f2: while True: @@ -1001,16 +1017,16 @@ async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs): break f2.write(data) checker.update(data) + if callback is not None: + callback.relative_update(len(data)) validate_response(r.status, data, rpath) # validate http request checker.validate_http_response(r) # validate file consistency return r.status, r.headers, r.request_info, data - async def _get_file(self, rpath, lpath, **kwargs): - if await self._isdir(rpath): - return + async def _get_file(self, rpath, lpath, callback=None, **kwargs): u2 = self.url(rpath) - await self._get_file_request(u2, lpath, **kwargs) + await self._get_file_request(u2, lpath, callback=callback, **kwargs) def _open( self,