Skip to content

Commit

Permalink
[FLINK-24227][connectors/kinesis] Fixing Backward Compatibility for A…
Browse files Browse the repository at this point in the history
…WSConfigConstants Class, Removing dependency on assert in tests.
  • Loading branch information
vahmed-hamdy authored and dannycranmer committed Dec 16, 2021
1 parent a6ec687 commit 4453f15
Show file tree
Hide file tree
Showing 36 changed files with 312 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.PublicEvolving;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;

/** Configuration keys for AWS service usage. */
@PublicEvolving
Expand Down Expand Up @@ -131,6 +132,12 @@ public enum CredentialProvider {
/** The HTTP protocol version to use. */
public static final String HTTP_PROTOCOL_VERSION = "aws.http.protocol.version";

/** Maximum request concurrency for {@link SdkAsyncHttpClient}. */
public static final String HTTP_CLIENT_MAX_CONCURRENCY = "aws.http-client.max-concurrency";

/** Read Request timeout for {@link SdkAsyncHttpClient}. */
public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = "aws.http-client.read-timeout";

public static String accessKeyId(String prefix) {
return prefix + ".basic.accesskeyid";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,46 @@ static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
return webIdentityBuilder.build();
}

public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configProperties) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);

Optional.ofNullable(
configProperties.getProperty(
AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY))
.map(Integer::parseInt)
.ifPresent(
integer ->
clientConfiguration.put(
SdkHttpConfigurationOption.MAX_CONNECTIONS, integer));

Optional.ofNullable(
configProperties.getProperty(
AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS))
.map(Integer::parseInt)
.map(Duration::ofMillis)
.ifPresent(
timeout ->
clientConfiguration.put(
SdkHttpConfigurationOption.READ_TIMEOUT, timeout));

Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES))
.map(Boolean::parseBoolean)
.ifPresent(
bool ->
clientConfiguration.put(
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool));

Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION))
.map(Protocol::valueOf)
.ifPresent(
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL, protocol));
return createAsyncHttpClient(
clientConfiguration.build(), NettyNioAsyncHttpClient.builder());
}

public static SdkAsyncHttpClient createAsyncHttpClient(
final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
return createAsyncHttpClient(AttributeMap.empty(), httpClientBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,68 @@ public void testGetCredentialsProviderNamedProfile() {
assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222", credentials.secretAccessKey());
}

@Test
public void testCreateNettyAsyncHttpClientWithPropertyTcpKeepAlive() throws Exception {
SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(new Properties());
NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);

assertTrue(nettyConfiguration.tcpKeepAlive());
}

@Test
public void testCreateNettyAsyncHttpClientWithPropertyMaxConcurrency() throws Exception {
int maxConnections = 45678;
Properties properties = new Properties();
properties.setProperty(
AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY, String.valueOf(maxConnections));

SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);

assertEquals(maxConnections, nettyConfiguration.maxConnections());
}

@Test
public void testCreateNettyAsyncHttpClientWithPropertyReadTimeout() throws Exception {
int readTimeoutMillis = 45678;
Properties properties = new Properties();
properties.setProperty(
AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS,
String.valueOf(readTimeoutMillis));

SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);

assertEquals(readTimeoutMillis, nettyConfiguration.readTimeoutMillis());
}

@Test
public void testCreateNettyAsyncHttpClientWithPropertyTrustAllCertificates() throws Exception {
boolean trustAllCerts = true;
Properties properties = new Properties();
properties.setProperty(
AWSConfigConstants.TRUST_ALL_CERTIFICATES, String.valueOf(trustAllCerts));

SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);

assertEquals(trustAllCerts, nettyConfiguration.trustAllCertificates());
}

@Test
public void testCreateNettyAsyncHttpClientWithPropertyProtocol() throws Exception {
Protocol httpVersion = HTTP1_1;
Properties properties = new Properties();
properties.setProperty(
AWSConfigConstants.HTTP_PROTOCOL_VERSION, String.valueOf(httpVersion));

SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);

assertEquals(
httpVersion, nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL));
}

@Test
public void testCreateNettyAsyncHttpClientWithDefaultsConnectionAcquireTimeout()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ under the License.
<name>Flink : Connectors : AWS Kinesis Data Streams</name>
<properties>
<aws.sdk.version>2.17.52</aws.sdk.version>
<aws.kinesis-kpl.version>0.14.1</aws.kinesis-kpl.version>
</properties>

<packaging>jar</packaging>
Expand Down Expand Up @@ -119,12 +118,6 @@ under the License.
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
package org.apache.flink.connector.kinesis.config;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.kinesis.util.AWSKinesisDataStreamsUtil;

