版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
1、 課程安排課程安排Kafka是什么kafka體系結(jié)構(gòu)kafka設(shè)計(jì)理念簡介*kafka通信協(xié)議kafka的偽分布安裝、集群安裝*kafka的shell操作、java操作*kafka設(shè)計(jì)理念*kafka producer和consumer開發(fā)*Kafka產(chǎn)生背景產(chǎn)生背景 Kafka 是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由 LinkedIn 公司開發(fā),使用 Scala語言編寫,之后成為 Apache 項(xiàng)目的一部分。Kafka 是一個(gè)分布式的,可劃分的,多訂閱者,冗余備份的持久性的日志服務(wù)。它主要用于處理活躍的流式數(shù)據(jù)。 在大數(shù)據(jù)系統(tǒng)中,常常會碰到一個(gè)問題,整個(gè)大數(shù)據(jù)是由各個(gè)子系統(tǒng)組成,數(shù)據(jù)需要在各個(gè)
2、子系統(tǒng)中高性能,低延遲的不停流轉(zhuǎn)。傳統(tǒng)的企業(yè)消息系統(tǒng)并 不是非常適合大規(guī)模的數(shù)據(jù)處理。為了已在同時(shí)搞定在線應(yīng)用(消息)和離線應(yīng)用(數(shù)據(jù)文件,日志)Kafka 就出現(xiàn)了。Kafka 可以起到兩個(gè)作用:降低系統(tǒng)組網(wǎng)復(fù)雜度降低編程復(fù)雜度,各個(gè)子系統(tǒng)不在是相互協(xié)商接口,各個(gè)子系統(tǒng)類似插口插在插座上,Kafka 承擔(dān)高速數(shù)據(jù)總線的作用。kafka系列文章索引:http:/ 每秒可以生產(chǎn)約 25 萬消息(50 MB),每秒處理 55 萬消息(110 MB)??蛇M(jìn)行持久化操作。將消息持久化到磁盤,因此可用于批量消費(fèi),例如 ETL,以及實(shí)時(shí)應(yīng)用程序。通過將數(shù)據(jù)持久化到硬盤以及 replication 防止數(shù)據(jù)
3、丟失。分布式系統(tǒng),易于向外擴(kuò)展。所有的 producer、broker 和 consumer 都會有多個(gè),均為分布式的。無需停機(jī)即可擴(kuò)展機(jī)器。消息被處理的狀態(tài)是在 consumer 端維護(hù),而不是由 server 端維護(hù)。當(dāng)失敗時(shí)能自動(dòng)平衡。支持 online 和 offline 的場景。Kafka的簡介的簡介設(shè)計(jì)關(guān)注重點(diǎn):為生產(chǎn)者和消費(fèi)者提供一個(gè)通用的API消息的持久化高吞吐量,可以滿足百萬級別消息處理對分布式和高擴(kuò)展性的支持kafka最基本的架構(gòu)是生產(chǎn)者發(fā)布一個(gè)消息到Kafka的一個(gè)主題(topic),這個(gè)主題即是由扮演KafkaServer角色的broker提供,消費(fèi)者訂閱這個(gè)主題,然后
4、從中獲取消息.Kafka是如何解決查找效率的的問題呢?Kafka的兩大法寶的兩大法寶數(shù)據(jù)文件的分段:Kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段;為數(shù)據(jù)文件建索引:索引優(yōu)化:索引優(yōu)化:稀疏存儲,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。消息隊(duì)列分類消息隊(duì)列分類點(diǎn)對點(diǎn)點(diǎn)對點(diǎn):消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。注意:消息被消費(fèi)以后,queue中不再有存儲,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消費(fèi)者,但是對一個(gè)消息而言,只會有一個(gè)消費(fèi)者可以消費(fèi)。發(fā)布發(fā)布/訂閱訂閱:消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)
5、者(訂閱)消費(fèi)該消息。和點(diǎn)對點(diǎn)方式不同,發(fā)布到topic的消息會被所有訂閱者消費(fèi)。消息隊(duì)列消息隊(duì)列MQ對比對比RabbitMQ:支持的協(xié)議多,非常重量級消息隊(duì)列,對路由(Routing),負(fù)載均衡(Load balance)或者數(shù)據(jù)持久化都有很好的支持。ZeroMQ:號稱最快的消息隊(duì)列系統(tǒng),尤其針對大吞吐量的需求場景,擅長的高級/復(fù)雜的隊(duì)列,但是技術(shù)也復(fù)雜,并且只提供非持久性的隊(duì)列。ActiveMQ:Apache下的一個(gè)子項(xiàng),類似ZeroMQ,能夠以代理人和點(diǎn)對點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。Redis:是一個(gè)key-Value的NOSql數(shù)據(jù)庫,但也支持MQ功能,數(shù)據(jù)量較小,性能優(yōu)于RabbitMQ,數(shù)據(jù)
6、超過10K就慢的無法忍受Kafka部署架構(gòu)部署架構(gòu)Kafka集群架構(gòu)集群架構(gòu)Kafka的基本概念的基本概念 Topic:特指 Kafka 處理的消息源(feeds of messages)的不同分類。 Partition:Topic 物理上的分組,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。partition 中的每條消息都會被分配一個(gè)有序的 id(offset)。 Message:消息,是通信的基本單位,每個(gè) producer 可以向一個(gè) topic(主題)發(fā)布一些消息。 Producers:消息和數(shù)據(jù)生產(chǎn)者,向 Kafka 的一個(gè) top
7、ic 發(fā)布消息的過程叫做 producers。 Consumers:消息和數(shù)據(jù)消費(fèi)者,訂閱 topics 并處理其發(fā)布的消息的過程叫做 consumers。 Broker:緩存代理,Kafka 集群中的一臺或多臺服務(wù)器統(tǒng)稱為 broker。Kafka的的ProducersProducer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息歸屬于哪個(gè)partition;比如基于round-robin方式或者通過其他的一些算法等.消息和數(shù)據(jù)生產(chǎn)者,向 Kafka 的一個(gè) topic 發(fā)布消息的過程叫做 producers。異步發(fā)送批量發(fā)送可以很有效的提高發(fā)送效率。Kafka pro
8、ducer的異步發(fā)送模式允許進(jìn)行批量發(fā)送,先將消息緩存在內(nèi)存中,然后一次請求批量發(fā)送出去。Kafka的的BrokerBroker:緩存代理,Kafka 集群中的一臺或多臺服務(wù)器統(tǒng)稱為 broker。為了減少磁盤寫入的次數(shù),broker會將消息暫時(shí)buffer起來,當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤,這樣減少了磁盤IO調(diào)用的次數(shù)。Kafka的的broker無狀態(tài)機(jī)制無狀態(tài)機(jī)制1. Broker沒有副本機(jī)制,一旦broker宕機(jī),該broker的消息將都不可用。2. Broker不保存訂閱者的狀態(tài),由訂閱者自己保存。3. 無狀態(tài)導(dǎo)致消息的刪除成為難題(可能刪除的消息正在被訂閱
9、),kafka采用基于時(shí)間的SLA(服務(wù)水平保證),消息保存一定時(shí)間(通常為7天)后會被刪除。4. 消息訂閱者可以rewind back到任意位置重新進(jìn)行消費(fèi),當(dāng)訂閱者故障時(shí),可以選擇最小的offset(id)進(jìn)行重新讀取消費(fèi)消息。Kafka的的Consumers消息和數(shù)據(jù)消費(fèi)者,訂閱 topics 并處理其發(fā)布的消息的過程叫做 consumers。本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過來說,每個(gè)group中可以有多個(gè)consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi).在 kaf
10、ka中,我們 可以認(rèn)為一個(gè)group是一個(gè)訂閱者,一個(gè)Topic中的每個(gè)partions,只會被一個(gè)訂閱者中的一個(gè)consumer消費(fèi),不過一個(gè) consumer可以消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí),消息是順 序的.事實(shí)上,從Topic角度來說,消息仍不是有序的.注: kafka的設(shè)計(jì)原理決定,對于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無法得到消息.Kafka的的Consumer group1. 允許consumer gr
11、oup(包含多個(gè)consumer,如一個(gè)集群同時(shí)消費(fèi))對一個(gè)topic進(jìn)行消費(fèi),不同的consumer group之間獨(dú)立訂閱。2. 為了對減小一個(gè)consumer group中不同consumer之間的分布式協(xié)調(diào)開銷,指定partition為最小的并行消費(fèi)單位,即一個(gè)group內(nèi)的consumer只能消費(fèi)不同的partition。Kafka的的Topics/Log一個(gè)Topic可以認(rèn)為是一類消息,每個(gè)topic將被分成多partition(區(qū)),每個(gè)partition在存儲層面是append log文件。任何發(fā)布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的
12、位置稱為offset(偏移量),partition是以文件的形式存儲在文件系統(tǒng)中。Logs文件根據(jù)broker中的配置要求,保留一定時(shí)間后刪除來釋放磁盤空間。Partition:Topic 物理上的分組,一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列。 partition 中的每條消息都會被分配一個(gè)有序的 id(offset)。Kafka的的partitions 設(shè)計(jì)目的設(shè)計(jì)目的:kafka基于文件存儲.通過分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來避免文件尺寸達(dá)到單機(jī)磁盤的上限,每個(gè)partiton都會被當(dāng)前server(kafka實(shí)例)
13、保存;可以將一個(gè)topic切分多任意多個(gè)partitions,來消息保存/消費(fèi)的效率.越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.Kafka的的MessageMessage消息:是通信的基本單位,每個(gè) producer 可以向一個(gè) topic(主題)發(fā)布一些消息。Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨(dú)立的。每個(gè)topic又可以分成幾個(gè)不同的partition(每個(gè)topic有幾個(gè)partition是在創(chuàng)建topic時(shí)指定的),每個(gè)partition存儲一部分Message。partition中的每條M
14、essage包含了以下三個(gè)屬性:offset對應(yīng)類型:longMessageSize對應(yīng)類型:int32data是message的具體內(nèi)容Kafka的的MessageKafka的的offset每條消息在文件中的位置稱為offset(偏移量)。offset 為一個(gè)long型數(shù)字,它是唯一標(biāo)記一條消息。它唯一的標(biāo)記一條消息。kafka并沒有提供其他額外的索引機(jī)制來存儲offset,因?yàn)樵趉afka中幾 乎不允許對消息進(jìn)行“隨機(jī)讀寫”。Partition中的每條Message由offset來表示它在這個(gè)partition中的偏移量,這個(gè)offset不是該Message在partition數(shù)據(jù)文件中的
15、實(shí)際存儲位置,而是邏輯上一個(gè)值,它唯一確定了partition中的一條Message。因此,可以認(rèn)為offset是partition中Message的id。Kafka的的 offset怎樣記錄每個(gè)consumer處理的信息的狀態(tài)?在Kafka中僅保存了每個(gè)consumer已經(jīng)處理數(shù)據(jù)的offset。這樣有兩個(gè)好處:1)保 存的數(shù)據(jù)量少 2)當(dāng)consumer出錯(cuò)時(shí),重新啟動(dòng)consumer處理數(shù)據(jù)時(shí),只需從最近的offset開始處理數(shù)據(jù)即可。Kafka的消息處理機(jī)制的消息處理機(jī)制 1. 發(fā)送到partitions中的消息將會按照它接收的順序追加到日志中 2. 對于消費(fèi)者而言,它們消費(fèi)消息的順序
16、和日志中消息順序一致. 3. 如果Topic的replication factor為N,那么允許N-1個(gè)kafka實(shí)例失效.Kafka的消息處理機(jī)制的消息處理機(jī)制4. kafka對消息的重復(fù)、丟失、錯(cuò)誤以及順序型沒有嚴(yán)格的要求。5. kafka提供at-least-once delivery,即當(dāng)consumer宕機(jī)后,有些消息可能會被重復(fù)delivery。6. 因每個(gè)partition只會被consumergroup內(nèi)的一個(gè)consumer消費(fèi),故kafka保證每個(gè)partition內(nèi)的消息會被順序的訂閱。7. Kafka為每條消息為每條消息計(jì)算CRC校驗(yàn),用于錯(cuò)誤檢測,crc校驗(yàn)不通過的消
17、息會直接被丟棄掉。ack校驗(yàn),當(dāng)消費(fèi)者消費(fèi)成功,返回ack信息!數(shù)據(jù)傳輸?shù)氖聞?wù)定義數(shù)據(jù)傳輸?shù)氖聞?wù)定義at most once: 最多一次,這個(gè)和JMS中非持久化消息類似.發(fā)送一次,無論成敗,將不會重發(fā).at least once: 消息至少發(fā)送一次,如果消息未能接受成功,可能會重發(fā),直到接收成功.exactly once: 消息只會發(fā)送一次.at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理.那么此后未處理的消息將不能被fetch到,這就是at most once.a
18、t least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來再次fetch時(shí)可能獲得上次已經(jīng)處理過的消息,這就是at least once,原因offset沒有及時(shí)的提交給zookeeper,zookeeper恢復(fù)正常還是之前offset狀態(tài).exactly once: kafka中并沒有嚴(yán)格的去實(shí)現(xiàn)(基于2階段提交,事務(wù)),我們認(rèn)為這種策略在kafka中是沒有必要的.注:通常情況下at-least-once是我們首選.(相比at most once而言,重復(fù)
19、接收數(shù)據(jù)總比丟失數(shù)據(jù)要好).Kafka的儲存策略的儲存策略1. kafka以topic來進(jìn)行消息管理,每個(gè)topic包含多個(gè)part(ition),每個(gè)part對應(yīng)一個(gè)邏輯log,有多個(gè)segment組成。2. 每個(gè)segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。3.broker 收到發(fā)布消息往對應(yīng) partition 的最后一個(gè) segment 上添加該消息,Kafka的儲存策略的儲存策略4. 每個(gè)part在內(nèi)存中對應(yīng)一個(gè)index,記錄每個(gè)segment中的第一條消息偏移。5. 發(fā)布者發(fā)到某個(gè)topic的
20、消息會被均勻的分布到多個(gè)part上(隨機(jī)或根據(jù)用戶指定的回調(diào)函數(shù)進(jìn)行分布),broker收到發(fā)布消息往對應(yīng)part的最后一個(gè)segment上添加 該消息,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過閾值時(shí),segment上的消息會被flush到磁盤,只有flush到磁盤上的 消息訂閱者才能訂閱到,segment達(dá)到一定的大小后將不會再往該segment寫數(shù)據(jù),broker會創(chuàng)建新的segment。Kafka的的數(shù)據(jù)傳輸數(shù)據(jù)傳輸1. 發(fā)布者每次可發(fā)布多條消息(將消息加到一個(gè)消息集合中發(fā)布), sub每次迭代一條消息。2. 不創(chuàng)建單獨(dú)的cache,使用系統(tǒng)的page cache。發(fā)
21、布者順序發(fā)布,訂閱者通常比發(fā)布者滯后一點(diǎn)點(diǎn),直接使用linux的page cache效果也比較后,同時(shí)減少了cache管理及垃圾收集的開銷。3. 使用sendfile優(yōu)化網(wǎng)絡(luò)傳輸,減少一次內(nèi)存拷貝。Kafka的消息發(fā)送的流程的消息發(fā)送的流程由于 kafka broker 會持久化數(shù)據(jù),broker 沒有內(nèi)存壓力,因此,consumer 非常適合采取 pull 的方式消費(fèi)數(shù)據(jù)Producer 向Kafka(push)推數(shù)據(jù)consumer 從kafka 拉(pull)數(shù)據(jù)。kafka的消息發(fā)送的流程的消息發(fā)送的流程消息處理的優(yōu)勢消息處理的優(yōu)勢:簡化 kafka 設(shè)計(jì)consumer 根據(jù)消費(fèi)能力
22、自主控制消息拉取速度consumer 根據(jù)自身情況自主選擇消費(fèi)模式,例如批量,重復(fù)消費(fèi),從尾端開始消費(fèi)等kafka 集群接收到 Producer 發(fā)過來的消息后,將其持久化到硬盤,并保留消息指定時(shí)長(可配置),而不關(guān)注消息是否被消費(fèi)。Kafka設(shè)計(jì)原理實(shí)現(xiàn)設(shè)計(jì)原理實(shí)現(xiàn) 直接使用 linux 文件系統(tǒng)的 cache,來高效緩存數(shù)據(jù)。 顯式分布式,即所有的 producer、broker 和 consumer 都會有多個(gè),均為分布式的。Producer 和 broker 之間沒有負(fù)載均衡機(jī)制。broker 和 consumer 之間利用 zookeeper 進(jìn)行負(fù)載均衡。所有 broker 和 c
23、onsumer 都會在 zookeeper 中進(jìn)行注冊,且 zookeeper 會保存他們的一些元數(shù)據(jù)信息。如果某個(gè) broker 和 consumer 發(fā)生了變化,所有其他的 broker 和 consumer 都會得到通知。Kafka設(shè)計(jì)原理實(shí)現(xiàn)設(shè)計(jì)原理實(shí)現(xiàn) kafka 以 topic 來進(jìn)行消息管理,發(fā)布者發(fā)到某個(gè) topic 的消息會被均勻的分布到多個(gè) partition上每個(gè) topic 包含多個(gè) partition,每個(gè) part 對應(yīng)一個(gè)邏輯 log,有多個(gè) segment 組成。每個(gè) segment 中存儲多條消息,消息 id 由其邏輯位置決定,即從消息 id 可直接定位到消息
24、的存儲位置,避免 id 到位置的額外映射。每個(gè) part 在內(nèi)存中對應(yīng)一個(gè) index,記錄每個(gè) segment 中的第一條消息偏移。當(dāng)某個(gè) segment 上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過閾值時(shí),segment 上的消息會被 flush 到磁盤,只有 flush 到磁盤上的消息訂閱者才能訂閱到,segment 達(dá)到一定的大小后將不會再往該 segment 寫數(shù)據(jù),broker 會創(chuàng)建新的 segment。Kafka的通訊協(xié)議的通訊協(xié)議Kafka的Producer、Broker和Consumer之間采用的是一套自行設(shè)計(jì)基于TCP層的協(xié)議,根據(jù)業(yè)務(wù)需求定制,而非實(shí)現(xiàn)一套類似Protoco
25、l Buffer的通用協(xié)議?;緮?shù)據(jù)類型:基本數(shù)據(jù)類型:定長數(shù)據(jù)類型:定長數(shù)據(jù)類型:int8,int16,int32和int64,對應(yīng)到Java中就是byte, short, int和long。變長數(shù)據(jù)類型:變長數(shù)據(jù)類型:bytes和string。變長的數(shù)據(jù)類型由兩部分組成,分別是一個(gè)有符號整數(shù)N(表示內(nèi)容的長度)和N個(gè)字節(jié)的內(nèi)容。其中,N為-1表示內(nèi)容為null。bytes的長度由int32表示,string的長度由int16表示。數(shù)組:數(shù)組:數(shù)組由兩部分組成,分別是一個(gè)由int32類型的數(shù)字表示的數(shù)組長度N和N個(gè)元素。Kafka的通訊協(xié)議的通訊協(xié)議Kafka通訊的基本單位是Request/
26、Response基本結(jié)構(gòu):RequestOrResponse = MessageSize (RequestMessage | ResponseMessage)通訊過程通訊過程:客戶端打開與服務(wù)器端的Socket往Socket寫入一個(gè)int32的數(shù)字(數(shù)字表示這次發(fā)送的Request有多少字節(jié))服務(wù)器端先讀出一個(gè)int32的整數(shù)從而獲取這次Request的大小然后讀取對應(yīng)字節(jié)數(shù)的數(shù)據(jù)從而得到Request的具體內(nèi)容服務(wù)器端處理了請求后,也用同樣的方式來發(fā)送響應(yīng)。Kafka的通訊協(xié)議的通訊協(xié)議RequestMessage結(jié)構(gòu):RequestMessage = ApiKey ApiVersion C
27、orrelationId ClientId RequestKafka的通訊協(xié)議的通訊協(xié)議ResponseMessage結(jié)構(gòu):ResponseMessage = CorrelationId ResponseKafka采用是經(jīng)典的Reactor(同步IO)模式,也就是1個(gè)Acceptor響應(yīng)客戶端的連接請求,N個(gè)Processor來讀取數(shù)據(jù),這種模式可以構(gòu)建出高 性能的服務(wù)器。Kafka的通訊協(xié)議的通訊協(xié)議Message:Producer生產(chǎn)的消息,鍵-值對Message = Crc MagicByte Attributes Key ValueKafka的通訊協(xié)議的通訊協(xié)議MessageSet:用
28、來組合多條Message,它在每條Message的基礎(chǔ)上加上了Offset和MessageSizeMessageSet = Offset MessageSize MessageKafka的通訊協(xié)議的通訊協(xié)議組件組件關(guān)系關(guān)系Request/Respone和Message/MessageSet的關(guān)系:備注:Kafka的通訊協(xié)議中不含Schema,格式也比較簡單,這樣設(shè)計(jì)的好處是協(xié)議自身的Overhead小,再加上把多條Message放在一起做壓縮,提高壓縮比率,從而在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量會少一些。Kafka的分布式實(shí)現(xiàn)的分布式實(shí)現(xiàn) 一個(gè)Topic的多個(gè)partitions,被分布在kafka集群中的
29、多個(gè)server上;每個(gè)server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫操作; 此外kafka還可以配置partitions需要備份的個(gè)數(shù)(replicas),每個(gè)partition將會被備份到多臺機(jī)器上,以提高可用性; 基于replicated方案,那么就意味著需要對多個(gè)備份進(jìn)行調(diào)度; 每個(gè)partition都有一個(gè)server為leader;leader負(fù)責(zé)所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader); follower只是單調(diào)的和leader跟進(jìn),同步消息即可.由此可見作為leader的server承載了全部的請求壓
30、力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)leader; kafka會將leader均衡的分散在每個(gè)實(shí)例上,來確保整體的性能穩(wěn)定.Kafka數(shù)據(jù)持久化數(shù)據(jù)持久化數(shù)據(jù)持久化:發(fā)現(xiàn)線性的訪問磁盤,很多時(shí)候比隨機(jī)的內(nèi)存訪問快得多傳統(tǒng)的使用內(nèi)存做為磁盤的緩存Kafka直接將數(shù)據(jù)寫入到日志文件中日志數(shù)據(jù)持久化特性:寫操作:通過將數(shù)據(jù)追加到文件中實(shí)現(xiàn)讀操作:讀的時(shí)候從文件中讀就好了對比JVM特性:Java對象占用空間是非常大的,差不多是要存儲的數(shù)據(jù)的兩倍甚至更高隨著堆中數(shù)據(jù)量的增加,垃圾回收回變的越來越困難優(yōu)勢:讀操作不會阻塞寫操作和其他操作,數(shù)據(jù)大小不對性能產(chǎn)生影響; 沒有容
31、量限制(相對于內(nèi)存來說)的硬盤空間建立消息系統(tǒng); 線性訪問磁盤,速度快,可以保存任意一段時(shí)間!Kafka安裝安裝下載/downloads.html解壓tar -zxvf kafka_2.10-.tgz啟動(dòng)服務(wù)首先啟動(dòng)zookeeper服務(wù)bin/zookeeper-server-start.sh config/perties啟動(dòng)Kafkabin/kafka-server-start.sh config/perties /dev/null 2&1 &創(chuàng)建topic創(chuàng)建一個(gè)tes
32、t的topic,一個(gè)分區(qū)一個(gè)副本bin/kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 1 -partitions 1 -topic test查看主題bin/kafka-topics.sh -list -zookeeper localhost:2181查看主題詳情bin/kafka-topics.sh -describe -zookeeper localhost:2181 -topic test刪除主題bin/kafka-topics.sh -zookeeper localhost:2181 -dele
33、te -topic testKafka客戶端操作客戶端操作創(chuàng)建生產(chǎn)者 producerbin/kafka-console-producer.sh -broker-list localhost:9092 -topic test 創(chuàng)建消費(fèi)者 consumerbin/kafka-console-consumer.sh -zookeeper localhost:2181 -topic test -from-beginning參數(shù)使用幫組信息查看:生產(chǎn)者參數(shù)查看:bin/kafka-console-producer.sh消費(fèi)者參數(shù)查看:bin/kafka-console-consumer.shKafka
34、多多broker部署部署修改config/pertiesbroker.id=0port=9020log.dirs=/tmp/kafka0-logs復(fù)制perties生成pertiesbroker.id=1#id不能一樣port=9040#port不能一樣log.dirs=/tmp/kafka1-logs啟動(dòng)多個(gè)brokerbin/kafka-server-start.sh config/perties &bin/kafka-server-start.sh config/pe
35、rties &創(chuàng)建主題bin/kafka-topics.sh -create -zookeeper localhost:2181 -replication-factor 3 -partitions 1 -topic testkafka集群安裝集群安裝安裝zk集群修改配置文件broker.id: 唯一,填數(shù)字:唯一,填服務(wù)器zookeeper.connect=34:2181,32:2181,33:2181Kafka的核心配置的核心配置perties配置詳情見注釋broker.id
36、=work.threads=2num.io.threads=8socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=2log.retention.hours=168log.segment.bytes=536870912erval.ms=60000log.cleaner.enable=falsezookeeper.connect
37、=localhost:2181zookeeper.connection.timeout.ms=1000000Kafka的一致性的一致性MQ要實(shí)現(xiàn)從producer到consumer之間的可靠的消息傳送和分發(fā)。傳統(tǒng)的MQ系統(tǒng)通常都是通過broker和consumer間的確認(rèn) (ack)機(jī)制實(shí)現(xiàn)的,并在broker保存消息分發(fā)的狀態(tài)。即使這樣一致性也是很難保證的(當(dāng)然kafka也支持ack)。kafka保證一致性的做法是由 consumer自己保存狀態(tài),也不要任何確認(rèn)。這樣雖然consumer負(fù)擔(dān)更重,但其實(shí)更靈活了。因?yàn)椴还躢onsumer上任何原因?qū)е滦枰匦绿?理消息,都可以再次從broke
38、r獲得。 Kafka的高可用性的高可用性Kafaka可以將log文件復(fù)制到其他topic的分隔點(diǎn)(可以看成是server)。當(dāng)一個(gè)server在集群中fails,可以允許自動(dòng)的failover到其他的復(fù)制的server,所以消息可以繼續(xù)存在在這種情況下。Kafka的的zero-copy采用 linux Zero-Copy 提高發(fā)送性能。傳統(tǒng)的數(shù)據(jù)發(fā)送需要發(fā)送 4 次上下文切換,采用 sendfile 系統(tǒng)調(diào)用之后,數(shù)據(jù)直接在內(nèi)核態(tài)交換,系統(tǒng)上下文切換減少為 2 次。根據(jù)測試結(jié)果,可以提高 60% 的數(shù)據(jù)發(fā)送性能。Kafka的的zero-copy 在Kafka上,有兩個(gè)原因可能導(dǎo)致低效:1)太多
39、的網(wǎng)絡(luò)請求 2)過多的字節(jié)拷貝。為了提高效率,Kafka把message分成一組一組的,每次請求會把一組message發(fā)給相應(yīng)的consumer。 此外, 為了減少字節(jié)拷貝,采用了sendfile系統(tǒng)調(diào)用。為了理解sendfile原理,先說一下傳統(tǒng)的利用socket發(fā)送文件要進(jìn)行拷貝Sendfile系統(tǒng)調(diào)用Kafka的負(fù)載均衡的負(fù)載均衡Producer和broker之間沒有負(fù)載均衡機(jī)制。 負(fù)載均衡可以分為兩個(gè)部分:producer發(fā)消息的負(fù)載均衡和consumer讀消息的負(fù)載均衡。producer有一個(gè)到當(dāng)前所有broker的連接池,當(dāng)一個(gè)消息需要發(fā)送時(shí),需要決定發(fā)到哪個(gè)broker(即par
40、tition)。consumer讀取消息時(shí),除了考慮當(dāng)前的broker情況外,還要考慮其他consumer的情況,才能決定從哪個(gè)partition讀取消息。多個(gè) partition 需要選取出 lead partition,lead partition 負(fù)責(zé)讀寫,broker和consumer之間利用zookeeper進(jìn)行負(fù)載均衡。所有broker和consumer都會在zookeeper中進(jìn)行注冊,且 zookeeper會保存他們的一些元數(shù)據(jù)信息。如果某個(gè)broker和consumer發(fā)生了變化,所有其他的broker和consumer都會得到 通知。Kafka 可擴(kuò)展性可擴(kuò)展性當(dāng)需要增加
41、broker 結(jié)點(diǎn)時(shí),新增的 broker 會向 zookeeper 注冊,而 producer 及 consumer 會根據(jù)注冊在 zookeeper 上的 watcher 感知這些變化,并及時(shí)作出調(diào)整,這樣就保證了添加或去除broker時(shí),各broker間仍能自動(dòng)實(shí)現(xiàn)負(fù)載均衡。Kafka的的Zookeeper協(xié)調(diào)控制協(xié)調(diào)控制1. 管理broker與consumer的動(dòng)態(tài)加入與離開。2. 觸發(fā)負(fù)載均衡,當(dāng)broker或consumer加入或離開時(shí)會觸發(fā)負(fù)載均衡算法,使得一個(gè)consumer group內(nèi)的多個(gè)consumer的訂閱負(fù)載平衡。3. 維護(hù)消費(fèi)關(guān)系及每個(gè)partion的消費(fèi)信息。Z
42、ookeeper上的細(xì)節(jié):1. 每個(gè)broker啟動(dòng)后會在zookeeper上注冊一個(gè)臨時(shí)的broker registry,包含broker的ip地址和端口號,所存儲的topics和partitions信息。2. 每個(gè)consumer啟動(dòng)后會在zookeeper上注冊一個(gè)臨時(shí)的consumer registry:包含consumer所屬的consumer group以及訂閱的topics。3. 每個(gè)consumer group關(guān) 聯(lián)一個(gè)臨時(shí)的owner registry和一個(gè)持久的offset registry。對于被訂閱的每個(gè)partition包含一個(gè)owner registry,內(nèi)容為訂閱這個(gè)partition的consumer id;同時(shí)包含一個(gè)offset registry,內(nèi)容為上一次訂閱的offset。kafka java操作操作生產(chǎn)者消費(fèi)者pom依賴org.ap
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 數(shù)學(xué)期中教學(xué)總結(jié)1500字
- 小學(xué)生迎元旦的演講稿范文(31篇)
- Ketoconazole-Standard-生命科學(xué)試劑-MCE
- Isovalerylcarnitine-Standard-生命科學(xué)試劑-MCE
- 英文市場波動(dòng)對馬來西亞酒店效率的影響
- 廣告材料運(yùn)輸安裝合同模板
- 博物館石材運(yùn)送合同樣本
- 化妝品批量配送合同樣本
- 冷凍食品物流服務(wù)合同
- 保健品快遞合同印花稅稅率
- 中小學(xué)幼兒園教師專業(yè)發(fā)展示范學(xué)校建設(shè)標(biāo)準(zhǔn)(試行)
- 屋頂光伏項(xiàng)目施工方案范本
- 兒童歌曲彈唱課程標(biāo)準(zhǔn)
- 基于區(qū)域分割的三維自由曲面相似性評價(jià)算法
- 小學(xué)道德與法治人教六年級上冊我們是公民我是中國公民全國優(yōu)質(zhì)課
- 《無人機(jī)組裝與調(diào)試》第5章-多旋翼無人機(jī)調(diào)試
- 臨床用血執(zhí)行情況自查表
- 2023年02月江西省九江市八里湖新區(qū)公開招考50名城市社區(qū)工作者(專職網(wǎng)格員)參考題庫+答案詳解
- 七度空間消費(fèi)者研究總報(bào)告(Y-1012)
- 醫(yī)學(xué)英語翻譯題匯總
- 解析人體的奧秘智慧樹知到答案章節(jié)測試2023年浙江中醫(yī)藥大學(xué)
評論
0/150
提交評論