kafka相關(guān)工具基本概念及使用_第1頁(yè)
kafka相關(guān)工具基本概念及使用_第2頁(yè)
kafka相關(guān)工具基本概念及使用_第3頁(yè)
kafka相關(guān)工具基本概念及使用_第4頁(yè)
kafka相關(guān)工具基本概念及使用_第5頁(yè)
已閱讀5頁(yè),還剩42頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

本課件包括演示文稿、示例、代碼、題庫(kù)、視頻和聲音等內(nèi)容,北風(fēng)網(wǎng)和講師擁有完全知識(shí)產(chǎn)權(quán);只限于善意學(xué)習(xí)者在本課程使用,不得在課程范圍外向任何第三方散播。任何其他人或者機(jī)構(gòu)不得盜版、復(fù)制、仿造其中的創(chuàng)意和內(nèi)容,我們保留一切通過(guò)法律手段追究違反者的權(quán)利。課程詳情請(qǐng)咨詢微信公眾號(hào):北風(fēng)教育官方網(wǎng)址:/法律聲明消息訂閱框架KAFKA高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)主講人:Gerry上海育創(chuàng)網(wǎng)絡(luò)科技有限公司課上課下“九字”真言認(rèn)真聽(tīng),善摘錄,勤思考多溫故,樂(lè)實(shí)踐,再發(fā)散四不原則不懶散惰性,不遲到早退不請(qǐng)假曠課,不拖延作業(yè)一點(diǎn)注意事項(xiàng)違反“四不原則”,不包就業(yè)和推薦就業(yè)課程要求嚴(yán)格是大愛(ài)寄語(yǔ)Kafka初識(shí)Kafka功能架構(gòu)

Kafka重要概念

Kafka安裝部署與測(cè)試

Kafka

Producer講解

Kafka

Consumer講解

Kafka與Flume集成

Kafka與Log4j集成

Kafka集群監(jiān)控學(xué)習(xí)內(nèi)容與目標(biāo)It

lets

you

publish

and

subscribe

to

streams

of

records.

In

this

respect

itis

similar

to

a

message

queue

or

enterprise

messaging

systemKafka?

is

a

distributed,

partitioned,

replicated

commit

log

serviceKafka?

isusedfor

building

real-timedata

pipelines

apps.Features:Horizontally

Scalable:水平可擴(kuò)展(擴(kuò)展性)

Fault-tolerant:容錯(cuò)(容錯(cuò)性&可用性&可靠性)

Fast:快速Distributed:分布式What

is

Kafka??由于Kafka存在高容錯(cuò)、高擴(kuò)展、分布式等特性,Kafka主要應(yīng)用場(chǎng)景如下:消息系統(tǒng)日志收集系統(tǒng)

Metrics監(jiān)控系統(tǒng)Kafka適用場(chǎng)景Node2Zookeeper負(fù)責(zé)Kafka元數(shù)據(jù)管理以及Consumer相關(guān)數(shù)據(jù)管理Node1Producers1Producers3Producers2Topic1Topic2Topic3Consumers1Consumers2分區(qū)1分區(qū)1分區(qū)3分區(qū)2分區(qū)3分區(qū)1Broker2分區(qū)2Broker2分區(qū)1

分區(qū)2分區(qū)2分區(qū)3Broker1分區(qū)1分區(qū)2分區(qū)2分區(qū)2Broker1分區(qū)1分區(qū)3分區(qū)1分區(qū)3Message(消息):傳遞的數(shù)據(jù)對(duì)象,主要由四部分構(gòu)成:offset(偏移量)、key、value、timestamp(插入時(shí)間)Broker(代理者):Kafka集群中的機(jī)器/服務(wù)被成為broker,是一個(gè)物理概念。Topic(主題):維護(hù)Kafka上的消息類型被稱為T(mén)opic,是一個(gè)邏輯概念。Partition(分區(qū)):具體維護(hù)Kafka上的消息數(shù)據(jù)的最小單位,一個(gè)Topic可以包含多個(gè)分區(qū);Partition特性:ordered&immutable。(在數(shù)據(jù)的產(chǎn)生和消費(fèi)過(guò)程中,不需要關(guān)注數(shù)據(jù)具體存儲(chǔ)的Partition在那個(gè)Broker上,只需要指定Topic即可,由Kafka負(fù)責(zé)將數(shù)據(jù)和對(duì)應(yīng)的Partition關(guān)聯(lián)上)Producer(生產(chǎn)者):負(fù)責(zé)將數(shù)據(jù)發(fā)送到Kafka對(duì)應(yīng)Topic的進(jìn)程

