Skip to content

Commit

Permalink
Create CircuitBreakerWithReset
Browse files Browse the repository at this point in the history
  • Loading branch information
jsjtzyy authored Oct 25, 2021
1 parent cd1c1bf commit 9a28086
Showing 1 changed file with 117 additions and 0 deletions.
117 changes: 117 additions & 0 deletions Circuit Breaker/CircuitBreakerWithReset
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/************************************************************
* More details see https://www.cnblogs.com/throwable/p/13906733.html
*************************************************************/

// 添加一个Monitor用于记录状态变更
public enum CircuitBreakerStatusMonitor {

/**
* 单例
*/
X;

public void report(String name, CircuitBreakerStatus o, CircuitBreakerStatus n) {
System.out.println(String.format("断路器[%s]状态变更,[%s]->[%s]", name, o, n));
}

public void reset(String name) {
System.out.println(String.format("断路器[%s]重置", name));
}
}

@Getter
public class RestCircuitBreaker {

private final long failureThreshold;
private final long resetTimeout;
private LongAdder failureCounter;
private LongAdder callCounter;
private AtomicReference<CircuitBreakerStatus> status;
private final Object fallback = null;

/**
* 最后一次调用失败的时间戳
*/
private long lastFailureTime;

public RestCircuitBreaker(long failureThreshold, long resetTimeout) {
this.failureThreshold = failureThreshold;
this.resetTimeout = resetTimeout;
reset();
}

public void reset() {
CircuitBreakerStatusMonitor.X.reset("RestCircuitBreaker");
this.callCounter = new LongAdder();
this.failureCounter = new LongAdder();
this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
this.lastFailureTime = -1L;
}

@SuppressWarnings("unchecked")
public <T> T call(Supplier<T> supplier) {
try {
if (shouldAllowExecution()) {
T result = supplier.get();
markSuccess();
return result;
}
} catch (Exception e) {
markNoneSuccess();
} finally {
this.callCounter.increment();
}
return (T) fallback;
}

public void call(Runnable runnable) {
call(() -> {
runnable.run();
return null;
});
}

boolean shouldAllowExecution() {
// 本质是Closed状态
if (lastFailureTime == -1L) {
return true;
}
// 没到达阈值
if (failureThreshold > failureCounter.sum()) {
return true;
}
return shouldTryAfterRestTimeoutWindow()
&& changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
}

boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
boolean r = status.compareAndSet(o, n);
if (r) {
CircuitBreakerStatusMonitor.X.report("RestCircuitBreaker", o, n);
}
return r;
}

boolean shouldTryAfterRestTimeoutWindow() {
long lastFailureTimeSnap = lastFailureTime;
long currentTime = System.currentTimeMillis();
return currentTime > lastFailureTimeSnap + resetTimeout;
}

public void markSuccess() {
if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
reset();
}
}

public void markNoneSuccess() {
this.failureCounter.increment();
if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
this.lastFailureTime = System.currentTimeMillis();
}
if (this.failureCounter.sum() >= failureThreshold &&
changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
this.lastFailureTime = System.currentTimeMillis();
}
}
}

0 comments on commit 9a28086

Please sign in to comment.