From febad02274102c52e35ea907f764c2de064512bf Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 23 Feb 2018 14:41:42 -0800 Subject: [PATCH 1/2] Add Java samples with connection recovery code --- .../main/java/RecvWithConnectionRecovery.java | 1 + .../main/java/SendWithConnectionRecovery.java | 1 + java/RecvWithConnectionRecovery.java | 33 +++++++++++++++++++ java/SendWithConnectionRecovery.java | 23 +++++++++++++ python/.gitignore | 1 + 5 files changed, 59 insertions(+) create mode 120000 java-mvn/src/main/java/RecvWithConnectionRecovery.java create mode 120000 java-mvn/src/main/java/SendWithConnectionRecovery.java create mode 100644 java/RecvWithConnectionRecovery.java create mode 100644 java/SendWithConnectionRecovery.java create mode 100644 python/.gitignore diff --git a/java-mvn/src/main/java/RecvWithConnectionRecovery.java b/java-mvn/src/main/java/RecvWithConnectionRecovery.java new file mode 120000 index 00000000..cfbef9b3 --- /dev/null +++ b/java-mvn/src/main/java/RecvWithConnectionRecovery.java @@ -0,0 +1 @@ +../../../../java/RecvWithConnectionRecovery.java \ No newline at end of file diff --git a/java-mvn/src/main/java/SendWithConnectionRecovery.java b/java-mvn/src/main/java/SendWithConnectionRecovery.java new file mode 120000 index 00000000..93ad6e98 --- /dev/null +++ b/java-mvn/src/main/java/SendWithConnectionRecovery.java @@ -0,0 +1 @@ +../../../../java/SendWithConnectionRecovery.java \ No newline at end of file diff --git a/java/RecvWithConnectionRecovery.java b/java/RecvWithConnectionRecovery.java new file mode 100644 index 00000000..5dfcd18a --- /dev/null +++ b/java/RecvWithConnectionRecovery.java @@ -0,0 +1,33 @@ +import com.rabbitmq.client.*; + +import java.io.IOException; + +public class RecvWithConnectionRecovery { + + private final static String QUEUE_NAME = "hello"; + + public static void main(String[] argv) throws Exception { + Address[] addresses = new Address[]{ + new Address("localhost", 5672), + new Address("localhost", 5673) + }; + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + factory.setAutomaticRecoveryEnabled(true); + Connection connection = factory.newConnection(addresses); + Channel channel = connection.createChannel(); + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + throws IOException { + String message = new String(body, "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + } + }; + channel.basicConsume(QUEUE_NAME, true, consumer); + } +} diff --git a/java/SendWithConnectionRecovery.java b/java/SendWithConnectionRecovery.java new file mode 100644 index 00000000..82dd5d34 --- /dev/null +++ b/java/SendWithConnectionRecovery.java @@ -0,0 +1,23 @@ +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class SendWithConnectionRecovery { + + private final static String QUEUE_NAME = "hello"; + + public static void main(String[] argv) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + String message = "Hello World!"; + channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "'"); + + channel.close(); + connection.close(); + } +} diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 00000000..cdb93cd5 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1 @@ +.python-version From 8399c1d513248d3a3a8af39018e68fc0cfeaffd8 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 23 Feb 2018 15:58:26 -0800 Subject: [PATCH 2/2] Add examples that use connection recovery --- java-mvn/README.md | 5 ++++ java-mvn/pom.xml | 12 +++++++++- java/RecvWithConnectionRecovery.java | 35 +++++++++++++++++++++------- java/SendWithConnectionRecovery.java | 9 +++++-- 4 files changed, 49 insertions(+), 12 deletions(-) create mode 100644 java-mvn/README.md diff --git a/java-mvn/README.md b/java-mvn/README.md new file mode 100644 index 00000000..ff628e56 --- /dev/null +++ b/java-mvn/README.md @@ -0,0 +1,5 @@ +Running an example using `mvn` and enabling debug logging: + +``` +mvn -e exec:java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -Dexec.mainClass=RecvWithConnectionRecovery +``` diff --git a/java-mvn/pom.xml b/java-mvn/pom.xml index 6cbdcd97..b91f9500 100644 --- a/java-mvn/pom.xml +++ b/java-mvn/pom.xml @@ -20,7 +20,17 @@ com.rabbitmq amqp-client - LATEST + 5.1.2 + + + org.slf4j + slf4j-api + 1.7.25 + + + org.slf4j + slf4j-simple + 1.7.25 diff --git a/java/RecvWithConnectionRecovery.java b/java/RecvWithConnectionRecovery.java index 5dfcd18a..fa884bcd 100644 --- a/java/RecvWithConnectionRecovery.java +++ b/java/RecvWithConnectionRecovery.java @@ -1,28 +1,45 @@ -import com.rabbitmq.client.*; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.AMQP.BasicProperties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; public class RecvWithConnectionRecovery { - private final static String QUEUE_NAME = "hello"; + private static final Logger log = LoggerFactory.getLogger(RecvWithConnectionRecovery.class); + private static final String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { - Address[] addresses = new Address[]{ + final Address[] addresses = new Address[]{ new Address("localhost", 5672), new Address("localhost", 5673) }; - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); + + log.debug("Initializing ConnectionFactory"); + final ConnectionFactory factory = new ConnectionFactory(); factory.setAutomaticRecoveryEnabled(true); - Connection connection = factory.newConnection(addresses); - Channel channel = connection.createChannel(); + factory.setTopologyRecoveryEnabled(true); + + log.debug("Creating Connection"); + final Connection connection = factory.newConnection(addresses); + + log.debug("Creating Channel"); + final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); - System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + log.info(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); diff --git a/java/SendWithConnectionRecovery.java b/java/SendWithConnectionRecovery.java index 82dd5d34..cb4e59d4 100644 --- a/java/SendWithConnectionRecovery.java +++ b/java/SendWithConnectionRecovery.java @@ -1,3 +1,4 @@ +import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -7,9 +8,13 @@ public class SendWithConnectionRecovery { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { + Address[] addresses = new Address[]{ + new Address("localhost", 5672), + new Address("localhost", 5673) + }; ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); - Connection connection = factory.newConnection(); + factory.setAutomaticRecoveryEnabled(true); + Connection connection = factory.newConnection(addresses); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);