Skip to content

Commit

Permalink
Merge pull request Netflix#1524 from dmgcodevil/iss1403
Browse files Browse the repository at this point in the history
iss1403: Support @HystrixCommand for rx.Single and rx.Completable sim…
  • Loading branch information
mattrjacobs authored May 9, 2017
2 parents 251e5d9 + e79acc9 commit dd1087a
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 8 deletions.
4 changes: 2 additions & 2 deletions hystrix-contrib/hystrix-javanica/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ To perform "Reactive Execution" you should return an instance of `Observable` in
});
}
```

The return type of command method should be `Observable`.
In addition to `Observable` Javanica supports the following RX types: `Single` and `Completable`.
Hystrix core supports only one RX type which is `Observable`, `HystrixObservableCommand` requires to return `Observable` therefore javanica transforms `Single` or `Completable` to `Observable` using `toObservable()` method for appropriate type and before returning the result to caller it translates `Observable` to either `Single` or `Completable` using `toSingle()` or `toCompletable()` correspondingly.

HystrixObservable interface provides two methods: ```observe()``` - eagerly starts execution of the command the same as ``` HystrixCommand#queue()``` and ```HystrixCommand#execute()```; ```toObservable()``` - lazily starts execution of the command only once the Observable is subscribed to. To control this behaviour and swith between two modes ```@HystrixCommand``` provides specific parameter called ```observableExecutionMode```.
```@HystrixCommand(observableExecutionMode = EAGER)``` indicates that ```observe()``` method should be used to execute observable command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -110,8 +112,8 @@ public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinP
return result;
}

private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
Expand All @@ -123,7 +125,16 @@ public Observable call(Throwable throwable) {
}
return Observable.error(throwable);
}
});
}), metaHolder);
}

private Object mapObservable(Observable observable, final MetaHolder metaHolder) {
if (Completable.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
return observable.toCompletable();
} else if (Single.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
return observable.toSingle();
}
return observable;
}

private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
*/
package com.netflix.hystrix.contrib.javanica.command;

import com.google.common.collect.ImmutableSet;
import rx.Completable;
import rx.Observable;
import rx.Single;

import java.util.Set;
import java.util.concurrent.Future;

/**
Expand All @@ -39,6 +43,9 @@ public enum ExecutionType {
*/
OBSERVABLE;

// RX types
private static final Set<? extends Class> RX_TYPES = ImmutableSet.of(Observable.class, Single.class, Completable.class);

/**
* Gets execution type for specified class type.
* @param type the type
Expand All @@ -47,11 +54,19 @@ public enum ExecutionType {
public static ExecutionType getExecutionType(Class<?> type) {
if (Future.class.isAssignableFrom(type)) {
return ExecutionType.ASYNCHRONOUS;
} else if (Observable.class.isAssignableFrom(type)) {
} else if (isRxType(type)) {
return ExecutionType.OBSERVABLE;
} else {
return ExecutionType.SYNCHRONOUS;
}
}

private static boolean isRxType(Class<?> cl) {
for (Class<?> rxType : RX_TYPES) {
if (rxType.isAssignableFrom(cl)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.netflix.hystrix.exception.HystrixBadRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -67,7 +69,8 @@ public GenericObservableCommand(HystrixCommandBuilder builder) {
protected Observable construct() {
Observable result;
try {
result = ((Observable) commandActions.getCommandAction().execute(executionType))
Observable observable = toObservable(commandActions.getCommandAction().execute(executionType));
result = observable
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
Expand Down Expand Up @@ -105,6 +108,10 @@ protected Observable resumeWithFallback() {
Object res = commandActions.getFallbackAction().executeWithArgs(executionType, args);
if (res instanceof Observable) {
return (Observable) res;
} else if (res instanceof Single) {
return ((Single) res).toObservable();
} else if (res instanceof Completable) {
return ((Completable) res).toObservable();
} else {
return Observable.just(res);
}
Expand Down Expand Up @@ -157,4 +164,16 @@ boolean isIgnorable(Throwable throwable) {
}
return false;
}

private Observable toObservable(Object obj) {
if (Observable.class.isAssignableFrom(obj.getClass())) {
return (Observable) obj;
} else if (Completable.class.isAssignableFrom(obj.getClass())) {
return ((Completable) obj).toObservable();
} else if (Single.class.isAssignableFrom(obj.getClass())) {
return ((Single) obj).toObservable();
} else {
throw new IllegalStateException("unsupported rx type: " + obj.getClass());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.hystrix.contrib.javanica.exception.FallbackDefinitionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import rx.Completable;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -97,6 +98,13 @@ public void validateReturnType(Method commandMethod) throws FallbackDefinitionEx
if (ExecutionType.OBSERVABLE == ExecutionType.getExecutionType(commandReturnType)) {
if (ExecutionType.OBSERVABLE != getExecutionType()) {
Type commandParametrizedType = commandMethod.getGenericReturnType();

// basically any object can be wrapped into Completable, Completable itself ins't parametrized
if(Completable.class.isAssignableFrom(commandMethod.getReturnType())) {
validateCompletableReturnType(commandMethod, method.getReturnType());
return;
}

if (isReturnTypeParametrized(commandMethod)) {
commandParametrizedType = getFirstParametrizedType(commandMethod);
}
Expand Down Expand Up @@ -142,6 +150,13 @@ private Type getFirstParametrizedType(Method m) {
return null;
}

// everything can be wrapped into completable except 'void'
private void validateCompletableReturnType(Method commandMethod, Class<?> callbackReturnType) {
if (Void.TYPE == callbackReturnType) {
throw new FallbackDefinitionException(createErrorMsg(commandMethod, method, "fallback cannot return 'void' if command return type is " + Completable.class.getSimpleName()));
}
}

private void validateReturnType(Method commandMethod, Method fallbackMethod) {
if (isGenericReturnType(commandMethod)) {
List<Type> commandParametrizedTypes = flattenTypeVariables(commandMethod.getGenericReturnType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.slf4j.helpers.MessageFormatter;
import org.springframework.test.context.junit4.rules.SpringClassRule;
import org.springframework.test.context.junit4.rules.SpringMethodRule;
import rx.Completable;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -64,6 +65,7 @@ public Object[] methodGenericDefinitionFailure() {
new Object[]{MethodGenericDefinitionFailureCase8.class},
new Object[]{MethodGenericDefinitionFailureCase9.class},
new Object[]{MethodGenericDefinitionFailureCase10.class},
new Object[]{MethodGenericDefinitionFailureCase11.class},

};
}
Expand Down Expand Up @@ -247,6 +249,12 @@ public static class MethodGenericDefinitionFailureCase10 {
private GenericEntity<? super Comparable> fallback() { return null; }
}

public static class MethodGenericDefinitionFailureCase11 {
@HystrixCommand(fallbackMethod = "fallback")
public Completable command() { throw new IllegalStateException(); }
private void fallback() { return; }
}

/* ====================================================================== */
/* ===================== GENERIC CLASS DEFINITIONS =====+================ */
/* =========================== SUCCESS CASES ============================ */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Test;
import rx.Completable;
import rx.Observable;
import rx.Observer;
import rx.Single;
import rx.Subscriber;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;
import rx.functions.Func0;

