Skip to content

Commit

Permalink
[FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have d…
Browse files Browse the repository at this point in the history
…efault max. parallelism at 128
  • Loading branch information
StefanRRichter authored and aljoscha committed Aug 31, 2016
1 parent 2b7a8d6 commit 6d43061
Show file tree
Hide file tree
Showing 42 changed files with 297 additions and 586 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -130,7 +131,7 @@ public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Except
backend.getKeySerializer(),
namespaceSerializer);

int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
return backend.db.get(columnFamily, keySerializationStream.toByteArray());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
Expand Down Expand Up @@ -131,11 +130,11 @@ public RocksDBKeyedStateBackend(
ColumnFamilyOptions columnFamilyOptions,
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange
) throws Exception {

super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);

this.operatorIdentifier = operatorIdentifier;
this.jobId = jobId;
Expand Down Expand Up @@ -183,7 +182,7 @@ public RocksDBKeyedStateBackend(
ColumnFamilyOptions columnFamilyOptions,
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoreState
) throws Exception {
Expand All @@ -195,7 +194,7 @@ public RocksDBKeyedStateBackend(
columnFamilyOptions,
kvStateRegistry,
keySerializer,
keyGroupAssigner,
numberOfKeyGroups,
keyGroupRange);

LOG.info("Initializing RocksDB keyed state backend from snapshot.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -230,7 +229,7 @@ public <K> KeyedStateBackend<K> createKeyedStateBackend(
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception {

Expand All @@ -246,15 +245,15 @@ public <K> KeyedStateBackend<K> createKeyedStateBackend(
getColumnOptions(),
kvStateRegistry,
keySerializer,
keyGroupAssigner,
numberOfKeyGroups,
keyGroupRange);
}

@Override
public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {
Expand All @@ -270,7 +269,7 @@ public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID
getColumnOptions(),
kvStateRegistry,
keySerializer,
keyGroupAssigner,
numberOfKeyGroups,
keyGroupRange,
restoredState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.query.KvStateRegistry;

import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.OperatingSystem;
import org.junit.Assume;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void testSetDbPath() throws Exception {
env.getJobID(),
"test_op",
IntSerializer.INSTANCE,
new HashKeyGroupAssigner<Integer>(1),
1,
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());

Expand Down Expand Up @@ -147,7 +146,7 @@ public void testUseTempDirectories() throws Exception {
env.getJobID(),
"test_op",
IntSerializer.INSTANCE,
new HashKeyGroupAssigner<Integer>(1),
1,
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());

Expand Down Expand Up @@ -182,7 +181,7 @@ public void testFailWhenNoLocalStorageDir() throws Exception {
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
new HashKeyGroupAssigner<Integer>(1),
1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
}
Expand Down Expand Up @@ -224,7 +223,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
new HashKeyGroupAssigner<Integer>(1),
1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
}
Expand Down

This file was deleted.

15 changes: 15 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/MathUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,21 @@ else if (code != Integer.MIN_VALUE) {
}
}

/**
* Round the given number to the next power of two
* @param x number to round
* @return x rounded up to the next power of two
*/
public static int roundUpToPowerOfTwo(int x) {
x = x - 1;
x |= x >> 1;
x |= x >> 2;
x |= x >> 4;
x |= x >> 8;
x |= x >> 16;
return x + 1;
}

// ============================================================================================

/**
Expand Down
31 changes: 31 additions & 0 deletions flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,37 @@ public void testRoundDownToPowerOf2() {
assertEquals(1073741824, MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE));
}

@Test
public void testRoundUpToPowerOf2() {
assertEquals(0, MathUtils.roundUpToPowerOfTwo(0));
assertEquals(1, MathUtils.roundUpToPowerOfTwo(1));
assertEquals(2, MathUtils.roundUpToPowerOfTwo(2));
assertEquals(4, MathUtils.roundUpToPowerOfTwo(3));
assertEquals(4, MathUtils.roundUpToPowerOfTwo(4));
assertEquals(8, MathUtils.roundUpToPowerOfTwo(5));
assertEquals(8, MathUtils.roundUpToPowerOfTwo(6));
assertEquals(8, MathUtils.roundUpToPowerOfTwo(7));
assertEquals(8, MathUtils.roundUpToPowerOfTwo(8));
assertEquals(16, MathUtils.roundUpToPowerOfTwo(9));
assertEquals(16, MathUtils.roundUpToPowerOfTwo(15));
assertEquals(16, MathUtils.roundUpToPowerOfTwo(16));
assertEquals(32, MathUtils.roundUpToPowerOfTwo(17));
assertEquals(32, MathUtils.roundUpToPowerOfTwo(31));
assertEquals(32, MathUtils.roundUpToPowerOfTwo(32));
assertEquals(64, MathUtils.roundUpToPowerOfTwo(33));
assertEquals(64, MathUtils.roundUpToPowerOfTwo(42));
assertEquals(64, MathUtils.roundUpToPowerOfTwo(63));
assertEquals(64, MathUtils.roundUpToPowerOfTwo(64));
assertEquals(128, MathUtils.roundUpToPowerOfTwo(125));
assertEquals(32768, MathUtils.roundUpToPowerOfTwo(25654));
assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(34366363));
assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108863));
assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108864));
assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFE));
assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFF));
assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x40000000));
}

@Test
public void testPowerOfTwo() {
assertTrue(MathUtils.isPowerOf2(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -886,7 +887,7 @@ public static List<KeyGroupRange> createKeyGroupPartitions(int numberKeyGroups,
List<KeyGroupRange> result = new ArrayList<>(parallelism);
int start = 0;
for (int i = 0; i < parallelism; ++i) {
result.add(KeyGroupRange.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ public int getMaxParallelism() {
*/
public void setMaxParallelism(int maxParallelism) {
org.apache.flink.util.Preconditions.checkArgument(
maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1.");
maxParallelism > 0 && maxParallelism <= (1 << 15),
"The max parallelism must be at least 1 and smaller than 2^15.");

this.maxParallelism = maxParallelism;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
Expand Down Expand Up @@ -53,7 +52,7 @@ public abstract <K> KeyedStateBackend<K> createKeyedStateBackend(
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception;

Expand All @@ -66,7 +65,7 @@ public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend(
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
KeyGroupAssigner<K> keyGroupAssigner,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception;
Expand Down

This file was deleted.

Loading

0 comments on commit 6d43061

Please sign in to comment.