Skip to content

Commit

Permalink
(feat) counter example (sofastack#318)
Browse files Browse the repository at this point in the history
* (feat) counter example

* Add final to fields

* (feat) catch exception in GetValueRequestProcessor

* (feat)async GetValueRequestProcessor
  • Loading branch information
masaimu authored and fengjiachun committed Nov 11, 2019
1 parent 4016336 commit 4b2bc6d
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.counter;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.example.counter.rpc.ValueResponse;

/**
* @author likun ([email protected])
*/
public abstract class CounterClosure implements Closure {

private ValueResponse valueResponse;
private CounterOperation counterOperation;

public void setCounterOperation(CounterOperation counterOperation) {
this.counterOperation = counterOperation;
}

public CounterOperation getCounterOperation() {
return counterOperation;
}

public ValueResponse getValueResponse() {
return valueResponse;
}

public void setValueResponse(ValueResponse valueResponse) {
this.valueResponse = valueResponse;
}

protected void failure(final String errorMsg, final String redirect) {
final ValueResponse response = new ValueResponse();
response.setSuccess(false);
response.setErrorMsg(errorMsg);
response.setRedirect(redirect);
setValueResponse(response);
}

protected void success(final long value) {
final ValueResponse response = new ValueResponse();
response.setValue(value);
response.setSuccess(true);
setValueResponse(response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.counter;

import java.io.Serializable;

/**
* The counter operation
*
* @author likun ([email protected])
*/
public class CounterOperation implements Serializable {

private static final long serialVersionUID = -6597003954824547294L;

/** Get value */
public static final byte GET = 0x01;
/** Increment and get value */
public static final byte INCREMENT = 0x02;

private byte op;
private long delta;

public static CounterOperation createGet() {
return new CounterOperation(GET);
}

public static CounterOperation createIncrement(final long delta) {
return new CounterOperation(INCREMENT, delta);
}

public CounterOperation(byte op) {
this(op, 0);
}

public CounterOperation(byte op, long delta) {
this.op = op;
this.delta = delta;
}

public byte getOp() {
return op;
}

public long getDelta() {
return delta;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ public CounterServer(final String dataPath, final String groupId, final PeerId s
final RpcServer rpcServer = new RpcServer(serverId.getPort());
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
// 注册业务处理器
rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
CounterService counterService = new CounterServiceImpl(this);
rpcServer.registerUserProcessor(new GetValueRequestProcessor(counterService));
rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(counterService));
// 初始化状态机
this.fsm = new CounterStateMachine();
// 设置状态机到启动参数
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.counter;

/**
* The counter service supporting query and count function.
*
* @author likun ([email protected])
*/
public interface CounterService {

/**
* Get current value from counter
*
* Provide consistent reading if {@code readOnlySafe} is true.
*/
void get(final boolean readOnlySafe, final CounterClosure closure);

/**
* Add delta to counter then get value
*/
void incrementAndGet(final long delta, final CounterClosure closure);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.example.counter;

import java.nio.ByteBuffer;
import java.util.concurrent.Executor;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rhea.StoreEngineHelper;
import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
import com.alipay.sofa.jraft.util.BytesUtil;

/**
* @author likun ([email protected])
*/
public class CounterServiceImpl implements CounterService {
private static final Logger LOG = LoggerFactory.getLogger(CounterServiceImpl.class);

private final CounterServer counterServer;
private final Executor readIndexExecutor;

public CounterServiceImpl(CounterServer counterServer) {
this.counterServer = counterServer;
this.readIndexExecutor = createReadIndexExecutor();
}

private Executor createReadIndexExecutor() {
final StoreEngineOptions opts = new StoreEngineOptions();
return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
}

@Override
public void get(final boolean readOnlySafe, final CounterClosure closure) {
if(!readOnlySafe){
closure.success(getValue());
closure.run(Status.OK());
return;
}

this.counterServer.getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
closure.success(getValue());
closure.run(Status.OK());
return;
}
CounterServiceImpl.this.readIndexExecutor.execute(() -> {
if(isLeader()){
LOG.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyOperation(CounterOperation.createGet(), closure);
}else {
handlerNotLeaderError(closure);
}
});
}
});
}

private boolean isLeader() {
return this.counterServer.getFsm().isLeader();
}

private long getValue() {
return this.counterServer.getFsm().getValue();
}

private String getRedirect() {
return this.counterServer.redirect().getRedirect();
}

@Override
public void incrementAndGet(final long delta, final CounterClosure closure) {
applyOperation(CounterOperation.createIncrement(delta), closure);
}

private void applyOperation(final CounterOperation op, final CounterClosure closure) {
if (!isLeader()) {
handlerNotLeaderError(closure);
return;
}

try {
closure.setCounterOperation(op);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure);
this.counterServer.getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode CounterOperation";
LOG.error(errorMsg, e);
closure.failure(errorMsg, StringUtils.EMPTY);
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}

private void handlerNotLeaderError(final CounterClosure closure) {
closure.failure("Not leader.", getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.example.counter.rpc.IncrementAndGetRequest;
import com.alipay.sofa.jraft.example.counter.snapshot.CounterSnapshotFile;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Utils;

import static com.alipay.sofa.jraft.example.counter.CounterOperation.GET;
import static com.alipay.sofa.jraft.example.counter.CounterOperation.INCREMENT;

/**
* Counter state machine.
*
Expand Down Expand Up @@ -72,32 +74,43 @@ public long getValue() {
@Override
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
long delta = 0;
long current = 0;
CounterOperation counterOperation = null;

IncrementAndAddClosure closure = null;
CounterClosure closure = null;
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional parsing.
closure = (IncrementAndAddClosure) iter.done();
delta = closure.getRequest().getDelta();
closure = (CounterClosure) iter.done();
counterOperation = closure.getCounterOperation();
} else {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
final IncrementAndGetRequest request = SerializerManager.getSerializer(SerializerManager.Hessian2)
.deserialize(data.array(), IncrementAndGetRequest.class.getName());
delta = request.getDelta();
counterOperation = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), CounterOperation.class.getName());
} catch (final CodecException e) {
LOG.error("Fail to decode IncrementAndGetRequest", e);
}
}
final long prev = this.value.get();
final long updated = value.addAndGet(delta);
if (closure != null) {
closure.getResponse().setValue(updated);
closure.getResponse().setSuccess(true);
closure.run(Status.OK());
if (counterOperation != null) {
switch (counterOperation.getOp()) {
case GET:
current = this.value.get();
LOG.info("Get value={} at logIndex={}", current, iter.getIndex());
break;
case INCREMENT:
final long delta = counterOperation.getDelta();
final long prev = this.value.get();
current = value.addAndGet(delta);
LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
break;
}

if (closure != null) {
closure.success(current);
closure.run(Status.OK());
}
}
LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
iter.next();
}
}
Expand Down
Loading

0 comments on commit 4b2bc6d

Please sign in to comment.