




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
實(shí)時(shí)計(jì)算:KafkaStreams:KafkaStreams核心API詳解1實(shí)時(shí)計(jì)算:KafkaStreams:KafkaStreams核心API詳解1.1KafkaStreams簡介KafkaStreams是一個(gè)用于構(gòu)建實(shí)時(shí)流數(shù)據(jù)應(yīng)用和微服務(wù)的客戶端庫。它允許開發(fā)者使用JavaAPI處理和分析流式數(shù)據(jù),而無需依賴于外部系統(tǒng)或服務(wù)。KafkaStreams將數(shù)據(jù)處理任務(wù)封裝在應(yīng)用程序中,使得數(shù)據(jù)處理更加靈活和可擴(kuò)展。1.1.1核心功能流處理:實(shí)時(shí)地讀取和寫入數(shù)據(jù)流。狀態(tài)存儲(chǔ):維護(hù)和查詢流處理中的狀態(tài)信息。窗口操作:對(duì)數(shù)據(jù)流進(jìn)行時(shí)間窗口或滑動(dòng)窗口操作,以實(shí)現(xiàn)復(fù)雜的時(shí)間序列分析。1.1.2優(yōu)勢(shì)低延遲:能夠?qū)崟r(shí)處理數(shù)據(jù),延遲通常在毫秒級(jí)別。高吞吐量:利用Kafka的高吞吐量特性,處理大量數(shù)據(jù)。容錯(cuò)性:自動(dòng)恢復(fù)和狀態(tài)持久化,確保數(shù)據(jù)處理的可靠性。1.2實(shí)時(shí)計(jì)算的重要性在大數(shù)據(jù)和物聯(lián)網(wǎng)時(shí)代,實(shí)時(shí)計(jì)算變得至關(guān)重要。它能夠即時(shí)響應(yīng)數(shù)據(jù)流中的變化,提供即時(shí)的洞察和決策支持。例如,在金融交易中,實(shí)時(shí)計(jì)算可以檢測(cè)異常交易并立即采取行動(dòng);在社交媒體分析中,它可以實(shí)時(shí)分析用戶行為,提供個(gè)性化推薦。1.3KafkaStreams與Kafka的區(qū)別Kafka主要是一個(gè)分布式流處理平臺(tái),用于發(fā)布和訂閱消息。而KafkaStreams是基于Kafka構(gòu)建的一個(gè)流處理庫,它提供了高級(jí)的流處理API,使得開發(fā)者能夠更方便地進(jìn)行數(shù)據(jù)處理和分析。KafkaStreams將數(shù)據(jù)處理邏輯封裝在應(yīng)用程序中,而Kafka本身則專注于數(shù)據(jù)的傳輸和存儲(chǔ)。1.4核心概念:流處理與狀態(tài)存儲(chǔ)1.4.1流處理流處理是指對(duì)連續(xù)的、無界的數(shù)據(jù)流進(jìn)行實(shí)時(shí)處理。在KafkaStreams中,流處理通常包括以下步驟:1.讀取數(shù)據(jù)流:從Kafka主題中讀取數(shù)據(jù)。2.數(shù)據(jù)轉(zhuǎn)換:對(duì)讀取的數(shù)據(jù)進(jìn)行轉(zhuǎn)換或過濾。3.聚合操作:對(duì)數(shù)據(jù)進(jìn)行聚合,如求和、平均值等。4.寫入結(jié)果:將處理后的數(shù)據(jù)寫入另一個(gè)Kafka主題或外部系統(tǒng)。示例代碼Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>textLines=builder.stream("input-topic");
KTable<String,Long>wordCounts=textLines
.flatMapValues(value->Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key,word)->word)
.count(Materialized.as("counts-store"));
wordCounts.toStream().to("output-topic",Produced.with(Serdes.String(),Serdes.Long()));
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();在這個(gè)例子中,我們從input-topic讀取數(shù)據(jù),將每行文本轉(zhuǎn)換為單詞流,然后對(duì)每個(gè)單詞進(jìn)行計(jì)數(shù),并將結(jié)果寫入output-topic。1.4.2狀態(tài)存儲(chǔ)狀態(tài)存儲(chǔ)是流處理中的關(guān)鍵概念,它允許應(yīng)用程序在處理數(shù)據(jù)時(shí)維護(hù)狀態(tài)信息。KafkaStreams提供了多種狀態(tài)存儲(chǔ)類型,包括:-KeyValueStore:用于存儲(chǔ)鍵值對(duì)狀態(tài)。-WindowStore:用于存儲(chǔ)基于時(shí)間窗口的狀態(tài)。-SessionStore:用于存儲(chǔ)基于會(huì)話的狀態(tài)。示例代碼StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>textLines=builder.stream("input-topic");
KTable<String,Long>wordCounts=textLines
.flatMapValues(value->Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key,word)->word)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count(Materialized.<String,Long,WindowStore<Bytes,byte[],Long>>as("word-counts-store"));
wordCounts.toStream().foreach((k,v)->System.out.println(k.key+":"+v));在這個(gè)例子中,我們使用了WindowStore來存儲(chǔ)基于5分鐘時(shí)間窗口的單詞計(jì)數(shù)。這使得我們能夠進(jìn)行時(shí)間窗口內(nèi)的數(shù)據(jù)分析,例如計(jì)算每5分鐘內(nèi)的單詞頻率。1.5總結(jié)KafkaStreams提供了一套強(qiáng)大的API,用于構(gòu)建實(shí)時(shí)流數(shù)據(jù)處理應(yīng)用。通過流處理和狀態(tài)存儲(chǔ),開發(fā)者可以實(shí)現(xiàn)低延遲、高吞吐量的數(shù)據(jù)處理,滿足現(xiàn)代應(yīng)用對(duì)實(shí)時(shí)性的需求。上述示例展示了如何使用KafkaStreams進(jìn)行基本的流處理和狀態(tài)存儲(chǔ)操作,為構(gòu)建更復(fù)雜的數(shù)據(jù)處理邏輯奠定了基礎(chǔ)。2KafkaStreams基礎(chǔ)2.1環(huán)境搭建與依賴引入在開始使用KafkaStreams進(jìn)行實(shí)時(shí)數(shù)據(jù)處理之前,首先需要搭建開發(fā)環(huán)境并引入必要的依賴。以下步驟將指導(dǎo)你完成這一過程。2.1.1環(huán)境搭建安裝Java:KafkaStreams基于Java開發(fā),確保你的系統(tǒng)中已安裝Java8或更高版本。安裝Kafka:下載并安裝ApacheKafka,版本應(yīng)與KafkaStreams兼容。設(shè)置Kafka環(huán)境:配置Kafka的perties文件,確保Broker運(yùn)行正常。2.1.2依賴引入在你的項(xiàng)目中,需要添加KafkaStreams的依賴。如果你使用Maven,可以在pom.xml文件中添加以下依賴:<!--KafkaStreams依賴-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.2.0</version>
</dependency>2.2創(chuàng)建KafkaStreams實(shí)例創(chuàng)建KafkaStreams實(shí)例是開始流處理的第一步。以下是一個(gè)創(chuàng)建KafkaStreams實(shí)例的基本示例:importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importjava.util.Properties;
publicclassKafkaStreamsExample{
publicstaticvoidmain(String[]args){
//配置KafkaStreams
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,mon.serialization.Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,mon.serialization.Serdes.String().getClass().getName());
//創(chuàng)建StreamsBuilder
StreamsBuilderbuilder=newStreamsBuilder();
//定義流處理邏輯
builder.stream("input-topic")
.to("output-topic");
//創(chuàng)建KafkaStreams實(shí)例
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
//啟動(dòng)流處理
streams.start();
//等待程序結(jié)束
Runtime.getRuntime().addShutdownHook(newThread(streams::close));
}
}在這個(gè)示例中,我們首先配置了KafkaStreams的參數(shù),然后使用StreamsBuilder定義了流處理邏輯,最后創(chuàng)建并啟動(dòng)了KafkaStreams實(shí)例。2.3配置KafkaStreams參數(shù)KafkaStreams的配置參數(shù)對(duì)于流處理的性能和行為至關(guān)重要。以下是一些關(guān)鍵的配置參數(shù):APPLICATION_ID_CONFIG:應(yīng)用程序的唯一ID,用于區(qū)分不同的KafkaStreams實(shí)例。BOOTSTRAP_SERVERS_CONFIG:Kafka集群的Broker列表。DEFAULT_KEY_SERDE_CLASS_CONFIG和DEFAULT_VALUE_SERDE_CLASS_CONFIG:用于序列化和反序列化鍵和值的Serde類。2.4編寫簡單的流處理程序KafkaStreams提供了多種API來處理流數(shù)據(jù),包括KStream和KTable。下面是一個(gè)使用KStream進(jìn)行簡單流處理的示例:importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassSimpleStreamProcessing{
publicstaticvoidmain(String[]args){
//配置KafkaStreams
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"simple-stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
//創(chuàng)建StreamsBuilder
StreamsBuilderbuilder=newStreamsBuilder();
//定義流處理邏輯
KStream<String,String>input=builder.stream("input-topic");
KStream<String,String>output=input.mapValues(value->value.toUpperCase());
output.to("output-topic");
//創(chuàng)建KafkaStreams實(shí)例
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
//啟動(dòng)流處理
streams.start();
//等待程序結(jié)束
Runtime.getRuntime().addShutdownHook(newThread(streams::close));
}
}在這個(gè)示例中,我們從input-topic讀取數(shù)據(jù),將數(shù)據(jù)值轉(zhuǎn)換為大寫,然后將結(jié)果寫入output-topic。這展示了KafkaStreams如何處理流數(shù)據(jù)的基本流程。2.4.1數(shù)據(jù)樣例假設(shè)input-topic中的數(shù)據(jù)如下:key1:value1
key2:value2
key3:value3經(jīng)過處理后,output-topic中的數(shù)據(jù)將變?yōu)椋簁ey1:VALUE1
key2:VALUE2
key3:VALUE3這個(gè)示例展示了如何使用KafkaStreams的mapValues方法來修改流中的數(shù)據(jù)值。3實(shí)時(shí)計(jì)算:KafkaStreams核心API詳解3.1處理API:KStream與KTableKafkaStreams提供了兩種主要的數(shù)據(jù)處理抽象:KStream和KTable。KStream代表了無界的數(shù)據(jù)流,而KTable則代表了可以被查詢的、有狀態(tài)的、無界的數(shù)據(jù)流。3.1.1KStreamKStream是KafkaStreams中處理流數(shù)據(jù)的主要API。它允許你對(duì)流數(shù)據(jù)進(jìn)行各種操作,如過濾、映射、扁平化、聚合等。示例:使用KStream進(jìn)行數(shù)據(jù)映射Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>textLines=builder.stream("streams-plaintext-input");
KStream<String,Long>wordCounts=textLines
.flatMapValues(value->Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key,word)->word)
.count(Materialized.as("counts-store"));
wordCounts.to("streams-wordcount-output",Produced.with(Serdes.String(),Serdes.Long()));
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();在這個(gè)例子中,我們從一個(gè)主題streams-plaintext-input中讀取數(shù)據(jù),然后使用flatMapValues方法將每行文本分割成單詞,接著使用groupBy和count方法來計(jì)算每個(gè)單詞的出現(xiàn)次數(shù),并將結(jié)果寫入到streams-wordcount-output主題。3.1.2KTableKTable是一個(gè)可以被查詢的、有狀態(tài)的數(shù)據(jù)流。它通常用于處理需要持久化狀態(tài)的流數(shù)據(jù),如聚合操作。示例:使用KTable進(jìn)行數(shù)據(jù)聚合Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"aggregation-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Double().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KTable<String,Double>purchaseAmounts=builder.table("purchase-amounts");
KTable<String,Double>totalAmounts=purchaseAmounts
.groupBy((key,value)->key)
.reduce((value1,value2)->value1+value2);
totalAmounts.toStream().to("total-purchase-amounts",Produced.with(Serdes.String(),Serdes.Double()));
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();在這個(gè)例子中,我們從purchase-amounts主題讀取數(shù)據(jù),然后使用reduce方法來計(jì)算每個(gè)用戶的總購買金額,并將結(jié)果寫入到total-purchase-amounts主題。3.2轉(zhuǎn)換API:mapValues,map,flatMap轉(zhuǎn)換API允許你修改流中的數(shù)據(jù),包括mapValues、map和flatMap。3.2.1mapValuesmapValues用于修改流中的值,但不改變鍵。示例:使用mapValues修改值KStream<String,String>input=builder.stream("input-topic");
KStream<String,Integer>output=input.mapValues(value->value.length());在這個(gè)例子中,我們將輸入流中的字符串值轉(zhuǎn)換為其長度。3.2.2mapmap用于修改流中的鍵和值。示例:使用map修改鍵和值KStream<String,String>input=builder.stream("input-topic");
KStream<Integer,String>output=input.map((key,value)->newKeyValue<>(value.length(),value));在這個(gè)例子中,我們將輸入流中的鍵轉(zhuǎn)換為值的長度,值保持不變。3.2.3flatMapflatMap用于將流中的值轉(zhuǎn)換為多個(gè)值。示例:使用flatMap處理數(shù)據(jù)KStream<String,String>input=builder.stream("input-topic");
KStream<String,String>output=input.flatMapValues(value->Arrays.asList(value.split("")));在這個(gè)例子中,我們將輸入流中的每個(gè)字符串值分割成多個(gè)單詞,并將它們作為新的值輸出。3.3聚合API:reduce,aggregate,groupByKey聚合API用于在流數(shù)據(jù)上執(zhí)行聚合操作,如求和、平均值等。3.3.1reducereduce用于在流中具有相同鍵的記錄上執(zhí)行聚合操作。示例:使用reduce計(jì)算總和KTable<String,Double>purchases=builder.table("purchase-amounts");
KTable<String,Double>totalPurchases=purchases.reduce((value1,value2)->value1+value2);在這個(gè)例子中,我們計(jì)算了每個(gè)用戶的總購買金額。3.3.2aggregateaggregate用于在流中具有相同鍵的記錄上執(zhí)行聚合操作,同時(shí)可以初始化狀態(tài)。示例:使用aggregate計(jì)算平均值KTable<String,Double>purchases=builder.table("purchase-amounts");
KTable<String,Double>averagePurchases=purchases.aggregate(
()->0.0,//初始化狀態(tài)
(key,value,aggregate)->aggregate+value,//聚合操作
Materialized.with(Serdes.String(),Serdes.Double())
).toStream()
.groupByKey(Grouped.with(Serdes.String(),Serdes.Double()))
.reduce((value1,value2)->value1+value2,Materialized.with(Serdes.String(),Serdes.Double()))
.map((key,value)->newKeyValue<>(key,value/purchases.count()));在這個(gè)例子中,我們首先使用aggregate來計(jì)算每個(gè)用戶的總購買金額,然后使用reduce來計(jì)算總購買次數(shù),最后計(jì)算平均購買金額。3.3.3groupByKeygroupByKey用于將流中的記錄按照鍵進(jìn)行分組。示例:使用groupByKey進(jìn)行分組KStream<String,String>input=builder.stream("input-topic");
KStream<String,String>grouped=input.groupByKey().selectKey((key,value)->value);在這個(gè)例子中,我們首先將輸入流中的記錄按照鍵進(jìn)行分組,然后使用selectKey方法將鍵轉(zhuǎn)換為值。3.4連接API:join,leftJoin連接API用于將兩個(gè)流或表連接在一起,基于它們的鍵進(jìn)行匹配。3.4.1joinjoin用于連接兩個(gè)流或表,只返回兩個(gè)流或表中鍵匹配的記錄。示例:使用join連接兩個(gè)表KTable<String,Double>purchases=builder.table("purchase-amounts");
KTable<String,Integer>userAges=builder.table("user-ages");
KTable<String,Tuple2<Double,Integer>>joined=purchases.join(userAges,(purchase,age)->newTuple2<>(purchase,age));
joined.toStream().to("joined-topic",Produced.with(Serdes.String(),Serdes.serdeFrom(Serdes.Double(),Serdes.Integer())));在這個(gè)例子中,我們將purchase-amounts表和user-ages表連接在一起,返回每個(gè)用戶的購買金額和年齡。3.4.2leftJoinleftJoin用于連接兩個(gè)流或表,返回左表中的所有記錄,即使右表中沒有匹配的鍵。示例:使用leftJoin連接兩個(gè)表KTable<String,Double>purchases=builder.table("purchase-amounts");
KTable<String,Integer>userAges=builder.table("user-ages");
KTable<String,Tuple2<Double,Integer>>joined=purchases.leftJoin(userAges,(purchase,age)->newTuple2<>(purchase,age!=null?age:0));
joined.toStream().to("left-joined-topic",Produced.with(Serdes.String(),Serdes.serdeFrom(Serdes.Double(),Serdes.Integer())));在這個(gè)例子中,我們將purchase-amounts表和user-ages表進(jìn)行左連接,返回purchase-amounts表中的所有記錄,如果user-ages表中沒有匹配的鍵,則使用默認(rèn)值0。通過上述示例,我們可以看到KafkaStreams提供了豐富的API來處理流數(shù)據(jù),包括數(shù)據(jù)的轉(zhuǎn)換、聚合和連接。這些API的使用可以讓你輕松地構(gòu)建復(fù)雜的數(shù)據(jù)流處理應(yīng)用程序。4狀態(tài)存儲(chǔ)與查詢4.1內(nèi)置狀態(tài)存儲(chǔ)介紹在KafkaStreams中,狀態(tài)存儲(chǔ)是實(shí)現(xiàn)流處理的關(guān)鍵組件之一。它允許流處理應(yīng)用程序在處理事件時(shí)保持狀態(tài),從而實(shí)現(xiàn)復(fù)雜的數(shù)據(jù)處理邏輯,如窗口操作、聚合和連接。KafkaStreams提供了兩種內(nèi)置的狀態(tài)存儲(chǔ)類型:InMemory和OnDisk。4.1.1InMemory狀態(tài)存儲(chǔ)InMemory狀態(tài)存儲(chǔ)將所有數(shù)據(jù)保存在應(yīng)用程序的本地內(nèi)存中。這種存儲(chǔ)方式提供了極快的讀寫速度,但其數(shù)據(jù)持久性較差,一旦應(yīng)用程序重啟,所有數(shù)據(jù)將丟失。InMemory狀態(tài)存儲(chǔ)適用于那些不需要數(shù)據(jù)持久性,且數(shù)據(jù)量較小的場景。4.1.2OnDisk狀態(tài)存儲(chǔ)OnDisk狀態(tài)存儲(chǔ)將數(shù)據(jù)保存在磁盤上,使用RocksDB作為底層存儲(chǔ)引擎。這種存儲(chǔ)方式提供了數(shù)據(jù)持久性,即使應(yīng)用程序重啟,數(shù)據(jù)也不會(huì)丟失。OnDisk狀態(tài)存儲(chǔ)適用于那些需要數(shù)據(jù)持久性,且數(shù)據(jù)量較大的場景。4.1.3示例:使用內(nèi)置狀態(tài)存儲(chǔ)下面的示例展示了如何在KafkaStreams應(yīng)用程序中使用內(nèi)置的OnDisk狀態(tài)存儲(chǔ)。importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importorg.apache.kafka.streams.kstream.Materialized;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassStateStoreExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"state-store-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
//創(chuàng)建一個(gè)KStream,從輸入主題讀取數(shù)據(jù)
KStream<String,String>input=builder.stream("input-topic");
//使用OnDisk狀態(tài)存儲(chǔ),將數(shù)據(jù)聚合到一個(gè)狀態(tài)存儲(chǔ)中
input.groupByKey()
.reduce((value1,value2)->value1+value2)
.to("output-topic",Materialized.as("my-state-store"));
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}在這個(gè)示例中,我們創(chuàng)建了一個(gè)KafkaStreams應(yīng)用程序,它從input-topic主題讀取數(shù)據(jù),然后使用OnDisk狀態(tài)存儲(chǔ)將數(shù)據(jù)聚合到my-state-store狀態(tài)存儲(chǔ)中。最后,聚合后的數(shù)據(jù)被寫入到output-topic主題。4.2自定義狀態(tài)存儲(chǔ)除了內(nèi)置的狀態(tài)存儲(chǔ),KafkaStreams還允許用戶自定義狀態(tài)存儲(chǔ)。這為用戶提供了極大的靈活性,可以根據(jù)自己的需求選擇最適合的狀態(tài)存儲(chǔ)類型。自定義狀態(tài)存儲(chǔ)需要實(shí)現(xiàn)StateStore接口,并提供自己的存儲(chǔ)邏輯。4.2.1示例:實(shí)現(xiàn)自定義狀態(tài)存儲(chǔ)下面的示例展示了如何實(shí)現(xiàn)一個(gè)自定義的狀態(tài)存儲(chǔ)。importcessor.StateStore;
importcessor.StateStoreSupplier;
importorg.apache.kafka.streams.state.KeyValueStore;
importjava.util.Map;
publicclassCustomStateStoreimplementsStateStoreSupplier{
@Override
publicStringname(){
return"custom-state-store";
}
@Override
publicbooleanpersistent(){
returntrue;
}
@Override
publicvoidenableLoading(booleanenable){
//實(shí)現(xiàn)加載邏輯
}
@Override
publicvoidsetLoggingEnabled(booleanenabled){
//實(shí)現(xiàn)日志記錄邏輯
}
@Override
publicKeyValueStore<Windowed<String>,Long>get(){
returnnewCustomKeyValueStore();
}
@Override
publicvoidclose(){
//實(shí)現(xiàn)關(guān)閉邏輯
}
@Override
publicvoidinit(Map<String,String>configs){
//實(shí)現(xiàn)初始化邏輯
}
privatestaticclassCustomKeyValueStoreimplementsKeyValueStore<Windowed<String>,Long>{
//實(shí)現(xiàn)KeyValueStore接口的方法
}
}在這個(gè)示例中,我們定義了一個(gè)CustomStateStore類,它實(shí)現(xiàn)了StateStoreSupplier接口。CustomStateStore類中的get方法返回一個(gè)自定義的CustomKeyValueStore實(shí)例,該實(shí)例實(shí)現(xiàn)了KeyValueStore接口,提供了自定義的存儲(chǔ)邏輯。4.3狀態(tài)存儲(chǔ)查詢接口KafkaStreams提供了狀態(tài)存儲(chǔ)查詢接口,允許應(yīng)用程序在處理事件時(shí)查詢狀態(tài)存儲(chǔ)中的數(shù)據(jù)。這使得應(yīng)用程序可以在處理事件時(shí)訪問歷史數(shù)據(jù),從而實(shí)現(xiàn)更復(fù)雜的業(yè)務(wù)邏輯。4.3.1示例:查詢狀態(tài)存儲(chǔ)下面的示例展示了如何在KafkaStreams應(yīng)用程序中查詢狀態(tài)存儲(chǔ)。importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importorg.apache.kafka.streams.kstream.Materialized;
importmon.serialization.Serdes;
importorg.apache.kafka.streams.state.QueryableStoreType;
importorg.apache.kafka.streams.state.ReadOnlyKeyValueStore;
importjava.util.Properties;
publicclassStateStoreQueryExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"state-store-query-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
//創(chuàng)建一個(gè)KStream,從輸入主題讀取數(shù)據(jù)
KStream<String,String>input=builder.stream("input-topic");
//使用OnDisk狀態(tài)存儲(chǔ),將數(shù)據(jù)聚合到一個(gè)狀態(tài)存儲(chǔ)中
input.groupByKey()
.reduce((value1,value2)->value1+value2)
.to("output-topic",Materialized.as("my-state-store"));
//創(chuàng)建一個(gè)KafkaStreams實(shí)例
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
//查詢狀態(tài)存儲(chǔ)
ReadOnlyKeyValueStore<String,String>store=streams.store("my-state-store",QueryableStoreType.keyValueStore());
Stringvalue=store.get("key");
System.out.println("Valueforkey:"+value);
}
}在這個(gè)示例中,我們創(chuàng)建了一個(gè)KafkaStreams應(yīng)用程序,它從input-topic主題讀取數(shù)據(jù),然后使用OnDisk狀態(tài)存儲(chǔ)將數(shù)據(jù)聚合到my-state-store狀態(tài)存儲(chǔ)中。在應(yīng)用程序啟動(dòng)后,我們使用store方法查詢my-state-store狀態(tài)存儲(chǔ)中的數(shù)據(jù),并打印出查詢結(jié)果。通過上述示例,我們可以看到KafkaStreams如何使用內(nèi)置和自定義狀態(tài)存儲(chǔ),以及如何查詢狀態(tài)存儲(chǔ)中的數(shù)據(jù)。這些功能使得KafkaStreams能夠處理復(fù)雜的流數(shù)據(jù)處理任務(wù),滿足各種業(yè)務(wù)需求。5故障恢復(fù)與一致性5.1故障恢復(fù)機(jī)制在實(shí)時(shí)計(jì)算場景中,數(shù)據(jù)流處理系統(tǒng)如KafkaStreams必須具備強(qiáng)大的故障恢復(fù)能力,以確保即使在節(jié)點(diǎn)故障的情況下,數(shù)據(jù)處理流程也能繼續(xù)進(jìn)行,不會(huì)丟失數(shù)據(jù)或?qū)е聰?shù)據(jù)處理的不連續(xù)。KafkaStreams通過以下機(jī)制實(shí)現(xiàn)故障恢復(fù):5.1.1狀態(tài)存儲(chǔ)的備份與恢復(fù)KafkaStreams使用狀態(tài)存儲(chǔ)(StateStores)來保存中間計(jì)算結(jié)果,這些狀態(tài)存儲(chǔ)可以是本地的,也可以是遠(yuǎn)程的。為了確保故障恢復(fù),KafkaStreams會(huì)定期將狀態(tài)存儲(chǔ)的數(shù)據(jù)備份到Kafka的內(nèi)部主題中。當(dāng)一個(gè)處理節(jié)點(diǎn)發(fā)生故障時(shí),KafkaStreams可以利用這些備份數(shù)據(jù)在另一個(gè)節(jié)點(diǎn)上恢復(fù)狀態(tài)存儲(chǔ),從而繼續(xù)數(shù)據(jù)處理流程。5.1.2任務(wù)的重新分配KafkaStreams將數(shù)據(jù)處理流程劃分為多個(gè)任務(wù)(Tasks),每個(gè)任務(wù)可以獨(dú)立運(yùn)行在不同的處理節(jié)點(diǎn)上。當(dāng)一個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),KafkaStreams會(huì)自動(dòng)將該節(jié)點(diǎn)上的任務(wù)重新分配到其他健康的節(jié)點(diǎn)上,以確保數(shù)據(jù)處理的連續(xù)性。5.1.3消費(fèi)者組的重新平衡KafkaStreams使用消費(fèi)者組(ConsumerGroups)來管理多個(gè)處理節(jié)點(diǎn)之間的數(shù)據(jù)消費(fèi)。當(dāng)一個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),消費(fèi)者組會(huì)觸發(fā)重新平衡(Rebalance),將故障節(jié)點(diǎn)的分區(qū)重新分配給其他節(jié)點(diǎn),確保所有數(shù)據(jù)都能被消費(fèi)和處理。5.2保證數(shù)據(jù)一致性數(shù)據(jù)一致性是實(shí)時(shí)計(jì)算系統(tǒng)中的關(guān)鍵要求,特別是在處理狀態(tài)存儲(chǔ)和進(jìn)行復(fù)雜的數(shù)據(jù)流操作時(shí)。KafkaStreams通過以下方式保證數(shù)據(jù)一致性:5.2.1狀態(tài)存儲(chǔ)的原子更新KafkaStreams在更新狀態(tài)存儲(chǔ)時(shí),使用原子操作來確保數(shù)據(jù)的一致性。這意味著每次更新都是不可分割的,要么完全成功,要么完全失敗,不會(huì)出現(xiàn)部分更新的情況。5.2.2事務(wù)處理KafkaStreams支持事務(wù)處理,允許將多個(gè)操作組合成一個(gè)事務(wù),確保這些操作要么全部成功,要么全部失敗。事務(wù)處理可以跨越多個(gè)Kafka主題和狀態(tài)存儲(chǔ),提供強(qiáng)一致性保證。5.2.3重復(fù)數(shù)據(jù)檢測(cè)與處理在故障恢復(fù)過程中,可能會(huì)出現(xiàn)數(shù)據(jù)的重復(fù)消費(fèi)。KafkaStreams通過內(nèi)置的重復(fù)數(shù)據(jù)檢測(cè)機(jī)制,確保即使在數(shù)據(jù)重復(fù)消費(fèi)的情況下,也能正確處理數(shù)據(jù),避免數(shù)據(jù)的重復(fù)計(jì)算或存儲(chǔ)。5.3狀態(tài)存儲(chǔ)的持久化狀態(tài)存儲(chǔ)的持久化是KafkaStreams實(shí)現(xiàn)高可用性和數(shù)據(jù)一致性的關(guān)鍵。KafkaStreams通過以下方式實(shí)現(xiàn)狀態(tài)存儲(chǔ)的持久化:5.3.1寫入Kafka內(nèi)部主題KafkaStreams將狀態(tài)存儲(chǔ)的數(shù)據(jù)定期寫入Kafka的內(nèi)部主題中,這些主題用于存儲(chǔ)狀態(tài)快照和更改日志。即使處理節(jié)點(diǎn)發(fā)生故障,狀態(tài)數(shù)據(jù)也可以從這些內(nèi)部主題中恢復(fù)。5.3.2異步持久化狀態(tài)存儲(chǔ)的持久化操作是異步進(jìn)行的,這意味著處理節(jié)點(diǎn)在進(jìn)行數(shù)據(jù)處理時(shí),不會(huì)因?yàn)槌志没僮鞫枞?。這提高了系統(tǒng)的吞吐量和響應(yīng)速度。5.3.3狀態(tài)存儲(chǔ)的版本控制KafkaStreams對(duì)狀態(tài)存儲(chǔ)進(jìn)行版本控制,確保在恢復(fù)狀態(tài)存儲(chǔ)時(shí),可以使用正確的版本數(shù)據(jù)。這避免了因版本不一致導(dǎo)致的數(shù)據(jù)處理錯(cuò)誤。5.3.4示例代碼:使用KafkaStreams的事務(wù)處理importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importorg.apache.kafka.streams.kstream.Materialized;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassFaultToleranceExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"fault-tolerance-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
//使用事務(wù)處理
source
.transformValues(()->newValueTransformerWithKeySupplier<String,String,String>(){
privateProcessorContextcontext;
privateKeyValueStore<String,String>store;
@Override
publicvoidinit(ProcessorContextcontext){
this.context=context;
this.store=(KeyValueStore<String,String>)context.getStateStore("my-store");
}
@Override
publicStringtransform(Stringkey,Stringvalue){
mit();//提交事務(wù)
returnstore.get(key)+value;
}
@Override
publicvoidclose(){}
@Override
publicValueTransformerWithKeySupplier<String,String,String>get(){
returnthis;
}
},Materialized.as("my-store"))
.to("output-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}5.3.5示例描述在上述示例中,我們創(chuàng)建了一個(gè)KafkaStreams應(yīng)用,該應(yīng)用從input-topic主題讀取數(shù)據(jù),使用一個(gè)狀態(tài)存儲(chǔ)my-store來保存中間計(jì)算結(jié)果,并將結(jié)果寫入output-topic主題。我們使用了事務(wù)處理來確保數(shù)據(jù)的一致性,每次更新狀態(tài)存儲(chǔ)時(shí)都會(huì)提交一個(gè)事務(wù)。這樣,即使在處理節(jié)點(diǎn)發(fā)生故障的情況下,狀態(tài)存儲(chǔ)的數(shù)據(jù)也可以從Kafka的內(nèi)部主題中恢復(fù),確保數(shù)據(jù)處理的連續(xù)性和一致性。通過KafkaStreams的故障恢復(fù)機(jī)制、數(shù)據(jù)一致性和狀態(tài)存儲(chǔ)的持久化,我們可以構(gòu)建出高可用、強(qiáng)一致的實(shí)時(shí)數(shù)據(jù)流處理系統(tǒng),滿足現(xiàn)代大數(shù)據(jù)處理的需求。6性能調(diào)優(yōu)與監(jiān)控6.1性能調(diào)優(yōu)策略在KafkaStreams中,性能調(diào)優(yōu)是一個(gè)關(guān)鍵的步驟,以確保應(yīng)用程序能夠高效地處理大量數(shù)據(jù)。以下是一些核心的性能調(diào)優(yōu)策略:6.1.1并行處理KafkaStreams允許你通過增加num.stream.threads配置參數(shù)的值來并行處理數(shù)據(jù)。這可以利用多核處理器的計(jì)算能力,從而提高處理速度。例如:Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,4);//默認(rèn)為1,這里設(shè)置為4以利用更多CPU核心6.1.2優(yōu)化狀態(tài)存儲(chǔ)狀態(tài)存儲(chǔ)是KafkaStreams中的關(guān)鍵組件,用于存儲(chǔ)中間結(jié)果。優(yōu)化狀態(tài)存儲(chǔ)可以顯著提高性能。例如,使用GlobalKTable可以減少狀態(tài)存儲(chǔ)的查詢延遲,因?yàn)樗鼘?shù)據(jù)存儲(chǔ)在所有任務(wù)中,而不是像KTable那樣只存儲(chǔ)在一部分任務(wù)中。StreamsBuilderbuilder=newStreamsBuilder();
GlobalKTable<String,Long>globalTable=builder.globalTable("my-topic",Consumed.with(Serdes.String(),Serdes.Long()));6.1.3批處理KafkaStreams支持批處理,通過調(diào)整erval.ms配置參數(shù),可以控制批處理的頻率和大小,從而影響性能和延遲。props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,10000);//設(shè)置提交間隔為10秒6.2監(jiān)控與度量KafkaStreams提供了豐富的監(jiān)控和度量功能,幫助你了解應(yīng)用程序的運(yùn)行狀態(tài)和性能。以下是一些關(guān)鍵的監(jiān)控指標(biāo):6.2.1應(yīng)用程序指標(biāo)KafkaStreams應(yīng)用程序可以報(bào)告各種指標(biāo),包括處理延遲、輸入和輸出記錄的速率、任務(wù)的運(yùn)行狀態(tài)等。這些指標(biāo)可以通過JMX或Prometheus等監(jiān)控工具獲取。//使用Prometheus監(jiān)控
StreamsConfigconfig=newStreamsConfig(props);
config.setMetricsRecordingLevel(MetricConfig.MetricLevel.DEBUG);
config.setMetricsReporters(Arrays.asList(newPrometheusMetricsReporter()));6.2.2狀態(tài)存儲(chǔ)指標(biāo)狀態(tài)存儲(chǔ)的指標(biāo)包括讀寫操作的速率、緩存命中率、存儲(chǔ)大小等。這些指標(biāo)對(duì)于理解狀態(tài)存儲(chǔ)的性能至關(guān)重要。//監(jiān)控狀態(tài)存儲(chǔ)
GlobalKTable<String,Long>globalTable=builder.globalTable("my-topic",Consumed.with(Serdes.String(),Serdes.Long()));
globalTable.toStream().peek((k,v)->{
//可以在這里添加自定義的監(jiān)控邏輯
});6.3資源管理與優(yōu)化資源管理是確保KafkaStreams應(yīng)用程序高效運(yùn)行的關(guān)鍵。以下是一些資源管理的策略:6.3.1內(nèi)存管理KafkaStreams使用內(nèi)存來存儲(chǔ)狀態(tài)和緩存數(shù)據(jù)。通過調(diào)整state.store.direct.buffer.size和state.store.cache.max.bytes.buffer.size配置參數(shù),可以優(yōu)化內(nèi)存使用。props.put(StreamsConfig.STATE_STORE_CACHE_MAX_BYTES_BUFFER_SIZE_CONFIG,1024*1024*1024);//設(shè)置狀態(tài)存儲(chǔ)緩存的最大大小為1GB6.3.2CPU和IO資源KafkaStreams應(yīng)用程序的性能受到CPU和IO資源的限制。通過調(diào)整并行度和批處理大小,可以優(yōu)化這些資源的使用。例如,增加并行度可以提高CPU利用率,但可能會(huì)增加IO負(fù)載。6.3.3網(wǎng)絡(luò)資源KafkaStreams應(yīng)用程序與Kafka集群之間的網(wǎng)絡(luò)通信也會(huì)影響性能。優(yōu)化網(wǎng)絡(luò)資源包括減少不必要的網(wǎng)絡(luò)調(diào)用,例如通過本地緩存結(jié)果,以及優(yōu)化網(wǎng)絡(luò)配置,如增加網(wǎng)絡(luò)緩沖區(qū)大小。props.put(StreamsConfig.NETWORK_BUFFER_SIZE_CONFIG,1024*1024);//設(shè)置網(wǎng)絡(luò)緩沖區(qū)大小為1MB通過上述策略,你可以有效地調(diào)優(yōu)和監(jiān)控KafkaStreams應(yīng)用程序,確保其在處理大量數(shù)據(jù)時(shí)能夠保持高性能和穩(wěn)定性。7高級(jí)主題7.1窗口操作窗口操作是KafkaStreams中處理流數(shù)據(jù)的關(guān)鍵特性之一,它允許用戶基于時(shí)間或數(shù)據(jù)量對(duì)流進(jìn)行分段,從而實(shí)現(xiàn)對(duì)數(shù)據(jù)的聚合和分析。KafkaStreams提供了三種窗口類型:TumblingWindow、SlidingWindow和SessionWindow。7.1.1TumblingWindow滾動(dòng)窗口(TumblingWindow)是一種固定大小的窗口,窗口之間沒有重疊。一旦一個(gè)窗口結(jié)束,下一個(gè)窗口立即開始,沒有空隙。示例代碼StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KTable<Windowed<String>,Long>counts=source
.mapValues(value->value.toLowerCase())//將所有消息轉(zhuǎn)換為小寫
.groupBy((key,value)->value)//按消息內(nèi)容分組
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))//使用5分鐘的滾動(dòng)窗口
.count(Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("counts-store"));
counts.toStream().to("output-topic",Produced.with(Serdes.String(),Serdes.Long()));7.1.2SlidingWindow滑動(dòng)窗口(SlidingWindow)允許窗口之間有重疊,這使得在窗口大小和滑動(dòng)間隔之間有更大的靈活性。示例代碼StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KTable<Windowed<String>,Long>counts=source
.mapValues(value->value.toLowerCase())
.groupBy((key,value)->value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))//使用5分鐘的窗口,每1分鐘滑動(dòng)一次
.count(Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("counts-store"));
counts.toStream().to("output-topic",Produced.with(Serdes.String(),Serdes.Long()));7.1.3SessionWindow會(huì)話窗口(SessionWindow)基于事件的間隔來定義窗口,當(dāng)事件之間的間隔超過一定閾值時(shí),會(huì)話窗口結(jié)束。示例代碼StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KTable<Windowed<String>,Long>counts=source
.mapValues(value->value.toLowerCase())
.groupBy((key,value)->value)
.windowedBy(SessionWindows.with(Duration.ofMinutes(5)))//使用5分鐘的會(huì)話間隔
.count(Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("counts-store"));
counts.toStream().to("output-topic",Produced.with(Serdes.String(),Serdes.Long()));7.2時(shí)間概念:事件時(shí)間與處理時(shí)間在流處理中,時(shí)間是一個(gè)核心概念,KafkaStreams支持兩種時(shí)間模型:事件時(shí)間(EventTime)和處理時(shí)間(ProcessingTime)。7.2.1事件時(shí)間事件時(shí)間是指事件實(shí)際發(fā)生的時(shí)間,這通常存儲(chǔ)在事件數(shù)據(jù)中。KafkaStreams使用事件時(shí)間來處理時(shí)間敏感的流操作,如窗口操作。7.2.2處理時(shí)間處理時(shí)間是指事件被處理的時(shí)間,即流處理系統(tǒng)接收到事件并開始處理的時(shí)間。處理時(shí)間通常用于系統(tǒng)內(nèi)部的定時(shí)任務(wù)。7.3流處理的并行性KafkaStreams支持并行處理,通過將流數(shù)據(jù)分割成多個(gè)分區(qū),每個(gè)分區(qū)可以在不同的線程或不同的機(jī)器上并行處理。這種并行性提高了處理速度和系統(tǒng)的可擴(kuò)展性。7.3.1示例代碼Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,4);//設(shè)置4個(gè)處理線程
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KTable<Windowed<String>,Long>counts=source
.mapValues(value->value.toLowerCase())
.groupBy((key,value)->value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count(Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("counts-store"));
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();7.4KafkaStreams與KafkaConnect的集成KafkaStreams和KafkaConnect都是Kafka生態(tài)中的重要組件,它們可以協(xié)同工作,KafkaStreams用于實(shí)時(shí)流處理,而KafkaConnect用于數(shù)據(jù)的導(dǎo)入和導(dǎo)出。7.4.1示例代碼使用KafkaConnect將數(shù)據(jù)從外部系統(tǒng)導(dǎo)入到Kafka,然后使用KafkaStreams進(jìn)行實(shí)時(shí)處理,最后將處理結(jié)果導(dǎo)出到另一個(gè)Kafka主題或外部系統(tǒng)。配置KafkaConnect{
"name":"my-source-connector",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix":"jdbc.",
"connection.url":"jdbc:mysql://localhost:3306/mydatabase",
"connection.user":"myuser",
"connection.password":"mypassword",
"table.whitelist":"mytable",
"mode":"incrementing",
"":"id",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false"
}
}使用KafkaStreams處理數(shù)據(jù)StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("jdbc.mytable");
KTable<Windowed<String>,Long>counts=source
.mapValues(value->value.toLowerCase())
.groupBy((key,value)->value)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年度商業(yè)企業(yè)購銷合同印花稅稅率調(diào)整與稅務(wù)風(fēng)險(xiǎn)防范協(xié)議
- 2025年度代付農(nóng)民工工資保障服務(wù)合同模板
- 2025年度公司法人掛名品牌授權(quán)合同
- 2025年度勞動(dòng)仲裁調(diào)解協(xié)議范文:智能制造領(lǐng)域員工糾紛處理指南
- 2025年惠州城市職業(yè)學(xué)院單招職業(yè)適應(yīng)性測(cè)試題庫附答案
- 2025年澳大利亞數(shù)字商務(wù)消費(fèi)者見解報(bào)告(英文版)-Wunderkind
- 2025年度宅基地永久轉(zhuǎn)讓與農(nóng)村旅游項(xiàng)目投資合同
- 2024大眾養(yǎng)老金融調(diào)研報(bào)告-太平洋保險(xiǎn)
- 2025年度家庭緊急救援服務(wù)家政合同范例雙方
- 2025年哈密職業(yè)技術(shù)學(xué)院單招職業(yè)適應(yīng)性測(cè)試題庫匯編
- 《中醫(yī)藥學(xué)概論》期末考試復(fù)習(xí)題庫(含答案)
- 2024年秋季新外研版三年級(jí)上冊(cè)英語課件 Unit 1 第1課時(shí)(Get ready)
- 單位委托員工辦理水表業(yè)務(wù)委托書
- 2024版《保密法》培訓(xùn)課件
- 2024年內(nèi)蒙古中考地理生物試卷(含答案)
- 廣東省汕尾市汕尾市2024年中考一模英語試題(含答案)
- 2024年江西電力職業(yè)技術(shù)學(xué)院單招職業(yè)適應(yīng)性測(cè)試題庫含答案
- 2024年邵陽職業(yè)技術(shù)學(xué)院單招職業(yè)適應(yīng)性測(cè)試題庫完美版
- 醫(yī)院dip付費(fèi)績效考核制度
- 支氣管肺泡灌洗技術(shù)
- 體育概論課外體育活動(dòng)
評(píng)論
0/150
提交評(píng)論