forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-35355][State] Internal async aggregating state and correspondi…
…ng state descriptor This closes apache#24810
- Loading branch information
Showing
10 changed files
with
423 additions
and
1 deletion.
There are no files selected for viewing
44 changes: 44 additions & 0 deletions
44
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AggregatingState.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.api.common.state.v2; | ||
|
||
import org.apache.flink.annotation.Experimental; | ||
|
||
/** | ||
* {@link State} interface for aggregating state, based on an {@link | ||
* org.apache.flink.api.common.functions.AggregateFunction}. Elements that are added to this type of | ||
* state will be eagerly pre-aggregated using a given {@code AggregateFunction}. | ||
* | ||
* <p>The state holds internally always the accumulator type of the {@code AggregateFunction}. When | ||
* accessing the result of the state, the function's {@link | ||
* org.apache.flink.api.common.functions.AggregateFunction#getResult(Object)} method. | ||
* | ||
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the | ||
* system as part of the distributed snapshots. | ||
* | ||
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is | ||
* automatically supplied by the system, so the function always sees the value mapped to the key of | ||
* the current element. That way, the system can handle stream and state partitioning consistently | ||
* together. | ||
* | ||
* @param <IN> Type of the value added to the state. | ||
* @param <OUT> Type of the value extracted from the state. | ||
*/ | ||
@Experimental | ||
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
...k-runtime/src/main/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.state.v2; | ||
|
||
import org.apache.flink.api.common.functions.AggregateFunction; | ||
import org.apache.flink.api.common.serialization.SerializerConfig; | ||
import org.apache.flink.api.common.typeinfo.TypeInformation; | ||
|
||
import javax.annotation.Nonnull; | ||
|
||
import static org.apache.flink.util.Preconditions.checkNotNull; | ||
|
||
/** | ||
* A {@link StateDescriptor} for {@link org.apache.flink.api.common.state.v2.AggregatingState}. | ||
* | ||
* <p>The type internally stored in the state is the type of the {@code Accumulator} of the {@code | ||
* AggregateFunction}. | ||
* | ||
* @param <IN> The type of the values that are added to the state. | ||
* @param <ACC> The type of the accumulator (intermediate aggregation state). | ||
* @param <OUT> The type of the values that are returned from the state. | ||
*/ | ||
public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<ACC> { | ||
|
||
private final AggregateFunction<IN, ACC, OUT> aggregateFunction; | ||
|
||
/** | ||
* Create a new state descriptor with the given name, function, and type. | ||
* | ||
* @param stateId The (unique) name for the state. | ||
* @param aggregateFunction The {@code AggregateFunction} used to aggregate the state. | ||
* @param typeInfo The type of the accumulator. The accumulator is stored in the state. | ||
*/ | ||
public AggregatingStateDescriptor( | ||
@Nonnull String stateId, | ||
@Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction, | ||
@Nonnull TypeInformation<ACC> typeInfo) { | ||
super(stateId, typeInfo); | ||
this.aggregateFunction = checkNotNull(aggregateFunction); | ||
} | ||
|
||
/** | ||
* Create a new state descriptor with the given name, function, and type. | ||
* | ||
* @param stateId The (unique) name for the state. | ||
* @param aggregateFunction The {@code AggregateFunction} used to aggregate the state. | ||
* @param typeInfo The type of the accumulator. The accumulator is stored in the state. | ||
* @param serializerConfig The serializer related config used to generate TypeSerializer. | ||
*/ | ||
public AggregatingStateDescriptor( | ||
@Nonnull String stateId, | ||
@Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction, | ||
@Nonnull TypeInformation<ACC> typeInfo, | ||
SerializerConfig serializerConfig) { | ||
super(stateId, typeInfo, serializerConfig); | ||
this.aggregateFunction = checkNotNull(aggregateFunction); | ||
} | ||
|
||
/** Returns the Aggregate function for this state. */ | ||
public AggregateFunction<IN, ACC, OUT> getAggregateFunction() { | ||
return aggregateFunction; | ||
} | ||
|
||
@Override | ||
public Type getType() { | ||
return Type.AGGREGATING; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/InternalAggregatingState.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.state.v2; | ||
|
||
import org.apache.flink.api.common.functions.AggregateFunction; | ||
import org.apache.flink.api.common.state.v2.AggregatingState; | ||
import org.apache.flink.api.common.state.v2.StateFuture; | ||
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; | ||
import org.apache.flink.runtime.asyncprocessing.StateRequestType; | ||
|
||
/** | ||
* The default implementation of {@link AggregatingState}, which delegates all async requests to | ||
* {@link StateRequestHandler}. | ||
* | ||
* @param <K> The type of key the state is associated to. | ||
* @param <IN> The type of the values that are added into the state. | ||
* @param <ACC> TThe type of the accumulator (intermediate aggregation state). | ||
* @param <OUT> The type of the values that are returned from the state. | ||
*/ | ||
public class InternalAggregatingState<K, IN, ACC, OUT> extends InternalKeyedState<K, ACC> | ||
implements AggregatingState<IN, OUT> { | ||
|
||
protected final AggregateFunction<IN, ACC, OUT> aggregateFunction; | ||
|
||
/** | ||
* Creates a new InternalKeyedState with the given asyncExecutionController and stateDescriptor. | ||
* | ||
* @param stateRequestHandler The async request handler for handling all requests. | ||
* @param stateDescriptor The properties of the state. | ||
*/ | ||
public InternalAggregatingState( | ||
StateRequestHandler stateRequestHandler, | ||
AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) { | ||
super(stateRequestHandler, stateDescriptor); | ||
this.aggregateFunction = stateDescriptor.getAggregateFunction(); | ||
} | ||
|
||
@Override | ||
public StateFuture<OUT> asyncGet() { | ||
return handleRequest(StateRequestType.AGGREGATING_GET, null); | ||
} | ||
|
||
@Override | ||
public StateFuture<Void> asyncAdd(IN value) { | ||
return handleRequest(StateRequestType.AGGREGATING_ADD, value); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
...ntime/src/test/java/org/apache/flink/runtime/state/v2/AggregatingStateDescriptorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.state.v2; | ||
|
||
import org.apache.flink.api.common.functions.AggregateFunction; | ||
import org.apache.flink.api.common.typeinfo.BasicTypeInfo; | ||
import org.apache.flink.core.testutils.CommonTestUtils; | ||
|
||
import org.junit.jupiter.api.Test; | ||
|
||
import java.io.Serializable; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
/** Tests for {@link AggregatingStateDescriptor}. */ | ||
class AggregatingStateDescriptorTest implements Serializable { | ||
|
||
@Test | ||
void testHashCodeAndEquals() throws Exception { | ||
final String name = "testName"; | ||
AggregateFunction<Integer, Integer, Integer> aggregator = | ||
new AggregateFunction<Integer, Integer, Integer>() { | ||
@Override | ||
public Integer createAccumulator() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public Integer add(Integer value, Integer accumulator) { | ||
return accumulator + value; | ||
} | ||
|
||
@Override | ||
public Integer getResult(Integer accumulator) { | ||
return accumulator; | ||
} | ||
|
||
@Override | ||
public Integer merge(Integer a, Integer b) { | ||
return a + b; | ||
} | ||
}; | ||
|
||
AggregatingStateDescriptor<Integer, Integer, Integer> original = | ||
new AggregatingStateDescriptor<>(name, aggregator, BasicTypeInfo.INT_TYPE_INFO); | ||
AggregatingStateDescriptor<Integer, Integer, Integer> same = | ||
new AggregatingStateDescriptor<>(name, aggregator, BasicTypeInfo.INT_TYPE_INFO); | ||
AggregatingStateDescriptor<Integer, Integer, Integer> sameBySerializer = | ||
new AggregatingStateDescriptor<>(name, aggregator, BasicTypeInfo.INT_TYPE_INFO); | ||
|
||
// test that hashCode() works on state descriptors with initialized and uninitialized | ||
// serializers | ||
assertThat(same).hasSameHashCodeAs(original); | ||
assertThat(sameBySerializer).hasSameHashCodeAs(original); | ||
|
||
assertThat(same).isEqualTo(original); | ||
assertThat(sameBySerializer).isEqualTo(original); | ||
|
||
// equality with a clone | ||
AggregatingStateDescriptor<Integer, Integer, Integer> clone = | ||
CommonTestUtils.createCopySerializable(original); | ||
assertThat(clone).isEqualTo(original); | ||
} | ||
} |
Oops, something went wrong.