Skip to content

Commit

Permalink
Fix sonar issue of MySQLBinlogEventHandler (apache#25644)
Browse files Browse the repository at this point in the history
* Fix sonar issue of InventoryTask

* Fix sonar issue of MySQLBinlogEventHandler

* Fix sonar issue of ServerInfo

* Fix sonar issue of ServerInfo

* Fix sonar issue of MySQLCommandPacketDecoder

* Fix sonar issue of MySQLCommandPacketDecoder

* Fix sonar issue of MySQLNegotiateHandler
  • Loading branch information
terrymanu authored May 13, 2023
1 parent 5c91008 commit 8dd163a
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
* Inventory task.
Expand All @@ -65,7 +66,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {

private final Importer importer;

private volatile IngestPosition<?> position;
private final AtomicReference<IngestPosition<?>> position;

public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
final PipelineChannelCreator pipelineChannelCreator, final ImporterConnector importerConnector,
Expand All @@ -77,9 +78,9 @@ public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, f
this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
channel = createChannel(pipelineChannelCreator);
dumper = new InventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
importer = TypedSPILoader.getService(ImporterCreator.class, importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener,
ImporterType.INVENTORY);
position = inventoryDumperConfig.getPosition();
importer = TypedSPILoader.getService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig, importerConnector, channel, jobProgressListener, ImporterType.INVENTORY);
position = new AtomicReference<>(inventoryDumperConfig.getPosition());
}

