《數(shù)據(jù)實時處理flink》課件-第六章 狀態(tài)和檢查點_第1頁
《數(shù)據(jù)實時處理flink》課件-第六章 狀態(tài)和檢查點_第2頁
《數(shù)據(jù)實時處理flink》課件-第六章 狀態(tài)和檢查點_第3頁
《數(shù)據(jù)實時處理flink》課件-第六章 狀態(tài)和檢查點_第4頁
《數(shù)據(jù)實時處理flink》課件-第六章 狀態(tài)和檢查點_第5頁
已閱讀5頁,還剩45頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

狀態(tài)和檢查點本章將重點圍繞狀態(tài)、檢查點(Checkpoint)和保存點(Savepoint)三個概念來介紹如何在Flink上進行有狀態(tài)的計算。在Flink架構(gòu)體系中,有狀態(tài)計算可以說是Flink非常重要的特性之一。有狀態(tài)計算是指在程序計算過程中,在Flink程序內(nèi)部存儲計算產(chǎn)生的中間結(jié)果,并提供給后續(xù)Function或算子計算結(jié)果使用。檢查點是Flink保證exactly-once的重要特性。通過本節(jié)學習您將可以:掌握Flink中幾種常用的狀態(tài)以及具體使用方法。掌握Checkpoint機制的原理和配置方法。了解Savepoint機制的原理和使用方法。實現(xiàn)有狀態(tài)的計算Checkpoint原理及配置方法Savepoint原理及使用方法

什么是有狀態(tài)的計算有狀態(tài)計算的潛在場景數(shù)據(jù)去重:需要記錄哪些數(shù)據(jù)已經(jīng)流入過應(yīng)用,當新數(shù)據(jù)流入時,根據(jù)已流入數(shù)據(jù)去重檢查輸入流是否符合某個特定模式:之前流入的數(shù)據(jù)以狀態(tài)的形式緩存下來對一個窗口內(nèi)的數(shù)據(jù)進行聚合分析,比如分析一小時內(nèi)某項指標75分位值或99分位值Flink分布式計算,一個算子有多個算子子任務(wù)狀態(tài)可以被理解為某個算子子任務(wù)在當前實例上的一個變量,變量記錄了數(shù)據(jù)流的歷史信息,新數(shù)據(jù)流入,可以結(jié)合歷史信息來進行計算接收輸入流/獲取對應(yīng)狀態(tài)/更新狀態(tài)狀態(tài)管理的難點要解決問題:實時性,延遲不能太高數(shù)據(jù)不丟不重、恰好計算一次,尤其發(fā)生故障恢復后程序的可靠性要高,保證7*24小時穩(wěn)定運行難點不能將狀態(tài)直接交由內(nèi)存,因為內(nèi)存空間有限用持久化的系統(tǒng)備份狀態(tài),出現(xiàn)故障時,如何從備份中恢復需要考慮擴展到多個節(jié)點時的伸縮性Flink解決了上述問題,提供有狀態(tài)的計算APIManaged

State和Raw

State托管狀態(tài)(ManagedState)是由Flink管理的,F(xiàn)link幫忙存儲、恢復和優(yōu)化原生狀態(tài)(RawState)是開發(fā)者自己管理的,需要自己序列化Managed

State又細分為Keyed

State和Operator

StateFlink的幾種狀態(tài)類型

ManagedStateRawState狀態(tài)管理方式FlinkRuntime托管,自動存儲、自動恢復、自動伸縮用戶自己管理狀態(tài)數(shù)據(jù)結(jié)構(gòu)Flink提供的常用數(shù)據(jù)結(jié)構(gòu),如ListState、MapState等字節(jié)數(shù)組:byte[]使用場景絕大多數(shù)Flink函數(shù)用戶自定義函數(shù)Keyed

State是KeyedStream上的狀態(tài),每個Key共享一個狀態(tài)OperatorState每個算子子任務(wù)共享一個狀態(tài)Keyed

State和Operator

StateKeyed

