关注 spark技术分享,
撸spark源码 玩spark最佳实践

实战例子|自定义一个窗口函数来计算网站的会话数

admin阅读(1134)

If you’ve worked with Spark, you have probably written some custom UDF or UDAFs.
UDFs are ‘User Defined Functions’, so you can introduce complex logic in your queries/jobs, for instance, to calculate a digest for a string, or if you want to use a java/scala library in your queries.

UDAF stands for ‘User Defined Aggregate Function’ and it works on aggregates, so you can implement functions that can be used in a GROUP BY clause, similar to AVG.

You may not be familiar with Window functions, which are similar to aggregate functions, but they add a layer of complexity, since they are applied within a PARTITION BY clause. An example of window function is RANK(). You can read more about window functions here.

在使用 spark sql 的时候,有时候默认提供的sql 函数可能满足不了需求,这时候可以自定义一些函数,也就是UDF 或者UDAF(顾名思义,User Defined Functions)。

UDF 只是在sql中简单的处理转换一些字段,类似默认的trim 函数把一个字符串类型的列的头尾空格去掉, 还有一种sql函数叫做UDAF,不同于UDF,这种是在sql聚合语句中使用的sql函数,必须配合 GROUP BY 一同使用,类似默认的count,sum函数,但是还有一种自定义函数叫做 UDWF, 这种一般人就不知道了,这种叫做窗口自定义函数,不了解窗口函数的,可以参考上一篇文章,或者官方的介绍

While aggregate functions work over a group, window functions work over a logical window of record and allow you to produce new columns from the combination of a record and one or more records in the window.
Describing what window functions are is beyond the scope of this article, so for that refer to the previously mentioned article from Databricks, but in particular, we are interested at the ‘previous event in time for a user’ in order to figure out sessions.

There is plenty of documentation on how to write UDFs and UDAFs, see for instance This link for UDFs or this link for UDAFs.

I was surprised to find out there’s not much info on how to build an custom window function, so I dug up the source code for spark and started looking at how window functions are implemented. That opened to me a whole new world, since Window functions, although conceptually similar to UDAFs, use a lower level Spark API than UDAFs, they are written using Catalyst expressions.

窗口函数是 SQL 中一类特别的函数。和聚合函数相似,窗口函数的输入也是多行记录。不 同的是,聚合函数的作用于由 GROUP BY 子句聚合的组,而窗口函数则作用于一个窗口

这里怎么理解一个窗口呢,spark君在这里得好好的解释解释,一个窗口是怎么定义的,

窗口语句中,partition by用来指定分区的列,在同一个分区的行属于同一个窗口

order by用来指定数据在一个窗口内的多行,如何排序

windowing_clause 用来指定开窗方式,在spark sql 中开窗方式有那么几种

  • 一个分区中的所有行作为一个窗口:UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING(上下都没有边界),这种情况下,spark sql 会把所有行作为一个输入,进行一次求值
  • Growing frame:UNBOUNDED PRECEDING AND ….(上无边界), 这种就是不断的把当前行加入的窗口中,而不删除, 例子:.rowsBetween(Long.MinValue, 0) :窗口的大小是按照排序从最小值到当前行,在数据迭代过程中,不断的把当前行加入的窗口中。
  • Shrinking frame:… AND UNBOUNDED FOLLOWING(下无边界)和Growing frame 相反,窗口不断的把迭代到的当前行从窗口中删除掉。
  • Moving frame:滑动的窗口,举例:.rowsBetween(-1, 1) 就是指窗口定义从 -1(当前行前一行)到 1(当前行后一行) ,每一个滑动的窗口总用有3行
  • Offset frame  窗口中只有一条数据,就是偏移当前行一定距离的哪一行,举例:lag(field, n): 就是取从当前字段往前第n个值

这里就针对窗口函数就介绍这么多,如果不懂请参考相关文档,加强理解,我们在平时使用 spark sql 的过程中,会发现有很多教你自定义 UDF 和 UDAF 的教程,却没有针对UDWF的教程,这是为啥呢,这是因为 UDF 和UDAF 都作为上层API暴露给用户了,使用scala很简单就可以写一个函数出来,但是UDWF没有对上层用户暴露,只能使用 Catalyst expressions. 也就是Catalyst框架底层的表达式语句才可以定义,如果没有对源码有很深入的研究,根本就搞不出来。spark 君在工作中写了一些UDWF的函数,但是都比较复杂,不太好单独抽出来作为一个简明的例子给大家讲解,这里翻译一篇文章来做说明。

窗口函数的使用场景

Now, for what kind of problem do we need window functions in the first place?
A common problem when working on any kind of website, is to determine ‘user sessions’, periods of user activity. if an user is inactive for a certain time T, then it’s considered a new ‘session’. Statistics over sessions are used to determine for instance if the user is a bot, to find out what pages have the most activity, etc.

Let’s say that we consider a session over if we don’t see any activity for one hour (sixty minutes). Let’s see an example of user activity, where ‘event’ has the name of the page the user visited and time is the time of the event. I simplified it, since the event would be a URL, while the time would be a full timestamp, and the session id would be generated as a random UUID, but I put simpler names/times just to illustrate the logic.

我们来举个实际例子来说明 窗口函数的使用场景,在网站的统计指标中,有一个概念叫做用户会话,什么叫做用户会话呢,我来说明一下,我们在网站服务端使用用户session来管理用户状态,过程如下

1) 服务端session是用户第一次访问应用时,服务器就会创建的对象,代表用户的一次会话过程,可以用来存放数据。服务器为每一个session都分配一个唯一的sessionid,以保证每个用户都有一个不同的session对象。

2)服务器在创建完session后,会把sessionid通过cookie返回给用户所在的浏览器,这样当用户第二次及以后向服务器发送请求的时候,就会通过cookie把sessionid传回给服务器,以便服务器能够根据sessionid找到与该用户对应的session对象。

3)session通常有失效时间的设定,比如1个小时。当失效时间到,服务器会销毁之前的session,并创建新的session返回给用户。但是只要用户在失效时间内,有发送新的请求给服务器,通常服务器都会把他对应的session的失效时间根据当前的请求时间再延长1个小时。

也就是说如果用户在1个超过一个小时不产生用户事件,当前会话就结束了,如果后续再产生用户事件,就当做新的用户会话,我们现在就使用spark sql 来统计用户的会话数,这种场景就很适合使用窗口函数来做统计,因为判断当前是否是一个新会话的依据,需要依赖当前行的前一行的时间戳和当前行的时间戳的间隔来判断,下面的表格可以帮助你理解这个概念,例子中有3列数据,用户,event字段代表用户访问了一个页面,time字段代表访问页面的时间戳:

usereventtimesession
user1page110:12session1 (new session)
user1page210:20session1 (same session, 8 minutes from last event)
user1page111:13session1 (same session, 53 minutes from last event)
user1page314:12session2 (new session, 3 hours after last event)

Note that this is the activity for one user. We do have many users, and in fact partitioning by user is the job of the window function.

上面只有一个用户,如果多个用户,可以使用 partition by 来进行分区。

深入研究

It’s better to use an example to illustrate how the function works in respect of the window definition.
Let’s assume we have a very simple user activity data, with a user ID called user, while tsis a numeric timestamp and session is a session ID, that may be already present. While we may start with no session whatsoever, in most practical cases, we may be processing data hourly, so at hour N + 1 we want to continue the sessions
we calculated at hour n.

Let’s create some test data and show what we want to achieve.

我们来构造一些假数据:

First, the window specification. Sessions are create per user, and the ordering is of course by timestamp.
Hence, we want to apply the function partitionBy user and orderBy timestamp.

怎么使用 spark sql 来统计会话数目呢,因为不同用户产生的是不同的会话,首先使用user字段进行分区,然后按照时间戳进行排序

We want to write a createSession function that will use the following logic:

这时候我们需要一个自定义函数来加一个列,这个列的值的逻辑如下

and will produce something like this:

运行结果如下:

usertssessionnewsession
user11508863564166f237e656-1e..f237e656-1e..
user11508864164166nullf237e656-1e..
user11508864464166nullf237e656-1e5..
user11508871964166null51c05c35-6f..
user11508873164166null51c05c35-6f..
user21508863864166null2c16b61a-6c..
user21508864464166null2c16b61a-6c..