Consumer(消費(fèi)者):負(fù)責(zé)從對(duì)應(yīng)Topic獲取數(shù)據(jù)的進(jìn)程ConsumerGroup(消費(fèi)者組):每個(gè)consumer都屬于一個(gè)特定的group組,一個(gè)group組可以包含多個(gè)consumer,但一個(gè)組中只會(huì)有一個(gè)consumer消費(fèi)數(shù)據(jù)。Kafka基本信息術(shù)語(yǔ)Kafka是由LinkedIn公司開(kāi)發(fā)的,之后貢獻(xiàn)給Apache基金會(huì),成為Apache的一個(gè)頂級(jí)項(xiàng)目,開(kāi)發(fā)語(yǔ)言為Scala。提供了各種不同語(yǔ)言的API,具體參考Kafka的cwiki頁(yè)面;安裝方式主要由三種,分別是:?jiǎn)螜C(jī)、偽分布式、完全分布式;其中偽分布式和完全分布式基本一樣安裝步驟:安裝JAVA和Scala安裝Zookeeper安裝KafkaKafka安裝介紹下載安裝包、解壓并配置環(huán)境變量KAFKA_HOME修改配置文件${KAFKA_HOME}/conf/perties。如果是偽分布式,那么需要在的單臺(tái)機(jī)器上copy多個(gè)perties文件;如果是完全分布式,那么需要將修改好的KAFKA完全copy到其他機(jī)器上啟動(dòng)Kafka服務(wù),啟動(dòng)命令如下(偽分布式):${KAFKA_HOME}/bin/kafka-server-start.sh

xxx/perties${KAFKA_HOME}/bin/kafka-server-start.sh

xxx/perties關(guān)閉服務(wù)使用${KAFKA_HOME}/bin/kafka-server-stop.sh進(jìn)行操作Kafka安裝(偽分布式)Kafka安裝配置項(xiàng)(一)Kafka安裝配置項(xiàng)(二)創(chuàng)建Topic列出Topic查看Topic信息修改Topic啟動(dòng)Kafka自帶Producer和Consumer進(jìn)行數(shù)據(jù)測(cè)試Kafka基本操作一個(gè)Kafka的Message由一個(gè)固定長(zhǎng)度的header和一個(gè)變長(zhǎng)的消息體body組成header部分由一個(gè)字節(jié)的magic(文件格式)和四個(gè)字節(jié)的CRC32(用于判斷body消息體是否正常)構(gòu)成。當(dāng)magic的值為1的時(shí)候,會(huì)在magic和crc32之間多一個(gè)字節(jié)的數(shù)據(jù):attributes(保存一些相關(guān)屬性,比如是否壓縮、壓縮格式等等);如果magic的值為0,那么不存在attributes屬性body是由N個(gè)字節(jié)構(gòu)成的一個(gè)消息體,包含了具體的key/value消息備注:每個(gè)版本的Kafka消息格式是不一樣的Kafka發(fā)送消息格式存儲(chǔ)在磁盤(pán)的日志采用不同于Producer發(fā)送的消息格式,每個(gè)日志文件都是一個(gè)

“l(fā)ogentries”序列,每一個(gè)logentry包含一個(gè)四字節(jié)整型數(shù)(message長(zhǎng)度,值為1+4+N),一個(gè)字節(jié)的magic,四個(gè)字節(jié)的CRC32值,最終是N個(gè)字節(jié)的消息數(shù)據(jù)。每條消息都有一個(gè)當(dāng)前Partition下唯一的64位offset,指定該消息的起始下標(biāo)位置,存儲(chǔ)消息格式如下:Kafka

Log消息格式(一)這個(gè)“l(fā)ogentries”并非由一個(gè)文件構(gòu)成,而是分成多個(gè)segmentfile(日志文件,存儲(chǔ)具體的消息記錄)和一個(gè)索引文件(存儲(chǔ)每個(gè)segment文件的offset偏移量范

圍)。結(jié)構(gòu)如右圖所示:Kafka

