Skip to content

Commit

Permalink
fixed test
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Sep 30, 2018
1 parent 9344ed1 commit eb2a83b
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.kafka.common.serialization.StringDeserializer;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;

Expand Down Expand Up @@ -50,7 +49,7 @@ public KafkaCanalConnector(String servers, String topic, Integer partition, Stri
properties.put("max.poll.records", "100");
properties.put("key.deserializer", StringDeserializer.class.getName());
if (!flatMessage) {
properties.put("value.deserializer", CanalMessageDeserializer.class.getName());
properties.put("value.deserializer", MessageDeserializer.class.getName());
} else {
properties.put("value.deserializer", StringDeserializer.class.getName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.alibaba.otter.canal.client.kafka;

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.protocol.Message;

/**
* Kafka Message类的反序列化
*
* @author machengyuan @ 2018-6-12
* @version 1.0.0
*/
public class MessageDeserializer implements Deserializer<Message> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public Message deserialize(String topic1, byte[] data) {
return CanalMessageDeserializer.deserializer(data);
}

@Override
public void close() {
// nothing to do
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1082,10 +1082,10 @@ final Serializable fetchValue(int type, final int meta, boolean isBinary) {
buffer.fillBytes(binary, 0, len);

/* Warning unsupport cloumn type */
logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d",
meta,
meta,
len));
// logger.warn(String.format("!! Unsupport column type MYSQL_TYPE_GEOMETRY: meta=%d (%04X), len = %d",
// meta,
// meta,
// len));
javaType = Types.BINARY;
value = binary;
length = len;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
Expand Down Expand Up @@ -44,7 +46,7 @@ public void run() {
connector.connect();
connector.subscribe();
while (true) {
Message message = connector.getWithoutAck(batchSize);
Message message = connector.getWithoutAck(batchSize, 100L, TimeUnit.MILLISECONDS);
long batchId = message.getId();
int size = message.getRawEntries().size();
sum += size;
Expand All @@ -53,11 +55,13 @@ public void run() {
queue.add(batchId);
if (count % 10 == 0) {
end = System.currentTimeMillis();
long tps = (perSum * 1000) / (end - start);
System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start)
+ " , tps : " + tps);
start = end;
perSum = 0;
if (end - start != 0) {
long tps = (perSum * 1000) / (end - start);
System.out.println(" total : " + sum + " , current : " + perSum + " , cost : " + (end - start)
+ " , tps : " + tps);
start = end;
perSum = 0;
}
}
}
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SessionHandler(CanalServerWithEmbedded embeddedServer){
this.embeddedServer = embeddedServer;
}

@SuppressWarnings({ "deprecation", "deprecation" })
@SuppressWarnings({ "deprecation" })
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
logger.info("message receives in session handler...");
long start = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void testSimple() throws IOException {
}
}

@SuppressWarnings("deprecation")
private byte[] buildData(Message message) throws IOException {
List<ByteString> rowEntries = message.getRawEntries();
// message size
Expand Down

0 comments on commit eb2a83b

Please sign in to comment.