[FLINK-25351][annotations] Introduce FlinkVersion as a global enum
Rename `MigrationVersion` to `FlinkVersion` and move it to
`flink-annotations` in the `org.apache.flink` package so that it is
available globally and facilitates API and SQL/Table API versioning
and upgrades.

This closes apache#18340.
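
As a quick, hypothetical usage sketch (not part of the commit): downstream code only needs to switch the import to the new package. The package, enum constants, and isNewerVersionThan() method below all appear in the diff that follows; the wrapping class is invented for illustration.

    // previously: import org.apache.flink.testutils.migration.MigrationVersion;
    import org.apache.flink.FlinkVersion;

    public class FlinkVersionExample {
        public static void main(String[] args) {
            FlinkVersion version = FlinkVersion.v1_14;
            // comparison is based on declaration order (enum ordinal)
            System.out.println(version.isNewerVersionThan(FlinkVersion.v1_13)); // prints true
        }
    }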
matriv authored and twalthr committed Jan 13, 2022
1 parent 1ea2a7a commit 4518a45
Showing 66 changed files with 656 additions and 651 deletions.
@@ -16,25 +16,30 @@
* limitations under the License.
*/

package org.apache.flink.testutils.migration;
package org.apache.flink;

import org.apache.flink.annotation.Public;

import java.util.Arrays;
import java.util.List;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Enumeration for Flink versions, used in migration integration tests to indicate the migrated
* snapshot version.
* Enumeration for Flink versions.
*
* <p>It is used for API versioning, during SQL/Table API upgrades, and for migration tests.
*/
public enum MigrationVersion {
@Public
public enum FlinkVersion {

// NOTE: the version strings must not change,
// as they are used to locate snapshot file paths.
// The definition order matters for performing version arithmetic.
// The definition order (enum ordinal) matters for performing version arithmetic.
v1_3("1.3"),
v1_4("1.4"),
v1_5("1.5"),
@@ -50,7 +55,7 @@ public enum MigrationVersion {

private final String versionStr;

MigrationVersion(String versionStr) {
FlinkVersion(String versionStr) {
this.versionStr = versionStr;
}

@@ -59,22 +64,22 @@ public String toString() {
return versionStr;
}

public boolean isNewerVersionThan(MigrationVersion otherVersion) {
public boolean isNewerVersionThan(FlinkVersion otherVersion) {
return this.ordinal() > otherVersion.ordinal();
}

/** Returns all versions equal to or higher than the selected version. */
public List<MigrationVersion> orHigher() {
return Stream.of(MigrationVersion.values())
public Set<FlinkVersion> orHigher() {
return Stream.of(FlinkVersion.values())
.filter(v -> this.ordinal() <= v.ordinal())
.collect(Collectors.toList());
.collect(Collectors.toCollection(LinkedHashSet::new));
}

private static final Map<String, MigrationVersion> CODE_MAP =
private static final Map<String, FlinkVersion> CODE_MAP =
Arrays.stream(values())
.collect(Collectors.toMap(v -> v.versionStr, Function.identity()));

public static Optional<MigrationVersion> byCode(String code) {
public static Optional<FlinkVersion> byCode(String code) {
return Optional.ofNullable(CODE_MAP.get(code));
}
}
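
A hedged sketch of the lookup helpers defined above; byCode(), orHigher(), and the version codes come from the enum itself, and orHigher() now returns an insertion-ordered Set (backed by a LinkedHashSet) rather than a List. The wrapping class is illustrative only.

    import org.apache.flink.FlinkVersion;

    import java.util.Optional;
    import java.util.Set;

    public class FlinkVersionLookupExample {
        public static void main(String[] args) {
            // maps the plain version string "1.13" back to the enum constant v1_13
            Optional<FlinkVersion> parsed = FlinkVersion.byCode("1.13");

            // every version from 1.13 onwards, preserving declaration order
            Set<FlinkVersion> supported = FlinkVersion.v1_13.orHigher();

            System.out.println(parsed + " -> " + supported);
        }
    }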
@@ -17,6 +17,7 @@

package org.apache.flink.connector.jdbc.xa;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.DbMetadata;
@@ -26,7 +27,6 @@
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.Preconditions;

import org.junit.After;
@@ -67,16 +67,15 @@ public static void main(String[] args) throws Exception {
}

@Parameterized.Parameters
public static Collection<MigrationVersion> getReadVersions() {
// return Collections.singleton(MigrationVersion.v1_10);
public static Collection<FlinkVersion> getReadVersions() {
return Collections.emptyList();
}

public JdbcXaSinkMigrationTest(MigrationVersion readVersion) {
public JdbcXaSinkMigrationTest(FlinkVersion readVersion) {
this.readVersion = readVersion;
}

private final MigrationVersion readVersion;
private final FlinkVersion readVersion;

@Test
public void testCommitFromSnapshot() throws Exception {
Expand Down Expand Up @@ -144,7 +143,7 @@ public boolean belongsToSubtask(Xid xid, RuntimeContext ctx) {
};
}

private static String getSnapshotPath(MigrationVersion version) {
private static String getSnapshotPath(FlinkVersion version) {
return String.format(
"src/test/resources/jdbc-exactly-once-sink-migration-%s-snapshot", version);
}
@@ -157,14 +156,14 @@ private static OneInputStreamOperatorTestHarness<TestEntry, Object> createHarnes
return harness;
}

private static MigrationVersion parseVersionArg(String[] args) {
private static FlinkVersion parseVersionArg(String[] args) {
return (args == null || args.length == 0 ? Optional.<String>empty() : of(args[0]))
.flatMap(MigrationVersion::byCode)
.flatMap(FlinkVersion::byCode)
.orElseThrow(
() ->
new IllegalArgumentException(
"Please specify a version as a 1st parameter. Valid values are: "
+ Arrays.toString(MigrationVersion.values())));
+ Arrays.toString(FlinkVersion.values())));
}

private static JdbcXaSinkFunction<TestEntry> buildSink() {
@@ -187,7 +186,7 @@ private static void cancelAllTx() throws Exception {
}
}

private static void writeSnapshot(MigrationVersion v) throws Exception {
private static void writeSnapshot(FlinkVersion v) throws Exception {
String path = getSnapshotPath(v);
Preconditions.checkArgument(
!Files.exists(Paths.get(path)),
@@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
@@ -35,7 +36,6 @@
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.SerializedValue;

import org.junit.Ignore;
@@ -73,11 +73,11 @@ public class FlinkKafkaConsumerBaseMigrationTest {

/**
* TODO change this to the corresponding savepoint version to be written (e.g. {@link
* MigrationVersion#v1_3} for 1.3) TODO and remove all @Ignore annotations on write*Snapshot()
* FlinkVersion#v1_3} for 1.3) TODO and remove all @Ignore annotations on write*Snapshot()
* methods to generate savepoints TODO Note: You should generate the savepoint based on the
* release branch instead of the master.
*/
private final MigrationVersion flinkGenerateSavepointVersion = null;
private final FlinkVersion flinkGenerateSavepointVersion = null;

private static final HashMap<KafkaTopicPartition, Long> PARTITION_STATE = new HashMap<>();

@@ -90,25 +90,25 @@ public class FlinkKafkaConsumerBaseMigrationTest {
new ArrayList<>(PARTITION_STATE.keySet())
.stream().map(p -> p.getTopic()).distinct().collect(Collectors.toList());

private final MigrationVersion testMigrateVersion;
private final FlinkVersion testMigrateVersion;

@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters() {
public static Collection<FlinkVersion> parameters() {
return Arrays.asList(
MigrationVersion.v1_4,
MigrationVersion.v1_5,
MigrationVersion.v1_6,
MigrationVersion.v1_7,
MigrationVersion.v1_8,
MigrationVersion.v1_9,
MigrationVersion.v1_10,
MigrationVersion.v1_11,
MigrationVersion.v1_12,
MigrationVersion.v1_13,
MigrationVersion.v1_14);
FlinkVersion.v1_4,
FlinkVersion.v1_5,
FlinkVersion.v1_6,
FlinkVersion.v1_7,
FlinkVersion.v1_8,
FlinkVersion.v1_9,
FlinkVersion.v1_10,
FlinkVersion.v1_11,
FlinkVersion.v1_12,
FlinkVersion.v1_13,
FlinkVersion.v1_14);
}

public FlinkKafkaConsumerBaseMigrationTest(MigrationVersion testMigrateVersion) {
public FlinkKafkaConsumerBaseMigrationTest(FlinkVersion testMigrateVersion) {
this.testMigrateVersion = testMigrateVersion;
}

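Concretely, the TODO in the class above amounts to something like the following (the version value is purely illustrative): on the release branch, set `private final FlinkVersion flinkGenerateSavepointVersion = FlinkVersion.v1_14;`, remove the @Ignore annotations from the write*Snapshot() methods, and run them to regenerate the savepoint resources (the committed code leaves the field at null).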
@@ -18,7 +18,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.FlinkVersion;

import org.junit.Ignore;
import org.junit.runners.Parameterized;
@@ -31,25 +31,22 @@
* by {@link FlinkKafkaProducer011MigrationTest#writeSnapshot()}.
*
* <p>Warning: We need to rename the generated resource based on the file naming pattern specified
* by the {@link #getOperatorSnapshotPath(MigrationVersion)} method then copy the resource to the
* path also specified by the {@link #getOperatorSnapshotPath(MigrationVersion)} method.
* by the {@link #getOperatorSnapshotPath(FlinkVersion)} method then copy the resource to the path
* also specified by the {@link #getOperatorSnapshotPath(FlinkVersion)} method.
*/
public class FlinkKafkaProducerMigrationOperatorTest extends FlinkKafkaProducerMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters() {
public static Collection<FlinkVersion> parameters() {
return Arrays.asList(
MigrationVersion.v1_8,
MigrationVersion.v1_9,
MigrationVersion.v1_10,
MigrationVersion.v1_11);
FlinkVersion.v1_8, FlinkVersion.v1_9, FlinkVersion.v1_10, FlinkVersion.v1_11);
}

public FlinkKafkaProducerMigrationOperatorTest(MigrationVersion testMigrateVersion) {
public FlinkKafkaProducerMigrationOperatorTest(FlinkVersion testMigrateVersion) {
super(testMigrateVersion);
}

@Override
public String getOperatorSnapshotPath(MigrationVersion version) {
public String getOperatorSnapshotPath(FlinkVersion version) {
return "src/test/resources/kafka-0.11-migration-kafka-producer-flink-"
+ version
+ "-snapshot";
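A worked example of the naming pattern the warning above refers to, derived from the format string in getOperatorSnapshotPath() and from toString() returning the bare version string; the chosen version is just an example:

    getOperatorSnapshotPath(FlinkVersion.v1_11)
        -> "src/test/resources/kafka-0.11-migration-kafka-producer-flink-1.11-snapshot"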
@@ -18,11 +18,11 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.runner.RunWith;
@@ -42,18 +42,18 @@
@RunWith(Parameterized.class)
public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters() {
public static Collection<FlinkVersion> parameters() {
return Arrays.asList(
MigrationVersion.v1_8,
MigrationVersion.v1_9,
MigrationVersion.v1_10,
MigrationVersion.v1_11,
MigrationVersion.v1_12,
MigrationVersion.v1_13,
MigrationVersion.v1_14);
FlinkVersion.v1_8,
FlinkVersion.v1_9,
FlinkVersion.v1_10,
FlinkVersion.v1_11,
FlinkVersion.v1_12,
FlinkVersion.v1_13,
FlinkVersion.v1_14);
}

public FlinkKafkaProducerMigrationTest(MigrationVersion testMigrateVersion) {
public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
super(testMigrateVersion);
}

@@ -17,6 +17,7 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -25,7 +26,6 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -48,7 +48,7 @@ public abstract class KafkaMigrationTestBase extends KafkaTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(KafkaMigrationTestBase.class);
protected static final String TOPIC = "flink-kafka-producer-migration-test";

protected final MigrationVersion testMigrateVersion;
protected final FlinkVersion testMigrateVersion;
protected final TypeInformationSerializationSchema<Integer> integerSerializationSchema =
new TypeInformationSerializationSchema<>(
BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
@@ -57,21 +57,21 @@ public abstract class KafkaMigrationTestBase extends KafkaTestBase {

/**
* TODO change this to the corresponding savepoint version to be written (e.g. {@link
* MigrationVersion#v1_3} for 1.3) TODO and remove all @Ignore annotations on write*Snapshot()
* FlinkVersion#v1_3} for 1.3) TODO and remove all @Ignore annotations on write*Snapshot()
* methods to generate savepoints TODO Note: You should generate the savepoint based on the
* release branch instead of the master.
*/
protected final Optional<MigrationVersion> flinkGenerateSavepointVersion = Optional.empty();
protected final Optional<FlinkVersion> flinkGenerateSavepointVersion = Optional.empty();

public KafkaMigrationTestBase(MigrationVersion testMigrateVersion) {
public KafkaMigrationTestBase(FlinkVersion testMigrateVersion) {
this.testMigrateVersion = checkNotNull(testMigrateVersion);
}

public String getOperatorSnapshotPath() {
return getOperatorSnapshotPath(testMigrateVersion);
}

public String getOperatorSnapshotPath(MigrationVersion version) {
public String getOperatorSnapshotPath(FlinkVersion version) {
return "src/test/resources/kafka-migration-kafka-producer-flink-" + version + "-snapshot";
}

(The remaining changed files in this commit are not shown.)
