基于Golang實現延遲隊列(DelayQueue)_第1頁
基于Golang實現延遲隊列(DelayQueue)_第2頁
基于Golang實現延遲隊列(DelayQueue)_第3頁
基于Golang實現延遲隊列(DelayQueue)_第4頁
基于Golang實現延遲隊列(DelayQueue)_第5頁
已閱讀5頁,還剩3頁未讀 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第基于Golang實現延遲隊列(DelayQueue)目錄背景原理堆隨機刪除重置元素到期時間Golang實現數據結構實現原理添加元素阻塞獲取元素Channel方式阻塞讀取性能測試總結

背景

延遲隊列是一種特殊的隊列,元素入隊時需要指定到期時間(或延遲時間),從隊頭出隊的元素必須是已經到期的,而且最先到期的元素最先出隊,也就是隊列里面的元素是按照到期時間排序的,添加元素和從隊頭出隊的時間復雜度是O(log(n))。

由于以上性質,延遲隊列一般可以用于以下場景(定時任務、延遲任務):

緩存:用戶淘汰過期元素通知:在指定時間通知用戶,比如會議開始前30分鐘訂單:30分鐘未支付取消訂單超時:服務器自動斷開太長時間沒有心跳的連接

其實在Golang中是自帶定時器的,也就是time.After()、time.AfterFunc()等函數,它們的性能也是非常好的,隨著Golang版本升級還會優(yōu)化。但是對于某些場景來說確實不夠方便,比如緩存場景我們需要能夠支持隨機刪除定時器,隨機重置過期時間,更加靈活的刪除一小批過期元素。而且像Kafka的時間輪算法(TimeWheel)里面也用到了延遲隊列,因此還是有必要了解下如何實現延遲隊列。

原理

延遲隊列每次出隊的是最小到期時間的元素,而堆就是用來獲取最值的數據結構。使用堆我們可以實現O(log(n))時間復雜度添加元素和移除最小到期時間元素。

隨機刪除

有時候延遲隊列還需要具有隨機刪除元素的能力,可以通過以下方式實現:

元素添加刪除標記字段:堆中每個元素都添加一個刪除標記字段,并把這個元素的地址返回給用戶,用戶就可以標記元素的這個字段為true,這樣元素到達堆頂時如果判斷到這個字段為true就會被清除,而延遲隊列里的元素邏輯上是一定會到達堆頂的(因為時間會流逝)。這是一種懶刪除的方式。元素添加堆中下標字段(或用map記錄下標):堆中每個元素都添加一個堆中下標字段,并把這個元素的地址返回給用戶,這樣我們就可以通過這個元素里面記錄的下標快速定位元素在堆中的位置,從而刪除元素。詳細可以看文章如何實現一個支持O(log(n))隨機刪除元素的堆。

重置元素到期時間

如果需要重置延遲隊列里面元素的到期時間,則必須知道元素在堆中的下標,因為重置到期時間之后必須對堆進行調整,因此只能是元素添加堆中下標字段。

Golang實現

這里我們實現一個最簡單的延遲隊列,也就是不支持隨機刪除元素和重置元素的到期時間,因為有些場景只需要添加元素和獲取到期元素這兩個功能,比如Kafka中的時間輪,而且這種簡單實現性能會高一點。

代碼地址

數據結構

主要的結構可以看到就是一個heap,Entry是每個元素在堆中的表示,Value是具體的元素值,Expired是為了堆中元素根據到期時間排序。

mutex是一個互斥鎖,主要是保證操作并發(fā)安全。

wakeup是一個緩沖區(qū)長度為1的通道,通過它實現添加元素的時候喚醒等待隊列不為空或者有更小到期時間元素加入的協(xié)程。(重點)