Log消息格式(二)一個(gè)Topic分為多個(gè)Partition來(lái)進(jìn)行數(shù)據(jù)管理,一個(gè)Partition中的數(shù)據(jù)是有序、不可變的,使用偏移量(offset)唯一標(biāo)識(shí)一條數(shù)據(jù),是一個(gè)long類型的數(shù)據(jù)Partition接收到producer發(fā)送過(guò)來(lái)數(shù)據(jù)后,會(huì)產(chǎn)生一個(gè)遞增的offset偏移量數(shù)據(jù),同時(shí)將數(shù)據(jù)保存到本地的磁盤(pán)文件中(文件內(nèi)容追加的方式寫(xiě)入數(shù)據(jù));Partition中的數(shù)據(jù)存活時(shí)間超過(guò)參數(shù)值(log.retention.{ms,minutes,hours},默認(rèn)7天)的時(shí)候進(jìn)行刪除(默認(rèn))Consumer根據(jù)offset消費(fèi)對(duì)應(yīng)Topic的Partition中的數(shù)據(jù)(也就是每個(gè)Consumer消費(fèi)的每個(gè)Topic的Partition都擁有自己的offset偏移量)注意:Kafka的數(shù)據(jù)消費(fèi)是順序讀寫(xiě)的,磁盤(pán)的順序讀寫(xiě)速度(600MB/sec)比隨機(jī)讀寫(xiě)速度(100k/sec)快Kafka消息存儲(chǔ)機(jī)制(一)Kafka消息存儲(chǔ)機(jī)制(二)一個(gè)Topic中的所有數(shù)據(jù)分布式的存儲(chǔ)在kafka集群的所有機(jī)器(broker)上,以分區(qū)(partition)的的形式進(jìn)行數(shù)據(jù)存儲(chǔ);每個(gè)分區(qū)允許存在備份數(shù)據(jù)/備份分區(qū)(存儲(chǔ)在同一kafka集群的其它broker上的分區(qū))每個(gè)數(shù)據(jù)分區(qū)在Kafka集群中存在一個(gè)broker節(jié)點(diǎn)上的分區(qū)叫做leader,存儲(chǔ)在其它broker上的備份分區(qū)叫做followers;只有l(wèi)eader節(jié)點(diǎn)負(fù)責(zé)該分區(qū)的數(shù)據(jù)讀寫(xiě)操作,followers節(jié)點(diǎn)作為leader節(jié)點(diǎn)的熱備節(jié)點(diǎn),從leader節(jié)點(diǎn)備份數(shù)據(jù);當(dāng)

leader節(jié)點(diǎn)掛掉的時(shí)候,followers節(jié)點(diǎn)中會(huì)有一個(gè)節(jié)點(diǎn)變成leader節(jié)點(diǎn),重新提供服務(wù)Kafka集群的Partition的leader和followers切換依賴ZookeeperKafka分布式機(jī)制Kafka集群中由producer負(fù)責(zé)數(shù)據(jù)的產(chǎn)生,并發(fā)送到對(duì)應(yīng)的Topic;Producer通過(guò)push的方式將數(shù)據(jù)發(fā)送到對(duì)應(yīng)Topic的分區(qū)Producer發(fā)送到Topic的數(shù)據(jù)是有key/value鍵值對(duì)組成的,Kafka根據(jù)key的不同的值決定數(shù)據(jù)發(fā)送到不同的Partition,默認(rèn)采用Hash的機(jī)制發(fā)送數(shù)據(jù)到對(duì)應(yīng)

Topic的不同Partition中,配置參數(shù)為{partitioner.class}Producer發(fā)送數(shù)據(jù)的方式分為sync(同步)和async(異步)兩種,默認(rèn)為同步方式,由參數(shù){producer.type}決定;當(dāng)發(fā)送模式為異步發(fā)送的時(shí)候,Producer提供重

試機(jī)制,默認(rèn)失敗重試發(fā)送3次Kafka消息產(chǎn)生/收集機(jī)制Kafka有兩種模式消費(fèi)數(shù)據(jù):隊(duì)列和發(fā)布訂閱;在隊(duì)列模式下,一條數(shù)據(jù)只會(huì)發(fā)送給customergroup中的一個(gè)customer進(jìn)行消費(fèi);在發(fā)布訂閱模式下,一條數(shù)據(jù)會(huì)發(fā)送給多個(gè)customer進(jìn)行消費(fèi)Kafka的Customer基于offset對(duì)kafka中的數(shù)據(jù)進(jìn)行消費(fèi),對(duì)于一個(gè)customergroup中的所有customer共享一個(gè)offset偏移量Kafka中通過(guò)控制Customer的參數(shù){group.id}來(lái)決定kafka是什么數(shù)據(jù)消費(fèi)模式,如果所有消費(fèi)者的該參數(shù)值是相同的,那么此時(shí)的kafka就是類似于隊(duì)列模式,

