淘先锋技术网

首页 1 2 3 4 5 6 7

可以关注下公众号,专注数据开发、数据架构之路,热衷于分享技术干货。 

一、提交流程

        Flink作业在开发完毕之后,需要提交到Flink集群执行。ClientFronted是入口,触发用户开发的Flink应用Jar文件中的main方法,然后交给PipelineExecutor(流水线执行器,在FlinkClient 升成JobGraph之后,将作业提交给集群的重要环节。)#execue方法,最终会选择一个触发一个具体的PiplineExecutor执行。

不同模式的适用场景
运行模式                            适用场景
Session模式

共享Dispatcher 和ResourceManager

按需申请资源,作业共享集群资源

适合执行时间段,频繁执行的短任务

Per-Job模式

独享Dispatcher 和 ResourceManager

按需申请资源,作业独享集群资源

长周期执行的任务,集群异常影响范围小

提交模式又可分为:

  • Detached:Flink Client创建完集群之后,可以退出命令行窗口,集群独立运行。
  • Attached:不能关闭命令行窗口,需要与集群之间维持连接。

1.1 Yarn Session提交流程

启动集群

  • 使用bin/yarn-session.sh提交会话模式的作业。

        如果提交到已经存在的集群,则获取Yarn集群信息、应用ID,并准备提交作业。如果启动新的Yarn Session集群,则进入步骤(2)

  •  Yarn启动新Flink集群

        1)如果没有集群,则创建一个新的Session模式的集群。首先将应用配置(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储(如HDFS)的应用暂存目录。

        2)通过Yarn Client 向Yarn 提交Flink创建集群的申请,Yarn分配资源,在申请的Yarn Container中初始化并启动FlinkJobManager进程,在JobManager进程中运行YarnSessionClusterEntrypoint作为集群启动入口(不同的集群部署模式有不同的ClusterEntrypoint实现),初始化Dispatcher、ResourceManager,启动相关的RPC服务,等待Client通过Rest接口提交作业。

作业提交

Yarn 集群准备好后,开始作业提交。

1)Flink Client通过Rest向Dispatcher提交JobGraph。

2)Dispatcher是Rest接口,不负责实际的调度、执行方面的工作,当收到JobGraph后,为作业创建一个JobMaster,将工作交给JobManager(负责作业调度、管理作业和Task的生命周期),构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)。

这两个步骤结束后,作业进入调度执行阶段。

作业调度执行

1)JobMaster向YarnResourceManager申请资源,开始调度ExecutionGraph执行,向YarnResourceManager申请资源;初次提交作业集群中尚没有TaskManager,此时资源不足,开始申请资源。

2)YarnResourceManager收到JobManager的资源请求,如果当前有空闲Slot则将Slot分配给JobMaster.,否则YarnResourceManager将向YarnMaster请求创建TaskManager。

3)YarnResourceManager将资源请求加入到等待请求队列,并通过心跳向Yarn RM 申请新的Container资源来启动TaskManager进程,Yarn分配新的Container给TaskManager。

4)YarnResourceManager启动,然后从HDFS加载Jar文件等所需要的的相关资源,在容器中启动TaskManager。

5)TaskManager启动之后,向ResourceManager注册,并把自己的Slot资源情况汇报给ResouceManager。

6)ResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。

7)TaskManager向JobMaster提供Slot,JobMaster调度Task到TaskManager的此Slot上执行。

1.2 Yarn Per-Job提交流程

启动集群

  • 使用./flink run -m yarn-cluster提交Per-Job模式的作业。
  • Yarn启动Flink集群。该模式下Flink集群的启动入口是YarnJobClusterEntryPoint,其他与YarnSession模式下集群的启动类似。

作业提交

        该步骤与Seesion模式下的不同,Client并不会通过Rest向Dispatcher提交JobGraph,由Dispatcher从本地文件系统获取JObGraph,其后的不好走与Session模式的一样

作业调度执行

        与Yarn Session模式下一致。

