Skip to content

Commit

Permalink
Return producer result instead of matching on :ok
Browse files Browse the repository at this point in the history
Instead of forcing a match on `:ok`, the result of `produce_sync` is
passed back to the caller. This allows the caller to handle errors in
an application-specific manner.

ENSI-167 #comment produce_sync result passed back to the caller
  • Loading branch information
Kevin Lewis committed May 9, 2017
1 parent 1b36d4b commit 7f24010
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 26 deletions.
7 changes: 0 additions & 7 deletions lib/kaffe.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,2 @@
defmodule Kaffe do
def start_consumer_client(config) do
:brod.start_client(config.endpoints, config.subscriber_name, config.consumer_config)
end

def start_producer_client(config) do
:brod.start_client(config.endpoints, config.client_name, config.producer_config)
end
end
6 changes: 5 additions & 1 deletion lib/kaffe/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ defmodule Kaffe.Consumer do
Initialize the consumer loop.
"""
def init(_consumer_group, [config]) do
Kaffe.start_consumer_client(config)
start_consumer_client(config)
{:ok, %Kaffe.Consumer.State{message_handler: config.message_handler, async: config.async_message_ack}}
end

Expand Down Expand Up @@ -140,6 +140,10 @@ defmodule Kaffe.Consumer do
## internal functions
## -------------------------------------------------------------------------

def start_consumer_client(config) do
@kafka.start_client(config.endpoints, config.subscriber_name, config.consumer_config)
end

defp compile_message(msg, topic, partition) do
Map.merge(%{topic: topic, partition: partition}, kafka_message_to_map(msg))
end
Expand Down
2 changes: 0 additions & 2 deletions lib/kaffe/group_member/manager/group_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ defmodule Kaffe.GroupManager do
The process begins by starting the client connection to Kafka. Then, group
members are created for each of the configured topics.
This is also the place where subscribers get the PID for a Worker.
"""

use GenServer
Expand Down
31 changes: 22 additions & 9 deletions lib/kaffe/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,29 @@ defmodule Kaffe.Producer do
This is a simpler way to produce if you've only given Producer a single topic
for production and don't want to specify the topic for each call.
Returns:
* `:ok` on successfully producing the message
* `{:error, reason}` for any error
"""
def produce_sync(key, value) do
GenServer.call(@name, {:produce_sync, key, value})
end

@doc """
Synchronously produce the `key`/`value` to `topic`
See `produce_sync/2` for returns.
"""
def produce_sync(topic, key, value) do
GenServer.call(@name, {:produce_sync, topic, key, value})
end

@doc """
Synchronously produce the given `key`/`value` to the `topic`/`partition`
See `produce_sync/2` for returns.
"""
def produce_sync(topic, partition, key, value) do
GenServer.call(@name, {:produce_sync, topic, partition, key, value})
Expand All @@ -83,7 +92,7 @@ defmodule Kaffe.Producer do
## -------------------------------------------------------------------------

def init([config]) do
Kaffe.start_producer_client(config)
start_producer_client(config)
state = %Kaffe.Producer.State{
client: config.client_name,
topics: config.topics,
Expand All @@ -97,37 +106,41 @@ defmodule Kaffe.Producer do
"""
def handle_call({:produce_sync, key, value}, _from, state) do
topic = state.topics |> List.first
{:ok, new_state} = produce(topic, key, value, state)
{:reply, :ok, new_state}
{response, new_state} = produce(topic, key, value, state)
{:reply, response, new_state}
end

@doc """
Sync produce the `key`/`value` to the given `topic`
"""
def handle_call({:produce_sync, topic, key, value}, _from, state) do
{:ok, new_state} = produce(topic, key, value, state)
{:reply, :ok, new_state}
{response, new_state} = produce(topic, key, value, state)
{:reply, response, new_state}
end

@doc """
Sync produce the `key`/`value` to the given `topic` and `partition`
"""
def handle_call({:produce_sync, topic, partition, key, value}, _from, state) do
@kafka.produce_sync(state.client, topic, partition, key, value)
{:reply, :ok, state}
response = @kafka.produce_sync(state.client, topic, partition, key, value)
{:reply, response, state}
end

## -------------------------------------------------------------------------
## internal
## -------------------------------------------------------------------------

defp start_producer_client(config) do
@kafka.start_client(config.endpoints, config.client_name, config.producer_config)
end

defp produce(topic, key, value, state) do
topic_key = String.to_atom(topic)
details = state.partition_details[topic_key]
partition = choose_partition(
topic, details.partition, details.total, key, value, state.partition_strategy)
:ok = @kafka.produce_sync(state.client, topic, partition, key, value)
{:ok, put_in(state.partition_details[topic_key].partition, partition)}
response = @kafka.produce_sync(state.client, topic, partition, key, value)
{response, put_in(state.partition_details[topic_key].partition, partition)}
end

