That said, I still recommend this website; it is very well written.
Applies a transformation function to each item of the RDD and returns the result as a new RDD. (The new RDD consists of each input element after it has been transformed by func.)
def map[U: ClassTag](f: T => U): RDD[U]
val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"),3)
a.collect
res2: Array[String] = Array(dog, salmon, salmon, rat, elephant)
val b = a.map(_.length)
b.collect
res3: Array[Int] = Array(3, 6, 6, 3, 8)
val c = a.zip(b)
c.collect
res4: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
Evaluates a boolean function for each data item of the RDD and puts the items for which the function returned true into the resulting RDD. (The new RDD consists of the input elements for which func returns true.)
def filter(f: T => Boolean): RDD[T]
val a = sc.parallelize(1 to 10,3)
a.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val b = a.filter(_%2==0)
b.collect
res5: Array[Int] = Array(2, 4, 6, 8, 10)
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
a.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val b = a.flatMap(1 to _)
b.collect
res8: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val a = sc.parallelize(List(1,2,3))
a.collect
res9: Array[Int] = Array(1, 2, 3)
val b = a.flatMap(x=>List(x,x,x))
b.collect
res10: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. (Similar to map, but it runs separately on each partition of the RDD, so when it runs on an RDD of type T, func must have type Iterator[T] => Iterator[U].)
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
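Parameters:
Iterator[T]: the input iterator over elements of type T
Iterator[U]: the returned iterator over elements of type U
preservesPartitioning: whether to keep the parent RDD's partitioner information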
//example
val a = sc.parallelize(1 to 9,3)
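def myfunc[T](iter:Iterator[T]):Iterator[(T,T)]={
  var res = List[(T,T)]()
  // take the first element of the partition as the initial predecessor
  var pre = iter.next
  while(iter.hasNext){
    val cur = iter.next
    // prepend the (predecessor, current) pair
    res = (pre, cur) :: res
    pre = cur
  }
  res.iterator
}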
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
a.mapPartitions(myfunc).collect
res1: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
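The result contains no (3,4) or (6,7) pair because myfunc is called once per partition and only ever sees that partition's elements. Here is a minimal plain-Scala sketch of that behaviour, with no Spark dependency. `mapPartitionsLocal` is a hypothetical stand-in for `mapPartitions`, and the three hand-written lists are an assumed layout of `sc.parallelize(1 to 9,3)`.

```scala
object MapPartitionsSketch {
  // Same logic as myfunc: pair each element with its predecessor.
  def pairs[T](iter: Iterator[T]): Iterator[(T, T)] = {
    var res = List[(T, T)]()
    var pre = iter.next()
    while (iter.hasNext) {
      val cur = iter.next()
      res = (pre, cur) :: res // prepend, so the newest pair comes first
      pre = cur
    }
    res.iterator
  }

  // Hypothetical stand-in for RDD.mapPartitions: call f once per partition
  // and concatenate the results in partition order.
  def mapPartitionsLocal[T, U](parts: List[List[T]])(f: Iterator[T] => Iterator[U]): List[U] =
    parts.flatMap(p => f(p.iterator).toList)

  // Assumed partition layout of sc.parallelize(1 to 9, 3).
  val partitions: List[List[Int]] = List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9))

  val result: List[(Int, Int)] = mapPartitionsLocal(partitions)(it => pairs(it))
}
```

`MapPartitionsSketch.result` holds the same pairs, in the same order, as res1: each partition yields its own pairs, and no pair ever crosses a partition boundary.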
Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes. (Like mapPartitions, but the function takes two arguments: the partition index and an iterator over all elements of that partition. The output is an iterator.)
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
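Parameters:
Int: the partition index
Iterator[T]: an iterator over the elements of the partition
Iterator[U]: the result iterator
preservesPartitioning: whether to keep the parent RDD's partitioner information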
// example
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3)
def myfunc(index:Int,iter:Iterator[Int]):Iterator[String]={iter.map(x => index+"->"+x)}
myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
x.mapPartitionsWithIndex(myfunc).collect
res0: Array[String] = Array(0->1, 0->2, 0->3, 1->4, 1->5, 1->6, 2->7, 2->8, 2->9, 2->10)
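Partition 2 ends up with four elements because ten items do not divide evenly into three slices. The sketch below reproduces the output in plain Scala under one assumption: that slice i of a parallelized sequence covers positions [i*n/k, (i+1)*n/k). The helpers `slice` and `tagWithIndex` are hypothetical names for illustration, not Spark API.

```scala
object PartitionIndexSketch {
  // Assumed slicing rule: slice i holds the elements at positions
  // [i * n / k, (i + 1) * n / k), where n is the length and k the slice count.
  def slice[T](data: Vector[T], numSlices: Int): List[Vector[T]] =
    (0 until numSlices).toList.map { i =>
      val start = (i.toLong * data.length / numSlices).toInt
      val end = ((i + 1).toLong * data.length / numSlices).toInt
      data.slice(start, end)
    }

  // Hypothetical stand-in for mapPartitionsWithIndex using the myfunc logic:
  // prefix every element with the index of the partition it lives in.
  def tagWithIndex(parts: List[Vector[Int]]): List[String] =
    parts.zipWithIndex.flatMap { case (part, index) => part.map(x => s"$index->$x") }

  val result: List[String] = tagWithIndex(slice((1 to 10).toVector, 3))
}
```

With 10 elements and 3 slices the boundaries fall at positions 0, 3, 6 and 10, which gives slices of sizes 3, 3 and 4.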
def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]
val a = sc.parallelize(1 to 3)
val b = sc.parallelize(5 to 7)
val c = a.union(b)
c.collect
res2: Array[Int] = Array(1, 2, 3, 5, 6, 7)
Returns the elements that appear in both RDDs. (Computes the intersection of the source RDD and the argument RDD and returns it as a new RDD.)
Triggers a shuffle.
```scala
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T]): RDD[T]
val x = sc.parallelize(1 to 20)
val y = sc.parallelize(10 to 30)
val z = x.intersection(y)
z.collect
res5: Array[Int] = Array(16, 14, 12, 18, 20, 10, 13, 19, 15, 11, 17)
```
Returns a new RDD that contains each unique value only once. (Removes duplicates from the source RDD and returns the result as a new RDD.)
Triggers a shuffle.
```scala
def distinct(): RDD[T]
def distinct(numPartitions: Int): RDD[T]
// numPartitions defaults to partitions.length, where partitions are the partitions of the parent RDD
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res3: Int = 2
a.distinct(3).partitions.length
res4: Int = 3
```
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
// createCombiner: V => C takes the current V as its argument and returns a value of type C (effectively an initialization step)
// mergeValue: (C, V) => C merges a value V into a C (the combiner produced by createCombiner); this function runs within each partition
// mergeCombiners: (C, C) => C merges two C values; this function runs across partitions
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// numPartitions: Int number of partitions
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]
// partitioner: Partitioner the partitioner
// mapSideCombine: Boolean = true whether to combine on the map side (defaults to true)
// Example
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
c.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_),(x:List[String],y:String) => y :: x,(m:List[String],n:List[String]) => m ::: n)
d.collect
res1: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
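To see where each of the three functions is applied, here is a plain-Scala sketch that models partitions as nested lists. `combineByKeyLocal` is a hypothetical helper, not Spark API, and the partition layout is an assumption based on how the nine elements split into three slices. The sketch merges partitions in order; real Spark gives no such ordering guarantee.

```scala
object CombineByKeySketch {
  def combineByKeyLocal[K, V, C](parts: List[List[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1, inside each partition: the first value of a key goes through
    // createCombiner, later values of that key go through mergeValue.
    val perPartition: List[Map[K, C]] = parts.map { p =>
      p.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val next = acc.get(k) match {
          case Some(c) => mergeValue(c, v)
          case None    => createCombiner(v)
        }
        acc.updated(k, next)
      }
    }
    // Step 2, across partitions: combiners of the same key are merged
    // with mergeCombiners.
    perPartition.flatMap(_.toList).foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      val merged = acc.get(k) match {
        case Some(prev) => mergeCombiners(prev, c)
        case None       => c
      }
      acc.updated(k, merged)
    }
  }

  // Assumed layout of c = b.zip(a) over 3 partitions.
  val parts: List[List[(Int, String)]] = List(
    List((1, "dog"), (1, "cat"), (2, "gnu")),
    List((2, "salmon"), (2, "rabbit"), (1, "turkey")),
    List((2, "wolf"), (2, "bear"), (2, "bee")))

  val result: Map[Int, List[String]] =
    combineByKeyLocal[Int, String, List[String]](parts)(
      v => List(v),
      (c, v) => v :: c,
      (x, y) => x ::: y)
}
```

Because mergeValue prepends, values appear in reverse order within a partition (cat before dog), while mergeCombiners appends whole per-partition lists, which is why turkey comes last for key 1.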
// groupByKey() calls combineByKey internally
def groupByKey(): RDD[(K, Iterable[V])]
K: the key
Iterable[V]: an iterable over the values of that key
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
numPartitions: Int number of partitions
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
partitioner: Partitioner the partitioner
// example
val a = sc.parallelize(List("dog","tiger","lion","cat","spider","eagle"),2)
val b = a.keyBy(_.length)
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at keyBy at <console>:26
b.groupByKey.collect
res6: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]
//ascending: Boolean = true sorts in ascending order by default
//numPartitions: number of partitions (defaults to the parent RDD's partition count)
//example
val a = sc.parallelize(List("dog","cat","owl","gun","ant"),2)
val b = sc.parallelize(1 to a.count.toInt,2)
val c = a.zip(b)
c.sortByKey(true).collect
res12: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gun,4), (owl,3))
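Aggregates the elements of each partition first, then aggregates the results of all partitions, using the given functions and the initial value. The result type U may differ from the RDD's element type T, so aggregate needs one function that merges a T into a U and another that merges two U values. Both functions may modify and return their first argument.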
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
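//zeroValue: U the given initial value; it is applied once in each partition's local aggregation and once more in the global aggregation
//seqOp: (U, T) => U the aggregation function applied within each partition
//combOp: (U, U) => U the global aggregation function
//example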
val z = sc.parallelize(List(1,2,3,4,5,6),2)
def myfunc(index:Int,iter:Iterator[Int]):Iterator[String]={iter.map(x => ("[partID: " + index + ",val: " + x + "]"))}
myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
z.mapPartitionsWithIndex(myfunc).collect
res24: Array[String] = Array([partID: 0,val: 1], [partID: 0,val: 2], [partID: 0,val: 3], [partID: 1,val: 4], [partID: 1,val: 5], [partID: 1,val: 6])
z.aggregate(5)(math.max(_,_),_+_)
res26: Int = 16
//Analysis: the initial value is 5
//partition 0: max(5,1,2,3) gives 5
//partition 1: max(5,4,5,6) gives 6
//global aggregation: 5+5+6 = 16
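The analysis can be checked with a small plain-Scala model of aggregate. `aggregateLocal` is a hypothetical helper, not Spark API; the two inner lists are the partition contents printed by mapPartitionsWithIndex above. The model applies the initial value once per partition and once more in the final merge.

```scala
object AggregateSketch {
  // Hypothetical stand-in for RDD.aggregate over explicitly listed partitions.
  def aggregateLocal[T, U](parts: List[List[T]], zeroValue: U)(
      seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Local aggregation: every partition starts from zeroValue.
    val perPartition = parts.map(_.foldLeft(zeroValue)(seqOp))
    // Global aggregation: the merge also starts from zeroValue.
    perPartition.foldLeft(zeroValue)(combOp)
  }

  val parts: List[List[Int]] = List(List(1, 2, 3), List(4, 5, 6))

  // Mirrors z.aggregate(5)(math.max(_,_), _+_): 5 + max(5,1,2,3) + max(5,4,5,6)
  val withZero5: Int = aggregateLocal(parts, 5)((acc, x) => math.max(acc, x), _ + _)

  // With a neutral initial value the result is just 3 + 6.
  val withZero0: Int = aggregateLocal(parts, 0)((acc, x) => math.max(acc, x), _ + _)
}
```

Comparing the two values shows how a non-neutral initial value leaks into the result: 16 with an initial value of 5, and 9 with 0.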
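Operates on the values that share the same key. The initial value is applied only in the local (per-partition) aggregation, not in the global aggregation.
Triggers a shuffle.
// Calls combineByKey under the hood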
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// zeroValue: U the user-supplied initial value
// seqOp: (U, V) ⇒ U the local aggregation function; returns a value of type U
// combOp: (U, U) ⇒ U the global aggregation function; returns a value of type U
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// numPartitions: Int number of partitions
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// partitioner: Partitioner the partitioner
// example
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
def myfunc(index:Int,iter:Iterator[(String,Int)]):Iterator[String]={iter.map(x=>"[partID: "+ index + ",val: "+ x +"]")}
myfunc: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
pairRDD.mapPartitionsWithIndex(myfunc).collect
res1: Array[String] = Array([partID: 0,val: (cat,2)], [partID: 0,val: (cat,5)], [partID: 0,val: (mouse,4)], [partID: 1,val: (cat,12)], [partID: 1,val: (dog,12)], [partID: 1,val: (mouse,2)])
pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res2: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_,_),_+_).collect
res3: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
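The jump from (cat,17) to (cat,200) follows from the rule that the initial value is applied per key in each partition but not in the global merge. A plain-Scala sketch of that rule, where `aggregateByKeyLocal` is a hypothetical helper (not Spark API) and the partition contents are those printed in res1:

```scala
object AggregateByKeySketch {
  def aggregateByKeyLocal[K, V, U](parts: List[List[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Local step: in each partition, every key starts from zeroValue.
    val perPartition: List[Map[K, U]] = parts.map { p =>
      p.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
      }
    }
    // Global step: per-partition results of a key are merged with combOp;
    // zeroValue is not applied again here.
    perPartition.flatMap(_.toList).foldLeft(Map.empty[K, U]) { case (acc, (k, u)) =>
      val merged = acc.get(k) match {
        case Some(prev) => combOp(prev, u)
        case None       => u
      }
      acc.updated(k, merged)
    }
  }

  val parts: List[List[(String, Int)]] = List(
    List(("cat", 2), ("cat", 5), ("mouse", 4)),
    List(("cat", 12), ("dog", 12), ("mouse", 2)))

  val withZero0: Map[String, Int] =
    aggregateByKeyLocal(parts, 0)((acc, v) => math.max(acc, v), _ + _)
  val withZero100: Map[String, Int] =
    aggregateByKeyLocal(parts, 100)((acc, v) => math.max(acc, v), _ + _)
}
```

With an initial value of 100, every per-partition maximum becomes 100. cat and mouse occur in both partitions, so they sum to 200, while dog occurs in only one partition and stays at 100.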
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
//func: (V, V) => V the function passed in
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
//numPartitions: Int number of partitions
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
//partitioner: Partitioner the partitioner
val a = sc.parallelize(List("dog","cat","owl","gnu","ant"))
val b = a.map(x => (x.length, x))
b.reduceByKey(_+_).collect
res8: Array[(Int, String)] = Array((3,dogcatowlgnuant))
val a = sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b = a.map(x => (x.length, x))
b.reduceByKey(_+_).collect
res10: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
This function sorts the input RDD's data and stores it in a new RDD. The first parameter requires you to specify a function which maps the input data into the key that you want to sort by. The second parameter (optional) specifies whether you want the data to be sorted in ascending or descending order. (Sorts the data of the input RDD and stores it in a new RDD. The first argument is a function that maps each input element to its sort key; the optional second argument sets the sort order.)
Triggers a shuffle.
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.size)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
// f: (T) ⇒ K the function that produces the sort key
// ascending: Boolean the sort order (ascending by default)
// numPartitions: Int number of partitions
// example
val y = sc.parallelize(Array(5,7,1,3,2,1))
y.sortBy(x => x,true).collect
res5: Array[Int] = Array(1, 1, 2, 3, 5, 7)
y.sortBy(x => x,false).collect
res6: Array[Int] = Array(7, 5, 3, 2, 1, 1)
val z = sc.parallelize(Array(("H",10),("A",26),("Z",1),("L",5)))
z.sortBy(x => x._1,true).collect
res7: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))
z.sortBy(x => x._2,true).collect
res8: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))
Performs an inner join using two key-value RDDs. Please note that the keys must be generally comparable to make this work. (Called on RDDs of type (K,V) and (K,W); returns an RDD of (K,(V,W)) pairs that contains every combination of elements sharing the same key.)
Triggers a shuffle.
```scala
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"))
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"))
val d = c.keyBy(_.length)
d.collect
res7: Array[(Int, String)] = Array((3,dog), (3,cat), (3,gnu), (6,salmon), (6,rabbit), (6,turkey), (4,wolf), (4,bear), (3,bee))
b.join(d).collect
res8: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
```
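The 14 result rows are the per-key cross product of the two sides: keys that appear on only one side (8 on the left, 4 on the right) drop out. A plain-Scala sketch of inner-join semantics, where `joinLocal` is a hypothetical helper and not Spark API. The row order differs from Spark's output, which depends on the shuffle.

```scala
object JoinSketch {
  // Inner join: for every left pair, emit one row per right pair with the same key.
  def joinLocal[K, V, W](left: List[(K, V)], right: List[(K, W)]): List[(K, (V, W))] =
    left.flatMap { case (k, v) =>
      right.collect { case (k2, w) if k2 == k => (k, (v, w)) }
    }

  // Same data as the example, keyed by string length like keyBy(_.length).
  val b: List[(Int, String)] =
    List("dog", "salmon", "salmon", "rat", "elephant").map(s => (s.length, s))
  val d: List[(Int, String)] =
    List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee").map(s => (s.length, s))

  val joined: List[(Int, (String, String))] = joinLocal(b, d)
}
```

The duplicate "salmon" on the left is why each (6, (salmon, ...)) row appears twice.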
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
// other: RDD[(K, W)] the RDD passed in
// RDD[(K, (Iterable[V], Iterable[W]))] the returned RDD (K: the key; (Iterable[V], Iterable[W]): iterables over the values from each side)
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
// numPartitions: Int number of partitions
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
// partitioner: Partitioner the partitioner
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
//example
val a = sc.parallelize(List(1,2,1,3),1)
val b = a.map((_,"b"))
b.collect
res9: Array[(Int, String)] = Array((1,b), (2,b), (1,b), (3,b))
val c = a.map((_,"c"))
c.collect
res10: Array[(Int, String)] = Array((1,c), (2,c), (1,c), (3,c))
val d = a.map((_,"d"))
d.collect
res12: Array[(Int, String)] = Array((1,d), (2,d), (1,d), (3,d))
b.cogroup(c).collect
res11: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b),CompactBuffer(c))))
b.cogroup(c,d).collect
res13: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))
//-----------------
val x = sc.parallelize(List((1,"apple"),(2,"banana"),(3,"orange"),(4,"kiwi")),2)
val y = sc.parallelize(List((5,"computer"),(1,"laptop"),(1,"desktop"),(4,"iPad")),2)
x.cogroup(y).collect
res15: Array[(Int, (Iterable[String], Iterable[String]))] = Array((4,(CompactBuffer(kiwi),CompactBuffer(iPad))), (2,(CompactBuffer(banana),CompactBuffer())), (1,(CompactBuffer(apple),CompactBuffer(laptop, desktop))), (3,(CompactBuffer(orange),CompactBuffer())), (5,(CompactBuffer(),CompactBuffer(computer))))
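Unlike join, cogroup keeps every key that appears on either side and pairs it with the (possibly empty) collection of values from each RDD. A plain-Scala sketch of that semantics, where `cogroupLocal` is a hypothetical helper (not Spark API) and plain lists stand in for CompactBuffer:

```scala
object CogroupSketch {
  def cogroupLocal[K, V, W](left: List[(K, V)], right: List[(K, W)]): Map[K, (List[V], List[W])] = {
    // Every key from either side takes part in the result.
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val vs = left.collect { case (key, v) if key == k => v }
      val ws = right.collect { case (key, w) if key == k => w }
      (k, (vs, ws)) // one of the two lists may be empty
    }.toMap
  }

  val x = List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi"))
  val y = List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad"))

  val result: Map[Int, (List[String], List[String])] = cogroupLocal(x, y)
}
```

Key 5 exists only in y and keys 2 and 3 only in x, so each of them is paired with one empty list, matching the empty CompactBuffer() entries in res15.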
Merges partitions (coalesce: from many partitions to fewer, a narrow dependency).
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
// numPartitions: Int number of partitions
// shuffle: Boolean whether to shuffle; defaults to false
def repartition(numPartitions: Int): RDD[T]
// numPartitions: Int number of partitions; always triggers a shuffle
val y = sc.parallelize(1 to 10,5)
y.partitions.size
res16: Int = 5
val z = y.coalesce(2,false)
z.partitions.size
res17: Int = 2
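Without a shuffle, coalesce can only glue existing partitions together, which is why it is a narrow dependency and why it cannot increase the partition count. The sketch below is a simplified plain-Scala model of that idea: `coalesceLocal` is a hypothetical helper that merges adjacent partitions evenly, whereas Spark's actual grouping also considers data locality.

```scala
object CoalesceSketch {
  // Simplified model of coalesce(numPartitions, shuffle = false):
  // whole partitions are merged, no element moves individually.
  def coalesceLocal[T](parts: Vector[List[T]], numPartitions: Int): Vector[List[T]] = {
    // Without a shuffle the partition count cannot grow.
    val n = math.min(numPartitions, parts.size)
    parts.zipWithIndex
      .groupBy { case (_, i) => i * n / parts.size } // target partition of source partition i
      .toVector
      .sortBy { case (target, _) => target }
      .map { case (_, group) => group.flatMap { case (elems, _) => elems }.toList }
  }

  // Assumed layout of sc.parallelize(1 to 10, 5): five partitions of two elements.
  val five: Vector[List[Int]] = (1 to 10).toList.grouped(2).toVector

  val two: Vector[List[Int]] = coalesceLocal(five, 2)
  val stillFive: Vector[List[Int]] = coalesceLocal(five, 8)
}
```

`two` has 2 partitions holding all ten elements, while asking for 8 partitions leaves the 5 original ones untouched; growing the count is the case that needs `shuffle = true` or repartition.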
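Repartitions the RDD according to the given partitioner and sorts the records by key within each resulting partition. This is more efficient than calling repartition and then sorting within each partition.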
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
// partitioner: Partitioner the given partitioner
//example
val randRDD = sc.parallelize(List((2,"cat"),(6,"mouse"),(7,"cup"),(3,"book"),(4,"tv"),(1,"screen"),(5,"heater")),3)
val rPartitioner = new org.apache.spark.RangePartitioner(3,randRDD)
val partitioned = randRDD.partitionBy(rPartitioner)
def myfunc(index:Int,iter:Iterator[(Int,String)]):Iterator[String]={iter.map(x => "[partID:"+index+",val:"+x+"]")}
partitioned.mapPartitionsWithIndex(myfunc).collect
res18: Array[String] = Array([partID:0,val:(2,cat)], [partID:0,val:(3,book)], [partID:0,val:(1,screen)], [partID:1,val:(4,tv)], [partID:1,val:(5,heater)], [partID:2,val:(6,mouse)], [partID:2,val:(7,cup)])
val partitioned = randRDD.repartitionAndSortWithinPartitions(rPartitioner)
partitioned.mapPartitionsWithIndex(myfunc).collect
res19: Array[String] = Array([partID:0,val:(1,screen)], [partID:0,val:(2,cat)], [partID:0,val:(3,book)], [partID:1,val:(4,tv)], [partID:1,val:(5,heater)], [partID:2,val:(6,mouse)], [partID:2,val:(7,cup)])
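The difference between res18 and res19 is only the order inside each partition. A plain-Scala sketch of the two layouts follows. `rangeOf` is a hypothetical partitioner with hand-picked bounds, assumed to match the partitions shown above, and the helper names are illustrative rather than Spark API. The sketch shows the resulting layout only; it says nothing about the performance benefit, which comes from sorting during the shuffle.

```scala
object RepartitionAndSortSketch {
  // Hypothetical range partitioner with hand-picked bounds:
  // keys <= 3 go to partition 0, keys <= 5 to partition 1, the rest to partition 2.
  def rangeOf(key: Int): Int = if (key <= 3) 0 else if (key <= 5) 1 else 2

  // Models partitionBy: records keep their encounter order inside a partition.
  def partitionByLocal[V](data: List[(Int, V)], numPartitions: Int): List[List[(Int, V)]] =
    (0 until numPartitions).toList.map(p => data.filter { case (k, _) => rangeOf(k) == p })

  // Models repartitionAndSortWithinPartitions: same placement, sorted by key.
  def repartitionAndSortLocal[V](data: List[(Int, V)], numPartitions: Int): List[List[(Int, V)]] =
    partitionByLocal(data, numPartitions).map(_.sortBy { case (k, _) => k })

  val randData = List((2, "cat"), (6, "mouse"), (7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater"))

  val partitionedOnly = partitionByLocal(randData, 3)
  val partitionedAndSorted = repartitionAndSortLocal(randData, 3)
}
```

Partition 0 is (2,cat), (3,book), (1,screen) after plain partitioning and (1,screen), (2,cat), (3,book) after sorting, as in the two REPL outputs.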
Constructs two-component tuples (key-value pairs) by applying a function on each data item. The result of the function becomes the key and the original data item becomes the value of the newly created tuples. (Uses func to create a key-value pair for every element of the RDD: the function result becomes the key and the original data item becomes the value of the new tuple.)
def keyBy[K](f: T => K): RDD[(K, T)]
val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"))
val b = a.keyBy(_.length)
b.collect
res6: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
Takes a function whose return value is used as the key, then groups the elements by that key.
Triggers a shuffle.
```scala
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]
val a = sc.parallelize(1 to 9)
a.groupBy(x=>{if(x%2==0) "even" else "odd"}).collect
res10: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9)
def myfunc(a:Int):Int={a%2}
myfunc: (a: Int)Int
a.groupBy(myfunc).collect
res11: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
def myfunc(a:Int):Double={a%2}
a.groupBy(myfunc).collect
res12: Array[(Double, Iterable[Int])] = Array((0.0,CompactBuffer(2, 4, 6, 8)), (1.0,CompactBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
myfunc: (a: Int)Int
a.groupBy(x => myfunc(x), 3).collect
res13: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
```
##### 22 combineByKey[Pair]
Converts an RDD of type [K,V] into an RDD of type [K,C]; V and C may be the same type or different types.
```scala
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
// createCombiner: the combiner function that turns a V into a C; its input is the V of RDD[K,V] and its output is a C
// mergeValue: the value-merging function that merges a C and a V into a C; input (C, V), output C
// mergeCombiners: the combiner-merging function that merges two C values into one C; input (C, C), output C
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// numPartitions: Int number of partitions of the result; by default the existing partition count is kept
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]
// partitioner: the partitioner, HashPartitioner by default
// mapSideCombine: whether to combine on the map side, true by default
// example
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"),3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2),3)
val c = b.zip(a)
c.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_),(x:List[String],y:String)=>y :: x,(x:List[String],y:List[String])=> x ::: y)
d.collect
res22: Array[(Int, List[String])] = Array(
(1,List(cat, dog, turkey)),
(2,List(gnu, rabbit, salmon, bee, bear, wolf)))
```
def countByKey(): Map[K, Long]
// Returns a Map[K, Long]: each key K of the RDD mapped to the number of values that share that key
//example
val c = sc.parallelize(List((3,"gun"),(3,"yak"),(5,"mouse"),(3,"dog")),2)
c.countByKey
res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
def reduce(f: (T, T) => T): T
// f: (T, T) => T the function takes two arguments of type T and returns a value of type T
//example
val a = sc.parallelize(1 to 10,3)
a.reduce(_+_)
res6: Int = 55
Converts the RDD into a Scala array and returns it.
def collect(): Array[T]
Array[T]: the returned result
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
// f: PartialFunction[T, U] a partial function with input type T; the result is an RDD[U]
def toArray(): Array[T]
// example
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res8: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
def count(): Long
// Counts the elements of the RDD
// example
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res9: Long = 4
def first(): T
// example
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.first
res10: String = Gnu
def take(num: Int): Array[T]
// num: Int the number of leading elements to fetch
// Array[T] the returned result
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res11: Array[String] = Array(dog, cat)
val b = sc.parallelize(1 to 10000,5000)
b.take(10)
res13: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
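Sorts the elements of the RDD using the implicit ordering and returns the first n elements.
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
// num: Int the number of elements to fetch
// Array[T] the returned result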
// example
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(3)
res14: Array[String] = Array(ape, cat, dog)
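Getting the n smallest elements does not require sorting the whole dataset: it is enough to keep the n smallest elements of each partition and then merge those candidates. A plain-Scala sketch of that idea, where `takeOrderedLocal` is a hypothetical helper (not Spark API) and the two-partition layout of the five strings is an assumption:

```scala
object TakeOrderedSketch {
  def takeOrderedLocal[T](parts: List[List[T]], num: Int)(implicit ord: Ordering[T]): List[T] = {
    // Each partition contributes at most num candidates.
    val perPartition = parts.map(_.sorted.take(num))
    // Merging two candidate lists again keeps only the num smallest.
    perPartition.foldLeft(List.empty[T])((acc, top) => (acc ++ top).sorted.take(num))
  }

  // Assumed layout of sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2).
  val parts: List[List[String]] = List(List("dog", "cat"), List("ape", "salmon", "gnu"))

  val result: List[String] = takeOrderedLocal(parts, 3)
}
```

Passing a different implicit `Ordering` changes which elements count as "first"; for example, `takeOrderedLocal(parts, 3)(Ordering[String].reverse)` returns the three largest strings instead.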
def foreach(f: T => Unit)
// The function takes an argument of type T
// and returns Unit
Published: 2024-01-28 15:14:18
Article link: https://www.4u4v.net/it/17064260648320.html