做代码和网站腾讯云域名注册
- 作者: 多梦笔记
- 时间: 2026年02月16日 16:05
当前位置: 首页 > news >正文
做代码和网站,腾讯云域名注册,中国建设银行阆中分行网站,响应式网站建设有利于seo文章目录 你可以学到啥测试代码背景知识SQL转变流程图问题 你可以学到啥
SQL如何一步步变成执行计划的有哪些优化器#xff0c;哪些优化规则calcite 和flink 如何结合的
测试代码
EnvironmentSettings settings EnvironmentSettings.inBatchMode();
TableEnvironment tabl… 文章目录 你可以学到啥测试代码背景知识SQL转变流程图问题 你可以学到啥
SQL如何一步步变成执行计划的有哪些优化器哪些优化规则calcite 和flink 如何结合的
测试代码
EnvironmentSettings settings EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment TableEnvironment.create(settings);Schema schema Schema.newBuilder().column(count, DataTypes.INT()).column(word, DataTypes.STRING()).build();Schema schema1 Schema.newBuilder().column(id, DataTypes.INT()).column(name, DataTypes.STRING()).build();tableEnvironment.createTemporaryTable(aa_user, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /Users/xx/IdeaProjects/flink-demo/data/order.csv).format(csv).build());tableEnvironment.createTemporaryTable(bb_order, TableDescriptor.forConnector(filesystem).schema(schema1).option(path, /Users/xx/IdeaProjects/flink-demo/data/user.csv).format(csv).build());String cost tableEnvironment.explainSql(select * from aa_user inner join bb_order on aa_user.countbb_order.id, ExplainDetail.ESTIMATED_COST);System.out.println(cost);背景知识
需要了解calcite 里的基本知识如AST,RelNode ,hepPlanner等等。 需要了解Flink 和Flink SQL里的一些知识
SQL转变流程图
SQL经过flink 里注册的每一个优化器优化后就能变成物理计划了不过要变成执行代码还要再经过代码生成。
问题
问题1FlinkBatchProgram 所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {val SUBQUERY_REWRITE subquery_rewriteval TEMPORAL_JOIN_REWRITE temporal_join_rewriteval DECORRELATE decorrelateval DEFAULT_REWRITE default_rewriteval PREDICATE_PUSHDOWN predicate_pushdownval JOIN_REORDER join_reorderval JOIN_REWRITE join_rewriteval PROJECT_REWRITE project_rewriteval WINDOW windowval LOGICAL logicalval LOGICAL_REWRITE logical_rewriteval TIME_INDICATOR time_indicatorval PHYSICAL physicalval PHYSICAL_REWRITE physical_rewriteval DYNAMIC_PARTITION_PRUNING dynamic_partition_pruningval RUNTIME_FILTER runtime_filter}问题2calcite 优化器和flink 如何结合的 logicalphysical 这两个优化器都是用的VolcanoPlanner结合规则和代价。 剩下的优化器HepPlannerHepPlanner 完全使用规则。 问题3projectrewrite 后为啥少了LogicalProject ReNode ? 因为最后一个操作logicalproject 这里就是把所有的字段查出来了所有这一步实际上是不用的 问题4物理计划如何生成执行代码的 BatchPhysicalTableSourceScan 类
class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost {val rowCnt mq.getRowCount(this)if (rowCnt null) {return null}val cpu 0val rowSize mq.getAverageRowSize(this)val size rowCnt * rowSizeplanner.getCostFactory.makeCost(rowCnt, cpu, size)}// 这里生成的执行代码override def translateToExecNode(): ExecNode[] {val tableSourceSpec new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}问题5为啥aa_user 表被广播哪里实现的
BatchPhysicalHashJoinRule 规则实现的
核心代码 val leftSize JoinUtil.binaryRowRelNodeSize(join.getLeft)val rightSize JoinUtil.binaryRowRelNodeSize(join.getRight)// if it is not with hint, just check size of left and right side by statistic and config// if leftSize or rightSize is unknown, cannot use broadcastif (leftSize null || rightSize null) {return (false, false)}val threshold tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)val rightSizeSmallerThanThreshold rightSize thresholdval leftSizeSmallerThanThreshold leftSize thresholdval leftSmallerThanRight leftSize rightSizejoin.getJoinType match {case JoinRelType.LEFT (rightSizeSmallerThanThreshold, false)case JoinRelType.RIGHT (leftSizeSmallerThanThreshold, true)case JoinRelType.FULL (false, false)case JoinRelType.INNER (leftSizeSmallerThanThreshold|| rightSizeSmallerThanThreshold,leftSmallerThanRight)// left side cannot be used as build side in SEMI/ANTI join.case JoinRelType.SEMI | JoinRelType.ANTI (rightSizeSmallerThanThreshold, false)}主要就是实现 def binaryRowRelNodeSize(relNode: RelNode): JDouble {val mq relNode.getCluster.getMetadataQueryval rowCount mq.getRowCount(relNode)if (rowCount null) {null} else {rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)}}最后还是到了FlinkRelMdColumnNullCount 这个类 从这个ts: TableScan 对象里取出来 那ts 对象又是在哪里赋值的看这个FlinkRecomputeStatisticsProgram 类
class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {override def getDef: MetadataDef[ColumnNullCount] FlinkMetadata.ColumnNullCount.DEF/*** Gets the null count of the given column in TableScan.** param ts* TableScan RelNode* param mq* RelMetadataQuery instance* param index* the index of the given column* return* the null count of the given column in TableScan*/def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble {Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])val relOptTable ts.getTable.asInstanceOf[FlinkPreparingTableBase]val fieldNames relOptTable.getRowType.getFieldNamesPreconditions.checkArgument(index 0 index fieldNames.size())val fieldName fieldNames.get(index)val statistic relOptTable.getStatisticval colStats statistic.getColumnStats(fieldName)if (colStats ! null colStats.getNullCount ! null) {colStats.getNullCount.toDouble} else {null}}}ts是在这里赋值这里最后会用调用具体的文件系统找到文件行数 private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {final RelOptTable scanTable scan.getTable();if (!(scanTable instanceof TableSourceTable)) {return scan;}FlinkContext context ShortcutUtils.unwrapContext(scan);TableSourceTable table (TableSourceTable) scanTable;boolean reportStatEnabled context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED) table.tableSource() instanceof SupportsStatisticReport;SourceAbilitySpec[] specs table.abilitySpecs();PartitionPushDownSpec partitionPushDownSpec getSpec(specs, PartitionPushDownSpec.class);FilterPushDownSpec filterPushDownSpec getSpec(specs, FilterPushDownSpec.class);TableStats newTableStat recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);FlinkStatistic newStatistic FlinkStatistic.builder().statistic(table.getStatistic()).tableStats(newTableStat).build();TableSourceTable newTable table.copy(newStatistic);return new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);}
相关文章
-
做代练去什么网站安全吗wordpress识别环境的文件
做代练去什么网站安全吗wordpress识别环境的文件
- 站长
- 2026年02月16日
-
做代理的网站学院网络营销策划方案
做代理的网站学院网络营销策划方案
- 站长
- 2026年02月16日
-
做打折的淘宝小卖家的网站wordpress中文网站
做打折的淘宝小卖家的网站wordpress中文网站
- 站长
- 2026年02月16日
-
做单抗药的看什么网站好链接提交使用说明
做单抗药的看什么网站好链接提交使用说明
- 站长
- 2026年02月16日
-
做单页网站需要做什么的动漫制作专业累吗
做单页网站需要做什么的动漫制作专业累吗
- 站长
- 2026年02月16日
-
做单页网站要多少钱地下城做心悦任务的网站
做单页网站要多少钱地下城做心悦任务的网站
- 站长
- 2026年02月16日
