Skip to content

Commit

Permalink
重新实现短信验证码的 Disruptor 及 Disruptor 案例
Browse files Browse the repository at this point in the history
  • Loading branch information
Lusifer committed May 26, 2017
1 parent abf9e82 commit 2966f91
Show file tree
Hide file tree
Showing 17 changed files with 376 additions and 191 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.funtl.leesite.common.sms;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand All @@ -14,19 +12,14 @@
import com.aliyun.mns.model.MessageAttributes;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.TopicMessage;
import com.funtl.leesite.common.sms.consumer.SmsValidateEventPutCacheHandler;
import com.funtl.leesite.common.sms.consumer.SmsValidateEventRemoveCacheHandler;
import com.funtl.leesite.common.sms.publisher.SmsValidateEvent;
import com.funtl.leesite.common.sms.publisher.SmsValidateEventFactory;
import com.funtl.leesite.common.sms.publisher.SmsValidateEventPublisher;
import com.funtl.leesite.common.utils.CacheUtils;
import com.funtl.leesite.common.utils.ExecutorUtils;
import com.funtl.leesite.common.utils.SpringContextHolder;
import com.funtl.leesite.modules.config.entity.ConfigSms;
import com.funtl.leesite.modules.config.entity.ConfigSmsTemplate;
import com.funtl.leesite.modules.config.service.ConfigSmsService;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -214,18 +207,12 @@ public String sendValidate(String phoneNumber, String templateCode, Map<String,
logger.debug("MessageId: {}", ret.getMessageId());
logger.debug("MessageMD5: {}", ret.getMessageBodyMD5());

// 生产者准备
Disruptor<SmsValidateEvent> disruptor = new Disruptor<>(factory, BUFFER_SIZE, executor);
disruptor.handleEventsWith(new SmsValidateEventPutCacheHandler());
disruptor.start();

RingBuffer<SmsValidateEvent> ringBuffer = disruptor.getRingBuffer();
SmsValidateEventPublisher publisher = new SmsValidateEventPublisher(ringBuffer);
String sendString = phoneNumber + "," + code;
ByteBuffer sendBuffer = ByteBuffer.wrap(sendString.getBytes(StandardCharsets.UTF_16BE));
publisher.onData(sendBuffer);

disruptor.shutdown();
// 生产短信验证码
SmsValidateEvent.Item item = new SmsValidateEvent.Item();
item.setPhoneNumber(phoneNumber);
item.setCode(code);
item.setType(SmsValidateEvent.Item.TYPE_PUT_CACHE);
SmsValidateDisruptor.getInstance().publish(item);

return "OK";
} catch (ServiceException se) {
Expand All @@ -246,18 +233,12 @@ public String sendValidate(String phoneNumber, String templateCode, Map<String,
* @param phoneNumber
*/
public void removeValidate(String phoneNumber) {
// 消费者准备
Disruptor<SmsValidateEvent> disruptor = new Disruptor<>(factory, BUFFER_SIZE, executor);
disruptor.handleEventsWith(new SmsValidateEventRemoveCacheHandler());
disruptor.start();

RingBuffer<SmsValidateEvent> ringBuffer = disruptor.getRingBuffer();
SmsValidateEventPublisher publisher = new SmsValidateEventPublisher(ringBuffer);
String sendString = phoneNumber + "," + CacheUtils.get("smsCache", phoneNumber);
ByteBuffer sendBuffer = ByteBuffer.wrap(sendString.getBytes(StandardCharsets.UTF_16BE));
publisher.onData(sendBuffer);

disruptor.shutdown();
// 消费短信验证码
SmsValidateEvent.Item item = new SmsValidateEvent.Item();
item.setPhoneNumber(phoneNumber);
item.setCode(String.valueOf(CacheUtils.get("smsCache", phoneNumber)));
item.setType(SmsValidateEvent.Item.TYPE_REMOVE_CACHE);
SmsValidateDisruptor.getInstance().publish(item);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.funtl.leesite.common.sms;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import com.funtl.leesite.common.sms.consumer.SmsValidateEventHandler;
import com.funtl.leesite.common.sms.publisher.SmsValidateEvent;
import com.funtl.leesite.common.sms.publisher.SmsValidateEventFactory;
import com.funtl.leesite.common.sms.publisher.SmsValidateEventPublisher;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 短信验证码的 Disruptor
* Created by Lusifer on 2017/5/27.
*/
public final class SmsValidateDisruptor {
private static final Logger logger = LoggerFactory.getLogger(SmsValidateDisruptor.class);

// 指定 ringBuffer 字节大小, 必须是 2 的倍数
private static final int BUFFER_SIZE = 1024;

private ThreadFactory threadFactory;
private SmsValidateEventFactory factory;
private Disruptor<SmsValidateEvent> disruptor;
private RingBuffer<SmsValidateEvent> ringBuffer;
private SmsValidateEventPublisher publisher;

private SmsValidateDisruptor() {
// 执行器,用于构造消费者线程
threadFactory = Executors.defaultThreadFactory();
// 指定事件工厂
factory = new SmsValidateEventFactory();

// 多线程模式
disruptor = new Disruptor<SmsValidateEvent>(factory, BUFFER_SIZE, threadFactory, ProducerType.MULTI, new YieldingWaitStrategy());
// 设置事件业务处理器 -> 消费者
disruptor.handleEventsWith(new SmsValidateEventHandler());
// 启动 disruptor 线程
disruptor.start();

// 获取 ringBuffer 环,用于接取生产者生产的事件
ringBuffer = disruptor.getRingBuffer();
// 为 ringBuffer 指定事件生产者
publisher = new SmsValidateEventPublisher(ringBuffer);

logger.debug("Created SmsValidateDisruptor.");
}

/**
* 生产消息
* @param item
*/
public void publish(SmsValidateEvent.Item item) {
publisher.onData(item);
}

/**
* 服务器关闭时别忘记调用
*/
public void shutdown() {
disruptor.shutdown();
}

/**
* 静态初始化器,由 JVM 来保证线程安全
*/
private static class SingletonHolder {
private static SmsValidateDisruptor instance = new SmsValidateDisruptor();
}

public static SmsValidateDisruptor getInstance() {
return SingletonHolder.instance;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.funtl.leesite.common.sms.consumer;

import com.funtl.leesite.common.sms.publisher.SmsValidateEvent;
import com.funtl.leesite.common.utils.CacheUtils;
import com.lmax.disruptor.EventHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 短信验证码消息处理
* Created by Lusifer on 2017/5/27.
*/
public class SmsValidateEventHandler implements EventHandler<SmsValidateEvent> {
private static final Logger logger = LoggerFactory.getLogger(SmsValidateEventHandler.class);

@Override
public void onEvent(SmsValidateEvent event, long sequence, boolean endOfBatch) throws Exception {
SmsValidateEvent.Item item = event.getItem();
String phoneNumber = item.getPhoneNumber();
String code = item.getCode();
String type = item.getType();

if (SmsValidateEvent.Item.TYPE_PUT_CACHE.equals(type)) {
pubCache(phoneNumber, code);
} else if (SmsValidateEvent.Item.TYPE_REMOVE_CACHE.equals(type)) {
removeCache(phoneNumber, code);
}
}

/**
* 往缓存中放验证码
* @param phoneNumber
* @param code
*/
private void pubCache(String phoneNumber, String code) {
logger.debug("生产短信验证码:手机号({}),验证码({})", phoneNumber, code);
CacheUtils.put("smsCache", phoneNumber, code);
}

/**
* 从缓存中删除验证码
* @param phoneNumber
* @param code
*/
private void removeCache(String phoneNumber, String code) {
logger.debug("消费短信验证码:手机号({}),验证码({})", phoneNumber, code);
CacheUtils.remove("smsCache", phoneNumber);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,46 @@
* Created by Lusifer on 2017/5/5.
*/
public class SmsValidateEvent {
private String phoneNumber;
private String code;
private Item item;

public String getPhoneNumber() {
return phoneNumber;
public Item getItem() {
return item;
}

public void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
public void setItem(Item item) {
this.item = item;
}

public String getCode() {
return code;
}
public static class Item {
public static final String TYPE_PUT_CACHE = "put"; // 往缓存中放验证码
public static final String TYPE_REMOVE_CACHE = "remove"; // 从缓存中删除验证码

private String phoneNumber;
private String code;
private String type; // 处理方式,通过常量值判断

public String getPhoneNumber() {
return phoneNumber;
}

public void setPhoneNumber(String phoneNumber) {
this.phoneNumber = phoneNumber;
}

public String getCode() {
return code;
}

public void setCode(String code) {
this.code = code;
}

public String getType() {
return type;
}

public void setCode(String code) {
this.code = code;
public void setType(String type) {
this.type = type;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.funtl.leesite.common.sms.publisher;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

Expand All @@ -16,19 +14,14 @@ public SmsValidateEventPublisher(RingBuffer<SmsValidateEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

private static final EventTranslatorOneArg<SmsValidateEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<SmsValidateEvent, ByteBuffer>() {
private static final EventTranslatorOneArg<SmsValidateEvent, SmsValidateEvent.Item> TRANSLATOR = new EventTranslatorOneArg<SmsValidateEvent, SmsValidateEvent.Item>() {
@Override
public void translateTo(SmsValidateEvent event, long sequence, ByteBuffer byteBuffer) {
String[] str = byteBuffer.asCharBuffer().toString().trim().split(",");
String phoneNumber = str[0];
String code = str[1];

event.setPhoneNumber(phoneNumber);
event.setCode(code);
public void translateTo(SmsValidateEvent event, long sequence, SmsValidateEvent.Item item) {
event.setItem(item);
}
};

public void onData(ByteBuffer byteBuffer) {
ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
public void onData(SmsValidateEvent.Item item) {
ringBuffer.publishEvent(TRANSLATOR, item);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;

import com.funtl.leesite.common.sms.SmsValidateDisruptor;
import com.funtl.leesite.common.utils.ExecutorUtils;
import com.funtl.leesite.modules.sys.service.SystemService;

Expand All @@ -43,6 +44,10 @@ public WebApplicationContext initWebApplicationContext(ServletContext servletCon
@Override
public void contextDestroyed(ServletContextEvent event) {
// TODO 容器销毁时调用

// 关闭 Disruptor
SmsValidateDisruptor.getInstance().shutdown();

try {
// 平缓关闭 ExecutorService
ExecutorService executor = ExecutorUtils.getCachedThreadPool();
Expand Down
Loading

0 comments on commit 2966f91

Please sign in to comment.