Skip to content

Commit

Permalink
refactoring and javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
johnmcclean committed Oct 12, 2016
1 parent ef27ae5 commit 1cf2667
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 141 deletions.
63 changes: 26 additions & 37 deletions cyclops-reactor/src/main/java/com/aol/cyclops/reactor/Fluxes.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public static <T, R1, R> Flux<R> forEach(Flux<? extends T> value1, Function<? su
*
* <pre>
* {@code
* FluxUtils.grouped(Flux.just(1,2,3,1,1,1),3,()->new TreeSet<>())
* Fluxes.grouped(Flux.just(1,2,3,1,1,1),3,()->new TreeSet<>())
*
* //Flux[Set[1,2,4],Set[1]]
Expand Down Expand Up @@ -370,7 +370,7 @@ public static <T> Iterator<T> iterator(Flux<T> stream){
* with it's neighbour
* <pre>
* {@code
* FluxUtils.combine(Flux.just(1,1,2,3),(a, b)->a.equals(b),Semigroups.intSum)
* Fluxes.combine(Flux.just(1,1,2,3),(a, b)->a.equals(b),Semigroups.intSum)
* //Flux(3,4)
Expand Down Expand Up @@ -426,7 +426,7 @@ public ReactiveSeq<T> next() {
*
* <pre>
* {@code
* List<Integer> list = Fluexs.cycle(Flux.just(1,2,2)),Reducers.toCountInt(),3)
* List<Integer> list = Fluxes.cycle(Flux.just(1,2,2)),Reducers.toCountInt(),3)
* .collect(Collectors.toList());
* //List[3,3,3];
* }
Expand Down Expand Up @@ -468,7 +468,7 @@ public T next() {
* {@code
* int count =0;
*
assertThat(FluxUtils.cycleWhile(Flux.just(1,2,2)
assertThat(Fluxes.cycleWhile(Flux.just(1,2,2)
,next -> count++<6 )
.collect(Collectors.toList()),equalTo(Arrays.asList(1,2,2,1,2,2)));
* }
Expand Down Expand Up @@ -502,6 +502,20 @@ public T next() {
}
);
}

/**
* Take elements from the Stream until the predicate returns true, after
* which all elements are excluded.
* <pre>
* {@code
* Fluxes.limitUntil(Flux.justf(4,3,6,7),i->i==6).collect(Collectors.toList())
* //Arrays.asList(4,3)
* }</pre>
*
* @param stream Flux to take elements from
* @param predicate Take until predicate is true
* @return Stream with limited elements
*/
public final static <T> Flux<T> takeUntil(final Flux<T> stream, final Predicate<? super T> predicate) {
return Flux.fromIterable(()-> new Iterator<T>(){

Expand All @@ -527,39 +541,15 @@ public T next() {
}
);
}
public final static <T> Flux<T> limitUntil(final Flux<T> stream, final Predicate<? super T> predicate) {
return Flux.fromIterable(()-> new Iterator<T>(){

Iterator<T> it;
private void init(){
if(it==null){
ReactiveSeq<T> seq = ReactiveSeq.fromPublisher(stream);
it = seq.limitUntil(predicate).iterator();
}
}
@Override
public boolean hasNext() {
init();
return it.hasNext();
}

@Override
public T next() {
init();
return it.next();
}

}
);
}


/**
* Repeat in a Stream until specified predicate holds
*
* <pre>
* {@code
* count =0;
assertThat(FluxUtils.cycleUntil(Flux.just(1,2,2,3)
assertThat(Fluxes.cycleUntil(Flux.just(1,2,2,3)
,next -> count++>10 )
.collect(Collectors.toList()),equalTo(Arrays.asList(1, 2, 2, 3, 1, 2, 2, 3, 1, 2, 2)));
Expand Down Expand Up @@ -600,7 +590,7 @@ public T next() {
*
*
* assertThat(Arrays.asList(1, 2, 3),
* equalTo( FluxUtils.ofType(Flux.just(1, "a", 2, "b", 3,Integer.class));
* equalTo( Fluxes.ofType(Flux.just(1, "a", 2, "b", 3,Integer.class));
*
*/
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -630,7 +620,7 @@ public final static <T> Flux<ListX<T>> groupedUntil(final Flux<T> stream, final
* <pre>
* {@code
*
FluxUtils.trampoline(Flux.just(10,20,30,40),i-> fibonacci(i))
Fluxes.trampoline(Flux.just(10,20,30,40),i-> fibonacci(i))
.forEach(System.out::println);
Trampoline<Long> fibonacci(int i){
Expand All @@ -647,7 +637,7 @@ Trampoline<Long> fibonacci(int n, long a, long b) {
*
*
*
FluxUtils.trampoline(Flux.just(10_000,200_000,3_000_000,40_000_000),i-> fibonacci(i))
Fluxes.trampoline(Flux.just(10_000,200_000,3_000_000,40_000_000),i-> fibonacci(i))
.forEach(System.out::println);
Expand All @@ -668,8 +658,7 @@ public static <T,R> Flux<R> trampoline(Flux<T> flux,Function<? super T, ? extend
*
* <pre>
* {@code
* List<String> result = CollectionX.of(1,2,3,4)
.patternMatch(
* List<String> result = Fluxes.patternMatch(Flux.just(1,2,3,4),
c->c.valuesWhere(i->"even", (Integer i)->i%2==0 )
)
* }
Expand Down Expand Up @@ -1209,7 +1198,7 @@ public static <T> Flux<Tuple2<T, Long>> zipWithIndex(Flux<T> stream) {
* Delete elements between given indexes in a Flux
* <pre>
* {@code
* List<String> result = FluxUtils.deleteBetween(Flux.just(1,2,3,4,5,6),2,4)
* List<String> result = Fluxes.deleteBetween(Flux.just(1,2,3,4,5,6),2,4)
.map(it ->it+"!!")
.collect(Collectors.toList())
.block();
Expand All @@ -1230,7 +1219,7 @@ public static final <T> Flux<T> deleteBetween(final Flux<T> stream, final int st
* Insert data into a Flux at given position
* <pre>
* {@code
* List<String> result = FluxUtils.insertAt(Flux.just(1,2,3),1,100,200,300)
* List<String> result = Fluxes.insertAt(Flux.just(1,2,3),1,100,200,300)
.map(it ->it+"!!")
.collect(Collectors.toList())
.block();
Expand Down
97 changes: 2 additions & 95 deletions cyclops-reactor/src/main/java/com/aol/cyclops/reactor/Reactor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ public static <T> AnyMSeq<T> fromFluxT(FluxT<T> flux) {
return AnyM.ofSeq(flux);
}

public static <T> AnyMSeq<T> flux(Flux<T> flux) {
return AnyM.ofSeq(flux);
}


public static <T> FluxTSeq<T> fluxT(Publisher<Flux<T>> nested) {
return FluxT.fromPublisher(nested);
}
Expand All @@ -50,97 +47,7 @@ public static <T> MonoTSeq<T> monoT(Publisher<Mono<T>> nested) {
return MonoT.fromPublisher(Flux.from(nested));
}

public interface ForFlux {

static <T1, T2, T3, R1, R2, R3, R> Flux<R> each4(Flux<? extends T1> value1,
Function<? super T1, ? extends Flux<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Flux<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Flux<R3>> value4,
QuadFunction<? super T1, ? super R1, ? super R2, ? super R3, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(flux(value1))
.anyM(a -> flux(value2.apply(a)))
.anyM(a -> b -> flux(value3.apply(a, b)))
.anyM(a -> b -> c -> flux(value4.apply(a, b, c)))
.yield4(yieldingFunction)
.unwrap())
.unwrap();

}

static <T1, T2, T3, R1, R2, R3, R> Flux<R> each4(Flux<? extends T1> value1,
Function<? super T1, ? extends Flux<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Flux<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends Flux<R3>> value4,
QuadFunction<? super T1, ? super R1, ? super R2, ? super R3, Boolean> filterFunction,
QuadFunction<? super T1, ? super R1, ? super R2, ? super R3, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(flux(value1))
.anyM(a -> flux(value2.apply(a)))
.anyM(a -> b -> flux(value3.apply(a, b)))
.anyM(a -> b -> c -> flux(value4.apply(a, b, c)))
.filter(a -> b -> c -> d -> filterFunction.apply(a, b, c, d))
.yield4(yieldingFunction)
.unwrap())
.unwrap();

}

static <T1, T2, R1, R2, R> Flux<R> each3(Flux<? extends T1> value1,
Function<? super T1, ? extends Flux<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Flux<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(flux(value1))
.anyM(a -> flux(value2.apply(a)))
.anyM(a -> b -> flux(value3.apply(a, b)))
.yield3(yieldingFunction)
.unwrap())
.unwrap();

}

static <T1, T2, R1, R2, R> Flux<R> each3(Flux<? extends T1> value1,
Function<? super T1, ? extends Flux<R1>> value2,
BiFunction<? super T1, ? super R1, ? extends Flux<R2>> value3,
TriFunction<? super T1, ? super R1, ? super R2, Boolean> filterFunction,
TriFunction<? super T1, ? super R1, ? super R2, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(flux(value1))
.anyM(a -> flux(value2.apply(a)))
.anyM(a -> b -> flux(value3.apply(a, b)))
.filter(a -> b -> c -> filterFunction.apply(a, b, c))
.yield3(yieldingFunction)
.unwrap())
.unwrap();

}

static <T, R1, R> Flux<R> each2(Flux<? extends T> value1, Function<? super T, Flux<R1>> value2,
BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(flux(value1))
.anyM(a -> flux(value2.apply(a)))
.yield2(yieldingFunction)
.unwrap())
.unwrap();

}

static <T, R1, R> Flux<R> each2(Flux<? extends T> value1, Function<? super T, ? extends Flux<R1>> value2,
BiFunction<? super T, ? super R1, Boolean> filterFunction,
BiFunction<? super T, ? super R1, ? extends R> yieldingFunction) {

return AnyM.ofSeq(For.anyM(flux(value1))
.anyM(a -> flux(value2.apply(a)))
.filter(a -> b -> filterFunction.apply(a, b))
.yield2(yieldingFunction)
.unwrap())
.unwrap();

}
}


public interface ForFluxTransformer {

static <T1, T2, T3, R1, R2, R3, R> FluxT<R> each4(FluxT<? extends T1> value1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public FluentCollectionX<T> dropWhile(final Predicate<? super T> p) {
*/
@Override
public FluentCollectionX<T> takeUntil(final Predicate<? super T> p) {
return stream(Fluxes.limitUntil(flux(), p));
return stream(Fluxes.takeUntil(flux(), p));
}

/* (non-Javadoc)
Expand Down Expand Up @@ -665,7 +665,7 @@ public FluentCollectionX<T> limitWhile(final Predicate<? super T> p) {
@Override
public FluentCollectionX<T> limitUntil(final Predicate<? super T> p) {

return stream(Fluxes.limitUntil(flux(), p));
return stream(Fluxes.takeUntil(flux(), p));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.aol.cyclops.control.Trampoline;
import com.aol.cyclops.control.monads.transformers.values.FoldableTransformerSeq;
import com.aol.cyclops.data.collections.extensions.standard.ListX;
import com.aol.cyclops.reactor.Reactor;
import com.aol.cyclops.reactor.Fluxes;
import com.aol.cyclops.types.MonadicValue;
import com.aol.cyclops.types.anyM.AnyMSeq;
import com.aol.cyclops.types.anyM.AnyMValue;
Expand Down Expand Up @@ -206,7 +206,7 @@ public static <A> FluxTSeq<A> fromIterable(Iterable<Flux<A>> iterableOfFluxs) {
}

public static <A> FluxTSeq<A> fromFlux(Flux<Flux<A>> FluxOfFluxs) {
return FluxTSeq.of(Reactor.flux(FluxOfFluxs));
return FluxTSeq.of(Fluxes.anyM(FluxOfFluxs));
}

public static <A> FluxTSeq<A> fromPublisher(Publisher<Flux<A>> publisherOfFluxs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.aol.cyclops.control.monads.transformers.seq.ListTSeq;
import com.aol.cyclops.data.collections.extensions.standard.ListX;
import com.aol.cyclops.data.collections.extensions.standard.SetX;
import com.aol.cyclops.reactor.Reactor.ForFlux;
import com.aol.cyclops.reactor.Reactor.ForFluxTransformer;
import com.aol.cyclops.reactor.transformer.FluxT;

Expand Down Expand Up @@ -82,7 +81,7 @@ public Data findRemote(int in) {

@Test
public void flux() {
assertThat(Reactor.flux(Flux.just(1, 2, 3))
assertThat(Fluxes.anyM(Flux.just(1, 2, 3))
.toListX(),
equalTo(ListX.of(1, 2, 3)));
}
Expand Down Expand Up @@ -119,10 +118,10 @@ public void monoT() {
@Test
public void fluxComp() {

ForFlux.each2(Flux.range(1, 10), i -> Flux.range(i, 10), Tuple::tuple);
Fluxes.forEach(Flux.range(1, 10), i -> Flux.range(i, 10), Tuple::tuple);

Flux<Tuple2<Integer, Integer>> stream = ForFlux.each2(Flux.range(1, 10), i -> Flux.range(i, 10), Tuple::tuple);
Flux<Integer> result = Reactor.ForFlux.each2(Flux.just(10, 20), a -> Flux.<Integer> just(a + 10),
Flux<Tuple2<Integer, Integer>> stream = Fluxes.forEach(Flux.range(1, 10), i -> Flux.range(i, 10), Tuple::tuple);
Flux<Integer> result = Fluxes.forEach(Flux.just(10, 20), a -> Flux.<Integer> just(a + 10),
(a, b) -> a + b);
assertThat(result.collectList()
.block(),
Expand Down

0 comments on commit 1cf2667

Please sign in to comment.