/** Defaults for {@link AWSKinesisDataStreamsUtil}. */
@PublicEvolving
public class AWSKinesisDataStreamsConfigConstants extends AWSConfigConstants {
public class AWSKinesisDataStreamsConfigConstants {

public static final boolean DEFAULT_LEGACY_CONNECTOR = false;
public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT =
"Apache Flink %s (%s) Kinesis Connector";

/** The identifier of the legacy connector. */
public static final String LEGACY_CONNECTOR = "aws.kinesis.legacy";
/** Identifier for user agent prefix. */
public static final String KINESIS_CLIENT_USER_AGENT_PREFIX =
"aws.kinesis.client.user-agent-prefix";
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@
* will be blocked until some have completed
* <li>{@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to
* add elements will be blocked while the number of elements in the buffer is at the maximum
* <li>{@code flushOnBufferSizeInBytes}: if the total size in bytes of all elements in the buffer
* reaches this value, then a flush will occur the next time any elements are added to the
* buffer
* <li>{@code maxBatchSizeInBytes}: the maximum size of a batch of entries that may be sent to KDS
* measured in bytes
* <li>{@code maxTimeInBufferMS}: the maximum amount of time an entry is allowed to live in the
* buffer, if any element reaches this age, the entire buffer will be flushed immediately
* <li>{@code maxRecordSizeInBytes}: the maximum size of a record the sink will accept into the
* buffer, a record of size larger than this will be rejected when passed to the sink
* <li>{@code failOnError}: when an exception is encountered while persisting to Kinesis Data
* Streams, the job will fail immediately if failOnError is set
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

import java.util.Optional;
import java.util.Properties;

/**
Expand All @@ -47,11 +48,12 @@
* <p>If the following parameters are not set in this builder, the following defaults will be used:
*
* <ul>
* <li>{@code maxBatchSize} will be 200
* <li>{@code maxBatchSize} will be 500
* <li>{@code maxInFlightRequests} will be 16
* <li>{@code maxBufferedRequests} will be 10000
* <li>{@code flushOnBufferSizeInBytes} will be 64MB i.e. {@code 64 * 1024 * 1024}
* <li>{@code maxBatchSizeInBytes} will be 5 MB i.e. {@code 5 * 1024 * 1024}
* <li>{@code maxTimeInBufferMS} will be 5000ms
* <li>{@code maxRecordSizeInBytes} will be 1 MB i.e. {@code 1 * 1024 * 1024}
* <li>{@code failOnError} will be false
* </ul>
*
Expand Down Expand Up @@ -104,24 +106,15 @@ public KinesisDataStreamsSinkBuilder<InputT> setKinesisClientProperties(
public KinesisDataStreamsSink<InputT> build() {
return new KinesisDataStreamsSink<>(
getElementConverter(),
getMaxBatchSize() == null ? DEFAULT_MAX_BATCH_SIZE : getMaxBatchSize(),
getMaxInFlightRequests() == null
? DEFAULT_MAX_IN_FLIGHT_REQUESTS
: getMaxInFlightRequests(),
getMaxBufferedRequests() == null
? DEFAULT_MAX_BUFFERED_REQUESTS
: getMaxBufferedRequests(),
getMaxBatchSizeInBytes() == null
? DEFAULT_MAX_BATCH_SIZE_IN_B
: getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS() == null
? DEFAULT_MAX_TIME_IN_BUFFER_MS
: getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes() == null
? DEFAULT_MAX_RECORD_SIZE_IN_B
: getMaxRecordSizeInBytes(),
failOnError == null ? DEFAULT_FAIL_ON_ERROR : failOnError,
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
Optional.ofNullable(getMaxInFlightRequests())
.orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR),
streamName,
kinesisClientProperties == null ? new Properties() : kinesisClientProperties);
Optional.ofNullable(kinesisClientProperties).orElse(new Properties()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.connector.kinesis.sink;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
Expand All @@ -28,32 +27,23 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.AttributeMap;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

import static org.apache.flink.connector.kinesis.config.AsyncProducerConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY;
import static org.apache.flink.connector.kinesis.config.AsyncProducerConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS;

/**
* Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More
* details on the operation of this sink writer may be found in the doc for {@link
Expand Down Expand Up @@ -111,45 +101,9 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
}

private KinesisAsyncClient buildClient(Properties kinesisClientProperties) {
final AttributeMap.Builder clientConfiguration =
AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);

Optional.ofNullable(kinesisClientProperties.getProperty(HTTP_CLIENT_MAX_CONCURRENCY))
.map(Integer::parseInt)
.ifPresent(
integer ->
clientConfiguration.put(
SdkHttpConfigurationOption.MAX_CONNECTIONS, integer));

Optional.ofNullable(kinesisClientProperties.getProperty(HTTP_CLIENT_READ_TIMEOUT_MILLIS))
.map(Integer::parseInt)
.map(Duration::ofMillis)
.ifPresent(
timeout ->
clientConfiguration.put(
SdkHttpConfigurationOption.READ_TIMEOUT, timeout));

Optional.ofNullable(
kinesisClientProperties.getProperty(
AWSConfigConstants.TRUST_ALL_CERTIFICATES))
.map(Boolean::parseBoolean)
.ifPresent(
bool ->
clientConfiguration.put(
SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool));

Optional.ofNullable(
kinesisClientProperties.getProperty(
AWSConfigConstants.HTTP_PROTOCOL_VERSION))
.map(Protocol::valueOf)
.ifPresent(
protocol ->
clientConfiguration.put(
SdkHttpConfigurationOption.PROTOCOL, protocol));

final SdkAsyncHttpClient httpClient =
AWSGeneralUtil.createAsyncHttpClient(
clientConfiguration.build(), NettyNioAsyncHttpClient.builder());
AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);

return AWSKinesisDataStreamsUtil.createKinesisAsyncClient(
kinesisClientProperties, httpClient);
Expand Down
Loading

0 comments on commit 4453f15

Please sign in to comment.