數(shù)據(jù)只會(huì)發(fā)送到一個(gè)customer,此時(shí)類似于負(fù)載均衡;否則就是發(fā)布訂閱模式Kafka消息消費(fèi)機(jī)制(一)Kafka的數(shù)據(jù)是按照分區(qū)進(jìn)行排序的(插入的順序),也就是每個(gè)分區(qū)中的數(shù)據(jù)是有序的。在Consumer進(jìn)行數(shù)據(jù)消費(fèi)的時(shí)候,也是對(duì)分區(qū)的數(shù)據(jù)進(jìn)行有序的消費(fèi)的,但是不保證所有數(shù)據(jù)的有序性(多個(gè)分區(qū)之間)Consumer

Rebalance:當(dāng)一個(gè)consumer

group組中的消費(fèi)者數(shù)量和對(duì)應(yīng)

Topic的分區(qū)數(shù)量一致的時(shí)候,此時(shí)一個(gè)Consumer消費(fèi)一個(gè)Partition的數(shù)據(jù);如果不一致,那么可能出現(xiàn)一個(gè)Consumer消費(fèi)多個(gè)Partition的數(shù)據(jù)或者不消費(fèi)數(shù)據(jù)的情況,這個(gè)機(jī)制是根據(jù)Consumer和Partition的數(shù)量動(dòng)態(tài)變化的Consumer通過(guò)poll的方式主動(dòng)從Kafka集群中獲取數(shù)據(jù)Kafka消息消費(fèi)機(jī)制(二)Kafka消息消費(fèi)機(jī)制(三)Kafka的Replication指的是Partition的復(fù)制,一個(gè)Partition的所有分區(qū)中只有一個(gè)分區(qū)是leader節(jié)點(diǎn),其它分區(qū)是follower節(jié)點(diǎn)。Replication對(duì)Kafka的吞吐率有一定的影響,但是極大的增強(qiáng)了可用性Follower節(jié)點(diǎn)會(huì)定時(shí)的從leader節(jié)點(diǎn)上獲取增量數(shù)據(jù),一個(gè)活躍的follower節(jié)點(diǎn)必須滿足一下兩個(gè)條件:所有的節(jié)點(diǎn)必須維護(hù)和zookeeper的連接(通過(guò)zk的heartbeat實(shí)現(xiàn))follower必須能夠及時(shí)的將leader上的writing復(fù)制過(guò)來(lái),不能“落后太多”;“落后太多”由參數(shù){replica.lag.time.max.ms}和{replica.lag.max.messages}決定KafkaReplicationKafka提供了一個(gè)in-syncreplicas(ISR)來(lái)確保Kafka的Leader選舉,ISR是一個(gè)保存分區(qū)node的集合,如果一個(gè)node宕機(jī)了或數(shù)據(jù)“落后太多”,leader會(huì)將該node節(jié)點(diǎn)從ISR中移除,只有ISR中的follower節(jié)點(diǎn)才有可能成為leader節(jié)點(diǎn)Leader節(jié)點(diǎn)的切換基于Zookeeper的Watcher機(jī)制,當(dāng)leader節(jié)點(diǎn)宕機(jī)的時(shí)候,其他ISR中的follower節(jié)點(diǎn)會(huì)競(jìng)爭(zhēng)的在zk中創(chuàng)建一個(gè)文件目錄(只會(huì)有一個(gè)

follower節(jié)點(diǎn)創(chuàng)建成功),創(chuàng)建成功的follower節(jié)點(diǎn)成為leader節(jié)點(diǎn)Kafka

Leader

ElectionMessageDeliverySemantics是消息系統(tǒng)中數(shù)據(jù)傳輸?shù)目煽啃员WC的一個(gè)定義,主要分為三種類型:At

most

once(最多一次):消息可能會(huì)丟失,但是不可能重復(fù)發(fā)送

At

least

