diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
deleted file mode 100644
index c3c7631659fbd..0000000000000
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.integration
-
-import kafka.api.FetchRequestBuilder
-import kafka.message.ByteBufferMessageSet
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import scala.collection._
-import kafka.utils._
-import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
-import kafka.producer.KeyedMessage
-import org.junit.Assert.assertEquals
-
-/**
- * End to end tests of the primitive apis against a local server
- */
-class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
-
-  val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props)
-  val configs = List(config)
-  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-
-  override def setUp() {
-    super.setUp
-    if(configs.size <= 0)
-      throw new KafkaException("Must suply at least one server config.")
-
-    // temporarily set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.FATAL)
-  }
-
-  override def tearDown() {
-    // restore set request handler logger to a higher level
-    requestHandlerLogger.setLevel(Level.ERROR)
-
-    super.tearDown
-  }
-
-  def testProduceAndFetch() {
-    // send some messages
-    val topic = "test"
-    val sentMessages = List("hello", "there")
-    val producerData = sentMessages.map(m => new KeyedMessage[String, String](topic, topic, m))
-
-    producer.send(producerData:_*)
-
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-
-    var fetchedMessage: ByteBufferMessageSet = null
-    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
-      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-      fetchedMessage = fetched.messageSet(topic, 0)
-    }
-    assertEquals(sentMessages, fetchedMessage.map(m => Utils.readString(m.message.payload)).toList)
-
-    // send an invalid offset
-    try {
-      val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
-      fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
-      fail("Expected an OffsetOutOfRangeException exception to be thrown")
-    } catch {
-      case e: OffsetOutOfRangeException =>
-    }
-  }
-
-  def testProduceAndMultiFetch() {
-    // send some messages, with non-ordered topics
-    val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
-    {
-      val messages = new mutable.HashMap[String, Seq[String]]
-      val builder = new FetchRequestBuilder()
-      for( (topic, offset) <- topicOffsets) {
-        val producedData = List("a_" + topic, "b_" + topic)
-        messages += topic -> producedData
-        producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
-        TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000)
-        builder.addFetch(topic, offset, 0, 10000)
-      }
-
-      // wait a bit for produced message to be available
-      val request = builder.build()
-      val response = consumer.fetch(request)
-      for( (topic, offset) <- topicOffsets) {
-        val fetched = response.messageSet(topic, offset)
-        assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
-      }
-    }
-
-    // send some invalid offsets
-    val builder = new FetchRequestBuilder()
-    for((topic, offset) <- topicOffsets)
-      builder.addFetch(topic, offset, -1, 10000)
-
-    val request = builder.build()
-    val responses = consumer.fetch(request)
-    responses.data.values.foreach(pd => {
-      try {
-        ErrorMapping.maybeThrowException(pd.error)
-        fail("Expected an OffsetOutOfRangeException exception to be thrown")
-      } catch {
-        case e: OffsetOutOfRangeException => // this is good
-      }
-    })
-  }
-
-  def testMultiProduce() {
-    // send some messages
-    val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for(topic <- topics) {
-      val set = List("a_" + topic, "b_" + topic)
-      messages += topic -> set
-      produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
-      builder.addFetch(topic, 0, 0, 10000)
-    }
-    producer.send(produceList: _*)
-    topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
-
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for(topic <- topics) {
-      val fetched = response.messageSet(topic, 0)
-      assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload)))
-    }
-  }
-
-  def testMultiProduceResend() {
-    // send some messages
-    val topics = List("test1", "test2", "test3");
-    val messages = new mutable.HashMap[String, Seq[String]]
-    val builder = new FetchRequestBuilder()
-    var produceList: List[KeyedMessage[String, String]] = Nil
-    for(topic <- topics) {
-      val set = List("a_" + topic, "b_" + topic)
-      messages += topic -> set
-      produceList ++= set.map(new KeyedMessage[String, String](topic, topic, _))
-      builder.addFetch(topic, 0, 0, 10000)
-    }
-    producer.send(produceList: _*)
-    topics.foreach(topic => TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 1000))
-
-    producer.send(produceList: _*)
-    // wait a bit for produced message to be available
-    val request = builder.build()
-    val response = consumer.fetch(request)
-    for(topic <- topics) {
-      val topicMessages = response.messageSet(topic, 0)
-      assertEquals(messages(topic) ++ messages(topic), topicMessages.map(m => Utils.readString(m.message.payload)))
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index a062f68c7f6b2..60a466ff5de78 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -94,7 +94,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testDefaultEncoderProducerAndFetchWithCompression() {
     val topic = "test-topic"
     val props = producer.config.props.props
-    props.put("compression", "true")
+    props.put("compression.codec", "gzip")
     val config = new ProducerConfig(props)
 
     val stringProducer1 = new Producer[String, String](config)
@@ -178,14 +178,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     produceAndMultiFetch(noCompressionProducer)
   }
 
-  def testProduceAndMultiFetchWithCompression() {
-    val props = producer.config.props.props
-    props.put("compression", "true")
-    val config = new ProducerConfig(props)
-    val producerWithCompression = new Producer[String, String](config)
-    produceAndMultiFetch(producerWithCompression)
-  }
-
   private def multiProduce(producer: Producer[String, String]) {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
     createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
@@ -215,14 +207,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     multiProduce(noCompressionProducer)
   }
 
-  def testMultiProduceWithCompression() {
-    val props = producer.config.props.props
-    props.put("compression", "true")
-    val config = new ProducerConfig(props)
-    val producerWithCompression = new Producer[String, String](config)
-    multiProduce(producerWithCompression)
-  }
-
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     AdminUtils.createTopic(zkClient, newTopic, 1, 1)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 18e355501808c..bdc6f01f2e2d8 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -355,42 +355,6 @@ class AsyncProducerTest extends JUnit3Suite {
     }
   }
 
-  @Test
-  def testBrokerListAndAsync() {
-    return
-    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs))
-    props.put("producer.type", "async")
-    props.put("batch.num.messages", "5")
-
-    val config = new ProducerConfig(props)
-
-    val topic = "topic1"
-    val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
-    val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
-    topicPartitionInfos.put("topic1", topic1Metadata)
-
-    val producerPool = new ProducerPool(config)
-
-    val msgs = TestUtils.getMsgStrings(10)
-
-    val handler = new DefaultEventHandler[String,String](config,
-                                                         partitioner = null.asInstanceOf[Partitioner],
-                                                         encoder = new StringEncoder,
-                                                         keyEncoder = new StringEncoder,
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos = topicPartitionInfos)
-
-    val producer = new Producer[String, String](config, handler)
-    try {
-      // send all 10 messages, should create 2 batches and 2 syncproducer calls
-      producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
-      producer.close
-
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    }
-  }
-
   @Test
   def testFailedSendRetryLogic() {
     val props = new Properties()
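
Note on the configuration change in PrimitiveApiTest above: the boolean "compression" property is replaced by "compression.codec", which names the codec to apply ("none", "gzip", or "snappy" in the 0.8-era Scala client). As a rough illustration of how a standalone producer would be configured under the new property, here is a minimal sketch; the broker address, topic, key, and message values are placeholders, not taken from the patch:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object GzipProducerSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // Placeholder broker address; point this at a real broker.
        props.put("metadata.broker.list", "localhost:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        // Previously: props.put("compression", "true"); now the codec is named explicitly.
        props.put("compression.codec", "gzip")

        val producer = new Producer[String, String](new ProducerConfig(props))
        // Each message set is compressed with the configured codec before being sent.
        producer.send(new KeyedMessage[String, String]("test-topic", "key", "hello"))
        producer.close()
      }
    }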