State相同Key的數(shù)據(jù)可以訪問、更新這個狀態(tài)Operator

State流入這個算子子任務(wù)的所有數(shù)據(jù)可以訪問、更新這個狀態(tài)Keyed

State和Operator

State都是基于本地的,每個算子子任務(wù)維護著自身的狀態(tài),不能訪問其他算子子任務(wù)的狀態(tài)具體的實現(xiàn)層面,Keyed

State需要重寫Rich

Function函數(shù)類,Operator

State需要實現(xiàn)CheckpointedFunction等接口Keyed

State和Operator

State

KeyedStateOperatorState適用算子類型只適用于KeyedStream上的算子可以用于所有算子狀態(tài)分配每個Key對應(yīng)一個狀態(tài)一個算子子任務(wù)對應(yīng)一個狀態(tài)創(chuàng)建和訪問方式重寫RichFunction,通過里面的RuntimeContext訪問實現(xiàn)CheckpointedFunction等接口橫向擴展狀態(tài)隨著Key自動在多個算子子任務(wù)上遷移有多種狀態(tài)重新分配的方式支持的數(shù)據(jù)結(jié)構(gòu)ValueState、ListState、MapState等ListState、BroadcastState等修改Flink應(yīng)用的并行度:每個算子的并行算子子任務(wù)數(shù)發(fā)生了變化,整個應(yīng)用需要關(guān)停和啟動一些算子子任務(wù)某份在原來某個算子子任務(wù)的狀態(tài)需要平滑更新到新的算子子任務(wù)上Flink的Checkpoint可以輔助狀態(tài)數(shù)據(jù)在算子子任務(wù)之間遷移算子子任務(wù)生成快照(Snapshot)保存到分布式存儲上子任務(wù)重啟后,相應(yīng)的狀態(tài)在分布式存儲上重建(Restore)Keyed

State與Operator

State的橫向擴展方式稍有不同橫向擴展問題Flink提供了封裝好的數(shù)據(jù)結(jié)構(gòu)供我們使用,包括ValueState、ListState等主要有:ValueState:單值MapState:Key-Value對ListState:列表ReducingState和AggregatingState:合并Keyed

State由于跟Key綁定,Key自動分布到不同算子子任務(wù),Keyed

State也可以根據(jù)Key分發(fā)到不同算子子任務(wù)上Keyed

State實現(xiàn)RichFunction函數(shù)類,比如RichFlatMapFunction創(chuàng)建StateDescriptor,StateDescriptor描述狀態(tài)的名字和狀態(tài)的數(shù)據(jù)結(jié)構(gòu),每種類型的狀態(tài)有對應(yīng)的StateDescriptor通過StateDescriptor,從RuntimeContext中獲取狀態(tài)調(diào)用狀態(tài)提供的方法,獲取狀態(tài),更新數(shù)據(jù)Keyed

State//創(chuàng)建StateDescriptor

MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通過StateDescriptor獲取運行時上下文中的狀態(tài)

behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);MapState<UK,UV>:UVget(UKkey)voidput(UKkey,UVvalue)booleancontains(UKkey)…案例:統(tǒng)計電商用戶行為UserBehavior場景下,某個用戶(userId)下某種用戶行為(behavior)的數(shù)量Keyed

State/**

*MapStateFunction繼承并實現(xiàn)RichFlatMapFunction*兩個泛型分別為輸入數(shù)據(jù)類型和輸出數(shù)據(jù)類型*/

publicstaticclass

MapStateFunction

extends

