Skip to content

Commit

Permalink
[FLINK-9514,FLINK-9515,FLINK-9516] Introduce wrapper to enhance state…
Browse files Browse the repository at this point in the history
… with ttl

This closes apache#6186.
  • Loading branch information
azagrebin authored and StefanRRichter committed Jun 29, 2018
1 parent 92a6a22 commit 2c13e00
Show file tree
Hide file tree
Showing 27 changed files with 2,243 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

import javax.annotation.Nonnull;

/**
* Base class for TTL logic wrappers.
*
* @param <T> Type of originally wrapped object
*/
abstract class AbstractTtlDecorator<T> {
/** Wrapped original state handler. */
final T original;

final TtlConfig config;

final TtlTimeProvider timeProvider;

/** Whether to renew expiration timestamp on state read access. */
final boolean updateTsOnRead;

/** Whether to renew expiration timestamp on state read access. */
final boolean returnExpired;

/** State value time to live in milliseconds. */
final long ttl;

AbstractTtlDecorator(
T original,
TtlConfig config,
TtlTimeProvider timeProvider) {
Preconditions.checkNotNull(original);
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(timeProvider);
Preconditions.checkArgument(config.getTtlUpdateType() != TtlConfig.TtlUpdateType.Disabled,
"State does not need to be wrapped with TTL if it is configured as disabled.");
this.original = original;
this.config = config;
this.timeProvider = timeProvider;
this.updateTsOnRead = config.getTtlUpdateType() == TtlConfig.TtlUpdateType.OnReadAndWrite;
this.returnExpired = config.getStateVisibility() == TtlConfig.TtlStateVisibility.ReturnExpiredIfNotCleanedUp;
this.ttl = config.getTtl().toMilliseconds();
}

<V> V getUnexpired(TtlValue<V> ttlValue) {
return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
}

<V> boolean expired(TtlValue<V> ttlValue) {
return ttlValue != null && getExpirationTimestamp(ttlValue) <= timeProvider.currentTimestamp();
}

private long getExpirationTimestamp(@Nonnull TtlValue<?> ttlValue) {
long ts = ttlValue.getLastAccessTimestamp();
long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
return ts + ttlWithoutOverflow;
}

<V> TtlValue<V> wrapWithTs(V value) {
return wrapWithTs(value, timeProvider.currentTimestamp());
}

static <V> TtlValue<V> wrapWithTs(V value, long ts) {
return value == null ? null : new TtlValue<>(value, ts);
}

<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
return wrapWithTs(ttlValue.getUserValue());
}

<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<V>, SE> getter,
ThrowingConsumer<TtlValue<V>, CE> updater,
ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
TtlValue<V> ttlValue = getter.get();
if (ttlValue == null) {
return null;
} else if (expired(ttlValue)) {
stateClear.run();
if (!returnExpired) {
return null;
}
} else if (updateTsOnRead) {
updater.accept(rewrapWithNewTs(ttlValue));
}
return ttlValue.getUserValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;

/**
* Base class for TTL logic wrappers of state objects.
*
* @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <SV> The type of values kept internally in state without TTL
* @param <TTLSV> The type of values kept internally in state with TTL
* @param <S> Type of originally wrapped state object
*/
abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
extends AbstractTtlDecorator<S>
implements InternalKvState<K, N, SV> {
private final TypeSerializer<SV> valueSerializer;

AbstractTtlState(S original, TtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
super(original, config, timeProvider);
this.valueSerializer = valueSerializer;
}

<SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
SupplierWithException<TtlValue<T>, SE> getter,
ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
return getWithTtlCheckAndUpdate(getter, updater, original::clear);
}

@Override
public TypeSerializer<K> getKeySerializer() {
return original.getKeySerializer();
}

@Override
public TypeSerializer<N> getNamespaceSerializer() {
return original.getNamespaceSerializer();
}

