Skip to content

Commit

Permalink
Merge pull request #8 from APL-HTM-S3-AWARE/416-s3-awareness
Browse files Browse the repository at this point in the history
Cloud Awareness for Custom Download Interfaces
  • Loading branch information
edmondb authored Nov 12, 2024
2 parents 9c75d14 + 8ec9a2d commit 321e05c
Show file tree
Hide file tree
Showing 21 changed files with 2,515 additions and 151 deletions.
54 changes: 43 additions & 11 deletions pyspedas/projects/cluster/load_csa.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from typing import List
from .config import CONFIG

from pyspedas.utilities.download import is_fsspec_uri
import fsspec

def cl_master_datatypes():
"""Return list of data types."""
Expand Down Expand Up @@ -193,9 +195,17 @@ def load_csa(trange:List[str]=['2001-02-01', '2001-02-03'],
# Encode the url urllib.parse.quote
url = base_url + (query_string)

local_path = CONFIG['local_data_dir']
Path(local_path).mkdir(parents=True, exist_ok=True)
out_gz = os.path.join(local_path, 'temp_cluster_file.tar.gz') # Temp file name
local_path = CONFIG['local_data_dir'] # could be URI
if is_fsspec_uri(local_path):
local_protocol, lpath = local_path.split("://")
local_fs = fsspec.filesystem(local_protocol, anon=False)

out_gz = '/'.join([local_path, 'temp_cluster_file.tar.gz']) # Temp file name
fileobj = local_fs.open(out_gz, 'wb')
else:
Path(local_path).mkdir(parents=True, exist_ok=True)
out_gz = os.path.join(local_path, 'temp_cluster_file.tar.gz') # Temp file name
fileobj = open(out_gz, 'wb')

# Download the file.
logging.info("Downloading Cluster data, please wait....")
Expand All @@ -211,24 +221,46 @@ def load_csa(trange:List[str]=['2001-02-01', '2001-02-03'],
logging.info("Download complete.")

# Open the downloaded file.
with open(out_gz, 'wb') as w:
with fileobj as w:
w.write(r.content)

# Extract the tar archive.
tar = tarfile.open(out_gz, "r:gz")
f = tar.getnames()
if sys.version_info >= (3, 12):
tar.extractall(path=local_path, filter='fully_trusted')
if is_fsspec_uri(out_gz):
# Cloud-Awareness: Opens byte stream for tarfile package.
bo = local_fs.open(out_gz, "rb")
tar = tarfile.open(fileobj=bo)
else:
tar.extractall(path=local_path)
tar = tarfile.open(out_gz, "r:gz")
f = tar.getnames()

for member in tar.getmembers():
if member.isfile():
p = '/'.join([local_path, member.path])
if is_fsspec_uri(p):
membo = local_fs.open(p, "wb")
else:
os.makedirs(str(Path(p).parent), exist_ok=True)
membo = open(p, "wb")

# Python > 3.9 requirement from setup.py
# note: data is written after file is read into memory
# https://stackoverflow.com/a/62247729
with tar.extractfile(member.path) as tarbo:
membo.write(tarbo.read())

membo.close()
tar.close()
# Remove the tar.gz file but keep the extracted.
os.remove(out_gz)
if is_fsspec_uri(out_gz):
local_fs.delete(out_gz)
else:
os.remove(out_gz)

# Get unique set of files.
f_set = set(f)
# File list with full path.
out_files = [os.path.join(local_path, s) for s in list(f_set)]
sep = "/" if is_fsspec_uri(local_path) else os.path.sep
out_files = [sep.join([local_path, s]) for s in list(f_set)]
out_files = sorted(out_files)

if downloadonly:
Expand Down
Loading

0 comments on commit 321e05c

Please sign in to comment.