Skip to content

Commit

Permalink
for comp. refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed Oct 18, 2016
1 parent 9b207d3 commit 0b6b580
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.aol.cyclops.types.Functor;
import com.aol.cyclops.types.stream.reactive.SeqSubscriber;

import javaslang.API;
import javaslang.Lazy;
import javaslang.collection.Array;
import javaslang.collection.CharSeq;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.aol.cyclops.control;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand All @@ -17,7 +14,6 @@

import com.aol.cyclops.data.LazyImmutable;
import com.aol.cyclops.data.async.Adapter;
import com.aol.cyclops.data.async.Queue;
import com.aol.cyclops.data.collections.extensions.persistent.PMapX;
import com.aol.cyclops.react.threads.SequentialElasticPools;
import com.aol.cyclops.reactor.collections.extensions.standard.LazyListX;
Expand Down
24 changes: 12 additions & 12 deletions cyclops-reactor/src/main/java/com/aol/cyclops/reactor/FluxTs.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ public static <T> FluxTSeq<T> fluxT(Publisher<Flux<T>> nested) {
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.FluxTs.forEach;
* import static com.aol.cyclops.reactor.FluxTs.forEach4;
*
* FluxT<Integer> fluxT = FluxT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(fluxT,
forEach4(fluxT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b,c)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
Expand All @@ -90,7 +90,7 @@ public static <T> FluxTSeq<T> fluxT(Publisher<Flux<T>> nested) {
* @param yieldingFunction Generates a result per combination
* @return FluxT with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach(FluxT<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach4(FluxT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Publisher<R3>> value4,
Expand All @@ -113,11 +113,11 @@ public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach(FluxT<? extends T1> v
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.FluxTs.forEach;
* import static com.aol.cyclops.reactor.FluxTs.forEach4;
*
* FluxT<Integer> fluxT = FluxT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(fluxT,
forEach4(fluxT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b,c)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
Expand All @@ -138,7 +138,7 @@ public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach(FluxT<? extends T1> v
* @param yieldingFunction Generates a result per combination
* @return FluxT with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach(FluxT<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach4(FluxT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Publisher<R3>> value4,
Expand All @@ -163,11 +163,11 @@ public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach(FluxT<? extends T1> v
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.FluxTs.forEach;
* import static com.aol.cyclops.reactor.FluxTs.forEach3;
*
* FluxT<Integer> fluxT = FluxT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(fluxT,
forEach3(fluxT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
Tuple::tuple)
Expand All @@ -184,7 +184,7 @@ public static <T1, T2, T3, R1, R2, R3, R> FluxT<R> forEach(FluxT<? extends T1> v
* @param yieldingFunction Generates a result per combination
* @return FluxT with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, R1, R2, R> FluxT<R> forEach(FluxT<? extends T1> value1,
public static <T1, T2, R1, R2, R> FluxT<R> forEach3(FluxT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
Expand All @@ -205,11 +205,11 @@ public static <T1, T2, R1, R2, R> FluxT<R> forEach(FluxT<? extends T1> value1,
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.FluxTs.forEach;
* import static com.aol.cyclops.reactor.FluxTs.forEach3;
*
* FluxT<Integer> fluxT = FluxT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(fluxT,
forEach3(fluxT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b,c)->a+b+c<102,
Expand All @@ -226,7 +226,7 @@ public static <T1, T2, R1, R2, R> FluxT<R> forEach(FluxT<? extends T1> value1,
* @param yieldingFunction Generates a result per combination
* @return FluxT with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, R1, R2, R> FluxT<R> forEach(FluxT<? extends T1> value1,
public static <T1, T2, R1, R2, R> FluxT<R> forEach3(FluxT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
Expand Down
24 changes: 15 additions & 9 deletions cyclops-reactor/src/main/java/com/aol/cyclops/reactor/Fluxes.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public static <T> AnyMSeq<T> anyM(Flux<T> flux) {
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.Fluxes.forEach;
* import static com.aol.cyclops.reactor.Fluxes.forEach4;
*
forEach(Flux.range(1,10),
forEach4(Flux.range(1,10),
a-> ReactiveSeq.iterate(a,i->i+1).limit(10),
(a,b) -> Maybe.<Integer>of(a+b),
(a,b,c) -> Mono.<Integer>just(a+b+c),
Expand All @@ -92,7 +92,7 @@ public static <T> AnyMSeq<T> anyM(Flux<T> flux) {
* @param yieldingFunction Generates a result per combination
* @return Flux with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach(Flux<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach4(Flux<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Publisher<R3>> value4,
Expand All @@ -114,9 +114,9 @@ public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach(Flux<? extends T1> val
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.Fluxes.forEach;
* import static com.aol.cyclops.reactor.Fluxes.forEach4;
*
* forEach(Flux.range(1,10),
* forEach4(Flux.range(1,10),
a-> ReactiveSeq.iterate(a,i->i+1).limit(10),
(a,b) -> Maybe.<Integer>just(a+b),
(a,b,c) -> Mono.<Integer>just(a+b+c),
Expand All @@ -134,7 +134,7 @@ public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach(Flux<? extends T1> val
* @param yieldingFunction Generates a result per combination
* @return Flux with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach(Flux<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach4(Flux<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Publisher<R3>> value4,
Expand Down Expand Up @@ -176,7 +176,7 @@ public static <T1, T2, T3, R1, R2, R3, R> Flux<R> forEach(Flux<? extends T1> val
* @param yieldingFunction Generates a result per combination
* @return Flux with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, R1, R2, R> Flux<R> forEach(Flux<? extends T1> value1,
public static <T1, T2, R1, R2, R> Flux<R> forEach3(Flux<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
Expand Down Expand Up @@ -213,7 +213,7 @@ public static <T1, T2, R1, R2, R> Flux<R> forEach(Flux<? extends T1> value1,
* @param yieldingFunction Generates a result per combination
* @return
*/
public static <T1, T2, R1, R2, R> Flux<R> forEach(Flux<? extends T1> value1,
public static <T1, T2, R1, R2, R> Flux<R> forEach3(Flux<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
Expand Down Expand Up @@ -253,7 +253,8 @@ public static <T1, T2, R1, R2, R> Flux<R> forEach(Flux<? extends T1> value1,
* @param yieldingFunction Generates a result per combination
* @return
*/
public static <T, R1, R> Flux<R> forEach(Flux<? extends T> value1, Function<? super T, Flux<R1>> value2,
public static <T, R1, R> Flux<R> forEach(Flux<? extends T> value1,
Function<? super T, Flux<R1>> value2,
BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(anyM(value1))
Expand Down Expand Up @@ -1192,6 +1193,11 @@ public U next() {
});
}

/**
* @param flux
* @param monoid
* @return
*/
public static <T> Flux<T> scanRight(Flux<T> flux, Monoid<T> monoid) {

return Flux.fromIterable(() -> new Iterator<T>() {
Expand Down
24 changes: 12 additions & 12 deletions cyclops-reactor/src/main/java/com/aol/cyclops/reactor/MonoTs.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ public static <T> MonoTSeq<T> monoT(Publisher<Mono<T>> nested) {
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.MonoTs.forEach;
* import static com.aol.cyclops.reactor.MonoTs.forEach4;
*
* MonoT<Integer> monoT = MonoT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(monoT,
forEach4(monoT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b,c)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
Expand All @@ -91,7 +91,7 @@ public static <T> MonoTSeq<T> monoT(Publisher<Mono<T>> nested) {
* @param yieldingFunction Generates a result per combination
* @return MonoTSeq with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach(MonoT<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach4(MonoT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Publisher<R3>> value4,
Expand All @@ -114,11 +114,11 @@ public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach(MonoT<? extends T1> v
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.MonoTs.forEach;
* import static com.aol.cyclops.reactor.MonoTs.forEach4;
*
* MonoT<Integer> monoT = MonoT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(monoT,
forEach4(monoT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b,c)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
Expand All @@ -139,7 +139,7 @@ public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach(MonoT<? extends T1> v
* @param yieldingFunction Generates a result per combination
* @return
*/
public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach(MonoT<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach4(MonoT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Publisher<R3>> value4,
Expand All @@ -164,11 +164,11 @@ public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach(MonoT<? extends T1> v
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.MonoTs.forEach;
* import static com.aol.cyclops.reactor.MonoTs.forEach3;
*
* MonoT<Integer> monoT = MonoT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(monoT,
forEach3(monoT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
Tuple::tuple)
Expand All @@ -185,7 +185,7 @@ public static <T1, T2, T3, R1, R2, R3, R> MonoT<R> forEach(MonoT<? extends T1> v
* @param yieldingFunction Generates a result per combination
* @return MonoTSeq with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, R1, R2, R> MonoT<R> forEach(MonoT<? extends T1> value1,
public static <T1, T2, R1, R2, R> MonoT<R> forEach3(MonoT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
Expand All @@ -206,11 +206,11 @@ public static <T1, T2, R1, R2, R> MonoT<R> forEach(MonoT<? extends T1> value1,
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.MonoTs.forEach;
* import static com.aol.cyclops.reactor.MonoTs.forEach3;
*
* MonoT<Integer> monoT = MonoT.fromIterable(Arrays.asList(Flux.range(10,2),Flux.range(100,2)));
*
forEach(monoT,
forEach3(monoT,
a-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b)-> ReactiveSeq.iterate(a,i->i+1).limit(2),
(a,b,c,d)->a+b+c+d<102,
Expand All @@ -227,7 +227,7 @@ public static <T1, T2, R1, R2, R> MonoT<R> forEach(MonoT<? extends T1> value1,
* @param yieldingFunction Generates a result per combination
* @return MonoTSeq with an element per combination of nested publishers generated by the yielding function
*/
public static <T1, T2, R1, R2, R> MonoT<R> forEach(MonoT<? extends T1> value1,
public static <T1, T2, R1, R2, R> MonoT<R> forEach3(MonoT<? extends T1> value1,
Function<? super T1, ? extends Publisher<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Publisher<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
Expand Down
24 changes: 12 additions & 12 deletions cyclops-reactor/src/main/java/com/aol/cyclops/reactor/Monos.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public static <T> AnyMValue<T> anyM(Mono<T> mono) {
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.Monos.forEach;
* import static com.aol.cyclops.reactor.Monos.forEach4;
*
forEach(Mono.just(1),
forEach4(Mono.just(1),
a-> Mono.just(a+1),
(a,b) -> Mono.<Integer>just(a+b),
(a,b,c) -> Mono.<Integer>just(a+b+c),
Expand All @@ -74,7 +74,7 @@ public static <T> AnyMValue<T> anyM(Mono<T> mono) {
* @param yieldingFunction Generates a result per combination
* @return Mono with a combined value generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach(Mono<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach4(Mono<? extends T1> value1,
Function<? super T1, ? extends Mono<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Mono<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Mono<R3>> value4,
Expand All @@ -98,9 +98,9 @@ public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach(Mono<? extends T1> val
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.Monos.forEach;
* import static com.aol.cyclops.reactor.Monos.forEach4;
*
* forEach(Mono.just(1),
* forEach4(Mono.just(1),
a-> Mono.just(a+1),
(a,b) -> Mono.<Integer>just(a+b),
(a,b,c) -> Mono.<Integer>just(a+b+c),
Expand All @@ -118,7 +118,7 @@ public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach(Mono<? extends T1> val
* @param yieldingFunction Generates a result per combination
* @return Mono with a combined value generated by the yielding function
*/
public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach(Mono<? extends T1> value1,
public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach4(Mono<? extends T1> value1,
Function<? super T1, ? extends Mono<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Mono<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Mono<R3>> value4,
Expand All @@ -143,9 +143,9 @@ public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach(Mono<? extends T1> val
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.Monos.forEach;
* import static com.aol.cyclops.reactor.Monos.forEach3;
*
forEach(Mono.just(1),
forEach3(Mono.just(1),
a-> Mono.just(a+1),
(a,b) -> Mono.<Integer>just(a+b),
Tuple::tuple)
Expand All @@ -159,7 +159,7 @@ public static <T1, T2, T3, R1, R2, R3, R> Mono<R> forEach(Mono<? extends T1> val
* @param yieldingFunction Generates a result per combination
* @return Mono with a combined value generated by the yielding function
*/
public static <T1, T2, R1, R2, R> Mono<R> forEach(Mono<? extends T1> value1,
public static <T1, T2, R1, R2, R> Mono<R> forEach3(Mono<? extends T1> value1,
Function<? super T1, ? extends Mono<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Mono<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {
Expand All @@ -181,9 +181,9 @@ public static <T1, T2, R1, R2, R> Mono<R> forEach(Mono<? extends T1> value1,
* <pre>
* {@code
*
* import static com.aol.cyclops.reactor.Monos.forEach;
* import static com.aol.cyclops.reactor.Monos.forEach3;
*
* forEach(Mono.just(1),
* forEach3(Mono.just(1),
a-> Mono.just(a+1),
(a,b) -> Mono.<Integer>just(a+b),
(a,b,c) -> a+b+c <100,
Expand All @@ -199,7 +199,7 @@ public static <T1, T2, R1, R2, R> Mono<R> forEach(Mono<? extends T1> value1,
* @param yieldingFunction Generates a result per combination
* @return Mono with a combined value generated by the yielding function
*/
public static <T1, T2, R1, R2, R> Mono<R> forEach(Mono<? extends T1> value1,
public static <T1, T2, R1, R2, R> Mono<R> forEach3(Mono<? extends T1> value1,
Function<? super T1, ? extends Mono<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Mono<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
Expand Down
Loading

0 comments on commit 0b6b580

Please sign in to comment.