Spark 3.x Spark Core详解 & 性能优化

最近在测试UDAF性能的时候,本来以为UDAF的原理和aggregateByKey算子底层算法相同,但是实际在性能测试发现时才发现多了很多额外的开销,导致性能有一定的下降。后来发现Spark 3.0提供的实现方式才是合理的。基于此,就好好地去了解了一下Spark 3.0。

在Spark 3.0中,引入了一些新的函数,并且对Spark 2.x性能做了优化。下文会详细介绍。

Spark性能调优与原理分析
京东
¥65.00
去购买​

AQE

AQE即为Adaptive Query Execution。Spark在执行之前会先生成DAG图,生成执行计划,AQE就是在执行时,在对这个执行计划根据统计数据重新做进一步的优化。

引入AQE之后的执行流程如下:

在Spark 3.0引入AQE之后,性能有了很大的提升。

动态coalesc分区

Spark 2.x中,如果我们设置了一个很大的分区数量,那么每个分区数据处理的数据量不多,就会降低性能。在Spark 3.0之后,AQE会自动将小分区合并到大分区。这样能一定程度上能够提升性能。

没有动态coalesc分区的时候如下:

有动态coalesc分区的时候如下:

动态切换join策略

在Spark 2.x中,broadcast-hash join只能通过参数控制。但这个参数不太好控制。在Spark 3.x之后,AQE能够根据运行时的统计信息自动将sort-merge join切换到broadcast-hash join。

应用动态切换join策略之后的改变如下:

动态优化数据倾斜的join

在Hive中可以通过参数控制数据倾斜的join,本质上就是先加盐后join。但Spark 2.x中没有这个功能,我们每次都需要手动处理数据倾斜问题。在Spark 3.x之后,可以自动将倾斜的分区分成一个个小的分区去进行join。极大优化了性能。

没有优化数据倾斜的时候如下:

有优化数据倾斜的时候如下:

动态裁剪分区

在行星模型中,Spark 2.x的优化器很难在编译时就确定哪些分区可以跳过不读。这就会导致读了一些不需要的数据。但Spark 3.0之后,Spark会首先过滤维表,根据过滤后的结果找到只需要读事实表的哪些分区。这样就能极大提升性能。在TPC-DS基准性能测试中,102中的60个SQL性能提升了2倍到18倍,性能得到了显著提升。

Spark大数据商业实战三部曲:内核解密 商业案例 性能
京东
¥306.00
去购买​

UDAF性能优化

在Spark 2.x中,实现UDAF有两种方式,一种是实现UserDefinedAggregationFunction,另一种是实现Aggregator。但是实现Aggregator的UDAF无法被注册。

前者适应于类型不固定的数据,而后者适应于类型固定的数据。所以在前者的实现中,input/output/buffer都是Row。而后者则是定义好的类型。

如果是处理基础类型,没有什么问题,但是如果要处理复杂类型,比如Map,那么要先放到Row中,显然不如直接更新本地变量要快。

在我们的性能测试中,如果用Row,那么函数执行1s,有250ms的时间在读Row中的map和把map写到Row中。但是换成Aggregator则没有这个开销。

在Spark 3.0中,基于Aggregator的实现才能被注册,性能有了大幅提升。

更详细的对比可以在这篇文章里面找到:UDAF and Aggregators: Custom Aggregation Approaches for Datasets in Apache Spark

用于DataFrame转换的新函数

from_csv

和Spark 2.x中的from_json差不多,这个函数会将一个CSV字符串转换成一个Struct结构。如果这个CSV字符串无法被解析,就会返回null。

用法如下:

val exployeeInfo = "1,张三,前端"::"2,李四,后端"::"3,王五,计算端"::Nil
val schema = new StructType()
    .add("IdCard",IntegerType)
    .add("Name",StringType) 
    .add("DeptName",StringType)
