InsertIntoDataSourceCommand Logical Command
InsertIntoDataSourceCommand is a RunnableCommand that inserts or overwrites data in an InsertableRelation (per the overwrite flag).
InsertIntoDataSourceCommand is created exclusively when the DataSourceAnalysis logical resolution rule is executed and resolves an InsertIntoTable unary logical operator with a LogicalRelation on an InsertableRelation.
// NOTE: t2 is a Hive table (see LazySimpleSerDe in the analyzed plan), so the analyzed plan
// shows InsertIntoHiveTable, not InsertIntoDataSourceCommand. The latter only appears
// for a LogicalRelation on an InsertableRelation.
sql("DROP TABLE IF EXISTS t2")
sql("CREATE TABLE t2(id long)")

val query = "SELECT * FROM RANGE(1)"
// Using INSERT INTO SQL statement so we can access QueryExecution
// DataFrameWriter.insertInto returns no value
val q = sql("INSERT INTO TABLE t2 " + query)
val logicalPlan = q.queryExecution.logical
scala> println(logicalPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02    +- 'UnresolvedTableValuedFunction RANGE, [1]

val analyzedPlan = q.queryExecution.analyzed
scala> println(analyzedPlan.numberedTreeString)
00 InsertIntoHiveTable `default`.`t2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, false, false, [id#6L]
01 +- Project [id#6L]
02    +- Range (0, 1, step=1, splits=None)
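The resolution described above can be pictured with a small, self-contained Scala model. This is a sketch under stated assumptions: LogicalPlan, BaseRelation, InsertableRelation, LogicalRelation, InsertIntoTable and InsertIntoDataSourceCommand below are toy case classes that only mirror the shape of the Spark operators; Query, JdbcLikeRelation, ReadOnlyRelation and resolve are hypothetical; and the guards (an empty partition spec, no IF NOT EXISTS) are assumed to follow the Spark 2.x DataSourceAnalysis rule rather than copied from it.

```scala
// Toy model (hypothetical types, not Spark's classes) of the rewrite that
// DataSourceAnalysis performs for INSERT INTO on an InsertableRelation.
sealed trait LogicalPlan
trait BaseRelation
trait InsertableRelation extends BaseRelation

final case class LogicalRelation(relation: BaseRelation) extends LogicalPlan
final case class Query(sql: String) extends LogicalPlan
final case class InsertIntoTable(
    table: LogicalPlan,
    partition: Map[String, Option[String]],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean) extends LogicalPlan
final case class InsertIntoDataSourceCommand(
    logicalRelation: LogicalRelation,
    query: LogicalPlan,
    overwrite: Boolean) extends LogicalPlan

// Stand-in for the resolution rule: rewrite only an InsertIntoTable whose
// target is a LogicalRelation on an InsertableRelation (no partition spec,
// no IF NOT EXISTS); every other plan is returned untouched.
def resolve(plan: LogicalPlan): LogicalPlan = plan match {
  case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation), parts, query, overwrite, false)
      if parts.isEmpty =>
    InsertIntoDataSourceCommand(l, query, overwrite)
  case other => other
}

// Hypothetical relations: one accepts inserts, the other is read-only.
case object JdbcLikeRelation extends InsertableRelation
case object ReadOnlyRelation extends BaseRelation

val insertable = InsertIntoTable(
  LogicalRelation(JdbcLikeRelation), Map.empty, Query("SELECT * FROM RANGE(1)"),
  overwrite = false, ifPartitionNotExists = false)
val readOnly = insertable.copy(table = LogicalRelation(ReadOnlyRelation))

println(resolve(insertable)) // rewritten to InsertIntoDataSourceCommand
println(resolve(readOnly))   // left as InsertIntoTable
```

The match on the relation's type is what makes the command specific to InsertableRelation: a LogicalRelation on any other BaseRelation falls through to the default case.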
InsertIntoDataSourceCommand returns the logical query plan when requested for the innerChildren, i.e. the nodes that should be shown as an inner nested tree of this node.
val query = "SELECT * FROM RANGE(1)"
val sqlText = "INSERT INTO TABLE t2 " + query
val plan = spark.sessionState.sqlParser.parsePlan(sqlText)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'Project [*]
02    +- 'UnresolvedTableValuedFunction RANGE, [1]
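To see why innerChildren matters for a command that has no regular children, here is a toy tree printer in Scala. Node, Leaf, Project, InsertCommand and treeString are hypothetical and only loosely imitate how Spark renders inner children (indented two levels deeper than the node); the real tree string format differs.

```scala
// Toy tree nodes (hypothetical; not Spark's TreeNode implementation).
trait Node {
  def nodeName: String
  def children: Seq[Node] = Nil
  // Plans shown as an inner nested tree of this node, not as regular children.
  def innerChildren: Seq[Node] = Nil

  def treeString: String = {
    val sb = new StringBuilder
    def loop(n: Node, depth: Int): Unit = {
      sb.append("  " * depth).append(n.nodeName).append('\n')
      n.innerChildren.foreach(c => loop(c, depth + 2)) // nested tree, indented further
      n.children.foreach(c => loop(c, depth + 1))
    }
    loop(this, 0)
    sb.toString
  }
}

final case class Leaf(nodeName: String) extends Node
final case class Project(child: Node) extends Node {
  val nodeName = "Project"
  override def children: Seq[Node] = Seq(child)
}
// The command has no regular children, so the query is only
// visible in the tree string through innerChildren.
final case class InsertCommand(query: Node) extends Node {
  val nodeName = "InsertIntoDataSourceCommand"
  override def innerChildren: Seq[Node] = Seq(query)
}

val cmd = InsertCommand(Project(Leaf("Range (0, 1)")))
print(cmd.treeString)
```

Without the innerChildren override, the tree string of the command would be a single line, since the query is not one of its children.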
Executing Logical Command (Inserting or Overwriting Data in InsertableRelation) — run Method
run(session: SparkSession): Seq[Row]
Note: run is part of the RunnableCommand Contract to execute (run) a logical command.
run takes the InsertableRelation, i.e. the relation of the LogicalRelation.
run then creates a DataFrame for the logical query plan and the input SparkSession.
run requests the DataFrame for the QueryExecution, which in turn is requested for the RDD[InternalRow] of the structured query. run also requests the LogicalRelation for its schema.
With the RDD and the schema, run creates another DataFrame, i.e. the same RDD[InternalRow] with the schema of the LogicalRelation applied.
run requests the InsertableRelation to insert or overwrite data (per the overwrite flag) using that DataFrame.
In the end, since the data in the InsertableRelation has changed, run requests the CacheManager to recacheByPlan with the LogicalRelation.
Note: run requests the SparkSession for the SharedState, which is in turn requested for the CacheManager.
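The steps of run can be sketched in plain Scala without a Spark dependency. This is a minimal sketch under loud assumptions: DataFrame, InsertableRelation, InMemoryTable, CacheManager and run below are hypothetical toys (a DataFrame is just column names plus materialized rows, and the query is a thunk), whereas in Spark the data stays an RDD[InternalRow] and CacheManager.recacheByPlan takes the SparkSession and a logical plan.

```scala
import scala.collection.mutable

// Toy stand-ins (hypothetical; Spark's DataFrame is lazy and RDD-backed).
final case class DataFrame(columns: Seq[String], rows: Seq[Seq[Any]])

trait InsertableRelation {
  def schema: Seq[String]
  def insert(data: DataFrame, overwrite: Boolean): Unit
}

// Hypothetical in-memory relation that records what it receives.
final class InMemoryTable(val schema: Seq[String]) extends InsertableRelation {
  val rows = mutable.ArrayBuffer.empty[Seq[Any]]
  var lastColumns: Seq[String] = Nil
  def insert(data: DataFrame, overwrite: Boolean): Unit = {
    if (overwrite) rows.clear()
    rows ++= data.rows
    lastColumns = data.columns
  }
}

// Hypothetical cache manager that only remembers what was recached.
final class CacheManager {
  val recached = mutable.ArrayBuffer.empty[InsertableRelation]
  def recacheByPlan(relation: InsertableRelation): Unit = recached += relation
}

// Sketch of run: execute the query, apply the relation's schema to its rows
// (by position), insert or overwrite, then recache since the data changed.
def run(
    relation: InsertableRelation,
    query: () => DataFrame,
    overwrite: Boolean,
    cacheManager: CacheManager): Seq[Seq[Any]] = {
  val data = query()                             // DataFrame for the query
  val df = DataFrame(relation.schema, data.rows) // same rows, relation's schema applied
  relation.insert(df, overwrite)                 // insert or overwrite
  cacheManager.recacheByPlan(relation)           // data changed, so recache
  Seq.empty                                      // a command returns no rows
}

val table = new InMemoryTable(Seq("id"))
val cache = new CacheManager
val rangeQuery = () => DataFrame(Seq("range_id"), Seq(Seq(0L)))

run(table, rangeQuery, overwrite = false, cache)
run(table, rangeQuery, overwrite = false, cache)
println(table.rows) // two appended rows
run(table, () => DataFrame(Seq("x"), Seq(Seq(42L))), overwrite = true, cache)
println(table.rows) // replaced by the single overwritten row
```

Applying the relation's schema is why the query's own column name (range_id here) never reaches the relation: the rows arrive labelled with the table's column (id).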
Creating InsertIntoDataSourceCommand Instance
InsertIntoDataSourceCommand takes the following when created:
- LogicalRelation leaf logical operator
- Logical query plan
- overwrite flag