Skip to content

Commit ce14693

Browse files
committed
Add Producer related implementation for OpenMessaging.
1 parent c60ac52 commit ce14693

File tree

14 files changed

+959
-8
lines changed

14 files changed

+959
-8
lines changed

example/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,14 @@
4848
<groupId>org.javassist</groupId>
4949
<artifactId>javassist</artifactId>
5050
</dependency>
51+
<dependency>
52+
<groupId>io.openmessaging</groupId>
53+
<artifactId>openmessaging-api</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.apache.rocketmq</groupId>
57+
<artifactId>rocketmq-openmessaging</artifactId>
58+
<version>4.1.0-incubating-SNAPSHOT</version>
59+
</dependency>
5160
</dependencies>
5261
</project>

example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,59 @@
1616
*/
1717
package org.apache.rocketmq.example.openmessaging;
1818

19+
import io.openmessaging.Message;
20+
import io.openmessaging.MessagingAccessPoint;
21+
import io.openmessaging.MessagingAccessPointFactory;
22+
import io.openmessaging.Producer;
23+
import io.openmessaging.Promise;
24+
import io.openmessaging.PromiseListener;
25+
import io.openmessaging.SendResult;
26+
import java.nio.charset.Charset;
27+
1928
public class SimpleProducer {
2029
public static void main(String[] args) {
30+
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
31+
.getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
32+
33+
final Producer producer = messagingAccessPoint.createProducer();
34+
35+
messagingAccessPoint.startup();
36+
System.out.println("messagingAccessPoint startup OK");
37+
38+
producer.startup();
39+
System.out.println("producer startup OK");
40+
41+
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
42+
@Override
43+
public void run() {
44+
producer.shutdown();
45+
messagingAccessPoint.shutdown();
46+
}
47+
}));
48+
49+
{
50+
Message message = producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
51+
SendResult sendResult = producer.send(message);
52+
//final Void aVoid = result.get(3000L);
53+
System.out.println("send async message OK, msgId: " + sendResult.messageId());
54+
}
55+
56+
{
57+
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
58+
result.addListener(new PromiseListener<SendResult>() {
59+
@Override public void operationCompleted(Promise<SendResult> promise) {
60+
System.out.println("Send async message OK, msgId: " + promise.get().messageId());
61+
}
62+
63+
@Override public void operationFailed(Promise<SendResult> promise) {
64+
System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage());
65+
}
66+
});
67+
}
2168

69+
{
70+
producer.sendOneway(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
71+
System.out.println("Send oneway message OK");
72+
}
2273
}
2374
}

openmessaging/pom.xml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,22 @@
2727
<modelVersion>4.0.0</modelVersion>
2828

2929
<artifactId>rocketmq-openmessaging</artifactId>
30+
<name>rocketmq-openmessaging ${project.version}</name>
3031

3132
<dependencies>
3233
<dependency>
3334
<groupId>io.openmessaging</groupId>
34-
<artifactId>messaging-user-level-api</artifactId>
35+
<artifactId>openmessaging-api</artifactId>
36+
</dependency>
37+
<dependency>
38+
<groupId>org.apache.rocketmq</groupId>
39+
<artifactId>rocketmq-client</artifactId>
40+
</dependency>
41+
<dependency>
42+
<groupId>javax.jms</groupId>
43+
<artifactId>javax.jms-api</artifactId>
44+
<version>2.0.1</version>
45+
<scope>test</scope>
3546
</dependency>
3647
</dependencies>
3748
</project>

openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,39 @@
2626
import io.openmessaging.SequenceProducer;
2727
import io.openmessaging.ServiceEndPoint;
2828
import io.openmessaging.observer.Observer;
29+
import io.openmessaging.rocketmq.producer.ProducerImpl;
30+
import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
2931

3032
public class MessagingAccessPointImpl implements MessagingAccessPoint {
33+
private final KeyValue accessPointProperties;
34+
35+
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
36+
this.accessPointProperties = accessPointProperties;
37+
}
38+
39+
@Override
40+
public KeyValue properties() {
41+
return accessPointProperties;
42+
}
43+
3144
@Override
3245
public Producer createProducer() {
33-
return null;
46+
return new ProducerImpl(this.accessPointProperties);
3447
}
3548

3649
@Override
3750
public Producer createProducer(KeyValue properties) {
38-
return null;
51+
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
3952
}
4053

4154
@Override
4255
public SequenceProducer createSequenceProducer() {
43-
return null;
56+
return new SequenceProducerImpl(this.accessPointProperties);
4457
}
4558

4659
@Override
4760
public SequenceProducer createSequenceProducer(KeyValue properties) {
48-
return null;
61+
return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
4962
}
5063

5164
@Override
@@ -79,7 +92,7 @@ public IterableConsumer createIterableConsumer(String queueName, KeyValue proper
7992
}
8093

