Skip to content

Commit

Permalink
Remove CDCConnectionStatus (apache#25638)
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy authored May 13, 2023
1 parent f8ae121 commit 9b7891f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 42 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@
package org.apache.shardingsphere.data.pipeline.cdc.context;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;

/**
* CDC connection context.
*/
@Getter
@Setter
@RequiredArgsConstructor
public final class CDCConnectionContext {

private volatile CDCConnectionStatus status;
private final ShardingSphereUser currentUser;

private volatile String database;

private volatile String jobId;

private volatile ShardingSphereUser currentUser;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
Expand Down Expand Up @@ -72,9 +71,6 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter

@Override
public void channelActive(final ChannelHandlerContext ctx) {
CDCConnectionContext context = new CDCConnectionContext();
context.setStatus(CDCConnectionStatus.NOT_LOGGED_IN);
ctx.channel().attr(CONNECTION_CONTEXT_KEY).setIfAbsent(context);
CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1")
.build()).build();
ctx.writeAndFlush(response);
Expand All @@ -93,7 +89,6 @@ public void channelInactive(final ChannelHandlerContext ctx) {
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
log.error("caught CDC resolution error", cause);
// TODO add CDC exception to wrapper this exception, and add the parameters requestId and whether to close connect
CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
ChannelFuture channelFuture;
if (cause instanceof CDCExceptionWrapper) {
CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
Expand All @@ -102,18 +97,18 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
} else {
channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
}
if (CDCConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus()) {
CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
if (null == connectionContext) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
CDCConnectionStatus status = connectionContext.getStatus();
CDCRequest request = (CDCRequest) msg;
if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
processLogin(ctx, request, connectionContext);
if (null == connectionContext) {
processLogin(ctx, request);
return;
}
switch (request.getType()) {
Expand Down Expand Up @@ -141,16 +136,15 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
}
}

private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
if (!request.hasLoginRequestBody() || !request.getLoginRequestBody().hasBasicBody()) {
throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Login request body is empty"));
}
BasicBody body = request.getLoginRequestBody().getBasicBody();
AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
Optional<ShardingSphereUser> user = authorityRule.findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(), body.getPassword())) {
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
connectionContext.setCurrentUser(user.get());
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user.get()));
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
} else {
throw new CDCExceptionWrapper(request.getRequestId(), new CDCLoginException("Illegal username or password"));
Expand Down

0 comments on commit 9b7891f

Please sign in to comment.