Skip to content

Latest commit

 

History

History
440 lines (351 loc) · 13.7 KB

FlinkSQL源码解析.md

File metadata and controls

440 lines (351 loc) · 13.7 KB

Flink SQL执行流程

0.预览 AST、Optimized Logical Plan、Physical Execution Plan

  • 程序方法可以打印 待执行Sql的抽象语法树(Abstract Syntax Tree)、优化后的逻辑计划以及物理计划: == Abstract Syntax Tree == == Optimized Logical Plan == == Physical Execution Plan ==
== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1], amount=[$2])
+- LogicalFilter(condition=[>($2, 2)])
   +- LogicalUnion(all=[true])
      :- LogicalProject(user=[$0], product=[$1], amount=[$2])
      :  +- LogicalFilter(condition=[<($0, 3)])
      :     +- LogicalTableScan(table=[[default_catalog, default_database, OrderA]])
      +- LogicalProject(user=[$0], product=[$1], amount=[$2])
         +- LogicalFilter(condition=[<>($1, _UTF-16LE'rubber')])
            +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])

== Optimized Logical Plan ==
Union(all=[true], union=[user, product, amount])
:- Calc(select=[user, product, amount], where=[AND(<(user, 3), >(amount, 2))])
:  +- DataStreamScan(table=[[default_catalog, default_database, OrderA]], fields=[user, product, amount])
+- Calc(select=[user, product, amount], where=[AND(<>(product, _UTF-16LE'rubber':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(amount, 2))])
   +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 3 : Operator
		content : SourceConversion(table=[default_catalog.default_database.OrderA], fields=[user, product, amount])
		ship_strategy : FORWARD

		Stage 4 : Operator
			content : Calc(select=[user, product, amount], where=[((user < 3) AND (amount > 2))])
			ship_strategy : FORWARD

			Stage 5 : Operator
				content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
				ship_strategy : FORWARD

				Stage 6 : Operator
					content : Calc(select=[user, product, amount], where=[((product <> _UTF-16LE'rubber':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (amount > 2))])
					ship_strategy : FORWARD

1.Parse阶段(语法解析)

  • 使用JavaCC把SQL转换成抽象语法树(AST),在Calcite中用SQLNode来表示

2.Validate阶段(语法校验)

  • 根据元数据信息进行验证,例如查询的表、使用的函数是否存在,会分别对 from / where / group by / having / select / order by 等子句进行validate ,校验之后仍然是 SqlNode 构成的语法树;

3.Convert阶段

  • 语义分析,根据SqlNode和元数据信息构建关系表达式RelNode树,也就是最初版本的逻辑计划。

4.Optimize阶段(查询计划优化)

  • 首先将 SqlNode 语法树转换成关系表达式 RelNode 构成的逻辑树
  • 然后使用优化器基于规则进行等价变换,例如我们比较熟悉的谓词下推、列裁剪等,经过优化器优化后得到最优的查询计划;RBO(rule based optimization)、CBO(cost based optimization)

5.Execute阶段

  • 将逻辑查询计划翻译成物理执行计划,生成对应的可执行代码,提交运行。转换成Flink识别的StreamGraph、JobGraph

Blink Planner

runtime

data

  • binary:blink中将原生planner的Row格式修改为BinaryRow,减少序列化和让数据存储更加紧凑。

MemorySegment

  • 底层存储二进制数据的抽象类,分为HybridMemorySegment(混合型存储)和HeapMemorySegment(堆内存储)实现

BinaryRowData

  • 分为定长数据和可变数据,底层Row存储格式

UpdatableRowData

  • 可修改的行记录,记录读取的行记录,和修改后的记录
	// 行记录
	private RowData row;
	// 记录修改过后的值
	private final Object[] fields;
	// 是否updated
	private final boolean[] updated;
	
	public void setField(int pos, Object value) {
		updated[pos] = true;
		fields[pos] = value;
	}

JoinedRowData

  • 存储Join后的RowData
	// sql的行种类
	private RowKind rowKind = RowKind.INSERT;
	// 俩个join的rowData
	private RowData row1;
	private RowData row2;

ColumnarRowData

  • 按照列存储的方式存储数据
	private RowKind rowKind = RowKind.INSERT;
	//Verctor列批处理,存储每个列的数据,根据rowId来获取对应行的这列值,理解为列存储
	private VectorizedColumnBatch vectorizedColumnBatch;
	// 行id,记录处理的行数
	private int rowId;

ColumnarArrayData

  • 数组方式存储列的全部数据,根据对应的offset和pos获取对应行的列的值
 // 存储每列的数据
	private final ColumnVector data;
	// 偏移量,offset+pos获取指定行的数据
	private final int offset;
	// 元素总数
	private final int numElements;

BoxedWrapperRowData

  • 装箱包装RowData,数据存储在Object数组中
// sql操作类型
	private RowKind rowKind = RowKind.INSERT; // INSERT as default

	protected final Object[] fields;

BinaryWriter

  • 二进制数据写入器,主要负责将Row,Array等类型按照二进制格式写入
/**
 * Writer to write a composite data format, like row, array.
 * 1. Invoke {@link #reset()}.
 * // 通过writeXx或者setNullAt写入每个字段,相同字段不能重复写入
 * 2. Write each field by writeXX or setNullAt. (Same field can not be written repeatedly.)
 * 3. Invoke {@link #complete()}.
 */
@Internal
public interface BinaryWriter 

filesystem

PartitionCommitTrigger

  • 分区提交触发器,触发何时提交分区
public interface PartitionCommitTrigger {

	String PARTITION_TIME = "partition-time";
	String PROCESS_TIME = "process-time";

	/**
	 * Add a pending partition.
	 */
	void addPartition(String partition);

	/**
	 * 获取可提交的分区,并清理无用的水印和分区
	 * Get committable partitions, and cleanup useless watermarks and partitions.
	 */
	List<String> committablePartitions(long checkpointId) throws IOException;

	/**
	 * End input, return committable partitions and clear.
	 */
	List<String> endInput();

	/**
	 * Snapshot state.
	 */
	void snapshotState(long checkpointId, long watermark) throws Exception;

	static PartitionCommitTrigger create(
			boolean isRestored,
			OperatorStateStore stateStore,
			Configuration conf,
			ClassLoader cl,
			List<String> partitionKeys,
			ProcessingTimeService procTimeService) throws Exception {
		// 获取`sink.partition-commit.trigger`配置,默认为process-time触发器
		String trigger = conf.get(SINK_PARTITION_COMMIT_TRIGGER);
		switch (trigger) {
			case PARTITION_TIME:
				return new PartitionTimeCommitTigger(
						isRestored, stateStore, conf, cl, partitionKeys);
			case PROCESS_TIME:
				return new ProcTimeCommitTigger(
						isRestored, stateStore, conf, procTimeService);
			default:
				throw new UnsupportedOperationException(
						"Unsupported partition commit trigger: " + trigger);
		}
	}
}

PartitionTimeCommitTigger

  • partition-time分区提交触发器,用于提交基于分区时间的分区
public class PartitionTimeCommitTigger implements PartitionCommitTrigger {
	// 记录等待提交的分区
	private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
			new ListStateDescriptor<>(
					"pending-partitions",
					new ListSerializer<>(StringSerializer.INSTANCE));
	// 记录checkpointid对应的watermark
	private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
			new ListStateDescriptor<>(
					"checkpoint-id-to-watermark",
					new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));

	private final ListState<List<String>> pendingPartitionsState;
	// 等待中的分区
	private final Set<String> pendingPartitions;

	// watermarker状态,mapkey为checkpointid,value为watermark
	private final ListState<Map<Long, Long>> watermarksState;
	// 记录watermark
	private final TreeMap<Long, Long> watermarks;
	// 分区时间提取器
	private final PartitionTimeExtractor extractor;
	// 提交延迟时间
	private final long commitDelay;
	// 分区key
	private final List<String> partitionKeys;

	public PartitionTimeCommitTigger(
			boolean isRestored,
			OperatorStateStore stateStore,
			Configuration conf,
			ClassLoader cl,
			List<String> partitionKeys) throws Exception {
		// 初始化等待提交分区状态
		this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
		this.pendingPartitions = new HashSet<>();
		// 是否为恢复状态
		if (isRestored) {
			// 将从状态后端中获取的带提交分区放入内存中
			pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
		}

		this.partitionKeys = partitionKeys;
		// 获取"sink.partition-commit.delay"延迟,默认为0ms
		this.commitDelay = conf.get(SINK_PARTITION_COMMIT_DELAY).toMillis();
		// 根据指定配置实例化出分区时间提交器
		this.extractor = PartitionTimeExtractor.create(
				cl,
				// 指定分区时间提交器类型,默认为 default,可以选择custom
				conf.get(PARTITION_TIME_EXTRACTOR_KIND),
				// 指定自定义的分区时间提取器
				conf.get(PARTITION_TIME_EXTRACTOR_CLASS),
				// 提取时间匹配的格式
				conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN));

		// 获取watermark状态
		this.watermarksState = stateStore.getListState(WATERMARKS_STATE_DESC);
		this.watermarks = new TreeMap<>();
		if (isRestored) {
			// 恢复状态
			watermarks.putAll(watermarksState.get().iterator().next());
		}
	}

	@Override
	public void addPartition(String partition) {
		if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
			this.pendingPartitions.add(partition);
		}
	}

	@Override
	public List<String> committablePartitions(long checkpointId) {
		// 判断提交的分区是否已经checkpoint完毕
		if (!watermarks.containsKey(checkpointId)) {
			throw new IllegalArgumentException(String.format(
					"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
					checkpointId, watermarks));
		}
		// 获取其watermarker
		long watermark = watermarks.get(checkpointId);

		watermarks.headMap(checkpointId, true).clear();

		List<String> needCommit = new ArrayList<>();
		Iterator<String> iter = pendingPartitions.iterator();
		while (iter.hasNext()) {
			String partition = iter.next();
			// 从指定分区key中提取分区时间
			LocalDateTime partTime = extractor.extract(
					partitionKeys, extractPartitionValues(new Path(partition)));
			// 如果watermarker大于分区时间+延迟时间,则可以提交,提交后移除
			if (watermark > toMills(partTime) + commitDelay) {
				needCommit.add(partition);
				iter.remove();
			}
		}
		return needCommit;
	}

	/**
	 * 快照状态
	 * @param checkpointId
	 * @param watermark
	 * @throws Exception
	 */
	@Override
	public void snapshotState(long checkpointId, long watermark) throws Exception {
		pendingPartitionsState.clear();
		// 将内存中数据加入state
		pendingPartitionsState.add(new ArrayList<>(pendingPartitions));

		watermarks.put(checkpointId, watermark);
		watermarksState.clear();
		watermarksState.add(new HashMap<>(watermarks));
	}

	@Override
	public List<String> endInput() {
		ArrayList<String> partitions = new ArrayList<>(pendingPartitions);
		pendingPartitions.clear();
		return partitions;
	}
}

