Skip to content

Commit

Permalink
[FLINK-7258] [network] Fix watermark configuration order
Browse files Browse the repository at this point in the history
When configuring larger memory segment sizes, configuring the
low watermark before the high watermark may lead to an
IllegalArgumentException, because the low watermark will
temporarily be higher than the high watermark. It's necessary
to configure the high watermark before the low watermark.

For the queryable state server in KvStateServer I didn't
add an extra test as the watermarks cannot be configured there.

This closes apache#4391.
  • Loading branch information
uce committed Jul 25, 2017
1 parent 53d6582 commit 038a9ac
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws
}

// Low and high water marks for flow control
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);

// SSL related configuration
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ public KvStateServer(
.option(ChannelOption.ALLOCATOR, bufferPool)
// Child channel options
.childOption(ChannelOption.ALLOCATOR, bufferPool)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
// See initializer for pipeline details
.childHandler(new KvStateServerChannelInitializer(serverHandler));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class NettyServerLowAndHighWatermarkTest {

private final static int PageSize = 1024;
/**
* Pick a larger memory segment size here in order to trigger
* <a href="https://issues.apache.org/jira/browse/FLINK-7258">FLINK-7258</a>.
*/
private final static int PageSize = 65536;

/**
* Verifies that the high and low watermark are set in relation to the page size.
Expand All @@ -54,12 +59,16 @@ public class NettyServerLowAndHighWatermarkTest {
*/
@Test
public void testLowAndHighWatermarks() throws Throwable {
final int expectedLowWatermark = PageSize + 1;
final int expectedHighWatermark = 2 * PageSize;

final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final NettyProtocol protocol = new NettyProtocol() {
@Override
public ChannelHandler[] getServerChannelHandlers() {
// The channel handler implements the test
return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(error)};
return new ChannelHandler[] {new TestLowAndHighWatermarkHandler(
expectedLowWatermark, expectedHighWatermark, error)};
}

@Override
Expand Down Expand Up @@ -97,18 +106,27 @@ public ChannelHandler[] getClientChannelHandlers() {
*/
private static class TestLowAndHighWatermarkHandler extends ChannelInboundHandlerAdapter {

private final int expectedLowWatermark;

private final int expectedHighWatermark;

private final AtomicReference<Throwable> error;

private boolean hasFlushed;

public TestLowAndHighWatermarkHandler(AtomicReference<Throwable> error) {
public TestLowAndHighWatermarkHandler(int expectedLowWatermark, int expectedHighWatermark, AtomicReference<Throwable> error) {
this.expectedLowWatermark = expectedLowWatermark;
this.expectedHighWatermark = expectedHighWatermark;
this.error = error;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel ch = ctx.channel();

assertEquals("Low watermark", expectedLowWatermark, ch.config().getWriteBufferLowWaterMark());
assertEquals("High watermark", expectedHighWatermark, ch.config().getWriteBufferHighWaterMark());

// Start with a writable channel
assertTrue(ch.isWritable());

Expand Down

0 comments on commit 038a9ac

Please sign in to comment.