Skip to content

Commit

Permalink
PR: Recursive flatmap fixes for On_Complete tracking
Browse files Browse the repository at this point in the history
Recursive flatmap fixes for On_Complete tracking
  • Loading branch information
mosteo authored Jun 20, 2019
2 parents 802d0f2 + d4d0765 commit 6363c74
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 54 deletions.
4 changes: 4 additions & 0 deletions dirx/src/dirx-observables.adb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
with Rx.Debug;
with Rx.Errors;

package body DirX.Observables is
Expand Down Expand Up @@ -104,15 +105,18 @@ package body DirX.Observables is
Item : constant Directory_Entry := Wrap (DE);
begin
AD.Get_Next_Entry (Search, DE.all);
Rx.Debug.Trace ("dir_entry on_next");
Consumer.On_Next (Item);
end;
end loop;

AD.End_Search (Search);
Rx.Debug.Trace ("dir_entry on_complete");
Consumer.On_Complete;
exception
when E : others =>
AD.End_Search (Search);
Rx.Debug.Trace ("dir_entry on_error");
Consumer.On_Error (Rx.Errors.Create (E));
end Subscribe;

Expand Down
69 changes: 21 additions & 48 deletions src/body/rx-op-flatmap.adb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package body Rx.Op.Flatmap is

type Unsafe_Controller is record
Subscribed : Boolean := True;
Master_Finished : Boolean := False;
Live_Subscriptions : Natural := 0;
Live_Subscriptions : Natural := 1;
-- One corresponding to the original upstream observable;
-- In the non-recursive case we get more via elements;
-- In the recursive case we get even more via the fron resubscribing.
end record;

