Skip to content

Commit

Permalink
Last bit of reactive for now; more locking and threading support or w…
Browse files Browse the repository at this point in the history
…ait for portable library support and use Rx for real.
  • Loading branch information
Greg Munn committed Jun 5, 2012
1 parent 01147bc commit 91ee30b
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 63 deletions.
1 change: 1 addition & 0 deletions MonoKit.iOS/Reactive/UIKitObservableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace MonoKit.Reactive
{
using System;
using MonoKit.Reactive.Concurrency;
using MonoKit.Reactive.Linq;

public static class UIKitObservableExtensions
{
Expand Down
3 changes: 2 additions & 1 deletion MonoKit/MonoKit.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<Folder Include="Reactive\Subjects\" />
<Folder Include="Domain\Reactive\" />
<Folder Include="Reactive\Concurrency\" />
<Folder Include="Reactive\Linq\" />
</ItemGroup>
<ItemGroup>
<Compile Include="DataBinding\Binding.cs" />
Expand Down Expand Up @@ -134,12 +135,12 @@
<Compile Include="Domain\UnitOfWorkEventBus.cs" />
<Compile Include="Domain\IEventBus.cs" />
<Compile Include="Domain\IReadModel.cs" />
<Compile Include="Reactive\Observable.cs" />
<Compile Include="Reactive\Concurrency\IScheduler.cs" />
<Compile Include="ObservableExtensions.cs" />
<Compile Include="Reactive\Concurrency\ImmediateScheduler.cs" />
<Compile Include="Reactive\Observer.cs" />
<Compile Include="Reactive\ScheduledObserver.cs" />
<Compile Include="Reactive\EmptyActions.cs" />
<Compile Include="Reactive\Linq\Observable.cs" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
// --------------------------------------------------------------------------------------------------------------------
//

namespace MonoKit.Reactive
namespace MonoKit.Reactive.Linq
{
using System;
using System.Threading.Tasks;
using MonoKit.Reactive;
using MonoKit.Reactive.Concurrency;
using MonoKit.Reactive.Subjects;
using MonoKit.Reactive.Disposables;
Expand Down Expand Up @@ -128,6 +129,130 @@ public static IObservable<T> AsObservable<T>(this Task<T> task)
return subscription;
});
}

public static IObservable<T> Do<T>(this IObservable<T> source, Action<T> onNextAction)
{
Func<IObserver<T>, IDisposable> subscribeFunction = observer => source.Subscribe(
(x =>
{
try
{
onNextAction(x);
}
catch (Exception ex)
{
observer.OnError(ex);
return;
}

observer.OnNext(x);
}),
observer.OnError,
observer.OnCompleted);

return new AnonymousObservable<T>(subscribeFunction);
}

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> whereClause)
{
return new AnonymousObservable<T>(
observer => source.Subscribe(
new AnonymousObserver<T>(
x =>
{
bool result;
try
{
result = whereClause(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}

if (result)
{
observer.OnNext(x);
}
},
observer.OnError,
observer.OnCompleted))
);


}


public static IObservable<TResult> Select<T, TResult>(this IObservable<T> source, Func<T, TResult> selector)
{
return new AnonymousObservable<TResult>(
observer => source.Subscribe(
new AnonymousObserver<T>(
x =>
{
TResult result;

try
{
result = selector(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}

observer.OnNext(result);
},
observer.OnError,
observer.OnCompleted)));


}


public static IObservable<T> Take<T>(this IObservable<T> source, int count)
{
return new AnonymousObservable<T>(
observer => source.Subscribe(
new AnonymousObserver<T>(
x =>
{
if (count > 0)
{
count--;
observer.OnNext(x);
if (count < 1)
{
observer.OnCompleted();
}
}
},
observer.OnError,
observer.OnCompleted)));
}


public static IObservable<T> Skip<T>(this IObservable<T> source, int count)
{
return new AnonymousObservable<T>(
observer => source.Subscribe(
new AnonymousObserver<T>(
x =>
{
if (count <= 0)
{
observer.OnNext(x);
}
else
{
count--;
}
},
observer.OnError,
observer.OnCompleted)));
}
}
}

28 changes: 0 additions & 28 deletions MonoKit/Reactive/Observer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,6 @@
namespace MonoKit.Reactive
{
using System;
using MonoKit.Reactive.Disposables;

// internal class HotObservable<T> : IObservable<T>
// {
// IDisposable scheduler_disposable;
// Subject<T> subject;
//
// public HotObservable (Action<IObserver<T>> work, IScheduler scheduler)
// {
// subject = new Subject<T> ();
// scheduler_disposable = scheduler.Schedule (() => work (subject));
// }
//
// bool disposed;
//
// public void Dispose ()
// {
// if (disposed)
// return;
// disposed = true;
// scheduler_disposable.Dispose ();
// }
//
// public IDisposable Subscribe (IObserver<T> observer)
// {
// return subject.Subscribe (observer);
// }
// }

public static class Observer
{
Expand Down
35 changes: 2 additions & 33 deletions MonoKit/Reactive/ScheduledObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,12 @@
// --------------------------------------------------------------------------------------------------------------------
//

using MonoKit.Reactive.Concurrency;
using System.Collections.Generic;
using System.Threading.Tasks;
using MonoKit.Reactive.Subjects;

namespace MonoKit.Reactive
{
using System;
using MonoKit.Reactive.Disposables;
using MonoKit.Reactive.Concurrency;
using System.Collections.Generic;

// internal class HotObservable<T> : IObservable<T>
// {
// IDisposable scheduler_disposable;
// Subject<T> subject;
//
// public HotObservable (Action<IObserver<T>> work, IScheduler scheduler)
// {
// subject = new Subject<T> ();
// scheduler_disposable = scheduler.Schedule (() => work (subject));
// }
//
// bool disposed;
//
// public void Dispose ()
// {
// if (disposed)
// return;
// disposed = true;
// scheduler_disposable.Dispose ();
// }
//
// public IDisposable Subscribe (IObserver<T> observer)
// {
// return subject.Subscribe (observer);
// }
// }

public class ScheduledObserver<T> : IObserver<T>, IDisposable
{
private readonly Queue<Action> queue;
Expand Down
1 change: 1 addition & 0 deletions MonoKitSample/Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
using MonoKit;
using System.Threading;
using System.Threading.Tasks;
using MonoKit.Reactive.Linq;

namespace MonoKitSample
{
Expand Down

0 comments on commit 91ee30b

Please sign in to comment.