Skip to content

Commit 11653ce

Browse files
committed
ROCKETMQ-80 Add batch feature closes apache#53
1 parent 087d989 commit 11653ce

File tree

31 files changed

+1464
-206
lines changed

31 files changed

+1464
-206
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,9 +374,11 @@ public void registerProcessor() {
374374

375375
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
376376
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
377+
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
377378
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
378379
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
379380
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
381+
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
380382
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
381383
/**
382384
* PullMessageProcessor

broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,6 @@
1717
package org.apache.rocketmq.broker.processor;
1818

1919
import io.netty.channel.ChannelHandlerContext;
20-
import java.net.InetSocketAddress;
21-
import java.net.SocketAddress;
22-
import java.util.List;
23-
import java.util.Map;
24-
import java.util.Random;
2520
import org.apache.rocketmq.broker.BrokerController;
2621
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
2722
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -51,6 +46,12 @@
5146
import org.slf4j.Logger;
5247
import org.slf4j.LoggerFactory;
5348

49+
import java.net.InetSocketAddress;
50+
import java.net.SocketAddress;
51+
import java.util.List;
52+
import java.util.Map;
53+
import java.util.Random;
54+
5455
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
5556
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
5657

@@ -279,6 +280,7 @@ protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
279280
SendMessageRequestHeaderV2 requestHeaderV2 = null;
280281
SendMessageRequestHeader requestHeader = null;
281282
switch (request.getCode()) {
283+
case RequestCode.SEND_BATCH_MESSAGE:
282284
case RequestCode.SEND_MESSAGE_V2:
283285
requestHeaderV2 =
284286
(SendMessageRequestHeaderV2) request

broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

Lines changed: 222 additions & 128 deletions
Large diffs are not rendered by default.

client/src/main/java/org/apache/rocketmq/client/Validators.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer
9595
}
9696
// topic
9797
Validators.checkTopic(msg.getTopic());
98+
9899
// body
99100
if (null == msg.getBody()) {
100101
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
import java.io.UnsupportedEncodingException;
2020
import java.nio.ByteBuffer;
21-
import java.util.ArrayList;
22-
import java.util.Collections;
23-
import java.util.HashMap;
24-
import java.util.Iterator;
2521
import java.util.List;
2622
import java.util.Map;
2723
import java.util.Properties;
2824
import java.util.Set;
25+
import java.util.Iterator;
26+
import java.util.Collections;
27+
import java.util.ArrayList;
28+
import java.util.HashMap;
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import org.apache.rocketmq.client.ClientConfig;
3131
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -50,10 +50,11 @@
5050
import org.apache.rocketmq.common.admin.TopicStatsTable;
5151
import org.apache.rocketmq.common.message.Message;
5252
import org.apache.rocketmq.common.message.MessageClientIDSetter;
53-
import org.apache.rocketmq.common.message.MessageConst;
54-
import org.apache.rocketmq.common.message.MessageDecoder;
5553
import org.apache.rocketmq.common.message.MessageExt;
5654
import org.apache.rocketmq.common.message.MessageQueue;
55+
import org.apache.rocketmq.common.message.MessageConst;
56+
import org.apache.rocketmq.common.message.MessageDecoder;
57+
import org.apache.rocketmq.common.message.MessageBatch;
5758
import org.apache.rocketmq.common.namesrv.TopAddressing;
5859
import org.apache.rocketmq.common.protocol.RequestCode;
5960
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -147,6 +148,7 @@
147148
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
148149
import org.slf4j.Logger;
149150

151+
150152
public class MQClientAPIImpl {
151153

152154
private final static Logger log = ClientLogger.getLog();
@@ -278,14 +280,14 @@ public void createTopic(final String addr, final String defaultTopic, final Topi
278280
}
279281

280282
public SendResult sendMessage(//
281-
final String addr, // 1
282-
final String brokerName, // 2
283-
final Message msg, // 3
284-
final SendMessageRequestHeader requestHeader, // 4
285-
final long timeoutMillis, // 5
286-
final CommunicationMode communicationMode, // 6
287-
final SendMessageContext context, // 7
288-
final DefaultMQProducerImpl producer // 8
283+
final String addr, // 1
284+
final String brokerName, // 2
285+
final Message msg, // 3
286+
final SendMessageRequestHeader requestHeader, // 4
287+
final long timeoutMillis, // 5
288+
final CommunicationMode communicationMode, // 6
289+
final SendMessageContext context, // 7
290+
final DefaultMQProducerImpl producer // 8
289291
) throws RemotingException, MQBrokerException, InterruptedException {
290292
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
291293
}
@@ -305,9 +307,9 @@ public SendResult sendMessage(//
305307
final DefaultMQProducerImpl producer // 12
306308
) throws RemotingException, MQBrokerException, InterruptedException {
307309
RemotingCommand request = null;
308-
if (sendSmartMsg) {
310+
if (sendSmartMsg || msg instanceof MessageBatch) {
309311
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
310-
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
312+
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
311313
} else {
312314
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
313315
}
@@ -334,11 +336,11 @@ public SendResult sendMessage(//
334336
}
335337

336338
private SendResult sendMessageSync(//
337-
final String addr, //
338-
final String brokerName, //
339-
final Message msg, //
340-
final long timeoutMillis, //
341-
final RemotingCommand request//
339+
final String addr, //
340+
final String brokerName, //
341+
final Message msg, //
342+
final long timeoutMillis, //
343+
final RemotingCommand request//
342344
) throws RemotingException, MQBrokerException, InterruptedException {
343345
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
344346
assert response != null;
@@ -507,8 +509,16 @@ private SendResult processSendResponse(//
507509

508510
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
509511

512+
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
513+
if (msg instanceof MessageBatch) {
514+
StringBuilder sb = new StringBuilder();
515+
for (Message message : (MessageBatch) msg) {
516+
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
517+
}
518+
uniqMsgId = sb.toString();
519+
}
510520
SendResult sendResult = new SendResult(sendStatus,
511-
MessageClientIDSetter.getUniqID(msg),
521+
uniqMsgId,
512522
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
513523
sendResult.setTransactionId(responseHeader.getTransactionId());
514524
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
@@ -1452,7 +1462,7 @@ public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, fina
14521462
}
14531463

14541464
public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
1455-
final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
1465+
final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
14561466
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
14571467
requestHeader.setTopic(topic);
14581468
requestHeader.setGroup(group);

client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@
3030
import java.util.concurrent.LinkedBlockingQueue;
3131
import java.util.concurrent.ThreadPoolExecutor;
3232
import java.util.concurrent.TimeUnit;
33+
import org.apache.rocketmq.common.message.Message;
34+
import org.apache.rocketmq.common.message.MessageClientIDSetter;
35+
import org.apache.rocketmq.common.message.MessageExt;
36+
import org.apache.rocketmq.common.message.MessageQueue;
37+
import org.apache.rocketmq.common.message.MessageConst;
38+
import org.apache.rocketmq.common.message.MessageDecoder;
39+
import org.apache.rocketmq.common.message.MessageBatch;
40+
import org.apache.rocketmq.common.message.MessageAccessor;
41+
import org.apache.rocketmq.common.message.MessageType;
42+
import org.apache.rocketmq.common.message.MessageId;
3343
import org.apache.rocketmq.client.QueryResult;
3444
import org.apache.rocketmq.client.Validators;
3545
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -58,15 +68,6 @@
5868
import org.apache.rocketmq.common.ServiceState;
5969
import org.apache.rocketmq.common.UtilAll;
6070
import org.apache.rocketmq.common.help.FAQUrl;
61-
import org.apache.rocketmq.common.message.Message;
62-
import org.apache.rocketmq.common.message.MessageAccessor;
63-
import org.apache.rocketmq.common.message.MessageClientIDSetter;
64-
import org.apache.rocketmq.common.message.MessageConst;
65-
import org.apache.rocketmq.common.message.MessageDecoder;
66-
import org.apache.rocketmq.common.message.MessageExt;
67-
import org.apache.rocketmq.common.message.MessageId;
68-
import org.apache.rocketmq.common.message.MessageQueue;
69-
import org.apache.rocketmq.common.message.MessageType;
7071
import org.apache.rocketmq.common.protocol.ResponseCode;
7172
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
7273
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
@@ -595,8 +596,10 @@ private SendResult sendKernelImpl(final Message msg, //
595596

596597
byte[] prevBody = msg.getBody();
597598
try {
598-
599-
MessageClientIDSetter.setUniqID(msg);
599+
//for MessageBatch,ID has been set in the generating process
600+
if (!(msg instanceof MessageBatch)) {
601+
MessageClientIDSetter.setUniqID(msg);
602+
}
600603

601604
int sysFlag = 0;
602605
if (this.tryToCompressMessage(msg)) {
@@ -652,6 +655,7 @@ private SendResult sendKernelImpl(final Message msg, //
652655
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
653656
requestHeader.setReconsumeTimes(0);
654657
requestHeader.setUnitMode(this.isUnitMode());
658+
requestHeader.setBatch(msg instanceof MessageBatch);
655659
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
656660
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
657661
if (reconsumeTimes != null) {
@@ -737,6 +741,10 @@ public MQClientInstance getmQClientFactory() {
737741
}
738742

739743
private boolean tryToCompressMessage(final Message msg) {
744+
if (msg instanceof MessageBatch) {
745+
//batch dose not support compressing right now
746+
return false;
747+
}
740748
byte[] body = msg.getBody();
741749
if (body != null) {
742750
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {

client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616
*/
1717
package org.apache.rocketmq.client.producer;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021
import org.apache.rocketmq.client.ClientConfig;
2122
import org.apache.rocketmq.client.QueryResult;
23+
import org.apache.rocketmq.client.Validators;
2224
import org.apache.rocketmq.client.exception.MQBrokerException;
2325
import org.apache.rocketmq.client.exception.MQClientException;
2426
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
2527
import org.apache.rocketmq.common.MixAll;
2628
import org.apache.rocketmq.common.message.Message;
29+
import org.apache.rocketmq.common.message.MessageBatch;
30+
import org.apache.rocketmq.common.message.MessageClientIDSetter;
2731
import org.apache.rocketmq.common.message.MessageDecoder;
2832
import org.apache.rocketmq.common.message.MessageExt;
2933
import org.apache.rocketmq.common.message.MessageId;
@@ -577,6 +581,40 @@ public MessageExt viewMessage(String topic, String msgId) throws RemotingExcepti
577581
return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
578582
}
579583

