




版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第4章SparkStreaming流數(shù)據(jù)計(jì)算Spark大數(shù)據(jù)分析與實(shí)戰(zhàn)01SparkStreaming基本原理SparkStreaming詞頻統(tǒng)計(jì)0203Dstream數(shù)據(jù)轉(zhuǎn)換操作目錄CONTENTS04DStream輸出操作05DStream數(shù)據(jù)源讀取01SparkStreaming基本原理SparkStreaming基本原理SparkStreaming基本原理批計(jì)算一般是先有全量數(shù)據(jù),然后將計(jì)算應(yīng)用于這些數(shù)據(jù),因此計(jì)算結(jié)果是一次性輸出的。流計(jì)算的輸入數(shù)據(jù)是時(shí)間上無(wú)界的持續(xù),也永遠(yuǎn)拿不到完整的全量數(shù)據(jù)進(jìn)行分析,且結(jié)果也是持續(xù)輸出的,在時(shí)間上沒(méi)有邊界,就像水流一樣,數(shù)據(jù)連綿不斷地產(chǎn)生并被快速處理。流計(jì)算適用于有一定實(shí)時(shí)性要求的場(chǎng)合,為了提高計(jì)算效率,往往盡可能采用增量計(jì)算的方式,已計(jì)算過(guò)的數(shù)據(jù)不再重復(fù)處理SparkStreaming建立在SparkRDD基礎(chǔ)之上,它接收實(shí)時(shí)輸入的數(shù)據(jù)流,然后將數(shù)據(jù)拆分成多個(gè)批次,并將每個(gè)批次的數(shù)據(jù)交給Spark計(jì)算引擎處理,這樣每個(gè)批次的數(shù)據(jù)就相當(dāng)于一個(gè)局部范圍內(nèi)的離線(xiàn)數(shù)據(jù)SparkStreaming基本原理SparkStreaming設(shè)計(jì)了一個(gè)名為DStream的數(shù)據(jù)結(jié)構(gòu)(離散化數(shù)據(jù)流),代表連續(xù)不斷的數(shù)據(jù)流,實(shí)現(xiàn)上是將輸入數(shù)據(jù)流按照預(yù)設(shè)的時(shí)間片進(jìn)行分段,比如每秒切分一次,每個(gè)切分的數(shù)據(jù)段都被Spark轉(zhuǎn)換成一個(gè)批次的RDD數(shù)據(jù),這些數(shù)據(jù)分段則為DStream由于DStream是由切分?jǐn)?shù)據(jù)段的RDD構(gòu)成的,所以DStream的數(shù)據(jù)處理也就變成了對(duì)RDD的操作。在DStream數(shù)據(jù)集上應(yīng)用的任何算子,比如map、reduce、join、window等,Spark底層都會(huì)將其翻譯為對(duì)DStream的局部RDD的操作,并生成一個(gè)新的DStream02SparkStreaming詞頻統(tǒng)計(jì)Netcat網(wǎng)絡(luò)工具測(cè)試DStream詞頻統(tǒng)計(jì)Netcat網(wǎng)絡(luò)工具測(cè)試Ubuntu系統(tǒng)已經(jīng)附帶Netcat網(wǎng)絡(luò)工具文件名為nc。我們可以開(kāi)啟兩個(gè)終端窗體,其中左邊終端窗體充當(dāng)監(jiān)聽(tīng)9999端口的服務(wù)端,右邊終端窗體充當(dāng)連接到服務(wù)器的客戶(hù)端,然后雙方互發(fā)數(shù)據(jù)并在對(duì)方顯示如果一切正常,nc服務(wù)端和客戶(hù)端就可以互發(fā)數(shù)據(jù),每次輸入的內(nèi)容都會(huì)在對(duì)方的窗體顯示,這是因?yàn)閚c同時(shí)支持雙向交互操作。測(cè)試完畢,按Ctrl+C快捷鍵結(jié)束右側(cè)終端窗體中運(yùn)行的nc客戶(hù)端,保留左邊的服務(wù)端窗體DStream詞頻統(tǒng)計(jì)SparkStreaming接收的是一個(gè)源源不斷產(chǎn)生數(shù)據(jù)的數(shù)據(jù)源,是不間斷循環(huán)運(yùn)行的。在SparkStreaming應(yīng)用程序中至少需要啟動(dòng)兩個(gè)線(xiàn)程,其中一個(gè)用來(lái)接收數(shù)據(jù),另一個(gè)用來(lái)處理數(shù)據(jù),如果只有一個(gè)線(xiàn)程,就無(wú)法對(duì)數(shù)據(jù)進(jìn)行處理,也就看不到任何實(shí)質(zhì)性的效果DStream詞頻統(tǒng)計(jì)NetworkWordCount.py具體代碼如下:DStream詞頻統(tǒng)計(jì)接下來(lái)通過(guò)spark-submit命令將SparkStreaming應(yīng)用程序提交到Spark中運(yùn)行當(dāng)NetworkWordCount.py程序運(yùn)行之后,每隔3秒就會(huì)處理并輸出一次結(jié)果。接下來(lái)找到前面運(yùn)行nc服務(wù)端的終端窗體,在其中隨便輸入一些單詞,稍等片刻即可在SparkStreaming應(yīng)用程序中打印詞頻統(tǒng)計(jì)的結(jié)果信息DStream詞頻統(tǒng)計(jì)Spark在運(yùn)行時(shí)默認(rèn)會(huì)產(chǎn)生大量日志信息,此時(shí)可在Spark的conf目錄中通過(guò)perties配置文件設(shè)定全局的運(yùn)行日志級(jí)別,或在代碼中使用sc.setLogLevel()方法設(shè)定當(dāng)前應(yīng)用程序的運(yùn)行日志級(jí)別,可以是WARN、ERROR、INFO、DEBUG、ALL等幾種保存修改并退出編輯器,重新執(zhí)行spark-submit命令將NetworkWordCount.py程序提交運(yùn)行,并在nc服務(wù)端窗體再次輸入一些單詞,此時(shí)就可以比較清楚地查看SparkStreaming處理后的輸出結(jié)果,避免了很多干擾信息03DStream轉(zhuǎn)換操作DStream無(wú)狀態(tài)轉(zhuǎn)換操作DStream基于狀態(tài)更新的轉(zhuǎn)換DStream基于滑動(dòng)窗口的轉(zhuǎn)換DStream無(wú)狀態(tài)轉(zhuǎn)換操作DStream無(wú)狀態(tài)轉(zhuǎn)換操作僅僅計(jì)算當(dāng)前時(shí)間片的數(shù)據(jù)內(nèi)容,每個(gè)批次的處理結(jié)果不依賴(lài)于先前批次的數(shù)據(jù),也不影響后續(xù)批次的數(shù)據(jù)。所以,DStream無(wú)狀態(tài)轉(zhuǎn)換操作處理的批次數(shù)據(jù)都是獨(dú)立的,轉(zhuǎn)換操作被直接應(yīng)用到每個(gè)批次的RDD數(shù)據(jù)上,DStream的無(wú)狀態(tài)轉(zhuǎn)換操作與RDD的轉(zhuǎn)換操作是類(lèi)似的,返回的也是一個(gè)新的DStream對(duì)象除了無(wú)狀態(tài)轉(zhuǎn)換操作,DStream還支持有狀態(tài)的轉(zhuǎn)換操作,在這種情況下,DStream在計(jì)算當(dāng)前批次數(shù)據(jù)時(shí),會(huì)依賴(lài)之前批次的數(shù)據(jù)或中間結(jié)果,并不斷把當(dāng)前計(jì)算的數(shù)據(jù)與歷史時(shí)間片的數(shù)據(jù)進(jìn)行累計(jì)DStream基于狀態(tài)更新的轉(zhuǎn)換DStream基于狀態(tài)更新的轉(zhuǎn)換允許將當(dāng)前時(shí)間和歷史時(shí)間片的RDD數(shù)據(jù)疊加計(jì)算。比如,對(duì)DStream的數(shù)據(jù)按key執(zhí)行reduce操作,然后將各個(gè)批次的中間結(jié)果累加到一起。Spark并非保存所有的歷史數(shù)據(jù),只是將當(dāng)前的計(jì)算結(jié)果保存到磁盤(pán)以便下一次計(jì)算調(diào)用,避免重復(fù)計(jì)算DStream基于狀態(tài)更新的轉(zhuǎn)換在使用updateStateByKey算子時(shí),必須開(kāi)啟checkpoint(檢查點(diǎn))機(jī)制并設(shè)置中間結(jié)果數(shù)據(jù)保存的目錄(集群環(huán)境一般為HDFS上的目錄,開(kāi)發(fā)測(cè)試階段可指定使用本地目錄),這樣才能把每個(gè)key對(duì)應(yīng)的狀態(tài)值長(zhǎng)期保存,避免內(nèi)存數(shù)據(jù)的丟失。比如,統(tǒng)計(jì)“雙十一”當(dāng)天的總銷(xiāo)量和成交金額,操作者會(huì)在各個(gè)時(shí)段分批計(jì)算和局部累計(jì)產(chǎn)生的數(shù)據(jù),最后進(jìn)行全部匯總DStream基于狀態(tài)更新的轉(zhuǎn)換通過(guò)spark-submit命令將NetworkWordCountAll.py程序提交到Spark運(yùn)行。需要注意的是,要確保nc服務(wù)端正在運(yùn)行,否則需要重新執(zhí)行nc-lk9999命令來(lái)啟動(dòng)它,不然在提交時(shí)會(huì)遇到錯(cuò)誤根據(jù)輸出結(jié)果可以看出,通過(guò)updateStateByKey算子統(tǒng)計(jì)的詞頻,都是在之前批次的中間統(tǒng)計(jì)結(jié)果基礎(chǔ)上累積的數(shù)值,而不是從最開(kāi)始的單詞重復(fù)計(jì)算一遍后的數(shù)值DStream基于滑動(dòng)窗口的轉(zhuǎn)換DStream基于滑動(dòng)窗口的轉(zhuǎn)換是在時(shí)間軸上設(shè)置批次數(shù)據(jù)所在的“時(shí)間窗口”大小,以及窗口滑動(dòng)的間隔,從而動(dòng)態(tài)獲取數(shù)據(jù)流的一種機(jī)制,比如每隔10秒統(tǒng)計(jì)一次最近30分鐘的新聞熱搜詞。它是在一個(gè)比單批次間隔更長(zhǎng)的時(shí)間范圍內(nèi),通過(guò)整合位于窗口范圍內(nèi)的多個(gè)批次數(shù)據(jù)計(jì)算得到的數(shù)據(jù)結(jié)果window1=time1+time2+time3window3=window1+time4+time5-time1-time2window5=time3+time4+time5基于滑動(dòng)窗口的轉(zhuǎn)換是Spark提供的一組“短線(xiàn)”操作,它比單批次的“局部時(shí)間”長(zhǎng),但又比基于狀態(tài)更新的“全程時(shí)間”短,通過(guò)滑動(dòng)窗口技術(shù)可以實(shí)現(xiàn)大規(guī)模數(shù)據(jù)的增量更新和統(tǒng)計(jì),即對(duì)任意一段時(shí)間內(nèi)的數(shù)據(jù)進(jìn)行處理,且不重復(fù)計(jì)算已處理過(guò)的數(shù)據(jù)DStream基于滑動(dòng)窗口的轉(zhuǎn)換SparkStreaming支持的滑動(dòng)窗口算子包括reduceByKeyAndWindow、reduceByWindow、window等多種。下面以reduceByKeyAndWindow算子為例對(duì)詞頻統(tǒng)計(jì)例子進(jìn)行改造,以實(shí)現(xiàn)基于滑動(dòng)窗口的詞頻統(tǒng)計(jì)功能DStream基于滑動(dòng)窗口的轉(zhuǎn)換保存修改的代碼并退出,確保nc服務(wù)端在監(jiān)聽(tīng)9999端口,然后將修改后的代碼提交到Spark運(yùn)行,然后在nc服務(wù)端輸入文字內(nèi)容在nc服務(wù)端所在的終端窗體任意輸入一些內(nèi)容,此時(shí)SparkStreaming就會(huì)通過(guò)滑動(dòng)窗口方式統(tǒng)計(jì)出相應(yīng)的詞頻結(jié)果04DStream輸出操作DStream輸出操作DStream輸出操作DStream可以根據(jù)需要輸出到外部使用或保存,比如發(fā)送給Kafka消息系統(tǒng),保存到數(shù)據(jù)庫(kù)或者文件等。下面列出一些常用的DStream輸出操作算子DStream輸出操作以saveAsTextFiles算子為例,將詞頻統(tǒng)計(jì)結(jié)果保存到文本文件中DStream輸出操作保存以上代碼并退出編輯器,確保nc服務(wù)端在監(jiān)聽(tīng)9999端口,然后在Linux終端窗體中通過(guò)spark-submit命令將NetworkWordCountSave.py程序提交到Spark運(yùn)行,然后在nc服務(wù)端輸入文字內(nèi)容查看主目錄中的streaming文件夾,里面出現(xiàn)了很多以output-開(kāi)關(guān)的子目錄,其中保存的就是不同批次時(shí)間點(diǎn)的數(shù)據(jù)文件05DStream數(shù)據(jù)源讀取讀取文件數(shù)據(jù)流讀取Kafka數(shù)據(jù)流讀取文件數(shù)據(jù)流SparkStreaming能夠從本地文件目錄、HDFS文件系統(tǒng)中讀取數(shù)據(jù),只需通過(guò)調(diào)用textFileStream()方法即可創(chuàng)建一個(gè)基于文件流類(lèi)型的數(shù)據(jù)源讀取文件數(shù)據(jù)流將代碼提交到Spark運(yùn)行再新打開(kāi)一個(gè)Linux終端窗體,切換到~/streaming目錄,使用echo命令依次在logfile子目錄中創(chuàng)建1.txt和2.txt文件,同時(shí)觀(guān)察運(yùn)行代碼的終端窗體所發(fā)生的變化SparkStreaming會(huì)持續(xù)監(jiān)視logfile目錄中的新文件,一旦處理后就不會(huì)再重復(fù)讀取(即使修改文件內(nèi)容也會(huì)被忽略)。如果是使用HDFS文件,只需將textFileStream()方法的路徑參數(shù)修改為HDFS目錄,比如hdfs://localhost:9000/logfile,然后不斷地上傳新文件到HDFS目錄讀取Kafka數(shù)據(jù)流(1)Kafka介紹Kafka是一個(gè)分布式的消息“發(fā)布-訂閱”系統(tǒng),也被稱(chēng)為消息中間件,它通過(guò)一個(gè)強(qiáng)大的消息隊(duì)列處理大量的數(shù)據(jù),并能夠?qū)⑾囊粋€(gè)端點(diǎn)可靠地傳遞到另一個(gè)端點(diǎn)。Kafka非常適合離線(xiàn)和實(shí)時(shí)的數(shù)據(jù)消費(fèi),支持將消息內(nèi)容保存在磁盤(pán)以防止數(shù)據(jù)丟失,能方便地與Spark集成,用于實(shí)時(shí)的流數(shù)據(jù)計(jì)算生產(chǎn)者和消費(fèi)者(producer和consumer):消息的發(fā)送者是producer,消息的使用和接收者是consumer。生產(chǎn)者將數(shù)據(jù)保存到Kafka中,消費(fèi)者從Kafka中獲取消息讀取Kafka數(shù)據(jù)流(1)Kafka介紹Kafka實(shí)例(broker):Kafka集群中有多個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都可以存儲(chǔ)消息,每個(gè)節(jié)點(diǎn)就是一個(gè)Kafka實(shí)例,也被稱(chēng)為broker,其字面含義是“經(jīng)紀(jì)人”主題(topic):一個(gè)topic保存的是同一類(lèi)消息,相當(dāng)于消息分類(lèi)。每個(gè)生產(chǎn)者將消息發(fā)送到Kafka時(shí),必須指明要保存到哪個(gè)topic,以指明這個(gè)消息屬于哪一類(lèi)。topic的消息格式一般包含key和value兩個(gè)字段,是一個(gè)(K,V)鍵值對(duì)形式的二元組分區(qū)(partition):每個(gè)topic又可以分為多個(gè)partition,每個(gè)分區(qū)在磁盤(pán)上就是一個(gè)追加模式的log文件。任何發(fā)布到此partition的消息都會(huì)被追加到對(duì)應(yīng)log文件的尾部。設(shè)置分區(qū)的原因是,Kafka基于文件進(jìn)行存儲(chǔ),當(dāng)文件內(nèi)容多到一定程度時(shí)就很容易達(dá)到單個(gè)磁盤(pán)文件的上限,而采用一個(gè)分區(qū)對(duì)應(yīng)一個(gè)文件的做法,數(shù)據(jù)可以被分別存儲(chǔ)到不同的節(jié)點(diǎn)上,還能進(jìn)行負(fù)載均衡讀取Kafka數(shù)據(jù)流(1)Kafka介紹偏移量(offset):一個(gè)分區(qū)是磁盤(pán)上的一個(gè)文件,消息存儲(chǔ)在文件中的位置就稱(chēng)為offset,即偏移量,它用來(lái)唯一標(biāo)記一條消息的整數(shù)。由于Kafka沒(méi)有提供其他索引機(jī)制存儲(chǔ)偏移量,因此文件只能按順序操作,不允許隨機(jī)讀/寫(xiě)消息讀取Kafka數(shù)據(jù)流(2)Kafka安裝與測(cè)試啟動(dòng)Kafka依賴(lài)的ZooKeeper服務(wù)讀取Kafka數(shù)據(jù)流(2)Kafka安裝與測(cè)試將Kafka服務(wù)啟動(dòng),啟動(dòng)完畢后,正常情況下會(huì)出現(xiàn)一個(gè)名為Kafka的進(jìn)程名稱(chēng)至此,Kafka就運(yùn)行起來(lái)了,Java進(jìn)程中也多了一個(gè)名為Kafka的進(jìn)程,默認(rèn)使用9092端口讀取Kafka數(shù)據(jù)流(2)Kafka安裝與測(cè)試接下來(lái)運(yùn)行程序先對(duì)Kafka進(jìn)行簡(jiǎn)單的測(cè)試,我們?cè)诘?個(gè)終端窗體中創(chuàng)建mytopic主題,并向其發(fā)送幾條測(cè)試的消息內(nèi)容,然后在第2個(gè)終端窗體中獲取mytopic主題收到的消息并顯示將運(yùn)行以上命令的Linux終端窗體稱(chēng)為A,代表消息的生產(chǎn)者讀取Kafka數(shù)據(jù)流(2)Kafka安裝與測(cè)試新打開(kāi)一個(gè)Linux終端窗體B(代表消費(fèi)者),在里面輸入下面的命令準(zhǔn)備完畢,現(xiàn)在測(cè)試Kafka的消息生產(chǎn)和消費(fèi)。在終端窗體A中隨便輸
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 泉州工程職業(yè)技術(shù)學(xué)院《過(guò)程控制專(zhuān)業(yè)實(shí)驗(yàn)》2023-2024學(xué)年第二學(xué)期期末試卷
- 泉州紡織服裝職業(yè)學(xué)院《注冊(cè)電氣工程師概論》2023-2024學(xué)年第二學(xué)期期末試卷
- 上??萍即髮W(xué)《會(huì)計(jì)制度設(shè)計(jì)》2023-2024學(xué)年第二學(xué)期期末試卷
- 商丘師范學(xué)院《信息安全攻防對(duì)抗實(shí)訓(xùn)》2023-2024學(xué)年第二學(xué)期期末試卷
- 興安職業(yè)技術(shù)學(xué)院《機(jī)器學(xué)習(xí)與人工智能導(dǎo)論》2023-2024學(xué)年第二學(xué)期期末試卷
- 3《植物媽媽有辦法》教學(xué)設(shè)計(jì)-2024-2025學(xué)年統(tǒng)編版語(yǔ)文二年級(jí)上冊(cè)
- 人教版七年級(jí)歷史與社會(huì)下冊(cè)6.4.2-高原圣城-拉薩教學(xué)設(shè)計(jì)
- 河池2025年廣西河池市事業(yè)單位招聘731人筆試歷年參考題庫(kù)附帶答案詳解
- 7微生物與健康 教學(xué)設(shè)計(jì) -2023-2024學(xué)年科學(xué)六年級(jí)上冊(cè)教科版
- 揚(yáng)州環(huán)境資源職業(yè)技術(shù)學(xué)院《田徑教學(xué)與實(shí)踐》2023-2024學(xué)年第二學(xué)期期末試卷
- 2023年設(shè)備檢修標(biāo)準(zhǔn)化作業(yè)規(guī)范
- 光伏電站除草服務(wù)(合同)范本【詳盡多條款】
- 2023年考核銀行安全保衛(wèi)人員真題與答案
- 儲(chǔ)能全系統(tǒng)解決方案及產(chǎn)品手冊(cè)
- (高清版)DZT 0309-2017 地質(zhì)環(huán)境監(jiān)測(cè)標(biāo)志
- 人員轉(zhuǎn)移安置實(shí)施方案(公司重組)
- 病歷書(shū)寫(xiě)相關(guān)法律法規(guī)
- 老舊小區(qū)加裝電梯方案
- 老年人誤吸與預(yù)防-護(hù)理團(tuán)標(biāo)
- 輸氣場(chǎng)站工藝流程切換操作規(guī)程課件
- 青少年網(wǎng)絡(luò)安全教育課件
評(píng)論
0/150
提交評(píng)論