关注 spark技术分享,
撸spark源码 玩spark最佳实践

理解spark sql 优化策略的最好方法就是自己实现一个

admin阅读(2642)

spark sql 的优化框架 Catalyst 博大精深,里面的精华是很多大牛一个pr一个pr积累起来的,仔细琢磨琢磨相关源码也是一件痛并快乐的事情,spark 逻辑优化就是在一个 AST 树上进行匹配,匹配到一定的规则,然后进行等价变换规则,从而使计算的成本更低,今天我带大家自己实现一个逻辑优化规则,帮助大家更快地理解spark sql 逻辑优化的底层原理,如果对 spark sql 总体架构不了解的,可以先看这篇文章 是时候学习真正的spark技术了 了解全貌。

我们的例子非常简单,先注册一个表,包含一个 a 字段:

理解spark sql 优化策略的最好方法就是自己实现一个

我们看下当前的执行计划:


理解spark sql 优化策略的最好方法就是自己实现一个

可以看到这个执行计划是比较费的, 因为对于   (a * 1) 这个算式来讲,其实就等于a本身,我们针对这种规则自定义一个 逻辑优化规则

理解spark sql 优化策略的最好方法就是自己实现一个

理解spark sql 优化策略的最好方法就是自己实现一个

上面的代码很好理解,如果匹配到一个变量乘以1的表达式,就直接变换为变量本身, 应用完这个规则 (a#27 * 1) 就变为了 a#27:

理解spark sql 优化策略的最好方法就是自己实现一个

这样就少了一次乘法运算,从而提高了性能。


上面我们是从内部测试,如果你在应用中要定义一个基于规则的优化,然后让这个优化策略自动应用到你写的sql中,可以如下方式定义

理解spark sql 优化策略的最好方法就是自己实现一个

sparkSession 中给用户留了扩展点,Spark catalyst的扩展点在SPARK-18127中被引入,Spark用户可以在SQL处理的各个阶段扩展自定义实现,非常强大高效

  • injectOptimizerRule – 添加optimizer自定义规则,optimizer负责逻辑执行计划的优化,我们例子中就是扩展了逻辑优化规则。
  • injectParser – 添加parser自定义规则,parser负责SQL解析。
  • injectPlannerStrategy – 添加planner strategy自定义规则,planner负责物理执行计划的生成。
  • injectResolutionRule – 添加Analyzer自定义规则到Resolution阶段,analyzer负责逻辑执行计划生成。
  • injectPostHocResolutionRule – 添加Analyzer自定义规则到Post Resolution阶段。
  • injectCheckRule – 添加Analyzer自定义Check规则。

其他几种扩展我们可以也会举例说明,今天只讲解一下怎么扩展逻辑优化规则。

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


理解spark sql 优化策略的最好方法就是自己实现一个

原文始发于微信公众号(spark技术分享):理解spark sql 优化策略的最好方法就是自己实现一个

spark 2.4让你飞一般的处理复杂数据类型

admin阅读(3132)

spark 2.4 对复杂数据处理类型引入了 29 个内嵌函数,文档参考 https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html,里面包含一些 higher-order 函数,就跟scala 里面的 map filter reduce 一样,让你在sql中也可以享受函数式编程的快感。


我们都知道,在spark2.4 之前,处理复杂数据类型是一件比较痛苦的事情,有两种比较恶心的处理方式


  •  使用 explod 表达式把嵌套数据类型平展开,应用你自己的处理逻辑,再用 collect_list 表达式在拼凑起来,
  • 自定义一个 udf 函数处理多层嵌套的数据类型

在 spark2.4 之后,你就轻松了,可以使用多种内嵌函数处理复杂类型,对 array 或者 map类型的列处理起来很easy, 如果满足不了你的需求,你可以直接在sql中写lambda 表达式,怎么用,怎么爽。

1 匿名lambda函数使用姿势

下面举个例子说明下:

假如我们有这样一个 dataframe, 有两列,vals 列是个数组,我们的需求是对数组中的多个元素都 +1

spark 2.4让你飞一般的处理复杂数据类型

spark 2.4 之前的写法是:


spark 2.4让你飞一般的处理复杂数据类型

这样会有几个问题, 如果有两个 id 为1的行,平展开在组合后的结果就只有一行了,这就错了,而且带着 group by 肯定就涉及到 shuffle 操作了,性能会下降,而且shuffle 操作不保证数据元素的顺序,有可能数组元素顺序就变了。


另外一种写法是自定义一个UDF:


spark 2.4让你飞一般的处理复杂数据类型

spark 2.4让你飞一般的处理复杂数据类型
这种用法正确性倒是没有问题, 但是会损失性能,下文中会进行详细分析。
如果我们使用 spark2.4 提供的  higher-order 函数, 里面定义一个匿名lambda函数,就轻松了:


spark 2.4让你飞一般的处理复杂数据类型

这个 transform 函数会遍历数组,然后应用你定义的匿名lambda函数,是不是很简单。


下面我举个复杂一些的例子:

key values nested_values
1 [1, 2, 3] [[1, 2, 3], [], [4, 5]]

 如果我们想对数组中的每个元素都加上同一行的key,sql可以写成这样:


spark 2.4让你飞一般的处理复杂数据类型如果你需要处理多层嵌套的数据类型,比如我们例子中的nested_values,没关系,你直接写一个两层的匿名lambda函数 就可以了:

spark 2.4让你飞一般的处理复杂数据类型

2 性能好在哪里


有人就问了,这种在 sql 中写 匿名lambda函数 就是轻便了一些,和 自定义一个 udf 到底有什么差距,其实我今天就是想重点探讨一下这个问题

其实两者的差距就在于直接写lambda函数不需要序列化和反序列化, udf 需要,你想呀,如果对每条数据都要进行序列化和反序列化, 对于海量数据,性能必定有很大的损失。

对于 tansform 处理一个数组,spark2.4 内部会创建一个 tansform 类型的表达式节点

spark 2.4让你飞一般的处理复杂数据类型

这个节点对数组的处理流程如下,需要注意的是,spark 会使用 encoder 把加载的数据,或者jvm对象转换为一种内部的数组字节格式 InternalRow,这种不同于java 序列化,虽然都是把对象转换为字节数组,但是表达式生成的代码可以直接操作字节数组,而不需要反序列化,这种字节数组格式大大提高了处理时间效率和空间效率。

spark 2.4让你飞一般的处理复杂数据类型

arrayTransform 表达式会遍历数组,然后应用你定义的匿名lamdba 函数,最后更新相应的元素。

下面我们来看下 udf 方式的处理方式:

spark 2.4让你飞一般的处理复杂数据类型

看到没有,中间处理过程中,需要先把catalyst类型(也就是  InternalRow 格式) 格式转换为 scala 类型, 然后应用自定义函数,然后再转回去,多了一次序列化和反序列化的性能损耗,所以如果在海量数据下,这种性能损失还是很大的。


大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


spark 2.4让你飞一般的处理复杂数据类型



听说新版微信这有个好看


原文始发于微信公众号(spark技术分享):spark 2.4让你飞一般的处理复杂数据类型

spark sql 源码剖析 OptimizeIn 篇

admin阅读(2658)

spark sql 的优化框架 Catalyst 博大精深,里面的精华是很多大牛一个pr一个pr积累起来的,仔细琢磨琢磨相关源码也是一件痛并快乐的事情,今天我来抛砖引玉,讲讲 逻辑优化里面 OptimizeIn 的实现原理,如果对 spark sql 总体架构不了解的,可以先看这篇文章 是时候学习真正的spark技术了 了解全貌。

关键词请忽略:spark sql内核剖析pdf spark源码分析 spark 2.0源码分析 spark sql流程 spark sql action spark sql parser spark core源码分析 spark sql execute plan spark sql内核剖析pdf spark源码分析 spark 2.0源码分析 spark sql流程 spark sql parser spark sql action spark sql ast spark dataframe row spark core源码分析 spark sql execute plan

OptimizeIn 的优化方式相对简单,所以今天这篇文章的篇幅应该比较短,大家理解起来也相对轻松,当然目的只有一个,就是带动大家看源码的热情,我们先来测试个例子:

spark sql 源码剖析 OptimizeIn 篇

这个例子很简单,我们就是建一个表, 表有3个字段 a,b,c  里面包含3条数据:

spark sql 源码剖析 OptimizeIn 篇

下面我们对比OptimizeIn 优化策略使用前和使用后的变化

spark sql 源码剖析 OptimizeIn 篇

spark sql 源码剖析 OptimizeIn 篇

可以看到, Filter 节点谓词表达式从 a#7 IN (a1,a2,a1)  变为了 a#7 IN (a1,a2) ,   其实这个变化不大,只是对 list 中的原始去重了,但是仍然是list类型,sql中谓词的判断还是要去遍历list,然后一个元素一个元素的去判断。这个主要的原因是有一个阈值,低于这个阈值就不转换为 HashSet, 毕竟元素太少了嘛,也没必要,这个配置是 spark.sql.optimizer.inSetConversionThreshold,默认值是 10 , 我们可以修改一下这个配置,然后再测试一下

关键词请忽略:spark sql内核剖析pdf spark源码分析 spark 2.0源码分析 spark sql流程 spark sql action spark sql parser spark core源码分析 spark sql execute plan spark sql内核剖析pdf spark源码分析 spark 2.0源码分析 spark sql流程 spark sql parser spark sql action spark sql ast spark dataframe row spark core源码分析 spark sql execute plan

spark sql 源码剖析 OptimizeIn 篇

spark sql 源码剖析 OptimizeIn 篇

你会发现Filter 节点里面的谓词表达式变为了 a#7 INSET (a1,a2),这种情况使用hashSet 数据结构提升性能,这个在元素比较多的情况下,意义巨大。

 

按照我之前的原则,我一般不会贴源码,因为这个优化策略实现代码比较少,我就直接贴一下:

spark sql 源码剖析 OptimizeIn 篇

我们来分析一下这段代码,首先向下遍历所有的表达式,进行匹配处理,如果match 到相应的模式,就使用新的表达式替换匹配到的表达式。

首先第一种情况,在 list 为空的情况下,替换为一种 If 表达式,

这时候如果 字段的值为空,就返回一个null的常量,这种情况就是

 select xxx from t where null  in ()

如果不会空,就直接把 Filter的谓词赋值为Flase, 这样对每一行就直接返回一个 False, 全部过滤掉,提升了性能

select xxx from t where x in () –> select xxx from t  where flase

下面是在 list 不为空的情况下,首先新建了一个 newList, 这个是一个表达式的 Set, 怎么理解呢,常量在sql中也是表达式,对于我们的例子:

spark sql 源码剖析 OptimizeIn 篇

我们来看下list 不为空的处理流程:

  • 如果 newList 长度为1, In 表达式直接转换为判断和这唯一元素是否相等的EqualTo表达式
  • 如果 newList  大于 spark.sql.optimizer.inSetConversionThreshold 配置的值,In表达式转变为 InSet 表达式,使用 hashSet 数据结构提升性能
  • 如果 newList 长度小于原来的list长度相等,也就意味中有重复元素,这时候In 表达式转变为一个 元素更少的 In 表达式,这种情况判断方式和原来一样,也是遍历list,一个元素一个元素的去比较
  • 如果newList 长度等于原来的list长度相等,就啥也不变

 

这个OptimizeIn 的逻辑蛮简单的,可以作为一个简单的例子学习一下。

 

关键词请忽略:spark sql内核剖析pdf spark源码分析 spark 2.0源码分析 spark sql流程 spark sql action spark sql parser spark core源码分析 spark sql execute plan spark sql内核剖析pdf spark源码分析 spark 2.0源码分析 spark sql流程 spark sql parser spark sql action spark sql ast spark dataframe row spark core源码分析 spark sql execute plan

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicate

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

spark sql 源码剖析 OptimizeIn 篇

听说新版微信这有个好看


原文始发于微信公众号(spark技术分享):spark sql 源码剖析 OptimizeIn 篇

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

admin阅读(4038)

spark sql 的优化框架 Catalyst 博大精深,里面的精华是很多大牛一个pr一个pr积累起来的,仔细琢磨琢磨相关源码也是一件痛并快乐的事情,今天我来抛砖引玉,讲讲 逻辑优化里面 谓词下推的实现,如果对 spark sql 总体架构不了解的,可以先看这篇文章 是时候学习真正的spark技术了 了解全貌。

谓词下推, 顾名思义,就是把过滤算子(就是你在 sql语句里面写的 where语句),尽可能地放在执行计划靠前的地方,好处就是尽早地过滤到不必要的数据,后续流程都节省了计算量,从而优化了性能。

关键词:hive谓词下推 spark谓词下推 mysql谓词下推 谓语下推 spark sql谓词下推 sql下推 project下推 Predicate Pushdown Rules(谓词下推规则) spark pushdown filters spark parquet filter pushdown

举个最简单的例子:

 

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

我们对整个执行计划 explain 一下,就能清晰看到 spark sql 做的优化,Filter  operator 从 Project operator 后面挪到了前面。

 

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

你可能觉得这也没有啥,不过对有些数据库,是直接可以把这个过滤下沉到 数据库层面,这样加载的数据量就少了很多,省了网络带宽,不过这个跟spark sql 没啥关系,就不提这个了。

上文说,要把 过滤算子 尽可能地放在执行计划靠前的地方, 这篇文章就是要把这个 尽可能掰扯清楚,哪些情况是可以挪动的,哪些情况是不可以挪动的。

spark sql 到了逻辑优化这一步就是利用scala强大的case正则匹配,对一个由各种operator组成的AST树尽其所能的匹配和修改,下面我们看下PushDownPredicate 优化策略都对哪些情况做了匹配优化

关键词:hive谓词下推 spark谓词下推 mysql谓词下推 谓语下推 spark sql谓词下推 sql下推 project下推 Predicate Pushdown Rules(谓词下推规则) spark pushdown filters spark parquet filter pushdown

1 Filter 有个Project类型的子节点

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

这里匹配到的就是 Filter 算子,有个Project类型子节点的情况,就是我们上文例子中给的情况,然后后面又加了两个限制条件,  一个是 project 里面的要取的字段都是确定性的(deterministic),这个是啥意思呢,我举个例子

 

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

这里的 monotonicallyIncreasingId 就是不确定性的一个 expression, 这个表达式会生成一个64位的id,这个id 是唯一和单调递增的,多个分区的开始值不同,作用就是生成一个递增的唯一Id,  我看了下这个 expression 的实现, 前 31位是有分区ID组成,后33位是在这个分区里面累加上去的,问题就出在这里, 因为这个值是一个有状态的值,后一行的值依赖前一行的值,这就导致如果你把Filter 下推了,我们的例子中,对于第二个分区的两行数据 id: 2 和 id:3 ,其中 id 为2的行被过滤掉了,Long_id 没有经过累加1,然后id为3的Long_id就成了 8589934592 而不是8589934593,而如果先执行 monotonicallyIncreasingId 再过滤,这个值是 8589934593 。这个Filter下推,导致了结果的不同,所以在谓词下推的时候,只有operator 包含的所有expression都是确定性的时候才可以下推, 同理不能下推的还包括 rand 表达式。这个bug影响的版本和修复pr参考 https://issues.apache.org/jira/browse/SPARK-13473

关键词:hive谓词下推 spark谓词下推 mysql谓词下推 谓语下推 spark sql谓词下推 sql下推 project下推 Predicate Pushdown Rules(谓词下推规则) spark pushdown filters spark parquet filter pushdown

2  Filter 有个Aggregate 类型的子节点

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

这种情况, Filter 的有个Aggregate 类型子节点的情况(也就是你写的一些聚合操作), 同样的,aggregate 包含的表达式也必须是确定性的,还有一个条件是你Filter 的字段必须要在 group by 的维度字段里面,举个例子:

 

1 下面的聚合是可以 谓词下推的:

  • select a, count(*) as c from t1 group by a  where a ==“1″

 

2 下面的聚合是不可以谓词下推的:

  • select  count(*) as  c  from t1 where c ==  “10”
  • select a, count(b) as c   from t1  group by a  where c == “10″

这个其实是很好理解的,2 这种情况类似我们在 sql 里面写的 having 语句一样,是为了过滤分组聚合后的结果用的,如果把这个过滤下推,就相当于你把 count(*) 的别名 c 下推当成成了原始表中的 c字段,那么统计的结果就是错的, 而 1 的情况因为 a 字段在分组的字段里面, 这种经过 having 过滤后,其他 a 不为1 的分组肯定会被过滤掉, 所以 聚合后过滤   和 聚合前过滤,两者是等价的,可以谓词下推,相关的issue 和修复pr 参考  https://issues.apache.org/jira/browse/SPARK-22983

3  Filter 有个Window 窗口类型的子节点

 

针对 sql 中的窗口聚合,需要2种条件,才可以谓词下推:

 

  • 谓词下推的表达式必须是窗口聚合的分区key
  •  谓词必须是确定性的。

我们知道,spark 中窗口聚合操作,和普通聚合操作的不同的就在于,对于每个分组,前者对每一行都会算出来新的一行,后者对每个分组只会算出来一行,我们举个例子。

 

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推这个例子是按照 部门进行分组,每个部门按照业绩 对员工进行降序排名,业绩相同的员工,rank 并列,假如这时候我们需要统计develop 部门 8 号员工的名次,可以在后面加上过滤条件

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

我们来看下执行计划:

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

 

你会发现 

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

这个原因其实跟 group 聚合操作类似,假如我把  Filter (empNo#16L = 8 ) 下推了,那么过滤后就剩这一个员工了,那他肯定排第一,这个结果不就错了么,而 Filter(depName#15 = develop) 谓词可以下推是因为depName是分组的key,其实我后面只需要在这一个分组里面去窗口聚合,其他分组的数据拿了也是浪费,在前面过滤和在后面过滤在正确性上来讲是等价的。

 

4 Filter 的子节点只有部分类型才可以谓词下推

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

也就是说在 AST 树上多次应用谓词下推策略的时候,只有子节点是这些类型的 Operator的时候,才可以下推,其他的无法下推,比如Limit 类型,就不能谓词下推,这个也很好理解,举个例子:

 

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

 

一种是先过滤,然后再limit ,结果有10条数据,一种是先limit 在过滤,只有一条数据,很明显后者是对的。

好,抛砖引玉就到此为止,源码里面还有其他情况,比如碰到 EventTimeWatermark 应该怎么处理才能保证sql语义不错,这个等后面介绍过 Watermark 之后再讲。后面打算出一个 spark sql 源码解析的系列,这个就算是第一篇,大家如果有什么疑问直接留言或者发送我邮箱 1319027852@qq.com,欢迎交流。

 

关键词:hive谓词下推 spark谓词下推 mysql谓词下推 谓语下推 spark sql谓词下推 sql下推 project下推 Predicate Pushdown Rules(谓词下推规则) spark pushdown filters spark parquet filter pushdown

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

听说新版微信这有个好看

原文始发于微信公众号(spark技术分享):spark sql 源码剖析 PushDownPredicate:谓词不是想下推,想推就能推

(obsolete) SQLContext

admin阅读(1819)

SQLContext

Caution

As of Spark 2.0.0 SQLContext is only for backward compatibility and is a mere wrapper of SparkSession.

In the pre-Spark 2.0’s ear, SQLContext was the entry point for Spark SQL. Whatever you did in Spark SQL it had to start from creating an instance of SQLContext.

A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext.

You should use SQLContext for the following:

Creating SQLContext Instance

You can create a SQLContext using the following constructors:

  • SQLContext(sc: SparkContext)
  • SQLContext.getOrCreate(sc: SparkContext)
  • SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext).

Setting Configuration Properties

You can set Spark SQL configuration properties using:

  • setConf(props: Properties): Unit
  • setConf(key: String, value: String): Unit

You can get the current value of a configuration property by key using:

  • getConf(key: String): String
  • getConf(key: String, defaultValue: String): String
  • getAllConfs: immutable.Map[String, String]
Note
Properties that start with spark.sql are reserved for Spark SQL.

Creating DataFrames

emptyDataFrame

emptyDataFrame creates an empty DataFrame. It calls createDataFrame with an empty RDD[Row] and an empty schema StructType(Nil).

createDataFrame for RDD and Seq

createDataFrame family of methods can create a DataFrame from an RDD of Scala’s Product types like case classes or tuples or Seq thereof.

createDataFrame for RDD of Row with Explicit Schema

This variant of createDataFrame creates a DataFrame from RDD of Row and explicit schema.

Registering User-Defined Functions (UDF)

udf method gives you access to UDFRegistration to manipulate user-defined functions. Functions registered using udf are available for Hive queries only.

Tip
Read up on UDFs in UDFs — User-Defined Functions document.

Caching DataFrames in In-Memory Cache

isCached method asks CacheManager whether tableName table is cached in memory or not. It simply requests CacheManager for CachedData and when exists, it assumes the table is cached.

You can cache a table in memory using cacheTable.

Caution
Why would I want to cache a table?

uncacheTable and clearCache remove one or all in-memory cached tables.

Implicits — SQLContext.implicits

The implicits object is a helper class with methods to convert objects into Datasets and DataFrames, and also comes with many Encoders for “primitive” types as well as the collections thereof.

Note

Import the implicits by import spark.implicits._ as follows:

It holds Encoders for Scala “primitive” types like Int, Double, String, and their collections.

It offers support for creating Dataset from RDD of any types (for which an encoder exists in scope), or case classes or tuples, and Seq.

It also offers conversions from Scala’s Symbol or $ to Column.

It also offers conversions from RDD or Seq of Product types (e.g. case classes or tuples) to DataFrame. It has direct conversions from RDD of Int, Long and String to DataFrame with a single column name _1.

Note
It is not possible to call toDF methods on RDD objects of other “primitive” types except Int, Long, and String.

Creating Datasets

createDataset family of methods creates a Dataset from a collection of elements of type T, be it a regular Scala Seq or Spark’s RDD.

It requires that there is an encoder in scope.

Note
Importing SQLContext.implicits brings many encoders available in scope.

Accessing DataFrameReader (read method)

The experimental read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

Creating External Tables

The experimental createExternalTable family of methods is used to create an external table tableName and return a corresponding DataFrame.

Caution
FIXME What is an external table?

It assumes parquet as the default data source format that you can change using spark.sql.sources.default setting.

Dropping Temporary Tables

dropTempTable method drops a temporary table tableName.

Caution
FIXME What is a temporary table?

Creating Dataset[Long] (range method)

The range family of methods creates a Dataset[Long] with the sole id column of LongType for given start, end, and step.

Note
The three first variants use SparkContext.defaultParallelism for the number of partitions numPartitions.

Creating DataFrames for Table

table method creates a tableName table and returns a corresponding DataFrame.

Listing Existing Tables

table methods return a DataFrame that holds names of existing tables in a database.

The schema consists of two columns – tableName of StringType and isTemporary of BooleanType.

Note
tables is a result of SHOW TABLES [IN databaseName].

tableNames are similar to tables with the only difference that they return Array[String] which is a collection of table names.

Accessing StreamingQueryManager

The streams method returns a StreamingQueryManager that is used to…​TK

Caution
FIXME

Managing Active SQLContext for JVM

SQLContext.getOrCreate method returns an active SQLContext object for the JVM or creates a new one using a given sparkContext.

Note
It is a factory-like method that works on SQLContext class.

Interestingly, there are two helper methods to set and clear the active SQLContext object – setActive and clearActive respectively.

Executing SQL Queries

sql executes the sqlText SQL query.

Note
It supports Hive statements through HiveContext.

sql parses sqlText using a dialect that can be set up using spark.sql.dialect setting.

Note

sql is imported in spark-shell so you can execute Hive statements without spark prefix.

Tip
You may also use spark-sql shell script to interact with Hive.

Internally, it uses SessionState.sqlParser.parsePlan(sql) method to create a LogicalPlan.

Caution
FIXME Review

Tip

Enable INFO logging level for the loggers that correspond to the AbstractSqlParser to see what happens inside sql.

Add the following line to conf/log4j.properties:

Refer to Logging.

Creating New Session

You can use newSession method to create a new session without a cost of instantiating a new SqlContext from scratch.

newSession returns a new SqlContext that shares SparkContext, CacheManager, SQLListener, and ExternalCatalog.

Caution
FIXME Why would I need that?

CompressionCodecs

admin阅读(2435)

CompressionCodecs

CompressionCodecs is a utility object…​FIXME

Table 1. Known Compression Codecs
Alias Fully-Qualified Class Name

none

uncompressed

bzip2

org.apache.hadoop.io.compress.BZip2Codec

deflate

org.apache.hadoop.io.compress.DeflateCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

lz4

org.apache.hadoop.io.compress.Lz4Codec

snappy

org.apache.hadoop.io.compress.SnappyCodec

setCodecConfiguration Method

setCodecConfiguration sets compression-related configurations to the Hadoop Configuration per the input codec.

Note
The input codec should be a fully-qualified class name, i.e. org.apache.hadoop.io.compress.SnappyCodec.

If the input codec is defined (i.e. not null), setCodecConfiguration sets the following configuration properties.

Table 2. Compression-Related Hadoop Configuration Properties (codec defined)
Name Value

mapreduce.output.fileoutputformat.compress

true

mapreduce.output.fileoutputformat.compress.type

BLOCK

mapreduce.output.fileoutputformat.compress.codec

The input codec name

mapreduce.map.output.compress

true

mapreduce.map.output.compress.codec

The input codec name

If the input codec is not defined (i.e. null), setCodecConfiguration sets the following configuration properties.

Table 3. Compression-Related Hadoop Configuration Properties (codec not defined)
Name Value

mapreduce.output.fileoutputformat.compress

false

mapreduce.map.output.compress

false

Note
setCodecConfiguration is used when CSVFileFormat, JsonFileFormat and TextFileFormat are requested to prepareWrite.

BufferedRowIterator

admin阅读(1536)

BufferedRowIterator

BufferedRowIterator is…​FIXME

ExternalCatalogUtils

admin阅读(1238)

ExternalCatalogUtils

ExternalCatalogUtils is…​FIXME

prunePartitionsByFilter Method

prunePartitionsByFilter…​FIXME

Note
prunePartitionsByFilter is used when InMemoryCatalog and HiveExternalCatalog are requested to list partitions by a filter.

CatalogUtils Helper Object

admin阅读(960)

CatalogUtils Helper Object

CatalogUtils is a Scala object with the methods to support PreprocessTableCreation post-hoc logical resolution rule (among others).

Table 1. CatalogUtils API
Name Description

maskCredentials

Used when:

normalizeBucketSpec

Used exclusively when PreprocessTableCreation post-hoc logical resolution rule is executed.

normalizePartCols

Used exclusively when PreprocessTableCreation post-hoc logical resolution rule is executed.

normalizeColumnName Internal Method

normalizeColumnName…​FIXME

Note
normalizeColumnName is used when CatalogUtils is requested to normalizePartCols and normalizeBucketSpec.

关注公众号:spark技术分享

联系我们联系我们