Skip to content

Commit e1a7c76

Browse files
committedMar 26, 2019
Spring Boot整合Kafka
1 parent adc5715 commit e1a7c76

File tree

8 files changed

+307
-0
lines changed

8 files changed

+307
-0
lines changed
 

‎54.Spring-Boot-Kafka/pom.xml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
<parent>
6+
<groupId>org.springframework.boot</groupId>
7+
<artifactId>spring-boot-starter-parent</artifactId>
8+
<version>2.1.3.RELEASE</version>
9+
<relativePath/> <!-- lookup parent from repository -->
10+
</parent>
11+
<groupId>com.example</groupId>
12+
<artifactId>demo</artifactId>
13+
<version>0.0.1-SNAPSHOT</version>
14+
<name>demo</name>
15+
<description>Demo project for Spring Boot</description>
16+
17+
<properties>
18+
<java.version>1.8</java.version>
19+
</properties>
20+
21+
<dependencies>
22+
<dependency>
23+
<groupId>org.springframework.boot</groupId>
24+
<artifactId>spring-boot-starter-web</artifactId>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.springframework.kafka</groupId>
28+
<artifactId>spring-kafka</artifactId>
29+
</dependency>
30+
</dependencies>
31+
32+
<build>
33+
<plugins>
34+
<plugin>
35+
<groupId>org.springframework.boot</groupId>
36+
<artifactId>spring-boot-maven-plugin</artifactId>
37+
</plugin>
38+
</plugins>
39+
</build>
40+
41+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.example.demo;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class KafkaApplication {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(KafkaApplication.class, args);
11+
}
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.example.demo.config;
2+
3+
import com.example.demo.domain.Message;
4+
import org.apache.kafka.clients.consumer.ConsumerConfig;
5+
import org.apache.kafka.common.serialization.StringDeserializer;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.kafka.annotation.EnableKafka;
10+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
11+
import org.springframework.kafka.core.ConsumerFactory;
12+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
13+
import org.springframework.kafka.support.serializer.JsonDeserializer;
14+
import org.springframework.kafka.support.serializer.JsonSerializer;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
19+
/**
20+
* @author MrBird
21+
*/
22+
@EnableKafka
23+
@Configuration
24+
public class KafkaConsumerConfig {
25+
26+
@Value("${spring.kafka.bootstrap-servers}")
27+
private String bootstrapServers;
28+
29+
@Value("${spring.kafka.consumer.group-id}")
30+
private String consumerGroupId;
31+
32+
@Value("${spring.kafka.consumer.auto-offset-reset}")
33+
private String autoOffsetReset;
34+
35+
@Bean
36+
public ConsumerFactory<String, Message> consumerFactory() {
37+
Map<String, Object> props = new HashMap<>();
38+
props.put(
39+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
40+
bootstrapServers);
41+
props.put(
42+
ConsumerConfig.GROUP_ID_CONFIG,
43+
consumerGroupId);
44+
props.put(
45+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
46+
autoOffsetReset);
47+
// props.put(
48+
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
49+
// StringDeserializer.class);
50+
// props.put(
51+
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
52+
// StringDeserializer.class);
53+
return new DefaultKafkaConsumerFactory<>(
54+
props,
55+
new StringDeserializer(),
56+
new JsonDeserializer<>(Message.class));
57+
}
58+
59+
@Bean
60+
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
61+
ConcurrentKafkaListenerContainerFactory<String, Message> factory
62+
= new ConcurrentKafkaListenerContainerFactory<>();
63+
factory.setConsumerFactory(consumerFactory());
64+
// factory.setRecordFilterStrategy(
65+
// r -> r.value().contains("fuck")
66+
// );
67+
return factory;
68+
}
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.example.demo.config;
2+
3+
import com.example.demo.domain.Message;
4+
import org.apache.kafka.clients.producer.ProducerConfig;
5+
import org.apache.kafka.common.serialization.StringSerializer;
6+
import org.springframework.beans.factory.annotation.Value;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
10+
import org.springframework.kafka.core.KafkaTemplate;
11+
import org.springframework.kafka.core.ProducerFactory;
12+
import org.springframework.kafka.support.serializer.JsonSerializer;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
/**
18+
* @author MrBird
19+
*/
20+
@Configuration
21+
public class KafkaProducerConfig {
22+
23+
@Value("${spring.kafka.bootstrap-servers}")
24+
private String bootstrapServers;
25+
26+
@Bean
27+
public ProducerFactory<String, Message> producerFactory() {
28+
Map<String, Object> configProps = new HashMap<>();
29+
configProps.put(
30+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
31+
bootstrapServers);
32+
configProps.put(
33+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
34+
StringSerializer.class);
35+
configProps.put(
36+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
37+
JsonSerializer.class);
38+
return new DefaultKafkaProducerFactory<>(configProps);
39+
}
40+
41+
@Bean
42+
public KafkaTemplate<String, Message> kafkaTemplate() {
43+
return new KafkaTemplate<>(producerFactory());
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.example.demo.controller;
2+
3+
import com.example.demo.domain.Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.beans.factory.annotation.Autowired;
7+
import org.springframework.kafka.core.KafkaTemplate;
8+
import org.springframework.kafka.support.SendResult;
9+
import org.springframework.util.concurrent.ListenableFuture;
10+
import org.springframework.util.concurrent.ListenableFutureCallback;
11+
import org.springframework.web.bind.annotation.GetMapping;
12+
import org.springframework.web.bind.annotation.PathVariable;
13+
import org.springframework.web.bind.annotation.RestController;
14+
15+
/**
16+
* @author MrBird
17+
*/
18+
@RestController
19+
public class SendMessageController {
20+
21+
private Logger logger = LoggerFactory.getLogger(this.getClass());
22+
23+
@Autowired
24+
// private KafkaTemplate<String, String> kafkaTemplate;
25+
private KafkaTemplate<String, Message> kafkaTemplate;
26+
27+
// @GetMapping("send/{message}")
28+
// public void send(@PathVariable String message) {
29+
// this.kafkaTemplate.send("test", message);
30+
// ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send("test", message);
31+
// future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
32+
// @Override
33+
// public void onSuccess(SendResult<String, String> result) {
34+
// logger.info("成功发送消息:{},offset=[{}]", message, result.getRecordMetadata().offset());
35+
// }
36+
//
37+
// @Override
38+
// public void onFailure(Throwable ex) {
39+
// logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
40+
// }
41+
// });
42+
// }
43+
44+
@GetMapping("send/{message}")
45+
public void sendMessage(@PathVariable String message) {
46+
this.kafkaTemplate.send("test", new Message("mrbird", message));
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.example.demo.domain;
2+
3+
import java.io.Serializable;
4+
import java.time.LocalTime;
5+
6+
/**
7+
* @author MrBird
8+
*/
9+
public class Message implements Serializable {
10+
private static final long serialVersionUID = 6678420965611108427L;
11+
12+
private String from;
13+
14+
private String message;
15+
16+
public Message() {
17+
}
18+
19+
public Message(String from, String message) {
20+
this.from = from;
21+
this.message = message;
22+
}
23+
24+
@Override
25+
public String toString() {
26+
return "Message{" +
27+
"from='" + from + '\'' +
28+
", message='" + message + '\'' +
29+
'}';
30+
}
31+
32+
public String getFrom() {
33+
return from;
34+
}
35+
36+
public void setFrom(String from) {
37+
this.from = from;
38+
}
39+
40+
public String getMessage() {
41+
return message;
42+
}
43+
44+
public void setMessage(String message) {
45+
this.message = message;
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.example.demo.listener;
2+
3+
import com.example.demo.domain.Message;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.kafka.annotation.KafkaListener;
7+
import org.springframework.kafka.annotation.PartitionOffset;
8+
import org.springframework.kafka.annotation.TopicPartition;
9+
import org.springframework.kafka.support.KafkaHeaders;
10+
import org.springframework.messaging.handler.annotation.Header;
11+
import org.springframework.messaging.handler.annotation.Payload;
12+
import org.springframework.stereotype.Component;
13+
14+
/**
15+
* @author MrBird
16+
*/
17+
@Component
18+
public class KafkaMessageListener {
19+
20+
private Logger logger = LoggerFactory.getLogger(this.getClass());
21+
22+
// @KafkaListener(topics = "test", groupId = "test-consumer")
23+
// @KafkaListener(groupId = "test-consumer",
24+
// topicPartitions = @TopicPartition(topic = "test",
25+
// partitionOffsets = {
26+
// @PartitionOffset(partition = "0", initialOffset = "0")
27+
// }))
28+
// public void listen(@Payload String message,
29+
// @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
30+
// logger.info("接收消息: {},partition:{}", message, partition);
31+
// }
32+
33+
@KafkaListener(topics = "test", groupId = "test-consumer")
34+
public void listen(Message message) {
35+
logger.info("接收消息: {}", message);
36+
}
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
spring:
2+
kafka:
3+
bootstrap-servers: localhost:9092
4+
consumer:
5+
group-id: test-consumer
6+
auto-offset-reset: latest

0 commit comments

Comments
 (0)
Please sign in to comment.