val options = Map("delimiter" ->",")
val exployeeDF = exployeeInfo.toDF("Exployee_Info")
    .withColumn("struct_csv",from_csv('Exployee_Info, schema,options))exployeeDF.show()

to_csv

和from_csv相反,将一个Struct类型的数据转换为CSV字符串。

用法如下:

 exployeeDF
    .withColumn("csv_string",to_csv($"struct_csv",Map.empty[String, String].asJava))
    .show

schema_of_csv

查询CSV字符串的schema。

用法如下:

exployeeDF
    .withColumn("schema",schema_of_csv("csv_string"))
    .show

forall

对数组中的每个元素都执行特定操作,如果全部都成功就返回true,否则返回false.

用法如下,下面这个例子检查每个元素是否为奇数:

val  df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF("integer_array")
df.withColumn("number",forall($"integer_array",(x:Column)=>(lit(x%2==1)))).show

transform

对数组中的每个元素都执行特定操作,返回一个新的数组。

用法如下,下面这个例子将数组中每个数字都乘2:

val df = Seq((Seq(1,2,3)),(Seq(4,5,6))).toDF("integer_array")
df.withColumn("integer_array",transform($"integer_array",x=>x*2)).show

overlay

用于替换某一列的数据,将指定位置的数据替换成特定数据。

用法如下,下面这个例子将Hello World换成Hello 张三

val message = "Hello World"::Nil
val messageDF = greetingMsg.toDF("message")
messageDF.withColumn("message",overlay($"message",lit("World"),lit("7"),lit("12"))).show

split

按照给定正则对字符串进行拆分。

用法如下,下面这个例子将张三,李四一个字符串拆分为张三李四两个字符串:

val names = "张三,李四"
val namesDF = names.toDF("names")
namesDF.withColumn("names", split($"names", ",", 2)).show()

map_entries

类似于Java中的map.entrySet()函数。

用法如下:

val df = Seq(Map(1->"张三",2->"李四")).toDF("id_name")
df.withColumn("id_name_entry", map_entries($"id_name")).show()

map_zip_with

用指定函数将两个map合并成一个。这个算子特别有用。之前将两个map相同key合并到一起要写好长一段代码,就像老太太的裹脚布,又臭又长。

用法如下,下面的例子是拿到每个员工的提成总和:

val df = Seq((Map("张三"->10000,"李四"->25000),
             Map("张三"->1000,"李四"->2500))).toDF("emp_sales_dept1","emp_sales_dept2")

df.withColumn("total_emp_sales",map_zip_with($"emp_sales_dept1",$"emp_sales_dept2",(k,v1,v2)=>(v1+v2))).show

map_filter

对map进行过滤,只保留满足特定条件的数据。会返回一个新的map。

用法如下,下面这个例子找出提成大于1000的员工:

val df = Seq(Map("张三"->100,"李四"->2500))
          .toDF("emp_sales")

df.withColumn("filtered_sales",map_filter($"emp_sales",(k,v)=>(v>1000))).show

transform_values

对map中的value用指定函数进行转换。

用法如下,下面的例子给每位员工薪资+1000块:

val df = Seq(Map("张三"->1000,"李四"->2500))
         .toDF("employee_salary")

df.withColumn("employee_salary",transform_values($"employee_salary",(name,salary)=>(salary+5100))).show

transform_keys

transform_values类似,只是对key进行操作。

xhash64

对给定列用64bit的xxhash算法计算hash值。

从Spark SQL迁移到Scala API的算子

有些Spark SQL中的函数,在Spark中也用的很频繁。所以Spark 3.0将这一些函数从Spark SQL迁移到了Scala API。以前只能通过Spark SQL或者Spark callUDF调用。但是现在可以很方便的通过Scala API进行调用啦。

date_sub

日期/时间戳/字符串中减去特定天数。如果是字符串,格式需要是yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.SSSS。这个函数也很有用。

用法如下,下面这个例子将日期减去一天:

var df = Seq(
        "2021-01-02 01:01:01",
        "2021-01-03 01:01:01"
    ).toDF("time")
df.withColumn("subtracted_time", date_sub($"time",1)).show()

date_add

date_sub相反,日期加上特定天数。

months_add

date_add类似,但是是日期加上特定月数。

zip_with

合并两个数组。

用法如下:‍

val df = Seq((Seq(2,4,6),Seq(5,10,3)))
         .toDF("array_1","array_2")

df.withColumn("merged_array",zip_with($"array_1",$"array_2",(x,y)=>(x+y))).show

filter

从数组中过滤出来满足条件的数据。

aggregate

对数组中的数据进行合并。

Spark SQL 新函数

acosh

查找给定表达式的双曲余弦的倒数。

asinh

求给定表达式的双曲正弦的倒数。

atanh

求给定表达式的双曲正切的倒数。

bit_and/bit_or/bit_xor

计算比特与,比特或,比特异或。

bit_count

返回特定字符串的bit长度

bool_and and bool_or

count_if

返回特定列中满足特定函数的个数

date_part

从date/timestamp中提取特定部分,比如小时,分钟等。

div

做除法操作

every/some

every: 如果列中的每个数据满足条件,返回true。some: 如果列中的只要有一个满足条件,返回true。

make_date/make_interval/make_timestamp

构造日期

max_by/min_by

比较两列,返回最大/最小值。

typeof

返回给定列的数据类型

version

返回Spark版本

justify_days/justify_hours/justify_interval

调整时间

商匡云商
Logo
注册新帐户
对比商品
  • 合计 (0)
对比
0
购物车