消息隊(duì)列:RabbitMQ:RabbitMQ性能調(diào)優(yōu)與監(jiān)控_第1頁(yè)
消息隊(duì)列:RabbitMQ:RabbitMQ性能調(diào)優(yōu)與監(jiān)控_第2頁(yè)
消息隊(duì)列:RabbitMQ:RabbitMQ性能調(diào)優(yōu)與監(jiān)控_第3頁(yè)
消息隊(duì)列:RabbitMQ:RabbitMQ性能調(diào)優(yōu)與監(jiān)控_第4頁(yè)
消息隊(duì)列:RabbitMQ:RabbitMQ性能調(diào)優(yōu)與監(jiān)控_第5頁(yè)
已閱讀5頁(yè),還剩10頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

消息隊(duì)列:RabbitMQ:RabbitMQ性能調(diào)優(yōu)與監(jiān)控1RabbitMQ基礎(chǔ)概念1.1消息隊(duì)列簡(jiǎn)介消息隊(duì)列是一種應(yīng)用程序間通信的模式,它允許消息的發(fā)送和接收在不同的時(shí)間點(diǎn)進(jìn)行。這種模式在分布式系統(tǒng)中非常有用,因?yàn)樗梢越怦罘?wù),提高系統(tǒng)的可擴(kuò)展性和容錯(cuò)性。消息隊(duì)列通常用于異步處理、負(fù)載均衡、微服務(wù)通信和日志處理等場(chǎng)景。1.1.1為什么使用消息隊(duì)列解耦:消息隊(duì)列可以將生產(chǎn)者和消費(fèi)者解耦,使得兩者可以獨(dú)立開(kāi)發(fā)和部署。異步處理:消息隊(duì)列允許生產(chǎn)者異步發(fā)送消息,消費(fèi)者在有空閑資源時(shí)處理消息,提高系統(tǒng)響應(yīng)速度。負(fù)載均衡:消息隊(duì)列可以將任務(wù)均勻地分配給多個(gè)消費(fèi)者,實(shí)現(xiàn)負(fù)載均衡。冗余:消息隊(duì)列可以存儲(chǔ)消息,即使消費(fèi)者暫時(shí)不可用,消息也不會(huì)丟失。擴(kuò)展性:通過(guò)增加消費(fèi)者數(shù)量,可以輕松地?cái)U(kuò)展系統(tǒng)的處理能力。1.2RabbitMQ工作原理RabbitMQ是一個(gè)開(kāi)源的消息代理和隊(duì)列服務(wù)器,實(shí)現(xiàn)AMQP(AdvancedMessageQueuingProtocol)協(xié)議。它提供了一種在分布式系統(tǒng)中存儲(chǔ)和轉(zhuǎn)發(fā)消息的可靠方式。1.2.1工作流程生產(chǎn)者將消息發(fā)送到交換器(Exchange)。交換器根據(jù)規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列(Queue)。消費(fèi)者從隊(duì)列中拉取消息進(jìn)行處理。1.2.2交換器類(lèi)型RabbitMQ支持多種類(lèi)型的交換器,包括:-Direct:基于消息的路由鍵直接將消息路由到隊(duì)列。-Fanout:將消息廣播到所有綁定的隊(duì)列。-Topic:基于消息的路由鍵和隊(duì)列的綁定鍵進(jìn)行模式匹配。-Headers:基于消息頭進(jìn)行路由,較少使用。1.2.3隊(duì)列隊(duì)列是消息的容器,消息在被消費(fèi)者消費(fèi)前存儲(chǔ)在隊(duì)列中。隊(duì)列可以設(shè)置持久化,確保消息不會(huì)因?yàn)镽abbitMQ服務(wù)重啟而丟失。1.2.4消費(fèi)者消費(fèi)者是處理消息的應(yīng)用程序。消費(fèi)者可以設(shè)置為自動(dòng)確認(rèn)或手動(dòng)確認(rèn)消息,以確保消息被正確處理。1.3RabbitMQ核心組件理解1.3.1交換器(Exchange)交換器是RabbitMQ的核心組件之一,它負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并根據(jù)規(guī)則將消息路由到一個(gè)或多個(gè)隊(duì)列。交換器的類(lèi)型決定了消息的路由規(guī)則。示例代碼:創(chuàng)建Direct類(lèi)型的交換器importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#創(chuàng)建Direct類(lèi)型的交換器

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#發(fā)送消息到交換器

