GH-8 Integrate Flink on local mode #12

Merged: 67 commits, Apr 24, 2024
Changes from 1 commit
Commits
d6ee7be
Migrate part of the code
ispong Apr 7, 2024
37788df
Format code
ispong Apr 7, 2024
f02def9
Complete Flink submission to FlinkCluster
ispong Apr 9, 2024
3aabe01
Complete Flink submission to k8s
ispong Apr 11, 2024
056b497
Initialize the flink-yun project
ispong Apr 11, 2024
b3da0c5
Format code
ispong Apr 11, 2024
8eec8a9
Complete YARN integration
ispong Apr 12, 2024
531fba2
Format code
ispong Apr 12, 2024
a8bcdd2
Update Flink dependencies
ispong Apr 12, 2024
7ea1d46
Change the official website background color
ispong Apr 12, 2024
0b7502d
Improve the official website
ispong Apr 12, 2024
7fbe329
Change the theme color
ispong Apr 12, 2024
7e1ba33
Change the background color
ispong Apr 12, 2024
c8bab4f
Fix project startup
ispong Apr 12, 2024
43f363f
Improve Swagger startup
ispong Apr 12, 2024
87a0b15
Fix startup script error
ispong Apr 12, 2024
1e4d883
Update the banner
ispong Apr 12, 2024
a292af5
Update the banner
ispong Apr 12, 2024
97876e0
Fix inconsistent gradle backend startup path
ispong Apr 12, 2024
a05ec74
Fix inconsistent gradle backend startup path
ispong Apr 12, 2024
7831ca9
Improve git commit configuration
ispong Apr 12, 2024
44c50bd
Improve git commit configuration
ispong Apr 12, 2024
7a30d70
Improve git commit configuration
ispong Apr 12, 2024
848dc20
Improve git commit configuration
ispong Apr 12, 2024
c089a02
Format code
ispong Apr 12, 2024
4d7f1c2
Improve git commit configuration
ispong Apr 12, 2024
ecbfaa8
Improve git commit configuration
ispong Apr 12, 2024
381525f
Checkstyle improvements
ispong Apr 12, 2024
aff7515
Spotless improvements
ispong Apr 12, 2024
6bab114
Spotless improvements
ispong Apr 12, 2024
af5aaa9
Spotless improvements
ispong Apr 12, 2024
386e164
Spotless improvements
ispong Apr 12, 2024
f5a9239
Format code
ispong Apr 12, 2024
8164b10
Format code
ispong Apr 12, 2024
d694589
Checkstyle formatting
ispong Apr 12, 2024
7f5f2e8
Improve gradle commands
ispong Apr 12, 2024
25fc226
Replace the logo
ispong Apr 13, 2024
002b4f9
Replace the product logo
ispong Apr 13, 2024
2675d1f
Improve the agent
ispong Apr 13, 2024
dd33732
Improve node installation
ispong Apr 13, 2024
510002b
Fix node installation failure
ispong Apr 13, 2024
26f9dc2
Fix node installation failure
ispong Apr 13, 2024
498b512
Format code
ispong Apr 13, 2024
8ec70dc
Improve the official website
ispong Apr 13, 2024
2c0d40a
Change the product name
ispong Apr 13, 2024
679225d
Fix node installation failure
ispong Apr 13, 2024
7990c63
Fix Flink submission failure in local mode
ispong Apr 14, 2024
c00ae47
Fix job submission failure to YARN
ispong Apr 14, 2024
65fff88
Format code
ispong Apr 14, 2024
a87640f
Fix k8s log issue
ispong Apr 14, 2024
4574bca
Fix YARN submission issue
ispong Apr 14, 2024
077add1
Format code
ispong Apr 14, 2024
b6806c0
Update the git ignore rules
ispong Apr 23, 2024
22a77f9
Complete end-to-end local testing of local mode
ispong Apr 23, 2024
b335907
Support YARN testing
ispong Apr 23, 2024
4e171df
Format code
ispong Apr 23, 2024
ef91d26
Add lib extraction and installation
ispong Apr 23, 2024
ab11d75
Format code
ispong Apr 23, 2024
21dc462
Fix YARN not supporting submission
ispong Apr 23, 2024
3ac3b41
Ensure resource files are committed
ispong Apr 24, 2024
9357bc9
Fix script incompatibility
ispong Apr 24, 2024
baafeb3
Fix script incompatibility
ispong Apr 24, 2024
a6210ed
Fix error on local abort
ispong Apr 24, 2024
d2f4af4
Format code
ispong Apr 24, 2024
cc36d08
Add environment variable file
ispong Apr 24, 2024
5f7f864
Support submission to Docker
ispong Apr 24, 2024
9391db9
Fix k8s exception
ispong Apr 24, 2024
Fix job submission failure to YARN
ispong committed Apr 24, 2024
commit c00ae47b407b86d72305a901d51883736f0ec021
@@ -85,7 +85,7 @@ public GetJobInfoRes getJobInfo(GetJobInfoReq getJobInfoReq) {
try (KubernetesClusterDescriptor clusterDescriptor =
kubernetesClusterClientFactory.createClusterDescriptor(flinkConfig)) {
ClusterClientProvider<String> clusterClientProvider =
clusterDescriptor.retrieve(getJobInfoReq.getClusterId());
clusterDescriptor.retrieve(getJobInfoReq.getJobId());
System.out.println(clusterClientProvider.getClusterClient().getWebInterfaceURL());
return null;
} catch (Exception e) {
@@ -1,13 +1,12 @@
package com.isxcode.acorn.agent.run;

import com.alibaba.fastjson2.JSON;
import com.isxcode.acorn.api.agent.pojos.dto.FlinkVerticesDto;
import com.isxcode.acorn.api.agent.pojos.req.GetJobInfoReq;
import com.isxcode.acorn.api.agent.pojos.req.GetJobLogReq;
import com.isxcode.acorn.api.agent.pojos.req.StopJobReq;
import com.isxcode.acorn.api.agent.pojos.req.SubmitJobReq;
import com.isxcode.acorn.api.agent.pojos.res.GetJobInfoRes;
import com.isxcode.acorn.api.agent.pojos.res.GetJobLogRes;
import com.isxcode.acorn.api.agent.pojos.res.StopJobRes;
import com.isxcode.acorn.api.agent.pojos.res.SubmitJobRes;
import com.isxcode.acorn.api.agent.pojos.res.*;
import com.isxcode.acorn.backend.api.base.exceptions.IsxAppException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -18,12 +17,27 @@
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;

@@ -38,19 +52,17 @@ public SubmitJobRes submitJob(SubmitJobReq submitJobReq) {
Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
flinkConfig.set(PipelineOptions.NAME, submitJobReq.getAppName());
flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, singletonList(""));
flinkConfig.set(ApplicationConfiguration.APPLICATION_ARGS, singletonList(Base64.getEncoder().encodeToString(JSON.toJSONString(submitJobReq.getAcornPluginReq()).getBytes())));
flinkConfig.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitJobReq.getEntryClass());
flinkConfig.set(PipelineOptions.JARS, singletonList(
"/Users/ispong/isxcode/flink-yun/flink-yun-plugins/flink-sql-execute-plugin/build/libs/flink-sql-execute-plugin.jar"));
flinkConfig.set(PipelineOptions.JARS, singletonList(submitJobReq.getAppResource()));
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, submitJobReq.getFlinkHome() + "/conf");
flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2g"));
flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("2g"));
flinkConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, submitJobReq.getAppName());
flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR,
"/Users/ispong/isxcode/flink-yun/flink-yun-dist/flink-min/lib/flink-dist-1.18.1.jar");
flinkConfig.set(YarnConfigOptions.FLINK_DIST_JAR, submitJobReq.getFlinkHome() + "/lib/flink-dist-1.18.1.jar");
List<String> libFile = new ArrayList<>();
libFile.add("/Users/ispong/isxcode/flink-yun/flink-yun-dist/flink-min/lib");
libFile.add(submitJobReq.getFlinkHome() + "/lib");
libFile.add("/Users/ispong/isxcode/flink-yun/resources/jdbc/system");
libFile.add("/Users/ispong/isxcode/flink-yun/resources/cdc");
flinkConfig.set(YarnConfigOptions.SHIP_FILES, libFile);
@@ -60,32 +72,93 @@ public SubmitJobRes submitJob(SubmitJobReq submitJobReq) {
.setSlotsPerTaskManager(2).createClusterSpecification();

ApplicationConfiguration applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig);
applicationConfiguration.applyToConfiguration(flinkConfig);
YarnClusterClientFactory yarnClusterClientFactory = new YarnClusterClientFactory();
try (YarnClusterDescriptor clusterDescriptor = yarnClusterClientFactory.createClusterDescriptor(flinkConfig)) {
ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider =
clusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
System.out.println(applicationIdClusterClientProvider.getClusterClient().getClusterId());
System.out.println(applicationIdClusterClientProvider.getClusterClient().getWebInterfaceURL());
return null;
return SubmitJobRes.builder().jobId(String.valueOf(applicationIdClusterClientProvider.getClusterClient().getClusterId())).build();
} catch (Exception e) {
e.printStackTrace();
throw new IsxAppException("提交任务失败" + e.getMessage());
}
}

@Override
public GetJobInfoRes getJobInfo(GetJobInfoReq getJobInfoReq) {
return null;

Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, getJobInfoReq.getFlinkHome() + "/conf");

YarnClusterClientFactory yarnClusterClientFactory = new YarnClusterClientFactory();
try (YarnClusterDescriptor clusterDescriptor = yarnClusterClientFactory.createClusterDescriptor(flinkConfig)) {
ApplicationReport applicationReport = clusterDescriptor.getYarnClient().getApplicationReport(ApplicationId.fromString(getJobInfoReq.getJobId()));
return GetJobInfoRes.builder().jobId(getJobInfoReq.getJobId()).status(applicationReport.getYarnApplicationState().toString()).build();
} catch (Exception e) {
throw new IsxAppException("提交任务失败" + e.getMessage());
}
}

@Override
public GetJobLogRes getJobLog(GetJobLogReq getJobLogReq) {
return null;

String getLogCmdFormat = "yarn logs -applicationId %s";
Process process;
try {
process = Runtime.getRuntime().exec(String.format(getLogCmdFormat, getJobLogReq.getJobId()));
} catch (IOException e) {
throw new RuntimeException(e);
}

InputStream inputStream = process.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));

StringBuilder errLog = new StringBuilder();
String line;
while (true) {
try {
if (!((line = reader.readLine()) != null)) break;
} catch (IOException e) {
throw new RuntimeException(e);
}
errLog.append(line).append("\n");
}

try {
int exitCode = process.waitFor();
if (exitCode == 1) {
throw new IsxAppException(errLog.toString());
} else {
Pattern regex = Pattern.compile("LogType:taskmanager.log\\s*([\\s\\S]*?)\\s*End of LogType:taskmanager.log");
Matcher matcher = regex.matcher(errLog);
String log = "";
while (matcher.find()) {
String tmpLog = matcher.group();
if (tmpLog.contains("ERROR")) {
log = tmpLog;
break;
}
if (tmpLog.length() > log.length()) {
log = tmpLog;
}
}
return GetJobLogRes.builder().log(log).build();
}
} catch (InterruptedException e) {
throw new IsxAppException(e.getMessage());
}
}

@Override
public StopJobRes stopJobReq(StopJobReq stopJobReq) {
return null;

Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, stopJobReq.getFlinkHome() + "/conf");

YarnClusterClientFactory yarnClusterClientFactory = new YarnClusterClientFactory();
try (YarnClusterDescriptor clusterDescriptor = yarnClusterClientFactory.createClusterDescriptor(flinkConfig)) {
clusterDescriptor.getYarnClient().killApplication(ApplicationId.fromString(stopJobReq.getJobId()));
return StopJobRes.builder().build();
} catch (Exception e) {
throw new IsxAppException("停止任务失败" + e.getMessage());
}
}
}
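
For context: in submitJob above, the AcornPluginReq is passed to the Flink application as a single Base64-encoded JSON string via ApplicationConfiguration.APPLICATION_ARGS. Below is a minimal sketch of how a plugin entry class could decode that argument; the class name PluginArgsDecodeSketch and the direct use of JSONObject are illustrative assumptions rather than the actual flink-sql-execute-plugin code, while the "flinkSql" key mirrors the AcornPluginReq.flinkSql builder call seen later in this diff.

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PluginArgsDecodeSketch {

    public static void main(String[] args) {
        // args[0] carries the Base64 string set via ApplicationConfiguration.APPLICATION_ARGS in submitJob
        String json = new String(Base64.getDecoder().decode(args[0]), StandardCharsets.UTF_8);

        // Parse the JSON payload; the "flinkSql" key corresponds to AcornPluginReq.flinkSql (assumed field name)
        JSONObject pluginReq = JSON.parseObject(json);
        String flinkSql = pluginReq.getString("flinkSql");

        // A real plugin would hand this SQL to the Flink TableEnvironment; here we only print it
        System.out.println("flinkSql to execute: " + flinkSql);
    }
}
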
@@ -49,41 +49,35 @@ public GetJobInfoRes getJobInfo(GetJobInfoReq getJobInfoReq) {

switch (getJobInfoReq.getAgentType()) {
case AgentType.YARN:

break;
return yarnAcorn.getJobInfo(getJobInfoReq);
case AgentType.K8S:
return kubernetesAcorn.getJobInfo(getJobInfoReq);
case AgentType.FlinkCluster:
return flinkClusterAcorn.getJobInfo(getJobInfoReq);
default:
throw new IsxAppException("agent类型不支持");
}

return null;
}

public GetJobLogRes getJobLog(GetJobLogReq getJobLogReq) {

switch (getJobLogReq.getAgentType()) {
case AgentType.YARN:
break;
return yarnAcorn.getJobLog(getJobLogReq);
case AgentType.K8S:
return kubernetesAcorn.getJobLog(getJobLogReq);
case AgentType.FlinkCluster:
return flinkClusterAcorn.getJobLog(getJobLogReq);
default:
throw new IsxAppException("agent类型不支持");
}

return null;
}

public StopJobRes stopJob(StopJobReq stopJobReq) {

switch (stopJobReq.getAgentType()) {
case AgentType.YARN:

break;
return yarnAcorn.stopJobReq(stopJobReq);
case AgentType.K8S:
return kubernetesAcorn.stopJobReq(stopJobReq);
case AgentType.FlinkCluster:
@@ -92,6 +86,5 @@ public StopJobRes stopJob(StopJobReq stopJobReq) {
throw new IsxAppException("agent类型不支持");
}

return null;
}
}
@@ -16,6 +16,4 @@ public class GetJobInfoReq {
private String flinkHome;

private String agentType;

private String clusterId;
}
@@ -150,6 +150,7 @@ protected void execute(WorkRunContext workRunContext, WorkInstanceEntity workIns
SubmitJobReq submitJobReq = SubmitJobReq.builder().entryClass("com.isxcode.acorn.plugin.sql.execute.Job")
.appResource(
"/Users/ispong/isxcode/flink-yun/flink-yun-plugins/flink-sql-execute-plugin/build/libs/flink-sql-execute-plugin.jar")
.appName("zhiliuyun-job")
.acornPluginReq(AcornPluginReq.builder().flinkSql(workRunContext.getScript()).build())
.flinkHome(engineNode.getFlinkHomePath()).agentType(calculateEngineEntityOptional.get().getClusterType())
.build();