关注 spark技术分享,
撸spark源码 玩spark最佳实践

实践|图解AQE的使用

admin阅读(4722)


们都知道,之前的 CBO,都是基于静态信息来对 执行计划进行优化,静态统计信息大家都懂的,不一定准确,比如hive中的catalog中记录的统计信息可以认为是不可信的,在一个不准确的统计信息的基础上优化出来的执行计划必然不是最优的。AQE 就是为了解决这个问题而诞生的,随着spark 官方对AQE持续的优化,下面举一些用户使用场景来展示AQE是如何用的。

优化 Shuffles 过程

Spark shuffles 可以认为是影响查询性能最重要的影响因素,在 shuffle 的时候, 配置多少个 reducer 从来都是spark 用户的老大难问题,相信很多使用spark 的朋友在配置  spark.sql.shuffle.partitions 参数的时候,都是多少有些懵逼的,配置大了,会产生很多小的task 影响运行性能, 配置小了,就会导致task数目很少,单个task 拉取大量的数据,从而带来GC,spill磁盘,甚至OOM的问题,相信很多朋友都碰到过 executor lost, fetch failure等等错误,这里的本质问题是我们并不是很清楚真实的数据量到底有多大, 即使知道了,因为这个参数是全局的,我们一个application 里面不同的query 之间,甚至同一个query 同一个job 不同的stage 之间的shuffle read 的数据量并不是相同的,所以很难使用一个固定的值来统一。


现在 AQE 的实现了动态调整 shuffler partition number 的机制,在跑不同的query 不同的stage的时候,会根据 map 端 shuffle write 的实际数据量,来决定启动多少个 reducer 来处理,这样无论数据量怎么变换,都可以通过不同的 reducer 个数来均衡数据,从而保证单个 reducer 拉取的数据量不至于太大。 



这里需要说明的是,AQE 并不是万能的,AQE 并不晓得 map 端需要对数据分出来多少份,所以实际使用的时候,可以把 spark.sql.shuffle.partitions 参数往大了设。

调整 Join 策略

在成本优化中,选择 join 的类型是比较重要的一块,因为在合适的时候选择 broadcast join,就直接避免了 shuffle, 会大大提升执行的效率,但是如果静态数据是错误的,对一个比较大的(统计数据看起来比较小)的 relation 进行了broadcast,就会直接把 driver 内存给搞爆。

AQE 中,会在运行时根据真实的数据来进行判断,如果有一个表小于 broadcast join 配置的阈值,就会把执行计划中的 shuffle join 动态修改为 broadcast join。


处理Join 过程中的数据倾斜

数据倾斜历来都是老大难的问题,数据倾斜,顾名思义,就是指数据中某些 key 的数据量特别大,然后按照 hash 分区的时候,某个分区的数据量就特别大,这种数据分布会导致性能严重下降,特别是在 sort merge join 的情况下,在 spark ui 上可以看到,某几个 task 拉取的数据量远远大于其他的task,运行时间也远远超过其他task,从而这个短板拖慢了整体的运行时间。因为某些task 拉取了大多数的数据量,就会导致 spill 到磁盘,这样的话,就会更慢,更严重的话,直接就把executor 的内存搞爆了。


因为我们很难事先知道数据的特征,所以在join 的时候数据倾斜就很难通过静态统计信息来避免了,即使加上 hint, 在AQE中,通过收集运行时统计信息,我们就可以动态探测出倾斜的分区,从而对倾斜的分区,分裂出来子分区,每个子分区对应一个 reducer, 从而缓解数据倾斜对性能的影响。


从Spark UI 上观察AQE的运行情况

Understand AQE Query Plans

AQE 的执行计划是在运行过程中动态变化的,在 spark 3.0 中,针对 AQE 引入了几个特定的执行计划节点,AQE 会在Spark UI 上同时显示出初始的计划,和最终优化过的计划,下面我们通过图示的方式来展示一下。

The AdaptiveSparkPlan Node

开启了 AQE,查询中会添加一个或者多个 AdaptiveSparkPlan 节点作为query 或者子查询的根节点,在执行前和执行过程中,isFinalPlan 会被标记为false, query 执行完成后,isFinalPlan 会变为true, 一旦被标记为 true 在 AdaptiveSparkPlan 节点下面的计划也就不再变动。

实践|图解AQE的使用

The CustomShuffleReader Node

CustomShuffleReader 是AQE优化中关键的一环,这个算子节点会根据上一个stage 运行后的真实统计数据,动态的调整后一个 stage 分区的数目,在 spark UI 上,鼠标放在上面,如果你看到 coalesced 标记的话,就说明AQE 已经探测出了大量的小分区,根据配置的比较合适的分区数据量,把他们合并在了一起,可以点开 details, 里面可以看到原始的分区数据,已经合并后的分区数目。


实践|图解AQE的使用

当出现 skewed 标记的时候,说明 AQE在 sort-merge 的计算过程中, 探测出了倾斜的分区,details 里面可以看到,有多少个倾斜的分区,已经从这些倾斜分区中分裂出的分区数目。


实践|图解AQE的使用

当然上面两种优化效果是可以叠加的:

实践|图解AQE的使用

Detecting Join Strategy Change

对比执行计划,可以看出来在AQE优化前后的执行计划的区别,执行计划中,会展示出来初始 执行计划,和 Final 执行计划,下面的例子中,可以看出,初始的 SortMergeJoin 被优化为了 BroadcastHashJoin。

实践|图解AQE的使用


在 Spark UI 上面可以更加清楚的看到优化效果,当然spark ui 上只会展示当前的执行计划图,你可以在 query 开始的时候,和query 完成的时候,对比当时的执行计划图的区别。

实践|图解AQE的使用

Detecting Skew Join

下面的图例中可以根据 skew=true 的标记来判断 引擎有没有执行数据倾斜优化:


实践|图解AQE的使用

AQE 还是很强大的,因为依据的是真实数据的统计信息,AQE 可以很准确的选择最合适的 reducer 数目,转化join 策略,以及处理数据倾斜。



大家都在看


spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列:

学习了 streaming 和 sql,别忘了还有 Mlib





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


实践|图解AQE的使用



原文始发于微信公众号(spark技术分享):实践|图解AQE的使用

Spark 3.0 AQE 专治各种数据倾斜

admin阅读(4605)

1、前言

近些年来,在对Spark SQL优化上,CBO是最成功的一个特性之一。

CBO会计算一些和业务数据相关的统计数据,来优化查询,例如行数、去重后的行数、空值、最大最小值等。
Spark根据这些数据,自动选择BHJ或者SMJ,对于多Join场景下的Cost-based Join Reorder,来达到优化执行计划的目的。
但是,由于这些统计数据是需要预先处理的,会过时,所以我们在用过时的数据进行判断,在某些情况下反而会变成负面效果,拉低了SQL执行效率。
AQE在执行过程中统计数据,并动态地调节执行计划,从而解决了这个问题。

2、框架

对于AQE而言,最重要的问题就是什么时候去重新计算优化执行计划。Spark任务的算子如果管道排列,依次并行执行。然而,shuffle或者broadcast exchange会打断算子的排列执行,我们称其为物化点(Materialization Points),并且用”Query Stages”来代表那些被物化点所分割的小片段。每个Query Stage会产出中间结果,当且仅当该stage及其并行的所有stage都执行完成后,下游的Query Stage才能被执行。所以当上游部分stage执行完成,partitions的统计数据也获取到了,并且下游还未开始执行,这就给AQE提供了reoptimization的机会。
Spark 3.0 AQE 专治各种数据倾斜
在查询开始时,生成完了执行计划,AQE框架首先会找到并执行那些不存在上游的stages。一旦这些stage有一个或多个完成,AQE框架就会将其在physical plan中标记为完成,并根据已完成的stages提供的执行数据来更新整个logical plan。基于这些新产出的统计数据,AQE框架会执行optimizer,根据一系列的优化规则来进行优化;AQE框架还会执行生成普通physical plan的optimizer以及自适应执行专属的优化规则,例如分区合并、数据倾斜处理等。于是,我们就获得了最新优化过的执行计划和一些已经执行完成的stages,至此为一次循环。接着我们只需要继续重复上面的步骤,直到整个query都跑完。

在Spark 3.0中,AQE框架拥有三大特征:

  • 动态折叠shuffle过程中的partition

  • 动态选择join策略

  • 动态优化存在数据倾斜的join

接下来我们就具体来看看这三大特征。

① 动态合并shuffle partitions

在我们处理的数据量级非常大时,shuffle通常来说是最影响性能的。因为shuffle是一个非常耗时的算子,它需要通过网络移动数据,分发给下游算子。

