Chapter5-廈門大學-林子雨-Spark編程基礎(chǔ)-第5章-RDD編程_第1頁
Chapter5-廈門大學-林子雨-Spark編程基礎(chǔ)-第5章-RDD編程_第2頁
Chapter5-廈門大學-林子雨-Spark編程基礎(chǔ)-第5章-RDD編程_第3頁
Chapter5-廈門大學-林子雨-Spark編程基礎(chǔ)-第5章-RDD編程_第4頁
Chapter5-廈門大學-林子雨-Spark編程基礎(chǔ)-第5章-RDD編程_第5頁
已閱讀5頁,還剩99頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

第5章RDD編程

提綱5.1RDD編程基礎(chǔ)5.2鍵值對RDD5.3數(shù)據(jù)讀寫5.4WordCount程序解析5.6綜合案例5.1RDD編程基礎(chǔ)5.1.1RDD創(chuàng)建5.1.2RDD操作5.1.3持久化5.1.4分區(qū)5.1.5一個綜合實例5.1.1RDD創(chuàng)建1.從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD2.通過并行集合(數(shù)組)創(chuàng)建RDD5.1.1RDD創(chuàng)建Spark采用textFile()方法來從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD該方法把文件的URI作為參數(shù),這個URI可以是:本地文件系統(tǒng)的地址或者是分布式文件系統(tǒng)HDFS的地址或者是AmazonS3的地址等等1.從文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD5.1.1RDD創(chuàng)建scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[12]attextFileat<console>:27圖

從文件中加載數(shù)據(jù)生成RDD(1)從本地文件系統(tǒng)中加載數(shù)據(jù)創(chuàng)建RDD5.1.1RDD創(chuàng)建scala>vallines=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>vallines=sc.textFile("/user/hadoop/word.txt")scala>vallines=sc.textFile("word.txt")(2)從分布式文件系統(tǒng)HDFS中加載數(shù)據(jù)三條語句是完全等價的,可以使用其中任意一種方式5.1.1RDD創(chuàng)建可以調(diào)用SparkContext的parallelize方法,在Driver中一個已經(jīng)存在的集合(數(shù)組)上創(chuàng)建。scala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[13]atparallelizeat<console>:29或者,也可以從列表中創(chuàng)建:scala>vallist=List(1,2,3,4,5)list:List[Int]=List(1,2,3,4,5)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[14]atparallelizeat<console>:29圖

從數(shù)組創(chuàng)建RDD示意圖2.通過并行集合(數(shù)組)創(chuàng)建RDD5.1.2RDD操作1.轉(zhuǎn)換操作2.行動操作3.惰性機制5.1.2RDD操作對于RDD而言,每一次轉(zhuǎn)換操作都會產(chǎn)生不同的RDD,供給下一個“轉(zhuǎn)換”使用轉(zhuǎn)換得到的RDD是惰性求值的,也就是說,整個轉(zhuǎn)換過程只是記錄了轉(zhuǎn)換的軌跡,并不會發(fā)生真正的計算,只有遇到行動操作時,才會發(fā)生真正的計算,開始從血緣關(guān)系源頭開始,進行物理的轉(zhuǎn)換操作動作轉(zhuǎn)換轉(zhuǎn)換轉(zhuǎn)換轉(zhuǎn)換轉(zhuǎn)換創(chuàng)建創(chuàng)建1.轉(zhuǎn)換操作5.1.2RDD操作操作含義filter(func)篩選出滿足函數(shù)func的元素,并返回一個新的數(shù)據(jù)集map(func)將每個元素傳遞到函數(shù)func中,并將結(jié)果返回為一個新的數(shù)據(jù)集flatMap(func)與map()相似,但每個輸入元素都可以映射到0或多個輸出結(jié)果groupByKey()應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,Iterable)形式的數(shù)據(jù)集reduceByKey(func)應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,V)形式的數(shù)據(jù)集,其中每個值是將每個key傳遞到函數(shù)func中進行聚合后的結(jié)果表

