版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
消息隊列:Pulsar:Pulsar的訂閱模式與消息重試1消息隊列基礎(chǔ)1.1消息隊列的定義消息隊列是一種應(yīng)用程序間的通信方法,它允許消息的發(fā)送者不會因為接收者暫時無法處理消息而阻塞。消息隊列通過在消息的生產(chǎn)者和消費(fèi)者之間提供一個緩沖區(qū),實現(xiàn)了異步通信和解耦。消息隊列可以處理大量并發(fā)消息,提高系統(tǒng)的響應(yīng)速度和吞吐量,同時還能保證消息的可靠傳輸。1.2消息隊列的作用消息隊列在現(xiàn)代軟件架構(gòu)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許生產(chǎn)者和消費(fèi)者異步操作,提高系統(tǒng)響應(yīng)速度。負(fù)載均衡:通過消息隊列,可以將任務(wù)均勻地分配給多個消費(fèi)者,實現(xiàn)負(fù)載均衡。故障恢復(fù):消息隊列可以持久化消息,即使消費(fèi)者失敗,消息也不會丟失,可以重新處理。解耦:生產(chǎn)者和消費(fèi)者不需要直接通信,降低了系統(tǒng)的耦合度,提高了系統(tǒng)的可維護(hù)性和可擴(kuò)展性。1.3Pulsar簡介ApachePulsar是一個高性能、可擴(kuò)展的分布式消息隊列系統(tǒng)。它提供了消息持久化、分層存儲、多租戶、全球地理復(fù)制等功能,使其成為構(gòu)建現(xiàn)代消息隊列和流處理應(yīng)用的理想選擇。Pulsar的設(shè)計目標(biāo)是提供一個統(tǒng)一的平臺,支持消息隊列和流處理兩種模式,同時保持高性能和低延遲。1.3.1Pulsar的架構(gòu)Pulsar采用了一種分層的架構(gòu),主要包括:Broker:負(fù)責(zé)消息的路由和管理,處理客戶端的請求。BookKeeper:提供消息的持久化存儲,保證消息的可靠性和持久性。ZooKeeper:用于協(xié)調(diào)集群中的Broker,管理集群的元數(shù)據(jù)。1.3.2Pulsar的特性持久化存儲:Pulsar使用BookKeeper來存儲消息,保證消息的持久化和可靠性。多租戶:Pulsar支持多租戶,每個租戶可以有自己的命名空間和主題。全球地理復(fù)制:Pulsar支持跨地域的復(fù)制,可以將消息復(fù)制到全球的多個數(shù)據(jù)中心,提高系統(tǒng)的可用性和容災(zāi)能力。分層存儲:Pulsar支持冷熱數(shù)據(jù)的分層存儲,可以將熱點數(shù)據(jù)存儲在高速的SSD上,將冷數(shù)據(jù)存儲在低成本的HDD上,以優(yōu)化存儲成本和性能。1.3.3Pulsar的使用示例以下是一個使用Python客戶端向Pulsar主題發(fā)送消息的示例:frompulsarimportClient
#創(chuàng)建Pulsar客戶端
client=Client('pulsar://localhost:6650')
#創(chuàng)建生產(chǎn)者
producer=client.create_producer('persistent://public/default/my-topic')
#發(fā)送消息
foriinrange(10):
producer.send(('HelloPulsar%d'%i).encode('utf-8'))
#關(guān)閉客戶端
client.close()在這個示例中,我們首先創(chuàng)建了一個Pulsar客戶端,然后創(chuàng)建了一個生產(chǎn)者,用于向主題my-topic發(fā)送消息。我們發(fā)送了10條消息,每條消息的內(nèi)容都是HelloPulsar加上一個數(shù)字。最后,我們關(guān)閉了客戶端。1.3.4Pulsar的訂閱模式Pulsar支持兩種訂閱模式:獨(dú)占訂閱(Exclusive)和共享訂閱(Shared)。獨(dú)占訂閱:一個主題只能有一個消費(fèi)者訂閱,如果多個消費(fèi)者訂閱了同一個主題,只有其中一個消費(fèi)者可以接收消息。共享訂閱:一個主題可以有多個消費(fèi)者訂閱,消息會被均勻地分配給所有消費(fèi)者。1.3.5Pulsar的消息重試Pulsar提供了消息重試機(jī)制,當(dāng)消費(fèi)者無法處理消息時,消息會被重新發(fā)送給消費(fèi)者。消息重試的次數(shù)和間隔可以通過配置來控制。例如,以下是一個使用Java客戶端配置消息重試的示例:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassRetryConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(10,TimeUnit.SECONDS)//設(shè)置消息重試間隔
.maxRedeliveryCount(5)//設(shè)置消息重試次數(shù)
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);
}
}
}
}在這個示例中,我們創(chuàng)建了一個共享訂閱的消費(fèi)者,設(shè)置了消息重試間隔為10秒,消息重試次數(shù)為5次。當(dāng)消費(fèi)者接收到消息后,如果處理消息時發(fā)生異常,消費(fèi)者會調(diào)用negativeAcknowledge方法,將消息標(biāo)記為未處理,Pulsar會根據(jù)配置的消息重試間隔和次數(shù),重新發(fā)送這條消息給消費(fèi)者。1.3.6總結(jié)Pulsar是一個功能強(qiáng)大的消息隊列系統(tǒng),它提供了消息持久化、多租戶、全球地理復(fù)制和分層存儲等功能,支持獨(dú)占訂閱和共享訂閱兩種模式,同時提供了消息重試機(jī)制,保證了消息的可靠處理。通過使用Pulsar,可以構(gòu)建出高性能、可擴(kuò)展、可靠的消息隊列和流處理應(yīng)用。2消息隊列:Pulsar:深入理解Pulsar的訂閱模式在ApachePulsar消息隊列中,訂閱模式是消息消費(fèi)的核心機(jī)制之一,它決定了多個消費(fèi)者如何處理來自同一主題的消息。Pulsar提供了四種訂閱模式:獨(dú)占訂閱模式、共享訂閱模式、故障轉(zhuǎn)移訂閱模式和鍵共享訂閱模式。每種模式都有其特定的使用場景和優(yōu)勢,下面將詳細(xì)介紹這四種訂閱模式的原理和應(yīng)用場景。2.1獨(dú)占訂閱模式(Exclusive)2.1.1原理在獨(dú)占訂閱模式下,一個主題只能有一個活動的訂閱者。如果多個消費(fèi)者嘗試訂閱同一主題,只有第一個訂閱者能夠成功接收消息,其他訂閱者將被阻止直到第一個訂閱者斷開連接。這種模式確保了消息的順序處理和唯一性。2.1.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建獨(dú)占訂閱
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);2.1.3解釋上述代碼展示了如何在Pulsar中創(chuàng)建一個獨(dú)占訂閱。SubscriptionType.Exclusive參數(shù)確保了只有創(chuàng)建此訂閱的消費(fèi)者能夠接收消息。如果另一個消費(fèi)者嘗試使用相同的訂閱名稱訂閱同一主題,它將被阻止直到當(dāng)前訂閱者斷開連接。2.2共享訂閱模式(Shared)2.2.1原理共享訂閱模式允許多個消費(fèi)者同時訂閱同一主題。消息將被分發(fā)給訂閱者中的任意一個,但不會重復(fù)發(fā)送給其他訂閱者。這種模式提高了系統(tǒng)的并行處理能力,但不保證消息的順序。2.2.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建共享訂閱
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
consumer1.acknowledge(msg1);
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());
consumer2.acknowledge(msg2);2.2.3解釋在共享訂閱模式下,多個消費(fèi)者可以使用相同的訂閱名稱訂閱同一主題。消息將被分發(fā)給任意一個訂閱者,但不會重復(fù)發(fā)送給其他訂閱者。這使得系統(tǒng)能夠并行處理消息,提高了處理效率。2.3故障轉(zhuǎn)移訂閱模式(Failover)2.3.1原理故障轉(zhuǎn)移訂閱模式類似于獨(dú)占訂閱,但允許多個消費(fèi)者訂閱同一主題,每個消費(fèi)者都有一個唯一的分區(qū)。當(dāng)一個消費(fèi)者(分區(qū))失敗時,其未處理的消息將被重新分配給其他消費(fèi)者。這種模式保證了消息的順序處理和高可用性。2.3.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建故障轉(zhuǎn)移訂閱
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.consumerName("consumer1")
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.consumerName("consumer2")
.subscribe();
//消費(fèi)消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
consumer1.acknowledge(msg1);
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());
consumer2.acknowledge(msg2);2.3.3解釋在故障轉(zhuǎn)移訂閱模式下,每個消費(fèi)者都有一個唯一的分區(qū)。當(dāng)一個消費(fèi)者失敗時,其未處理的消息將被重新分配給其他消費(fèi)者。這確保了即使在消費(fèi)者失敗的情況下,消息也能被正確處理,同時保持了消息的順序。2.4鍵共享訂閱模式(Key_Shared)2.4.1原理鍵共享訂閱模式允許消息根據(jù)其鍵(key)被分發(fā)到特定的消費(fèi)者。這種模式確保了具有相同鍵的消息總是被同一個消費(fèi)者處理,即使有多個消費(fèi)者訂閱同一主題。這在需要根據(jù)消息鍵進(jìn)行一致性處理的場景中非常有用。2.4.2示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建鍵共享訂閱
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("consumer1")
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("consumer2")
.subscribe();
//生產(chǎn)者發(fā)送帶有鍵的消息
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.send("message1","key1".getBytes());
producer.send("message2","key2".getBytes());
//消費(fèi)消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());2.4.3解釋在鍵共享訂閱模式下,消息根據(jù)其鍵被分發(fā)到特定的消費(fèi)者。在上述示例中,producer.send方法被用來發(fā)送帶有鍵的消息。"key1"和"key2"確保了消息將被分發(fā)到不同的消費(fèi)者,具有相同鍵的消息將始終被同一個消費(fèi)者處理。2.5消息重試在Pulsar中,消息重試機(jī)制允許在消息處理失敗時重新發(fā)送消息。這可以通過設(shè)置消息的重試次數(shù)和重試策略來實現(xiàn),確保了消息的可靠處理。2.5.1示例代碼//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建消費(fèi)者并設(shè)置消息重試策略
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1,TimeUnit.MINUTES)//設(shè)置消息重試延遲
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
try{
System.out.println("Receivedmessage:"+msg.getValue());
//模擬消息處理失敗
if(msg.getValue().equals("message1")){
consumer.negativeAcknowledge(msg);
}else{
consumer.acknowledge(msg);
}
}catch(Exceptione){
consumer.negativeAcknowledge(msg);
}2.5.2解釋在上述代碼中,negativeAckRedeliveryDelay方法被用來設(shè)置消息重試的延遲時間。當(dāng)消息處理失敗時,消費(fèi)者調(diào)用negativeAcknowledge方法,這將導(dǎo)致消息在指定的延遲時間后被重新發(fā)送。這種機(jī)制確保了即使在處理失敗的情況下,消息也能被重新嘗試處理,提高了消息處理的可靠性。通過理解Pulsar的訂閱模式和消息重試機(jī)制,開發(fā)者可以更有效地設(shè)計和實現(xiàn)消息處理系統(tǒng),確保消息的正確處理和系統(tǒng)的高可用性。3消息重試機(jī)制3.1消息重試的重要性在分布式系統(tǒng)中,消息隊列如ApachePulsar扮演著關(guān)鍵角色,用于在服務(wù)之間傳遞消息。然而,網(wǎng)絡(luò)延遲、服務(wù)故障或消費(fèi)者處理邏輯的復(fù)雜性可能導(dǎo)致消息處理失敗。消息重試機(jī)制是確保消息至少被成功處理一次的關(guān)鍵策略。它通過在消息處理失敗時自動或手動地重新發(fā)送消息,從而提高系統(tǒng)的可靠性和容錯性。3.2Pulsar消息重試策略ApachePulsar提供了多種消息重試策略,以適應(yīng)不同的業(yè)務(wù)場景和需求。這些策略包括:3.2.1自動重試Pulsar可以配置自動重試機(jī)制,當(dāng)消息處理失敗時,消息會被自動重新發(fā)送到消費(fèi)者。自動重試次數(shù)和重試間隔可以通過Pulsar的配置參數(shù)進(jìn)行調(diào)整。3.2.2手動重試在某些情況下,可能需要更精細(xì)的控制。Pulsar允許消費(fèi)者在消息處理失敗時手動觸發(fā)重試。這通常通過在消息處理函數(shù)中拋出異?;蚴褂锰囟ǖ腁PI來實現(xiàn)。3.2.3死信隊列對于那些即使經(jīng)過多次重試也無法成功處理的消息,Pulsar提供了死信隊列(DeadLetterQueue,DLQ)機(jī)制。這些消息會被移動到DLQ中,以便后續(xù)的人工檢查或特殊處理。3.3實現(xiàn)消息重試的步驟要實現(xiàn)Pulsar中的消息重試,可以遵循以下步驟:3.3.1步驟1:配置自動重試在Pulsar的消費(fèi)者配置中,可以設(shè)置自動重試的次數(shù)和間隔。例如,以下是一個使用JavaAPI配置自動重試的例子:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerWithRetry{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.negativeAckRedeliveryDelay(10,TimeUnit.SECONDS)//設(shè)置重試間隔
.redeliveryBackoff(10,TimeUnit.SECONDS)//設(shè)置重試間隔
.maxRedeliverCount(5)//設(shè)置最大重試次數(shù)
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息處理邏輯
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);//處理失敗,觸發(fā)重試
}
}
}
}3.3.2步驟2:手動觸發(fā)重試如果需要在消息處理失敗時手動觸發(fā)重試,可以在消息處理函數(shù)中拋出異?;蚴褂胣egativeAcknowledge方法。以下是一個示例:importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerManualRetry{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息處理邏輯
System.out.println("Receivedmessage:"+msg.getValue());
if(msg.getValue().equals("error")){
thrownewException("Messageprocessingerror");
}
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);//處理失敗,觸發(fā)重試
}
}
}
}3.3.3步驟3:配置死信隊列對于那些即使經(jīng)過多次重試也無法成功處理的消息,可以配置死信隊列。以下是一個配置DLQ的例子:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerWithDLQ{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.deadLetterTopic("persistent://public/default/my-dlq-topic")//設(shè)置DLQ主題
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息處理邏輯
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.redeliverLater(msg,10,TimeUnit.SECONDS);//處理失敗,延遲重試
}
}
}
}在上述示例中,如果消息處理失敗,它將被重新發(fā)送到DLQ主題,而不是無限次重試。通過以上步驟,可以有效地在ApachePulsar中實現(xiàn)消息重試機(jī)制,從而提高系統(tǒng)的可靠性和容錯性。4高級訂閱與重試4.1訂閱模式的高級用法在ApachePulsar中,訂閱模式是消息消費(fèi)的核心機(jī)制,它決定了消息如何被多個消費(fèi)者處理。Pulsar支持兩種主要的訂閱模式:Exclusive和Shared,以及一種特殊的模式Failover。除此之外,Pulsar還提供了Key_Shared訂閱模式,用于更細(xì)粒度的負(fù)載均衡和消息處理。4.1.1Exclusive訂閱模式在Exclusive模式下,一個訂閱只能被一個消費(fèi)者消費(fèi)。如果多個消費(fèi)者訂閱了同一個主題,只有第一個連接的消費(fèi)者能夠接收消息。當(dāng)該消費(fèi)者斷開連接時,其他消費(fèi)者才能開始接收消息。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個Exclusive訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息
consumer.acknowledge(msg);4.1.2Shared訂閱模式Shared模式允許多個消費(fèi)者同時消費(fèi)一個訂閱。消息會被均勻地分發(fā)給所有消費(fèi)者,每個消息只會被一個消費(fèi)者處理。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個Shared訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息
consumer.acknowledge(msg);4.1.3Failover訂閱模式Failover模式類似于Exclusive模式,但當(dāng)一個消費(fèi)者斷開連接時,其未處理的消息會被重新分配給下一個消費(fèi)者。4.1.4Key_Shared訂閱模式Key_Shared模式允許消息根據(jù)消息鍵在多個消費(fèi)者之間共享。這確保了具有相同鍵的消息總是由同一個消費(fèi)者處理,從而實現(xiàn)一致性。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個Key_Shared訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息
consumer.acknowledge(msg);4.2優(yōu)化消息重試在消息隊列中,消息重試機(jī)制是處理失敗消息的關(guān)鍵。Pulsar提供了多種方式來優(yōu)化消息重試,包括消息重發(fā)、消息保留策略和死信隊列。4.2.1消息重發(fā)當(dāng)消費(fèi)者無法處理消息時,可以通過negativeAcknowledge方法將消息返回到隊列中,以便稍后重試。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//如果消息處理失敗,重試消息
if(/*處理失敗的條件*/){
consumer.negativeAcknowledge(msg);
}else{
//確認(rèn)消息
consumer.acknowledge(msg);
}4.2.2消息保留策略Pulsar允許設(shè)置消息保留策略,以控制消息在隊列中的存儲時間。這可以通過設(shè)置retentionTimeInMinutes和retentionSizeInMB來實現(xiàn)。示例代碼//創(chuàng)建一個Pulsar管理員對象
Adminadmin=Admin.builder().serviceHttpUrl("http://localhost:8080").build();
//設(shè)置消息保留策略
admin.topics().setRetention("persistent://sample/standalone/ns/my-topic",
newRetentionPolicies(1,TimeUnit.DAYS),1024);4.2.3死信隊列當(dāng)消息在一定次數(shù)的重試后仍然無法被處理時,可以將這些消息發(fā)送到死信隊列,以便進(jìn)行進(jìn)一步的分析或處理。4.3監(jiān)控與調(diào)整重試策略Pulsar提供了豐富的監(jiān)控指標(biāo),可以用來跟蹤消息隊列的健康狀況和性能。通過監(jiān)控,可以調(diào)整重試策略,以優(yōu)化消息處理流程。4.3.1使用PulsarManager監(jiān)控PulsarManager是一個圖形界面工具,可以用來監(jiān)控和管理Pulsar集群。它提供了消息隊列的實時監(jiān)控數(shù)據(jù),包括消息的發(fā)送和接收速率、消息積壓和消費(fèi)者狀態(tài)。4.3.2調(diào)整重試策略根據(jù)監(jiān)控數(shù)據(jù),可以調(diào)整消息重試的次數(shù)和間隔,以適應(yīng)不同的業(yè)務(wù)需求。例如,可以增加重試次數(shù),以提高消息處理的可靠性;或者增加重試間隔,以減輕消費(fèi)者在高負(fù)載下的壓力。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個消費(fèi)者,設(shè)置重試策略
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.consumerName("my-consumer")
.maxUnackedMessages(1000)
.ackTimeout(10,TimeUnit.SECONDS)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//如果消息處理失敗,重試消息
if(/*處理失敗的條件*/){
consumer.negativeAcknowledge(msg);
}else{
//確認(rèn)消息
consumer.acknowledge(msg);
}在這個例子中,maxUnackedMessages和ackTimeout是用來控制消息重試策略的參數(shù)。maxUnackedMessages定義了消費(fèi)者可以同時處理的消息數(shù)量,而ackTimeout定義了消費(fèi)者處理消息的超時時間。如果消費(fèi)者在ackTimeout時間內(nèi)沒有確認(rèn)消息,Pulsar會自動將消息重發(fā)給其他消費(fèi)者。通過這些高級訂閱模式和重試策略的使用,可以構(gòu)建出更健壯、更靈活的消息處理系統(tǒng)。在實際應(yīng)用中,應(yīng)根據(jù)業(yè)務(wù)需求和系統(tǒng)性能,合理選擇和調(diào)整訂閱模式和重試策略,以達(dá)到最佳的消息處理效果。5實踐案例5.1使用Pulsar處理高并發(fā)場景在處理高并發(fā)場景時,Pulsar消息隊列因其高性能和可擴(kuò)展性成為許多企業(yè)的首選。Pulsar支持多種訂閱模式,包括獨(dú)占(Exclusive)、共享(Shared)、鍵共享(Key_Shared)和失敗重試(Failover),這些模式可以靈活地滿足不同場景下的需求。5.1.1獨(dú)占訂閱模式獨(dú)占訂閱模式下,一個主題只能被一個消費(fèi)者訂閱。如果多個消費(fèi)者嘗試訂閱同一主題,只有第一個訂閱者能夠成功接收消息,其余的將被拒絕。這種模式適用于需要確保消息只被一個消費(fèi)者處理的場景。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個獨(dú)占訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();5.1.2共享訂閱模式共享訂閱模式下,多個消費(fèi)者可以訂閱同一主題,消息會被均勻地分發(fā)給所有訂閱者。這種模式適用于需要水平擴(kuò)展消費(fèi)者以處理更多消息的場景。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個共享訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();5.1.3鍵共享訂閱模式鍵共享訂閱模式下,消息根據(jù)消息鍵(如果存在)被分發(fā)到特定的消費(fèi)者。這種模式適用于需要根據(jù)消息內(nèi)容進(jìn)行路由的場景,例如,根據(jù)用戶ID將消息路由到處理該用戶請求的消費(fèi)者。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個鍵共享訂閱的消費(fèi)者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.KeyShared)
.subscribe();
//消費(fèi)消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();5.1.4失敗重試訂閱模式失敗重試訂閱模式下,如果一個消費(fèi)者無法處理消息,消息會被重新分發(fā)給其他訂閱者。這種模式適用于需要高可用性和容錯性的場景。示例代碼//創(chuàng)建一個Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建一個失敗重試訂閱的消費(fèi)者
Consumer<String>consumer=client.new
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二五年度新能源行業(yè)銷售人員2025年度勞動合同2篇
- 2025年住房公積金租房提取政策執(zhí)行效果評估合同3篇
- 二零二五年度農(nóng)村土地互換及農(nóng)業(yè)科技創(chuàng)新協(xié)議書
- 二零二五年度農(nóng)村房屋贈與合同附農(nóng)業(yè)科技研發(fā)合作協(xié)議
- 二零二五年度醫(yī)療影像設(shè)備加工承攬合同3篇
- 二零二五年度公司租賃車輛駕駛?cè)藛T考核及培訓(xùn)協(xié)議2篇
- 二零二五年度公司與自然人環(huán)保項目合作協(xié)議3篇
- 二零二五年度智能家電產(chǎn)品開發(fā)合作協(xié)議書2篇
- 2025年度網(wǎng)約貨車司機(jī)兼職服務(wù)協(xié)議3篇
- 2025年度環(huán)保型機(jī)械研發(fā)與生產(chǎn)合作協(xié)議3篇
- GB/T 1094.7-2024電力變壓器第7部分:油浸式電力變壓器負(fù)載導(dǎo)則
- 2025版國家開放大學(xué)法律事務(wù)??啤斗勺稍兣c調(diào)解》期末紙質(zhì)考試單項選擇題題庫
- 2024小學(xué)數(shù)學(xué)義務(wù)教育新課程標(biāo)準(zhǔn)(2022版)必考題庫附含答案
- DB32/T 2283-2024 公路工程水泥攪拌樁成樁質(zhì)量檢測規(guī)程
- 2,3-二甲苯酚的理化性質(zhì)及危險特性表
- 申報職稱:副教授演示課件
- 型濾池計算說明書
- 格力離心機(jī)技術(shù)服務(wù)手冊
- 水泥攪拌樁計算(完美)
- 旭化成離子交換膜的介紹
- JJRB輕鋼龍骨隔墻施工方案要點
評論
0/150
提交評論