Skip to content

Commit

Permalink
Add TaskTracker FailStore close config
Browse files Browse the repository at this point in the history
  • Loading branch information
qq254963746 committed Nov 30, 2016
1 parent 3d749e2 commit 8536d16
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,4 +226,7 @@ public interface ExtConfig {
String JOB_TRACKER_PUSH_BATCH_SIZE = "lts.job.tracker.push.batch.size";

String TASK_TRACKER_BIZ_LOGGER_FAIL_STORE_CLOSE = "lts.task.tracker.biz.logger.failstore.close";

String TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE = "lts.task.tracker.job.result.failstore.close";

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,31 @@ public BizLoggerImpl(Level level, final RemotingClientDelegate remotingClient, T
}
this.appContext = appContext;
this.remotingClient = remotingClient;
this.retryScheduler = new RetryScheduler<BizLog>(BizLogger.class.getSimpleName(), appContext, FailStorePathBuilder.getBizLoggerPath(appContext)) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<BizLog> list) {
return sendBizLog(list);
}
};
if (isEnableBizLoggerFailStore()) {

this.retryScheduler = new RetryScheduler<BizLog>(BizLogger.class.getSimpleName(), appContext, FailStorePathBuilder.getBizLoggerPath(appContext)) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<BizLog> list) {
return sendBizLog(list);
}
};

this.retryScheduler.start();

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});
}

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});
}

@Override
Expand Down Expand Up @@ -112,7 +116,11 @@ private void sendMsg(String msg, Level level) {
requestBody.setBizLogs(Collections.singletonList(bizLog));

if (!remotingClient.isServerEnable()) {
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
if(isEnableBizLoggerFailStore()){
retryScheduler.inSchedule(StringUtils.generateUUID(), bizLog);
}else{
logger.error("Send Biz Logger to JobTracker Error, server is down, bizLog={}", JSON.toJSONString(bizLog));
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.ltsopensource.core.commons.utils.Callable;
import com.github.ltsopensource.core.commons.utils.CollectionUtils;
import com.github.ltsopensource.core.constant.Constants;
import com.github.ltsopensource.core.constant.ExtConfig;
import com.github.ltsopensource.core.domain.JobMeta;
import com.github.ltsopensource.core.domain.JobRunResult;
import com.github.ltsopensource.core.exception.JobTrackerNotFoundException;
Expand Down Expand Up @@ -50,29 +51,32 @@ public class JobPushProcessor extends AbstractProcessor {
protected JobPushProcessor(TaskTrackerAppContext appContext) {
super(appContext);
this.remotingClient = appContext.getRemotingClient();
retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), appContext,
FailStorePathBuilder.getJobFeedbackPath(appContext), 3) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<JobRunResult> results) {
return retrySendJobResults(results);
}
};
retryScheduler.start();

// 线程安全的
jobRunnerCallback = new JobRunnerCallback();

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});

if (isEnableFailStore()) {
retryScheduler = new RetryScheduler<JobRunResult>(JobPushProcessor.class.getSimpleName(), appContext,
FailStorePathBuilder.getJobFeedbackPath(appContext), 3) {
@Override
protected boolean isRemotingEnable() {
return remotingClient.isServerEnable();
}

@Override
protected boolean retry(List<JobRunResult> results) {
return retrySendJobResults(results);
}
};
retryScheduler.start();

NodeShutdownHook.registerHook(appContext, this.getClass().getName(), new Callable() {
@Override
public void call() throws Exception {
retryScheduler.stop();
}
});
}
}

@Override
Expand Down Expand Up @@ -152,9 +156,15 @@ public void operationComplete(ResponseFuture responseFuture) {
LOGGER.info("Job feedback failed, save local files。{}", jobRunResult);
}
try {
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);
if (isEnableFailStore()) {
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);
} else {
LOGGER.error("Send Job Result to JobTracker Error, code={}, jobRunResult={}",
commandResponse != null ? commandResponse.getCode() : null, JSON.toJSONString(jobRunResult));
}

} catch (Exception e) {
LOGGER.error("Job feedback failed", e);
}
Expand All @@ -173,9 +183,14 @@ public void operationComplete(ResponseFuture responseFuture) {
} catch (JobTrackerNotFoundException e) {
try {
LOGGER.warn("No job tracker available! save local files.");
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);

if (isEnableFailStore()) {
retryScheduler.inSchedule(
jobRunResult.getJobMeta().getJobId().concat("_") + SystemClock.now(),
jobRunResult);
} else {
LOGGER.error("Send Job Result to JobTracker Error, server is down, jobRunResult={}", JSON.toJSONString(jobRunResult));
}
} catch (Exception e1) {
LOGGER.error("Save files failed, {}", jobRunResult.getJobMeta(), e1);
}
Expand All @@ -185,6 +200,10 @@ public void operationComplete(ResponseFuture responseFuture) {
}
}

private boolean isEnableFailStore() {
return !appContext.getConfig().getParameter(ExtConfig.TASK_TRACKER_JOB_RESULT_FAIL_STORE_CLOSE, false);
}

/**
* 发送JobResults
*/
Expand Down

0 comments on commit 8536d16

Please sign in to comment.