Skip to content

Commit

Permalink
Try and smartly reuse the enumerable as enumerator when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
JohSand committed Nov 1, 2023
1 parent d3ce7df commit 156be5e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
28 changes: 17 additions & 11 deletions src/Orsak/EffSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ type ResumableAsyncEnumerator<'T>() =

//allows external callers do drive the state-machine

/// allows MoveNextAsync know that we are fully done, and will never move again.
[<DefaultValue(false)>]
val mutable IsComplete: bool
val mutable InProgress: int

/// Used by the AsyncEnumerator interface to return the Current value when
/// IAsyncEnumerator.Current is called
[<DefaultValue(false)>]
val mutable Current: ValueOption<'T>


interface IAsyncEnumerator<'T> with
member this.Current =
match this.Current with
Expand All @@ -59,7 +59,7 @@ type ResumableAsyncEnumerator<'T>() =
let mutable src = &this.ValueTaskSource
src.Reset()

if this.Token.IsCancellationRequested || this.IsComplete then
if this.Token.IsCancellationRequested || this.InProgress = 0 then
ValueTask<_> false
else
this.MoveNext()
Expand All @@ -76,7 +76,12 @@ type ResumableAsyncEnumerator<'T>() =
| ValueTaskSourceStatus.Pending
| _ -> ValueTask<bool>(this, version)

member this.DisposeAsync() = this.Registration.DisposeAsync()
member this.DisposeAsync() = vtask {
do! this.Registration.DisposeAsync()
this.InProgress <- -1
return ()
}


[<Struct; NoComparison; NoEquality>]
type EffSeq<'r, 'a, 'e> =
Expand Down Expand Up @@ -138,10 +143,10 @@ and [<NoComparison; NoEquality>] EffectEnumerable<'Env, 'Machine, 'T, 'Err
interface IAsyncEnumerable<Result<'T, 'Err>> with
member this.GetAsyncEnumerator(ct) =
let enumerator =
if this.IsComplete then
EffectEnumerable<'Env, 'Machine, 'T, 'Err>()
else
if Interlocked.CompareExchange(&this.InProgress, 1, -1) = -1 then
this
else
EffectEnumerable<'Env, 'Machine, 'T, 'Err>(InProgress = 1)

enumerator.Token <- ct

Expand Down Expand Up @@ -174,8 +179,8 @@ type EffSeqBuilder() =

try
let __stack_code_fin = code.Invoke(&sm)
sm.Data.Enumerator.IsComplete <- __stack_code_fin
//always finish current itteration step if we complete the itteration, else finish it if we have gotten some data
sm.Data.Enumerator.InProgress <- Convert.ToInt32(not __stack_code_fin)
//always finish current itteration step if we complete the iteration, else finish it if we have gotten some data
if __stack_code_fin || sm.Data.Enumerator.Current.IsSome then
sm.Data.Enumerator.ValueTaskSource.SetResult(not __stack_code_fin)
else
Expand All @@ -188,10 +193,10 @@ type EffSeqBuilder() =
| null -> ()
| exn ->
sm.Data.Enumerator.ValueTaskSource.SetException exn
sm.Data.Enumerator.IsComplete <- true))
sm.Data.Enumerator.InProgress <- 0))
(SetStateMachineMethodImpl<_>(fun _ _ -> ()))
(AfterCode<_, _>(fun sm ->
let ts = EffectEnumerable<'Env, _, 'T, 'Err>(InitialMachine = sm)
let ts = EffectEnumerable<'Env, _, 'T, 'Err>(InitialMachine = sm, InProgress = -1)

EffSeq.Effect(
EffectSeqDelegate(fun env ->
Expand Down Expand Up @@ -527,6 +532,7 @@ module HighPrioritySeq =
ResumableCode.Yield().Invoke(&sm))

type EffBuilderBase with

member inline this.For
(
s: EffSeq<'Env, 'a, 'Err>,
Expand Down
21 changes: 21 additions & 0 deletions tests/Orsak.Tests/EffSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,27 @@ module ``Effect Sequences With Elements`` =
}
|> evaluatesToSequence [ 1; 2 ]

[<Fact>]
let ``should support parallel calls to GetEnumerator`` () = task {
let enumerable =
effSeq {
1
do! vtask { do! Task.Yield() }
2
}
do!
Parallel.ForEachAsync(
[ 1.. 10 ],
ParallelOptions(MaxDegreeOfParallelism = 10),
(fun _ _ -> ValueTask(task = task {
do! evaluatesToSequence [ 1; 2; ] enumerable
return ()
}))
)

return ()
}

[<Fact>]
let ``should bind async`` () =
effSeq {
Expand Down

0 comments on commit 156be5e

Please sign in to comment.