Skip to content

Commit

Permalink
Support nested promises.
Browse files Browse the repository at this point in the history
  • Loading branch information
ldaley committed Oct 15, 2014
1 parent 66561ed commit 8cdaa71
Show file tree
Hide file tree
Showing 29 changed files with 613 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ ratpack {
def t = new TemplatingModule()
t.staticallyCompile = true
add t
<% } else { %>
add(TemplatingModule) { TemplatingModule.Config config -> config.staticallyCompile = true }
<% } %>

Expand Down
5 changes: 2 additions & 3 deletions ratpack-core/src/main/java/ratpack/exec/ExecControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package ratpack.exec;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import ratpack.func.Action;

import java.util.concurrent.Callable;
Expand Down Expand Up @@ -132,9 +131,9 @@ public interface ExecControl {
*
* @param publisher the provider of a potentially unbounded number of sequenced elements, publishing them according to the demand
* received from its Subscriber(s)
* @param subscriber a component that accepts a sequenced stream of elements provided by a Publisher
* @return effectively the given publisher, emitting each “event” as an execution segment of the current execution
* @param <T> the type of streamed elements
*/
<T> void stream(Publisher<T> publisher, Subscriber<? super T> subscriber);
<T> Publisher<T> stream(Publisher<T> publisher);

}
2 changes: 0 additions & 2 deletions ratpack-core/src/main/java/ratpack/exec/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public interface Execution extends MutableRegistry {
// TODO: this is not the right name.
void onCleanup(AutoCloseable autoCloseable);

void checkpoint(String checkpointId);

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package ratpack.exec;

import java.util.List;
import java.util.Optional;

public interface ExecutionSnapshot {
Expand All @@ -25,12 +24,8 @@ public interface ExecutionSnapshot {

boolean getWaiting();

List<String> getCheckpoints();

Long getStartedAt();

Optional<StackTraceElement[]> getStartedTrace();

Optional<StackTraceElement[]> getLastSegmentTrace();

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public CachingPromise(Consumer<? super Fulfiller<? super T>> fulfillment, Suppli

private class Job {
final Fulfiller<? super T> fulfiller;
final ExecutionBacking executionBacking = executionSupplier.get();
final ExecutionBacking.StreamHandle streamHandle;

private Job(Fulfiller<? super T> fulfiller) {
private Job(Fulfiller<? super T> fulfiller, ExecutionBacking.StreamHandle streamHandle) {
this.fulfiller = fulfiller;
this.streamHandle = streamHandle;
}
}

Expand Down Expand Up @@ -128,7 +129,7 @@ private void tryDrain() {
Job job = waiting.poll();
while (job != null) {
Job finalJob = job;
job.executionBacking.join(e -> finalJob.fulfiller.accept(result));
job.streamHandle.complete(e -> finalJob.fulfiller.accept(result));
job = waiting.poll();
}
} finally {
Expand Down Expand Up @@ -161,9 +162,8 @@ public void success(T value) {
}
});
} else {
Job job = new Job(fulfiller);
job.executionBacking.continueVia(() -> {
waiting.add(job);
executionSupplier.get().streamSubscribe((streamHandle) -> {
waiting.add(new Job(fulfiller, streamHandle));
if (result.get() != null) {
tryDrain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ public class DefaultExecControl implements ExecControl {
private static final int MAX_ERRORS_THRESHOLD = 5;

private final ExecController execController;
private final boolean debug;
private final ThreadLocal<ExecutionBacking> threadBinding = new ThreadLocal<>();
private final Set<ExecutionBacking> executions = new ConcurrentSet<>();

public DefaultExecControl(ExecController execController, boolean debug) {
public DefaultExecControl(ExecController execController) {
this.execController = execController;
this.debug = debug;
}

private ExecutionBacking getBacking() {
Expand Down Expand Up @@ -115,13 +113,13 @@ public ExecStarter register(Action<? super RegistrySpec> registry) {

@Override
public void start(Action<? super Execution> action) {
Optional<StackTraceElement[]> startStack = debug ? Optional.of(Thread.currentThread().getStackTrace()) : Optional.empty();
Optional<StackTraceElement[]> startTrace = ExecutionBacking.TRACE ? Optional.of(Thread.currentThread().getStackTrace()) : Optional.empty();

Action<? super Execution> effectiveAction = registry == null ? action : Action.join(registry, action);
if (execController.isManagedThread() && threadBinding.get() == null) {
new ExecutionBacking(execController, executions, startStack, threadBinding, effectiveAction, onError, onComplete);
new ExecutionBacking(execController, executions, startTrace, threadBinding, effectiveAction, onError, onComplete);
} else {
execController.getExecutor().submit(() -> new ExecutionBacking(execController, executions, startStack, threadBinding, effectiveAction, onError, onComplete));
execController.getExecutor().submit(() -> new ExecutionBacking(execController, executions, startTrace, threadBinding, effectiveAction, onError, onComplete));
}
}
};
Expand All @@ -137,7 +135,7 @@ public <T> Promise<T> blocking(final Callable<T> blockingOperation) {
final ExecutionBacking backing = getBacking();
final ExecController controller = backing.getController();
return directPromise(f ->
backing.continueVia(() -> CompletableFuture.supplyAsync(() -> {
backing.streamSubscribe((streamHandle) -> CompletableFuture.supplyAsync(() -> {
List<Result<T>> holder = Lists.newArrayListWithCapacity(1);
try {
backing.intercept(ExecInterceptor.ExecType.BLOCKING, backing.getInterceptors(), execution ->
Expand All @@ -148,49 +146,41 @@ public <T> Promise<T> blocking(final Callable<T> blockingOperation) {
return Result.<T>failure(e);
}
}, controller.getBlockingExecutor()
).thenAccept(v -> backing.join(e -> f.accept(v))))
).thenAccept(v -> streamHandle.complete(e -> f.accept(v))))
);
}

private <T> Promise<T> directPromise(Consumer<? super Fulfiller<? super T>> action) {
return new DefaultPromise<>(this::getBacking, action);
}

@Override
public <T> void stream(final Publisher<T> publisher, final Subscriber<? super T> subscriber) {
final ExecutionBacking executionBacking = getBacking();

directPromise((Consumer<Fulfiller<? super Subscription>>) fulfiller ->
executionBacking.continueVia(() ->
publisher.subscribe(new Subscriber<T>() {
@Override
public void onSubscribe(final Subscription subscription) {
fulfiller.success(subscription);
}

@Override
public void onNext(final T element) {
executionBacking.streamExecution(execution -> subscriber.onNext(element));
}

@Override
public void onComplete() {
executionBacking.completeStreamExecution(execution -> subscriber.onComplete());
}

@Override
public void onError(final Throwable cause) {
executionBacking.completeStreamExecution(execution -> subscriber.onError(cause));
}
})
)
).then(subscription ->
executionBacking.join(e ->
executionBacking.streamExecution(execution ->
subscriber.onSubscribe(subscription)
)
)
);
public <T> Publisher<T> stream(Publisher<T> publisher) {
return subscriber -> {
final ExecutionBacking executionBacking = getBacking();
executionBacking.streamSubscribe((handle) ->
publisher.subscribe(new Subscriber<T>() {
@Override
public void onSubscribe(final Subscription subscription) {
handle.event((e) -> subscriber.onSubscribe(subscription));
}

@Override
public void onNext(final T element) {
handle.event(execution -> subscriber.onNext(element));
}

@Override
public void onComplete() {
handle.complete(execution -> subscriber.onComplete());
}

@Override
public void onError(final Throwable cause) {
handle.complete(execution -> subscriber.onError(cause));
}
})
);
};
}

public List<? extends ExecutionSnapshot> getExecutionSnapshots() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,10 @@ public DefaultExecController() {
}

public DefaultExecController(int numThreads) {
this(numThreads, false);
}

public DefaultExecController(int numThreads, boolean debug) {
this.numThreads = numThreads;
this.eventLoopGroup = new NioEventLoopGroup(numThreads, new ExecControllerBindingThreadFactory("ratpack-compute", Thread.MAX_PRIORITY));
this.blockingExecutor = Executors.newCachedThreadPool(new ExecControllerBindingThreadFactory("ratpack-blocking", Thread.NORM_PRIORITY));
this.control = new DefaultExecControl(this, debug);
this.control = new DefaultExecControl(this);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@
import ratpack.exec.Execution;
import ratpack.registry.internal.SimpleMutableRegistry;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

public class DefaultExecution extends SimpleMutableRegistry implements Execution {

private final ExecController controller;
private final List<AutoCloseable> closeables;
private final Collection<String> checkpoints;

public DefaultExecution(ExecController controller, List<AutoCloseable> closeables, Collection<String> checkpoints) {
public DefaultExecution(ExecController controller, List<AutoCloseable> closeables) {
this.controller = controller;
this.closeables = closeables;
this.checkpoints = checkpoints;
}

@Override
Expand All @@ -53,11 +50,6 @@ public void onCleanup(AutoCloseable autoCloseable) {
closeables.add(autoCloseable);
}

@Override
public void checkpoint(String checkpointId) {
checkpoints.add(checkpointId);
}

@Override
public <O> Execution addLazy(TypeToken<O> type, Supplier<? extends O> supplier) {
super.addLazy(type, supplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.function.Supplier;

import static ratpack.func.Action.ignoreArg;
import static ratpack.func.Action.throwException;

public class DefaultSuccessPromise<T> implements SuccessPromise<T> {

Expand Down Expand Up @@ -206,10 +207,10 @@ public Promise<T> defer(Action<? super Runnable> releaser) {
if (fired.compareAndSet(false, true)) {
return new DefaultPromise<>(executionSupplier, downstream -> {
ExecutionBacking executionBacking = executionSupplier.get();
executionBacking.continueVia(() -> {
executionBacking.streamSubscribe((streamHandle) -> {
try {
releaser.execute((Runnable) () ->
executionBacking.join(e ->
streamHandle.complete(e ->
fulfillment.accept(downstream)
)
);
Expand Down Expand Up @@ -300,7 +301,7 @@ public void error(final Throwable throwable) {
try {
errorHandler.execute(throwable);
} catch (Throwable errorHandlerThrown) {
executionBacking.join(Action.throwException(errorHandlerThrown));
executionBacking.streamSubscribe(h -> h.complete(throwException(errorHandlerThrown)));
}
}

Expand All @@ -309,7 +310,7 @@ public void success(final T value) {
try {
then.execute(value);
} catch (Throwable throwable) {
executionBacking.join(Action.throwException(throwable));
executionBacking.streamSubscribe(h -> h.complete(throwException(throwable)));
}
}
}
Expand Down
Loading

0 comments on commit 8cdaa71

Please sign in to comment.