![Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第6章 Spark Streaming流計(jì)算_第1頁](http://file4.renrendoc.com/view/0e46b6e30e7b70d81e9e82aac73c0ba1/0e46b6e30e7b70d81e9e82aac73c0ba11.gif)
![Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第6章 Spark Streaming流計(jì)算_第2頁](http://file4.renrendoc.com/view/0e46b6e30e7b70d81e9e82aac73c0ba1/0e46b6e30e7b70d81e9e82aac73c0ba12.gif)
![Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第6章 Spark Streaming流計(jì)算_第3頁](http://file4.renrendoc.com/view/0e46b6e30e7b70d81e9e82aac73c0ba1/0e46b6e30e7b70d81e9e82aac73c0ba13.gif)
![Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第6章 Spark Streaming流計(jì)算_第4頁](http://file4.renrendoc.com/view/0e46b6e30e7b70d81e9e82aac73c0ba1/0e46b6e30e7b70d81e9e82aac73c0ba14.gif)
![Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第6章 Spark Streaming流計(jì)算_第5頁](http://file4.renrendoc.com/view/0e46b6e30e7b70d81e9e82aac73c0ba1/0e46b6e30e7b70d81e9e82aac73c0ba15.gif)
版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
SparkStreaming流計(jì)算1
流計(jì)算概述2 SparkStreaming工作原理3SparkStreaming編程模型4 DStream創(chuàng)建5DStream操作1流計(jì)算概述流數(shù)據(jù)概述流數(shù)據(jù)是一組順序、大量、快速、連續(xù)到達(dá)的數(shù)據(jù)序列,可被視為一個(gè)隨時(shí)間延續(xù)而無限增長的動(dòng)態(tài)數(shù)據(jù)集合。流數(shù)據(jù)具有四個(gè)特點(diǎn):(1)數(shù)據(jù)實(shí)時(shí)到達(dá)。(2)數(shù)據(jù)到達(dá)次序獨(dú)立,不受應(yīng)用系統(tǒng)所控制。(3)數(shù)據(jù)規(guī)模宏大且不能預(yù)知其最大值。(4)數(shù)據(jù)一經(jīng)處理,除非特意保存,否則不能被再次取出處理,或者再次提取數(shù)據(jù)代價(jià)昂貴。
電子書網(wǎng)站通過對眾多用戶的在線內(nèi)容點(diǎn)擊流記錄進(jìn)行流處理,優(yōu)化網(wǎng)站上的內(nèi)容投放,為用戶實(shí)時(shí)推薦相關(guān)內(nèi)容讓用戶獲得最佳的閱讀體驗(yàn)。
網(wǎng)絡(luò)游戲公司收集關(guān)于玩家與游戲間互動(dòng)的流數(shù)據(jù),并將這些數(shù)據(jù)提供給游戲平臺(tái),然后再對這些數(shù)據(jù)進(jìn)行實(shí)時(shí)分析,并提供各種激勵(lì)措施和動(dòng)態(tài)體驗(yàn)來吸引玩家。1流計(jì)算概述流計(jì)算處理流程典型的流計(jì)算處理流程:流處理中的數(shù)據(jù)集是“無邊界”的,完整數(shù)據(jù)集只能代表截至目前已經(jīng)進(jìn)入到系統(tǒng)中的數(shù)據(jù)總量;處理工作是基于事件的,除非明確停止否則沒有“盡頭”,處理結(jié)果立刻可用,并會(huì)隨著新數(shù)據(jù)的抵達(dá)持續(xù)更新。SparkStreaming流計(jì)算1 流計(jì)算概述2 SparkStreaming工作原理3SparkStreaming編程模型4 DStream創(chuàng)建5DStream操作2SparkStreaming工作原理SparkStreaming將數(shù)據(jù)流以時(shí)間片為單位分割形成一系列RDD(一個(gè)RDD對應(yīng)一塊分割數(shù)據(jù)),這些RDD在SparkStreaming中用一個(gè)抽象數(shù)據(jù)模型DStream來描述,稱為“離散流”。DStream可以用Kafka、Flume、Kinesis、Twitter、TCPSockets等數(shù)據(jù)源的輸入數(shù)據(jù)流來創(chuàng)建,也可以通過對其他DStream應(yīng)用map()、reduce()、join()等操作進(jìn)行轉(zhuǎn)換創(chuàng)建。DStream與RDD的對應(yīng)關(guān)系:SparkStreaming基本工作原理如圖所示,使用”微批次”的架構(gòu),把流式計(jì)算當(dāng)做一系列連續(xù)的小規(guī)模批處理來對待。SparkStreaming流計(jì)算1 流計(jì)算概述2 SparkStreaming工作原理3SparkStreaming編程模型4 DStream創(chuàng)建5DStream操作3SparkStreaming編程模型編寫SparkStreaming程序的步驟(1)創(chuàng)建StreamingContext對象。(2)為StreamingContext對象指定輸入源得到DStream對象。(3)操作DStream。用戶定義轉(zhuǎn)換操作和輸出操作來定義流計(jì)算。(4)調(diào)用StreamingContext對象的start()方法來開始接收數(shù)據(jù)和處理流數(shù)據(jù),之前的所有步驟只是創(chuàng)建了執(zhí)行流程。只有StreamingContext.start()執(zhí)行后才真正啟動(dòng)程序進(jìn)行所有預(yù)期的操作,之后就不能再添加任何計(jì)算邏輯了。只能有一個(gè)StreamingContext對象執(zhí)行start()方法并使該對象處于活躍狀態(tài)。(5)調(diào)用StreamingContext對象的awaitTermination()方法來等待流計(jì)算流程結(jié)束,或者通過調(diào)用StreamingContext對象的stop()方法來手動(dòng)結(jié)束流計(jì)算流程。一個(gè)StreamingContext停止之后,是不能重啟的,即調(diào)用stop()之后,不能再調(diào)用start()。3SparkStreaming編程模型創(chuàng)建StreamingContext對象在SparkStreaming編程中,需要先創(chuàng)建一個(gè)StreamingContext對象,他是SparkStreaming應(yīng)用程序的上下文和入口。在SparkStreaming中是通過對DStream(離散數(shù)據(jù)流)執(zhí)行轉(zhuǎn)換操作和輸出操作進(jìn)行數(shù)據(jù)處理的。通過SparkContext對象創(chuàng)建StreamingContext對象的語法格式如下。ssc=StreamingContext(SparkContext,Interval)參數(shù)有兩個(gè),一個(gè)是SparkContext對象,一個(gè)是處理流數(shù)據(jù)的時(shí)間間隔Interval,該參數(shù)指定了SparkStreaming處理流數(shù)據(jù)的時(shí)間間隔。進(jìn)入到pyspark交互式執(zhí)行環(huán)境后會(huì)默認(rèn)創(chuàng)建一個(gè)SparkContext對象sc,但不會(huì)自動(dòng)創(chuàng)建StreamingContext對象,需要人們手工創(chuàng)建:>>>frompyspark.streamingimportStreamingContext>>>ssc=StreamingContext(sc,1)一個(gè)SparkContext對象可以創(chuàng)建多個(gè)StreamingContext對象,只要調(diào)用stop(false)方法停止前一個(gè)StreamingContext對象,就可再創(chuàng)建下一個(gè)。3SparkStreaming編程模型創(chuàng)建StreamingContext對象如果是編寫一個(gè)獨(dú)立的SparkStreaming程序,則需要在代碼文件中通過如下方式創(chuàng)建StreamingContext對象。frompysparkimportSparkConf,SparkContextfrompyspark.streamingimportStreamingContextconf=SparkConf().setMaster('local[2]').setAppName('SelfStreaming')sc=SparkContext(conf=conf)#創(chuàng)建SparkContext對象ssc=StreamingContext(sc,1)說明:appName表示編寫的應(yīng)用程序顯示在集群上的名字如'SelfStreaming',setMaster(master)中的參數(shù)master是一個(gè)Spark、Mesos、YARN集群URL;或者一個(gè)特殊字符串“l(fā)ocal[*]”,表示程序用本地模式運(yùn)行,*的值至少為2,表示有兩個(gè)線程執(zhí)行流計(jì)算,一個(gè)接受數(shù)據(jù),一個(gè)處理數(shù)據(jù)。當(dāng)程序運(yùn)行在集群中時(shí),人們并不希望在程序中設(shè)置master,而是希望用spark-submit啟動(dòng)應(yīng)用程序,并從spark-submit中得到master的值。SparkStreaming流計(jì)算1 流計(jì)算概述2 SparkStreaming工作原理3SparkStreaming編程模型4 DStream創(chuàng)建5DStream操作4DStream創(chuàng)建創(chuàng)建輸入源為文件流的DStream對象1.在pyspark中創(chuàng)建輸入源為文件流的DStream對象的創(chuàng)建過程首先,在Linux系統(tǒng)中打開一個(gè)終端(稱為“數(shù)據(jù)源終端”),創(chuàng)建logfile目錄:$mkdir-p/usr/local/spark/mydata/streaming/logfile#遞歸創(chuàng)建logfile目錄$cd/usr/local/spark/mydata/streaming/logfile然后,在Linux系統(tǒng)中再打開一個(gè)終端(稱為“流計(jì)算終端”),啟動(dòng)pyspark,依次輸入如下語句。>>>frompyspark.streamingimportStreamingContext>>>ssc=StreamingContext(sc,30)#創(chuàng)建StreamingContext對象>>>FileDStream=ssc.textFileStream("file:///usr/local/spark/mydata/streaming/logfile")>>>words=FileDStream.flatMap(lambdax:x.split(""))>>>wordPair=words.map(lambdax:(x,1))>>>wordCounts=wordPair.reduceByKey(lambdaa,b:a+b)>>>wordCounts.pprint()>>>ssc.start()>>>ssc.awaitTermination()4DStream創(chuàng)建創(chuàng)建輸入源為文件流的DStream對象在pyspark中執(zhí)行ssc.start()以后,自動(dòng)進(jìn)入循環(huán)監(jiān)聽狀態(tài),屏幕上會(huì)不斷顯示如下類似信息:#這里省略若干屏幕信息-------------------------------------------Time:2021-12-0821:29:00-------------------------------------------切換到“數(shù)據(jù)源終端”,在/usr/local/spark/mydata/streaming/logfile目錄下新建一個(gè)log1.txt文件,在文件中輸入一些英文語句后保存并退出,具體命令如下。$cat>log1.txt#創(chuàng)建文件HelloWorldHelloScalaHelloSparkHelloPython輸入完成后,按Ctrl+D組合鍵存盤退出cat。此時(shí)可在當(dāng)前文件夾下創(chuàng)建一個(gè)包含剛才輸入內(nèi)容的叫l(wèi)og1.txt的文件。4DStream創(chuàng)建創(chuàng)建輸入源為文件流的DStream對象切換到“流計(jì)算”終端,最多等待30秒,就可以看到詞頻統(tǒng)計(jì)結(jié)果,具體輸出結(jié)果如下。('Hello',4)('World',1)('Scala',1)('Spark',1)('Python',1)如果監(jiān)測的路徑是HDFS上的路徑,直接通過hadoopfs-put***命令將文件***放到到監(jiān)測路徑就可以;如果監(jiān)測的路徑是本地目錄file:///home/data,必須用流的形式寫入到這個(gè)目錄形成文件才能被監(jiān)測到。4DStream創(chuàng)建創(chuàng)建輸入源為文件流的DStream對象2.以程序文件的方式創(chuàng)建文件流在/usr/local/spark/mydata/streaming/logfile目錄下創(chuàng)建FileStreaming.py文件:在文件中輸入以下代碼。frompysparkimportSparkConf,SparkContextfrompyspark.streamingimportStreamingContextconf=SparkConf().setMaster('local[2]').setAppName('SelfStreaming')sc=SparkContext(conf=conf)#創(chuàng)建SparkContext對象ssc=StreamingContext(sc,30)FileDStream=ssc.textFileStream("file:///usr/local/spark/mydata/streaming/logfile")words=FileDStream.flatMap(lambdax:x.split(""))wordPair=words.map(lambdax:(x,1))wordCounts=wordPair.reduceByKey(lambdaa,b:a+b)wordCounts.pprint()ssc.start()ssc.awaitTermination()4DStream創(chuàng)建創(chuàng)建輸入源為文件流的DStream對象2.以程序文件的方式創(chuàng)建文件流保存并關(guān)閉FileStreaming.py文件,執(zhí)行如下命令運(yùn)行FileStreaming.py。$pythonFileStreaming.py執(zhí)行“pythonFileStreaming.py”命令的終端稱為“流計(jì)算終端”。另打開一個(gè)終端,稱為數(shù)據(jù)源終端,在/usr/local/spark/mydata/streaming/logfile目錄下新建一個(gè)log2.txt文件,并輸入一些英文句子:$cat>log2.txt#創(chuàng)建文件HelloWorldHelloScalaHelloSparkHelloPython按Ctrl+D組合鍵存盤退出cat。回到執(zhí)行FileStreaming.py代碼文件的流計(jì)算終端,最多等待30秒,就會(huì)輸出單詞統(tǒng)計(jì)信息。使用Ctrl+C組合鍵即可停止流計(jì)算。4DStream創(chuàng)建定義DStream的輸入數(shù)據(jù)源為套接字流$pythonSocketWordCount.py#運(yùn)行SocketWordCount.py再打開一個(gè)終端(稱為數(shù)據(jù)源終端),啟動(dòng)一個(gè)Socket服務(wù)器端,讓該服務(wù)器端接收客戶端的請求,并向客戶端不斷地發(fā)送數(shù)據(jù)流。使用如下nc命令生成一個(gè)Socket服務(wù)器端:$nc-l-p8888可以通過鍵盤輸入一行英文句子后按Enter鍵,反復(fù)多次輸入英文句子并按Enter鍵,nc程序就會(huì)自動(dòng)把一行又一行的英文句子不斷發(fā)送給SocketWordCount.py程序進(jìn)行處理,這里輸入的兩次英文的內(nèi)容如下。FormanismanandmasterofhisfateBetterlatethanneverSocketWordCount.py每隔5秒就會(huì)執(zhí)行詞頻統(tǒng)計(jì),并輸出詞頻統(tǒng)計(jì)信息:-------------------------------------------Time:2021-12-0909:19:05-------------------------------------------('is',1)('master',1)('of',1)4DStream創(chuàng)建定義DStream的輸入數(shù)據(jù)源為RDD隊(duì)列流$mkdir-p/usr/local/spark/mydata/streaming/rddqueue#創(chuàng)建rddqueue目錄在/usr/local/spark/mydata/streaming/rddqueue目錄下,新建一個(gè)代碼文件:$geditrddQueueStream.py#在打開的rddQueueStream.py文件中并輸入以下代碼:frompysparkimportSparkConf,SparkContextfrompyspark.streamingimportStreamingContextimporttimeconf=SparkConf().setAppName('SelfRddStreaming')sc=SparkContext(conf=conf)#創(chuàng)建SparkContext對象ssc=StreamingContext(sc,5)rddQueue=[]#創(chuàng)建一個(gè)對列foriinrange(5):rddQueue+=[ssc.sparkContext.parallelize([kforkinrange(1,1001)],5)]time.sleep(1)queueStream=ssc.queueStream(rddQueue)result=queueStream.map(lambdar:(r%5,1)).reduceByKey(lambdaa,b:a+b)result.pprint()ssc.start()ssc.stop(stopSparkContext=True,stopGraceFully=True)4DStream創(chuàng)建定義DStream的輸入數(shù)據(jù)源為RDD隊(duì)列流StreamingContext(sc,5)創(chuàng)建每隔5秒對數(shù)據(jù)進(jìn)行處理的StreamingContext對象。ssc.queueStream(rddQueue)創(chuàng)建一個(gè)以“RDD隊(duì)列流”為數(shù)據(jù)源的DStream對象。執(zhí)行ssc.start()語句以后,流計(jì)算過程就開始了,SparkStreaming每隔5秒從rddQueue這個(gè)隊(duì)列中取出數(shù)據(jù)(若干個(gè)RDD)進(jìn)行處理。通過一個(gè)foriinrange(5)循環(huán),不斷向rddQueue中加入新生成的RDD。執(zhí)行for循環(huán)5次以后,ssc.stop(stopSparkContext=True,stopGraceFully=True)語句被執(zhí)行,整個(gè)流計(jì)算過程停止。保存并關(guān)閉rddQueueStream.py文件,執(zhí)行如下命令運(yùn)行rddQueueStream.py。$pythonrddQueueStream.py執(zhí)行上述命令以后,程序就開始運(yùn)行,可以看到類似下面的結(jié)果:-------------------------------------------Time:2021-12-0910:13:00-------------------------------------------(1,200)(2,200)(3,200)(4,200)SparkStreaming流計(jì)算1 流計(jì)算概述2 SparkStreaming工作原理3SparkStreaming編程模型4 DStream創(chuàng)建5DStream操作5DStream操作DStream無狀態(tài)轉(zhuǎn)換操作Dstream的操作可以分成3類:無狀態(tài)轉(zhuǎn)換操作、有狀態(tài)轉(zhuǎn)換操作、輸出操作。每次對新的批次數(shù)據(jù)進(jìn)行處理時(shí),只會(huì)記錄當(dāng)前批次數(shù)據(jù)的狀態(tài),不會(huì)記錄歷史數(shù)據(jù)狀態(tài)信息。DStream對象的常用的無狀態(tài)轉(zhuǎn)換操作。操作描述map(func)對DStream的每個(gè)元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換,返回新DStream對象flatMap(func)與map操作類似,不同的是每個(gè)元素可以被映射為0個(gè)或多個(gè)元素filter(func)返回一個(gè)新的DStream,僅包含源DStream中滿足func函數(shù)的元素repartition(numPartitions)增加或減少DStream中的分區(qū)數(shù),從而改變DStream的并行度union(otherStream)將源DStream和輸入?yún)?shù)為otherDStream的元素合并,并返回一個(gè)新的DStreamcount()通過對DStream中的各個(gè)RDD中的元素進(jìn)行計(jì)數(shù),然后返回只有一個(gè)元素的RDD構(gòu)成的DStreamreduce(func)對源DStream中的各個(gè)RDD中的元素利用函數(shù)func(有兩個(gè)參數(shù)并返回一個(gè)結(jié)果)進(jìn)行聚合操作,返回一個(gè)包含單元素RDD的新DStream5DStream操作DStream無狀態(tài)轉(zhuǎn)換操作操作描述countByValue()計(jì)算DStream中每個(gè)RDD內(nèi)的元素出現(xiàn)的頻次并返回一個(gè)(K,V)鍵值對類型的DStream,其中K是RDD中元素的值,V是K出現(xiàn)的次數(shù)reduceByKey(func,[numTasks])當(dāng)被(K,V)鍵值對組成的DStream調(diào)用時(shí),返回一個(gè)新的由(K,V)鍵值對組成的DStream,其中每個(gè)鍵K的值V由源DStream中鍵為K的值使用func函數(shù)聚合而成groupByKey()將構(gòu)成DStream的RDD中的元素進(jìn)行分組join(otherStream,[numTasks])當(dāng)在兩個(gè)分別為(K,V)和(K,W)鍵值對的DStreams上調(diào)用時(shí),返回類型為(K,(V,W))鍵值對的DStreamcogroup(otherStream,[numTasks])當(dāng)在兩個(gè)分別為(K,V)和(K,W)鍵值對的DStreams上調(diào)用時(shí),返回(K,Seq[V],Seq[W])類型的DStreamtransform(func)通過對源DStream的每RDD應(yīng)用RDD-to-RDD函數(shù),創(chuàng)建一個(gè)新的DStream5DStream操作DStream無狀態(tài)轉(zhuǎn)換操作使用gedit編輯器編輯求每個(gè)人的成績和的sumStreaming.py文件,文件內(nèi)容:frompysparkimportSparkConf,SparkContextfrompyspark.streamingimportStreamingContextconf=SparkConf().setMaster('local[2]').setAppName('SelfStreaming')sc=SparkContext(conf=conf)#創(chuàng)建SparkContext對象ssc=StreamingContext(sc,30)FileDStream=ssc.textFileStream("file:///home/hadoop/filedir")Pair=FileDStream.map(lambdax:(x.split("")[0],int(x.split("")[1])))Sum=Pair.reduceByKey(lambdax,y:x+y)Sum.pprint()ssc.start()ssc.awaitTermination()保存并關(guān)閉sumStreaming.py文件,執(zhí)行如下命令運(yùn)行sumStreaming.py。$pythonsumStreaming.py在pyspark中執(zhí)行ssc.start()以后,自動(dòng)進(jìn)入循環(huán)監(jiān)聽狀態(tài)。這時(shí)切換到“數(shù)據(jù)源終端”,在/home/hadoop/filedir目錄下新建一個(gè)file1.txt文件,在文件中輸入一些英文語句后保存并退出,具體命令如下。$cat>file1.txt#創(chuàng)建文件LiLi89LiHua95ZhangQian88LiLi94LiHua98ZhangQian985DStream操作DStream無狀態(tài)轉(zhuǎn)換操作這時(shí)切換到“數(shù)據(jù)源終端”,在/home/hadoop/filedir目錄下新建一個(gè)file1.txt文件,在文件中輸入一些英文語句后保存并退出,具體命令如下。$cat>file1.txt#創(chuàng)建文件LiLi89LiHua95ZhangQian88LiLi94LiHua98ZhangQian98輸入完成后,按Ctrl+D組合鍵存盤退出cat。切換到“流計(jì)算”終端,最多等待30秒,就可以看到每個(gè)人的成績和,具體輸出結(jié)果如下。('LiLi',183)('LiHua',193)('ZhangQian',186)5DStream操作DStream有狀態(tài)轉(zhuǎn)換操作DStream的有狀態(tài)轉(zhuǎn)化操作是跨時(shí)間區(qū)間跟蹤數(shù)據(jù)的操作,也就是說,一些先前批次的數(shù)據(jù)也被用來參與新的批次中的數(shù)據(jù)計(jì)算。有狀態(tài)轉(zhuǎn)化操作包括滑動(dòng)窗口轉(zhuǎn)換操作和updateStateByKey()操作。通過滑動(dòng)窗口對數(shù)據(jù)進(jìn)行轉(zhuǎn)換的窗口轉(zhuǎn)換操作如表所示。操作描述window(windowLength,slideInterval)返回一個(gè)基于源DStream的窗口批次計(jì)算后得到新的DStreamcountByWindow(windowLength,slideInterval)返回流中元素的滑動(dòng)窗口數(shù)量reduceByWindow(func,windowLength,slideInterval)使用func函數(shù)對滑動(dòng)窗口內(nèi)的元素進(jìn)行聚集,得到一個(gè)單元素流reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks])當(dāng)在元素類型為(K,V)對的DStream上調(diào)用時(shí),返回一個(gè)新的元素類型為(K,V)鍵值對的DStream,其中每個(gè)key鍵的值在滑動(dòng)窗口中使用給定的reduce函數(shù)func進(jìn)行聚合計(jì)算5DStream操作DStream有狀態(tài)轉(zhuǎn)換操作通過滑動(dòng)窗口對數(shù)據(jù)進(jìn)行轉(zhuǎn)換的窗口轉(zhuǎn)換操作如表所示。操作描述reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks])上述reduceByKeyAndWindow()的一個(gè)更高效的版本,其中每個(gè)窗口的reduce值是使用前一個(gè)窗口的reduce值遞增計(jì)算的countByValueAndWindow(windowLength,slideInterval,[numTasks])在元素類型為(K,V)對的DStream上調(diào)用時(shí),基于滑動(dòng)窗口計(jì)算源DStream中每個(gè)RDD內(nèi)每個(gè)元素出現(xiàn)的頻次并返回DStream[(K,Long)],其中Long是元素頻次5DStream操作DStream有狀態(tài)轉(zhuǎn)換操作下面演示updateStateByKey()用法,創(chuàng)建updateStateByKey.py文件,內(nèi)容:frompysparkimportSparkConf,SparkContextfrompyspark.streamingimportStreamingContextdefupdateFunction(newValues,previousCount):ifpreviousCountisNone:previousCount=0returnsum(newValues,previousCount)#得到新的統(tǒng)計(jì)值conf=SparkConf().setMaster('local[2]').setAppName('updateStateByKey')sc=SparkContext(conf=conf)#創(chuàng)建SparkContext對象ssc=StreamingContext(sc,20)ssc.checkpoint('file:///home/hadoop')#設(shè)置檢查點(diǎn)FileDStream=ssc.textFileStream("file:///home/hadoop/filedir")out1=FileDStream.map(lambdax:(x,1)).reduceByKey(lambdax,y:x+y)runningCounts=out1.updateStateByKey(updateFunction)#更新狀態(tài)runningCounts.pprint()ssc.start()ssc.awai
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年度建筑公工程轉(zhuǎn)讓與建筑安全風(fēng)險(xiǎn)評估合同
- 2025年度旅游節(jié)活動(dòng)免責(zé)協(xié)議范本(含安全保障)
- 2025年度智慧城市運(yùn)營管理技術(shù)咨詢協(xié)議
- 2025年度教育信息化項(xiàng)目投資借款分期還款合同
- 2025年度可再生能源發(fā)電項(xiàng)目建筑工程總承包合同
- 2025年度國際貨物運(yùn)輸與保險(xiǎn)咨詢服務(wù)合同
- 2025年度時(shí)尚服飾購貨合同協(xié)議書(潮流風(fēng)尚)
- 2025年度水利工程建設(shè)項(xiàng)目施工合同專用條款
- 二零二五年度國際美食節(jié)烹飪團(tuán)隊(duì)聘用合同
- 2025年度個(gè)人房屋購買合同樣本下載
- 2024年福建漳州人才發(fā)展集團(tuán)有限公司招聘筆試參考題庫附帶答案詳解
- JTGT F20-2015 公路路面基層施工技術(shù)細(xì)則
- Unit 3 Times change單元教學(xué)設(shè)計(jì)
- 科室醫(yī)院感染風(fēng)險(xiǎn)評估表
- 山東省食用油(植物油)生產(chǎn)企業(yè)名錄496家
- 《智慧農(nóng)業(yè)》的ppt完整版
- GB∕T 33047.1-2016 塑料 聚合物熱重法(TG) 第1部分:通則
- 經(jīng)濟(jì)學(xué)市場失靈與政府失靈課件
- 電力業(yè)務(wù)許可證豁免證明
- 建筑工程資料歸檔立卷分類表(全)
- 六年級上第二單元知識(shí)結(jié)構(gòu)圖
評論
0/150
提交評論