Skip to content

Commit

Permalink
core: implement callbacks for put_file/get_file (fsspec#416)
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical authored Sep 15, 2021
1 parent ceaa9bc commit 552ccb3
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ async def _put_file(
consistency=None,
content_type="application/octet-stream",
chunksize=50 * 2 ** 20,
callback=None,
**kwargs,
):
# enforce blocksize should be a multiple of 2**18
Expand All @@ -895,8 +896,11 @@ async def _put_file(
with open(lpath, "rb") as f0:
size = f0.seek(0, 2)
f0.seek(0)
if callback is not None:
callback.set_size(size)

if size < 5 * 2 ** 20:
return await simple_upload(
await simple_upload(
self,
bucket,
key,
Expand All @@ -905,6 +909,8 @@ async def _put_file(
metadatain=metadata,
content_type=content_type,
)
callback.absolute_update(size)
return None
else:
location = await initiate_upload(
self, bucket, key, content_type, metadata
Expand All @@ -918,6 +924,8 @@ async def _put_file(
self, location, bit, offset, size, content_type
)
offset += len(bit)
if callback is not None:
callback.absolute_update(offset)
checker.update(bit)

checker.validate_json_response(out)
Expand Down Expand Up @@ -981,7 +989,9 @@ async def _find(self, path, withdirs=False, detail=False, prefix="", **kwargs):
return [o["name"] for o in out]

@retry_request(retries=retries)
async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs):
async def _get_file_request(
self, rpath, lpath, *args, headers=None, callback=None, **kwargs
):
consistency = kwargs.pop("consistency", self.consistency)

async with self.session.get(
Expand All @@ -991,8 +1001,14 @@ async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs):
timeout=self.requests_timeout,
) as r:
r.raise_for_status()
checker = get_consistency_checker(consistency)
if callback is not None:
try:
size = int(r.headers["content-length"])
except (KeyError, ValueError):
size = None
callback.set_size(size)

checker = get_consistency_checker(consistency)
os.makedirs(os.path.dirname(lpath), exist_ok=True)
with open(lpath, "wb") as f2:
while True:
Expand All @@ -1001,16 +1017,16 @@ async def _get_file_request(self, rpath, lpath, *args, headers=None, **kwargs):
break
f2.write(data)
checker.update(data)
if callback is not None:
callback.relative_update(len(data))

validate_response(r.status, data, rpath) # validate http request
checker.validate_http_response(r) # validate file consistency
return r.status, r.headers, r.request_info, data

async def _get_file(self, rpath, lpath, **kwargs):
if await self._isdir(rpath):
return
async def _get_file(self, rpath, lpath, callback=None, **kwargs):
u2 = self.url(rpath)
await self._get_file_request(u2, lpath, **kwargs)
await self._get_file_request(u2, lpath, callback=callback, **kwargs)

def _open(
self,
Expand Down

0 comments on commit 552ccb3

Please sign in to comment.