1
0
mirror of https://github.com/v2fly/v2ray-core.git synced 2024-11-05 01:38:24 -05:00
v2fly/common/collect/timed_queue.go

95 lines
1.8 KiB
Go
Raw Normal View History

2015-09-27 19:11:40 -04:00
package collect
import (
"container/heap"
"sync"
"time"
)
// Internal building blocks of the priority queue used by TimedQueue.
type (
	// timedQueueEntry couples a stored value with the timestamp, in seconds,
	// that determines its position in the heap.
	timedQueueEntry struct {
		timeSec int64
		value   interface{}
	}

	// timedQueueImpl is the slice of entries that the container/heap
	// functions operate on; the entry with the smallest timeSec is kept at
	// index 0.
	timedQueueImpl []*timedQueueEntry
)
// Len reports the number of entries currently stored. It is one of the
// methods container/heap requires from the underlying collection.
func (queue timedQueueImpl) Len() int {
	return len(queue)
}
// Less orders entries by ascending timeSec, so the entry with the smallest
// timestamp sorts first.
func (queue timedQueueImpl) Less(i, j int) bool {
	left, right := queue[i], queue[j]
	return left.timeSec < right.timeSec
}
// Swap exchanges the entries stored at positions i and j.
func (queue timedQueueImpl) Swap(i, j int) {
	queue[i], queue[j] = queue[j], queue[i]
}
// Push appends value to the end of the slice; container/heap then restores
// the heap ordering. value must be a *timedQueueEntry, otherwise the type
// assertion panics.
func (queue *timedQueueImpl) Push(value interface{}) {
	*queue = append(*queue, value.(*timedQueueEntry))
}
// Pop removes the last element of the slice and returns it as a
// *timedQueueEntry wrapped in interface{}. container/heap moves the minimum
// entry to the last position before calling this method, so callers should
// go through heap.Pop rather than calling Pop directly.
//
// Pop panics if the queue is empty.
func (queue *timedQueueImpl) Pop() interface{} {
	old := *queue
	last := len(old) - 1
	entry := old[last]
	// Clear the vacated slot: the backing array outlives the re-slice below
	// and would otherwise keep the popped entry (and its value) reachable.
	old[last] = nil
	*queue = old[:last]
	return entry
}
type TimedQueue struct {
queue timedQueueImpl
2015-10-06 17:29:05 -04:00
access sync.RWMutex
2015-09-27 19:11:40 -04:00
removed chan interface{}
}
func NewTimedQueue(updateInterval int) *TimedQueue {
queue := &TimedQueue{
queue: make([]*timedQueueEntry, 0, 256),
removed: make(chan interface{}, 16),
2015-10-06 17:29:05 -04:00
access: sync.RWMutex{},
2015-09-27 19:11:40 -04:00
}
go queue.cleanup(time.Tick(time.Duration(updateInterval) * time.Second))
return queue
}
// Add schedules value for removal. time2Remove is the time, in Unix seconds,
// at or after which the cleanup goroutine pops the value and publishes it on
// the removed-entries channel.
func (queue *TimedQueue) Add(value interface{}, time2Remove int64) {
	entry := &timedQueueEntry{timeSec: time2Remove, value: value}

	queue.access.Lock()
	heap.Push(&queue.queue, entry)
	queue.access.Unlock()
}
// RemovedEntries exposes the receive-only side of the channel on which the
// values of expired entries are delivered.
func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
	return queue.removed
}
func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
for {
now := <-tick
2015-10-06 18:30:44 -04:00
queue.access.RLock()
queueLen := queue.queue.Len()
queue.access.RUnlock()
2015-10-06 17:29:05 -04:00
if queueLen == 0 {
2015-09-27 19:11:40 -04:00
continue
}
nowSec := now.UTC().Unix()
2015-10-06 18:30:44 -04:00
queue.access.RLock()
2015-10-06 17:29:05 -04:00
firstEntryTime := queue.queue[0].timeSec
2015-10-06 18:30:44 -04:00
queue.access.RUnlock()
2015-10-06 17:29:05 -04:00
if firstEntryTime > nowSec {
2015-09-27 19:11:40 -04:00
continue
}
queue.access.Lock()
2015-10-06 17:29:05 -04:00
firstEntryValue := heap.Pop(&queue.queue).(*timedQueueEntry).value
2015-09-27 19:11:40 -04:00
queue.access.Unlock()
2015-10-06 17:29:05 -04:00
queue.removed <- firstEntryValue
2015-09-27 19:11:40 -04:00
}
}