@Override
public TypeSerializer<SV> getValueSerializer() {
return valueSerializer;
}

@Override
public void setCurrentNamespace(N namespace) {
original.setCurrentNamespace(namespace);
}

@Override
public byte[] getSerializedValue(
byte[] serializedKeyAndNamespace,
TypeSerializer<K> safeKeySerializer,
TypeSerializer<N> safeNamespaceSerializer,
TypeSerializer<SV> safeValueSerializer) {
throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
}

@Override
public void clear() {
original.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;

/**
* This class wraps aggregating function with TTL logic.
*
* @param <IN> The type of the values that are aggregated (input values)
* @param <ACC> The type of the accumulator (intermediate aggregate state).
* @param <OUT> The type of the aggregated result
*/
class TtlAggregateFunction<IN, ACC, OUT>
extends AbstractTtlDecorator<AggregateFunction<IN, ACC, OUT>>
implements AggregateFunction<IN, TtlValue<ACC>, OUT> {
ThrowingRunnable<Exception> stateClear;
ThrowingConsumer<TtlValue<ACC>, Exception> updater;

TtlAggregateFunction(AggregateFunction<IN, ACC, OUT> aggFunction, TtlConfig config, TtlTimeProvider timeProvider) {
super(aggFunction, config, timeProvider);
}

@Override
public TtlValue<ACC> createAccumulator() {
return wrapWithTs(original.createAccumulator());
}

@Override
public TtlValue<ACC> add(IN value, TtlValue<ACC> accumulator) {
ACC userAcc = getUnexpired(accumulator);
userAcc = userAcc == null ? original.createAccumulator() : userAcc;
return wrapWithTs(original.add(value, userAcc));
}

@Override
public OUT getResult(TtlValue<ACC> accumulator) {
Preconditions.checkNotNull(updater, "State updater should be set in TtlAggregatingState");
Preconditions.checkNotNull(stateClear, "State clearing should be set in TtlAggregatingState");
ACC userAcc;
try {
userAcc = getWithTtlCheckAndUpdate(() -> accumulator, updater, stateClear);
} catch (Exception e) {
throw new FlinkRuntimeException("Failed to retrieve original internal aggregating state", e);
}
return userAcc == null ? null : original.getResult(userAcc);
}

@Override
public TtlValue<ACC> merge(TtlValue<ACC> a, TtlValue<ACC> b) {
ACC userA = getUnexpired(a);
ACC userB = getUnexpired(b);
if (userA != null && userB != null) {
return wrapWithTs(original.merge(userA, userB));
} else if (userA != null) {
return rewrapWithNewTs(a);
} else if (userB != null) {
return rewrapWithNewTs(b);
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;

import java.util.Collection;

/**
* This class wraps aggregating state with TTL logic.
*
* @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <IN> Type of the value added to the state
* @param <ACC> The type of the accumulator (intermediate aggregate state).
* @param <OUT> Type of the value extracted from the state
*
*/
class TtlAggregatingState<K, N, IN, ACC, OUT>
extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
implements InternalAggregatingState<K, N, IN, ACC, OUT> {

TtlAggregatingState(
InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
TtlConfig config,
TtlTimeProvider timeProvider,
TypeSerializer<ACC> valueSerializer,
TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
super(originalState, config, timeProvider, valueSerializer);
aggregateFunction.stateClear = originalState::clear;
aggregateFunction.updater = originalState::updateInternal;
}

@Override
public OUT get() throws Exception {
return original.get();
}

@Override
public void add(IN value) throws Exception {
original.add(value);
}

@Override
public void clear() {
original.clear();
}

@Override
public ACC getInternal() throws Exception {
return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
}

@Override
public void updateInternal(ACC valueToStore) throws Exception {
original.updateInternal(wrapWithTs(valueToStore));
}

@Override
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
original.mergeNamespaces(target, sources);
}
}
Loading

0 comments on commit 2c13e00

Please sign in to comment.