1.3 K8s Session提交流程

 启动集群

  •  Flink客户端首先连接Kubernetes API Server,提交Flink集群的资源描述文件,包括flink-configuration-configmap.yaml、jobmanager-service.yaml、jobmanager-deployment.yaml和taskmanager-deployment.yaml等。
  • Kubernets Master会根据这些资源描述文件去创建对应的Kubernetes实体。以JobManager部署为例,Kubernetes集群中的某个节点收到请求后,Kubelet进程会从中央仓库下载Flink镜像,准备和挂载卷,然后执行启动命令。Pod启动后Flink Master(JobManager)进程随之启动,初始化Dispacher和KubernetesResourceManager。并通过K8s服务对外暴露FlinkMaster的端口,K8s服务类似于路由服务。

两个步骤完成之后,Session模式的集群就创建成功,集群可以接收作业提交请求,但是此时还没有JobManager、TaskManager,当作业需要执行时,才会按需创建。

作业提交

  • Client用户可以通过Flink命令行(即Flink Client)向这个会话模式的集群提交任务。此时JobGraph会在FlinkClient端生成,然后和用户Jar包一起通过RestClient上传。
  • 作业提交成功,Dispatcher会为每个作业启动一个JobMaster,将JobGraph交给JobMaster调度执行。

        两个步骤完成之后,作业进入调度执行阶段。

作业调度执行

        K8s Session模式集群下,ResourceManager向k8sMaster申请和释放TaskManager,除此之外,作业的调度与执行和Yarn模式是一样的。

        1)JobMaster向KubernetesResourceManager请求Slot。

        2)KubernetesResourceManager从kubernetes集群分配TaskManager。每个TaskManager都是具有唯一标识的Pod。KubernetesResourceManager会为TaskManager生成一份新的配置文件,里面有Flink Master的service name 作为地址。这样在FLInkMaster failover之后,TaskManager仍然可以重新连上。

        3)Kubernetes集群分配一个新的Pod后,在上面启动TaskManager。

        4)TaskManager启动后注册到SlotManager。

        5)SlotManager向TaskManager请求Slot.

        6)TaskManager 提供Slot给JobMaster,然后任务就会被分配到这个Slot上运行。

二、Graph总览

  • 流计算应用的Graph转换:StreamGraph-->JobGraph-->ExecutionGraph-->物理执行图(启动计算任务)
  • 批处理应用的Graph转换:OptimizedPlan-->JobGraph
  • Table & SQL API的Graph转换:Blink Table Planner /Flink Table Planner。

2.1 流图

使用DataStreamAPI 开发的应用程序,首先被转换为Transformation,然后被映射为StreamGraph。

2.1.1 SteramGraph核心对象

  • StreamNode

        StreamNode是StremGraph中的节点 ,从Transformation转换而来,可以简单理解为一个StreamNode表示一个算子,从逻辑上来说,SteramNode在StreamGraph中存在实体和虚拟的StreamNode。StremNode可以有多个输入,也可以有多个输出。

        实体的StreamNode会最终变成物理算子。虚拟的StreamNode会附着在StreamEdge上。

  • StreamEdge

        StreamEdge是StreamGraph中的边,用来连接两个StreamNode,一个StreamNode可以有多个出边、入边,StreamEdge中包含了旁路输出、分区器、字段筛选输出等信息。

2.1.2 StreamGraph生成过程

        StreamGraph在FlinkClient中生成,由FlinkClient在提交的时候触发Flink应用的main方法,用户编写的业务逻辑组装成Transformation流水线,在最后调用StreamExecutionEnvironment.execute() 的时候开始触发StreamGraph构建。

