基于最小堆的定时器的实现

概述

定时器(Timer Facility)被用于处理延时任务。

有多种数据结构和算法可以实现定时器,包括最小堆,哈希轮,分级时间轮等。在这篇文章中主要介绍基于最小堆的定时器的实现。

原理

最小堆指的是满足除了根节点以外的每个节点都不小于其父节点的堆。这样,堆中的最小值就存放在根节点中,并且在以某个结点为根的子树中,各节点的值都不小于该子树根节点的值。

最小堆其实就是最小优先队列,将新的定时器添加到堆中,根据绝对到期时间完成最小堆的有序化。

在每个时钟周期比较堆顶的定时器的到期时间和当前的时钟,如果定时器的到期时间是小于当前时间,则执行并删除到期定时器; 继续这样做这样的比较,直到堆顶包含一个过期时间大于当前时间的计时器。

操作

基于最小堆的定时器包含的操作包括

  • 插入定时器。该操作的时间复杂度为O(lgn)
  • 获取最先超时的定时器。由于是最小堆,只需返回堆的root即可,此时的算法复杂度为O(1) 。
  • 在定时器到期后,执行相关的动作,它的算法复杂度为 O(1) 。

实现

具体的Go语言实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185

package timer

import (
"time"
"container/heap"
)

//定义定时器基本的处理单元
type Timer struct {
id int64 //唯一id
index int //方便调用heap.Fix和heap.Remove
expired time.Time //过期的绝对时间
onTimeOut func(time.Time) error //过期时的处理事件
}

func NewTimer(id int64, expired time.Time, onTimeOut func(time.Time) error) *Timer{
return &Timer{
id: id,
expired:expired,
onTimeOut:onTimeOut,
}
}

//基于container/heap包实现的基于过期时间的优先队列
type TimerQueue []*Timer

func (tq TimerQueue) Len() int {
return len(tq)
}

func (tq TimerQueue) Less(i, j int) bool {
if tq[i].expired.Before(tq[j].expired) {
//过期早的在堆的上面,堆的最顶层就是过期时间最早的一个
return true
} else if tq[i].expired.After(tq[j].expired) {
return false
} else {
//如果时间相等 为了实现稳定排序 再基于id排一次
return tq[i].id < tq[j].id
}
}

func (tq TimerQueue) Swap(i, j int) {
tq[i], tq[j] = tq[j], tq[i]

//要再上面的交换之后 给index赋予正确的值
tq[i].index = i
tq[j].index = j
}

//实现Push和Pop
func (tq *TimerQueue) Push(x interface{}) {
n := len(*tq)
item := x.(*Timer)
item.index = n
*tq = append(*tq, item)

//之所以要将新加入的元素赋给最后一个元素,是因为heap.Push()接口的实现是先调用这个自定义的Push()
//再调用了up方法将最后一个元素上浮至合适的位置
}

func (tq *TimerQueue) Pop() interface{} {
old := *tq
n := len(old)
if n > 0 {
item := old[n-1]
item.index = -1
*tq = old[0 : n-1]
return item
} else {
return nil
}

//之所以要取出最后一个元素,是因为heap.Pop()接口的实现是先把heap的顶部元素交换到最后一个元素,
//再对除了最后一个元素的前面的元素重新排序为堆,之后调用自定义的Pop()
}


type TimerFacility struct {
timerQueue TimerQueue //优先队列
ticker *time.Ticker //检查任务过期的基本时间间隔
addTimer chan *Timer
timers map[int64]*Timer //定时任务集合
workLimit chan *interface{} //控制并发任务的最大量
}

func NewTimerFacility(d time.Duration) *TimerFacility {

tf := &TimerFacility{
timerQueue: TimerQueue{},
ticker: time.NewTicker(d),
addTimer: make(chan *Timer, 2000),
timers: make(map[int64]*Timer),
workLimit: make(chan *interface{}, 2000),
}
//初始化队列
heap.Init(&tf.timerQueue)
//启动
tf.Start()

return tf
}

//启动定时器
func (self *TimerFacility) Start() {

go func() {

for {
select {
case <-self.ticker.C:
//若定时器到了
//处理过期的元素
self.onExpire()
case t := <-self.addTimer:
if _, ok := self.timers[t.id]; !ok {
self.timers[t.id] = t
heap.Push(&self.timerQueue, t)
}
}
}

}()
}

//移出timer
func (self *TimerFacility) RemoveTimer(id int64) *Timer {
if _, ok := self.timers[id]; !ok {
//找不到的话 当做已移出处理
return nil
}

t := self.timers[id]
//从任务集合中删除
delete(self.timers, id)
//从队列中移出
heap.Remove(&self.timerQueue, t.index)
return t
}

//增加timer
func (self *TimerFacility) AddTimer(t *Timer) {
self.addTimer <- t
}

//处理过期的timer
func (self *TimerFacility) onExpire() {

for {

if self.timerQueue.Len() <= 0 {
//如果已经没有元素了
break
}

//当前时间
now := time.Now()
//得到具有最小过期时间的堆顶元素
expired := self.timerQueue[0].expired
if expired.After(now) {
//如果还没过期
break
}

//依次从顶部取出当前最小过期时间的timer
t := heap.Pop(&self.timerQueue).(*Timer)

if t.onTimeOut != nil {
//加入处理队列
self.workLimit <- nil
go func() {
//异步处理过期
defer func() {
<-self.workLimit
}()
t.onTimeOut(now)
}()
}

//删除元素
delete(self.timers, t.id)
}
return
}

参考

https://www.ibm.com/developerworks/cn/linux/l-cn-timers/

http://www.lpnote.com/2017/11/16/hashed-and-hierarchical-timing-wheels/