第十一講流計(jì)算圖計(jì)算_第1頁(yè)
第十一講流計(jì)算圖計(jì)算_第2頁(yè)
第十一講流計(jì)算圖計(jì)算_第3頁(yè)
第十一講流計(jì)算圖計(jì)算_第4頁(yè)
第十一講流計(jì)算圖計(jì)算_第5頁(yè)
已閱讀5頁(yè),還剩84頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第十一講-流計(jì)算、圖計(jì)算圖計(jì)算流計(jì)算流計(jì)算什么是流計(jì)算流計(jì)算處理流程流計(jì)算應(yīng)用實(shí)例流計(jì)算框架–TwitterStorm流計(jì)算框架匯總參考資料流計(jì)算產(chǎn)生的背景大數(shù)據(jù)時(shí)代數(shù)據(jù)處理及業(yè)務(wù)的變化?初期:數(shù)據(jù)量小,業(yè)務(wù)簡(jiǎn)單

–少量人力、服務(wù)器就可以滿足需求?過(guò)渡期:數(shù)據(jù)量有所膨脹,業(yè)務(wù)較復(fù)雜

–需要增加大量服務(wù)器以支撐業(yè)務(wù)?大數(shù)據(jù)時(shí)期:數(shù)據(jù)量急劇膨脹,業(yè)務(wù)很復(fù)雜