在shuffle中,partition的数量十分关键。partition的最佳数量取决于数据,而数据大小在不同的query不同stage都会有很大的差异,所以很难去确定一个具体的数目:

  • 如果partition过少,每个partition数据量就会过多,可能就会导致大量数据要落到磁盘上,从而拖慢了查询。

  • 如果partition过多,每个partition数据量就会很少,就会产生很多额外的网络开销,并且影响Spark task scheduler,从而拖慢查询。

为了解决该问题,我们在最开始设置相对较大的shuffle partition个数,通过执行过程中shuffle文件的数据来合并相邻的小partitions。
例如,假设我们执行SELECT max(i) FROM tbl GROUP BY j,表tbl只有2个partition并且数据量非常小。我们将初始shuffle partition设为5,因此在分组后会出现5个partitions。若不进行AQE优化,会产生5个tasks来做聚合结果,事实上有3个partitions数据量是非常小的。Spark 3.0 AQE 专治各种数据倾斜
然而在这种情况下,AQE只会生成3个reduce task。

Spark 3.0 AQE 专治各种数据倾斜

② 动态切换join策略

在Spark所支持的众多join中,broadcast hash join性能是最好的。因此,如果需要广播的表的预估大小小于了广播限制阈值,那么我们就应该将其设为BHJ。但是,对于表的大小估计不当会导致决策错误,比如join表有很多的filter(容易把表估大)或者join表有很多其他算子(容易把表估小),而不仅仅是全量扫描一张表。

由于AQE拥有精确的上游统计数据,因此可以解决该问题。比如下面这个例子,右表的实际大小为15M,而在该场景下,经过filter过滤后,实际参与join的数据大小为8M,小于了默认broadcast阈值10M,应该被广播。

Spark 3.0 AQE 专治各种数据倾斜
在我们执行过程中转化为BHJ的同时,我们甚至可以将传统shuffle优化为本地shuffle(例如shuffle读在mapper而不是基于reducer)来减小网络开销。

③ 动态优化数据倾斜

数据倾斜是由于集群上数据在分区之间分布不均匀所导致的,它会拉慢join场景下整个查询。AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。

我们可以看下这个场景,Table A join Table B,其中Table A的partition A0数据远大于其他分区。
Spark 3.0 AQE 专治各种数据倾斜
AQE会将partition A0切分成2个子分区,并且让他们独自和Table B的partition B0进行join。

Spark 3.0 AQE 专治各种数据倾斜
如果不做这个优化,SMJ将会产生4个tasks并且其中一个执行时间远大于其他。经优化,这个join将会有5个tasks,但每个task执行耗时差不多相同,因此个整个查询带来了更好的性能。

3、使用

我们可以设置参数spark.sql.adaptive.enabled为true来开启AQE,在Spark 3.0中默认是false,并满足以下条件:

  • 非流式查询

  • 包含至少一个exchange(如join、聚合、窗口算子)或者一个子查询

AQE通过减少了对静态统计数据的依赖,成功解决了Spark CBO的一个难以处理的trade off(生成统计数据的开销和查询耗时)以及数据精度问题。相比之前具有局限性的CBO,现在就显得非常灵活 – 我们再也不需要提前去分析数据了!



大家都在看


spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列:

学习了 streaming 和 sql,别忘了还有 Mlib





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


Spark 3.0 AQE 专治各种数据倾斜


原文始发于微信公众号(spark技术分享):Spark 3.0 AQE 专治各种数据倾斜

实战例子|自定义一个窗口函数来计算网站的会话数

admin阅读(7129)

If you’ve worked with Spark, you have probably written some custom UDF or UDAFs.
UDFs are ‘User Defined Functions’, so you can introduce complex logic in your queries/jobs, for instance, to calculate a digest for a string, or if you want to use a java/scala library in your queries.

UDAF stands for ‘User Defined Aggregate Function’ and it works on aggregates, so you can implement functions that can be used in a GROUP BY clause, similar to AVG.

You may not be familiar with Window functions, which are similar to aggregate functions, but they add a layer of complexity, since they are applied within a PARTITION BY clause. An example of window function is RANK(). You can read more about window functions here.

在使用 spark sql 的时候,有时候默认提供的sql 函数可能满足不了需求,这时候可以自定义一些函数,也就是UDF 或者UDAF(顾名思义,User Defined Functions)。

UDF 只是在sql中简单的处理转换一些字段,类似默认的trim 函数把一个字符串类型的列的头尾空格去掉, 还有一种sql函数叫做UDAF,不同于UDF,这种是在sql聚合语句中使用的sql函数,必须配合 GROUP BY 一同使用,类似默认的count,sum函数,但是还有一种自定义函数叫做 UDWF, 这种一般人就不知道了,这种叫做窗口自定义函数,不了解窗口函数的,可以参考上一篇文章,或者官方的介绍

While aggregate functions work over a group, window functions work over a logical window of record and allow you to produce new columns from the combination of a record and one or more records in the window.
Describing what window functions are is beyond the scope of this article, so for that refer to the previously mentioned article from Databricks, but in particular, we are interested at the ‘previous event in time for a user’ in order to figure out sessions.

There is plenty of documentation on how to write UDFs and UDAFs, see for instance This link for UDFs or this link for UDAFs.

I was surprised to find out there’s not much info on how to build an custom window function, so I dug up the source code for spark and started looking at how window functions are implemented. That opened to me a whole new world, since Window functions, although conceptually similar to UDAFs, use a lower level Spark API than UDAFs, they are written using Catalyst expressions.

窗口函数是 SQL 中一类特别的函数。和聚合函数相似,窗口函数的输入也是多行记录。不 同的是,聚合函数的作用于由 GROUP BY 子句聚合的组,而窗口函数则作用于一个窗口

这里怎么理解一个窗口呢,spark君在这里得好好的解释解释,一个窗口是怎么定义的,

窗口语句中,partition by用来指定分区的列,在同一个分区的行属于同一个窗口

order by用来指定数据在一个窗口内的多行,如何排序

windowing_clause 用来指定开窗方式,在spark sql 中开窗方式有那么几种

  • 一个分区中的所有行作为一个窗口:UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING(上下都没有边界),这种情况下,spark sql 会把所有行作为一个输入,进行一次求值
  • Growing frame:UNBOUNDED PRECEDING AND ….(上无边界), 这种就是不断的把当前行加入的窗口中,而不删除, 例子:.rowsBetween(Long.MinValue, 0) :窗口的大小是按照排序从最小值到当前行,在数据迭代过程中,不断的把当前行加入的窗口中。
  • Shrinking frame:… AND UNBOUNDED FOLLOWING(下无边界)和Growing frame 相反,窗口不断的把迭代到的当前行从窗口中删除掉。
  • Moving frame:滑动的窗口,举例:.rowsBetween(-1, 1) 就是指窗口定义从 -1(当前行前一行)到 1(当前行后一行) ,每一个滑动的窗口总用有3行
  • Offset frame  窗口中只有一条数据,就是偏移当前行一定距离的哪一行,举例:lag(field, n): 就是取从当前字段往前第n个值

这里就针对窗口函数就介绍这么多,如果不懂请参考相关文档,加强理解,我们在平时使用 spark sql 的过程中,会发现有很多教你自定义 UDF 和 UDAF 的教程,却没有针对UDWF的教程,这是为啥呢,这是因为 UDF 和UDAF 都作为上层API暴露给用户了,使用scala很简单就可以写一个函数出来,但是UDWF没有对上层用户暴露,只能使用 Catalyst expressions. 也就是Catalyst框架底层的表达式语句才可以定义,如果没有对源码有很深入的研究,根本就搞不出来。spark 君在工作中写了一些UDWF的函数,但是都比较复杂,不太好单独抽出来作为一个简明的例子给大家讲解,这里翻译一篇文章来做说明。

窗口函数的使用场景

Now, for what kind of problem do we need window functions in the first place?
A common problem when working on any kind of website, is to determine ‘user sessions’, periods of user activity. if an user is inactive for a certain time T, then it’s considered a new ‘session’. Statistics over sessions are used to determine for instance if the user is a bot, to find out what pages have the most activity, etc.

Let’s say that we consider a session over if we don’t see any activity for one hour (sixty minutes). Let’s see an example of user activity, where ‘event’ has the name of the page the user visited and time is the time of the event. I simplified it, since the event would be a URL, while the time would be a full timestamp, and the session id would be generated as a random UUID, but I put simpler names/times just to illustrate the logic.

