Skip to content

Commit

Permalink
客户端consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
CaRb u committed Jul 11, 2022
1 parent 19c9843 commit 8fa056f
Show file tree
Hide file tree
Showing 43 changed files with 860 additions and 71 deletions.
2 changes: 2 additions & 0 deletions conf/xymq-cli.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
server.port=8686
server.host=127.0.0.1
36 changes: 36 additions & 0 deletions xymq-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,42 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<slf4j.version>1.7.30</slf4j.version>
<fastjson.version>1.2.75</fastjson.version>
<netty.version>4.1.20.Final</netty.version>
</properties>

<dependencies>
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>


</project>
107 changes: 107 additions & 0 deletions xymq-cli/src/main/java/com/xymq_cli/client/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.xymq_cli.client;

import com.xymq_cli.handler.ConsumerHandler;
import com.xymq_cli.listener.MessageData;
import com.xymq_cli.listener.MessageListener;
import com.xymq_cli.message.Message;
import com.xymq_cli.util.ResourceUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* @author 黎勇炫
* @date 2022年07月11日 15:08
*/
public class Consumer {
/**
* 服务器端地址
*/
private static String host;
/**
* 服务器端口
*/
private static int port;
/**
* 消息处理器
*/
private ConsumerHandler consumerHandler;
/**
* 日志信息
*/
private Logger logger = LoggerFactory.getLogger(Consumer.class);

static{
host = ResourceUtils.getKey("server.host");
port = Integer.parseInt(ResourceUtils.getKey("server.port"));
}

/**
* @param destination 队列/主题名
* @return
* @author 黎勇炫
* @create 2022/7/11
* @email [email protected]
*/
public Consumer(String destination) {
consumerHandler = new ConsumerHandler(destination);
try {
run();
}catch (Exception e){
logger.error("消费者初始化异常");
e.printStackTrace();
}
}

private void run() {

NioEventLoopGroup clientGroup = new NioEventLoopGroup();

try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientGroup)
.channel(NioSocketChannel.class)
.handler(new ConsumerlInitializer(consumerHandler));

// 连接服务器
ChannelFuture sync = bootstrap.connect(host, port).sync();
// 监听关闭连接
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {

}
}

/**
* 设置自动签名
*/
public void setAutoAcknowledge(boolean autoAcknowledge){
this.consumerHandler.setAutoAcknowledge(autoAcknowledge);
}

/**
* 设置监听器
*/
public void createListener(MessageListener listener){
this.consumerHandler.createListener(listener);
}

public static void main(String[] args) {
Consumer consumer = new Consumer("queue");
consumer.createListener(new MessageListener() {
@Override
public void getMessage(MessageData data) {
System.out.println(data.getMessage());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.xymq_cli.client;

import com.xymq_cli.handler.ConsumerHandler;
import com.xymq_cli.protocol.MessageDecoder;
import com.xymq_cli.protocol.MessageEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

/**
* 消费者初始化器
* @author 黎勇炫
* @date 2022年07月11日 15:40
*/
public class ConsumerlInitializer extends ChannelInitializer<SocketChannel> {

private String destination;
private ConsumerHandler consumerHandler;

public ConsumerlInitializer(ConsumerHandler consumerHandler){
this.consumerHandler = consumerHandler;
}

/**
* 该方法主要是为客户端channel设置编解码器以及消息处理器
* @param sc netty通道
* @return void
* @author 黎勇炫
* @create 2022/7/11
* @email [email protected]
*/
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast("encoder",new MessageEncoder())
.addLast("decoder",new MessageDecoder())
.addLast(consumerHandler);
}
}
23 changes: 23 additions & 0 deletions xymq-cli/src/main/java/com/xymq_cli/client/Producer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.xymq_cli.client;

import com.xymq_cli.util.ResourceUtils;

/**
* @author 黎勇炫
* @date 2022年07月11日 15:09
*/
public class Producer {
/**
* 服务器端地址
*/
private static String host;
/**
* 服务器端口
*/
private static int port;

static{
host = ResourceUtils.getKey("server.host");
port = Integer.parseInt(ResourceUtils.getKey("server.port"));
}
}
17 changes: 17 additions & 0 deletions xymq-cli/src/main/java/com/xymq_cli/constant/ClientConstant.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.xymq_cli.constant;

/**
* @author 黎勇炫
* @date 2022年07月11日 15:00
*/
public class ClientConstant {

/**
* 服务端口
*/
public static final String SERVER_PORT = "server.port";
/**
* 服务端地址
*/
public static final String SERVER_HOST = "server.host";
}
24 changes: 24 additions & 0 deletions xymq-cli/src/main/java/com/xymq_cli/constant/Destination.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.xymq_cli.constant;

/**
* @author 黎勇炫
* @date 2022年07月10日 13:37
*/
public enum Destination {
QUEUE(0),
TOPIC(1);

private int destination;

Destination(int destination) {
this.destination = destination;
}

public int getDestination() {
return destination;
}

public void setDestination(int destination) {
this.destination = destination;
}
}
31 changes: 31 additions & 0 deletions xymq-cli/src/main/java/com/xymq_cli/constant/MessageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.xymq_cli.constant;

/**
* @author 黎勇炫
* @date 2022年07月11日 15:57
*/
public enum MessageType {


PRIVODER(0,"provider"), //生产者
COMSUMER(1,"comsumer"), //消费者
ACK(2,"ack"); //签收


Integer type;
String describe;

MessageType(Integer type, String describe){
this.type = type;
this.describe = describe;
}

public Integer getType(){
return type;
}

public String getDescribe(){
return describe;
}

}
Loading

0 comments on commit 8fa056f

Please sign in to comment.