8194
@Override
82-
public ResourceManager createResourceManager() {
95+
public ResourceManager getResourceManager() {
8396
return null;
8497
}
8598

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.rocketmq;
18+
19+
import io.openmessaging.BytesMessage;
20+
import io.openmessaging.KeyValue;
21+
import io.openmessaging.MessageHeader;
22+
import io.openmessaging.OMS;
23+
import io.openmessaging.SendResult;
24+
import io.openmessaging.rocketmq.domain.SendResultImpl;
25+
import org.apache.rocketmq.client.producer.SendStatus;
26+
import org.apache.rocketmq.common.UtilAll;
27+
import org.apache.rocketmq.common.message.MessageAccessor;
28+
29+
public class OMSUtil {
30+
31+
/**
32+
* Builds a OMS client instance name.
33+
*
34+
* @return a unique instance name
35+
*/
36+
public static String buildInstanceName() {
37+
return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
38+
}
39+
40+
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
41+
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
42+
rmqMessage.setBody(omsMessage.getBody());
43+
44+
KeyValue headers = omsMessage.headers();
45+
KeyValue properties = omsMessage.properties();
46+
47+
//All destinations in RocketMQ use Topic
48+
rmqMessage.setTopic(headers.containsKey(MessageHeader.TOPIC)
49+
? headers.getString(MessageHeader.TOPIC) : headers.getString(MessageHeader.QUEUE));
50+
51+
for (String key : properties.keySet()) {
52+
MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
53+
}
54+
55+
//Headers has a high priority
56+
for (String key : headers.keySet()) {
57+
MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
58+
}
59+
60+
return rmqMessage;
61+
}
62+
63+
/**
64+
* Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
65+
*/
66+
public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
67+
assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
68+
return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
69+
}
70+
71+
public static KeyValue buildKeyValue(KeyValue ... keyValues) {
72+
KeyValue keyValue = OMS.newKeyValue();
73+
for (KeyValue properties : keyValues) {
74+
for (String key : properties.keySet()) {
75+
keyValue.put(key, properties.getString(key));
76+
}
77+
}
78+
return keyValue;
79+
}
80+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.rocketmq.domain;
18+
19+
import io.openmessaging.BytesMessage;
20+
import io.openmessaging.KeyValue;
21+
import io.openmessaging.Message;
22+
import io.openmessaging.OMS;
23+
24+
public class BytesMessageImpl implements BytesMessage {
25+
private KeyValue headers;
26+
private KeyValue properties;
27+
private byte[] body;
28+
29+
public BytesMessageImpl() {
30+
this.headers = OMS.newKeyValue();
31+
this.properties = OMS.newKeyValue();
32+
}
33+
34+
@Override
35+
public byte[] getBody() {
36+
return body;
37+
}
38+
39+
@Override
40+
public BytesMessage setBody(final byte[] body) {
41+
this.body = body;
42+
return this;
43+
}
44+
45+
@Override
46+
public KeyValue headers() {
47+
return headers;
48+
}
49+
50+
@Override
51+
public KeyValue properties() {
52+
return properties;
53+
}
54+
55+
@Override
56+
public Message putHeaders(final String key, final int value) {
57+
headers.put(key, value);
58+
return this;
59+
}
60+
61+
@Override
62+
public Message putHeaders(final String key, final long value) {
63+
headers.put(key, value);
64+
return this;
65+
}
66+
67+
@Override
68+
public Message putHeaders(final String key, final double value) {
69+
headers.put(key, value);
70+
return this;
71+
}
72+
73+
@Override
74+
public Message putHeaders(final String key, final String value) {
75+
headers.put(key, value);
76+
return this;
77+
}
78+
79+
@Override
80+
public Message putProperties(final String key, final int value) {
81+
properties.put(key, value);
82+
return this;
83+
}
84+
85+
@Override
86+
public Message putProperties(final String key, final long value) {
87+
properties.put(key, value);
88+
return this;
89+
}
90+
91+
@Override
92+
public Message putProperties(final String key, final double value) {
93+
properties.put(key, value);
94+
return this;
95+
}
96+
97+
@Override
98+
public Message putProperties(final String key, final String value) {
99+
properties.put(key, value);
100+
return this;
101+
}
102+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.rocketmq.domain;
18+
19+
public class NonStandardKeys {
20+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.openmessaging.rocketmq.domain;
18+
19+
import io.openmessaging.KeyValue;
20+
import io.openmessaging.SendResult;
21+
22+
public class SendResultImpl implements SendResult {
23+
private String messageId;
24+
private KeyValue properties;
25+
26+
public SendResultImpl(final String messageId, final KeyValue properties) {
27+
this.messageId = messageId;
28+
this.properties = properties;
29+
}
30+
31+
@Override
32+
public String messageId() {
33+
return messageId;
34+
}
35+
36+
@Override
37+
public KeyValue properties() {
38+
return properties;
39+
}
40+
}

0 commit comments

Comments
 (0)