PHPmemcache實(shí)現(xiàn)消息隊(duì)列實(shí)例_第1頁(yè)
PHPmemcache實(shí)現(xiàn)消息隊(duì)列實(shí)例_第2頁(yè)
PHPmemcache實(shí)現(xiàn)消息隊(duì)列實(shí)例_第3頁(yè)
PHPmemcache實(shí)現(xiàn)消息隊(duì)列實(shí)例_第4頁(yè)
PHPmemcache實(shí)現(xiàn)消息隊(duì)列實(shí)例_第5頁(yè)
已閱讀5頁(yè),還剩9頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

1、PHP memcache實(shí)現(xiàn)消息隊(duì)列實(shí)例現(xiàn)在memcache在服務(wù)器緩存應(yīng)用比較廣泛,下面我來(lái)介紹memcache實(shí)現(xiàn)消息隊(duì)列等待的一個(gè)例子,有需要了解的朋友可參考。memche消息隊(duì)列的原理就是在key上做文章,用以做一個(gè)連續(xù)的數(shù)字加上前綴記錄序列化以后消息或者日志。然后通過(guò)定時(shí)程序?qū)?nèi)容落地到文件或者數(shù)據(jù)庫(kù)。php實(shí)現(xiàn)消息隊(duì)列的用處比如在做發(fā)送郵件時(shí)發(fā)送大量郵件很費(fèi)時(shí)間的問(wèn)題,那么可以采取隊(duì)列。方便實(shí)現(xiàn)隊(duì)列的輕量級(jí)隊(duì)列服務(wù)器是:starling支持memcache協(xié)議的輕量級(jí)持久化服務(wù)器Beanstalkd輕量、高效,支持持久化,每秒可處理3000左右的隊(duì)列php中也可以使用memcach

2、e/memcached來(lái)實(shí)現(xiàn)消息隊(duì)列。 代碼如下復(fù)制代碼<?php/* Memcache 消息隊(duì)列類(lèi)*/class QMC const PREFIX = 'ASDFASDFFWQKE'/* 初始化mc* staticvar string $mc* return Memcache*/static private function mc_init() static $mc = null;if (is_null($mc) $mc = new Memcache;$mc->connect('127.0.0.1', 11211);return $mc;

3、/* mc 計(jì)數(shù)器,增加計(jì)數(shù)并返回新的計(jì)數(shù)* param string $key   計(jì)數(shù)器* param int $offset   計(jì)數(shù)增量,可為負(fù)數(shù).0為不改變計(jì)數(shù)* param int $time     時(shí)間* return int/false    失敗是返回false,成功時(shí)返回更新計(jì)數(shù)器后的計(jì)數(shù)*/static public function set_counter( $key, $offset, $time=0 )$mc = self:mc_init();$val =

4、 $mc->get($key);if( !is_numeric($val) | $val < 0 )$ret = $mc->set( $key, 0, $time );if( !$ret ) return false;$val = 0;$offset = intval( $offset );if( $offset > 0 )return $mc->increment( $key, $offset );elseif( $offset < 0 )return $mc->decrement( $key, -$offset );return $val;/* 寫(xiě)

5、入隊(duì)列* param string $key* param mixed $value* return bool*/static public function input( $key, $value )$mc = self:mc_init();$w_key = self:PREFIX.$key.'W'$v_key = self:PREFIX.$key.self:set_counter($w_key, 1);return $mc->set( $v_key, $value );/* 讀取隊(duì)列里的數(shù)據(jù)* param string $key* param int $max

6、0; 最多讀取條數(shù)* return array*/static public function output( $key, $max=100 )$out = array();$mc = self:mc_init();$r_key = self:PREFIX.$key.'R'$w_key = self:PREFIX.$key.'W'$r_p   = self:set_counter( $r_key, 0 );/讀指針$w_p   = self:set_counter( $w_key, 0 );/寫(xiě)指針if( $r_p = 0

7、 ) $r_p = 1;while( $w_p >= $r_p )if( -$max < 0 ) break;$v_key = self:PREFIX.$key.$r_p;$r_p = self:set_counter( $r_key, 1 );$out = $mc->get( $v_key );$mc->delete($v_key);return $out;/*使用方法:QMC:input($key, $value );/寫(xiě)入隊(duì)列$list = QMC:output($key);/讀取隊(duì)列*/?>基于PHP共享內(nèi)存實(shí)現(xiàn)的消息隊(duì)列: 代碼如下復(fù)制代碼&l

8、t;?php/* 使用共享內(nèi)存的PHP循環(huán)內(nèi)存隊(duì)列實(shí)現(xiàn)* 支持多進(jìn)程, 支持各種數(shù)據(jù)類(lèi)型的存儲(chǔ)* 注: 完成入隊(duì)或出隊(duì)操作,盡快使用unset(), 以釋放臨界區(qū)* author wangbinandi* created 2009-12-23*/class ShmQueueprivate $maxQSize = 0; / 隊(duì)列最大長(zhǎng)度private $front = 0; / 隊(duì)頭指針private $rear = 0;  / 隊(duì)尾指針private $blockSize = 256;  / 塊的大小(byte)private $memSize = 25600; 

9、 / 最大共享內(nèi)存(byte)private $shmId = 0;private $filePtr = './shmq.ptr'private $semId = 0;public function _construct()$shmkey = ftok(_FILE_, 't');$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );$this->maxQSize = $this->memSize / $this->blockSize;/ 申?