StreamGraph实际上是在StreamGraphGenerator中生成的,从SinkTransformation(输出) 向前追溯到SourceTransformation。在遍历过程中一边遍历一边构建StreamGraph。

 在遍历Transformation的过程中,会对不同类型的Transformation分别进行转换。对于物理Transformation则转换为StreamNode实体,对于虚拟Transformation则作为虚拟StreamNode。

 针对于某一种类型的Transformation,会调用其相应的transformxxx()函数进行转换。transfromxxx()首先转换上游Transformation进行递归转换,确保上游的都已经完成了转换。然后通过addOperator()方法构造出StreamNode,通过addEdge()方法与上游的transform进行连接,构造出StreamEdge。

在添加StreamEdge的过程中,如果ShuffleMode为null,则使用ShuffleMode PIPELINED模式,在流计算中,只有PIPLINED模式才会在批处理中设计其他模式。构建StreamEdge的时候,在转换Transformation过程中生成的 虚拟StreamNode会将虚拟StreamNode的信息附着在StreamEdge上

2.1.3 虚拟Transformation 的转换

 虚拟的Transformation生成的时候不会转换为SteramNode,而是添加为虚拟节点。

	private void addEdgeInternal(Integer upStreamVertexID,
			Integer downStreamVertexID,
			int typeNumber,
			StreamPartitioner<?> partitioner,
			List<String> outputNames,
			OutputTag outputTag,
			ShuffleMode shuffleMode) {
		//当上游是sideoutput时,递归调用,并传入sideoutput信息
		if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
			int virtualId = upStreamVertexID;
			upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
			if (outputTag == null) {
				outputTag = virtualSideOutputNodes.get(virtualId).f1;
			}
			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
			
		}
		//当上游是select时,递归调用,并传入select信息
		else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
			int virtualId = upStreamVertexID;
			upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
			if (outputNames.isEmpty()) {
				// selections that happen downstream override earlier selections
				outputNames = virtualSelectNodes.get(virtualId).f1;
			}
			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
		}
		//当上游是Partition时,递归调用,并传入Partition信息
		else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
			int virtualId = upStreamVertexID;
			upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
			if (partitioner == null) {
				partitioner = virtualPartitionNodes.get(virtualId).f1;
			}
			shuffleMode = virtualPartitionNodes.get(virtualId).f2;
			addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
		}
		//不是以上逻辑转换的情况,真正构建StreamEdge
		else {
			StreamNode upstreamNode = getStreamNode(upStreamVertexID);
			StreamNode downstreamNode = getStreamNode(downStreamVertexID);

			// If no partitioner was specified and the parallelism of upstream and downstream
			// operator matches use forward partitioning, use rebalance otherwise.
			//没有指定partitioner时,会为其选择forward或者rebalance
			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
				partitioner = new ForwardPartitioner<Object>();
			} else if (partitioner == null) {
				partitioner = new RebalancePartitioner<Object>();
			}

			if (partitioner instanceof ForwardPartitioner) {
				if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
					throw new UnsupportedOperationException("Forward partitioning does not allow " +
							"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
							", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
							" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
				}
			}

			if (shuffleMode == null) {
				shuffleMode = ShuffleMode.UNDEFINED;
			}
			//创建StreamEdge,并将该SteramEdge添加到上游的输出,下游的输入。
			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

			getStreamNode(edge.getSourceId()).addOutEdge(edge);
			getStreamNode(edge.getTargetId()).addInEdge(edge);
		}
	}

2.2 作业图

 JobGraph可以由流计算的StreamGraph和批处理的OptimizedPlan转换而来。流计算中,在StreamGraph的基础上进行了一些优化,如果通过OperatorChain机制将算子合并起来,在执行时,调度在同一个Task线程上,避免数据的跨线程、跨网段的传递。

2.2.1 JobGraph核心对象

  • JobVertex

        经过算子融合优化后符合条件的多个SteramNode可能会融合在一起生成一个JobVertex,即一个JobVertex包含一个或多个算子,JobVertex的输入是JobEdge,输出是IntermediateDataSet。

  • JobEdge

        JobEdge是JobGraph中连接IntermediateDataSet和JobVertex的边,表示JobGraph中的一个数据流转通道,其上游数据源是IntermediateDataSet,下游消费者是JobVertex。数据通过JobEdge 由IntermediateDataSet传递给JobVertex。

  • IntermediateDataSet

        中间数据集IntermediateDataSet是一种逻辑结构,用来表示JobVertex的输出,即该JobVertex中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。

        IntermediateDataSet的个数与该JobVertex对应的StreamNode的出边数量相同,可以是一个或者多个。