message="Info:HelloRabbitMQ!"

channel.basic_publish(exchange='direct_logs',routing_key='info',body=message)

connection.close()1.3.2隊(duì)列(Queue)隊(duì)列是消息的臨時(shí)存儲(chǔ)空間。在RabbitMQ中,隊(duì)列是消息的最終目的地,消費(fèi)者從隊(duì)列中拉取消息進(jìn)行處理。示例代碼:創(chuàng)建隊(duì)列并綁定到交換器importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#創(chuàng)建隊(duì)列

channel.queue_declare(queue='task_queue',durable=True)

#將隊(duì)列綁定到Direct類(lèi)型的交換器

channel.queue_bind(exchange='direct_logs',queue='task_queue',routing_key='info')

connection.close()1.3.3消費(fèi)者(Consumer)消費(fèi)者是處理消息的應(yīng)用程序。在RabbitMQ中,消費(fèi)者通過(guò)監(jiān)聽(tīng)隊(duì)列來(lái)接收消息。示例代碼:消費(fèi)者從隊(duì)列中拉取消息importpika

defcallback(ch,method,properties,body):

print("Received%r"%body)

#手動(dòng)確認(rèn)消息

ch.basic_ack(delivery_tag=method.delivery_tag)

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#聲明隊(duì)列,確保隊(duì)列存在

channel.queue_declare(queue='task_queue',durable=True)

#開(kāi)始消費(fèi)消息

channel.basic_consume(queue='task_queue',on_message_callback=callback)

