hadoop進(jìn)行分布式并行編程第二部分_第1頁
hadoop進(jìn)行分布式并行編程第二部分_第2頁
hadoop進(jìn)行分布式并行編程第二部分_第3頁
hadoop進(jìn)行分布式并行編程第二部分_第4頁
hadoop進(jìn)行分布式并行編程第二部分_第5頁
已閱讀5頁,還剩8頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

1、用 Hadoop 進(jìn)行分布式并行編程, 第 2 部分程序?qū)嵗c分析曹 羽中 (caoyuz), 軟件工程師, IBM中國開發(fā)中心簡介: Hadoop 是一個實(shí)現(xiàn)了 MapReduce 計(jì)算模型的開源分布式并行編程框架,借助于 Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運(yùn)行于計(jì)算機(jī)集群上,完成海量數(shù)據(jù)的計(jì)算。在本文中,詳細(xì)介紹了如何針對一個具體的并行計(jì)算任務(wù),基于 Hadoop 編寫程序,如何使用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯并運(yùn)行 Hadoop 程序。本文的標(biāo)簽:  eclipse下開發(fā)hadoop

2、, mapreduce標(biāo)記本文!發(fā)布日期: 2008 年 5 月 22 日 級別: 初級 訪問情況 : 11069 次瀏覽 評論: 1 (查看 | 添加評論 - 登錄) 平均分 (12個評分)為本文評分前言在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本

3、文中,將針對一個具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序?;仨撌追治?WordCount 程序我們先來看看 Hadoop 自帶的示例程序 WordCount,這個程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。1.實(shí)現(xiàn)Map類見代碼清單1。這個類實(shí)現(xiàn) Mapper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個字符串拆

4、成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡單地將其輸出的 <key,value> 對往 OutputCollector 中一丟即可,剩余的事框架自會幫你處理好。代碼中 LongWritable, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從

5、而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。代碼清單1 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public

6、 void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn) Reduce

7、類見代碼清單 2。這個類實(shí)現(xiàn) Reducer 接口中的 reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個 Iterator, 遍歷這個 Iterator, 就可以得到屬于同一個 key 的所有 value. 此處,key 是一個單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個單詞的總的出現(xiàn)次數(shù)。代碼清單 2 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, Int