常用的RDD轉(zhuǎn)換操作API5.1.2RDD操作filter(func)scala>vallines=sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)scala>vallinesWithSpark=lines.filter(line=>line.contains("Spark"))圖filter()操作實例執(zhí)行過程示意圖5.1.2RDD操作map(func)map(func)操作將每個元素傳遞到函數(shù)func中,并將結(jié)果返回為一個新的數(shù)據(jù)集scala>data=Array(1,2,3,4,5)scala>valrdd1=sc.parallelize(data)scala>valrdd2=rdd1.map(x=>x+10)圖map()操作實例執(zhí)行過程示意圖5.1.2RDD操作map(func)scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valwords=lines.map(line=>line.split(""))另外一個實例圖map()操作實例執(zhí)行過程示意圖5.1.2RDD操作flatMap(func)scala>vallines=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")scala>valwords=lines.flatMap(line=>line.split(""))圖flatMap()操作實例執(zhí)行過程示意圖5.1.2RDD操作groupByKey()groupByKey()應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,Iterable)形式的數(shù)據(jù)集圖groupByKey()操作實例執(zhí)行過程示意圖5.1.2RDD操作reduceByKey(func)reduceByKey(func)應(yīng)用于(K,V)鍵值對的數(shù)據(jù)集時,返回一個新的(K,V)形式的數(shù)據(jù)集,其中的每個值是將每個key傳遞到函數(shù)func中進行聚合后得到的結(jié)果圖reduceByKey()操作實例執(zhí)行過程示意圖5.1.2RDD操作行動操作是真正觸發(fā)計算的地方。Spark程序執(zhí)行到行動操作時,才會執(zhí)行真正的計算,從文件中加載數(shù)據(jù),完成一次又一次轉(zhuǎn)換操作,最終,完成行動操作得到結(jié)果。操作含義count()返回數(shù)據(jù)集中的元素個數(shù)collect()以數(shù)組的形式返回數(shù)據(jù)集中的所有元素first()返回數(shù)據(jù)集中的第一個元素take(n)以數(shù)組的形式返回數(shù)據(jù)集中的前n個元素reduce(func)通過函數(shù)func(輸入兩個參數(shù)并返回一個值)聚合數(shù)據(jù)集中的元素foreach(func)將數(shù)據(jù)集中的每個元素傳遞到函數(shù)func中運行表

常用的RDD行動操作API2.行動操作5.1.2RDD操作scala>valrdd=sc.parallelize(Array(1,2,3,4,5))rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:24scala>rdd.count()res0:Long=5scala>rdd.first()res1:Int=1scala>rdd.take(3)res2:Array[Int]=Array(1,2,3)scala>rdd.reduce((a,b)=>a+b)res3:Int=15scala>rdd.collect()res4:Array[Int]=Array(1,2,3,4,5)scala>rdd.foreach(elem=>println(elem))123455.1.3惰性機制scala>vallines=sc.textFile("data.txt")scala>vallineLengths=lines.map(s=>s.length)scala>valtotalLength=lineLengths.reduce((a,b)=>a+b)所謂的“惰性機制”是指,整個轉(zhuǎn)換過程只是記錄了轉(zhuǎn)換的軌跡,并不會發(fā)生真正的計算,只有遇到行動操作時,才會觸發(fā)“從頭到尾”的真正的計算。這里給出一段簡單的語句來解釋Spark的惰性機制。5.1.4持久化在Spark中,RDD采用惰性求值的機制,每次遇到行動操作,都會從頭開始執(zhí)行計算。每次調(diào)用行動操作,都會觸發(fā)一次從頭開始的計算。這對于迭代計算而言,代價是很大的,迭代計算經(jīng)常需要多次重復(fù)使用同一組數(shù)據(jù)下面就是多次計算同一個RDD的例子:scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat<console>:29scala>println(rdd.count())//行動操作,觸發(fā)一次真正從頭到尾的計算3scala>println(rdd.collect().mkString(","))//行動操作,觸發(fā)一次真正從頭到尾的計算Hadoop,Spark,Hive5.1.3持久化可以通過持久化(緩存)機制避免這種重復(fù)計算的開銷可以使用persist()方法對一個RDD標記為持久化之所以說“標記為持久化”,是因為出現(xiàn)persist()語句的地方,并不會馬上計算生成RDD并把它持久化,而是要等到遇到第一個行動操作觸發(fā)真正計算以后,才會把計算結(jié)果進行持久化持久化后的RDD將會被保留在計算節(jié)點的內(nèi)存中被后面的行動操作重復(fù)使用5.1.3持久化persist()的圓括號中包含的是持久化級別參數(shù):可以使用unpersist()方法手動地把持久化的RDD從緩存中移除persist(MEMORY_ONLY):表示將RDD作為反序列化的對象存儲于JVM中,如果內(nèi)存不足,就要按照LRU原則替換緩存中的內(nèi)容persist(MEMORY_AND_DISK)表示將RDD作為反序列化的對象存儲在JVM中,如果內(nèi)存不足,超出的分區(qū)將會被存放在硬盤上一般而言,使用cache()方法時,會調(diào)用persist(MEMORY_ONLY)5.1.3持久化scala>vallist=List("Hadoop","Spark","Hive")list:List[String]=List(Hadoop,Spark,Hive)scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[22]atparallelizeat<console>:29scala>rdd.cache()//會調(diào)用persist(MEMORY_ONLY),但是,語句執(zhí)行到這里,并不會緩存rdd,因為這時rdd還沒有被計算生成scala>println(rdd.count())//第一次行動操作,觸發(fā)一次真正從頭到尾的計算,這時上面的rdd.cache()才會被執(zhí)行,把這個rdd放到緩存中3scala>println(rdd.collect().mkString(","))//第二次行動操作,不需要觸發(fā)從頭到尾的計算,只需要重復(fù)使用上面緩存中的rddHadoop,Spark,Hive針對上面的實例,增加持久化語句以后的執(zhí)行過程如下:5.1.4分區(qū)RDD是彈性分布式數(shù)據(jù)集,通常RDD很大,會被分成很多個分區(qū),分別保存在不同的節(jié)點上圖RDD分區(qū)被保存到不同節(jié)點上1.分區(qū)的作用(1)增加并行度5.1.4分區(qū)1.分區(qū)的作用(2)減少通信開銷圖

