Skip to content

Commit

Permalink
优化 HttpCmdServer , 保证一个JVM 一个 HttpCmdServer,节约资源
Browse files Browse the repository at this point in the history
  • Loading branch information
qq254963746 committed Mar 5, 2016
1 parent e0193d3 commit f00db84
Show file tree
Hide file tree
Showing 39 changed files with 193 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ public RestfulResponse loadJob(JobQueueRequest request) {
boolean success = false;
HttpCmdResponse cmdResponse = null;
for (Node node : jobTrackerNodeList) {
cmdResponse = HttpCmdClient.execute(node.getIp(), node.getCommandPort(), httpCmd);
httpCmd.setNodeIdentity(node.getIdentity());
cmdResponse = HttpCmdClient.execute(node.getIp(), node.getHttpCmdPort(), httpCmd);
if (cmdResponse.isSuccess()) {
success = true;
break;
Expand All @@ -276,7 +277,7 @@ public RestfulResponse jobAdd(JobQueueRequest request) {
try {
Assert.hasLength(request.getTaskId(), "taskId不能为空!");
Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!");
if(request.getNeedFeedback()){
if (request.getNeedFeedback()) {
Assert.hasLength(request.getSubmitNodeGroup(), "submitNodeGroup不能为空!");
}

Expand Down Expand Up @@ -345,7 +346,8 @@ private KVPair<Boolean, String> addJob(JobQueueRequest request) {

HttpCmdResponse response = null;
for (Node node : jobTrackerNodeList) {
response = HttpCmdClient.execute(node.getIp(), node.getCommandPort(), httpCmd);
httpCmd.setNodeIdentity(node.getIdentity());
response = HttpCmdClient.execute(node.getIp(), node.getHttpCmdPort(), httpCmd);
if (response.isSuccess()) {
return new KVPair<Boolean, String>(true, "Add success");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.json.JSONFactory;
import com.lts.core.logger.LoggerFactory;
import com.lts.core.spi.SpiKey;
import com.lts.core.spi.SpiExtensionKey;
import com.lts.web.support.AppConfigurer;
import org.apache.log4j.PropertyConfigurator;

Expand All @@ -24,12 +24,12 @@ public void contextInitialized(ServletContextEvent servletContextEvent) {
}
AppConfigurer.load(confPath);

String jsonAdapter = AppConfigurer.getProperty("configs." + SpiKey.LTS_JSON);
String jsonAdapter = AppConfigurer.getProperty("configs." + SpiExtensionKey.LTS_JSON);
if (StringUtils.isNotEmpty(jsonAdapter)) {
JSONFactory.setJSONAdapter(jsonAdapter);
}

String loggerAdapter = AppConfigurer.getProperty("configs." + SpiKey.LTS_LOGGER);
String loggerAdapter = AppConfigurer.getProperty("configs." + SpiExtensionKey.LTS_LOGGER);
if (StringUtils.isNotEmpty(loggerAdapter)) {
LoggerFactory.setLoggerAdapter(loggerAdapter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class NodeMemoryDatabase extends MemoryDatabase {
" createTime, " +
" threads," +
" hostName," +
" commandPort)" +
" httpCmdPort)" +
" VALUES (?,?,?,?,?,?,?,?,?,?,?)";

private String deleteSQL = "DELETE FROM lts_node where identity = ?";
Expand All @@ -56,7 +56,7 @@ private void createTable() {
" `createTime` bigint(20) DEFAULT NULL," +
" `threads` int(11) DEFAULT NULL," +
" `hostName` varchar(64) DEFAULT NULL," +
" `commandPort` int(11) DEFAULT NULL," +
" `httpCmdPort` int(11) DEFAULT NULL," +
" PRIMARY KEY (`identity`)" +
")";

Expand Down Expand Up @@ -88,7 +88,7 @@ public void addNode(List<Node> nodes) {
node.getCreateTime(),
node.getThreads(),
node.getHostName(),
node.getCommandPort()
node.getHttpCmdPort()
);
} catch (Exception e) {
LOGGER.error("Insert {} error!", node, e);
Expand Down Expand Up @@ -149,7 +149,7 @@ public List<Node> handle(ResultSet rs) throws SQLException {
node.setThreads(rs.getInt("threads"));
node.setAvailable(rs.getInt("available") == 1);
node.setHostName(rs.getString("hostName"));
node.setCommandPort(rs.getInt("commandPort"));
node.setHttpCmdPort(rs.getInt("httpCmdPort"));
nodes.add(node);
}
return nodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
<td>{{row.ip | format:'ipLabel',row}}</td>
<td>{{row.threads | format:'threadLabel',row}}</td>
<td>{{row.available | format:'availableLabel'}}</td>
<td>{{row.commandPort }}</td>
<td>{{row.httpCmdPort }}</td>
<td>{{row.opt | format:'optFormat',row}}</td>
</tr>
{{/each}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.lts.core.commons.utils.GenericsUtils;
import com.lts.core.commons.utils.NetUtils;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.constant.EcTopic;
import com.lts.core.factory.JobNodeConfigFactory;
import com.lts.core.factory.NodeFactory;
import com.lts.core.json.JSONFactory;
Expand All @@ -18,9 +17,8 @@
import com.lts.core.protocol.command.CommandBodyWrapper;
import com.lts.core.registry.*;
import com.lts.core.spi.ServiceLoader;
import com.lts.core.spi.SpiKey;
import com.lts.core.spi.SpiExtensionKey;
import com.lts.ec.EventCenter;
import com.lts.ec.EventInfo;
import com.lts.remoting.serialize.AdaptiveSerializable;

import java.util.ArrayList;
Expand Down Expand Up @@ -141,13 +139,13 @@ protected void initConfig() {

private void setSpiConfig() {
// 设置默认序列化方式
String defaultSerializable = config.getParameter(SpiKey.REMOTING_SERIALIZABLE_DFT);
String defaultSerializable = config.getParameter(SpiExtensionKey.REMOTING_SERIALIZABLE_DFT);
if (StringUtils.isNotEmpty(defaultSerializable)) {
AdaptiveSerializable.setDefaultSerializable(defaultSerializable);
}

// 设置json
String ltsJson = config.getParameter(SpiKey.LTS_JSON);
String ltsJson = config.getParameter(SpiExtensionKey.LTS_JSON);
if (StringUtils.isNotEmpty(ltsJson)) {
JSONFactory.setJSONAdapter(ltsJson);
}
Expand Down
10 changes: 5 additions & 5 deletions lts-core/src/main/java/com/lts/core/cluster/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Node {
// 唯一标识
private String identity;
// 命令端口
private Integer commandPort;
private Integer httpCmdPort;

// 自己关注的节点类型
private List<NodeType> listenNodeTypes;
Expand All @@ -44,12 +44,12 @@ public void setJob(Job job) {
this.job = job;
}

public Integer getCommandPort() {
return commandPort;
public Integer getHttpCmdPort() {
return httpCmdPort;
}

public void setCommandPort(Integer commandPort) {
this.commandPort = commandPort;
public void setHttpCmdPort(Integer httpCmdPort) {
this.httpCmdPort = httpCmdPort;
}

public String getHostName() {
Expand Down
6 changes: 5 additions & 1 deletion lts-core/src/main/java/com/lts/core/cmd/HttpCmdClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.lts.core.cmd;

import com.lts.core.commons.utils.Assert;
import com.lts.core.commons.utils.CollectionUtils;

import java.net.URLEncoder;
Expand All @@ -15,8 +16,11 @@ public class HttpCmdClient {
*/
public static <Resp extends HttpCmdResponse> Resp execute(String ip, int port, HttpCmd<Resp> cmd) {

Assert.hasText(cmd.getNodeIdentity(), "nodeIdentity can't be empty");
Assert.hasText(cmd.getCommand(), "command can't be empty");

StringBuilder sb = new StringBuilder();
sb.append("http://").append(ip).append(":").append(port).append("/").append(cmd.getCommand());
sb.append("http://").append(ip).append(":").append(port).append("/").append(cmd.getNodeIdentity()).append("/").append(cmd.getCommand());

try {
Map<String, String> params = cmd.getParams();
Expand Down
36 changes: 28 additions & 8 deletions lts-core/src/main/java/com/lts/core/cmd/HttpCmdContext.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,52 @@
package com.lts.core.cmd;

import com.lts.core.commons.utils.StringUtils;
import com.lts.core.commons.utils.Assert;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author Robert HG ([email protected]) on 2/17/16.
*/
public class HttpCmdContext {

private final Map<String, HttpCmdProcessor> processorMap = new HashMap<String, HttpCmdProcessor>();
private ReentrantLock lock = new ReentrantLock();
private final Map<String/*节点标识*/, Map<String/*cmd*/, HttpCmdProcessor>>
NODE_PROCESSOR_MAP = new HashMap<String, Map<String, HttpCmdProcessor>>();

public void addCmdProcessor(HttpCmdProcessor processor) {
if (processor == null) {
throw new IllegalArgumentException("processor can not be null");
}

String command = processor.getCommand();
String identity = processor.nodeIdentity();
Assert.hasText(identity, "nodeIdentity can't be empty");

if (StringUtils.isEmpty(command)) {
throw new IllegalArgumentException("processor.command can not be null");
String command = processor.getCommand();
Assert.hasText(command, "command can't be empty");

Map<String, HttpCmdProcessor> cmdProcessorMap = NODE_PROCESSOR_MAP.get(identity);
if (cmdProcessorMap == null) {
lock.lock();
if (cmdProcessorMap == null) {
cmdProcessorMap = new ConcurrentHashMap<String, HttpCmdProcessor>();
NODE_PROCESSOR_MAP.put(identity, cmdProcessorMap);
}
lock.unlock();
}
processorMap.put(command, processor);
cmdProcessorMap.put(command, processor);
}

public HttpCmdProcessor getCmdProcessor(String command) {
return processorMap.get(command);
public HttpCmdProcessor getCmdProcessor(String nodeIdentity, String command) {
Assert.hasText(nodeIdentity, "nodeIdentity can't be empty");

Map<String, HttpCmdProcessor> cmdProcessorMap = NODE_PROCESSOR_MAP.get(nodeIdentity);
if (cmdProcessorMap == null) {
return null;
}
return cmdProcessorMap.get(command);
}

}
7 changes: 6 additions & 1 deletion lts-core/src/main/java/com/lts/core/cmd/HttpCmdExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ public void run() {
return;
}

HttpCmdProcessor httpCmdProcessor = context.getCmdProcessor(request.getCommand());
if (StringUtils.isEmpty(request.getNodeIdentity())) {
response = HttpCmdResponse.newResponse(false, "nodeIdentity is blank");
return;
}

HttpCmdProcessor httpCmdProcessor = context.getCmdProcessor(request.getNodeIdentity(), request.getCommand());

if (httpCmdProcessor != null) {
response = httpCmdProcessor.execute(request);
Expand Down
2 changes: 2 additions & 0 deletions lts-core/src/main/java/com/lts/core/cmd/HttpCmdProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
public interface HttpCmdProcessor {

String nodeIdentity();

String getCommand();

HttpCmdResponse execute(HttpCmdRequest request) throws Exception;
Expand Down
24 changes: 20 additions & 4 deletions lts-core/src/main/java/com/lts/core/cmd/HttpCmdRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
public class HttpCmdRequest {

private String command;
private String nodeIdentity;

private Map<String, String> params;

Expand All @@ -23,6 +24,18 @@ public void setCommand(String command) {
this.command = command;
}

public String getNodeIdentity() {
return nodeIdentity;
}

public void setNodeIdentity(String nodeIdentity) {
this.nodeIdentity = nodeIdentity;
}

public void setParams(Map<String, String> params) {
this.params = params;
}

public String getParam(String key) {
if (params != null) {
return params.get(key);
Expand Down Expand Up @@ -53,7 +66,7 @@ public Map<String, String> getParams() {
}

/**
* GET /xxxCommand?xxx=yyyyy HTTP/1.1
* GET /nodeIdentity/xxxCommand?xxx=yyyyy HTTP/1.1
*/
protected static HttpCmdRequest parse(String url) throws Exception {

Expand All @@ -65,8 +78,12 @@ protected static HttpCmdRequest parse(String url) throws Exception {
int start = url.indexOf('/');
int ask = url.indexOf('?') == -1 ? url.lastIndexOf(' ') : url.indexOf('?');
int space = url.lastIndexOf(' ');
String target = url.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : url.length());
request.setCommand(target);
String path = url.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : url.length());
String nodeIdentity = path.substring(0, path.indexOf('/'));
String command = path.substring(path.indexOf('/') + 1, path.length());
request.setCommand(command);
request.setNodeIdentity(nodeIdentity);

if (ask == -1 || ask == space) {
return request;
}
Expand All @@ -89,6 +106,5 @@ protected static HttpCmdRequest parse(String url) throws Exception {
request.addParam(key, value);
}
return request;

}
}
24 changes: 23 additions & 1 deletion lts-core/src/main/java/com/lts/core/cmd/HttpCmdServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class HttpCmdServer {
private int port;
private HttpCmdContext context;

public HttpCmdServer(Config config) {
protected HttpCmdServer(Config config) {
this.port = config.getParameter("lts.command.port", 8719);
this.context = new HttpCmdContext();
}
Expand Down Expand Up @@ -71,4 +71,26 @@ public void registerCommand(HttpCmdProcessor processor) {
context.addCmdProcessor(processor);
}

/**
* 保证一个jvm公用一个 HttpCmdServer
*/
public static class Factory {

private static HttpCmdServer httpCmdServer;

public static HttpCmdServer getHttpCmdServer(Config config) {
if (httpCmdServer != null) {
return httpCmdServer;
}
synchronized (Factory.class) {
if (httpCmdServer != null) {
return httpCmdServer;
}
httpCmdServer = new HttpCmdServer(config);
return httpCmdServer;
}
}

}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.lts.core.exception;

/**
* Created by hugui.hg on 3/2/16.
* @author Robert HG ([email protected]) on 3/2/16.
*/
public class LtsRuntimeException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import com.lts.core.cluster.Config;
import com.lts.core.spi.SPI;
import com.lts.core.spi.SpiKey;
import com.lts.core.spi.SpiExtensionKey;

/**
* Robert HG ([email protected]) on 5/21/15.
*/
@SPI(key = SpiKey.FAIL_STORE, dftValue = "leveldb")
@SPI(key = SpiExtensionKey.FAIL_STORE, dftValue = "leveldb")
public interface FailStoreFactory {

public FailStore getFailStore(Config config, String storePath);
Expand Down
4 changes: 2 additions & 2 deletions lts-core/src/main/java/com/lts/core/json/JSONAdapter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.lts.core.json;

import com.lts.core.spi.SpiKey;
import com.lts.core.spi.SpiExtensionKey;
import com.lts.core.spi.SPI;

import java.lang.reflect.Type;
Expand All @@ -10,7 +10,7 @@
/**
* @author Robert HG ([email protected]) on 11/19/15.
*/
@SPI(key = SpiKey.LTS_JSON, dftValue = "fastjson")
@SPI(key = SpiExtensionKey.LTS_JSON, dftValue = "fastjson")
public interface JSONAdapter {

public String getName();
Expand Down
Loading

0 comments on commit f00db84

Please sign in to comment.