Skip to content

Commit

Permalink
Filter refactor, keep all callback methods inside Filter.Listener (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj authored Feb 28, 2020
1 parent 54b0d7e commit 47ee52d
Show file tree
Hide file tree
Showing 19 changed files with 80 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* MonitorFilter. (SPI, Singleton, ThreadSafe)
*/
@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter implements Filter, Filter.Listener2 {
public class MonitorFilter implements Filter, Filter.Listener {

private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
Expand Down Expand Up @@ -96,7 +96,7 @@ private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
}

@Override
public void onMessage(Result result, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), (long) invocation.get(MONITOR_FILTER_START_TIME), false);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testFilter() throws Exception {
Result result = monitorFilter.invoke(serviceInvoker, invocation);
result.whenCompleteWithContext((r, t) -> {
if (t == null) {
monitorFilter.onMessage(r, serviceInvoker, invocation);
monitorFilter.onResponse(r, serviceInvoker, invocation);
} else {
monitorFilter.onError(t, serviceInvoker, invocation);
}
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testGenericFilter() throws Exception {
Result result = monitorFilter.invoke(serviceInvoker, invocation);
result.whenCompleteWithContext((r, t) -> {
if (t == null) {
monitorFilter.onMessage(r, serviceInvoker, invocation);
monitorFilter.onResponse(r, serviceInvoker, invocation);
} else {
monitorFilter.onError(t, serviceInvoker, invocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,11 @@ public interface Filter {
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

/**
* Please use {@link Listener2#onMessage(Result, Invoker, Invocation)} instead.
* This method is kept only for compatibility and may get removed at any version in the future.
*
* @param appResponse
* @param invoker
* @param invocation
*/
@Deprecated
default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
return appResponse;
}

@Deprecated
interface Listener {

void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
interface Listener2 {

void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation);

void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,38 @@
*/
package org.apache.dubbo.rpc;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* This abstract will be removed soon from one future release.
* Please implementing Filter.Listener2 directly for callback registration,
* check the default implementation, see {@link org.apache.dubbo.rpc.filter.ExceptionFilter}, for example.
* It's recommended to implement Filter.Listener directly for callback registration, check the default implementation,
* see {@link org.apache.dubbo.rpc.filter.ExceptionFilter}, for example.
* <p>
* If you do not want to share Listener instance between RPC calls. You can use ListenableFilter
* to keep a 'one Listener each RPC call' model.
*/
@Deprecated
public abstract class ListenableFilter implements Filter {

protected Listener listener = null;
protected final ConcurrentMap<Invocation, Listener> listeners = new ConcurrentHashMap<>();

public Listener listener() {
return listener;
}

public Listener listener(Invocation invocation) {
Listener invListener = listeners.get(invocation);
if (invListener == null) {
invListener = listener;
}
return invListener;
}

public void addListener(Invocation invocation, Listener listener) {
listeners.putIfAbsent(invocation, listener);
}

public void removeListener(Invocation invocation) {
listeners.remove(invocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* @see Filter
*/
@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener2 {
public class ActiveLimitFilter implements Filter, Filter.Listener {

private static final String ACTIVELIMIT_FILTER_START_TIME = "activelimit_filter_start_time";

Expand Down Expand Up @@ -82,7 +82,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
String methodName = invocation.getMethodName();
URL url = invoker.getUrl();
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* @see Filter
*
*/
public class CompatibleFilter implements Filter, Filter.Listener2 {
public class CompatibleFilter implements Filter, Filter.Listener {

private static Logger logger = LoggerFactory.getLogger(CompatibleFilter.class);

Expand All @@ -55,7 +55,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (!invocation.getMethodName().startsWith("$") && !appResponse.hasException()) {
Object value = appResponse.getValue();
if (value != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* @see RpcContext
*/
@Activate(group = PROVIDER, order = -10000)
public class ContextFilter implements Filter, Filter.Listener2 {
public class ContextFilter implements Filter, Filter.Listener {

private static final String TAG_KEY = "dubbo.tag";

Expand Down Expand Up @@ -125,7 +125,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* </ol>
*/
@Activate(group = CommonConstants.PROVIDER)
public class ExceptionFilter implements Filter, Filter.Listener2 {
public class ExceptionFilter implements Filter, Filter.Listener {
private Logger logger = LoggerFactory.getLogger(ExceptionFilter.class);

@Override
Expand All @@ -53,7 +53,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@


/**
*
* The maximum parallel execution request count per method per service for the provider.If the max configured
* <b>executes</b> is set to 10 and if invoke request where it is already 10 then it will throws exception. It
* continue the same behaviour un till it is <10.
*
*/
@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter, Filter.Listener2 {
public class ExecuteLimitFilter implements Filter, Filter.Listener {

private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";

Expand Down Expand Up @@ -66,7 +64,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
* GenericInvokerFilter.
*/
@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter implements Filter, Filter.Listener2 {
public class GenericFilter implements Filter, Filter.Listener {

@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
Expand Down Expand Up @@ -153,7 +153,7 @@ public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation inv) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation inv) {
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* GenericImplInvokerFilter
*/
@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter, Filter.Listener2 {
public class GenericImplFilter implements Filter, Filter.Listener {

private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);

Expand Down Expand Up @@ -131,7 +131,7 @@ private void error(String generic, String expected, String actual) throws RpcExc
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
String generic = invoker.getUrl().getParameter(GENERIC_KEY);
String methodName = invocation.getMethodName();
Class<?>[] parameterTypes = invocation.getParameterTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* Log any invocation timeout, but don't stop server from running
*/
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter implements Filter, Filter.Listener2 {
public class TimeoutFilter implements Filter, Filter.Listener {

private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);

Expand All @@ -45,7 +45,7 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
}

@Override
public void onMessage(Result appResponse, Invoker<?> invoker, Invocation invocation) {
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
Object startTime = invocation.get(TIMEOUT_FILTER_START_TIME);
if (startTime != null) {
long elapsed = System.currentTimeMillis() - (Long) startTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,38 +80,46 @@ public Result invoke(Invocation invocation) throws RpcException {
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener2) {
Filter.Listener2 listener = (Filter.Listener2) filter;
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {

}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {// Deprecated!
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener2) {
Filter.Listener2 listener = (Filter.Listener2) filter;
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onMessage(r, invoker, invocation);
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
} else {// Deprecated!
filter.onResponse(r, invoker, invocation);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ public void run() {
try {
Result asyncResult = activeLimitFilter.invoke(invoker, invocation);
Result result = asyncResult.get();
activeLimitFilter.onMessage(result, invoker, invocation);
activeLimitFilter.onResponse(result, invoker, invocation);
} catch (RpcException expected) {
count.incrementAndGet();
// activeLimitFilter.Listener2().onError(expected, invoker, invocation);
} catch (Exception e) {
fail();
}
Expand Down Expand Up @@ -171,7 +170,7 @@ public void run() {
try {
Result asyncResult = activeLimitFilter.invoke(invoker, invocation);
Result result = asyncResult.get();
activeLimitFilter.onMessage(result, invoker, invocation);
activeLimitFilter.onResponse(result, invoker, invocation);
} catch (RpcException expected) {
count.incrementAndGet();
activeLimitFilter.onError(expected, invoker, invocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testInvokerJsonPojoSerialization() throws Exception {

Result asyncResult = compatibleFilter.invoke(invoker, invocation);
AppResponse appResponse = (AppResponse) asyncResult.get();
compatibleFilter.onMessage(appResponse, invoker, invocation);
compatibleFilter.onResponse(appResponse, invoker, invocation);
assertEquals(Type.High, appResponse.getValue());
}

Expand All @@ -128,7 +128,7 @@ public void testInvokerNonJsonEnumSerialization() throws Exception {

Result asyncResult = compatibleFilter.invoke(invoker, invocation);
AppResponse appResponse = (AppResponse) asyncResult.get();
compatibleFilter.onMessage(appResponse, invoker, invocation);
compatibleFilter.onResponse(appResponse, invoker, invocation);
assertEquals(Type.High, appResponse.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testConvertToRunTimeException() throws Exception {
Result asyncResult = exceptionFilter.invoke(invoker, invocation);

AppResponse appResponse = (AppResponse) asyncResult.get();
exceptionFilter.onMessage(appResponse, invoker, invocation);
exceptionFilter.onResponse(appResponse, invoker, invocation);

Assertions.assertFalse(appResponse.getException() instanceof HessianException);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testInvokeWithDefault() throws Exception {
Result asyncResult = genericFilter.invoke(invoker, invocation);

AppResponse appResponse = (AppResponse) asyncResult.get();
genericFilter.onMessage(appResponse, invoker, invocation);
genericFilter.onResponse(appResponse, invoker, invocation);
Assertions.assertEquals(HashMap.class, appResponse.getValue().getClass());
Assertions.assertEquals(10, ((HashMap) appResponse.getValue()).get("age"));

Expand Down
Loading

0 comments on commit 47ee52d

Please sign in to comment.