once(最少一次):消息不可能丟失,但是可能重復(fù)發(fā)送

Exactly

once(僅僅一次):消息只發(fā)送一次,但不存在消息的丟失Kafka的Producer通過(guò)參數(shù){request.required.acks}來(lái)定義確定Producer和Broker之間是那種消息傳遞類型Kafka的數(shù)據(jù)是分區(qū)存儲(chǔ)的,每個(gè)分區(qū)中的數(shù)據(jù)是按照進(jìn)入kafka的時(shí)間進(jìn)行排序的,這樣不需要為每條數(shù)據(jù)存儲(chǔ)一個(gè)元數(shù)據(jù)(是否消費(fèi)),只需要為每個(gè)Consumer記錄一個(gè)對(duì)應(yīng)分區(qū)數(shù)據(jù)消費(fèi)的最高標(biāo)記位,Kafka中叫做“偏移量”(offset)Message

Delivery

Semantics消息集(messageset)二進(jìn)制傳輸

順序讀取磁盤(pán)

“零”拷貝端到端數(shù)據(jù)壓縮Why

Kafkais

Fast?消息集(message

set):Producer可以將多條消息一次發(fā)送給Kafka集群,

Kafka可以一次將所有的數(shù)據(jù)追加到文件中,減少磁盤(pán)零碎的磁盤(pán)IO;同時(shí)

consumer也可以一次性的請(qǐng)求一個(gè)數(shù)據(jù)集的數(shù)據(jù)二進(jìn)制傳輸:同時(shí)消息在傳遞過(guò)程中是基于二進(jìn)制進(jìn)行傳遞的,不需要進(jìn)行反序列化,在高負(fù)載的情況下,對(duì)性能是有一定的提升的順序讀寫(xiě)磁盤(pán):Kafka的所有數(shù)據(jù)操作都是基于文件操作的,而操作文件的方式都是順序讀寫(xiě),而順序讀寫(xiě)磁盤(pán)的速度會(huì)比隨機(jī)讀寫(xiě)快6000倍左右Why

Kafkais

Fast?“零”拷貝:在Kafka服務(wù)中,數(shù)據(jù)發(fā)送到consumer的過(guò)程中采用的是“零拷貝”,比普通的讀寫(xiě)文件方式減少了兩次操作,速度能夠提高50%端到端數(shù)據(jù)壓縮:Producer可以需要發(fā)送的數(shù)據(jù)/數(shù)據(jù)集進(jìn)行壓縮后發(fā)送到

Kafka集群,Kafka集群直接將數(shù)據(jù)保存到文件,然后Consumer消費(fèi)數(shù)據(jù)的時(shí)候,將壓縮后的數(shù)據(jù)獲取到,進(jìn)行解壓縮操作。在性能瓶頸是網(wǎng)絡(luò)帶寬的情況下,非

常有效。默認(rèn)情況下,kafka支持gzip和snappy壓縮Why

Kafkais

Fast?“零”拷貝Kafka分別提供了基于Java和Scala的API,由于Kafka不僅僅只是在大數(shù)據(jù)中使用到,所以Kafka的JavaAPI應(yīng)用的比較多?;贛aven進(jìn)行Kafka的開(kāi)發(fā),KafkaMaven依賴如下:Kafka

API<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>${kafka.version}</version></dependency>Kafka

Producer涉及到的配置信息(一)參數(shù)名稱默認(rèn)參數(shù)值備注metadata.broker.list指定kafka服務(wù)器監(jiān)聽(tīng)的主機(jī)名和端口號(hào)列表,不同服務(wù)器之間使用“,”進(jìn)行分割request.required.acks0指定producer需要等待broker返回?cái)?shù)據(jù)成功接收標(biāo)識(shí);

0表示不等待,1表示等待一個(gè)broker返回結(jié)果,-1表示等待所有broker返回結(jié)果request.timeout.ms10000當(dāng)acks參數(shù)配置的時(shí)候,指定producer等待連接過(guò)期的時(shí)間毫米數(shù)producer.typesync指定producer發(fā)送數(shù)據(jù)的方式是異步(async)還是同步(sync)serializer.classkafka.serializer.DefaultEncoder指定producer發(fā)送數(shù)據(jù)的時(shí)候數(shù)據(jù)/消息編碼器,即將消息轉(zhuǎn)換為byte數(shù)組的編碼器key.serializer.class指定producer發(fā)送數(shù)據(jù)的時(shí)候key類型的數(shù)據(jù)編碼器,默認(rèn)使用${serializer.class}給定的值partitioner.classducer.DefaultPartitioner指定producer發(fā)送數(shù)據(jù)的數(shù)據(jù)分區(qū)器,默認(rèn)采用hash進(jìn)行數(shù)據(jù)分區(qū)操作;該參數(shù)的主要功能是:決定數(shù)據(jù)到底發(fā)送到那一個(gè)分區(qū)中Kafka