RichFlatMapFunction<UserBehavior,Tuple3<Long,String,Integer>>{//指向MapState的句柄

privateMapState<String,Integer>behaviorMapState;@Overridepublicvoidopen(Configurationconfiguration){//創(chuàng)建StateDescriptor

MapStateDescriptor<String,Integer>behaviorMapStateDescriptor=newMapStateDescriptor<String,Integer>("behaviorMap",Types.STRING,Types.INT);//通過StateDescriptor獲取運行時上下文中的狀態(tài)

behaviorMapState=getRuntimeContext().getMapState(behaviorMapStateDescriptor);}@OverridepublicvoidflatMap(UserBehaviorinput,Collector<Tuple3<Long,String,Integer>>out)throwsException{intbehaviorCnt=1;//behavior有可能為pv、cart、fav、buy等

//判斷狀態(tài)中是否有該behavior

if(behaviorMapState.contains(input.behavior)){behaviorCnt=behaviorMapState.get(input.behavior)+1;}//更新狀態(tài)

behaviorMapState.put(input.behavior,behaviorCnt);out.collect(Tuple3.of(input.userId,input.behavior,behaviorCnt));}}使用MapState記錄某個behavior下的數(shù)量<behavior,

behaviorCnt>UserBehavior案例先基于userId進行keyBy再使用有狀態(tài)的MapStateFunction進行處理Keyed

Stateenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorStream=...//生成一個KeyedStream

KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上進行flatMap

DataStream<Tuple3<Long,String,Integer>>behaviorCountStream=keyedStream.flatMap(newMapStateFunction());狀態(tài):算子子任務(wù)的本地數(shù)據(jù)在Checkpoint過程時寫入存儲,這個過程被稱為備份(Snapshot)初始化或重啟一個Flink作業(yè)時,以一定邏輯從存儲中讀出并變?yōu)樗阕幼尤蝿?wù)的本地數(shù)據(jù),這個過程被稱為重建(Restore)Keyed

State開箱即用:數(shù)據(jù)劃分基于Key,Snapshot和Restore過程可以基于Key在多個算子子任務(wù)之間做數(shù)據(jù)遷移Operator

State每個算子子任務(wù)管理自己的狀態(tài),流入到這個算子子任務(wù)上的所有數(shù)據(jù)可以訪問和修改Operator

State故障重啟后,數(shù)據(jù)流中某個元素不一定流入重啟前的算子子任務(wù)上需要根據(jù)具體業(yè)務(wù)場景設(shè)計Snapshot和Restore的邏輯使用CheckpointedFunction接口類Operator

StateFlink定期執(zhí)行Checkpoint,會將狀態(tài)數(shù)據(jù)Snapshot到存儲上每次執(zhí)行Snapshot,會調(diào)用snapshotState()方法,因此我們要實現(xiàn)一些Snapshot邏輯,比如將哪些狀態(tài)持久化initializeState()在算子子任務(wù)初始化狀態(tài)時調(diào)用,有兩種被調(diào)用的可能:整個Flink作業(yè)第一次執(zhí)行,狀態(tài)數(shù)據(jù)需要初始化一個默認值Flink作業(yè)遇到故障重啟,基于之前已經(jīng)持久化的狀態(tài)恢復ListState

/

UnionListStateBroadcastStateOperator

Statepublic

interface

CheckpointedFunction{//Checkpoint時會調(diào)用這個方法,我們要實現(xiàn)具體的snapshot邏輯,比如將哪些本地狀態(tài)持久化

void

snapshotState(FunctionSnapshotContextcontext)

throwsException;//初始化時會調(diào)用這個方法,向本地狀態(tài)中填充數(shù)據(jù)

void

initializeState(FunctionInitializationContextcontext)

throwsException;}CheckpointedFunction源碼狀態(tài)以列表的形式序列化并存儲單個狀態(tài)為S,每個算子子任務(wù)有零到多個狀態(tài),共同組成一個列表ListState[S],Snapshot時將這些狀態(tài)以列表形式寫入存儲包含所有狀態(tài)的大列表,當作業(yè)重啟時,將這個大列表重新分布到各個算子子任務(wù)上ListState:將大列表按照Round-Ribon模式均勻分布到各個算子子任務(wù)上,每個算子子任務(wù)得到的是大列表的子集UnionListState:將大列表廣播給所有算子子任務(wù)應(yīng)用場景:Source上保存流入數(shù)據(jù)的偏移量,Sink上對輸出數(shù)據(jù)做緩存Operator

