Skip to content

Commit

Permalink
Increase azure-eventhubs test coverage
Browse files Browse the repository at this point in the history
Fixes #6367
  • Loading branch information
jamesnetherton committed Aug 22, 2024
1 parent 16ab2d8 commit 7e62303
Show file tree
Hide file tree
Showing 10 changed files with 976 additions and 71 deletions.
25 changes: 21 additions & 4 deletions integration-test-groups/azure/azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@
<dependencies>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-azure-eventhubs</artifactId>
<artifactId>camel-quarkus-direct</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock</artifactId>
<artifactId>camel-quarkus-azure-eventhubs</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-quartz</artifactId>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-mock</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -51,6 +51,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-test-support</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down Expand Up @@ -107,6 +111,19 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-azure-eventhubs-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.azure.eventhubs.it;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

public final class AzureCredentialsHelper {
private AzureCredentialsHelper() {
// Utility class
}

public static boolean isMinimumConfigurationAvailable() {
Config config = ConfigProvider.getConfig();
if (isMockBackEnd()) {
return false;
}
Optional<String> storageAccountName = config.getOptionalValue("azure.storage.account-name", String.class);
Optional<String> storageAccountKey = config.getOptionalValue("azure.storage.account-key", String.class);
Optional<String> connectionString = config.getOptionalValue("azure.event.hubs.connection.string", String.class);
return storageAccountName.isPresent() && storageAccountKey.isPresent() && connectionString.isPresent();
}

public static boolean isAzureIdentityCredentialsAvailable() {
Config config = ConfigProvider.getConfig();
if (isMockBackEnd()) {
return false;
}

Optional<String> clientId = config.getOptionalValue("azure.client.id", String.class);
Optional<String> tenantId = config.getOptionalValue("azure.tenant.id", String.class);
Optional<String> username = config.getOptionalValue("azure.username", String.class);
Optional<String> password = config.getOptionalValue("azure.password", String.class);
Optional<String> clientSecret = config.getOptionalValue("azure.client.secret", String.class);
Optional<String> clientCertificate = config.getOptionalValue("azure.client.certificate.path", String.class);
Optional<String> clientCertificatePassword = config.getOptionalValue("azure.client.certificate.password", String.class);
return (clientId.isPresent() && tenantId.isPresent() &&
(username.isPresent() || password.isPresent() || clientCertificate.isPresent() || clientSecret.isPresent()
|| clientCertificatePassword.isPresent()));
}

public static boolean isSharedAccessKeyAvailable() {
Config config = ConfigProvider.getConfig();
if (isMockBackEnd()) {
return false;
}
return config.getOptionalValue("azure.event.hubs.shared.access.name", String.class).isPresent()
&& config.getOptionalValue("azure.event.hubs.shared.access.key", String.class).isPresent();
}

public static boolean isMockBackEnd() {
Config config = ConfigProvider.getConfig();
return config.getOptionalValue("camel.quarkus.start.mock.backend", Boolean.class).orElse(true);
}

public static Map<String, String> parseConnectionString(String connectionString) {
Map<String, String> properties = new HashMap<>();
ConnectionStringProperties stringProperties = new ConnectionStringProperties(connectionString);
properties.put("Endpoint", stringProperties.getEndpoint().toString());
properties.put("EntityPath", stringProperties.getEntityPath());
properties.put("SharedAccessKey", stringProperties.getSharedAccessKeyName());
properties.put("SharedAccessKeyValue", stringProperties.getSharedAccessKey());

String host = stringProperties.getEndpoint().getHost();
properties.put("Namespace", host.substring(0, host.indexOf('.')));

return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.azure.eventhubs.it;

import java.util.Optional;

import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.credential.TokenCredential;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
import jakarta.inject.Named;
import org.eclipse.microprofile.config.inject.ConfigProperty;

public class AzureEventhubsProducers {
@ConfigProperty(name = "azure.event.hubs.connection.string")
Optional<String> connectionString;

@Named("connectionStringTokenCredential")
TokenCredential tokenCredential() {
if (connectionString.isPresent()) {
ConnectionStringProperties properties = new ConnectionStringProperties(connectionString.get());
TokenCredential tokenCredential;
if (properties.getSharedAccessSignature() == null) {
tokenCredential = new EventHubSharedKeyCredential(properties.getSharedAccessKeyName(),
properties.getSharedAccessKey(), ClientConstants.TOKEN_VALIDITY);
} else {
tokenCredential = new EventHubSharedKeyCredential(properties.getSharedAccessSignature());
}
return tokenCredential;
}
return null;
}

@Named("eventHubClient")
EventHubProducerAsyncClient eventHubClient() {
return connectionString.map(connection -> new EventHubClientBuilder()
.connectionString(connection)
.buildAsyncProducerClient())
.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,133 @@
package org.apache.camel.quarkus.component.azure.eventhubs.it;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.azure.eventhubs.EventHubsConstants;
import org.apache.camel.component.mock.MockEndpoint;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.apache.camel.util.ObjectHelper;
import org.jboss.logging.Logger;

@Path("/azure-eventhubs")
@ApplicationScoped
public class AzureEventhubsResource {
private static final Logger LOG = Logger.getLogger(AzureEventhubsResource.class);

@Inject
ProducerTemplate producerTemplate;

@Inject
ConsumerTemplate consumerTemplate;

@Inject
CamelContext context;

@ConfigProperty(name = "azure.event.hubs.connection.string")
Optional<String> connectionString;

private volatile String message;
private int counter = 0;

/**
* For some reason if we send just a single message, it is not always received by the consumer.
* Sending multiple messages seems to be more reliable.
*/
@Scheduled(every = "1s")
void schedule() {
if (message != null) {
final String endpointUri = "azure-eventhubs:?connectionString=RAW(" + connectionString.get() + ")";
producerTemplate.sendBody(endpointUri, message + (counter++));
@Path("/receive-event")
@GET
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Object> receiveEvent(@QueryParam("endpointUri") String endpointUri, String match) {
final MockEndpoint mockEndpoint = context.getEndpoint(endpointUri, MockEndpoint.class);
List<Exchange> receivedExchanges = mockEndpoint.getReceivedExchanges();

Optional<Exchange> optionalExchange = receivedExchanges.stream()
.filter(exchange -> exchange.getMessage().getBody(String.class).equals(match))
.findFirst();

if (optionalExchange.isEmpty()) {
return Collections.emptyMap();
}

Exchange exchange = optionalExchange.get();
Message message = exchange.getMessage();
return Map.of(
"body", message.getBody(String.class),
"headers", message.getHeaders());
}

@Path("/send-event/{partitionId}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response sendEvent(
@PathParam("partitionId") String partitionId,
@QueryParam("endpointUri") String endpointUri,
String message) throws Exception {

if (ObjectHelper.isEmpty(endpointUri)) {
endpointUri = "direct:sendEvent";
}

LOG.infof("Producing event to endpoint uri: %s", endpointUri);

producerTemplate.sendBodyAndHeader(endpointUri, message, EventHubsConstants.PARTITION_ID, partitionId);
return Response.created(new URI("https://camel.apache.org/")).build();
}

@Path("/receive-events")
@GET
@Produces(MediaType.APPLICATION_JSON)
public List<String> receiveEvents() throws Exception {
public List<Map<String, Object>> receiveEvents(@QueryParam("endpointUri") String endpointUri, List<String> matches) {
final MockEndpoint mockEndpoint = context.getEndpoint(endpointUri, MockEndpoint.class);
List<Exchange> receivedExchanges = mockEndpoint.getReceivedExchanges();

List<Exchange> exchanges = receivedExchanges.stream()
.filter(exchange -> matches.contains(exchange.getMessage().getBody(String.class)))
.toList();

if (exchanges.isEmpty()) {
return Collections.emptyList();
}

List<Map<String, Object>> result = new ArrayList<>();
for (Exchange exchange : exchanges) {
Message message = exchange.getMessage();
result.add(Map.of(
"body", message.getBody(String.class),
"headers", message.getHeaders()));
}

final MockEndpoint mockEndpoint = context.getEndpoint("mock:azure-consumed", MockEndpoint.class);
return mockEndpoint.getReceivedExchanges().stream()
.map(Exchange::getMessage)
.map(m -> m.getBody(String.class))
.collect(Collectors.toList());
return result;
}

@Path("/send-events")
@Path("/send-events/{partitionId}")
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.TEXT_PLAIN)
public Response sendEvents(String body) throws Exception {
this.message = body; // start sending the messages via schedule()
public Response sendEvents(@PathParam("partitionId") String partitionId, List<String> messages) throws Exception {
producerTemplate.sendBodyAndHeader("direct:sendEvent", messages, EventHubsConstants.PARTITION_ID, partitionId);
return Response.created(new URI("https://camel.apache.org/")).build();
}

@Path("/route/{routeId}/start")
@POST
public void startRoute(@PathParam("routeId") String routeId) throws Exception {
LOG.infof("Starting route: %s", routeId);
context.getRouteController().startRoute(routeId);
// A random jitter value is applied in the Event Hubs client before its message listener is active.
// In addition, claiming ownership of partitions seems to take an indeterminate amount of time.
// Therefore, we need to wait until it's safe to produce events
Thread.sleep(5000);
}

@Path("/route/{routeId}/stop")
@POST
public void stopRoute(@PathParam("routeId") String routeId) throws Exception {
context.getRouteController().stopRoute(routeId);
}
}
Loading

0 comments on commit 7e62303

Please sign in to comment.