Producer涉及到的配置信息(二)參數(shù)名稱默認(rèn)參數(shù)值備注compression.codecnone給定發(fā)送數(shù)據(jù)是否進(jìn)行壓縮設(shè)置,默認(rèn)不進(jìn)行壓縮;參數(shù)可選:none、gzip、snappymessage.send.max.retries3指定數(shù)據(jù)發(fā)送失敗,重試次數(shù),默認(rèn)3次retry.backoff.ms100在數(shù)據(jù)重新發(fā)送過(guò)程中,producer會(huì)刷新topic的元數(shù)據(jù)信息(leader信息),由于topic元數(shù)據(jù)的變化需要一點(diǎn)點(diǎn)時(shí)間,故該參數(shù)指定的值主要用于在producer刷新元數(shù)據(jù)之前的等待時(shí)間erval.ms600000給定producer中topic元數(shù)據(jù)周期性刷新的間隔時(shí)間,默認(rèn)10分鐘;當(dāng)該參數(shù)給定的值為負(fù)數(shù)的時(shí)候,topic元數(shù)據(jù)的刷新只有在發(fā)送數(shù)據(jù)失敗后進(jìn)行刷新;當(dāng)該參數(shù)給定為0的時(shí)候,每次發(fā)送數(shù)據(jù)后都進(jìn)行元數(shù)據(jù)刷新(不推薦);注意:元數(shù)據(jù)的刷新是在發(fā)送數(shù)據(jù)后觸發(fā)的,如果永遠(yuǎn)不發(fā)送數(shù)據(jù),那么元數(shù)據(jù)不會(huì)被刷新queue.buffering.max.ms5000當(dāng)數(shù)據(jù)傳輸方式是async(異步)的時(shí)候,指定數(shù)據(jù)在producer端停留的最長(zhǎng)時(shí)間,該參數(shù)對(duì)于數(shù)據(jù)吞吐量有一定的影響,當(dāng)時(shí)會(huì)增加數(shù)據(jù)的延遲性queue.buffering.max.messages10000當(dāng)數(shù)據(jù)傳輸方式為async(異步)的時(shí)候,指定producer端最多允許臨時(shí)保存的最大數(shù)據(jù)量,當(dāng)數(shù)據(jù)量超過(guò)該值的時(shí)候,發(fā)送一次數(shù)據(jù)Kafka

Producer涉及到的配置信息(三)參數(shù)名稱默認(rèn)參數(shù)值備注queue.enqueue.timeout.ms-1當(dāng)數(shù)據(jù)發(fā)送方式為async(異步),而且等待隊(duì)列數(shù)據(jù)填充滿的時(shí)候{queue.buffering.max.messages},一條新的數(shù)據(jù)過(guò)來(lái),最大阻塞時(shí)間;設(shè)置為0表示,不阻塞,當(dāng)隊(duì)列滿的時(shí)候,直接將新數(shù)據(jù)刪除(不發(fā)送);當(dāng)設(shè)置為正數(shù)的時(shí)候,表示等待給定毫秒數(shù)后,進(jìn)行重試操作,失敗則數(shù)據(jù)刪除(不發(fā)送);設(shè)置為-1表示一直等待,直到隊(duì)列允許添加數(shù)據(jù)batch.num.messages200當(dāng)數(shù)據(jù)發(fā)送方式為async(異步)的時(shí)候,producer一個(gè)批次發(fā)送的數(shù)據(jù)條數(shù);當(dāng)producer中的數(shù)據(jù)量達(dá)到該參數(shù)${batch.num.messages}的設(shè)置值或者數(shù)據(jù)停留時(shí)間超過(guò)參數(shù)${queue.buffering.max.ms}的時(shí)候,觸發(fā)producer發(fā)送數(shù)據(jù)的動(dòng)作(實(shí)際發(fā)送數(shù)據(jù)量可能不超過(guò)該參數(shù)值)send.buffer.bytes102400指定producer端數(shù)據(jù)緩存區(qū)大小,默認(rèn)值為:10KBKafka