Note that we are using random UUIDs as it’s pretty much the standard, and we’re shortening them for typographical reasons.

As you see, for each user, it will create a new session whenever the difference between two events is bigger than the session threshold.

Internally, for every record, we want to keep track of:

  • The current session ID
  • The timestamp of the previous session

This is going to be the state that we must maintain. Spark takes care of initializing it for us.
It is also going to be the parameters the function expects.

Let’s see the skeleton of the function:

我们使用 UUID 来作为会话id, 当后一行的时间戳和前一行的时间戳间隔大于1小时的时候,就创建一个新的会话id作为列值,否则使用老的会话id作为列值。

这种就涉及到状态,我们在内部需要维护的状态数据

  • 当前的session ID
  • 当前session的最后活动事件的时间戳

自定义函数的代码如下:

A few notes here:

  • Our ‘state’ is going to be a Seq[AttributeReference]
  • Each AttributeReference must be declared with its type. As we said, we keep the current Session and the timestamp of the previous one.
  • We inizialize it by overriding initialValues
  • For every record, within the window, spark will call first updateExpressions, then will produce the values calling evaluateExpression

Now it’s time to implement the updateExpressionsand evaluateExpression functions.

注解:

  • 状态保存在 Seq[AttributeReference]
  • 重写 initialValues方法进行初始化
  • spark sql 在迭代处理每一行数据的时候,都会调用 updateExpressions 函数来处理,根据当后一行的时间戳和前一行的时间戳间隔大于1小时来进行不同的逻辑处理,如果不大于,就使用 aggBufferAttributes(0) 中保存的老的sessionid,如果大于,就把 createNewSession 包装为一个scalaUDF作为一个子表达式来创建一个新的sessionID,并且每次都把当前行的时间戳作为用户活动的最后时间戳。

Notice how we use catalyst expressions, while in normal UDAFs we just use plain scala expressions.

Last thing, we need to declare a static method that we can invoke from the query that will instantiate the function. Notice how I created two, one that allows the user to specify what’s the max duration of a session, and one that takes the default:

最后包装为静态对象的方法,就可以在spark sql中使用这个自定义窗口函数了,下面是两个重载的方法,一个最大间隔时间使用默认值,一个可以运行用户自定义,perfect。

Now creating session IDs is as easy as:

现在,我们就可以拿来用在我们的main函数中了。

Notice that here we specified 10 second sessions.

There’s a little more piping involved which was omitted for clarity, but you can find the complete code, including unit tests, in my github project

spark window函数使用案例

admin阅读(4196)

什么是简单移动平均值

简单移动平均(英语:Simple Moving Average,SMA)是某变数之前n个数值的未作加权算术平均。例如,收市价的10日简单移动平均指之前10日收市价的平均数。

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby

直接看例子吧

这个 window spec 中,数据根据用户(customer)来分去。每一个用户数据根据时间排序。然后,窗口定义从 -1(前一行)到 1(后一行) ,每一个滑动的窗口总用有3行

这段代码添加了一个新列,movingAvg,在滑动的窗口中使用了均值函数:

窗口函数和窗口特征定义

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby

正如上述例子中,窗口函数主要包含两个部分:

  1. 指定窗口特征(wSpec)
    1. “partitionyBY” 定义数据如何分组;在上面的例子中,他是用户
    2. “orderBy” 定义分组中的排序
    3. “rowsBetween” 定义窗口的大小
  2. 指定窗口函数函数 你可以使用 org.apache.spark.sql.functions 的“聚合函数(Aggregate Functions)”和”窗口函数(Window Functions)“类别下的函数

累计汇总

.rowsBetween(Long.MinValue, 0) :窗口的大小是按照排序从最小值到当前行

前一行数据

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby

lag(field, n): 就是取从当前字段往前第n个值,这里是取前一行的值

如果计算环比的时候,是不是特别有用啊?!

在介绍几个常用的行数:

  • first/last(): 提取这个分组特定排序的第一个最后一个,在获取用户退出的时候,你可能会用到
  • lag/lead(field, n): lead 就是 lag 相反的操作,这个用于做数据回测特别用,结果回推条件

排名

这个数据在提取每个分组的前n项时特别有用,省了不少麻烦。

关键词请忽略:spark window函数 spark sql窗口函数 spark窗口函数 pyspark window pyspark window function spark sql row_number spark window condition spark sql first_value pyspark.sql.window pyspark window.partitionby


Spark持续流处理和微批处理的对比

admin阅读(2262)

Spark从2.3版本开始引入了持续流式处理模型,可将流处理延迟降低至毫秒级别,让 Structured Streaming 达到了一个里程碑式的高度。

 

下面的架构图中,既有微批处理,还有持续流处理,两种模式对用户是暴露的API是高度统一的:

 

Spark持续流处理和微批处理的对比

今天我们着重看下两者的设计思路和区别点

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

在持续模式下,流处理器持续不断地从数据源拉取和处理数据,而不是每隔一段时间读取一个批次的数据,这样就可以及时地处理刚到达的数据。如下图所示,延迟被降低到毫秒级别,完全满足了极低延迟的要求。

 

Spark持续流处理和微批处理的对比

持续模式目前支持的 Dataset 操作包括 Projection、Selection 以及除 current_timestamp()、current_date()、聚合函数之外的 SQL 操作。它还支持将 Kafka 作为数据源和数据池(Sink),也支持将控制台和内存作为数据池。

开发者可以根据实际的延迟需求来选择使用持续模式还是微批次模式,总之,Structured Streaming 为开发者提供了容错和可靠性方面的保证。以及端到端的毫秒级延迟、至少一次处理保证等。

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

微批处理

Structured Streaming  默认使用微批模式,spark 引擎会定期检查是否有新数据到达,然后开启一个新的批次进行处理,如下图:

 

Spark持续流处理和微批处理的对比

在微批模式下, driver 在执行每个批次前,都需要先把 offset range 写入 WAL, 为了挂掉后可以 recover,当一条日志到达后,并不会立即处理,需要先处理完上一个批次,然后把这一个批次的 offset 记录后,才会处理,如下图:

 

Spark持续流处理和微批处理的对比

 

 

这种模式下,最低的延迟可以搞到 100ms, 这种模式是架构在 Spark SQL 上的,所以就坐享 spark SQL 中已有的优化方式(code generation 和 project Tungsten,参考 https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html),这种模式主要是面向吞吐量进行设计,而且可以满足绝大部分应用场景,比如ETL和准实时监控,但是对于要求延迟在 10ms 的场景就力不从心了,所以2.3 版本中又引入了 持续流处理 模型。

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

持续流处理

在持续流模式下,spark不是定期调度新批次的任务,而是启动一直运行的驻守在 executor 上的任务,源源不断的进行读取处理输出数据,如下图:

 

Spark持续流处理和微批处理的对比

 

Spark持续流处理和微批处理的对比

 

 

因为在 executor 端是持续流处理的,所以最低延迟可以降到 几毫秒,spark 内部采用的分布式快照算法类似 Chandy-Lamport 算法,不过略有区别,在source 端隔一段时间注入特殊标记(epoch markers)到数据流,然后就相当于把数据切分为不同的 epochs, 当特殊标记流到 最后的 operator, executor 获取后,向driver 汇报,driver等齐所有executor的汇报,统一发号施令,统一提交,有点类似于二次提交算法,全部的 executor 都提交后,driver 再写入提交日志中,这个 epochs 就算是全部ok了,防止重复执行。

后续这个分布式算法的设计和实现我会抽一篇文章来单独介绍。

 

关键字请忽略:spark continuous processing kafka continuous spark spark continuous mode spark continuous query spark continuous processing vs flink spark continuous processing 流式数据是什么 流数据库 流数据存储 实时流数据处理 流式处理英文 流处理乱序 批处理流式计算 什么叫流数据 structured streaming

 

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicatespark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

Spark持续流处理和微批处理的对比


原文始发于微信公众号(spark技术分享):Spark持续流处理和微批处理的对比

案例|使用 spark Pivot 处理复杂的数据统计需求

admin阅读(1737)

