c语言计算磁盘大小,通过累积器获取RDD磁盘大小并计算分区数量

阅读: 评论:0

c语言计算磁盘大小,通过累积器获取RDD磁盘大小并计算分区数量

c语言计算磁盘大小,通过累积器获取RDD磁盘大小并计算分区数量

在工作过程中,很多spark在saveAsTextFile的过程中都会repartition,用于减少磁盘文件,节约内存空间;但是repartition的分区数,只能根据实际结果测试后重新进行调整或者根据自己的经验进行预估。但是当数据骤增或骤减的时候之前的经验值就不是那么可靠,而且当数据骤增时采用snapyy、gz等让当个文件过大导致task oom。

但是在实际测试、API说明中发现,SizeEstimator 只是计算了Java Heap中大小而不是磁盘落地大小。

于是使用:RDD总数/takeSample取样数*takeSample取样大小=RDD总大小;(sample 返回的数据占比不一定与参数相同)

这样有两个缺点:

有两个shuffle

结果浮动较大

所以,我们使用了累加器Accumulator,使用map获取每行的大小并进行累加,代码如下:

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

/**

* 通过累积器计算分区数量

*/

object ComputePartitionNum {

def compute(sc: SparkContext, rdd: RDD[String], blockSize:Double = 128, compressClass: String = "snappy"): Int = {

val accum = sc.longAccumulator("rddSizeToPartitionNum")

rdd.mapPartitions(iter => iter.map(x=>accum.Bytes.length))).collect()

val rddSize = accum.value

compressClass match {

case "snappy" => il(rddSize / 1024 / 1024 / blockSize * 0.27).toInt

case "gz" => il(rddSize / 1024 / 1024 / blockSize * 0.17).toInt

}

}

}

累加器必须通过Action产生Shuffle之后才会进行累加,试过take(1)、count、sample,都比collect效率地下

rdd.map(x => accum.Bytes.length)).collect()

压缩格式compress snappy、gz 的压缩比;本来要采用2012年google官方提供的的压缩比,但是与实际差距较大;于是自己测试了snappy、gz的压缩比,计算得出snappy:0.27(浮动在0.22-0.28) gz:0.17

有一点不明:

能否不使用collect或者比collect效率更高的action

本文发布于:2024-01-28 04:41:38,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/17063881024853.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

下一篇:RDD
标签:磁盘   大小   分区   数量   语言
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23