KAFKA-1865 Add a flush() method to the producer.

jkreps committed Feb 28, 2015
1 parent 22ff9e9 commit 0636928
Showing 18 changed files with 393 additions and 99 deletions.
10 changes: 3 additions & 7 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -99,19 +99,15 @@ public synchronized int requestUpdate() {
     /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
-    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
         if (maxWaitMs < 0) {
             throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
         }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
         while (this.version <= lastVersion) {
-            try {
-                if (remainingWaitMs != 0) {
-                    wait(remainingWaitMs);
-                }
-            } catch (InterruptedException e) { /* this is fine */
-            }
+            if (remainingWaitMs != 0)
+                wait(remainingWaitMs);
             long elapsed = System.currentTimeMillis() - begin;
             if (elapsed >= maxWaitMs)
                 throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
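This hunk stops swallowing InterruptedException inside the metadata wait loop and lets it propagate to the caller. For reference, a self-contained sketch of the same bounded-wait idiom; the Versioned class and its members are invented for illustration and are not Kafka's Metadata:

import java.util.concurrent.TimeoutException;

// Hypothetical holder used only to illustrate the awaitUpdate() wait loop.
class Versioned {
    private int version = 0;

    public synchronized void bumpVersion() {
        this.version++;
        notifyAll(); // wake any waiters once a newer version is available
    }

    // Block until version > lastVersion or time out; interruption now propagates.
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        if (maxWaitMs < 0)
            throw new IllegalArgumentException("maxWaitMs must be >= 0");
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {
            if (remainingWaitMs != 0)
                wait(remainingWaitMs); // may throw InterruptedException to the caller
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("No update after " + maxWaitMs + " ms");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }
}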
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -55,10 +56,66 @@
 /**
  * A Kafka client that publishes records to the Kafka cluster.
  * <P>
- * The producer is <i>thread safe</i> and should generally be shared among all threads for best performance.
+ * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than
+ * having multiple instances.
  * <p>
- * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these resources.
+ * Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value
+ * pairs.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("acks", "all");
+ * props.put("retries", 0);
+ * props.put("batch.size", 16384);
+ * props.put("linger.ms", 1);
+ * props.put("buffer.memory", 33554432);
+ * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ *
+ * Producer<String, String> producer = new KafkaProducer(props);
+ * for(int i = 0; i < 100; i++)
+ *     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
+ *
+ * producer.close();
+ * }</pre>
+ * <p>
+ * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
+ * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
+ * to the cluster. Failure to close the producer after use will leak these resources.
+ * <p>
+ * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends
+ * and immediately returns. This allows the producer to batch together individual records for efficiency.
+ * <p>
+ * The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting
+ * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
+ * <p>
+ * If the request fails, the producer can automatically retry, though since we have specified <code>retries</code>
+ * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on
+ * <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details).
+ * <p>
+ * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by
+ * the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will
+ * generally have one of these buffers for each active partition).
+ * <p>
+ * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
+ * want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will
+ * instruct the producer to wait up to that number of milliseconds before sending a request in the hope that more records will
+ * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
+ * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
+ * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
+ * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load
+ * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
+ * efficient requests when not under maximal load at the cost of a small amount of latency.
+ * <p>
+ * The <code>buffer.memory</code> config controls the total amount of memory available to the producer for buffering. If records
+ * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is
+ * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which
+ * will cause the send call to result in an exception.
+ * <p>
+ * The <code>key.serializer</code> and <code>value.serializer</code> configs specify how to turn the key and value objects the user provides with
+ * their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or
+ * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types.
+ */
 public class KafkaProducer<K, V> implements Producer<K, V> {
 
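As a companion to the linger.ms and block.on.buffer.full discussion in the Javadoc above, a hedged configuration sketch: a producer tuned for fewer, larger requests that throws instead of blocking when its buffer fills. The property values and topic name are illustrative, not recommendations from the patch.

import java.util.Properties;

import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("linger.ms", 5);                  // trade up to 5 ms of latency for larger batches
        props.put("block.on.buffer.full", false);   // fail fast instead of blocking when buffer.memory is exhausted

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        } catch (BufferExhaustedException e) {
            // Only thrown with block.on.buffer.full=false: shed load or back off here.
            System.err.println("Producer buffer full, dropping record: " + e);
        } finally {
            producer.close();
        }
    }
}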
@@ -241,8 +298,8 @@ private static int parseAcks(String acksString) {
     }
 
     /**
-     * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
-     * @param record The record to be sent
+     * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
+     * See {@link #send(ProducerRecord, Callback)} for details.
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@@ -261,53 +318,59 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
      * <p>
      * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
      * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
-     * get()} on this future will result in the metadata for the record or throw any exception that occurred while
-     * sending the record.
+     * get()} on this future will block until the associated request completes and then return the metadata for the record
+     * or throw any exception that occurred while sending the record.
      * <p>
-     * If you want to simulate a simple blocking call you can do the following:
+     * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately:
      *
-     * <pre>{@code
-     * producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get();
+     * <pre>
+     * {@code
+     * byte[] key = "key".getBytes();
+     * byte[] value = "value".getBytes();
+     * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value);
+     * producer.send(record).get();
      * }</pre>
      * <p>
-     * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
+     * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
      * will be invoked when the request is complete.
      *
-     * <pre>{@code
-     * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes());
-     * producer.send(myRecord,
-     *               new Callback() {
-     *                   public void onCompletion(RecordMetadata metadata, Exception e) {
-     *                       if(e != null)
-     *                           e.printStackTrace();
-     *                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
-     *                   }
-     *               });
-     * }</pre>
+     * <pre>
+     * {@code
+     * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
+     * producer.send(record,
+     *               new Callback() {
+     *                   public void onCompletion(RecordMetadata metadata, Exception e) {
+     *                       if(e != null)
+     *                           e.printStackTrace();
+     *                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
+     *                   }
+     *               });
+     * }
+     * </pre>
      *
      * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
      * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>:
      *
-     * <pre>{@code
+     * <pre>
+     * {@code
      * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
      * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
-     * }</pre>
+     * }
+     * </pre>
      * <p>
      * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
      * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
      * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
      * to parallelize processing.
-     * <p>
-     * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on it's size, which is
-     * controlled by the configuration <code>total.memory.bytes</code>. If <code>send()</code> is called faster than the
-     * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
-     * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
-     * in cases where non-blocking usage is desired the setting <code>block.on.buffer.full=false</code> will cause the
-     * producer to instead throw an exception when buffer memory is exhausted.
      *
      * @param record The record to send
      * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
      *        indicates no callback)
      *
+     * @throws InterruptException If the thread is interrupted while blocked
+     * @throws SerializationException If the key or value are not valid objects given the configured serializers
+     * @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full.
+     *
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
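Following the Javadoc's advice that callbacks run on the producer's single I/O thread, here is a sketch (not part of the patch) of handing expensive completion work to a caller-owned Executor. The pool size, topic, and helper names are invented for illustration.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncCallbackSketch {
    // Caller-owned pool; the producer's I/O thread only enqueues work here.
    private static final ExecutorService CALLBACK_POOL = Executors.newFixedThreadPool(4);

    public static void sendWithOffloadedCallback(Producer<String, String> producer) {
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
                      new Callback() {
                          public void onCompletion(final RecordMetadata metadata, final Exception e) {
                              // Runs on the producer I/O thread: keep it cheap, just hand off.
                              CALLBACK_POOL.submit(new Runnable() {
                                  public void run() {
                                      if (e != null)
                                          e.printStackTrace();
                                      else
                                          expensivePostProcessing(metadata);
                                  }
                              });
                          }
                      });
    }

    private static void expensivePostProcessing(RecordMetadata metadata) {
        // Hypothetical slow work keyed off the acknowledged offset (audit log, DB write, ...).
        System.out.println("Acked at offset " + metadata.offset());
    }
}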
@@ -352,7 +415,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
-            throw new KafkaException(e);
+            throw new InterruptException(e);
         } catch (KafkaException e) {
             this.errors.record();
             throw e;
@@ -364,7 +427,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
      * @param topic The topic we want metadata for
      * @param maxWaitMs The maximum time in ms for waiting on the metadata
      */
-    private void waitOnMetadata(String topic, long maxWaitMs) {
+    private void waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
         if (metadata.fetch().partitionsForTopic(topic) != null) {
             return;
         } else {
@@ -399,20 +462,72 @@ private void ensureValidRecordSize(int size) {
                                               ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
 
+    /**
+     * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
+     * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
+     * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>).
+     * A request is considered completed when it is successfully acknowledged
+     * according to the <code>acks</code> configuration you have specified or else it results in an error.
+     * <p>
+     * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete;
+     * however, no guarantee is made about the completion of records sent after the flush call begins.
+     * <p>
+     * This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call
+     * gives a convenient way to ensure all previously sent messages have actually completed.
+     * <p>
+     * This example shows how to consume from one Kafka topic and produce to another Kafka topic:
+     * <pre>
+     * {@code
+     * for(ConsumerRecord<String, String> record: consumer.poll(100))
+     *     producer.send(new ProducerRecord("my-topic", record.key(), record.value()));
+     * producer.flush();
+     * consumer.commit();
+     * }
+     * </pre>
+     *
+     * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur
+     * we need to set <code>retries=&lt;large_number&gt;</code> in our config.
+     *
+     * @throws InterruptException If the thread is interrupted while blocked
+     */
+    @Override
+    public void flush() {
+        log.trace("Flushing accumulated records in producer.");
+        this.accumulator.beginFlush();
+        this.sender.wakeup();
+        try {
+            this.accumulator.awaitFlushCompletion();
+        } catch (InterruptedException e) {
+            throw new InterruptException("Flush interrupted.", e);
+        }
+    }
 
     /**
      * Get the partition metadata for the given topic. This can be used for custom partitioning.
+     * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        try {
+            waitOnMetadata(topic, this.metadataFetchTimeoutMs);
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
         return this.metadata.fetch().partitionsForTopic(topic);
     }
 
+    /**
+     * Get the full set of internal metrics maintained by the producer.
+     */
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(this.metrics.metrics());
+    }
+
     /**
-     * Close this producer. This method blocks until all in-flight requests complete.
+     * Close this producer. This method blocks until all previously sent requests complete.
+     * @throws InterruptException If the thread is interrupted while blocked
      */
     @Override
     public void close() {
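To make the consume-transform-produce pattern from the flush() Javadoc concrete, here is a hedged sketch of the copy loop against a generic input source. The batch shape, the acknowledgeSource hook, the retries value, and the topic are assumptions for illustration, not part of the patch.

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CopyWithFlushSketch {
    /**
     * Copies one batch of (key, value) pairs into Kafka and only
     * acknowledges the source once flush() confirms every send completed.
     */
    static void copyBatch(Producer<String, String> producer,
                          List<String[]> batch,          // hypothetical input batch of {key, value} pairs
                          Runnable acknowledgeSource) {  // hypothetical source commit hook
        for (String[] kv : batch)
            producer.send(new ProducerRecord<String, String>("my-topic", kv[0], kv[1]));
        producer.flush();        // blocks until every record sent above has completed
        acknowledgeSource.run(); // safe: all copied records are now acknowledged per `acks`
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", 100); // retry failed sends so the copy does not silently drop records
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        copyBatch(producer,
                  java.util.Arrays.asList(new String[] {"k1", "v1"}, new String[] {"k2", "v2"}),
                  new Runnable() { public void run() { System.out.println("source batch committed"); } });
        producer.close();
    }
}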
@@ -421,7 +536,7 @@ public void close() {
         try {
             this.ioThread.join();
         } catch (InterruptedException e) {
-            throw new KafkaException(e);
+            throw new InterruptException(e);
         }
         this.metrics.close();
         this.keySerializer.close();
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -128,6 +128,11 @@ private long nextOffset(TopicPartition tp) {
             return offset;
         }
     }
 
+    public synchronized void flush() {
+        while (!this.completions.isEmpty())
+            completeNext();
+    }
+
     public List<PartitionInfo> partitionsFor(String topic) {
         return this.cluster.partitionsForTopic(topic);
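This change mirrors the new API in tests: with autoComplete disabled, sends stay pending until completed, and flush() now drains them in one call. A hedged test sketch, assuming the MockProducer(boolean autoComplete) constructor of this era:

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MockProducerFlushTest {
    public static void main(String[] args) {
        // autoComplete=false: sends stay pending until completed explicitly.
        MockProducer producer = new MockProducer(false);

        Future<RecordMetadata> f =
            producer.send(new ProducerRecord<byte[], byte[]>("my-topic", "k".getBytes(), "v".getBytes()));
        if (f.isDone())
            throw new AssertionError("send should still be pending before flush()");

        producer.flush(); // drains every pending completion, like the real producer's flush()
        if (!f.isDone())
            throw new AssertionError("flush() should complete every pending send");
    }
}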
clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -45,6 +45,11 @@ public interface Producer<K, V> extends Closeable {
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
 
+    /**
+     * Flush any accumulated records from the producer. Blocks until all sends are complete.
+     */
+    public void flush();
+
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
@@ -102,15 +102,21 @@ public String toString() {
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof ProducerRecord)) return false;
+        if (this == o)
+            return true;
+        else if (!(o instanceof ProducerRecord))
+            return false;
 
-        ProducerRecord that = (ProducerRecord) o;
+        ProducerRecord<?, ?> that = (ProducerRecord<?, ?>) o;
 
-        if (key != null ? !key.equals(that.key) : that.key != null) return false;
-        if (partition != null ? !partition.equals(that.partition) : that.partition != null) return false;
-        if (topic != null ? !topic.equals(that.topic) : that.topic != null) return false;
-        if (value != null ? !value.equals(that.value) : that.value != null) return false;
+        if (key != null ? !key.equals(that.key) : that.key != null)
+            return false;
+        else if (partition != null ? !partition.equals(that.partition) : that.partition != null)
+            return false;
+        else if (topic != null ? !topic.equals(that.topic) : that.topic != null)
+            return false;
+        else if (value != null ? !value.equals(that.value) : that.value != null)
+            return false;
 
         return true;
     }
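A side note on this equals() rewrite: the ternary chain is the classic null-safe form. On Java 7 and later the same semantics can be written with java.util.Objects (the clients module may still have targeted Java 6 at the time, which would explain avoiding it). A sketch on a stand-in class, not a suggested change to the patch:

import java.util.Objects;

// Minimal stand-in with the same four fields, showing the Objects.equals form.
final class RecordKeySketch {
    final String topic;
    final Integer partition;
    final Object key;
    final Object value;

    RecordKeySketch(String topic, Integer partition, Object key, Object value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        else if (!(o instanceof RecordKeySketch))
            return false;
        RecordKeySketch that = (RecordKeySketch) o;
        return Objects.equals(key, that.key)          // each call is null-safe
            && Objects.equals(partition, that.partition)
            && Objects.equals(topic, that.topic)
            && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition, key, value);
    }
}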
clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -51,13 +51,17 @@ public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException
         return valueOrError();
     }
 
-    private RecordMetadata valueOrError() throws ExecutionException {
+    RecordMetadata valueOrError() throws ExecutionException {
         if (this.result.error() != null)
             throw new ExecutionException(this.result.error());
         else
-            return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+            return value();
     }
 
+    RecordMetadata value() {
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset);
+    }
+
     public long relativeOffset() {
         return this.relativeOffset;
     }
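The valueOrError()/value() split lets internal callers that already know a request succeeded fetch the RecordMetadata without the ExecutionException ceremony; the public Future contract is unchanged. A hedged usage sketch of that contract with a bounded wait; the method and topic names are invented:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedGetSketch {
    // Wait at most waitMs for the ack, surfacing send failures from the future.
    static RecordMetadata sendAndWait(Producer<String, String> producer, long waitMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
        return future.get(waitMs, TimeUnit.MILLISECONDS); // throws ExecutionException on send error
    }
}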