print('Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()1.3.4生產(chǎn)者(Producer)生產(chǎn)者是發(fā)送消息的應(yīng)用程序。在RabbitMQ中,生產(chǎn)者將消息發(fā)送到交換器,由交換器決定消息的去向。示例代碼:生產(chǎn)者發(fā)送消息到隊(duì)列importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#聲明隊(duì)列,確保隊(duì)列存在

channel.queue_declare(queue='hello')

#發(fā)送消息到隊(duì)列

channel.basic_publish(exchange='',routing_key='hello',body='HelloWorld!')

print("[x]Sent'HelloWorld!'")

connection.close()通過(guò)以上核心組件的理解和示例代碼,我們可以看到RabbitMQ如何在生產(chǎn)者和消費(fèi)者之間傳遞消息,以及如何通過(guò)交換器和隊(duì)列實(shí)現(xiàn)消息的路由和存儲(chǔ)。這些組件的靈活組合,使得RabbitMQ能夠滿足各種復(fù)雜的消息傳遞需求。2消息隊(duì)列性能調(diào)優(yōu)與監(jiān)控:RabbitMQ2.1消息隊(duì)列性能瓶頸分析2.1.1理解性能瓶頸在消息隊(duì)列系統(tǒng)中,性能瓶頸可能出現(xiàn)在多個(gè)層面,包括但不限于網(wǎng)絡(luò)延遲、消息處理速度、磁盤(pán)I/O、內(nèi)存使用、CPU負(fù)載等。識(shí)別這些瓶頸是優(yōu)化RabbitMQ性能的第一步。2.1.2分析工具RabbitMQManagementUI:提供詳細(xì)的系統(tǒng)監(jiān)控信息,包括隊(duì)列、交換機(jī)、連接、通道的狀態(tài)和統(tǒng)計(jì)。Prometheus:與RabbitMQManagementUI集成,收集監(jiān)控?cái)?shù)據(jù),用于更深入的性能分析。Grafana:用于可視化Prometheus收集的數(shù)據(jù),幫助理解系統(tǒng)性能趨勢(shì)。2.1.3常見(jiàn)瓶頸網(wǎng)絡(luò)延遲:通過(guò)監(jiān)控網(wǎng)絡(luò)延遲,確保消息的發(fā)送和接收效率。消息處理速度:分析消費(fèi)者處理消息的速度,確保消息隊(duì)列不會(huì)因處理速度慢而積壓。磁盤(pán)I/O:檢查磁盤(pán)讀寫(xiě)速度,優(yōu)化磁盤(pán)配置以提高持久化消息的處理能力。內(nèi)存使用:監(jiān)控內(nèi)存使用情況,避免因內(nèi)存不足導(dǎo)致的消息處理延遲。CPU負(fù)載:確保CPU資源充足,避免因CPU瓶頸導(dǎo)致的系統(tǒng)性能下降。2.2RabbitMQ參數(shù)優(yōu)化2.2.1配置文件調(diào)整RabbitMQ的配置文件(rabbitmq.config)中包含了許多可以調(diào)整的參數(shù),以?xún)?yōu)化性能。示例:調(diào)整隊(duì)列參數(shù)[

{rabbit,[

{queue_master_locator,min_members},

{default_message_ttl,3600000},

{default_disk_free_limit,50000000}

]}

].queue_master_locator:設(shè)置為min_members可以提高集群的可用性。default_message_ttl:設(shè)置消息的過(guò)期時(shí)間,減少不必要的消息存儲(chǔ)。default_disk_free_limit:設(shè)置磁盤(pán)空間的最低限制,避免因磁盤(pán)空間不足導(dǎo)致的性能問(wèn)題。2.2.2調(diào)整運(yùn)行時(shí)參數(shù)通過(guò)RabbitMQ的管理命令,可以在運(yùn)行時(shí)調(diào)整某些參數(shù)。示例:調(diào)整消息確認(rèn)機(jī)制rabbitmqctlset_parameterfederation_upstream_confirmsfalse此命令禁用了消息確認(rèn)機(jī)制,可以提高消息的發(fā)送速度,但會(huì)犧牲一定的消息可靠性。2.3網(wǎng)絡(luò)與硬件調(diào)優(yōu)2.3.1網(wǎng)絡(luò)優(yōu)化減少網(wǎng)絡(luò)延遲:確保RabbitMQ服務(wù)器與客戶端之間的網(wǎng)絡(luò)連接穩(wěn)定,減少延遲。優(yōu)化網(wǎng)絡(luò)配置:調(diào)整網(wǎng)絡(luò)緩沖區(qū)大小,以適應(yīng)高吞吐量的網(wǎng)絡(luò)傳輸。示例:調(diào)整網(wǎng)絡(luò)緩沖區(qū)大小sysctl-wnet.core.rmem_max=104857600

sysctl-wnet.core.wmem_max=104857600這些命令調(diào)整了網(wǎng)絡(luò)接收和發(fā)送緩沖區(qū)的大小,以提高網(wǎng)絡(luò)傳輸效率。2.3.2硬件優(yōu)化增加內(nèi)存:RabbitMQ是基于內(nèi)存的,增加物理內(nèi)存可以顯著提高性能。使用SSD:對(duì)于需要持久化的消息,使用SSD可以大幅減少磁盤(pán)I/O延遲。2.4消息持久化策略2.4.1理解持久化消息持久化是指將消息存儲(chǔ)在磁盤(pán)上,以防止因服務(wù)器重啟或故障導(dǎo)致消息丟失。但持久化會(huì)增加消息處理的延遲。2.4.2優(yōu)化策略使用x-message-ttl:為隊(duì)列設(shè)置過(guò)期時(shí)間,減少磁盤(pán)上的消息存儲(chǔ)量。使用x-max-length:限制隊(duì)列中消息的數(shù)量,避免隊(duì)列無(wú)限增長(zhǎng)。示例:創(chuàng)建持久化隊(duì)列importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

channel.queue_declare(queue='durable_queue',durable=True)此代碼創(chuàng)建了一個(gè)持久化的隊(duì)列,即使RabbitMQ重啟,隊(duì)列中的消息也不會(huì)丟失。2.5集群與高可用性2.5.1集群配置RabbitMQ集群可以提高系統(tǒng)的可用性和擴(kuò)展性,但需要正確配置以避免性能瓶頸。示例:配置RabbitMQ集群rabbitmqctlstop_app

