Skip to content

Commit

Permalink
Merge pull request apache#8677 from Riduidel/fix/rabbitmq-message-not…
Browse files Browse the repository at this point in the history
…-serializable

[BEAM-7414] fix for message being not serializable due to LongString in headers
  • Loading branch information
jbonofre authored Oct 12, 2019
2 parents 2ff462d + d8d8761 commit 5a851b7
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
* instead of directly from a queue:
*
* <pre>{@code
* PCollection<RabbitMqMessage> messages = pipeline.apply(
* RabbitMqIO.read().withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", "QUEUE"));
* PCollection<RabbitMqMessage> messages = pipeline.apply(RabbitMqIO.read()
* .withUri("amqp://user:password@localhost:5672").withExchange("EXCHANGE", "fanout", "QUEUE"));
* }</pre>
*
* <h3>Publishing messages to RabbitMQ server</h3>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.beam.sdk.io.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.LongString;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
Expand All @@ -33,6 +37,67 @@
*/
public class RabbitMqMessage implements Serializable {

/**
* Make delivery serializable by cloning all non-serializable values into serializable ones. If it
* is not possible, initial delivery is returned and error message is logged
*
* @param processed
* @return
*/
private static Delivery serializableDeliveryOf(Delivery processed) {
// All content of envelope is serializable, so no problem there
Envelope envelope = processed.getEnvelope();
// in basicproperties, there may be LongString, which are *not* serializable
BasicProperties properties = processed.getProperties();
BasicProperties nextProperties =
new BasicProperties.Builder()
.appId(properties.getAppId())
.clusterId(properties.getClusterId())
.contentEncoding(properties.getContentEncoding())
.contentType(properties.getContentType())
.correlationId(properties.getCorrelationId())
.deliveryMode(properties.getDeliveryMode())
.expiration(properties.getExpiration())
.headers(serializableHeaders(properties.getHeaders()))
.messageId(properties.getMessageId())
.priority(properties.getPriority())
.replyTo(properties.getReplyTo())
.timestamp(properties.getTimestamp())
.type(properties.getType())
.userId(properties.getUserId())
.build();
return new Delivery(envelope, nextProperties, processed.getBody());
}

private static Map<String, Object> serializableHeaders(Map<String, Object> headers) {
Map<String, Object> returned = new HashMap<>();
if (headers != null) {
for (Map.Entry<String, Object> h : headers.entrySet()) {
Object value = h.getValue();
if (!(value instanceof Serializable)) {
try {
if (value instanceof LongString) {
LongString longString = (LongString) value;
byte[] bytes = longString.getBytes();
String s = new String(bytes, "UTF-8");
value = s;
} else {
throw new RuntimeException(String.format("no transformation defined for %s", value));
}
} catch (Throwable t) {
throw new UnsupportedOperationException(
String.format(
"can't make unserializable value %s a serializable value (which is mandatory for Apache Beam dataflow implementation)",
value),
t);
}
}
returned.put(h.getKey(), value);
}
}
return returned;
}

@Nullable private final String routingKey;
private final byte[] body;
private final String contentType;
Expand Down Expand Up @@ -71,6 +136,7 @@ public RabbitMqMessage(byte[] body) {

public RabbitMqMessage(String routingKey, QueueingConsumer.Delivery delivery) {
this.routingKey = routingKey;
delivery = serializableDeliveryOf(delivery);
body = delivery.getBody();
contentType = delivery.getProperties().getContentType();
contentEncoding = delivery.getProperties().getContentEncoding();
Expand Down

0 comments on commit 5a851b7

Please sign in to comment.