Skip to content

Commit

Permalink
Improve RocketMQ integration example (alibaba#1757)
Browse files Browse the repository at this point in the history
- The demo was unable to run and stop because of missing namesrv configuration, and now fixed.
  • Loading branch information
PeineLiang authored Feb 3, 2021
1 parent bf83f05 commit fe59485
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class Constants {

public static final String TEST_GROUP_NAME = "sentinel-group";
public static final String TEST_TOPIC_NAME = "SentinelTopicTest";
public static final String TEST_NAMESRV_ADDR = "127.0.0.1:9876";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -55,10 +56,16 @@ public static void main(String[] args) throws MQClientException {
initFlowControlRule();

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(Constants.TEST_GROUP_NAME);

consumer.setNamesrvAddr(Constants.TEST_NAMESRV_ADDR);
consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(Constants.TEST_TOPIC_NAME);
Set<MessageQueue> mqs = new HashSet<>();
try {
mqs = consumer.fetchSubscribeMessageQueues(Constants.TEST_TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}

for (MessageQueue mq : mqs) {
System.out.printf("Consuming messages from the queue: %s%n", mq);
SINGLE_MQ:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,23 @@ public class SyncProducer {

public static void main(String[] args) throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer(Constants.TEST_GROUP_NAME);
DefaultMQProducer producer = new DefaultMQProducer(Constants.TEST_GROUP_NAME);
producer.setNamesrvAddr(Constants.TEST_NAMESRV_ADDR);
// Launch the instance.
producer.start();
for (int i = 0; i < 1000; i++) {
// Create a message instance, specifying topic, tag and message body.
Message msg = new Message(Constants.TEST_TOPIC_NAME, "TagA",
("Hello RocketMQ From Sentinel " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

try {
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
// Shut down once the producer instance is not longer in use.
producer.shutdown();
Expand Down

0 comments on commit fe59485

Please sign in to comment.