首页技术文章正文

云计算大数据培训之Spark调优(4)

更新时间:2017-09-01 来源:黑马程序员云计算大数据培训学院 浏览量:

六,减少shuffle的发生 

1,尽量避免使用会发生shuffle的算子 
2,在数据源头,对我们的数据提前进行聚合

七,优化数据结构 
1、避免对象套对象 
2、减少数据集合的使用,尽量使用数组 
3、因为对象需要序列化,能不用对象就尽量不用,建议使用字符串拼接 
4、可以使用第三方提供的占用内存小,序列化速度快的数据结构类库,例如 fastUtil类库

八,避免数据倾斜 
1,数据倾斜的概念 
有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时spark作业性能会比期望查很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,一保证spark作业的性能

2,数据倾斜发生的现象: 
A、绝大多数task执行的都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在一分钟内执行完了,但是剩余两三个task却要一两个小时,这种情况很常见 
B、原本能够正常执行的spark作业,某天突然爆出 OOM(内存溢出)异常,官场异常栈,是我们写的业务代码造成的,这种情况比较少见

3,数据倾斜发生的原理 
数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或者join等操作,此时,如果某个key对应的数据量特别大的话,就会发生数据倾斜,比如大部分key对应10 条数据,但是个别的key却对应了100万调数据,那么大部分task可能就只会分配到10-条数据,然后1秒就运行完了,但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个spark作业的运行进度是由运行时间最长的那个task决定的

4,然后定位导致数据倾斜的代码 
数据倾斜只会发生在shuffle过程中,这里给大家罗列了一些常用的并且可能会触发shuffle操作的算子:groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子的某一个导致的

查看数据倾斜的数据:eventLogRDD.sample(false,0.1).countByKey().foreach(println(_))

5,数据倾斜的解决方案

解决方案一:使用HIve ETL预处理数据

方案使用场景:导致数据倾斜的是Hive表,如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他的key才对应了10条数据),而且业务场景需要频繁使用spark对Hive表执行某个分析操作,那么比较适合使用这种方案

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在spark作业中针对的数据源就不是原来的Hive表了,而是预处理后Hive表。此时由于数据已经预先进行了聚合或join操作,那么spark作业中也就不需要原先的shuffle类算子执行的这类操作了

方案实现原理:这种方案从根本上解决了数据倾斜,因为彻底避免了在spark中执行shuffle类算子,那么就不会有数据倾斜的问题了,但是这里也要提醒一下大家,这种方式属于治标不治本,因为毕竟数据本身就存在不均匀的问题,所以Hive ETL中进行groupBy 或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢,我们只是把数据倾斜的发生提前到了Hive ETL中,避免了spark程序发生数据倾斜而已

方案优点:实现起来简单便捷,效果还非常好,完全规避了数据倾斜,spark作业的性能会大幅提升

方案缺点:治标不治本,Hive ETL 中还是会发生数据倾斜

解决方案二:过滤少数导致倾斜的key

方案使用的场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不是很大的话,那么很适合使用这种方案,比如99% 的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜

方案实现思路:如果我们判断那少数的几个数据量特别多的key,对作业的执行和就算结果不是特别重要的话,那么干脆就直接过滤掉那少数的几个key,比如,在spark SQL中可以使用where 子句过滤掉这些key或者在sparkRDD执行filter算子过滤掉这些key,如果每次作业执行时,动态判定那些key的数据量最多然后再进行过滤,那么可以直接使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可

方案实现原理:将导致数据倾斜的key过滤掉之后,这些key就不参与计算了,自然不可能产生数据倾斜

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:使用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个

解决方案三:提高shuffle操作的并行度

方案使用场景:如果我们必须要对数据倾斜迎难而上,那么就建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如:reduceByKey(+,1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量,对于sparkSQL中的shuffle类语句,比如GroupByKeyDemo,join,等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task 的并行度,该值默认是200,对于很多场景来说都有点过小

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果有5个key,每个key对应10条数据,这5个key都是分配给一个task的。那么这个task就要处理50条数据,而增加了shuffle read task 以后。每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据时间来看,其效果有限

解决方案四:两阶段聚合(局部集合+全局聚合)

方案使用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在sparkSQL中使用GroupBy 语句进行分组聚合时,比较适用这种场景

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先的key就变得不一样了,比如(hello,1),(hello,1),(hello,1),(hello,1)… 就会变成(1_hello,1),(1_hello,1),(2_hello,1),(2_hello,1)… 接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么聚合结果,就会变成(1_hello,2),(2_hello,2)… 然后将各个key的前缀给去掉,就会变成(hello,2),(hello,2)… ,再次进行全局聚合操作,就可以得到最终结果了,比如(hello,4)

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不听的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据过多的问题,接着取出掉随机前缀,再次进行全局聚合,就可以得到最终的结果

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的,通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将spark作业的性能提升数倍以上

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案

九,开启推测机制

推测机制后,如果集群中,某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后spark会选取最快的作为最终结果

在spark-default.conf 中添加:spark.speculation true

推测机制与一下几个参数有关: 
1、spark.speculation.interval 100: 检测周期,单位毫秒 
2、Spark.speculation.quantile 0.75: 完成task的百分比时启动推测 
3、Spark.speculation.multiplier 1.5: 比其他的慢多少倍时启动推测

十,算子使用技巧

1、非必要的情况下,不要使用collect 
2、尽量使用,例如:foreachPartition 就不要使用foreach,在创建连接对象(数据连接对象等),尽量在分区上创建


本文版权归黑马程序员云计算大数据培训学院所有,欢迎转载,转载请注明作者出处。谢谢!
作者:黑马程序员云计算大数据培训学院
首发:http://cloud.itheima.com/;
分享到:
在线咨询 我要报名
和我们在线交谈!