–傳統(tǒng)方案扛不住,簡(jiǎn)單的增加服務(wù)器已不能滿足需求挑戰(zhàn)?數(shù)據(jù)量膨脹所帶來(lái)的質(zhì)變?個(gè)性化、實(shí)時(shí)化的需求?…什么是流計(jì)算流計(jì)算來(lái)自于一個(gè)信念:數(shù)據(jù)的價(jià)值隨著時(shí)間的流逝而降低,所以事件出現(xiàn)后必須盡快地對(duì)它們進(jìn)行處理,最好數(shù)據(jù)出現(xiàn)時(shí)便立刻對(duì)其進(jìn)行處理,發(fā)生一個(gè)事件進(jìn)行一次處理,而不是緩存起來(lái)成一批再處理。流計(jì)算的概念:?流計(jì)算是針對(duì)流式數(shù)據(jù)的實(shí)時(shí)計(jì)算。?流式數(shù)據(jù)(流數(shù)據(jù)):是指將數(shù)據(jù)看作數(shù)據(jù)流的形式來(lái)處理。數(shù)據(jù)流是在時(shí)間分布和數(shù)量上無(wú)限的一系列動(dòng)態(tài)數(shù)據(jù)集合體;數(shù)據(jù)記錄是數(shù)據(jù)流的最小組成單元。?流數(shù)據(jù)具有數(shù)據(jù)實(shí)時(shí)持續(xù)不斷到達(dá)、到達(dá)次序獨(dú)立、數(shù)據(jù)來(lái)源眾多格式復(fù)雜、數(shù)據(jù)規(guī)模大且不十分關(guān)注存儲(chǔ)、注重?cái)?shù)據(jù)的整體價(jià)值而不關(guān)注個(gè)別數(shù)據(jù)等特點(diǎn)。什么是流計(jì)算流計(jì)算應(yīng)用場(chǎng)景?流計(jì)算是針對(duì)流數(shù)據(jù)的實(shí)時(shí)計(jì)算,其主要應(yīng)用在于產(chǎn)生大量流數(shù)據(jù)、同時(shí)對(duì)實(shí)時(shí)性要求高的領(lǐng)域。?流計(jì)算一方面可應(yīng)用于處理金融服務(wù)如股票交易、銀行交易等產(chǎn)生的大量實(shí)時(shí)數(shù)據(jù)。?另一方面流計(jì)算主要應(yīng)用于各種實(shí)時(shí)Web服務(wù)中,如搜索引擎、購(gòu)物網(wǎng)站的實(shí)時(shí)廣告推薦,SNS社交類(lèi)網(wǎng)站的實(shí)時(shí)個(gè)性化內(nèi)容推薦,大型網(wǎng)站、網(wǎng)店的實(shí)時(shí)用戶訪問(wèn)情況分析等。什么是流計(jì)算流計(jì)算:對(duì)流數(shù)據(jù)實(shí)時(shí)分析,從而獲取有價(jià)值的實(shí)時(shí)信息流計(jì)算與關(guān)系存儲(chǔ)模型的區(qū)別主要區(qū)別有如下幾個(gè)方面:?流中的數(shù)據(jù)元素在線到達(dá);?系統(tǒng)無(wú)法控制將要處理的新到達(dá)的數(shù)據(jù)元素的順序;?數(shù)據(jù)流的潛在大小也許是無(wú)窮無(wú)盡的;?一旦數(shù)據(jù)流中的某個(gè)元素經(jīng)過(guò)處理,要么被丟棄,要么被歸檔存儲(chǔ)。因此,除非該數(shù)據(jù)被直接存儲(chǔ)在內(nèi)存中,否則將不容易被檢索。相對(duì)于數(shù)據(jù)流的大小,這是一種典型的極小相關(guān)。流計(jì)算需求對(duì)于一個(gè)流計(jì)算系統(tǒng)來(lái)說(shuō),它應(yīng)達(dá)到如下需求:?高性能:處理大數(shù)據(jù)的基本要求,如每秒處理幾十萬(wàn)條數(shù)據(jù)。?海量式:支持TB級(jí)甚至是PB級(jí)的數(shù)據(jù)規(guī)模。?實(shí)時(shí)性:必須保證一個(gè)較低的延遲時(shí)間,達(dá)到秒級(jí)別,甚至是毫秒級(jí)別。?分布式:支持大數(shù)據(jù)的基本架構(gòu),必須能夠平滑擴(kuò)展。?易用性:能夠快速進(jìn)行開(kāi)發(fā)和部署。?可靠性:能可靠地處理流數(shù)據(jù)。?針對(duì)不同的應(yīng)用場(chǎng)景,相應(yīng)的流計(jì)算系統(tǒng)會(huì)有不同的需求,但是,針對(duì)海量數(shù)據(jù)的流計(jì)算,無(wú)論在數(shù)據(jù)采集、數(shù)據(jù)處理中都應(yīng)達(dá)到秒級(jí)別的要求。流計(jì)算與Hadoop?Hadoop的批量化處理是人們喜愛(ài)它的地方,但這在某些領(lǐng)域仍顯不足,尤其是在例如移動(dòng)、Web客戶端或金融、網(wǎng)頁(yè)廣告等需要實(shí)時(shí)計(jì)算的領(lǐng)域。這些領(lǐng)域產(chǎn)生的數(shù)據(jù)量極大,沒(méi)有足夠的存儲(chǔ)空間來(lái)存儲(chǔ)每個(gè)業(yè)務(wù)收到的數(shù)據(jù)。而流計(jì)算則可以實(shí)時(shí)對(duì)數(shù)據(jù)進(jìn)行分析,并決定是否拋棄無(wú)用的數(shù)據(jù),而這無(wú)需經(jīng)過(guò)Map/Reduce的環(huán)節(jié)。?MapReduce框架為批處理做了高度優(yōu)化,系統(tǒng)典型地通過(guò)調(diào)度批量任務(wù)來(lái)操作靜態(tài)數(shù)據(jù),任務(wù)不是常駐服務(wù),數(shù)據(jù)也不是實(shí)時(shí)流入;而數(shù)據(jù)流計(jì)算的典型范式之一是不確定數(shù)據(jù)速率的事件流流入系統(tǒng),系統(tǒng)處理能力必須與事件流量匹配。數(shù)據(jù)流實(shí)時(shí)處理的模式?jīng)Q定了要和批處理使用非常不同的架構(gòu),試圖搭建一個(gè)既適合流式計(jì)算又適合批處理的通用平臺(tái),結(jié)果可能會(huì)是一個(gè)高度復(fù)雜的系統(tǒng),并且最終系統(tǒng)可能對(duì)兩種計(jì)算都不理想。流計(jì)算與Hadoop基于MapReduce的業(yè)務(wù)不得不面對(duì)處理延遲的問(wèn)題。有一種想法是將基于MapReduce的批量處理轉(zhuǎn)為小批量處理,將輸入數(shù)據(jù)切成小的片段,每隔一個(gè)周期就啟動(dòng)一次MapReduce作業(yè),這種實(shí)現(xiàn)需要減少每個(gè)片段的延遲,并且需要考慮系統(tǒng)的復(fù)雜度:將輸入數(shù)據(jù)分隔成固定大小的片段,再由MapReduce平臺(tái)處理,缺點(diǎn)在于處理延遲與數(shù)據(jù)片段的長(zhǎng)度、初始化處理任務(wù)的開(kāi)銷(xiāo)成正比。小的分段是會(huì)降低延遲,但是,也增加附加開(kāi)銷(xiāo),并且分段之間的依賴(lài)管理更加復(fù)雜(例如一個(gè)分段可能會(huì)需要前一個(gè)分段的信息);反之,大的分段會(huì)增加延遲。最優(yōu)化的分段大小取決于具體應(yīng)用。為了支持流式處理,MapReduce需要被改造成Pipeline的模式,而不是reduce直接輸出;考慮到效率,中間結(jié)果最好只保存在內(nèi)存中等等。這些改動(dòng)使得原有的MapReduce框架的復(fù)雜度大大增加,不利于系統(tǒng)的維護(hù)和擴(kuò)展。用戶被迫使用MapReduce的接口來(lái)定義流式作業(yè),這使得用戶程序的可伸縮性降低。傳統(tǒng)數(shù)據(jù)處理流程傳統(tǒng)數(shù)據(jù)處理流程示意圖?傳統(tǒng)的數(shù)據(jù)操作,首先將數(shù)據(jù)采集并存儲(chǔ)在DBMS中,然后通過(guò)query和DBMS進(jìn)行交互,得到用戶想要的結(jié)果。這樣的一個(gè)流程隱含了兩個(gè)前提:–Dataisold。當(dāng)對(duì)數(shù)據(jù)做查詢(xún)的時(shí)候,里面數(shù)據(jù)其實(shí)是過(guò)去某一個(gè)時(shí)刻數(shù)據(jù)的一個(gè)snapshot,數(shù)據(jù)可能已經(jīng)過(guò)期了;–這樣的流程需要人們主動(dòng)發(fā)出query。也就是說(shuō)用戶是主動(dòng)的,而DBMS系統(tǒng)是被動(dòng)的。流計(jì)算處理流程?流計(jì)算一般有三個(gè)處理流程:數(shù)據(jù)實(shí)時(shí)采集、數(shù)據(jù)實(shí)時(shí)計(jì)算、實(shí)時(shí)查詢(xún)服務(wù)。實(shí)時(shí)計(jì)算三個(gè)階段流計(jì)算處理流程階段一:數(shù)據(jù)實(shí)時(shí)采集為流計(jì)算提供實(shí)時(shí)數(shù)據(jù),要保證實(shí)時(shí)性、低延遲、穩(wěn)定可靠。許多開(kāi)源分布式日志收集系統(tǒng)均可滿足每秒數(shù)百M(fèi)B的數(shù)據(jù)采集和傳輸需求。–Hadoop的Chukwa–Facebook的Scribe–LinkedIn的Kafka–Cloudera的Flume–淘寶的TimeTunnel流計(jì)算的階段階段一:數(shù)據(jù)實(shí)時(shí)采集?數(shù)據(jù)采集系統(tǒng)基本架構(gòu)一般由三部分組成–Agent:主動(dòng)采集數(shù)據(jù),并把數(shù)據(jù)推送到collector–Collector:接收多個(gè)Agent的數(shù)據(jù),并實(shí)現(xiàn)有序、可靠、高性能的轉(zhuǎn)發(fā)–Store:存儲(chǔ)Collector的數(shù)據(jù)(對(duì)于流計(jì)算來(lái)說(shuō),這邊接收的數(shù)據(jù)一般直接用于計(jì)算)實(shí)時(shí)采集系統(tǒng)基本架構(gòu)流計(jì)算的階段階段二:數(shù)據(jù)實(shí)時(shí)計(jì)算?現(xiàn)在大量存在的實(shí)時(shí)數(shù)據(jù),人們需要根據(jù)當(dāng)前的數(shù)據(jù)實(shí)時(shí)的作出判斷。流計(jì)算在流數(shù)據(jù)不斷變化的運(yùn)動(dòng)過(guò)程中實(shí)時(shí)地進(jìn)行分析,捕捉到可能對(duì)用戶有用的信息,并把結(jié)果發(fā)送出去,在這種情況下:–能對(duì)流數(shù)據(jù)做出實(shí)時(shí)回應(yīng);–用戶是被動(dòng)的而DBMS是主動(dòng)的。數(shù)據(jù)實(shí)時(shí)計(jì)算示意圖流計(jì)算的階段階段三:實(shí)時(shí)查詢(xún)服務(wù)?經(jīng)由流計(jì)算框架得出的結(jié)果可供實(shí)時(shí)查詢(xún)、展示或存儲(chǔ)。流計(jì)算的應(yīng)用分析系統(tǒng)?傳統(tǒng)的分析系統(tǒng)都是離線計(jì)算,即將數(shù)據(jù)全部保存下來(lái),然后每隔一定時(shí)間進(jìn)行離線分析,再將結(jié)果保存。但這樣會(huì)有一定的延時(shí),這取決于離線計(jì)算的間隔時(shí)間和計(jì)算時(shí)長(zhǎng)。?而通過(guò)流計(jì)算,能在秒級(jí)別內(nèi)得到實(shí)時(shí)分析結(jié)果,有利于根據(jù)實(shí)時(shí)分析結(jié)果及時(shí)做出決策、調(diào)整?;诜治鱿到y(tǒng)的應(yīng)用場(chǎng)景?廣告系統(tǒng):如搜索引擎和購(gòu)物網(wǎng)站,實(shí)時(shí)分析用戶信息,展示更佳的相關(guān)廣告。?個(gè)性化推薦:如社交網(wǎng)站,實(shí)時(shí)統(tǒng)計(jì)和分析用戶行為,精確推薦,增加用戶粘性。?…流計(jì)算的應(yīng)用–量子恒道挑戰(zhàn)?實(shí)時(shí)計(jì)算處理數(shù)據(jù)3T/日?離線分布式計(jì)算處理數(shù)據(jù)超過(guò)20T/日?服務(wù)超過(guò)百萬(wàn)的淘寶賣(mài)家?…問(wèn)題?離線計(jì)算分析延時(shí)太大,對(duì)于需要實(shí)時(shí)分析數(shù)據(jù)的應(yīng)用場(chǎng)景(如雙11,雙12,一年就一次,需要實(shí)時(shí)數(shù)據(jù)來(lái)幫助調(diào)整決策),如何實(shí)現(xiàn)秒級(jí)別的實(shí)時(shí)分析?流計(jì)算的應(yīng)用–量子恒道SuperMario2.0流計(jì)算框架?海量數(shù)據(jù)實(shí)時(shí)計(jì)算引擎、實(shí)時(shí)流傳輸框架?基于Erlang+Zookeeper開(kāi)發(fā)?低延遲、高可靠性SuperMario2.0(監(jiān)控界面)流計(jì)算的應(yīng)用–量子恒道實(shí)時(shí)數(shù)據(jù)處理流程?Log數(shù)據(jù)由TimeTunnel在毫秒級(jí)別內(nèi)實(shí)時(shí)送達(dá)。?實(shí)時(shí)數(shù)據(jù)經(jīng)由SuperMario流計(jì)算框架進(jìn)行處理。?HBase輸出、存儲(chǔ)結(jié)果實(shí)現(xiàn)效果?可處理每天TB級(jí)的實(shí)時(shí)流數(shù)據(jù)。?從用戶發(fā)起請(qǐng)求到數(shù)據(jù)展示,延時(shí)控制在2-3秒內(nèi)。量子恒道實(shí)時(shí)數(shù)據(jù)處理示意圖流計(jì)算的應(yīng)用–實(shí)時(shí)交通信息管理?IBM的流計(jì)算平臺(tái)InfoSphereStreams能夠廣泛應(yīng)用于制造、零售、交通運(yùn)輸、金融證券以及監(jiān)管各行各業(yè)的解決方案之中,使得實(shí)時(shí)快速做出決策的理念得以實(shí)現(xiàn)。匯總來(lái)自不同源的實(shí)時(shí)數(shù)據(jù)InfoSphereStream界面流計(jì)算的應(yīng)用–實(shí)時(shí)交通信息管理?Streams應(yīng)用于斯德哥爾摩的交通信息管理,通過(guò)結(jié)合來(lái)自不同源的實(shí)時(shí)數(shù)據(jù),Streams可以生成動(dòng)態(tài)的、多方位的看待交通流量的方式,為城市規(guī)劃者和乘客提供實(shí)時(shí)交通狀況查看。通過(guò)InfoSphereStreams分析實(shí)時(shí)交通信息流計(jì)算框架要求流計(jì)算框架要求?高性能–處理大數(shù)據(jù)的基本要求,如每秒處理幾十萬(wàn)條數(shù)據(jù)?海量式–支持TB級(jí)數(shù)據(jù),甚至是PB級(jí)?實(shí)時(shí)性–保證較低延遲事件,達(dá)到秒級(jí),最好是毫秒級(jí)?分布式–支持大數(shù)據(jù)的基本架構(gòu),必須能平滑擴(kuò)展?易用性?可靠性?…TwitterStorm簡(jiǎn)介?免費(fèi)、開(kāi)源的分布式實(shí)時(shí)計(jì)算系統(tǒng)?簡(jiǎn)單、高效、可靠地處理大量的流數(shù)據(jù)?Storm對(duì)于實(shí)時(shí)計(jì)算的意義類(lèi)似于Hadoop對(duì)于批處理的意義?基于Clojure和Java開(kāi)發(fā)Storm流式計(jì)算TwitterStorm簡(jiǎn)介T(mén)witter數(shù)據(jù)系統(tǒng)分層處理架構(gòu)?為了處理最近的數(shù)據(jù),需要一個(gè)實(shí)時(shí)系統(tǒng)和批處理系統(tǒng)同時(shí)運(yùn)行。要計(jì)算一個(gè)查詢(xún)函數(shù),需要查詢(xún)批處理視圖和實(shí)時(shí)視圖,并把它們合并起來(lái)以得到最終的數(shù)據(jù)。?Twitter中進(jìn)行實(shí)時(shí)計(jì)算的系統(tǒng)就是Storm,它在數(shù)據(jù)流上進(jìn)行持續(xù)計(jì)算,并且對(duì)這種流式數(shù)據(jù)處理提供了有力保障。?Twitter分層的數(shù)據(jù)處理架構(gòu)由Hadoop和ElephantDB組成批處理系統(tǒng),Storm和Cassandra組成實(shí)時(shí)系統(tǒng),實(shí)時(shí)系統(tǒng)處理的結(jié)果最終會(huì)由批處理系統(tǒng)來(lái)修正,正是這個(gè)觀點(diǎn)使得Storm的設(shè)計(jì)與眾不同。Twitter數(shù)據(jù)系統(tǒng)分層處理架構(gòu)Storm應(yīng)用領(lǐng)域?流計(jì)算(Streamprocessing)?實(shí)時(shí)分析(Real-timeanalytics)?連續(xù)計(jì)算(Continuouscomputation)?分布式遠(yuǎn)程過(guò)程調(diào)用(DistributedRPC)?在線機(jī)器學(xué)習(xí)(Onlinemachinelearning)?更多…Storm主要特點(diǎn)?簡(jiǎn)單的編程模型:Storm降低了進(jìn)行實(shí)時(shí)處理的復(fù)雜性。?支持各種編程語(yǔ)言:默認(rèn)支持Clojure、Java、Ruby和Python。要增加對(duì)其他語(yǔ)言的支持,只需實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Storm通信協(xié)議即可。?容錯(cuò)性:Storm會(huì)自動(dòng)管理工作進(jìn)程和節(jié)點(diǎn)的故障。?水平擴(kuò)展:計(jì)算是在多個(gè)線程、進(jìn)程和服務(wù)器之間并行進(jìn)行的。?可靠的消息處理:Storm保證每個(gè)消息至少能得到一次完整處理。?快速:系統(tǒng)的設(shè)計(jì)保證了消息能得到快速的處理。?本地模式:Storm有一個(gè)“本地模式”,可以在處理過(guò)程中完全模擬Storm集群,這樣可以快速進(jìn)行開(kāi)發(fā)和單元測(cè)試。?容易部署:Storm集群易于部署,只需少量的安裝和配置就可運(yùn)行。Storm設(shè)計(jì)思想?Storm對(duì)于流Stream的抽象:流是一個(gè)不間斷的無(wú)界的連續(xù)Tuple(元組,是元素有序列表)。Stream消息流,是一個(gè)沒(méi)有邊界的Tuple序列,這些Tuples會(huì)被以一種分布式的方式并行地創(chuàng)建和處理。Stream消息流,是一個(gè)沒(méi)有邊界的Tuple序列,這些Tuples會(huì)被以一種分布式的方式并行地創(chuàng)建和處理。Storm設(shè)計(jì)思想?Storm認(rèn)為每個(gè)Stream都有一個(gè)源頭,它將這個(gè)源頭抽象為Spouts。Spouts流數(shù)據(jù)源,它會(huì)從外部讀取流數(shù)據(jù)并發(fā)出Tuple。Spouts流數(shù)據(jù)源,它會(huì)從外部讀取流數(shù)據(jù)并發(fā)出Tuple。Storm設(shè)計(jì)思想?Storm將流的中間狀態(tài)轉(zhuǎn)換抽象為Bolts,Bolts可以處理Tuples,同時(shí)它也可以發(fā)送新的流給其他Bolts使用。Bolts消息處理者,所有的消息處理邏輯被封裝在Bolts里面,處理輸入的數(shù)據(jù)流并產(chǎn)生輸出的新數(shù)據(jù)流,可執(zhí)行過(guò)濾,聚合,查詢(xún)數(shù)據(jù)庫(kù)等操作。Bolts消息處理者,所有的消息處理邏輯被封裝在Bolts里面,處理輸入的數(shù)據(jù)流并產(chǎn)生輸出的新數(shù)據(jù)流,可執(zhí)行過(guò)濾,聚合,查詢(xún)數(shù)據(jù)庫(kù)等操作。Storm設(shè)計(jì)思想?為了提高效率,在Spout源接上多個(gè)Bolts處理器。Storm將這樣的無(wú)向環(huán)圖抽象為T(mén)opology(拓?fù)洌opology是Storm中最高層次的抽象概念,它可以被提交到Storm集群執(zhí)行,一個(gè)拓?fù)渚褪且粋€(gè)流轉(zhuǎn)換圖。圖中的邊表示Bolt訂閱了哪些流。當(dāng)Spout或者Bolt發(fā)送元組到流時(shí),它就發(fā)送元組到每個(gè)訂閱了該流的Bolt上進(jìn)行處理。Storm設(shè)計(jì)思想?Topology實(shí)現(xiàn):Storm中拓?fù)涠x僅僅是一些Thrift結(jié)構(gòu)體(Thrift是基于二進(jìn)制的高性能的通訊中間件),這樣一來(lái)就可以使用其他語(yǔ)言來(lái)創(chuàng)建和提交拓?fù)洹?Tuple實(shí)現(xiàn):一個(gè)Tuple就是一個(gè)值列表。列表中的每個(gè)value都有一個(gè)name,并且該value可以是基本類(lèi)型,字符類(lèi)型,字節(jié)數(shù)組等,也可以是其他可序列化的類(lèi)型。?拓?fù)涞拿總€(gè)節(jié)點(diǎn)都要說(shuō)明它所發(fā)射出的元組字段的name,其他節(jié)點(diǎn)只需要訂閱該name就可以接收數(shù)據(jù)。Storm設(shè)計(jì)思想?Streamgroupings(消息分發(fā)策略):定義一個(gè)Stream應(yīng)該如何分配給Bolts,解決兩個(gè)組件(Spout和Bolt)之間發(fā)送tuple元組的問(wèn)題。?Task(任務(wù)):每一個(gè)Spout和Bolt會(huì)被當(dāng)作很多task在整個(gè)集群里面執(zhí)行,每一個(gè)task對(duì)應(yīng)到一個(gè)線程。Streamgroupings示意圖Task示意圖Storm設(shè)計(jì)思想一個(gè)Topology的完整示意圖Storm框架設(shè)計(jì)?Storm集群表面類(lèi)似Hadoop集群。?在Hadoop上運(yùn)行的是“MapReducejobs”,在Storm上運(yùn)行的是“Topologies”。兩者大不相同,一個(gè)關(guān)鍵不同是一個(gè)MapReduce的Job最終會(huì)結(jié)束,而一個(gè)Topology永遠(yuǎn)處理消息(或直到kill它)。?Storm集群有兩種節(jié)點(diǎn):控制(Master)節(jié)點(diǎn)和工作者(Worker)節(jié)點(diǎn)。?控制節(jié)點(diǎn)運(yùn)行一個(gè)稱(chēng)之為“Nimbus”的后臺(tái)程序,負(fù)責(zé)在集群范圍內(nèi)分發(fā)代碼、為worker分配任務(wù)和故障監(jiān)測(cè)。?每個(gè)工作者節(jié)點(diǎn)運(yùn)行一個(gè)稱(chēng)之“Supervisor”的后臺(tái)程序,監(jiān)聽(tīng)分配給它所在機(jī)器的工作,基于Nimbus分配給它的事情來(lái)決定啟動(dòng)或停止工作者進(jìn)程。Storm框架設(shè)計(jì)?一個(gè)Zookeeper集群負(fù)責(zé)Nimbus和多個(gè)Supervisor之間的所有協(xié)調(diào)工作(一個(gè)完整的拓?fù)淇赡鼙环譃槎鄠€(gè)子拓?fù)洳⒂啥鄠€(gè)supervisor完成)。Storm框架設(shè)計(jì)?Nimbus后臺(tái)程序和Supervisor后臺(tái)程序都是快速失?。╢ail-fast)和無(wú)狀態(tài)的,所有狀態(tài)維持在Zookeeper或本地磁盤(pán)。?這種設(shè)計(jì)中master并沒(méi)有直接和worker通信,而是借助中介Zookeeper,這樣一來(lái)可以分離master和worker的依賴(lài),將狀態(tài)信息存放在zookeeper集群內(nèi)以快速回復(fù)任何失敗的一方。?這意味著你可以kill殺掉nimbus進(jìn)程和supervisor進(jìn)程,然后重啟,它們將恢復(fù)狀態(tài)并繼續(xù)工作,這種設(shè)計(jì)使Storm極其穩(wěn)定。Storm框架設(shè)計(jì)Storm工作流程示意圖Storm實(shí)例單詞統(tǒng)計(jì)?編程模型非常簡(jiǎn)單,通過(guò)Topology定義整個(gè)處理邏輯。?Topology中定義了一個(gè)Spout和兩個(gè)處理消息的Bolt。Bolt通過(guò)訂閱Tuple的name值來(lái)接收數(shù)據(jù)Storm實(shí)例單詞統(tǒng)計(jì)?ShuffleGrouping是隨機(jī)分組,表示Tuple會(huì)被隨機(jī)的分發(fā)給Bolt。?FieldsGrouping是按字段分組,保證具有相同field值的Tuple會(huì)分發(fā)給同一個(gè)Task進(jìn)行統(tǒng)計(jì),保證統(tǒng)計(jì)的準(zhǔn)確性。Storm實(shí)例SplitSentenceStorm實(shí)例WordCountStorm實(shí)例?每個(gè)從spout發(fā)送出來(lái)的消息(英文句子)都會(huì)觸發(fā)很多的task被創(chuàng)建。?Bolts將句子分解為獨(dú)立的單詞,然后發(fā)射這些單詞。?最后,實(shí)時(shí)的輸出每個(gè)單詞以及它出現(xiàn)過(guò)的次數(shù)。一個(gè)句子經(jīng)單詞統(tǒng)計(jì)后的統(tǒng)計(jì)結(jié)果示意圖Storm實(shí)例Storm應(yīng)用使用Storm的公司和項(xiàng)目Storm應(yīng)用?淘寶、阿里巴巴將流計(jì)算廣泛應(yīng)用于業(yè)務(wù)監(jiān)控、廣告推薦、買(mǎi)家實(shí)時(shí)數(shù)據(jù)分析等場(chǎng)景。淘寶數(shù)據(jù)部新架構(gòu)流計(jì)算框架匯總?IBMInfoSphereStreams:商業(yè)級(jí)高級(jí)計(jì)算平臺(tái),幫助用戶開(kāi)發(fā)的應(yīng)用程序快速攝取、分析和關(guān)聯(lián)來(lái)自數(shù)千個(gè)實(shí)時(shí)源的信息。/software/products/cn/zh/infosphere-streams/?IBMStreamBase:IBM開(kāi)發(fā)的另一款商業(yè)流計(jì)算系統(tǒng),在金融部門(mén)和政府部門(mén)使用。?TwitterStorm:免費(fèi)、開(kāi)源的分布式實(shí)時(shí)計(jì)算系統(tǒng),可簡(jiǎn)單、高效、可靠地處理大量的流數(shù)據(jù)/?Yahoo!S4(SimpleScalableStreamingSystem):開(kāi)源流計(jì)算平臺(tái),是通用的、分布式的、可擴(kuò)展的、分區(qū)容錯(cuò)的、可插拔的流式系統(tǒng)。/s4/?FacebookPuma:Facebook使用Puma和Hbase相結(jié)合來(lái)處理實(shí)時(shí)數(shù)據(jù)。?DStream:百度正在開(kāi)發(fā)的屬于百度的通用實(shí)時(shí)數(shù)據(jù)流計(jì)算系統(tǒng)。?銀河流數(shù)據(jù)處理平臺(tái):淘寶開(kāi)發(fā)的通用流數(shù)據(jù)實(shí)時(shí)計(jì)算系統(tǒng)。?SuperMario:基于erlang語(yǔ)言和zookeeper模塊開(kāi)發(fā)的高性能數(shù)據(jù)流處理框架。?Hstream、Esper、SQLstream等…網(wǎng)上資料?關(guān)于流計(jì)算的文章–對(duì)互聯(lián)網(wǎng)海量數(shù)據(jù)實(shí)時(shí)計(jì)算的理解/panfeng412/archive/2011/10/28/2227195.html–BeyondMapReduce:談2011年風(fēng)靡的數(shù)據(jù)流計(jì)算系統(tǒng)/9642/?TwitterStorm–/(Storm官方網(wǎng)站)–/nathanmarz/storm(Storm的GitHub主頁(yè),有完善的Wiki)–(徐明明,GitHub上Storm的核心貢獻(xiàn)者,博客中提供了不少關(guān)于Storm的文章)–(量子恒道提供的Storm入門(mén)教程)圖計(jì)算圖計(jì)算簡(jiǎn)介GooglePregel圖計(jì)算模型Pregel的C++APIPregel模型的基本體系結(jié)構(gòu)Pregel模型的應(yīng)用實(shí)例改進(jìn)的圖計(jì)算模型參考資料圖計(jì)算中的問(wèn)題大型圖(像社交網(wǎng)絡(luò)和網(wǎng)絡(luò)圖等)常常作為現(xiàn)在系統(tǒng)計(jì)算需要的一部分?,F(xiàn)在存在許多圖計(jì)算問(wèn)題像最短路徑、集群、網(wǎng)頁(yè)排名、最小切割、連通分支等等,但還沒(méi)有一個(gè)可擴(kuò)展的通用系統(tǒng)來(lái)解決這些問(wèn)題。解決這些問(wèn)題的算法的特點(diǎn):它們常常表現(xiàn)為比較差的內(nèi)存訪問(wèn)局部性、針對(duì)單個(gè)頂點(diǎn)的處理工作過(guò)少、以及計(jì)算過(guò)程中伴隨著的并行度的改變等問(wèn)題。可能的解決方法:為特定的圖應(yīng)用定制相應(yīng)的分布式實(shí)現(xiàn)基于現(xiàn)有的分布式計(jì)算平臺(tái)使用單機(jī)的圖算法庫(kù)——如BGL,LEAD,NetworkX,JDSL,Standford,GraphBase,F(xiàn)GL等使用已有的并行圖計(jì)算系統(tǒng)——如ParallelBGL,CGMgraph等圖計(jì)算的兩種軟件目前通用的圖處理軟件主要包括兩種。一種主要基于遍歷算法、實(shí)時(shí)的圖數(shù)據(jù)庫(kù),如Neo4j,OrientDB,DEX,和InfiniteGraph.另一種則是以圖頂點(diǎn)為中心的消息傳遞批處理的并行引擎,如Hama,GoldenOrb,Giraph,和Pregel.第一種基本都基于tinkerpop的圖基礎(chǔ)框架,tinkerpop項(xiàng)目關(guān)系如圖1所示:BSP模型以圖頂點(diǎn)為中心的消息傳遞批處理的并行引擎主要是基于BSP(BulkSynchronousParallel)模型所實(shí)現(xiàn)的并行圖處理包。BSP是由哈佛大學(xué)Viliant和牛津大學(xué)BillMcColl提出的并行計(jì)算模型。一個(gè)BSP模型由大量相互關(guān)聯(lián)的處理器(processor)所組成,它們之間形成了一個(gè)通信網(wǎng)絡(luò)。每個(gè)處理器都有快速的本地內(nèi)存和不同的計(jì)算線程。一次BSP計(jì)算過(guò)程由一系列全局超步組成,超步就是計(jì)算中一次迭代。每個(gè)超步主要包括三個(gè)組件:并發(fā)計(jì)算(Concurrentcomputation):每個(gè)參與的處理器都有自身的計(jì)算任務(wù),它們只讀取存儲(chǔ)在本地內(nèi)存的值。這些計(jì)算都是異步并且獨(dú)立的。通訊(Communication):處理器群相互交換數(shù)據(jù),交換的形式:由一方發(fā)起推送(put)和獲取(get)操作。柵欄同步(Barriersynchronisation):當(dāng)一個(gè)處理器遇到路障,會(huì)等到其他所有處理器完成它們的計(jì)算步驟。每一次同步也是一個(gè)超步的完成和下一個(gè)超步的開(kāi)始。SuperstepPregel圖計(jì)算框架Pregel是由Google開(kāi)發(fā)的一個(gè)用于分布式圖計(jì)算的計(jì)算框架,主要用于圖遍歷(BFS)、最短路徑(SSSP)、PageRank計(jì)算等等。共享內(nèi)存的運(yùn)行庫(kù)有很多,但是對(duì)于Google來(lái)說(shuō),一臺(tái)機(jī)器早已經(jīng)放不下需要計(jì)算的數(shù)據(jù)了,所以需要分布式的這樣一個(gè)計(jì)算環(huán)境。沒(méi)有Pregel之前,可以選擇用MapReduce來(lái)做,但是效率很低。下面簡(jiǎn)單介紹一下PageRank算法在Pregel和MapReduce中的實(shí)現(xiàn)。PageRank算法作為Google的網(wǎng)頁(yè)鏈接排名算法,具體公式如下:

