# InsertIntoTable Unary Logical Operator
`InsertIntoTable` is a unary logical operator that represents the following:

- `INSERT INTO` and `INSERT OVERWRITE TABLE` SQL statements
- `DataFrameWriter` is requested to insert the rows of a DataFrame into a table
```scala
// make sure that the tables are available in a catalog
sql("CREATE TABLE IF NOT EXISTS t1(id long)")
sql("CREATE TABLE IF NOT EXISTS t2(id long)")

val q = sql("INSERT INTO TABLE t2 SELECT * from t1 LIMIT 100")
val plan = q.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, false, false
01 +- 'GlobalLimit 100
02    +- 'LocalLimit 100
03       +- 'Project [*]
04          +- 'UnresolvedRelation `t1`

// Dataset API's version of "INSERT OVERWRITE TABLE" in SQL
spark.range(10).write.mode("overwrite").insertInto("t2")
```
```scala
// INSERT INTO partitioned_table
spark.range(10)
  .withColumn("p1", 'id % 2)
  .write
  .mode("overwrite")
  .partitionBy("p1")
  .saveAsTable("partitioned_table")

val insertIntoQ = sql("INSERT INTO TABLE partitioned_table PARTITION (p1 = 4) VALUES 41, 42")
scala> println(insertIntoQ.queryExecution.logical.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `partitioned_table`, Map(p1 -> Some(4)), false, false
01 +- 'UnresolvedInlineTable [col1], [List(41), List(42)]
```
```scala
// INSERT OVERWRITE TABLE partitioned_table
spark.range(10)
  .withColumn("p1", 'id % 2)
  .write
  .mode("overwrite")
  .partitionBy("p1")
  .saveAsTable("partitioned_table")

val insertOverwriteQ = sql("INSERT OVERWRITE TABLE partitioned_table PARTITION (p1 = 4) VALUES 40")
scala> println(insertOverwriteQ.queryExecution.logical.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `partitioned_table`, Map(p1 -> Some(4)), true, false
01 +- 'UnresolvedInlineTable [col1], [List(40)]
```
`InsertIntoTable` is created with partition keys that correspond to the `partitionSpec` part of the following SQL statements:

- `INSERT INTO TABLE` (with the `overwrite` and `ifPartitionNotExists` flags off)
- `INSERT OVERWRITE TABLE` (with the `overwrite` flag on, as the `true, false` flags in the tree string above show; the `ifPartitionNotExists` flag is on only with the `IF NOT EXISTS` clause)
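You can confirm the partition keys and flags on the parsed plan directly. A minimal sketch (assuming the `partitioned_table` demo above has been run and Spark 2.x internals; note that `sql` also runs the insert eagerly):

```scala
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

val parsed = sql("INSERT OVERWRITE TABLE partitioned_table PARTITION (p1 = 4) VALUES 40")
  .queryExecution
  .logical
parsed match {
  case i: InsertIntoTable =>
    println(i.partition)            // Map(p1 -> Some(4))
    println(i.overwrite)            // true
    println(i.ifPartitionNotExists) // false
}
```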
`InsertIntoTable` has no partition keys when created as follows:

- `insertInto` operator from the Catalyst DSL (see the sketch below)
- `DataFrameWriter.insertInto` operator
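A sketch of the Catalyst DSL variant (an internal API; the exact tree string assumes Spark 2.x):

```scala
import org.apache.spark.sql.catalyst.dsl.plans._

// insertInto creates InsertIntoTable with empty partition keys
val dslPlan = table("a").insertInto(tableName = "t1", overwrite = true)
scala> println(dslPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t1`, true, false
01 +- 'UnresolvedRelation `a`
```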
`InsertIntoTable` can never be resolved, i.e. `InsertIntoTable` should not be part of a logical plan after analysis and is supposed to be converted to logical commands at the analysis phase:
| Logical Command | Description |
|---|---|
| `InsertIntoHiveTable` | When `HiveAnalysis` resolution rule transforms `InsertIntoTable` with a `HiveTableRelation` |
| `InsertIntoDataSourceCommand` | When `DataSourceAnalysis` posthoc logical resolution resolves an `InsertIntoTable` with a `LogicalRelation` over an `InsertableRelation` (with no partitions defined) |
| `InsertIntoHadoopFsRelationCommand` | When `DataSourceAnalysis` posthoc logical resolution transforms `InsertIntoTable` with a `LogicalRelation` over a `HadoopFsRelation` |
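A quick way to observe the conversion (a sketch, assuming the `t1` and `t2` tables created earlier and Spark 2.x internals): `InsertIntoTable` shows up in the parsed plan but is gone from the analyzed one.

```scala
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

// note: sql(...) also executes the insert eagerly (it becomes a command after analysis)
val qe = sql("INSERT INTO TABLE t2 SELECT * FROM t1").queryExecution
assert(qe.logical.collect { case i: InsertIntoTable => i }.nonEmpty)
assert(qe.analyzed.collect { case i: InsertIntoTable => i }.isEmpty)
```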
> **Caution**: FIXME What's the difference between `HiveAnalysis` that converts `InsertIntoTable(r: HiveTableRelation…)` to `InsertIntoHiveTable` and `RelationConversions` that converts `InsertIntoTable(r: HiveTableRelation,…)` to `InsertIntoTable` (with `LogicalRelation`)?
> **Note**: Inserting into views or RDD-based tables is not allowed (and fails at analysis).
`InsertIntoTable` (with an `UnresolvedRelation` leaf logical operator) is created when:

- `INSERT INTO` or `INSERT OVERWRITE TABLE` SQL statements are executed (as a single insert or a multi-insert query; see the sketch below)
- `DataFrameWriter` is requested to insert a DataFrame into a table
- `RelationConversions` logical evaluation rule is executed (and transforms `InsertIntoTable` operators)
- `CreateHiveTableAsSelectCommand` logical command is executed
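A sketch of the multi-insert variant (Hive-style syntax, reusing the `t1`/`t2` demo tables from above): each `INSERT` clause is parsed into its own `InsertIntoTable` operator, combined under a `Union`.

```scala
val multiInsertQ = sql("""
  FROM t1
  INSERT INTO TABLE t2 SELECT * WHERE id < 5
  INSERT INTO TABLE t2 SELECT * WHERE id >= 5
  """)
// expect two 'InsertIntoTable operators under a 'Union in the parsed plan
scala> println(multiInsertQ.queryExecution.logical.numberedTreeString)
```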
`InsertIntoTable` has an empty output schema.
> **Tip**: Use the `insertInto` operator from the Catalyst DSL to create an `InsertIntoTable` logical operator, e.g. for testing or Spark SQL internals exploration.
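Following that tip, a short sketch that builds the operator with the DSL and checks the (empty) output schema:

```scala
import org.apache.spark.sql.catalyst.dsl.plans._

val insertPlan = table("a").insertInto(tableName = "t1")
assert(insertPlan.output.isEmpty)
```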
## Creating InsertIntoTable Instance
`InsertIntoTable` takes the following when created:

- Logical plan for the table to insert into
- Partition keys (with optional partition values for dynamic partition insert)
- Logical plan representing the data to be written
- `overwrite` flag that indicates whether to overwrite an existing table or partitions (`true`) or not (`false`)
- `ifPartitionNotExists` flag (on only for `INSERT OVERWRITE TABLE` with the `IF NOT EXISTS` clause)
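A sketch of creating the operator directly (an internal Catalyst API; the parameter names assume the Spark 2.x case class):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

val insert = InsertIntoTable(
  table = UnresolvedRelation(TableIdentifier("t2")),  // table to insert into
  partition = Map("p1" -> Some("4")),                 // partition keys
  query = UnresolvedRelation(TableIdentifier("t1")),  // data to be written
  overwrite = true,
  ifPartitionNotExists = false)
```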
## Inserting Into View Not Allowed
Inserting into a view is not allowed, i.e. a query plan with an `InsertIntoTable` operator with an `UnresolvedRelation` leaf operator that is resolved to a `View` unary operator fails at analysis (when the `ResolveRelations` logical resolution rule is executed) with the following error:
```
Inserting into a view is not allowed. View: [name].
```
```scala
// Create a view
val viewName = "demo_view"
sql(s"DROP VIEW IF EXISTS $viewName")
assert(spark.catalog.tableExists(viewName) == false)
sql(s"CREATE VIEW $viewName COMMENT 'demo view' AS SELECT 1,2,3")
assert(spark.catalog.tableExists(viewName))

// The following should fail with an AnalysisException
scala> spark.range(0).write.insertInto(viewName)
org.apache.spark.sql.AnalysisException: Inserting into a view is not allowed. View: `default`.`demo_view`.;
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:644)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:640)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:640)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:586)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:124)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:118)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:103)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:61)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:322)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:308)
  ... 49 elided
```
## Inserting Into RDD-Based Table Not Allowed
Inserting into an RDD-based table is not allowed, i.e. a query plan with an `InsertIntoTable` operator with one of the following logical operators (as the logical plan representing the table) fails at analysis (when the `PreWriteCheck` extended logical check is executed):

- Logical operator that is not a leaf node
- `Range` leaf operator
- `OneRowRelation` leaf operator
- `LocalRelation` leaf operator
```scala
// Create a temporary view
val data = spark.range(1)
data.createOrReplaceTempView("demo")

scala> spark.range(0).write.insertInto("demo")
org.apache.spark.sql.AnalysisException: Inserting into an RDD-based table is not allowed.;;
'InsertIntoTable Range (0, 1, step=1, splits=Some(8)), false, false
+- Range (0, 0, step=1, splits=Some(8))

  at org.apache.spark.sql.execution.datasources.PreWriteCheck$.failAnalysis(rules.scala:442)
  at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$14.apply(rules.scala:473)
  at org.apache.spark.sql.execution.datasources.PreWriteCheck$$anonfun$apply$14.apply(rules.scala:445)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:117)
  at org.apache.spark.sql.execution.datasources.PreWriteCheck$.apply(rules.scala:445)
  at org.apache.spark.sql.execution.datasources.PreWriteCheck$.apply(rules.scala:440)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:349)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$2.apply(CheckAnalysis.scala:349)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:349)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:61)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:322)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:308)
  ... 49 elided
```