我们来举个实际例子来说明 窗口函数的使用场景,在网站的统计指标中,有一个概念叫做用户会话,什么叫做用户会话呢,我来说明一下,我们在网站服务端使用用户session来管理用户状态,过程如下

1) 服务端session是用户第一次访问应用时,服务器就会创建的对象,代表用户的一次会话过程,可以用来存放数据。服务器为每一个session都分配一个唯一的sessionid,以保证每个用户都有一个不同的session对象。

2)服务器在创建完session后,会把sessionid通过cookie返回给用户所在的浏览器,这样当用户第二次及以后向服务器发送请求的时候,就会通过cookie把sessionid传回给服务器,以便服务器能够根据sessionid找到与该用户对应的session对象。

3)session通常有失效时间的设定,比如1个小时。当失效时间到,服务器会销毁之前的session,并创建新的session返回给用户。但是只要用户在失效时间内,有发送新的请求给服务器,通常服务器都会把他对应的session的失效时间根据当前的请求时间再延长1个小时。

也就是说如果用户在1个超过一个小时不产生用户事件,当前会话就结束了,如果后续再产生用户事件,就当做新的用户会话,我们现在就使用spark sql 来统计用户的会话数,这种场景就很适合使用窗口函数来做统计,因为判断当前是否是一个新会话的依据,需要依赖当前行的前一行的时间戳和当前行的时间戳的间隔来判断,下面的表格可以帮助你理解这个概念,例子中有3列数据,用户,event字段代表用户访问了一个页面,time字段代表访问页面的时间戳:

usereventtimesession
user1page110:12session1 (new session)
user1page210:20session1 (same session, 8 minutes from last event)
user1page111:13session1 (same session, 53 minutes from last event)
user1page314:12session2 (new session, 3 hours after last event)

Note that this is the activity for one user. We do have many users, and in fact partitioning by user is the job of the window function.

上面只有一个用户,如果多个用户,可以使用 partition by 来进行分区。

深入研究

It’s better to use an example to illustrate how the function works in respect of the window definition.
Let’s assume we have a very simple user activity data, with a user ID called user, while tsis a numeric timestamp and session is a session ID, that may be already present. While we may start with no session whatsoever, in most practical cases, we may be processing data hourly, so at hour N + 1 we want to continue the sessions
we calculated at hour n.

Let’s create some test data and show what we want to achieve.

我们来构造一些假数据:

First, the window specification. Sessions are create per user, and the ordering is of course by timestamp.
Hence, we want to apply the function partitionBy user and orderBy timestamp.

怎么使用 spark sql 来统计会话数目呢,因为不同用户产生的是不同的会话,首先使用user字段进行分区,然后按照时间戳进行排序

We want to write a createSession function that will use the following logic:

这时候我们需要一个自定义函数来加一个列,这个列的值的逻辑如下

and will produce something like this:

运行结果如下:

usertssessionnewsession
user11508863564166f237e656-1e..f237e656-1e..
user11508864164166nullf237e656-1e..
user11508864464166nullf237e656-1e5..
user11508871964166null51c05c35-6f..
user11508873164166null51c05c35-6f..
user21508863864166null2c16b61a-6c..
user21508864464166null2c16b61a-6c..

Note that we are using random UUIDs as it’s pretty much the standard, and we’re shortening them for typographical reasons.

As you see, for each user, it will create a new session whenever the difference between two events is bigger than the session threshold.

Internally, for every record, we want to keep track of:

  • The current session ID
  • The timestamp of the previous session

This is going to be the state that we must maintain. Spark takes care of initializing it for us.
It is also going to be the parameters the function expects.

Let’s see the skeleton of the function:

我们使用 UUID 来作为会话id, 当后一行的时间戳和前一行的时间戳间隔大于1小时的时候,就创建一个新的会话id作为列值,否则使用老的会话id作为列值。

这种就涉及到状态,我们在内部需要维护的状态数据

  • 当前的session ID
  • 当前session的最后活动事件的时间戳

自定义函数的代码如下:

A few notes here:

  • Our ‘state’ is going to be a Seq[AttributeReference]
  • Each AttributeReference must be declared with its type. As we said, we keep the current Session and the timestamp of the previous one.
  • We inizialize it by overriding initialValues
  • For every record, within the window, spark will call first updateExpressions, then will produce the values calling evaluateExpression

Now it’s time to implement the updateExpressionsand evaluateExpression functions.

注解:

  • 状态保存在 Seq[AttributeReference]
  • 重写 initialValues方法进行初始化
  • spark sql 在迭代处理每一行数据的时候,都会调用 updateExpressions 函数来处理,根据当后一行的时间戳和前一行的时间戳间隔大于1小时来进行不同的逻辑处理,如果不大于,就使用 aggBufferAttributes(0) 中保存的老的sessionid,如果大于,就把 createNewSession 包装为一个scalaUDF作为一个子表达式来创建一个新的sessionID,并且每次都把当前行的时间戳作为用户活动的最后时间戳。

Notice how we use catalyst expressions, while in normal UDAFs we just use plain scala expressions.

Last thing, we need to declare a static method that we can invoke from the query that will instantiate the function. Notice how I created two, one that allows the user to specify what’s the max duration of a session, and one that takes the default:

最后包装为静态对象的方法,就可以在spark sql中使用这个自定义窗口函数了,下面是两个重载的方法,一个最大间隔时间使用默认值,一个可以运行用户自定义,perfect。

Now creating session IDs is as easy as:

现在,我们就可以拿来用在我们的main函数中了。

Notice that here we specified 10 second sessions.

There’s a little more piping involved which was omitted for clarity, but you can find the complete code, including unit tests, in my github project

spark window函数使用案例

admin阅读(10460)

什么是简单移动平均值

简单移动平均(英语:Simple Moving Average,SMA)是某变数之前n个数值的未作加权算术平均。例如,收市价的10日简单移动平均指之前10日收市价的平均数。

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby

直接看例子吧

这个 window spec 中,数据根据用户(customer)来分去。每一个用户数据根据时间排序。然后,窗口定义从 -1(前一行)到 1(后一行) ,每一个滑动的窗口总用有3行

这段代码添加了一个新列,movingAvg,在滑动的窗口中使用了均值函数:

窗口函数和窗口特征定义

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby

正如上述例子中,窗口函数主要包含两个部分:

  1. 指定窗口特征(wSpec)
    1. “partitionyBY” 定义数据如何分组;在上面的例子中,他是用户
    2. “orderBy” 定义分组中的排序
    3. “rowsBetween” 定义窗口的大小
  2. 指定窗口函数函数 你可以使用 org.apache.spark.sql.functions 的“聚合函数(Aggregate Functions)”和”窗口函数(Window Functions)“类别下的函数

累计汇总

.rowsBetween(Long.MinValue, 0) :窗口的大小是按照排序从最小值到当前行

前一行数据

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby

lag(field, n): 就是取从当前字段往前第n个值,这里是取前一行的值

如果计算环比的时候,是不是特别有用啊?!

在介绍几个常用的行数:

  • first/last(): 提取这个分组特定排序的第一个最后一个,在获取用户退出的时候,你可能会用到
  • lag/lead(field, n): lead 就是 lag 相反的操作,这个用于做数据回测特别用,结果回推条件

排名

这个数据在提取每个分组的前n项时特别有用,省了不少麻烦。

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby


Spark持续流处理和微批处理的对比

admin阅读(7499)

Spark从2.3版本开始引入了持续流式处理模型,可将流处理延迟降低至毫秒级别,让 Structured Streaming 达到了一个里程碑式的高度。

 

下面的架构图中,既有微批处理,还有持续流处理,两种模式对用户是暴露的API是高度统一的:

 

Spark持续流处理和微批处理的对比

今天我们着重看下两者的设计思路和区别点

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

在持续模式下,流处理器持续不断地从数据源拉取和处理数据,而不是每隔一段时间读取一个批次的数据,这样就可以及时地处理刚到达的数据。如下图所示,延迟被降低到毫秒级别,完全满足了极低延迟的要求。

 

Spark持续流处理和微批处理的对比

持续模式目前支持的 Dataset 操作包括 Projection、Selection 以及除 current_timestamp()、current_date()、聚合函数之外的 SQL 操作。它还支持将 Kafka 作为数据源和数据池(Sink),也支持将控制台和内存作为数据池。

开发者可以根据实际的延迟需求来选择使用持续模式还是微批次模式,总之,Structured Streaming 为开发者提供了容错和可靠性方面的保证。以及端到端的毫秒级延迟、至少一次处理保证等。

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

微批处理

Structured Streaming  默认使用微批模式,spark 引擎会定期检查是否有新数据到达,然后开启一个新的批次进行处理,如下图:

 

