Skip to content

Commit

Permalink
Ensure both inner and outer transform streams are closed
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed Oct 27, 2016
1 parent de3873f commit 8505463
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
12 changes: 10 additions & 2 deletions lib/elixir/lib/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,8 @@ defmodule Stream do
do_transform(user_acc.(), user, fun, :cont, next, inner_acc, inner, after_fun)
end

defp do_transform(user_acc, _user, _fun, _next_op, _next, {:halt, inner_acc}, _inner, after_fun) do
defp do_transform(user_acc, _user, _fun, _next_op, next, {:halt, inner_acc}, _inner, after_fun) do
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, inner_acc}
end
Expand All @@ -776,7 +777,14 @@ defmodule Stream do
end

defp do_transform(user_acc, user, fun, :cont, next, inner_acc, inner, after_fun) do
case next.({:cont, []}) do
try do
next.({:cont, []})
catch
kind, reason ->
stacktrace = System.stacktrace
do_after(after_fun, user_acc)
:erlang.raise(kind, reason, stacktrace)
else
{:suspended, [val], next} ->
do_transform_user(val, user_acc, user, fun, :cont, next, inner_acc, inner, after_fun)
{_, [val]} ->
Expand Down
18 changes: 18 additions & 0 deletions lib/elixir/test/elixir/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,24 @@ defmodule StreamTest do
assert Process.get(:stream_transform)
end

test "transform/4 closes on nested errors" do
stream =
1..10
|> Stream.transform(fn -> 0 end,
fn 3, _ -> throw(:error)
x, acc -> {[x + acc], x} end,
fn _ -> Process.put(:stream_transform_inner, true) end)
|> Stream.transform(fn -> 0 end,
fn x, acc -> {[x], acc} end,
fn 0 -> Process.put(:stream_transform_outer, true) end)

Process.put(:stream_transform_inner, false)
Process.put(:stream_transform_outer, false)
assert catch_throw(Enum.to_list(stream)) == :error
assert Process.get(:stream_transform_inner)
assert Process.get(:stream_transform_outer)
end

test "transform/4 is zippable" do
stream = Stream.transform(1..20, fn -> 0 end,
fn 10, acc -> {:halt, acc}
Expand Down

0 comments on commit 8505463

Please sign in to comment.