Pivot 算子是 spark 1.6 版本开始引入的,在 spark2.4版本中功能做了增强,还是比较强大的,做过数据清洗ETL工作的都知道,行列转换是一个常见的数据整理需求。spark 中的Pivot 可以根据枢轴点(Pivot Point) 把多行的值归并到一行数据的不同列,这个估计不太好理解,我们下面使用例子说明,看看pivot 这个算子在处理复杂数据时候的威力。

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

使用Pivot 来统计天气走势

下面是西雅图的天气数据表,每行代表一天的天气最高值:

Date Temp (°F)
07-22-2018 86
07-23-2018 90
07-24-2018 91
07-25-2018 92
07-26-2018 92
07-27-2018 88
07-28-2018 85
07-29-2018 94
07-30-2018 89

如果我们想看下最近几年的天气走势,如果这样一天一行数据,是很难看出趋势来的,最直观的方式是 按照年来分行,然后每一列代表一个月的平均天气,这样一行数据,就可以看到这一年12个月的一个天气走势,下面我们使用 pivot 来构造这样一个查询结果:

案例|使用 spark Pivot 处理复杂的数据统计需求

结果如下图:

案例|使用 spark Pivot 处理复杂的数据统计需求

 

是不是很直观,第一行就代表 2018 年,从1月到12月的平均天气,能看出一年的天气走势。

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

这个SQL应该怎么理解

我们来看下这个 sql 是怎么玩的,首先是一个 子查询语句,我们最终关心的是 年份,月份,和最高天气值,所以先使用子查询对原始数据进行处理,从日期里面抽取出来年份和月份。

下面在子查询的结果上使用 pivot 语句,pivot 第一个参数是一个聚合语句,这个代表聚合出来一个月30天的一个平均气温,第二个参数是 FOR month,这个是指定以哪个列为枢轴列,第三个 In 子语句指定我们需要进行行列转换的具体的 枢轴点(Pivot Point)的值,上面的例子中 1到12月份都包含了,而且给了一个别名,如果只指定 1到6月份,结果就如下了:

案例|使用 spark Pivot 处理复杂的数据统计需求

上面sql语句里面有个特别的点需要注意, 就是聚合的时候有个隐含的维度字段,就是 年份,按理来讲,我们没有写  group-by year, 为啥结果表里面不同年份区分在了不同的行,原因是,FORM 子查询出来的每行有 3个列, year,month,tmp,如果一个列既不出现在进行聚合计算的列中(temp 是聚合计算的列), 也不作为枢轴列 , 就会作为聚合的时候一个隐含的维度。我们的例子中算平均值聚合操作的维度是 (year, month),一个是隐含维度,一个是 枢轴列维度, 这一点一定要注意,如果不需要按照 year 来区分,FORM 查询的时候就不要加上这个列。

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

指定多个聚合语句

上文中只有一个聚合语句,就是计算平均天气,其实是可以加多个聚合语句的,比如我们需要看到 7,8,9 月份每个月的最大气温和平均气温,就可以用以下SQL语句

 

案例|使用 spark Pivot 处理复杂的数据统计需求

上文中指定了两个聚合语句,查询后, 枢轴点(Pivot Point)  和 聚合语句 的笛卡尔积作为结果的不同列,也就是  <value>_<aggExpr>, 看下图:

 

案例|使用 spark Pivot 处理复杂的数据统计需求

聚合列(Grouping Columns)和 枢轴列(Pivot Columns)的不同之处

现在假如我们有西雅图每天的最低温数据,我们需要把最高温和最低温放在同一张表里面看对比着看

Date Temp (°F)
08-01-2018 59
08-02-2018 58
08-03-2018 59
08-04-2018 58
08-05-2018 59
08-06-2018 59

 

我们使用  UNION ALL 把两张表做一个合并:

案例|使用 spark Pivot 处理复杂的数据统计需求

 

现在使用 pivot 来进行处理:

案例|使用 spark Pivot 处理复杂的数据统计需求

我们统计了 4年中  7,8,9 月份最低温和最高温的平均值,这里要注意的是,我们把 year 和 一个最低最高的标记(H/L)都作为隐含维度,不然算出来的就是最低最高温度在一起的平均值了。结果如下图:

案例|使用 spark Pivot 处理复杂的数据统计需求

上面的查询中,我们把最低最高的标记(H/L)也作为了一个隐含维度,group-by 的维度就变成了 (year, H/L, month), 但是year 和 H/L 体现在了不同行上面,month 体现在了不同的列上面,这就是  聚合列(Grouping Columns)和 枢轴列(Pivot Columns)的不同之处。

再接再厉,我们把 H/L 作为  枢轴列(Pivot Columns) 进行查询:

案例|使用 spark Pivot 处理复杂的数据统计需求

 

结果的展现方式就和上面的不同了,虽然每个单元格值都是相同的,但是把  H/L 和 month 笛卡尔积作为列,H/L 维度体现在了列上面了:

案例|使用 spark Pivot 处理复杂的数据统计需求

关键词:spark pivot用法 spark dataframe转置 spark行转列 spark pivot scala spark列转行

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicatespark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

案例|使用 spark Pivot 处理复杂的数据统计需求

原文始发于微信公众号(spark技术分享):案例|使用 spark Pivot 处理复杂的数据统计需求

学习 | Spark 2.4 原生支持内置支持avro, spark read avro

admin阅读(894)

在hadoop 生态圈,我们经常会看到 avro, avro 是什么呢,首先 avro 是可以作为一种基于二进制数据传输高性能的中间件, 比如在 Flume 中,我们使用 avro sinker 来发送数据,使用 avro source 来接受网络数据。

avro 尽管提供了 rpc 机制,事实上Avro的核心特性决定了它通常用在“大数据”存储场景, 即我们通过借助schema将数据写入到“本地文件”或者HDFS中,然后reader再根据schema去迭代获取数据条目, 好处是它的schema 是动态可演变的,这种强大的可扩展性正是“文件数据”存储所必须的。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

在 spark2.4 之前的版本中,我们如果需要在 spark 中读取 avro 格式的文件,通常要使用第三方库, 也就是这个项目 https://github.com/databricks/spark-avro,  spark2.4中内置了对avro 格式的支持,有以下好处

  • 不用依赖第三方包,开箱即用

  • 可以在 DataFrame 中方便使用 from_avro() ,  to_avro()  转来转去

  • 支持 Logical types , 也就是我们自己扩展出来的 avro 格式

  • 读性能 2倍提升,写性能 提高10%, 原因下文中说明

可以看到,databrick 的人觉得 avro 还是一种很重要的格式,有必要把他放在 spark sql 源码中内置了,不用你再去别的地方去引用,同时官方也顺手做了些性能优化

一个简单的例子

spark2.4中,读取和写入 avro 格式极其方便,只要指定一下 format 为 avro 即可,指定后, DataFrameReader 和 DataFrameWriter 就可以找到相关的 source 实现类,当然这些我们完全不需要关心。

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

from_avro() and to_avro()

 

我们都知道,在kafka 中是可以写入 avro 格式的数据,比如 flume 收集的数据就可以直接用 avro 格式写入 kafka, 针对这种场景, 官方内置了两个函数,from_avro() , to_avro(), 很实用,我们从kafka 中加载数据到 spark , schema 中有这么几个字段:

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

 

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

这里的 value 是一个字节数组,如果kafka中是 avro 格式的数据,那么这个字节数组中保存的就是 avro 格式的数据,这时候你只需要使用 from_avro()  函数,同时传入一个avro shema(你使用json 格式定义的 avro schema), 就可以轻松的转换,后面你就可以使用 avro 定义的字段进行相应的操作了 :

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

如果你想使用 avro 格式写入kafka 就更方便了,上述例子中,使用o_avro() 转换为avro 格式,然后 输出到 kafka中。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

你的老代码应该怎么办

spark2.4 中的内置的 avro 完全兼容原来的第三方库中的 spark-avro,也就是说如果你原来使用的 format 是 com.databricks.spark.avro , 现在这个代码你完全不用动,如果说你自己在第三方库上做了自己的变更,就是想用外部引用过来的类,也可以,设置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 为 false 就行。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

性能提升

这个性能提升才应该是我们使用新版本最大的动力,这个性能优化在哪里呢,SPARK-24800, 大家可以看下这个 pr 学习一下,我们都知道 spark 内部数据流转格式是 InternalRow,DataFrame 的Row 是要靠 RowEncoder 转换为 InternalRow 格式的, 对于avro 格式来将,原来的实现方式是先把 avro 格式转换为 Row, 然后把 Row 转换为 InternalRow, 新的实现方式是直接把 avro 格式转换为 InternalRow, 省了一步。

