Skip to content

Commit

Permalink
Add retry policy for azureStorage (microsoft#1480)
Browse files Browse the repository at this point in the history
  • Loading branch information
SparkSnail authored Aug 29, 2019
1 parent a224f4f commit 2d252c9
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 75 deletions.
4 changes: 4 additions & 0 deletions docs/en_US/Tutorial/ExperimentConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,10 @@ machineList:

__azureShare__ is the share of the azure file storage.

* __uploadRetryCount__

If uploading files to Azure storage fails, NNI will retry the upload; this field specifies the number of attempts to re-upload the files.

* __paiConfig__

* __userName__
Expand Down
6 changes: 4 additions & 2 deletions src/nni_manager/rest_server/restValidationSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ export namespace ValidationSchemas {
azureStorage: joi.object({
accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/),
azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/)
})
}),
uploadRetryCount: joi.number().min(1)
}),
frameworkcontroller_config: joi.object({
storage: joi.string().min(1),
Expand All @@ -141,7 +142,8 @@ export namespace ValidationSchemas {
azureStorage: joi.object({
accountName: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,31}$/),
azureShare: joi.string().regex(/^([0-9]|[a-z]|[A-Z]|-){3,63}$/)
})
}),
uploadRetryCount: joi.number().min(1)
}),
nni_manager_ip: joi.object({
nniManagerIp: joi.string().min(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ export namespace AzureStorageClientUtility {
* @param fileServerClient
* @param azureShare
*/
export async function createShare(fileServerClient: any, azureShare: any): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
export async function createShare(fileServerClient: any, azureShare: any): Promise<boolean> {
const deferred: Deferred<boolean> = new Deferred<boolean>();
fileServerClient.createShareIfNotExists(azureShare, (error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Create share failed:, ${error}`);
deferred.reject(error);
deferred.resolve(false);
} else {
deferred.resolve();
deferred.resolve(true);
}
});

Expand All @@ -56,18 +56,17 @@ export namespace AzureStorageClientUtility {
* @param azureFoler
* @param azureShare
*/
export async function createDirectory(fileServerClient: azureStorage.FileService, azureFoler: any, azureShare: any): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
export async function createDirectory(fileServerClient: azureStorage.FileService, azureFoler: any, azureShare: any): Promise<boolean> {
const deferred: Deferred<boolean> = new Deferred<boolean>();
fileServerClient.createDirectoryIfNotExists(azureShare, azureFoler, (error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Create directory failed:, ${error}`);
deferred.reject(error);
deferred.resolve(false);
} else {
deferred.resolve();
deferred.resolve(true);
}
});

return deferred.promise;
}

Expand All @@ -77,16 +76,20 @@ export namespace AzureStorageClientUtility {
* @param azureDirectory
*/
export async function createDirectoryRecursive(fileServerClient: azureStorage.FileService, azureDirectory: string,
azureShare: any): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
azureShare: any): Promise<boolean> {
const deferred: Deferred<boolean> = new Deferred<boolean>();
const directories: string[] = azureDirectory.split('/');
let rootDirectory: string = '';
for (const directory of directories) {
rootDirectory += directory;
await createDirectory(fileServerClient, rootDirectory, azureShare);
let result:boolean = await createDirectory(fileServerClient, rootDirectory, azureShare);
if (!result) {
deferred.resolve(false);
return deferred.promise;
}
rootDirectory += '/';
}
deferred.resolve();
deferred.resolve(true);

return deferred.promise;
}
Expand All @@ -100,16 +103,16 @@ export namespace AzureStorageClientUtility {
* @param localFilePath
*/
async function uploadFileToAzure(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any,
localFilePath: string): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
localFilePath: string): Promise<boolean> {
const deferred: Deferred<boolean> = new Deferred<boolean>();
await fileServerClient.createFileFromLocalFile(azureShare, azureDirectory, azureFileName, localFilePath,
(error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Upload file failed:, ${error}`);
deferred.reject(error);
deferred.resolve(false);
} else {
deferred.resolve();
deferred.resolve(true);
}
});

Expand All @@ -125,17 +128,17 @@ export namespace AzureStorageClientUtility {
* @param localFilePath
*/
async function downloadFile(fileServerClient: any, azureDirectory: string, azureFileName: any, azureShare: any,
localFilePath: string): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
localFilePath: string): Promise<boolean> {
const deferred: Deferred<boolean> = new Deferred<boolean>();
// tslint:disable-next-line:non-literal-fs-path
await fileServerClient.getFileToStream(azureShare, azureDirectory, azureFileName, fs.createWriteStream(localFilePath),
(error: any, result: any, response: any) => {
if (error) {
getLogger()
.error(`Download file failed:, ${error}`);
deferred.reject(error);
deferred.resolve(false);
} else {
deferred.resolve();
deferred.resolve(true);
}
});

Expand All @@ -151,28 +154,38 @@ export namespace AzureStorageClientUtility {
*/
// tslint:disable:non-literal-fs-path
export async function uploadDirectory(fileServerClient: azureStorage.FileService, azureDirectory: string, azureShare: any,
localDirectory: string): Promise<void> {
const deferred: Deferred<void> = new Deferred<void>();
localDirectory: string): Promise<boolean> {
const deferred: Deferred<boolean> = new Deferred<boolean>();
const fileNameArray: string[] = fs.readdirSync(localDirectory);
await createDirectoryRecursive(fileServerClient, azureDirectory, azureShare);
let result: boolean = await createDirectoryRecursive(fileServerClient, azureDirectory, azureShare);
if (!result) {
deferred.resolve(false);
return deferred.promise;
}
for (const fileName of fileNameArray) {
const fullFilePath: string = path.join(localDirectory, fileName);
try {
let resultUploadFile: boolean = true;
let resultUploadDir: boolean = true;
if (fs.lstatSync(fullFilePath)
.isFile()) {
await uploadFileToAzure(fileServerClient, azureDirectory, fileName, azureShare, fullFilePath);
resultUploadFile = await uploadFileToAzure(fileServerClient, azureDirectory, fileName, azureShare, fullFilePath);
} else {
// If filePath is a directory, recursively copy it to azure
await uploadDirectory(fileServerClient, String.Format('{0}/{1}', azureDirectory, fileName), azureShare, fullFilePath);
resultUploadDir = await uploadDirectory(fileServerClient, String.Format('{0}/{1}', azureDirectory, fileName), azureShare, fullFilePath);
}
if (!(resultUploadFile && resultUploadDir)) {
deferred.resolve(false);
return deferred.promise;
}
} catch (error) {
deferred.reject(error);
deferred.resolve(false);

return deferred.promise;
}
}
// All files/directories are copied successfully, resolve
deferred.resolve();
deferred.resolve(true);

return deferred.promise;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import * as path from 'path';
import * as component from '../../../common/component';
import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
Expand Down Expand Up @@ -102,10 +102,13 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple

//upload code files
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);

let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
}
const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
trialJobId,
'WAITING',
initStatus,
Date.now(),
trialWorkingFolder,
form,
Expand Down Expand Up @@ -208,24 +211,10 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
let trialJobOutputUrl: string = '';

if (this.fcClusterConfig.storageType === 'azureStorage') {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
try {
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient, `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${trialLocalTempFolder}`);
//upload code files to azure storage
await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient, `nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare, `${this.fcTrialConfig.codeDir}`);

trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/` +
`${this.azureStorageShare}/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
} catch (error) {
this.log.error(error);

return Promise.reject(error);
}
const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
<FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.fcTrialConfig.codeDir,
azureFrameworkControllerClusterConfig.uploadRetryCount);
} else if (this.fcClusterConfig.storageType === 'nfs') {
const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import * as component from '../../../common/component';

import { getExperimentId } from '../../../common/experimentStartupInfo';
import {
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail
JobApplicationForm, NNIManagerIpConfig, TrialJobApplicationForm, TrialJobDetail, TrialJobStatus
} from '../../../common/trainingService';
import { delay, generateParamFileName, getExperimentRootDir, uniqueString } from '../../../common/utils';
import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../../common/containerJobData';
Expand Down Expand Up @@ -102,9 +102,13 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, curTrialSequenceId, form);
//upload files to storage
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
}
const trialJobDetail: KubernetesTrialJobDetail = new KubernetesTrialJobDetail(
trialJobId,
'WAITING',
initStatus,
Date.now(),
trialWorkingFolder,
form,
Expand Down Expand Up @@ -215,23 +219,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
try {
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${trialLocalTempFolder}`);
//upload code files to azure storage
await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${this.kubeflowTrialConfig.codeDir}`);

trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
`/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
} catch (error) {
this.log.error(error);

return Promise.reject(error);
}
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir, azureKubeflowClusterConfig.uploadRetryCount);
} else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
// Create work dir for current trial in NFS directory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,19 @@ export class KubernetesClusterConfigNFS extends KubernetesClusterConfig {
export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
public readonly keyVault: KeyVaultConfig;
public readonly azureStorage: AzureStorage;
public readonly uploadRetryCount: number | undefined;

constructor(
apiVersion: string,
keyVault: KeyVaultConfig,
azureStorage: AzureStorage,
storage?: KubernetesStorageKind
storage?: KubernetesStorageKind,
uploadRetryCount?: number
) {
super(apiVersion, storage);
this.keyVault = keyVault;
this.azureStorage = azureStorage;
this.uploadRetryCount = uploadRetryCount;
}

public get storageType(): KubernetesStorageKind {
Expand All @@ -98,7 +101,8 @@ export class KubernetesClusterConfigAzure extends KubernetesClusterConfig {
kubernetesClusterConfigObjectAzure.apiVersion,
kubernetesClusterConfigObjectAzure.keyVault,
kubernetesClusterConfigObjectAzure.azureStorage,
kubernetesClusterConfigObjectAzure.storage
kubernetesClusterConfigObjectAzure.storage,
kubernetesClusterConfigObjectAzure.uploadRetryCount
);
}
}
Expand Down
Loading

0 comments on commit 2d252c9

Please sign in to comment.