数据开发之离线计算_Spark任务调优与RSS

数据开发之离线计算_Spark任务调优与RSS

1.Spark常用参数解释

Spark的默认配置文件位于堡垒机上的这个位置: $SPARK_CONF_DIR/spark-defaults.conf,用户可以自行查看和理解。默认值优先级最低,用户如果提交任务时或者代码里明确指定配置,则以用户配置为先。以下常用参数配置均可以通过--conf XXX=Y方式使用:

2.Spark调优技巧

2.1 并发相关参数

1.什么时候需要调整Spark任务并发?

若一个stage内的所有task启动时间差很多,则说明并行度不够,想要提升任务执行时间就需要增加并发。

2.如何提升当前任务的最大并发task个数?

就是调节–conf spark.dynamicAllocation.maxExecutors=XXX(当前默认配置为100)。注意,提高executor个数会占用队列资源,请根据队列资源情况合理调整。

3.shuffle stage的partition数量怎么确定?

根据task执行情况来定,根据经验,一般保持每个task的 input + shuffle read 量在300-500M左右比较合适–conf spark.sql.shuffle.partitions=XXXX (注意:该值建议< 10000,设置过大shuffle元信息过多,容易导致driver OOM)

2.2 任务申请内存

1.如何计算每个task可用内存大小?

平均每个task的可用内存大致可以认为是:spark.executor.memory * spark.task.cpus / spark.executor.cores。

2.如何设置spark.executor.memory 和spark.executor.cores?

一般来说,spark.executor.cores : spark.executor.memory比例保持在1:2比较合适,调整的时候建议保持spark.executor.cores不动(一般是4),只调节spark.executor.memory。若shuffle数据量很大,个别任务发生spill,则说明单个task需要更多的内存,这个时候可以同时调节这两个值,减少spark.executor.cores / 增加spark.executor.memory ,来提高平均每个task的可用内存。stage发生spill不是失败,只是task想要的内存量比较大,由于内存不足发生写磁盘,故发生spill任务一定会变慢。

2.3 Fetch failure如何定位

当日志中大量出现fetch failure报错,一般都是由于磁盘繁忙所致。可以在task的日志中找到打印fetch failure的相关行,会有ip:7337的fetch failure报错,然后访问对应IP物理机监控模块,查询该IP对应日志报错时段的磁盘指标,如磁盘使用率打满,就可以确定该IP对应节点磁盘繁忙,因此shuffle服务故障会发生fetch failure。处理方案就是挑磁盘不繁忙时候重试即可。

3.Spark Shuffle参数调优

3.1 Spark Shuffle过程

Spark 包含三种 shuffle writer,上图展示的是BypassMergeSortShuffleWriter,以此为例简单介绍 spark
shuffle。Shuffle Writer会将每个shuffle write task的数据,按照一定的规则进行划分,比如说hash(具体什么规则是通过rdd之间的transform进行设定的,也可以用户自行设定 rdd 的 Partitioner),并对应的写到临时文件中。在所有数据写完之后,会将这些临时文件合并成一个文件,并生成 index 文件用于索引每个reduce task数据所对应的部分。
当所有的shuffle write task完成后,会进入到shuffle read stage,并启动 shuffle read task。read task会读取之前所有write task所产生的shuffle数据中对应为自己的某一部分。如上图右边所示,每个read task内的数据来源与左边的write task颜色一致。

3.2 Spark Shuffle缺点

从上述流程可以发现,对于一次shuffle read,需要读取前面m个write task所产生的对应自己的那一部分,所以一个read task 需要读取m个write task生成的临时文件,如果一次shuffle read包含r个read task,那么就需要读取m * r个block。

假设一次shuffle的数据量是100GB,m和r都是1000,那么平均来说,一个block的大小是 100GB / (1000 * 1000) = 104 kb。总共需要100w次网络请求,每次需要读取104kb。这会带来大量的随机读,随机读小块数据,也会带来极低的shuffle read性能;更糟糕的是,如果shuffle read性能过低,很有可能触发FetchFailedException,从而导致shuffle失败,需要整个从shuffle write task开始重算,进一步增大了 shuffle 的时长。

3.3 Remote Shuffle Service过程

remote shuffle service被引进用来解决上述缺点,是一个只负责写不负责读的shuffle service,市面上有很多该类型的shuffle service,比如Facebook所做的Cosco,Linkedkin做的Magnet,Uber做的 Zeus。

简单来说,就是把上述的spark shuffle过程进行改写:shuffle write的时候,将数据直接写到remote shuffle service上而不落盘;相同的reduce partition会被写到相同的shuffle service上,以此进行一个聚合,并写到 hdfs 上的文件中。shuffle read的时候,会直接读取 hdfs。通过 hdfs 上的 shuffle 数据备份,以此来消除 FetchFailedException的产生,避免shuffle write task的重算,也避免了小块数据的频繁读取。