未分區(qū)時對UserData和Events兩個表進行連接操作5.1.4分區(qū)圖

采用分區(qū)以后對UserData和Events兩個表進行連接操作1.分區(qū)的作用(2)減少通信開銷5.1.4分區(qū)2.RDD分區(qū)原則RDD分區(qū)的一個原則是使得分區(qū)的個數(shù)盡量等于集群中的CPU核心(core)數(shù)目

對于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通過設(shè)置spark.default.parallelism這個參數(shù)的值,來配置默認的分區(qū)數(shù)目,一般而言:

*本地模式:默認為本地機器的CPU數(shù)目,若設(shè)置了local[N],則默認為N

*ApacheMesos:默認的分區(qū)數(shù)為8

*Standalone或YARN:在“集群中所有CPU核心數(shù)目總和”和“2”二者中取較大值作為默認值5.1.4分區(qū)3.設(shè)置分區(qū)的個數(shù)(1)創(chuàng)建RDD時手動指定分區(qū)個數(shù)在調(diào)用textFile()和parallelize()方法的時候手動指定分區(qū)個數(shù)即可,語法格式如下:sc.textFile(path,partitionNum)其中,path參數(shù)用于指定要加載的文件的地址,partitionNum參數(shù)用于指定分區(qū)個數(shù)。

scala>valarray=Array(1,2,3,4,5)scala>valrdd=sc.parallelize(array,2)//設(shè)置兩個分區(qū)5.1.4分區(qū)3.設(shè)置分區(qū)的個數(shù)(2)使用reparititon方法重新設(shè)置分區(qū)個數(shù)通過轉(zhuǎn)換操作得到新RDD時,直接調(diào)用repartition方法即可。例如:scala>valdata=sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)data:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/rdd/word.txtMapPartitionsRDD[12]attextFileat<console>:24scala>data.partitions.size//顯示data這個RDD的分區(qū)數(shù)量res2:Int=2scala>valrdd=data.repartition(1)//對data這個RDD進行重新分區(qū)rdd:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[11]atrepartitionat:26scala>rdd.partitions.sizeres4:Int=15.1.4分區(qū)4.自定義分區(qū)方法Spark提供了自帶的HashPartitioner(哈希分區(qū))與RangePartitioner(區(qū)域分區(qū)),能夠滿足大多數(shù)應(yīng)用場景的需求。與此同時,Spark也支持自定義分區(qū)方式,即通過提供一個自定義的Partitioner對象來控制RDD的分區(qū)方式,從而利用領(lǐng)域知識進一步減少通信開銷。要實現(xiàn)自定義分區(qū),需要定義一個類,這個自定義類需要繼承org.apache.spark.Partitioner類,并實現(xiàn)下面三個方法:numPartitions:Int返回創(chuàng)建出來的分區(qū)數(shù)getPartition(key:Any):Int返回給定鍵的分區(qū)編號(0到numPartitions-1)equals()Java判斷相等性的標準方法5.1.4分區(qū)實例:根據(jù)key值的最后一位數(shù)字,寫到不同的文件例如:10寫入到part-0000011寫入到part-00001...19寫入到part-000095.1.4分區(qū)importorg.apache.spark.{Partitioner,SparkContext,SparkConf}//自定義分區(qū)類,需要繼承org.apache.spark.Partitioner類classMyPartitioner(numParts:Int)extendsPartitioner{//覆蓋分區(qū)數(shù)

overridedefnumPartitions:Int=numParts

//覆蓋分區(qū)號獲取函數(shù)