Spark持续流处理和微批处理的对比

在微批模式下, driver 在执行每个批次前,都需要先把 offset range 写入 WAL, 为了挂掉后可以 recover,当一条日志到达后,并不会立即处理,需要先处理完上一个批次,然后把这一个批次的 offset 记录后,才会处理,如下图:

 

Spark持续流处理和微批处理的对比

 

 

这种模式下,最低的延迟可以搞到 100ms, 这种模式是架构在 Spark SQL 上的,所以就坐享 spark SQL 中已有的优化方式(code generation 和 project Tungsten,参考 https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html),这种模式主要是面向吞吐量进行设计,而且可以满足绝大部分应用场景,比如ETL和准实时监控,但是对于要求延迟在 10ms 的场景就力不从心了,所以2.3 版本中又引入了 持续流处理 模型。

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

持续流处理

在持续流模式下,spark不是定期调度新批次的任务,而是启动一直运行的驻守在 executor 上的任务,源源不断的进行读取处理输出数据,如下图:

 

Spark持续流处理和微批处理的对比

 

Spark持续流处理和微批处理的对比

 

 

因为在 executor 端是持续流处理的,所以最低延迟可以降到 几毫秒,spark 内部采用的分布式快照算法类似 Chandy-Lamport 算法,不过略有区别,在source 端隔一段时间注入特殊标记(epoch markers)到数据流,然后就相当于把数据切分为不同的 epochs, 当特殊标记流到 最后的 operator, executor 获取后,向driver 汇报,driver等齐所有executor的汇报,统一发号施令,统一提交,有点类似于二次提交算法,全部的 executor 都提交后,driver 再写入提交日志中,这个 epochs 就算是全部ok了,防止重复执行。

后续这个分布式算法的设计和实现我会抽一篇文章来单独介绍。

 

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

 

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicatespark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

Spark持续流处理和微批处理的对比


原文始发于微信公众号(spark技术分享):Spark持续流处理和微批处理的对比

案例|使用 spark Pivot 处理复杂的数据统计需求

admin阅读(4622)

Pivot 算子是 spark 1.6 版本开始引入的,在 spark2.4版本中功能做了增强,还是比较强大的,做过数据清洗ETL工作的都知道,行列转换是一个常见的数据整理需求。spark 中的Pivot 可以根据枢轴点(Pivot Point) 把多行的值归并到一行数据的不同列,这个估计不太好理解,我们下面使用例子说明,看看pivot 这个算子在处理复杂数据时候的威力。

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

使用Pivot 来统计天气走势

下面是西雅图的天气数据表,每行代表一天的天气最高值:

Date Temp (°F)
07-22-2018 86
07-23-2018 90
07-24-2018 91
07-25-2018 92
07-26-2018 92
07-27-2018 88
07-28-2018 85
07-29-2018 94
07-30-2018 89

如果我们想看下最近几年的天气走势,如果这样一天一行数据,是很难看出趋势来的,最直观的方式是 按照年来分行,然后每一列代表一个月的平均天气,这样一行数据,就可以看到这一年12个月的一个天气走势,下面我们使用 pivot 来构造这样一个查询结果:

案例|使用 spark Pivot 处理复杂的数据统计需求

结果如下图:

案例|使用 spark Pivot 处理复杂的数据统计需求

 

是不是很直观,第一行就代表 2018 年,从1月到12月的平均天气,能看出一年的天气走势。

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

这个SQL应该怎么理解

我们来看下这个 sql 是怎么玩的,首先是一个 子查询语句,我们最终关心的是 年份,月份,和最高天气值,所以先使用子查询对原始数据进行处理,从日期里面抽取出来年份和月份。

下面在子查询的结果上使用 pivot 语句,pivot 第一个参数是一个聚合语句,这个代表聚合出来一个月30天的一个平均气温,第二个参数是 FOR month,这个是指定以哪个列为枢轴列,第三个 In 子语句指定我们需要进行行列转换的具体的 枢轴点(Pivot Point)的值,上面的例子中 1到12月份都包含了,而且给了一个别名,如果只指定 1到6月份,结果就如下了:

案例|使用 spark Pivot 处理复杂的数据统计需求

上面sql语句里面有个特别的点需要注意, 就是聚合的时候有个隐含的维度字段,就是 年份,按理来讲,我们没有写  group-by year, 为啥结果表里面不同年份区分在了不同的行,原因是,FORM 子查询出来的每行有 3个列, year,month,tmp,如果一个列既不出现在进行聚合计算的列中(temp 是聚合计算的列), 也不作为枢轴列 , 就会作为聚合的时候一个隐含的维度。我们的例子中算平均值聚合操作的维度是 (year, month),一个是隐含维度,一个是 枢轴列维度, 这一点一定要注意,如果不需要按照 year 来区分,FORM 查询的时候就不要加上这个列。

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

指定多个聚合语句

上文中只有一个聚合语句,就是计算平均天气,其实是可以加多个聚合语句的,比如我们需要看到 7,8,9 月份每个月的最大气温和平均气温,就可以用以下SQL语句

 

案例|使用 spark Pivot 处理复杂的数据统计需求

上文中指定了两个聚合语句,查询后, 枢轴点(Pivot Point)  和 聚合语句 的笛卡尔积作为结果的不同列,也就是  <value>_<aggExpr>, 看下图:

 

案例|使用 spark Pivot 处理复杂的数据统计需求

聚合列(Grouping Columns)和 枢轴列(Pivot Columns)的不同之处

现在假如我们有西雅图每天的最低温数据,我们需要把最高温和最低温放在同一张表里面看对比着看

Date Temp (°F)
08-01-2018 59
08-02-2018 58
08-03-2018 59
08-04-2018 58
08-05-2018 59
08-06-2018 59

 

我们使用  UNION ALL 把两张表做一个合并:

案例|使用 spark Pivot 处理复杂的数据统计需求

 

现在使用 pivot 来进行处理:

案例|使用 spark Pivot 处理复杂的数据统计需求

我们统计了 4年中  7,8,9 月份最低温和最高温的平均值,这里要注意的是,我们把 year 和 一个最低最高的标记(H/L)都作为隐含维度,不然算出来的就是最低最高温度在一起的平均值了。结果如下图:

案例|使用 spark Pivot 处理复杂的数据统计需求

上面的查询中,我们把最低最高的标记(H/L)也作为了一个隐含维度,group-by 的维度就变成了 (year, H/L, month), 但是year 和 H/L 体现在了不同行上面,month 体现在了不同的列上面,这就是  聚合列(Grouping Columns)和 枢轴列(Pivot Columns)的不同之处。

再接再厉,我们把 H/L 作为  枢轴列(Pivot Columns) 进行查询:

案例|使用 spark Pivot 处理复杂的数据统计需求

 

结果的展现方式就和上面的不同了,虽然每个单元格值都是相同的,但是把  H/L 和 month 笛卡尔积作为列,H/L 维度体现在了列上面了:

案例|使用 spark Pivot 处理复杂的数据统计需求

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicatespark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

案例|使用 spark Pivot 处理复杂的数据统计需求

原文始发于微信公众号(spark技术分享):案例|使用 spark Pivot 处理复杂的数据统计需求

学习 | Spark 2.4 原生支持内置支持avro, spark read avro

admin阅读(2623)

在hadoop 生态圈,我们经常会看到 avro, avro 是什么呢,首先 avro 是可以作为一种基于二进制数据传输高性能的中间件, 比如在 Flume 中,我们使用 avro sinker 来发送数据,使用 avro source 来接受网络数据。

avro 尽管提供了 rpc 机制,事实上Avro的核心特性决定了它通常用在“大数据”存储场景, 即我们通过借助schema将数据写入到“本地文件”或者HDFS中,然后reader再根据schema去迭代获取数据条目, 好处是它的schema 是动态可演变的,这种强大的可扩展性正是“文件数据”存储所必须的。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

在 spark2.4 之前的版本中,我们如果需要在 spark 中读取 avro 格式的文件,通常要使用第三方库, 也就是这个项目 https://github.com/databricks/spark-avro,  spark2.4中内置了对avro 格式的支持,有以下好处

  • 不用依赖第三方包,开箱即用

  • 可以在 DataFrame 中方便使用 from_avro() ,  to_avro()  转来转去

  • 支持 Logical types , 也就是我们自己扩展出来的 avro 格式

  • 读性能 2倍提升,写性能 提高10%, 原因下文中说明

可以看到,databrick 的人觉得 avro 还是一种很重要的格式,有必要把他放在 spark sql 源码中内置了,不用你再去别的地方去引用,同时官方也顺手做了些性能优化

