版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
1、用Hadoop進(jìn)行分布式并行編程(二)(注:本文檔來自hadoop in china)程序?qū)嵗c分析Hadoop 是一個(gè)實(shí)現(xiàn)了MapReduce 計(jì)算模型的開源分布式并行編程框架,借助于Hadoop, 程序員可以輕松地編寫分布式并行程序,將其運(yùn)行于計(jì)算機(jī)集群上,完成海量數(shù)據(jù)的計(jì)算。在本文中,詳細(xì)介紹了如何針對一個(gè)具體的并行計(jì)算任務(wù),基于Hadoop 編寫程序,如何使用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯并運(yùn)行 Hadoop 程序。前言在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,
2、分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本文中,將針對一個(gè)具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序。分析 WordCount 程序我們先來看看 Hadoop 自帶的示例程序 WordCount, 這個(gè)程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。1.實(shí)現(xiàn)Map類見代碼清單1。這個(gè)類實(shí)現(xiàn) M
3、apper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個(gè)字符串拆成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡單地將其輸出的 <key,value> 對往 OutputCollector 中一丟即可,剩余的事框架自會幫你處理好。代碼中 LongWri
4、table, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。代碼清單1public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text,
5、IntWritable> private final static IntWritable one = new IntWritable(1); private Text word = new Text(); pub
6、lic void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException String line = value.toStrin
7、g(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one); 2.實(shí)現(xiàn)Reduce類見代碼清單2。這個(gè)類實(shí)現(xiàn) Reducer 接口中的
8、reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個(gè) Iterator, 遍歷這個(gè) Iterator, 就可以得到屬于同一個(gè) key 的所有 value. 此處,key 是一個(gè)單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個(gè)單詞的總的出現(xiàn)次數(shù)。代碼清單 2public static class Reduce extends MapReduceBase implements Reducer<Text, IntW
9、ritable, Text, IntWritable> public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Report
10、er reporter) throws IOException int sum = 0; while (values.hasNext() output.collect(key, new IntWritable(sum); sum += values.next().get(); 3.運(yùn)行Job在 Hado
11、op 中一次計(jì)算任務(wù)稱之為一個(gè) job, 可以通過一個(gè) JobConf 對象設(shè)置如何運(yùn)行這個(gè) job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類,使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫到輸出路徑下。然后將 JobConf 對象作為參數(shù),調(diào)用 JobClient 的 runJob, 開始執(zhí)行這個(gè)計(jì)算任務(wù)。至于 main 方
12、法中使用的 ToolRunner 是一個(gè)運(yùn)行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。代碼清單 3public int run(String args) throws Exception JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.c
13、lass); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setInputPath(new Path(args0); conf.setOutputPath(new Path(args1); JobClient.runJob(conf); return 0;public static void main(String args) throws Excep
14、tion if(args.length != 2) int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res);System.err.println("Usage: WordCount <input path> <output
15、path>"); System.exit(-1);以上就是 WordCount 程序的全部細(xì)節(jié),簡單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。4. 通過JobConf 定制計(jì)算任務(wù)通過上文所述的 JobConf 對象,程序員可以設(shè)定各種參數(shù),定制如何完成一個(gè)計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個(gè) java 接口,通過注入這些接口的特定實(shí)現(xiàn),可以定義一個(gè)計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計(jì)算程序時(shí)做到輕車熟路,游刃有余,明白哪些類是需要自己實(shí)現(xiàn)的,哪些類用 Hado
16、op 的缺省實(shí)現(xiàn)即可。表一是對 JobConf 對象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說明,表中第一列中的參數(shù)在 JobConf 中均會有相應(yīng)的 get/set 方法,對程序員來說,只有在表中第三列中的缺省值無法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會有一些其它的實(shí)現(xiàn),我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。表
17、一 JobConf 常用可定制參數(shù)參數(shù)作用缺省值其它實(shí)現(xiàn)inputFormat將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集inputSplits, 每一個(gè)InputSplit將 由一個(gè)Mapper負(fù)責(zé)處理。此外inputFormat中還提供一個(gè)RecordReader的 實(shí)現(xiàn), 將一個(gè)InputSplit解 析成key,value 對提供給 map 函數(shù)。TextInputFormat(針 對文本文件,按行將文本文件切割成InputSplits, 并用LineRecordReader將InputSplit解 析成 key,value 對,key是行在文件中的位置,value是文件中的一行)SequenceFi
18、leInputFormatOutputFormat提供一個(gè) RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果TextOutputFormat(用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個(gè)key,value對一行,key 和 value 之間用 tab 分隔)SequenceFileOutputFormatOutputKeyClass輸出的最終結(jié)果中 key 的類型LongWritable OutputValueClass輸出的最終結(jié)果中 value 的類型Text MapperClassMapper 類,實(shí)現(xiàn) map 函數(shù),完成輸入的 key,va
19、lue 到中間結(jié)果的映射IdentityMapper(將 輸入的 key,value 原封不動的輸出為中間結(jié)果)LongSumReducer,LogRegexMapper,InverseMapperCombinerClass實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并null(不對中間結(jié)果中的重復(fù) key 做合并) ReducerClassReducer 類,實(shí)現(xiàn) reduce 函數(shù),對中間結(jié)果做合并,形成最終結(jié)果IdentityReducer(將 中間結(jié)果直接輸出為最終結(jié)果)AccumulatingReducer,LongSumReducerInputPath設(shè)定
20、 job 的輸入目錄, job 運(yùn)行時(shí)會處理輸入目錄下的所有文件null OutputPath設(shè)定 job 的輸出目錄,job 的最終結(jié)果會寫入輸出目錄下null MapOutputKeyClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型如果用戶沒有設(shè)定的話,使用OutputKeyClass MapOutputValueClass設(shè)定 map 函數(shù)輸出的中間結(jié)果中value 的類型 如果用戶沒有設(shè)定的話,使用 OutputValuesClass OutputKeyComparator對結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器WritableC
21、omparable PartitionerClass對中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個(gè) Reducer 負(fù)責(zé)處理。HashPartitioner(使 用 Hash 函數(shù)做 partition)KeyFieldBasedPartitionerPipesPartitioner改進(jìn)的 WordCount 程序現(xiàn)在你對 Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標(biāo)點(diǎn)符號與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞
22、,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出 現(xiàn)頻率的降序進(jìn)行排序。1.修改 Mapper 類,實(shí)現(xiàn)目標(biāo)(1)實(shí)現(xiàn)很簡單,見代碼清單4中 的注釋。代碼清單 4public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
23、 private final static IntWritable one = new IntWritable(1); private Text word = new Text(); &
24、#160; private String pattern="w" /正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符 public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> out
25、put, Reporter reporter) throws IOException String line = value.toString().toLowerCase(); /全部轉(zhuǎn)為小寫字母
26、; line = line.replaceAll(pattern, " "); /將非0-9, a-z, A-Z的字符替換為空格 StringTokenizer itr = new StringTokenizer(line);
27、; while (itr.hasMoreTokens() word.set(itr.nextToken(); output.collect(word, one);
28、0; 2.實(shí)現(xiàn)目標(biāo)(2)用一個(gè)并行計(jì)算任務(wù)顯然是無法同時(shí)完成單詞詞頻統(tǒng)計(jì)和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個(gè)任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個(gè)任務(wù)(排序)的輸入,順序執(zhí)行兩個(gè)并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個(gè)排序任務(wù)并運(yùn)行之。在 Hadoop 中要實(shí)現(xiàn)排序是很簡單的,因?yàn)樵?MapReduce 的過程中,會把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給
29、R 個(gè) Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會有一個(gè)按 key 進(jìn)行排序的過程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)按詞頻排序,我們指定使用 InverseMapper 類作為排序任務(wù)的 Mapper 類( sortJob.setMapperClass(InverseMapper.class );),這個(gè)類的 map 函數(shù)簡單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序
30、的最終結(jié)果。我們無需指定 Reduce 類,Hadoop 會使用缺省的 IdentityReducer 類,將中間結(jié)果原樣輸出。還有一個(gè)問題需要解決: 排序任務(wù)中的 Key 的類型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class), Hadoop 默認(rèn)對 IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個(gè) IntWritableDecreasingComparator 類,并指定使用這個(gè)自定義的 Comparator 類對輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutput
31、KeyComparatorClass(IntWritableDecreasingComparator.class)詳見代碼清單 5 及其中的注釋。代碼清單 5public int run(String args) throws Exception Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Rando
32、m().nextInt(Integer.MAX_VALUE); /定義一個(gè)臨時(shí)目錄 JobConf conf = new JobConf(getConf(), WordCount.class);try conf.setJobName("wordcount"); conf.setOutputKe
33、yClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(MapClass.class);conf.setCombinerClass(Reduce.class);conf.setReducerClass(Reduce.class);conf.setInputPath(new Path(args0);conf.setOutputPath(tempDir); /先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫到臨時(shí)目/錄中, 下一個(gè)排序任務(wù)以臨時(shí)目錄為輸入目錄。conf.setOutputFormat(Sequ
34、enceFileOutputFormat.class);JobClient.runJob(conf);JobConf sortJob = new JobConf(getConf(), WordCount.class);sortJob.setJobName("sort"); sortJob.setInputPath(tempDir);sortJob.setInputFormat(SequenceFileInputFormat.class);sortJob.setMapperClass(InverseMapper.class);sortJob.setNumReduceTasks
35、(1); /將 Reducer 的個(gè)數(shù)限定為1, 最終輸出的結(jié)果/文件就是一個(gè)。 sortJob.setOutputPath(new Path(args1);sortJob.setOutputKeyClass(IntWritable.class);sortJob.setOutputValueClass(Text.class);sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class);JobClient.runJob(sortJob); finally
36、160; FileSystem.get(conf).delete(tempDir); /刪除臨時(shí)目錄 return 0; private static clas
37、s IntWritableDecreasingComparator extends IntWritable.Comparator public int compare(WritableComparable a, WritableComparable b) return -pare(a, b); public int compare(byte b1, int s1, int l1, byte b2, int
38、 s2, int l2) return -pare(b1, s1, l1, b2, s2, l2); 在Eclipse 環(huán)境下進(jìn)行開發(fā)和調(diào)試在 Eclipse 環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個(gè) Eclipse plugin 可以簡化開發(fā)和部署 Hadoop 并行程序的過程?;谶@個(gè) plugin, 可以在 Eclipse 中創(chuàng)建一個(gè) Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類開發(fā)的向?qū)?,可以打包?JAR 文件,部署一個(gè) Hadoop MapReduce 應(yīng)用程序到一個(gè) Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過一個(gè)專門的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)??稍?IBM alphaWorks 網(wǎng)站下
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二四年無證房屋買賣合同電子化及網(wǎng)絡(luò)安全協(xié)議3篇
- 2025至2030年中國EVA拉鏈袋數(shù)據(jù)監(jiān)測研究報(bào)告
- 二零二五年度噴泉工程進(jìn)度與付款合同
- 2025年中國聚合物水泥砂漿市場調(diào)查研究報(bào)告
- 核醫(yī)學(xué)影像分析-深度研究
- 城市生態(tài)修復(fù)策略-深度研究
- 2025年出租車司機(jī)雇傭合同安全駕駛承諾書4篇
- 2025至2031年中國HID氙氣燈行業(yè)投資前景及策略咨詢研究報(bào)告
- 2025至2030年中國燃油型助力車數(shù)據(jù)監(jiān)測研究報(bào)告
- 2025至2030年中國塑膜封裝機(jī)數(shù)據(jù)監(jiān)測研究報(bào)告
- 《職業(yè)培訓(xùn)師教程》課件
- (康德一診)重慶市2025屆高三高三第一次聯(lián)合診斷檢測 英語試卷(含答案詳解)
- 行業(yè)會計(jì)比較(第三版)PPT完整全套教學(xué)課件
- 值機(jī)業(yè)務(wù)與行李運(yùn)輸實(shí)務(wù)(第3版)高職PPT完整全套教學(xué)課件
- 高考英語語法填空專項(xiàng)訓(xùn)練(含解析)
- 42式太極劍劍譜及動作說明(吳阿敏)
- 危險(xiǎn)化學(xué)品企業(yè)安全生產(chǎn)標(biāo)準(zhǔn)化課件
- 巨鹿二中骨干教師個(gè)人工作業(yè)績材料
- 《美的歷程》導(dǎo)讀課件
- 心電圖 (史上最完美)課件
- HGT 20525-2006 化學(xué)工業(yè)管式爐傳熱計(jì)算設(shè)計(jì)規(guī)定
評論
0/150
提交評論