Overall, RDD operators fall into three groups: single-Value types, double-Value types, and Key-Value types.
4) Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)
    val mapRdd: RDD[Int] = rdd.map(_ * 2)
    mapRdd.collect().foreach(println)
    sc.stop()
  }
}
4) Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)
    val mapRdd: RDD[Int] = rdd.mapPartitions(x => x.map(_ * 2))
    mapRdd.collect().foreach(println)
    sc.stop()
  }
}
4) Implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)
    val mapRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, items) => {
      items.map((index, _))
    })
    mapRdd.collect().foreach(println)
    sc.stop()
  }
}
// Extension: multiply the elements of partition 1 (the second partition) by 2 and leave the other partitions unchanged
val mapRdd: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, items) => {
  items.map(x => {
    if (index == 1) {
      (index, x * 2)
    } else {
      (index, x)
    }
  })
})
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val listRDD = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7)), 2)
    listRDD.flatMap(x => x).collect.foreach(println)
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)
    val max: RDD[Int] = rdd.glom().map(_.max)
    max.collect.foreach(println)
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)
    val groupRdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(x => x % 2)
    groupRdd.collect.foreach(println)
    sc.stop()
  }
}
Group the words that share the same first letter:
val rdd1: RDD[String] = sc.makeRDD(List("hello","hive","hadoop","spark","scala"))
rdd1.groupBy(x => x.substring(0, 1)).collect().foreach(println)
groupBy involves a shuffle.
Shuffle: the process of redistributing and regrouping data across partitions.
A shuffle always spills to disk. You can run the program in local mode and observe the effect through the web UI on port 4040.
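Besides the 4040 web UI, one way to see the shuffle in code is to print the RDD lineage with toDebugString: a ShuffledRDD appears in the plan wherever a shuffle boundary is introduced. A minimal sketch, reusing the sc from the examples above (variable names here are only illustrative):
// Sketch: groupBy introduces a shuffle, visible as a ShuffledRDD in the lineage
val debugRdd: RDD[String] = sc.makeRDD(List("hello", "hive", "hadoop", "spark", "scala"))
val grouped: RDD[(String, Iterable[String])] = debugRdd.groupBy(x => x.substring(0, 1))
println(grouped.toDebugString)  // the printed lineage contains a ShuffledRDD entry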
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark", "Hello World"))
    rdd.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map(x => (x._1, x._2.size))
      .collect
      .foreach(println)
    sc.stop()
  }
}
Extended (more complex) WordCount:
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2)))
rdd.map(x => (x._1 + " ") * x._2)
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(_._1)
  .map(x => (x._1, x._2.size))
  .foreach(println)
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 4, 2)
    rdd.filter(_ % 2 == 0).collect.foreach(println)
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.parallelize(1 to 100, 2)
    // Sampling with replacement
    rdd.sample(true, 0.4, 2).collect.foreach(println)
    // Sampling without replacement
    rdd.sample(false, 0.2, 4).collect.foreach(println)
    sc.stop()
  }
}
Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 1, 5, 2, 9, 6, 1))
    rdd.distinct().collect().foreach(println)
    // Deduplicate with multiple tasks to increase parallelism
    rdd.distinct(4).collect().foreach(println)
    sc.stop()
  }
}
The coalesce operator can be configured in two ways: with a shuffle or without a shuffle.
1. Without shuffle
5) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 4)
    val coalesceRdd: RDD[Int] = rdd.coalesce(4)
    // Print the data together with its partition number to inspect the partitioning
    val indexRdd: RDD[Int] = coalesceRdd.mapPartitionsWithIndex((index, datas) => {
      // Print each partition's data, prefixed with the partition number
      datas.foreach(data => {
        println(index + ":" + data)
      })
      // Return the partition's data
      datas
    })
    indexRdd.collect()
    sc.stop()
  }
}
2. With shuffle
val rdd: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),3)
val coalesceRdd: RDD[Int] = rdd.coalesce(4, true)
Output:
0:2
3:1
0:4
3:3
0:6
3:5
3. How shuffle works
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 3)
    val coalesceRdd: RDD[Int] = rdd.coalesce(4, true)
    val repartitionRdd: RDD[Int] = rdd.repartition(2)
    // Print the data together with its partition number to inspect the partitioning
    val indexRdd: RDD[Int] = repartitionRdd.mapPartitionsWithIndex((index, datas) => {
      // Print each partition's data, prefixed with the partition number
      datas.foreach(data => {
        println(index + ":" + data)
      })
      // Return the partition's data
      datas
    })
    indexRdd.collect()
    sc.stop()
  }
}
1) coalesce repartitions the RDD and lets you choose whether to perform a shuffle, controlled by the parameter shuffle: Boolean = false/true.
2) repartition actually calls coalesce with shuffle enabled. The source code is:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
3) coalesce is generally used to reduce the number of partitions; enlarging the number of partitions without a shuffle is meaningless, whereas repartition enlarges the number of partitions with a shuffle.
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), 3)
    // Coalesce partitions (no shuffle)
    // coalesce is generally used to reduce partitions; enlarging them without a shuffle is meaningless
    val cRdd: RDD[Int] = rdd.coalesce(4)
    // Repartition (with shuffle)
    val rRdd: RDD[Int] = rdd.repartition(4)
    // Print the data together with its partition number to inspect the partitioning
    val indexRdd: RDD[Int] = rRdd.mapPartitionsWithIndex((index, datas) => {
      // Print each partition's data, prefixed with the partition number
      datas.foreach(data => {
        println(index + ":" + data)
      })
      // Return the partition's data
      datas
    })
    indexRdd.collect()
    sc.stop()
  }
}
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(2, 1, 5, 3, 4, 6))
    rdd.sortBy(x => x).collect().foreach(print)
    rdd.sortBy(x => x, false).collect().foreach(print)
    sc.stop()
  }
}
3) Requirement: write a shell script and use pipe to apply it to an RDD.
(1) Write the script and make it executable
vi pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
  echo ">>>"${LINE}
done
chmod 777 pipe.sh
(2) Create an RDD with a single partition
val rdd = sc.makeRDD (List("hi","Hello","how","are","you"),1)
(3) Apply the script to the RDD and print the result
rdd.pipe("/opt/spark/pipe.sh").collect()
(4) Create an RDD with two partitions
val rdd = sc.makeRDD(List("hi","Hello","how","are","you"),2)
(5) Apply the script to the RDD and print the result
rdd.pipe("/opt/spark/pipe.sh").collect()
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(1 to 4)
    val rdd1: RDD[Int] = sc.makeRDD(4 to 8)
    rdd.union(rdd1).collect().foreach(println)
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(1 to 4)
    val rdd1: RDD[Int] = sc.makeRDD(4 to 8)
    rdd.subtract(rdd1).collect().foreach(println)
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(1 to 4)
    val rdd1: RDD[Int] = sc.makeRDD(4 to 8)
    rdd.intersection(rdd1).collect().foreach(println)
    sc.stop()
  }
}
3) Requirement: create two RDDs and zip them together into a single (k, v) RDD
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4), 3)
    val rdd2: RDD[String] = sc.makeRDD(Array("a", "b", "c", "d"), 3)
    val rdd3: RDD[String] = sc.makeRDD(Array("a", "b", "c"), 3)
    val rdd4: RDD[String] = sc.makeRDD(Array("a", "b", "c"), 4)
    // Different numbers of elements per partition: cannot zip
    // Can only zip RDDs with same number of elements in each partition
    // rdd1.zip(rdd3).collect().foreach(println)
    // Different numbers of partitions: cannot zip
    // Can't zip RDDs with unequal numbers of partitions: List(3, 4)
    // rdd1.zip(rdd4).collect().foreach(println)
    rdd1.zip(rdd2).collect().foreach(println)
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
    val rdd1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))
    println(rdd1.partitions.size)
    sc.stop()
  }
}
5) HashPartitioner source code
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
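For reference, Utils.nonNegativeMod simply folds a possibly negative hashCode into the range [0, numPartitions). A minimal standalone sketch of that logic, written here as a local helper for illustration rather than Spark's actual utility class:
// Sketch: map an arbitrary Int hash code to a non-negative partition id
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}
println(nonNegativeMod("spark".hashCode, 2))  // always falls in [0, 2)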
6) Custom partitioner
// Custom partitioner
class MyPartitioner(num: Int) extends Partitioner {
  // Number of partitions
  override def numPartitions: Int = num

  // Partitioning logic
  override def getPartition(key: Any): Int = {
    if (key.isInstanceOf[Int]) {
      if (key.asInstanceOf[Int] % 2 == 0) {
        0
      } else {
        1
      }
    } else {
      0
    }
  }
}

object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
    val rdd1: RDD[(Int, String)] = rdd.partitionBy(new MyPartitioner(2))
    val indexRdd: RDD[(Int, String)] = rdd1.mapPartitionsWithIndex((index, datas) => {
      datas.foreach(data => {
        println(index + ":" + data)
      })
      datas
    })
    indexRdd.collect()
    sc.stop()
  }
}
4) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
    val value: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
    value.foreach(println)
    sc.stop()
  }
}
3) Requirement: create a pair RDD, gather the values with the same key into a single sequence, and then compute the sum of the values for each key
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
    val groupRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    groupRdd.collect().foreach(println)
    groupRdd.map(x => (x._1, x._2.sum)).collect().foreach(println)
    sc.stop()
  }
}
1) reduceByKey: aggregates by key; a combine (pre-aggregation) step runs before the shuffle, and the result is an RDD[(K, V)].
2) groupByKey: groups by key and shuffles directly, with no pre-aggregation.
3) Development guideline: prefer reduceByKey whenever it does not change the business logic. Summation is not affected by pre-aggregation, but computing an average is, as sketched below.
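As an illustration of point 3, a per-key average cannot be computed by pre-aggregating the raw values directly, but reduceByKey can still be used if each value is first wrapped as a (sum, count) pair. A minimal sketch, assuming the same sc and data style as the surrounding examples:
// Sketch: per-key average via reduceByKey by carrying (sum, count) pairs
val scoreRdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
val avgRdd: RDD[(String, Double)] = scoreRdd
  .mapValues(v => (v, 1))                              // value -> (sum, count)
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))   // pre-aggregation stays correct
  .mapValues(t => t._1.toDouble / t._2)                // sum / count
avgRdd.collect().foreach(println)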
4) Requirement analysis
5) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
    sc.stop()
  }
}
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    rdd.foldByKey(0)(_ + _).collect().foreach(println)
    sc.stop()
  }
}
3) Requirement: create a pair RDD and compute the mean value for each key. (First compute, per key, the sum of the values and the number of occurrences, then divide to get the result.)
4) Requirement analysis:
5) Implementation
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
    val combineRdd: RDD[(String, (Int, Int))] = rdd.combineByKey(
      (_, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    combineRdd.map(x => {
      (x._1, x._2._1 / x._2._2)
    }).collect().foreach(println)
    sc.stop()
  }
}
3) Requirement: create a pair RDD and sort it by key in ascending and descending order
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
    rdd.sortByKey(true).collect().foreach(println)
    rdd.sortByKey(false).collect().foreach(println)
    sc.stop()
  }
}
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
    rdd.mapValues(x => x + "|||").collect().foreach(println)
    sc.stop()
  }
}
3) Requirement: create two pair RDDs and join the data that share the same key into tuples
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
    rdd.join(rdd1).collect().foreach(println)
    sc.stop()
  }
}
Operates on the (K, V) elements of two RDDs: for each key, the values from each RDD are gathered into their own collection.
4) Code implementation:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
    rdd.cogroup(rdd1).collect().foreach(println)
    sc.stop()
  }
}
Exercise
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("in/agent.log")
    val rdd1: RDD[Array[String]] = rdd.map(x => x.split(" "))
    val rdd2: RDD[(String, Int)] = rdd1.map(x => (x(1) + "-" + x(4), 1))
    rdd2.reduceByKey(_ + _)
      .map(x => {
        val strings: Array[String] = x._1.split("-")
        (strings(0), (strings(1), x._2))
      })
      .groupByKey()
      .mapValues(x => {
        x.toList.sortBy(-_._2).take(3)
      })
      .collect()
      .foreach(println)
    sc.stop()
  }
}
An action operator triggers the execution of the whole job, because transformation operators are lazy and do not execute immediately.
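A quick way to see this laziness is that a transformation with a side effect produces no output until an action runs. A minimal sketch, assuming the usual local sc from the examples above:
// Sketch: map is lazy, so the "computing" lines only appear once collect() runs
val lazyRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapped: RDD[Int] = lazyRdd.map { x => println("computing " + x); x * 2 }
println("nothing has been computed yet")
mapped.collect()  // the action triggers the job; only now do the map side effects run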
1) Signature: def reduce(f: (T, T) => T): T
2) Description: aggregates all elements of the RDD with the function f, first within each partition and then across partitions.
3) Requirement: create an RDD and aggregate all of its elements into a single result
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.reduce(_ + _))
    sc.stop()
  }
}
1) Signature: def collect(): Array[T]
2) Description: returns all elements of the dataset to the driver program as an Array.
Note: all of the data is pulled to the Driver, so use with caution.
3) Requirement: create an RDD, collect its contents to the Driver, and print them
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.collect().toList)
    sc.stop()
  }
}
1) Signature: def count(): Long
2) Description: returns the number of elements in the RDD
3) Requirement: create an RDD and count its elements
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.count())
    sc.stop()
  }
}
1) Signature: def first(): T
2) Description: returns the first element of the RDD
3) Requirement: create an RDD and return its first element
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.first())
    sc.stop()
  }
}
1) Signature: def take(num: Int): Array[T]
2) Description: returns an array consisting of the first n elements of the RDD
3) Requirement: create an RDD and take its first 3 elements
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.take(3).toList)
    sc.stop()
  }
}
1) Signature: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
2) Description: returns an array of the first n elements of the RDD after sorting
3) Requirement: create an RDD and take the first 3 elements after sorting
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 3, 2, 2, 3, 4))
    println(rdd.takeOrdered(3).toList)
    sc.stop()
  }
}
3) Requirement: create an RDD and add up all of its elements
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 3, 2, 2, 3, 4))
    println(rdd.aggregate(0)(_ + _, _ + _))
    sc.stop()
  }
}
3) Requirement: create an RDD and add up all of its elements
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 3, 2, 2, 3, 4))
    println(rdd.fold(0)(_ + _))
    sc.stop()
  }
}
1) Signature: def countByKey(): Map[K, Long]
2) Description: counts the number of occurrences of each key
3) Requirement: create a pair RDD and count the number of occurrences of each key
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
    rdd.countByKey().foreach(println)
    sc.stop()
  }
}
1) saveAsTextFile(path): save as a text file
(1) Signature
(2) Description: saves the elements of the dataset as a text file to HDFS or any other supported file system. Spark calls toString on each element to convert it to a line of text in the file.
2) saveAsSequenceFile(path): save as a SequenceFile
(1) Signature
(2) Description: saves the elements of the dataset in Hadoop SequenceFile format to the specified directory, on HDFS or any other Hadoop-supported file system.
Note: only key-value RDDs have this operation; single-value RDDs do not.
3) saveAsObjectFile(path): serialize elements as objects and save to a file
(1) Signature
(2) Description: serializes the elements of the RDD as objects and stores them in a file.
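Since no implementation is listed for these three operators above, here is a minimal sketch that saves the same key-value RDD in all three formats; the output paths are placeholders, and each target directory must not already exist:
object ScalaRDD {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ScalaRdd")
    val sc = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)), 2)
    rdd.saveAsTextFile("output/text")        // each element written via toString
    rdd.saveAsSequenceFile("output/seq")     // only available on key-value RDDs
    rdd.saveAsObjectFile("output/object")    // elements serialized as objects
    sc.stop()
  }
}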