一个简单的例子

spark2.4中,读取和写入 avro 格式极其方便,只要指定一下 format 为 avro 即可,指定后, DataFrameReader 和 DataFrameWriter 就可以找到相关的 source 实现类,当然这些我们完全不需要关心。

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

from_avro() and to_avro()

 

我们都知道,在kafka 中是可以写入 avro 格式的数据,比如 flume 收集的数据就可以直接用 avro 格式写入 kafka, 针对这种场景, 官方内置了两个函数,from_avro() , to_avro(), 很实用,我们从kafka 中加载数据到 spark , schema 中有这么几个字段:

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

 

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

这里的 value 是一个字节数组,如果kafka中是 avro 格式的数据,那么这个字节数组中保存的就是 avro 格式的数据,这时候你只需要使用 from_avro()  函数,同时传入一个avro shema(你使用json 格式定义的 avro schema), 就可以轻松的转换,后面你就可以使用 avro 定义的字段进行相应的操作了 :

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

如果你想使用 avro 格式写入kafka 就更方便了,上述例子中,使用o_avro() 转换为avro 格式,然后 输出到 kafka中。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

你的老代码应该怎么办

spark2.4 中的内置的 avro 完全兼容原来的第三方库中的 spark-avro,也就是说如果你原来使用的 format 是 com.databricks.spark.avro , 现在这个代码你完全不用动,如果说你自己在第三方库上做了自己的变更,就是想用外部引用过来的类,也可以,设置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 为 false 就行。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

性能提升

这个性能提升才应该是我们使用新版本最大的动力,这个性能优化在哪里呢,SPARK-24800, 大家可以看下这个 pr 学习一下,我们都知道 spark 内部数据流转格式是 InternalRow,DataFrame 的Row 是要靠 RowEncoder 转换为 InternalRow 格式的, 对于avro 格式来将,原来的实现方式是先把 avro 格式转换为 Row, 然后把 Row 转换为 InternalRow, 新的实现方式是直接把 avro 格式转换为 InternalRow, 省了一步。

下面有一个 databrick 官方做的性能测试报告:

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

可以看到,读性能提高了一倍,写性能提高了8%左右,这么来看,spark2.4 中使用 avro 确实便利了很多,而且有了 IO 性能的提高,估计以后最新的变动应该第一时间体现在内置的 avro 支持中,而不是第三方的库里面。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

所以升不升级你看着办,我先升为敬

 

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sql

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

 

 

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

听说新版微信这有个好看

原文始发于微信公众号(spark技术分享):学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

写在阿里Blink正式开源之际

admin阅读(2629)

写在阿里Blink正式开源之际

阿里最近要正式将内部Blink开源,搞Blink 的大牛也在 AI 前线公众号上推了文章,介绍Blink的优势重磅!阿里Blink正式开源,重要优化点解读 , 首先 spark 君对于每秒处理多少数据量,单个作业处理数据量超过400T 这些标榜不是特别在意,因为这涉及到使用了多少资源,处理时长以及使用姿势,不说清楚就是无意义的。spark君比较在意的是文中提到的Blink的一些重要的feature,大致看有 5 个方面

  • Hive的兼容性,Blink:为了打通元数据,我们重构了 Flink catalog 的实现,并且增加了两种 catalog,一个是基于内存存储的 FlinkInMemoryCatalog,另外一个是能够桥接 Hive metaStore 的 HiveCatalog。有了这个 HiveCatalog,Flink 作业就能读取 Hive 的 metaData,  spark君:这个功能怎么看怎么像从 spark 这边抄过来的,至少是借鉴,我们都知道,spark 定义了ExternalCatalog 用来管理操作数据库表或者注册函数等,然后分为 InMemoryCatalog 和HiveExternalCatalog ,连名字都差不多,到底是谁抄谁呢,spark君在平时使用中,感觉 spark sql 除了 像 hive 中带有buckets的table 等极少数几个操作不支持,spark 君敢说 Spark SQL支持绝大部分的Hive特征。
  •  SQL/TableAPI的优化,Blink: 我们对 SQL engine 的架构做了较大的调整。提出了全新的 Query Processor(QP), 它包括了一个优化层(Query Optimizer)和一个算子层(Query Executor)。这样一来,流计算和批计算的在这两层大部分的设计工作就能做到尽可能地复用,   spark 君:这个就对应 spark sql 里面的逻辑优化和物理执行两个流程吧,这个在这里就不用多说了吧,spark君一直孜孜不倦的给小伙伴们普及 spark逻辑优化和物理优化都做了什么,感觉spark在这块都优化的不能再优化了。至于说到 流批复用,我就默默提一下 structured streaming ,  完全架构在 spark sql 之上,一张无限流动的大表。看文章中重点强调的 BinaryRow 和  codegen 优化方式,这都是 spark 玩了很久很久的东西了,不了解的小伙伴,需要多看看相关的,后续spark君会专门抽文章介绍。
  • 更好的runtime支持,Blink: 首先 Blink 引入了 Pluggable Shuffle Architecture,开发者可以根据不同的计算模型或者新硬件的需要实现不同的 shuffle 策略进行适配。此外 Blink 还引入新的调度架构,容许开发者根据计算模型自身的特点定制不同调度器。说道这个,spark君表示我对 spark 官方最佩服的一点就是源码里面体现出来的高度工程抽象能力,高内聚,低耦合,面向变化和接口编程,早早就支持调度全家桶了(standalone, yarn, mesos, k8s  spark2.3 之后也是原生的哦)。
  • Zeppelin for Flink,  zeppelin 就是个脚手架和大杂烩,spark也早早的入坑了,这个是小功能,就不说了。 
  • Flink Web, Blink 这次介绍出来的多种指标展示维度,看着不错,其实我对 spark 的界面是有些不满意的,特别是 针对 structured streaming 的指标展示,还是比较弱。

下面是 一位 spark 大牛的文章,里面很多观点和我的一致,在这里转发给大家看下:

前言

今天朋友圈有篇【阿里技术】发的文章,说Blink的性能如何强悍,功能现在也已经比较完善。譬如:

Blink 在 TPC-DS 上和 Spark 相比有着非常明显的性能优势,而且这种性能优势随着数据量的增加而变得越来越大。在实际的场景这种优势已经超过 Spark 三倍,在流计算性能上我们也取得了类似的提升。我们线上的很多典型作业,性能是原来的 3 到 5 倍。在有数据倾斜的场景,以及若干比较有挑战的 TPC-H query,流计算性能甚至得到了数十倍的提升。

什么时候可以享受这波红利

还要等待一段时间。要想享受Blink的加持,大家可能还要等待一段时间,因为除了功能合并,还有代码质量。代码质量理论上应该是没有原生flink好的。这个需要时间,不是靠人力就能搞定的。

一点忧思

阿里收购Flink母公司,然后马上发通告,说blink要合并进flink了,之前还是商量口吻。显然,这对于社区来说,是一个非常不友好的感觉。我猜测,社区部分优秀的人才(包括母公司)肯定会有人走的。开源项目对于PR的质量除了功能,更多的是架构,代码质量等等的考量。

那和Spark的对比怎么样?

Spark 和 Flink不在一个level级别战斗。Spark 从诞生没多久开始,就朝着AI方向发展,包括内置的mllib,深度学习后也马上抓住机遇,在2.2.x之后发力,DB公司开发了一套生态辅助系统,比如Spark deep Learning,Tensorframes, GraphFrames等等,另外还有众多第三方框架的加持。2.3-2.4在商业版本里则已经集成了如horovod等分布式深度学习框架,所以说,2.2.x之后,Spark的主战场早就已经是AI,而 Flink依然停留在流,批战场。

Flink,Spark性能好对机器学习有啥影响

有人会问,机器学习对性能不是很在乎么?现在flink性能据说那么好?到底有多好,这是一家之言,但是 在这些框架里
性能在AI方面不是很重要,因为他们对AI重在集成,而不是自己实现。这就意味着瓶颈是在数据交换以及AI框架自身之上。模型构件好进行预测,也是对应的AI框架自己去加载,提供预测接口,其他只是wrap一层而已。

盛夏即将发布的3.0则对AI更加友好,包括CPU/GPU的管理,K8s backend, 数据交换(Spark – AI框架)的提速,内部Barrier API 的等进一步的完,显然让Spark在AI领域进一步保持优势

 和AI集成的基础,Spark以有所沉淀

和AI集成的好坏,取决于Java/Scala语言和Python语言的互通的质量。Spark 在1.6之前就已经支持Python,经过这么多年的优化,已经有了很好的经验,最新的arrow引入让速度更是成两位数的提升。