overridedefgetPartition(key:Any):Int={key.toString.toInt%10}}objectTestPartitioner{defmain(args:Array[String]){

valconf=newSparkConf()

valsc=newSparkContext(conf)//模擬5個分區(qū)的數(shù)據(jù)

valdata=sc.parallelize(1to10,5)//根據(jù)尾號轉(zhuǎn)變?yōu)?0個分區(qū),分別寫到10個文件

data.map((_,1)).partitionBy(newMyPartitioner(10)).map(_._1).saveAsTextFile("file:///usr/local/spark/mycode/rdd/partitioner")}}5.1.5一個綜合實例假設(shè)有一個本地文件word.txt,里面包含了很多行文本,每行文本由多個單詞構(gòu)成,單詞之間用空格分隔??梢允褂萌缦抡Z句進行詞頻統(tǒng)計(即統(tǒng)計每個單詞出現(xiàn)的次數(shù)):scala>vallines=sc.//代碼一行放不下,可以在圓點后回車,在下行繼續(xù)輸入|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>valwordCount=lines.flatMap(line=>line.split("")).|map(word=>(word,1)).reduceByKey((a,b)=>a+b)scala>wordCount.collect()scala>wordCount.foreach(println)5.1.5一個綜合實例scala>vallines=sc.//代碼一行放不下,可以在圓點后回車,在下行繼續(xù)輸入|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>valwordCount=lines.flatMap(line=>line.split("")).|map(word=>(word,1)).reduceByKey((a,b)=>a+b)scala>wordCount.collect()scala>wordCount.foreach(println)5.1.5一個綜合實例在實際應(yīng)用中,單詞文件可能非常大,會被保存到分布式文件系統(tǒng)HDFS中,Spark和Hadoop會統(tǒng)一部署在一個集群上圖

在一個集群中同時部署Hadoop和Spark5.1.5一個綜合實例圖

在集群中執(zhí)行詞頻統(tǒng)計過程示意圖5.2鍵值對RDD5.2.1鍵值對RDD的創(chuàng)建5.2.2常用的鍵值對RDD轉(zhuǎn)換操作5.2.3一個綜合實例5.2.1鍵值對RDD的創(chuàng)建(1)第一種創(chuàng)建方式:從文件中加載可以采用多種方式創(chuàng)建PairRDD,其中一種主要方式是使用map()函數(shù)來實現(xiàn)scala>vallines=sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/mycode/pairrdd/word.txtMapPartitionsRDD[1]attextFileat<console>:27scala>valpairRDD=lines.flatMap(line=>line.split("")).map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[3]atmapat<console>:29scala>pairRDD.foreach(println)(i,1)(love,1)(hadoop,1)……5.2.1鍵值對RDD的創(chuàng)建(2)第二種創(chuàng)建方式:通過并行集合(數(shù)組)創(chuàng)建RDDscala>vallist=List("Hadoop","Spark","Hive","Spark")list:List[String]=List(Hadoop,Spark,Hive,Spark)

scala>valrdd=sc.parallelize(list)rdd:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[11]atparallelizeat<console>:29

scala>valpairRDD=rdd.map(word=>(word,1))pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[12]atmapat<console>:31

