Skip to content

Commit

Permalink
[FLINK-4611] [kinesis] Make "AUTO" credential provider as default for…
Browse files Browse the repository at this point in the history
… Kinesis Connector

This closes apache#2914.
  • Loading branch information
魏偉哲 authored and tzulitai committed Dec 15, 2016
1 parent d84599e commit 4666e65
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,20 @@ public static AmazonKinesisClient createKinesisClient(Properties configProps) {
* @return The corresponding AWS Credentials Provider instance
*/
public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString()));
CredentialProvider credentialProviderType;
if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
&& configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
credentialProviderType = CredentialProvider.BASIC;
} else {
// if the credential provider type is not specified, it will default to AUTO
credentialProviderType = CredentialProvider.AUTO;
}
} else {
credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
}

AWSCredentialsProvider credentialsProvider;

Expand All @@ -90,10 +102,6 @@ public static AWSCredentialsProvider getCredentialsProvider(final Properties con
? new ProfileCredentialsProvider(profileName)
: new ProfileCredentialsProvider(profileConfigPath, profileName);
break;
case AUTO:
credentialsProvider = new DefaultAWSCredentialsProviderChain();
break;
default:
case BASIC:
credentialsProvider = new AWSCredentialsProvider() {
@Override
Expand All @@ -108,6 +116,10 @@ public void refresh() {
// do nothing
}
};
break;
default:
case AUTO:
credentialsProvider = new DefaultAWSCredentialsProviderChain();
}

return credentialsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,7 @@ public static void validateProducerConfiguration(Properties config) {
* Validate configuration properties related to Amazon AWS service
*/
public static void validateAwsConfiguration(Properties config) {
if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
// if the credential provider type is not specified, it will default to BASIC later on,
// so the Access Key ID and Secret Key must be given
if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
}
} else {
if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);

// value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
Expand All @@ -157,7 +149,7 @@ public static void validateAwsConfiguration(Properties config) {
if (providerType == CredentialProvider.BASIC) {
if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,6 @@ public void testUnrecognizableAwsRegionInConfig() {
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}

@Test
public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

KinesisConfigUtil.validateAwsConfiguration(testConfig);
}

@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
exception.expect(IllegalArgumentException.class);
Expand Down

0 comments on commit 4666e65

Please sign in to comment.