State

ListState、UnionListStateOperator

State使用方法重點實現(xiàn)snapshotState()和initializeState()兩個方法在initializeState()方法里初始化并獲取狀態(tài)注冊StateDescriptor,指定狀態(tài)名字和數(shù)據(jù)類型從FunctionInitializationContext中獲取OperatorStateStore,進而獲取Operator

State在snapshotState()方法里實現(xiàn)一些業(yè)務(wù)邏輯基于ListState實現(xiàn)可緩存的Sink//重寫CheckpointedFunction中的snapshotState()

//將本地緩存Snapshot到存儲上

@OverridepublicvoidsnapshotState(FunctionSnapshotContextcontext)throwsException{//將之前的Checkpoint清理

checkpointedState.clear();for(Tuple2<String,Integer>element:bufferedElements){//將最新的數(shù)據(jù)寫到狀態(tài)中

checkpointedState.add(element);}}//重寫CheckpointedFunction中的initializeState()

//初始化狀態(tài)

@OverridepublicvoidinitializeState(FunctionInitializationContextcontext)throwsException{//注冊ListStateDescriptor

ListStateDescriptor<Tuple2<String,Integer>>descriptor=newListStateDescriptor<>("buffered-elements",TypeInformation.of(newTypeHint<Tuple2<String,Integer>>(){}));//從FunctionInitializationContext中獲取OperatorStateStore,進而獲取ListState

checkpointedState=context.getOperatorStateStore().getListState(descriptor);//如果是作業(yè)重啟,讀取存儲中的狀態(tài)數(shù)據(jù)并填充到本地緩存中

if(context.isRestored()){for(Tuple2<String,Integer>element:checkpointedState.get()){bufferedElements.add(element);}}}Sink先將數(shù)據(jù)放在本地緩存中,并定期通過snapshotState()方法進行SnapshotinitializeState()初始化狀態(tài),需判斷是新作業(yè)還是重啟作業(yè)snapshotState()initializeState()Broadcast可以將部分數(shù)據(jù)同步到所有實例上使用場景:一個主數(shù)據(jù)流,一個控制規(guī)則流,主數(shù)據(jù)流比較大,只能分散在多個算子實例上,控制規(guī)則流數(shù)據(jù)比較小,可以廣播分發(fā)到所有算子實例上。與Join的區(qū)別:控制規(guī)則流較小,可以放到每個算子實例里電商用戶行為分析案例:識別用戶行為模式,行為模式包括“反復猶豫下單類”、“頻繁爬取數(shù)據(jù)類”等,控制流里包含了這些行為模式,使用Flink實時識別用戶Broadcast

State主邏輯中讀取兩個數(shù)據(jù)流Broadcast

State支持Key-Value形式,需要使用MapStateDescriptor來構(gòu)建再使用broadcast()方法將數(shù)據(jù)廣播到所有算子子任務(wù)上,得到BroadcastStream主數(shù)據(jù)流先進行keyBy(),然后與廣播流合并,在KeyedBroadcastProcessFunction中實現(xiàn)具體業(yè)務(wù)邏輯BroadcastPatternFunction是KeyedBroadcastProcessFunction的具體實現(xiàn)Broadcast

State//主數(shù)據(jù)流

DataStream<UserBehavior>userBehaviorStream=...//BehaviorPattern數(shù)據(jù)流

DataStream<BehaviorPattern>patternStream=...//BroadcastState只能使用Key->Value結(jié)構(gòu),基于MapStateDescriptor

MapStateDescriptor<Void,BehaviorPattern>broadcastStateDescriptor=newMapStateDescriptor<>("behaviorPattern",Types.VOID,Types.POJO(BehaviorPattern.class));BroadcastStream<BehaviorPattern>broadcastStream=patternStream.broadcast(broadcastStateDescriptor);//生成一個KeyedStream

KeyedStream<UserBehavior,Long>keyedStream=userBehaviorStream.keyBy(user->user.userId);//在KeyedStream上進行connect()和process()