下面有一个 databrick 官方做的性能测试报告:

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

可以看到,读性能提高了一倍,写性能提高了8%左右,这么来看,spark2.4 中使用 avro 确实便利了很多,而且有了 IO 性能的提高,估计以后最新的变动应该第一时间体现在内置的 avro 支持中,而不是第三方的库里面。

关键词: Spark 2.4 原生支持内置支持avro, spark read avro

所以升不升级你看着办,我先升为敬

 

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sql

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

 

 

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

听说新版微信这有个好看

原文始发于微信公众号(spark技术分享):学习 | Spark 2.4 原生支持了avro, 别再引用第三方库了

写在阿里Blink正式开源之际

admin阅读(841)

写在阿里Blink正式开源之际

阿里最近要正式将内部Blink开源,搞Blink 的大牛也在 AI 前线公众号上推了文章,介绍Blink的优势重磅!阿里Blink正式开源,重要优化点解读 , 首先 spark 君对于每秒处理多少数据量,单个作业处理数据量超过400T 这些标榜不是特别在意,因为这涉及到使用了多少资源,处理时长以及使用姿势,不说清楚就是无意义的。spark君比较在意的是文中提到的Blink的一些重要的feature,大致看有 5 个方面

  • Hive的兼容性,Blink:为了打通元数据,我们重构了 Flink catalog 的实现,并且增加了两种 catalog,一个是基于内存存储的 FlinkInMemoryCatalog,另外一个是能够桥接 Hive metaStore 的 HiveCatalog。有了这个 HiveCatalog,Flink 作业就能读取 Hive 的 metaData,  spark君:这个功能怎么看怎么像从 spark 这边抄过来的,至少是借鉴,我们都知道,spark 定义了ExternalCatalog 用来管理操作数据库表或者注册函数等,然后分为 InMemoryCatalog 和HiveExternalCatalog ,连名字都差不多,到底是谁抄谁呢,spark君在平时使用中,感觉 spark sql 除了 像 hive 中带有buckets的table 等极少数几个操作不支持,spark 君敢说 Spark SQL支持绝大部分的Hive特征。
  •  SQL/TableAPI的优化,Blink: 我们对 SQL engine 的架构做了较大的调整。提出了全新的 Query Processor(QP), 它包括了一个优化层(Query Optimizer)和一个算子层(Query Executor)。这样一来,流计算和批计算的在这两层大部分的设计工作就能做到尽可能地复用,   spark 君:这个就对应 spark sql 里面的逻辑优化和物理执行两个流程吧,这个在这里就不用多说了吧,spark君一直孜孜不倦的给小伙伴们普及 spark逻辑优化和物理优化都做了什么,感觉spark在这块都优化的不能再优化了。至于说到 流批复用,我就默默提一下 structured streaming ,  完全架构在 spark sql 之上,一张无限流动的大表。看文章中重点强调的 BinaryRow 和  codegen 优化方式,这都是 spark 玩了很久很久的东西了,不了解的小伙伴,需要多看看相关的,后续spark君会专门抽文章介绍。
  • 更好的runtime支持,Blink: 首先 Blink 引入了 Pluggable Shuffle Architecture,开发者可以根据不同的计算模型或者新硬件的需要实现不同的 shuffle 策略进行适配。此外 Blink 还引入新的调度架构,容许开发者根据计算模型自身的特点定制不同调度器。说道这个,spark君表示我对 spark 官方最佩服的一点就是源码里面体现出来的高度工程抽象能力,高内聚,低耦合,面向变化和接口编程,早早就支持调度全家桶了(standalone, yarn, mesos, k8s  spark2.3 之后也是原生的哦)。
  • Zeppelin for Flink,  zeppelin 就是个脚手架和大杂烩,spark也早早的入坑了,这个是小功能,就不说了。 
  • Flink Web, Blink 这次介绍出来的多种指标展示维度,看着不错,其实我对 spark 的界面是有些不满意的,特别是 针对 structured streaming 的指标展示,还是比较弱。

下面是 一位 spark 大牛的文章,里面很多观点和我的一致,在这里转发给大家看下:

前言

今天朋友圈有篇【阿里技术】发的文章,说Blink的性能如何强悍,功能现在也已经比较完善。譬如:

Blink 在 TPC-DS 上和 Spark 相比有着非常明显的性能优势,而且这种性能优势随着数据量的增加而变得越来越大。在实际的场景这种优势已经超过 Spark 三倍,在流计算性能上我们也取得了类似的提升。我们线上的很多典型作业,性能是原来的 3 到 5 倍。在有数据倾斜的场景,以及若干比较有挑战的 TPC-H query,流计算性能甚至得到了数十倍的提升。

什么时候可以享受这波红利

还要等待一段时间。要想享受Blink的加持,大家可能还要等待一段时间,因为除了功能合并,还有代码质量。代码质量理论上应该是没有原生flink好的。这个需要时间,不是靠人力就能搞定的。

一点忧思

阿里收购Flink母公司,然后马上发通告,说blink要合并进flink了,之前还是商量口吻。显然,这对于社区来说,是一个非常不友好的感觉。我猜测,社区部分优秀的人才(包括母公司)肯定会有人走的。开源项目对于PR的质量除了功能,更多的是架构,代码质量等等的考量。

那和Spark的对比怎么样?

Spark 和 Flink不在一个level级别战斗。Spark 从诞生没多久开始,就朝着AI方向发展,包括内置的mllib,深度学习后也马上抓住机遇,在2.2.x之后发力,DB公司开发了一套生态辅助系统,比如Spark deep Learning,Tensorframes, GraphFrames等等,另外还有众多第三方框架的加持。2.3-2.4在商业版本里则已经集成了如horovod等分布式深度学习框架,所以说,2.2.x之后,Spark的主战场早就已经是AI,而 Flink依然停留在流,批战场。

Flink,Spark性能好对机器学习有啥影响

有人会问,机器学习对性能不是很在乎么?现在flink性能据说那么好?到底有多好,这是一家之言,但是 在这些框架里
性能在AI方面不是很重要,因为他们对AI重在集成,而不是自己实现。这就意味着瓶颈是在数据交换以及AI框架自身之上。模型构件好进行预测,也是对应的AI框架自己去加载,提供预测接口,其他只是wrap一层而已。

盛夏即将发布的3.0则对AI更加友好,包括CPU/GPU的管理,K8s backend, 数据交换(Spark – AI框架)的提速,内部Barrier API 的等进一步的完,显然让Spark在AI领域进一步保持优势

 和AI集成的基础,Spark以有所沉淀

和AI集成的好坏,取决于Java/Scala语言和Python语言的互通的质量。Spark 在1.6之前就已经支持Python,经过这么多年的优化,已经有了很好的经验,最新的arrow引入让速度更是成两位数的提升。

Flink 盛夏之下的喧闹

这次关于bink 合并进flink的通告不是由社区主导发送,而是阿里技术发送,显然有喧宾夺主的意味,会加大他们(母公司和阿里巴巴)融合的难度。极端点,Flink可能就由一个社区项目变成一个公司产品。阿里开源了那么多东西,有几个达到了真正的国际影响力,并且处于持续的发展之中的?公司加持对于社区而言,短期是利好,但是如果干预多了,长期就不被看好了。

Presto是facebook开源的并且运作,一切以满足公司需求为最高优先级,虽然presto很优秀,但是社区没有主导权,极大的限制了他的发展,终于发生了分裂(大家可以自己搜搜)。

最后加一句

不要再拿Spark streaming 和Flink比了,请拿Structured streaming 以及Continue Processing 来和Flink比。为啥国内还在拿Spark Streaming 和Flink比?

因为惯性使然,structured streaming 新引入了一堆概念,并且限制也比较多,spark streaming大家把之前该遇到的问题都遇到了,而且也有一定的积累,要切换就没那么容易了。加上flink有阿里加持,宣传势头很大,可能有的直接就从,spark streaming 切到flink去了。

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了从0到1认识 spark sqlspark sql 源码剖析 PushDownPredicatespark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析structured streaming 碰上kafkastructured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解spark streaming流式计算中的困境与解决之道