對(duì)任意一個(gè)鏈接,其PR值為鏈入到該鏈接的源鏈接的PR值對(duì)該鏈接的貢獻(xiàn)和(分母Ni為第i個(gè)源鏈接的鏈出度)。Pregel的計(jì)算模型主要來(lái)源于BSP并行計(jì)算模型的啟發(fā)。要用Pregel計(jì)算模型實(shí)現(xiàn)PageRank算法,也就是將網(wǎng)頁(yè)排名算法映射到圖計(jì)算中,這其實(shí)是很自然的,網(wǎng)絡(luò)鏈接是一個(gè)連通圖。PageRank在Pregel中的實(shí)現(xiàn)上圖就是四個(gè)網(wǎng)頁(yè)(A,B,C,D)互相鏈入鏈出組成的聯(lián)通圖。根據(jù)Pregel的計(jì)算模型,將計(jì)算定義到頂點(diǎn)(vertex)即A,B,C,D上來(lái),對(duì)應(yīng)一個(gè)對(duì)象,即一個(gè)計(jì)算單元。每一個(gè)計(jì)算單元包含三個(gè)成員變量:?Vertexvalue:頂點(diǎn)對(duì)應(yīng)的PR值?Outedge:只需要表示一條邊,可以不取值?Message:傳遞的消息,因?yàn)樾枰獙⒈緑ertex對(duì)其它vertex的PR貢獻(xiàn)傳遞給目標(biāo)vertex每一個(gè)計(jì)算單元包含一個(gè)成員函數(shù):?Compute:該函數(shù)定義了vertex上的運(yùn)算,包括該vertex的PR值計(jì)算,以及從該vertex發(fā)送消息到其鏈出vertexPageRank在Pregel中的實(shí)現(xiàn)classPageRankVertex