8、Writable> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException int sum = 0; while (values.hasNext() sum += values.next().get(); output.collect(key, new IntWritable(sum); 3.運(yùn)行 Job在 Hadoop 中一次計(jì)算任務(wù)稱之為一

9、個 job, 可以通過一個 JobConf 對象設(shè)置如何運(yùn)行這個 job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類,使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫到輸出路徑下。然后將 JobConf 對象作為參數(shù),調(diào)用 JobClient 的 runJob, 開始執(zhí)行這個計(jì)算任務(wù)。至于 main 方法中使用的 ToolRunn

10、er 是一個運(yùn)行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。代碼清單 3 public int run(String args) throws Exception JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.clas

11、s); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0; public static void main(String args) throws Exception if(args.length != 2) System.err.println("Usage: WordCount &

12、lt;input path> <output path>"); System.exit(-1); int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); 以上就是 WordCount 程序的全部細(xì)節(jié),簡單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。4. 通過 JobConf 定制計(jì)算任務(wù)通過上文所述的 JobConf 對象,程序員可以設(shè)定各種參數(shù),定制如何完成一個計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個 j

13、ava 接口,通過注入這些接口的特定實(shí)現(xiàn),可以定義一個計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計(jì)算程序時(shí)做到輕車熟路,游刃有余,明白哪些類是需要自己實(shí)現(xiàn)的,哪些類用 Hadoop 的缺省實(shí)現(xiàn)即可。表一是對 JobConf 對象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說明,表中第一列中的參數(shù)在 JobConf 中均會有相應(yīng)的 get/set 方法,對程序員來說,只有在表中第三列中的缺省值無法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會有一些其它的實(shí)現(xiàn)

14、,我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。表一 JobConf 常用可定制參數(shù)參數(shù)作用缺省值其它實(shí)現(xiàn)InputFormat將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集 InputSplits, 每一個 InputSplit 將由一個 Mapper 負(fù)責(zé)處理。此外 InputFormat 中還提供一個 RecordReader 的實(shí)現(xiàn), 將一個 InputSplit 解析成 <key,value> 對提供給 map 函數(shù)。Text

15、InputFormat(針對文本文件,按行將文本文件切割成 InputSplits, 并用 LineRecordReader 將 InputSplit 解析成 <key,value> 對,key 是行在文件中的位置,value 是文件中的一行)SequenceFileInputFormatOutputFormat提供一個 RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果TextOutputFormat(用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個 <key,value> 對一行,key 和 value 之間用 tab 分隔)SequenceFi

16、leOutputFormatOutputKeyClass輸出的最終結(jié)果中 key 的類型LongWritableOutputValueClass輸出的最終結(jié)果中 value 的類型TextMapperClassMapper 類,實(shí)現(xiàn) map 函數(shù),完成輸入的 <key,value> 到中間結(jié)果的映射IdentityMapper(將輸入的 <key,value> 原封不動的輸出為中間結(jié)果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并null(不

17、對中間結(jié)果中的重復(fù) key 做合并)ReducerClassReducer 類,實(shí)現(xiàn) reduce 函數(shù),對中間結(jié)果做合并,形成最終結(jié)果IdentityReducer(將中間結(jié)果直接輸出為最終結(jié)果)AccumulatingReducer, LongSumReducerInputPath設(shè)定 job 的輸入目錄, job 運(yùn)行時(shí)會處理輸入目錄下的所有文件nullOutputPath設(shè)定 job 的輸出目錄,job 的最終結(jié)果會寫入輸出目錄下nullMapOutputKeyClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型如果用戶沒有設(shè)定的話,使用 OutputKeyClassMapOu

18、tputValueClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 value 的類型如果用戶沒有設(shè)定的話,使用 OutputValuesClassOutputKeyComparator對結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器WritableComparablePartitionerClass對中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個 Reducer 負(fù)責(zé)處理。HashPartitioner(使用 Hash 函數(shù)做 partition)KeyFieldBasedPartitioner PipesPartitioner回頁首改進(jìn)的 WordCount

19、程序現(xiàn)在你對 Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標(biāo)點(diǎn)符號與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出現(xiàn)頻率的降序進(jìn)行排序。1.修改 Mapper 類,實(shí)現(xiàn)目標(biāo)(1)實(shí)現(xiàn)很簡單,見代碼清單4中的注釋。代碼清單 4 public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text,

20、IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private String pattern="w" /正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOE

21、xception String line = value.toString().toLowerCase(); /全部轉(zhuǎn)為小寫字母 line = line.replaceAll(pattern, " "); /將非0-9, a-z, A-Z的字符替換為空格 StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn)目標(biāo)(2)用一個并行計(jì)算任務(wù)顯然是無法同時(shí)完成單詞詞頻統(tǒng)計(jì)

22、和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個任務(wù)(排序)的輸入,順序執(zhí)行兩個并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個排序任務(wù)并運(yùn)行之。在 Hadoop 中要實(shí)現(xiàn)排序是很簡單的,因?yàn)樵?MapReduce 的過程中,會把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給 R 個 Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會有一個按 key 進(jìn)行排序的過程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)

23、按詞頻排序,我們指定使用 InverseMapper 類作為排序任務(wù)的 Mapper 類( sortJob.setMapperClass(InverseMapper.class );),這個類的 map 函數(shù)簡單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序的最終結(jié)果。我們無需指定 Reduce 類,Hadoop 會使用缺省的 IdentityReducer 類,將中間結(jié)果原樣輸出。還有一個問題需要解決: 排序任務(wù)中的 Key 的類型是 IntWritable, (sortJob.setOu

24、tputKeyClass(IntWritable.class), Hadoop 默認(rèn)對 IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個 IntWritableDecreasingComparator 類,并指定使用這個自定義的 Comparator 類對輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)詳見代碼清單 5 及其中的注釋。代碼清單 5 public int run(String args) throws Exce

