Skip to content

Commit

Permalink
Support parse task output params under multiple log (apache#15244)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Dec 1, 2023
1 parent b4cf1cc commit 71ee1f0
Show file tree
Hide file tree
Showing 60 changed files with 495 additions and 480 deletions.
2 changes: 1 addition & 1 deletion docs/docs/en/guide/parameter/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Although the two parameters var1 and var2 are output in the A task, only the `OU

#### Pass parameter from Kubernetes task to downstream

Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)dsVal}`. Users can output log data in the format `${(key=value)dsVal}` in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will capture the `${(key=value)dsVal}` in the output logs to capture the parameters and pass them downstream.
Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)}` or `#{(key=value)}`. Users can output log data in this format in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will scan the output logs for `${(key=value)}` or `#{(key=value)}` to extract the parameters and pass them downstream.

For example

Expand Down
2 changes: 1 addition & 1 deletion docs/docs/zh/guide/parameter/context.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Node_mysql 运行结果如下:

#### Kubernetes 任务传递参数

在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即 `${(key=value)dsVal}`,用户可以在应用程序的终端日志中输出以格式为 `${(key=value)dsVal}` 结束的日志数据,key 为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)dsVal}`来进行参数捕捉,从而传递到下游。
在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即 `${(key=value)}` 或 `#{(key=value)}`,用户可以在应用程序的终端日志中以这种格式输出日志数据,key 为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)}` 或 `#{(key=value)}` 来进行参数捕捉,从而传递到下游。

如下图所示:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
Expand All @@ -39,31 +40,25 @@
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;

import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.client.dsl.LogWatch;

/**
* abstract command executor
*/
@Slf4j
public abstract class AbstractCommandExecutor {

/**
* rules for extracting Var Pool
*/
protected static final Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);

protected StringBuilder varPool = new StringBuilder();
protected volatile Map<String, String> taskOutputParams = new HashMap<>();
/**
* process
*/
Expand All @@ -74,11 +69,6 @@ public abstract class AbstractCommandExecutor {
*/
protected Consumer<LinkedBlockingQueue<String>> logHandler;

/**
* logger
*/
protected Logger logger;

/**
* log list
*/
Expand All @@ -98,11 +88,9 @@ public abstract class AbstractCommandExecutor {
protected Future<?> podLogOutputFuture;

public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
TaskExecutionContext taskRequest) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
this.logger = logger;
this.logBuffer = new LinkedBlockingQueue<>();
this.logBuffer.add(EMPTY_STRING);

Expand All @@ -119,7 +107,7 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
logger.warn(
log.warn(
"Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
Expand Down Expand Up @@ -180,7 +168,7 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
return result;
}
// print process id
logger.info("process start, process id is: {}", processId);
log.info("process start, process id is: {}", processId);

// if timeout occurs, exit directly
long remainTime = getRemainTime();
Expand All @@ -201,7 +189,7 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
// Wait the task log process finished.
taskOutputFuture.get();
} catch (ExecutionException e) {
logger.error("Handle task log error", e);
log.error("Handle task log error", e);
}
}

Expand All @@ -212,7 +200,7 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
// delete pod after successful execution and log collection
ProcessUtils.cancelApplication(taskRequest);
} catch (ExecutionException e) {
logger.error("Handle pod log error", e);
log.error("Handle pod log error", e);
}
}

Expand All @@ -223,21 +211,21 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
result.setExitStatusCode(this.process.exitValue());

} else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
return result;

}

public String getVarPool() {
return varPool.toString();
public Map<String, String> getTaskOutputParams() {
return taskOutputParams;
}

public void cancelApplication() throws InterruptedException {
Expand All @@ -246,16 +234,12 @@ public void cancelApplication() throws InterruptedException {
}

// soft kill
logger.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId());
log.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId());
process.destroy();
if (!process.waitFor(5, TimeUnit.SECONDS)) {
process.destroyForcibly();
}
logger.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId());
}

