Skip to content

Commit

Permalink
Merge pull request #185 from harbby/dev
Browse files Browse the repository at this point in the history
fix flink 1.13.1 runtime
  • Loading branch information
harbby authored Aug 8, 2021
2 parents fca2bd5 + 8cac84c commit 4fd67fa
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import ideal.sylph.runner.flink.FlinkJobConfig;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
Expand All @@ -30,8 +32,6 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import static com.github.harbby.gadtry.base.MoreObjects.checkState;

public class YarnJobDescriptor
extends YarnClusterDescriptor
{
Expand Down Expand Up @@ -100,9 +100,10 @@ public ClusterClient<ApplicationId> deploy(JobGraph jobGraph, boolean detached)
.setSlotsPerTaskManager(appConf.getTaskManagerSlots())
.setTaskManagerMemoryMB(appConf.getTaskManagerMemoryMb())
.createClusterSpecification();
String flinkPlugins = System.getenv(ConfigConstants.ENV_FLINK_PLUGINS_DIR);
checkState(flinkPlugins != null, "must set env FLINK_PLUGINS_DIR");

//checkState(System.getenv(ConfigConstants.ENV_FLINK_PLUGINS_DIR) != null, "flink1.12 must set env FLINK_PLUGINS_DIR"); //flink1.12 need
flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, new MemorySize((long) appConf.getJobManagerMemoryMb() * 1024 * 1024));
flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, new MemorySize((long) appConf.getTaskManagerMemoryMb() * 1024 * 1024));
return this.deployJobCluster(clusterSpecification, jobGraph, detached).getClusterClient();
}
}

0 comments on commit 4fd67fa

Please sign in to comment.