Flink 盛夏之下的喧闹

这次关于bink 合并进flink的通告不是由社区主导发送,而是阿里技术发送,显然有喧宾夺主的意味,会加大他们(母公司和阿里巴巴)融合的难度。极端点,Flink可能就由一个社区项目变成一个公司产品。阿里开源了那么多东西,有几个达到了真正的国际影响力,并且处于持续的发展之中的?公司加持对于社区而言,短期是利好,但是如果干预多了,长期就不被看好了。

Presto是facebook开源的并且运作,一切以满足公司需求为最高优先级,虽然presto很优秀,但是社区没有主导权,极大的限制了他的发展,终于发生了分裂(大家可以自己搜搜)。

最后加一句

不要再拿Spark streaming 和Flink比了,请拿Structured streaming 以及Continue Processing 来和Flink比。为啥国内还在拿Spark Streaming 和Flink比?

因为惯性使然,structured streaming 新引入了一堆概念,并且限制也比较多,spark streaming大家把之前该遇到的问题都遇到了,而且也有一定的积累,要切换就没那么容易了。加上flink有阿里加持,宣传势头很大,可能有的直接就从,spark streaming 切到flink去了。

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicatespark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

写在阿里Blink正式开源之际

原文始发于微信公众号(spark技术分享):写在阿里Blink正式开源之际

一文搞懂spark sql中的CBO(基于代价的优化)

admin阅读(4602)


Spark CBO 背景。。。。。。。。。。。

我们在 是时候学习真正的spark技术了 这篇文章中介绍了很多基于规则的优化方式,也就是RBO,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 LogicalPlan 本身的特点,未考虑数据本身的特点,也未考虑算子本身的代价。


本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。


Spark CBO 原理

CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。


物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合,如下图所示。


一文搞懂spark sql中的CBO(基于代价的优化)


而每个执行节点的代价,分为两个部分


  • 该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布

  • 该执行节点操作算子的代价


每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:1) 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;2)中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。


所以,最终主要需要解决两个问题

  • 如何获取原始数据集的统计信息

  • 如何根据输入数据集估算特定算子的输出数据集


Statistics 收集

过如下 SQL 语句,可计算出整个表的记录总数以及总大小


一文搞懂spark sql中的CBO(基于代价的优化)


从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万,与事实相符。


一文搞懂spark sql中的CBO(基于代价的优化)


通过如下 SQL 语句,可计算出指定列的统计信息


一文搞懂spark sql中的CBO(基于代价的优化)


从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000,null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8。


一文搞懂spark sql中的CBO(基于代价的优化)


除上述示例中的统计信息外,Spark CBO 还直接等高直方图。在上例中,histogram 为 NULL。其原因是,spark.sql.statistics.histogram.enabled 默认值为 false,也即 ANALYZE 时默认不计算及存储 histogram。


下例中,通过 SET spark.sql.statistics.histogram.enabled=true; 启用 histogram 后,完整的统计信息如下:


一文搞懂spark sql中的CBO(基于代价的优化)


从上图可见,生成的 histogram 为 equal-height histogram,且高度为 1102.36,bin 数为 254。其中 bin 个数可由 spark.sql.statistics.histogram.numBins 配置。对于每个 bin,匀记录其最小值,最大值,以及 distinct count。


值得注意的是,这里的 distinct count 并不是精确值,而是通过 HyperLogLog 计算出来的近似值。使用 HyperLogLog 的原因有二

  • 使用 HyperLogLog 计算 distinct count 速度快速

  • HyperLogLog 计算出的 distinct count 可以合并。例如可以直接将两个 bin 的 HyperLogLog 值合并算出这两个 bin 总共的 distinct count,而无须从重新计算,且合并结果的误差可控


算子对数据集影响估计

对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。


一文搞懂spark sql中的CBO(基于代价的优化)


本节以 Filter 为例说明算子对数据集的影响。


对于常见的 Column A < value B Filter,可通过如下方式估算输出中间结果的统计信息

  • 若 B < A.min,则无数据被选中,输出结果为空

  • 若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变

  • 若 A.min < B < A.max,则被选中的数据占比为 (B.value – A.min) / (A.max – A.min),A.min 不变,A.max 更新为 B.value,A.ndv = A.ndv * (B.value – A.min) / (A.max – A.min)



一文搞懂spark sql中的CBO(基于代价的优化)


上述估算的前提是,字段 A 数据均匀分布。但很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充分利用 histogram 进行更精确的估算

一文搞懂spark sql中的CBO(基于代价的优化)


启用 Historgram 后,Filter Column A < value B的估算方法为

  • 若 B < A.min,则无数据被选中,输出结果为空

  • 若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变

  • 若 A.min < B < A.max,则被选中的数据占比为 height(<B) / height(All),A.min 不变,A.max = B.value,A.ndv = ndv(<B)


在上图中,B.value = 15,A.min = 0,A.max = 32,bin 个数为 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。该值可根据 A < 15 的 5 个 bin 的 ndv 通过 HyperLogLog 合并而得,无须重新计算所有 A < 15 的数据。



算子代价估计

SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。


Spark SQL 的 CBO 通过如下方法估算 join 的代价



其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weight 由 spark.sql.cbo.joinReorder.card.weight 决定,其默认值为 0.7。



Build侧选择

对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side


一文搞懂spark sql中的CBO(基于代价的优化)


而开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例

一文搞懂spark sql中的CBO(基于代价的优化)


优化 Join 类型

Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。


在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。


并且该判断基于参与 Join 的表的原始大小。


在下图示例中,Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。


而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin。

一文搞懂spark sql中的CBO(基于代价的优化)


优化多表 Join 顺序

未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题,因此耗时 241 秒。

  • left-deep tree,因此所有后续 Join 都依赖于前面的 Join 结果,各 Join 间无法并行进行

  • 前面的两次 Join 输入输出数据量均非常大,属于大 Join,执行时间较长

一文搞懂spark sql中的CBO(基于代价的优化)


开启 CBO 后, Spark SQL 将执行计划优化如下

一文搞懂spark sql中的CBO(基于代价的优化)


优化后的 Join 有如下优势,因此执行时间降至 71 秒

  • Join 树不再是 left-deep tree,因此 Join 3 与 Join 4 可并行进行,Join 5 与 Join 6 可并行进行

  • 最大的 Join 5 输出数据只有两百万条结果,Join 6 有 1.49 亿条结果,Join 7相当于小 Join


总结

  • RBO 基于规则的优化方式,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 LogicalPlan 本身的特点,未考虑数据本身的特点,也未考虑算子本身的代价

  • 本文介绍的 CBO 考虑了数据的统计特征,从而选择总代价最低的物理执行计划。但物理执行计划是固定的,一旦选定,不可更改。未考虑运行时信息

  • 之前介绍的  Spark SQL Adapptive Execution ,它可根据运行时信息动态调整执行计划从而优化执行





大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


一文搞懂spark sql中的CBO(基于代价的优化)


原文始发于微信公众号(spark技术分享):一文搞懂spark sql中的CBO(基于代价的优化)

Adaptive Execution 让 Spark SQL 更智能更高效

admin阅读(4288)

1 背景

spark sql 的catalyst框架在内部通过ROB(基于规则的优化)和 CBO(基于成本的优化),从查询本身与目标数据的特点的角度尽可能保证了最终生成的执行计划的高效性。但是

  • 执行计划一旦生成,便不可更改,即使执行过程中发现后续执行计划可以进一步优化,也只能按原计划执行
  • CBO 基于统计信息生成最优执行计划,需要提前生成统计信息,成本较大,且不适合数据更新频繁的场景
  • CBO 基于基础表的统计信息与操作对数据的影响推测中间结果的信息,只是估算,不够精确

本文介绍的 Adaptive Execution 将可以根据执行过程中的中间数据优化后续执行,从而提高整体执行效率。核心在于两点

  • 执行计划可动态调整
  • 调整的依据是中间结果的精确统计信息

2 动态设置 Shuffle Partition

2.1 Spark Shuffle 原理

Spark Shuffle 一般用于将上游 Stage 中的数据按 Key 分区,保证来自不同 Mapper (表示上游 Stage 的 Task)的相同的 Key 进入相同的 Reducer (表示下游 Stage 的 Task)。一般用于 group by 或者 Join 操作。

Adaptive Execution 让 Spark SQL 更智能更高效
Spark Shuffle 过程

如上图所示,该 Shuffle 总共有 2 个 Mapper 与 5 个 Reducer。每个 Mapper 会按相同的规则(由 Partitioner 定义)将自己的数据分为五份。每个 Reducer 从这两个 Mapper 中拉取属于自己的那一份数据。

