Skip to content
This repository was archived by the owner on Feb 20, 2024. It is now read-only.

Commit 0a8e61e

Browse files
authored
Merge pull request #31 from catmullet/fix-no-error
Fix for issue #30 Error not bubbling up.
2 parents ae53f26 + 361f319 commit 0a8e61e

File tree

2 files changed

+167
-57
lines changed

2 files changed

+167
-57
lines changed

workers.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ func (r *runner) SetTimeout(duration time.Duration) Runner {
156156

157157
// Wait calls stop on workers and waits for the channel to drain.
158158
// !!Should only be called when certain nothing will send to worker.
159-
func (r *runner) Wait() (err error) {
159+
func (r *runner) Wait() error {
160160
r.waitForDrain()
161-
if err = <-r.Stop(); err != nil || !errors.Is(err, context.Canceled) {
162-
return
161+
if err := <-r.Stop(); err != nil && !errors.Is(err, context.Canceled) {
162+
return err
163163
}
164164
return nil
165165
}
@@ -216,6 +216,8 @@ func (r *runner) startWork() {
216216
go func() {
217217
var workerWG = new(sync.WaitGroup)
218218
var closeOnce = new(sync.Once)
219+
220+
// write out error if not nil on exit.
219221
defer func() {
220222
workerWG.Wait()
221223
r.errChan <- err
@@ -227,27 +229,22 @@ func (r *runner) startWork() {
227229
r.wg.Done()
228230
}()
229231
for in := range r.inChan {
230-
select {
231-
case <-r.ctx.Done():
232-
err = context.Canceled
233-
continue
234-
default:
235-
r.limiter <- struct{}{}
236-
workerWG.Add(1)
237-
go func() {
238-
defer func() {
239-
<-r.limiter
240-
workerWG.Done()
241-
}()
242-
if workErr := r.workFunc(in, r.outChan); workErr != nil {
243-
r.once.Do(func() {
244-
errors.As(err, &workErr)
245-
r.cancel()
246-
return
247-
})
248-
}
232+
input := in
233+
r.limiter <- struct{}{}
234+
workerWG.Add(1)
235+
go func() {
236+
defer func() {
237+
<-r.limiter
238+
workerWG.Done()
249239
}()
250-
}
240+
if err := r.workFunc(input, r.outChan); err != nil {
241+
r.once.Do(func() {
242+
r.errChan <- err
243+
r.cancel()
244+
return
245+
})
246+
}
247+
}()
251248
}
252249
}()
253250
}

workers_test.go

Lines changed: 147 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,53 +14,58 @@ import (
1414
)
1515

1616
const (
17-
workerCount = 1000
17+
workerCount = 100000
1818
workerTimeout = time.Millisecond * 300
1919
runTimes = 100000
2020
)
2121

2222
type WorkerOne struct {
23+
Count int
24+
sync.Mutex
2325
}
2426
type WorkerTwo struct {
27+
Count int
28+
sync.Mutex
2529
}
2630

27-
func NewWorkerOne() Worker {
31+
func NewWorkerOne() *WorkerOne {
2832
return &WorkerOne{}
2933
}
3034

31-
func NewWorkerTwo() Worker {
35+
func NewWorkerTwo() *WorkerTwo {
3236
return &WorkerTwo{}
3337
}
3438

39+
func (wo *WorkerOne) CurrentCount() int {
40+
wo.Lock()
41+
defer wo.Unlock()
42+
return wo.Count
43+
}
44+
3545
func (wo *WorkerOne) Work(in interface{}, out chan<- interface{}) error {
36-
var workerOne = "worker_one"
3746
mut.Lock()
38-
if val, ok := count[workerOne]; ok {
39-
count[workerOne] = val + 1
40-
} else {
41-
count[workerOne] = 1
42-
}
47+
wo.Count = wo.Count + 1
4348
mut.Unlock()
4449

4550
total := in.(int) * 2
4651
out <- total
4752
return nil
4853
}
4954

55+
func (wt *WorkerTwo) CurrentCount() int {
56+
wt.Lock()
57+
defer wt.Unlock()
58+
return wt.Count
59+
}
60+
5061
func (wt *WorkerTwo) Work(in interface{}, out chan<- interface{}) error {
51-
var workerTwo = "worker_two"
5262
mut.Lock()
53-
if val, ok := count[workerTwo]; ok {
54-
count[workerTwo] = val + 1
55-
} else {
56-
count[workerTwo] = 1
57-
}
63+
wt.Count = wt.Count + 1
5864
mut.Unlock()
5965
return nil
6066
}
6167

6268
var (
63-
count = make(map[string]int)
6469
mut = sync.RWMutex{}
6570
err = errors.New("test error")
6671
deadline = func() time.Time { return time.Now().Add(workerTimeout) }
@@ -184,24 +189,57 @@ func TestWorkers(t *testing.T) {
184189
workerOne.Send(i)
185190
}
186191

187-
if err := workerOne.Wait(); err != nil && !tt.errExpected {
188-
fmt.Println(err)
189-
t.Fail()
192+
if err := workerOne.Wait(); err != nil && (!tt.errExpected) {
193+
t.Error(err)
190194
}
191195
if err := workerTwo.Wait(); err != nil && !tt.errExpected {
192-
fmt.Println(err)
193-
t.Fail()
196+
t.Error(err)
194197
}
195198
})
196199
}
197200
}
198201

199-
func TestWorkersFinish(t *testing.T) {
202+
func TestWorkersFinish100(t *testing.T) {
203+
const workCount = 100
204+
ctx := context.Background()
205+
w1 := NewWorkerOne()
206+
w2 := NewWorkerTwo()
207+
workerOne := NewRunner(ctx, w1, 1000).Start()
208+
workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start()
209+
210+
for i := 0; i < workCount; i++ {
211+
workerOne.Send(rand.Intn(100))
212+
}
213+
214+
if err := workerOne.Wait(); err != nil {
215+
fmt.Println(err)
216+
}
217+
218+
if err := workerTwo.Wait(); err != nil {
219+
fmt.Println(err)
220+
}
221+
222+
if w1.CurrentCount() != workCount {
223+
t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000")
224+
t.Fail()
225+
}
226+
if w2.CurrentCount() != workCount {
227+
t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000")
228+
t.Fail()
229+
}
230+
231+
t.Logf("worker_one count: %d, worker_two count: %d", w1.CurrentCount(), w2.CurrentCount())
232+
}
233+
234+
func TestWorkersFinish100000(t *testing.T) {
235+
const workCount = 100000
200236
ctx := context.Background()
201-
workerOne := NewRunner(ctx, NewWorkerOne(), 1000).Start()
202-
workerTwo := NewRunner(ctx, NewWorkerTwo(), 1000).InFrom(workerOne).Start()
237+
w1 := NewWorkerOne()
238+
w2 := NewWorkerTwo()
239+
workerOne := NewRunner(ctx, w1, 1000).Start()
240+
workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start()
203241

204-
for i := 0; i < 100000; i++ {
242+
for i := 0; i < workCount; i++ {
205243
workerOne.Send(rand.Intn(100))
206244
}
207245

@@ -213,28 +251,103 @@ func TestWorkersFinish(t *testing.T) {
213251
fmt.Println(err)
214252
}
215253

216-
if count["worker_one"] != 100000 {
217-
fmt.Println("worker one failed to finish,", "worker_one count", count["worker_one"], "/ 100000")
254+
if w1.CurrentCount() != workCount {
255+
t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000")
218256
t.Fail()
219257
}
220-
if count["worker_two"] != 100000 {
221-
fmt.Println("worker two failed to finish,", "worker_two count", count["worker_two"], "/ 100000")
258+
if w2.CurrentCount() != workCount {
259+
t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000")
222260
t.Fail()
223261
}
262+
263+
t.Logf("worker_one count: %d, worker_two count: %d", w1.CurrentCount(), w2.CurrentCount())
224264
}
225265

226-
func BenchmarkGoWorkers(b *testing.B) {
266+
func TestWorkersFinish1000000(t *testing.T) {
267+
const workCount = 1000000
227268
ctx := context.Background()
228-
worker := NewRunner(ctx, NewTestWorkerObject(workBasicNoOut()), workerCount).Start()
269+
w1 := NewWorkerOne()
270+
w2 := NewWorkerTwo()
271+
workerOne := NewRunner(ctx, w1, 1000).Start()
272+
workerTwo := NewRunner(ctx, w2, 1000).InFrom(workerOne).Start()
273+
274+
for i := 0; i < workCount; i++ {
275+
workerOne.Send(rand.Intn(100))
276+
}
277+
278+
if err := workerOne.Wait(); err != nil {
279+
fmt.Println(err)
280+
}
281+
282+
if err := workerTwo.Wait(); err != nil {
283+
fmt.Println(err)
284+
}
229285

230-
b.StartTimer()
286+
if w1.CurrentCount() != workCount {
287+
t.Log("worker one failed to finish,", "worker_one count", w1.CurrentCount(), "/ 100000")
288+
t.Fail()
289+
}
290+
if w2.CurrentCount() != workCount {
291+
t.Log("worker two failed to finish,", "worker_two count", w2.CurrentCount(), "/ 100000")
292+
t.Fail()
293+
}
294+
295+
t.Logf("worker_one count: %d, worker_two count: %d", w1.CurrentCount(), w2.CurrentCount())
296+
}
297+
298+
func BenchmarkGoWorkers1to1(b *testing.B) {
299+
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start()
300+
301+
b.ResetTimer()
231302
for i := 0; i < b.N; i++ {
232-
for j := 0; j < runTimes; j++ {
303+
for j := 0; j < 1000; j++ {
233304
worker.Send(j)
234305
}
235306
}
236-
237307
b.StopTimer()
308+
309+
if err := worker.Wait(); err != nil {
310+
b.Error(err)
311+
}
312+
}
313+
314+
func Benchmark100GoWorkers(b *testing.B) {
315+
b.ReportAllocs()
316+
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 100).Start()
317+
318+
b.ResetTimer()
319+
for i := 0; i < b.N; i++ {
320+
worker.Send(i)
321+
}
322+
323+
if err := worker.Wait(); err != nil {
324+
b.Error(err)
325+
}
326+
}
327+
328+
func Benchmark1000GoWorkers(b *testing.B) {
329+
b.ReportAllocs()
330+
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 1000).Start()
331+
332+
b.ResetTimer()
333+
for i := 0; i < b.N; i++ {
334+
worker.Send(i)
335+
}
336+
337+
if err := worker.Wait(); err != nil {
338+
b.Error(err)
339+
}
340+
}
341+
342+
func Benchmark10000GoWorkers(b *testing.B) {
343+
b.ReportAllocs()
344+
worker := NewRunner(context.Background(), NewTestWorkerObject(workBasicNoOut()), 10000).Start()
345+
346+
b.ResetTimer()
347+
for i := 0; i < b.N; i++ {
348+
worker.Send(i)
349+
}
350+
238351
if err := worker.Wait(); err != nil {
239352
b.Error(err)
240353
}

0 commit comments

Comments
 (0)