Skip to content

Commit

Permalink
SWATCH-2558: Save events creates kafka message (#3554)
Browse files Browse the repository at this point in the history
<!-- Replace XXXX with the issue number. Issue will be auto-linked -->
Jira issue: SWATCH-2558

## Description
<!-- Provide a description of this PR. Try to provide answers to "what",
"how",
and "why" -->
Previously, events were sent to this endpoint during testing and did not
go through the eventController. This bypassed some of the verification
and processing. This allows the event to be created, emitted, and
consumed.

## Testing
<!--
When possible, please use commands a developer can directly paste or
modify
-->
The iqe test "test_event_uom_to_metric_id_via_api" sends data to the
saveEvents endpoint.


### Setup
<!-- Add any steps required to set up the test case -->
1. ```podman-compose up -d```
2. ```RHSM_SUBSCRIPTIONS_ENABLE_SYNCHRONOUS_OPERATIONS=true
DEV_MODE=true
PROM_URL=http://localhost:8082/api/metrics/v1/telemeter/api/v1
SWATCH_CONTRACTS_INTERNAL_SERVICE=http://localhost:8882 SERVER_PORT=8000
SPRING_PROFILES_ACTIVE=worker,kafka-queue,api,capacity-ingress,rh-marketplace,rhsm-conduit
./gradlew clean :bootRun```

### Steps
<!-- Enter each step of the test below -->
1. Run the iqe test listed above.

### Verification
<!-- Enter the steps needed to verify the test passed -->
1. The events table in the database will have a new row.
2. You will see 
```[thread=swatch-instance-ingress-0-C-1] [INFO ] [org.candlepin.subscriptions.tally.billing.ServiceInstanceMessageConsumer] - Events received w/ event list size=1. Consuming events.``` in the logs.
  • Loading branch information
wottop authored Aug 22, 2024
2 parents 4c22515 + dd3e0b4 commit 8d41cbf
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.candlepin.subscriptions.inventory.db.InventoryDataSourceConfiguration;
import org.candlepin.subscriptions.json.EnabledOrgsRequest;
import org.candlepin.subscriptions.json.EnabledOrgsResponse;
import org.candlepin.subscriptions.json.Event;
import org.candlepin.subscriptions.json.TallySummary;
import org.candlepin.subscriptions.product.ProductConfiguration;
import org.candlepin.subscriptions.tally.billing.BillableUsageConfiguration;
Expand Down Expand Up @@ -206,6 +207,25 @@ public KafkaTemplate<String, TallySummary> tallySummaryKafkaTemplate(
return new KafkaTemplate<>(tallySummaryProducerFactory);
}

@Bean
public ProducerFactory<String, Event> eventProducerFactory(
KafkaProperties kafkaProperties, ObjectMapper objectMapper) {
DefaultKafkaProducerFactory<String, Event> factory =
new DefaultKafkaProducerFactory<>(getProducerProperties(kafkaProperties));
/*
Use our customized ObjectMapper. Notably, the spring-kafka default ObjectMapper writes dates as
timestamps, which produces messages not compatible with JSON-B deserialization.
*/
factory.setValueSerializer(new JsonSerializer<>(objectMapper));
return factory;
}

@Bean
public KafkaTemplate<String, Event> eventKafkaTemplate(
ProducerFactory<String, Event> eventProducerFactory) {
return new KafkaTemplate<>(eventProducerFactory);
}

@Bean(name = "purgeTallySnapshotsJobExecutor")
public Executor getPurgeSnapshotsJobExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@
*/
package org.candlepin.subscriptions.tally.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.annotation.Timed;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.BadRequestException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.candlepin.clock.ApplicationClock;
import org.candlepin.subscriptions.ApplicationProperties;
import org.candlepin.subscriptions.db.EventRecordRepository;
import org.candlepin.subscriptions.db.model.config.OptInType;
import org.candlepin.subscriptions.json.Event;
import org.candlepin.subscriptions.resource.ResourceUtils;
import org.candlepin.subscriptions.retention.TallyRetentionController;
import org.candlepin.subscriptions.security.SecurityProperties;
Expand All @@ -44,9 +49,12 @@
import org.candlepin.subscriptions.tally.admin.api.model.UuidList;
import org.candlepin.subscriptions.tally.events.EventRecordsRetentionProperties;
import org.candlepin.subscriptions.tally.job.CaptureSnapshotsTaskManager;
import org.candlepin.subscriptions.task.TaskQueueProperties;
import org.candlepin.subscriptions.util.LogUtils;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/** This resource is for exposing administrator REST endpoints for Tally. */
Expand All @@ -68,6 +76,9 @@ public class InternalTallyResource implements InternalTallyApi {
private final SecurityProperties properties;
private final EventRecordRepository eventRecordRepository;
private final EventRecordsRetentionProperties eventRecordsRetentionProperties;
private final KafkaTemplate<String, Event> eventKafkaTemplate;
private final ObjectMapper objectMapper;
private final String eventTopic;

@SuppressWarnings("java:S107")
public InternalTallyResource(
Expand All @@ -79,7 +90,11 @@ public InternalTallyResource(
InternalTallyDataController internalTallyDataController,
SecurityProperties properties,
EventRecordRepository eventRecordRepository,
EventRecordsRetentionProperties eventRecordsRetentionProperties) {
EventRecordsRetentionProperties eventRecordsRetentionProperties,
ObjectMapper objectMapper,
KafkaTemplate<String, Event> eventKafkaTemplate,
@Qualifier("serviceInstanceTopicProperties")
TaskQueueProperties serviceInstanceTopicProperties) {
this.clock = clock;
this.applicationProperties = applicationProperties;
this.resendTallyController = resendTallyController;
Expand All @@ -89,6 +104,9 @@ public InternalTallyResource(
this.properties = properties;
this.eventRecordRepository = eventRecordRepository;
this.eventRecordsRetentionProperties = eventRecordsRetentionProperties;
this.eventKafkaTemplate = eventKafkaTemplate;
this.objectMapper = objectMapper;
this.eventTopic = serviceInstanceTopicProperties.getTopic();
}

@Override
Expand Down Expand Up @@ -213,8 +231,23 @@ public EventsResponse fetchEventsForOrgIdInTimeRange(
@Override
public EventsResponse saveEvents(String jsonListOfEvents) {
var response = new EventsResponse();
StringBuilder messages = new StringBuilder();
if (isFeatureEnabled()) {
response.setDetail(internalTallyDataController.saveEvents(jsonListOfEvents));
try {
List<Event> events = objectMapper.readValue(jsonListOfEvents, new TypeReference<>() {});
events.stream()
.forEach(
event -> {
try {
eventKafkaTemplate.send(this.eventTopic, event);
} catch (Exception e) {
messages.append(e.getMessage());
}
});
} catch (JsonProcessingException e) {
messages.append(e.getMessage());
}
response.setDetail(String.valueOf(messages));
return response;

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,41 @@
package org.candlepin.subscriptions.tally.admin;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import com.fasterxml.jackson.databind.util.StdDateFormat;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationModule;
import jakarta.ws.rs.BadRequestException;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import org.candlepin.clock.ApplicationClock;
import org.candlepin.subscriptions.ApplicationProperties;
import org.candlepin.subscriptions.db.EventRecordRepository;
import org.candlepin.subscriptions.json.Event;
import org.candlepin.subscriptions.retention.TallyRetentionController;
import org.candlepin.subscriptions.security.SecurityProperties;
import org.candlepin.subscriptions.tally.MarketplaceResendTallyController;
import org.candlepin.subscriptions.tally.events.EventRecordsRetentionProperties;
import org.candlepin.subscriptions.tally.job.CaptureSnapshotsTaskManager;
import org.candlepin.subscriptions.task.TaskQueueProperties;
import org.candlepin.subscriptions.test.TestClockConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.kafka.core.KafkaTemplate;

@ExtendWith(MockitoExtension.class)
class InternalTallyResourceTest {
Expand All @@ -53,17 +68,21 @@ class InternalTallyResourceTest {
@Mock private InternalTallyDataController internalTallyDataController;
@Mock private SecurityProperties properties;
@Mock private EventRecordRepository eventRecordRepository;
@Mock private KafkaTemplate<String, Event> kafkaTemplate;

private EventRecordsRetentionProperties eventRecordsRetentionProperties;
private InternalTallyResource resource;
private ApplicationProperties appProps;
private ApplicationClock clock;
private TaskQueueProperties taskQueueProperties;

@BeforeEach
void setupTest() {
clock = new TestClockConfiguration().adjustableClock();
appProps = new ApplicationProperties();
eventRecordsRetentionProperties = new EventRecordsRetentionProperties();
taskQueueProperties = new TaskQueueProperties();
taskQueueProperties.setTopic("platform.rhsm-subscriptions.service-instance-ingress");
resource =
new InternalTallyResource(
clock,
Expand All @@ -74,7 +93,45 @@ void setupTest() {
internalTallyDataController,
properties,
eventRecordRepository,
eventRecordsRetentionProperties);
eventRecordsRetentionProperties,
objectMapper(appProps),
kafkaTemplate,
taskQueueProperties);
}

@Test
void testSaveEvents() {
when(properties.isDevMode()).thenReturn(true);
resource.saveEvents(
"[{"
+ " \"sla\": \"Premium\","
+ " \"role\": \"moa-hostedcontrolplane\","
+ " \"org_id\": \"11091977\","
+ " \"event_id\": \"e2afa8c0-63de-4f55-8ded-66d088c439c4\","
+ " \"timestamp\": \"2024-07-10T03:00:00Z\","
+ " \"conversion\": false,"
+ " \"event_type\": \"snapshot_rosa_cores\","
+ " \"expiration\": \"2024-07-11T04:00:00Z\","
+ " \"instance_id\": \"73e5401b-52ff-4fb2-adcd-ed151ed9b1bd\","
+ " \"product_ids\": [],"
+ " \"product_tag\": ["
+ " \"rosa\""
+ " ],"
+ " \"record_date\": \"2024-07-02T04:31:06.608885043Z\", "
+ " \"display_name\": \"psl-rosa-nprod\","
+ " \"event_source\": \"prometheus\","
+ " \"measurements\": ["
+ " {"
+ " \"value\": 13.333333333333334,"
+ " \"metric_id\": \"Cores\""
+ " }"
+ " ],"
+ " \"service_type\": \"rosa Instance\","
+ " \"billing_provider\": \"aws\","
+ " \"metering_batch_id\": \"f39d1d22-eb72-4597-b722-5bee34abd78d\","
+ " \"billing_account_id\": \"381492115198\""
+ " }]");
verify(kafkaTemplate, times(1)).send(eq(taskQueueProperties.getTopic()), any(Event.class));
}

@Test
Expand Down Expand Up @@ -137,4 +194,24 @@ void testPurgeEventRecords() {
verify(eventRecordRepository)
.deleteInBulkEventRecordsByTimestampBefore(expectedRetentionTarget);
}

ObjectMapper objectMapper(ApplicationProperties applicationProperties) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.setDateFormat(new StdDateFormat().withColonInTimeZone(true));
objectMapper.configure(
SerializationFeature.INDENT_OUTPUT, applicationProperties.isPrettyPrintJson());
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
objectMapper.setAnnotationIntrospector(new JacksonAnnotationIntrospector());

// Explicitly load the modules we need rather than use ObjectMapper.findAndRegisterModules in
// order to avoid com.fasterxml.jackson.module.scala.DefaultScalaModule, which was causing
// deserialization to ignore @JsonProperty on OpenApi classes.
objectMapper.registerModule(new JakartaXmlBindAnnotationModule());
objectMapper.registerModule(new JavaTimeModule());
objectMapper.registerModule(new Jdk8Module());

return objectMapper;
}
}

0 comments on commit 8d41cbf

Please sign in to comment.