-
Notifications
You must be signed in to change notification settings - Fork 4
/
jobqueue_test.go
104 lines (79 loc) · 2.23 KB
/
jobqueue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package jobqueue_test
import (
"errors"
"runtime"
"sync"
"testing"
"time"
. "gopkg.in/check.v1"
jobqueue "github.com/dirkaholic/kyoo"
"github.com/dirkaholic/kyoo/job"
)
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { TestingT(t) }
type JobQueueSuite struct{}
var _ = Suite(&JobQueueSuite{})
func (s *JobQueueSuite) TestJobQueueStartsAndStopsSuccesfully(c *C) {
queue := jobqueue.NewJobQueue(runtime.NumCPU())
queue.Start()
c.Assert(queue.Stopped(), Equals, false)
queue.Stop()
c.Assert(queue.Stopped(), Equals, true)
}
func (s *JobQueueSuite) TestJobQueueExecutesFunctionSuccessfully(c *C) {
queue := jobqueue.NewJobQueue(runtime.NumCPU())
queue.Start()
funcExecuted := false
queue.Submit(&job.FuncExecutorJob{Func: func() error {
funcExecuted = true
return nil
}})
queue.Stop()
c.Assert(funcExecuted, Equals, true)
}
func (s *JobQueueSuite) TestJobQueueSetsJobErrorWhenErrorOccursDuringExecution(c *C) {
queue := jobqueue.NewJobQueue(runtime.NumCPU())
queue.Start()
job := &job.FuncExecutorJob{Func: func() error {
return errors.New("Error while executing function")
}}
c.Assert(job.Err, IsNil)
queue.Submit(job)
queue.Stop()
c.Assert(job.Err, NotNil)
}
func (s *JobQueueSuite) TestAddingManyJobsWithLowNumberOfWorkersAvailable(c *C) {
queue := jobqueue.NewJobQueue(2)
queue.Start()
jobCount := 10000
processCount := 0
processCountMutex := &sync.Mutex{}
for index := 0; index < jobCount; index++ {
queue.Submit(&job.FuncExecutorJob{Func: func() error {
processCountMutex.Lock()
processCount++
processCountMutex.Unlock()
return nil
}})
}
queue.Stop()
c.Assert(processCount, Equals, jobCount)
}
func (s *JobQueueSuite) TestPendingJobsAreExecutedWhenQueueIsStopped(c *C) {
queue := jobqueue.NewJobQueue(runtime.NumCPU())
queue.Start()
jobCount := 100
processCount := 0
processCountMutex := &sync.Mutex{}
for index := 0; index < jobCount; index++ {
queue.Submit(&job.FuncExecutorJob{Func: func() error {
time.Sleep(10 * time.Millisecond) // add a little delay so the queue is stopped before all jobs are finished
processCountMutex.Lock()
processCount++
processCountMutex.Unlock()
return nil
}})
}
queue.Stop()
c.Assert(processCount, Equals, jobCount)
}