From 8715b71b8c87968f9880d708e919496e44bf000e Mon Sep 17 00:00:00 2001 From: dmgcodevil Date: Mon, 20 Feb 2017 20:09:31 -0500 Subject: [PATCH 1/3] iss1403: Support @HystrixCommand for rx.Single and rx.Completable similar to rx.Observable --- .../aop/aspectj/HystrixCommandAspect.java | 17 +++++++++--- .../javanica/command/ExecutionType.java | 17 +++++++++++- .../command/GenericObservableCommand.java | 17 +++++++++++- .../observable/BasicObservableTest.java | 27 +++++++++++++++++++ 4 files changed, 73 insertions(+), 5 deletions(-) diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java index f7d6c0627..1046a0d4c 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java @@ -41,7 +41,9 @@ import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.functions.Func1; import java.lang.reflect.Method; @@ -109,8 +111,8 @@ public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinP return result; } - private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) { - return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder)) + private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) { + return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder)) .onErrorResumeNext(new Func1() { @Override public Observable call(Throwable throwable) { @@ -122,7 +124,16 @@ public Observable call(Throwable throwable) { } return Observable.error(throwable); } - }); + }), metaHolder); + } + + private Object mapObservable(Observable observable, final MetaHolder metaHolder) { + if (Completable.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) { + return observable.toCompletable(); + } else if (Single.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) { + return observable.toSingle(); + } + return observable; } private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) { diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java index ea3e285d1..55f4f26c8 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java @@ -15,8 +15,12 @@ */ package com.netflix.hystrix.contrib.javanica.command; +import com.google.common.collect.ImmutableSet; +import rx.Completable; import rx.Observable; +import rx.Single; +import java.util.Set; import java.util.concurrent.Future; /** @@ -39,6 +43,9 @@ public enum ExecutionType { */ OBSERVABLE; + // RX types + private static final Set RX_TYPES = ImmutableSet.of(Observable.class, Single.class, Completable.class); + /** * Gets execution type for specified class type. * @param type the type @@ -47,11 +54,19 @@ public enum ExecutionType { public static ExecutionType getExecutionType(Class type) { if (Future.class.isAssignableFrom(type)) { return ExecutionType.ASYNCHRONOUS; - } else if (Observable.class.isAssignableFrom(type)) { + } else if (isRxType(type)) { return ExecutionType.OBSERVABLE; } else { return ExecutionType.SYNCHRONOUS; } } + private static boolean isRxType(Class cl) { + for (Class rxType : RX_TYPES) { + if (rxType.isAssignableFrom(cl)) { + return true; + } + } + return false; + } } diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java index 120e68f51..c5623aba3 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java @@ -28,7 +28,9 @@ import com.netflix.hystrix.exception.HystrixBadRequestException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.functions.Func1; import javax.annotation.concurrent.ThreadSafe; @@ -67,7 +69,8 @@ public GenericObservableCommand(HystrixCommandBuilder builder) { protected Observable construct() { Observable result; try { - result = ((Observable) commandActions.getCommandAction().execute(executionType)) + Observable observable = toObservable(commandActions.getCommandAction().execute(executionType)); + result = observable .onErrorResumeNext(new Func1() { @Override public Observable call(Throwable throwable) { @@ -157,4 +160,16 @@ boolean isIgnorable(Throwable throwable) { } return false; } + + private Observable toObservable(Object obj) { + if (Observable.class.isAssignableFrom(obj.getClass())) { + return (Observable) obj; + } else if (Completable.class.isAssignableFrom(obj.getClass())) { + return ((Completable) obj).toObservable(); + } else if (Single.class.isAssignableFrom(obj.getClass())) { + return ((Single) obj).toObservable(); + } else { + throw new IllegalStateException("unsupported rx type: " + obj.getClass()); + } + } } diff --git a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java index 9125b6fb9..262c24bb6 100644 --- a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java +++ b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java @@ -24,8 +24,10 @@ import org.apache.commons.lang3.StringUtils; import org.junit.Before; import org.junit.Test; +import rx.Completable; import rx.Observable; import rx.Observer; +import rx.Single; import rx.Subscriber; import rx.functions.Action1; import rx.subjects.ReplaySubject; @@ -90,6 +92,19 @@ public void call(User user) { assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } + @Test + public void testCompletable(){ + userService.getUserCompletable("1", "name: "); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserCompletable"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); + } + + @Test + public void testSingle(){ + userService.getUserSingle("1", "name: "); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserSingle"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); + } @Test public void testGetUserWithRegularFallback() { @@ -163,6 +178,18 @@ public Observable getUser(final String id, final String name) { return createObservable(id, name); } + @HystrixCommand + public Completable getUserCompletable(final String id, final String name) { + validate(id, name, "getUser has failed"); + return createObservable(id, name).toCompletable(); + } + + @HystrixCommand + public Single getUserSingle(final String id, final String name) { + validate(id, name, "getUser has failed"); + return createObservable(id, name).toSingle(); + } + @HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY) public Observable getUserRegularFallback(final String id, final String name) { validate(id, name, "getUser has failed"); From 67fd60a81eb3b686c6b97f05a33ddd113d82d290 Mon Sep 17 00:00:00 2001 From: dmgcodevil Date: Sat, 15 Apr 2017 21:53:50 -0400 Subject: [PATCH 2/3] iss1403: added tests for Single and Completable, fixed bugs in GenericObservableCommand related to these RX types --- .../command/GenericObservableCommand.java | 4 + .../javanica/utils/FallbackMethod.java | 15 +++ .../fallback/BasicGenericFallbackTest.java | 8 ++ .../observable/BasicObservableTest.java | 126 ++++++++++++++++-- 4 files changed, 142 insertions(+), 11 deletions(-) diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java index c5623aba3..75f56a5fa 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java @@ -108,6 +108,10 @@ protected Observable resumeWithFallback() { Object res = commandActions.getFallbackAction().executeWithArgs(executionType, args); if (res instanceof Observable) { return (Observable) res; + } else if (res instanceof Single) { + return ((Single) res).toObservable(); + } else if (res instanceof Completable) { + return ((Completable) res).toObservable(); } else { return Observable.just(res); } diff --git a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/utils/FallbackMethod.java b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/utils/FallbackMethod.java index e2b00d0e2..d620f13fa 100644 --- a/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/utils/FallbackMethod.java +++ b/hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/utils/FallbackMethod.java @@ -25,6 +25,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import rx.Completable; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -92,6 +93,13 @@ public void validateReturnType(Method commandMethod) { if (ExecutionType.OBSERVABLE == ExecutionType.getExecutionType(commandReturnType)) { if (ExecutionType.OBSERVABLE != getExecutionType()) { Type commandParametrizedType = commandMethod.getGenericReturnType(); + + // basically any object can be wrapped into Completable, Completable itself ins't parametrized + if(Completable.class.isAssignableFrom(commandMethod.getReturnType())) { + validateCompletableReturnType(commandMethod, method.getReturnType()); + return; + } + if (isReturnTypeParametrized(commandMethod)) { commandParametrizedType = getFirstParametrizedType(commandMethod); } @@ -137,6 +145,13 @@ private Type getFirstParametrizedType(Method m) { return null; } + // everything can be wrapped into completable except 'void' + private void validateCompletableReturnType(Method commandMethod, Class callbackReturnType) { + if (Void.TYPE == callbackReturnType) { + throw new FallbackDefinitionException(createErrorMsg(commandMethod, method, "fallback cannot return 'void' if command return type is " + Completable.class.getSimpleName())); + } + } + private void validateReturnType(Method commandMethod, Method fallbackMethod) { if (isGenericReturnType(commandMethod)) { List commandParametrizedTypes = flattenTypeVariables(commandMethod.getGenericReturnType()); diff --git a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/fallback/BasicGenericFallbackTest.java b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/fallback/BasicGenericFallbackTest.java index 233bad874..f6f6d1bde 100644 --- a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/fallback/BasicGenericFallbackTest.java +++ b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/fallback/BasicGenericFallbackTest.java @@ -14,6 +14,7 @@ import org.slf4j.helpers.MessageFormatter; import org.springframework.test.context.junit4.rules.SpringClassRule; import org.springframework.test.context.junit4.rules.SpringMethodRule; +import rx.Completable; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; @@ -64,6 +65,7 @@ public Object[] methodGenericDefinitionFailure() { new Object[]{MethodGenericDefinitionFailureCase8.class}, new Object[]{MethodGenericDefinitionFailureCase9.class}, new Object[]{MethodGenericDefinitionFailureCase10.class}, + new Object[]{MethodGenericDefinitionFailureCase11.class}, }; } @@ -247,6 +249,12 @@ public static class MethodGenericDefinitionFailureCase10 { private GenericEntity fallback() { return null; } } + public static class MethodGenericDefinitionFailureCase11 { + @HystrixCommand(fallbackMethod = "fallback") + public Completable command() { throw new IllegalStateException(); } + private void fallback() { return; } + } + /* ====================================================================== */ /* ===================== GENERIC CLASS DEFINITIONS =====+================ */ /* =========================== SUCCESS CASES ============================ */ diff --git a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java index 262c24bb6..8af92d951 100644 --- a/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java +++ b/hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java @@ -30,7 +30,7 @@ import rx.Single; import rx.Subscriber; import rx.functions.Action1; -import rx.subjects.ReplaySubject; +import rx.functions.Func0; import static com.netflix.hystrix.contrib.javanica.test.common.CommonUtils.getHystrixCommandByKey; import static org.junit.Assert.assertEquals; @@ -93,19 +93,82 @@ public void call(User user) { } @Test - public void testCompletable(){ - userService.getUserCompletable("1", "name: "); - com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserCompletable"); + public void testGetCompletableUser(){ + userService.getCompletableUser("1", "name: "); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUser"); assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } @Test - public void testSingle(){ - userService.getUserSingle("1", "name: "); - com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserSingle"); + public void testGetCompletableUserWithRegularFallback() { + Completable completable = userService.getCompletableUserWithRegularFallback(null, "name: "); + completable.toObservable().subscribe(new Action1() { + @Override + public void call(User user) { + assertEquals("default_id", user.getId()); + } + }); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRegularFallback"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); + } + + @Test + public void testGetCompletableUserWithRxFallback() { + Completable completable = userService.getCompletableUserWithRxFallback(null, "name: "); + completable.toObservable().subscribe(new Action1() { + @Override + public void call(User user) { + assertEquals("default_id", user.getId()); + } + }); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRxFallback"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); + } + + @Test + public void testGetSingleUser() { + final String id = "1"; + Single user = userService.getSingleUser(id, "name: "); + user.subscribe(new Action1() { + @Override + public void call(User user) { + assertEquals(id, user.getId()); + } + }); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUser"); assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS)); } + @Test + public void testGetSingleUserWithRegularFallback(){ + Single user = userService.getSingleUserWithRegularFallback(null, "name: "); + user.subscribe(new Action1() { + @Override + public void call(User user) { + assertEquals("default_id", user.getId()); + } + }); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRegularFallback"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); + } + + @Test + public void testGetSingleUserWithRxFallback(){ + Single user = userService.getSingleUserWithRxFallback(null, "name: "); + user.subscribe(new Action1() { + @Override + public void call(User user) { + assertEquals("default_id", user.getId()); + } + }); + com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRxFallback"); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE)); + assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS)); + } + @Test public void testGetUserWithRegularFallback() { final User exUser = new User("def", "def"); @@ -179,17 +242,58 @@ public Observable getUser(final String id, final String name) { } @HystrixCommand - public Completable getUserCompletable(final String id, final String name) { - validate(id, name, "getUser has failed"); + public Completable getCompletableUser(final String id, final String name) { + validate(id, name, "getCompletableUser has failed"); return createObservable(id, name).toCompletable(); } + @HystrixCommand(fallbackMethod = "completableUserRegularFallback") + public Completable getCompletableUserWithRegularFallback(final String id, final String name) { + return getCompletableUser(id, name); + } + + @HystrixCommand(fallbackMethod = "completableUserRxFallback") + public Completable getCompletableUserWithRxFallback(final String id, final String name) { + return getCompletableUser(id, name); + } + + public User completableUserRegularFallback(final String id, final String name) { + return new User("default_id", "default_name"); + } + + public Completable completableUserRxFallback(final String id, final String name) { + return Completable.fromCallable(new Func0() { + @Override + public User call() { + return new User("default_id", "default_name"); + } + }); + } + @HystrixCommand - public Single getUserSingle(final String id, final String name) { - validate(id, name, "getUser has failed"); + public Single getSingleUser(final String id, final String name) { + validate(id, name, "getSingleUser has failed"); return createObservable(id, name).toSingle(); } + @HystrixCommand(fallbackMethod = "singleUserRegularFallback") + public Single getSingleUserWithRegularFallback(final String id, final String name) { + return getSingleUser(id, name); + } + + @HystrixCommand(fallbackMethod = "singleUserRxFallback") + public Single getSingleUserWithRxFallback(final String id, final String name) { + return getSingleUser(id, name); + } + + User singleUserRegularFallback(final String id, final String name) { + return new User("default_id", "default_name"); + } + + Single singleUserRxFallback(final String id, final String name) { + return createObservable("default_id", "default_name").toSingle(); + } + @HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY) public Observable getUserRegularFallback(final String id, final String name) { validate(id, name, "getUser has failed"); From e79acc948ec317a8be272ad1791126dd72a4cc38 Mon Sep 17 00:00:00 2001 From: Roman Pleshkov Date: Sun, 16 Apr 2017 11:56:42 -0400 Subject: [PATCH 3/3] iss1403 added support for 'Single' and 'Completable' rx types --- hystrix-contrib/hystrix-javanica/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hystrix-contrib/hystrix-javanica/README.md b/hystrix-contrib/hystrix-javanica/README.md index df6346fb0..3a8b73890 100644 --- a/hystrix-contrib/hystrix-javanica/README.md +++ b/hystrix-contrib/hystrix-javanica/README.md @@ -125,8 +125,8 @@ To perform "Reactive Execution" you should return an instance of `Observable` in }); } ``` - -The return type of command method should be `Observable`. +In addition to `Observable` Javanica supports the following RX types: `Single` and `Completable`. +Hystrix core supports only one RX type which is `Observable`, `HystrixObservableCommand` requires to return `Observable` therefore javanica transforms `Single` or `Completable` to `Observable` using `toObservable()` method for appropriate type and before returning the result to caller it translates `Observable` to either `Single` or `Completable` using `toSingle()` or `toCompletable()` correspondingly. HystrixObservable interface provides two methods: ```observe()``` - eagerly starts execution of the command the same as ``` HystrixCommand#queue()``` and ```HystrixCommand#execute()```; ```toObservable()``` - lazily starts execution of the command only once the Observable is subscribed to. To control this behaviour and swith between two modes ```@HystrixCommand``` provides specific parameter called ```observableExecutionMode```. ```@HystrixCommand(observableExecutionMode = EAGER)``` indicates that ```observe()``` method should be used to execute observable command