25、ption Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE); /定義一個臨時(shí)目錄 JobConf conf = new JobConf(getConf(), WordCount.class); try conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritabl

26、e.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(tempDir); /先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫到臨時(shí)目 /錄中, 下一個排序任務(wù)以臨時(shí)目錄為輸入目錄。 conf.setOutputFormat(SequenceFileOutputFormat.class); JobClient.runJob(c

27、onf); JobConf sortJob = new JobConf(getConf(), WordCount.class); sortJob.setJobName("sort"); sortJob.setInputPath(tempDir); sortJob.setInputFormat(SequenceFileInputFormat.class); sortJob.setMapperClass(InverseMapper.class); sortJob.setNumReduceTasks(1); /將 Reducer 的個數(shù)限定為1, 最終輸出的結(jié)果 /文件就是一個。

28、 sortJob.setOutputPath(new Path(args1); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class); JobClient.runJob(sortJob); finally FileSystem.get(conf).delete(tempDir);/刪除臨時(shí)目錄 return 0; private

29、 static class IntWritableDecreasingComparator extends IntWritable.Comparator public int compare(WritableComparable a, WritableComparable b) return -pare(a, b); public int compare(byte b1, int s1, int l1, byte b2, int s2, int l2) return -pare(b1, s1, l1, b2, s2, l2); 回頁首在 Eclipse 環(huán)境下進(jìn)行開發(fā)和調(diào)試在 Eclipse

30、環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個 Eclipse plugin 可以簡化開發(fā)和部署 Hadoop 并行程序的過程?;谶@個 plugin, 可以在 Eclipse 中創(chuàng)建一個 Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類開發(fā)的向?qū)В梢源虬?JAR 文件,部署一個 Hadoop MapReduce 應(yīng)用程序到一個 Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過一個專門的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、

31、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)??稍?IBM alphaWorks 網(wǎng)站下載這個 MapReduce Tool, 或在本文的下載清單中下載。將下載后的壓縮包解壓到你 Eclipse 安裝目錄,重新啟動 Eclipse 即可使用了。設(shè)置 Hadoop 主目錄點(diǎn)擊 Eclipse 主菜單上 Windows->Preferences, 然后在左側(cè)選擇 Hadoop Home Directory,設(shè)定你的 Hadoop 主目錄,如圖一所示:圖 1 創(chuàng)立一個 MapReduce Project點(diǎn)擊 Eclipse 主菜單上 File->N

32、ew->Project, 在彈出的對話框中選擇 MapReduce Project, 輸入 project name 如 wordcount,然后點(diǎn)擊 Finish 即可。,如圖 2 所示:圖 2 此后,你就可以象一個普通的 Eclipse Java project 那樣,添加入 Java 類,比如你可以定義一個 WordCount 類,然后將本文代碼清單1,2,3中的代碼寫到此類中,添加入必要的 import 語句 ( Eclipse 快捷鍵 ctrl+shift+o 可以幫你),即可形成一個完整的 wordcount 程序。在我們這個簡單的 wordcount 程序中,我們把全部的內(nèi)容都放在一個 WordCount 類中。實(shí)際上 IBM MapReduce tools 還提供了幾個實(shí)用的向?qū)?( wizard ) 工具,幫你創(chuàng)建單獨(dú)的 Mapper 類,Reducer 類,MapReduce Driver 類(就是代碼清單3中那部分內(nèi)容),在編寫比較復(fù)雜的 MapReduce 程序時(shí),將這些類獨(dú)立出來是非常有必要的,也有利于在不同的計(jì)算任務(wù)中重用你編寫的各種 Mapper 類和 Reducer 類。在 Eclip

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論