第16周flink快速上手篇it資源附件162學習預覽_第1頁
第16周flink快速上手篇it資源附件162學習預覽_第2頁
第16周flink快速上手篇it資源附件162學習預覽_第3頁
第16周flink快速上手篇it資源附件162學習預覽_第4頁
第16周flink快速上手篇it資源附件162學習預覽_第5頁
已閱讀5頁,還剩24頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1:Flinkermr個特定的時間后,必須觸發(fā)Window去進行計算了,inorderoutoforder:Watermark的生成方式Watermark的生成方式有兩種 WithPeriodic每隔N秒自動向流里面注入一個Watermark,時間間隔由ExecutionConfig.setAutoWatermarkIntervalFlink200ms。之前默100ms WithPunctuatedsocket模擬產(chǎn)生數(shù)據(jù),數(shù)據(jù)的格式為:0001,1790820682000其中1790820682000是數(shù)據(jù)產(chǎn)生的時間,也就是EventTimeWindow打印信息來驗證Window被觸發(fā)的時機scalapackagepackageimportimportimportimportimportorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimportorg.apache.flink.streaming.api.scala.function.WindowFunctionimportorg.apache.flink.streaming.api.windowing.time.Timeimportimportimportscala.collection.mutable.ArrayBufferimportscala.util.SortingWatermark+EventTimeCreatedbyobjectWatermarkOpScaladefmain(args:Array[String]):Unit=valenv=//1importorg.apache.flink.api.scala._//tuple2valtupStream=text.map(line=>{valarr=line.split(",")(arr(0), uration.ofSeconds(10))//最大允許的數(shù)據(jù)亂序時間{valsdf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss")varcurrentMaxTimestamp=0LoverridedefextractTimestamp(element:(String,Long),recordTimestamp:Long):=valtimestamp=currentMaxTimestamp=//計算當前的watermarkvalcurrentWatermark=currentMaxTimestamp-//print}).apply(newWindowFunction[Tuple2[String,Long],String,Tuple,TimeWindow]overridedefapply(key:Tuple,window:TimeWindow,input:Ible[(String,Long)],out:Collector[String]):Unit={valkeyStr=//windowarrBuffvalarrBuff=ArrayBuffer[Long]()//arrBuffarrvalarr//arrvalsdf=newSimpleDateFormat("yyyy-MM-dd }}}[root@bigdata04soft]#nc-l900110:11:22],watermark:[1790820672000|2026-10-0110:11:12]EventEvent2026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:26],watermark:[1790820676000|2026-10-0110:11:16]EventEvent2026-10-012026-10-012026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:32],watermark:[1790820682000|2026-10-0110:11:22]2026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-01[root@bigdata04soft]#nc-l900010:11:33],watermark:[1790820683000|2026-10-0110:11:23]EventEvent2026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-012026-10-01來算,最早的數(shù)據(jù)已經(jīng)過去了11s了,Window還沒有開始計算,那到底什么時候會觸發(fā)Window呢?[root@bigdata04soft]#nc-l900010:11:34],watermark:[1790820684000|2026-10-0110:11:24](0001),1,2026-10-0110:11:22,2026-10-0110:11:22,2026-10-0110:11:21,2026-10-01Event數(shù)據(jù),則當Watermark時間>=EventTime時,就符合了Window觸發(fā)的條件了,最終決定Window觸發(fā),還是由數(shù)據(jù)本身的EventTime所屬Windowwindow_end_time決定。早的一條記錄所在Window的window_end_time,所以Window就被觸發(fā)了。[root@bigdata04soft]#nc-l900010:11:36],watermark:[1790820686000|2026-10-0110:11:26] Event