DataStream<Tuple2<Long,BehaviorPattern>>matchedStream=keyedStream.connect(broadcastStream).process(newBroadcastPatternFunction());processElement()方法處理主數(shù)據(jù)流中的每條元素,輸出零到多個數(shù)據(jù)processBroadcastElement()方法處理廣播流,可以輸出零到多個數(shù)據(jù),一般用來更新BroadcastStateKeyedBroadcastProcessFunction屬于ProcessFunction系列函數(shù),可以注冊Timer,并在onTimer方法中實現(xiàn)回調(diào)邏輯。KeyedBroadcastProcessFunction實現(xiàn)有狀態(tài)的計算Checkpoint原理及配置方法Savepoint原理及使用方法Flink的狀態(tài)是基于本地的,本地狀態(tài)數(shù)據(jù)不可靠Checkpoint機制:Flink定期將狀態(tài)數(shù)據(jù)保存到存儲上,故障發(fā)生后將狀態(tài)數(shù)據(jù)恢復。快照(Snapshot)、分布式快照(DistributedSnapshot)和檢查點(Checkpoint)均指的是Flink將狀態(tài)寫入存儲的過程一個簡單的Checkpoint流程:暫停處理新流入數(shù)據(jù),將新數(shù)據(jù)緩存下來將算子子任務(wù)的本地狀態(tài)數(shù)據(jù)拷貝到一個遠程的持久化存儲上繼續(xù)處理新流入的數(shù)據(jù),包括剛才緩存起來的數(shù)據(jù)Checkpoint機制檢查點分界線(CheckpointBarrier)被插入到數(shù)據(jù)流中,將數(shù)據(jù)流切分成段。Flink的算子接收到CheckpointBarrier后,對狀態(tài)進行快照。每個CheckpointBarrier有一個ID,表示該段數(shù)據(jù)屬于哪次Checkpoint。當ID為n的CheckpointBarrier到達每個算子后,表示要對n-1和n之間狀態(tài)更新做快照。

Checkpoint

Barrier構(gòu)建并行度為2的數(shù)據(jù)流圖Flink的檢查點協(xié)調(diào)器(CheckpointCoordinator)觸發(fā)一次Checkpoint(TriggerCheckpoint),這個請求會發(fā)送給Source的各個子任務(wù)。分布式快照流程各Source算子子任務(wù)接收到這個Checkpoint請求之后,會將自己的狀態(tài)寫入到狀態(tài)后端,生成一次快照向下游廣播CheckpointBarrier分布式快照流程Source算子做完快照后,還會給CheckpointCoodinator發(fā)送一個確認(ACK)ACK中包括了一些元數(shù)據(jù),包括備份到State

Backend的狀態(tài)句柄(指向狀態(tài)的指針)Source算子完成了一次Checkpoint分布式快照流程對于下游算子來說,可能有多個與之相連的上游輸入。一個輸入被稱為一條通道。Id為n的Checkpoint

Barrier會被廣播到多個通道。不同通道的Checkpoint

Barrier傳播速度不同。需要進行對齊(BarrierAlignment)對齊分四步:1

.算子子任務(wù)在某個輸入通道中收到第一個ID為n的CheckpointBarrier,其他輸入通道中ID為n的CheckpointBarrier還未到達。2

.算子子任務(wù)將第一個輸入通道的數(shù)據(jù)緩存下來,同時繼續(xù)處理其他輸入通道的數(shù)據(jù),這個過程被稱為對齊。3

.第二個輸入通道的CheckpointBarrier抵達該算子子任務(wù),該算子子任務(wù)執(zhí)行快照,將狀態(tài)寫入StateBackend,然后將ID為n的CheckpointBarrier向下游所有輸出通道廣播。4

.對于這個算子子任務(wù),快照執(zhí)行結(jié)束,繼續(xù)處理各個通道中新流入數(shù)據(jù),包括剛才緩存起來的數(shù)據(jù)。Checkpoint