2.2.2 JobGraph生成过程

StreamingJobGraphGenerator负责流计算JobGraph的生成,在转换前需要进行一系列的预处理。

	private JobGraph createJobGraph() {
		preValidate();

		// make sure that all vertices start immediately
		//设置调度模式
		jobGraph.setScheduleMode(streamGraph.getScheduleMode());

		// Generate deterministic hashes for the nodes in order to identify them across
		// submission iff they didn't change.
		//为每个节点生成确定的hashid作为唯一表示,在提交和执行过程中保持不变。
		Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

		// Generate legacy version hashes for backwards compatibility
		//为了向后保持兼容,为每个节点生成老版本的hash id
		List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
		for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
		}

		Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
		//真正对SteramGraph进行转换,生成JobGraph图
		setChaining(hashes, legacyHashes, chainedOperatorHashes);
		setPhysicalEdges();

		//设置共享slotgroup
		setSlotSharingAndCoLocation();

		setManagedMemoryFraction(
			Collections.unmodifiableMap(jobVertices),
			Collections.unmodifiableMap(vertexConfigs),
			Collections.unmodifiableMap(chainedConfigs),
			id -> streamGraph.getStreamNode(id).getMinResources(),
			id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
		//配置checkpoint
		configureCheckpointing();

		jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
		//如果有之前的缓存文件的配置,则重新读入
		JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

		// set the ExecutionConfig last when it has been finalized
		try {
			//设置执行环境配置
			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
		}
		catch (IOException e) {
			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
					"This indicates that non-serializable types (like custom serializers) were registered");
		}

		return jobGraph;
	}

 预处理完毕后,开始构建JobGraph中的点和边,从Source向下遍历StreamGraph,逐步创建JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。

 执行具体的Chain和JobVertex生成、JobEdge的关联、IntermediateDataSet。从StreamGraph读取数据的StreamNode开始,递归遍历同时将StreamOperator连接在一起。

 整理构建的逻辑如下(看上图!!!):

1)从Source开始,Source与下游的FlatMap不可连接,Source是起始节点,自己成为一个JobVertx。

2)此时开始一个新的连接分析,FlatMap是起始节点,与下游的KeyedAgg也不可以连接,那么FlatMap自己成为一个JobVertex。

3)此时开始一个新的连接分析。KeyedAgg是起始节点,并且与下游的Sink可以连接,那么递归地分析Sink节点,构造Sink与其下游是否可以连接,因为Slink没有下游,所以KeyedAgg和Sink节点连接在一起,共同构成了一个JobVertex。在这个JobVertex中,KeyedAgg是起始节点,index编号为0,sink节点index编号为1.

        构建JobVertex的时候需要将StreamNode中的重要配置信息复制到JobVertex中。构建好JobVertex之后,需要构建JobEdge将JobVertex连接起来。KeyedAgg和Sink之间构成了一个算子连接,连接内部的算子之间无序构成JobEdge进行连接。

        在构建JobEdge的时候,很重要的一点是确定上游JobVertex和下游JobVertex的数据交换方式。此时根据ShuffleMode来确定ResultPartition类型,用FlinkPartition来确定JobVertex的连接方式。

        Shuffle确定了ResultPartition,那么就可以确定上游JobVertex输出的IntermediateDataSet的类型了,也就知道JobEdge的输入IntermediateDataSet。

        ForwardPartitioner和RescalePartitioner两种类型的Partitioner转换为DistributionPattern.POINTWISE 的分发模式。其他类型的Partitioner统一转换为DistributionPattern.ALL_TO_ALL模式。

JobGraph的构建和OperatorChain优化:

