Skip to content

Commit

Permalink
fixed duplicate handler
Browse files Browse the repository at this point in the history
  • Loading branch information
adyliu committed Mar 18, 2014
1 parent 9d2d08b commit 2e3ad5d
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 46 deletions.
26 changes: 15 additions & 11 deletions src/main/java/com/sohu/jafka/producer/async/AsyncProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,26 @@ public void send(String topic, T event, int partition) {
if (this.callbackHandler != null) {
data = this.callbackHandler.beforeEnqueue(data);
}

boolean added = false;
try {
if (enqueueTimeoutMs == 0) {
added = queue.offer(data);
} else if (enqueueTimeoutMs < 0) {
queue.put(data);
added = true;
} else {
added = queue.offer(data, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
if (data != null) {
try {
if (enqueueTimeoutMs == 0) {
added = queue.offer(data);
} else if (enqueueTimeoutMs < 0) {
queue.put(data);
added = true;
} else {
added = queue.offer(data, enqueueTimeoutMs, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new AsyncProducerInterruptedException(e);
}
} catch (InterruptedException e) {
throw new AsyncProducerInterruptedException(e);
}
if (this.callbackHandler != null && this.callbackHandler.beforeEnqueue(data) != null) {
if (this.callbackHandler != null) {
this.callbackHandler.afterEnqueue(data, added);
}

if (!added) {
AsyncProducerStats.recordDroppedEvents();
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event);
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/com/sohu/jafka/producer/async/CallbackHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
Expand All @@ -25,62 +25,62 @@
* give the user some callback handles to insert custom functionality at
* various stages as the data flows through the pipeline of the async
* producer
*
*
* @author adyliu ([email protected])
* @since 1.0
*/
public interface CallbackHandler<T> {

/**
* Initializes the callback handler using a Properties object
*
*
* @param properties properties used to initialize the callback handler
*/
void init(Properties properties);

/**
* Callback to process the data before it enters the batching queue of
* the asynchronous producer
*
*
* @param data the data sent to the producer
* @return the processed data that enters the queue
* @return the processed data that enters the queue or null
*/
QueueItem<T> beforeEnqueue(QueueItem<T> data);

/**
* Callback to process the data right after it enters the batching
* queue of the asynchronous producer
*
* @param data the data sent to the producer
*
* @param data the data sent to the producer after {@link #beforeEnqueue(QueueItem)}
* @param added flag that indicates if the data was successfully added
* to the queue
* to the queue
*/
QueueItem<T> afterEnqueue(QueueItem<T> data, boolean added);

/**
* Callback to process the data item right after it has been dequeued
* by the background sender thread of the asynchronous producer
*
*
* @param data the data item dequeued from the async producer queue
* @return the processed list of data items that gets added to the data
* handled by the event handler
* handled by the event handler
*/
List<QueueItem<T>> afterDequeuingExistingData(QueueItem<T> data);

/**
* Callback to process the batched data right before it is being sent
* by the handle API of the event handler
*
*
* @param data the batched data received by the event handler
* @return the processed batched data that gets sent by the handle()
* API of the event handler
* API of the event handler
*/
List<QueueItem<T>> beforeSendingData(List<QueueItem<T>> data);

/**
* Callback to process the last batch of data right before the producer
* send thread is shutdown
*
*
* @return the last batch of data that is sent to the EventHandler
*/
List<QueueItem<T>> lastBatchBeforeClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.sohu.jafka.producer.async;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -64,7 +65,7 @@ public void init(Properties properties) {

public void handle(List<QueueItem<T>> events, SyncProducer producer, Encoder<T> encoder) {
List<QueueItem<T>> processedEvents = events;
if (this.callbackHandler != null && this.callbackHandler.beforeSendingData(events) != null) {
if (this.callbackHandler != null) {
processedEvents = this.callbackHandler.beforeSendingData(events);
}
send(collate(processedEvents, encoder), producer);
Expand All @@ -89,7 +90,9 @@ private void send(List<ProducerRequest> produces, SyncProducer syncProducer) {
}

private List<ProducerRequest> collate(List<QueueItem<T>> events, Encoder<T> encoder) {
//TODO: to be continue
if(events == null || events.isEmpty()){
return Collections.emptyList();
}
final Map<String, Map<Integer, List<Message>>> topicPartitionData = new HashMap<String, Map<Integer, List<Message>>>();
for (QueueItem<T> event : events) {
Map<Integer, List<Message>> partitionData = topicPartitionData.get(event.topic);
Expand Down
45 changes: 25 additions & 20 deletions src/main/java/com/sohu/jafka/producer/async/ProducerSendThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
Expand All @@ -17,20 +17,19 @@

package com.sohu.jafka.producer.async;

import static java.lang.String.format;
import com.sohu.jafka.common.IllegalQueueStateException;
import com.sohu.jafka.producer.SyncProducer;
import com.sohu.jafka.producer.serializer.Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


import com.sohu.jafka.common.IllegalQueueStateException;
import com.sohu.jafka.producer.SyncProducer;
import com.sohu.jafka.producer.serializer.Encoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.String.format;

/**
* @author adyliu ([email protected])
Expand Down Expand Up @@ -62,13 +61,13 @@ public class ProducerSendThread<T> extends Thread {
private volatile boolean shutdown = false;

public ProducerSendThread(String threadName, //
BlockingQueue<QueueItem<T>> queue, //
Encoder<T> serializer, //
SyncProducer underlyingProducer,//
EventHandler<T> eventHandler, //
CallbackHandler<T> callbackHandler, //
long queueTime, //
int batchSize) {
BlockingQueue<QueueItem<T>> queue, //
Encoder<T> serializer, //
SyncProducer underlyingProducer,//
EventHandler<T> eventHandler, //
CallbackHandler<T> callbackHandler, //
long queueTime, //
int batchSize) {
super();
this.threadName = threadName;
this.queue = queue;
Expand Down Expand Up @@ -117,11 +116,14 @@ private List<QueueItem<T>> processEvents() {
while (!shutdown) {
try {
QueueItem<T> item = queue.poll(Math.max(0, (lastSend + queueTime) - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
long elapsed = System.currentTimeMillis()- lastSend;
long elapsed = System.currentTimeMillis() - lastSend;
boolean expired = item == null;
if (item != null) {
if (callbackHandler != null && callbackHandler.afterDequeuingExistingData(item) != null) {
events.addAll(callbackHandler.afterDequeuingExistingData(item));
if (callbackHandler != null) {
List<QueueItem<T>> items = callbackHandler.afterDequeuingExistingData(item);
if (items != null) {
events.addAll(items);
}
} else {
events.add(item);
}
Expand All @@ -147,8 +149,11 @@ private List<QueueItem<T>> processEvents() {
if (queue.size() > 0) {
throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + queue.size() + " remaining items in the queue");
}
if (this.callbackHandler != null && this.callbackHandler.lastBatchBeforeClose() != null) {
events.addAll(callbackHandler.lastBatchBeforeClose());
if (this.callbackHandler != null) {
List<QueueItem<T>> remainEvents = this.callbackHandler.lastBatchBeforeClose();
if (remainEvents != null) {
events.addAll(remainEvents);
}
}
return events;
}
Expand Down

0 comments on commit 2e3ad5d

Please sign in to comment.