Barrier對齊每個算子都要執(zhí)行一遍上述的對齊、快照、確認的工作最后的Sink算子發(fā)送確認后,說明ID為n的Checkpoint執(zhí)行結(jié)束,CheckpointCoordinator向StateBackend寫入一些本次Checkpoint的元數(shù)據(jù)Checkpoint完成CheckpointBarrier對齊時,必須等待所有上游通道都處理完。假如某個上游通道處理很慢,這可能造成整個數(shù)據(jù)流堵塞。一個算子子任務(wù)不需要等待所有上游通道的CheckpointBarrier,直接將CheckpointBarrier廣播,算子子任務(wù)直接執(zhí)行快照并繼續(xù)處理后續(xù)流入數(shù)據(jù)。Flink必須將那些上下游正在傳輸?shù)臄?shù)據(jù)也作為狀態(tài)保存到快照中。開啟Unaligned

Checkpoint:Unaligned

Checkpoint優(yōu)缺點:不需要對齊,Checkpoint速度快傳輸數(shù)據(jù)也要快照,狀態(tài)數(shù)據(jù)大,磁盤負載加重,重啟后狀態(tài)恢復時間過長,運維管理難度大Unaligned

Checkpointenv.getCheckpointConfig().enableUnalignedCheckpoints();

每次執(zhí)行數(shù)據(jù)快照時,不需要暫停新流入數(shù)據(jù)。Flink啟動一個后臺線程,它創(chuàng)建本地狀態(tài)的一份復制,這個線程用來將本地狀態(tài)的復制同步到StateBackend上,一旦數(shù)據(jù)同步完成,再給CheckpointCoordinator發(fā)送確認信息。該過程被稱為異步快照(AsynchronousSnapshot)。利用寫入時復制(Copy-on-Write):如果這份內(nèi)存數(shù)據(jù)沒有任何修改,那沒必要生成一份復制,如果這份內(nèi)存數(shù)據(jù)有一些更新,那再去申請額外的內(nèi)存空間并維護兩份數(shù)據(jù),一份是快照時的數(shù)據(jù),一份是更新后的數(shù)據(jù)。是否開啟異步快照可配置。異步快照State

Backend用來持久化狀態(tài)數(shù)據(jù)Flink內(nèi)置三種State

Backend:MemoryStateBackendFsStateBackendRocksDBStateBackendState

Backend基于內(nèi)存,數(shù)據(jù)存儲在Java的堆區(qū)。進行分布式快照時,所有算子子任務(wù)會將自己內(nèi)存上的狀態(tài)同步到JobManager的堆上,因此一個作業(yè)的所有狀態(tài)要小于JobManager的內(nèi)存大小,否則將拋出OutOfMemoryError異常。只適合調(diào)試或者實驗,不建議在生產(chǎn)環(huán)境下使用。如果不做其他聲明,默認情況是使用這種模式作為StateBackend。設(shè)置使用內(nèi)存作為State

Backend,MAX_MEM_STATE_SIZE為設(shè)置的狀態(tài)的最大值:MemoryStateBackendenv.setStateBackend(newMemoryStateBackend(MAX_MEM_STATE_SIZE));基于文件系統(tǒng),數(shù)據(jù)最終持久化到文件系統(tǒng)上文件系統(tǒng)包括本地磁盤、HDFS、Amazon、阿里云等在內(nèi)的云存儲服務(wù),使用時需要提供文件系統(tǒng)的地址,寫明前綴:file://、hdfs://或s3://默認開啟異步快照本地的狀態(tài)在TaskManager的堆內(nèi)存上,執(zhí)行快照時狀態(tài)數(shù)據(jù)會寫到文件系統(tǒng)上FsStateBackend//使用HDFS作為StateBackend

env.setStateBackend(newFsStateBackend("hdfs://namenode:port/flink-checkpoints/chk-17/"));//使用阿里云OSS作為StateBackend

env.setStateBackend(newFsStateBackend("oss://<your-bucket>/<object-name>"));//使用Amazon作為StateBackend

