
[Bug Report]: Unable to produce tombstone records #516

Closed
1 task done
adimoraret opened this issue Feb 1, 2024 · 7 comments · Fixed by #547
Labels
bug Something isn't working

Comments

@adimoraret

adimoraret commented Feb 1, 2024

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

I am unable to produce tombstone records. It looks like the message value is sent as an empty byte[] instead of null.

Steps to reproduce

services.AddKafka(kafka => kafka
            .AddCluster(cluster => cluster
                .WithBrokers(new[] { kafkaBrokers })
                .AddProducer("test", producer => producer
                    .AddMiddlewares(middlewares => middlewares
                        .AddSerializer<JsonCoreSerializer>(_ => new JsonCoreSerializer(options))
                    )
                    .WithAcks(Acks.All)
                )
            )
        );

await producer.ProduceAsync("test", Guid.NewGuid().ToString(), null);

Expected behavior

Message value should be sent as null

Actual behavior

Message value is sent as an empty byte[]

KafkaFlow version

3.0.3

@adimoraret adimoraret added the bug Something isn't working label Feb 1, 2024
@tomaszprasolek

Do you have any error while producing the message?

@adimoraret
Author

No, there is no error. But it won't produce tombstone records when used with a Kafka upsert source: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ .

@JoaoRodriguesGithub
Contributor

JoaoRodriguesGithub commented Feb 20, 2024

As we discussed on the KafkaFlow Slack, you can work around this limitation with a custom serializer that checks whether the message is null before serializing the message value.

This should produce a message whose value is an empty byte[], which is the tombstone.

Meanwhile, we will address this issue by opening a pull request to fix the JsonCoreSerializer.

Regarding the Kafka upsert source, Flink should not expect different behavior from the Kafka client, since both produce a null record.
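A minimal sketch of such a null-aware serializer workaround (the class and method names below are illustrative and not part of the KafkaFlow API; a real implementation would plug into KafkaFlow's serializer interface):

```csharp
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical workaround: check for null before serializing, so a null
// message writes no bytes and the record payload stays empty.
public class NullAwareJsonSerializer
{
    public async Task SerializeAsync(object? message, Stream output)
    {
        if (message is null)
        {
            // Write nothing: the produced record carries an empty payload.
            return;
        }

        await JsonSerializer.SerializeAsync(output, message, message.GetType());
    }
}
```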

@adrianmoraret-mdsol

I have tried that approach end to end, using the kafka-upsert source and an Iceberg table destination in AWS. It does indeed send an empty byte array, but that did not translate into a tombstone record.
If I read https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ correctly, it only works if we send null for the message value.
Maybe I missed something, but I'm happy to test your PR once it's ready.

@JoaoRodriguesGithub
Contributor

With the JsonCoreSerializer fix, producing a message with a null value results in a tombstone record where the message value is an empty byte[].

"A null payload is a payload with 0 bytes"
reference here

public Task SerializeAsync(object? message, Stream output, ISerializerContext context)
{
    // A null message writes no bytes, producing an empty (tombstone) payload.
    if (message == null)
    {
        return Task.CompletedTask;
    }

    return SerializeNonNullMessageAsync(message, output);
}

As an alternative, you can always use the native Kafka client to confirm that it also produces a tombstone record with an empty byte[], just like KafkaFlow.
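For reference, producing a tombstone with the native Confluent.Kafka client might look like the sketch below (the broker address and topic name are placeholders; this assumes a reachable broker and is not a verified reproduction):

```csharp
using System;
using Confluent.Kafka;

// Sketch: produce a tombstone with the native Confluent.Kafka client.
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var producer = new ProducerBuilder<string, byte[]>(config).Build();

// A null Value serializes to a null payload, which Kafka treats as a
// tombstone for that key under log compaction.
await producer.ProduceAsync("test", new Message<string, byte[]>
{
    Key = Guid.NewGuid().ToString(),
    Value = null,
});
```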

@esskar
Contributor

esskar commented Feb 26, 2024

If it is fixed in the serializer, it has to be changed for every serializer. Sending null for tombstoning should not be part of serialization, IMHO.

Great library, by the way. :)

@esskar
Contributor

esskar commented Mar 25, 2024

> Fixing the issue with JsonCoreSerializer when producing a message with a null value it results in a tombstone record where the message value is an empty byte[]. [...]

I resolved it the same way for the ProtobufSerializer. Is there a plan to fix it in general?
