Provide finer grain control over committing offsets
Depending on the use case it may not be desirable for new offsets to be
committed back to a partition. This provides message handlers a mechanism
to communicate back how to handle offsets.

- :ok will commit back the offset of the last message and request new messages
- {:ok, :no_commit} will request more messages keeping the prior offset
- {:ok, offset} will commit back a specific offset and get messages from there

Closes spreedly#60
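
The three return values listed above can be illustrated with a small handler. This is a minimal sketch, not code from this commit: `BatchingHandler` and its `forward/1` stub are hypothetical names, and the sketch uses the `{:ok, :no_commit}` tuple that the worker in this commit matches on.

```elixir
defmodule BatchingHandler do
  # Hypothetical handler. `forward/1` stands in for whatever downstream call
  # an application makes; it is not part of Kaffe.
  def handle_messages(messages) do
    case forward(messages) do
      :ok ->
        # Batch delivered: commit the last offset and request more messages.
        :ok

      {:error, :retry_later} ->
        # Keep consuming, but leave the committed offset where it was.
        {:ok, :no_commit}

      {:error, :rebuild} ->
        # Commit the first offset of the batch and resume from that point.
        {:ok, hd(messages).offset}
    end
  end

  # Deterministic stub so the example can be run as a script.
  defp forward(messages) do
    cond do
      Enum.any?(messages, &(&1.value == "rebuild")) -> {:error, :rebuild}
      Enum.any?(messages, &(&1.value == "retry")) -> {:error, :retry_later}
      true -> :ok
    end
  end
end

batch = [%{key: "k1", value: "a", offset: 100}, %{key: "k2", value: "b", offset: 101}]
IO.inspect(BatchingHandler.handle_messages(batch))
```

Swapping one of the values for `"retry"` or `"rebuild"` exercises the other two branches of the stub.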
davidsantoso committed Jan 29, 2018
1 parent c1d5437 commit 1a0d995
Showing 4 changed files with 140 additions and 64 deletions.
116 changes: 69 additions & 47 deletions README.md
@@ -75,19 +75,19 @@ Batch message consumers receive a list of messages and work as part of the `:bro

2. Configure your Kaffe Consumer in your mix config

-```elixir
-config :kaffe,
-  consumer: [
-    endpoints: [kafka: 9092], # that's [hostname: kafka_port]
-    topics: ["interesting-topic"], # the topic(s) that will be consumed
-    consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
-    message_handler: MessageProcessor, # the module from Step 1 that will process messages
-
-    # optional
-    async_message_ack: false, # see "async message acknowledgement" below
-    start_with_earliest_message: true # default false
-  ],
-```
+```elixir
+config :kaffe,
+  consumer: [
+    endpoints: [kafka: 9092], # that's [hostname: kafka_port]
+    topics: ["interesting-topic"], # the topic(s) that will be consumed
+    consumer_group: "your-app-consumer-group", # the consumer group for tracking offsets in Kafka
+    message_handler: MessageProcessor, # the module from Step 1 that will process messages
+
+    # optional
+    async_message_ack: false, # see "async message acknowledgement" below
+    start_with_earliest_message: true # default false
+  ],
+```

The `start_with_earliest_message` field controls where your consumer group starts when it starts for the very first time. Once offsets have been committed to Kafka, they supersede this option. If omitted, your consumer group will start processing from the most recent messages in the topic instead of consuming all available messages.

@@ -122,21 +122,21 @@ Batch message consumers receive a list of messages and work as part of the `:bro

1. Define a `handle_messages/1` function in the provided module.

-`handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.
+`handle_messages/1` This function (note the pluralization) will be called with a *list of messages*, with each message as a map. Each message map will include the topic and partition in addition to the normal Kafka message metadata.

-The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`.
+The module's `handle_messages/1` function _must_ return `:ok` or Kaffe will throw an error. The Kaffe consumer will block until your `handle_messages/1` function returns `:ok`.
-```elixir
-defmodule MessageProcessor
-  def handle_messages(messages) do
-    for %{key: key, value: value} = message <- messages do
-      IO.inspect message
-      IO.puts "#{key}: #{value}"
-    end
-    :ok # Important!
-  end
-```
+```elixir
+defmodule MessageProcessor do
+  def handle_messages(messages) do
+    for %{key: key, value: value} = message <- messages do
+      IO.inspect message
+      IO.puts "#{key}: #{value}"
+    end
+    :ok # Important!
+  end
+end
+```
2. The configuration options for the `GroupMember` consumer are a
superset of those for `Kaffe.Consumer`, except for
@@ -225,22 +225,44 @@ Kafka only tracks a single numeric offset, not individual messages. If a message

It's possible that your topic and system are entirely ok with losing some messages (e.g. frequent metrics that aren't individually important).

+### Managing how offsets are committed
+
+In some cases you may not want to commit back the most recent offset after processing a list of messages. For example, you might be batching messages to send elsewhere and want to ensure that a batch can be rebuilt if an error occurs further downstream. In that case you might keep the offset of the first message in your batch so your consumer can restart at that point to reprocess and rebatch the messages. Your message handler can respond in the following ways to manage how offsets are committed back:
+
+`:ok` - commit back the most recent offset and request more messages
+`{:ok, :no_commit}` - do _not_ commit back the most recent offset and request more messages from the offset of the last message
+`{:ok, offset}` - commit back at the offset specified and request messages from that point forward
+
+Example:
+
+```elixir
+defmodule MessageProcessor do
+  def handle_messages(messages) do
+    for %{key: key, value: value} = message <- messages do
+      IO.inspect message
+      IO.puts "#{key}: #{value}"
+    end
+    {:ok, :no_commit}
+  end
+end
+```
+
## Kaffe Producer Usage
`Kaffe.Producer` handles producing messages to Kafka and will automatically select the topic partitions per message or can be given a function to call to determine the partition per message. Kaffe automatically inserts a Kafka timestamp with each message.
Configure your Kaffe Producer in your mix config
-```elixir
-config :kaffe,
-  producer: [
-    endpoints: [kafka: 9092], # [hostname: port]
-    topics: ["kafka-topic"],
+```elixir
+config :kaffe,
+  producer: [
+    endpoints: [kafka: 9092], # [hostname: port]
+    topics: ["kafka-topic"],

-    # optional
-    partition_strategy: :md5
-  ]
-```
+    # optional
+    partition_strategy: :md5
+  ]
+```
The `partition_strategy` setting can be one of:
@@ -254,23 +276,23 @@ You can also set any of the Brod producer configuration options in the `producer
To configure a Kaffe Producer for a Heroku Kafka compatible environment, including SSL, omit the `endpoints` setting and instead set `heroku_kafka_env: true`
-```elixir
-config :kaffe,
-  producer: [
-    heroku_kafka_env: true,
-    topics: ["kafka-topic"],
+```elixir
+config :kaffe,
+  producer: [
+    heroku_kafka_env: true,
+    topics: ["kafka-topic"],

-    # optional
-    partition_strategy: :md5
-  ]
-```
+    # optional
+    partition_strategy: :md5
+  ]
+```
With that setting in place Kaffe will automatically pull required info from the following ENV variables:
-- `KAFKA_URL`
-- `KAFKA_CLIENT_CERT`
-- `KAFKA_CLIENT_CERT_KEY`
-- `KAFKA_TRUSTED_CERT`
+- `KAFKA_URL`
+- `KAFKA_CLIENT_CERT`
+- `KAFKA_CLIENT_CERT_KEY`
+- `KAFKA_TRUSTED_CERT`
### Producing to Kafka
24 changes: 16 additions & 8 deletions lib/kaffe/group_member/subscriber/subscriber.ex
@@ -1,7 +1,7 @@
defmodule Kaffe.Subscriber do
@moduledoc """
Consume messages from a single partition of a single Kafka topic.
Assignments are received from a group consumer member, `Kaffe.GroupMember`.
Messages are delegated to `Kaffe.Worker`. The worker is expected to cast back
@@ -46,8 +46,12 @@ defmodule Kaffe.Subscriber do
     GenServer.stop(subscriber_pid)
   end

-  def ack_messages(subscriber_pid, topic, partition, generation_id, offset) do
-    GenServer.cast(subscriber_pid, {:ack_messages, topic, partition, generation_id, offset})
+  def commit_offsets(subscriber_pid, topic, partition, generation_id, offset) do
+    GenServer.cast(subscriber_pid, {:commit_offset, topic, partition, generation_id, offset})
+  end
+
+  def request_more_messages(subscriber_pid, offset) do
+    GenServer.cast(subscriber_pid, {:request_more_messages, offset})
   end

   def init([subscriber_name, group_coordinator_pid, worker_pid,
@@ -100,9 +104,8 @@ defmodule Kaffe.Subscriber do
     {:noreply, state}
   end

-  def handle_cast({:ack_messages, topic, partition, generation_id, offset}, state) do
-
-    Logger.debug "Ready to ack messages of #{state.topic} / #{state.partition} / #{generation_id} at offset: #{offset}"
+  def handle_cast({:commit_offset, topic, partition, generation_id, offset}, state) do
+    Logger.debug "Ready to commit offsets for #{state.topic} / #{state.partition} / #{generation_id} at offset: #{offset}"

     # Is this the ack we're looking for?
     ^topic = state.topic
@@ -112,12 +115,17 @@ defmodule Kaffe.Subscriber do
     # Update the offsets in the group
     :ok = group_coordinator().ack(state.group_coordinator_pid, state.gen_id,
                                   state.topic, state.partition, offset)
-    # Request more messages from the consumer
-    :ok = kafka().consume_ack(state.subscriber_pid, offset)

     {:noreply, state}
   end

+  def handle_cast({:request_more_messages, offset}, state) do
+    Logger.debug "Ready to consume more messages of #{state.topic} / #{state.partition} at offset: #{offset}. Offset has not been committed back"
+
+    :ok = kafka().consume_ack(state.subscriber_pid, offset)
+
+    {:noreply, state}
+  end
   defp handle_subscribe({:ok, subscriber_pid}, state) do
     Logger.debug "Subscribe success: #{inspect subscriber_pid}"
     Process.monitor(subscriber_pid)
15 changes: 12 additions & 3 deletions lib/kaffe/group_member/worker/worker.ex
@@ -30,9 +30,18 @@ defmodule Kaffe.Worker do
   def handle_cast({:process_messages, subscriber_pid, topic, partition, generation_id, messages},
                   %{message_handler: message_handler} = state) do

-    :ok = apply(message_handler, :handle_messages, [messages])
-    offset = List.last(messages).offset
-    subscriber().ack_messages(subscriber_pid, topic, partition, generation_id, offset)
+    case apply(message_handler, :handle_messages, [messages]) do
+      :ok ->
+        offset = List.last(messages).offset
+        subscriber().commit_offsets(subscriber_pid, topic, partition, generation_id, offset)
+        subscriber().request_more_messages(subscriber_pid, offset)
+      {:ok, :no_commit} ->
+        offset = List.last(messages).offset
+        subscriber().request_more_messages(subscriber_pid, offset)
+      {:ok, offset} ->
+        subscriber().commit_offsets(subscriber_pid, topic, partition, generation_id, offset)
+        subscriber().request_more_messages(subscriber_pid, offset)
+    end

     {:noreply, state}
   end
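
The `case` in the worker's `handle_cast/2` makes two decisions per batch: which offset to commit, if any, and which offset to acknowledge when requesting more messages. A minimal sketch of that mapping as a pure function follows. `OffsetPlan` and `plan/2` are hypothetical names used only for illustration and are not part of Kaffe.

```elixir
defmodule OffsetPlan do
  # Hypothetical helper: maps a handler's return value to
  # {offset_to_commit, offset_to_request_from}; nil means "do not commit".
  def plan(:ok, messages) do
    offset = List.last(messages).offset
    {offset, offset}
  end

  # This clause must come before the generic {:ok, offset} clause,
  # just as the :no_commit branch precedes {:ok, offset} in the worker's case.
  def plan({:ok, :no_commit}, messages) do
    {nil, List.last(messages).offset}
  end

  def plan({:ok, offset}, _messages) when is_integer(offset) do
    {offset, offset}
  end
end

messages = [%{key: :one, offset: 100}, %{key: :two, offset: 101}]

IO.inspect(OffsetPlan.plan(:ok, messages))                # {101, 101}
IO.inspect(OffsetPlan.plan({:ok, :no_commit}, messages))  # {nil, 101}
IO.inspect(OffsetPlan.plan({:ok, 100}, messages))         # {100, 100}
```

The integer guard is an addition in this sketch; the worker in this commit relies on clause order alone to keep `:no_commit` from being treated as an offset.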
49 changes: 43 additions & 6 deletions test/kaffe/group_member/worker/worker_test.exs
@@ -5,12 +5,24 @@ defmodule Kaffe.WorkerTest do
   alias Kaffe.Worker

   defmodule TestSubscriber do
-    def ack_messages(_subscriber_pid, topic, partition, generation_id, offset) do
-      send :test_case, {:ack_messages, {topic, partition, generation_id, offset}}
+    def commit_offsets(_subscriber_pid, topic, partition, generation_id, offset) do
+      send :test_case, {:commit_offsets, {topic, partition, generation_id, offset}}
+    end
+
+    def request_more_messages(_subscriber_pid, offset) do
+      send :test_case, {:request_more_messages, {offset}}
     end
   end

   defmodule TestHandler do
+    def handle_messages([%{key: :one, offset: 100}, %{key: :two, offset: 888}] = messages) do
+      send :test_case, {:handle_messages, messages}
+      {:ok, :no_commit}
+    end
+    def handle_messages([%{key: :one, offset: 100}, %{key: :two, offset: 999}] = messages) do
+      send :test_case, {:handle_messages, messages}
+      {:ok, 100}
+    end
     def handle_messages(messages) do
       send :test_case, {:handle_messages, messages}
       :ok
@@ -23,13 +35,38 @@ defmodule Kaffe.WorkerTest do
     :ok
   end

-  test "handle messages" do
-
+  test "handle messages and commit back offset" do
     {:ok, worker_pid} = Worker.start_link(TestHandler, "subscriber_name", 0)

     Worker.process_messages(worker_pid, self(), "topic", 1, 2,
       [%{key: :one, offset: 100}, %{key: :two, offset: 101}])

     assert_receive {:handle_messages, [%{key: :one, offset: 100}, %{key: :two, offset: 101}]}
-    assert_receive {:ack_messages, {"topic", 1, 2, 101}}
+    assert_receive {:commit_offsets, {"topic", 1, 2, 101}}
+    assert_receive {:request_more_messages, {101}}
+  end
+
+  test "handle messages and maintain offset" do
+    {:ok, worker_pid} = Worker.start_link(TestHandler, "subscriber_name", 0)
+
+    Worker.process_messages(worker_pid, self(), "topic", 1, 2,
+      [%{key: :one, offset: 100}, %{key: :two, offset: 888}])
+
+    assert_receive {:handle_messages,
+      [%{key: :one, offset: 100}, %{key: :two, offset: 888}]}
+    refute_received {:commit_offsets, {"topic", 1, 2, 888}}
+    assert_receive {:request_more_messages, {888}}
+  end
+
+  test "handle messages and commit back specific offset" do
+    {:ok, worker_pid} = Worker.start_link(TestHandler, "subscriber_name", 0)
+
+    Worker.process_messages(worker_pid, self(), "topic", 1, 2,
+      [%{key: :one, offset: 100}, %{key: :two, offset: 999}])
+
+    assert_receive {:handle_messages,
+      [%{key: :one, offset: 100}, %{key: :two, offset: 999}]}
+    assert_receive {:commit_offsets, {"topic", 1, 2, 100}}
+    assert_receive {:request_more_messages, {100}}
   end
 end