private String generateTaskId(final InventoryDumperConfiguration inventoryDumperConfig) {
Expand Down Expand Up @@ -123,7 +124,7 @@ private PipelineChannel createChannel(final PipelineChannelCreator pipelineChann
return pipelineChannelCreator.createPipelineChannel(1, records -> {
Record lastNormalRecord = RecordUtils.getLastNormalRecord(records);
if (null != lastNormalRecord) {
position = lastNormalRecord.getPosition();
position.set(lastNormalRecord.getPosition());
}
});
}
Expand All @@ -136,7 +137,7 @@ public void stop() {

@Override
public InventoryTaskProgress getTaskProgress() {
return new InventoryTaskProgress(position);
return new InventoryTaskProgress(position.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
Expand Down Expand Up @@ -61,6 +60,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* MySQL Connector.
Expand Down Expand Up @@ -297,19 +297,22 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
}
}

@AllArgsConstructor
private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {

private volatile AbstractBinlogEvent lastBinlogEvent;
private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;

MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (!running) {
return;
}
if (msg instanceof AbstractBinlogEvent) {
lastBinlogEvent = (AbstractBinlogEvent) msg;
blockingEventQueue.put(lastBinlogEvent);
lastBinlogEvent.set((AbstractBinlogEvent) msg);
blockingEventQueue.put(lastBinlogEvent.get());
reconnectTimes.set(0);
}
}
Expand All @@ -325,8 +328,8 @@ public void channelInactive(final ChannelHandlerContext ctx) {

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
String fileName = null == lastBinlogEvent ? null : lastBinlogEvent.getFileName();
Long position = null == lastBinlogEvent ? null : lastBinlogEvent.getPosition();
String fileName = null == lastBinlogEvent.get() ? null : lastBinlogEvent.get().getFileName();
Long position = null == lastBinlogEvent.get() ? null : lastBinlogEvent.get().getPosition();
log.error("MySQLBinlogEventHandler protocol resolution error, file name:{}, position:{}", fileName, position, cause);
}

Expand All @@ -339,7 +342,7 @@ private void reconnect() {
reconnectTimes.incrementAndGet();
connect();
log.info("reconnect times {}", reconnectTimes.get());
subscribe(lastBinlogEvent.getFileName(), lastBinlogEvent.getPosition());
subscribe(lastBinlogEvent.get().getFileName(), lastBinlogEvent.get().getPosition());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.client;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

/**
* MySQL server info.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class ServerInfo {

private volatile ServerVersion serverVersion;
private final ServerVersion serverVersion;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.regex.Pattern;

/**
* Server Version.
* Server version.
*/
@Getter
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@
import org.apache.shardingsphere.db.protocol.mysql.payload.MySQLPacketPayload;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/**
* MySQL Command Packet decoder.
* MySQL command packet decoder.
*/
public final class MySQLCommandPacketDecoder extends ByteToMessageDecoder {

private enum States {
RESPONSE_PACKET, FIELD_PACKET, ROW_DATA_PACKET
}

private volatile States currentState = States.RESPONSE_PACKET;
private final AtomicReference<States> currentState = new AtomicReference<>(States.RESPONSE_PACKET);

private volatile InternalResultSet internalResultSet;
private final AtomicReference<InternalResultSet> internalResultSet = new AtomicReference<>();

@Override
protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
Expand All @@ -52,11 +49,11 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L
}

private void decodeCommandPacket(final MySQLPacketPayload payload, final List<Object> out) {
if (States.FIELD_PACKET == currentState) {
if (States.FIELD_PACKET == currentState.get()) {
decodeFieldPacket(payload);
return;
}
if (States.ROW_DATA_PACKET == currentState) {
if (States.ROW_DATA_PACKET == currentState.get()) {
decodeRowDataPacket(payload, out);
return;
}
Expand All @@ -66,20 +63,20 @@ private void decodeCommandPacket(final MySQLPacketPayload payload, final List<Ob
private void decodeFieldPacket(final MySQLPacketPayload payload) {
if (MySQLEofPacket.HEADER == (payload.getByteBuf().getByte(0) & 0xff)) {
new MySQLEofPacket(payload);
currentState = States.ROW_DATA_PACKET;
currentState.set(States.ROW_DATA_PACKET);
} else {
internalResultSet.getFieldDescriptors().add(new MySQLColumnDefinition41Packet(payload));
internalResultSet.get().getFieldDescriptors().add(new MySQLColumnDefinition41Packet(payload));
}
}

private void decodeRowDataPacket(final MySQLPacketPayload payload, final List<Object> out) {
if (MySQLEofPacket.HEADER == (payload.getByteBuf().getByte(0) & 0xff)) {
new MySQLEofPacket(payload);
out.add(internalResultSet);
currentState = States.RESPONSE_PACKET;
internalResultSet = null;
out.add(internalResultSet.get());
currentState.set(States.RESPONSE_PACKET);
internalResultSet.set(null);
} else {
internalResultSet.getFieldValues().add(new MySQLTextResultSetRowPacket(payload, internalResultSet.getHeader().getColumnCount()));
internalResultSet.get().getFieldValues().add(new MySQLTextResultSetRowPacket(payload, internalResultSet.get().getHeader().getColumnCount()));
}
}

Expand All @@ -93,9 +90,14 @@ private void decodeResponsePacket(final MySQLPacketPayload payload, final List<O
break;
default:
MySQLFieldCountPacket fieldCountPacket = new MySQLFieldCountPacket(payload);
currentState = States.FIELD_PACKET;
internalResultSet = new InternalResultSet(fieldCountPacket);
currentState.set(States.FIELD_PACKET);
internalResultSet.set(new InternalResultSet(fieldCountPacket));
break;
}
}

private enum States {

RESPONSE_PACKET, FIELD_PACKET, ROW_DATA_PACKET
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.security.NoSuchAlgorithmException;

/**
* MySQL Negotiate Handler.
* MySQL negotiate handler.
*/
@RequiredArgsConstructor
public final class MySQLNegotiateHandler extends ChannelInboundHandlerAdapter {
Expand Down Expand Up @@ -75,8 +75,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
handshakeResponsePacket.setCapabilityFlags(generateClientCapability());
handshakeResponsePacket.setAuthPluginName(MySQLAuthenticationMethod.NATIVE);
ctx.channel().writeAndFlush(handshakeResponsePacket);
serverInfo = new ServerInfo();
serverInfo.setServerVersion(new ServerVersion(handshake.getServerVersion()));
serverInfo = new ServerInfo(new ServerVersion(handshake.getServerVersion()));
return;
}
if (msg instanceof MySQLAuthSwitchRequestPacket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void setUp() throws InterruptedException {

@Test
void assertConnect() throws ReflectiveOperationException {
ServerInfo expected = new ServerInfo();
ServerInfo expected = new ServerInfo(new ServerVersion("5.5.0-log"));
mockChannelResponse(expected);
mysqlClient.connect();
ServerInfo actual = (ServerInfo) Plugins.getMemberAccessor().get(MySQLClient.class.getDeclaredField("serverInfo"), mysqlClient);
Expand Down Expand Up @@ -123,8 +123,7 @@ void assertExecuteQuery() throws ReflectiveOperationException {

@Test
void assertSubscribeBelow56Version() throws ReflectiveOperationException {
ServerInfo serverInfo = new ServerInfo();
serverInfo.setServerVersion(new ServerVersion("5.5.0-log"));
ServerInfo serverInfo = new ServerInfo(new ServerVersion("5.5.0-log"));
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("serverInfo"), mysqlClient, serverInfo);
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("channel"), mysqlClient, channel);
Plugins.getMemberAccessor().set(MySQLClient.class.getDeclaredField("eventLoopGroup"), mysqlClient, new NioEventLoopGroup(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.Promise;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ServerInfo;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.ServerVersion;
import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLAuthenticationMethod;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
Expand Down Expand Up @@ -91,7 +92,7 @@ void assertChannelReadHandshakeInitPacket() throws ReflectiveOperationException
@Test
void assertChannelReadOkPacket() throws ReflectiveOperationException {
MySQLOKPacket okPacket = new MySQLOKPacket(0);
ServerInfo serverInfo = new ServerInfo();
ServerInfo serverInfo = new ServerInfo(new ServerVersion("5.5.0-log"));
Plugins.getMemberAccessor().set(MySQLNegotiateHandler.class.getDeclaredField("serverInfo"), mysqlNegotiateHandler, serverInfo);
mysqlNegotiateHandler.channelRead(channelHandlerContext, okPacket);
verify(pipeline).remove(mysqlNegotiateHandler);
Expand Down

0 comments on commit 8dd163a

Please sign in to comment.