2.2 原有 Shuffle 的问题

使用 Spark SQL 时,可通过 spark.sql.shuffle.partitions 指定 Shuffle 时 Partition 个数,也即 Reducer 个数

该参数决定了一个 Spark SQL Job 中包含的所有 Shuffle 的 Partition 个数。如下图所示,当该参数值为 3 时,所有 Shuffle 中 Reducer 个数都为 3

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL with multiple Shuffle

这种方法有如下问题

  • Partition 个数不宜设置过大
  • Reducer(代指 Spark Shuffle 过程中执行 Shuffle Read 的 Task) 个数过多,每个 Reducer 处理的数据量过小。大量小 Task 造成不必要的 Task 调度开销与可能的资源调度开销(如果开启了 Dynamic Allocation)
  • Reducer 个数过大,如果 Reducer 直接写 HDFS 会生成大量小文件,从而造成大量 addBlock RPC,Name node 可能成为瓶颈,并影响其它使用 HDFS 的应用
  • 过多 Reducer 写小文件,会造成后面读取这些小文件时产生大量 getBlock RPC,对 Name node 产生冲击
  • Partition 个数不宜设置过小
  • 每个 Reducer 处理的数据量太大,Spill 到磁盘开销增大
  • Reducer GC 时间增长
  • Reducer 如果写 HDFS,每个 Reducer 写入数据量较大,无法充分发挥并行处理优势
  • 很难保证所有 Shuffle 都最优
  • 不同的 Shuffle 对应的数据量不一样,因此最优的 Partition 个数也不一样。使用统一的 Partition 个数很难保证所有 Shuffle 都最优
  • 定时任务不同时段数据量不一样,相同的 Partition 数设置无法保证所有时间段执行时都最优

2.3 自动设置 Shuffle Partition 原理

如 Spark Shuffle 原理 一节图中所示,Stage 1 的 5 个 Partition 数据量分别为 60MB,40MB,1MB,2MB,50MB。其中 1MB 与 2MB 的 Partition 明显过小(实际场景中,部分小 Partition 只有几十 KB 及至几十字节)

开启 Adaptive Execution 后

  • Spark 在 Stage 0 的 Shuffle Write 结束后,根据各 Mapper 输出,统计得到各 Partition 的数据量,即 60MB,40MB,1MB,2MB,50MB
  • 通过 ExchangeCoordinator 计算出合适的 post-shuffle Partition 个数(即 Reducer)个数(本例中 Reducer 个数设置为 3)
  • 启动相应个数的 Reducer 任务
  • 每个 Reducer 读取一个或多个 Shuffle Write Partition 数据(如下图所示,Reducer 0 读取 Partition 0,Reducer 1 读取 Partition 1、2、3,Reducer 2 读取 Partition 4)

    Adaptive Execution 让 Spark SQL 更智能更高效
    Spark SQL adaptive reducer 1

三个 Reducer 这样分配是因为

  • targetPostShuffleInputSize 默认为 64MB,每个 Reducer 读取数据量不超过 64MB
  • 如果 Partition 0 与 Partition 2 结合,Partition 1 与 Partition 3 结合,虽然也都不超过 64 MB。但读完 Partition 0 再读 Partition 2,对于同一个 Mapper 而言,如果每个 Partition 数据比较少,跳着读多个 Partition 相当于随机读,在 HDD 上性能不高
  • 目前的做法是只结合相临的 Partition,从而保证顺序读,提高磁盘 IO 性能
  • 该方案只会合并多个小的 Partition,不会将大的 Partition 拆分,因为拆分过程需要引入一轮新的 Shuffle
  • 基于上面的原因,默认 Partition 个数(本例中为 5)可以大一点,然后由 ExchangeCoordinator 合并。如果设置的 Partition 个数太小,Adaptive Execution 在此场景下无法发挥作用

由上图可见,Reducer 1 从每个 Mapper 读取 Partition 1、2、3 都有三根线,是因为原来的 Shuffle 设计中,每个 Reducer 每次通过 Fetch 请求从一个特定 Mapper 读数据时,只能读一个 Partition 的数据。也即在上图中,Reducer 1 读取 Mapper 0 的数据,需要 3 轮 Fetch 请求。对于 Mapper 而言,需要读三次磁盘,相当于随机 IO。

为了解决这个问题,Spark 新增接口,一次 Shuffle Read 可以读多个 Partition 的数据。如下图所示,Task 1 通过一轮请求即可同时读取 Task 0 内 Partition 0、1 和 2 的数据,减少了网络请求数量。同时 Mapper 0 一次性读取并返回三个 Partition 的数据,相当于顺序 IO,从而提升了性能。 

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL adaptive reducer 2

由于 Adaptive Execution 的自动设置 Reducer 是由 ExchangeCoordinator 根据 Shuffle Write 统计信息决定的,因此即使在同一个 Job 中不同 Shuffle 的 Reducer 个数都可以不一样,从而使得每次 Shuffle 都尽可能最优。

上文 原有 Shuffle 的问题 一节中的例子,在启用 Adaptive Execution 后,三次 Shuffle 的 Reducer 个数从原来的全部为 3 变为 2、4、3。

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL with adaptive Shuffle

2.4 使用与优化方法

spark.sql.adaptive.enabled=true 启用 Adaptive Execution 从而启用自动设置 Shuffle Reducer 这一特性

spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可设置每个 Reducer 读取的目标数据量,其单位是字节,默认值为 64 MB。上文例子中,如果将该值设置为 50 MB,最终效果仍然如上文所示,而不会将 Partition 0 的 60MB 拆分。具体原因上文已说明

3 动态调整执行计划

3.1 固定执行计划的不足

在不开启 Adaptive Execution 之前,执行计划一旦确定,即使发现后续执行计划可以优化,也不可更改。如下图所示,SortMergJoin 的 Shuffle Write 结束后,发现 Join 一方的 Shuffle 输出只有 46.9KB,仍然继续执行 SortMergeJoin

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL with fixed DAG

此时完全可将 SortMergeJoin 变更为 BroadcastJoin 从而提高整体执行效率。

3.2 SortMergeJoin 原理

SortMergeJoin 是常用的分布式 Join 方式,它几乎可使用于所有需要 Join 的场景。但有些场景下,它的性能并不是最好的。

SortMergeJoin 的原理如下图所示

  • 将 Join 双方以 Join Key 为 Key 按照 HashPartitioner 分区,且保证分区数一致
  • Stage 0 与 Stage 1 的所有 Task 在 Shuffle Write 时,都将数据分为 5 个 Partition,并且每个 Partition 内按 Join Key 排序
  • Stage 2 启动 5 个 Task 分别去 Stage 0 与 Stage 1 中所有包含 Partition 分区数据的 Task 中取对应 Partition 的数据。(如果某个 Mapper 不包含该 Partition 的数据,则 Redcuer 无须向其发起读取请求)。
  • Stage 2 的 Task 2 分别从 Stage 0 的 Task 0、1、2 中读取 Partition 2 的数据,并且通过 MergeSort 对其进行排序
  • Stage 2 的 Task 2 分别从 Stage 1 的 Task 0、1 中读取 Partition 2 的数据,且通过 MergeSort 对其进行排序
  • Stage 2 的 Task 2 在上述两步 MergeSort 的同时,使用 SortMergeJoin 对二者进行 Join
Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL SortMergeJoin

3.3 BroadcastJoin 原理

当参与 Join 的一方足够小,可全部置于 Executor 内存中时,可使用 Broadcast 机制将整个 RDD 数据广播到每一个 Executor 中,该 Executor 上运行的所有 Task 皆可直接读取其数据。(本文中,后续配图,为了方便展示,会将整个 RDD 的数据置于 Task 框内,而隐藏 Executor)

对于大 RDD,按正常方式,每个 Task 读取并处理一个 Partition 的数据,同时读取 Executor 内的广播数据,该广播数据包含了小 RDD 的全量数据,因此可直接与每个 Task 处理的大 RDD 的部分数据直接 Join

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL BroadcastJoin

根据 Task 内具体的 Join 实现的不同,又可分为 BroadcastHashJoin 与 BroadcastNestedLoopJoin。后文不区分这两种实现,统称为 BroadcastJoin

与 SortMergeJoin 相比,BroadcastJoin 不需要 Shuffle,减少了 Shuffle 带来的开销,同时也避免了 Shuffle 带来的数据倾斜,从而极大地提升了 Job 执行效率

同时,BroadcastJoin 带来了广播小 RDD 的开销。另外,如果小 RDD 过大,无法存于 Executor 内存中,则无法使用 BroadcastJoin

