大數(shù)據(jù)技術及應用-基于Python語言 課件 第7章 分布式計算框架MapReduce_第1頁
大數(shù)據(jù)技術及應用-基于Python語言 課件 第7章 分布式計算框架MapReduce_第2頁
大數(shù)據(jù)技術及應用-基于Python語言 課件 第7章 分布式計算框架MapReduce_第3頁
大數(shù)據(jù)技術及應用-基于Python語言 課件 第7章 分布式計算框架MapReduce_第4頁
大數(shù)據(jù)技術及應用-基于Python語言 課件 第7章 分布式計算框架MapReduce_第5頁
已閱讀5頁,還剩108頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權,請進行舉報或認領

文檔簡介

第7章分布式計算框架MapReduce本章學習目標掌握MapReduce的基本工作原理;掌握MapReduce各步驟輸入/輸出的銜接關系;掌握利用HadoopStreaming進行代碼測試的基本思路與方法;掌握利用Python設計MapReduce程序的思路和方法;掌握MapReduce程序設計模式的基本概念,以及理解常見的幾種設計模式;理解MRJob的工作原理,并能夠進行簡單的MRJob程序設計應用。01MapReduce概述02WordCount實例詳解目錄Contents03Python代碼測試04利用Python的迭代器和生成器優(yōu)化wordCount程序05MapReduce程序設計模式06用MRJob庫編寫MapReduce程序07本章小結MapReduce概述01MapReduce概述HadoopMapReduce是一個計算框架,目的是為了方便編寫具有高可靠性、高容錯性的,能在大型集群(數(shù)千個節(jié)點)上并行地處理大量數(shù)據(jù)(數(shù)TB的數(shù)據(jù)集)的分布式應用程序。該計算框架管理數(shù)據(jù)傳遞的所有細節(jié),如發(fā)起任務、驗證任務完成情況以及在節(jié)點之間復制數(shù)據(jù)等。第一個MapReduce的Python程序Hadoop自帶有計算圓周率PI的示例jar包,程序用隨機數(shù)的方式計算PI值,該程序是一個java程序,通過執(zhí)行以下命令調(diào)用集群進行計算:hadoopjar$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.0.jarpi1010輸出結果如下:第一個MapReduce的Python程序第一個Python程序也是實現(xiàn)PI的計算。第一個MapReduce的Python程序第一個MapReduce的Python程序在HDFS上創(chuàng)建一個空的文件,之后以該空文件作為輸入文件便可以運行計算PI的程序了。運行完成后,可以在HDFS文件系統(tǒng)中查看輸出文件內(nèi)容。假設把程序對應的mapper和reducer存放在“/codes/Pi”文件夾中,HDFS系統(tǒng)上用作輸入的數(shù)據(jù)存放于HDFS的“/data”文件夾,輸出存放于“/data_rlt”。其操作步驟具體如下:第一個MapReduce的Python程序chmod+x/codes/Pi/*.*第一個MapReduce的Python程序利用如下命令運行該程序運行結果如下:mapredstreaming-input/data/tmp.txt-output/data_rlt-mapper/codes/pi/pi_mapper.py-reducer/codes/pi/pi_reducer.py第一個MapReduce的Python程序MapReduce工作原理MapReduce程序分三個階段執(zhí)行,即map階段、shuffle階段和reduce階段其中shuffle可以由MapReduce框架自動完成。因此,多數(shù)情況下,MapReduce算法主要包括兩個任務:Map和Reduce。MapReduce工作原理1.Map階段Map的任務是對輸入數(shù)據(jù)進行處理,通常情況下,MapReduce作業(yè)的輸入和輸出都存儲在文件系統(tǒng)中,MapReduce作業(yè)將輸入數(shù)據(jù)集拆分為獨立的塊,這些塊由Map任務以完全并行的方式處理,并轉換為一種(key,value)的數(shù)據(jù)模式。MapReduce工作原理2.Reduce階段該階段包括shuffle和reduce,Reducer的任務是把Mapper生成的(key,value)集整合處理,并輸出最終結果到HDFS中。系統(tǒng)對map任務的輸出執(zhí)行排序和轉換,并映射為reduce任務的輸入,此過程就是Shuffle。兩階段任務完成后,集群收集并縮減數(shù)據(jù)以形成最后結果,并將其發(fā)送回Hadoop服務器。MapReduce工作原理CombinerMapReduce程序還可以包含combiner,combiner在Map之后執(zhí)行,通常情況下實現(xiàn)對Map輸出數(shù)據(jù)的簡單歸納,從而減少數(shù)據(jù)傳輸。在多數(shù)應用中,combiner都是能夠大幅提高程序效率的關鍵步驟。MapReduce工作原理Hadoop框架是用Java實現(xiàn)的,MapReduce應用程序卻可以不用Java編寫。Hadoop流(HadoopStreaming)是一個工具應用程序(Utility),它允許用戶使用任何可執(zhí)行文件作為mapper或reducer來創(chuàng)建和運行作業(yè);此外,Hadoop管道(HadoopPipes)是一種SWIG兼容的C++API,他可以支持編寫非JNI(JavaNativeInterface)的MapReduce應用程序。MapReduce作業(yè)的工作流程MapReduce作業(yè)的工作流程用戶程序中的MapReduce庫首先將輸入文件劃分為M片,每片大小一般在16M到64M之間(可由參數(shù)指定),然后,它在集群多臺機器上啟動相同的程序復制。其中一個復制程序為master(主模塊),其余的都是worker(分模塊)。worker接收master分配的任務,其中有M個Map任務和R個Reduce任務要分配。master挑選一個空閑的worker并為其分配一個map任務或者reduce任務。MapReduce作業(yè)的工作流程被分配到Map任務的worker會去讀取相應的輸入塊的內(nèi)容。它從輸入文件中解析出鍵值對并且將每個鍵值對傳送給用戶定義的Map函數(shù)。由Map函數(shù)產(chǎn)生的中間鍵值對則緩存在內(nèi)存中。被緩存的鍵值對會階段性地寫回本地磁盤,并且被劃分函數(shù)分割成R份。這些緩存在磁盤上的數(shù)據(jù)位置會被回傳給master(主模塊),master再負責將這些位置轉發(fā)給Reduceworker。MapReduce作業(yè)的工作流程當Reduceworker(Reduce分模塊)從master(主模塊)那里接收到這些位置信息時,它會使用遠程過程調(diào)用從Mapworker的本地磁盤中獲取緩存的數(shù)據(jù)。當Reduceworker讀入全部的中間數(shù)據(jù)之后,將根據(jù)中間鍵對進行排序,這樣所有具有相同鍵的鍵值對就都聚集在一起。因為要把具有相同鍵的鍵值對映射到同一個reducetask中,所以需要對中間鍵值對進行排序。MapReduce作業(yè)的工作流程Reduceworker遍歷已經(jīng)排好序的中間數(shù)據(jù),每當遇到一個新的中間鍵,它會將key和相應的中間值傳遞給Reduce函數(shù)。Reduce函數(shù)的輸出會被添加到這個Reduce部分的輸出文件中。當所有的Maptasks和Reducetasks都完成時,master將喚醒用戶程序。至此,用戶代碼中的MapReduce調(diào)用返回。MapReduce作業(yè)的工作流程執(zhí)行完后,MapReduce的執(zhí)行結果被存放在R個輸出文件中(每個Reducetask對應一個,文件名由用戶指定)。因為這些輸出文件經(jīng)常作為另一個MapReduce調(diào)用的輸入,或者將它們用于另外一個能夠以多個文件作為輸入的分布式應用,所以,用戶并不需要將R個輸出文件歸并成一個。MapReduce作業(yè)的工作流程MapReduce的輸入/輸出MapReduce框架只對<key,value>(鍵值對)進行操作,該框架將作業(yè)的輸入視為一組<key,value>,并生成<key,value>作為作業(yè)的輸出。與Java程序不同,Python的Map程序輸入/輸出流為標準輸入/輸出,STDIN和STDOUT,mapper輸入從STDIN中接收數(shù)據(jù),reducer輸出結果到STDOUT;在mapper和reducer之間的數(shù)據(jù)也是通過標準輸入/輸出進行關聯(lián)。至于他們之間的數(shù)據(jù)流關系則交由hadoopStreaming處理。WordCount實例詳解02WordCount程序源碼使用Python編寫MapReduce的要點在于利用Hadoop流的API,通過STDIN(標準輸入)、STDOUT(標準輸出)在Map函數(shù)和Reduce函數(shù)之間傳遞數(shù)據(jù)。我們需要做的是利用Python的sys.stdin讀取輸入數(shù)據(jù),并把輸出傳送給sys.stdout。Hadoop流將會幫助我們處理中間的過程。WordCount程序一共包含兩個部分,分布式map階段程序wordCount_mapper.py和reduce階段程序wordCount_reducer.py,分別如下所示。WordCount程序源碼WordCount程序源碼WordCount程序源碼WordCount程序源碼編寫完代碼,并把他們分別存儲在文件wordCount_mapper.py和wordCount_reducer.py。檢查上述文件是否具有可執(zhí)行權限,若沒有,可以通過chmod命令設置兩個文件的執(zhí)行權限,如“chmod+x./wordCounter/mapper.py”。WordCount程序執(zhí)行利用HadoopStreaming執(zhí)行一個MapReduce任務的調(diào)用方法如下所示。hadoop-streaming.jar通常命名為hadoop-streaming-x.x.x.jar,其中x.x.x為具體版本號(例如對應hadoop3.3.0版的文件名為hadoop-streaming-3.3.0.jar)。WordCount程序執(zhí)行(1)數(shù)據(jù)準備(路徑可能會因個人不同)后在HDFS上創(chuàng)建“/wordCountData”文件夾,并把數(shù)據(jù)文件(文本文件)上傳到該文件夾;WordCount程序執(zhí)行(2)執(zhí)行命令如下(當命令在一行中寫不下時,可以使用“\”換行)mapredstreaming\-input/wordCountData\-output/wordCount_rlt\-mapper"/codes/wordCount/wordCount_mapper.py"\-reducer"/codes/wordCount/wordCount_reducer.py"WordCount程序執(zhí)行(3)運行完成后,查看文件夾“/wordCount_rlt”中的文件,可以通過cat直接查看WordCount程序原理將文件拆分成多片段(splits),由于測試用的文件較小,所以每個文件為一個split,并將文件形成標準輸入流,這一步由MapReduce框架自動完成;在mapper中按行讀取標準輸入數(shù)據(jù),并對單詞進行分割形成<key,value>,其中key為單詞,value為該單詞出現(xiàn)次數(shù),這里沒有對單詞進行統(tǒng)計,所以value都是1;得到map方法輸出的<key,value>集后,shuffle將它們按照key值進行排序,并作為reduce的輸入數(shù)據(jù);得到排過序的鍵值對后,利用自定義的reduce方法進行處理,得到新的<key,value>對,并作為WordCount的輸出結果。WordCount程序原理MapReducePython代碼的測試03MapReducePython代碼的測試使用Python編寫完成的MapReduce程序,通常需要進行調(diào)試、測試,以排除隱藏的邏輯問題。對運行于分布式環(huán)境的MapReduce程序而言,其運行的整個過程由mapReduce框架自動管理,在這種情況下要通過一遍遍運行查找程序,再在輸出的日志中發(fā)現(xiàn)邏輯錯誤、甚至語法問題將是非常耗時且煩瑣的。因此,我們需要通過使用一些工具和方法來協(xié)助程序代碼的調(diào)試。MapReducePython代碼的測試Python程序的運行是通過HadoopStreaming工具進行數(shù)據(jù)流的控制,并且Python的Map程序是通過標準輸入STDIN接收輸入數(shù)據(jù)的,所以可以通過構造標準輸出作為map程序的輸入,然后利用linux的管道把map輸出到sort工具實現(xiàn)排序;排序完成后的數(shù)據(jù)再通過管道輸出到reduce程序,從而實現(xiàn)簡單測試環(huán)境的構建。而reduce的最終輸出結果,則可以通過重定向標準輸出到文件的方式實現(xiàn)結果的輸出和存儲。MapReducePython代碼的測試MapReducePython代碼的測試以前面的wordCount程序為例,可以先把輸入文件之一復制到與mapper和reducer同一文件夾中,此時,該文件夾包含wordCount_mapper.py、wordCount_reducer.py兩個程序文件,運行如下命令:MapReducePython代碼的測試還可以把上述節(jié)點分開。例如,可以首先把mapper的輸出重定向到tmp.txt文件中,以便查看mapper階段的運行結果。然后再把中間結果作為reducer的輸入便可以分步查看兩個階段的運行結果。運行命令如下:catinput.txt|./wordCount_mapper.py>tmp.txtcattmp.txt|sort-k1,1|./wordCount_reducer.py>rlt.txt利用迭代器和生成器優(yōu)化wordCount程序04Python中的迭代器迭代器是訪問集合元素的一種方式,它提供了一個統(tǒng)一的訪問集合的接口。迭代器對象從集合的第一個元素開始訪問,直到訪問完所有的元素結束。迭代器只提供了next方法,所以只能往前不會后退。Python中的迭代器一個簡單的迭代器及使用Python中的迭代器把一個類作為一個迭代器使用需要在類中實現(xiàn)兩個方法__iter__()與__next__()Python中的生成器帶有yield的函數(shù)在Python中被稱為generator(生成器),生成器是迭代器的一種。例如:Python中的生成器簡單而言,yield的作用就是把一個函數(shù)變成一個generator生成器常被用于數(shù)據(jù)文件讀取,生成器可以根據(jù)需要讀取數(shù)據(jù)。如果直接對文件對象調(diào)用read()方法,會導致不可預測的內(nèi)存占用。使用yield利用固定長度的緩沖區(qū)來不斷讀取文件內(nèi)容,我們無須編寫讀文件的迭代類,就可以輕松實現(xiàn)文件讀取。Python中的生成器讀取文件的生成器,每次讀取BLOCK_SIZE大小itertools模塊itertools是Python內(nèi)置的模塊,itertools包提供了更加靈活的生成循環(huán)器的工具,其使用簡單且功能強大。該模塊中的部分內(nèi)容,可以參考教材7.6.3.其中,gropby()迭代器能夠把可迭代的輸入數(shù)據(jù)按關鍵字進行分組后輸出,例如:itertools模塊fromitertoolsimportgroupbyfromoperatorimportitemgettermapout=[('Hello',1),('hadoop',1),('Bye',1),('hadoop',1),('Hello',1),('world',1),('Bye',1),('world',1)]mapout=sorted(mapout,key=itemgetter(0))#排序rlt_groupby=groupby(mapout,key=itemgetter(0))forkey,itemsinrlt_groupby:print("key:",key)forsubiteminitems:print("subitem:",subitem)優(yōu)化wordCount程序利用上述Python的生成器以及itertools模塊工具,我們對wordCount程序進行修改,形成更具有Python風格的wordCount。優(yōu)化wordCount程序優(yōu)化wordCount程序#!/usr/bin/envpython#-*-coding:UTF-8-*-fromoperatorimportitemgetterfromitertoolsimportgroupbyimportsysdefread_mapper_output(file,separator='\t'):forlineinfile:yieldline.rstrip().split(separator,1)defmain(separator='\t'):data=read_mapper_output(sys.stdin,separator=separator)#data=sorted(data,key=itemgetter(0))forcurrent_word,groupingroupby(data,itemgetter(0)):try:total_count=sum(int(count)forcurrent_word,countingroup)print("%s%s%d"%(current_word,separator,total_count))exceptvalueError:passif__name__=="__main__":main()例7.6.4測試結果MapReduce程序設計模式05MR程序設計模式MapReduce程序設計模式的目的是通過總結MapReduce任務的模式,形成具有一定可替代性的程序設計模式,以便幫助程序員提高學習、實現(xiàn)MapReduce框架上的程序開發(fā)的效率。數(shù)據(jù)集介紹為了能夠更好的示范程序的數(shù)據(jù)處理與應用,我們采用空氣質量數(shù)據(jù)作為本章處理的目標數(shù)據(jù),全國空氣質量數(shù)據(jù)來自中國環(huán)境監(jiān)測總站的全國城市空氣質量實時發(fā)布平臺。數(shù)據(jù)格式:城市空氣指標值、站點空氣指標、站點詳細信息20140513,0,AQI,北京,8120140513,0,AQI,天津,6920140513,0,AQI,石家莊,12520140513,0,PM.5,唐山,82(1)聚合查詢模式聚合查詢是指對所要處理的數(shù)據(jù)進行一定的統(tǒng)計聚合處理,以從概要、全局的層面上掌握數(shù)據(jù)的特征和狀況。例如,今年北京地區(qū)的空氣質量總體情況,包括空氣質量各個指標的平均值、最大最小值,或者空氣質量評價為優(yōu)的天數(shù)等。聚合查詢模式的核心思想是按照一定的關鍵字對數(shù)據(jù)進行分組,然后對分組數(shù)據(jù)進行數(shù)據(jù)的聚合、整理、處理。該模式又可以具體分為數(shù)值概要模式、倒排序索引模式、計數(shù)器模式等。聚合查詢模式—數(shù)值概要模式數(shù)值概要模式的目的是按照某些鍵值對數(shù)據(jù)進行分組,然后計算特定分組下數(shù)據(jù)的聚合值,如平均值、最大最小值、求和等。任務1:統(tǒng)計每個城市的PM2.5的最大值、最小值和平均值聚合查詢模式--數(shù)值概要模式聚合查詢模式—倒排序索引模式任務1的源代碼請參考:教材P238-239mapredstreaming\-input/CityAQ/*\-output/AQI_rlt\-mapper/codes/AQI/-mapper.py\-reducer/codes/AQI/-reducer.py聚合查詢模式—數(shù)值概要模式任務2:增加combiner,實現(xiàn)更加高效的數(shù)據(jù)統(tǒng)計工作;聚合查詢模式—倒排序索引模式任務3的源代碼請參考:教材P240但該版教材的源代碼有點小bug聚合查詢模式—倒排序索引模式倒排索引源于實際應用中需要根據(jù)屬性的值來查找記錄,這種索引表中的每一項都包括一個屬性值和具有該屬性值的記錄的地址。由于不是由記錄確定屬性值,而是由屬性值確定記錄的位置,因而稱為倒排索引(invertedindex)。帶有倒排索引的文件稱為倒排索引文件,簡稱倒排文件(invertedfile)。聚合查詢模式—倒排序索引模式任務1:構建所有城市指定年份空氣質量為優(yōu)的日期倒排序索引。輸入:城市空氣質量監(jiān)測記錄輸出:一個倒排序文件,每一條記錄格式為<城市,年份,空氣質量為優(yōu)的日期列表>為了完成該任務,在Mapper中需要把城市空氣質量符合條件的對應的日期和時刻加以記錄,并輸出。在Reducer中,其接受到的數(shù)據(jù)序列中,具有相同城市的數(shù)據(jù)相鄰,所以只要把具有相同關鍵字的鍵值對的值取出,并形成列表便可以得到空氣質量為優(yōu)的日期序列。過濾模式過濾模式(FilterPattern)或稱標準模式(CriteriaPattern)是一種設計模式,這種模式允許開發(fā)人員使用不同的標準來過濾一組對象,通過邏輯運算以解耦的方式把它們連接起來。這種類型的設計模式屬于結構型模式,它結合多個標準來獲得單一標準。過濾模式最重要的特點是保持數(shù)據(jù)的完整性,只是對記錄保留與否進行判斷,輸出滿足條件的記錄。過濾模式--過濾器模式過濾器的功能就是通過設置一定的條件篩選并輸出滿足條件的記錄,其作用類似于SQL語句中的過濾條件。在過濾器模式的實現(xiàn)中,由于讀取的記錄中已經(jīng)包含了需要的全部信息,所以簡單的過濾模式一般情況下都只需要mapper程序。很多情況下,若最終得到的數(shù)據(jù)是原來數(shù)據(jù)的子集,并且其體量也大大縮小,那么我們希望這些數(shù)據(jù)能夠輸出到一個文件中,而不是被分為多個part進行存儲,這時可以利用reduce階段的排序,再把所有數(shù)據(jù)通過一個reducer輸出到一個文件中。若是一些綜合性任務,這里的reducer還可能會在過濾的基礎上完成其他任務。過濾模式--過濾器模式任務1:空氣質量優(yōu)秀的數(shù)據(jù)檢索。小王是個運動愛好者,他幾乎每天早上6-7點之間都進行戶外運動,為此,他想看看北京市這幾年在他運動期間的空氣質量為“優(yōu)”的詳細數(shù)據(jù),請幫他從文件中找出這些數(shù)據(jù)。輸入:所有城市所有日期的空氣質量狀況及其各項指標;輸出:北京市早上六點時空氣質量為優(yōu)的數(shù)據(jù)記錄。過濾模式--過濾器模式過濾模式--TopK模式TopK模式是指按照一定的排序條件,從所有符合條件的數(shù)據(jù)中挑選出排在最前的K個元素并輸出。對于該模式,其基本處理思路是從所有mapper中輸出該節(jié)點數(shù)據(jù)的TopK,之后再reducer中對所有mapper的TopK進行匯總,輸出最終的K的元素。過濾模式--TopK模式任務1:空氣質量的比較。每年,大眾都會對哪些城市是空氣質量最好的城市爭論不休,小劉老師提了個衡量城市質量的好方法,他的方法是從全國所有城市中找出空氣質量最好的1000條記錄,再看看這1000條記錄都屬于哪些城市。很明顯,如果一個城市在前1000名最好記錄里占比最多,那么可以認為該城市的空氣質量是最好的。現(xiàn)在請你幫幫小劉老師,從所有城市空氣質量數(shù)據(jù)中找到Top1000的空氣質量數(shù)據(jù)。由于某一城市某天的空氣質量有24條監(jiān)測數(shù)據(jù),為了簡化問題,我們假定以正午,即12時的空氣質量作為當天的空氣質量衡量標準。輸入:所有城市所有的空氣質量狀況及其各項指標;輸出:所有城市12時的空氣質量Top1000的記錄。過濾模式--TopK模式分析因為數(shù)據(jù)分布在多個節(jié)點中,要選取TopK數(shù)據(jù)就必須分別從各個節(jié)點中獲取TopK數(shù)據(jù),然后在Reduce階段再進行多個K數(shù)據(jù)集的再次選擇。如果map輸出仍舊是以城市、時間為關鍵字,那么最終的數(shù)據(jù)有可能被兩個或者更多的reducer進行匯總處理,很顯然這情況下是沒有辦法找到TopK數(shù)據(jù)的。針對這一問題,有三種方法可以解決:(1)設置所有mapper的輸出具有相同的鍵值,那么所有數(shù)據(jù)都會匯總到一個reducer進行處理;(2)設置只有一個reducer,這種情況也自然可以把所有數(shù)據(jù)匯總到一個站點;(3)設置兩個reducer階段,其中第二個reducer階段設置只有一個reducer。對于數(shù)據(jù)量不大,mapper數(shù)量較少的情況,前兩種方法不會影響整體性能;若數(shù)據(jù)量大,數(shù)據(jù)的站點分布廣,那么第三種方法則會有更好的表現(xiàn)。TopK模式的應用過程中通常會用到過濾模式,或者通過分組的方式找到每組數(shù)據(jù)的TopK數(shù)據(jù),該任務的mapper階段需要先進行數(shù)據(jù)的過濾,去除非正午時間數(shù)據(jù),然后再進行各個數(shù)據(jù)節(jié)點上Top1000數(shù)據(jù)的選擇和輸出。因此,該任務結合了過濾器模式與TopK模式兩種模式的綜合應用。過濾模式--去重模式去重模式是指對輸入的數(shù)據(jù)進行對比,把其中重復的數(shù)據(jù)去除,從而保證每個關鍵字確定的記錄都只有一個。去重模式是一個運算量較大的操作,若操作方法選擇不恰當,則容易導致整個計算過程變得非常緩慢。過濾模式--去重模式任務:空氣質量上報數(shù)據(jù)的去重空氣質量監(jiān)測是一個全國性的工程,全國各地都需要定時上報具體的空氣質量監(jiān)測數(shù)據(jù)。但是在系統(tǒng)的運行初期,由于上報系統(tǒng)還不夠完善,以及網(wǎng)絡傳輸?shù)葐栴},各地經(jīng)常會出現(xiàn)重復上報空氣質量數(shù)據(jù)的情況。例如上傳數(shù)據(jù)完成了,可是當?shù)刈罱K頁面沒有收到確認信息,工作人員將再次上傳數(shù)據(jù)。這一問題在初期并沒有受到重視,但是隨著數(shù)據(jù)規(guī)范和要求越來越嚴格,現(xiàn)在希望清除歷史數(shù)據(jù)中重復上報的記錄,整理出一套不包含重復數(shù)據(jù)的完整數(shù)據(jù)集。請你幫忙完成這一工作。輸入:所有地方上報的空氣質量數(shù)據(jù),其中可能包含部分重復數(shù)據(jù);輸出:清除所有重復數(shù)據(jù)后的空氣質量數(shù)據(jù)。過濾模式--去重模式分析:數(shù)據(jù)分布存儲在各個節(jié)點上,在一個節(jié)點上運行的mapper無法獲知其包含的數(shù)據(jù)是否也存在于其他節(jié)點,只有當數(shù)據(jù)匯總到一起時才能進行唯一性判斷,也才能進行重復數(shù)據(jù)的消除。在實現(xiàn)過程中,對于map操作,若要進行數(shù)據(jù)的重復性檢驗則需要進行數(shù)據(jù)排序,然后再進行數(shù)據(jù)的遍歷,之后把數(shù)據(jù)輸出作為Reducer的輸入;在Reducer之前,MapReduce系統(tǒng)總是需要在進行一次數(shù)據(jù)的排序,所以我們可以弱化mapper的功能,讓mapper中把(日期、時刻、類型、城市、數(shù)據(jù)值)直接輸出,把排序工作交由MapReduce的Shuffle和sort。在Reduce階段,在遍歷所有數(shù)據(jù),進行重復元素的查找和去除。數(shù)據(jù)連接模式--連接操作定義所謂的數(shù)據(jù)連接操作是指一個特定數(shù)據(jù)集中的記錄只包含了所需要數(shù)據(jù)的一部分,與其相關聯(lián)的其余信息則需要利用該記錄中的一些元素值到另外一個數(shù)據(jù)集中進行檢索,并把兩部分數(shù)據(jù)合并起來組成完整的記錄。例如站點表示的空氣質量數(shù)據(jù)集中有數(shù)據(jù)“20140513,0,AQI,1013A,77”,我們需要根據(jù)其站點編號1013A從站點數(shù)據(jù)集中查找得到具體的站點信息,從而得到完整的數(shù)據(jù)“20140513,0,AQI,1013A,77,市監(jiān)測中心,天津,117.151,39.097”。數(shù)據(jù)連接模式--連接操作定義連接操作是MapReduce數(shù)據(jù)處理中最常用的操作之一,因此,本小節(jié)主要針對連接操作總結出該類程序的設計模式--連接模式。連接模式是MapReduce設計模式中最主要的模式之一,通過連接操作可以實現(xiàn)多個數(shù)據(jù)集的關聯(lián)查詢。具體而言,連接又可以分為內(nèi)連接、外連接(左外連接、右外連接,全外連接)和笛卡爾積等。數(shù)據(jù)連接模式--連接操作定義數(shù)據(jù)連接模式--連接操作定義內(nèi)連接利用站點編號進行內(nèi)連接操作時,需要根據(jù)監(jiān)測數(shù)據(jù)集中的站點編號在站點信息集中進行站點的查找,并把兩條對應的記錄整合,形成包含完整信息的監(jiān)測數(shù)據(jù)。對于監(jiān)測數(shù)據(jù)集中未能在站點信息集中找到站點信息的數(shù)據(jù),則不加入結果集中;當然,對站點信息集中存在的站點,若其未能被某一監(jiān)測數(shù)據(jù)利用上,則在結果集中也不會體現(xiàn)該數(shù)據(jù)。數(shù)據(jù)連接模式--連接操作定義外連接使用內(nèi)連接時,當碰到無法匹配的數(shù)據(jù)時便把數(shù)據(jù)丟棄,而使用外連接,則按照規(guī)則保留相關數(shù)據(jù)項。左外連接操作時,若連接操作為“A連接B”,則保留A中無法匹配的元素;右外連接操作時,若連接操作為“A連接B”,則保留B中無法匹配的元素;全外連接則保留A、B中所有無法匹配的記錄。數(shù)據(jù)連接模式--連接操作定義數(shù)據(jù)連接模式--連接操作定義數(shù)據(jù)連接模式--連接操作定義笛卡爾積笛卡爾積操作A×B則是把數(shù)據(jù)集A中的所有記錄與數(shù)據(jù)集B中的所有記錄進行匹配,從而得到新的數(shù)據(jù)集。數(shù)據(jù)連接模式--內(nèi)連接操作在進行連接操作的過程中,程序會讀取所要連接的數(shù)據(jù),并以連接操作鍵值為關鍵字輸出數(shù)據(jù);mapper輸出數(shù)據(jù)經(jīng)過reducer前的排序,可以使得具有相同連接操作鍵值的數(shù)據(jù)形成連續(xù)的輸入序列;最后,在該連續(xù)輸入序列的基礎上進行連接操作并輸出最后結果。需要注意的是,為了能夠在reducer階段區(qū)分數(shù)據(jù)記錄的來源,在mapper輸出數(shù)據(jù)時需要為兩個數(shù)據(jù)集來源的數(shù)據(jù)分別加上標識以便于區(qū)分。數(shù)據(jù)連接模式--內(nèi)連接操作數(shù)據(jù)連接模式--內(nèi)連接操作數(shù)據(jù)連接模式--內(nèi)連接操作數(shù)據(jù)連接模式--內(nèi)連接操作實現(xiàn)任務:空氣質量數(shù)據(jù)完整信息構造與輸出在上述站點數(shù)據(jù)中進行查詢,輸出福州地區(qū)所有站點的空氣質量數(shù)據(jù),以作為對福州地區(qū)空氣質量進行針對性研究的數(shù)據(jù)基礎。后續(xù)查詢以兩個數(shù)據(jù)集連接操作結果為基礎。輸入:站點監(jiān)測數(shù)據(jù)集以及站點列表。輸出:空氣質量站點檢測數(shù)據(jù),其中站點信息完整。需要注意的是,在該任務中,只需要輸出福州地區(qū)的站點監(jiān)測數(shù)據(jù),所以,還需要在mapper中進行數(shù)據(jù)過濾,以減少需要傳輸和連接的數(shù)據(jù)量,即把投影提到連接前完成。用MRJob庫編寫MapReduce程序06用MRJob庫編寫MapReduce程序MRJob是一個Python庫,實現(xiàn)了Hadoop的MapReduce操作。它封裝了Hadoopstreaming,可以用全Python腳本實現(xiàn)HadoopMapReduce計算,甚至可以在沒有Hadoop的環(huán)境下完成測試,并允許用戶將mapper和reducer寫進一個類里。以下是MRJob的一些特性,這些特性使得利用MRJob編寫MapReduce作業(yè)更容易:將一個作業(yè)的所有MapReduce代碼保存在一個類中;易于上載和安裝程序代碼和數(shù)據(jù)依賴項;簡單的一行代碼便可實現(xiàn)輸入和輸出格式的切換;自動下載并分析Python執(zhí)行的錯誤日志;允許在Python代碼之前或之后放置命令行過濾器。第一個MRJob程序步驟1:安裝MRJobMRJob的安裝由兩種方式,一種是利用pip進行MRJob庫的安裝:$pipinstallmrjob或pip3installmrjob或者從git上下載源碼再進行安裝:$pythonsetup.pytest&&pythonsetup.pyinstall安裝完成后,啟動Python,并輸入importmrjob,若可以成功引入,則說明Python環(huán)境下已經(jīng)成功引入MRJob。設置好MRJob后,接下來便可以編寫第一個MRJob程序了。第一個MRJob程序步驟2:編寫MRJob程序作為第一個MRJob程序,我們還是以wordCount為例。首先,建立文件mr_word_count.py,并輸入如下Python代碼。第一個MRJob程序步驟3運行第一個MRJob程序在mr_word_count.py文件所在文件夾運行該程序,其中input.txt可以是自己編寫的一個文本文件,也可以是系統(tǒng)中任意一個文本文件(輸出會因為輸入的input.txt內(nèi)容不同而不同)。第一個MRJob程序步驟4理解第一個MRJob程序利用MRJob編寫Python的MapReduce程序時,一個job就是一個繼承自MRJob的一個類,同時在該類中定義了job的具體步驟。一個job可以包括mapper、combiner、reducer三個步驟,他們分別對應mapper(),combiner()和reducer()函數(shù)。當然在具體的MRJob程序中,這些步驟都是可選項,用戶可以根據(jù)需要只定義其中的一個或幾個步驟。第一個MRJob程序步驟4理解第一個MRJob程序mapper()方法輸入為<key,value>,在這里,key被忽略了,所以key的位置用一個占位符“_”替代。mapper()的輸出yield產(chǎn)生的<key,value>數(shù)據(jù)對。輸出是利用split()分割得到的每個單詞與其值1所形成的元組序列。reducer()方法的輸入為<key,values>,其中的key是mapper輸出的key,而values則不是單個value,而是一個可迭代對象。在上述第一個MRJob程序中,其輸出為<key,sum(values)>,其中sum(values)是計算values迭代對象中數(shù)據(jù)之和。從這里可以看到,MRJob的reducer輸入與Hadoop-streaming的reducer輸入是不同的,后者的輸入是按照key排序后的<key,value>序列,不是一個可迭代的value集合。最后是把該程序作為一個job運行的啟動語句”MRWordCount.run()”。第一個MRJob程序步驟5運行MRJob:假設在同級文件夾下存在一個input.txt文件,那么其運行可以是如下幾種方式之一。方式1:$pythonmy_job.pyinput.txt方式2:$pythonmy_job.py<input.txt若有多個文件都是該MRJob的數(shù)據(jù)來源,那么可以在程序后列出所有輸入文件,其形式如下所示:方式3:$pythonmy_job.pyinput1.txtinput2.txt還可以結合文件輸入和STDIN輸入的方式,例如方式4中input1.txt和input2.txt是文件輸入,而input3.txt則是STDIN的方式輸入。方式4:$pythonmy_job.pyinput1.txtinput2.txt-<input3.txt第一個MRJob程序步驟6運行模式:缺省情況下,MRJob是以單Python線程的方式運行,這種運行方式提供了友好的程序調(diào)試環(huán)境,但這并不是分布式的程序執(zhí)行方式。MRJob程序執(zhí)行方法可以通過“-r”或者“--runner”來指定,具體執(zhí)行方式包括:“-rinline”:缺省方式為單線程方式,通常用在程序的調(diào)試階段;“-rlocal”:以多處理器方式運行MRJob,這種方式可以模擬部分Hadoop特性;“-rhadoop”:在Hadoop集群上運行該MRJob,在這種方式下,其數(shù)據(jù)文件的來源可以指定為hdfs文件系統(tǒng)中的數(shù)據(jù)文件;如:python3max_word_count.py-rhadoophdfs:///data-ohdfs:///user注:hdfs中不能預先有/user文件夾第一個MRJob程序步驟7為了能夠輸出出現(xiàn)次數(shù)最多的單詞,同時避免標點符號對單詞分割的影響,需要對上述程序進行三個方面的改進。首先,利用正則表達式的方式進行單詞的匹配,從而消除標點符號等因素對單詞提取的影響;其次,在第一個reducer輸出單詞和出現(xiàn)次數(shù)的基礎上又添加了一個reducer,從而形成多次規(guī)約的程序結構;最后,利用combiner進行程序的優(yōu)化,使得mapper所在節(jié)點能夠對數(shù)據(jù)進行初步的統(tǒng)計,從而減少數(shù)據(jù)的通信量。具體實現(xiàn)請參考教材7.8.1MRJob應用詳解通過編寫第一個MRJob程序可知,為了定義一個MapReduce程序,只需要重載MRJob中的mapper、combiner和reducer,并且重載steps函數(shù)來定義MRJob的具體處理流程。在MRJob中,除了上述四個可被重載的函數(shù)外,還有如表7.14的函數(shù)可以被重載以定義用戶需要的處理流程MRJob應用詳解MRJob應用詳解任務的初始化與釋放(SetupandTeardown)Python腳本在每個任務中都會被HadoopStreaming調(diào)用,開始Python程序后,從STDIN輸入數(shù)據(jù)給Python。Mrjob將會在每個任務初始化時調(diào)用*_init()方法,在每個任務結束時調(diào)用*_final()方法。與之對應的方法有:mapper_init()combiner_init()reducer_init()mapper_final()combiner_final()reducer_final()如果在一個任務中需要預先加載一些文件(例如用到sqlite數(shù)據(jù)庫)或者創(chuàng)建臨時文件等操作,就可以使用這些方法的重載來完成。MRJob應用詳解Shell命令作為任務(SetupandTeardown)Linux的shell提供了大量實用的小工具,例如grep、wc等,當某個任務的工作正好與這些工具的功能相同時,便可以通過將某個步驟指定為shell命令,從而完全放棄該步驟的腳本,而利用shell命令代替之。Mrjob提供了支持這種實現(xiàn)的接口,如果需要這樣做,可以用mapper_cmd、combiner_cmd或reducer_cmd作為MRStep的參數(shù),或者重載MRJob中對應的方法mapper_cmd()、combiner_cmd()、和reducer_cmd()。需要注意的是:缺省的”-rinline”運行Python方式不支持”*_cmd()”模式,若要在本地使用該命令,需要指定運行方式為”-rlocal”。MRJob應用詳解利用shell命令指定過濾器MRStep允許用戶為任務的輸入指定一個過濾器,在數(shù)據(jù)送達具體任務前對數(shù)據(jù)進行過濾。其設定方法可以是為MRStep設定mapper_pre_filter和reducer_pre_filter的參數(shù),也可以通過在MRJob中重載mapper_pre_filter()方法和reducer_pre_filter()方

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論