scala>pairRDD.foreach(println)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作reduceByKey(func)groupByKey()keysvaluessortByKey()mapValues(func)joincombineByKey5.2.2常用的鍵值對RDD轉(zhuǎn)換操作reduceByKey(func)reduceByKey(func)的功能是,使用func函數(shù)合并具有相同鍵的值(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.reduceByKey((a,b)=>a+b).foreach(println)(Spark,2)(Hive,1)(Hadoop,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作groupByKey()的功能是,對具有相同鍵的值進行分組groupByKey()比如,對四個鍵值對("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),采用groupByKey()后得到的結(jié)果是:("spark",(1,2))和("hadoop",(3,5))(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.groupByKey()res15:org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[15]atgroupByKeyat<console>:345.2.2常用的鍵值對RDD轉(zhuǎn)換操作reduceByKey用于對每個key對應(yīng)的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,并且merge操作可以通過函數(shù)自定義reduceByKey和groupByKey的區(qū)別groupByKey也是對每個key進行操作,但只生成一個sequence,groupByKey本身不能自定義函數(shù),需要先用groupByKey生成RDD,然后才能對此RDD通過map進行自定義函數(shù)操作5.2.2常用的鍵值對RDD轉(zhuǎn)換操作scala>val

words

=

Array("one",

"two",

"two",

"three",

"three",

"three")

scala>val

wordPairsRDD

=

sc.parallelize(words).map(word

=>

(word,

1))

scala>val

wordCountsWithReduce

=

wordPairsRDD.reduceByKey(_

+

_)

scala>val

wordCountsWithGroup

=

wordPairsRDD.groupByKey().map(t

=>

(t._1,

t._2.sum))

reduceByKey和groupByKey的區(qū)別上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一樣的,但是,它們的內(nèi)部運算過程是不同的5.2.2常用的鍵值對RDD轉(zhuǎn)換操作(1)當采用reduceByKey時,Spark可以在每個分區(qū)移動數(shù)據(jù)之前將待輸出數(shù)據(jù)與一個共用的key結(jié)合5.2.2常用的鍵值對RDD轉(zhuǎn)換操作(2)當采用groupByKey時,由于它不接收函數(shù),Spark只能先將所有的鍵值對(key-valuepair)都移動,這樣的后果是集群節(jié)點之間的開銷很大,導(dǎo)致傳輸延時5.2.2常用的鍵值對RDD轉(zhuǎn)換操作keyskeys只會把PairRDD中的key返回形成一個新的RDDscala>pairRDD.keysres17:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[17]atkeysat<console>:34scala>pairRDD.keys.foreach(println)HadoopSparkHiveSpark(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作valuesvalues只會把PairRDD中的value返回形成一個新的RDD。(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.valuesres0:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[2]atvaluesat<console>:34

scala>pairRDD.values.foreach(println)11115.2.2常用的鍵值對RDD轉(zhuǎn)換操作sortByKey()sortByKey()的功能是返回一個根據(jù)鍵排序的RDD(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.sortByKey()res0:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[2]atsortByKeyat<console>:34scala>pairRDD.sortByKey().foreach(println)(Hadoop,1)(Hive,1)(Spark,1)(Spark,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作sortByKey()和sortBy()scala>vald1=sc.parallelize(Array((“c",8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)))scala>d1.reduceByKey(_+_).sortByKey(false).collectres2:Array[(String,Int)]=Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))scala>vald2=sc.parallelize(Array((“c",8),(“b“,25),(“c“,17),(“a“,42),(“b“,4),(“d“,9),(“e“,17),(“c“,2),(“f“,29),(“g“,21),(“b“,9)))scala>d2.reduceByKey(_+_).sortBy(_._2,false).collectres4:Array[(String,Int)]=Array((a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9))5.2.2常用的鍵值對RDD轉(zhuǎn)換操作mapValues(func)對鍵值對RDD中的每個value都應(yīng)用一個函數(shù),但是,key不會發(fā)生變化scala>pairRDD.mapValues(x=>x+1)res2:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[4]atmapValuesat<console>:34scala>pairRDD.mapValues(x=>x+1).foreach(println)(Hadoop,2)(Spark,2)(Hive,2)(Spark,2)(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作joinjoin就表示內(nèi)連接。對于內(nèi)連接,對于給定的兩個輸入數(shù)據(jù)集(K,V1)和(K,V2),只有在兩個數(shù)據(jù)集中都存在的key才會被輸出,最終得到一個(K,(V1,V2))類型的數(shù)據(jù)集。scala>valpairRDD1=sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))pairRDD1:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[24]atparallelizeat<console>:27

scala>valpairRDD2=sc.parallelize(Array(("spark","fast")))pairRDD2:org.apache.spark.rdd.RDD[(String,String)]=ParallelCollectionRDD[25]atparallelizeat<console>:27

scala>pairRDD1.join(pairRDD2)res9:org.apache.spark.rdd.RDD[(String,(Int,String))]=MapPartitionsRDD[28]atjoinat<console>:32

scala>pairRDD1.join(pairRDD2).foreach(println)(spark,(1,fast))(spark,(2,fast))5.2.2常用的鍵值對RDD轉(zhuǎn)換操作combineByKeycombineByKey(createCombiner,mergeValue,mergeCombiners,partitioner,mapSideCombine)createCombiner:在第一次遇到Key時創(chuàng)建組合器函數(shù),將RDD數(shù)據(jù)集中的V類型值轉(zhuǎn)換C類型值(V=>C)mergeValue:合并值函數(shù),再次遇到相同的Key時,將createCombiner的C類型值與這次傳入的V類型值合并成一個C類型值(C,V)=>CmergeCombiners:合并組合器函數(shù),將C類型值兩兩合并成一個C類型值partitioner:使用已有的或自定義的分區(qū)函數(shù),默認是HashPartitioner

mapSideCombine:是否在map端進行Combine操作,默認為true5.2.2常用的鍵值對RDD轉(zhuǎn)換操作例:編程實現(xiàn)自定義Spark合并方案。給定一些銷售數(shù)據(jù),數(shù)據(jù)采用鍵值對的形式<公司,收入>,求出每個公司的總收入和平均收入,保存在本地文件提示:可直接用sc.parallelize在內(nèi)存中生成數(shù)據(jù),在求每個公司總收入時,先分三個分區(qū)進行求和,然后再把三個分區(qū)進行合并。只需要編寫RDDcombineByKey函數(shù)的前三個參數(shù)的實現(xiàn)5.2.2常用的鍵值對RDD轉(zhuǎn)換操作importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfobjectCombine{defmain(args:Array[String]){

valconf=newSparkConf().setAppName("Combine").setMaster(“l(fā)ocal”)

valsc=newSparkContext(conf)

valdata=sc.parallelize(Array(("company-1",92),("company-1",85),("company-1",82),("company-2",78),("company-2",96),("company-2",85),("company-3",88),("company-3",94),("company-3",80)),3)

valres=bineByKey((income)=>(income,1),(acc:(Int,Int),income)=>(acc._1+income,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)).map{case(key,value)=>(key,value._1,value._1/value._2.toFloat)}

res.repartition(1).saveAsTextFile("./result")}}5.2.3一個綜合實例一個綜合實例題目:給定一組鍵值對("spark",2),("hadoop",6),("hadoop",4),("spark",6),鍵值對的key表示圖書名稱,value表示某天圖書銷量,請計算每個鍵對應(yīng)的平均值,也就是計算每種圖書的每天平均銷量。scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))rdd:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[38]atparallelizeat<console>:27

scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))5.2.3一個綜合實例圖

計算圖書平均銷量過程示意圖scala>valrdd=sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))rdd:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[38]atparallelizeat<console>:27