spark core 系列:

彻底搞懂spark shuffle过程(1)彻底搞懂spark shuffle过程(2)spark内存管理-Tungsten框架探秘

spark 机器学习系列

关注【spark技术分享】

一起撸spark源码,一起玩spark最佳实践

写在阿里Blink正式开源之际

原文始发于微信公众号(spark技术分享):写在阿里Blink正式开源之际

一文搞懂spark sql中的CBO(基于代价的优化)

admin阅读(936)


Spark CBO 背景。。。。。。。。。。。

我们在 是时候学习真正的spark技术了 这篇文章中介绍了很多基于规则的优化方式,也就是RBO,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 LogicalPlan 本身的特点,未考虑数据本身的特点,也未考虑算子本身的代价。


本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。


Spark CBO 原理

CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。


物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合,如下图所示。


一文搞懂spark sql中的CBO(基于代价的优化)


而每个执行节点的代价,分为两个部分


  • 该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布

  • 该执行节点操作算子的代价


每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:1) 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;2)中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。


所以,最终主要需要解决两个问题

  • 如何获取原始数据集的统计信息

  • 如何根据输入数据集估算特定算子的输出数据集


Statistics 收集

过如下 SQL 语句,可计算出整个表的记录总数以及总大小


一文搞懂spark sql中的CBO(基于代价的优化)


从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万,与事实相符。


一文搞懂spark sql中的CBO(基于代价的优化)


通过如下 SQL 语句,可计算出指定列的统计信息


一文搞懂spark sql中的CBO(基于代价的优化)


从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000,null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8。


一文搞懂spark sql中的CBO(基于代价的优化)


除上述示例中的统计信息外,Spark CBO 还直接等高直方图。在上例中,histogram 为 NULL。其原因是,spark.sql.statistics.histogram.enabled 默认值为 false,也即 ANALYZE 时默认不计算及存储 histogram。


下例中,通过 SET spark.sql.statistics.histogram.enabled=true; 启用 histogram 后,完整的统计信息如下:


一文搞懂spark sql中的CBO(基于代价的优化)


从上图可见,生成的 histogram 为 equal-height histogram,且高度为 1102.36,bin 数为 254。其中 bin 个数可由 spark.sql.statistics.histogram.numBins 配置。对于每个 bin,匀记录其最小值,最大值,以及 distinct count。


值得注意的是,这里的 distinct count 并不是精确值,而是通过 HyperLogLog 计算出来的近似值。使用 HyperLogLog 的原因有二

  • 使用 HyperLogLog 计算 distinct count 速度快速

  • HyperLogLog 计算出的 distinct count 可以合并。例如可以直接将两个 bin 的 HyperLogLog 值合并算出这两个 bin 总共的 distinct count,而无须从重新计算,且合并结果的误差可控


算子对数据集影响估计

对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。


一文搞懂spark sql中的CBO(基于代价的优化)


本节以 Filter 为例说明算子对数据集的影响。


对于常见的 Column A < value B Filter,可通过如下方式估算输出中间结果的统计信息

  • 若 B < A.min,则无数据被选中,输出结果为空

  • 若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变

  • 若 A.min < B < A.max,则被选中的数据占比为 (B.value – A.min) / (A.max – A.min),A.min 不变,A.max 更新为 B.value,A.ndv = A.ndv * (B.value – A.min) / (A.max – A.min)



一文搞懂spark sql中的CBO(基于代价的优化)


上述估算的前提是,字段 A 数据均匀分布。但很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充分利用 histogram 进行更精确的估算

一文搞懂spark sql中的CBO(基于代价的优化)


启用 Historgram 后,Filter Column A < value B的估算方法为

  • 若 B < A.min,则无数据被选中,输出结果为空

  • 若 B > A.max,则全部数据被选中,输出结果与 A 相同,且统计信息不变

  • 若 A.min < B < A.max,则被选中的数据占比为 height(<B) / height(All),A.min 不变,A.max = B.value,A.ndv = ndv(<B)


在上图中,B.value = 15,A.min = 0,A.max = 32,bin 个数为 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。该值可根据 A < 15 的 5 个 bin 的 ndv 通过 HyperLogLog 合并而得,无须重新计算所有 A < 15 的数据。



算子代价估计

SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。


Spark SQL 的 CBO 通过如下方法估算 join 的代价



其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weight 由 spark.sql.cbo.joinReorder.card.weight 决定,其默认值为 0.7。



Build侧选择

对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side


一文搞懂spark sql中的CBO(基于代价的优化)


而开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例

一文搞懂spark sql中的CBO(基于代价的优化)


优化 Join 类型

Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。


在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。


并且该判断基于参与 Join 的表的原始大小。


在下图示例中,Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。


而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin。

一文搞懂spark sql中的CBO(基于代价的优化)


优化多表 Join 顺序

未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题,因此耗时 241 秒。

  • left-deep tree,因此所有后续 Join 都依赖于前面的 Join 结果,各 Join 间无法并行进行

  • 前面的两次 Join 输入输出数据量均非常大,属于大 Join,执行时间较长

一文搞懂spark sql中的CBO(基于代价的优化)


开启 CBO 后, Spark SQL 将执行计划优化如下

一文搞懂spark sql中的CBO(基于代价的优化)


优化后的 Join 有如下优势,因此执行时间降至 71 秒

  • Join 树不再是 left-deep tree,因此 Join 3 与 Join 4 可并行进行,Join 5 与 Join 6 可并行进行

  • 最大的 Join 5 输出数据只有两百万条结果,Join 6 有 1.49 亿条结果,Join 7相当于小 Join


总结

  • RBO 基于规则的优化方式,实现简单有效。它属于 LogicalPlan 的优化,所有优化均基于 LogicalPlan 本身的特点,未考虑数据本身的特点,也未考虑算子本身的代价

  • 本文介绍的 CBO 考虑了数据的统计特征,从而选择总代价最低的物理执行计划。但物理执行计划是固定的,一旦选定,不可更改。未考虑运行时信息

  • 之前介绍的  Spark SQL Adapptive Execution ,它可根据运行时信息动态调整执行计划从而优化执行





大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


一文搞懂spark sql中的CBO(基于代价的优化)


原文始发于微信公众号(spark技术分享):一文搞懂spark sql中的CBO(基于代价的优化)

Adaptive Execution 让 Spark SQL 更智能更高效

admin阅读(2250)

1 背景

spark sql 的catalyst框架在内部通过ROB(基于规则的优化)和 CBO(基于成本的优化),从查询本身与目标数据的特点的角度尽可能保证了最终生成的执行计划的高效性。但是

  • 执行计划一旦生成,便不可更改,即使执行过程中发现后续执行计划可以进一步优化,也只能按原计划执行
  • CBO 基于统计信息生成最优执行计划,需要提前生成统计信息,成本较大,且不适合数据更新频繁的场景
  • CBO 基于基础表的统计信息与操作对数据的影响推测中间结果的信息,只是估算,不够精确

本文介绍的 Adaptive Execution 将可以根据执行过程中的中间数据优化后续执行,从而提高整体执行效率。核心在于两点

  • 执行计划可动态调整
  • 调整的依据是中间结果的精确统计信息

2 动态设置 Shuffle Partition

2.1 Spark Shuffle 原理

Spark Shuffle 一般用于将上游 Stage 中的数据按 Key 分区,保证来自不同 Mapper (表示上游 Stage 的 Task)的相同的 Key 进入相同的 Reducer (表示下游 Stage 的 Task)。一般用于 group by 或者 Join 操作。

Adaptive Execution 让 Spark SQL 更智能更高效
Spark Shuffle 过程

如上图所示,该 Shuffle 总共有 2 个 Mapper 与 5 个 Reducer。每个 Mapper 会按相同的规则(由 Partitioner 定义)将自己的数据分为五份。每个 Reducer 从这两个 Mapper 中拉取属于自己的那一份数据。

2.2 原有 Shuffle 的问题

使用 Spark SQL 时,可通过 spark.sql.shuffle.partitions 指定 Shuffle 时 Partition 个数,也即 Reducer 个数

该参数决定了一个 Spark SQL Job 中包含的所有 Shuffle 的 Partition 个数。如下图所示,当该参数值为 3 时,所有 Shuffle 中 Reducer 个数都为 3

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL with multiple Shuffle

