Skip to content

Commit

Permalink
id增量优化
Browse files Browse the repository at this point in the history
  • Loading branch information
WeiYe-Jing committed May 24, 2020
1 parent 544a2a7 commit 7eff7a1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.wugui.datatx.core.biz.model.ReturnT;
import com.wugui.datatx.core.biz.model.TriggerParam;
import com.wugui.datatx.core.enums.ExecutorBlockStrategyEnum;
import com.wugui.datatx.core.enums.IncrementTypeEnum;
import com.wugui.datatx.core.glue.GlueTypeEnum;
import com.wugui.datax.admin.core.conf.JobAdminConfig;
import com.wugui.datax.admin.core.route.ExecutorRouteStrategyEnum;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
package com.wugui.datax.admin.core.trigger;

import com.baomidou.mybatisplus.annotation.EnumValue;
package com.wugui.datatx.core.enums;

/**
* increment type
Expand All @@ -20,7 +18,6 @@ public enum IncrementTypeEnum {
this.descp = descp;
}

@EnumValue
private final int code;
private final String descp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.wugui.datatx.core.biz.model.HandleProcessCallbackParam;
import com.wugui.datatx.core.biz.model.ReturnT;
import com.wugui.datatx.core.biz.model.TriggerParam;
import com.wugui.datatx.core.enums.IncrementTypeEnum;
import com.wugui.datatx.core.handler.IJobHandler;
import com.wugui.datatx.core.handler.annotation.JobHandler;
import com.wugui.datatx.core.log.JobLogger;
Expand All @@ -24,6 +25,7 @@
import java.util.Date;
import java.util.List;

import static com.wugui.datatx.core.util.Constants.SPLIT_COMMA;
import static com.wugui.datax.executor.service.jobhandler.DataXOptionConstant.*;

/**
Expand Down Expand Up @@ -136,17 +138,17 @@ private static void reader(InputStream inputStream) throws IOException {
while ((line = reader.readLine()) != null) {

if (line.contains(TASK_START_TIME_SUFFIX)) {
stringBuilder.append(subResult(line)).append(Constants.SPLIT_COMMA);
stringBuilder.append(subResult(line)).append(SPLIT_COMMA);
} else if (line.contains(TASK_END_TIME_SUFFIX)) {
stringBuilder.append(subResult(line)).append(Constants.SPLIT_COMMA);
stringBuilder.append(subResult(line)).append(SPLIT_COMMA);
} else if (line.contains(TASK_TOTAL_TIME_SUFFIX)) {
stringBuilder.append(subResult(line)).append(Constants.SPLIT_COMMA);
stringBuilder.append(subResult(line)).append(SPLIT_COMMA);
} else if (line.contains(TASK_AVERAGE_FLOW_SUFFIX)) {
stringBuilder.append(subResult(line)).append(Constants.SPLIT_COMMA);
stringBuilder.append(subResult(line)).append(SPLIT_COMMA);
} else if (line.contains(TASK_RECORD_WRITING_SPEED_SUFFIX)) {
stringBuilder.append(subResult(line)).append(Constants.SPLIT_COMMA);
stringBuilder.append(subResult(line)).append(SPLIT_COMMA);
} else if (line.contains(TASK_RECORD_READER_NUM_SUFFIX)) {
stringBuilder.append(subResult(line)).append(Constants.SPLIT_COMMA);
stringBuilder.append(subResult(line)).append(SPLIT_COMMA);
} else if (line.contains(TASK_RECORD_WRITING_NUM_SUFFIX)) {
stringBuilder.append(subResult(line));
}
Expand All @@ -168,56 +170,58 @@ private String buildDataXParam(TriggerParam tgParam) {
if (StringUtils.isNotBlank(jvmParam)) {
doc.append(JVM_CM).append(TRANSFORM_QUOTES).append(jvmParam).append(TRANSFORM_QUOTES);
}

String replaceParam = tgParam.getReplaceParam().trim();
if (StringUtils.isNotBlank(replaceParam) && tgParam.getIncrementType() == 2) {

if (IncrementTypeEnum.TIME.getCode() == tgParam.getIncrementType()) {

if (doc.length() > 0) doc.append(SPLIT_SPACE);
if (StringUtils.isNotBlank(tgParam.getReplaceParam())) {
if (doc.length() > 0) doc.append(DataXOptionConstant.SPLIT_SPACE);

if (tgParam.getReplaceParamType() == null || tgParam.getReplaceParamType().isEmpty() || tgParam.getReplaceParamType().equals("UnitTime")) {
if (StringUtils.isNotBlank(replaceParam)) {
if (doc.length() > 0) doc.append(SPLIT_SPACE);

String replaceParamType = tgParam.getReplaceParamType();

if (StringUtils.isBlank(replaceParamType) || replaceParamType.equals("UnitTime")) {
long startTime = tgParam.getStartTime().getTime() / 1000;
long endTime = tgParam.getTriggerTime().getTime() / 1000;
doc.append(DataXOptionConstant.PARAMS_CM).append(DataXOptionConstant.TRANSFORM_QUOTES).append(String.format(tgParam.getReplaceParam(), startTime, endTime));
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startTime, endTime));
} else {
SimpleDateFormat sdf = new SimpleDateFormat(tgParam.getReplaceParamType());
SimpleDateFormat sdf = new SimpleDateFormat(replaceParamType);
String tgSecondTime = sdf.format(tgParam.getTriggerTime());
String lastTime = sdf.format(tgParam.getStartTime());
doc.append(DataXOptionConstant.PARAMS_CM).append(DataXOptionConstant.TRANSFORM_QUOTES).append(String.format(tgParam.getReplaceParam(), lastTime, tgSecondTime));
}

if (StringUtils.isNotBlank(partitionStr)) {
doc.append(SPLIT_SPACE);
List<String> partitionInfo = Arrays.asList(partitionStr.split(Constants.SPLIT_COMMA));
doc.append(String.format(PARAMS_CM_V_PT, buildPartition(partitionInfo)));
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, lastTime, tgSecondTime));
}
//buildPartitionCM(doc, partitionStr);
doc.append(TRANSFORM_QUOTES);
} else if(StringUtils.isNotBlank(tgParam.getReplaceParam()) && tgParam.getIncrementType() == 1){
long startId = tgParam.getStartId();
long endId = tgParam.getEndId();
if (doc.length() > 0) {
doc.append(DataXOptionConstant.SPLIT_SPACE);
}

doc.append(DataXOptionConstant.PARAMS_CM).append(DataXOptionConstant.TRANSFORM_QUOTES).append(String.format(tgParam.getReplaceParam(), startId, endId));
if (StringUtils.isNotBlank(partitionStr)) {
doc.append(DataXOptionConstant.SPLIT_SPACE);
List<String> partitionInfo = Arrays.asList(partitionStr.split(Constants.SPLIT_COMMA));
doc.append(String.format(DataXOptionConstant.PARAMS_CM_V_PT, buildPartition(partitionInfo)));
}
doc.append(DataXOptionConstant.TRANSFORM_QUOTES);
}
} else if (IncrementTypeEnum.ID.getCode() == tgParam.getIncrementType()) {
long startId = tgParam.getStartId();
long endId = tgParam.getEndId();
if (doc.length() > 0) doc.append(SPLIT_SPACE);
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startId, endId));
doc.append(TRANSFORM_QUOTES);

} else {
if (StringUtils.isNotBlank(partitionStr)) {
List<String> partitionInfo = Arrays.asList(partitionStr.split(Constants.SPLIT_COMMA));
if (doc.length() > 0) doc.append(SPLIT_SPACE);
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(PARAMS_CM_V_PT, buildPartition(partitionInfo))).append(TRANSFORM_QUOTES);
}
} else if (IncrementTypeEnum.PARTITION.getCode() == tgParam.getIncrementType()) {
if (StringUtils.isNotBlank(partitionStr)) {
List<String> partitionInfo = Arrays.asList(partitionStr.split(SPLIT_COMMA));
if (doc.length() > 0) doc.append(SPLIT_SPACE);
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(PARAMS_CM_V_PT, buildPartition(partitionInfo))).append(TRANSFORM_QUOTES);
}
}

JobLogger.log("------------------命令参数: " + doc);
return doc.toString();
}

private void buildPartitionCM(StringBuilder doc, String partitionStr) {
if (StringUtils.isNotBlank(partitionStr)) {
doc.append(SPLIT_SPACE);
List<String> partitionInfo = Arrays.asList(partitionStr.split(SPLIT_COMMA));
doc.append(String.format(PARAMS_CM_V_PT, buildPartition(partitionInfo)));
}
}

private String buildPartition(List<String> partitionInfo) {
String field = partitionInfo.get(0);
int timeOffset = Integer.parseInt(partitionInfo.get(1));
Expand Down

0 comments on commit 7eff7a1

Please sign in to comment.