此時,Watermark時間雖然已經(jīng)等于第二條數(shù)據(jù)的時間,但是由于其沒有達到第二條數(shù)據(jù)所WindowWindowWindow時[root@bigdata04soft]#nc-l900010:11:37],watermark:[1790820687000|2026-10-0110:11:27](0001),1,2026-10-0110:11:26,2026-10-0110:11:26,2026-10-0110:11:24,2026-10-01Event1:Watermarkwindow_end_time2:在[inow_strt_tie,indow_nd_tim)區(qū)間中有數(shù)據(jù)存在(注意是左閉右開的區(qū)間)。同時滿足了以上2個條件,Window才會觸發(fā)。:+EentTimWatermarkEventTime機制,是如何處理亂序數(shù)據(jù)的。[root@hadoop100soft]#nc-l10:11:39],watermark:[1790820689000|2026-10-0110:11:29]10:11:39],watermark:[1790820689000|2026-10-0110:11:29]EventEvent10:11:31currentMaxTimestamp1:watermark時間>=window_end_timeWatermark時間(10:11:29)<window_end_time(10:11:33),WindowWindow一定就會觸發(fā)了,我們試一試,繼續(xù)輸入內(nèi)容。Event2個數(shù)據(jù),10:11:3110:11:3210:11:33的數(shù)據(jù),上邊的結(jié)果,已經(jīng)表明,對于的數(shù)據(jù),F(xiàn)link可以通過Watermark來實現(xiàn)處理一定范圍內(nèi)的亂序數(shù)據(jù)。那么對于“(lateelement)”太久的數(shù)據(jù),F(xiàn)link是怎么處理的呢?:LateElement(我們輸入一個亂序很多的(EventTimeWatermark時間)數(shù)據(jù)來測試下:輸入2行內(nèi)容。[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01Eventwatermark2026-10-01輸入3行內(nèi)容。[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]2:allowedLatenessFlink提供了allowedLateness方法可以實現(xiàn)對的數(shù)據(jù)設(shè)置一個延遲時間,在指定延遲時間內(nèi)到達的數(shù)據(jù)還是可以觸發(fā)window執(zhí)行的。[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-01(0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01EventWatermark2026-10-0110:11:33EventTime<Watermark的數(shù)據(jù)驗證一下效果,輸入3行內(nèi)容。[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),2,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),3,2026-10-0110:11:30,2026-10-0110:11:31,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),4,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-01Event[root@hadoop100soft]#nc-l900010:11:44],watermark:[1790820694000|2026-10-0110:11:34]Event[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),5,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),6,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-0110:11:3310:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),7,2026-10-0110:11:30,2026-10-0110:11:32,2026-10-0110:11:30,2026-10-011Watermark10:11:35。[root@hadoop100soft]#nc-l900010:11:45],watermark:[1790820695000|2026-10-0110:11:35]此時,Watermark10:11:35我們再輸入幾條EventTime<Watermark3[root@hadoop100soft]#nc-l900010:11:45],watermark:[1790820695000|2026-10-0110:11:35]10:11:45],watermark:[1790820695000|2026-10-0110:11:35]10:11:45],watermark:[1790820695000|2026-10-0110:11:35]□當Watemark等于10:11:33window_end_time所以會觸發(fā)Window當窗口執(zhí)行過后,我們再輸入[10:11:30~10:11:33)WindowWindow是Watemark10:11:34的時候,我們輸入[10:11:30~10:11:33)Window內(nèi)的數(shù)據(jù)會發(fā)現(xiàn)Window也是可以被觸發(fā)的。Watemark10:11:35的時候,我們輸入[10:11:30~10:11:33)Window內(nèi)的數(shù)據(jù)會發(fā)現(xiàn)Window不會被觸發(fā)了。由于我們面設(shè)置了allowedLateness(Time.seconds(2)),因此可以允許延遲在2s內(nèi)的數(shù)據(jù)繼續(xù)觸發(fā)Window執(zhí)行。所以當Watermark是10:11:34的時候可以觸發(fā)Window10:11:35的時候就。□時第二次(或多次)Watermark<window_end_time+allowedLateness時間內(nèi),這個窗口有Late數(shù)據(jù)到達時。Watermark10:11:34EventTime10:11:30、10:11:31、10:11:32window_end_time都是10:11:33,也就是10:11:34<10:11:33+2true。但是當Watermark等于10:11:35的時候,我們再輸入EventTime為10:11:3010:11:3110:11:32window_end_time10:11:33,此時,10:11:35<10:11:33+2為false了,所以最終這些數(shù)據(jù)的時間太久了,就不會再觸發(fā)Window的執(zhí)行操作了。3:sideOutputLateData收集的數(shù)[root@hadoop100soft]#nc-l900010:11:30],watermark:[1790820680000|2026-10-0110:11:20]10:11:43],watermark:[1790820693000|2026-10-0110:11:33](0001),1,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-0110:11:30,2026-10-01此時,WindowWatermark10:11:33[root@hadoop100soft]#nc-l900010:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]10:11:43],watermark:[1790820693000|2026-10-0110:11:33]此時,針對這幾條的數(shù)據(jù),都通過sideOutputLateData保存到了outputTag中[root@hadoop100soft]#nc-l900010:11:22],watermark:/p>

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論