这种方法有如下问题

  • Partition 个数不宜设置过大
  • Reducer(代指 Spark Shuffle 过程中执行 Shuffle Read 的 Task) 个数过多,每个 Reducer 处理的数据量过小。大量小 Task 造成不必要的 Task 调度开销与可能的资源调度开销(如果开启了 Dynamic Allocation)
  • Reducer 个数过大,如果 Reducer 直接写 HDFS 会生成大量小文件,从而造成大量 addBlock RPC,Name node 可能成为瓶颈,并影响其它使用 HDFS 的应用
  • 过多 Reducer 写小文件,会造成后面读取这些小文件时产生大量 getBlock RPC,对 Name node 产生冲击
  • Partition 个数不宜设置过小
  • 每个 Reducer 处理的数据量太大,Spill 到磁盘开销增大
  • Reducer GC 时间增长
  • Reducer 如果写 HDFS,每个 Reducer 写入数据量较大,无法充分发挥并行处理优势
  • 很难保证所有 Shuffle 都最优
  • 不同的 Shuffle 对应的数据量不一样,因此最优的 Partition 个数也不一样。使用统一的 Partition 个数很难保证所有 Shuffle 都最优
  • 定时任务不同时段数据量不一样,相同的 Partition 数设置无法保证所有时间段执行时都最优

2.3 自动设置 Shuffle Partition 原理

如 Spark Shuffle 原理 一节图中所示,Stage 1 的 5 个 Partition 数据量分别为 60MB,40MB,1MB,2MB,50MB。其中 1MB 与 2MB 的 Partition 明显过小(实际场景中,部分小 Partition 只有几十 KB 及至几十字节)

开启 Adaptive Execution 后

  • Spark 在 Stage 0 的 Shuffle Write 结束后,根据各 Mapper 输出,统计得到各 Partition 的数据量,即 60MB,40MB,1MB,2MB,50MB
  • 通过 ExchangeCoordinator 计算出合适的 post-shuffle Partition 个数(即 Reducer)个数(本例中 Reducer 个数设置为 3)
  • 启动相应个数的 Reducer 任务
  • 每个 Reducer 读取一个或多个 Shuffle Write Partition 数据(如下图所示,Reducer 0 读取 Partition 0,Reducer 1 读取 Partition 1、2、3,Reducer 2 读取 Partition 4)

    Adaptive Execution 让 Spark SQL 更智能更高效
    Spark SQL adaptive reducer 1

三个 Reducer 这样分配是因为

  • targetPostShuffleInputSize 默认为 64MB,每个 Reducer 读取数据量不超过 64MB
  • 如果 Partition 0 与 Partition 2 结合,Partition 1 与 Partition 3 结合,虽然也都不超过 64 MB。但读完 Partition 0 再读 Partition 2,对于同一个 Mapper 而言,如果每个 Partition 数据比较少,跳着读多个 Partition 相当于随机读,在 HDD 上性能不高
  • 目前的做法是只结合相临的 Partition,从而保证顺序读,提高磁盘 IO 性能
  • 该方案只会合并多个小的 Partition,不会将大的 Partition 拆分,因为拆分过程需要引入一轮新的 Shuffle
  • 基于上面的原因,默认 Partition 个数(本例中为 5)可以大一点,然后由 ExchangeCoordinator 合并。如果设置的 Partition 个数太小,Adaptive Execution 在此场景下无法发挥作用

由上图可见,Reducer 1 从每个 Mapper 读取 Partition 1、2、3 都有三根线,是因为原来的 Shuffle 设计中,每个 Reducer 每次通过 Fetch 请求从一个特定 Mapper 读数据时,只能读一个 Partition 的数据。也即在上图中,Reducer 1 读取 Mapper 0 的数据,需要 3 轮 Fetch 请求。对于 Mapper 而言,需要读三次磁盘,相当于随机 IO。

为了解决这个问题,Spark 新增接口,一次 Shuffle Read 可以读多个 Partition 的数据。如下图所示,Task 1 通过一轮请求即可同时读取 Task 0 内 Partition 0、1 和 2 的数据,减少了网络请求数量。同时 Mapper 0 一次性读取并返回三个 Partition 的数据,相当于顺序 IO,从而提升了性能。 

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL adaptive reducer 2

由于 Adaptive Execution 的自动设置 Reducer 是由 ExchangeCoordinator 根据 Shuffle Write 统计信息决定的,因此即使在同一个 Job 中不同 Shuffle 的 Reducer 个数都可以不一样,从而使得每次 Shuffle 都尽可能最优。

上文 原有 Shuffle 的问题 一节中的例子,在启用 Adaptive Execution 后,三次 Shuffle 的 Reducer 个数从原来的全部为 3 变为 2、4、3。

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL with adaptive Shuffle

2.4 使用与优化方法

spark.sql.adaptive.enabled=true 启用 Adaptive Execution 从而启用自动设置 Shuffle Reducer 这一特性

spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可设置每个 Reducer 读取的目标数据量,其单位是字节,默认值为 64 MB。上文例子中,如果将该值设置为 50 MB,最终效果仍然如上文所示,而不会将 Partition 0 的 60MB 拆分。具体原因上文已说明

3 动态调整执行计划

3.1 固定执行计划的不足

在不开启 Adaptive Execution 之前,执行计划一旦确定,即使发现后续执行计划可以优化,也不可更改。如下图所示,SortMergJoin 的 Shuffle Write 结束后,发现 Join 一方的 Shuffle 输出只有 46.9KB,仍然继续执行 SortMergeJoin

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL with fixed DAG

此时完全可将 SortMergeJoin 变更为 BroadcastJoin 从而提高整体执行效率。

3.2 SortMergeJoin 原理

SortMergeJoin 是常用的分布式 Join 方式,它几乎可使用于所有需要 Join 的场景。但有些场景下,它的性能并不是最好的。

SortMergeJoin 的原理如下图所示

  • 将 Join 双方以 Join Key 为 Key 按照 HashPartitioner 分区,且保证分区数一致
  • Stage 0 与 Stage 1 的所有 Task 在 Shuffle Write 时,都将数据分为 5 个 Partition,并且每个 Partition 内按 Join Key 排序
  • Stage 2 启动 5 个 Task 分别去 Stage 0 与 Stage 1 中所有包含 Partition 分区数据的 Task 中取对应 Partition 的数据。(如果某个 Mapper 不包含该 Partition 的数据,则 Redcuer 无须向其发起读取请求)。
  • Stage 2 的 Task 2 分别从 Stage 0 的 Task 0、1、2 中读取 Partition 2 的数据,并且通过 MergeSort 对其进行排序
  • Stage 2 的 Task 2 分别从 Stage 1 的 Task 0、1 中读取 Partition 2 的数据,且通过 MergeSort 对其进行排序
  • Stage 2 的 Task 2 在上述两步 MergeSort 的同时,使用 SortMergeJoin 对二者进行 Join
Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL SortMergeJoin

3.3 BroadcastJoin 原理

当参与 Join 的一方足够小,可全部置于 Executor 内存中时,可使用 Broadcast 机制将整个 RDD 数据广播到每一个 Executor 中,该 Executor 上运行的所有 Task 皆可直接读取其数据。(本文中,后续配图,为了方便展示,会将整个 RDD 的数据置于 Task 框内,而隐藏 Executor)

对于大 RDD,按正常方式,每个 Task 读取并处理一个 Partition 的数据,同时读取 Executor 内的广播数据,该广播数据包含了小 RDD 的全量数据,因此可直接与每个 Task 处理的大 RDD 的部分数据直接 Join

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL BroadcastJoin

根据 Task 内具体的 Join 实现的不同,又可分为 BroadcastHashJoin 与 BroadcastNestedLoopJoin。后文不区分这两种实现,统称为 BroadcastJoin

与 SortMergeJoin 相比,BroadcastJoin 不需要 Shuffle,减少了 Shuffle 带来的开销,同时也避免了 Shuffle 带来的数据倾斜,从而极大地提升了 Job 执行效率

同时,BroadcastJoin 带来了广播小 RDD 的开销。另外,如果小 RDD 过大,无法存于 Executor 内存中,则无法使用 BroadcastJoin