Producer開(kāi)發(fā)參考頁(yè)面:

/082/documentation.html#producerapi

/081/documentation.html#producerconfigs

/081/documentation.html#apidesign

/081/documentation.html#producerapiKafka的Producer

API主要提供下列三個(gè)方法:public

void

send(KeyedMessage<K,V>

message)發(fā)送單條數(shù)據(jù)到Kafka集群public

void

send(List<KeyedMessage<K,V>>

messages)發(fā)送多條數(shù)據(jù)(數(shù)據(jù)集)到Kafka集群public

voidclose()關(guān)閉Kafka連接資源案例:使用Java語(yǔ)言實(shí)現(xiàn)一個(gè)Kafka

Producer程序并測(cè)試Kafka

Producer

APIKafka

Consumer涉及到的配置信息(一)參數(shù)名稱默認(rèn)參數(shù)值備注group.idConsumer的groupid值,如果多個(gè)Consumer的groupid的值一樣,那么表示這多個(gè)Consumer屬于同一個(gè)group組zookeeper.connectKafka元數(shù)據(jù)Zookeeper存儲(chǔ)的url,和配置文件中的參數(shù)一樣consumer.id消費(fèi)者id字符串,如果不給定的話,默認(rèn)自動(dòng)產(chǎn)生一個(gè)隨機(jī)idsocket.timeout.ms30000Consumer連接超時(shí)時(shí)間,實(shí)際超時(shí)時(shí)間是socket.timeout.ms+

max.fetch.waitsocket.receive.buffer.bytes65536接收數(shù)據(jù)的緩沖區(qū)大小,默認(rèn)64kbfetch.message.max.bytes1048576指定每個(gè)分區(qū)每次獲取數(shù)據(jù)的最大字節(jié)數(shù),一般該參數(shù)要求比

message允許的最大字節(jié)數(shù)要大,否則可能出現(xiàn)producer產(chǎn)生的數(shù)據(jù)consumer沒(méi)法消費(fèi)num.consumer.fetchers1Consumer獲取數(shù)據(jù)的線程數(shù)量mit.enabletrue是否自動(dòng)提交offset偏移量,默認(rèn)為true(自動(dòng)提交)erval.ms60000自動(dòng)提交offset偏移量的間隔時(shí)間Kafka

Consumer涉及到的配置信息(二)參數(shù)名稱默認(rèn)參數(shù)值備注rebalance.max.retries4當(dāng)一個(gè)新的Consumer添加到Consumer

Group的時(shí)候,會(huì)觸發(fā)數(shù)據(jù)消費(fèi)的

rebalance操作;rebalance操作可能會(huì)失敗,該參數(shù)的主要作用是設(shè)置

rebalance的最大重試次數(shù)fetch.min.bytes1一個(gè)請(qǐng)求最少返回記錄大小,當(dāng)一個(gè)請(qǐng)求中的返回?cái)?shù)據(jù)大小達(dá)到該參數(shù)的設(shè)置值后,記錄數(shù)據(jù)返回到consumer中fetch.wait.max.ms100一個(gè)請(qǐng)求等待數(shù)據(jù)返回的最大停留時(shí)間rebalance.backoff.ms2000rebalance重試過(guò)程中的間隔時(shí)間auto.offset.resetlargest指定consumer消費(fèi)kafka數(shù)據(jù)的時(shí)候offset初始值是啥,可選參數(shù):largest和smallest;smallest指該consumer的消費(fèi)offset是當(dāng)前kafka數(shù)據(jù)中的最小偏移量;largest指該consumer的消費(fèi)offset是當(dāng)前kafka數(shù)據(jù)中的最大偏移量consumer.timeout.ms-1給定當(dāng)consumer多久時(shí)間沒(méi)有消費(fèi)數(shù)據(jù)后,拋出異常;-1表示不拋出異常zookeeper.session.timeout.ms6000zk會(huì)話時(shí)間zookeeper.connection.timeout.ms6000連接zk過(guò)期時(shí)間Kafka提供了兩種Consumer

API,分別是:High

Level

Consumer

API

和Lower

Level

Consumer

API(Simple

Consumer

API)High

Level

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論