ProcTimeCommitTigger

  • processTime提交触发器
public class ProcTimeCommitTigger implements PartitionCommitTrigger {

	private static final ListStateDescriptor<Map<String, Long>> PENDING_PARTITIONS_STATE_DESC =
			new ListStateDescriptor<>(
					"pending-partitions-with-time",
					new MapSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE));
	// 等待的分区和创建时process时间
	private final ListState<Map<String, Long>> pendingPartitionsState;
	private final Map<String, Long> pendingPartitions;
	private final long commitDelay;
	private final ProcessingTimeService procTimeService;

	public ProcTimeCommitTigger(
			boolean isRestored,
			OperatorStateStore stateStore,
			Configuration conf,
			ProcessingTimeService procTimeService) throws Exception {
		this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
		this.pendingPartitions = new HashMap<>();
		if (isRestored) {
			pendingPartitions.putAll(pendingPartitionsState.get().iterator().next());
		}

		this.procTimeService = procTimeService;
		this.commitDelay = conf.get(SINK_PARTITION_COMMIT_DELAY).toMillis();
	}

	@Override
	public void addPartition(String partition) {
		if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
			this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
		}
	}

	@Override
	public List<String> committablePartitions(long checkpointId) {
		List<String> needCommit = new ArrayList<>();
		// 获取当前processTime
		long currentProcTime = procTimeService.getCurrentProcessingTime();
		Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
		while (iter.hasNext()) {
			Map.Entry<String, Long> entry = iter.next();
			long creationTime = entry.getValue();
			// 如果提交延迟为0或者当前process时间大于创建时间+提交延迟则提交分区
			if (commitDelay == 0 || currentProcTime > creationTime + commitDelay) {
				needCommit.add(entry.getKey());
				iter.remove();
			}
		}
		return needCommit;
	}

	@Override
	public void snapshotState(long checkpointId, long watermark) throws Exception {
		pendingPartitionsState.clear();
		pendingPartitionsState.add(new HashMap<>(pendingPartitions));
	}

	@Override
	public List<String> endInput() {
		ArrayList<String> partitions = new ArrayList<>(pendingPartitions.keySet());
		pendingPartitions.clear();
		return partitions;
	}
}

runtime

TableFunctionCollector

  • 表函数收集器,收集表函数计算的最终结果传递至下游