Skip to content

Commit

Permalink
formatting and javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed Oct 12, 2016
1 parent a62f144 commit abaf35f
Show file tree
Hide file tree
Showing 35 changed files with 11,457 additions and 6,351 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.aol.cyclops.control;


import java.util.Objects;
import java.util.stream.Stream;

Expand Down Expand Up @@ -37,17 +36,17 @@ public class FluxSource {
*/
public static <T> MultipleFluxSource<T> ofMultiple() {
return new MultipleFluxSource<T>(
FluxSource.ofUnbounded()
.createQueue());
FluxSource.ofUnbounded()
.createQueue());
}

/**
* @return a builder that will use Topics to allow multiple Streams from the same data
*/
public static <T> MultipleFluxSource<T> ofMultiple(int backPressureAfter) {
return new MultipleFluxSource<T>(
FluxSource.of(backPressureAfter)
.createQueue());
FluxSource.of(backPressureAfter)
.createQueue());
}

/**
Expand All @@ -56,8 +55,8 @@ public static <T> MultipleFluxSource<T> ofMultiple(int backPressureAfter) {
public static <T> MultipleFluxSource<T> ofMultiple(QueueFactory<?> q) {
Objects.requireNonNull(q);
return new MultipleFluxSource<T>(
FluxSource.of(q)
.createQueue());
FluxSource.of(q)
.createQueue());
}

/**
Expand Down Expand Up @@ -116,7 +115,7 @@ public static FluxSource of(int backPressureAfter) {
throw new IllegalArgumentException(
"Can't apply back pressure after less than 1 event");
return new FluxSource(
backPressureAfter, true);
backPressureAfter, true);
}

<T> Queue<T> createQueue() {
Expand Down Expand Up @@ -176,6 +175,7 @@ public <T> PushableStream<T> stream() {
q, (Stream) q.stream());

}

/**
* Create a pushable Flux
*
Expand All @@ -185,8 +185,9 @@ public <T> PushableStream<T> stream() {
public <T> PushableFlux<T> flux() {
Queue<T> q = createQueue();
return new PushableFlux<T>(
q, Flux.from(q.stream()));
q, Flux.from(q.stream()));
}

/**
* Create a pushable ReactiveSeq
*
Expand Down Expand Up @@ -220,6 +221,7 @@ public static <T> ReactiveSeq<T> reactiveSeq(Adapter<T> adapter) {

return adapter.stream();
}

/**
* Create a pushable ReactiveSeq
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import reactor.core.publisher.Flux;


/**
* Utilities for working with reactive-streams Publishers
*
Expand Down Expand Up @@ -39,26 +38,26 @@ public class Publishers {
public static <T> AnyMSeq<T> anyM(Publisher<T> flux) {
return AnyM.ofSeq(flux);
}

/**
* Convert a reactive-streams Publisher to a cyclops-react ReactiveSeq extended Stream type
*
* @param pub Publisher to convert to a Stream
* @return ReactiveSeq
*/
public static<T> ReactiveSeq<T> stream(Publisher<T> pub){
public static <T> ReactiveSeq<T> stream(Publisher<T> pub) {
return ReactiveSeq.fromStream(jdkStream(pub));
}

/**
* Convert a reactive-streams Publisher to a plain java.util.Stream
*
* @param pub Publisher to convert to a Stream
* @return Stream
*/
public static<T> Stream<T> jdkStream(Publisher<T> pub){
public static <T> Stream<T> jdkStream(Publisher<T> pub) {
SeqSubscriber<T> sub = SeqSubscriber.subscriber();
pub.subscribe(sub);
return StreamSupport.stream(sub.spliterator(),false);
return StreamSupport.stream(sub.spliterator(), false);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.aol.cyclops.control;


import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -45,7 +44,6 @@ public class ReactorPipes<K, V> {

private final ConcurrentMap<K, Adapter<V>> registered = new ConcurrentHashMap<>();


/**
* @return Size of registered pipes
*/
Expand Down Expand Up @@ -133,6 +131,7 @@ public Maybe<LazyFutureStream<V>> futureStream(final K key, final LazyReact reac
public Maybe<ReactiveSeq<V>> reactiveSeq(final K key) {
return get(key).map(a -> a.stream());
}

/**
* @param key : Adapter identifier
* @return LazyFutureStream from selected Queue
Expand All @@ -147,8 +146,7 @@ public LazyListX<V> xValues(final K key, final long x) {
return get(key).peek(a -> a.stream()
.subscribe(sub))
.map(a -> LazyListX.fromStreamS(sub.stream()
.limit(x))
)
.limit(x)))
.orElse(LazyListX.empty());
}

Expand Down
Loading

0 comments on commit abaf35f

Please sign in to comment.