Skip to content

Commit

Permalink
fix bug: 并行解析模式下,开启gtid 会导致解析错误
Browse files Browse the repository at this point in the history
  • Loading branch information
wingerx committed Aug 24, 2018
1 parent 00b5ade commit c1f9285
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 8 deletions.
5 changes: 5 additions & 0 deletions dbsync/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
<packaging>jar</packaging>
<name>canal dbsync module for otter ${project.version}</name>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.parse.driver</artifactId>
<version>${project.version}</version>
</dependency>
<!-- log -->
<dependency>
<groupId>ch.qos.logback</groupId>
Expand Down
20 changes: 20 additions & 0 deletions dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/LogContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import java.util.HashMap;
import java.util.Map;

import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.alibaba.otter.canal.parse.driver.mysql.packets.MysqlGTIDSet;
import com.taobao.tddl.dbsync.binlog.event.FormatDescriptionLogEvent;
import com.taobao.tddl.dbsync.binlog.event.GtidLogEvent;
import com.taobao.tddl.dbsync.binlog.event.TableMapLogEvent;

/**
Expand All @@ -20,6 +23,8 @@ public final class LogContext {

private LogPosition logPosition;

private GTIDSet gtidSet;

public LogContext(){
this.formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
}
Expand Down Expand Up @@ -60,4 +65,19 @@ public void reset() {
formatDescription = FormatDescriptionLogEvent.FORMAT_DESCRIPTION_EVENT_5_x;
mapOfTable.clear();
}

public final void putGtid(GtidLogEvent logEvent) {
if (logEvent != null) {
String gtid = logEvent.getSid().toString() + ":" + logEvent.getGno();
if (gtidSet == null) {
gtid = logEvent.getSid().toString() + ":1-" + logEvent.getGno();
gtidSet = MysqlGTIDSet.parse(gtid);
}
gtidSet.update(gtid);
}
}

public GTIDSet getGtidSet() {
return gtidSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.BitSet;

import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -159,18 +160,19 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
// remove checksum bytes
buffer.limit(header.getEventLen() - LogEvent.BINLOG_CHECKSUM_LEN);
}

switch (header.getType()) {
case LogEvent.QUERY_EVENT: {
QueryLogEvent event = new QueryLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.XID_EVENT: {
XidLogEvent event = new XidLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.TABLE_MAP_EVENT: {
Expand Down Expand Up @@ -268,12 +270,14 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
RandLogEvent event = new RandLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.USER_VAR_EVENT: {
UserVarLogEvent event = new UserVarLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.FORMAT_DESCRIPTION_EVENT: {
Expand Down Expand Up @@ -324,6 +328,7 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
HeartbeatLogEvent event = new HeartbeatLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.IGNORABLE_LOG_EVENT: {
Expand All @@ -336,34 +341,40 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
RowsQueryLogEvent event = new RowsQueryLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.WRITE_ROWS_EVENT: {
RowsLogEvent event = new WriteRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.UPDATE_ROWS_EVENT: {
RowsLogEvent event = new UpdateRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.DELETE_ROWS_EVENT: {
RowsLogEvent event = new DeleteRowsLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
event.fillTable(context);
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.GTID_LOG_EVENT:
case LogEvent.ANONYMOUS_GTID_LOG_EVENT: {
GtidLogEvent event = new GtidLogEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
// update latest gtid
context.putGtid(event);
return event;
}
case LogEvent.PREVIOUS_GTIDS_LOG_EVENT: {
Expand Down Expand Up @@ -394,6 +405,7 @@ public static LogEvent decode(LogBuffer buffer, LogHeader header, LogContext con
AnnotateRowsEvent event = new AnnotateRowsEvent(header, buffer, descriptionEvent);
/* updating position in context */
logPosition.position = header.getLogPos();
header.putGtidStr(context.getGtidSet());
return event;
}
case LogEvent.BINLOG_CHECKPOINT_EVENT: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.taobao.tddl.dbsync.binlog.event;

import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.taobao.tddl.dbsync.binlog.LogBuffer;
import com.taobao.tddl.dbsync.binlog.LogEvent;

Expand Down Expand Up @@ -120,6 +121,8 @@ public final class LogHeader {
*/
protected String logFileName;

protected String gtidStr;

/* for Start_event_v3 */
public LogHeader(final int type){
this.type = type;
Expand Down Expand Up @@ -288,4 +291,14 @@ private void processCheckSum(LogBuffer buffer) {
crc = buffer.getUint32(eventLen - LogEvent.BINLOG_CHECKSUM_LEN);
}
}

public String getGtidStr() {
return gtidStr;
}

public void putGtidStr(GTIDSet gtidSet) {
if (gtidSet != null) {
this.gtidStr = gtidSet.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,13 @@ private Entry parseHeartbeatLogEvent(HeartbeatLogEvent logEvent) {
return entryBuilder.build();
}

private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
public Entry parseGTIDLogEvent(GtidLogEvent logEvent) {
LogHeader logHeader = logEvent.getHeader();
String value = logEvent.getSid().toString() + ":" + logEvent.getGno();
Pair.Builder builder = Pair.newBuilder();
builder.setKey("gtid");
builder.setValue(value);
if (gtidSet != null) {
gtidSet.update(value);
}

if (logEvent.getLastCommitted() != null) {
builder.setKey("lastCommitted");
builder.setValue(String.valueOf(logEvent.getLastCommitted()));
Expand Down Expand Up @@ -845,9 +843,8 @@ private Header createHeader(LogHeader logHeader, String schemaName, String table
}
headerBuilder.setEventLength(logHeader.getEventLen());
// enable gtid position
if (gtidSet != null) {
String gtid = gtidSet.toString();
headerBuilder.setGtid(gtid);
if (StringUtils.isNotEmpty(logHeader.getGtidStr())) {
headerBuilder.setGtid(logHeader.getGtidStr());
}

// add rowsCount suppport
Expand Down

0 comments on commit c1f9285

Please sign in to comment.