private void printCommand(List<String> commands) {
logger.info("task run command: {}", String.join(" ", commands));
log.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId());
}

private void collectPodLogIfNeeded() {
Expand Down Expand Up @@ -299,24 +283,22 @@ private void parseProcessOutput(Process process) {
ExecutorService getOutputLogService = ThreadUtils
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
getOutputLogService.submit(() -> {
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
while ((line = inReader.readLine()) != null) {
if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) {
varPool.append(findVarPool(line));
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
}
logBuffer.add(line);
taskOutputParameterParser.appendParseLog(line);
}
processLogOutputIsSuccess = true;
} catch (Exception e) {
logger.error("Parse var pool error", e);
log.error("Parse var pool error", e);
processLogOutputIsSuccess = true;
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});

getOutputLogService.shutdown();
Expand All @@ -336,28 +318,14 @@ private void parseProcessOutput(Process process) {
}
}
} catch (Exception e) {
logger.error("Output task log error", e);
log.error("Output task log error", e);
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
});
parseProcessOutputExecutorService.shutdown();
}

/**
* find var pool
*
* @param line
* @return
*/
private String findVarPool(String line) {
Matcher matcher = SETVALUE_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}

/**
* get remain time(s)
*
Expand Down Expand Up @@ -389,7 +357,7 @@ private int getProcessId(Process process) {

processId = f.getInt(process);
} catch (Exception e) {
logger.error("Get task pid failed", e);
log.error("Get task pid failed", e);
}

return processId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,20 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
* executive task
*/
@Slf4j
public abstract class AbstractTask {

protected final Logger log = LoggerFactory.getLogger(AbstractTask.class);

private static String groupName1 = "paramName1";
private static String groupName2 = "paramName2";
public String rgex = String.format("['\"]\\$\\{(?<%s>.*?)}['\"]|\\$\\{(?<%s>.*?)}", groupName1, groupName2);

/**
* varPool string
*/
protected String varPool;
@Getter
@Setter
protected Map<String, String> taskOutputParams;

/**
* taskExecutionContext
Expand Down Expand Up @@ -91,14 +87,6 @@ public void init() {

public abstract void cancel() throws TaskException;

public void setVarPool(String varPool) {
this.varPool = varPool;
}

public String getVarPool() {
return varPool;
}

/**
* get exit status code
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractYarnTask extends AbstractRemoteTask {

private ShellCommandExecutor shellCommandExecutor;

public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
}

// todo split handle to submit and track
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,11 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

import org.slf4j.Logger;

/**
* shell command executor
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {

public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
TaskExecutionContext taskRequest) {
super(logHandler, taskRequest);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ private TaskConstants() {

public static final String FLINK_APPLICATION_REGEX = "JobID \\w+";

public static final String SETVALUE_REGEX = "[\\$#]\\{setValue\\((.*?)\\)}";

public static final String DSVALUE_REGEX = "[\\$#]\\{\\((.*?)\\)dsVal}$";

/**
* string false
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;

import java.util.Map;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractK8sTask extends AbstractRemoteTask {

/**
Expand All @@ -37,7 +42,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask {
*/
protected AbstractK8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.abstractK8sTaskExecutor = new K8sTaskExecutor(log, taskRequest);
this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest);
}

// todo split handle to submit and track
Expand All @@ -47,7 +52,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
dealOutParam(abstractK8sTaskExecutor.getVarPool());
dealOutParam(abstractK8sTaskExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("k8s task submit failed with error");
exitStatusCode = -1;
Expand Down Expand Up @@ -86,5 +91,5 @@ public void cancelApplication() throws TaskException {
*/
protected abstract String buildCommand();

protected abstract void dealOutParam(String result);
protected abstract void dealOutParam(Map<String, String> taskOutputParams);
}
Loading

0 comments on commit 71ee1f0

Please sign in to comment.