Skip to content

Commit

Permalink
kafka-1797; (missed parametric in a few files) add the serializer/deserializer api to the new java client; patched by Jun Rao
Browse files Browse the repository at this point in the history
  • Loading branch information
junrao committed Jan 13, 2015
1 parent 8d5d459 commit 828b808
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* By default this mock will synchronously complete each send call successfully. However it can be configured to allow
* the user to control the completion of the call and supply an optional error for the producer to throw.
*/
public class MockProducer implements Producer {
public class MockProducer implements Producer<byte[],byte[]> {

private final Cluster cluster;
private final Partitioner partitioner = new Partitioner();
Expand Down Expand Up @@ -90,7 +90,7 @@ public MockProducer() {
* @see #history()
*/
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord record) {
public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
return send(record, null);
}

Expand All @@ -100,7 +100,7 @@ public synchronized Future<RecordMetadata> send(ProducerRecord record) {
* @see #history()
*/
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
public synchronized Future<RecordMetadata> send(ProducerRecord<byte[],byte[]> record, Callback callback) {
int partition = 0;
if (this.cluster.partitionsForTopic(record.topic()) != null)
partition = partitioner.partition(record, this.cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public static void main(String[] args) throws Exception {
throw new IllegalArgumentException("Invalid property: " + args[i]);
props.put(pieces[0], pieces[1]);
}
KafkaProducer producer = new KafkaProducer(props);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

/* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord(topicName, payload);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
long sleepTime = NS_PER_SEC / throughput;
long sleepDeficitNs = 0;
Stats stats = new Stats(numRecords, 5000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

public class MockProducerTest {
Expand All @@ -37,7 +34,7 @@ public class MockProducerTest {
@Test
public void testAutoCompleteMock() throws Exception {
MockProducer producer = new MockProducer(true);
ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
Future<RecordMetadata> metadata = producer.send(record);
assertTrue("Send should be immediately complete", metadata.isDone());
assertFalse("Send should be successful", isError(metadata));
Expand All @@ -51,8 +48,8 @@ public void testAutoCompleteMock() throws Exception {
@Test
public void testManualCompletion() throws Exception {
MockProducer producer = new MockProducer(false);
ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>("topic", "key1".getBytes(), "value1".getBytes());
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>("topic", "key2".getBytes(), "value2".getBytes());
Future<RecordMetadata> md1 = producer.send(record1);
assertFalse("Send shouldn't have completed", md1.isDone());
Future<RecordMetadata> md2 = producer.send(record2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,18 @@
*/
package org.apache.kafka.clients.producer;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;

import java.util.List;


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Test;

import java.util.List;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class PartitionerTest {

private byte[] key = "key".getBytes();
Expand All @@ -50,22 +47,22 @@ public class PartitionerTest {
public void testUserSuppliedPartitioning() {
assertEquals("If the user supplies a partition we should use it.",
0,
partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
partitioner.partition(new ProducerRecord<byte[], byte[]>("test", 0, key, value), cluster));
}

@Test
public void testKeyPartitionIsStable() {
int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, value), cluster);
assertEquals("Same key should yield same partition",
partition,
partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
partitioner.partition(new ProducerRecord<byte[], byte[]>("test", key, "value2".getBytes()), cluster));
}

@Test
public void testRoundRobinIsStable() {
int startPart = partitioner.partition(new ProducerRecord("test", value), cluster);
int startPart = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
for (int i = 1; i <= 100; i++) {
int partition = partitioner.partition(new ProducerRecord("test", value), cluster);
int partition = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
assertEquals("Should yield a different partition each call with round-robin partitioner",
partition, (startPart + i) % 2);
}
Expand All @@ -74,7 +71,7 @@ public void testRoundRobinIsStable() {
@Test
public void testRoundRobinWithDownNode() {
for (int i = 0; i < partitions.size(); i++) {
int part = partitioner.partition(new ProducerRecord("test", value), cluster);
int part = partitioner.partition(new ProducerRecord<byte[], byte[]>("test", value), cluster);
assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes

@Test(expected = classOf[InvalidTopicException])
def testCannotSendToInternalTopic() {
producer1.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
producer1.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
}

@Test
Expand Down

0 comments on commit 828b808

Please sign in to comment.