rabbitmqctljoin_clusterrabbit@node1

rabbitmqctlstart_app這些命令用于將當(dāng)前節(jié)點(diǎn)加入到一個(gè)已存在的RabbitMQ集群中。2.5.2高可用性策略鏡像隊(duì)列:確保隊(duì)列中的消息在集群中的多個(gè)節(jié)點(diǎn)上都有副本,提高消息的可用性。故障轉(zhuǎn)移:配置故障轉(zhuǎn)移策略,當(dāng)主節(jié)點(diǎn)不可用時(shí),自動(dòng)切換到備用節(jié)點(diǎn)。示例:創(chuàng)建鏡像隊(duì)列rabbitmqctlset_policyha-all".*"'{"ha-mode":"all"}'此命令設(shè)置了一個(gè)策略,使得所有隊(duì)列都將在集群中的所有節(jié)點(diǎn)上鏡像,提高高可用性。以上內(nèi)容詳細(xì)介紹了RabbitMQ性能調(diào)優(yōu)與監(jiān)控的各個(gè)方面,包括性能瓶頸分析、參數(shù)優(yōu)化、網(wǎng)絡(luò)與硬件調(diào)優(yōu)、消息持久化策略以及集群與高可用性的配置。通過(guò)這些策略的實(shí)施,可以顯著提高RabbitMQ的性能和穩(wěn)定性。3監(jiān)控與管理3.1RabbitMQ監(jiān)控工具介紹RabbitMQ提供了多種監(jiān)控工具,幫助我們實(shí)時(shí)了解其運(yùn)行狀態(tài)和性能。主要工具包括:RabbitMQManagementConsole:一個(gè)基于Web的界面,可以查看和管理RabbitMQ的各個(gè)方面,包括隊(duì)列、交換機(jī)、連接、頻道等。RabbitMQMonitoringPlugin:提供了對(duì)RabbitMQ的詳細(xì)監(jiān)控?cái)?shù)據(jù),如消息速率、內(nèi)存使用、磁盤(pán)使用等。PrometheusExporterPlugin:用于將RabbitMQ的監(jiān)控?cái)?shù)據(jù)以Prometheus格式輸出,便于與Prometheus監(jiān)控系統(tǒng)集成。ErlangShell:通過(guò)命令行直接與RabbitMQ交互,執(zhí)行監(jiān)控和管理命令。3.1.1示例:使用RabbitMQManagementConsole啟動(dòng)RabbitMQManagementConsole:sudorabbitmq-pluginsenablerabbitmq_management訪問(wèn)Web界面:打開(kāi)瀏覽器,訪問(wèn)http://localhost:15672/。登錄,默認(rèn)用戶名和密碼均為guest。查看隊(duì)列信息:在Web界面中,選擇Queues選項(xiàng),可以看到所有隊(duì)列的列表。點(diǎn)擊某個(gè)隊(duì)列,可以查看該隊(duì)列的詳細(xì)信息,包括消息數(shù)量、消息速率等。3.2性能指標(biāo)監(jiān)控性能指標(biāo)監(jiān)控是RabbitMQ調(diào)優(yōu)的關(guān)鍵。主要監(jiān)控指標(biāo)包括:消息速率:發(fā)送和接收消息的速率。內(nèi)存使用:RabbitMQ進(jìn)程的內(nèi)存使用情況。磁盤(pán)使用:用于存儲(chǔ)持久化消息的磁盤(pán)空間使用情況。CPU使用:RabbitMQ進(jìn)程的CPU使用率。網(wǎng)絡(luò)I/O:網(wǎng)絡(luò)輸入輸出的速率。3.2.1示例:使用PrometheusExporterPlugin監(jiān)控RabbitMQ安裝PrometheusExporterPlugin:sudorabbitmq-pluginsenablerabbitmq_prometheus配置Prometheus:在Prometheus的配置文件prometheus.yml中添加RabbitMQ的監(jiān)控目標(biāo)。scrape_configs:

