Technology, November 11, 2022

When reposting, please credit the original source: https://www.cnblogs.com/dongxiao-yang/p/9403427.html

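A Flink job can be deployed in many ways; the common ones are standalone, on YARN, Mesos, and Kubernetes. Our company has standardized on the single job mode of Flink on YARN, in which every Flink job declares its own Flink cluster on YARN. This article walks through the part of the Flink 1.5.1 source code that submits a YARN single job to a YARN cluster in legacy mode.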

A typical command for submitting a Flink single job looks like this:

    ./flink run -m yarn-cluster -d -yst -yqu flinkqu -yn 4 -ys 2 -c flinkdemoclass flinkdemo.jar args1 args2 …

The entry class of the flink script is org.apache.flink.client.cli.CliFrontend.

The main function of CliFrontend first calls loadCustomCommandLines, which loads an important utility class used to initialize YARN job submission: org.apache.flink.yarn.cli.FlinkYarnSessionCli.
    public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

        // Command line interface of the YARN session, with a special initialization here
        // to prefix all options with y/yarn.
        // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
        // active CustomCommandLine in order and DefaultCLI isActive always return true.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                loadCustomCommandLine(flinkYarnSessionCLI,
                    configuration,
                    configurationDirectory,
                    "y",
                    "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }

        if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
            customCommandLines.add(new DefaultCLI(configuration));
        } else {
            customCommandLines.add(new LegacyCLI(configuration));
        }

        return customCommandLines;
    }
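The comment in the code above explains why the fallback CLI has to be registered last: the active command line is picked in registration order, and the fallback always reports itself as active. A minimal sketch of that selection rule follows. The `CommandLineHandler` interface, `firstActive`, and the `-m yarn-cluster` check are hypothetical stand-ins for illustration, not Flink's actual `CustomCommandLine` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CliSelectionSketch {

    /** Hypothetical stand-in for Flink's CustomCommandLine; not the real interface. */
    interface CommandLineHandler {
        String name();
        boolean isActive(List<String> args);
    }

    /** Returns the first handler that reports itself active, scanning in registration order. */
    static CommandLineHandler firstActive(List<CommandLineHandler> handlers, List<String> args) {
        for (CommandLineHandler handler : handlers) {
            if (handler.isActive(args)) {
                return handler;
            }
        }
        throw new IllegalStateException("No active command line handler found.");
    }

    /** Builds the handler list: the YARN handler first, the always-active fallback last. */
    static List<CommandLineHandler> defaultHandlers() {
        List<CommandLineHandler> handlers = new ArrayList<>(2);
        // Assumed rule for this sketch: YARN is selected when "-m yarn-cluster" is present.
        handlers.add(new CommandLineHandler() {
            public String name() { return "yarn"; }
            public boolean isActive(List<String> args) {
                int i = args.indexOf("-m");
                return i >= 0 && i + 1 < args.size() && "yarn-cluster".equals(args.get(i + 1));
            }
        });
        // Fallback handler: always active, so it must be registered last.
        handlers.add(new CommandLineHandler() {
            public String name() { return "default"; }
            public boolean isActive(List<String> args) { return true; }
        });
        return handlers;
    }

    public static void main(String[] args) {
        List<CommandLineHandler> handlers = defaultHandlers();
        System.out.println(firstActive(handlers, Arrays.asList("run", "-m", "yarn-cluster", "-d")).name());
        System.out.println(firstActive(handlers, Arrays.asList("run", "job.jar")).name());
    }
}
```

If the fallback were registered first, it would shadow the YARN handler for every invocation, which is the situation the original comment warns about.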

Based on the startup arguments, CliFrontend then runs run() -> runProgram(). Inside runProgram, the key YARN-related call is:

client = clusterDescriptor.deploySessionCluster(clusterSpecification);

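The clusterDescriptor above is the cluster description object produced when the FlinkYarnSessionCli mentioned earlier executes createClusterDescriptor(). In this mode the concrete class is org.apache.flink.yarn.LegacyYarnClusterDescriptor, whose parent class is AbstractYarnClusterDescriptor.

deploySessionCluster in turn calls deployInternal to submit a Flink cluster to the YARN cluster.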

    protected ClusterClient<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {

        // ------------------ Check if configuration is valid --------------------
        validateClusterSpecification(clusterSpecification);

        if (UserGroupInformation.isSecurityEnabled()) {
            // note: UGI::hasKerberosCredentials inaccurately reports false
            // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
            // so we check only in ticket cache scenario.
            boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
            if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
                    && useTicketCache && !loginUser.hasKerberosCredentials()) {
                LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                    "does not have Kerberos credentials");
            }
        }

        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------
        checkYarnQueues(yarnClient);

        // ------------------ Add dynamic properties to local flinkConfiguraton ------
        Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
        for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
            flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
        }

        // ------------------ Check if the YARN ClusterClient has the requested resources --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB = yarnConfiguration.getInt(yarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = validateClusterResources(
                clusterSpecification,
                yarnMinAllocationMB,
                maxRes,
                freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode = detached ?
            ClusterEntrypoint.ExecutionMode.DETACHED
            : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            clusterSpecification);

        String host = report.getHost();
        int port = report.getRpcPort();

        // Correctly initialize the Flink config
        flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
        flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

        flinkConfiguration.setString(RestOptions.ADDRESS, host);
        flinkConfiguration.setInteger(RestOptions.PORT, port);

        // the Flink cluster is deployed in YARN. Represent cluster
        return createYarnClusterClient(
            this,
            clusterSpecification.getNumberTaskManagers(),
            clusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }

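At its start, deployInternal checks the YARN cluster's available memory, the queue, and so on, then requests an application and calls startAppMaster, which declares the AM's startup class: YarnApplicationMasterRunner.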
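The body of validateClusterResources is not shown in this article, so the following is only a sketch of the kind of check such a step might perform, under stated assumptions: a request below the scheduler minimum is rounded up, a request above the per-container maximum is rejected, and the total is compared with the free memory. The class, method names, and numbers are hypothetical.

```java
public class ResourceCheckSketch {

    /** Thrown when a request can never be satisfied by a single container. */
    static class DeploymentException extends RuntimeException {
        DeploymentException(String message) {
            super(message);
        }
    }

    /**
     * Rounds the requested container memory up to the scheduler minimum and
     * rejects requests that exceed the per-container maximum.
     */
    static int validateMemoryMb(int requestedMb, int minAllocationMb, int maxAllocationMb) {
        int effectiveMb = Math.max(requestedMb, minAllocationMb);
        if (effectiveMb > maxAllocationMb) {
            throw new DeploymentException(
                "Requested " + effectiveMb + " MB, but the cluster allows at most "
                    + maxAllocationMb + " MB per container.");
        }
        return effectiveMb;
    }

    /** True if one JobManager plus all TaskManagers fit into the currently free memory. */
    static boolean fitsIntoFreeMemory(int jobManagerMb, int taskManagerMb, int taskManagers, long freeMb) {
        long totalMb = jobManagerMb + (long) taskManagerMb * taskManagers;
        return totalMb <= freeMb;
    }

    public static void main(String[] args) {
        int jobManagerMb = validateMemoryMb(512, 1024, 8192);   // rounded up to the minimum
        int taskManagerMb = validateMemoryMb(2048, 1024, 8192); // accepted unchanged
        System.out.println("JobManager MB: " + jobManagerMb);
        System.out.println("TaskManager MB: " + taskManagerMb);
        System.out.println("Fits: " + fitsIntoFreeMemory(jobManagerMb, taskManagerMb, 4, 16384L));
    }
}
```

Failing before the ApplicationMaster is started is cheaper than discovering the problem afterwards, which is presumably why the original method calls failSessionDuringDeployment as soon as a check throws.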

    public ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification) throws Exception {
        .....
        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;
    }
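The excerpt above stops right after "Waiting for the cluster to be allocated", so the waiting logic itself is not shown. The sketch below illustrates the general pattern only: register a cleanup hook, poll the application state until it is running or has terminally failed, and drop the hook once deployment succeeds. The `AppState` enum, `waitUntilRunning`, and the simulated state sequence are assumptions made for illustration and do not reproduce Flink's or YARN's code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WaitForClusterSketch {

    /** Hypothetical application states, loosely modeled on YARN's application states. */
    enum AppState { ACCEPTED, RUNNING, FAILED, KILLED }

    /** Polls the supplied reports until the application is running or has terminally failed. */
    static AppState waitUntilRunning(Iterator<AppState> reports, long pollIntervalMillis)
            throws InterruptedException {
        while (reports.hasNext()) {
            AppState state = reports.next();
            switch (state) {
                case RUNNING:
                    return state;
                case FAILED:
                case KILLED:
                    throw new IllegalStateException("Deployment ended in state " + state);
                default:
                    // Not allocated yet: wait a little before asking again.
                    Thread.sleep(pollIntervalMillis);
            }
        }
        throw new IllegalStateException("No more application reports available.");
    }

    public static void main(String[] args) throws InterruptedException {
        // Cleanup hook that only matters if the JVM exits before deployment succeeds.
        Thread cleanupHook = new Thread(() -> System.out.println("cleaning up staged files"));
        Runtime.getRuntime().addShutdownHook(cleanupHook);

        // Simulated sequence of reports; a real client would query the ResourceManager.
        List<AppState> simulated = Arrays.asList(AppState.ACCEPTED, AppState.ACCEPTED, AppState.RUNNING);
        AppState state = waitUntilRunning(simulated.iterator(), 10L);
        System.out.println("cluster state: " + state);

        // Deployment succeeded, so the cleanup hook is no longer needed.
        Runtime.getRuntime().removeShutdownHook(cleanupHook);
    }
}
```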

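YarnApplicationMasterRunner runs on the YARN cluster as the ApplicationMaster. It talks to the ResourceManager to request the TaskManager containers, and it starts the JobManager service, the web UI service, and so on.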

    protected int runApplicationMaster(Configuration config) {
        ......
        ......
        webMonitor = BootstrapTools.startWebMonitorIfConfigured(
            config,
            highAvailabilityServices,
            new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
            new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
            webMonitorTimeout,
            new ScheduledExecutorServiceAdapter(futureExecutor),
            LOG);

        metricRegistry = new MetricRegistryImpl(
            MetricRegistryConfiguration.fromConfiguration(config));

        metricRegistry.startQueryService(actorSystem, null);

        // 2: the JobManager
        LOG.debug("Starting JobManager actor");

        // we start the JobManager with its standard name
        ActorRef jobManager = JobManager.startJobManagerActors(
            config,
            actorSystem,
            futureExecutor,
            ioExecutor,
            highAvailabilityServices,
            metricRegistry,
            webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
            new Some<>(JobMaster.JOB_MANAGER_NAME),
            Option.<String>empty(),
            getJobManagerClass(),
            getArchivistClass())._1();

        final String webMonitorURL = webMonitor == null ? null : webMonitor.getRestAddress();

        // 3: Flink's Yarn ResourceManager
        LOG.debug("Starting YARN Flink Resource Manager");

        Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
            getResourceManagerClass(),
            config,
            yarnConfig,
            highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
            appMasterHostname,
            webMonitorURL,
            taskManagerParameters,
            taskManagerContext,
            numInitialTaskManagers,
            LOG);

        ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);

Meanwhile, once the cluster has been submitted, the Flink client moves on from runProgram() into executeProgram():

    protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
        logAndSysout("Starting execution of program");

        final JobSubmissionResult result = client.run(program, parallelism);

        if (null == result) {
            throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
                "ExecutionEnvironment.execute()");
        }

        if (result.isJobExecutionResult()) {
            logAndSysout("Program execution finished");
            JobExecutionResult execResult = result.getJobExecutionResult();
            System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
            System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
            Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
            if (accumulatorsResult.size() > 0) {
                System.out.println("Accumulator Results: ");
                System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
            }
        } else {
            logAndSysout("Job has been submitted with JobID " + result.getJobID());
        }
    }

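From ClusterClient.run() -> prog.invokeInteractiveModeForExecution(), execution actually enters the main method of the user's Flink job.

In that main method, the final env.execute() call generates the job's execution plan and stores it in the corresponding DetachedEnvironment object.

The call chain is DetachedEnvironment.finalizeExecute() -> ClusterClient.run() -> YarnClusterClient.submitJob() -> ClusterClient.runDetached():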

    /**
     * Submits a JobGraph detached.
     * @param jobGraph The JobGraph
     * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
     * @return JobSubmissionResult
     * @throws ProgramInvocationException
     */
    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

        waitForClusterToBeReady();

        final ActorGateway jobManagerGateway;
        try {
            jobManagerGateway = getJobManagerGateway();
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
        }

        try {
            logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
            JobClient.submitJobDetached(
                new AkkaJobManagerGateway(jobManagerGateway),
                flinkConfig,
                jobGraph,
                Time.milliseconds(timeout.toMillis()),
                classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }

    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (isDetached()) {
            if (newlyCreatedCluster) {
                stopAfterJob(jobGraph.getJobID());
            }
            LOG.info("super.runDetached");
            return super.runDetached(jobGraph, classLoader);
        } else {
            LOG.info("super.run");
            return super.run(jobGraph, classLoader);
        }
    }
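The call chain above relies on deferred submission: in detached mode, execute() inside the user's main method only records the plan, and the actual submission happens later in finalizeExecute(). A minimal sketch of that idea follows. `RecordingClient` and `DeferredEnvironment` are hypothetical classes invented for this illustration and are not Flink's `ClusterClient` or `DetachedEnvironment`.

```java
import java.util.ArrayList;
import java.util.List;

public class DetachedSubmissionSketch {

    /** Hypothetical stand-in for a cluster client; it only records what it was asked to submit. */
    static class RecordingClient {
        final List<String> submitted = new ArrayList<>();

        /** Pretends to submit the plan and returns immediately with a made-up job id. */
        String submitDetached(String plan) {
            submitted.add(plan);
            return "job-" + submitted.size();
        }
    }

    /** Hypothetical environment: execute() remembers the plan, finalizeExecute() submits it. */
    static class DeferredEnvironment {
        private final RecordingClient client;
        private String pendingPlan;

        DeferredEnvironment(RecordingClient client) {
            this.client = client;
        }

        /** Called from the user's main method; nothing is sent to the cluster yet. */
        void execute(String plan) {
            if (pendingPlan != null) {
                throw new IllegalStateException("This sketch supports a single execute() call.");
            }
            pendingPlan = plan;
        }

        /** Called by the client after main() returns; performs the real submission. */
        String finalizeExecute() {
            if (pendingPlan == null) {
                throw new IllegalStateException("execute() was never called.");
            }
            return client.submitDetached(pendingPlan);
        }
    }

    public static void main(String[] args) {
        RecordingClient client = new RecordingClient();
        DeferredEnvironment env = new DeferredEnvironment(client);

        env.execute("word-count-plan");
        System.out.println("submitted after execute(): " + client.submitted.size());

        String jobId = env.finalizeExecute();
        System.out.println("submitted after finalizeExecute(): " + client.submitted.size() + ", id=" + jobId);
    }
}
```

Splitting the work this way lets the client return as soon as the job has been handed over, which matches the "Returning after job submission" log message in runDetached.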

Finally, the client connects to the JobManager service described earlier and submits the Flink JobGraph to the Flink cluster that has already been allocated on YARN.

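Conclusion: in the single job mode of Flink on YARN, the Flink client first requests an application on the YARN cluster, waits until the cluster has been deployed successfully, then contacts the JobManager and submits the job to the cluster. The advantage of this mode is that every Flink job has its own cluster, which makes resource planning and management easier. The drawback, confirmed by testing, is that when the AM dies YARN can only restart the original cluster and cannot restore the Flink JobGraph, so HA has to be configured separately.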