584+
@Override
585+
public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
586+
return this.defaultMQProducerImpl.send(batch(msgs));
587+
}
588+
589+
@Override
590+
public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
591+
return this.defaultMQProducerImpl.send(batch(msgs), timeout);
592+
}
593+
594+
@Override
595+
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
596+
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue);
597+
}
598+
599+
@Override
600+
public SendResult send(Collection<Message> msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
601+
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
602+
}
603+
604+
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
605+
MessageBatch msgBatch;
606+
try {
607+
msgBatch = MessageBatch.generateFromList(msgs);
608+
for (Message message : msgBatch) {
609+
Validators.checkMessage(message, this);
610+
MessageClientIDSetter.setUniqID(message);
611+
}
612+
msgBatch.setBody(msgBatch.encode());
613+
} catch (Exception e) {
614+
throw new MQClientException("Failed to initiate the MessageBatch", e);
615+
}
616+
return msgBatch;
617+
}
580618
public String getProducerGroup() {
581619
return producerGroup;
582620
}

client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.client.producer;
1818

19+
import java.util.Collection;
1920
import java.util.List;
2021
import org.apache.rocketmq.client.MQAdmin;
2122
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -81,4 +82,17 @@ void sendOneway(final Message msg, final MessageQueueSelector selector, final Ob
8182

8283
TransactionSendResult sendMessageInTransaction(final Message msg,
8384
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
85+
86+
//for batch
87+
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
88+
InterruptedException;
89+
90+
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
91+
RemotingException, MQBrokerException, InterruptedException;
92+
93+
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
94+
RemotingException, MQBrokerException, InterruptedException;
95+
96+
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
97+
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
8498
}

common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919
public enum TopicFilterType {
2020
SINGLE_TAG,
2121
MULTI_TAG
22+
2223
}

0 commit comments

Comments
 (0)