env.setStateBackend(newFsStateBackend("s3://<your-bucket>/<endpoint>"));//關(guān)閉AsynchronousSnapshot

env.setStateBackend(newFsStateBackend(checkpointPath,false));本地狀態(tài)存儲在本地RocksDB上,Checkpoint時將RocksDB數(shù)據(jù)再寫到遠程的存儲上,因此需要配置一個分布式存儲的地址。本地狀態(tài)基于RocksDB,可以突破內(nèi)存空間的限制,可存儲的狀態(tài)量更大。但RocksDB需要序列化和反序列化,讀寫時間成本高。支持增量快照(IncrementalCheckpoint):只對發(fā)生變化的數(shù)據(jù)增量寫到分布式存儲上,而不是將所有的本地狀態(tài)都拷貝過去。非常適合超大規(guī)模的狀態(tài)。但重啟恢復的時間更長。需要手動開啟:RocksDBStateBackend//開啟IncrementalCheckpoint

booleanenableIncrementalCheckpointing=true;env.setStateBackend(newRocksDBStateBackend(checkpointPath,enableIncrementalCheckpointing));默認情況下,Checkpoint機制是關(guān)閉的,開啟:n表示每隔n毫秒進行一次CheckpointCheckpoint耗時可能比較長,n設(shè)置過小,有可能出現(xiàn)一次Checkpoint還沒完成,下次Checkpoint已經(jīng)被觸發(fā),n設(shè)置過大,如果重啟,整個作業(yè)需要從更長的Offset開始重新處理數(shù)據(jù)開啟Checkpoint,使用Checkpoint

Barrier對齊功能,可以提供Exactly-Once語義At-Least-Once語義:不使用Checkpoint

Barrier對齊功能,但某些數(shù)據(jù)可能被處理多次一些其他Checkpoint設(shè)置,在CheckpointConfig中設(shè)置:Checkpoint相關(guān)配置env.enableCheckpointing(n)//使用At-Least-OncecheckpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);CheckpointConfigcheckpointCfg=env.getCheckpointConfig();重啟恢復流程:重啟應(yīng)用,在集群上重新部署數(shù)據(jù)流圖。從持久化存儲上讀取最近一次的Checkpoint數(shù)據(jù),加載到各算子子任務(wù)上。繼續(xù)處理新流入的數(shù)據(jù)。作業(yè)故障重啟,會暫停一段時間,這段時間上游數(shù)據(jù)仍然會繼續(xù)發(fā)送過來,作業(yè)重啟后,需要消化這些未處理的數(shù)據(jù)。重啟恢復流程由于異常導致故障,異常根源不消除,重啟后仍然出現(xiàn)故障,因此要避免無限次重啟。固定延遲(FixedDelay):作業(yè)每次失敗后,按照設(shè)定的時間間隔進行重啟嘗試,重啟次數(shù)不會超過某個設(shè)定值失敗率(FailureRate):計算一個時間段內(nèi)作業(yè)失敗的次數(shù),如果失敗次數(shù)小于設(shè)定值,繼續(xù)重啟,否則不重啟不重啟(NoRestart)在conf/flink-conf.yaml設(shè)置或者在代碼中設(shè)置三種重啟策略實現(xiàn)有狀態(tài)的計算Checkpoint原理及配置方法Savepoint原理及使用方法Checkpoint和Savepoint生成的數(shù)據(jù)近乎一樣Checkpoint目的是為了故障重啟,使得重啟前后作業(yè)狀態(tài)一致Savepoint目的是手動備份數(shù)據(jù),以便進行調(diào)試、遷移、迭代等狀態(tài)數(shù)據(jù)從零積累成本很高迭代:在初版代碼的基礎(chǔ)上,保留狀態(tài)到Savepoint中,方便修改業(yè)務(wù)邏輯遷移:把程序遷移到新的機房、集群等有計劃地備份、停機,手動管理和刪除狀態(tài)數(shù)據(jù)場景:同一個作業(yè)不斷調(diào)整并行度,以找到最優(yōu)方案進行A/B實驗,使用相同的狀態(tài)數(shù)據(jù)測試不同的程序版本Savepoint與Checkpoint的區(qū)別每個算子應(yīng)該分配一個唯一ID,Savepoint中的狀態(tài)數(shù)據(jù)以算子ID來存儲和區(qū)分不設(shè)置ID,F(xiàn)link自動為其分配一個ID算子IDDataStream<X>stream=env.//一個帶有OperatorState的Source,例如KafkaSource

