Skip to content

Commit

Permalink
Improve error handling so that unhandled errors are at least logged a…
Browse files Browse the repository at this point in the history
…nd there is some protection for exception cycles.
  • Loading branch information
ldaley committed Oct 2, 2014
1 parent 6fefe59 commit 16c73ef
Show file tree
Hide file tree
Showing 27 changed files with 274 additions and 366 deletions.
53 changes: 3 additions & 50 deletions ratpack-core/src/main/java/ratpack/exec/ExecControl.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,45 +95,6 @@ public interface ExecControl {
* <p>
* This method is just a specialization of {@link #promise}, and shares all of the same semantics with regard to
* execution binding and execution-on-promise-subscription.
* <pre class="java">
* import ratpack.launch.LaunchConfigBuilder;
* import ratpack.func.Action;
* import ratpack.exec.Execution;
* import ratpack.exec.ExecController;
*
* import java.util.concurrent.Callable;
* import java.util.concurrent.CountDownLatch;
*
* public class Example {
*
* public static void main(String[] args) throws InterruptedException {
* ExecController controller = LaunchConfigBuilder.noBaseDir().build().getExecController();
*
* final CountDownLatch latch = new CountDownLatch(1);
*
* controller.getControl().fork(new Action&lt;Execution&gt;() {
* public void execute(Execution execution) {
* execution
* .getControl()
* .blocking(new Callable&lt;String&gt;() {
* public String call() {
* // perform a blocking op, e.g. a database call, filesystem read etc.
* return "foo";
* }
* })
* .then(new Action&lt;String&gt;() {
* public void execute(String string) {
* // do something with the value that was obtained from a blocking operation
* latch.countDown();
* }
* });
* }
* });
*
* latch.await();
* }
* }
* </pre>
*
* @param blockingOperation the operation that blocks
* @param <T> the type of value created by the operation
Expand All @@ -157,19 +118,11 @@ public interface ExecControl {
<T> Promise<T> promise(Action<? super Fulfiller<T>> action);

/**
* Forks a new execution on a separate thread.
* <p>
* This is similar to using {@code new Thread().run()} except that the action will be executed
* on a Ratpack managed thread, and will use Ratpack's execution semantics.
* <p>
* Creates a new execution stater that can be used to initiate a new execution.
*
* @param action the initial execution segment
* @return an execution start that can be used to configure and
*/
void fork(Action<? super Execution> action);

void fork(Action<? super Execution> action, Action<? super Throwable> onError);

void fork(Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onComplete);
ExecStarter exec();

/**
* Process streams of data asynchronously with non-blocking back pressure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static Optional<ExecController> current() {
* <p>
* This executor wraps Netty's event loop executor to provide callback features by way of Guava's executor extensions.
* <p>
* It is generally preferable to use {@link ExecControl#fork(ratpack.func.Action)} to submit computation work rather than this method,
* It is generally preferable to use {@link ratpack.exec.ExecControl#exec()} to submit computation work rather than this method,
* which properly initialises Ratpack's execution infrastructure.
*
* @return the executor that performs computation
Expand Down
37 changes: 37 additions & 0 deletions ratpack-core/src/main/java/ratpack/exec/ExecStarter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ratpack.exec;

import ratpack.func.Action;
import ratpack.registry.RegistrySpec;

/**
* Effectively a builder for a new {@link Execution}.
*
* @see ExecControl#exec()
*/
public interface ExecStarter {

ExecStarter onError(Action<? super Throwable> onError);

ExecStarter onComplete(Action<? super Execution> onComplete);

ExecStarter register(Action<? super RegistrySpec> registry);

void start(Action<? super Execution> onError);

}
2 changes: 1 addition & 1 deletion ratpack-core/src/main/java/ratpack/exec/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* <h3>Executions and request handling</h3>
* <p>
* The execution object actually underpins the {@link ratpack.handling.Context} objects that are used when handling requests.
* It is rarely used directly when request handling, except when concurrency or parallelism is required to process data via the {@link ratpack.handling.Context#fork(ratpack.func.Action)} method.
* It is rarely used directly when request handling, except when concurrency or parallelism is required to process data via the {@link ratpack.handling.Context#exec()} method.
* Moreover, it provides its own error handling and completion mechanisms.
* </p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,30 @@

package ratpack.exec.internal;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import ratpack.exec.*;
import ratpack.func.Action;
import ratpack.registry.RegistrySpec;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

import static ratpack.func.Action.noop;

public class DefaultExecControl implements ExecControl {

private static final Logger LOGGER = ExecutionBacking.LOGGER;
private static final Action<Throwable> LOG_UNCAUGHT = t -> LOGGER.error("Uncaught execution exception", t);
private static final int MAX_ERRORS_THRESHOLD = 5;

private final ExecController execController;
private final ThreadLocal<ExecutionBacking> threadBinding = new ThreadLocal<>();

Expand Down Expand Up @@ -74,27 +84,54 @@ public <T> Promise<T> blocking(final Callable<T> blockingOperation) {
}

@Override
public <T> Promise<T> promise(Action<? super Fulfiller<T>> action) {
return new DefaultPromise<>(this::getBacking, action);
}
public ExecStarter exec() {
return new ExecStarter() {
private Action<? super Throwable> onError = LOG_UNCAUGHT;
private Action<? super Execution> onComplete = noop();
private Action<? super RegistrySpec> registry;

@Override
public void fork(Action<? super Execution> action) {
fork(action, Action.throwException(), Action.noop());
}
@Override
public ExecStarter onError(Action<? super Throwable> onError) {
List<Throwable> seen = Lists.newLinkedList();
this.onError = t -> {
if (seen.size() < MAX_ERRORS_THRESHOLD) {
seen.add(t);
onError.execute(t);
} else {
seen.forEach(t::addSuppressed);
LOGGER.error("Error handler " + onError + "reached maximum error threshold (might be caught in an error loop)", t);
}
};
return this;
}

@Override
public void fork(Action<? super Execution> action, Action<? super Throwable> onError) {
fork(action, onError, Action.noop());
@Override
public ExecStarter onComplete(Action<? super Execution> onComplete) {
this.onComplete = onComplete;
return this;
}

@Override
public ExecStarter register(Action<? super RegistrySpec> registry) {
this.registry = registry;
return this;
}

@Override
public void start(Action<? super Execution> action) {
Action<? super Execution> effectiveAction = registry == null ? action : Action.join(registry, action);
if (execController.isManagedThread() && threadBinding.get() == null) {
new ExecutionBacking(execController, threadBinding, effectiveAction, onError, onComplete);
} else {
execController.getExecutor().submit(() -> new ExecutionBacking(execController, threadBinding, effectiveAction, onError, onComplete));
}
}
};
}

@Override
public void fork(final Action<? super Execution> action, final Action<? super Throwable> onError, final Action<? super Execution> onComplete) {
if (execController.isManagedThread() && threadBinding.get() == null) {
new ExecutionBacking(execController, threadBinding, action, onError, onComplete);
} else {
execController.getExecutor().submit(() -> new ExecutionBacking(execController, threadBinding, action, onError, onComplete));
}
public <T> Promise<T> promise(Action<? super Fulfiller<T>> action) {
return new DefaultPromise<>(this::getBacking, action);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class ExecutionBacking {

private final static Logger LOGGER = LoggerFactory.getLogger(ExecutionBacking.class);
final static Logger LOGGER = LoggerFactory.getLogger(Execution.class);

private final List<ExecInterceptor> interceptors = new LinkedList<>();
private final List<AutoCloseable> closeables = new LinkedList<>();
Expand Down
53 changes: 20 additions & 33 deletions ratpack-core/src/main/java/ratpack/handling/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public interface Context extends ExecControl, Registry {
* <p>
* When using forking to process work in parallel, use {@link #promise(ratpack.func.Action)} to continue request handling when the parallel work is done.
*
* <pre class="java">
* <pre class="java">{@code
* import ratpack.handling.Handler;
* import ratpack.handling.Context;
* import ratpack.func.Action;
Expand All @@ -423,11 +423,11 @@ public interface Context extends ExecControl, Registry {
* final int numJobs = 3;
* final Integer failOnIteration = context.getPathTokens().asInt("failOn");
*
* context.promise(new Action&lt;Fulfiller&lt;Integer&gt;&gt;() {
* context.promise(new Action<Fulfiller<Integer>>() {
* private final AtomicInteger counter = new AtomicInteger();
* private final AtomicReference&lt;Throwable&gt; error = new AtomicReference&lt;&gt;();
* private final AtomicReference<Throwable> error = new AtomicReference<>();
*
* private void completeJob(Fulfiller&lt;Integer&gt; fulfiller) {
* private void completeJob(Fulfiller<Integer> fulfiller) {
* if (counter.incrementAndGet() == numJobs) {
* Throwable throwable = error.get();
* if (throwable == null) {
Expand All @@ -438,41 +438,34 @@ public interface Context extends ExecControl, Registry {
* }
* }
*
* public void execute(final Fulfiller&lt;Integer&gt; fulfiller) {
* for (int i = 0; i &lt; numJobs; ++i) {
* public void execute(final Fulfiller<Integer> fulfiller) {
* for (int i = 0; i < numJobs; ++i) {
* final int iteration = i;
*
* context.fork(new Action&lt;Execution&gt;() {
* public void execute(Execution execution) throws Exception {
* if (failOnIteration != null &amp;&amp; failOnIteration.intValue() == iteration) {
* context.exec()
* .onError(throwable -> {
* error.compareAndSet(null, throwable); // just take the first error
* completeJob(fulfiller);
* })
* .start(execution -> {
* if (failOnIteration != null && failOnIteration.intValue() == iteration) {
* throw new Exception("bang!");
* } else {
* completeJob(fulfiller);
* }
* }
*
*
* }, new Action&lt;Throwable&gt;() {
* public void execute(Throwable throwable) {
* error.compareAndSet(null, throwable); // just take the first error
* completeJob(fulfiller);
* }
* });
* });
* }
* }
* }).then(new Action&lt;Integer&gt;() {
* public void execute(Integer integer) {
* context.render(integer.toString());
* }
* });
* })
* .then(i -> context.render(i.toString()));
* }
* }
*
* public static void main(String[] args) throws Exception {
* HandlingResult result = UnitTest.handle(new ForkingHandler(), Action.noop());
* assert result.rendered(String.class).equals("3");
*
* result = UnitTest.handle(new ForkingHandler(), new Action&lt;RequestFixture&gt;() {
* result = UnitTest.handle(new ForkingHandler(), new Action<RequestFixture>() {
* public void execute(RequestFixture fixture) {
* fixture.pathBinding(Collections.singletonMap("failOn", "2"));
* }
Expand All @@ -481,25 +474,19 @@ public interface Context extends ExecControl, Registry {
* assert result.getException().getMessage().equals("bang!");
* }
* }
* </pre>
* }</pre>
*
* @param action the initial execution segment
* @return an execution starter
*/
@Override
void fork(Action<? super Execution> action);
ExecStarter exec();

@Override
ExecController getController();

@Override
void addInterceptor(ExecInterceptor execInterceptor, Action<? super Execution> continuation) throws Exception;

@Override
void fork(Action<? super Execution> action, Action<? super Throwable> onError);

@Override
void fork(Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onComplete);

<T> void stream(Publisher<T> publisher, Subscriber<? super T> subscriber);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.reflect.TypeToken;
import ratpack.handling.internal.Extractions;
import ratpack.registry.Registries;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -147,7 +146,7 @@ protected InjectionHandler() throws NoSuitableHandleMethodException {
public final void handle(Context context) {
Object[] args = new Object[types.size() + 1];
args[0] = context;
Extractions.extract(types, Registries.join(context.getRequest(), context), args, 1);
Extractions.extract(types, context.getRequest().join(context), args, 1);
try {
handleMethod.invoke(this, args);
} catch (IllegalAccessException e) {
Expand Down
Loading

0 comments on commit 16c73ef

Please sign in to comment.