private List<StreamEdge> createChain(
			Integer startNodeId,
			Integer currentNodeId,
			Map<Integer, byte[]> hashes,
			List<Map<Integer, byte[]>> legacyHashes,
			int chainIndex,
			Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

		if (!builtVertices.contains(startNodeId)) {

			List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

			List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
			List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

			StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
			//获取当前节点的出边,判断是否符合OperatorChain的条件
			//分为两类:chainableoutputs,nonchainableoutputs
			for (StreamEdge outEdge : currentNode.getOutEdges()) {
				if (isChainable(outEdge, streamGraph)) {
					chainableOutputs.add(outEdge);
				} else {
					nonChainableOutputs.add(outEdge);
				}
			}
			//对于chainable的边,递归调用createchain
			//返回值添加到transitiveOutEdges中
			for (StreamEdge chainable : chainableOutputs) {
				transitiveOutEdges.addAll(
						createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
			}

			//对于无法chain在一起的边,边的下游节点作为Operatorchain的Head节点
			//进行递归调用,返回值添加到transitiveOutEdges中
			for (StreamEdge nonChainable : nonChainableOutputs) {
				transitiveOutEdges.add(nonChainable);
				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
			}

			List<Tuple2<byte[], byte[]>> operatorHashes =
				chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

			byte[] primaryHashBytes = hashes.get(currentNodeId);
			OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

			for (Map<Integer, byte[]> legacyHash : legacyHashes) {
				operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
			}

			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
			chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
			chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

			if (currentNode.getInputFormat() != null) {
				getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
			}

			if (currentNode.getOutputFormat() != null) {
				getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
			}
			//如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的StreamConfig
			StreamConfig config = currentNodeId.equals(startNodeId)
					? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
					: new StreamConfig(new Configuration());

			//将StreamNode中的配置信息序列化到Streamconfig中。
			setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
			//再次判断,如果是Chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataset
			//否则将当前节点的StreamConfig 添加到chainedConfig中。
			if (currentNodeId.equals(startNodeId)) {

				config.setChainStart();
				config.setChainIndex(0);
				config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

				for (StreamEdge edge : transitiveOutEdges) {
					connect(startNodeId, edge);
				}

				config.setOutEdgesInOrder(transitiveOutEdges);
				config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

			} else {
				chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

				config.setChainIndex(chainIndex);
				StreamNode node = streamGraph.getStreamNode(currentNodeId);
				config.setOperatorName(node.getOperatorName());
				chainedConfigs.get(startNodeId).put(currentNodeId, config);
			}

			config.setOperatorID(currentOperatorId);

			if (chainableOutputs.isEmpty()) {
				config.setChainEnd();
			}
			return transitiveOutEdges;

		} else {
			return new ArrayList<>();
		}
	}

 2.2.3 算子融合

一个Operatorchain在同一个Task线程内执行。OperatorChain内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化/反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量

operatorchain的条件:

1)下游节点的入度为1

2)SteramEdge的下游节点对应的算子不为null

3)StreamEdge的上游节点对应的算子不为null

4)StreamEdge的上下游节点拥有相同的slotSharingGroup,默认都是default.

5)下游算子的连接策略为ALWAYS.

6)上游算子的连接策略为ALWAYS 或者HEAD.

7)StreamEdge的分区类型为ForwardPartitioner

8)上下游节点的并行度一致

9)当前StreamGraph允许chain

2.3 执行图