:publicVertex<double,void,double>{public:

virtualvoidCompute(MessageIterator*msgs){

if(superstep()>=1){

doublesum=0;

for(;!msgs->Done();

msgs->Next())sum+=msgs->Value();

*MutableValue()=0.15/NumVertices()+0.85*sum;

}

if(superstep()<30){

constint64n=GetOutEdgeIterator().size();

SendMessageToAllNeighbors(GetValue()/n);

}else{

VoteToHalt();

}

}};PageRank在Pregel中的實(shí)現(xiàn)Pregel的執(zhí)行包含PageRankVertex類(lèi),它繼承了Vertex類(lèi)。該類(lèi)頂點(diǎn)值的類(lèi)型是double,用來(lái)存儲(chǔ)暫定的PageRank,消息類(lèi)型也是double,用來(lái)傳遞PageRank的部分。圖在第0個(gè)超步中被初始化,所以它的每個(gè)頂點(diǎn)值為1.0。在每個(gè)超步中,每個(gè)頂點(diǎn)都會(huì)沿著它的出射邊發(fā)送它的PageRank值除以出射邊數(shù)后的結(jié)果值。從第1個(gè)超步開(kāi)始,每個(gè)頂點(diǎn)會(huì)將到達(dá)的消息中的值加到sum值中,同時(shí)將它的PageRank值設(shè)為0.15/NumVertices()+0.85*sum。為了收斂,可以設(shè)置一個(gè)超步數(shù)量的限制或用aggregators來(lái)檢查是否滿足收斂條件PageRank在MapReduce中的實(shí)現(xiàn)階段1:解析網(wǎng)頁(yè)Maptask把(URL,pagecontent)對(duì)映射為(URL,(PRinit,list-of-urls))PRinit是URL的“seed”P(pán)ageRank。list-of-urls包含通過(guò)URL指向的所有頁(yè)。Reducetask只是恒等函數(shù)。階段2:PageRank分配Maptask得到(URL,(cur_rank,url_list))對(duì)于每一個(gè)url_list中的u,輸出(u,cur_rank/|url_list|)。輸出(URL,url_list)通過(guò)迭代器來(lái)獲取列表的指向。Reducetask獲得(URL,url_list)和很多(URL,var)值對(duì)匯總vals,乘上d(0.85)。輸出(URL,(new_rank,url_list))。最后階段:一個(gè)非并行組件決定是否達(dá)到收斂。如果達(dá)到收斂,寫(xiě)出PageRank生成的列表。否則,回退到第2階段的輸出,進(jìn)行另一個(gè)第2階段的迭代。MapReduce也是Google提出的一種計(jì)算模型,它是為全量計(jì)算而設(shè)計(jì)。它實(shí)現(xiàn)MapReduce需要以下三個(gè)階段:PageRank在MapReduce中的實(shí)現(xiàn)?下面是第二階段,把網(wǎng)頁(yè)鏈接映射到key-value對(duì)的偽代碼:Mapper函數(shù)的偽碼:input<PageN,RankN>->PageA,PageB,PageC...//鏈接關(guān)系begin