scala>rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()res22:Array[(String,Int)]=Array((spark,4),(hadoop,5))5.3數(shù)據(jù)讀寫5.3.1文件數(shù)據(jù)讀寫5.3.2讀寫HBase數(shù)據(jù)5.3.1文件數(shù)據(jù)讀寫本地文件系統(tǒng)的數(shù)據(jù)讀寫分布式文件系統(tǒng)HDFS的數(shù)據(jù)讀寫3.JSON文件的數(shù)據(jù)讀寫5.3.1文件數(shù)據(jù)讀寫1.本地文件系統(tǒng)的數(shù)據(jù)讀寫(1)從文件中讀取數(shù)據(jù)創(chuàng)建RDDscala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")因為Spark采用了惰性機制,在執(zhí)行轉(zhuǎn)換操作的時候,即使輸入了錯誤的語句,spark-shell也不會馬上報錯(假設(shè)word123.txt不存在)scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")5.3.1文件數(shù)據(jù)讀寫1.本地文件系統(tǒng)的數(shù)據(jù)讀寫(2)把RDD寫入到文本文件中scala>valtextFile=sc.|textFile("file:///usr/local/spark/mycode/wordcount/word.txt")scala>textFile.|saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")$cd/usr/local/spark/mycode/wordcount/writeback/$ls如果想再次把數(shù)據(jù)加載在RDD中,只要使用writeback這個目錄即可,如下:scala>valtextFile=sc.textFile("file:///usr/local/spark/mycode/wordcount/writeback")5.3.1文件數(shù)據(jù)讀寫2.分布式文件系統(tǒng)HDFS的數(shù)據(jù)讀寫scala>valtextFile=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>textFile.first()從分布式文件系統(tǒng)HDFS中讀取數(shù)據(jù),也是采用textFile()方法,可以為textFile()方法提供一個HDFS文件或目錄地址,如果是一個文件地址,它會加載該文件,如果是一個目錄地址,它會加載該目錄下的所有文件的數(shù)據(jù)同樣,可以使用saveAsTextFile()方法把RDD中的數(shù)據(jù)保存到HDFS文件中,命令如下:scala>textFile.saveAsTextFile("writeback")如下三條語句都是等價的:scala>valtextFile=sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")scala>valtextFile=sc.textFile("/user/hadoop/word.txt")scala>valtextFile=sc.textFile("word.txt")5.3.1文件數(shù)據(jù)讀寫JSON(JavaScriptObjectNotation)是一種輕量級的數(shù)據(jù)交換格式{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}

Spark提供了一個JSON樣例數(shù)據(jù)文件,存放在“/usr/local/spark/examples/src/main/resources/people.json”中3.JSON文件的讀取5.3.1文件數(shù)據(jù)讀寫把本地文件系統(tǒng)中的people.json文件加載到RDD中:scala>valjsonStr=sc.|textFile("file:///usr/local/spark/examples/src/main/resources/people.json")scala>jsonStr.foreach(println){"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}5.3.1文件數(shù)據(jù)讀寫任務(wù):編寫程序完成對JSON數(shù)據(jù)的解析工作Scala中有一個自帶的JSON庫——scala.util.parsing.json.JSON,可以實現(xiàn)對JSON數(shù)據(jù)的解析JSON.parseFull(jsonString:String)函數(shù),以一個JSON字符串作為輸入并進行解析,如果解析成功則返回一個Some(map:Map[String,Any]),如果解析失敗則返回None5.3.1文件數(shù)據(jù)讀寫importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._importorg.apache.spark.SparkConfimportscala.util.parsing.json.JSONobjectJSONRead{defmain(args:Array[String]){

val

inputFile="file:///usr/local/spark/examples/src/main/resources/people.json"

valconf=newSparkConf().setAppName("JSONRead")

valsc=newSparkContext(conf)

val

jsonStrs=sc.textFile(inputFile)

valresult=jsonStrs.map(s=>JSON.parseFull(s))

result.foreach({r=>rmatch{caseSome(map:Map[String,Any])=>println(map)caseNone=>println("Parsingfailed")caseother=>println("Unknowndatastructure:"+other)}})}}在JSONRead.scala代碼文件中輸入以下內(nèi)容:5.3.1文件數(shù)據(jù)讀寫將整個應(yīng)用程序打包成JAR包通過spark-submit運行程序$/usr/local/spark/bin/spark-submit\>--class"JSONRead”\

