版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
1、ZeroMQ使用說明1.ZeroMQ介紹ZMQ(0MQ、ZeroMQ,OMQ)套嵌入式的網(wǎng)絡(luò)看起來像是一鏈接庫,但工作起來更像是一個(gè)并發(fā)式的框架,介于應(yīng)用層和傳輸層之間(按照TCP/IP劃分)。它提供的套接字可以在多種協(xié)議中傳輸消息,如線程間、進(jìn)程間、TCP、廣播等。你可以使用套接字構(gòu)建多對(duì)多的連接模式,如扇出、發(fā)布-訂閱、任務(wù)分發(fā)、請(qǐng)求-應(yīng)答等。一開始ZMQ代表零中間件、零延遲,同時(shí),它又有了新的含義:零管理、零成本、零浪費(fèi)??偟膩碚f,零表示最小、最簡,這是貫穿于該項(xiàng)目的哲理。傳統(tǒng)的TCPSocket的連接是1對(duì)1的,可以認(rèn)為“1個(gè)Socket=1個(gè)連接”,每一個(gè)線程獨(dú)立的維護(hù)一個(gè)Socke
2、t。但是ZMQ摒棄了這種1對(duì)1的模式,ZMQ的Socket可以很輕松的實(shí)現(xiàn)1對(duì)N,N對(duì)1和N對(duì)N的連接模式,一個(gè)ZMQ的Socket可以自動(dòng)的維護(hù)一組連接,用戶無法操作這些連接,用戶只能操作套接字,而不是連接本身,所以說ZMQ的世界里,連接是私有的。2.基礎(chǔ)知識(shí):套接字APIzmq建立在標(biāo)準(zhǔn)套接字api之上,用IMessageSocket類封裝,其生命周期主要包含四個(gè)部分:創(chuàng)建和銷毀套接字:IMessageSocket:IMessageSocket-zmq_socket(void*,inttype),IMessageSocket:close()-zmq_close()配置和讀取套接字選項(xiàng):zm
3、q_setsockopt(void*s,intoption,constvoid*optval,size_toptvallen),zmq_getsockopt(void*s,intoption,void*optval,size_t*optvallen)為套接字建立連接:IMessageSocket:Bind-zmq_bind(void*s,constchar*addr),IMessageSocket:Connect-zmq_connect(void*s,constchar*addr)發(fā)送和接收消息:zmq_send(void*s,constvoid*buf,size_tlen,intflags)
4、,zmq_recv(void*s,void*buf,size_tlen,intflags)3.ZMQ常用模式1)發(fā)布-訂閱(PUB-SUB:1-N)PublisherPUBsueSiabscfiberCMmectbmdlCflllllflCtCMneGtSUBSubscriber2)請(qǐng)求-應(yīng)答(REQ-REP)對(duì)于Request類型的socket,它是同步的,它一個(gè)時(shí)刻只能對(duì)一個(gè)連接進(jìn)行操作,在一個(gè)連接上發(fā)送了數(shù)據(jù)之后,必須接著在這個(gè)連接上執(zhí)行recv,也就是send與recv必須同時(shí)匹配出現(xiàn)。因此ZMQ在使用一個(gè)Socket處理請(qǐng)求的過程中,會(huì)阻塞同一個(gè)端口的其他請(qǐng)求。Response類型的
5、socket也是同步的,與Request的意思差不多,不過順序是先recv再send。3)獨(dú)立對(duì)模式(PAIR-PAIR):線程間l-t0-1隊(duì)列的實(shí)現(xiàn),采用了lockfree實(shí)現(xiàn),所以速度很快。對(duì)于特定的線程PAIR是最好的選擇。4)管道模式(PUSH-PULL):這種模式主要用于發(fā)布數(shù)據(jù)到由管道排列的節(jié)點(diǎn)上面,數(shù)據(jù)總是沿著管道流動(dòng)。每個(gè)管道階段連接了至少一個(gè)節(jié)點(diǎn)。push會(huì)負(fù)載均衡的將消息分發(fā)到pull端,worker可以隨時(shí)自由加入。push端無法recv,pull無法send。PULLSXrtk信封機(jī)制(ROUTER-DEALER):Replyaddress4EnvelopeEmpty
6、messagepart*Oaia信封機(jī)制的根本作用是讓ROUTER知道如何將消息遞送給正確的應(yīng)答目標(biāo)。*從ROUTER中讀取一條消息時(shí),0MQ會(huì)包上一層信封,上面注明了消息的來源。Frame1Frame2Frarne3*向ROUTER寫入一條消息時(shí)(包含信封),0MQ會(huì)將信封拆開,并將消息遞送給相應(yīng)的對(duì)象。作用:實(shí)現(xiàn)多線程的請(qǐng)求-應(yīng)答。4.消息收發(fā)接口發(fā)送進(jìn)程間請(qǐng)求消息:發(fā)送消息體:SccMqContext_T:instance()-GetContext()-SendMessage/GetContext()-SendSyncMessage組幀:BccDealerHander:OnDealMes
7、sage發(fā)送數(shù)據(jù)包:SccMqContext_T:instance()-GetMessagSender()-SendMessage()接收進(jìn)程間請(qǐng)求消息:接收:SccMqContext_T:instance()-GetMessagQueue()-OnMessage()處理:SccMqContext_T:instance()-bccAsynHandler.onDealAsynMsg(pAppFrame)發(fā)送線程間請(qǐng)求消息:發(fā)送:SCCManager_T:instance()-GetDispatcher()-PushMsg接收處理線程間請(qǐng)求消息:接收:SccWorkTask:OnMessage處理
8、:session_op發(fā)送響應(yīng)消息:發(fā)送:MsgDealWorker:SendMsg2Dispatcher轉(zhuǎn)發(fā):SccMqContext_T:instance()-GetContext()-SendAsynResponse接收并處理響應(yīng)消息:分發(fā):DealerTask:HandleRpnMsg接收:AsynReqWorker:RecvReponse處理:AsynMsgCallbackltf:OnRecvResponse()發(fā)送訂閱消息:SccMqContext_T:instance()-GetContext()-Notify訂閱消息處理:BccSubHandler:OnMessage()超時(shí)處
9、理:AsynMsgCallbackltf:OnTimeout()5.消息通信流程1)進(jìn)程間通信消息發(fā)送SccMqContext:StartNotify()訂閱消息SccMqContext_T:instance()-GetContext()-SendMessage/SendSyncMessage進(jìn)程間通信請(qǐng)求消息ptrReq-、endMessageGetContext()-NotifyclientPairSocket-SendPtdINDINGASYNREQTiyEOUTSTRtimeoutSocket_接收超時(shí)任務(wù)請(qǐng)求,并添加至timeoutReqMapasynReqWorkerSocket_
10、發(fā)送超時(shí)任務(wù)請(qǐng)求,TimeoutApynRequestPUSH起任務(wù)定時(shí)器ptrPublisher-Notify、PAIRTAIR用serverPairSocket接收請(qǐng)求消息AsynReqWorker:RecvPairy器時(shí)./定起ptrReq_-pTimeoutTask_-HandlerTimeout()/時(shí)1同:internalSocket-SendUBAsynReqWorker:RegAsynRemoteService注冊(cè)一個(gè)DEALER保存至到AsynReqMap、用AsynSocketInfo-pSocket轉(zhuǎn)發(fā)請(qǐng)求;并保存至msgMap_遍歷timeoutReqMap_判斷超時(shí)后
11、由pushWorkSockyt發(fā)送超吋消息PUSHBINDING_ASYNPULLREQ_WORKER_TIMEOUT_STRtimeoutRecvSocket接收消息msgMap_中找sequence寸應(yīng)的AsynMsgCallbackItf消息接收SUBConsumerRecvTask*pRecvTask_-pSocke接收轉(zhuǎn)BccSubHandler:OnMessag處理(通過RegMsgConsume綁定IMessageConsume,bccSubHandler)-DEALERROUTERcallback-OnTimeoutpDealTask_-dealerSocket接收消息轉(zhuǎn)Bcc
12、DealerHander:OnDealMessag處理通過ptrDealer_-RegMsgDeale綁定msgid,bccDea1erHand1erpQueue-OPULL)nMessage()g.pSender-SendMessage()發(fā)送至主線程PUSHbccAsynHandler.onDealAsynMsg在OnAsynMsgMap找消息處理函數(shù)pDealTask_-dealerSocket接收消息Task:異步/同步消息處理過程DealerRecvTDealerTask::HandleCallback.同步消息DealerTask:ReponseMsg異步消息DealerTask:
13、DealerMsgmsgDealerMapj找pRpnHandlermsgHandleMapj找pRpnHandler:起響應(yīng)定時(shí)器reqMsgSocket發(fā)送給piDeabr-AplRpuL、timeoutRegSocket發(fā)送時(shí))響應(yīng)超時(shí)請(qǐng)求和PUSHPULL;rpnSocket_收ReponseTask:Rec處理*I.BiNDING_DPULLI異步:pRpnHandler-OnDealMessage!同步:pRpnHandlerpOnReponseMessage:同步消息Iu:IMessageDealen:SendAsynResponse響應(yīng)超時(shí)請(qǐng)求TimeoutAsynReques
14、tPUSH)LE_TIMEOUTreponseTimeoutSocket接收響應(yīng)超時(shí)任務(wù)請(qǐng)求,并添口至pnTimeoutMap停定時(shí)器從rpnTimeoutMapf刪除2)線程間通信消息發(fā)送消息接收SccMqContext_T:instance()-GetContext()-SendAsynResponseSCCManager_T:instance()-GetDispatcher()-PushMsg定Workerid)線程間通信(指MsgDispatcher-OnMessage(20)用pPushSockeL獲取待發(fā)送的響應(yīng)消息clientPairSocket-SendPtr1PAIRrMgD
15、ispatchePAIRrOnMessaoe1rMsgDispatcher:RecvPair用serverPairSocket_接收并由pPushSocket轉(zhuǎn)發(fā)出去ROUTERROUTERSCCManager_T:instance()-DEALER例如:MsgDealWorker的子類::SccWorkTask:OnMessage(調(diào)用:!session_op中消息對(duì)應(yīng)的處理函數(shù))GetDispatcher()SccMqContext:ReceiveZmqLoopDealer在MsgDispatcher的taskMap_中找到workerid對(duì)應(yīng)MsgDealWorker,用pDealSoc
16、ket_接收消息,并調(diào)OnMessgeO處理用pDealSocket發(fā)送消息若需要回處理結(jié)果響應(yīng)I處理完成后發(fā)送響應(yīng)結(jié)果MIsgDealWorker:SendMsg2Dispatcher.回結(jié)果響應(yīng)3)響應(yīng)機(jī)制響應(yīng)發(fā)送III:ptrDealer_-SendAsynResponse(sequence,msgbody)asynRpnSender2AsynRpnServerSocket_發(fā)送響應(yīng)消息,c;/ubp.mq.dealer.asyn.rpn_*PUSHPULLptrDealer_-pDealTask_-asynRpnServerSocket_接收響應(yīng)消息pDealTask-HandleRp
17、nMsg停響應(yīng)定時(shí)器pDealTask-dealerSocket發(fā)送響應(yīng)ROUTERDEALERptrReq_-workTask_-asynReqMap_AsynSocketInfo-pSocket_接收消息AsynReqWorker:RecvReponse在msgMap_中找到原始請(qǐng)求,并調(diào)用對(duì)應(yīng)handler處理響應(yīng)AsynMsgCallbackItf:OnRecvResponse(同步消息:AsynMsgRpnHandler:OnRecvResponse,把結(jié)果冋灌到promise)6.關(guān)鍵類SccMqContextclassSccMqContext:privateNonCopyable
18、,publicubp:platform:thread:WorkerRequestpublic:BccAsynMsgHandlerbccAsynHandler;/處理pMsgQueue里的消息(OnAsynMsgMap)private:IMessageContext*contextPtr_;/綁定上下文BccSubHandlerbccSubHandler_;消息訂閱handler,SUB注冊(cè)到contextPtr_-consumerList_(msgTopic_,pHandler_)BccSubHandlerbccSubStatusHandler_;狀態(tài)訂閱handler,SUBBccDeale
19、rHanderbccDealerHandler_;請(qǐng)求消息處理handler,注冊(cè)到contextPtr_-ptrDealer_-MsgDealerMapmsgId,IMessageDealerHandler*IMessageQueue*pMsgQueue_;/PULL:接收隊(duì)列IMessageSender*pMsgSender_;/PUSH:發(fā)送隊(duì)列;IMessageContextlmpl:classMQ_IMPORT_EXPORTIMessageContextImpl:private:void*zmqContext_;/ZMQcontextstd:auto_ptrptrReq_;/std:
20、auto_ptrptrDealer_;/std:auto_ptrptrPublisher_;/訂閱消息發(fā)布者,PUBMsgConsumerListconsumerList_;DefaultReponseHander*pDftRpnHandler_;響應(yīng)消息處理handler注冊(cè)至UcontextPtr_-ptrDealer_-msgHandleMap_msgId,IMessageReponseHandler*;IMessageContextlmpl-ptrReq_:std:auto_ptr-ptrDealer_:std:auto_ptr-ptrPublisher_:std:auto_ptr-c
21、onsumerList_:MsgConsumerList-pDftRpnHandler:DefaultReponseHander+SendSyncMessage(destSvcKey:std:string,message:【Message,timeout:ACE_UINT32,retry:ACE_UINT32):IMessage+SendMessage(destSvcKey:std:string,msgbody:IMessage,cb:AsynMsgCallbackltf,timeout:ACE_UINT32):ACE_INT32+SendAsynResponse(sequenee:ACE_U
22、INT64,msgbody:IMessage):ACE_INT32+RegMsgReponser(handler:IMessageReponseHandler,msgIds:char):ACE_INT32+RegMsgDealer(handler:IMessageDealerHandler,msgIds:char):ACE_INT32+Notify(header:IMessageHeader,msgBody:IMessage,topic:std:string,filter:std:string):ACE_INT32+RegMsgConsumer(destSvcKey:std:string,to
23、pic:std:string,cb:IMessageConsumerHandler,filter:std:string):ACE_INT32,IMessageAsynRequestIMessageReponseIMessageDealerIMessagePublisherIMessageAsynRequest:AsynReqWorkerIMessageAsynRequest-mqContext_:IMessageContextlmpl-clientPairSocket_:std:auto_ptr-serverPairSocket_:std:auto_ptr-workTask_:AsynReqW
24、orker-pTimeoutTask:RequestTimeoutTaskge+SendSyncMessage(destSvcKey:std:string,header:IMessageHeader,message:【Message,timeout:ACE_UINT32,tryTimes:ACE_UINT32):IMessa+SendMessage(promise:IMessagePromise,destSvcKey:std:string,message:IMessage,timeout:ACE_UINT32):ACE_INT32AsynReqWorker-context_:IMessageC
25、ontextImpl-pAsynReq_:IMessageAsynRequest-msgMap_:MsgCallbackMap-socketList_:std:vector-asynReqMap_:AsynReqMap-timeoutRecvSocket_:std:auto_ptr-asynReqWorkerSocket_:std:auto_ptr-asynpn_wait_timeout_:ACE_UINT64-maxidletime:ACEUINT64+Run(:void):ACE_INT32#FindMsgCallback(seq:ACE_UINT64):AsynMsgCallbackIt
26、f#RefreshPollSockets():zmq_pollitem_t#RecvReponse(:IMessageSocket):void#RecvPair(:IMessageSocket):void#RecvTimeout(:IMessageSocket):void#RegAsynRemoteService(endpoint:std:string,destSvcKey:std:string):AsynSocketInfo#DealIdleSocket():voidIMessageDealer:DealerTaskIMessageDealer-mqContext_:IMessageCont
27、extImpl-pMsgTimeoutTask_:TimeoutMsgTask-pDealTask_:DealerTask-msgDealerMap_:MsgDealerMap-msgHandleMap_:SyncMsgHandleMap-asynRpnSender2AsynRpnServerSocket:std:autoptr+SendAsynResponse(sequenee:ACE_UINT64/msgbody:IMessage):ACE_INT32+RegMsgDealer(handler:IMessageDealerHandler,msgIds:char):ACE_INT32+RegMsgReponser(handler:IMessageReponseHandler,msgIds:char):ACE_INT32DealerTask-pDealer_:IMessageDealer-dealerSocket_:std:auto_ptr-asynRpnServerSocket_:std:auto_ptr-dealTaskRegMsg2TimeoutSocket_:std:auto_ptr-timeout2DealTaskServerSocket_:std:auto_ptr-ptrRpn_:st
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 三態(tài)股份招股書解析
- 安全生產(chǎn)責(zé)任狀(模板)
- 微積分習(xí)題(答案)
- 通信電源標(biāo)準(zhǔn)規(guī)范
- 香料作物種植與農(nóng)業(yè)科技創(chuàng)新能力提升策略制定與實(shí)施考核試卷
- 高鐵設(shè)備智能制造與大數(shù)據(jù)分析考核試卷
- 節(jié)能工程合同管理考核試卷
- 危機(jī)干預(yù)中的自我調(diào)適策略-洞察分析
- 虛擬現(xiàn)實(shí)音箱集成-洞察分析
- 2024-2025學(xué)年福建省龍巖市一級(jí)聯(lián)盟校高三上學(xué)期11月期中聯(lián)考生物試題(解析版)
- 礦大畢業(yè)設(shè)計(jì)-固定式帶式輸送機(jī)設(shè)計(jì)
- 《膽囊結(jié)石的護(hù)理》PPT
- 安徽云帆藥業(yè)有限公司原料藥生產(chǎn)項(xiàng)目環(huán)境影響報(bào)告
- 藥品質(zhì)量受權(quán)人管理規(guī)程
- 校本課程之《紅樓夢(mèng)詩詞曲賞析》教案
- 熱動(dòng)復(fù)習(xí)題材料熱力學(xué)與動(dòng)力學(xué)
- 馬工程-公共財(cái)政概論-課程教案
- GB/T 38058-2019民用多旋翼無人機(jī)系統(tǒng)試驗(yàn)方法
- GB/T 30902-2014無機(jī)化工產(chǎn)品雜質(zhì)元素的測定電感耦合等離子體發(fā)射光譜法(ICP-OES)
- GB/T 22638.2-2016鋁箔試驗(yàn)方法第2部分:針孔的檢測
- GB/T 13275-1991一般用途離心通風(fēng)機(jī)技術(shù)條件
評(píng)論
0/150
提交評(píng)論