procedure Add_Sub (Ctrl : in out Unsafe_Controller) is
Expand All @@ -34,16 +36,9 @@ package body Rx.Op.Flatmap is
Debug.Trace ("flatmap subs--: " & Ctrl.Live_Subscriptions'Img);
end Del_Sub;

procedure Mark_Front_Completed (Ctrl : in out Unsafe_Controller) is
begin
Ctrl.Master_Finished := True;
Debug.Trace ("flatmap master sub finished");
end Mark_Front_Completed;

procedure Mark_Errored (Ctrl : in out Unsafe_Controller) is
begin
Ctrl.Subscribed := False;
Ctrl.Master_Finished := True;
Ctrl.Live_Subscriptions := 0;
Debug.Trace ("flatmap mark_errored");
end Mark_Errored;
Expand Down Expand Up @@ -128,65 +123,44 @@ package body Rx.Op.Flatmap is
-----------------

overriding procedure On_Complete (This : in out Front) is
Done : aliased Boolean := False;
procedure Check_Done (Ctrl : in out Unsafe_Controller) is
begin
Done := Ctrl.Master_Finished and then Ctrl.Live_Subscriptions = 0;
end Check_Done;
begin
if not This.Is_Subscribed then
Debug.Trace ("front on_complete [unsubscribed]");
raise No_Longer_Subscribed;
end if;

This.Control.Apply (Mark_Front_Completed'Access);
This.Control.Apply (Check_Done'Access);
if Done then
Debug.Trace ("front on_complete [for real]");
This.Get_Observer.On_Complete;
This.Unsubscribe;
else
Debug.Trace ("front on_complete [subs pending]");
end if;
Debug.Trace ("front on_complete [passing down]");
-- We don't do checks on completion here, since we can centralize
-- completion on the downstream unique Back observer
-- Also, since we don't know if this comes from main upstream or
-- secondary subscriptions (in the recursive case), we would be unable
-- to properly do counting.

This.Get_Observer.On_Complete;
end On_Complete;

overriding procedure On_Complete (This : in out Back) is
Done_Master : aliased Boolean := False;
Done_Subs : aliased Boolean := False;
Done : aliased Boolean := False;
procedure Check_Done (Ctrl : in out Unsafe_Controller) is
begin
Done_Master := Ctrl.Master_Finished;
Done_Subs := Ctrl.Live_Subscriptions = 0;
Done := Ctrl.Live_Subscriptions = 0;
end Check_Done;
begin
if not This.Is_Subscribed then
Debug.Trace ("back on_complete [unsubscribed]");
raise No_Longer_Subscribed;
end if;

This.Control.Apply (Del_Sub'Access);
This.Control.Apply (Check_Done'Access);
if Done_Master and then Done_Subs then
-- On_Complete without live subscriptions mean master flow is
-- complete, and we can pack out
Debug.Trace ("back on_complete [from front]");
if Done then
-- On_Complete without live subscriptions mean all, primary & secondary
-- flows, have completed
Debug.Trace ("back on_complete [final]");
This.Get_Observer.On_Complete;
This.Unsubscribe;
else
-- No matter if master is complete, being here means we have live subs
-- and this comes from one of them
This.Control.Apply (Del_Sub'Access);
This.Control.Apply (Check_Done'Access);
if Done_Master and then Done_Subs then
Debug.Trace ("back on_complete [from subs]");
This.Get_Observer.On_Complete;
This.Unsubscribe;
elsif Done_Master then
Debug.Trace ("back on_complete [master pending]");
elsif Done_Subs then
Debug.Trace ("back on_complete [subs pending]");
else
Debug.Trace ("back on_complete [master+subs pending]");
end if;
Debug.Trace ("back on_complete [subs pending]");
end if;
end On_Complete;

Expand Down Expand Up @@ -294,8 +268,7 @@ package body Rx.Op.Flatmap is
Debug.Trace ("flatmap subscribe [1st]");
This.Control := Wrap (new Unsafe_Controller'
(Subscribed => True,
Master_Finished => <>,
Live_Subscriptions => <>));
Live_Subscriptions => 1));
declare
use Preserver.Linkers;
Downstream : Preserver.Operator'Class :=
Expand Down
23 changes: 18 additions & 5 deletions src/main/rx-devel.adb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ package body Rx.Devel is

-- function Selfsum (I : Rx_Integer) return Integers.Observable'Class is (Just (I + I));

-- Inf : Integer_To_String.Typed.Actions.Inflater1 := AAA'Access;
-- Inf : Integer_To_String.Typed.Actions.Inflater1 := AAA'Access;

function Below (I : Rx_Integer) return Integers.Observable'Class is
(if I <= 1
then Integers.Empty
else Numeric.Integers.Range_Slice (1, I - 1));

procedure Run is
Subs : Rx.Subscriptions.Subscription with Unreferenced;
Expand All @@ -27,15 +32,23 @@ package body Rx.Devel is
-- & Expand (Selfsum'Access)
-- & Limit (16)
-- & Images.Integers.Print
-- & Subscribe;

-- Subs :=
-- From ((1, 2))
-- & Flat_Map (No_Op)
-- & Images.Integers.Print
-- & Subscribe;

Subs :=
Numeric.Integers.Range_Slice (1, 5)
& Flat_Map (Repeat (4)
& Observe_On (Schedulers.Computation)
& Hold (Fixed => 0.0, Random => 0.01))
From ((1, 2, 3, 4))
& Expand (Below'Access)
& Images.Integers.Print
& Subscribe;

while Subs.Is_Subscribed loop
delay 0.1;
end loop;
end Run;

end Rx.Devel;
16 changes: 15 additions & 1 deletion src/utests/rx-tests.adb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ package body Rx.Tests is

function Selfsum (I : Rx_Integer) return Integers.Observable'Class is (Just (I + I));

function Below (I : Rx_Integer) return Integers.Observable'Class is
(if I <= 1
then Integers.Empty
else Numeric.Integers.Range_Slice (1, I - 1));
-- Recursively emit integers below given one. Used to test Expand with finite recursivity

procedure Stopwatch_Test (Event_Kind : Rx_Event_Kinds;
Unused : Duration;
Since_Subscription : Duration)
Expand Down Expand Up @@ -466,11 +472,19 @@ package body Rx.Tests is
Just (1)
& Expand (Selfsum'Access)
& Limit (16)
& Subscribe_Checker (Name => "expand",
& Subscribe_Checker (Name => "expand & limit",
Do_Count => True, Ok_Count => 16,
Do_First => True, Ok_First => 1,
Do_Last => True, Ok_Last => 32768);

Subs :=
From ((1, 2, 3, 4))
& Expand (Below'Access)
& Subscribe_Checker (Name => "expand finite",
Do_Count => True, Ok_Count => 15,
Do_First => True, Ok_First => 1,
Do_Last => True, Ok_Last => 1);

-- No_Op
Subs :=
Just (1) &
Expand Down

0 comments on commit 6363c74

Please sign in to comment.