-
-
Notifications
You must be signed in to change notification settings - Fork 720
/
scheduler.go
368 lines (330 loc) · 10.4 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"fmt"
"os"
"sync"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
//
// Schedulers are safe for concurrent use by multiple goroutines.
type Scheduler struct {
	// id identifies this scheduler instance; it is the key under which
	// heartbeat entries are written to and cleared from redis.
	id string

	// state tracks whether the scheduler is new, active or closed.
	state *serverState

	// heartbeatInterval is the period between heartbeat snapshots.
	heartbeatInterval time.Duration

	// logger receives the scheduler's log output.
	logger *log.Logger

	// client enqueues the registered tasks.
	client *Client

	// rdb records enqueue events and heartbeat data.
	rdb *rdb.RDB

	// cron runs the registered jobs on their schedules.
	cron *cron.Cron

	// location is the time zone given to cron and used to timestamp
	// enqueue events.
	location *time.Location

	// done is closed by Shutdown to stop the heartbeater goroutine.
	done chan struct{}

	// wg lets Shutdown wait for the heartbeater goroutine to exit.
	wg sync.WaitGroup

	// preEnqueueFunc, if non-nil, is called before each enqueue attempt.
	preEnqueueFunc func(task *Task, opts []Option)

	// postEnqueueFunc, if non-nil, is called after each enqueue attempt
	// with its result.
	postEnqueueFunc func(info *TaskInfo, err error)

	// errHandler, if non-nil, is called when an enqueue attempt fails.
	errHandler func(task *Task, opts []Option, err error)

	// guards idmap
	mu sync.Mutex
	// idmap maps Scheduler's entry ID to cron.EntryID
	// to avoid using cron.EntryID as the public API of
	// the Scheduler.
	idmap map[string]cron.EntryID

	// When a Scheduler has been created with an existing Redis connection, we do
	// not want to close it.
	sharedConnection bool
}
// defaultHeartbeatInterval is the heartbeat interval used when
// SchedulerOpts.HeartbeatInterval is zero or negative.
const defaultHeartbeatInterval = 10 * time.Second
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil.
//
// It panics if r does not produce a redis.UniversalClient. The connection is
// created here, so the returned Scheduler is marked as owning it.
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
	conn, isUniversal := r.MakeRedisClient().(redis.UniversalClient)
	if !isUniversal {
		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
	}

	s := NewSchedulerFromRedisClient(conn, opts)
	s.sharedConnection = false
	return s
}
// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient.
// The parameter opts is optional, defaults will be used if opts is set to nil.
//
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
	if opts == nil {
		opts = &SchedulerOpts{}
	}

	// Fall back to the default when the interval is unset or not positive.
	heartbeatInterval := opts.HeartbeatInterval
	if heartbeatInterval <= 0 {
		heartbeatInterval = defaultHeartbeatInterval
	}

	logger := log.NewLogger(opts.Logger)
	loglevel := opts.LogLevel
	if loglevel == level_unspecified {
		loglevel = InfoLevel
	}
	logger.SetLevel(toInternalLogLevel(loglevel))

	loc := opts.Location
	if loc == nil {
		loc = time.UTC
	}

	// The caller owns c, so the scheduler is flagged as using a shared
	// connection; Shutdown consults this flag before closing rdb.
	return &Scheduler{
		id:                generateSchedulerID(),
		state:             &serverState{value: srvStateNew},
		heartbeatInterval: heartbeatInterval,
		logger:            logger,
		client:            NewClientFromRedisClient(c),
		rdb:               rdb.NewRDB(c),
		cron:              cron.New(cron.WithLocation(loc)),
		location:          loc,
		done:              make(chan struct{}),
		preEnqueueFunc:    opts.PreEnqueueFunc,
		postEnqueueFunc:   opts.PostEnqueueFunc,
		errHandler:        opts.EnqueueErrorHandler,
		idmap:             make(map[string]cron.EntryID),
		sharedConnection:  true,
	}
}
// generateSchedulerID returns an identifier of the form "host:pid:uuid".
// The host part is "unknown-host" when the hostname cannot be determined.
func generateSchedulerID() string {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	pid := os.Getpid()
	return fmt.Sprintf("%s:%d:%v", hostname, pid, uuid.New())
}
// SchedulerOpts specifies scheduler options.
type SchedulerOpts struct {
	// HeartbeatInterval specifies the interval between scheduler heartbeats.
	//
	// If unset, zero or a negative value, the interval is set to 10 seconds.
	//
	// Note: Setting this value too low may add significant load to redis.
	//
	// By default, HeartbeatInterval is set to 10 seconds.
	HeartbeatInterval time.Duration

	// Logger specifies the logger used by the scheduler instance.
	//
	// If unset, the default logger is used.
	Logger Logger

	// LogLevel specifies the minimum log level to enable.
	//
	// If unset, InfoLevel is used by default.
	LogLevel LogLevel

	// Location specifies the time zone location.
	//
	// If unset, the UTC time zone (time.UTC) is used.
	Location *time.Location

	// PreEnqueueFunc, if provided, is called before a task gets enqueued by Scheduler.
	// The callback is invoked synchronously, so it should return quickly to
	// avoid delaying the enqueue.
	PreEnqueueFunc func(task *Task, opts []Option)

	// PostEnqueueFunc, if provided, is called after a task gets enqueued by Scheduler.
	// It receives the result of the enqueue attempt, including any error.
	// The callback is invoked synchronously, so it should return quickly.
	PostEnqueueFunc func(info *TaskInfo, err error)

	// EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task
	// due to an error.
	//
	// Deprecated: Use PostEnqueueFunc instead.
	EnqueueErrorHandler func(task *Task, opts []Option, err error)
}
// enqueueJob encapsulates the job of enqueuing a task and recording the event.
// Register hands it to cron as the job to run for a schedule entry.
type enqueueJob struct {
	// id is the entry ID returned to the caller of Register.
	id uuid.UUID

	// cronspec is the schedule specification the job was registered with.
	cronspec string

	// task is the task enqueued on every run.
	task *Task

	// opts are the options passed along with each enqueue.
	opts []Option

	// location is the time zone used to timestamp enqueue events.
	location *time.Location

	// logger, client and rdb are copied from the owning Scheduler.
	logger *log.Logger
	client *Client
	rdb    *rdb.RDB

	// Optional callbacks copied from the owning Scheduler; each may be nil.
	preEnqueueFunc  func(task *Task, opts []Option)
	postEnqueueFunc func(info *TaskInfo, err error)
	errHandler      func(task *Task, opts []Option, err error)
}
// Run enqueues the job's task once and records the enqueue event.
//
// The pre- and post-enqueue callbacks are invoked around the enqueue when
// set. On an enqueue error the error handler (if any) is called and no event
// is recorded. A failure to record the event is only logged.
func (j *enqueueJob) Run() {
	if pre := j.preEnqueueFunc; pre != nil {
		pre(j.task, j.opts)
	}

	info, enqErr := j.client.Enqueue(j.task, j.opts...)

	// The post hook observes both successful and failed attempts.
	if post := j.postEnqueueFunc; post != nil {
		post(info, enqErr)
	}

	if enqErr != nil {
		if onErr := j.errHandler; onErr != nil {
			onErr(j.task, j.opts, enqErr)
		}
		return
	}

	j.logger.Debugf("scheduler enqueued a task: %+v", info)

	event := &base.SchedulerEnqueueEvent{
		TaskID:     info.ID,
		EnqueuedAt: time.Now().In(j.location),
	}
	if recErr := j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event); recErr != nil {
		j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, recErr)
	}
}
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry, or an error if cron
// rejects the job (in which case nothing is registered).
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
	jobID := uuid.New()

	cronID, err := s.cron.AddJob(cronspec, &enqueueJob{
		id:              jobID,
		cronspec:        cronspec,
		task:            task,
		opts:            opts,
		location:        s.location,
		client:          s.client,
		rdb:             s.rdb,
		logger:          s.logger,
		preEnqueueFunc:  s.preEnqueueFunc,
		postEnqueueFunc: s.postEnqueueFunc,
		errHandler:      s.errHandler,
	})
	if err != nil {
		return "", err
	}

	// Remember which cron entry backs the public entry ID.
	key := jobID.String()
	s.mu.Lock()
	s.idmap[key] = cronID
	s.mu.Unlock()

	return key, nil
}
// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if cronID, found := s.idmap[entryID]; found {
		delete(s.idmap, entryID)
		s.cron.Remove(cronID)
		return nil
	}
	return fmt.Errorf("asynq: no scheduler entry found")
}
// Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been shutdown.
//
// Once started, Run blocks in waitForSignals and then shuts the scheduler down.
func (s *Scheduler) Run() error {
	startErr := s.Start()
	if startErr == nil {
		s.waitForSignals()
		s.Shutdown()
	}
	return startErr
}
// Start starts the scheduler.
// It returns an error if the scheduler is already running or has been shutdown.
//
// On success, cron is started and the heartbeater runs in its own goroutine
// until Shutdown is called.
func (s *Scheduler) Start() error {
	err := s.start()
	if err != nil {
		return err
	}

	s.logger.Info("Scheduler starting")
	s.logger.Infof("Scheduler timezone is set to %v", s.location)

	s.cron.Start()

	// Register the heartbeater with the wait group before launching it so
	// Shutdown can wait for it to finish.
	s.wg.Add(1)
	go s.runHeartbeater()

	return nil
}
// start checks the scheduler state and returns an error if the scheduler is
// already active or has been closed. Otherwise it marks the scheduler active.
func (s *Scheduler) start() error {
	s.state.mu.Lock()
	defer s.state.mu.Unlock()

	if s.state.value == srvStateActive {
		return fmt.Errorf("asynq: the scheduler is already running")
	}
	if s.state.value == srvStateClosed {
		return fmt.Errorf("asynq: the scheduler has already been stopped")
	}

	s.state.value = srvStateActive
	return nil
}
// Shutdown stops and shuts down the scheduler.
//
// It does nothing unless the scheduler is currently running. Otherwise it
// marks the scheduler closed, stops the heartbeater and cron, clears the
// history of the remaining entries, and closes the client. The rdb
// connection is closed only when it is not shared.
func (s *Scheduler) Shutdown() {
	s.state.mu.Lock()
	switch s.state.value {
	case srvStateNew, srvStateClosed:
		// scheduler is not running, do nothing and return.
		s.state.mu.Unlock()
		return
	}
	s.state.value = srvStateClosed
	s.state.mu.Unlock()

	s.logger.Info("Scheduler shutting down")

	close(s.done) // signal heartbeater to stop

	// Block until the context returned by cron.Stop is done, then until the
	// heartbeater goroutine has exited.
	<-s.cron.Stop().Done()
	s.wg.Wait()

	s.clearHistory()

	if closeErr := s.client.Close(); closeErr != nil {
		s.logger.Errorf("Failed to close redis client connection: %v", closeErr)
	}
	if !s.sharedConnection {
		s.rdb.Close()
	}

	s.logger.Info("Scheduler stopped")
}
// runHeartbeater calls beat every heartbeatInterval until done is closed.
// On shutdown it clears this scheduler's entries from redis before
// returning. It must run in its own goroutine after s.wg.Add(1).
func (s *Scheduler) runHeartbeater() {
	defer s.wg.Done()

	ticker := time.NewTicker(s.heartbeatInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.beat()
		case <-s.done:
			s.logger.Debugf("Scheduler heatbeater shutting down")
			if err := s.rdb.ClearSchedulerEntries(s.id); err != nil {
				s.logger.Errorf("Failed to clear the scheduler entries: %v", err)
			}
			return
		}
	}
}
// beat writes a snapshot of entries to redis.
//
// Every cron entry is expected to hold an *enqueueJob. A write failure is
// logged as a warning and otherwise ignored.
func (s *Scheduler) beat() {
	var snapshot []*base.SchedulerEntry
	for _, cronEntry := range s.cron.Entries() {
		job := cronEntry.Job.(*enqueueJob)
		snapshot = append(snapshot, &base.SchedulerEntry{
			ID:      job.id.String(),
			Spec:    job.cronspec,
			Type:    job.task.Type(),
			Payload: job.task.Payload(),
			Opts:    stringifyOptions(job.opts),
			Next:    cronEntry.Next,
			Prev:    cronEntry.Prev,
		})
	}

	err := s.rdb.WriteSchedulerEntries(s.id, snapshot, s.heartbeatInterval*2)
	if err != nil {
		s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
	}
}
// stringifyOptions returns the String() form of each option, in order.
// The result is nil when opts is empty.
func stringifyOptions(opts []Option) []string {
	var out []string
	for i := range opts {
		out = append(out, opts[i].String())
	}
	return out
}
// clearHistory clears the scheduler history of every entry currently held
// by cron. Failures are logged as warnings and do not stop the loop.
func (s *Scheduler) clearHistory() {
	for _, cronEntry := range s.cron.Entries() {
		job := cronEntry.Job.(*enqueueJob)
		err := s.rdb.ClearSchedulerHistory(job.id.String())
		if err == nil {
			continue
		}
		s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
	}
}
// Ping performs a ping against the redis connection.
// It returns nil without pinging if the scheduler has already been closed.
func (s *Scheduler) Ping() error {
	s.state.mu.Lock()
	defer s.state.mu.Unlock()

	if s.state.value != srvStateClosed {
		return s.rdb.Ping()
	}
	return nil
}