![消息隊(duì)列:Pulsar:Pulsar的訂閱模式與消息重試_第1頁(yè)](http://file4.renrendoc.com/view14/M0A/02/08/wKhkGWbsxTaAIlWYAAJy30OdKmI362.jpg)
![消息隊(duì)列:Pulsar:Pulsar的訂閱模式與消息重試_第2頁(yè)](http://file4.renrendoc.com/view14/M0A/02/08/wKhkGWbsxTaAIlWYAAJy30OdKmI3622.jpg)
![消息隊(duì)列:Pulsar:Pulsar的訂閱模式與消息重試_第3頁(yè)](http://file4.renrendoc.com/view14/M0A/02/08/wKhkGWbsxTaAIlWYAAJy30OdKmI3623.jpg)
![消息隊(duì)列:Pulsar:Pulsar的訂閱模式與消息重試_第4頁(yè)](http://file4.renrendoc.com/view14/M0A/02/08/wKhkGWbsxTaAIlWYAAJy30OdKmI3624.jpg)
![消息隊(duì)列:Pulsar:Pulsar的訂閱模式與消息重試_第5頁(yè)](http://file4.renrendoc.com/view14/M0A/02/08/wKhkGWbsxTaAIlWYAAJy30OdKmI3625.jpg)
版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
消息隊(duì)列:Pulsar:Pulsar的訂閱模式與消息重試1消息隊(duì)列基礎(chǔ)1.1消息隊(duì)列的定義消息隊(duì)列是一種應(yīng)用程序間的通信方法,它允許消息的發(fā)送者不會(huì)因?yàn)榻邮照邥簳r(shí)無(wú)法處理消息而阻塞。消息隊(duì)列通過(guò)在消息的生產(chǎn)者和消費(fèi)者之間提供一個(gè)緩沖區(qū),實(shí)現(xiàn)了異步通信和解耦。消息隊(duì)列可以處理大量并發(fā)消息,提高系統(tǒng)的響應(yīng)速度和吞吐量,同時(shí)還能保證消息的可靠傳輸。1.2消息隊(duì)列的作用消息隊(duì)列在現(xiàn)代軟件架構(gòu)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許生產(chǎn)者和消費(fèi)者異步操作,提高系統(tǒng)響應(yīng)速度。負(fù)載均衡:通過(guò)消息隊(duì)列,可以將任務(wù)均勻地分配給多個(gè)消費(fèi)者,實(shí)現(xiàn)負(fù)載均衡。故障恢復(fù):消息隊(duì)列可以持久化消息,即使消費(fèi)者失敗,消息也不會(huì)丟失,可以重新處理。解耦:生產(chǎn)者和消費(fèi)者不需要直接通信,降低了系統(tǒng)的耦合度,提高了系統(tǒng)的可維護(hù)性和可擴(kuò)展性。1.3Pulsar簡(jiǎn)介ApachePulsar是一個(gè)高性能、可擴(kuò)展的分布式消息隊(duì)列系統(tǒng)。它提供了消息持久化、分層存儲(chǔ)、多租戶、全球地理復(fù)制等功能,使其成為構(gòu)建現(xiàn)代消息隊(duì)列和流處理應(yīng)用的理想選擇。Pulsar的設(shè)計(jì)目標(biāo)是提供一個(gè)統(tǒng)一的平臺(tái),支持消息隊(duì)列和流處理兩種模式,同時(shí)保持高性能和低延遲。1.3.1Pulsar的架構(gòu)Pulsar采用了一種分層的架構(gòu),主要包括:Broker:負(fù)責(zé)消息的路由和管理,處理客戶端的請(qǐng)求。BookKeeper:提供消息的持久化存儲(chǔ),保證消息的可靠性和持久性。ZooKeeper:用于協(xié)調(diào)集群中的Broker,管理集群的元數(shù)據(jù)。1.3.2Pulsar的特性持久化存儲(chǔ):Pulsar使用BookKeeper來(lái)存儲(chǔ)消息,保證消息的持久化和可靠性。多租戶:Pulsar支持多租戶,每個(gè)租戶可以有自己的命名空間和主題。全球地理復(fù)制:Pulsar支持跨地域的復(fù)制,可以將消息復(fù)制到全球的多個(gè)數(shù)據(jù)中心,提高系統(tǒng)的可用性和容災(zāi)能力。分層存儲(chǔ):Pulsar支持冷熱數(shù)據(jù)的分層存儲(chǔ),可以將熱點(diǎn)數(shù)據(jù)存儲(chǔ)在高速的SSD上,將冷數(shù)據(jù)存儲(chǔ)在低成本的HDD上,以優(yōu)化存儲(chǔ)成本和性能。1.3.3Pulsar的使用示例以下是一個(gè)使用Python客戶端向Pulsar主題發(fā)送消息的示例:frompulsarimportClient
#創(chuàng)建Pulsar客戶端
client=Client('pulsar://localhost:6650')
#創(chuàng)建生產(chǎn)者
producer=client.create_producer('persistent://public/default/my-topic')
#發(fā)送消息
foriinrange(10):
producer.send(('HelloPulsar%d'%i).encode('utf-8'))
#關(guān)閉客戶端
client.close()在這個(gè)示例中,我們首先創(chuàng)建了一個(gè)Pulsar客戶端,然后創(chuàng)建了一個(gè)生產(chǎn)者,用于向主題my-topic發(fā)送消息。我們發(fā)送了10條消息,每條消息的內(nèi)容都是HelloPulsar加上一個(gè)數(shù)字。最后,我們關(guān)閉了客戶端。1.3.4Pulsar的訂閱模式Pulsar支持兩種訂閱模式:獨(dú)占訂閱(Exclusive)和共享訂閱(Shared)。獨(dú)占訂閱:一個(gè)主題只能有一個(gè)消費(fèi)者訂閱,如果多個(gè)消費(fèi)者訂閱了同一個(gè)主題,只有其中一個(gè)消費(fèi)者可以接收消息。共享訂閱:一個(gè)主題可以有多個(gè)消費(fèi)者訂閱,消息會(huì)被均勻地分配給所有消費(fèi)者。1.3.5Pulsar的消息重試Pulsar提供了消息重試機(jī)制,當(dāng)消費(fèi)者無(wú)法處理消息時(shí),消息會(huì)被重新發(fā)送給消費(fèi)者。消息重試的次數(shù)和間隔可以通過(guò)配置來(lái)控制。例如,以下是一個(gè)使用Java客戶端配置消息重試的示例:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassRetryConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(10,TimeUnit.SECONDS)//設(shè)置消息重試間隔
.maxRedeliveryCount(5)//設(shè)置消息重試次數(shù)
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);
}
}
}
}在這個(gè)示例中,我們創(chuàng)建了一個(gè)共享訂閱的消費(fèi)者,設(shè)置了消息重試間隔為10秒,消息重試次數(shù)為5次。當(dāng)消費(fèi)者接收到消息后,如果處理消息時(shí)發(fā)生異常,消費(fèi)者會(huì)調(diào)用negativeAcknowledge方法,將消息標(biāo)記為未處理,Pulsar會(huì)根據(jù)配置的消息重試間隔和次數(shù),重新發(fā)送這條消息給消費(fèi)者。1.3.6總結(jié)Pulsar是一個(gè)功能強(qiáng)大的消息隊(duì)列系統(tǒng),它提供了消息持久化、多租戶、全球地理復(fù)制和分層存儲(chǔ)等功能,支持獨(dú)占訂閱和共享訂閱兩種模式,同時(shí)提供了消息重試機(jī)制,保證了消息的可靠處理。通過(guò)使用Pulsar,可以構(gòu)建出高性能、可擴(kuò)展、可靠的消息隊(duì)列和流處理應(yīng)用。2消息隊(duì)列:Pulsar:深入理解Pulsar的訂閱模式在ApachePulsar消息隊(duì)列中,訂閱模式是消息消費(fèi)的核心機(jī)制之一,它決定了多個(gè)消費(fèi)者如何處理來(lái)自同一主題的消息。Pulsar提供了四種訂閱模式:獨(dú)占訂閱模式、共享訂閱模式、故障轉(zhuǎn)移訂閱模式和鍵共享訂閱模式。每種模式都有其特定的使用場(chǎng)景和優(yōu)勢(shì),下面將詳細(xì)介紹這四種訂閱模式的原理和應(yīng)用場(chǎng)景。2.1獨(dú)占訂閱模式(Exclusive)2.1.1原理在獨(dú)占訂閱模式下,一個(gè)主題只能有一個(gè)活動(dòng)的訂閱者。如果多個(gè)消費(fèi)者嘗試訂閱同一主題,只有第一個(gè)訂閱者能夠成功接收消息,其他訂閱者將被阻止直到第一個(gè)訂閱者斷開(kāi)連接。這種模式確保了消息的順序處理和唯一性。2.1.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建獨(dú)占訂閱
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);2.1.3解釋上述代碼展示了如何在Pulsar中創(chuàng)建一個(gè)獨(dú)占訂閱。SubscriptionType.Exclusive參數(shù)確保了只有創(chuàng)建此訂閱的消費(fèi)者能夠接收消息。如果另一個(gè)消費(fèi)者嘗試使用相同的訂閱名稱訂閱同一主題,它將被阻止直到當(dāng)前訂閱者斷開(kāi)連接。2.2共享訂閱模式(Shared)2.2.1原理共享訂閱模式允許多個(gè)消費(fèi)者同時(shí)訂閱同一主題。消息將被分發(fā)給訂閱者中的任意一個(gè),但不會(huì)重復(fù)發(fā)送給其他訂閱者。這種模式提高了系統(tǒng)的并行處理能力,但不保證消息的順序。2.2.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建共享訂閱
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
consumer1.acknowledge(msg1);
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());
consumer2.acknowledge(msg2);2.2.3解釋在共享訂閱模式下,多個(gè)消費(fèi)者可以使用相同的訂閱名稱訂閱同一主題。消息將被分發(fā)給任意一個(gè)訂閱者,但不會(huì)重復(fù)發(fā)送給其他訂閱者。這使得系統(tǒng)能夠并行處理消息,提高了處理效率。2.3故障轉(zhuǎn)移訂閱模式(Failover)2.3.1原理故障轉(zhuǎn)移訂閱模式類似于獨(dú)占訂閱,但允許多個(gè)消費(fèi)者訂閱同一主題,每個(gè)消費(fèi)者都有一個(gè)唯一的分區(qū)。當(dāng)一個(gè)消費(fèi)者(分區(qū))失敗時(shí),其未處理的消息將被重新分配給其他消費(fèi)者。這種模式保證了消息的順序處理和高可用性。2.3.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建故障轉(zhuǎn)移訂閱
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.consumerName("consumer1")
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.consumerName("consumer2")
.subscribe();
//消費(fèi)消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
consumer1.acknowledge(msg1);
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());
consumer2.acknowledge(msg2);2.3.3解釋在故障轉(zhuǎn)移訂閱模式下,每個(gè)消費(fèi)者都有一個(gè)唯一的分區(qū)。當(dāng)一個(gè)消費(fèi)者失敗時(shí),其未處理的消息將被重新分配給其他消費(fèi)者。這確保了即使在消費(fèi)者失敗的情況下,消息也能被正確處理,同時(shí)保持了消息的順序。2.4鍵共享訂閱模式(Key_Shared)2.4.1原理鍵共享訂閱模式允許消息根據(jù)其鍵(key)被分發(fā)到特定的消費(fèi)者。這種模式確保了具有相同鍵的消息總是被同一個(gè)消費(fèi)者處理,即使有多個(gè)消費(fèi)者訂閱同一主題。這在需要根據(jù)消息鍵進(jìn)行一致性處理的場(chǎng)景中非常有用。2.4.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建鍵共享訂閱
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("consumer1")
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("consumer2")
.subscribe();
//生產(chǎn)者發(fā)送帶有鍵的消息
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.send("message1","key1".getBytes());
producer.send("message2","key2".getBytes());
//消費(fèi)消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());2.4.3解釋在鍵共享訂閱模式下,消息根據(jù)其鍵被分發(fā)到特定的消費(fèi)者。在上述示例中,producer.send方法被用來(lái)發(fā)送帶有鍵的消息。"key1"和"key2"確保了消息將被分發(fā)到不同的消費(fèi)者,具有相同鍵的消息將始終被同一個(gè)消費(fèi)者處理。2.5消息重試在Pulsar中,消息重試機(jī)制允許在消息處理失敗時(shí)重新發(fā)送消息。這可以通過(guò)設(shè)置消息的重試次數(shù)和重試策略來(lái)實(shí)現(xiàn),確保了消息的可靠處理。2.5.1示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建消費(fèi)者并設(shè)置消息重試策略
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1,TimeUnit.MINUTES)//設(shè)置消息重試延遲
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
try{
System.out.println("Receivedmessage:"+msg.getValue());
//模擬消息處理失敗
if(msg.getValue().equals("message1")){
consumer.negativeAcknowledge(msg);
}else{
consumer.acknowledge(msg);
}
}catch(Exceptione){
consumer.negativeAcknowledge(msg);
}2.5.2解釋在上述代碼中,negativeAckRedeliveryDelay方法被用來(lái)設(shè)置消息重試的延遲時(shí)間。當(dāng)消息處理失敗時(shí),消費(fèi)者調(diào)用negativeAcknowledge方法,這將導(dǎo)致消息在指定的延遲時(shí)間后被重新發(fā)送。這種機(jī)制確保了即使在處理失敗的情況下,消息也能被重新嘗試處理,提高了消息處理的可靠性。通過(guò)理解Pulsar的訂閱模式和消息重試機(jī)制,開(kāi)發(fā)者可以更有效地設(shè)計(jì)和實(shí)現(xiàn)消息處理系統(tǒng),確保消息的正確處理和系統(tǒng)的高可用性。3消息重試機(jī)制3.1消息重試的重要性在分布式系統(tǒng)中,消息隊(duì)列如ApachePulsar扮演著關(guān)鍵角色,用于在服務(wù)之間傳遞消息。然而,網(wǎng)絡(luò)延遲、服務(wù)故障或消費(fèi)者處理邏輯的復(fù)雜性可能導(dǎo)致消息處理失敗。消息重試機(jī)制是確保消息至少被成功處理一次的關(guān)鍵策略。它通過(guò)在消息處理失敗時(shí)自動(dòng)或手動(dòng)地重新發(fā)送消息,從而提高系統(tǒng)的可靠性和容錯(cuò)性。3.2Pulsar消息重試策略ApachePulsar提供了多種消息重試策略,以適應(yīng)不同的業(yè)務(wù)場(chǎng)景和需求。這些策略包括:3.2.1自動(dòng)重試Pulsar可以配置自動(dòng)重試機(jī)制,當(dāng)消息處理失敗時(shí),消息會(huì)被自動(dòng)重新發(fā)送到消費(fèi)者。自動(dòng)重試次數(shù)和重試間隔可以通過(guò)Pulsar的配置參數(shù)進(jìn)行調(diào)整。3.2.2手動(dòng)重試在某些情況下,可能需要更精細(xì)的控制。Pulsar允許消費(fèi)者在消息處理失敗時(shí)手動(dòng)觸發(fā)重試。這通常通過(guò)在消息處理函數(shù)中拋出異常或使用特定的API來(lái)實(shí)現(xiàn)。3.2.3死信隊(duì)列對(duì)于那些即使經(jīng)過(guò)多次重試也無(wú)法成功處理的消息,Pulsar提供了死信隊(duì)列(DeadLetterQueue,DLQ)機(jī)制。這些消息會(huì)被移動(dòng)到DLQ中,以便后續(xù)的人工檢查或特殊處理。3.3實(shí)現(xiàn)消息重試的步驟要實(shí)現(xiàn)Pulsar中的消息重試,可以遵循以下步驟:3.3.1步驟1:配置自動(dòng)重試在Pulsar的消費(fèi)者配置中,可以設(shè)置自動(dòng)重試的次數(shù)和間隔。例如,以下是一個(gè)使用JavaAPI配置自動(dòng)重試的例子:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerWithRetry{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.negativeAckRedeliveryDelay(10,TimeUnit.SECONDS)//設(shè)置重試間隔
.redeliveryBackoff(10,TimeUnit.SECONDS)//設(shè)置重試間隔
.maxRedeliverCount(5)//設(shè)置最大重試次數(shù)
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息處理邏輯
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);//處理失敗,觸發(fā)重試
}
}
}
}3.3.2步驟2:手動(dòng)觸發(fā)重試如果需要在消息處理失敗時(shí)手動(dòng)觸發(fā)重試,可以在消息處理函數(shù)中拋出異?;蚴褂胣egativeAcknowledge方法。以下是一個(gè)示例:importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerManualRetry{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息處理邏輯
System.out.println("Receivedmessage:"+msg.getValue());
if(msg.getValue().equals("error")){
thrownewException("Messageprocessingerror");
}
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);//處理失敗,觸發(fā)重試
}
}
}
}3.3.3步驟3:配置死信隊(duì)列對(duì)于那些即使經(jīng)過(guò)多次重試也無(wú)法成功處理的消息,可以配置死信隊(duì)列。以下是一個(gè)配置DLQ的例子:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerWithDLQ{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.deadLetterTopic("persistent://public/default/my-dlq-topic")//設(shè)置DLQ主題
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息處理邏輯
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.redeliverLater(msg,10,TimeUnit.SECONDS);//處理失敗,延遲重試
}
}
}
}在上述示例中,如果消息處理失敗,它將被重新發(fā)送到DLQ主題,而不是無(wú)限次重試。通過(guò)以上步驟,可以有效地在ApachePulsar中實(shí)現(xiàn)消息重試機(jī)制,從而提高系統(tǒng)的可靠性和容錯(cuò)性。4高級(jí)訂閱與重試4.1訂閱模式的高級(jí)用法在ApachePulsar中,訂閱模式是消息消費(fèi)的核心機(jī)制,它決定了消息如何被多個(gè)消費(fèi)者處理。Pulsar支持兩種主要的訂閱模式:Exclusive和Shared,以及一種特殊的模式Failover。除此之外,Pulsar還提供了Key_Shared訂閱模式,用于更細(xì)粒度的負(fù)載均衡和消息處理。4.1.1Exclusive訂閱模式在Exclusive模式下,一個(gè)訂閱只能被一個(gè)消費(fèi)者消費(fèi)。如果多個(gè)消費(fèi)者訂閱了同一個(gè)主題,只有第一個(gè)連接的消費(fèi)者能夠接收消息。當(dāng)該消費(fèi)者斷開(kāi)連接時(shí),其他消費(fèi)者才能開(kāi)始接收消息。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)Exclusive訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息
consumer.acknowledge(msg);4.1.2Shared訂閱模式Shared模式允許多個(gè)消費(fèi)者同時(shí)消費(fèi)一個(gè)訂閱。消息會(huì)被均勻地分發(fā)給所有消費(fèi)者,每個(gè)消息只會(huì)被一個(gè)消費(fèi)者處理。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)Shared訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息
consumer.acknowledge(msg);4.1.3Failover訂閱模式Failover模式類似于Exclusive模式,但當(dāng)一個(gè)消費(fèi)者斷開(kāi)連接時(shí),其未處理的消息會(huì)被重新分配給下一個(gè)消費(fèi)者。4.1.4Key_Shared訂閱模式Key_Shared模式允許消息根據(jù)消息鍵在多個(gè)消費(fèi)者之間共享。這確保了具有相同鍵的消息總是由同一個(gè)消費(fèi)者處理,從而實(shí)現(xiàn)一致性。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)Key_Shared訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息
consumer.acknowledge(msg);4.2優(yōu)化消息重試在消息隊(duì)列中,消息重試機(jī)制是處理失敗消息的關(guān)鍵。Pulsar提供了多種方式來(lái)優(yōu)化消息重試,包括消息重發(fā)、消息保留策略和死信隊(duì)列。4.2.1消息重發(fā)當(dāng)消費(fèi)者無(wú)法處理消息時(shí),可以通過(guò)negativeAcknowledge方法將消息返回到隊(duì)列中,以便稍后重試。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//如果消息處理失敗,重試消息
if(/*處理失敗的條件*/){
consumer.negativeAcknowledge(msg);
}else{
//確認(rèn)消息
consumer.acknowledge(msg);
}4.2.2消息保留策略Pulsar允許設(shè)置消息保留策略,以控制消息在隊(duì)列中的存儲(chǔ)時(shí)間。這可以通過(guò)設(shè)置retentionTimeInMinutes和retentionSizeInMB來(lái)實(shí)現(xiàn)。示例代碼//創(chuàng)建一個(gè)Pulsar管理員對(duì)象
Adminadmin=Admin.builder().serviceHttpUrl("http://localhost:8080").build();
//設(shè)置消息保留策略
admin.topics().setRetention("persistent://sample/standalone/ns/my-topic",
newRetentionPolicies(1,TimeUnit.DAYS),1024);4.2.3死信隊(duì)列當(dāng)消息在一定次數(shù)的重試后仍然無(wú)法被處理時(shí),可以將這些消息發(fā)送到死信隊(duì)列,以便進(jìn)行進(jìn)一步的分析或處理。4.3監(jiān)控與調(diào)整重試策略Pulsar提供了豐富的監(jiān)控指標(biāo),可以用來(lái)跟蹤消息隊(duì)列的健康狀況和性能。通過(guò)監(jiān)控,可以調(diào)整重試策略,以優(yōu)化消息處理流程。4.3.1使用PulsarManager監(jiān)控PulsarManager是一個(gè)圖形界面工具,可以用來(lái)監(jiān)控和管理Pulsar集群。它提供了消息隊(duì)列的實(shí)時(shí)監(jiān)控?cái)?shù)據(jù),包括消息的發(fā)送和接收速率、消息積壓和消費(fèi)者狀態(tài)。4.3.2調(diào)整重試策略根據(jù)監(jiān)控?cái)?shù)據(jù),可以調(diào)整消息重試的次數(shù)和間隔,以適應(yīng)不同的業(yè)務(wù)需求。例如,可以增加重試次數(shù),以提高消息處理的可靠性;或者增加重試間隔,以減輕消費(fèi)者在高負(fù)載下的壓力。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)消費(fèi)者,設(shè)置重試策略
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.consumerName("my-consumer")
.maxUnackedMessages(1000)
.ackTimeout(10,TimeUnit.SECONDS)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//如果消息處理失敗,重試消息
if(/*處理失敗的條件*/){
consumer.negativeAcknowledge(msg);
}else{
//確認(rèn)消息
consumer.acknowledge(msg);
}在這個(gè)例子中,maxUnackedMessages和ackTimeout是用來(lái)控制消息重試策略的參數(shù)。maxUnackedMessages定義了消費(fèi)者可以同時(shí)處理的消息數(shù)量,而ackTimeout定義了消費(fèi)者處理消息的超時(shí)時(shí)間。如果消費(fèi)者在ackTimeout時(shí)間內(nèi)沒(méi)有確認(rèn)消息,Pulsar會(huì)自動(dòng)將消息重發(fā)給其他消費(fèi)者。通過(guò)這些高級(jí)訂閱模式和重試策略的使用,可以構(gòu)建出更健壯、更靈活的消息處理系統(tǒng)。在實(shí)際應(yīng)用中,應(yīng)根據(jù)業(yè)務(wù)需求和系統(tǒng)性能,合理選擇和調(diào)整訂閱模式和重試策略,以達(dá)到最佳的消息處理效果。5實(shí)踐案例5.1使用Pulsar處理高并發(fā)場(chǎng)景在處理高并發(fā)場(chǎng)景時(shí),Pulsar消息隊(duì)列因其高性能和可擴(kuò)展性成為許多企業(yè)的首選。Pulsar支持多種訂閱模式,包括獨(dú)占(Exclusive)、共享(Shared)、鍵共享(Key_Shared)和失敗重試(Failover),這些模式可以靈活地滿足不同場(chǎng)景下的需求。5.1.1獨(dú)占訂閱模式獨(dú)占訂閱模式下,一個(gè)主題只能被一個(gè)消費(fèi)者訂閱。如果多個(gè)消費(fèi)者嘗試訂閱同一主題,只有第一個(gè)訂閱者能夠成功接收消息,其余的將被拒絕。這種模式適用于需要確保消息只被一個(gè)消費(fèi)者處理的場(chǎng)景。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)獨(dú)占訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();5.1.2共享訂閱模式共享訂閱模式下,多個(gè)消費(fèi)者可以訂閱同一主題,消息會(huì)被均勻地分發(fā)給所有訂閱者。這種模式適用于需要水平擴(kuò)展消費(fèi)者以處理更多消息的場(chǎng)景。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)共享訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();5.1.3鍵共享訂閱模式鍵共享訂閱模式下,消息根據(jù)消息鍵(如果存在)被分發(fā)到特定的消費(fèi)者。這種模式適用于需要根據(jù)消息內(nèi)容進(jìn)行路由的場(chǎng)景,例如,根據(jù)用戶ID將消息路由到處理該用戶請(qǐng)求的消費(fèi)者。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)鍵共享訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.KeyShared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();5.1.4失敗重試訂閱模式失敗重試訂閱模式下,如果一個(gè)消費(fèi)者無(wú)法處理消息,消息會(huì)被重新分發(fā)給其他訂閱者。這種模式適用于需要高可用性和容錯(cuò)性的場(chǎng)景。示例代碼//創(chuàng)建一個(gè)Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個(gè)失敗重試訂閱的消費(fèi)者
Consumer<String>consumer=client.new
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- “十三五”重點(diǎn)項(xiàng)目-石化機(jī)械設(shè)備項(xiàng)目節(jié)能評(píng)估報(bào)告(節(jié)能專)
- 黔江區(qū)電力電纜附件項(xiàng)目資金申請(qǐng)報(bào)告
- 現(xiàn)代辦公環(huán)境的美學(xué)優(yōu)化研究
- 知識(shí)產(chǎn)權(quán)國(guó)際化的趨勢(shì)與挑戰(zhàn)分析
- 電子平臺(tái)的跨行業(yè)采購(gòu)解決方案探討
- 電影產(chǎn)業(yè)鏈商業(yè)模式創(chuàng)新與市場(chǎng)拓展
- 2020-2025年中國(guó)活性炭空氣過(guò)濾器市場(chǎng)前景預(yù)測(cè)及未來(lái)發(fā)展趨勢(shì)報(bào)告
- 土建質(zhì)量員復(fù)習(xí)題及參考答案
- 廣東梅州職業(yè)技術(shù)學(xué)院《網(wǎng)頁(yè)制作與網(wǎng)站開(kāi)發(fā)》2023-2024學(xué)年第二學(xué)期期末試卷
- 廣州涉外經(jīng)濟(jì)職業(yè)技術(shù)學(xué)院《田徑基礎(chǔ)》2023-2024學(xué)年第二學(xué)期期末試卷
- 護(hù)理操作-吸痰
- 重癥肺炎的基本知識(shí)宣教
- 中醫(yī)適宜技術(shù)-腕踝針
- 初二上勞動(dòng)技術(shù)課件電子版
- 創(chuàng)業(yè)計(jì)劃書模板-創(chuàng)業(yè)計(jì)劃書-商業(yè)計(jì)劃書模板-項(xiàng)目計(jì)劃書模板-商業(yè)計(jì)劃書30
- 醫(yī)院護(hù)理帶教老師競(jìng)聘課件
- 四川虹科創(chuàng)新科技有限公司高強(qiáng)超薄耐摔玻璃智能制造產(chǎn)業(yè)化項(xiàng)目環(huán)境影響報(bào)告
- 多聯(lián)機(jī)空調(diào)系統(tǒng)設(shè)計(jì)課件
- 燭之武退秦師 全市一等獎(jiǎng)
- 提高高中教學(xué)質(zhì)量的幾點(diǎn)建議
- 地形圖林地的勘界及面積測(cè)量-林地實(shí)地勘界與勾繪(森林調(diào)查技術(shù))
評(píng)論
0/150
提交評(píng)論