activeMQ,JMS學(xué)習(xí)資料.doc_第1頁
activeMQ,JMS學(xué)習(xí)資料.doc_第2頁
activeMQ,JMS學(xué)習(xí)資料.doc_第3頁
activeMQ,JMS學(xué)習(xí)資料.doc_第4頁
activeMQ,JMS學(xué)習(xí)資料.doc_第5頁
已閱讀5頁,還剩22頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1 JMSJMS源于企業(yè)應(yīng)用對于消息中間件的需求,使應(yīng)用程序可以通過消息進(jìn)行異步處理而互不影響。Sun公司和它的合作伙伴設(shè)計的JMS API定義了一組公共的應(yīng)用程序接口和相應(yīng)語法,使得Java程序能夠和其他消息組件進(jìn)行通信。1.1 JMS的基本構(gòu)件1.1.1 連接工廠連接工廠是客戶用來創(chuàng)建連接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory。1.1.2 連接JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連接。1.1.3 會話JMS Session是生產(chǎn)和消費消息的一個單線程上下文。會話用于創(chuàng)建消息生產(chǎn)者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務(wù)性的上下文,在這個上下文中,一組發(fā)送和接收被組合到了一個原子操作中。1.1.4 目的地目的地是客戶用來指定它生產(chǎn)的消息的目標(biāo)和它消費的消息的來源的對象。JMS1.0.2規(guī)范中定義了兩種消息傳遞域:點對點(PTP)消息傳遞域和發(fā)布/訂閱消息傳遞域。點對點消息傳遞域的特點如下:l 每個消息只能有一個消費者。l 消息的生產(chǎn)者和消費者之間沒有時間上的相關(guān)性。無論消費者在生產(chǎn)者發(fā)送消息的時候是否處于運行狀態(tài),它都可以提取消息。發(fā)布/訂閱消息傳遞域的特點如下:l 每個消息可以有多個消費者。l 生產(chǎn)者和消費者之間有時間上的相關(guān)性。訂閱一個主題的消費者只能消費自它訂閱之后發(fā)布的消息。JMS規(guī)范允許客戶創(chuàng)建持久訂閱,這在一定程度上放松了時間上的相關(guān)性要求。持久訂閱允許消費者消費它在未處于激活狀態(tài)時發(fā)送的消息。在點對點消息傳遞域中,目的地被成為隊列(queue);在發(fā)布/訂閱消息傳遞域中,目的地被成為主題(topic)。1.1.5 消息生產(chǎn)者消息生產(chǎn)者是由會話創(chuàng)建的一個對象,用于把消息發(fā)送到一個目的地。1.1.6 消息消費者消息消費者是由會話創(chuàng)建的一個對象,它用于接收發(fā)送到目的地的消息。消息的消費可以采用以下兩種方法之一:l 同步消費。通過調(diào)用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達(dá)。l 異步消費??蛻艨梢詾橄M者注冊一個消息監(jiān)聽器,以定義在消息到達(dá)時所采取的動作。1.1.7 消息JMS消息由以下三部分組成:l 消息頭。每個消息頭字段都有相應(yīng)的getter和setter方法。l 消息屬性。如果需要除消息頭字段以外的值,那么可以使用消息屬性。l 消息體。JMS定義的消息類型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。1.2 JMS的可靠性機(jī)制1.2.1 消息確認(rèn)JMS消息只有在被確認(rèn)之后,才認(rèn)為已經(jīng)被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認(rèn)。在事務(wù)性會話中,當(dāng)一個事務(wù)被提交的時候,確認(rèn)自動發(fā)生。在非事務(wù)性會話中,消息何時被確認(rèn)取決于創(chuàng)建會話時的應(yīng)答模式(acknowledgement mode)。該參數(shù)有以下三個可選值:l Session.AUTO_ACKNOWLEDGE。當(dāng)客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認(rèn)客戶收到的消息。l Session.CLIENT_ACKNOWLEDGE??蛻敉ㄟ^消息的acknowledge方法確認(rèn)消息。需要注意的是,在這種模式中,確認(rèn)是在會話層上進(jìn)行:確認(rèn)一個被消費的消息將自動確認(rèn)所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然后確認(rèn)第5個消息,那么所有10個消息都被確認(rèn)。l Session.DUPS_ACKNOWLEDGE。該選擇只是會話遲鈍第確認(rèn)消息的提交。如果JMS provider失敗,那么可能會導(dǎo)致一些重復(fù)的消息。如果是重復(fù)的消息,那么JMS provider必須把消息頭的JMSRedelivered字段設(shè)置為true。1.2.2 持久性JMS 支持以下兩種消息提交模式:l PERSISTENT。指JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失。l NON_PERSISTENT。不要求JMS provider持久保存消息。1.2.3 優(yōu)先級可以使用消息優(yōu)先級來指示JMS provider首先提交緊急的消息。優(yōu)先級分10個級別,從0(最低)到9(最高)。如果不指定優(yōu)先級,默認(rèn)級別是4。需要注意的是,JMS provider并不一定保證按照優(yōu)先級的順序提交消息。1.2.4 消息過期可以設(shè)置消息在一定時間后過期,默認(rèn)是永不過期。1.2.5 本地事務(wù)在一個JMS客戶端,可以使用本地事務(wù)來組合消息的發(fā)送和接收。JMS Session接口提供了commit和rollback方法。事務(wù)提交意味著生產(chǎn)的所有消息被發(fā)送,消費的所有消息被確認(rèn);事務(wù)回滾意味著生產(chǎn)的所有消息被銷毀,消費的所有消息被恢復(fù)并重新提交,除非它們已經(jīng)過期。事務(wù)性的會話總是牽涉到事務(wù)處理中,commit或rollback方法一旦被調(diào)用,一個事務(wù)就結(jié)束了,而另一個事務(wù)被開始。關(guān)閉事務(wù)性會話將回滾其中的事務(wù)。需要注意的是,如果使用請求/回復(fù)機(jī)制,即發(fā)送一個消息,同時希望在同一個事務(wù)中等待接收該消息的回復(fù),那么程序?qū)⒈粧炱?,因為知道事?wù)提交,發(fā)送操作才會真正執(zhí)行。需要注意的還有一個,消息的生產(chǎn)和消費不能包含在同一個事務(wù)中。2 ActiveMQActiveMQ是apache旗下開源消息中間件,是目前最流行的開源的消息中間件。ActiveMQ功能特點:l 支持跨語言的客戶端,如:Java,C和C + +,C,Ruby,Perl,Python和PHP。l 在JMS客戶端和消息代理都全面支持企業(yè)集成模式。l 支持許多高級功能,如:信息組,虛擬目的地,通配符和復(fù)合目的地。l 完全支持JMS 1.1和J2EE1.4 。l 支持Spring,以便ActiveMQ可以很容易地嵌入到Spring應(yīng)用程序和使用Spring的XML配置機(jī)制 。l 通過了常見J2EE服務(wù)器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試。 l 支持可插拔傳輸協(xié)議,如:in-VM,TCP, SSL, NIO, UDP, multicast, JGroups and JXTA 傳輸。l 支持通過JDBC和journal提供高速的消息持久化。l 設(shè)計基于高性能集群,客戶端服務(wù)器,對等通信。l Ajax支持網(wǎng)絡(luò)流媒體,支持使用純DHTML的Web瀏覽器,允許Web瀏覽器成為消息傳遞結(jié)構(gòu)的一部分。l 支持CXF和Axis,以便ActiveMQ可以很容易地進(jìn)入這些Web服務(wù)并提供可靠的消息傳遞。l 可作為一個在內(nèi)存中的JMS提供者,是JMS的單元測試的理想選擇。2.1 安裝ActiveMQ 可以從官方網(wǎng)站(/)下載最新版的ActiveMQ。(最新版的為:5.4.2)2.2 啟動ActiveMQ在下載最新的ActiveMQ將其解壓到相應(yīng)的目錄就可以了。需要啟動ActiveMQ只要找到$activemq.home/bin目錄,雙擊運行activemq.bat就可以了(windows版本)。啟動后的界面如上圖。(ActiveMQ5.3.0版本啟動后的截圖,不同版本會有所不同)Listening for connections at: tcp:/74:61616(這里的IP顯示的是機(jī)器名稱)tcp:/74:61616表示監(jiān)聽的端口地址,也就是編寫程序時,獲取連接進(jìn)用到的URL。ActiveMQ Console at 74:8161/admin 74:8161/admin這個是新版本的activeMQ提供的管理工具訪問地址,可以查看隊列詳情,生產(chǎn)者,消費者等信息。2.3 配置ActiveMQ2.3.1 基本配置說明ActiveMQ默認(rèn)使用的是XML格式配置,配置文件在$activemq.home/conf目錄下,文件名為activemq.xml file:$activemq.base/conf/perties !- The element is used to configure the ActiveMQ broker. - producerFlowControl=true memoryLimit=1mb producerFlowControl=true memoryLimit=1mb !- Use VM cursor for better latency For more information, see: /message-cursors.html - !- The systemUsage controls the maximum amount of space the broker will use before slowing down producers. For more information, see: /producer-flow-control.html - Kahadb配置說明配置示例:KahaDB的各個可配置屬性:屬性默認(rèn)值描述directoryactivemq-data保存message store數(shù)據(jù)文件的目錄indexWriteBatchSize1000批量更新索引的閥值,當(dāng)要更新的索引到達(dá)這個索引時,批量更新到metadata store中indexCacheSize10000指定metadata cache的大小enableIndexWriteAsyncfalse寫入索引文件到metadata store中的方式是否采用異步寫入journalMaxFileLength32mb消息持久數(shù)據(jù)文件的大小enableJournalDiskSyncstrue如果為true,保證使用同步寫入的方式持久化消息到j(luò)ournal文件中cleanupInterval30000清除(清除或歸檔)不再使用的journal 文件的時間周期(毫秒)。checkpointInterval5000寫入索引信息到metadata store中的時間周期(毫秒)ignoreMissingJournalfilesfalse是否忽略丟失的journal文件。如果為false,當(dāng)丟失了journal文件時,broker啟動時會拋異常并關(guān)閉checkForCorruptJournalFilesfalse如果為true,broker在啟動的時候會檢測journal文件是否損壞,若損壞便嘗試恢復(fù)它。checksumJournalFilesfalse如果為true。KahaDB為journal文件生產(chǎn)一個checksum,以便能夠檢測journal文件是否損壞。archiveDataLogsfalse如果為true,當(dāng)達(dá)到cleanupInterval周期時,會歸檔journal文件而不是刪除directoryArchivenull指定歸檔journal文件存放的路徑databaseLockedWaitDelay10000在使用主從數(shù)據(jù)庫備份時,等待獲取DB上的lock的延遲時間。maxAsyncJobs10000等待寫入journal文件的任務(wù)隊列的最大數(shù)量。應(yīng)該大于或等于最大并發(fā)producer的數(shù)量。配合并行存儲轉(zhuǎn)發(fā)屬性使用。concurrentStoreAndDispatchTransactionsfalse如果為true,轉(zhuǎn)發(fā)消息的時候同時提交事務(wù)concurrentStoreAndDispatchTopicsfalse如果為true,轉(zhuǎn)發(fā)Topic消息的時候同時存儲消息的message store中。concurrentStoreAndDispatchQueuestrue如果為true,轉(zhuǎn)發(fā)Queue消息的時候同時存儲消息到message store中。2.3.2 安全性配置說明(以5.3.0的版本為例,各個版本配置會有所不同,需查閱相應(yīng)資料)訪問activeMQ權(quán)限配置是在$activemq.home/conf目錄下的activemq.xml中配置的,默認(rèn)是沒有進(jìn)行配置,任何用戶都是可以訪問activeMQ。如果需要對訪問權(quán)限進(jìn)行控制可以進(jìn)行相應(yīng)配置,可以指定到相應(yīng)隊列的訪問權(quán)限。1. 配置activemq.xml配置權(quán)限需要在,activemq.xml的元素中加入以下部分。 /queue=對應(yīng)的是所有隊列/ read=”admins” 表示admins組的所有用戶都可以消費前面queue屬性指定的隊列的消息/ write=admins表示admins組的所有用戶都可以向前面queue屬性指定的隊列發(fā)送消息/ admin=admins表示admins組的所有用戶可以在前面queue屬性指定的隊列不存在的情況創(chuàng)建隊列 read=admins write=admins admin=admins / queue=TEST.表示隊列名稱所有以”TEST.”(包含“.”)開頭的隊列,(這里建議隊列名稱以“.”作分隔符) read=users write=users admin=users / read=users write=users admin=users / write=guests,users admin=guests,users可以同時指定多個組 read=guests write=guests,users admin=guests,users /主題相關(guān)配置,與隊列配置類似 read=admins write=admins admin=admins / read=users write=users admin=users / read=guests write=guests,users admin=guests,users / read=guests,users write=guests,users admin=guests,users/ 2. 用戶配置在$activemq.home/conf目錄下增加 login.config、perties、perties三個文件。l login.config內(nèi)容activemq-domain org.apache.activemq.jaas.PropertiesLoginModule required debug=true perties.user=perties perties.group=perties; ;perties.user=perties指定用戶對應(yīng)的配置文件perties.group=perties;指定用戶組對應(yīng)的配置文件l perties內(nèi)容admins=systemusers=zengjun,llguests=guestusers=zengjun,llusers表示用戶組名,與activemq.xml安全性配置中的read,write,admin屬性值對應(yīng)。zengjun,ll表示兩個用戶名zengjun和ll,這里表示users組下有兩個用戶zengjun和ll。l perties內(nèi)容system=managerzengjun=zjll=llzengjun=zjzengjun表示用戶名,與perties配置用戶名對應(yīng)zj表示等號左邊用戶對應(yīng)的密碼。注:在配置安全性配置后,在寫代碼創(chuàng)建連接時需要加上對應(yīng)的用戶名與密碼。如:Connection connection = connectionFactory.createConnection(zengjun,zj);2.4 點對點域2.4.1 生產(chǎn)消息生產(chǎn)消息都步驟如上圖的深色部分所示。首先需要從連接工廠中獲取到連接,然后通過連接來創(chuàng)建會話,再通過會話來創(chuàng)建目的地,再用會話與目的地來創(chuàng)建生產(chǎn)者。需要發(fā)送的消息也是通過會話來創(chuàng)建的。最后通過生產(chǎn)者來發(fā)送消息。示例代碼:/初使化連接工廠ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); Connection connection = null; Session session = null; MessageProducer producer = null;Destination destination = null;/創(chuàng)建連接connection = connectionFactory.createConnection();/創(chuàng)建會話 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);/創(chuàng)目的地destination = session.createQueue(TEST_QUEUE_ZJ);/創(chuàng)生產(chǎn)者producer = session.createProducer(destination);/設(shè)置消息的持久模式producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();/創(chuàng)建消息TextMessage message = session.createTextMessage();/設(shè)置消息屬性 double d = Math.random();message.setStringProperty(ID, String.valueOf(d);message.setText(tttt111);message.setText(tttt22222222);/發(fā)送消息producer.send(message);2.4.2 消費消息消費消息如上圖深色部分所示。首先需要從連接工廠創(chuàng)建連接,然后再通過連接創(chuàng)建會話,然后通過會話創(chuàng)建目的地,再通過會話與目的地來創(chuàng)消費者,然后消費者調(diào)用接口來消費消息。代碼示例:/初始連接工廠 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); Session session = null; MessageConsumer consumer = null; Connection connection = null; Message message = null;Destination destination=null;/創(chuàng)建連接connection = connectionFactory.createConnection();connection.start();/創(chuàng)建會話,指定消息的確認(rèn)模式,(確認(rèn)模式請參1.2.1)session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);/創(chuàng)建目的地destination = session.createQueue(TEST_QUEUE_ZJ);/創(chuàng)建消息者consumer = session.createConsumer(destination);/消息者調(diào)用接收消息接口來接收消息。/ 無時間參數(shù)表示一直等待,直到收到消息。/ message = consumer.receive();/ 有時間參數(shù)表示指定時間后沒有消息則結(jié)束時,如果存在消息就在取完消息后結(jié)束message = consumer.receive(5 * 1000);/ 如果沒有收到消息,不會等待,立即往下執(zhí)行,不建議使用/ message = consumer.receiveNoWait();/判斷是否收到消息。if (message != null) /判斷消息的類型if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message;String text = textMessage.getText();System.out.println(TEXT: + text);/ 確認(rèn)消息textMessage.acknowledge(); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message;String strId = streamMessage.getStringProperty(ID);System.out.println(streammessage ID: + strId);/確認(rèn)消息streamMessage.acknowledge(); else System.out.println(沒有收到消息);還可以采用監(jiān)聽的方式來消費消息。采用監(jiān)聽的方式,需要實現(xiàn)MessageListener接口,創(chuàng)建消費者的方式與前面描消費消息的步驟一致,在建創(chuàng)建好消費者后需要設(shè)置實現(xiàn)MessageListener接口的監(jiān)聽器,當(dāng)監(jiān)聽器監(jiān)聽到消費者對應(yīng)的目的地上有消息時,會自動調(diào)用onMessage方法,在onMessage方法中可以得到消息。代碼示例:/實現(xiàn)MessageListener接口public class AmqConsumerOnMessage implements MessageListener public static void main(String args) new AmqConsumerOnMessage().createConsumer(TEST_queuE);/實現(xiàn)onMessage方法 public void onMessage(Message message) try if (message != null) if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(TEXT: + text); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message; String strId = streamMessage.getStringProperty(ID); System.out.println(streammessage ID: + strId); /確認(rèn)消息 message.acknowledge(); else System.out.println(沒有收到消息); catch (JMSException e) e.printStackTrace(); /創(chuàng)建消費者 public void createConsumer(String queue) MessageConsumer consumer = null; Session session = null; Connection connection = null; try ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination objDestination = null; objDestination = session.createQueue(queue); consumer = session.createConsumer(objDestination);/設(shè)置監(jiān)聽器,來監(jiān)聽消息 consumer.setMessageListener(this); catch (JMSException e) e.printStackTrace(); finally ConnectionUtil.closeAll(connection, session, consumer); 2.5 發(fā)布/訂閱域2.5.1 發(fā)布消息發(fā)布消息與點對點域的生產(chǎn)消息類似。只是將隊列換成了主題,將生產(chǎn)者換成發(fā)布者代碼示例:/初始化連接工廠ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/74:61616);TopicConnection connection = null;ActiveMQTopicSession session = null;ActiveMQTopicPublisher publisher = null;ActiveMQTopic topic = null; /創(chuàng)建連接connection = connectionFactory.createTopicConnection(zengjun, zj);/創(chuàng)建會話session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);/創(chuàng)建主題topic = (ActiveMQTopic) session.createTopic(TEST.topic.zj);/創(chuàng)建發(fā)布者publisher = (ActiveMQTopicPublisher) session.createPublisher(topic);/設(shè)置消息持久方式,如果要實現(xiàn)持久訂閱,持久方法必須是DeliveryMode.PERSISTENTpublisher.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();TextMessage message = session.createTextMessage();mes

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論