2.3.1 ExecutionGraph核心对象 

  • ExecutionJobVertex

        该对象和JobGraph中的JobVertex一一对应。该对象还包含了一组ExecutionVertex,数量与该JobVertex中所包含的SteramNode的并行度一致。

        ExecutionJobVertex用来将一个JobVertex封装成一ExecutionJobVertex,并以此创建ExecutionVertex、Execution、IntermediateResult和IntermediateResultPartition,用于丰富ExecutionGraph。

        在ExecutionJobVertex的构造函数中,首先是依据对应的JobVertex的并发度,生成对应个数的ExecutionVertex。其中,一个ExecutionVertex代表一个ExecutionJobVertex的并发子Task。然后是将原来JobVertex的中间结果IntermediateDataSet转化为ExecutionGrap中IntermediateResult

  • ExecutionVertex

        ExecutionJobVertex中会对作业进行并行化处理,构造可以并行执行的实例,每个并行执行的实例就是ExecutionVertex.

        构建ExecutionVertex的同时,也回构建ExecutionVertex的输出IntermediateResult。并且将ExecutionEdge输出为IntermediatePartition。

        ExecutionVertex的构造函数中,首先会创建IntermediatePartition,并通过IntermediateResult.setPartition()建立IntermediateResult和IntermediateResultPartition之间的关系,然后生成Execution,并配置资源相关。

  • IntermediateResult

        IntermediateResult又叫做中间结果集,该对象是个逻辑概念,表示ExecutionJobVertex的输出,和JobGraph中的IntermediateDataSet一一对应,同样,一个ExecutionJobVertex可以有多个中间二级果,取决于当前JobVertex有几个出边。

        一个中间结果集包含多个中间结果分区IntermediateResultPartition,其个数等于该JobVertex的并发度。

  • IntermediateResultPartition

        IntermediateResultPartition又叫做中间结果分区,表示1个ExecutionVertex输出结果,与ExecutionEdge相关联。

  • ExecutionEdge

        表示ExecutionVertex的输入,连接到上游产生的IntermediateResultPartition。一个Execution对应于唯一的一个IntermediateResultPartition和一个ExecutionVertex。一个ExecutionVertex可以有多个ExecutionEdge。

  • Execution

        ExecutionVertex相当于每个Task的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为一个Execution,执行一个ExecutionVertex的一次尝试。JobManager和TaskManager之间关于Task的部署和Task执行状态的更新都是通过ExecutionAttemptID来标识实例的。在故障或者数据需要重算的情况下,ExecutionVertex可能会有多个ExecutionAttemptID.一个Execution通过ExecutionAttemptID标识。

2.3.2 ExecutionGrap生成过程

        初始话作业调度器的时候,根据JobGraph生活ExecutionGraph。在SchedulerBase的构造方法中触发构建,最终调用SchedulerBase#createExecutionGraph 触发实际的构建动作,使用ExecutionGraphBuiler构建ExecutionGraph。

 核心代码attachJobGraph:

 构建ExecutionEdge 的连接策略

  • 点对点连接(DistributionPattern.POINTWISE)

        该策略用来连接当前ExecutionVertex与上游的IntermediataeResultParition。

        连接分三种情况

        1)一对一连接:并发的Task数量与分区数相等。

        2)多对一连接:下游的Task数量小于上游的分区数,此时分两种情况:

                a:下游Task可以分配同数量的结果分区IntermediataeResultParition。如上游有4个结果分区,下游有2个Task,那么每个Task会分配两个结果分区进行消费。

                b:每个Task消费的上游分区结果数据不均,如上游有3个结果分区,下游有两个Task,那么一个Task分配2个结果分区消费,另一个分配一个结果分区消费。

        3)一对多连接:下游的Task数量多余上游的分区数,此时两种情况:

                a:每个结果分区的下游消费Task数据量相同,如上游有两个结果分区,下游有4个Task,每个结果分区被两个Task消费。

                b:每个结果分区的下游消费Task数量不相同,如上游有两个结果分区,下游有3个Task,那么一个结果分区分配2个Task消费,另一个结果分区分配一个Task消费。

  • 全连接(DistributionPattern.ALL_TO_ALL)

        该策略下游的ExecutionVertex与上游的所有IntermediataeResultParition建立连接,消费其生产的数据。一般全连接的情况意味着数据在Shuffle。

接下来Flink资源管理篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。

参考书籍《Flink 内核原理与实现》

欢迎关注公众号: 数据基石