typeEntry[Tany]struct{

ValueT

Expiredtime.Time//到期時間

//延遲隊列

typeDelayQueue[Tany]struct{

h*heap.Heap[*Entry[T]]

mutexsync.Mutex//保證并發(fā)安全

wakeupchanstruct{}//喚醒通道

//創(chuàng)建延遲隊列

funcNew[Tany]()*DelayQueue[T]{

returnDelayQueue[T]{

h:heap.New(nil,func(e1,e2*Entry[T])bool{

returne1.Expired.Before(e2.Expired)

wakeup:make(chanstruct{},1),

}

實現原理

阻塞獲取元素的時候如果隊列已經沒有元素,或者沒有元素到期,那么協(xié)程就需要掛起等待。而被喚醒的條件是元素到期、隊列不為空或者有更小到期時間元素加入。

其中元素到期協(xié)程在阻塞獲取元素時發(fā)現堆頂元素還沒到期,因此這個條件可以自己構造并等待。但是條件隊列不為空和有更小到期時間元素加入則需要另外一個協(xié)程在添加元素時才能滿足,因此必須通過一個中間結構來進行協(xié)程間通信,一般Golang里面會使用Channel來實現。

添加元素

一開始加了一個互斥鎖,避免并發(fā)沖突,然后把元素加到堆里。

因為我們Take()操作,既阻塞獲取元素操作,在不滿足條件時會去等待wakeup通道,但是等待通道前必須釋放鎖,否則Push()無法寫入新元素去滿足條件隊列不為空和有更小到期時間元素加入。而從釋放鎖后到開始讀取wakeup通道這段時間是沒有鎖保護的,如果Push()在這期間插入新元素,為了保證通道不阻塞同時又能通知到Take()協(xié)程,我們的通道的長度需要是1,同時使用select+default保證在通道里面已經有元素的時候不阻塞Push()協(xié)程。

//添加延遲元素到隊列

func(q*DelayQueue[T])Push(valueT,delaytime.Duration){

q.mutex.Lock()

deferq.mutex.Unlock()

entry:=Entry[T]{

Value:value,

Expired:time.Now().Add(delay),

q.h.Push(entry)

//喚醒等待的協(xié)程

//這里表示新添加的元素到期時間是最早的,或者原來隊列為空

//因此必須喚醒等待的協(xié)程,因為可以拿到更早到期的元素

ifq.h.Peek()==entry{

select{

caseq.wakeup-struct{}{}:

default:

}

阻塞獲取元素

這里先判斷堆是否有元素,如果有獲取堆頂元素,然后判斷是否已經到期,如果到期則直接出堆并返回。否則等待直到超時或者元素到期或者有新的元素到達。

這里在解鎖之前會清空wakeup通道,這樣可以保證下面讀取的wakeup通道里的元素肯定是新加入的。

//等待直到有元素到期

//或者ctx被關閉

func(q*DelayQueue[T])Take(ctxcontext.Context)(T,bool){

for{

varexpired*time.Timer

q.mutex.Lock()

//有元素

if!q.h.Empty(){

//獲取元素

entry:=q.h.Peek()

iftime.Now().After(entry.Expired){

q.h.Pop()

q.mutex.Unlock()

returnentry.Value,true

//到期時間,使用time.NewTimer()才能夠調用Stop(),從而釋放定時器

expired=time.NewTimer(time.Until(entry.Expired))

//避免被之前的元素假喚醒

select{

case-q.wakeup:

default:

q.mutex.Unlock()

//不為空,需要同時等待元素到期

//并且除非expired到期,否則都需要關閉expired避免泄露

ifexpired!=nil{

select{

case-q.wakeup://新的更快到期元素

expired.Stop()

case-expired.C://首元素到期

case-ctx.Done()://被關閉

expired.Stop()

vartT

returnt,false

}else{

select{

case-q.wakeup://新的更快到期元素

case-ctx.Done()://被關閉

vartT

returnt,false

}

Channel方式阻塞讀取

Golang里面可以使用Channel進行流式消費,因此簡單包裝一個Channel形式的阻塞讀取接口,給通道一點緩沖區(qū)大小可以帶來更好的性能。

//返回一個通道,輸出到期元素

//size是通道緩存大小

func(q*DelayQueue[T])Channel(ctxcontext.Context,sizeint)-chanT{

out:=make(chanT,size)

gofunc(){

for{

entry,ok:=q.Take(ctx)

if!ok{

return

out-entry

returnout

}

使用方式

forentry:=rangeq.Channel(context.Background(),10){

//dosomething

}

性能測試

這里進行一個簡單的性能測試,也就是先添加元素,然后等待到期后全部拿出來。

funcBenchmarkPushAndTake(b*testing.B){

q:=New[int]()

b.ResetTimer()

//添加元素

fori:=0;ib.N;i++{

q.Push(i,time.Duration(i))

//等待全部元素到期

b.StopTimer()

time.Sleep(time.Duration(b.N))

b.StartTimer()

//獲取元素

fori:=0;ib.N;i++{

_,ok:=q.Take(context.Background())

if!ok{

b.Errorf("want%v,but%v",true,ok)

}

測試結果:

Benchmark-82331534476.8ns/op

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論