>/usr/local/spark/mycode/json/target/scala-2.11/json-project_2.11-1.0.jar執(zhí)行后可以在屏幕上的大量輸出信息中找到如下結(jié)果:Map(name->Michael)Map(name->Andy,age->30.0)Map(name->Justin,age->19.0)5.3.2讀寫HBase數(shù)據(jù)0.HBase簡介1.創(chuàng)建一個HBase表2.配置Spark3.編寫程序讀取HBase數(shù)據(jù)

4.編寫程序向HBase寫入數(shù)據(jù)5.3.2讀寫HBase數(shù)據(jù)圖Hadoop生態(tài)系統(tǒng)中HBase與其他部分的關(guān)系HBase是GoogleBigTable的開源實現(xiàn)0.HBase簡介5.3.2讀寫HBase數(shù)據(jù)HBase是一個稀疏、多維度、排序的映射表,這張表的索引是行鍵、列族、列限定符和時間戳每個值是一個未經(jīng)解釋的字符串,沒有數(shù)據(jù)類型用戶在表中存儲數(shù)據(jù),每一行都有一個可排序的行鍵和任意多的列表在水平方向由一個或者多個列族組成,一個列族中可以包含任意多個列,同一個列族里面的數(shù)據(jù)存儲在一起列族支持動態(tài)擴展,可以很輕松地添加一個列族或列,無需預(yù)先定義列的數(shù)量以及類型,所有列均以字符串形式存儲,用戶需要自行進行數(shù)據(jù)類型轉(zhuǎn)換HBase中執(zhí)行更新操作時,并不會刪除數(shù)據(jù)舊的版本,而是生成一個新的版本,舊有的版本仍然保留(這是和HDFS只允許追加不允許修改的特性相關(guān)的)5.3.2讀寫HBase數(shù)據(jù)表:HBase采用表來組織數(shù)據(jù),表由行和列組成,列劃分為若干個列族行:每個HBase表都由若干行組成,每個行由行鍵(rowkey)來標識。列族:一個HBase表被分組成許多“列族”(ColumnFamily)的集合,它是基本的訪問控制單元列限定符:列族里的數(shù)據(jù)通過列限定符(或列)來定位單元格:在HBase表中,通過行、列族和列限定符確定一個“單元格”(cell),單元格中存儲的數(shù)據(jù)沒有數(shù)據(jù)類型,總被視為字節(jié)數(shù)組byte[]時間戳:每個單元格都保存著同一份數(shù)據(jù)的多個版本,這些版本采用時間戳進行索引5.3.2讀寫HBase數(shù)據(jù)HBase中需要根據(jù)行鍵、列族、列限定符和時間戳來確定一個單元格,因此,可以視為一個“四維坐標”,即[行鍵,列族,列限定符,時間戳]鍵值[“201505003”,“Info”,“email”,1174184619081]“xie@”[“201505003”,“Info”,“email”,1174184620720]“you@163.com”5.3.2讀寫HBase數(shù)據(jù)表

HBase數(shù)據(jù)的概念視圖行鍵時間戳列族contents列族anchor"n.www"t5anchor:=”CNN”t4anchor:my.look.ca="CNN.com"t3contents:html="<html>..."t2contents:html="<html>..."t1contents:html="<html>..."5.3.2讀寫HBase數(shù)據(jù)表

HBase數(shù)據(jù)的物理視圖列族contents行鍵時間戳列族contents"n.www"t3contents:html="<html>..."t2contents:html="<html>..."t1contents:html="<html>..."列族anchor行鍵時間戳列族anchor"n.www"t5anchor:=”CNN”t4anchor:my.look.ca="CNN.com"5.3.2讀寫HBase數(shù)據(jù)首先,請參照廈門大學數(shù)據(jù)庫實驗室博客完成HBase的安裝(偽分布式模式):/blog/install-hbase/因為HBase是偽分布式模式,需要調(diào)用HDFS,所以,請首先在終端中輸入下面命令啟動Hadoop:下面就可以啟動HBase,命令如下:如果里面已經(jīng)有一個名稱為student的表,請使用如下命令刪除:1.創(chuàng)建一個HBase表5.3.2讀寫HBase數(shù)據(jù)下面創(chuàng)建一個student表,要在這個表中錄入如下數(shù)據(jù):5.3.2讀寫HBase數(shù)據(jù)把HBase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程時需要引入的jar包,需要拷貝的jar文件包括:所有hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar$cd/usr/local/spark/jars$mkdirhbase$cdhbase$cp/usr/local/hbase/lib/hbase*.jar./$cp/usr/local/hbase/lib/guava-12.0.1.jar./$cp/usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar./$cp/usr/local/hbase/lib/protobuf-java-2.5.0.jar./執(zhí)行如下命令:2.配置Spark5.3.2讀寫HBase數(shù)據(jù)如果要讓Spark讀取HBase,就需要使用SparkContext提供的newAPIHadoopRDD這個API將表的內(nèi)容以RDD的形式加載到Spark中。importorg.apache.hadoop.conf.Configurationimportorg.apache.hadoop.hbase._importorg.apache.hadoop.hbase.client._importorg.apache.hadoop.hbase.mapreduce.TableInputFormatimportorg.apache.hadoop.hbase.util.Bytesimportorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._importorg.apache.spark.SparkConf//剩余代碼見下一頁3.編寫程序讀取HBase數(shù)據(jù)5.3.2讀寫HBase數(shù)據(jù)objectSparkOperateHBase{defmain(args:Array[String]){

