Skip to content
This repository was archived by the owner on Feb 2, 2025. It is now read-only.

Commit d1fc1da

Browse files
authored
Merge pull request #7 from belozierov/NewSharedCoroutineDispatcher
New shared coroutine dispatcher
2 parents faa347f + 2cca971 commit d1fc1da

File tree

13 files changed

+313
-163
lines changed

13 files changed

+313
-163
lines changed

Sources/SwiftCoroutine/Coroutine/CoroutineProtocol/CoroutineProtocol.swift

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,10 @@ import Darwin
2424
extension CoroutineProtocol {
2525

2626
@inlinable internal func performAsCurrent<T>(_ block: () -> T) -> T {
27-
let unmanaged = Unmanaged.passRetained(self)
28-
defer { unmanaged.release() }
29-
if let caller = pthread_getspecific(.coroutine) {
30-
pthread_setspecific(.coroutine, unmanaged.toOpaque())
31-
defer { pthread_setspecific(.coroutine, caller) }
32-
return block()
33-
} else {
34-
pthread_setspecific(.coroutine, unmanaged.toOpaque())
35-
defer { pthread_setspecific(.coroutine, nil) }
36-
return block()
37-
}
27+
let caller = pthread_getspecific(.coroutine)
28+
pthread_setspecific(.coroutine, Unmanaged.passUnretained(self).toOpaque())
29+
defer { pthread_setspecific(.coroutine, caller) }
30+
return block()
3831
}
3932

4033
}

Sources/SwiftCoroutine/Coroutine/SharedCoroutineDispatcher/SharedCoroutineDispatcher.swift

Lines changed: 63 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -6,151 +6,105 @@
66
// Copyright © 2020 Alex Belozierov. All rights reserved.
77
//
88