对于基础表的 Join,可在生成执行计划前,直接通过 HDFS 获取各表的大小,从而判断是否适合使用 BroadcastJoin。但对于中间表的 Join,无法提前准确判断中间表大小从而精确判断是否适合使用 BroadcastJoin

《Spark SQL 性能优化再进一步 CBO 基于代价的优化》一文介绍的 CBO 可通过表的统计信息与各操作对数据统计信息的影响,推测出中间表的统计信息,但是该方法得到的统计信息不够准确。同时该方法要求提前分析表,具有较大开销

而开启 Adaptive Execution 后,可直接根据 Shuffle Write 数据判断是否适用 BroadcastJoin

3.4 动态调整执行计划原理

如上文 SortMergeJoin 原理 中配图所示,SortMergeJoin 需要先对 Stage 0 与 Stage 1 按同样的 Partitioner 进行 Shuffle Write

Shuffle Write 结束后,可从每个 ShuffleMapTask 的 MapStatus 中统计得到按原计划执行时 Stage 2 各 Partition 的数据量以及 Stage 2 需要读取的总数据量。(一般来说,Partition 是 RDD 的属性而非 Stage 的属性,本文为了方便,不区分 Stage 与 RDD。可以简单认为一个 Stage 只有一个 RDD,此时 Stage 与 RDD 在本文讨论范围内等价)

如果其中一个 Stage 的数据量较小,适合使用 BroadcastJoin,无须继续执行 Stage 2 的 Shuffle Read。相反,可利用 Stage 0 与 Stage 1 的数据进行 BroadcastJoin,如下图所示

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL Auto BroadcastJoin

具体做法是

  • 将 Stage 1 全部 Shuffle Write 结果广播出去
  • 启动 Stage 2,Partition 个数与 Stage 0 一样,都为 3
  • 每个 Stage 2 每个 Task 读取 Stage 0 每个 Task 的 Shuffle Write 数据,同时与广播得到的 Stage 1 的全量数据进行 Join

注:广播数据存于每个 Executor 中,其上所有 Task 共享,无须为每个 Task 广播一份数据。上图中,为了更清晰展示为什么能够直接 Join 而将 Stage 2 每个 Task 方框内都放置了一份 Stage 1 的全量数据

虽然 Shuffle Write 已完成,将后续的 SortMergeJoin 改为 Broadcast 仍然能提升执行效率

  • SortMergeJoin 需要在 Shuffle Read 时对来自 Stage 0 与 Stage 1 的数据进行 Merge Sort,并且可能需要 Spill 到磁盘,开销较大
  • SortMergeJoin 时,Stage 2 的所有 Task 需要取 Stage 0 与 Stage 1 的所有 Task 的输出数据(如果有它要的数据 ),会造成大量的网络连接。且当 Stage 2 的 Task 较多时,会造成大量的磁盘随机读操作,效率不高,且影响相同机器上其它 Job 的执行效率
  • SortMergeJoin 时,Stage 2 每个 Task 需要从几乎所有 Stage 0 与 Stage 1 的 Task 取数据,无法很好利用 Locality
  • Stage 2 改用 Broadcast,每个 Task 直接读取 Stage 0 的每个 Task 的数据(一对一),可很好利用 Locality 特性。最好在 Stage 0 使用的 Executor 上直接启动 Stage 2 的 Task。如果 Stage 0 的 Shuffle Write 数据并未 Spill 而是在内存中,则 Stage 2 的 Task 可直接读取内存中的数据,效率非常高。如果有 Spill,那可直接从本地文件中读取数据,且是顺序读取,效率远比通过网络随机读数据效率高

3.5 使用与优化方法

该特性的使用方式如下

  • 当 spark.sql.adaptive.enabled 与 spark.sql.adaptive.join.enabled 都设置为 true 时,开启 Adaptive Execution 的动态调整 Join 功能
  • spark.sql.adaptiveBroadcastJoinThreshold 设置了 SortMergeJoin 转 BroadcastJoin 的阈值。如果不设置该参数,该阈值与 spark.sql.autoBroadcastJoinThreshold 的值相等
  • 除了本文所述 SortMergeJoin 转 BroadcastJoin,Adaptive Execution 还可提供其它 Join 优化策略。部分优化策略可能会需要增加 Shuffle。spark.sql.adaptive.allowAdditionalShuffle 参数决定了是否允许为了优化 Join 而增加 Shuffle。其默认值为 false

4 自动处理数据倾斜

4.1 解决数据倾斜典型方案

  • 保证文件可 Split 从而避免读 HDFS 时数据倾斜
  • 保证 Kafka 各 Partition 数据均衡从而避免读 Kafka 引起的数据倾斜
  • 调整并行度或自定义 Partitioner 从而分散分配给同一 Task 的大量不同 Key
  • 使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 从而避免 Shuffle 引起的数据倾斜
  • 对倾斜 Key 使用随机前缀或后缀从而分散大量倾斜 Key,同时将参与 Join 的小表扩容,从而保证 Join 结果的正确性

4.2 自动解决数据倾斜

目前 Adaptive Execution 可解决 Join 时数据倾斜问题。其思路可理解为将部分倾斜的 Partition (倾斜的判断标准为该 Partition 数据是所有 Partition Shuffle Write 中位数的 N 倍) 进行单独处理,类似于 BroadcastJoin,如下图所示

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL resolve joinm skew

在上图中,左右两边分别是参与 Join 的 Stage 0 与 Stage 1 (实际应该是两个 RDD 进行 Join,但如同上文所述,这里不区分 RDD 与 Stage),中间是获取 Join 结果的 Stage 2

明显 Partition 0 的数据量较大,这里假设 Partition 0 符合“倾斜”的条件,其它 4 个 Partition 未倾斜

以 Partition 对应的 Task 2 为例,它需获取 Stage 0 的三个 Task 中所有属于 Partition 2 的数据,并使用 MergeSort 排序。同时获取 Stage 1 的两个 Task 中所有属于 Partition 2 的数据并使用 MergeSort 排序。然后对二者进行 SortMergeJoin

对于 Partition 0,可启动多个 Task

  • 在上图中,启动了两个 Task 处理 Partition 0 的数据,分别名为 Task 0-0 与 Task 0-1
  • Task 0-0 读取 Stage 0 Task 0 中属于 Partition 0 的数据
  • Task 0-1 读取 Stage 0 Task 1 与 Task 2 中属于 Partition 0 的数据,并进行 MergeSort
  • Task 0-0 与 Task 0-1 都从 Stage 1 的两个 Task 中所有属于 Partition 0 的数据
  • Task 0-0 与 Task 0-1 使用 Stage 0 中属于 Partition 0 的部分数据与 Stage 1 中属于 Partition 0 的全量数据进行 Join

通过该方法,原本由一个 Task 处理的 Partition 0 的数据由多个 Task 共同处理,每个 Task 需处理的数据量减少,从而避免了 Partition 0 的倾斜

对于 Partition 0 的处理,有点类似于 BroadcastJoin 的做法。但区别在于,Stage 2 的 Task 0-0 与 Task 0-1 同时获取 Stage 1 中属于 Partition 0 的全量数据,是通过正常的 Shuffle Read 机制实现,而非 BroadcastJoin 中的变量广播实现

4.3 使用与优化方法

开启与调优该特性的方法如下

  • 将 spark.sql.adaptive.skewedJoin.enabled 设置为 true 即可自动处理 Join 时数据倾斜
  • spark.sql.adaptive.skewedPartitionMaxSplits 控制处理一个倾斜 Partition 的 Task 个数上限,默认值为 5
  • spark.sql.adaptive.skewedPartitionRowCountThreshold 设置了一个 Partition 被视为倾斜 Partition 的行数下限,也即行数低于该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即一千万
  • spark.sql.adaptive.skewedPartitionSizeThreshold设置了一个 Partition 被视为倾斜 Partition 的大小下限,也即大小小于该值的 Partition 不会被视作倾斜 Partition。其默认值为 64 * 1024 * 1024 也即 64MB
  • spark.sql.adaptive.skewedPartitionFactor 该参数设置了倾斜因子。如果一个 Partition 的大小大于 spark.sql.adaptive.skewedPartitionSizeThreshold的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于 spark.sql.adaptive.skewedPartitionRowCountThreshold 的同时大于各 Partition 行数中位数与该因子的乘积,则它会被视为倾斜的 Partition

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


Adaptive Execution 让 Spark SQL 更智能更高效

原文始发于微信公众号(spark技术分享):Adaptive Execution 让 Spark SQL 更智能更高效

关注公众号:spark技术分享

联系我们联系我们