Nn:=thenumberofoutlinksforPageN;

foreachoutlinkPageK

outputPageK-><PageN,RankN/Nn>//同時(shí)輸出鏈接關(guān)系,用于迭代

outputPageN->PageA,PageB,PageC...EndMapper的輸出如下(已經(jīng)排序,所以PageK的數(shù)據(jù)排在一起,最后一列則是鏈接關(guān)系對(duì)):PageK-><PageN1,RankN1/Nn1>PageK-><PageN2,RankN2/Nn2>...PageK-><PageAk,PageBk,PageCk>Reduce函數(shù)的偽碼:inputmapper‘s

outputbegin

RankK:=0;

foreachinlinkPageNi

RankK+=RankNi/Nni*beta

//outputthePageKanditsnewRankforthenextiteration

output<PageK,RankK>-><PageAk,PageBk,PageCk...>endPageRank在兩種模型中實(shí)現(xiàn)的總結(jié)總結(jié):簡(jiǎn)單地來(lái)講,Pregel將PageRank處理對(duì)象看成是連通圖,而MapReduce則將其看成是Key-Value對(duì)。Pregel將計(jì)算細(xì)化到頂點(diǎn)vertex,同時(shí)在vertex內(nèi)控制循環(huán)迭代次數(shù),而MapReduce則將計(jì)算批量化處理,按任務(wù)進(jìn)行循環(huán)迭代控制。PageRank算法如果用MapReduce實(shí)現(xiàn),需要一系列的MapReduce的調(diào)用。從一個(gè)階段到下一個(gè)階段,它需要傳遞整個(gè)圖的狀態(tài),這樣就需要許多的通信和隨之而來(lái)的序列化和反序列化的開(kāi)銷(xiāo)。另外,這一連串的MapReduce作業(yè)各執(zhí)行階段需要的協(xié)同工作也增加了編程復(fù)雜度,而Pregel使用超步簡(jiǎn)化了這個(gè)過(guò)程。Pregel的計(jì)算一個(gè)典型的Pregel計(jì)算過(guò)程如下:讀取輸入初始化該圖,當(dāng)圖被初始化好后,運(yùn)行一系列的超步直到整個(gè)計(jì)算結(jié)束,這些超步之間通過(guò)一些全局的同步點(diǎn)分隔開(kāi),輸出結(jié)果結(jié)束計(jì)算。在每一個(gè)超步中,計(jì)算框架都會(huì)針對(duì)每個(gè)頂點(diǎn)調(diào)用用戶自定義的函數(shù),這個(gè)過(guò)程是并行的。該函數(shù)描述的是一個(gè)頂點(diǎn)V在一個(gè)超步S中需要執(zhí)行的操作。函數(shù)可以:讀取前一個(gè)超步(S-1)中發(fā)送給V的消息發(fā)送消息給其他頂點(diǎn),這些消息將會(huì)在下一個(gè)超步(S+1)中被接收修改頂點(diǎn)V及其出射邊的狀態(tài)發(fā)生拓?fù)渥兓麄€(gè)Pregel程序的輸出是所有頂點(diǎn)輸出的集合。頂點(diǎn):每一個(gè)頂點(diǎn)都有一個(gè)相應(yīng)的由String描述的頂點(diǎn)標(biāo)識(shí)符。每一個(gè)頂點(diǎn)都有一個(gè)與之對(duì)應(yīng)的可修改的用戶自定義值。邊:每一條有向邊都和其源頂點(diǎn)關(guān)聯(lián),還記錄了其目標(biāo)頂點(diǎn)的標(biāo)識(shí)符。每一條有向邊擁有一個(gè)可修改的用戶自定義值。Pregel計(jì)算模型的進(jìn)程在第0個(gè)超步,所有頂點(diǎn)都處于active狀態(tài)只有active頂點(diǎn)參與對(duì)應(yīng)超步中的計(jì)算頂點(diǎn)通過(guò)將其自身的status設(shè)置成“halt”來(lái)進(jìn)入inactive狀態(tài)inactive頂點(diǎn)收到其它頂點(diǎn)傳送的消息被喚醒進(jìn)入active狀態(tài)整個(gè)計(jì)算在所有頂點(diǎn)都達(dá)到“inactive”狀態(tài),并且沒(méi)有message在傳送的時(shí)候宣告結(jié)束。Pregel的消息傳遞模型計(jì)算模型是一種純消息傳遞模型,忽略遠(yuǎn)程數(shù)據(jù)讀取和其他共享內(nèi)存的方式,有兩個(gè)原因。第一,消息傳遞模型足夠表達(dá)所有圖算法。第二,出于性能的考慮。在一個(gè)集群環(huán)境中,從遠(yuǎn)程機(jī)器上讀取一個(gè)值是會(huì)有很高的延遲的。而我們的消息傳遞模式通過(guò)異步的方式傳輸批量消息,可以減少遠(yuǎn)程讀取的延遲。Pregel的一個(gè)消息傳遞的例子?通過(guò)一個(gè)簡(jiǎn)單的例子來(lái)說(shuō)明這些基本概念:給定一個(gè)強(qiáng)連通圖,圖中每個(gè)頂點(diǎn)都包含一個(gè)值,它會(huì)將最大值傳播到每個(gè)頂點(diǎn)。在每個(gè)超步中,頂點(diǎn)會(huì)從接收到的消息中選出一個(gè)最大值,并將這個(gè)值傳送給其所有的相鄰頂點(diǎn)。當(dāng)某個(gè)超步中已經(jīng)沒(méi)有頂點(diǎn)更新其值,那么算法就宣告結(jié)束。PregelC++API編寫(xiě)一個(gè)Pregel程序需要繼承Pregel中已預(yù)定義好的一個(gè)基類(lèi)——Vertex類(lèi)template<typenameVertexValue,typenameEdgeValue,typenameMessageValue>

