1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
|
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package time
import (
"errors"
"sync"
)
// A Ticker holds a synchronous channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
C <-chan int64 // The channel on which the ticks are delivered.
c chan<- int64 // The same channel, but the end we use.
ns int64
shutdown chan bool // Buffered channel used to signal shutdown.
nextTick int64
next *Ticker
}
// Stop turns off a ticker. After Stop, no more ticks will be sent.
func (t *Ticker) Stop() {
select {
case t.shutdown <- true:
// ok
default:
// Stop in progress already
}
}
// Tick is a convenience wrapper for NewTicker providing access to the ticking
// channel only. Useful for clients that have no need to shut down the ticker.
func Tick(ns int64) <-chan int64 {
if ns <= 0 {
return nil
}
return NewTicker(ns).C
}
type alarmer struct {
wakeUp chan bool // wakeup signals sent/received here
wakeMeAt chan int64
wakeTime int64
}
// Set alarm to go off at time ns, if not already set earlier.
func (a *alarmer) set(ns int64) {
switch {
case a.wakeTime > ns:
// Next tick we expect is too late; shut down the late runner
// and (after fallthrough) start a new wakeLoop.
close(a.wakeMeAt)
fallthrough
case a.wakeMeAt == nil:
// There's no wakeLoop, start one.
a.wakeMeAt = make(chan int64)
a.wakeUp = make(chan bool, 1)
go wakeLoop(a.wakeMeAt, a.wakeUp)
fallthrough
case a.wakeTime == 0:
// Nobody else is waiting; it's just us.
a.wakeTime = ns
a.wakeMeAt <- ns
default:
// There's already someone scheduled.
}
}
// Channel to notify tickerLoop of new Tickers being created.
var newTicker chan *Ticker
func startTickerLoop() {
newTicker = make(chan *Ticker)
go tickerLoop()
}
// wakeLoop delivers ticks at scheduled times, sleeping until the right moment.
// If another, earlier Ticker is created while it sleeps, tickerLoop() will start a new
// wakeLoop and signal that this one is done by closing the wakeMeAt channel.
func wakeLoop(wakeMeAt chan int64, wakeUp chan bool) {
for wakeAt := range wakeMeAt {
Sleep(wakeAt - Nanoseconds())
wakeUp <- true
}
}
// A single tickerLoop serves all ticks to Tickers. It waits for two events:
// either the creation of a new Ticker or a tick from the alarm,
// signaling a time to wake up one or more Tickers.
func tickerLoop() {
// Represents the next alarm to be delivered.
var alarm alarmer
var now, wakeTime int64
var tickers *Ticker
for {
select {
case t := <-newTicker:
// Add Ticker to list
t.next = tickers
tickers = t
// Arrange for a new alarm if this one precedes the existing one.
alarm.set(t.nextTick)
case <-alarm.wakeUp:
now = Nanoseconds()
wakeTime = now + 1e15 // very long in the future
var prev *Ticker = nil
// Scan list of tickers, delivering updates to those
// that need it and determining the next wake time.
// TODO(r): list should be sorted in time order.
for t := tickers; t != nil; t = t.next {
select {
case <-t.shutdown:
// Ticker is done; remove it from list.
if prev == nil {
tickers = t.next
} else {
prev.next = t.next
}
continue
default:
}
if t.nextTick <= now {
if len(t.c) == 0 {
// Only send if there's room. We must not block.
// The channel is allocated with a one-element
// buffer, which is sufficient: if he hasn't picked
// up the last tick, no point in sending more.
t.c <- now
}
t.nextTick += t.ns
if t.nextTick <= now {
// Still behind; advance in one big step.
t.nextTick += (now - t.nextTick + t.ns) / t.ns * t.ns
}
}
if t.nextTick < wakeTime {
wakeTime = t.nextTick
}
prev = t
}
if tickers != nil {
// Please send wakeup at earliest required time.
// If there are no tickers, don't bother.
alarm.wakeTime = wakeTime
alarm.wakeMeAt <- wakeTime
} else {
alarm.wakeTime = 0
}
}
}
}
var onceStartTickerLoop sync.Once
// NewTicker returns a new Ticker containing a channel that will
// send the time, in nanoseconds, every ns nanoseconds. It adjusts the
// intervals to make up for pauses in delivery of the ticks. The value of
// ns must be greater than zero; if not, NewTicker will panic.
func NewTicker(ns int64) *Ticker {
if ns <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan int64, 1) // See comment on send in tickerLoop
t := &Ticker{
C: c,
c: c,
ns: ns,
shutdown: make(chan bool, 1),
nextTick: Nanoseconds() + ns,
}
onceStartTickerLoop.Do(startTickerLoop)
// must be run in background so global Tickers can be created
go func() { newTicker <- t }()
return t
}
|