版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認(rèn)領(lǐng)
文檔簡介
Flink連接器本章將詳細介紹Flink的Connector相關(guān)知識。在實際生產(chǎn)環(huán)境中,數(shù)據(jù)可能存放在不同的系統(tǒng)中,比如文件系統(tǒng)、數(shù)據(jù)庫或消息隊列。一個完整的Flink作業(yè)包括Source和Sink兩大模塊,Source和Sink肩負(fù)著Flink與外部系統(tǒng)進行數(shù)據(jù)交互的重要功能,它們又被稱為連接器(Connector)。通過本節(jié)學(xué)習(xí)您將可以:掌握Flink端到端的Exactly-Once保障。掌握自定義Source和Sink。熟悉Flink中常用的Connector,如文件系統(tǒng)、Kafka等。端到端的Exactly-Once自定義Source和Sink常用流式連接器
端到端Exactly-OnceExactly-Once:某條數(shù)據(jù)投遞到某個流處理系統(tǒng)后,該系統(tǒng)對這條數(shù)據(jù)只處理一次有數(shù)據(jù)重發(fā)(Replay)問題:作業(yè)重啟后,Source必須從某個Offset位置重新發(fā)送數(shù)據(jù)數(shù)據(jù)重發(fā)會導(dǎo)致一條輸入數(shù)據(jù)可能多次影響下游系統(tǒng),有可能產(chǎn)生At-Least-Once的效果,沒有達到Exactly-Once的效果為了達到端到端的Exactly-Once,必須:Source有重發(fā)功能Sink支持冪等寫或事務(wù)寫冪等寫(IdempotentWrite):任意多次向一個系統(tǒng)寫入數(shù)據(jù),只對目標(biāo)系統(tǒng)產(chǎn)生一次結(jié)果影響:重復(fù)向一個HashMap里插入同一個Key-Value對,第一次插入時這個HashMap發(fā)生變化,后續(xù)的插入操作不會改變HashMap的結(jié)果。Key-Value必須是可確定性(Deterministic)計算的:Key為name
+
curTimestamp,curTimestamp一直變化,Key非可確定性Key為name+eventTimestamp,Event
Time確定,Key可確定性有短暫的數(shù)據(jù)閃回現(xiàn)象:只有當(dāng)后續(xù)所有數(shù)據(jù)都重發(fā)一遍后,所有應(yīng)該被覆蓋的Key都被最新數(shù)據(jù)覆蓋后,整個系統(tǒng)才達到數(shù)據(jù)的一致狀態(tài)。冪等寫事務(wù)寫(TransactionWrite):Flink先將待輸出的數(shù)據(jù)保存下來暫時不向外部系統(tǒng)提交,等待Checkpoint結(jié)束的時刻,F(xiàn)link上下游所有算子的數(shù)據(jù)都是一致時,將之前保存的數(shù)據(jù)全部提交(Commit)到外部系統(tǒng):預(yù)寫日志(Write-Ahead-Log,WAL)兩階段提交(Two-Phase-Commit,2PC)Write-Ahead-Log方式使用OperatorState緩存待輸出的數(shù)據(jù)Two-Phase-Commit方式需要外部系統(tǒng)自身就支持事務(wù)(比如Kafka)端到端的Exactly-Once,犧牲了低延遲,數(shù)據(jù)分批次地提交事物寫端到端的Exactly-Once自定義Source和Sink常用流式連接器Flink在1.11對Source進行了重構(gòu),改動較大,之前的稱為老Source接口,之后的稱為新Source接口老Source接口實現(xiàn)SourceFunction:接口類SourceFunctionRich函數(shù)類RichSourceFunction必須實現(xiàn)兩個方法:run()和cancel()方法:run()方法:Source啟動后開始運行,在方法中使用循環(huán),循環(huán)內(nèi)不斷向下游發(fā)送數(shù)據(jù)cancel()方法:停止向下游繼續(xù)發(fā)送數(shù)據(jù)老Source接口//Source啟動后調(diào)用run方法,生成數(shù)據(jù)向下游發(fā)送
void
run(SourceContext<T>ctx)
throwsException;//停止
void
cancel();使用標(biāo)志位isRunning標(biāo)記Source是否在運行run()方法內(nèi)一直循環(huán),使用SourceContext.collect()方法收集數(shù)據(jù),發(fā)送到下游停止Source時,要修改標(biāo)志位isRunning主邏輯中調(diào)用:老Source接口private
static
class
SimpleSource
implements
SourceFunction<Tuple2<String,Integer>>{private
intoffset=0;private
booleanisRunning=true;@Overridepublic
void
run(SourceContext<Tuple2<String,Integer>>ctx)
throwsException{while(isRunning){Thread.sleep(500);ctx.collect(newTuple2<>(""+offset,offset));offset++;if(offset==1000){isRunning=false;}}}@Overridepublic
void
cancel()
{isRunning=false;}}自定義Source:將數(shù)字發(fā)送到下游DataStream<Tuple2<String,Integer>>countStream=env.addSource(newSimpleSource());前頁的例子沒有進行任何Checkpoint,重啟后從0重新開始,為了整個作業(yè)重啟后可恢復(fù),Source需要支持重發(fā),將Offset作為狀態(tài)記錄下來使用Operator
State記錄Offset,需要繼承CheckpointedFunction接口類,實現(xiàn)snapshotState()和initializeState()方法整個作業(yè)第一次啟動時,調(diào)用initializeState()方法,offset為0,之后每隔一段時間調(diào)用snapshotState()將狀態(tài)數(shù)據(jù)進行Checkpoint可恢復(fù)的Source@Overridepublic
void
snapshotState(FunctionSnapshotContextsnapshotContext)
throwsException{//清除上次狀態(tài)
offsetState.clear();//將最新的offset添加到狀態(tài)中
offsetState.add(offset);}@Overridepublic
void
initializeState(FunctionInitializationContextinitializationContext)
throwsException{//初始化offsetState
ListStateDescriptor<Integer>desc=newListStateDescriptor<Integer>("offset",Types.INT);offsetState=initializationContext.getOperatorStateStore().getListState(desc);Iterable<Integer>iter=offsetState.get();if(iter==null||!iter.iterator().hasNext()){//第一次初始化,從0開始計數(shù)
offset=0;}else{//從狀態(tài)中恢復(fù)offset
offset=iter.iterator().next();}}privateListState<Integer>offsetState;在Source發(fā)送數(shù)據(jù)時也設(shè)置數(shù)據(jù)對應(yīng)的時間戳,并生成Watermark:collectWithTimestamp()方法,發(fā)送數(shù)據(jù)的同時也設(shè)置時間戳emitWatermark()方法,生成Watermark越早設(shè)置時間戳和Watermark,越能保證整個作業(yè)在時間序列上的準(zhǔn)確性和健壯性時間戳和Watermark@Overridepublic
void
run(SourceContext<Tuple2<String,Integer>>ctx)
throwsException{while(isRunning){ Thread.sleep(100);//將系統(tǒng)當(dāng)前時間作為該條數(shù)據(jù)的時間戳發(fā)送出去
ctx.collectWithTimestamp( newTuple2<>(""+offset,offset),System.currentTimeMillis());offset++;//每隔一段時間,發(fā)送一個Watermark
if(offset%100==0){ctx.emitWatermark(newWatermark(System.currentTimeMillis()));}if(offset==1000){isRunning=false;}}}老Source接口只適合流處理,不適合批處理,新的Source接口統(tǒng)一了流批處理,提供了更大規(guī)模并行處理能力三個組件:分片(Split):將數(shù)據(jù)源切分后的一小部分。讀取器(SourceReader):在TaskManager上,負(fù)責(zé)Split的讀取和處理,可分布式地并行運行。例如,單個SourceReader可以讀取文件夾里的單個文件,多個SourceReader實例共同完成讀取整個文件夾的任務(wù)。分片枚舉器(SplitEnumerator):在JobManager上,負(fù)責(zé)發(fā)現(xiàn)和分配Split,按照負(fù)載均衡策略將多個Split分配到多個SourceReader。新Source接口類SinkFunctionRich函數(shù)類RichSinkFunction實現(xiàn)invoke()方法如果想達到端到端Exactly-Once,需要實現(xiàn)冪等寫和事務(wù)寫冪等寫:使用一些Key-Value存儲,并設(shè)計好Key,采用更新插入(Upsert)方式,將舊數(shù)據(jù)覆蓋事務(wù)寫:Write-Ahead-Log、Two-Phase-Commit
自定義Sink//每條數(shù)據(jù)到達Sink后都會調(diào)用invoke方法,發(fā)送到下游外部系統(tǒng)
//value為待輸出數(shù)據(jù)
void
invoke(INvalue,Contextcontext)在數(shù)據(jù)寫入到下游系統(tǒng)之前,先把數(shù)據(jù)以日志(Log)的形式緩存下來,等收到明確的確認(rèn)提交信息后,再將Log中的數(shù)據(jù)提交到下游系統(tǒng)0、兩次Checkpoint之間的待輸出數(shù)據(jù)組成一個批次,待輸出批次緩存在Sink的Operator
State中1、接收到新的CheckpointBarrier
2、開啟一個新待輸出批次3、Sink向CheckpointCommitter查詢某批次是否已經(jīng)提交。CheckpointCommitter是一個與外部系統(tǒng)緊密相連的插件,里面存儲了各批次數(shù)據(jù)是否已經(jīng)寫入到外部系統(tǒng)4、Sink得知某批次數(shù)據(jù)還未提交,則使用sendValues()方法,發(fā)送待輸出的數(shù)據(jù)到外部系統(tǒng)5、提交成功后,Sink會刪除OperatorState中存儲的這些數(shù)據(jù)Write-Ahead-Log待輸出數(shù)據(jù)直接寫入外部系統(tǒng),與外部系統(tǒng)一起協(xié)作提供事物寫功能0、Sink直接將待發(fā)送數(shù)據(jù)寫到外部系統(tǒng)的第k次事務(wù)(Transaction)中1、接收到新的CheckpointBarrier
2、preCommit()將第k次Transaction的數(shù)據(jù)預(yù)提交到外部系統(tǒng)中,數(shù)據(jù)寫到外部系統(tǒng),但是并未確認(rèn),外部系統(tǒng)也不可見3、beginTransaction()方法,開啟下一次Transaction(Transactionk+1),在這之后的上游算子流入的待輸入數(shù)據(jù)都將流入新的Transaction(k+1)4、第2步和第3步完成后,執(zhí)行commit()方法,確認(rèn)提交Transaction
k,該批次數(shù)據(jù)在外部可見Two-Phase-Commit端到端的Exactly-Once自定義Source和Sink常用流式連接器內(nèi)置I/O(Input/Output)接口flink-connector項目所涉及的ConnectorApacheBahir所提供的Connector
系統(tǒng)類型:消息隊列、數(shù)據(jù)庫、文件系統(tǒng)具體技術(shù):Kafka、Elasticsearch、HBase、Cassandra、JDBC、Kinesis、Redis
…常用流式連接器基于Socket的Source和Sink無法實現(xiàn)數(shù)據(jù)重發(fā),適合用來調(diào)試基于內(nèi)存集合的Source打印到標(biāo)準(zhǔn)輸出的Sinkprint()打印到STDOUTprintToErr()打印到STDERR數(shù)據(jù)類型要實現(xiàn)toString()方法實際是在TaskManager上執(zhí)行內(nèi)置I/O接口//讀取Socket中的數(shù)據(jù),數(shù)據(jù)流元素之間用\n來切分
env.socketTextStream(hostname,port,"\n");//向Socket中寫數(shù)據(jù),數(shù)據(jù)以SimpleStringSchema序列化
stream.writeToSocket(outputHost,outputPort,newSimpleStringSchema());DataStream<Integer>sourceDataStream=env.fromElements(1,2,3);從內(nèi)存集合讀取數(shù)據(jù)從Socket中讀取數(shù)據(jù)通過文件系統(tǒng)描述符來確定使用什么文件系統(tǒng):hdfs://、s3://周期性檢測功能:每隔一定時間周期性地檢查filePath路徑下的內(nèi)容是否有更新基于文件系統(tǒng)的SourceStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StringtextPath=...//以UTF-8編碼格式讀取文件
DataStream<String>text=env.readTextFile(textPath)//文件路徑
StringfilePath=...//文件為純文本格式
TextInputFormattextInputFormat=newTextInputFormat(neworg.apache.flink.core.fs.Path(filePath));//每隔100毫秒檢測一遍
DataStream<String>inputStream=env.readFile(textInputFormat,filePath, FileProcessingMode.PROCESS_CONTINUOUSLY,100);簡單接口復(fù)雜接口writeAsText()方法:無法進行Checkpoint,逐漸被廢棄StreamingFileSink行式存儲和列式存儲桶:輸出路徑的子文件夾可以按時間分桶基于文件系統(tǒng)的SinkDataStream<Address>stream=env.addSource(...)//使用StreamingFileSink將DataStream輸出為一個文本文件
StreamingFileSink<String>fileSink=StreamingFileSink.forRowFormat(newPath("/file/base/path"),
newSimpleStringEncoder<String>("UTF-8")).build();stream.addSink(fileSink);[base-path]/[bucket-path]/part-[task-id]-[id]/file/base/path└──2020-02-25--15├──part-0-0.inprogress.92c7be6f-8cfc-4ca3-905b-91b0e20ba9a9├──part-1-0.inprogress.18f9fa71-1525-4776-a7bc-fe02ee1f2ddaStreamingFileSink接口桶的文件夾結(jié)構(gòu)Kafka:被廣泛使用的消息隊列,非常具有代表性可以作為Flink的上游,此時要構(gòu)建Flink的Source;也可以作為Flink的下游,此時要構(gòu)建Flink的Sink不在Flink核心程序中,使用時需要額外在Maven中添加依賴Flink
Kafka
Connector<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version></dependency>Kafka是一個Producer,F(xiàn)link作為Kafka的Consumer消費Kafka中的數(shù)據(jù)創(chuàng)建FlinkKafkaConsumer需要三個參數(shù):Topic、反序列化方式和Kafka相關(guān)參數(shù)Kafka中傳輸?shù)氖嵌M制的數(shù)據(jù),需要提供一個反序列化方式,將數(shù)據(jù)轉(zhuǎn)化為具體的Java或Scala對象開啟Flink
Checkpoint后,Checkpoint會記錄Offset,以進行故障恢復(fù)Flink
Kafka
Source//Kafka參數(shù)
Propertiesproperties=newProperties();properties.setProperty("bootstrap.servers","localhost:9092");properties.setProperty("group.id","flink-group");StringinputTopic="Shakespeare";//
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二五版國際金融風(fēng)險管理合同范本3篇
- 二零二五版建筑工地勞務(wù)用工及社會保障服務(wù)合同范本3篇
- 二零二五年酒店客房協(xié)議價優(yōu)惠合作合同3篇
- 2024政府采購合同環(huán)境與安全監(jiān)督協(xié)議3篇
- 2025年新型城鎮(zhèn)化項目水電設(shè)施安裝施工合同3篇
- 二零二五版板房租賃與租賃期滿資產(chǎn)評估與轉(zhuǎn)讓合同3篇
- 二零二五年度出租車司機服務(wù)規(guī)范與客戶滿意度提升合同3篇
- 二零二五年透水混凝土工程驗收與評估合同2篇
- 二零二五年智能交通管理系統(tǒng)采購合同3篇
- 二零二五版房屋代理租賃資產(chǎn)評估合同3篇
- 蓋洛普Q12解讀和實施完整版
- 2023年Web前端技術(shù)試題
- GB/T 20840.8-2007互感器第8部分:電子式電流互感器
- GB/T 14864-2013實心聚乙烯絕緣柔軟射頻電纜
- 品牌策劃與推廣-項目5-品牌推廣課件
- 信息學(xué)奧賽-計算機基礎(chǔ)知識(完整版)資料
- 發(fā)煙硫酸(CAS:8014-95-7)理化性質(zhì)及危險特性表
- 數(shù)字信號處理(課件)
- 公路自然災(zāi)害防治對策課件
- 耳鳴中醫(yī)臨床路徑
- 安徽身份證號碼前6位
評論
0/150
提交評論