Skip to content

Commit

Permalink
JobTracker限流处理
Browse files Browse the repository at this point in the history
  • Loading branch information
qq254963746 committed Mar 4, 2016
1 parent 44661c7 commit e0193d3
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 22 deletions.
9 changes: 5 additions & 4 deletions lts-jobclient/src/main/java/com/lts/jobclient/JobClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import com.lts.core.AppContext;
import com.lts.core.cluster.AbstractClientNode;
import com.lts.core.commons.utils.*;
import com.lts.core.commons.utils.Assert;
import com.lts.core.commons.utils.BatchUtils;
import com.lts.core.commons.utils.CollectionUtils;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.constant.Constants;
import com.lts.core.domain.Job;
import com.lts.core.exception.JobSubmitException;
Expand Down Expand Up @@ -51,9 +54,7 @@ public class JobClient<T extends JobClientNode, Context extends AppContext> exte
@Override
protected void beforeStart() {
appContext.setRemotingClient(remotingClient);
int concurrentSize = config.getParameter(Constants.JOB_SUBMIT_CONCURRENCY_SIZE,
Constants.DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE);
protector = new JobSubmitProtector(concurrentSize);
protector = new JobSubmitProtector(appContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.lts.core.constant.Constants;
import com.lts.core.domain.Job;
import com.lts.core.exception.JobSubmitException;
import com.lts.jobclient.domain.JobClientAppContext;
import com.lts.jobclient.domain.Response;

import java.util.List;
Expand All @@ -19,32 +20,32 @@ public class JobSubmitProtector {
private int concurrentSize = Constants.AVAILABLE_PROCESSOR * 4;
// 用信号量进行过载保护
private Semaphore semaphore;
private int timeout = 500;
private int acquireTimeout = 500;
private String errorMsg;

public JobSubmitProtector() {
errorMsg = "the concurrent size is " + concurrentSize +
public JobSubmitProtector(JobClientAppContext appContext) {

int concurrentSize = appContext.getConfig().getParameter(Constants.JOB_SUBMIT_CONCURRENCY_SIZE,
Constants.DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE);

this.errorMsg = "the concurrent size is " + concurrentSize +
" , submit too fast , use " + Constants.JOB_SUBMIT_CONCURRENCY_SIZE +
" can change the concurrent size .";
}
this.acquireTimeout = appContext.getConfig().getParameter("job.submit.lock.acquire.timeout", 500);

public JobSubmitProtector(int concurrentSize) {
this();
if (concurrentSize > 0) {
this.concurrentSize = concurrentSize;
}
semaphore = new Semaphore(this.concurrentSize);
this.semaphore = new Semaphore(this.concurrentSize);
}

public Response execute(final List<Job> jobs, final JobSubmitExecutor<Response> jobSubmitExecutor) throws JobSubmitException {
boolean acquire = false;
try {
try {
acquire = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
if (!acquire) {
throw new JobSubmitProtectException(concurrentSize, errorMsg);
}
} catch (InterruptedException e) {
acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {}
if (!acquire) {
throw new JobSubmitProtectException(concurrentSize, errorMsg);
}
return jobSubmitExecutor.execute(jobs);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.lts.jobtracker.processor;

import com.lts.core.cluster.NodeType;
import com.lts.core.logger.Logger;
import com.lts.core.logger.LoggerFactory;
import com.lts.core.protocol.JobProtos;
import com.lts.core.protocol.command.AbstractRemotingCommandBody;
import com.lts.jobtracker.channel.ChannelWrapper;
Expand All @@ -13,6 +15,8 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static com.lts.core.protocol.JobProtos.RequestCode;

Expand All @@ -23,6 +27,10 @@
public class RemotingDispatcher extends AbstractRemotingProcessor {

private final Map<RequestCode, RemotingProcessor> processors = new HashMap<RequestCode, RemotingProcessor>();
private Semaphore reqLimitSemaphore;
private int reqLimitAcquireTimeout = 200;
private boolean reqLimitEnable = false;
private static final Logger LOGGER = LoggerFactory.getLogger(RemotingDispatcher.class);

public RemotingDispatcher(JobTrackerAppContext appContext) {
super(appContext);
Expand All @@ -31,31 +39,66 @@ public RemotingDispatcher(JobTrackerAppContext appContext) {
processors.put(RequestCode.JOB_PULL, new JobPullProcessor(appContext));
processors.put(RequestCode.BIZ_LOG_SEND, new JobBizLogProcessor(appContext));
processors.put(RequestCode.CANCEL_JOB, new JobCancelProcessor(appContext));

this.reqLimitEnable = appContext.getConfig().getParameter("remoting.req.limit.enable", false);
Integer maxQPS = appContext.getConfig().getParameter("remoting.req.limit.maxQPS", 500);
this.reqLimitSemaphore = new Semaphore(maxQPS);
this.reqLimitAcquireTimeout = appContext.getConfig().getParameter("remoting.req.limit.acquire.timeout", 200);
}

@Override
public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
// 心跳
if (request.getCode() == JobProtos.RequestCode.HEART_BEAT.code()) {
commonHandler(channel, request);
offerHandler(channel, request);
return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.HEART_BEAT_SUCCESS.code(), "");
}
if (reqLimitEnable) {
return doBizWithReqLimit(channel, request);
} else {
return doBiz(channel, request);
}
}

/**
* 限流处理
*/
private RemotingCommand doBizWithReqLimit(Channel channel, RemotingCommand request) throws RemotingCommandException {
boolean acquired = false;
try {
try {
acquired = reqLimitSemaphore.tryAcquire(reqLimitAcquireTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("acquire lock error", e);
}

if (!acquired) {
return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.SYSTEM_BUSY.code(), "remoting server is busy!");
}
return doBiz(channel, request);
} finally {
if (acquired) {
reqLimitSemaphore.release();
}
}
}

private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
// 其他的请求code
RequestCode code = RequestCode.valueOf(request.getCode());
RemotingProcessor processor = processors.get(code);
if (processor == null) {
return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
}
commonHandler(channel, request);
offerHandler(channel, request);
return processor.processRequest(channel, request);
}

/**
* 1. 将 channel 纳入管理中(不存在就加入)
* 2. 更新 TaskTracker 节点信息(可用线程数)
*/
private void commonHandler(Channel channel, RemotingCommand request) {
private void offerHandler(Channel channel, RemotingCommand request) {
AbstractRemotingCommandBody commandBody = request.getBody();
String nodeGroup = commandBody.getNodeGroup();
String identity = commandBody.getIdentity();
Expand Down
5 changes: 2 additions & 3 deletions 开发计划.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
* 可以自定义收到特定的BizLog进行报警
* 报警形式可以是短信,邮件等

####5.LTS 的注册中心
可以参考diamond 和 RokectMQ的nameserver等。无非就是一个pub/sub
####6.zookeeper客户端的封装
主要是为了去除对zkClient和curator的依赖
####7.LTS nio框架的实现
主要是为了去除对netty和mina的依赖(群主正在开发中)
####8.任务依赖的原生支持
####9.对任务业务日志的处理
可以采用按数量分表
可以采用按数量分表
####10.TaskTracker增加设置,当自身节点内存不足或者cpu资源不足的时候,不去pull任务

0 comments on commit e0193d3

Please sign in to comment.