Skip to content

Commit

Permalink
Add random partition strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
sdball committed Oct 24, 2016
1 parent 1e6d335 commit 7925525
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
4 changes: 4 additions & 0 deletions lib/kaffe/partition_selector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ defmodule Kaffe.PartitionSelector do
0
end
end

def random(total) do
:crypto.rand_uniform(0, total)
end
end
4 changes: 4 additions & 0 deletions lib/kaffe/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,8 @@ defmodule Kaffe.Producer do
defp next_partition(_topic, current_partition, partitions_count, _key, _value, :round_robin) do
Kaffe.PartitionSelector.round_robin(current_partition, partitions_count)
end

defp next_partition(_topic, _current_partition, partitions_count, _key, _value, :random) do
Kaffe.PartitionSelector.random(partitions_count)
end
end
63 changes: 43 additions & 20 deletions test/kaffe/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,30 +47,53 @@ defmodule Kaffe.ProducerTest do
assert_receive [:produce_sync, "topic2", ^partition, "key", "value"]
end

test "producer uses the configured partition selection strategy when it chooses the next partition", c do
starting_state = c.producer_state
assert starting_state.partition_strategy == :round_robin
describe "partition selection" do
test "round robin strategy", c do
state = c.producer_state
state = %{state | partition_strategy: :round_robin}

{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, starting_state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]
{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]

{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", 1, "key", "value"]
{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", 1, "key", "value"]

Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", 2, "key", "value"]
end
{:reply, :ok, _new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", 2, "key", "value"]
end

test "producer does not use the partition selection strategy when given direct partition", c do
{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", 0, "key", "value"}, self, c.producer_state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]
test "random partition strategy", c do
state = c.producer_state
state = %{state | partition_strategy: :random}

Producer.handle_call(
{:produce_sync, "topic", 0, "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]
{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]

{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", random_partition, "key", "value"]

assert (0 <= random_partition) && (random_partition <= 19)

{:reply, :ok, _new_state} = Producer.handle_call(
{:produce_sync, "topic", "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", random_partition, "key", "value"]

assert (0 <= random_partition) && (random_partition <= 19)
end

test "producer does not use a selection strategy when given a direct partition", c do
{:reply, :ok, new_state} = Producer.handle_call(
{:produce_sync, "topic", 0, "key", "value"}, self, c.producer_state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]

Producer.handle_call(
{:produce_sync, "topic", 0, "key", "value"}, self, new_state)
assert_receive [:produce_sync, "topic", 0, "key", "value"]
end
end
end

0 comments on commit 7925525

Please sign in to comment.