版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
消息隊列:RabbitMQ:消息隊列基礎(chǔ)概念1消息隊列基礎(chǔ)1.1消息隊列簡介1.1.1消息隊列的歷史消息隊列的概念起源于20世紀(jì)70年代的IBM,最初是為了解決大型機(jī)之間的通信問題。隨著計算機(jī)技術(shù)的發(fā)展,消息隊列逐漸被應(yīng)用于分布式系統(tǒng)中,成為解決異步通信、服務(wù)解耦、流量削峰等場景的重要工具。在互聯(lián)網(wǎng)時代,消息隊列因其高可用、高并發(fā)、高效率的特性,被廣泛應(yīng)用于各種復(fù)雜系統(tǒng)架構(gòu)中,如電子商務(wù)、金融交易、大數(shù)據(jù)處理等。1.1.2消息隊列的作用消息隊列在現(xiàn)代軟件架構(gòu)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許發(fā)送者和接收者異步通信,提高系統(tǒng)響應(yīng)速度。服務(wù)解耦:通過消息隊列,服務(wù)之間可以獨立開發(fā)、部署和擴(kuò)展,降低系統(tǒng)間的耦合度。流量削峰:在高并發(fā)場景下,消息隊列可以作為緩沖,避免后端系統(tǒng)因瞬時大量請求而崩潰。數(shù)據(jù)持久化:消息隊列可以保證消息的持久化存儲,即使接收者暫時不可用,消息也不會丟失。消息路由:支持消息的靈活路由,可以根據(jù)不同的規(guī)則將消息發(fā)送到不同的接收者。1.2RabbitMQ介紹1.2.1RabbitMQ的歷史背景RabbitMQ最初由LShift公司于2007年開發(fā),基于Erlang語言實現(xiàn)。它遵循AMQP(AdvancedMessageQueuingProtocol)標(biāo)準(zhǔn),是一個開源的消息隊列服務(wù)。2010年,RabbitMQ被PivotalSoftware收購,隨后成為CloudFoundry平臺的一部分,進(jìn)一步推動了其在企業(yè)級應(yīng)用中的普及。RabbitMQ因其穩(wěn)定性和可擴(kuò)展性,被全球眾多大型企業(yè)采用,包括Cisco、eBay、Adobe等。1.2.2RabbitMQ的特點與優(yōu)勢RabbitMQ作為一款成熟的消息隊列服務(wù),具有以下顯著特點和優(yōu)勢:高可用性:支持集群部署,可以實現(xiàn)消息的自動重路由和故障轉(zhuǎn)移,確保服務(wù)的連續(xù)性和消息的可靠性。靈活的消息路由:支持多種消息路由模式,包括直接路由(Direct)、主題路由(Topic)、頭部分發(fā)(Headers)等,滿足不同場景的需求。消息持久化:可以將消息存儲在磁盤上,即使服務(wù)器重啟,消息也不會丟失。多語言支持:提供了多種語言的客戶端庫,包括Java、Python、C#等,方便不同開發(fā)團(tuán)隊的集成。易于管理和監(jiān)控:提供了Web界面和API,可以方便地管理和監(jiān)控隊列、交換機(jī)、連接等狀態(tài)。1.3示例:使用Python與RabbitMQ交互下面是一個使用Python與RabbitMQ交互的簡單示例,包括發(fā)送和接收消息的基本流程。1.3.1發(fā)送消息importpika
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個隊列
channel.queue_declare(queue='hello')
#發(fā)送消息
channel.basic_publish(exchange='',
routing_key='hello',
body='HelloRabbitMQ!')
print("[x]Sent'HelloRabbitMQ!'")
connection.close()1.3.2接收消息importpika
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個隊列
channel.queue_declare(queue='hello')
#設(shè)置隊列的回調(diào)函數(shù)
channel.basic_consume(queue='hello',
on_message_callback=callback,
auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()在這個示例中,我們首先使用pika庫建立與RabbitMQ服務(wù)器的連接。然后,我們聲明一個名為hello的隊列。在發(fā)送消息的示例中,我們使用basic_publish方法將消息發(fā)送到隊列中。在接收消息的示例中,我們定義了一個回調(diào)函數(shù)callback,當(dāng)隊列中有消息時,這個函數(shù)會被調(diào)用,處理接收到的消息。通過basic_consume方法,我們設(shè)置隊列的回調(diào)函數(shù),并開始消費隊列中的消息。通過這個示例,我們可以看到RabbitMQ的基本使用流程,包括連接、隊列聲明、消息發(fā)送和接收。RabbitMQ的強(qiáng)大功能和靈活性使其成為處理復(fù)雜消息傳遞場景的理想選擇。2RabbitMQ核心概念2.1交換機(jī)2.1.1交換機(jī)的作用交換機(jī)在RabbitMQ中扮演著消息分發(fā)的角色,它接收來自生產(chǎn)者的消息,然后根據(jù)配置的規(guī)則將消息發(fā)送到一個或多個隊列中。交換機(jī)的存在使得RabbitMQ能夠?qū)崿F(xiàn)更復(fù)雜的消息路由邏輯,而不僅僅是簡單的點對點通信。2.1.2交換機(jī)的類型RabbitMQ支持多種類型的交換機(jī),每種類型都有其特定的路由規(guī)則:DirectExchange:直接交換機(jī),消息根據(jù)綁定的路由鍵(routingkey)直接發(fā)送到隊列。FanoutExchange:扇出交換機(jī),將所有接收到的消息廣播到所有綁定的隊列,類似于發(fā)布/訂閱模式。TopicExchange:主題交換機(jī),支持模式匹配的路由鍵,可以實現(xiàn)更靈活的消息路由。HeadersExchange:頭信息交換機(jī),不使用路由鍵,而是通過消息頭信息進(jìn)行路由。示例:使用DirectExchangeimportpika
#連接到RabbitMQ服務(wù)器
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個直接類型的交換機(jī)
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
#聲明隊列并綁定到交換機(jī)
channel.queue_declare(queue='error')
channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')
#發(fā)送消息
channel.basic_publish(exchange='direct_logs',routing_key='error',body='Criticalerroroccurred')
#關(guān)閉連接
connection.close()在這個例子中,我們創(chuàng)建了一個名為direct_logs的直接類型交換機(jī),并聲明了一個名為error的隊列,然后將隊列綁定到交換機(jī)上,使用路由鍵error。當(dāng)消息被發(fā)送到direct_logs交換機(jī)時,只有綁定的error隊列會接收到消息。2.2隊列2.2.1隊列的創(chuàng)建與管理隊列是消息的容器,生產(chǎn)者將消息發(fā)送到隊列,消費者從隊列中讀取消息。隊列可以被創(chuàng)建、綁定到交換機(jī)、以及被刪除。創(chuàng)建隊列importpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='hello')
#發(fā)送消息到隊列
channel.basic_publish(exchange='',routing_key='hello',body='HelloWorld!')
#關(guān)閉連接
connection.close()在這個例子中,我們創(chuàng)建了一個名為hello的隊列,并發(fā)送了一條消息HelloWorld!到這個隊列。2.2.2隊列的持久化策略RabbitMQ提供了隊列的持久化策略,以確保消息在服務(wù)器重啟后不會丟失。持久化可以通過聲明隊列時設(shè)置durable=True來實現(xiàn)。示例:持久化隊列importpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個持久化隊列
channel.queue_declare(queue='hello',durable=True)
#發(fā)送持久化消息
channel.basic_publish(exchange='',routing_key='hello',body='HelloWorld!',properties=pika.BasicProperties(delivery_mode=2))
#關(guān)閉連接
connection.close()在這個例子中,我們聲明了一個持久化的隊列hello,并通過設(shè)置delivery_mode=2來發(fā)送持久化消息。2.3消息路由2.3.1消息的發(fā)布與訂閱模式發(fā)布與訂閱模式允許消息被廣播到所有訂閱的隊列,這在需要將消息發(fā)送給多個消費者時非常有用。示例:FanoutExchangeimportpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個扇出類型的交換機(jī)
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#聲明隊列并綁定到交換機(jī)
channel.queue_declare(queue='log1')
channel.queue_bind(exchange='logs',queue='log1')
channel.queue_declare(queue='log2')
channel.queue_bind(exchange='logs',queue='log2')
#發(fā)送消息
channel.basic_publish(exchange='logs',routing_key='',body='Logmessage')
#關(guān)閉連接
connection.close()在這個例子中,我們創(chuàng)建了一個名為logs的扇出類型交換機(jī),并聲明了兩個隊列l(wèi)og1和log2,然后將這兩個隊列都綁定到交換機(jī)上。當(dāng)消息被發(fā)送到logs交換機(jī)時,所有綁定的隊列都會接收到這條消息。2.3.2消息的路由模式路由模式允許消息根據(jù)特定的路由鍵被發(fā)送到特定的隊列,這在需要根據(jù)消息內(nèi)容進(jìn)行精確路由時非常有用。示例:使用TopicExchangeimportpika
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明一個主題類型的交換機(jī)
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
#聲明隊列并綁定到交換機(jī)
channel.queue_declare(queue='kern.error')
channel.queue_bind(exchange='topic_logs',queue='kern.error',routing_key='kern.*')
channel.queue_declare(queue='work')
channel.queue_bind(exchange='topic_logs',queue='work',routing_key='*.network')
#發(fā)送消息
channel.basic_publish(exchange='topic_logs',routing_key='kern.critical',body='Kernelcriticalerror')
#關(guān)閉連接
connection.close()在這個例子中,我們創(chuàng)建了一個名為topic_logs的主題類型交換機(jī),并聲明了兩個隊列kern.error和work,然后將隊列綁定到交換機(jī)上,使用模式匹配的路由鍵。當(dāng)消息被發(fā)送到topic_logs交換機(jī)時,只有匹配到的隊列會接收到消息。例如,路由鍵kern.critical會匹配到kern.error隊列,而work隊列則不會接收到這條消息。3RabbitMQ工作模式3.1簡單模式3.1.1發(fā)送者與接收者在RabbitMQ的簡單模式中,發(fā)送者直接將消息發(fā)送到隊列,接收者從隊列中獲取消息。這種模式是最基礎(chǔ)的,適用于一對一的通信場景。代碼示例發(fā)送者代碼示例:importpika
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='hello')
#發(fā)送消息
channel.basic_publish(exchange='',
routing_key='hello',
body='HelloWorld!')
print("[x]Sent'HelloWorld!'")
#關(guān)閉連接
connection.close()接收者代碼示例:importpika
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='hello')
#開始消費消息
channel.basic_consume(queue='hello',
on_message_callback=callback,
auto_ack=True)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()3.1.2工作隊列模式負(fù)載均衡的概念工作隊列模式是RabbitMQ中用于實現(xiàn)負(fù)載均衡的一種方式。在這種模式下,多個接收者可以同時從隊列中獲取消息,RabbitMQ會將消息均勻地分發(fā)給不同的接收者,確保工作負(fù)載的均衡分配。工作隊列的實現(xiàn)發(fā)送者代碼示例:importpika
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='task_queue',durable=True)
#發(fā)送消息
message='Aheavytask'
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2,#makemessagepersistent
))
print("[x]Sent%r"%message)
#關(guān)閉連接
connection.close()接收者代碼示例:importpika
importtime
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
time.sleep(body.count(b'.'))
print("[x]Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='task_queue',durable=True)
#開始消費消息
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue',
on_message_callback=callback)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()3.1.3發(fā)布確認(rèn)模式發(fā)布確認(rèn)的原理發(fā)布確認(rèn)模式是RabbitMQ中用于確保消息可靠傳輸?shù)囊环N機(jī)制。當(dāng)發(fā)送者將消息發(fā)送到RabbitMQ時,它會等待RabbitMQ的確認(rèn)。如果RabbitMQ確認(rèn)消息已接收,發(fā)送者將繼續(xù)發(fā)送下一條消息;如果未收到確認(rèn),發(fā)送者將重新發(fā)送消息。發(fā)布確認(rèn)的代碼實現(xiàn)發(fā)送者代碼示例:importpika
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#開啟發(fā)布確認(rèn)
channel.confirm_delivery()
#發(fā)送消息
message='HelloWorld!'
channel.basic_publish(exchange='',
routing_key='confirm_queue',
body=message)
print("[x]Sent%r"%message)
#檢查確認(rèn)狀態(tài)
whileTrue:
ifchannel._delivery_confirmation:
print("[x]Confirmreceived")
break
else:
print("[x]Waitingforconfirm")
time.sleep(1)
#關(guān)閉連接
connection.close()接收者代碼示例:importpika
defcallback(ch,method,properties,body):
print("[x]Received%r"%body)
ch.basic_ack(delivery_tag=method.delivery_tag)
#建立與RabbitMQ的連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='confirm_queue')
#開始消費消息
channel.basic_consume(queue='confirm_queue',
on_message_callback=callback)
print('[*]Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()以上代碼示例展示了如何在RabbitMQ中實現(xiàn)簡單模式、工作隊列模式和發(fā)布確認(rèn)模式。通過這些示例,你可以理解RabbitMQ的基本工作原理和如何在實際應(yīng)用中使用這些模式。4RabbitMQ高級特性4.1事務(wù)與確認(rèn)4.1.1事務(wù)的使用在RabbitMQ中,事務(wù)提供了一種確保消息處理可靠性的機(jī)制。通過開啟事務(wù),可以確保消息在被確認(rèn)前的任何操作(如發(fā)布、取消發(fā)布等)要么全部成功,要么全部失敗,從而保證數(shù)據(jù)的一致性。示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#開啟事務(wù)
channel.confirm_delivery()
#發(fā)布消息
try:
channel.basic_publish(exchange='',routing_key='queue_name',body='HelloWorld!')
#提交事務(wù)
channel.tx_commit()
exceptExceptionase:
#回滾事務(wù)
channel.tx_rollback()
print(f"Erroroccurred:{e}")
#關(guān)閉連接
connection.close()解釋在上述代碼中,我們首先通過pika庫建立了一個到RabbitMQ服務(wù)器的連接。然后,我們調(diào)用confirm_delivery方法開啟事務(wù)模式,這意味著所有在事務(wù)模式下的操作都會被跟蹤,直到事務(wù)被提交或回滾。在事務(wù)模式下,我們嘗試發(fā)布一條消息到隊列queue_name。如果消息發(fā)布成功,我們調(diào)用tx_commit方法提交事務(wù);如果在發(fā)布過程中發(fā)生任何異常,我們調(diào)用tx_rollback方法回滾事務(wù),確保不會有任何消息被錯誤地處理。4.1.2確認(rèn)機(jī)制的深入理解確認(rèn)機(jī)制是RabbitMQ中用于確保消息可靠傳遞的關(guān)鍵特性。當(dāng)消息被發(fā)布到隊列后,RabbitMQ會等待消費者確認(rèn)消息的接收和處理。如果消費者在接收消息后沒有發(fā)送確認(rèn),RabbitMQ會將消息重新發(fā)布,以防止數(shù)據(jù)丟失。示例代碼importpika
defcallback(ch,method,properties,body):
print("Receivedmessage:",body.decode())
#模擬消息處理
#...
#確認(rèn)消息
ch.basic_ack(delivery_tag=method.delivery_tag)
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明隊列
channel.queue_declare(queue='queue_name')
#設(shè)置確認(rèn)模式
channel.basic_qos(prefetch_count=1)
#開始消費
channel.basic_consume(queue='queue_name',on_message_callback=callback)
print('Waitingformessages.ToexitpressCTRL+C')
channel.start_consuming()解釋在本例中,我們定義了一個callback函數(shù),該函數(shù)在接收到消息時被調(diào)用。在callback函數(shù)中,我們首先打印接收到的消息,然后模擬消息的處理過程。處理完成后,我們通過調(diào)用basic_ack方法確認(rèn)消息的接收和處理。basic_qos方法被用來設(shè)置確認(rèn)模式,其中prefetch_count=1表示RabbitMQ在等待消費者確認(rèn)前,只會發(fā)送一條消息給消費者,這有助于防止消費者在處理消息時因異常而丟失消息。4.2死信隊列4.2.1死信隊列的原理死信隊列(DeadLetterQueue,DLQ)是RabbitMQ中用于處理無法被正常消費的消息的隊列。當(dāng)消息在原隊列中達(dá)到最大重試次數(shù)、過期、或被顯式標(biāo)記為無法處理時,這些消息會被轉(zhuǎn)移到DLQ中,以便進(jìn)行進(jìn)一步的處理或分析。4.2.2死信隊列的配置要配置DLQ,需要在原隊列的聲明中設(shè)置arguments參數(shù),其中包含x-dead-letter-exchange和x-dead-letter-routing-key,分別指定死信交換機(jī)和死信隊列的路由鍵。示例代碼importpika
#建立連接
connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel=connection.channel()
#聲明死信交換機(jī)
channel.exchange_declare(exchange='dlx_exchange',exchange_type='direct')
#聲明死信隊列
channel.queue_declare(queue='dlq_queue')
#將死信隊列綁定到死信交換機(jī)
channel.queue_bind(exchange='dlx_exchange',queue='dlq_queue',routing_key='dlq_routing_key')
#聲明原隊列,并設(shè)置死信隊列參數(shù)
channel.queue_declare(queue='original_queue',arguments={
'x-dead-letter-exchange':'dlx_exchange',
'x-dead-letter-routing-key':'dlq_routing_key'
})
#發(fā)布消息
channel.basic_publish(exchange='',routing_key='original_queue',body='HelloWorld!')
#關(guān)閉連接
conne
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 幼兒園防騙防拐演練
- 知榮辱課件教學(xué)課件
- 食品安全與健康相關(guān)
- 退行性脊椎病X線
- 酶促反應(yīng)原理臨床治療
- DB1304T 488-2024大麗花露地栽培技術(shù)規(guī)程
- 聰聰課件 教學(xué)課件
- 高溫燙傷應(yīng)急預(yù)案演練
- 肺全切術(shù)后護(hù)理查房
- 運動治療儀器及使用方法
- 在高三學(xué)生月考總結(jié)表彰會上的講話
- 高價值醫(yī)療設(shè)備產(chǎn)品定價過程
- 保險行業(yè)創(chuàng)說會-課件
- 初中語文-江城子·密州出獵蘇軾教學(xué)設(shè)計學(xué)情分析教材分析課后反思
- -讓生活更美好 作文批改評語
- 超星爾雅《百年風(fēng)流人物:曾國藩》課程完整答案
- 離線論文 關(guān)于科學(xué)思維方法在實際生活和工作中的應(yīng)用、意義
- GK1C內(nèi)燃機(jī) 操作規(guī)程
- 梅嶺三章導(dǎo)學(xué)案
- 登桿培訓(xùn)材料
- 手術(shù)室護(hù)理風(fēng)險防范措施
評論
0/150
提交評論