-job_name:'rabbitmq'

static_configs:

-targets:['localhost:15672']查詢(xún)指標(biāo):在Prometheus的查詢(xún)界面中,輸入rabbitmq_queue_messages,可以查看所有隊(duì)列的消息數(shù)量。輸入rabbitmq_connections,可以查看當(dāng)前的連接數(shù)。3.3隊(duì)列與交換機(jī)管理隊(duì)列和交換機(jī)的管理是RabbitMQ的核心操作。包括創(chuàng)建、刪除、綁定等操作。3.3.1示例:使用RabbitMQManagementAPI創(chuàng)建隊(duì)列使用curl創(chuàng)建隊(duì)列:curl-uguest:guest-XPUThttp://localhost:15672/api/queues/%2F/myqueue檢查隊(duì)列:使用curl命令,可以查看創(chuàng)建的隊(duì)列信息。curl-uguest:guesthttp://localhost:15672/api/queues3.4故障排查與日志分析RabbitMQ的日志文件包含了運(yùn)行時(shí)的詳細(xì)信息,對(duì)于故障排查至關(guān)重要。3.4.1示例:分析RabbitMQ日志查看日志:日志文件通常位于/var/log/rabbitmq/rabbit@hostname.log。使用tail-f命令,可以實(shí)時(shí)查看日志的更新。tail-f/var/log/rabbitmq/rabbit@hostname.log日志級(jí)別調(diào)整:通過(guò)修改rabbitmq.config配置文件,可以調(diào)整日志級(jí)別。{rabbit,[{logger,[{level,debug}]}]}.故障排查:當(dāng)RabbitMQ出現(xiàn)故障時(shí),日志中通常會(huì)有錯(cuò)誤信息,如disk_free_limit、memory_limit等。根據(jù)這些信息,可以定位問(wèn)題,如磁盤(pán)空間不足、內(nèi)存溢出等。通過(guò)以上介紹和示例,我們可以有效地監(jiān)控和管理RabbitMQ,確保其穩(wěn)定運(yùn)行,及時(shí)發(fā)現(xiàn)并解決問(wèn)題。4消息隊(duì)列設(shè)計(jì)模式在設(shè)計(jì)消息隊(duì)列系統(tǒng)時(shí),采用合適的設(shè)計(jì)模式至關(guān)重要。這不僅影響系統(tǒng)的性能,還決定了系統(tǒng)的可擴(kuò)展性和可靠性。以下是幾種常見(jiàn)的設(shè)計(jì)模式:4.1發(fā)布/訂閱模式(Publish/Subscribe)4.1.1原理發(fā)布/訂閱模式允許消息的發(fā)布者(Producer)向特定的主題(Topic)發(fā)布消息,而多個(gè)訂閱者(Subscriber)可以訂閱同一主題,接收發(fā)布者發(fā)送的消息。這種模式下,發(fā)布者和訂閱者之間沒(méi)有直接的連接,它們通過(guò)消息隊(duì)列進(jìn)行間接通信。4.1.2示例importpika

#連接到RabbitMQ

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#聲明一個(gè)交換器

channel.exchange_declare(exchange='logs',exchange_type='fanout')

#生成一個(gè)隨機(jī)的隊(duì)列名稱(chēng)

result=channel.queue_declare(queue='',exclusive=True)

queue_name=result.method.queue

#將隊(duì)列綁定到交換器

channel.queue_bind(exchange='logs',queue=queue_name)

#定義接收消息的回調(diào)函數(shù)

defcallback(ch,method,properties,body):

print("[x]%r"%body)

#開(kāi)始接收消息

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()在上述代碼中,我們創(chuàng)建了一個(gè)發(fā)布/訂閱模式的消費(fèi)者。它通過(guò)fanout類(lèi)型的交換器接收消息,這種交換器會(huì)將消息廣播到所有綁定的隊(duì)列中。4.2路由模式(Routing)4.2.1原理路由模式允許消息根據(jù)特定的路由鍵(RoutingKey)被發(fā)送到指定的隊(duì)列。發(fā)布者在發(fā)送消息時(shí),需要指定一個(gè)路由鍵,而隊(duì)列在綁定到交換器時(shí),也需要指定一個(gè)路由鍵。只有當(dāng)兩者匹配時(shí),消息才會(huì)被發(fā)送到隊(duì)列。4.2.2示例importpika

