Skip to content

Commit

Permalink
rocketmq 延迟队列
Browse files Browse the repository at this point in the history
  • Loading branch information
huankai committed Dec 2, 2019
1 parent 5e75c86 commit 7a72a3d
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.time.LocalDateTime;
import java.util.List;


Expand All @@ -31,8 +32,9 @@ public static void main(String[] args) throws MQClientException {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(LocalDateTime.now());
System.out.println(" Receive Message:" + new String(msg.getBody()) +
", 延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
", " + (System.currentTimeMillis() - msg.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.time.LocalDateTime;

/**
* 延迟消息生产者
*
Expand All @@ -30,8 +32,9 @@ public static void main(String[] args) throws MQClientException, RemotingExcepti
只能设置等级 ,预设值的延迟时间间隔为:
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
*/
msg.setDelayTimeLevel(3);
msg.setDelayTimeLevel(4);// 延迟等级从 1 开始 ,第一次为 1s ,依次类推
producer.send(msg);
System.out.println(LocalDateTime.now());
producer.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.hk.rocketmq.listener;

import com.hk.commons.util.date.DatePattern;
import com.hk.commons.util.date.DateTimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;

/**
* 消费者接受延迟消息
*
* @author kevin
* @date 2019-11-22 16:27
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-group")
public class DelayConsumerListener implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.debug("receive delay Message:{},currentDate:{}",
message, DateTimeUtils.localDateTimeToString(LocalDateTime.now(), DatePattern.YYYY_MM_DD_HH_MM_SS));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.hk.rocketmq;

import com.hk.commons.util.date.DatePattern;
import com.hk.commons.util.date.DateTimeUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.rocketmq.client.producer.SendResult;
Expand All @@ -18,6 +20,7 @@

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -89,7 +92,20 @@ public void run(String... args) throws Exception {
/*
7、 发送事物消息
*/
testTransaction();
// testTransaction();

/*
8、发送延迟消息
参数一: 发送的目的地
参数二:  发送的消息
参数三:  发送超时时间,单位: 毫秒
参数四:  延迟等级、小于等于 0 是不延迟 ,
延迟等级为: 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
设置为 1,延迟1s ,依次类推
*/
rocketMQTemplate.syncSend("delay-topic",
MessageBuilder.withPayload("sync delay Message" +
DateTimeUtils.localDateTimeToString(LocalDateTime.now(), DatePattern.YYYY_MM_DD_HH_MM_SS)).build(), 3000, 4);
}


Expand All @@ -100,11 +116,11 @@ private void testTransaction() throws MessagingException {
for (int i = 0; i < 10; i++) {
try {

org.springframework.messaging.Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
org.springframework.messaging.Message<String> msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_GROUP_NAME,
"transaction-topic:" + tags[i % tags.length], msg, null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
System.out.printf("------ send Transactional msg bod y = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());

Thread.sleep(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void run(String... args) throws Exception {
.withPayload(new MessageVo(IDGenerator.UUID_32.generate(), "delay message ..." + LocalDateTime.now()))
// 添加延迟队列头等级,只能设置等级 ,预设值的延迟时间间隔为:
// 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2)// TODO 延迟队列好像没有生效
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 1)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ spring:
delay-stream-output:
# 生产者配置
producer:
# 延迟消息不支持事物
transactional: false
# 延迟消息必须使用同步发送
sync: true
# 生产者组
group: delay-stream-output-group
bindings:
Expand Down
1 change: 1 addition & 0 deletions hk-util-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>

<dependency>
<groupId>com.hk</groupId>
<artifactId>hk-core-httpclient</artifactId>
Expand Down

0 comments on commit 7a72a3d

Please sign in to comment.