.addSource(newStatefulSource()).uid(“source-id”)

//算子ID

.keyBy(...)//一個帶有KeyedState的StatefulMap

.map(newStatefulMapper()).uid(“mapper-id”)

//算子ID

//print是一種無狀態(tài)的Sink

.print();//Flink為其自動分配一個算子ID對某個作業(yè)的狀態(tài)進行備份,將Savepoint目錄保存到某個目錄下:從某個Savepoint目錄中恢復一個作業(yè):備份和恢復$

./bin/flinksavepoint<jobId>[savepointDirectory]$

./bin/flinkrun-s<savepointPath>[OPTIONS]<xxx.jar>StateProcessorAPI:基于DataSet

API,讀寫和修改Savepoint數(shù)據(jù)Savepoint以一定的Schema存儲,像讀寫數(shù)據(jù)庫一樣讀寫SavepointReaderFunction是一個KeyedStateReaderFunction的實現(xiàn),需要實現(xiàn)open()和readKey()方法:open()方法中注冊StateDescriptorreadKey()方法中逐Key讀取數(shù)據(jù),輸出到Collector中從Savepoint中讀數(shù)據(jù)Savepoint中的數(shù)據(jù)存儲形式DataSet<Integer>listState=savepoint.readListState<>("source-id","os1",Types.INT);//ReaderFunction需要繼承并實現(xiàn)KeyedStateReaderFunction

DataSet<KeyedState>keyedState=savepoint.readKeyedState("mapper-id",newReaderFunction());向Savepoint中寫入狀態(tài),適合作業(yè)冷啟動構(gòu)建BootstrapTransformation操作,是一個狀態(tài)寫入的過程,可以理解為流處理時使用的有狀態(tài)的算子withOperator()向Savepoint中添加算子,參數(shù)分別為:算子ID一個BootstrapTransformationKeyed

State和Operator

State的BootstrapTransformation實現(xiàn)不同向Savepoint寫數(shù)據(jù)ExecutionEnvironmentbEnv=ExecutionEnvironment.getExecutionEnvironment();//最大并行度

intmaxParallelism=128;StateBackendbackend=...//準備好寫入狀態(tài)的數(shù)據(jù)

DataSet<Account>accountDataSet=bEnv.fromCollection(accounts);//構(gòu)建一個BootstrapTransformation,將accountDataSet寫入

BootstrapTransformation<Account>transformation=OperatorTransformation.bootstrapWith(accountDataSet).keyBy(acc->acc.id).transform(newAccountBootstrapper());//創(chuàng)建算子,算子ID為accountsSavepoint.create(backend,maxParallelism).withOperator("accounts",transformation).write(savepointPath);bEnv.execute("bootstrap");Operator

State要實現(xiàn)StateBootstrapFunction實現(xiàn)processElement()方法,每來一個輸入,processElement()方法會被調(diào)用一次,用于將數(shù)據(jù)寫入Savepoint。Operator

State:StateBootstrapFunction/**

*繼承并實現(xiàn)StateBootstrapFunction

*泛型參數(shù)為輸入類型*/

public

class

SimpleBootstrapFunction

extends

StateBootstrapFunction<Integer>{privateListState<Integer>state;//每個輸入都會調(diào)用一次processElement,這里將輸入加入到狀態(tài)中

@Overridepublic

void

processElement(Integervalue,Contextctx)

throwsException{state.add(value);}

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論