




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
實(shí)時(shí)計(jì)算:KafkaStreams:KafkaStreams數(shù)據(jù)流處理模型技術(shù)教程1實(shí)時(shí)計(jì)算:KafkaStreams:KafkaStreams數(shù)據(jù)流處理模型1.1簡(jiǎn)介1.1.1KafkaStreams概述KafkaStreams是一個(gè)用于構(gòu)建實(shí)時(shí)流數(shù)據(jù)微服務(wù)的客戶端庫,它允許開發(fā)者在ApacheKafka中處理數(shù)據(jù)流,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)分析和處理。KafkaStreams提供了強(qiáng)大的流處理能力,包括數(shù)據(jù)的轉(zhuǎn)換、聚合、連接等操作,同時(shí)保證了數(shù)據(jù)處理的高吞吐量、低延遲和容錯(cuò)性。1.1.2實(shí)時(shí)計(jì)算的重要性實(shí)時(shí)計(jì)算在現(xiàn)代數(shù)據(jù)處理中扮演著至關(guān)重要的角色,尤其是在需要即時(shí)響應(yīng)和決策的場(chǎng)景中,如金融交易、網(wǎng)絡(luò)安全、物聯(lián)網(wǎng)數(shù)據(jù)分析等。實(shí)時(shí)計(jì)算能夠快速處理和分析數(shù)據(jù)流,提供即時(shí)的洞察和反饋,這對(duì)于提高業(yè)務(wù)效率和響應(yīng)速度至關(guān)重要。1.1.3KafkaStreams與Kafka的區(qū)別KafkaStreams是基于ApacheKafka構(gòu)建的流處理框架,而Kafka本身是一個(gè)分布式流處理平臺(tái),主要用于數(shù)據(jù)的發(fā)布和訂閱。KafkaStreams提供了更高級(jí)的流處理API,使得開發(fā)者能夠直接在Kafka中進(jìn)行數(shù)據(jù)處理,而無需將數(shù)據(jù)導(dǎo)出到其他處理系統(tǒng)。1.2KafkaStreams的核心概念1.2.1Streams在KafkaStreams中,數(shù)據(jù)流(Streams)是處理數(shù)據(jù)的基本單位。數(shù)據(jù)流可以是Kafka中的一個(gè)或多個(gè)主題,也可以是內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)。KafkaStreams提供了豐富的API來操作數(shù)據(jù)流,包括map、filter、reduce等。1.2.2StateStores狀態(tài)存儲(chǔ)(StateStores)是KafkaStreams中用于保存中間狀態(tài)的數(shù)據(jù)結(jié)構(gòu)。這些狀態(tài)存儲(chǔ)可以是持久化的,保存在Kafka的主題中,也可以是易失的,保存在本地內(nèi)存中。狀態(tài)存儲(chǔ)使得KafkaStreams能夠進(jìn)行復(fù)雜的數(shù)據(jù)處理,如窗口操作和連接操作。1.2.3Topologies拓?fù)洌═opologies)是KafkaStreams中描述數(shù)據(jù)流處理邏輯的圖形表示。它定義了數(shù)據(jù)流的處理流程,包括數(shù)據(jù)的輸入、處理和輸出。開發(fā)者可以通過定義拓?fù)鋪順?gòu)建復(fù)雜的流處理應(yīng)用。1.3KafkaStreams的應(yīng)用場(chǎng)景1.3.1實(shí)時(shí)數(shù)據(jù)分析KafkaStreams可以用于實(shí)時(shí)數(shù)據(jù)分析,如實(shí)時(shí)監(jiān)控和警報(bào)系統(tǒng)。例如,一個(gè)網(wǎng)絡(luò)安全系統(tǒng)可以使用KafkaStreams實(shí)時(shí)分析網(wǎng)絡(luò)流量數(shù)據(jù),檢測(cè)異常行為并立即發(fā)出警報(bào)。1.3.2數(shù)據(jù)集成KafkaStreams可以作為數(shù)據(jù)集成的工具,用于連接不同的數(shù)據(jù)源和數(shù)據(jù)目標(biāo)。例如,一個(gè)企業(yè)可能需要將多個(gè)系統(tǒng)的數(shù)據(jù)實(shí)時(shí)整合到一個(gè)數(shù)據(jù)倉庫中,KafkaStreams可以實(shí)現(xiàn)這一需求。1.3.3微服務(wù)構(gòu)建KafkaStreams可以用于構(gòu)建微服務(wù),每個(gè)微服務(wù)可以處理特定的數(shù)據(jù)流。例如,一個(gè)電商系統(tǒng)可能有多個(gè)微服務(wù),分別處理訂單流、庫存流和支付流,KafkaStreams可以作為這些微服務(wù)的數(shù)據(jù)處理引擎。1.3.4示例代碼:實(shí)時(shí)數(shù)據(jù)分析importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassRealTimeDataAnalysis{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"real-time-data-analysis");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>rawClickStream=builder.stream("click-stream-topic");
KStream<String,String>filteredClickStream=rawClickStream
.filter((key,value)->value.contains("purchase"));
filteredClickStream.to("purchase-events-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}1.3.5示例描述上述代碼示例展示了如何使用KafkaStreams進(jìn)行實(shí)時(shí)數(shù)據(jù)分析。在這個(gè)例子中,我們從一個(gè)名為click-stream-topic的主題中讀取數(shù)據(jù)流,然后過濾出包含“purchase”關(guān)鍵字的事件,最后將這些事件寫入到另一個(gè)名為purchase-events-topic的主題中。這個(gè)例子可以用于實(shí)時(shí)監(jiān)控和分析用戶的購買行為,以便于即時(shí)響應(yīng)和決策。1.4結(jié)論KafkaStreams提供了一個(gè)強(qiáng)大且靈活的平臺(tái),用于構(gòu)建實(shí)時(shí)流數(shù)據(jù)處理應(yīng)用。通過理解其核心概念和應(yīng)用場(chǎng)景,開發(fā)者可以充分利用KafkaStreams的能力,實(shí)現(xiàn)高效的數(shù)據(jù)處理和分析。2KafkaStreams基礎(chǔ)2.1KafkaStreams的安裝與配置在開始使用KafkaStreams之前,首先需要確保Kafka和KafkaStreams正確安裝并配置在你的系統(tǒng)中。以下步驟將指導(dǎo)你完成這一過程:2.1.1安裝Kafka下載Kafka:訪問ApacheKafka的官方網(wǎng)站下載最新版本的Kafka包。通常,下載的是一個(gè)壓縮文件,例如kafka_2.12-2.8.1.tgz。解壓縮Kafka包:tar-xzfkafka_2.12-2.8.1.tgz配置Kafka:進(jìn)入解壓后的Kafka目錄,編輯config/perties文件,設(shè)置必要的參數(shù),如broker.id和listeners。2.1.2安裝KafkaStreamsKafkaStreams是Kafka的一部分,因此在安裝Kafka后,你已經(jīng)擁有了KafkaStreams。但是,為了在Java項(xiàng)目中使用KafkaStreams,你需要在你的pom.xml或build.gradle文件中添加依賴。Maven依賴<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.8.1</version>
</dependency>Gradle依賴implementation'org.apache.kafka:kafka-streams:2.8.1'2.1.3配置KafkaStreams在Java項(xiàng)目中,你需要?jiǎng)?chuàng)建一個(gè)StreamsConfig對(duì)象來配置KafkaStreams應(yīng)用。以下是一個(gè)配置示例:importorg.apache.kafka.streams.StreamsConfig;
importjava.util.Properties;
publicclassKafkaStreamsConfig{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,mon.serialization.Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,mon.serialization.Serdes.String().getClass().getName());
StreamsConfigconfig=newStreamsConfig(props);
}
}2.2KafkaStreams的開發(fā)環(huán)境搭建搭建KafkaStreams的開發(fā)環(huán)境,主要涉及設(shè)置Java開發(fā)環(huán)境和集成KafkaStreams庫。2.2.1設(shè)置Java開發(fā)環(huán)境確保你的系統(tǒng)中安裝了JavaDevelopmentKit(JDK)。你可以通過運(yùn)行以下命令來檢查JDK的版本:java-version2.2.2集成KafkaStreams庫在你的IDE中(如IntelliJIDEA或Eclipse),創(chuàng)建一個(gè)新的Java項(xiàng)目,并按照上述Maven或Gradle依賴添加KafkaStreams庫。2.3KafkaStreams的基本操作KafkaStreams提供了處理流數(shù)據(jù)的強(qiáng)大功能,包括讀取、轉(zhuǎn)換和寫入數(shù)據(jù)流。以下是一個(gè)簡(jiǎn)單的KafkaStreams應(yīng)用示例,該應(yīng)用從一個(gè)主題讀取數(shù)據(jù),轉(zhuǎn)換數(shù)據(jù),并將結(jié)果寫入另一個(gè)主題。importmon.serialization.Serdes;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importorg.apache.kafka.streams.kstream.KStreamBuilder;
importcessor.Topology;
importjava.util.Properties;
publicclassSimpleKafkaStreamsApp{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"simple-stream-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KStream<String,String>transformed=source.mapValues(value->value.toUpperCase());
transformed.to("output-topic");
Topologytopology=builder.build();
KafkaStreamsstreams=newKafkaStreams(topology,props);
streams.start();
}
}2.3.1解釋配置:創(chuàng)建一個(gè)StreamsConfig對(duì)象,設(shè)置應(yīng)用ID、Kafka服務(wù)器地址以及默認(rèn)的序列化和反序列化類。構(gòu)建流處理拓?fù)?使用StreamsBuilder來定義流處理邏輯。在這個(gè)例子中,我們從input-topic讀取數(shù)據(jù),將數(shù)據(jù)值轉(zhuǎn)換為大寫,然后將結(jié)果寫入output-topic。啟動(dòng)流應(yīng)用:創(chuàng)建一個(gè)KafkaStreams對(duì)象,并使用構(gòu)建的拓?fù)浜团渲脝?dòng)應(yīng)用。2.4KafkaStreams的API介紹KafkaStreams提供了豐富的API來處理流數(shù)據(jù),包括KStream和KTableAPI。KStream用于處理無界數(shù)據(jù)流,而KTable用于表示有界數(shù)據(jù)集。2.4.1KStreamAPIKStreamAPI允許你執(zhí)行以下操作:讀取數(shù)據(jù)流:使用stream方法從一個(gè)或多個(gè)主題讀取數(shù)據(jù)。轉(zhuǎn)換數(shù)據(jù)流:使用mapValues、flatMap等方法轉(zhuǎn)換數(shù)據(jù)流中的數(shù)據(jù)。連接數(shù)據(jù)流:使用join方法將兩個(gè)數(shù)據(jù)流連接起來,基于鍵進(jìn)行操作。2.4.2KTableAPIKTableAPI提供了以下功能:讀取數(shù)據(jù)集:使用table方法從一個(gè)主題讀取數(shù)據(jù),并將其轉(zhuǎn)換為一個(gè)鍵值對(duì)表。更新數(shù)據(jù)集:使用transformValues方法更新表中的值。連接數(shù)據(jù)集:使用join方法將兩個(gè)表連接起來,基于鍵進(jìn)行操作。2.4.3示例:使用KTableAPI以下是一個(gè)使用KTableAPI的示例,該應(yīng)用從一個(gè)主題讀取數(shù)據(jù),更新數(shù)據(jù),并將結(jié)果寫入另一個(gè)主題。importmon.serialization.Serdes;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KTable;
importorg.apache.kafka.streams.kstream.Materialized;
importjava.util.Properties;
publicclassSimpleKTableApp{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"simple-table-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
StreamsBuilderbuilder=newStreamsBuilder();
KTable<String,String>source=builder.table("input-topic");
KTable<String,String>updated=source.transformValues((key,value,out)->out.send(key,value.toUpperCase()),"upper-case-processor");
updated.toStream().to("output-topic",Produced.with(Serdes.String(),Serdes.String()));
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}2.4.4解釋配置:創(chuàng)建一個(gè)StreamsConfig對(duì)象,設(shè)置應(yīng)用ID、Kafka服務(wù)器地址以及默認(rèn)的序列化和反序列化類。構(gòu)建表處理拓?fù)?使用StreamsBuilder來定義表處理邏輯。在這個(gè)例子中,我們從input-topic讀取數(shù)據(jù),將數(shù)據(jù)值轉(zhuǎn)換為大寫,然后將結(jié)果寫入output-topic。啟動(dòng)流應(yīng)用:創(chuàng)建一個(gè)KafkaStreams對(duì)象,并使用構(gòu)建的拓?fù)浜团渲脝?dòng)應(yīng)用。通過以上步驟和示例,你已經(jīng)了解了如何安裝和配置KafkaStreams,搭建開發(fā)環(huán)境,以及使用基本的KafkaStreamsAPI來處理流數(shù)據(jù)和數(shù)據(jù)集。這將為你在實(shí)時(shí)計(jì)算領(lǐng)域使用KafkaStreams打下堅(jiān)實(shí)的基礎(chǔ)。3實(shí)時(shí)計(jì)算:KafkaStreams:KafkaStreams數(shù)據(jù)流處理模型3.1數(shù)據(jù)流處理的步驟在KafkaStreams中,數(shù)據(jù)流處理主要遵循以下步驟:讀取數(shù)據(jù):從Kafka主題中讀取數(shù)據(jù)流。處理數(shù)據(jù):對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)換、過濾、聚合等操作。寫入結(jié)果:將處理后的數(shù)據(jù)寫入新的Kafka主題或外部系統(tǒng)。3.1.1示例:使用KStream進(jìn)行數(shù)據(jù)流處理importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassWordCountApplication{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>textLines=builder.stream("input-topic");
KStream<String,Long>wordCounts=textLines
.flatMapValues(value->Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key,word)->word)
.count(Materialized.as("counts-store"));
wordCounts.to("output-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}此示例展示了如何使用KStream從input-topic主題讀取數(shù)據(jù),進(jìn)行單詞計(jì)數(shù)處理,并將結(jié)果寫入output-topic主題。3.2KStream和KTable的概念KafkaStreams提供了兩種主要的數(shù)據(jù)抽象:KStream和KTable。KStream:表示無界的數(shù)據(jù)流,可以進(jìn)行各種流處理操作,如map、filter、join等。KTable:表示有界的數(shù)據(jù)集,通常用于狀態(tài)存儲(chǔ)和查詢,可以進(jìn)行聚合操作,如count、reduce等。3.2.1示例:KTable的使用importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KTable;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassUserActivityAggregator{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"user-activity-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KTable<String,Long>userActivityCounts=builder.table("user-activity-topic")
.groupBy((key,value)->value.split(":")[0])
.count(Materialized.as("user-activity-store"));
userActivityCounts.toStream().to("user-activity-counts-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}此示例展示了如何使用KTable從user-activity-topic主題讀取用戶活動(dòng)數(shù)據(jù),進(jìn)行用戶活動(dòng)計(jì)數(shù),并將結(jié)果寫入user-activity-counts-topic主題。3.3窗口和會(huì)話窗口的使用KafkaStreams支持窗口操作,包括時(shí)間窗口和會(huì)話窗口,用于處理有時(shí)間限制的數(shù)據(jù)流。時(shí)間窗口:基于時(shí)間戳進(jìn)行分組,可以設(shè)置固定窗口或滑動(dòng)窗口。會(huì)話窗口:基于事件之間的間隔進(jìn)行分組,適用于需要根據(jù)事件間隔進(jìn)行聚合的場(chǎng)景。3.3.1示例:使用時(shí)間窗口進(jìn)行聚合importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importorg.apache.kafka.streams.kstream.TimeWindows;
importjava.util.Properties;
publicclassTimeWindowAggregator{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"time-window-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>purchaseEvents=builder.stream("purchase-events-topic");
KStream<String,Long>purchaseCounts=purchaseEvents
.groupBy((key,value)->value.split(":")[0])
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count(Materialized.as("purchase-counts-store"));
purchaseCounts.to("purchase-counts-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}此示例展示了如何使用時(shí)間窗口從purchase-events-topic主題讀取購買事件數(shù)據(jù),進(jìn)行每5分鐘的購買次數(shù)計(jì)數(shù),并將結(jié)果寫入purchase-counts-topic主題。3.4狀態(tài)存儲(chǔ)和查詢KafkaStreams允許在處理數(shù)據(jù)流時(shí)使用狀態(tài)存儲(chǔ),這有助于實(shí)現(xiàn)復(fù)雜的數(shù)據(jù)流處理邏輯,如累積聚合、狀態(tài)查詢等。3.4.1示例:使用狀態(tài)存儲(chǔ)進(jìn)行累積聚合importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importorg.apache.kafka.streams.kstream.Materialized;
importorg.apache.kafka.streams.kstream.Aggregator;
importjava.util.Properties;
publicclassCumulativeAggregator{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"cumulative-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,Long>purchaseEvents=builder.stream("purchase-events-topic");
KStream<String,Long>cumulativeSums=purchaseEvents
.groupBy((key,value)->key)
.aggregate(
()->0L,
(key,value,aggregate)->aggregate+value,
Materialized.as("cumulative-sums-store")
);
cumulativeSums.to("cumulative-sums-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}此示例展示了如何使用狀態(tài)存儲(chǔ)從purchase-events-topic主題讀取購買事件數(shù)據(jù),進(jìn)行累積求和操作,并將結(jié)果寫入cumulative-sums-topic主題。3.4.2狀態(tài)查詢狀態(tài)存儲(chǔ)不僅用于內(nèi)部處理,還可以通過KafkaStreams的查詢API對(duì)外提供狀態(tài)查詢服務(wù)。importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importorg.apache.kafka.streams.kstream.Materialized;
importorg.apache.kafka.streams.state.QueryableStoreTypes;
importorg.apache.kafka.streams.state.ReadOnlyWindowStore;
importjava.util.Properties;
publicclassStateQueryApplication{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"state-query");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,Long>purchaseEvents=builder.stream("purchase-events-topic");
KStream<String,Long>purchaseCounts=purchaseEvents
.groupBy((key,value)->key)
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count(Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("purchase-counts-store")
.withValueSerde(Serdes.Long())
.withQueryableStore("purchase-counts-query",QueryableStoreTypes.<String,Long>windowStore()));
purchaseCounts.to("purchase-counts-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
//狀態(tài)查詢
ReadOnlyWindowStore<String,Long>store=streams.store("purchase-counts-query",QueryableStoreTypes.windowStore());
Stringkey="user123";
longwindowStart=1597036800000L;//2020-08-09T00:00:00Z
longwindowEnd=1597040400000L;//2020-08-09T01:00:00Z
Longcount=store.fetch(key,windowStart,windowEnd).iterator().next().value();
System.out.println("User"+key+"made"+count+"purchasesinthelasthour.");
}
}此示例展示了如何使用狀態(tài)存儲(chǔ)從purchase-events-topic主題讀取購買事件數(shù)據(jù),進(jìn)行每5分鐘的購買次數(shù)計(jì)數(shù),并提供狀態(tài)查詢服務(wù),查詢user123在上一個(gè)小時(shí)的購買次數(shù)。通過以上示例,我們可以看到KafkaStreams提供了強(qiáng)大的數(shù)據(jù)流處理能力,包括讀取數(shù)據(jù)流、使用KStream和KTable進(jìn)行數(shù)據(jù)處理、使用窗口操作進(jìn)行時(shí)間敏感的聚合,以及利用狀態(tài)存儲(chǔ)進(jìn)行累積聚合和狀態(tài)查詢。這些功能使得KafkaStreams成為構(gòu)建實(shí)時(shí)數(shù)據(jù)處理和分析應(yīng)用的理想選擇。4KafkaStreams的高級(jí)特性4.1流處理的故障恢復(fù)4.1.1原理KafkaStreams提供了一套強(qiáng)大的故障恢復(fù)機(jī)制,確保在處理過程中遇到故障時(shí),數(shù)據(jù)處理的正確性和一致性。這一機(jī)制主要依賴于Kafka的持久化存儲(chǔ)和狀態(tài)存儲(chǔ)特性,以及流處理應(yīng)用程序的checkpoint和rebalance功能。CheckpointCheckpoint是KafkaStreams用于記錄流處理狀態(tài)點(diǎn)的一種機(jī)制。當(dāng)應(yīng)用程序運(yùn)行時(shí),它會(huì)定期將當(dāng)前的處理狀態(tài)保存到Kafka的狀態(tài)存儲(chǔ)中。這樣,在發(fā)生故障后,應(yīng)用程序可以從最近的checkpoint恢復(fù),繼續(xù)處理數(shù)據(jù),而不會(huì)丟失處理狀態(tài)或重復(fù)處理數(shù)據(jù)。RebalanceRebalance機(jī)制用于處理流處理應(yīng)用程序中的任務(wù)重新分配。當(dāng)集群中的節(jié)點(diǎn)發(fā)生故障或添加新節(jié)點(diǎn)時(shí),KafkaStreams會(huì)重新分配任務(wù),確保數(shù)據(jù)處理的負(fù)載均衡。在rebalance過程中,應(yīng)用程序會(huì)自動(dòng)從checkpoint恢復(fù),繼續(xù)處理數(shù)據(jù)。4.1.2示例代碼importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassFaultToleranceExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"fault-tolerance-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KStream<String,String>processed=source
.mapValues(value->value.toUpperCase())
.to("output-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}在上述代碼中,我們創(chuàng)建了一個(gè)簡(jiǎn)單的流處理應(yīng)用程序,它從input-topic讀取數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)換為大寫,然后寫入output-topic。KafkaStreams會(huì)自動(dòng)處理checkpoint和rebalance,確保應(yīng)用程序的高可用性和數(shù)據(jù)的正確處理。4.2性能調(diào)優(yōu)技巧4.2.1原理KafkaStreams的性能調(diào)優(yōu)主要涉及以下幾個(gè)方面:數(shù)據(jù)分區(qū)、狀態(tài)存儲(chǔ)優(yōu)化、并行處理、資源管理和網(wǎng)絡(luò)優(yōu)化。通過調(diào)整這些參數(shù)和策略,可以顯著提高流處理應(yīng)用程序的吞吐量和響應(yīng)時(shí)間。數(shù)據(jù)分區(qū)合理的數(shù)據(jù)分區(qū)策略可以提高數(shù)據(jù)處理的并行度,減少數(shù)據(jù)的跨節(jié)點(diǎn)傳輸,從而提高處理速度。KafkaStreams支持自定義分區(qū)器,可以根據(jù)業(yè)務(wù)需求優(yōu)化數(shù)據(jù)的分布。狀態(tài)存儲(chǔ)優(yōu)化狀態(tài)存儲(chǔ)是KafkaStreams中的關(guān)鍵組件,用于存儲(chǔ)流處理過程中的中間狀態(tài)。優(yōu)化狀態(tài)存儲(chǔ)的使用,如選擇合適的狀態(tài)存儲(chǔ)類型(如in-memory、rocksdb等),可以顯著提高處理性能。并行處理KafkaStreams支持并行處理,通過增加應(yīng)用程序的并行實(shí)例數(shù),可以提高數(shù)據(jù)處理的吞吐量。但是,過多的并行實(shí)例也會(huì)增加資源消耗和管理復(fù)雜度,因此需要根據(jù)實(shí)際需求和資源情況調(diào)整并行度。資源管理合理配置應(yīng)用程序的資源使用,如CPU、內(nèi)存和磁盤I/O,可以避免資源瓶頸,提高處理性能。KafkaStreams提供了資源管理的API,可以根據(jù)業(yè)務(wù)需求動(dòng)態(tài)調(diào)整資源分配。網(wǎng)絡(luò)優(yōu)化網(wǎng)絡(luò)延遲和帶寬是影響流處理性能的重要因素。通過優(yōu)化網(wǎng)絡(luò)配置,如減少數(shù)據(jù)的序列化和反序列化,使用壓縮等,可以減少網(wǎng)絡(luò)延遲,提高數(shù)據(jù)傳輸效率。4.2.2示例代碼importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importmon.serialization.Serdes;
importjava.util.Properties;
publicclassPerformanceTuningExample{
publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"performance-tuning-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,4);//設(shè)置并行處理線程數(shù)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,1000);//設(shè)置checkpoint的間隔
StreamsBuilderbuilder=newStreamsBuilder();
KStream<String,String>source=builder.stream("input-topic");
KStream<String,String>processed=source
.mapValues(value->value.toUpperCase())
.to("output-topic");
KafkaStreamsstreams=newKafkaStreams(builder.build(),props);
streams.start();
}
}在上述代碼中,我們通過設(shè)置NUM_STREAM_THREADS_CONFIG和COMMIT_INTERVAL_MS_CONFIG參數(shù),調(diào)整了應(yīng)用程序的并行處理線程數(shù)和checkpoint的間隔,以優(yōu)化性能。4.3KafkaStreams的監(jiān)控與日志4.3.1原理KafkaStreams提供了豐富的監(jiān)控和日志功能,用于監(jiān)控流處理應(yīng)用程序的運(yùn)行狀態(tài)和性能。這些功能包括但不限于:應(yīng)用程序的運(yùn)行狀態(tài)、處理延遲、吞吐量、錯(cuò)誤率等。通過監(jiān)控和日志,可以及時(shí)發(fā)現(xiàn)和解決問題,確保應(yīng)用程序的穩(wěn)定運(yùn)行。4.3.2監(jiān)控指標(biāo)app-id:應(yīng)用程序的ID。task-id:任務(wù)的ID。store-name:狀態(tài)存儲(chǔ)的名稱。record-count:處理的記錄數(shù)。record-lag:記錄的延遲。record-throughput:記錄的吞吐量。store-read-latency:狀態(tài)存儲(chǔ)的讀取延遲。store-write-latency:狀態(tài)存儲(chǔ)的寫入延遲。4.3.3日志配置KafkaStreams的日志配置主要通過perties或logback.xml文件進(jìn)行??梢耘渲萌罩镜募?jí)別、輸出格式、輸出目的地等,以滿足不同的日志需求。4.4與KafkaConnect的集成4.4.1原理KafkaConnect是一個(gè)用于高效、可靠地將數(shù)據(jù)導(dǎo)入或?qū)С鯧afka的工具。KafkaStreams可以與KafkaConnect集成,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)處理和批量處理的結(jié)合。通過KafkaConnect,可以將外部數(shù)據(jù)源的數(shù)據(jù)導(dǎo)入Kafka,然后使用KafkaStreams進(jìn)行實(shí)時(shí)處理,最后將處理結(jié)果導(dǎo)出到外部數(shù)據(jù)源。4.4.2示例代碼importorg.apache.kafka.connect.source.SourceConnector;
importorg.apache.kafka.connect.source.SourceRecord;
importorg.apache.kafka.connect.source.SourceTask;
importorg.apache.kafka.connect.data.Schema;
importorg.apache.kafka.connect.data.SchemaBuilder;
importorg.apache.kafka.connect.data.Struct;
importorg.apache.kafka.connect.source.SourceConnector;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Map;
publicclassMySourceConnectorextendsSourceConnector{
@Override
publicStringversion(){
return"1.0";
}
@Override
publicvoidstart(Map<String,String>props){
//初始化連接器
}
@Override
publicClass<?extendsSourceTask>taskClass(){
returnMySourceTask.class;
}
@Override
publicList<Map<String,String>>taskConfigs(intmaxTasks){
List<Map<String,String>>configs=newArrayList<>();
for(inti=0;i<maxTasks;i++){
Map<String,String>config=newHashMap<>();
config.put("topic","my-topic");
configs.add(config);
}
returnconfigs;
}
@Override
publicvoidstop(){
//停止連接器
}
}在上述代碼中,我們定義了一個(gè)自定義的KafkaConnect源連接器MySourceConnector,它可以從外部數(shù)據(jù)源讀取數(shù)據(jù),然后寫入Kafka的my-topic主題。然后,我們可以使用KafkaStreams從my-topic讀取數(shù)據(jù),進(jìn)行實(shí)時(shí)處理。4.4.3集成流程定義和實(shí)現(xiàn)自定義的KafkaConnect源連接器或目標(biāo)連接器。使用KafkaConnect將數(shù)據(jù)導(dǎo)入或?qū)С鯧afka。使用KafkaStreams從Kafka讀取數(shù)據(jù),進(jìn)行實(shí)時(shí)處理。將處理結(jié)果寫回Kafka,或使用KafkaConnect將結(jié)果導(dǎo)出到外部數(shù)據(jù)源。通過上述流程,可以實(shí)現(xiàn)KafkaStreams與KafkaConnect的無縫集成,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)處理和批量處理的結(jié)合。5實(shí)戰(zhàn)案例分析5.1KafkaStreams在金融交易中的使用5.1.1實(shí)時(shí)數(shù)據(jù)分析案例在金融交易領(lǐng)域,實(shí)時(shí)數(shù)據(jù)處理至關(guān)重要。KafkaStreams提供了一種高效、可擴(kuò)展的方式來處理這些數(shù)據(jù),使得金融機(jī)構(gòu)能夠?qū)崟r(shí)地監(jiān)控市場(chǎng)動(dòng)態(tài)、檢測(cè)異常交易、執(zhí)行風(fēng)險(xiǎn)控制策略等。下面,我們將通過一個(gè)具體的案例來展示KafkaStreams在金融交易中的應(yīng)用。案例背景假設(shè)我們有一個(gè)高頻交易系統(tǒng),需要實(shí)時(shí)處理股票交易數(shù)據(jù),以檢測(cè)潛在的市場(chǎng)操縱行為。數(shù)據(jù)源包括股票交易記錄、市場(chǎng)報(bào)價(jià)和交易者信息。我們的目標(biāo)是實(shí)時(shí)分析這些數(shù)據(jù),識(shí)別出可能的市場(chǎng)操縱行為,如“洗售”(washtrading)。數(shù)據(jù)模型交易記錄:包含交易ID、股票代碼、交易價(jià)格、交易量、交易時(shí)間、交易者ID等字段。市場(chǎng)報(bào)價(jià):包含股票代碼、報(bào)價(jià)時(shí)間、買入價(jià)、賣出價(jià)等字段。交易者信息:包含交易者ID、交易者類型(個(gè)人/機(jī)構(gòu))、交易者信用等級(jí)等字段。實(shí)現(xiàn)步驟數(shù)據(jù)攝取:使用Kafka作為數(shù)據(jù)攝取和存儲(chǔ)的平臺(tái),將交易記錄、市場(chǎng)報(bào)價(jià)和交易者信息分別發(fā)送到不同的Kafka主題。數(shù)據(jù)處理:使用KafkaStreamsAPI來處理這些數(shù)據(jù)。首先,我們需要將交易記錄和市場(chǎng)報(bào)價(jià)進(jìn)行連接,以獲取每個(gè)交易的實(shí)時(shí)市場(chǎng)情況。然后,我們將交易者信息加入到處理流程中,以評(píng)估交易者的信用等級(jí)。異常檢測(cè):通過定義規(guī)則來檢測(cè)異常交易行為。例如,如果一個(gè)交易者的交易量突然增加,且交易價(jià)格接近市場(chǎng)報(bào)價(jià)的中間值,這可能是一種市場(chǎng)操縱行為。結(jié)果輸出:將檢測(cè)到的異常交易行為輸出到另一個(gè)Kafka主題,供后續(xù)的風(fēng)險(xiǎn)控制和合規(guī)檢查使用。代碼示例importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importorg.apache.kafka.streams.kstream.KTable;
importorg.apache.kafka.streams.kstream.Materialized;
importmon.serialization.Serdes;
publicclassFinancialTransactionProcessor{
publicstaticvoidmain(String[]args){
StreamsConfigconfig=newStreamsConfig(loadProps());
StreamsBuilderbuilder=newStreamsBuilder();
//讀取交易記錄主題
KStream<String,String>transactionStream=builder.stream("transactions");
//讀取市場(chǎng)報(bào)價(jià)主題
KTable<String,String>marketPriceTable=builder.table("market-prices");
//連接交易記錄和市場(chǎng)報(bào)價(jià)
KStream<String,String>joinedStream=transactionStream.leftJoin(
marketPriceTable,
(transaction,marketPrice)->{
//這里可以進(jìn)行數(shù)據(jù)處理,例如計(jì)算交易價(jià)格與市場(chǎng)報(bào)價(jià)的差值
returntransaction+""+marketPrice;
},
Materialized.as("join-state-store")
);
//進(jìn)一步處理,例如檢測(cè)異常交易
KStream<String,String>anomalyDetectionStream=joinedStream.filter(
(key,value)->{
//這里可以定義異常檢測(cè)的邏輯,例如檢查交易價(jià)格是否接近市場(chǎng)報(bào)價(jià)的中間值
returnisAnomaly(value);
}
);
//輸出結(jié)果到Kafka主題
anomalyDetectionStream.to("anomaly-detections");
KafkaStreamsstreams=newKafkaStreams(builder.build(),config);
streams.start();
}
privatestaticbooleanisAnomaly(StringjoinedData){
//假設(shè)這里有一個(gè)邏輯來判斷交易是否異常
returnfalse;
}
privatestaticPropertiesloadProps(){
Propertiesprops=newProperties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"financial-transaction-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
returnprops;
}
}5.1.2流處理在電商領(lǐng)域的應(yīng)用構(gòu)建實(shí)時(shí)推薦系統(tǒng)在電商領(lǐng)域,實(shí)時(shí)推薦系統(tǒng)能夠根據(jù)用戶的瀏覽和購買行為,實(shí)時(shí)推薦相關(guān)商品,從而提高用戶滿意度和轉(zhuǎn)化率。KafkaStreams提供了一種構(gòu)建實(shí)時(shí)推薦系統(tǒng)的強(qiáng)大工具,能夠處理大量用戶行為數(shù)據(jù),生成個(gè)性化推薦。數(shù)據(jù)模型用戶行為數(shù)據(jù):包括用戶ID、行為類型(瀏覽/購買)、商品ID、行為時(shí)間等字段。商品信息:包括商品ID、商品類別、商品描述等字段。實(shí)現(xiàn)步驟數(shù)據(jù)攝?。簩⒂脩粜袨閿?shù)據(jù)和商品信息發(fā)送到Kafka主題。數(shù)據(jù)處理:使用KafkaStreamsAPI處理用戶行為數(shù)據(jù),通過聚合和連接操作,生成用戶興趣模型。推薦生成:基于用戶興趣模型,從商品信息中選擇相關(guān)商品,生成推薦列表。結(jié)果輸出:將推薦列表輸出到Kafka主題,供前端應(yīng)用使用。代碼示例importorg.apache.kafka.streams.KafkaStreams;
importorg.apache.kafka.streams.StreamsBuilder;
importorg.apache.kafka.streams.StreamsConfig;
importorg.apache.kafka.streams.kstream.KStream;
importorg.apache.kafka.streams.kstream.KTable;
importorg.apache.kafka.streams.kstream.Materialized;
importmon.serialization.Serdes;
publicclassEcommerceRecommendationSystem{
publicstaticvoidmain(String[]args){
StreamsConfigconfig=newStreamsConfig(loadProps());
StreamsBuilderbuilder=newStreamsBuilder();
//讀取用戶行為數(shù)據(jù)主題
KStream<String,String>userBehaviorStream=builder.stream("user-behaviors");
//讀取商品信息主題
KTable<String,String>productInfoTable=builder.table("product-info");
//連接用戶行為數(shù)據(jù)和商品信息
KStream<String,String>joinedStream=userBehaviorStream.leftJoin(
productInfoTable,
(behavior,productInfo)->{
//這里可以進(jìn)行數(shù)據(jù)處理,例如生成用戶興趣模型
returnbehavior+""+productInfo;
},
Materialized.as("join-state-store")
);
//進(jìn)一步處理,例如生成推薦列表
KStream<String,String>recommendationStream=joinedStream.filter(
(key,value)->{
//這里可以定義推薦邏輯,例如檢查商品是否與用戶興趣匹配
returnisRelevant(value);
}
);
//輸出結(jié)果到Kafka主題
recommendationStream.to("recommendations");
KafkaStreamsstreams=newKafkaStreams(builder.build(),config);
streams.start();
}
privatestaticbooleanisRelevant(StringjoinedData){
//假設(shè)這里有一個(gè)邏輯來判斷商品是否與用戶興趣匹配
re
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年鏈家房屋買賣定金支付及退還標(biāo)準(zhǔn)協(xié)議
- 二零二五年度住房租賃補(bǔ)貼擔(dān)保服務(wù)合同
- 二零二五年度蘇州市教育機(jī)構(gòu)用工企業(yè)勞動(dòng)合同書
- 二零二五年度云計(jì)算資源合作共享合同
- 2025年度電子商務(wù)平臺(tái)招防范合同法律風(fēng)險(xiǎn)合作協(xié)議
- 2025年度涂料班組涂料行業(yè)市場(chǎng)分析咨詢合同
- 二零二五年度特色日租房短租體驗(yàn)協(xié)議書
- 二零二五年度貸款居間代理及金融科技創(chuàng)新應(yīng)用合同
- 2025年度高端合同事務(wù)律師服務(wù)合同
- 2025年度智慧交通項(xiàng)目提前終止合同及交通設(shè)施移交協(xié)議
- 司機(jī)安全駕駛培訓(xùn)課件
- 硬化性肺泡細(xì)胞瘤-課件
- 簡(jiǎn)明新疆地方史趙陽
- 狹窄性腱鞘炎中醫(yī)臨床路徑及表單
- Q∕SY 19001-2017 風(fēng)險(xiǎn)分類分級(jí)規(guī)范
- 智慧消防綜合解決方案
- 市場(chǎng)營銷組合策略及營銷戰(zhàn)略課件
- 信息技術(shù)基礎(chǔ)ppt課件(完整版)
- DGJ 08-70-2021 建筑物、構(gòu)筑物拆除技術(shù)標(biāo)準(zhǔn)
- 2022年義務(wù)教育語文課程標(biāo)準(zhǔn)(2022版)解讀【新課標(biāo)背景下的初中名著閱讀教學(xué)質(zhì)量提升思考】
- 屋面網(wǎng)架結(jié)構(gòu)液壓提升施工方案(50頁)
評(píng)論
0/150
提交評(píng)論