10、一個(gè)信號(hào)量$this->semId = sem_get($shmkey, 1);sem_acquire($this->semId); / 申請(qǐng)進(jìn)入臨界區(qū)$this->init();private function init()if ( file_exists($this->filePtr) )$contents = file_get_contents($this->filePtr);$data = explode( '|', $contents );if ( isset($data0) && isset($data1)$t

11、his->front = (int)$data0;$this->rear  = (int)$data1;public function getLength()return ($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;public function enQueue( $value )if ( $this->ptrInc($this->rear) = $this->front ) / 隊(duì)滿(mǎn)return fa

12、lse;$data = $this->encode($value);shmop_write($this->shmId, $data, $this->rear );$this->rear = $this->ptrInc($this->rear);return true;public function deQueue()if ( $this->front = $this->rear ) / 隊(duì)空return false;$value = shmop_read($this->shmId, $this->front, $this->bl

13、ockSize-1);$this->front = $this->Inc($this->front);return $this->decode($value);private function ptrInc( $ptr )return ($ptr + $this->blockSize) % ($this->memSize);private function encode( $value )$data = serialize($value) . "_eof"echo ''echo strlen($data);echo 

14、9;'echo $this->blockSize -1;echo ''if ( strlen($data) > $this->blockSize -1 )throw new Exception(strlen($data)." is overload block size!");return $data;private function decode( $value )$data = explode("_eof", $value);return unserialize($data0);public function

15、 _destruct()$data = $this->front . '|' . $this->rear;file_put_contents($this->filePtr, $data);sem_release($this->semId); / 出臨界區(qū), 釋放信號(hào)量/*/ 進(jìn)隊(duì)操作$shmq = new ShmQueue();$data = 'test data'$shmq->enQueue($data);unset($shmq);/ 出隊(duì)操作$shmq = new ShmQueue();$data = $shmq->deQ

16、ueue();unset($shmq);*/?>對(duì)于一個(gè)很大的消息隊(duì)列,頻繁進(jìn)行進(jìn)行大數(shù)據(jù)庫(kù)的序列化 和 反序列化,有太耗費(fèi)。下面是我用PHP 實(shí)現(xiàn)的一個(gè)消息隊(duì)列,只需要在尾部插入一個(gè)數(shù)據(jù),就操作尾部,不用操作整個(gè)消息隊(duì)列進(jìn)行讀取,與操作。但是,這個(gè)消息隊(duì)列不是線(xiàn)程安全的,我只是盡量的避免了沖突的可能性。如果消息不是非常的密集,比如幾秒鐘才一個(gè),還是可以考慮這樣使用的。 如果你要實(shí)現(xiàn)線(xiàn)程安全的,一個(gè)建議是通過(guò)文件進(jìn)行鎖定,然后進(jìn)行操作。下面是代碼: 代碼如下: 代碼如下復(fù)制代碼class Memcache_Queue  private $

17、memcache; private $name; private $prefix; function _construct($maxSize, $name, $memcache, $prefix = "_memcache_queue_")  if ($memcache = null)  throw new Exception("memcache object is null, new the object first.");  $this->memcache = $me

18、mcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0;  function _get($name)  return $this->get($name);  function _set($name, $value) &#

19、160;$this->add($name, $value); return $this;  function isEmpty()  return $this->size = 0;  function isFull()  return $this->size = $this->maxSize;  function enQueue($data)  if ($this->isFull()  throw new Exception(&

20、quot;Queue is Full");  $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this;  function deQueue()  if ($this->isEmpty()  throw new

21、Exception("Queue is Empty");  $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this;  function getTop()  return $this->get($this->

22、;front);  function getAll()  return $this->getPage();  function getPage($offset = 0, $limit = 0)  if ($this->isEmpty() | $this->size < $offset)  return null;  $keys = $this->getKeyByPos($this->front + $offset) % $this->maxSi

23、ze); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)  $keys = $this->getKeyByPos($pos); $num+; if ($limit > 0 && $limit = $num)  break;   return a

24、rray_values($this->memcache->get($keys);  function makeEmpty()  $keys = $this->getAllKeys(); foreach ($keys as $value)  $this->delete($value);  $this->delete("real"); $this->delete("front"); $this->delete

25、("size"); $this->delete("maxSize");  private function getAllKeys()  if ($this->isEmpty()  return array();  $keys = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)  $keys = $this->getKeyByPos($pos);  return $keys;  private func

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論