valconf=HBaseConfiguration.create()

valsc=newSparkContext(newSparkConf())//設(shè)置查詢的表名

conf.set(TableInputFormat.INPUT_TABLE,"student")

val

stuRDD=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],

classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

classOf[org.apache.hadoop.hbase.client.Result])

valcount=stuRDD.count()

println("StudentsRDDCount:"+count)

stuRDD.cache()//遍歷輸出

stuRDD.foreach({case(_,result)=>

valkey=Bytes.toString(result.getRow)

valname=Bytes.toString(result.getValue("info".getBytes,"name".getBytes))

valgender=Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))

valage=Bytes.toString(result.getValue("info".getBytes,"age".getBytes))

println("Rowkey:"+key+"Name:"+name+"Gender:"+gender+"Age:"+age)})}}在SparkOperateHBase.scala文件中輸入以下代碼:5.3.2讀寫HBase數(shù)據(jù)在simple.sbt中錄入下面內(nèi)容:name:="SimpleProject"version:="1.0"scalaVersion:="2.11.8"libraryDependencies+="org.apache.spark"%%"spark-core"%"2.1.0"libraryDependencies+="org.apache.hbase"%"hbase-client"%"1.1.5"libraryDependencies+="org.apache.hbase"%"hbase-common"%"1.1.5"libraryDependencies+="org.apache.hbase"%"hbase-server"%"1.1.5"采用sbt打包,通過spark-submit運行程序$/usr/local/spark/bin/spark-submit\>--driver-class-path/usr/local/spark/jars/hbase/*:/usr/local/hbase/conf\>--class"SparkOperateHBase"\>/usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar必須使用“--driver-class-path”參數(shù)指定依賴JAR包的路徑,而且必須把“/usr/local/hbase/conf”也加到路徑中5.3.2讀寫HBase數(shù)據(jù)執(zhí)行后得到如下結(jié)果:StudentsRDDCount:2Rowkey:1Name:Xueqian

Gender:FAge:23Rowkey:2Name:Weiliang

Gender:MAge:245.3.2讀寫HBase數(shù)據(jù)4.編寫程序向HBase寫入數(shù)據(jù)idinfonamegenderage3RongchengM264GuanhuaM27下面編寫應(yīng)用程序把表中的兩個學生信息插入到HBase的student表中表

向student表中插入的新數(shù)據(jù)5.3.2讀寫HBase數(shù)據(jù)在SparkWriteHBase.scala文件中輸入下面代碼:importorg.apache.hadoop.hbase.HBaseConfiguration

importorg.apache.hadoop.hbase.mapreduce.TableOutputFormat

importorg.apache.spark._importorg.apache.hadoop.mapreduce.Job

importorg.apache.hadoop.hbase.io.ImmutableBytesWritable

importorg.apache.hadoop.hbase.client.Result

importorg.apache.hadoop.hbase.client.Put

importorg.apache.hadoop.hbase.util.Bytes//剩余代碼見下一頁

5.3.2讀寫HBase數(shù)據(jù)objectSparkWriteHBase{defmain(args:Array[String]):Unit={

val

sparkConf=newSparkConf().setAppName("SparkWriteHBase").setMaster("local")

valsc=newSparkContext(sparkConf)

val

tablename="student"

sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,tablename)

valjob=newJob(sc.hadoopConfiguration)

job.setOutputKeyClass(classOf[ImmutableBytesWritable])

job.setOutputValueClass(classOf[Result])

job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val

indataRDD=sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27"))//構(gòu)建兩行記錄

val

rdd=indataRDD.map(_.split(',')).map{arr=>{

valput=newPut(Bytes.toBytes(arr(0)))//行健的值

put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))//info:name列的值

put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))//info:gender列的值

put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt))//info:age列的值

(newImmutableBytesWritable,put)}}

rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())}}在SparkWriteHBase.scala文件中輸入下面代碼:5.3.2讀寫HBase數(shù)據(jù)$/usr/local/spark/bin/spark-submit\>--driver-class-path/usr/local/spark/jars/hbase/*:/usr/local/hbase/conf\>--class"SparkWriteHBase"\>/usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jarhbase>scan'student'ROWCOLUMN+CELL

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論