对于基础表的 Join,可在生成执行计划前,直接通过 HDFS 获取各表的大小,从而判断是否适合使用 BroadcastJoin。但对于中间表的 Join,无法提前准确判断中间表大小从而精确判断是否适合使用 BroadcastJoin

《Spark SQL 性能优化再进一步 CBO 基于代价的优化》一文介绍的 CBO 可通过表的统计信息与各操作对数据统计信息的影响,推测出中间表的统计信息,但是该方法得到的统计信息不够准确。同时该方法要求提前分析表,具有较大开销

而开启 Adaptive Execution 后,可直接根据 Shuffle Write 数据判断是否适用 BroadcastJoin

3.4 动态调整执行计划原理

如上文 SortMergeJoin 原理 中配图所示,SortMergeJoin 需要先对 Stage 0 与 Stage 1 按同样的 Partitioner 进行 Shuffle Write

Shuffle Write 结束后,可从每个 ShuffleMapTask 的 MapStatus 中统计得到按原计划执行时 Stage 2 各 Partition 的数据量以及 Stage 2 需要读取的总数据量。(一般来说,Partition 是 RDD 的属性而非 Stage 的属性,本文为了方便,不区分 Stage 与 RDD。可以简单认为一个 Stage 只有一个 RDD,此时 Stage 与 RDD 在本文讨论范围内等价)

如果其中一个 Stage 的数据量较小,适合使用 BroadcastJoin,无须继续执行 Stage 2 的 Shuffle Read。相反,可利用 Stage 0 与 Stage 1 的数据进行 BroadcastJoin,如下图所示

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL Auto BroadcastJoin

具体做法是

  • 将 Stage 1 全部 Shuffle Write 结果广播出去
  • 启动 Stage 2,Partition 个数与 Stage 0 一样,都为 3
  • 每个 Stage 2 每个 Task 读取 Stage 0 每个 Task 的 Shuffle Write 数据,同时与广播得到的 Stage 1 的全量数据进行 Join

注:广播数据存于每个 Executor 中,其上所有 Task 共享,无须为每个 Task 广播一份数据。上图中,为了更清晰展示为什么能够直接 Join 而将 Stage 2 每个 Task 方框内都放置了一份 Stage 1 的全量数据

虽然 Shuffle Write 已完成,将后续的 SortMergeJoin 改为 Broadcast 仍然能提升执行效率

  • SortMergeJoin 需要在 Shuffle Read 时对来自 Stage 0 与 Stage 1 的数据进行 Merge Sort,并且可能需要 Spill 到磁盘,开销较大
  • SortMergeJoin 时,Stage 2 的所有 Task 需要取 Stage 0 与 Stage 1 的所有 Task 的输出数据(如果有它要的数据 ),会造成大量的网络连接。且当 Stage 2 的 Task 较多时,会造成大量的磁盘随机读操作,效率不高,且影响相同机器上其它 Job 的执行效率
  • SortMergeJoin 时,Stage 2 每个 Task 需要从几乎所有 Stage 0 与 Stage 1 的 Task 取数据,无法很好利用 Locality
  • Stage 2 改用 Broadcast,每个 Task 直接读取 Stage 0 的每个 Task 的数据(一对一),可很好利用 Locality 特性。最好在 Stage 0 使用的 Executor 上直接启动 Stage 2 的 Task。如果 Stage 0 的 Shuffle Write 数据并未 Spill 而是在内存中,则 Stage 2 的 Task 可直接读取内存中的数据,效率非常高。如果有 Spill,那可直接从本地文件中读取数据,且是顺序读取,效率远比通过网络随机读数据效率高

3.5 使用与优化方法

该特性的使用方式如下

  • 当 spark.sql.adaptive.enabled 与 spark.sql.adaptive.join.enabled 都设置为 true 时,开启 Adaptive Execution 的动态调整 Join 功能
  • spark.sql.adaptiveBroadcastJoinThreshold 设置了 SortMergeJoin 转 BroadcastJoin 的阈值。如果不设置该参数,该阈值与 spark.sql.autoBroadcastJoinThreshold 的值相等
  • 除了本文所述 SortMergeJoin 转 BroadcastJoin,Adaptive Execution 还可提供其它 Join 优化策略。部分优化策略可能会需要增加 Shuffle。spark.sql.adaptive.allowAdditionalShuffle 参数决定了是否允许为了优化 Join 而增加 Shuffle。其默认值为 false

4 自动处理数据倾斜

4.1 解决数据倾斜典型方案

  • 保证文件可 Split 从而避免读 HDFS 时数据倾斜
  • 保证 Kafka 各 Partition 数据均衡从而避免读 Kafka 引起的数据倾斜
  • 调整并行度或自定义 Partitioner 从而分散分配给同一 Task 的大量不同 Key
  • 使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 从而避免 Shuffle 引起的数据倾斜
  • 对倾斜 Key 使用随机前缀或后缀从而分散大量倾斜 Key,同时将参与 Join 的小表扩容,从而保证 Join 结果的正确性

4.2 自动解决数据倾斜

目前 Adaptive Execution 可解决 Join 时数据倾斜问题。其思路可理解为将部分倾斜的 Partition (倾斜的判断标准为该 Partition 数据是所有 Partition Shuffle Write 中位数的 N 倍) 进行单独处理,类似于 BroadcastJoin,如下图所示

Adaptive Execution 让 Spark SQL 更智能更高效
Spark SQL resolve joinm skew

在上图中,左右两边分别是参与 Join 的 Stage 0 与 Stage 1 (实际应该是两个 RDD 进行 Join,但如同上文所述,这里不区分 RDD 与 Stage),中间是获取 Join 结果的 Stage 2

明显 Partition 0 的数据量较大,这里假设 Partition 0 符合“倾斜”的条件,其它 4 个 Partition 未倾斜

以 Partition 对应的 Task 2 为例,它需获取 Stage 0 的三个 Task 中所有属于 Partition 2 的数据,并使用 MergeSort 排序。同时获取 Stage 1 的两个 Task 中所有属于 Partition 2 的数据并使用 MergeSort 排序。然后对二者进行 SortMergeJoin

对于 Partition 0,可启动多个 Task

  • 在上图中,启动了两个 Task 处理 Partition 0 的数据,分别名为 Task 0-0 与 Task 0-1
  • Task 0-0 读取 Stage 0 Task 0 中属于 Partition 0 的数据
  • Task 0-1 读取 Stage 0 Task 1 与 Task 2 中属于 Partition 0 的数据,并进行 MergeSort
  • Task 0-0 与 Task 0-1 都从 Stage 1 的两个 Task 中所有属于 Partition 0 的数据
  • Task 0-0 与 Task 0-1 使用 Stage 0 中属于 Partition 0 的部分数据与 Stage 1 中属于 Partition 0 的全量数据进行 Join

通过该方法,原本由一个 Task 处理的 Partition 0 的数据由多个 Task 共同处理,每个 Task 需处理的数据量减少,从而避免了 Partition 0 的倾斜

对于 Partition 0 的处理,有点类似于 BroadcastJoin 的做法。但区别在于,Stage 2 的 Task 0-0 与 Task 0-1 同时获取 Stage 1 中属于 Partition 0 的全量数据,是通过正常的 Shuffle Read 机制实现,而非 BroadcastJoin 中的变量广播实现

4.3 使用与优化方法

开启与调优该特性的方法如下

  • 将 spark.sql.adaptive.skewedJoin.enabled 设置为 true 即可自动处理 Join 时数据倾斜
  • spark.sql.adaptive.skewedPartitionMaxSplits 控制处理一个倾斜 Partition 的 Task 个数上限,默认值为 5
  • spark.sql.adaptive.skewedPartitionRowCountThreshold 设置了一个 Partition 被视为倾斜 Partition 的行数下限,也即行数低于该值的 Partition 不会被当作倾斜 Partition 处理。其默认值为 10L * 1000 * 1000 即一千万
  • spark.sql.adaptive.skewedPartitionSizeThreshold设置了一个 Partition 被视为倾斜 Partition 的大小下限,也即大小小于该值的 Partition 不会被视作倾斜 Partition。其默认值为 64 * 1024 * 1024 也即 64MB
  • spark.sql.adaptive.skewedPartitionFactor 该参数设置了倾斜因子。如果一个 Partition 的大小大于 spark.sql.adaptive.skewedPartitionSizeThreshold的同时大于各 Partition 大小中位数与该因子的乘积,或者行数大于 spark.sql.adaptive.skewedPartitionRowCountThreshold 的同时大于各 Partition 行数中位数与该因子的乘积,则它会被视为倾斜的 Partition

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


