首頁(yè)技術(shù)文章正文

RDD行動(dòng)算子API講解【黑馬程序員】

更新時(shí)間:2021-04-28 來(lái)源:黑馬程序員 瀏覽量:

1577370495235_學(xué)IT就到黑馬程序員.gif


行動(dòng)算子主要是將在數(shù)據(jù)集上運(yùn)行計(jì)算后的數(shù)值返回到驅(qū)動(dòng)程序,從而觸發(fā)真正的計(jì)算。下面,列舉一些常用的行動(dòng)算子API,如表1所示。

表1 常用的行動(dòng)算子API

行動(dòng)算子相關(guān)說明
count()返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)                                                                                     
first()返回?cái)?shù)組的第一個(gè)元素
take(n)以數(shù)組的形式返回?cái)?shù)組集中的前n個(gè)元素
reduce(func)通過函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素
collect()以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素
foreach(func)將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行

下面,結(jié)合具體的示例對(duì)這些行動(dòng)算子API進(jìn)行詳細(xì)講解。

  • count()

count()主要用于返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)。假設(shè),現(xiàn)有一個(gè)arrRdd,如果要統(tǒng)計(jì)arrRdd元素的個(gè)數(shù),示例代碼如下:

  scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.count()
  res0: Long = 5


1619589844943_RDD行動(dòng)算子.jpg


上述代碼中,第1行代碼創(chuàng)建了一個(gè)RDD對(duì)象,當(dāng)arrRdd調(diào)用count()操作后,返回的結(jié)果是5,說明成功獲取到了RDD數(shù)據(jù)集的元素。值得一提的是,可以將第一行代碼分解成下面兩行代碼,具體如下:

val arr = Array(1,2,3,4,5)
val arrRdd = sc.parallelize(arr)

上述代碼中,第1行代碼創(chuàng)建了一個(gè)RDD對(duì)象,當(dāng)arrRdd調(diào)用count()操作后,返回的結(jié)果是5,說明成功獲取到了RDD數(shù)據(jù)集的元素。值得一提的是,可以將第一行代碼分解成下面兩行代碼,具體如下:

val arr = Array(1,2,3,4,5)
val arrRdd = sc.parallelize(arr)
  • first()

first()主要用于返回?cái)?shù)組的第一個(gè)元素。現(xiàn)有一個(gè)arrRdd,如果要獲取arrRdd中第一個(gè)元素,示例代碼如下:

  scala> val arrRdd=sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.first()
  res1: Int = 1

從上述結(jié)果可以看出,當(dāng)執(zhí)行arrRdd.first()操作后返回的結(jié)果是1,說明成功獲取到了第1個(gè)元素。

  • take(n)

take()主要用于以數(shù)組的形式返回?cái)?shù)組集中的前n個(gè)元素。現(xiàn)有一個(gè)arrRdd,如果要獲取arrRdd中的前三個(gè)元素,示例代碼如下:

 scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize  at <console>:24
  scala> arrRdd.take(3)
  res2: Array[Int]=Array(1,2,3)

從上述代碼可以看出,執(zhí)行arrRdd.take(3)操作后返回的結(jié)果是Array(1,2,3),說明成功獲取到了RDD數(shù)據(jù)集的前3個(gè)元素。

  • reduce(func)

reduce()主要用于通過函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素?,F(xiàn)有一個(gè)arrRdd,如果要對(duì)arrRdd中的元素進(jìn)行聚合,示例代碼如下:

 scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.reduce((a,b)=>a+b)
  res3: Int = 15

在上述代碼中,執(zhí)行arrRdd.reduce((a,b)=>a+b)操作后返回的結(jié)果是15,說明成功的將RDD數(shù)據(jù)集中的所有元素進(jìn)行求和,結(jié)果為15。

  • collect()

collect()主要用于以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素?,F(xiàn)有一個(gè)rdd,如果希望rdd中的元素以數(shù)組的形式輸出,示例代碼如下:

  scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.collect()
  res4: Array[Int] = Array(1,2,3,4,5)

在上述代碼中,執(zhí)行arrRdd.collect()操作后返回的結(jié)果是Array(1,2,3,4,5),說明成功的將RDD數(shù)據(jù)集中的元素以數(shù)組的形式輸出。

  • foreach(func)

foreach()主要用于將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行。現(xiàn)有一個(gè)arrRdd,如果希望遍歷輸出arrRdd中的元素,示例代碼如下:

 scala> val arrRdd =sc.parallelize(Array(1,2,3,4,5))
  arrRdd: org.apache.spark.rdd.RDD[Int]=
ParallelcollectionRDD[0] at parallelize at <console>:24
  scala> arrRdd.foreach(x => println(x))  1
  2
  3
  4
  5

在上述代碼中,foreach(x => println(x))的含義是依次遍歷arrRdd中的每一個(gè)元素,把當(dāng)前遍歷的元素賦值給變量x,并且通過println(x)打印出x的值。執(zhí)行arrRdd.foreach()操作后,arrRdd中的元素被依次輸出了(即RDD數(shù)據(jù)集中所有的元素被遍歷輸出)。這里的arrRdd.foreach(x => println(x))可以簡(jiǎn)寫為arrRdd.foreach(println)。




猜你喜歡:

RDD轉(zhuǎn)換算子API過程演示【大數(shù)據(jù)文章】

RDD是如何操作數(shù)據(jù)轉(zhuǎn)換的?RDD轉(zhuǎn)換算子API示例

DataFrame是什么?與RDD有什么區(qū)別?

RDD為什么要進(jìn)行數(shù)據(jù)持久化?它的操作方法有哪些?

黑馬程序員Python+大數(shù)據(jù)開發(fā)培訓(xùn)

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!