Skip to content

Commit

Permalink
Merge pull request quarkusio#983 from cescoffier/build-time-wiring-si…
Browse files Browse the repository at this point in the history
…mpliciation

Simplify configuration allowed by the build time wiring of reactive messaging
  • Loading branch information
cescoffier authored Nov 25, 2021
2 parents 2b39f77 + 46ae6e3 commit 2385a13
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
# Configure the incoming AMQP queue `quote-requests`
mp.messaging.incoming.requests.connector=smallrye-amqp
# Set the AMQP address for the `requests` channel, as it's not the channel name
mp.messaging.incoming.requests.address=quote-requests

# Configure the outgoing AMQP queue `quotes`
mp.messaging.outgoing.quotes.connector=smallrye-amqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.acme.amqp.processor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.UUID;

import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpConnection;
import io.vertx.mutiny.amqp.AmqpMessage;
import io.vertx.mutiny.amqp.AmqpReceiver;
import io.vertx.mutiny.amqp.AmqpSender;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class QuoteProcessorTest {

@ConfigProperty(name = "amqp-host") String host;
@ConfigProperty(name = "amqp-port") int port;
private AmqpClient client;

@BeforeEach
void setUp() {
client = AmqpClient.create(new AmqpClientOptions().setHost(host).setPort(port));
}

@AfterEach
void tearDown() {
client.closeAndAwait();
}

@Test
void testProcessor() {
AmqpConnection connection = client.connectAndAwait();
AmqpReceiver quotes = connection.createReceiverAndAwait("quotes");
AssertSubscriber<AmqpMessage> subscriber = quotes.toMulti().subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
AmqpSender sender = connection.createSenderAndAwait("quote-requests");
UUID quoteId = UUID.randomUUID();
sender.sendWithAckAndAwait(AmqpMessage.create().address("quote-requests").withBody(quoteId.toString()).build());
subscriber.awaitItems(1);
AmqpMessage received = subscriber.getItems().get(0);
assertEquals(received.bodyAsJsonObject().getString("id"), quoteId.toString());
}
}
5 changes: 5 additions & 0 deletions amqp-quickstart/amqp-quickstart-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
# Configure the outgoing `quote-requests` queue
mp.messaging.outgoing.quote-requests.connector=smallrye-amqp

# Configure the incoming `quotes` queue
mp.messaging.incoming.quotes.connector=smallrye-amqp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.acme.amqp.producer;


import io.quarkus.test.junit.NativeImageTest;

@NativeImageTest
public class QuotesResourceIT extends QuotesResourceTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.acme.amqp.producer;

import static io.restassured.RestAssured.given;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import java.util.UUID;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class QuotesResourceTest {

@Test
void testQuotesEventStream() {
String body = given()
.when()
.post("/quotes/request")
.then()
.statusCode(200)
.extract().body()
.asString();
assertDoesNotThrow(() -> UUID.fromString(body));
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
# set the connector for the outgoing channel to `smallrye-kafka`
mp.messaging.outgoing.movies.connector=smallrye-kafka

# set the topic name for the channel to `movies`
mp.messaging.outgoing.movies.topic=movies

# automatically register the schema with the registry, if not present
mp.messaging.outgoing.movies.apicurio.registry.auto-register=true

# set the connector for the incoming channel to `smallrye-kafka`
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka
kafka.apicurio.registry.auto-register=true
kafka.auto.offset.reset=earliest

# set the topic name for the channel to `movies`
# set the topic name for the incoming channel to `movies`, as it's not the channel name
mp.messaging.incoming.movies-from-kafka.topic=movies

# disable auto-commit, Reactive Messaging handles it itself
mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false

mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest

%prod.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
%prod.kafka.apicurio.registry.url=http://localhost:8081/apis/registry/v2
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.health-readiness-enabled=false
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

%prod.quarkus.datasource.db-kind=postgresql
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
# We must set the topic we wrtie to, as it's not the channel name.
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.health-readiness-enabled=false
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

%prod.quarkus.datasource.db-kind=postgresql
%prod.quarkus.datasource.username=quarkus_test
Expand Down
5 changes: 0 additions & 5 deletions kafka-quickstart/processor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
%dev.quarkus.http.port=8081

# Configure the incoming `quote-requests` Kafka topic
mp.messaging.incoming.requests.connector=smallrye-kafka
# Go bad to the first records, if it's out first access
kafka.auto.offset.reset=earliest

# Set the Kafka topic, as it's not the channel name
mp.messaging.incoming.requests.topic=quote-requests
mp.messaging.incoming.requests.auto.offset.reset=earliest


# Configure the outgoing `quotes` Kafka topic
mp.messaging.outgoing.quotes.connector=smallrye-kafka
mp.messaging.outgoing.quotes.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@
# Configure the outgoing `quote-requests` Kafka topic
mp.messaging.outgoing.quote-requests.connector=smallrye-kafka

# Configure the incoming `quotes` Kafka topic
mp.messaging.incoming.quotes.connector=smallrye-kafka
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.acme.kafka.producer;


import io.quarkus.test.junit.NativeImageTest;

@NativeImageTest
public class QuotesResourceIT extends QuotesResourceTest {

}

0 comments on commit 2385a13

Please sign in to comment.