Skip to content

Commit

Permalink
Merge remote-tracking branch 'guthub/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
yuan.cheng committed Dec 5, 2016
2 parents c2ad4ce + e1d9f67 commit f7e1d3f
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,9 @@ public interface ExtConfig {
String JOB_TRACKER_PUSHER_THREAD_NUM = "lts.job.tracker.pusher.thread.num";

String JOB_TRACKER_PUSH_BATCH_SIZE = "lts.job.tracker.push.batch.size";

String TASK_TRACKER_BIZ_LOGGER_FAIL_STORE_CLOSE = "lts.task.tracker.biz.logger.failstore.close";

String TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE = "lts.task.tracker.job.result.failstore.close";

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public ByteBuffer encode(NioChannel channel, Object msg) {
RemotingHelper.closeChannel(c);
}
} else {
LOGGER.error("Message is instance of " + RemotingCommand.class.getName());
LOGGER.error("Message is not instance of " + RemotingCommand.class.getName());
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import com.github.ltsopensource.core.commons.utils.Callable;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.commons.utils.StringUtils;
import com.github.ltsopensource.core.constant.ExtConfig;
import com.github.ltsopensource.core.constant.Level;
import com.github.ltsopensource.core.domain.BizLog;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
import com.github.ltsopensource.core.failstore.FailStorePathBuilder;
import com.github.ltsopensource.core.json.JSON;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.core.protocol.JobProtos;
import com.github.ltsopensource.core.protocol.command.BizLogSendRequest;
import com.github.ltsopensource.core.protocol.command.CommandBodyWrapper;
Expand All @@ -32,6 +36,7 @@
*/
public class BizLoggerImpl extends BizLoggerAdapter implements BizLogger {

private static final Logger logger = LoggerFactory.getLogger(BizLoggerImpl.class);
private Level level;
private RemotingClientDelegate remotingClient;
private TaskTrackerAppContext appContext;
Expand All @@ -44,25 +49,31 @@ public BizLoggerImpl(Level level, final RemotingClientDelegate remotingClient, T
}
this.appContext = appContext;
this.remotingClient = remotingClient;
this.retryScheduler = new RetryScheduler<BizLog>(BizLogger.class.getSimpleName(), appContext, FailStorePathBuilder.getBizLoggerPath(appContext)) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<BizLog> list) {
return sendBizLog(list);
}
};
this.retryScheduler.start();
if (isEnableBizLoggerFailStore()) {

this.retryScheduler = new RetryScheduler<BizLog>(BizLogger.class.getSimpleName(), appContext, FailStorePathBuilder.getBizLoggerPath(appContext)) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<BizLog> list) {
return sendBizLog(list);
}
};

this.retryScheduler.start();

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});
}

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});
}

@Override
Expand Down Expand Up @@ -105,7 +116,11 @@ private void sendMsg(String msg, Level level) {
requestBody.setBizLogs(Collections.singletonList(bizLog));

if (!remotingClient.isServerEnable()) {
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
if(isEnableBizLoggerFailStore()){
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
}else{
logger.error("Send Biz Logger to JobTracker Error, server is down, bizLog={}", JSON.toJSONString(bizLog));
}
return;
}

Expand All @@ -120,7 +135,12 @@ public void operationComplete(ResponseFuture responseFuture) {
if (response != null && response.getCode() == JobProtos.ResponseCode.BIZ_LOG_SEND_SUCCESS.code()) {
// success
} else {
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
if (!isEnableBizLoggerFailStore()) {
logger.error("Send Biz Logger to JobTracker Error, code={}, bizLog={}",
response != null ? response.getCode() : null, JSON.toJSONString(bizLog));
} else {
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
}
}
}
});
Expand All @@ -129,6 +149,10 @@ public void operationComplete(ResponseFuture responseFuture) {
}
}

private boolean isEnableBizLoggerFailStore() {
return !appContext.getConfig().getParameter(ExtConfig.TASK_TRACKER_BIZ_LOGGER_FAIL_STORE_CLOSE, false);
}

private boolean sendBizLog(List<BizLog> bizLogs) {
if (CollectionUtils.isEmpty(bizLogs)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.ltsopensource.core.commons.utils.Callable;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.constant.ExtConfig;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.domain.JobRunResult;
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
Expand Down Expand Up @@ -50,29 +51,32 @@ public class JobPushProcessor extends AbstractProcessor {
protected JobPushProcessor(TaskTrackerAppContext appContext) {
super(appContext);
this.remotingClient = appContext.getRemotingClient();
retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), appContext,
FailStorePathBuilder.getJobFeedbackPath(appContext), 3) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<JobRunResult> results) {
return retrySendJobResults(results);
}
};
retryScheduler.start();

// 线程安全的
jobRunnerCallback = new JobRunnerCallback();

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});

if (isEnableFailStore()) {
retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), appContext,
FailStorePathBuilder.getJobFeedbackPath(appContext), 3) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<JobRunResult> results) {
return retrySendJobResults(results);
}
};
retryScheduler.start();

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});
}
}

@Override
Expand Down Expand Up @@ -152,9 +156,15 @@ public void operationComplete(ResponseFuture responseFuture) {
LOGGER.info("Job feedback failed, save local files。{}", jobRunResult);
}
try {
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);
if (isEnableFailStore()) {
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);
} else {
LOGGER.error("Send Job Result to JobTracker Error, code={}, jobRunResult={}",
commandResponse != null ? commandResponse.getCode() : null, JSON.toJSONString(jobRunResult));
}

} catch (Exception e) {
LOGGER.error("Job feedback failed", e);
}
Expand All @@ -173,9 +183,14 @@ public void operationComplete(ResponseFuture responseFuture) {
} catch (JobTrackerNotFoundException e) {
try {
LOGGER.warn("No job tracker available! save local files.");
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);

if (isEnableFailStore()) {
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);
} else {
LOGGER.error("Send Job Result to JobTracker Error, server is down, jobRunResult={}", JSON.toJSONString(jobRunResult));
}
} catch (Exception e1) {
LOGGER.error("Save files failed, {}", jobRunResult.getJobMeta(), e1);
}
Expand All @@ -185,6 +200,10 @@ public void operationComplete(ResponseFuture responseFuture) {
}
}

private boolean isEnableFailStore() {
return !appContext.getConfig().getParameter(ExtConfig.TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE, false);
}

/**
* 发送JobResults
*/
Expand Down

0 comments on commit f7e1d3f

Please sign in to comment.