Skip to content

Commit

Permalink
MQ异步推送支持
Browse files Browse the repository at this point in the history
  • Loading branch information
nbcoolkid committed Dec 9, 2019
1 parent fab3a65 commit 64fdded
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package com.zhangln.push.wspush;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zhangln.push.wspush.config.prop.AppProp;
import com.zhangln.push.wspush.entity.LogWsConnectEntity;
import com.zhangln.push.wspush.service.ILogWsConnectService;
import com.zhangln.push.wspush.websocket.service.WsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
Expand All @@ -10,6 +13,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.util.AntPathMatcher;

import java.net.InetAddress;

/**
* @author zhangliuning
*/
Expand All @@ -19,9 +24,13 @@ public class WsPushApplication implements CommandLineRunner {

@Autowired
private ILogWsConnectService iLogWsConnectService;
@Autowired
private AppProp appProp;
@Autowired
private WsService wsService;

@Bean
public AntPathMatcher antPathMatcher(){
public AntPathMatcher antPathMatcher() {
return new AntPathMatcher();
}

Expand All @@ -31,7 +40,12 @@ public static void main(String[] args) {

@Override
public void run(String... args) throws Exception {

String ip = InetAddress.getLocalHost().getHostAddress();
Integer port = appProp.getWsPort();

// 全部下线
iLogWsConnectService.remove(new QueryWrapper<>());
iLogWsConnectService.remove(new QueryWrapper<LogWsConnectEntity>()
.eq(LogWsConnectEntity.INSTANCE_FLAG,wsService.getInstanceFlag()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.zhangln.push.wspush.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author sherry
* @description
* @date Create in 2019/12/9
* @modified By:
*/
@Configuration
public class MqConfig {

/**
* 通用推送队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("push-common",false,false,true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
public class AppProp {

private String active;
// ws所使用的端口
private Integer wsPort;
private List<String> ignoreUrl;

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.zhangln.push.wspush.controller;

import com.alibaba.fastjson.JSONObject;
import com.zhangln.push.wspush.config.prop.AppProp;
import com.zhangln.push.wspush.controller.service.WsPushService;
import com.zhangln.push.wspush.entity.LogPushTaskEntity;
import com.zhangln.push.wspush.vo.HttpResVo;
Expand All @@ -18,9 +19,11 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.UUID;

/**
* @author sherry
Expand Down Expand Up @@ -71,45 +74,13 @@ public ResponseEntity httpTest() {
* @return
*/
@PostMapping("/common")
public ResponseEntity commonPush(@Validated HttpWsPushCondition condition) {
public ResponseEntity commonPush(@Validated HttpWsPushCondition condition) throws UnknownHostException {
HttpResVo httpResVo = wsPushService.commonPushService(condition);
return ResponseEntity.ok(httpResVo);

// 查询channelId

List<String> channelIds = wsPushService.getChannelIds(condition);

if (!CollectionUtils.isEmpty(channelIds)) {
channelIds.stream()
.forEach(channelId -> {
try {
WsRespVo wsRespVo = WsRespVo.builder()
.id(condition.getPushId())
.date(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()))
.pushType(condition.getPushType())
.code(200)
.msg("正常")
.data(condition.getContent())
.build();

String pushStr = JSONObject.toJSONString(wsRespVo);
log.info("向{}推送{}", channelId, pushStr);

UserChannelRelation.get(channelId)
.ifPresent(channel -> {
channel.writeAndFlush(new TextWebSocketFrame(pushStr));
wsPushService.savePushTask(condition, pushStr, channel);
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}

});
} else {
return ResponseEntity.ok(HttpResVo.buildError("无有效客户端连接,推送失败"));
}
}


return ResponseEntity.ok(HttpResVo.buildSuccess(condition.getPushId()));
}

/**
* 查询推送情况
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,33 @@
package com.zhangln.push.wspush.controller.service;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.zhangln.push.wspush.config.prop.AppProp;
import com.zhangln.push.wspush.entity.LogPushTaskEntity;
import com.zhangln.push.wspush.entity.LogWsConnectEntity;
import com.zhangln.push.wspush.service.ILogPushTaskService;
import com.zhangln.push.wspush.service.ILogWsConnectService;
import com.zhangln.push.wspush.vo.HttpResVo;
import com.zhangln.push.wspush.vo.HttpWsPushCondition;
import com.zhangln.push.wspush.vo.WsRespVo;
import com.zhangln.push.wspush.websocket.UserChannelRelation;
import com.zhangln.push.wspush.websocket.service.WsService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/**
Expand All @@ -31,13 +45,69 @@ public class WsPushService {
@Autowired
private ILogPushTaskService iLogPushTaskService;

@Autowired
private WsService wsService;

/**
* 通用推送
*
* @param condition
* @return
* @throws UnknownHostException
*/
public HttpResVo commonPushService(HttpWsPushCondition condition) throws UnknownHostException {
// 查询channelId

List<String> channelIds = this.getChannelIds(condition);

if (!CollectionUtils.isEmpty(channelIds)) {

// 由推送方的服务器来设置本次推送的id
// 整个id对应的是一次调用,而不是一次推送
// 每次具体的推送本身肯定是端对端的
condition.setPushId(UUID.randomUUID().toString());

channelIds.stream()
.forEach(channelId -> {
try {
WsRespVo wsRespVo = WsRespVo.builder()
.id(condition.getPushId())
.date(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()))
.pushType(condition.getPushType())
.code(200)
.msg("正常")
.data(condition.getContent())
.build();

String pushStr = JSONObject.toJSONString(wsRespVo);
log.info("向{}推送{}", channelId, pushStr);

UserChannelRelation.get(channelId)
.ifPresent(channel -> {
channel.writeAndFlush(new TextWebSocketFrame(pushStr));
this.savePushTask(condition, pushStr, channel);
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}

});
} else {
return HttpResVo.buildError("无有效客户端连接,推送失败");
}


return HttpResVo.buildSuccess(condition.getPushId());
}

/**
* 查询
*
* @param condition
* @return
*/
public List<String> getChannelIds(HttpWsPushCondition condition) {
public List<String> getChannelIds(HttpWsPushCondition condition) throws UnknownHostException {

List<LogWsConnectEntity> list = iLogWsConnectService.list(new QueryWrapper<LogWsConnectEntity>()
.eq(LogWsConnectEntity.STATUS, 1)
.eq(!StringUtils.isEmpty(condition.getClientType()), LogWsConnectEntity.CLIENT_TYPE, condition.getClientType())
Expand All @@ -46,6 +116,7 @@ public List<String> getChannelIds(HttpWsPushCondition condition) {
.eq(!StringUtils.isEmpty(condition.getGroup()), LogWsConnectEntity.GROUP, condition.getGroup())
.eq(!StringUtils.isEmpty(condition.getAreaCode()), LogWsConnectEntity.AREA_CODE, condition.getAreaCode())
.eq(!StringUtils.isEmpty(condition.getCountry()), LogWsConnectEntity.COUNTRY, condition.getCountry())
.eq(LogWsConnectEntity.INSTANCE_FLAG, wsService.getInstanceFlag())
);
if (!CollectionUtils.isEmpty(list)) {
List<String> ids = list.stream()
Expand Down Expand Up @@ -82,6 +153,7 @@ public void savePushTask(HttpWsPushCondition condition, String pushStr, Channel

/**
* 根据push_id查询推送详情
*
* @param pushId
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public class LogWsConnectEntity implements Serializable {

private LocalDateTime updateTime;

private String serverHost;
private String instanceFlag;
private Integer serverPort;


public static final String PK_ID = "pk_id";

Expand All @@ -100,5 +104,8 @@ public class LogWsConnectEntity implements Serializable {
public static final String CREATE_TIME = "create_time";

public static final String UPDATE_TIME = "update_time";
public static final String SERVER_HOST = "server_host";
public static final String SERVER_PORT = "server_port";
public static final String INSTANCE_FLAG = "instance_flag";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.zhangln.push.wspush.mq;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zhangln.push.wspush.controller.service.WsPushService;
import com.zhangln.push.wspush.vo.HttpResVo;
import com.zhangln.push.wspush.vo.HttpWsPushCondition;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

/**
* @author sherry
* @description
* @date Create in 2019/12/9
* @modified By:
*/

@Slf4j
@Component
public class PushListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private WsPushService wsPushService;

/**
* 通用异步推送
* 为了避免重复推送,所以这里的MQ采用工作队列模式
*
* @param message
* @param channel
*/
@RabbitHandler
@RabbitListener(queues = "push-common")
public void commomPush(Message message, Channel channel) {
byte[] body = message.getBody();
try (ByteArrayInputStream bais = new ByteArrayInputStream(body);
ObjectInputStream oii = new ObjectInputStream(bais);) {
Object o = oii.readObject();
//再做一次强制类型转化,就可以拿到生产者发送的对象数据了
HttpWsPushCondition httpWsPushCondition = (HttpWsPushCondition) o;
log.info("处理一次异步推送:{}", JSONObject.toJSONString(httpWsPushCondition));
HttpResVo httpResVo = wsPushService.commonPushService(httpWsPushCondition);
// 广播本次推送结果
rabbitTemplate.convertAndSend("push-result", "", httpResVo);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
try {
// 返回确认状态
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("RabbitMQ 网关日志ACK异常", e);
}
}
}

/**
* 监听广播中的推送结果
*
* @param message
* @param channel
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "push-result-1", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "push-result", type = ExchangeTypes.FANOUT)
))
public void pushResult(Message message, Channel channel) {
byte[] body = message.getBody();
try (ByteArrayInputStream bais = new ByteArrayInputStream(body);
ObjectInputStream oii = new ObjectInputStream(bais);) {
Object o = oii.readObject();
HttpResVo httpResVo = (HttpResVo) o;
log.info("接收到一次MQ异步推送结果:", JSONObject.toJSONString(httpResVo));
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
try {
// 返回确认状态
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("RabbitMQ 网关日志ACK异常", e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
@ToString
public class HttpWsPushCondition implements Serializable {

@NotEmpty
private String pushId;
private String clientType = "";
private String app = "";
Expand Down
Loading

0 comments on commit 64fdded

Please sign in to comment.