Adaptive Execution 让 Spark SQL 更智能更高效

原文始发于微信公众号(spark技术分享):Adaptive Execution 让 Spark SQL 更智能更高效

理解spark sql 优化策略的最好方法就是自己实现一个

admin阅读(754)

spark sql 的优化框架 Catalyst 博大精深,里面的精华是很多大牛一个pr一个pr积累起来的,仔细琢磨琢磨相关源码也是一件痛并快乐的事情,spark 逻辑优化就是在一个 AST 树上进行匹配,匹配到一定的规则,然后进行等价变换规则,从而使计算的成本更低,今天我带大家自己实现一个逻辑优化规则,帮助大家更快地理解spark sql 逻辑优化的底层原理,如果对 spark sql 总体架构不了解的,可以先看这篇文章 是时候学习真正的spark技术了 了解全貌。

我们的例子非常简单,先注册一个表,包含一个 a 字段:

理解spark sql 优化策略的最好方法就是自己实现一个

我们看下当前的执行计划:


理解spark sql 优化策略的最好方法就是自己实现一个

可以看到这个执行计划是比较费的, 因为对于   (a * 1) 这个算式来讲,其实就等于a本身,我们针对这种规则自定义一个 逻辑优化规则

理解spark sql 优化策略的最好方法就是自己实现一个

理解spark sql 优化策略的最好方法就是自己实现一个

上面的代码很好理解,如果匹配到一个变量乘以1的表达式,就直接变换为变量本身, 应用完这个规则 (a#27 * 1) 就变为了 a#27:

理解spark sql 优化策略的最好方法就是自己实现一个

这样就少了一次乘法运算,从而提高了性能。


上面我们是从内部测试,如果你在应用中要定义一个基于规则的优化,然后让这个优化策略自动应用到你写的sql中,可以如下方式定义

理解spark sql 优化策略的最好方法就是自己实现一个

sparkSession 中给用户留了扩展点,Spark catalyst的扩展点在SPARK-18127中被引入,Spark用户可以在SQL处理的各个阶段扩展自定义实现,非常强大高效

  • injectOptimizerRule – 添加optimizer自定义规则,optimizer负责逻辑执行计划的优化,我们例子中就是扩展了逻辑优化规则。
  • injectParser – 添加parser自定义规则,parser负责SQL解析。
  • injectPlannerStrategy – 添加planner strategy自定义规则,planner负责物理执行计划的生成。
  • injectResolutionRule – 添加Analyzer自定义规则到Resolution阶段,analyzer负责逻辑执行计划生成。
  • injectPostHocResolutionRule – 添加Analyzer自定义规则到Post Resolution阶段。
  • injectCheckRule – 添加Analyzer自定义Check规则。

其他几种扩展我们可以也会举例说明,今天只讲解一下怎么扩展逻辑优化规则。

大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


理解spark sql 优化策略的最好方法就是自己实现一个

原文始发于微信公众号(spark技术分享):理解spark sql 优化策略的最好方法就是自己实现一个

spark 2.4让你飞一般的处理复杂数据类型

admin阅读(1200)

spark 2.4 对复杂数据处理类型引入了 29 个内嵌函数,文档参考 https://docs.databricks.com/_static/notebooks/apache-spark-2.4-functions.html,里面包含一些 higher-order 函数,就跟scala 里面的 map filter reduce 一样,让你在sql中也可以享受函数式编程的快感。


我们都知道,在spark2.4 之前,处理复杂数据类型是一件比较痛苦的事情,有两种比较恶心的处理方式


  •  使用 explod 表达式把嵌套数据类型平展开,应用你自己的处理逻辑,再用 collect_list 表达式在拼凑起来,
  • 自定义一个 udf 函数处理多层嵌套的数据类型

在 spark2.4 之后,你就轻松了,可以使用多种内嵌函数处理复杂类型,对 array 或者 map类型的列处理起来很easy, 如果满足不了你的需求,你可以直接在sql中写lambda 表达式,怎么用,怎么爽。

1 匿名lambda函数使用姿势

下面举个例子说明下:

假如我们有这样一个 dataframe, 有两列,vals 列是个数组,我们的需求是对数组中的多个元素都 +1

spark 2.4让你飞一般的处理复杂数据类型

spark 2.4 之前的写法是:


spark 2.4让你飞一般的处理复杂数据类型

这样会有几个问题, 如果有两个 id 为1的行,平展开在组合后的结果就只有一行了,这就错了,而且带着 group by 肯定就涉及到 shuffle 操作了,性能会下降,而且shuffle 操作不保证数据元素的顺序,有可能数组元素顺序就变了。


另外一种写法是自定义一个UDF:


spark 2.4让你飞一般的处理复杂数据类型

spark 2.4让你飞一般的处理复杂数据类型
这种用法正确性倒是没有问题, 但是会损失性能,下文中会进行详细分析。
如果我们使用 spark2.4 提供的  higher-order 函数, 里面定义一个匿名lambda函数,就轻松了:


spark 2.4让你飞一般的处理复杂数据类型

这个 transform 函数会遍历数组,然后应用你定义的匿名lambda函数,是不是很简单。


下面我举个复杂一些的例子:

key values nested_values
1 [1, 2, 3] [[1, 2, 3], [], [4, 5]]

 如果我们想对数组中的每个元素都加上同一行的key,sql可以写成这样:


spark 2.4让你飞一般的处理复杂数据类型如果你需要处理多层嵌套的数据类型,比如我们例子中的nested_values,没关系,你直接写一个两层的匿名lambda函数 就可以了:

spark 2.4让你飞一般的处理复杂数据类型

2 性能好在哪里


有人就问了,这种在 sql 中写 匿名lambda函数 就是轻便了一些,和 自定义一个 udf 到底有什么差距,其实我今天就是想重点探讨一下这个问题

其实两者的差距就在于直接写lambda函数不需要序列化和反序列化, udf 需要,你想呀,如果对每条数据都要进行序列化和反序列化, 对于海量数据,性能必定有很大的损失。

对于 tansform 处理一个数组,spark2.4 内部会创建一个 tansform 类型的表达式节点

spark 2.4让你飞一般的处理复杂数据类型

这个节点对数组的处理流程如下,需要注意的是,spark 会使用 encoder 把加载的数据,或者jvm对象转换为一种内部的数组字节格式 InternalRow,这种不同于java 序列化,虽然都是把对象转换为字节数组,但是表达式生成的代码可以直接操作字节数组,而不需要反序列化,这种字节数组格式大大提高了处理时间效率和空间效率。

spark 2.4让你飞一般的处理复杂数据类型

arrayTransform 表达式会遍历数组,然后应用你定义的匿名lamdba 函数,最后更新相应的元素。

下面我们来看下 udf 方式的处理方式:

spark 2.4让你飞一般的处理复杂数据类型

看到没有,中间处理过程中,需要先把catalyst类型(也就是  InternalRow 格式) 格式转换为 scala 类型, 然后应用自定义函数,然后再转回去,多了一次序列化和反序列化的性能损耗,所以如果在海量数据下,这种性能损失还是很大的。


大家都在看

spark sql源码系列:

是时候学习真正的spark技术了  从0到1认识 spark sql  spark sql 源码剖析 PushDownPredicate   spark sql 源码剖析 OptimizeIn 篇

structured streaming 系列:

structured streaming 原理剖析   structured streaming 碰上kafka structured streaming 是如何搞定乱序时间的

spark streaming 系列:

spark streaming 读取kafka各种姿势详解   spark streaming流式计算中的困境与解决之道 

spark core 系列:

彻底搞懂spark shuffle过程(1)  彻底搞懂spark shuffle过程(2)  spark内存管理-Tungsten框架探秘

spark 机器学习系列





关注【spark技术分享】


一起撸spark源码,一起玩spark最佳实践


spark 2.4让你飞一般的处理复杂数据类型



听说新版微信这有个好看


原文始发于微信公众号(spark技术分享):spark 2.4让你飞一般的处理复杂数据类型

关注公众号:spark技术分享

联系我们联系我们