9-
import Dispatch
10-
119
internal final class SharedCoroutineDispatcher: CoroutineTaskExecutor {
1210

1311
internal struct Task {
1412
let scheduler: CoroutineScheduler, task: () -> Void
1513
}
1614

17-
private let mutex = PsxLock()
18-
private let stackSize: Int
19-
private var tasks = FifoQueue<Task>()
20-
21-
private var contextsCount: Int
22-
private var freeQueues = [SharedCoroutineQueue]()
23-
private var suspendedQueues = Set<SharedCoroutineQueue>()
24-
private var freeCount: AtomicInt
15+
private let queuesCount: Int
16+
private let queues: UnsafeMutablePointer<SharedCoroutineQueue>
17+
private var freeQueuesMask = AtomicBitMask()
18+
private var suspendedQueuesMask = AtomicBitMask()
19+
private var tasks = ThreadSafeFifoQueues<Task>()
2520

2621
internal init(contextsCount: Int, stackSize: Int) {
27-
self.stackSize = stackSize
28-
self.contextsCount = contextsCount
29-
freeCount = AtomicInt(value: contextsCount)
30-
freeQueues.reserveCapacity(contextsCount)
31-
suspendedQueues.reserveCapacity(contextsCount)
32-
startDispatchSource()
22+
queuesCount = min(contextsCount, 63)
23+
queues = .allocate(capacity: queuesCount)
24+
(0..<queuesCount).forEach {
25+
freeQueuesMask.insert($0)
26+
(queues + $0).initialize(to: .init(tag: $0, stackSize: stackSize))
27+
}
3328
}
3429

35-
// MARK: - Start
30+
// MARK: - Free
3631

37-
internal func execute(on scheduler: CoroutineScheduler, task: @escaping () -> Void) {
38-
func perform() {
39-
freeCount.update { max(0, $0 - 1) }
40-
mutex.lock()
41-
if let queue = freeQueue {
42-
mutex.unlock()
43-
queue.start(dispatcher: self, task: .init(scheduler: scheduler, task: task))
44-
} else {
45-
tasks.push(.init(scheduler: scheduler, task: task))
46-
mutex.unlock()
47-
}
48-
}
49-
if freeCount.value == 0 {
50-
mutex.lock()
51-
defer { mutex.unlock() }
52-
if freeCount.value == 0 {
53-
return tasks.push(.init(scheduler: scheduler, task: task))
54-
}
55-
}
56-
scheduler.scheduleTask(perform)
32+
private var hasFree: Bool {
33+
!freeQueuesMask.isEmpty || !suspendedQueuesMask.isEmpty
5734
}
5835

5936
private var freeQueue: SharedCoroutineQueue? {
60-
if let queue = freeQueues.popLast() { return queue }
61-
if contextsCount > 0 {
62-
contextsCount -= 1
63-
return SharedCoroutineQueue(stackSize: stackSize)
64-
} else if suspendedQueues.count < 2 {
65-
return suspendedQueues.popFirst()
37+
if !freeQueuesMask.isEmpty, let index = freeQueuesMask.pop() { return queues[index] }
38+
if !suspendedQueuesMask.isEmpty, let index = suspendedQueuesMask
39+
.pop(offset: suspendedQueuesMask.rawValue % queuesCount) {
40+
return queues[index]
6641
}
67-
var min = suspendedQueues.first!
68-
for queue in suspendedQueues {
69-
if queue.started == 1 {
70-
return suspendedQueues.remove(queue)
71-
} else if queue.started < min.started {
72-
min = queue
73-
}
74-
}
75-
return suspendedQueues.remove(min)
42+
return nil
7643
}
7744

78-
// MARK: - Resume
79-
80-
internal func resume(_ coroutine: SharedCoroutine) {
81-
mutex.lock()
82-
if suspendedQueues.remove(coroutine.queue) == nil {
83-
coroutine.queue.push(coroutine)
84-
mutex.unlock()
85-
} else {
86-
mutex.unlock()
87-
freeCount.decrease()
88-
coroutine.scheduler.scheduleTask {
89-
coroutine.queue.resume(coroutine: coroutine)
90-
}
91-
}
45+
private func pushTask(_ task: Task) {
46+
tasks.push(task)
47+
if hasFree { tasks.pop().map(startTask) }
9248
}
9349

94-
// MARK: - Next
50+
// MARK: - Start
9551

96-
internal func performNext(for queue: SharedCoroutineQueue) {
97-
mutex.lock()
98-
if let coroutine = queue.pop() {
99-
mutex.unlock()
100-
coroutine.scheduler.scheduleTask {
101-
queue.resume(coroutine: coroutine)
102-
}
103-
} else if let task = tasks.pop() {
104-
mutex.unlock()
105-
task.scheduler.scheduleTask {
52+
internal func execute(on scheduler: CoroutineScheduler, task: @escaping () -> Void) {
53+
hasFree
54+
? startTask(.init(scheduler: scheduler, task: task))
55+
: pushTask(.init(scheduler: scheduler, task: task))
56+
}
57+
58+
private func startTask(_ task: Task) {
59+
task.scheduler.scheduleTask {
60+
if let queue = self.freeQueue {
10661
queue.start(dispatcher: self, task: task)
107-
}
108-
} else {
109-
if queue.started == 0 {
110-
freeQueues.append(queue)
11162
} else {
112-
suspendedQueues.insert(queue)
63+
self.pushTask(task)
11364
}
114-
freeCount.increase()
115-
mutex.unlock()
11665
}
11766
}
11867

119-
// MARK: - DispatchSourceMemoryPressure
120-
121-
#if os(Linux)
122-
123-
private func startDispatchSource() {}
124-
125-
#else
126-
127-
private lazy var memoryPressureSource: DispatchSourceMemoryPressure = {
128-
let source = DispatchSource.makeMemoryPressureSource(eventMask: [.warning, .critical])
129-
source.setEventHandler { [unowned self] in self.reset() }
130-
return source
131-
}()
68+
// MARK: - Resume
13269

133-
private func startDispatchSource() {
134-
if #available(OSX 10.12, iOS 10.0, *) {
135-
memoryPressureSource.activate()
70+
internal func resume(_ coroutine: SharedCoroutine) {
71+
coroutine.queue.mutex.lock()
72+
if suspendedQueuesMask.remove(coroutine.queue.tag) {
73+
coroutine.queue.mutex.unlock()
74+
coroutine.resumeOnQueue()
13675
} else {
137-
memoryPressureSource.resume()
76+
coroutine.queue.prepared.push(coroutine)
77+
coroutine.queue.mutex.unlock()
13878
}
13979
}
14080

141-
#endif
81+
// MARK: - Next
14282

143-
internal func reset() {
144-
mutex.lock()
145-
contextsCount += freeQueues.count
146-
freeCount.add(freeQueues.count)
147-
freeQueues.removeAll(keepingCapacity: true)
148-
mutex.unlock()
83+
internal func performNext(for queue: SharedCoroutineQueue) {
84+
queue.mutex.lock()
85+
if let coroutine = queue.prepared.pop() {
86+
queue.mutex.unlock()
87+
coroutine.resumeOnQueue()
88+
} else {
89+
queue.started == 0
90+
? freeQueuesMask.insert(queue.tag)
91+
: suspendedQueuesMask.insert(queue.tag)
92+
queue.mutex.unlock()
93+
if hasFree { tasks.pop().map(startTask) }
94+
}
14995
}
15096

15197
deinit {
152-
mutex.free()
98+
queues.deinitialize(count: queuesCount)
99+
queues.deallocate()
153100
}
154101

155102
}
156103

104+
extension SharedCoroutine {
105+
106+
fileprivate func resumeOnQueue() {
107+
scheduler.scheduleTask { self.queue.resume(coroutine: self) }
108+
}
109+
110+
}

Sources/SwiftCoroutine/Coroutine/SharedCoroutineDispatcher/SharedCoroutineQueue.swift

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,18 @@ internal final class SharedCoroutineQueue {
1414
case finished, suspended, restarting
1515
}
1616

17+
internal let tag: Int
1718
internal let context: CoroutineContext
19+
internal let mutex = PsxLock()
20+
internal var prepared = FifoQueue<SharedCoroutine>()
1821
private var coroutine: SharedCoroutine?
19-
private var prepared = FifoQueue<SharedCoroutine>()
2022
private(set) var started = 0
2123

22-
internal init(stackSize size: Int) {
24+
internal init(tag: Int, stackSize size: Int) {
25+
self.tag = tag
2326
context = CoroutineContext(stackSize: size)
2427
}
2528

26-
// MARK: - Queue
27-
28-
internal func push(_ coroutine: SharedCoroutine) {
29-
prepared.push(coroutine)
30-
}
31-
32-
internal func pop() -> SharedCoroutine? {
33-
prepared.pop()
34-
}
35-
3629
// MARK: - Actions
3730

3831
internal func start(dispatcher: SharedCoroutineDispatcher, task: Task) {
@@ -69,16 +62,8 @@ internal final class SharedCoroutineQueue {
6962
}
7063
}
7164

72-
}
73-
74-
extension SharedCoroutineQueue: Hashable {
75-
76-
@inlinable internal static func == (lhs: SharedCoroutineQueue, rhs: SharedCoroutineQueue) -> Bool {
77-
lhs === rhs
78-
}
79-
80-
@inlinable internal func hash(into hasher: inout Hasher) {
81-
ObjectIdentifier(self).hash(into: &hasher)
65+
deinit {
66+
mutex.free()
8267
}
8368

8469
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//
2+
// AtomicBitMask.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 06.04.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
private let deBruijn = [00, 01, 48, 02, 57, 49, 28, 03,
10+
61, 58, 50, 42, 38, 29, 17, 04,
11+
62, 55, 59, 36, 53, 51, 43, 22,
12+
45, 39, 33, 30, 24, 18, 12, 05,
13+
63, 47, 56, 27, 60, 41, 37, 16,
14+
54, 35, 52, 21, 44, 32, 23, 11,
15+
46, 26, 40, 15, 34, 20, 31, 10,
16+
25, 14, 19, 09, 13, 08, 07, 06]
17+
18+
struct AtomicBitMask {
19+
20+
private var atomic = AtomicInt(value: 0)
21+
var rawValue: Int { atomic.value }
22+
var isEmpty: Bool { atomic.value == 0 }
23+
24+
mutating func insert(_ index: Int) {
25+
atomic.update { $0 | (1 << index) }
26+
}
27+
28+
mutating func remove(_ index: Int) -> Bool {
29+
if isEmpty { return false }
30+
let (new, old) = atomic.update { $0 & ~(1 << index) }
31+
return new != old
32+
}
33+
34+
mutating func pop() -> Int? {
35+
var index: Int!
36+
atomic.update {
37+
if $0 == 0 { index = nil; return $0 }
38+
let uint = UInt(bitPattern: $0)
39+
let value = uint & (0 &- uint)
40+
index = deBruijn[Int((value &* 285870213051386505) >> 58)]
41+
return $0 & ~(1 << index)
42+
}
43+
return index
44+
}
45+
46+
mutating func pop(offset: Int) -> Int? {
47+
var index: Int!
48+
atomic.update {
49+
if $0 == 0 { index = nil; return $0 }
50+
let uint = UInt(bitPattern: ($0 << offset) + ($0 >> (64 - offset)))
51+
let value = uint & (0 &- uint)
52+
index = deBruijn[Int((value &* 285870213051386505) >> 58)] - offset
53+
if index < 0 { index += 64 }
54+
return $0 & ~(1 << index)
55+
}
56+
return index
57+
}
58+
59+
}

Sources/SwiftCoroutine/Helpers/AtomicInt.swift

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ internal struct AtomicInt {
3333
}
3434
}
3535

36-
@inlinable mutating func increase() { add(1) }
37-
@inlinable mutating func decrease() { add(-1) }
38-
3936
@discardableResult @inlinable
4037
mutating func update(_ transform: (Int) -> Int) -> (old: Int, new: Int) {
4138
withUnsafeMutablePointer(to: &_value) {

0 commit comments

Comments
 (0)