Skip to content

Commit

Permalink
feat/rpc-adapter (sofastack#402)
Browse files Browse the repository at this point in the history
* [rpc-adapter] add rpc interface

* [rpc-adapter] add rpc interface

* [rpc-adapter] clear direct dependencies of bolt

* [rpc-adapter] add grpc impl [WIP]
  • Loading branch information
fengjiachun authored Mar 30, 2020
1 parent 6b66aa6 commit 2d04955
Show file tree
Hide file tree
Showing 107 changed files with 1,993 additions and 769 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.remoting.rpc.RpcServer;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.ProtobufMsgFactory;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;

Expand Down Expand Up @@ -128,7 +128,7 @@ public synchronized Node start(final boolean startRpcServer) {

this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
if (startRpcServer) {
this.rpcServer.startup();
this.rpcServer.init(null);
} else {
LOG.warn("RPC server is not started in RaftGroupService.");
}
Expand Down Expand Up @@ -249,7 +249,7 @@ public void setRpcServer(final RpcServer rpcServer) {
if (this.serverId == null) {
throw new IllegalStateException("Please set serverId at first");
}
if (rpcServer.port() != this.serverId.getPort()) {
if (rpcServer.boundPort() != this.serverId.getPort()) {
throw new IllegalArgumentException("RPC server port mismatch");
}
this.rpcServer = rpcServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import com.alipay.sofa.jraft.rpc.CliRequests.SnapshotRequest;
import com.alipay.sofa.jraft.rpc.CliRequests.TransferLeaderRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.ErrorResponse;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.google.protobuf.Message;
Expand All @@ -67,8 +67,7 @@
* Cli service implementation.
*
* @author boyan ([email protected])
*
* 2018-Apr-09 4:12:06 PM
* @author jiachun.fjc
*/
public class CliServiceImpl implements CliService {

Expand All @@ -85,7 +84,7 @@ public synchronized boolean init(final CliOptions opts) {
return true;
}
this.cliOptions = opts;
this.cliClientService = new BoltCliClientService();
this.cliClientService = new CliClientServiceImpl();
return this.cliClientService.init(this.cliOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.error;

/**
* @author jiachun.fjc
*/
public class InvokeTimeoutException extends RemotingException {

private static final long serialVersionUID = -4710810309766380565L;

public InvokeTimeoutException() {
}

public InvokeTimeoutException(String message) {
super(message);
}

public InvokeTimeoutException(String message, Throwable cause) {
super(message, cause);
}

public InvokeTimeoutException(Throwable cause) {
super(cause);
}

public InvokeTimeoutException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.error;

/**
* Exception for default remoting problems.
*
* @author jiachun.fjc
*/
public class RemotingException extends Exception {

private static final long serialVersionUID = -6326244159775972292L;

public RemotingException() {
}

public RemotingException(String message) {
super(message);
}

public RemotingException(String message, Throwable cause) {
super(message, cause);
}

public RemotingException(Throwable cause) {
super(cause);
}

public RemotingException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ public interface ClientService extends Lifecycle<RpcOptions> {
* @param endpoint server address
* @return true on connect success
*/
boolean connect(Endpoint endpoint);
boolean connect(final Endpoint endpoint);

/**
* Disconnect from endpoint.
*
* @param endpoint server address
* @return true on disconnect success
*/
boolean disconnect(Endpoint endpoint);
boolean disconnect(final Endpoint endpoint);

/**
* Returns true when the endpoint's connection is active.
*
* @param endpoint server address
* @return true on connection is active
*/
boolean isConnected(Endpoint endpoint);
boolean isConnected(final Endpoint endpoint);

/**
* Send a requests and waits for response with callback, returns the request future.
Expand All @@ -65,6 +65,6 @@ public interface ClientService extends Lifecycle<RpcOptions> {
* @param timeoutMs timeout millis
* @return a future with operation result
*/
<T extends Message> Future<Message> invokeWithDone(Endpoint endpoint, Message request, RpcResponseClosure<T> done,
int timeoutMs);
<T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs);
}
44 changes: 44 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/Connection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

/**
* @author jiachun.fjc
*/
public interface Connection {

/**
* Get the attribute that bound to the connection.
*
* @param key the attribute key
* @return the attribute value
*/
Object getAttribute(final String key);

/**
* Set the attribute to the connection.
*
* @param key the attribute key
* @param value the attribute value
*/
void setAttribute(final String key, final Object value);

/**
* Close the connection.
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import java.util.concurrent.Executor;

/**
* @author jiachun.fjc
*/
public interface InvokeCallback {

void complete(final Object result, final Throwable err);

Executor executor();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* RPC invoke context.
*
* @author jiachun.fjc
*/
public class InvokeContext {

public final static String CRC_SWITCH = "invoke.crc.switch";

private final ConcurrentMap<String, Object> ctx = new ConcurrentHashMap<>();

public Object put(final String key, final Object value) {
return this.ctx.put(key, value);
}

@SuppressWarnings("unchecked")
public <T> T get(final String key) {
return (T) this.ctx.get(key);
}

@SuppressWarnings("unchecked")
public <T> T getOrDefault(final String key, final T defaultValue) {
return (T) this.ctx.getOrDefault(key, defaultValue);
}

public void clear() {
this.ctx.clear();
}

public Set<Map.Entry<String, Object>> entrySet() {
return this.ctx.entrySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

import org.apache.commons.lang.SerializationException;

import com.alipay.remoting.CustomSerializerManager;
import com.alipay.sofa.jraft.error.MessageClassNotFoundException;
import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorSet;
import com.google.protobuf.Descriptors.Descriptor;
Expand All @@ -50,31 +50,30 @@ public class ProtobufMsgFactory {

static {
try {
FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(ProtoBufFile.class
final FileDescriptorSet descriptorSet = FileDescriptorSet.parseFrom(ProtoBufFile.class
.getResourceAsStream("/raft.desc"));
List<FileDescriptor> resolveFDs = new ArrayList<>();
for (FileDescriptorProto fdp : descriptorSet.getFileList()) {
final List<FileDescriptor> resolveFDs = new ArrayList<>();
for (final FileDescriptorProto fdp : descriptorSet.getFileList()) {

FileDescriptor[] dependencies = new FileDescriptor[resolveFDs.size()];
final FileDescriptor[] dependencies = new FileDescriptor[resolveFDs.size()];
resolveFDs.toArray(dependencies);

FileDescriptor fd = FileDescriptor.buildFrom(fdp, dependencies);
final FileDescriptor fd = FileDescriptor.buildFrom(fdp, dependencies);
resolveFDs.add(fd);
for (Descriptor descriptor : fd.getMessageTypes()) {
for (final Descriptor descriptor : fd.getMessageTypes()) {

String className = fdp.getOptions().getJavaPackage() + "."
+ fdp.getOptions().getJavaOuterClassname() + "$" + descriptor.getName();
Class<?> clazz = Class.forName(className);
MethodHandle methodHandle = MethodHandles.lookup().findStatic(clazz, "parseFrom",
final String className = fdp.getOptions().getJavaPackage() + "."
+ fdp.getOptions().getJavaOuterClassname() + "$" + descriptor.getName();
final Class<?> clazz = Class.forName(className);
final MethodHandle methodHandle = MethodHandles.lookup().findStatic(clazz, "parseFrom",
methodType(clazz, byte[].class));
PARSE_METHODS_4PROTO.put(descriptor.getFullName(), methodHandle);
PARSE_METHODS_4J.put(className, methodHandle);
CustomSerializerManager.registerCustomSerializer(className, ProtobufSerializer.INSTANCE);
RpcFactoryHelper.getRpcFactory().registerProtobufSerializer(className);
}

}

} catch (Exception e) {
} catch (final Exception e) {
e.printStackTrace(); // NOPMD
}
}
Expand All @@ -86,8 +85,8 @@ public static void load() {
}

@SuppressWarnings("unchecked")
public static <T extends Message> T newMessageByJavaClassName(String className, byte[] bs) {
MethodHandle handle = PARSE_METHODS_4J.get(className);
public static <T extends Message> T newMessageByJavaClassName(final String className, final byte[] bs) {
final MethodHandle handle = PARSE_METHODS_4J.get(className);
if (handle == null) {
throw new MessageClassNotFoundException(className + " not found");
}
Expand All @@ -99,8 +98,8 @@ public static <T extends Message> T newMessageByJavaClassName(String className,
}

@SuppressWarnings("unchecked")
public static <T extends Message> T newMessageByProtoClassName(String className, byte[] bs) {
MethodHandle handle = PARSE_METHODS_4PROTO.get(className);
public static <T extends Message> T newMessageByProtoClassName(final String className, final byte[] bs) {
final MethodHandle handle = PARSE_METHODS_4PROTO.get(className);
if (handle == null) {
throw new MessageClassNotFoundException(className + " not found");
}
Expand Down
Loading

0 comments on commit 2d04955

Please sign in to comment.