2
2
// SharedCoroutineDispatcher.swift
3
3
// SwiftCoroutine
4
4
//
5
- // Created by Alex Belozierov on 09.03 .2020.
5
+ // Created by Alex Belozierov on 03.04 .2020.
6
6
// Copyright © 2020 Alex Belozierov. All rights reserved.
7
7
//
8
8
9
9
import Dispatch
10
10
11
- internal final class SharedCoroutineDispatcher : _CoroutineTaskExecutor {
11
+ internal final class SharedCoroutineDispatcher : CoroutineTaskExecutor {
12
12
13
- private struct Task {
13
+ internal struct Task {
14
14
let scheduler : CoroutineScheduler , task : ( ) -> Void
15
15
}
16
16
17
17
private let mutex = PsxLock ( )
18
18
private let stackSize : Int
19
+ private var tasks = FifoQueue < Task > ( )
20
+
19
21
private var contextsCount : Int
20
22
private var freeQueues = [ SharedCoroutineQueue] ( )
21
23
private var suspendedQueues = Set < SharedCoroutineQueue > ( )
22
- private var tasks = FifoQueue < Task > ( )
23
24
private var freeCount : AtomicInt
24
25
25
26
internal init ( contextsCount: Int , stackSize: Int ) {
@@ -31,14 +32,15 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
31
32
startDispatchSource ( )
32
33
}
33
34
35
+ // MARK: - Start
36
+
34
37
internal func execute( on scheduler: CoroutineScheduler , task: @escaping ( ) -> Void ) {
35
38
func perform( ) {
36
39
freeCount. update { max ( 0 , $0 - 1 ) }
37
40
mutex. lock ( )
38
41
if let queue = freeQueue {
39
42
mutex. unlock ( )
40
- SharedCoroutine ( dispatcher: self , queue: queue, scheduler: scheduler) . start ( task)
41
- performNext ( for: queue)
43
+ queue. start ( dispatcher: self , task: . init( scheduler: scheduler, task: task) )
42
44
} else {
43
45
tasks. push ( . init( scheduler: scheduler, task: task) )
44
46
mutex. unlock ( )
@@ -58,8 +60,7 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
58
60
if let queue = freeQueues. popLast ( ) { return queue }
59
61
if contextsCount > 0 {
60
62
contextsCount -= 1
61
- let context = CoroutineContext ( stackSize: stackSize)
62
- return SharedCoroutineQueue ( context: context)
63
+ return SharedCoroutineQueue ( stackSize: stackSize)
63
64
} else if suspendedQueues. count < 2 {
64
65
return suspendedQueues. popFirst ( )
65
66
}
@@ -74,28 +75,9 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
74
75
return suspendedQueues. remove ( min)
75
76
}
76
77
78
+ // MARK: - Resume
79
+
77
80
internal func resume( _ coroutine: SharedCoroutine ) {
78
- // func perform() {
79
- // mutex.lock()
80
- // if suspendedQueues.remove(coroutine.queue) == nil {
81
- // coroutine.queue.push(coroutine)
82
- // mutex.unlock()
83
- // } else {
84
- // freeCount.decrease()
85
- // mutex.unlock()
86
- // coroutine.resume()
87
- // performNext(for: coroutine.queue)
88
- // }
89
- // }
90
- // mutex.lock()
91
- // if suspendedQueues.contains(coroutine.queue) {
92
- // mutex.unlock()
93
- // coroutine.scheduler.scheduleTask(perform)
94
- // } else {
95
- // coroutine.queue.push(coroutine)
96
- // mutex.unlock()
97
- // }
98
-
99
81
mutex. lock ( )
100
82
if suspendedQueues. remove ( coroutine. queue) == nil {
101
83
coroutine. queue. push ( coroutine)
@@ -104,54 +86,33 @@ internal final class SharedCoroutineDispatcher: _CoroutineTaskExecutor {
104
86
mutex. unlock ( )
105
87
freeCount. decrease ( )
106
88
coroutine. scheduler. scheduleTask {
107
- coroutine. resume ( )
108
- self . performNext ( for: coroutine. queue)
89
+ coroutine. queue. resume ( coroutine: coroutine)
109
90
}
110
91
}
111
92
}
112
93
113
- internal func restart( _ coroutine: SharedCoroutine ) {
114
- coroutine. scheduler. scheduleTask {
115
- coroutine. resume ( )
116
- self . performNext ( for: coroutine. queue)
117
- }
118
- }
94
+ // MARK: - Next
119
95
120
- private enum NextState : Int {
121
- case running, none
122
- }
123
-
124
- private func performNext( for queue: SharedCoroutineQueue ) {
125
- var state = AtomicEnum ( value: NextState . none)
126
- while queue. isFree {
127
- mutex. lock ( )
128
- if let coroutine = queue. pop ( ) {
129
- mutex. unlock ( )
130
- state. value = . running
131
- coroutine. scheduler. scheduleTask {
132
- coroutine. resume ( )
133
- if state. update ( . none) == . running { return }
134
- self . performNext ( for: queue)
135
- }
136
- } else if let task = tasks. pop ( ) {
137
- mutex. unlock ( )
138
- state. value = . running
139
- task. scheduler. scheduleTask {
140
- SharedCoroutine ( dispatcher: self , queue: queue, scheduler: task. scheduler)
141
- . start ( task. task)
142
- if state. update ( . none) == . running { return }
143
- self . performNext ( for: queue)
144
- }
145
- } else if queue. started == 0 {
96
+ internal func performNext( for queue: SharedCoroutineQueue ) {
97
+ mutex. lock ( )
98
+ if let coroutine = queue. pop ( ) {
99
+ mutex. unlock ( )
100
+ coroutine. scheduler. scheduleTask {
101
+ queue. resume ( coroutine: coroutine)
102
+ }
103
+ } else if let task = tasks. pop ( ) {
104
+ mutex. unlock ( )
105
+ task. scheduler. scheduleTask {
106
+ queue. start ( dispatcher: self , task: task)
107
+ }
108
+ } else {
109
+ if queue. started == 0 {
146
110
freeQueues. append ( queue)
147
- freeCount. increase ( )
148
- return mutex. unlock ( )
149
111
} else {
150
112
suspendedQueues. insert ( queue)
151
- freeCount. increase ( )
152
- return mutex. unlock ( )
153
113
}
154
- if state. update ( . none) == . running { return }
114
+ freeCount. increase ( )
115
+ mutex. unlock ( )
155
116
}
156
117
}
157
118
0 commit comments