Skip to content

Commit

Permalink
feat/joinable-task (sofastack#403)
Browse files Browse the repository at this point in the history
* joinable for raft task

* fix: fix the fmt NPE

* code format

* better name

* typo
  • Loading branch information
fengjiachun authored Mar 25, 2020
1 parent 94c2ece commit 74684dc
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ interface LastAppliedLogIndexListener {
/**
* Called when log entry committed
*
* @param committedIndex committed log indexx
* @param committedIndex committed log index
*/
boolean onCommitted(final long committedIndex);

Expand Down
4 changes: 2 additions & 2 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ public void setErrorMsg(String errMsg) {
* Set error code and error msg.
*/
public void setError(int code, String fmt, Object... args) {
this.state = new State(code, String.format(fmt, args));
this.state = new State(code, String.format(String.valueOf(fmt), args));
}

/**
* Set raft error and error msg.
*/
public void setError(RaftError error, String fmt, Object... args) {
this.state = new State(error.getNumber(), String.format(fmt, args));
this.state = new State(error.getNumber(), String.format(String.valueOf(fmt), args));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.closure;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.util.Requires;

/**
* @author jiachun.fjc
*/
public class JoinableClosure implements Closure {

private final CountDownLatch latch = new CountDownLatch(1);
private final Closure closure;

public JoinableClosure(Closure closure) {
this.closure = Requires.requireNonNull(closure, "closure");
}

@Override
public void run(final Status status) {
this.closure.run(status);
latch.countDown();
}

public void join() throws InterruptedException {
this.latch.await();
}

public void join(final long timeoutMillis) throws InterruptedException, TimeoutException {
if (!this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("joined timeout");
}
}

public Closure getClosure() {
return closure;
}
}
82 changes: 82 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/entity/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.closure.JoinableClosure;

/**
* Basic message structure of jraft, contains:
Expand Down Expand Up @@ -89,4 +94,81 @@ public long getExpectedTerm() {
public void setExpectedTerm(long expectedTerm) {
this.expectedTerm = expectedTerm;
}

/**
* Waiting for the task to complete, to note that throughput may be reduced,
* which is generally not recommended.
*
* @return done closure
* @throws InterruptedException if the current thread is interrupted while waiting
* @since 1.3.1
*/
public Closure join() throws InterruptedException {
final JoinableClosure joinable = castToJoinalbe(this.done);
joinable.join();
return joinable.getClosure();
}

/**
* Waiting for the task to complete with a timeout millis, to note that throughput
* may be reduced, which is generally not recommended.
*
* @param timeoutMillis the maximum millis to wait
* @return done closure
* @throws InterruptedException if the current thread is interrupted while waiting
* @throws TimeoutException if timeout
* @since 1.3.1
*/
public Closure join(final long timeoutMillis) throws InterruptedException, TimeoutException {
final JoinableClosure joinable = castToJoinalbe(this.done);
joinable.join(timeoutMillis);
return joinable.getClosure();
}

/**
* Waiting for all tasks to complete.
*
* @param tasks task list
* @return the closure list in tasks
* @throws InterruptedException if the current thread is interrupted while waiting
* @since 1.3.1
*/
public static List<Closure> joinAll(final List<Task> tasks) throws InterruptedException {
final List<Closure> closures = new ArrayList<>(tasks.size());
for (final Task t : tasks) {
closures.add(t.join());
}
return closures;
}

/**
* Waiting for all tasks to complete with a timeout millis.
*
* @param tasks task list
* @param timeoutMillis the maximum millis to wait
* @return the closure list in the tasks
* @throws InterruptedException if the current thread is interrupted while waiting
* @throws TimeoutException if timeout
* @since 1.3.1
*/
public static List<Closure> joinAll(final List<Task> tasks, long timeoutMillis) throws InterruptedException,
TimeoutException {
final List<Closure> closures = new ArrayList<>(tasks.size());
for (final Task t : tasks) {
final long start = System.nanoTime();
closures.add(t.join(timeoutMillis));
timeoutMillis -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (timeoutMillis <= 0) {
throw new TimeoutException("joined timeout");
}
}
return closures;
}

private static JoinableClosure castToJoinalbe(final Closure closure) {
if (closure instanceof JoinableClosure) {
return (JoinableClosure) closure;
}
throw new UnsupportedOperationException("Unsupported join");
}
}
40 changes: 19 additions & 21 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
*/
package com.alipay.sofa.jraft.core;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -62,6 +52,7 @@
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.StateMachine;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.JoinableClosure;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.closure.SynchronizedClosure;
import com.alipay.sofa.jraft.closure.TaskClosure;
Expand Down Expand Up @@ -89,6 +80,16 @@
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.ConsoleReporter;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class NodeTest {

static final Logger LOG = LoggerFactory.getLogger(NodeTest.class);
Expand Down Expand Up @@ -180,26 +181,23 @@ public void testNodeTaskOverload() throws Exception {
;
}

final CountDownLatch latch = new CountDownLatch(10);
final List<Task> tasks = new ArrayList<>();
final AtomicInteger c = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
final Task task = new Task(data, status -> {
final Task task = new Task(data, new JoinableClosure(status -> {
System.out.println(status);
try {
if (!status.isOk()) {
assertTrue(
if (!status.isOk()) {
assertTrue(
status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
}
c.incrementAndGet();
} finally {
latch.countDown();
}
});
c.incrementAndGet();
}));
node.apply(task);
tasks.add(task);
}
try {
waitLatch(latch);
Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30));
assertEquals(10, c.get());
} finally {
node.shutdown();
Expand Down

0 comments on commit 74684dc

Please sign in to comment.