Skip to content

Commit

Permalink
Optimize configuration, increase instance configuration and source/si…
Browse files Browse the repository at this point in the history
…nk configuration
  • Loading branch information
wendy512 committed Apr 30, 2024
1 parent c29137a commit effa697
Show file tree
Hide file tree
Showing 66 changed files with 255 additions and 1,495 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ buildNumber.properties
*.ipr
rebel.xml
logs/
.flattened-pom.xml

### NetBeans ###
/nbproject/private/
Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ If you use Maven, you just need to add the following dependency in pom.xml:
[Reference Wiki](https://github.com/wendy512/stream/wiki)

## Examples
* [Local queue](https://github.com/wendy512/stream/tree/master/samples/local-sample)
* [Kafka](https://github.com/wendy512/stream/tree/master/samples/kafka-sample)
* [MQTT](https://github.com/wendy512/stream/tree/master/samples/mqtt-sample)
* [RabbitMQ](https://github.com/wendy512/stream/tree/master/samples/rabbitmq-sample)
* [Local queue](https://github.com/wendy512/stream-samples/local-sample)
* [Kafka](https://github.com/wendy512/stream-samples/kafka-sample)
* [MQTT](https://github.com/wendy512/stream-samples/mqtt-sample)
* [RabbitMQ](https://github.com/wendy512/stream-samples/rabbitmq-sample)
* [Redis](https://github.com/wendy512/stream-samples/redis-sample)
* [Pulsar](https://github.com/wendy512/stream-samples/pulsar-sample)

## License
Stream is based on the [Apache License 2.0](./LICENSE) agreement, and Stream relies on some third-party components whose open source agreement is also Apache License 2.0.
Expand Down
22 changes: 19 additions & 3 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<url>https://github.com/wendy512/stream</url>

<properties>
<revision>1.0.1</revision>
<revision>1.0.2</revision>
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
<maven-source-plugin.version>3.0.1</maven-source-plugin.version>
<nexus-staging-maven-plugin.version>1.6.7</nexus-staging-maven-plugin.version>
Expand All @@ -32,6 +32,7 @@
<kafka.version>3.2.0</kafka.version>
<amqp-client.version>5.16.0</amqp-client.version>
<redisson.version>3.16.6</redisson.version>
<pulsar-client.version>2.11.0</pulsar-client.version>
</properties>

<dependencyManagement>
Expand All @@ -57,6 +58,16 @@
<artifactId>stream-rabbitmq</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.github.wendy512</groupId>
<artifactId>stream-redis</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.github.wendy512</groupId>
<artifactId>stream-pulsar</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.github.wendy512</groupId>
<artifactId>stream-components</artifactId>
Expand Down Expand Up @@ -118,6 +129,11 @@
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar-client.version}</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>junit</groupId>
Expand Down Expand Up @@ -159,7 +175,7 @@
<goals>deploy</goals>
</configuration>
</plugin>
<!--gpg插件-->
<!--gpg插件
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
Expand All @@ -173,7 +189,7 @@
</goals>
</execution>
</executions>
</plugin>
</plugin>-->
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.kafka.common.serialization.StringSerializer;

import io.github.stream.core.Message;
import io.github.stream.core.configuration.ConfigContext;
import io.github.stream.core.message.MessageHeaders;
import io.github.stream.core.properties.BaseProperties;
import io.github.stream.core.sink.AbstractSink;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -41,8 +41,8 @@ public class KafkaSink extends AbstractSink<Object> {
private KafkaProducer kafkaProducer;

@Override
public void configure(BaseProperties properties) {
Map config = properties.getConfig();
public void configure(ConfigContext context) {
Map config = context.getInstance().getOriginal();
if (null == config) {
throw new IllegalArgumentException("Kafka sink config cannot empty");
}
Expand All @@ -64,18 +64,17 @@ public void process(List<Message<Object>> messages) {
String topic = headers.getString("topic");
Object key = headers.get("key");
Integer partition = MapUtils.getInteger(headers, "partition");
Long timestamp = MapUtils.getLong(headers, "timestamp");
Object payload = message.getPayload();
if (StringUtils.isBlank(topic)) {
continue;
}

send(topic, partition, timestamp, key, payload);
send(topic, partition, key, payload);
}
}

public void send(String topic, Integer partition, Long timestamp, Object key, Object payload) {
ProducerRecord record = new ProducerRecord(topic, partition, timestamp, key, payload);
public void send(String topic, Integer partition, Object key, Object payload) {
ProducerRecord record = new ProducerRecord(topic, partition, key, payload);
kafkaProducer.send(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

import io.github.stream.core.AbstractAutoRunnable;
import io.github.stream.core.Message;
import io.github.stream.core.configuration.ConfigContext;
import io.github.stream.core.message.MessageBuilder;
import io.github.stream.core.properties.BaseProperties;
import io.github.stream.core.source.AbstractSource;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -56,25 +56,25 @@ public class KafkaSource extends AbstractSource {
private boolean autoCommit;

@Override
public void configure(BaseProperties properties) {
String topic = properties.getString("topic");
public void configure(ConfigContext context) {
String topic = context.getConfig().getString("topic");
if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("Kafka topic is empty");
throw new IllegalArgumentException("Kafka topic config is empty");
}

Map config = properties.getConfig();
Map consumerProperties = context.getInstance().getOriginal();
// 手动提交
config.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProperties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 自动提交间隔
config.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
consumerProperties.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// 配置序列化
config.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

this.topics = Arrays.asList(topic.split(","));
this.interval = properties.getInt("pollInterval", 50);
this.autoCommit = MapUtils.getBooleanValue(config, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.kafkaConsumer = new KafkaConsumer(config);
this.interval = context.getConfig().getInt("pollInterval", 50);
this.autoCommit = MapUtils.getBooleanValue(consumerProperties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.kafkaConsumer = new KafkaConsumer(consumerProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import io.github.stream.core.Configurable;
import io.github.stream.core.StreamException;
import io.github.stream.core.configuration.ConfigContext;
import io.github.stream.core.properties.BaseProperties;
import io.github.stream.core.utils.UUID;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -70,11 +71,12 @@ public class MqttStateConfigure implements Configurable {
private int qos;

@Override
public void configure(BaseProperties properties) throws Exception {
this.configure(properties, true);
public void configure(ConfigContext context) throws Exception {
this.configure(context, true);
}

public void configure(BaseProperties properties, boolean resolveTopic) throws Exception {
public void configure(ConfigContext context, boolean resolveTopic) throws Exception {
BaseProperties properties = context.getInstance();
MqttConnectOptions options = createOptions(properties);

String host = properties.getString(OPTIONS_HOST);
Expand All @@ -83,10 +85,10 @@ public void configure(BaseProperties properties, boolean resolveTopic) throws Ex
}

if (resolveTopic) {
resolveTopic(properties);
resolveTopic(context.getConfig().get(OPTIONS_TOPIC));
}

String clientId = properties.getString(OPTIONS_CLIENT_ID);
String clientId = context.getConfig().getString(OPTIONS_CLIENT_ID);
if (StringUtils.isBlank(clientId)) {
clientId = UUID.fastUUID().toString(true);
}
Expand All @@ -103,8 +105,7 @@ public void configure(BaseProperties properties, boolean resolveTopic) throws Ex
}
}

private void resolveTopic(BaseProperties properties) {
Object topicValue = properties.get(OPTIONS_TOPIC);
private void resolveTopic(Object topicValue) {
if (topicValue instanceof List) {
List<String> topicList = (List<String>) topicValue;
Assert.notEmpty(topicList, "MQTT topic cannot be empty");
Expand Down Expand Up @@ -171,15 +172,13 @@ private void setSSLOptions(BaseProperties properties, MqttConnectOptions options
throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException {
BaseProperties ssl = properties.getProperties("ssl");
if (null != ssl && ssl.getBooleanValue("enabled")) {
String keyStorePath = ssl.getString("key-store");
if (StringUtils.isBlank(keyStorePath)) {
throw new IllegalArgumentException("MQTT ssl key-store cannot be empty");
}
String keyStorePath = ssl.getString("keyStore");
Assert.hasText(keyStorePath, "MQTT ssl key-store cannot be empty");

// 加载信任库
KeyStore trustStore = KeyStore.getInstance(ssl.getString("key-store-type", "JKS"));
KeyStore trustStore = KeyStore.getInstance(ssl.getString("keyStoreType", "JKS"));
InputStream keyStoreStream;
String keyStorePassword = ssl.getString("key-store-password", "");
String keyStorePassword = ssl.getString("keyStorePassword", "");

if (keyStorePath.startsWith(ResourceUtils.CLASSPATH_URL_PREFIX)) {
String path = keyStorePath.substring(ResourceUtils.CLASSPATH_URL_PREFIX.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;

import io.github.stream.core.Message;
import io.github.stream.core.properties.BaseProperties;
import io.github.stream.core.configuration.ConfigContext;
import io.github.stream.core.sink.AbstractSink;
import io.github.stream.mqtt.MqttStateConfigure;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -38,9 +38,9 @@ public class MqttSink extends AbstractSink<String> {
private MqttStateConfigure stateConfigure;

@Override
public void configure(BaseProperties properties) throws Exception {
public void configure(ConfigContext context) throws Exception {
this.stateConfigure = new MqttStateConfigure();
this.stateConfigure.configure(properties, false);
this.stateConfigure.configure(context, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import io.github.stream.core.Message;
import io.github.stream.core.StreamException;
import io.github.stream.core.configuration.ConfigContext;
import io.github.stream.core.message.MessageBuilder;
import io.github.stream.core.properties.BaseProperties;
import io.github.stream.core.source.AbstractSource;
import io.github.stream.mqtt.MqttStateConfigure;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -40,8 +40,8 @@ public class MqttSource extends AbstractSource<String> {
private final MqttStateConfigure stateConfigure = new MqttStateConfigure();

@Override
public void configure(BaseProperties properties) throws Exception {
stateConfigure.configure(properties);
public void configure(ConfigContext context) throws Exception {
stateConfigure.configure(context);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions components/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<module>kafka</module>
<module>rabbitmq</module>
<module>redis</module>
<module>pulsar</module>
</modules>

<artifactId>stream-components</artifactId>
Expand Down
2 changes: 0 additions & 2 deletions components/pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
<groupId>io.github.wendy512</groupId>
<artifactId>stream-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>

</dependencies>

</project>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.github.stream.pulsar;

import io.github.stream.core.Configurable;
import io.github.stream.core.properties.AbstractProperties;
import java.io.IOException;
import java.util.Map;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;

import java.io.IOException;
import java.util.Map;
import io.github.stream.core.Configurable;
import io.github.stream.core.configuration.ConfigContext;

/**
* pulsar配置加载
Expand All @@ -19,21 +20,21 @@ public class PulsarStateConfigure implements Configurable {


@Override
public void configure(AbstractProperties properties) {
builder = createPulsarClientBuilder(properties);
public void configure(ConfigContext context) {
builder = createPulsarClientBuilder(context);
}

public PulsarClient newPulsarClient() throws IOException {
return builder.build();
}

@SuppressWarnings("unchecked")
private ClientBuilder createPulsarClientBuilder(AbstractProperties properties) {
Map<String, Object> config = properties.getConfig();
private ClientBuilder createPulsarClientBuilder(ConfigContext context) {
Map<String, Object> config = context.getInstance().getOriginal();
if (null == config) {
throw new IllegalArgumentException("pulsar sink config cannot empty");
}
Map<String, Object> clientConfig = (Map<String, Object>) config.get("client");
Map<String, Object> clientConfig = context.getInstance().getOriginal();
return PulsarClient.builder().loadConf(clientConfig);
}
}
Loading

0 comments on commit effa697

Please sign in to comment.