diff --git a/public/images/aws/msk.svg b/public/images/aws/msk.svg
new file mode 100644
index 00000000..877fab3a
--- /dev/null
+++ b/public/images/aws/msk.svg
@@ -0,0 +1,18 @@
+
+
diff --git a/src/assets/images/aws/tutorials/schema-evolution-glue-msk-featured-image.png b/src/assets/images/aws/tutorials/schema-evolution-glue-msk-featured-image.png
new file mode 100644
index 00000000..2e990a61
Binary files /dev/null and b/src/assets/images/aws/tutorials/schema-evolution-glue-msk-featured-image.png differ
diff --git a/src/content/docs/aws/tutorials/schema-evolution-glue-msk.mdx b/src/content/docs/aws/tutorials/schema-evolution-glue-msk.mdx
new file mode 100644
index 00000000..c5191275
--- /dev/null
+++ b/src/content/docs/aws/tutorials/schema-evolution-glue-msk.mdx
@@ -0,0 +1,1061 @@
+---
+title: "Schema Evolution with Glue Schema Registry and Managed Streaming for Kafka (MSK) using LocalStack"
+description: Find incompatibilities early or even avoid them altogether when developing Kafka producers or consumers! Learn how to test data schema evolution by using Managed Streaming for Kafka (MSK) with the Glue Schema Registry in LocalStack.
+services:
+- msk
+- glue
+platform:
+- Java
+deployment:
+- awscli
+pro: true
+leadimage: "schema-evolution-glue-msk-featured-image.png"
+---
+
+## Introduction
+
+[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event store and stream-processing platform.
+It is used to capture data generated by producers and distribute it among its consumers.
+Kafka is known for its scalability, with reports of production environments scaling to [trillions of messages per day](https://engineering.linkedin.com/blog/2019/apache-kafka-trillion-messages).
+With [Amazon Managed Streaming for Apache Kafka (MSK)](https://aws.amazon.com/msk/), AWS provides a service to provision Apache Kafka clusters easily.
+
+[LocalStack Pro](https://app.localstack.cloud/) supports [Amazon Managed Streaming for Kafka (MSK)](/aws/services/msk), which enables you to spin up Kafka clusters on your local machine and test the integration of your applications with Amazon MSK.
+
+Kafka clusters are often used as the central messaging infrastructure in complex microservice environments.
+However, the continuous and independent development of the individual microservices - the data producers and consumers - can make it hard to coordinate and evolve data schemas over time without introducing application failures due to incompatibilities.
+A common solution to this problem is a schema registry, which validates schema changes and prevents unsafe changes from causing application failures.
+
+[AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) can be used as such a schema registry, enabling you to validate and evolve streaming data using Apache Avro schemas.
+It can be easily integrated into Java applications for Apache Kafka with [AWS's official open-source serializers and deserializers](https://github.com/awslabs/aws-glue-schema-registry).
+
+
+
+The following chart shows the integration of producers and consumers with Amazon MSK and the AWS Glue Schema Registry:
+
+
+
+1. Before sending a record, the producer validates that the schema it is using to serialize its records is valid.
+ We can configure the producer to register a new schema version if the schema is not yet registered.
+ - When registering the new schema version, the schema registry validates if the schema is compatible.
+ - If the registry detects an incompatibility, the registration is rejected.
+ This ensures that a producer fails early and cannot publish incompatible records in the first place.
+2. Once the schema is valid, the producer serializes and compresses the record and sends it to the Kafka cluster.
+3. The consumer reads the serialized and compressed record.
+4. The consumer requests the schema from the schema registry (if it is not already cached) and uses the schema to decompress and deserialize the record.
+
+[AWS Glue Schema Registry](/aws/services/glue) is supported by LocalStack Pro as well, allowing you to develop and test your application's data schema evolution completely on your local machine.
+The code for this tutorial (including a script to execute it step by step) can be found in our [LocalStack Pro samples on GitHub](https://github.com/localstack/localstack-pro-samples/tree/master/glue-msk-schema-registry).
+
+## Prerequisites
+
+For this tutorial you will need:
+
+- [LocalStack Pro](https://localstack.cloud/pricing/) to emulate Amazon MSK and AWS Glue Schema Registry locally
+  - Don't worry if you don't have a subscription yet; you can get a trial license for free.
+- [awslocal](/aws/integrations/aws-native-tools/aws-cli#localstack-aws-cli-awslocal)
+- Java 11+
+- Maven 3
+
+## Initial schema
+
+First, we will define our schema, set up our Java project, and generate the Java data classes from the schema.
+In our Apache Avro data schema, we describe a request to ride a unicorn, including the necessary addresses, a fare, a duration, some preferences, and a customer record:
+
+```json
+{
+ "type": "record",
+ "name": "UnicornRideRequest",
+ "namespace": "cloud.localstack.demos.gluemsk.schema",
+ "fields": [
+ {"name": "request_id", "type": "int", "doc": "customer request id"},
+ {"name": "pickup_address","type": "string","doc": "customer pickup address"},
+ {"name": "destination_address","type": "string","doc": "customer destination address"},
+ {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"},
+ {"name": "ride_duration","type": "int","doc": "ride duration in minutes"},
+ {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"},
+ {
+ "name": "recommended_unicorn",
+ "type": {
+ "type": "record",
+ "name": "RecommendedUnicorn",
+ "fields": [
+ {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"},
+ {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}},
+ {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
+ ]
+ }
+ },
+ {
+ "name": "customer",
+ "type": {
+ "type": "record",
+ "name": "Customer",
+ "fields": [
+ {"name": "customer_account_no","type": "int", "doc": "customer account number"},
+ {"name": "first_name","type": "string"},
+ {"name": "middle_name","type": ["null","string"], "default": null},
+ {"name": "last_name","type": "string"},
+ {"name": "email_addresses","type": ["null", {"type":"array", "items":"string" }]},
+ {"name": "customer_address","type": "string","doc": "customer address"},
+ {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"},
+ {"name": "customer_rating", "type": ["null", "int"], "default": null}
+ ]
+ }
+ }
+ ]
+}
+```
+
+## Project setup
+
+Now we can set up our Java project with one module for the `producer` and another module for the `consumer`.
+Both modules keep their schema in the `src/main/resources/avro` folder.
+
+```bash
+.
+├── consumer
+│ ├── pom.xml
+│ └── src
+│ └── main
+│ └── resources
+│ └── avro
+│ └── unicorn_ride_request_v1.avsc
+├── producer
+│ ├── pom.xml
+│ └── src
+│ └── main
+│ └── resources
+│ └── avro
+│ └── unicorn_ride_request_v1.avsc
+└── pom.xml
+```
+
+In our root pom, we configure the `producer` and the `consumer` module, some shared dependencies (most notably `software.amazon.glue:schema-registry-serde`), and the `avro-maven-plugin` to generate Java classes for the schema:
+
+```xml
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>cloud.localstack.demos.gluemsk</groupId>
+    <artifactId>root-pom</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <name>Glue MSK Demo</name>
+
+    <modules>
+        <module>producer</module>
+        <module>consumer</module>
+    </modules>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>software.amazon.glue</groupId>
+            <artifactId>schema-registry-serde</artifactId>
+            <version>1.1.10</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.36</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-reload4j</artifactId>
+            <version>1.7.36</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+            <version>1.82</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro-maven-plugin</artifactId>
+                    <version>1.11.0</version>
+                    <executions>
+                        <execution>
+                            <phase>generate-sources</phase>
+                            <goals>
+                                <goal>schema</goal>
+                            </goals>
+                            <configuration>
+                                <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
+                                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
+```
+
+While the root pom is a bit lengthy, the `pom.xml` files of the two modules are quite simple.
+They only reference the root pom, activate the `avro-maven-plugin`, and define a main class (we'll go into detail on the actual Java code in [the section below](#implementing-a-producer-and-consumer)).
+
+Here is what the producer's `pom.xml` looks like:
+
+```xml
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>cloud.localstack.demos.gluemsk</groupId>
+        <artifactId>root-pom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>producer</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <name>Glue MSK Demo Producer</name>
+
+    <properties>
+        <exec.mainClass>cloud.localstack.demos.gluemsk.producer.Producer</exec.mainClass>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
+```
+
+And similarly the consumer's `pom.xml` looks like:
+
+```xml
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>cloud.localstack.demos.gluemsk</groupId>
+        <artifactId>root-pom</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>consumer</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <name>Glue MSK Demo Consumer</name>
+
+    <properties>
+        <exec.mainClass>cloud.localstack.demos.gluemsk.consumer.Consumer</exec.mainClass>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.avro</groupId>
+                <artifactId>avro-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
+```
+
+Now the project is all set up, and we can generate our schema classes from the Avro schema using the `avro-maven-plugin`:
+
+```bash
+mvn clean generate-sources
+```
+
+After the Maven plugin has run, all types are generated for both the producer and the consumer:
+
+```plaintext
+.
+├── consumer
+│ └── src
+│ └── main
+│ ├── java
+│ └── cloud
+│ └── localstack
+│ └── demos
+│ └── gluemsk
+│ └── schema
+│ ├── Customer.java
+│ ├── ModeOfPayment.java
+│ ├── RecommendedUnicorn.java
+│ ├── unicorn_color.java
+│ ├── UnicornPreferredColor.java
+│ └── UnicornRideRequest.java
+└── producer
+ └── src
+ └── main
+ └── java
+ └── cloud
+ └── localstack
+ └── demos
+ └── gluemsk
+ └── schema
+ ├── Customer.java
+ ├── ModeOfPayment.java
+ ├── RecommendedUnicorn.java
+ ├── unicorn_color.java
+ ├── UnicornPreferredColor.java
+ └── UnicornRideRequest.java
+```
+
+Since we want to log the sent and received messages, we need to configure the logging by adding a `log4j.properties` to the `src/main/resources` folder of the two modules:
+
+```properties
+log4j.rootLogger=DEBUG, CONSOLE
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%c{1}][%-5p] %m%n
+```
+
+## Implementing a Producer and Consumer
+
+Now, all the boilerplate is done:
+
+- We have a proper project setup.
+- The logging is properly configured.
+- We generated our schema classes, which we'll use as a type-safe interface for the records.
+
+### The Producer
+
+The next step is to implement our producer.
+The complete module can be found on our [samples repository (along with the rest of the code of this tutorial)](https://github.com/localstack/localstack-pro-samples/blob/cd023a84a3b473984e9c34053d4feb7de8e038c1/glue-msk-schema-registry/producer/).
+
+We create a new class called `Producer` in `producer/src/main/java/cloud/localstack/demos/gluemsk/producer/`.
+The `Producer` contains a `main` method which uses [`jcommander`](https://jcommander.org/) to create a simple CLI interface:
+
+```java
+package cloud.localstack.demos.gluemsk.producer;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Producer {
+ private final static Logger LOGGER = LoggerFactory.getLogger(org.apache.kafka.clients.producer.Producer.class.getName());
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ protected boolean help = false;
+ @Parameter(names = {"--bootstrap-servers", "-bs"}, description = "Kafka bootstrap servers endpoint to connect to.")
+ protected String bootstrapServers = "localhost:4511";
+ @Parameter(names = {"--aws-endpoint-servers", "-ae"}, description = "AWS endpoint to use.")
+ protected String awsEndpoint = "https://localhost.localstack.cloud:4566";
+ @Parameter(names = {"--region", "-reg"}, description = "AWS Region to use.")
+ protected String regionName = "us-east-1";
+ @Parameter(names = {"--topic-name", "-topic"}, description = "Kafka topic name where you send the data records. Default is unicorn-ride-request-topic.")
+ protected String topic = "unicorn-ride-request-topic";
+ @Parameter(names = {"--num-messages", "-nm"}, description = "Number of messages you want producer to send. Default is 100.")
+ protected String str_numOfMessages = "100";
+
+
+ public static void main(String[] args) {
+ Producer producer = new Producer();
+ JCommander jc = JCommander.newBuilder().addObject(producer).build();
+ jc.parse(args);
+ if (producer.help) {
+ jc.usage();
+ return;
+ }
+ producer.startProducer();
+ }
+
+ public void startProducer() {
+ // TODO:
+ // - Create the producer
+ // - Send the UnicornRideRequest records to the topic
+ }
+}
+```
+
+Now we can add a method to configure our producer:
+
+```java
+private Properties getProducerConfig() {
+ Properties props = new Properties();
+ // use the "--bootstrap-servers" argument to define the Kafka bootstrap address to connect to
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+ props.put(ProducerConfig.ACKS_CONFIG, "-1");
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "glue-msk-demo-producer");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ // use the GlueSchemaRegistryKafkaSerializer from software.amazon.glue:schema-registry-serde
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
+ // configure the GlueSchemaRegistryKafkaSerializer (data format, region, Glue registry and schema,...)
+ props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
+ props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
+ props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
+ props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
+ props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
+ // define the endpoint to use - by default we use LocalStack (localhost.localstack.cloud)
+ props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, this.awsEndpoint);
+ // enable compression of records
+ props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
+ return props;
+}
+```
+
+In addition, we create a method to generate dummy `UnicornRideRequest` records and a `Callback` implementation that logs the metadata of every record sent:
+
+```java
+public UnicornRideRequest getRecord(int requestId) {
+ /*
+ Initialise UnicornRideRequest object of
+ class that is generated from AVRO Schema
+ */
+ UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder()
+ .setRequestId(requestId)
+ .setPickupAddress("Melbourne, Victoria, Australia")
+ .setDestinationAddress("Sydney, NSW, Aus")
+ .setRideFare(1200.50F)
+ .setRideDuration(120)
+ .setPreferredUnicornColor(UnicornPreferredColor.WHITE)
+ .setRecommendedUnicorn(RecommendedUnicorn.newBuilder()
+ .setUnicornId(requestId * 2)
+ .setColor(unicorn_color.WHITE)
+ .setStarsRating(5).build())
+ .setCustomer(Customer.newBuilder()
+ .setCustomerAccountNo(1001)
+ .setFirstName("Dummy")
+ .setLastName("User")
+ .setEmailAddresses(List.of("demo@example.com"))
+ .setCustomerAddress("Flinders Street Station")
+ .setModeOfPayment(ModeOfPayment.CARD)
+ .setCustomerRating(5).build()).build();
+ LOGGER.info(rideRequest.toString());
+ return rideRequest;
+}
+
+private static class ProducerCallback implements Callback {
+ @Override
+ public void onCompletion(RecordMetadata recordMetaData, Exception e) {
+ if (e == null) {
+ LOGGER.info("Received new metadata. \t" +
+ "Topic:" + recordMetaData.topic() + "\t" +
+ "Partition: " + recordMetaData.partition() + "\t" +
+ "Offset: " + recordMetaData.offset() + "\t" +
+ "Timestamp: " + recordMetaData.timestamp());
+ } else {
+ LOGGER.info("There's been an error from the Producer side");
+ e.printStackTrace();
+ }
+ }
+}
+```
+
+And finally, we can implement our `startProducer` method which creates the producer using `getProducerConfig` and sends records generated by `getRecord`:
+
+```java
+public void startProducer() {
+    try (KafkaProducer<String, UnicornRideRequest> producer = new KafkaProducer<>(getProducerConfig())) {
+        int numberOfMessages = Integer.parseInt(str_numOfMessages);
+        LOGGER.info("Starting to send records...");
+        for (int i = 0; i < numberOfMessages; i++) {
+            UnicornRideRequest rideRequest = getRecord(i);
+            String key = "key-" + i;
+            ProducerRecord<String, UnicornRideRequest> record = new ProducerRecord<>(topic, key, rideRequest);
+            producer.send(record, new ProducerCallback());
+        }
+    }
+}
+```
+
+### The Consumer
+
+Now that we have a `Producer`, we need a component which reads the data from the Kafka cluster.
+
+The complete module can be found on our [samples repository (along with the rest of the code of this tutorial).](https://github.com/localstack/localstack-pro-samples/blob/cd023a84a3b473984e9c34053d4feb7de8e038c1/glue-msk-schema-registry/consumer/)
+
+We create a new class called `Consumer` in `consumer/src/main/java/cloud/localstack/demos/gluemsk/consumer/`.
+
+Analogous to the `Producer`, the `Consumer` contains a `main` method which uses [`jcommander`](https://jcommander.org/) to create a simple CLI interface:
+
+```java
+package cloud.localstack.demos.gluemsk.consumer;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Consumer {
+ private final static Logger LOGGER = LoggerFactory.getLogger(java.util.function.Consumer.class.getName());
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ protected boolean help = false;
+ @Parameter(names = {"--bootstrap-servers", "-bs"}, description = "kafka bootstrap servers endpoint")
+ protected String bootstrapServers = "localhost:4511";
+ @Parameter(names = {"--aws-endpoint-servers", "-ae"}, description = "AWS endpoint")
+ protected String awsEndpoint = "https://localhost.localstack.cloud:4566";
+ @Parameter(names = {"--region", "-reg"}, description = "AWS Region to use.")
+ protected String regionName = "us-east-1";
+ @Parameter(names = {"--topic-name", "-topic"}, description = "Kafka topic name where you send the data records. Default is unicorn-ride-request-topic")
+ protected String topic = "unicorn-ride-request-topic";
+ @Parameter(names = {"--num-messages", "-nm"}, description = "Number of messages you want consumer to wait for until it stops. Default is 100, use 0 if you want it to run indefinitely.")
+ protected String str_numOfMessages = "100";
+
+
+ public static void main(String[] args) {
+ Consumer consumer = new Consumer();
+ JCommander jc = JCommander.newBuilder().addObject(consumer).build();
+ jc.parse(args);
+ if (consumer.help) {
+ jc.usage();
+ return;
+ }
+ consumer.startConsumer();
+ }
+
+ public void startConsumer() {
+ // TODO:
+ // - Create the Kafka Consumer
+ // - Subscribe to the topic
+ // - Process the incoming records
+ }
+}
+```
+
+Similar to the producer, we need to configure our consumer:
+
+```java
+private Properties getConsumerConfig() {
+ Properties props = new Properties();
+ // use the "--bootstrap-servers" argument to define the Kafka bootstrap address to connect to
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ // use the GlueSchemaRegistryKafkaDeserializer from software.amazon.glue:schema-registry-serde
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
+ props.put(AWSSchemaRegistryConstants.AWS_REGION, this.regionName);
+ props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
+ // define the endpoint to use - by default we use LocalStack (localhost.localstack.cloud)
+ props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, this.awsEndpoint);
+ return props;
+}
+```
+
+And finally, we can implement our `startConsumer` method which creates the consumer using `getConsumerConfig`, reads the records from the topic, and logs them:
+
+```java
+public void startConsumer() {
+    LOGGER.info("Starting consumer...");
+    try (KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<>(getConsumerConfig())) {
+        consumer.subscribe(Collections.singletonList(topic));
+        int outstandingMessages = Integer.parseInt(str_numOfMessages);
+        boolean runIndefinitely = outstandingMessages == 0;
+        while (outstandingMessages > 0 || runIndefinitely) {
+            // a real consumer would typically run in an endless loop waiting for new records here
+            final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(10));
+            for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
+                final UnicornRideRequest rideRequest = record.value();
+                LOGGER.info(String.valueOf(rideRequest.getRequestId()));
+                LOGGER.info(rideRequest.toString());
+                outstandingMessages--;
+            }
+        }
+    }
+    LOGGER.info("Stopping consumer...");
+}
+```
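+
+For the tutorial, the loop above deliberately stops after a fixed number of messages so that the consumer can be run as a one-shot command.
+As a side note, here is a minimal sketch (not part of the sample code) of what a long-running consumer with a clean shutdown could look like, assuming the same `getConsumerConfig()` shown above:
+
+```java
+// Sketch only: a long-running consumer that is stopped via a JVM shutdown hook.
+KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<>(getConsumerConfig());
+consumer.subscribe(Collections.singletonList(topic));
+
+final Thread mainThread = Thread.currentThread();
+Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+    consumer.wakeup();     // interrupts the blocking poll() below
+    try {
+        mainThread.join(); // wait until the consumer has been closed
+    } catch (InterruptedException ignored) {
+    }
+}));
+
+try {
+    while (true) {
+        ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofSeconds(1));
+        records.forEach(record -> LOGGER.info(record.value().toString()));
+    }
+} catch (WakeupException e) {
+    // expected on shutdown, nothing to do
+} finally {
+    consumer.close();
+}
+```
+
+We stick with the bounded loop in this tutorial because it keeps the demo runs deterministic and lets the process exit on its own.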
+
+## Setting up the infrastructure
+
+Now that the initial coding is done, we can give it a try.
+Let's start LocalStack:
+
+```bash
+LOCALSTACK_AUTH_TOKEN=<your-auth-token> localstack start -d
+```
+
+Once LocalStack is started, we can create a new Kafka cluster using `awslocal`:
+
+```bash
+awslocal kafka create-cluster \
+ --cluster-name "unicorn-ride-cluster" \
+ --kafka-version "2.2.1" \
+ --number-of-broker-nodes 1 \
+ --broker-node-group-info "{\"ClientSubnets\": [], \"InstanceType\":\"kafka.m5.xlarge\"}"
+```
+
+```bash title="Output"
+{
+ "ClusterArn": "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25",
+ "ClusterName": "unicorn-ride-cluster",
+ "State": "CREATING"
+}
+```
+
+The `ClusterArn` is created dynamically and will be different for your run.
+Make sure to use your `ClusterArn` for the commands below.
+
+It takes some time for the cluster to get up and running.
+We can monitor the state with `describe-cluster`:
+
+```bash
+awslocal kafka describe-cluster --cluster-arn "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25"
+```
+
+```bash title="Output"
+{
+ "ClusterInfo": {
+ "BrokerNodeGroupInfo": {
+ "ClientSubnets": [],
+ "InstanceType": "kafka.m5.xlarge"
+ },
+ "ClusterArn": "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25",
+ "ClusterName": "unicorn-ride-cluster",
+ "CreationTime": "2022-07-21T14:41:07.897000Z",
+ "CurrentBrokerSoftwareInfo": {
+ "KafkaVersion": "2.5.0"
+ },
+ "CurrentVersion": "KFABVNKTBGF6HX",
+ "NumberOfBrokerNodes": 1,
+ "State": "ACTIVE",
+ "ZookeeperConnectString": "localhost:4510"
+ }
+}
+```
+
+Once the `State` is `ACTIVE`, the cluster is ready to be used.
+Now it's time to create our Glue Schema Registry:
+
+```bash
+awslocal glue create-registry --registry-name unicorn-ride-request-registry
+```
+
+```bash title="Output"
+{
+ "RegistryArn": "arn:aws:glue:us-east-1:000000000000:file-registry/unicorn-ride-request-registry",
+ "RegistryName": "unicorn-ride-request-registry"
+}
+```
+
+In the newly created registry, we can now add our initial `UnicornRideRequest` schema:
+
+```bash
+awslocal glue create-schema \
+ --registry-id RegistryName="unicorn-ride-request-registry" \
+ --schema-name unicorn-ride-request-schema-avro \
+ --compatibility BACKWARD \
+ --data-format AVRO \
+ --schema-definition "file://producer/src/main/resources/avro/unicorn_ride_request_v1.avsc"
+```
+
+```bash title="Output"
+{
+ "RegistryName": "unicorn-ride-request-registry",
+ "RegistryArn": "arn:aws:glue:us-east-1:000000000000:file-registry/unicorn-ride-request-registry",
+ "SchemaName": "unicorn-ride-request-schema-avro",
+ "SchemaArn": "arn:aws:glue:us-east-1:000000000000:schema/unicorn-ride-request-registry/unicorn-ride-request-schema-avro",
+ "DataFormat": "AVRO",
+ "Compatibility": "BACKWARD",
+ "SchemaCheckpoint": 1,
+ "LatestSchemaVersion": 1,
+ "NextSchemaVersion": 2,
+ "SchemaStatus": "AVAILABLE",
+ "SchemaVersionId": "925c868c-ff56-4992-9f24-6df5933c15cc",
+ "SchemaVersionStatus": "AVAILABLE"
+}
+```
+
+For the schema, we defined the compatibility mode `BACKWARD`.
+This means that consumers using the new schema can also read data produced with the previous schema.
+For example, this allows the deletion of fields or the introduction of new optional fields.
+
+You can find a thorough description of the different compatibility modes in the [AWS docs on Schema Versioning and Compatibility](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-compatibility).
+The Glue Schema Registry will ensure that newly registered schemas fulfill the constraints defined by the compatibility mode.
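+
+In our setup, this check happens implicitly whenever the serializer registers a schema version, as we will see in the schema evolution scenarios below.
+If you want to verify a schema change against LocalStack without touching any producer code, you can also call the `RegisterSchemaVersion` API directly.
+The following standalone sketch uses the AWS SDK for Java v2 Glue client (which the serializer library uses internally); it is illustrative only and not part of the sample code, and note that a compatible definition is actually registered as a new version by this call:
+
+```java
+package cloud.localstack.demos.gluemsk.tools; // hypothetical helper, not part of the sample
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.RegisterSchemaVersionResponse;
+import software.amazon.awssdk.services.glue.model.SchemaId;
+
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class CheckSchemaCompatibility {
+    public static void main(String[] args) throws Exception {
+        // point the Glue client at LocalStack, using dummy credentials
+        GlueClient glue = GlueClient.builder()
+                .endpointOverride(URI.create("https://localhost.localstack.cloud:4566"))
+                .region(Region.US_EAST_1)
+                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
+                .build();
+
+        // read the candidate schema definition from the file given as the first argument
+        String definition = Files.readString(Paths.get(args[0]));
+
+        // registering a version triggers the registry's compatibility check
+        RegisterSchemaVersionResponse response = glue.registerSchemaVersion(request -> request
+                .schemaId(SchemaId.builder()
+                        .registryName("unicorn-ride-request-registry")
+                        .schemaName("unicorn-ride-request-schema-avro")
+                        .build())
+                .schemaDefinition(definition));
+
+        System.out.printf("version %d -> %s%n", response.versionNumber(), response.statusAsString());
+    }
+}
+```
+
+An incompatible definition ends up with status `FAILURE` instead of `AVAILABLE`, which mirrors what we will see in the producer logs later in this tutorial.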
+
+## Running the Producer and Consumer
+
+Finally, everything is ready to start our `Producer` and `Consumer`.
+First, we need to get the bootstrap server address from the Kafka cluster:
+
+```bash
+awslocal kafka get-bootstrap-brokers --cluster-arn "arn:aws:kafka:us-east-1:000000000000:cluster/unicorn-ride-cluster/f9b16124-baf3-459b-8507-ec6c605b7a0a-25"
+```
+
+```bash title="Output"
+{
+ "BootstrapBrokerString": "localhost:4511"
+}
+```
+
+Like the `ClusterArn`, the `BootstrapBrokerString` is dynamic and can be different.
+Please make sure to use your bootstrap server address for the runs below.
+
+Now let's start the `Producer`.
+By default, our producer will just send 100 records and shut down.
+
+```bash
+# Compile the Java packages
+mvn clean install
+# Run the producer
+mvn -pl producer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
+```
+
+Once the producer is running, we can observe the different [steps of the producer as described in the introduction](#introduction):
+
+1. Before sending a record, the producer validates that the schema it uses to serialize its records is valid:
+
+ ```plaintext
+ ...
+ [GlueSchemaRegistrySerializerFactory][DEBUG] Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory
+ [AWSSchemaRegistryClient][DEBUG] Getting Schema Version Id for : schemaDefinition = {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]}, schemaName = unicorn-ride-request-schema-avro, dataFormat = AVRO
+ ...
+ ```
+
+2. Once the schema is known to be valid, the producer serializes the record, compresses it, and sends it to the Kafka cluster.
+
+ ```plaintext
+ ...
+ [GlueSchemaRegistryKafkaSerializer][DEBUG] Schema Version Id received from the from schema registry: f95edc4b-778d-4f65-b23e-7de41c5b4e53
+ [GlueSchemaRegistrySerializerFactory][DEBUG] Returning Avro serializer instance from GlueSchemaRegistrySerializerFactory
+ [GlueSchemaRegistryDefaultCompression][DEBUG] Compression :: record length: 0KB
+ [GlueSchemaRegistryDefaultCompression][DEBUG] Compression :: record length after compression: 0KB
+ [Producer][INFO ] {"request_id": 1, "pickup_address": "Melbourne, Victoria, Australia", "destination_address": "Sydney, NSW, Aus", "ride_fare": 1200.5, "ride_duration": 120, "preferred_unicorn_color": "WHITE", "recommended_unicorn": {"unicorn_id": 2, "color": "WHITE", "stars_rating": 5}, "customer": {"customer_account_no": 1001, "first_name": "Dummy", "middle_name": null, "last_name": "User", "email_addresses": ["demo@example.com"], "customer_address": "Flinders Street Station", "mode_of_payment": "CARD", "customer_rating": 5}}
+ ...
+ ```
+
+ Now, the records sent by the producer are managed by the Kafka cluster and are waiting for a consumer to pick them up.
+ We can start the consumer with the same bootstrap server address as the producer:
+
+ ```bash
+ mvn -pl consumer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
+ ```
+
+   In the logs of the consumer, we can now observe the [steps of the consumer as described in the introduction](#introduction):
+
+3. The consumer reads the serialized and compressed record:
+
+ ```plaintext
+ [NetworkClient][DEBUG] [Consumer clientId=consumer-unicorn.riderequest.consumer-1, groupId=unicorn.riderequest.consumer] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-unicorn.riderequest.consumer-1, correlationId=10) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='unicorn-ride-request-topic', partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=900, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='')
+ [NetworkClient][DEBUG] [Consumer clientId=consumer-unicorn.riderequest.consumer-1, groupId=unicorn.riderequest.consumer] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=12, clientId=consumer-unicorn.riderequest.consumer-1, correlationId=10): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=2057133662, responses=[FetchableTopicResponse(topic='unicorn-ride-request-topic', partitionResponses=[FetchablePartitionResponse(partition=0, errorCode=0, highWatermark=1000, lastStableOffset=1000, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, recordSet=MemoryRecords(size=17141, buffer=java.nio.HeapByteBuffer[pos=0 lim=17141 cap=17144]))])])
+ ```
+
+4. The consumer requests the schema from the schema registry, and uses the schema to decompress and deserialize the record.
+
+ ```plaintext
+ [request][DEBUG] Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=localhost.localstack.cloud, port=4566, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[])
+ ...
+ [request][DEBUG] Received successful response: 200, Request ID: 1EDT4G1DBSDS0XCD2N7FM9L4SRROEEML1FPDOYSANII78CXDR4F2, Extended Request ID: not available
+ [GlueSchemaRegistryDeserializerFactory][DEBUG] Returning Avro de-serializer instance from GlueSchemaRegistryDeserializerFactory
+ [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Compressed record length: 0KB
+ [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Decompressed record length: 0KB
+ [AvroDeserializer][DEBUG] Length of actual message: 121
+ [DatumReaderInstance][DEBUG] Using SpecificDatumReader for de-serializing Avro message, schema: {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]})
+ [AvroDeserializer][DEBUG] Finished de-serializing Avro message
+ [GlueSchemaRegistryDeserializerFactory][DEBUG] Returning Avro de-serializer instance from GlueSchemaRegistryDeserializerFactory
+ [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Compressed record length: 0KB
+ [GlueSchemaRegistryDefaultCompression][DEBUG] Decompression :: Decompressed record length: 0KB
+ [AvroDeserializer][DEBUG] Length of actual message: 121
+ [AvroDeserializer][DEBUG] Finished de-serializing Avro message
+ ```
+
+## Schema Evolution
+
+In the course of this tutorial, we have implemented a Kafka producer and a consumer which integrate with the Glue Schema Registry.
+But the full potential of the Glue Schema Registry is unlocked when performing a schema evolution, i.e., when running producers and consumers with a new version of an already registered schema.
+
+To illustrate the benefits of the Schema Registry, we will run a few more scenarios:
+
+1. Running a producer which automatically registers a new, compatible schema version.
+2. Running a producer which is rejected when trying to register a new, incompatible schema version.
+3. Running an old consumer which fails because it is not compatible with the newly registered schema version.
+4. Running an updated consumer which can consume the new schema / records.
+
+### Producer registering a new schema version
+
+In this step, we will create a new producer which uses a new, `BACKWARD` compatible schema version.
+The complete module can be found in our [samples repository (along with the rest of the code of this tutorial).](https://github.com/localstack/localstack-pro-samples/blob/cd023a84a3b473984e9c34053d4feb7de8e038c1/glue-msk-schema-registry/producer-2/)
+
+The producer will register the new schema version automatically.
+
+We create the new producer by executing the following steps:
+
+- Copy the `producer` directory and rename it to `producer-2`.
+- Set a new artifact ID in the `pom.xml` of the module:
+
+ ```xml
+ ...
+ producer-2
+ ...
+ ```
+
+- Add the new module to the root `pom.xml`:
+
+ ```xml
+
+ producer
+ producer-2
+ consumer
+
+ ```
+
+- Create a new version of the schema:
+ - Rename the schema to `unicorn_ride_request_v2.avsc`.
+ - In the schema, remove the previously required field `customer`:
+
+ ```bash
+ $ diff -u producer/src/main/resources/avro/unicorn_ride_request_v1.avsc producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc
+ --- producer/src/main/resources/avro/unicorn_ride_request_v1.avsc 2022-05-13 08:27:08.219354922 +0200
+ +++ producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc 2022-05-13 08:27:08.219354922 +0200
+ @@ -20,23 +20,6 @@
+ {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
+ ]
+ }
+ - },
+ - {
+ - "name": "customer",
+ - "type": {
+ - "type": "record",
+ - "name": "Customer",
+ - "fields": [
+ - {"name": "customer_account_no","type": "int", "doc": "customer account number"},
+ - {"name": "first_name","type": "string"},
+ - {"name": "middle_name","type": ["null","string"], "default": null},
+ - {"name": "last_name","type": "string"},
+ - {"name": "email_addresses","type": ["null", {"type":"array", "items":"string" }]},
+ - {"name": "customer_address","type": "string","doc": "customer address"},
+ - {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"},
+ - {"name": "customer_rating", "type": ["null", "int"], "default": null}
+ - ]
+ - }
+ }
+ ]
+ }
+ ```
+
+  This change is `BACKWARD` compatible, because a consumer using the new schema can still read records written with the previous schema (new consumers don't need the customer data, so they don't care whether it is present).
+- Re-generate the Java classes for the schema:
+
+ ```bash
+ mvn clean generate-sources
+ ```
+
+- Once the classes have been generated, the producer code needs to be adjusted (remove the usage of `setCustomer` in the producer's `getRecord`, since the method does not exist anymore).
+- Configure the producer to automatically register its schema version in case it's not yet registered by setting the additional property `AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING` to `true`:
+
+ ```java
+ ...
+ props.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, AWSSchemaRegistryConstants.COMPRESSION.ZLIB.name());
+ // Automatically register the new schema version of this producer!
+ props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
+ return props;
+ }
+ ...
+ ```
+
+Now we can run the new producer with the same bootstrap server address as before:
+
+```bash
+mvn clean install
+mvn -pl producer-2 exec:java -Dexec.args="--bootstrap-servers localhost:4511"
+```
+
+In the logs we can see that the producer registered a new schema version before successfully publishing the new records:
+
+```plaintext
+...
+[AWSSchemaRegistryClient][INFO ] Registered the schema version with schema version id = 02922438-aa47-41e0-80e0-42238d07565f and with version number = 2 and status AVAILABLE
+...
+```
+
+### Producer trying to register an incompatible schema version
+
+In our next scenario, we will create a new producer which tries to register a schema that is _not_ compatible with the schema already in the registry.
+The registration of the new schema version will be rejected, so the producer fails before it even sends a record.
+
+The complete module can be found in our [samples repository (along with the rest of the code of this tutorial).](https://github.com/localstack/localstack-pro-samples/blob/cd023a84a3b473984e9c34053d4feb7de8e038c1/glue-msk-schema-registry/producer-3/)
+
+Similar to the previous scenario, we create a new producer by executing the following steps:
+- Copy the `producer-2` directory and rename it to `producer-3`.
+- Set a new artifact ID in the `pom.xml` of the module:
+
+ ```xml
+ ...
+ producer-3
+ ...
+ ```
+
+- Add the new module to the root `pom.xml`:
+
+ ```xml
+
+ producer
+ producer-2
+ producer-3
+ consumer
+
+ ```
+
+- Create a new version of the schema:
+ - Rename the schema to `unicorn_ride_request_v3.avsc`.
+ - In the schema, add a new required field `unicorn_food`:
+
+ ```bash
+ $ diff -u producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc producer-3/src/main/resources/avro/unicorn_ride_request_v3.avsc
+ --- producer-2/src/main/resources/avro/unicorn_ride_request_v2.avsc 2022-05-13 08:27:08.219354922 +0200
+ +++ producer-3/src/main/resources/avro/unicorn_ride_request_v3.avsc 2022-05-13 08:27:08.219354922 +0200
+ @@ -20,6 +20,17 @@
+ {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
+ ]
+ }
+ + },
+ + {
+ + "name": "unicorn_food",
+ + "type": {
+ + "type": "record",
+ + "name": "Food",
+ + "fields": [
+ + {"name": "price","type": "float","doc": "price per pound of food (USD)"},
+ + {"name": "name","type": "string"}
+ + ]
+ + }
+ }
+ ]
+ }
+ ```
+
+  This change is _not_ `BACKWARD` compatible, because a consumer using the new schema cannot read records written with the previous schema (it would expect the new `unicorn_food` field, which is not present in old records).
+- Re-generate the Java classes for the schema:
+
+ ```bash
+ mvn clean generate-sources
+ ```
+
+- Once the classes have been generated, the producer code needs to be adjusted (set the new required `unicorn_food` in the producer's `getRecord`):
+
+ ```java
+ ...
+ .setStarsRating(5).build())
+        // the customer data (required in v1) was already removed in the previous producer version (v2),
+        // and here we add a new required field without a default (which is not backward compatible)
+ .setUnicornFood(Food.newBuilder().setPrice(133.7f).setName("Rainbow").build())
+ .build();
+ ...
+ ```
+
+We can run the new producer with the same bootstrap server address as before:
+
+```bash
+mvn clean install
+mvn -pl producer-3 exec:java -Dexec.args="--bootstrap-servers localhost:4511"
+```
+
+In the logs we can see that the producer fails when trying to register its new, but incompatible, version of the schema:
+
+```plaintext
+...
+[AWSSchemaRegistryClient][INFO ] Registered the schema version with schema version id = 8f625284-07b4-442e-83ae-395cd7853746 and with version number = 3 and status FAILURE
+...
+[WARNING]
+com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Register schema :: Call failed when registering the schema with the schema registry for schema name = unicorn-ride-request-schema-avro
+ at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.registerSchemaVersion (AWSSchemaRegistryClient.java:310)
+...
+
+```
+
+### Outdated consumers
+
+We've seen how the producer's schema evolution works in the previous scenarios.
+Now, we'll take a closer look at our consumer.
+
+In our [first schema evolution scenario](#producer-registering-a-new-schema-version), the producer registered a new version of the schema and afterwards published records with that schema.
+The `BACKWARD` schema compatibility guarantees that updated consumers can read older records, i.e., the consumers need to be updated before the producers.
+
+Therefore, our old consumer will fail to consume these records, because it is not yet compatible with the new schema registered by the new producer:
+
+```bash
+mvn -pl consumer exec:java -Dexec.args="--bootstrap-servers localhost:4511"
+```
+
+In the logs, we can see that the consumer fails because the Avro schema used by the consumer expects a required field (`customer`) which is no longer present in the new records:
+
+```plaintext
+[request][DEBUG] Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=https, host=localhost.localstack.cloud, port=4566, encodedPath=/, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent, X-Amz-Target], queryParameters=[])
+...
+[DatumReaderInstance][DEBUG] Using SpecificDatumReader for de-serializing Avro message, schema: {"type":"record","name":"UnicornRideRequest","namespace":"cloud.localstack.demos.gluemsk.schema","fields":[{"name":"request_id","type":"int","doc":"customer request id"},{"name":"pickup_address","type":"string","doc":"customer pickup address"},{"name":"destination_address","type":"string","doc":"customer destination address"},{"name":"ride_fare","type":"float","doc":"ride fare amount (USD)"},{"name":"ride_duration","type":"int","doc":"ride duration in minutes"},{"name":"preferred_unicorn_color","type":{"type":"enum","name":"UnicornPreferredColor","symbols":["WHITE","BLACK","RED","BLUE","GREY"]},"default":"WHITE"},{"name":"recommended_unicorn","type":{"type":"record","name":"RecommendedUnicorn","fields":[{"name":"unicorn_id","type":"int","doc":"recommended unicorn id"},{"name":"color","type":{"type":"enum","name":"unicorn_color","symbols":["WHITE","RED","BLUE"]}},{"name":"stars_rating","type":["null","int"],"doc":"unicorn star ratings based on customers feedback","default":null}]}},{"name":"customer","type":{"type":"record","name":"Customer","fields":[{"name":"customer_account_no","type":"int","doc":"customer account number"},{"name":"first_name","type":"string"},{"name":"middle_name","type":["null","string"],"default":null},{"name":"last_name","type":"string"},{"name":"email_addresses","type":["null",{"type":"array","items":"string"}]},{"name":"customer_address","type":"string","doc":"customer address"},{"name":"mode_of_payment","type":{"type":"enum","name":"ModeOfPayment","symbols":["CARD","CASH"]},"default":"CARD"},{"name":"customer_rating","type":["null","int"],"default":null}]}}]})
+...
+[WARNING]
+org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition unicorn-ride-request-topic-0 at offset 100. If needed, please seek past the record to continue consumption.
+Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Exception occurred while de-serializing Avro message
+ at com.amazonaws.services.schemaregistry.deserializers.avro.AvroDeserializer.deserialize (AvroDeserializer.java:103)
+...
+Caused by: org.apache.avro.AvroTypeException: Found cloud.localstack.demos.gluemsk.schema.UnicornRideRequest, expecting cloud.localstack.demos.gluemsk.schema.UnicornRideRequest, missing required field customer
+ at org.apache.avro.io.ResolvingDecoder.doAction (ResolvingDecoder.java:308)
+...
+```
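+
+The exception message already hints at a possible workaround: if an outdated consumer has to keep running while such records are on the topic, it can seek past the offsets it cannot deserialize.
+Below is a minimal sketch of this approach (not part of the sample code); it assumes a kafka-clients version (2.8+) that raises `RecordDeserializationException`, which exposes the failing partition and offset:
+
+```java
+// Sketch only: skip records the outdated consumer cannot deserialize instead of crashing.
+try (KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<>(getConsumerConfig())) {
+    consumer.subscribe(Collections.singletonList(topic));
+    while (true) {
+        try {
+            ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofSeconds(1));
+            records.forEach(record -> LOGGER.info(record.value().toString()));
+        } catch (RecordDeserializationException e) {
+            LOGGER.warn("Skipping record at {} offset {}: {}", e.topicPartition(), e.offset(), e.getMessage());
+            // jump over the record that could not be deserialized and keep consuming
+            consumer.seek(e.topicPartition(), e.offset() + 1);
+        }
+    }
+}
+```
+
+Silently dropping records is rarely what you want in production, though; the real fix in this scenario is to update the consumer, which is exactly what we do next.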
+
+### Updated consumers
+
+Finally, we will update our consumer so that it is compatible with the new version of the schema.
+The complete module can be found in our [samples repository (along with the rest of the code of this tutorial).](https://github.com/localstack/localstack-pro-samples/blob/cd023a84a3b473984e9c34053d4feb7de8e038c1/glue-msk-schema-registry/consumer-2/)
+
+Similar to the previous producer scenarios, we create a new consumer by executing the following steps:
+- Copy the `consumer` directory and rename it to `consumer-2`.
+- Set a new artifact ID in the `pom.xml` of the module:
+
+ ```xml
+ ...
+ consumer-2
+ ...
+ ```
+
+- Add the new module to the root `pom.xml`:
+
+ ```xml
+
+ producer
+ producer-2
+ producer-3
+ consumer
+ consumer-2
+
+ ```
+
+- Replace the `unicorn_ride_request_v1.avsc` with the new version used by `producer-2` (`unicorn_ride_request_v2.avsc`).
+- Re-generate the Java classes for the schema:
+
+ ```bash
+ mvn clean generate-sources
+ ```
+
+Our new consumer, based on the latest version of the schema, will be able to successfully consume the records published by the new producer:
+
+```bash
+mvn -pl consumer-2 exec:java -Dexec.args="--bootstrap-servers localhost:4511"
+```
+
+## Conclusion
+
+Apache Kafka is used as the core messaging system in complex environments with independently developed producers and consumers.
+The independent development of these components makes it hard to coordinate and evolve data schemas over time.
+Using the AWS Glue Schema Registry helps you prevent the use of incompatible schemas.
+
+With LocalStack emulating Amazon Managed Streaming for Apache Kafka and the AWS Glue Schema Registry, you can develop and test the next evolution of your data schema locally, on your own machine.