import static com.netflix.hystrix.contrib.javanica.test.common.CommonUtils.getHystrixCommandByKey;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -90,6 +92,82 @@ public void call(User user) {
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetCompletableUser(){
userService.getCompletableUser("1", "name: ");
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUser");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetCompletableUserWithRegularFallback() {
Completable completable = userService.getCompletableUserWithRegularFallback(null, "name: ");
completable.<User>toObservable().subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRegularFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetCompletableUserWithRxFallback() {
Completable completable = userService.getCompletableUserWithRxFallback(null, "name: ");
completable.<User>toObservable().subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRxFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetSingleUser() {
final String id = "1";
Single<User> user = userService.getSingleUser(id, "name: ");
user.subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals(id, user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUser");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetSingleUserWithRegularFallback(){
Single<User> user = userService.getSingleUserWithRegularFallback(null, "name: ");
user.subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRegularFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetSingleUserWithRxFallback(){
Single<User> user = userService.getSingleUserWithRxFallback(null, "name: ");
user.subscribe(new Action1<User>() {
@Override
public void call(User user) {
assertEquals("default_id", user.getId());
}
});
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRxFallback");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
}

@Test
public void testGetUserWithRegularFallback() {
Expand Down Expand Up @@ -163,6 +241,59 @@ public Observable<User> getUser(final String id, final String name) {
return createObservable(id, name);
}

@HystrixCommand
public Completable getCompletableUser(final String id, final String name) {
validate(id, name, "getCompletableUser has failed");
return createObservable(id, name).toCompletable();
}

@HystrixCommand(fallbackMethod = "completableUserRegularFallback")
public Completable getCompletableUserWithRegularFallback(final String id, final String name) {
return getCompletableUser(id, name);
}

@HystrixCommand(fallbackMethod = "completableUserRxFallback")
public Completable getCompletableUserWithRxFallback(final String id, final String name) {
return getCompletableUser(id, name);
}

public User completableUserRegularFallback(final String id, final String name) {
return new User("default_id", "default_name");
}

public Completable completableUserRxFallback(final String id, final String name) {
return Completable.fromCallable(new Func0<User>() {
@Override
public User call() {
return new User("default_id", "default_name");
}
});
}

@HystrixCommand
public Single<User> getSingleUser(final String id, final String name) {
validate(id, name, "getSingleUser has failed");
return createObservable(id, name).toSingle();
}

@HystrixCommand(fallbackMethod = "singleUserRegularFallback")
public Single<User> getSingleUserWithRegularFallback(final String id, final String name) {
return getSingleUser(id, name);
}

@HystrixCommand(fallbackMethod = "singleUserRxFallback")
public Single<User> getSingleUserWithRxFallback(final String id, final String name) {
return getSingleUser(id, name);
}

User singleUserRegularFallback(final String id, final String name) {
return new User("default_id", "default_name");
}

Single<User> singleUserRxFallback(final String id, final String name) {
return createObservable("default_id", "default_name").toSingle();
}

@HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY)
public Observable<User> getUserRegularFallback(final String id, final String name) {
validate(id, name, "getUser has failed");
Expand Down

0 comments on commit dd1087a

Please sign in to comment.