Skip to content

Commit

Permalink
Partially adapted to separate cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
dartartem committed Jun 6, 2019
1 parent 2060012 commit c464c09
Show file tree
Hide file tree
Showing 52 changed files with 217 additions and 156 deletions.
5 changes: 4 additions & 1 deletion buildSrc/src/main/groovy/FtgoServicePlugin.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ class FtgoServicePlugin implements Plugin<Project> {
}
}

project.configurations.all {
exclude group: 'org.apache.logging.log4j'
exclude group: 'log4j'
}

project.dependencies {

compile 'org.springframework.cloud:spring-cloud-starter-sleuth'
compile 'org.springframework.cloud:spring-cloud-starter-zipkin'
compile 'io.zipkin.brave:brave-bom:4.17.1'
Expand Down
8 changes: 4 additions & 4 deletions docker-compose-mysql-schema-per-service.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
version: '3'
services:
zookeeper:
image: eventuateio/eventuateio-local-zookeeper:0.30.2.RELEASE
image: test-eventuate-zookeeper
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: eventuateio/eventuateio-local-kafka:0.30.2.RELEASE
image: test-eventuate-kafka
ports:
- 9092:9092
depends_on:
Expand All @@ -24,8 +24,8 @@ services:
- MYSQL_ROOT_PASSWORD=rootpassword
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
tram-cdc-service:
image: eventuateio/eventuate-tram-cdc-mysql-service:0.21.2.RELEASE
cdc-service:
image: test-eventuate-cdc-service
ports:
- "8099:8080"
depends_on:
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
version: '3'
services:
zookeeper:
image: eventuateio/eventuateio-local-zookeeper:0.30.2.RELEASE
image: test-eventuate-zookeeper
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: eventuateio/eventuateio-local-kafka:0.30.2.RELEASE
image: test-eventuate-kafka
ports:
- 9092:9092
depends_on:
Expand All @@ -25,7 +25,7 @@ services:
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
tram-cdc-service:
image: eventuateio/eventuate-tram-cdc-mysql-service:0.21.2.RELEASE
image: test-eventuate-cdc-service
ports:
- "8099:8080"
depends_on:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import io.eventuate.javaclient.driver.EventuateDriverConfiguration;
import io.eventuate.jdbckafka.TramJdbcKafkaConfiguration;
import io.eventuate.tram.commands.common.ChannelMapping;
import io.eventuate.tram.commands.common.DefaultChannelMapping;
import io.eventuate.tram.commands.producer.TramCommandProducerConfiguration;
import io.eventuate.tram.messaging.common.ChannelMapping;
import io.eventuate.tram.messaging.common.DefaultChannelMapping;
import net.chrisrichardson.ftgo.accountingservice.messaging.AccountingMessagingConfiguration;
import net.chrisrichardson.ftgo.accountingservice.web.AccountingWebConfiguration;
import org.springframework.boot.SpringApplication;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import io.eventuate.javaclient.spring.EnableEventHandlers;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.consumer.common.DuplicateMessageDetector;
import io.eventuate.tram.events.common.DefaultDomainEventNameMapping;
import io.eventuate.tram.events.subscriber.DomainEventDispatcher;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.eventsourcingsupport.SagaReplyRequestedEventSubscriber;
import net.chrisrichardson.ftgo.accountingservice.domain.Account;
import net.chrisrichardson.ftgo.accountingservice.domain.AccountServiceConfiguration;
Expand All @@ -27,7 +29,10 @@ public AccountingEventConsumer accountingEventConsumer() {

@Bean
public DomainEventDispatcher domainEventDispatcher(AccountingEventConsumer accountingEventConsumer, MessageConsumer messageConsumer) {
return new DomainEventDispatcher("accountingServiceDomainEventDispatcher", accountingEventConsumer.domainEventHandlers(), messageConsumer);
return new DomainEventDispatcher("accountingServiceDomainEventDispatcher",
accountingEventConsumer.domainEventHandlers(),
messageConsumer,
new DefaultDomainEventNameMapping());
}

@Bean
Expand All @@ -38,8 +43,10 @@ public AccountingServiceCommandHandler accountCommandHandler() {

@Bean
public CommandDispatcher commandDispatcher(AccountingServiceCommandHandler target,
AccountServiceChannelConfiguration data) {
return new CommandDispatcher(data.getCommandDispatcherId(), target.commandHandlers());
AccountServiceChannelConfiguration data,
MessageConsumer messageConsumer,
MessageProducer messageProducer) {
return new CommandDispatcher(data.getCommandDispatcherId(), target.commandHandlers(), messageConsumer, messageProducer);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
spring.application.name=ftgo-accounting-service

management.health.jms.enabled=false
management.health.redis.enabled=false
management.endpoint.health.show-details=always

spring.jpa.generate-ddl=true
logging.level.org.springframework.orm.jpa=INFO
logging.level.org.hibernate.SQL=DEBUG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@

import io.eventuate.sync.AggregateRepository;
import io.eventuate.javaclient.spring.jdbc.EmbeddedTestAggregateStoreConfiguration;
import io.eventuate.tram.commands.common.ChannelMapping;
import io.eventuate.tram.commands.common.DefaultChannelMapping;
import io.eventuate.tram.commands.producer.CommandProducer;
import io.eventuate.tram.commands.producer.TramCommandProducerConfiguration;
import io.eventuate.tram.events.publisher.DomainEventPublisher;
import io.eventuate.tram.events.publisher.TramEventsPublisherConfiguration;
import io.eventuate.tram.inmemory.TramInMemoryConfiguration;
import io.eventuate.tram.messaging.common.ChannelMapping;
import io.eventuate.tram.messaging.common.DefaultChannelMapping;
import io.eventuate.tram.sagas.common.SagaCommandHeaders;
import io.eventuate.tram.testutil.TestMessageConsumer;
import io.eventuate.tram.testutil.TestMessageConsumerFactory;
import net.chrisrichardson.ftgo.accountingservice.domain.Account;
import net.chrisrichardson.ftgo.accountingservice.domain.AccountCommand;
import net.chrisrichardson.ftgo.accountingservice.domain.AuthorizeCommandInternal;
import net.chrisrichardson.ftgo.accountservice.api.AccountingServiceChannels;
import net.chrisrichardson.ftgo.accountservice.api.AuthorizeCommand;
import net.chrisrichardson.ftgo.common.Money;
Expand Down
1 change: 1 addition & 0 deletions ftgo-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
compile "org.springframework.boot:spring-boot-starter-data-jpa:$springBootVersion"

compile "io.eventuate.client.java:eventuate-client-java-common-impl:$eventuateClientVersion"
compile "io.eventuate.common:eventuate-common-json-mapper:$eventuateCommonVersion"

testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package net.chrisrichardson.ftgo.common;

import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.common.json.mapper.JSonMapper;

import javax.annotation.PostConstruct;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package net.chrisrichardson.ftgo.common;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.common.json.mapper.JSonMapper;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package net.chrisrichardson.ftgo.consumerservice.domain;

import io.eventuate.tram.events.ResultWithEvents;
import io.eventuate.tram.events.publisher.ResultWithEvents;
import net.chrisrichardson.ftgo.common.Money;
import net.chrisrichardson.ftgo.common.PersonName;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.chrisrichardson.ftgo.consumerservice.domain;

import io.eventuate.tram.events.ResultWithEvents;
import io.eventuate.tram.events.publisher.DomainEventPublisher;
import io.eventuate.tram.events.publisher.ResultWithEvents;
import net.chrisrichardson.ftgo.common.Money;
import net.chrisrichardson.ftgo.common.PersonName;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package net.chrisrichardson.ftgo.consumerservice.domain;

import io.eventuate.tram.commands.common.ChannelMapping;
import io.eventuate.tram.commands.common.DefaultChannelMapping;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.events.publisher.TramEventsPublisherConfiguration;
import io.eventuate.tram.messaging.common.ChannelMapping;
import io.eventuate.tram.messaging.common.DefaultChannelMapping;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.participant.SagaCommandDispatcher;
import io.eventuate.tram.sagas.participant.SagaParticipantConfiguration;
import net.chrisrichardson.ftgo.common.CommonConfiguration;
Expand Down Expand Up @@ -34,8 +37,15 @@ public ConsumerService consumerService() {
}

@Bean
public CommandDispatcher commandDispatcher(ConsumerServiceCommandHandlers consumerServiceCommandHandlers) {
return new SagaCommandDispatcher("consumerServiceDispatcher", consumerServiceCommandHandlers.commandHandlers());
public CommandDispatcher commandDispatcher(ConsumerServiceCommandHandlers consumerServiceCommandHandlers,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager) {
return new SagaCommandDispatcher("consumerServiceDispatcher",
consumerServiceCommandHandlers.commandHandlers(),
messageConsumer,
messageProducer,
sagaLockManager);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package net.chrisrichardson.ftgo.consumerservice.web;

import io.eventuate.tram.events.ResultWithEvents;
import io.eventuate.tram.events.publisher.ResultWithEvents;
import net.chrisrichardson.ftgo.consumerservice.api.web.CreateConsumerRequest;
import net.chrisrichardson.ftgo.consumerservice.api.web.CreateConsumerResponse;
import net.chrisrichardson.ftgo.consumerservice.domain.Consumer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
spring.application.name=ftgo-consumer-service

management.health.jms.enabled=false
management.health.redis.enabled=false
management.endpoint.health.show-details=always

spring.jpa.generate-ddl=true
logging.level.org.springframework.orm.jpa=INFO
logging.level.org.hibernate.SQL=DEBUG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.jayway.restassured.RestAssured;
import com.jayway.restassured.config.ObjectMapperConfig;
import com.jayway.restassured.config.RestAssuredConfig;
import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.common.json.mapper.JSonMapper;
import net.chrisrichardson.ftgo.common.CommonJsonMapperInitializer;
import net.chrisrichardson.ftgo.common.Money;
import net.chrisrichardson.ftgo.common.PersonName;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package net.chrisrichardson.ftgo.kitchenservice.main;

import io.eventuate.jdbckafka.TramJdbcKafkaConfiguration;
import io.eventuate.tram.commands.common.ChannelMapping;
import io.eventuate.tram.commands.common.DefaultChannelMapping;
import io.eventuate.tram.messaging.common.ChannelMapping;
import io.eventuate.tram.messaging.common.DefaultChannelMapping;
import net.chrisrichardson.eventstore.examples.customersandorders.commonswagger.CommonSwaggerConfiguration;
import net.chrisrichardson.ftgo.kitchenservice.messagehandlers.KitchenServiceMessageHandlersConfiguration;
import net.chrisrichardson.ftgo.kitchenservice.web.KitchenServiceWebConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package net.chrisrichardson.ftgo.kitchenservice.messagehandlers;

import io.eventuate.tram.events.common.DefaultDomainEventNameMapping;
import io.eventuate.tram.events.subscriber.DomainEventDispatcher;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
import io.eventuate.tram.sagas.participant.SagaCommandDispatcher;
import io.eventuate.tram.sagas.participant.SagaParticipantConfiguration;
import net.chrisrichardson.ftgo.common.CommonConfiguration;
Expand All @@ -25,12 +28,22 @@ public KitchenServiceCommandHandler kitchenServiceCommandHandler() {
}

@Bean
public SagaCommandDispatcher kitchenServiceSagaCommandDispatcher(KitchenServiceCommandHandler kitchenServiceCommandHandler) {
return new SagaCommandDispatcher("kitchenServiceCommands", kitchenServiceCommandHandler.commandHandlers());
public SagaCommandDispatcher kitchenServiceSagaCommandDispatcher(KitchenServiceCommandHandler kitchenServiceCommandHandler,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager) {
return new SagaCommandDispatcher("kitchenServiceCommands",
kitchenServiceCommandHandler.commandHandlers(),
messageConsumer,
messageProducer,
sagaLockManager);
}

@Bean
public DomainEventDispatcher domainEventDispatcher(KitchenServiceEventConsumer kitchenServiceEventConsumer, MessageConsumer messageConsumer) {
return new DomainEventDispatcher("kitchenServiceEvents", kitchenServiceEventConsumer.domainEventHandlers(), messageConsumer); // @Autowire
return new DomainEventDispatcher("kitchenServiceEvents",
kitchenServiceEventConsumer.domainEventHandlers(),
messageConsumer,
new DefaultDomainEventNameMapping()); // @Autowire
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
spring.application.name=ftgo-kitchen-service

management.health.jms.enabled=false
management.health.redis.enabled=false
management.endpoint.health.show-details=always

spring.jpa.generate-ddl=true
logging.level.org.springframework.orm.jpa=INFO
logging.level.org.hibernate.SQL=DEBUG
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package net.chrisrichardson.ftgo.kitchenservice.domain;


import io.eventuate.tram.commands.common.ChannelMapping;
import io.eventuate.tram.commands.common.DefaultChannelMapping;
import io.eventuate.tram.commands.producer.CommandProducer;
import io.eventuate.tram.commands.producer.TramCommandProducerConfiguration;
import io.eventuate.tram.inmemory.TramInMemoryConfiguration;
import io.eventuate.tram.messaging.common.ChannelMapping;
import io.eventuate.tram.messaging.common.DefaultChannelMapping;
import io.eventuate.tram.sagas.common.SagaCommandHeaders;
import io.eventuate.tram.testutil.TestMessageConsumer;
import io.eventuate.tram.testutil.TestMessageConsumerFactory;
Expand Down
4 changes: 2 additions & 2 deletions ftgo-order-history-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ dependencyManagement {
}
}


dependencies {
compile 'com.amazonaws:aws-java-sdk-dynamodb:1.11.158'

Expand All @@ -30,9 +29,10 @@ dependencies {
exclude module: "spring-boot-starter-data-jpa"
}

compile "io.eventuate.tram.core:eventuate-tram-consumer-kafka:$eventuateTramVersion"
compile "io.eventuate.messaging.kafka:eventuate-messaging-kafka-consumer:$eventuateMessagingKafkaVersion"

compile "io.eventuate.tram.core:eventuate-tram-events:$eventuateTramVersion"
compile "io.eventuate.tram.core:eventuate-tram-consumer-wrappers:$eventuateTramVersion"

compile "org.springframework.boot:spring-boot-starter-actuator:$springBootVersion"
compile "org.springframework.boot:spring-boot-starter-web:$springBootVersion"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package net.chrisrichardson.ftgo.cqrs.orderhistory.main;

import io.eventuate.tram.consumer.kafka.TramConsumerKafkaConfiguration;
import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaConfiguration;
import io.eventuate.tram.consumer.common.TramConsumerCommonConfiguration;
import net.chrisrichardson.eventstore.examples.customersandorders.commonswagger.CommonSwaggerConfiguration;
import net.chrisrichardson.ftgo.cqrs.orderhistory.messaging.OrderHistoryServiceMessagingConfiguration;
import net.chrisrichardson.ftgo.cqrs.orderhistory.web.OrderHistoryWebConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import({OrderHistoryWebConfiguration.class, OrderHistoryServiceMessagingConfiguration.class,
TramConsumerKafkaConfiguration.class, CommonSwaggerConfiguration.class})
MessageConsumerKafkaConfiguration.class, TramConsumerCommonConfiguration.class, CommonSwaggerConfiguration.class})
public class OrderHistoryServiceMain {

public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package net.chrisrichardson.ftgo.cqrs.orderhistory.messaging;

import io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl;
import io.eventuate.tram.consumer.common.MessageConsumerImplementation;
import io.eventuate.tram.consumer.common.TramNoopDuplicateMessageDetectorConfiguration;
import io.eventuate.tram.consumer.wrappers.EventuateKafkaMessageConsumerWrapper;
import io.eventuate.tram.events.common.DefaultDomainEventNameMapping;
import io.eventuate.tram.events.subscriber.DomainEventDispatcher;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import net.chrisrichardson.ftgo.common.CommonConfiguration;
Expand All @@ -13,14 +17,22 @@
@Import({CommonConfiguration.class, TramNoopDuplicateMessageDetectorConfiguration.class})
public class OrderHistoryServiceMessagingConfiguration {

@Bean
public MessageConsumerImplementation messageConsumerImplementation(MessageConsumerKafkaImpl messageConsumerKafka) {
return new EventuateKafkaMessageConsumerWrapper(messageConsumerKafka);
}

@Bean
public OrderHistoryEventHandlers orderHistoryEventHandlers(OrderHistoryDao orderHistoryDao) {
return new OrderHistoryEventHandlers(orderHistoryDao);
}

@Bean
public DomainEventDispatcher orderHistoryDomainEventDispatcher(OrderHistoryEventHandlers orderHistoryEventHandlers, MessageConsumer messageConsumer) {
return new DomainEventDispatcher("orderHistoryDomainEventDispatcher", orderHistoryEventHandlers.domainEventHandlers(), messageConsumer);
return new DomainEventDispatcher("orderHistoryDomainEventDispatcher",
orderHistoryEventHandlers.domainEventHandlers(),
messageConsumer,
new DefaultDomainEventNameMapping());
}

}
Loading

0 comments on commit c464c09

Please sign in to comment.