Skip to content

Commit

Permalink
Handle client down error
Browse files Browse the repository at this point in the history
Closes #97
  • Loading branch information
MortezaHosseini authored and Kevin Lewis committed Mar 23, 2020
1 parent cc984b3 commit ff860d0
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions lib/kaffe/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ defmodule Kaffe.Producer do
message_list
|> add_timestamp
|> group_by_partition(topic, partition_strategy)
|> produce_list_to_topic(topic)
|> case do
messages = %{} -> produce_list_to_topic(messages, topic)
{:error, reason} ->
Logger.warn("Error while grouping by partition #{inspect(reason)}")
{:error, reason}
end
end

defp produce_value(topic, key, value) do
Expand Down Expand Up @@ -149,12 +154,12 @@ defmodule Kaffe.Producer do
end

defp group_by_partition(messages, topic, partition_strategy) do
{:ok, partitions_count} = @kafka.get_partitions_count(client_name(), topic)

messages
|> Enum.group_by(fn {_timestamp, key, message} ->
choose_partition(topic, partitions_count, key, message, partition_strategy)
end)
with {:ok, partitions_count} <- @kafka.get_partitions_count(client_name(), topic) do
messages
|> Enum.group_by(fn {_timestamp, key, message} ->
choose_partition(topic, partitions_count, key, message, partition_strategy)
end)
end
end

defp produce_list_to_topic(message_list, topic) do
Expand Down

0 comments on commit ff860d0

Please sign in to comment.