#連接到RabbitMQ

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#聲明一個(gè)直接類(lèi)型的交換器

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#聲明隊(duì)列并綁定到交換器

severities=['info','warning','error']

forseverityinseverities:

queue_name=severity

channel.queue_declare(queue=queue_name)

channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

#發(fā)布消息

message='HelloWorld!'

channel.basic_publish(exchange='direct_logs',routing_key='info',body=message)

print("[x]Sent%r:%r"%('info',message))

#關(guān)閉連接

connection.close()在這個(gè)例子中,我們創(chuàng)建了一個(gè)直接類(lèi)型的交換器,并聲明了三個(gè)隊(duì)列,分別綁定到info、warning和error的路由鍵。發(fā)布者在發(fā)送消息時(shí),需要指定一個(gè)路由鍵,這樣消息就會(huì)被發(fā)送到與之匹配的隊(duì)列中。5RabbitMQ在微服務(wù)架構(gòu)中的應(yīng)用在微服務(wù)架構(gòu)中,RabbitMQ作為消息中間件,可以實(shí)現(xiàn)服務(wù)間的異步通信,提高系統(tǒng)的響應(yīng)速度和可擴(kuò)展性。5.1異步處理5.1.1原理微服務(wù)架構(gòu)中的服務(wù)可以將一些耗時(shí)的操作,如發(fā)送郵件、處理大數(shù)據(jù)等,通過(guò)RabbitMQ異步處理。這樣,服務(wù)在接收到請(qǐng)求后,可以立即返回響應(yīng),而將耗時(shí)操作放入隊(duì)列中,由其他服務(wù)或工作進(jìn)程處理。5.1.2示例importpika

#連接到RabbitMQ

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#聲明隊(duì)列

channel.queue_declare(queue='task_queue',durable=True)

#發(fā)布消息

message='HelloWorld!'

channel.basic_publish(exchange='',routing_key='task_queue',body=message,

properties=pika.BasicProperties(delivery_mode=2,))#makemessagepersistent

print("[x]Sent%r"%message)

#關(guān)閉連接

connection.close()在這個(gè)例子中,我們創(chuàng)建了一個(gè)持久化的隊(duì)列task_queue,并發(fā)送了一條消息。在微服務(wù)架構(gòu)中,這個(gè)隊(duì)列可以用于異步處理任務(wù),如發(fā)送郵件等。5.2事件驅(qū)動(dòng)5.2.1原理在微服務(wù)架構(gòu)中,服務(wù)可以訂閱其他服務(wù)的事件,當(dāng)事件發(fā)生時(shí),訂閱者會(huì)接收到事件,并進(jìn)行相應(yīng)的處理。這種模式可以實(shí)現(xiàn)服務(wù)間的解耦,提高系統(tǒng)的靈活性。5.2.2示例importpika

#連接到RabbitMQ

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#聲明交換器

channel.exchange_declare(exchange='events',exchange_type='topic')

#聲明隊(duì)列并綁定到交換器

queue_name='event_queue'

channel.queue_declare(queue=queue_name)

channel.queue_bind(exchange='events',queue=queue_name,routing_key='user.*')

#定義接收消息的回調(diào)函數(shù)

defcallback(ch,method,properties,body):

print("[x]%r:%r"%(method.routing_key,body))

#開(kāi)始接收消息

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()在這個(gè)例子中,我們創(chuàng)建了一個(gè)事件驅(qū)動(dòng)的消費(fèi)者,它訂閱了所有以u(píng)ser.開(kāi)頭的事件。當(dāng)事件發(fā)生時(shí),消費(fèi)者會(huì)接收到事件,并進(jìn)行相應(yīng)的處理。6

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論