From 2dc2d9c71e792c73f05bb12007382a505f384364 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 15 Nov 2024 14:06:10 +0100 Subject: [PATCH 1/9] Add better support for transfer jobs in mv/cp/rm --- firecrest/FirecrestException.py | 13 ++++ firecrest/v2/AsyncClient.py | 112 ++++++++++++++++++++++++-------- 2 files changed, 98 insertions(+), 27 deletions(-) diff --git a/firecrest/FirecrestException.py b/firecrest/FirecrestException.py index fddf6dd..b0b9b0e 100644 --- a/firecrest/FirecrestException.py +++ b/firecrest/FirecrestException.py @@ -112,3 +112,16 @@ def __str__(self): f"is exhausted. Update `polling_sleep_times` of the client " f"to increase the number of polling attempts." ) + + +class TransferJobFailedException(Exception): + """Exception raised when the polling iterator is exhausted""" + + def __init__(self, transfer_job_info): + self._transfer_job_info = transfer_job_info + + def __str__(self): + return ( + f"Transfer job failed. Check the log files for more " + f"information: {self._transfer_job_info['transferJob']}" + ) diff --git a/firecrest/v2/AsyncClient.py b/firecrest/v2/AsyncClient.py index 72f5ac3..14d9dea 100644 --- a/firecrest/v2/AsyncClient.py +++ b/firecrest/v2/AsyncClient.py @@ -6,6 +6,7 @@ # from __future__ import annotations +import asyncio import httpx import json import logging @@ -20,7 +21,11 @@ from firecrest.utilities import ( parse_retry_after, slurm_state_completed, time_block ) -from firecrest.FirecrestException import UnexpectedStatusException +from firecrest.FirecrestException import ( + FirecrestException, + TransferJobFailedException, + UnexpectedStatusException +) logger = logging.getLogger(__name__) @@ -39,6 +44,14 @@ def handle_response(response): print("-") +def sleep_generator(): + yield 0.2 # First yield 2 seconds because the api takes time to update + value = 0.5 + while True: + yield value + value *= 2 # Double the value for each iteration + + class AsyncFirecrest: """ This is the basic class you instantiate to access the FirecREST API v2. @@ -610,14 +623,18 @@ async def mv( self, system_name: str, source_path: str, - target_path: str + target_path: str, + blocking: bool = False ) -> dict: """Rename/move a file, directory, or symlink at the `source_path` to the `target_path` on `system_name`'s filesystem. + This operation runs in a job. :param system_name: the system name where the filesystem belongs to :param source_path: the absolute source path :param target_path: the absolute target path + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/mv` """ data: dict[str, str] = { @@ -628,19 +645,52 @@ async def mv( endpoint=f"/filesystem/{system_name}/transfer/mv", data=json.dumps(data) ) - return self._check_response(resp, 201) + job_info = self._check_response(resp, 201) + + if blocking: + await self._wait_for_transfer_job(job_info) + + return job_info + + async def _wait_for_transfer_job(self, job_info): + job_id = job_info["transferJob"]["jobId"] + system_name = job_info["transferJob"]["system"] + for i in sleep_generator(): + try: + job = await self.job_info(system_name, job_id) + except FirecrestException as e: + if e.responses[-1].status_code == 404 and "Job not found" in e.responses[-1].json()['message']: + await asyncio.sleep(i) + continue + + state = job[0]["state"]["current"] + if isinstance(state, list): + state = ",".join(state) + + if slurm_state_completed(state): + break + + await asyncio.sleep(i) + + # TODO: Check if the job was successful + + stdout_file = await self.view(system_name, job_info["transferJob"]["logs"]["outputLog"]) + if "Files were successfully" not in stdout_file: + raise TransferJobFailedException(job_info) async def cp( self, system_name: str, source_path: str, - target_path: str + target_path: str, + blocking: bool = False ) -> dict: """Copies file from `source_path` to `target_path`. :param system_name: the system name where the filesystem belongs to :param source_path: the absolute source path :param target_path: the absolute target path + :param blocking: whether to wait for the job to complete :calls: POST `/filesystem/{system_name}/transfer/cp` """ data: dict[str, str] = { @@ -652,24 +702,37 @@ async def cp( endpoint=f"/filesystem/{system_name}/transfer/cp", data=json.dumps(data) ) - return self._check_response(resp, 201) + job_info = self._check_response(resp, 201) + + if blocking: + await self._wait_for_transfer_job(job_info) + + return job_info async def rm( self, system_name: str, path: str, + blocking: bool = False ) -> None: - """Blocking call to delete a small file. + """Delete a file. :param system_name: the system name where the filesystem belongs to :param path: the absolute target path :calls: DELETE `/filesystem/{system_name}/transfer/rm` """ resp = await self._delete_request( - endpoint=f"/filesystem/{system_name}/ops/rm", + endpoint=f"/filesystem/{system_name}/transfer/rm", params={"path": path} ) - self._check_response(resp, 204) + # self._check_response(resp, 204) + + job_info = self._check_response(resp, 200) + + if blocking: + await self._wait_for_transfer_job(job_info) + + return job_info async def submit( self, @@ -678,8 +741,7 @@ async def submit( working_dir: str, env_vars: Optional[dict[str, str]] = None, ) -> dict: - """Rename/move a file, directory, or symlink at the `source_path` to - the `target_path` on `system_name`'s filesystem. + """Submit a job. :param system_name: the system name where the filesystem belongs to :param script: the job script @@ -706,21 +768,21 @@ async def submit( async def job_info( self, system_name: str, - job: str, - # TODO: support jobs list + jobid: Optional[str] = None ) -> dict: - """Rename/move a file, directory, or symlink at the `source_path` to - the `target_path` on `system_name`'s filesystem. + """Get job information. When the job is not specified, it will return + all the jobs. :param system_name: the system name where the filesystem belongs to - :param script: the job script - :param working_dir: the working directory of the job - :param env_vars: environment variables to be set before running the - job - :calls: POST `/compute/{system_name}/jobs` + :param job: the ID of the job + :calls: GET `/compute/{system_name}/jobs` or + GET `/compute/{system_name}/jobs/{job}` """ + url = f"/compute/{system_name}/jobs" + url = f"{url}/{jobid}" if jobid else url + resp = await self._get_request( - endpoint=f"/compute/{system_name}/jobs", + endpoint=url, ) return self._check_response(resp, 200)["jobs"] @@ -729,15 +791,11 @@ async def job_metadata( system_name: str, jobid: str, ) -> dict: - """Rename/move a file, directory, or symlink at the `source_path` to - the `target_path` on `system_name`'s filesystem. + """Get job metadata. :param system_name: the system name where the filesystem belongs to - :param script: the job script - :param working_dir: the working directory of the job - :param env_vars: environment variables to be set before running the - job - :calls: POST `/compute/{system_name}/jobs` + :param jobid: the ID of the job + :calls: GET `/compute/{system_name}/jobs/{jobid}/metadata` """ resp = await self._get_request( endpoint=f"/compute/{system_name}/jobs/{jobid}/metadata", From 3e7c40be2f8e69f2d1df8f12d0c76bb2c42d497b Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 15 Nov 2024 16:49:01 +0100 Subject: [PATCH 2/9] Fix type --- firecrest/v2/AsyncClient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/firecrest/v2/AsyncClient.py b/firecrest/v2/AsyncClient.py index 14d9dea..a65e4ab 100644 --- a/firecrest/v2/AsyncClient.py +++ b/firecrest/v2/AsyncClient.py @@ -714,7 +714,7 @@ async def rm( system_name: str, path: str, blocking: bool = False - ) -> None: + ) -> dict: """Delete a file. :param system_name: the system name where the filesystem belongs to From d75b16a8bfcfa9429d47156bde6c62287fa66b53 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 20 Nov 2024 11:20:34 +0100 Subject: [PATCH 3/9] Add upload method --- firecrest/v2/AsyncClient.py | 143 ++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 71 deletions(-) diff --git a/firecrest/v2/AsyncClient.py b/firecrest/v2/AsyncClient.py index a65e4ab..997d1ba 100644 --- a/firecrest/v2/AsyncClient.py +++ b/firecrest/v2/AsyncClient.py @@ -6,6 +6,7 @@ # from __future__ import annotations +import aiofiles import asyncio import httpx import json @@ -13,7 +14,9 @@ import pathlib import ssl -from contextlib import nullcontext +import requests + +from contextlib import asynccontextmanager from io import BytesIO from packaging.version import Version, parse from typing import Any, ContextManager, Optional, List @@ -52,6 +55,18 @@ def sleep_generator(): value *= 2 # Double the value for each iteration +@asynccontextmanager +async def async_file_context(file): + if isinstance(file, (str, pathlib.Path)): + # Open the file asynchronously if it's a string or path + async with aiofiles.open(file, "rb") as f: + print('opened file') + yield f + else: + # Use the file as-is if it's already a BytesIO or similar + yield file + + class AsyncFirecrest: """ This is the basic class you instantiate to access the FirecREST API v2. @@ -379,7 +394,7 @@ async def tail( path: str, num_bytes: Optional[int] = None, num_lines: Optional[int] = None, - exclude_beginning: bool = False, # Changed to exclude_beginning + exclude_beginning: bool = False, ) -> List[dict]: """Display the ending of a specified file. By default, 10 lines will be returned. @@ -402,7 +417,8 @@ async def tail( "You cannot specify both `num_bytes` and `num_lines`." ) - # If `exclude_beginning` is passed, either `num_bytes` or `num_lines` must be passed + # If `exclude_beginning` is passed, either `num_bytes` or `num_lines` + # must be passed if exclude_beginning and num_bytes is None and num_lines is None: raise ValueError( "`exclude_beginning` requires either `num_bytes` or " @@ -485,7 +501,8 @@ async def chmod( path: str, mode: str ) -> dict: - """Changes the file mod bits of a given file according to the specified mode. + """Changes the file mod bits of a given file according to the + specified mode. :param system_name: the system name where the filesystem belongs to :param path: the absolute target path of the file @@ -510,7 +527,8 @@ async def chown( group: str ) -> dict: """Changes the user and/or group ownership of a given file. - If only owner or group information is passed, only that information will be updated. + If only owner or group information is passed, only that information + will be updated. :param system_name: the system name where the filesystem belongs to :param path: the absolute target path of the file @@ -554,71 +572,6 @@ async def stat( ) return self._check_response(resp, 200)["output"] - async def upload( - self, - system_name: str, - source_path: str | pathlib.Path | BytesIO, - target_path: str, - filename: Optional[str] = None, - ) -> None: - """Blocking call to upload a small file. - The file that will be uploaded will have the same name as the - source_path. - - :param system_name: the system name where the filesystem belongs to - :param source_path: the source path of the file or binary stream - :param target_path: the absolute target path of the directory where - the file will be uploaded - :param filename: naming target file to filename (default is same as - the local one) - :calls: POST `/filesystem/{system_name}/ops/upload` - """ - context: ContextManager[BytesIO] = ( - open(source_path, "rb") # type: ignore - if isinstance(source_path, str) or isinstance(source_path, pathlib.Path) - else nullcontext(source_path) - ) - with context as f: - # Set filename - if filename is not None: - f = (filename, f) # type: ignore - - resp = await self._post_request( - endpoint=f"/filesystem/{system_name}/ops/upload", - params={"target_path": target_path}, - files={"file": f} - ) - - self._check_response(resp, 201) - - async def download( - self, - system_name: str, - source_path: str, - target_path: str | pathlib.Path | BytesIO, - ) -> None: - """Blocking call to download a small file. - - :param system_name: the system name where the filesystem belongs to - :param source_path: the absolute source path of the file or binary - stream - :param target_path: the target path in the local filesystem or binary - stream - :calls: POST `/filesystem/{system_name}/ops/download` - """ - resp = await self._get_request( - endpoint=f"/filesystem/{system_name}/ops/download", - params={"path": source_path} - ) - self._check_response(resp, 200, return_json=False) - context: ContextManager[BytesIO] = ( - open(target_path, "wb") # type: ignore - if isinstance(target_path, str) or isinstance(target_path, pathlib.Path) - else nullcontext(target_path) - ) - with context as f: - f.write(resp.content) - async def mv( self, system_name: str, @@ -675,7 +628,10 @@ async def _wait_for_transfer_job(self, job_info): # TODO: Check if the job was successful stdout_file = await self.view(system_name, job_info["transferJob"]["logs"]["outputLog"]) - if "Files were successfully" not in stdout_file: + if ( + "Files were successfully" not in stdout_file and + "File was successfully" not in stdout_file + ): raise TransferJobFailedException(job_info) async def cp( @@ -734,6 +690,51 @@ async def rm( return job_info + async def upload( + self, + system_name: str, + local_file: str | pathlib.Path | BytesIO, + directory: str, + filename: str, + blocking: bool = False + ) -> dict: + """Upload a file to the system. The user uploads a file to the + staging area Object storage) of FirecREST and it will be moved + to the target directory in a job. + + :param system_name: the system name where the filesystem belongs to + :param local_file: the local file's path to be uploaded (can be + relative) + :param source_path: the absolut target path of the directory where the + file will be uploaded + :param filename: the name of the file in the target directory + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/upload` + """ + # TODO check if the file exists locally + + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/upload", + data=json.dumps({ + "source_path": directory, + "fileName": filename + }) + ) + + transfer_info = self._check_response(resp, 201) + # Upload the file + # FIXME: This is a blocking call, should be async + with open(local_file, "rb") as f: + requests.put( + url=transfer_info["uploadUrl"], + data=f + ) + + if blocking: + await self._wait_for_transfer_job(transfer_info) + + return transfer_info + async def submit( self, system_name: str, From 89640e2e0a0af08d3b17f2de1484ba902f28a73a Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 20 Nov 2024 13:46:19 +0100 Subject: [PATCH 4/9] Add download method --- firecrest/v2/AsyncClient.py | 48 +++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/firecrest/v2/AsyncClient.py b/firecrest/v2/AsyncClient.py index 997d1ba..c8a40ff 100644 --- a/firecrest/v2/AsyncClient.py +++ b/firecrest/v2/AsyncClient.py @@ -630,7 +630,8 @@ async def _wait_for_transfer_job(self, job_info): stdout_file = await self.view(system_name, job_info["transferJob"]["logs"]["outputLog"]) if ( "Files were successfully" not in stdout_file and - "File was successfully" not in stdout_file + "File was successfully" not in stdout_file and + "Multipart file upload successfully completed" not in stdout_file ): raise TransferJobFailedException(job_info) @@ -723,11 +724,11 @@ async def upload( transfer_info = self._check_response(resp, 201) # Upload the file - # FIXME: This is a blocking call, should be async - with open(local_file, "rb") as f: - requests.put( + async with aiofiles.open(local_file, "rb") as f: + data = await f.read() # TODO this will fail for large files + await self._session.put( url=transfer_info["uploadUrl"], - data=f + data=data ) if blocking: @@ -735,6 +736,43 @@ async def upload( return transfer_info + async def download( + self, + system_name: str, + source_path: str, + target_path: str, + blocking: bool = False + ) -> dict: + """Download a file from the remote system. + + :param system_name: the system name where the filesystem belongs to + :param source_path: the absolute source path of the file + :param target_path: the target path in the local filesystem (can + be relative path) + :param blocking: whether to wait for the job to complete + :calls: POST `/filesystem/{system_name}/transfer/upload` + """ + resp = await self._post_request( + endpoint=f"/filesystem/{system_name}/transfer/download", + data=json.dumps({ + "source_path": source_path, + }) + ) + + transfer_info = self._check_response(resp, 201) + if blocking: + await self._wait_for_transfer_job(transfer_info) + + # Download the file + async with aiofiles.open(target_path, "wb") as f: + # TODO this will fail for large files + resp = await self._session.get( + url=transfer_info["downloadUrl"], + ) + await f.write(resp.content) + + return transfer_info + async def submit( self, system_name: str, From e65b57e40ce46cb91f95f34ef7cd90203bfa31ec Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 25 Nov 2024 12:57:08 +0100 Subject: [PATCH 5/9] Add aiofiles dep --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 928c4d4..55ff7b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ classifiers = [ ] requires-python = ">=3.7" dependencies = [ + "aiofiles>=24.1.0", "requests>=2.14.0", "PyJWT>=2.4.0", "typer[all]~=0.7.0", From 3f9d33b232b18055a78cbd94ade7bc243bee95ff Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 25 Nov 2024 13:09:26 +0100 Subject: [PATCH 6/9] Pin aiofiles to 23.2.1 to maintain support for python 3.7 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 55ff7b6..5bc55a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ ] requires-python = ">=3.7" dependencies = [ - "aiofiles>=24.1.0", + "aiofiles==23.2.1", "requests>=2.14.0", "PyJWT>=2.4.0", "typer[all]~=0.7.0", From 855c1d49a7c0c384b204f679735e5574b532a2c7 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 25 Nov 2024 13:16:51 +0100 Subject: [PATCH 7/9] Fix mypy errors --- firecrest/v2/AsyncClient.py | 2 +- pyproject.toml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/firecrest/v2/AsyncClient.py b/firecrest/v2/AsyncClient.py index c8a40ff..391fbef 100644 --- a/firecrest/v2/AsyncClient.py +++ b/firecrest/v2/AsyncClient.py @@ -728,7 +728,7 @@ async def upload( data = await f.read() # TODO this will fail for large files await self._session.put( url=transfer_info["uploadUrl"], - data=data + data=data # type: ignore ) if blocking: diff --git a/pyproject.toml b/pyproject.toml index 5bc55a1..4944f2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ test = [ "pytest>=5.3", "flake8~=5.0", "mypy~=0.991", + "types-aiofiles>=23.2.1.0", "types-requests~=2.28.11", "pytest-httpserver~=1.0.6", "pytest-asyncio>=0.21.1", From 7987e8d084c2ddb8bb9edb7e129f3d1d51e53d9a Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 25 Nov 2024 13:23:51 +0100 Subject: [PATCH 8/9] Fix aiofiles version --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4944f2b..e54edef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ classifiers = [ ] requires-python = ">=3.7" dependencies = [ - "aiofiles==23.2.1", + "aiofiles~=23.2.1", "requests>=2.14.0", "PyJWT>=2.4.0", "typer[all]~=0.7.0", @@ -47,7 +47,7 @@ test = [ "pytest>=5.3", "flake8~=5.0", "mypy~=0.991", - "types-aiofiles>=23.2.1.0", + "types-aiofiles~=23.2.1.0", "types-requests~=2.28.11", "pytest-httpserver~=1.0.6", "pytest-asyncio>=0.21.1", From 06eca4f982fd6ddc37e42916b5df8799f8dc18e8 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Mon, 25 Nov 2024 13:41:05 +0100 Subject: [PATCH 9/9] Fix types-aiofiles version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index e54edef..2e57309 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ test = [ "pytest>=5.3", "flake8~=5.0", "mypy~=0.991", - "types-aiofiles~=23.2.1.0", + "types-aiofiles~=23.2.0.0", "types-requests~=2.28.11", "pytest-httpserver~=1.0.6", "pytest-asyncio>=0.21.1",