val rdd = sc makeRDD(List(( "Tom ",100),( " ">
IT培訓(xùn)網(wǎng)
IT在線學(xué)習(xí)
1. mapValues
mapValues算子 ,作用于 [K,V] 格式的RDD上,并且只對(duì)V(Value)進(jìn)行操作,Key值保持不變。
(1)將[K,V] 格式的List轉(zhuǎn)換為[K,V] 格式的RDD。
scala> val rdd = sc.makeRDD(List(("Tom",100),("Mike",80)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at makeRDD at :24
(2)使用mapValues算子,將value值乘以100,key值保持不變
scala> val rdd2=rdd.mapValues(_*100)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at mapValues at :26
(3)使用collect算子回收,查看結(jié)果
scala> rdd2.collect
res0: Array[(String, Int)] = Array((Tom,10000), (Mike,8000))
2. mapPartitions
作用于RDD上的每一個(gè)分區(qū),傳遞的函數(shù)相當(dāng)于一個(gè)迭代器,有幾個(gè)分區(qū),就會(huì)迭代幾次。
object Test1 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getSimpleName)
val sc=new SparkContext(conf)
val rdd=sc.makeRDD(List(1,2,3,4,5,6),3);
val values: RDD[Int] = rdd.mapPartitions(t => {
t.map(_ * 10)
})
//打印輸出結(jié)果
values.foreach(println)
}
}
使用上面的代碼進(jìn)行測(cè)試。輸出結(jié)果如下:
可以看到,因?yàn)樵O(shè)置了3個(gè)分區(qū),所以相應(yīng)啟動(dòng)了3個(gè)任務(wù),在每個(gè)分區(qū)上進(jìn)行迭代計(jì)算。
3. filter
filter算子過濾出所有的滿足條件的元素。
另外fliter算子不會(huì)改變分區(qū)的數(shù)量,所以經(jīng)過過濾后,即使某些分區(qū)沒有數(shù)據(jù)了,但是分區(qū)依然存在的。
scala> val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24
scala> val rdd2 = rdd1.filter(_>3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at :26
scala> rdd2.partitions.size
res3: Int = 3
4. sortBy
sortBy算子按照指定條件進(jìn)行排序。
我們使用下面的代碼進(jìn)行測(cè)試:
object Test2 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getSimpleName)
val sc=new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Tom", 80), ("Mike", 90), ("Mary", 85),("John",60)))
//按value值升序排列
val res1: RDD[(String, Int)] = rdd.sortBy(_._2)
res1.collect.foreach(println)
// 按value值降序排列
val res2: RDD[(String, Int)] = rdd.sortBy(_._2, false)
res2.collect.foreach(println)
}
}
升序輸出的結(jié)果如下:
降序輸出的結(jié)果如下:
有一點(diǎn)需要說明的是,輸出結(jié)果前,要使用collect算子把結(jié)果回收到本地。因?yàn)閿?shù)據(jù)是分散在集群各節(jié)點(diǎn)的,如果不回收,看到的結(jié)果可能是不正確的。
>>本文地址:http://m.hqfphsz.com/zhuanye/2021/69463.html
聲明:本站稿件版權(quán)均屬中公教育優(yōu)就業(yè)所有,未經(jīng)許可不得擅自轉(zhuǎn)載。
1 您的年齡
2 您的學(xué)歷
3 您更想做哪個(gè)方向的工作?