classVertex{

public:virtualvoidCompute(MessageIterator*msgs)=0;

conststring&vertex_id()const;

int64superstep()const;

constVertexValue&GetValue();

VertexValue*MutableValue();

OutEdgeIteratorGetOutEdgeIterator();

voidSendMessageTo(conststring&dest_vertex,constMessageValue&message);

voidVoteToHalt();};用戶覆寫(xiě)Vertex類(lèi)的虛函數(shù)Compute(),該函數(shù)會(huì)在每一個(gè)超步中對(duì)每一個(gè)頂點(diǎn)進(jìn)行調(diào)用。Compute()方法可以通過(guò)調(diào)用GetValue()方法來(lái)得到當(dāng)前頂點(diǎn)的值,或者通過(guò)調(diào)用MutableValue()方法來(lái)修改當(dāng)前頂點(diǎn)的值。還可以通過(guò)由出射邊的迭代器提供的方法來(lái)查看修改出射邊對(duì)應(yīng)的值。消息傳遞機(jī)制頂點(diǎn)之間的通信是直接通過(guò)發(fā)送消息,每條消息都包含了消息值和目標(biāo)頂點(diǎn)的名稱(chēng)。消息值的數(shù)據(jù)類(lèi)型是由用戶通過(guò)Vertex類(lèi)的模版參數(shù)來(lái)指定。在一個(gè)超步中,一個(gè)頂點(diǎn)可以發(fā)送任意多的消息。在該迭代器中并不保證消息的順序,但是可以保證消息一定會(huì)被傳送并且不會(huì)重復(fù)。消息可以傳給任意標(biāo)識(shí)符已知的頂點(diǎn)combiner發(fā)送消息時(shí),尤其是當(dāng)目標(biāo)頂點(diǎn)在另外一臺(tái)機(jī)器時(shí),會(huì)產(chǎn)生一些開(kāi)銷(xiāo)。某些情況可以用combiner降低這種開(kāi)銷(xiāo)。比方說(shuō),假如Compute()收到許多的int值消息,而它僅僅關(guān)心的是這些值的和,而不是每一個(gè)int的值,這種情況下,系統(tǒng)可以將發(fā)往同一個(gè)頂點(diǎn)的多個(gè)消息combine成一個(gè)消息,該消息中僅包含它們的和值,這樣就可以減少傳輸和緩存的開(kāi)銷(xiāo)。Combiners在默認(rèn)情況下并沒(méi)有被開(kāi)啟,而用戶如果想要開(kāi)啟Combiner的功能,可以通過(guò)重載Combine()方法實(shí)現(xiàn)??蚣懿⒉粫?huì)確保哪些消息會(huì)被Combine而哪些不會(huì),也不會(huì)確保傳送給Combine()的值和Combining操作的執(zhí)行順序。所以Combiner只應(yīng)該對(duì)那些滿足交換律和結(jié)合律的操作打開(kāi)。combiner例子:假設(shè)我們想統(tǒng)計(jì)在一組相關(guān)聯(lián)的頁(yè)面中所有頁(yè)面的鏈接數(shù)。?在第一個(gè)迭代中,對(duì)從每一個(gè)頂點(diǎn)(頁(yè)面)的鏈接,我們會(huì)向目標(biāo)頁(yè)面發(fā)送一個(gè)消息。?這里輸入消息隊(duì)列上的count函數(shù)可以通過(guò)一個(gè)combiner來(lái)優(yōu)化性能。在這個(gè)求最大值的例子中,一個(gè)Maxcombiner可以減少通信負(fù)荷。aggregatorPregel的aggregators是一種提供全局通信,監(jiān)控和數(shù)據(jù)查看的機(jī)制。在一個(gè)超步S中,每一個(gè)頂點(diǎn)都可以向一個(gè)aggregator提供一個(gè)數(shù)據(jù),系統(tǒng)會(huì)使用一種reduce操作來(lái)負(fù)責(zé)聚合這些值,而產(chǎn)生的值將會(huì)對(duì)所有的頂點(diǎn)在超步S+1中可見(jiàn)。Aggregators可以用來(lái)做統(tǒng)計(jì)和全局協(xié)同。Aggregators可以通過(guò)把Aggregator類(lèi)子類(lèi)化來(lái)實(shí)現(xiàn)。應(yīng)該滿足交換律和結(jié)合律默認(rèn)情況下,一個(gè)aggregator僅僅會(huì)對(duì)來(lái)自同一個(gè)超步的輸入進(jìn)行聚合。?例子:Sum運(yùn)算符應(yīng)用于每個(gè)頂點(diǎn)的出射邊數(shù)可以用來(lái)生成圖中邊的總數(shù)并使它能與所有的頂點(diǎn)相通信。更復(fù)雜的Reduce運(yùn)算符甚至可以產(chǎn)生直方圖。在求最大值得例子中,我們我們可以通過(guò)運(yùn)用一個(gè)Maxaggregator在一個(gè)超步中完成整個(gè)程序。topologymutationCompute()算法也可以用來(lái)修改圖的拓?fù)浣Y(jié)構(gòu)。在請(qǐng)求發(fā)出后在該超步中發(fā)生拓?fù)渥兓?。拓?fù)渥兓捻樞颍簞h除操作在添加操作之前刪除邊操作在刪除頂點(diǎn)操作之前添加頂點(diǎn)操作在添加邊操作之前這種局部有序性解決了很多沖突,其余的沖突由用戶自定義的handlers解決。同一種handler機(jī)制將被用于解決由于多個(gè)頂點(diǎn)刪除請(qǐng)求或多個(gè)邊增加請(qǐng)求或刪除請(qǐng)求而造成的沖突。Pregel的協(xié)同機(jī)制是惰性的,全局的拓?fù)涓淖冊(cè)诒籥pply之前不需要進(jìn)行協(xié)調(diào)這種設(shè)計(jì)的選擇是為了優(yōu)化流式處理。直觀來(lái)講就是對(duì)頂點(diǎn)V的修改引發(fā)的沖突由V自己來(lái)處理。Pregel同樣也支持純local的拓?fù)涓淖?,Local的拓?fù)涓淖儾粫?huì)引發(fā)沖突,并且頂點(diǎn)或邊的本地增減能夠立即生效,很大程度上簡(jiǎn)化了分布式的編程。InputandOutput可以采用多種格式進(jìn)行圖的保存,比如可以用text文件,關(guān)系數(shù)據(jù)庫(kù),或者Bigtable中的行。類(lèi)似的,結(jié)果可以以任何一種格式輸出并根據(jù)應(yīng)用程序選擇最適合的存儲(chǔ)方式。用戶可以通過(guò)繼承Reader和Writer類(lèi)來(lái)定義他們自己的讀寫(xiě)方式。ImplementationPregel是為Google的集群架構(gòu)而設(shè)計(jì)的。應(yīng)用程序通常通過(guò)一個(gè)集群管理系統(tǒng)執(zhí)行,該管理系統(tǒng)會(huì)通過(guò)調(diào)度作業(yè)來(lái)優(yōu)化集群資源的使用率,有時(shí)候會(huì)殺掉一些任務(wù)或?qū)⑷蝿?wù)遷移到其他機(jī)器上去。持久化的數(shù)據(jù)被存儲(chǔ)在GFS或Bigtable中,而臨時(shí)文件比如緩存的消息則存儲(chǔ)在本地磁盤(pán)中。Pregellibrary將一張圖劃分成許多的partitions,每一個(gè)partition包含了一些頂點(diǎn)和以這些頂點(diǎn)為起點(diǎn)的邊。將一個(gè)頂點(diǎn)分配到某個(gè)partition上去取決于該頂點(diǎn)的ID。默認(rèn)的partition函數(shù)為hash(ID)modN,N為所有partition總數(shù)。接下來(lái)描述一個(gè)Pregel程序執(zhí)行的幾個(gè)階段。執(zhí)行過(guò)程1.用戶程序的多個(gè)copy開(kāi)始在集群中的機(jī)器上執(zhí)行。其中一個(gè)copy充當(dāng)masterMaster不被分配圖的任意部分,它負(fù)責(zé)協(xié)調(diào)worker的活動(dòng)2.master將圖進(jìn)行分區(qū),然后將一個(gè)或多個(gè)partition分給worker;每一個(gè)worker會(huì)在內(nèi)存中維護(hù)分配到其之上的graphpartition的狀態(tài)。執(zhí)行它的頂點(diǎn)上的用戶定義的Compute()方法并管理來(lái)自或發(fā)給其他頂點(diǎn)的消息。執(zhí)行過(guò)程執(zhí)行過(guò)程3.Master為每個(gè)worker分配用戶輸入的一部分。輸入被看做一系列的記錄,每個(gè)記錄包含任意數(shù)量的頂點(diǎn)和邊。在輸入完成加載后,所有的頂點(diǎn)被標(biāo)記為active。4.在一個(gè)超步中,master通知每一個(gè)worker去執(zhí)行,只要存在active頂點(diǎn)worker一直執(zhí)行,并為每一個(gè)active狀態(tài)的頂點(diǎn)調(diào)用compute()方法。它也會(huì)傳送以前的超步發(fā)送的消息。當(dāng)worker完成后,它會(huì)向master作出響應(yīng),告訴master在下一個(gè)超步中active頂點(diǎn)的數(shù)量。5.計(jì)算結(jié)束后,master會(huì)通知所有的worker保存它那部分的計(jì)算結(jié)果。執(zhí)行過(guò)程執(zhí)行過(guò)程容錯(cuò)性容錯(cuò)是通過(guò)checkpointing來(lái)實(shí)現(xiàn)的。在每個(gè)超步的開(kāi)始階段,master命令worker讓它保存它上面的partitions的狀態(tài)到持久存儲(chǔ)設(shè)備,包括頂點(diǎn)值,邊值,以及接收到的消息。Master通過(guò)ping消息檢測(cè)worker是否故障當(dāng)一個(gè)或多個(gè)worker出現(xiàn)故障時(shí),和它們關(guān)聯(lián)的分區(qū)的當(dāng)前狀態(tài)就會(huì)丟失。Master重新分配圖的partition到當(dāng)前可用的worker集合上。所有的partition會(huì)從最近的某超步S開(kāi)始時(shí)寫(xiě)出的checkpoint中重新加載狀態(tài)信息。該超步可能比在出故障的worker上最后運(yùn)行的超步S’早好幾個(gè)階段整個(gè)系統(tǒng)從該超步重新開(kāi)始Confinedrecovery可以改進(jìn)恢復(fù)執(zhí)行的開(kāi)銷(xiāo)和延遲。除了基本的checkpoint,worker同時(shí)還會(huì)將其在加載圖的過(guò)程中和超步中發(fā)送出去的消息寫(xiě)入日志。這樣恢復(fù)就會(huì)被限制在丟掉的那些partitions上。Worker?一個(gè)worker機(jī)器會(huì)在內(nèi)存中維護(hù)分配到其之上的graphpartition的狀態(tài)。?當(dāng)Compute()請(qǐng)求發(fā)送一個(gè)消息到其他頂點(diǎn)時(shí),worker首先確認(rèn)目標(biāo)頂點(diǎn)是屬于遠(yuǎn)程的worker機(jī)器,還是當(dāng)前worker。如果是在遠(yuǎn)程的worker機(jī)器上,那么消息就會(huì)被緩存,當(dāng)緩存大小達(dá)到一個(gè)閾值,最大的那些緩存數(shù)據(jù)將會(huì)被異步地flush出去,作為單獨(dú)的一個(gè)網(wǎng)絡(luò)消息傳輸?shù)侥繕?biāo)worker。如果是在當(dāng)前worker,那么就可以做相應(yīng)的優(yōu)化:消息就會(huì)直接被放到目標(biāo)頂點(diǎn)的輸入消息隊(duì)列中。?如果用戶提供了Combiner,那么在消息被加入到輸出隊(duì)列或者到達(dá)輸入隊(duì)列時(shí),會(huì)執(zhí)行combiner函數(shù)。后一種情況并不會(huì)節(jié)省網(wǎng)絡(luò)開(kāi)銷(xiāo),但是會(huì)節(jié)省用于消息存儲(chǔ)的空間。Master?Master主要負(fù)責(zé)的worker之間的工作協(xié)調(diào),每一個(gè)worker在其注冊(cè)到master的時(shí)候會(huì)被分配一個(gè)唯一的ID。Master內(nèi)部維護(hù)著一個(gè)當(dāng)前活動(dòng)的worker列表,master中保存這些信息的數(shù)據(jù)結(jié)構(gòu)大小與partitions的個(gè)數(shù)相關(guān),與圖中的頂點(diǎn)和邊的數(shù)目無(wú)關(guān)。?絕大部分的master的工作,包括輸入,輸出,計(jì)算,保存以及從checkpoint中恢復(fù),都將會(huì)在一個(gè)叫做barriers的地方終止:?Master同時(shí)還保存著整個(gè)計(jì)算過(guò)程以及整個(gè)graph的狀態(tài)的統(tǒng)計(jì)數(shù)據(jù)。為方便用戶監(jiān)控,Master在內(nèi)部運(yùn)行了一個(gè)HTTP服務(wù)器來(lái)顯示這些信息。Aggregators?每個(gè)Aggregator會(huì)通過(guò)對(duì)一組value值集合應(yīng)用aggregation函數(shù)計(jì)算出一個(gè)全局值。每一個(gè)worker都保存了一個(gè)aggregators的實(shí)例集,由typename和實(shí)例名稱(chēng)來(lái)標(biāo)識(shí)。當(dāng)一個(gè)worker對(duì)graph的某一個(gè)partition執(zhí)行一個(gè)超級(jí)步時(shí),worker會(huì)combine所有的提供給本地的那個(gè)aggregator實(shí)例的值到一個(gè)localvalue:即利用一個(gè)aggregator對(duì)當(dāng)前partition中包含的所有頂點(diǎn)值進(jìn)行局部規(guī)約。在超級(jí)步結(jié)束時(shí),所有workers會(huì)將所有包含局部規(guī)約值的aggregators的值進(jìn)行最后的匯總,并匯報(bào)給master。這個(gè)過(guò)程是由所有worker構(gòu)造出一棵規(guī)約樹(shù)而不是順序的通過(guò)流水線的方式來(lái)規(guī)約,這樣做的原因是為了并行化規(guī)約時(shí)cpu的使用。在下一個(gè)超級(jí)步開(kāi)始時(shí),master就會(huì)將aggregators的全局值發(fā)送給每一個(gè)worker。應(yīng)用實(shí)例最短路徑二分匹配最短路徑最短路徑問(wèn)題是圖論中最有名的問(wèn)題之一了,同時(shí)具有廣泛的應(yīng)用,該問(wèn)題有幾個(gè)形式:?jiǎn)卧醋疃搪窂?、s-t最短路徑、全局最短路徑。單源最短路徑在算法中我們假設(shè)每個(gè)頂點(diǎn)的關(guān)聯(lián)值被初始化為INF。在每個(gè)超步中,每個(gè)頂點(diǎn)會(huì)首先接收到來(lái)自鄰居傳送過(guò)來(lái)的消息,該消息包含更新過(guò)的從源頂點(diǎn)到該頂點(diǎn)的潛在的最短距離。如果這些更新里的最小值小于該頂點(diǎn)當(dāng)前關(guān)聯(lián)值,那么頂點(diǎn)就會(huì)更新這個(gè)值,并發(fā)送消息給它的鄰居。在第一個(gè)超步中,只有源頂點(diǎn)會(huì)更新它的關(guān)聯(lián)值,然后發(fā)送消息給它的直接鄰居。然后這些鄰居會(huì)更新它們的關(guān)聯(lián)值,然后繼續(xù)發(fā)送消息給它們的鄰居,如此循環(huán)往復(fù)。當(dāng)沒(méi)有更新再發(fā)生的時(shí)候,算法就結(jié)束,之后所有頂點(diǎn)的關(guān)聯(lián)值就是從源頂點(diǎn)到它的最短距離,若值為INF表示該頂點(diǎn)不可達(dá)。如果所有的邊權(quán)重都是非負(fù)的,就可以保證該過(guò)程肯定會(huì)結(jié)束。二分匹配?在循環(huán)的階段0,左邊集合中那些還未被匹配的頂點(diǎn)會(huì)發(fā)送消息給它的每個(gè)鄰居請(qǐng)求匹配,然后會(huì)無(wú)條件的VoteToHalt。如果它沒(méi)有發(fā)送消息,或者是所有的消息接收者都已經(jīng)被匹配,該頂點(diǎn)就不會(huì)再變?yōu)閍ctive狀態(tài)。?在循環(huán)的階段1,

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論