forked from NVIDIA/NVFlare
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob_def_manager_spec.py
254 lines (196 loc) · 7.34 KB
/
job_def_manager_spec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from nvflare.apis.fl_component import FLComponent
from nvflare.apis.fl_context import FLContext
from nvflare.apis.job_def import Job, RunStatus
class JobDefManagerSpec(FLComponent, ABC):
    """Job Definition Management API.

    Abstract interface for creating, cloning, querying, updating, reviewing
    and deleting job definitions, and for saving/retrieving job data in the
    job storage. Every method is declared with ``@abstractmethod``, so a
    subclass has to implement all of them.
    """

    @abstractmethod
    def create(self, meta: dict, uploaded_content: Union[str, bytes], fl_ctx: FLContext) -> Dict[str, Any]:
        """Create a new job permanently.

        The caller must have validated the content already and created the
        initial meta. The implementation receives the bytes of the uploaded
        folder, uploads them to the permanent store, creates a unique
        Job ID (jid) and returns the meta.

        Args:
            meta: caller-provided meta info
            uploaded_content: data of the job definition: the data bytes, or the name of a file
                that contains the data bytes
            fl_ctx (FLContext): FLContext information

        Returns:
            A dict containing meta info. Additional meta info is added, especially
            the unique Job ID (jid) that has been created.
        """

    @abstractmethod
    def clone(self, from_jid: str, meta: dict, fl_ctx: FLContext) -> Dict[str, Any]:
        """Create a new job by cloning an existing job.

        Args:
            from_jid: ID of the job to be cloned
            meta: meta info for the new job
            fl_ctx (FLContext): FLContext information

        Returns:
            A dict containing meta info. Additional meta info is added, especially
            the unique Job ID (jid) that has been created.
        """

    @abstractmethod
    def get_job(self, jid: str, fl_ctx: FLContext) -> Job:
        """Get the Job object for a job ID.

        Args:
            jid (str): Job ID
            fl_ctx (FLContext): FLContext information

        Returns:
            A Job object
        """

    @abstractmethod
    def get_app(self, job: Job, app_name: str, fl_ctx: FLContext) -> bytes:
        """Get the contents of the specified app in bytes.

        Args:
            job: Job object
            app_name: name of the app to get
            fl_ctx (FLContext): FLContext information

        Returns:
            Content of the specified app in bytes
        """

    @abstractmethod
    def get_content(self, meta: dict, fl_ctx: FLContext) -> Optional[bytes]:
        """Get the entire uploaded content for a Job.

        Args:
            meta (dict): the meta info of the job
            fl_ctx (FLContext): FLContext information

        Returns:
            Uploaded content of the job in bytes. The declared return type is
            Optional, so callers should be prepared to receive None.
        """

    @abstractmethod
    def update_meta(self, jid: str, meta: dict, fl_ctx: FLContext):
        """Update the meta of an existing Job.

        Args:
            jid (str): Job ID
            meta: dictionary of metadata for the job
            fl_ctx (FLContext): FLContext information
        """

    @abstractmethod
    def refresh_meta(self, job: Job, meta_keys: list, fl_ctx: FLContext):
        """Refresh the meta of the job as specified in the meta keys.

        Saves the values of the specified keys into the job store.

        Args:
            job: Job object
            meta_keys: meta keys that need to be updated
            fl_ctx (FLContext): FLContext information
        """

    @abstractmethod
    def set_status(self, jid: str, status: RunStatus, fl_ctx: FLContext):
        """Set the status of an existing Job.

        Args:
            jid (str): Job ID
            status (RunStatus): status to set
            fl_ctx (FLContext): FLContext information
        """

    @abstractmethod
    def get_jobs_to_schedule(self, fl_ctx: FLContext) -> List[Job]:
        """Get job candidates for scheduling.

        Args:
            fl_ctx (FLContext): FLContext information

        Returns:
            A list of jobs for scheduling
        """

    @abstractmethod
    def get_all_jobs(self, fl_ctx: FLContext) -> List[Job]:
        """Get all Jobs in the system.

        Args:
            fl_ctx (FLContext): FLContext information

        Returns:
            A list of all jobs
        """

    @abstractmethod
    def get_jobs_by_status(self, run_status: Union[RunStatus, List[RunStatus]], fl_ctx: FLContext) -> List[Job]:
        """Get Jobs of a specified status.

        Args:
            run_status: status to filter for: a single status value or a list of status values
            fl_ctx (FLContext): FLContext information

        Returns:
            A list of Jobs of the specified status
        """

    @abstractmethod
    def get_jobs_waiting_for_review(self, reviewer_name: str, fl_ctx: FLContext) -> List[Job]:
        """Get Jobs waiting for review for the specified user.

        Args:
            reviewer_name (str): reviewer name
            fl_ctx (FLContext): FLContext information

        Returns:
            A list of Jobs waiting for review for the specified user.
        """

    @abstractmethod
    def set_approval(
        self, jid: str, reviewer_name: str, approved: bool, note: str, fl_ctx: FLContext
    ) -> Dict[str, Any]:
        """Set the approval for the specified user for a certain Job.

        Args:
            jid (str): Job ID
            reviewer_name (str): reviewer name
            approved (bool): whether the job is approved
            note (str): any note message
            fl_ctx (FLContext): FLContext information

        Returns:
            A dictionary of Job metadata.
        """

    @abstractmethod
    def delete(self, jid: str, fl_ctx: FLContext):
        """Delete the specified Job.

        Args:
            jid (str): Job ID
            fl_ctx (FLContext): FLContext information
        """

    @abstractmethod
    def save_workspace(self, jid: str, data: Union[bytes, str], fl_ctx: FLContext):
        """Save the job workspace to the job storage.

        Args:
            jid (str): Job ID
            data: Job workspace data, or the name of a file that holds the data
            fl_ctx (FLContext): FLContext information
        """

    @abstractmethod
    def get_storage_component(self, jid: str, component: str, fl_ctx: FLContext):
        """Get the data of a storage component (e.g. the workspace) from the job storage.

        Args:
            jid (str): Job ID
            component: storage component name
            fl_ctx (FLContext): FLContext information

        NOTE(review): the return type is not declared in this spec; confirm
        what implementations return before relying on it.
        """

    @abstractmethod
    def get_storage_for_download(
        self, jid: str, download_dir: str, component: str, download_file: str, fl_ctx: FLContext
    ):
        """Get the data of a storage component from the job storage for download.

        Args:
            jid (str): Job ID
            download_dir: download folder
            component: storage component name
            download_file: download file name
            fl_ctx (FLContext): FLContext information

        NOTE(review): presumably the component data is made available as
        ``download_file`` inside ``download_dir``; neither that nor the return
        value is specified here, so confirm against the implementation.
        """