defp analyze(client, topics) do
Expand Down
8 changes: 4 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{"brod": {:hex, :brod, "2.3.1", "c80036a92fff8a6be1182293c2cf353fa13e7deb11747d09fe0ae3fcd8d10f54", [:make, :rebar, :rebar3], [{:kafka_protocol, "0.9.1", [hex: :kafka_protocol, optional: false]}, {:supervisor3, "1.1.5", [hex: :supervisor3, optional: false]}]},
"kafka_protocol": {:hex, :kafka_protocol, "0.9.1", "8e943f26590a1ae034422fb8305f64a1aa21ff6dea4fb51e36b702c410861205", [:make, :rebar, :rebar3], [{:snappyer, "1.2.0", [hex: :snappyer, optional: false]}]},
%{"brod": {:hex, :brod, "2.3.1", "c80036a92fff8a6be1182293c2cf353fa13e7deb11747d09fe0ae3fcd8d10f54", [:make, :rebar, :rebar3], [{:kafka_protocol, "0.9.1", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:supervisor3, "1.1.5", [hex: :supervisor3, repo: "hexpm", optional: false]}], "hexpm"},
"kafka_protocol": {:hex, :kafka_protocol, "0.9.1", "8e943f26590a1ae034422fb8305f64a1aa21ff6dea4fb51e36b702c410861205", [:make, :rebar, :rebar3], [{:snappyer, "1.2.0", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm"},
"logfmt": {:hex, :logfmt, "3.2.0", "887a091adad28acc6e4d8b3d3bce177b934e7c61e7655c86946410f44aca6d84", [:mix], []},
"metrix": {:git, "https://github.com/rwdaigle/metrix.git", "a6738df9346da0412ca68f82a24a67d2a32b066e", [branch: "master"]},
"snappyer": {:hex, :snappyer, "1.2.0", "4761f475c53fec4a027dc90ea0626a735e3d276bfcfaf9a8dc4c7bbda4d1e058", [:make, :rebar, :rebar3], []},
"supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], []}}
"snappyer": {:hex, :snappyer, "1.2.0", "4761f475c53fec4a027dc90ea0626a735e3d276bfcfaf9a8dc4c7bbda4d1e058", [:make, :rebar, :rebar3], [], "hexpm"},
"supervisor3": {:hex, :supervisor3, "1.1.5", "5f3c487a6eba23de0e64c06e93efa0eca06f40324a6412c1318c77aca6da8424", [:make, :rebar, :rebar3], [], "hexpm"}}
8 changes: 8 additions & 0 deletions test/kaffe/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ defmodule Kaffe.ProducerTest do
@test_partition_count Application.get_env(:kaffe, :test_partition_count)

setup do
Process.register(self(), :test_case)
Producer.start_link

%{
client_name: :client,
endpoints: [kafka: 9092],
Expand Down Expand Up @@ -49,6 +52,11 @@ defmodule Kaffe.ProducerTest do
assert_receive [:produce_sync, "topic2", ^partition, "key", "value"]
end

test "produce_sync passes through the result" do
TestBrod.set_produce_response(response = {:error, {:producer_down, :noproc}})
assert response == Producer.produce_sync("key", "value")
end

describe "partition selection" do
test "round robin strategy", c do
state = c.producer_state
Expand Down
21 changes: 18 additions & 3 deletions test/support/test_brod.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,33 @@ defmodule TestBrod do

@test_partition_count Application.get_env(:kaffe, :test_partition_count)

def start_client(_endpoints, _client_name, _producer_config) do
GenServer.start_link(__MODULE__, :ok, name: TestBrod)
end

def start_link_group_subscriber(_client, _consumer_group, _topics, _group_config, _consumer_config, _handler, _init_args) do
GenServer.start_link(__MODULE__, :ok)
end

def produce_sync(_client, topic, partition, key, value) do
send self, [:produce_sync, topic, partition, key, value]
:ok
GenServer.call(TestBrod, {:produce_sync, topic, partition, key, value})
end

def get_partitions_count(_client, _topic), do: {:ok, @test_partition_count}

def set_produce_response(response) do
GenServer.call(TestBrod, {:set_produce_response, response})
end

def init(:ok) do
{:ok, %{}}
{:ok, %{produce_response: :ok}}
end

def handle_call({:produce_sync, topic, partition, key, value}, _from, state) do
send :test_case, [:produce_sync, topic, partition, key, value]
{:reply, state.produce_response, state}
end
def handle_call({:set_produce_response, response}, _from, state) do
{:reply, response, %{state | produce_response: response}}
end
end

0 comments on commit 7f24010

Please sign in to comment.