18
18
final readonly class ConcurrencyHandler
19
19
{
20
20
private AsyncExecutionHandler $ executionHandler ;
21
+ private AwaitHandler $ awaitHandler ;
21
22
22
23
/**
23
24
* @param AsyncExecutionHandler $executionHandler Handler for async execution
24
25
*/
25
26
public function __construct (AsyncExecutionHandler $ executionHandler )
26
27
{
27
28
$ this ->executionHandler = $ executionHandler ;
29
+ $ this ->awaitHandler = new AwaitHandler (new FiberContextHandler );
28
30
}
29
31
30
32
/**
31
33
* Execute multiple tasks concurrently with a specified concurrency limit.
32
34
*
33
35
* This method runs multiple tasks simultaneously while ensuring that no more
34
36
* than the specified number of tasks run at the same time. Tasks can be either
35
- * callable functions or existing Promise instances.
37
+ * callable functions or existing Promise instances. Promise instances will be
38
+ * automatically wrapped to ensure proper concurrency control.
36
39
*
37
40
* @param array $tasks Array of callable tasks or Promise instances
38
41
* @param int $concurrency Maximum number of tasks to run simultaneously (default: 10)
@@ -45,29 +48,33 @@ public function concurrent(array $tasks, int $concurrency = 10): PromiseInterfac
45
48
return new Promise (function ($ resolve , $ reject ) use ($ tasks , $ concurrency ) {
46
49
if ($ concurrency <= 0 ) {
47
50
$ reject (new \InvalidArgumentException ('Concurrency limit must be greater than 0 ' ));
48
-
49
51
return ;
50
52
}
51
53
52
54
if (empty ($ tasks )) {
53
55
$ resolve ([]);
54
-
55
56
return ;
56
57
}
57
58
58
59
// Convert tasks to indexed array and preserve original keys
59
60
$ taskList = array_values ($ tasks );
60
61
$ originalKeys = array_keys ($ tasks );
61
62
63
+ // Process tasks to ensure proper async wrapping
64
+ $ processedTasks = [];
65
+ foreach ($ taskList as $ index => $ task ) {
66
+ $ processedTasks [$ index ] = $ this ->wrapTaskForConcurrency ($ task );
67
+ }
68
+
62
69
$ results = [];
63
70
$ running = 0 ;
64
71
$ completed = 0 ;
65
- $ total = count ($ taskList );
72
+ $ total = count ($ processedTasks );
66
73
$ taskIndex = 0 ;
67
74
68
75
$ processNext = function () use (
69
76
&$ processNext ,
70
- &$ taskList ,
77
+ &$ processedTasks ,
71
78
&$ originalKeys ,
72
79
&$ running ,
73
80
&$ completed ,
@@ -81,22 +88,19 @@ public function concurrent(array $tasks, int $concurrency = 10): PromiseInterfac
81
88
// Start as many tasks as we can up to the concurrency limit
82
89
while ($ running < $ concurrency && $ taskIndex < $ total ) {
83
90
$ currentIndex = $ taskIndex ++;
84
- $ task = $ taskList [$ currentIndex ];
91
+ $ task = $ processedTasks [$ currentIndex ];
85
92
$ originalKey = $ originalKeys [$ currentIndex ];
86
93
$ running ++;
87
94
88
95
try {
89
- $ promise = is_callable ($ task )
90
- ? $ this ->executionHandler ->async ($ task )()
91
- : $ task ;
96
+ $ promise = $ this ->executionHandler ->async ($ task )();
92
97
93
98
if (! ($ promise instanceof PromiseInterface)) {
94
99
throw new RuntimeException ('Task must return a Promise or be a callable that returns a Promise ' );
95
100
}
96
101
} catch (Throwable $ e ) {
97
102
$ running --;
98
103
$ reject ($ e );
99
-
100
104
return ;
101
105
}
102
106
@@ -124,8 +128,7 @@ public function concurrent(array $tasks, int $concurrency = 10): PromiseInterfac
124
128
->catch (function ($ error ) use (&$ running , $ reject ) {
125
129
$ running --;
126
130
$ reject ($ error );
127
- })
128
- ;
131
+ });
129
132
}
130
133
};
131
134
@@ -139,7 +142,8 @@ public function concurrent(array $tasks, int $concurrency = 10): PromiseInterfac
139
142
*
140
143
* This method processes tasks in batches sequentially, where each batch
141
144
* runs tasks concurrently up to the specified limit, but waits for the
142
- * entire batch to complete before starting the next batch.
145
+ * entire batch to complete before starting the next batch. Promise instances
146
+ * will be automatically wrapped to ensure proper concurrency control.
143
147
*
144
148
* @param array $tasks Array of callable tasks or Promise instances
145
149
* @param int $batchSize Number of tasks per batch (default: 10)
@@ -151,22 +155,27 @@ public function batch(array $tasks, int $batchSize = 10, ?int $concurrency = nul
151
155
return new Promise (function ($ resolve , $ reject ) use ($ tasks , $ batchSize , $ concurrency ) {
152
156
if ($ batchSize <= 0 ) {
153
157
$ reject (new \InvalidArgumentException ('Batch size must be greater than 0 ' ));
154
-
155
158
return ;
156
159
}
157
160
158
161
if (empty ($ tasks )) {
159
162
$ resolve ([]);
160
-
161
163
return ;
162
164
}
163
165
164
166
$ concurrency = $ concurrency ?? $ batchSize ;
165
167
166
- // Preserve original keys
168
+ // Preserve original keys and wrap tasks
167
169
$ originalKeys = array_keys ($ tasks );
168
170
$ taskValues = array_values ($ tasks );
169
- $ batches = array_chunk ($ taskValues , $ batchSize , false );
171
+
172
+ // Process tasks to ensure proper async wrapping
173
+ $ processedTasks = [];
174
+ foreach ($ taskValues as $ index => $ task ) {
175
+ $ processedTasks [$ index ] = $ this ->wrapTaskForConcurrency ($ task );
176
+ }
177
+
178
+ $ batches = array_chunk ($ processedTasks , $ batchSize , false );
170
179
$ keyBatches = array_chunk ($ originalKeys , $ batchSize , false );
171
180
172
181
$ allResults = [];
@@ -186,7 +195,6 @@ public function batch(array $tasks, int $batchSize = 10, ?int $concurrency = nul
186
195
) {
187
196
if ($ batchIndex >= $ totalBatches ) {
188
197
$ resolve ($ allResults );
189
-
190
198
return ;
191
199
}
192
200
@@ -205,11 +213,44 @@ public function batch(array $tasks, int $batchSize = 10, ?int $concurrency = nul
205
213
$ batchIndex ++;
206
214
EventLoop::getInstance ()->nextTick ($ processNextBatch );
207
215
})
208
- ->catch ($ reject )
209
- ;
216
+ ->catch ($ reject );
210
217
};
211
218
212
219
EventLoop::getInstance ()->nextTick ($ processNextBatch );
213
220
});
214
221
}
222
+
223
+ /**
224
+ * Wrap a task to ensure proper concurrency control.
225
+ *
226
+ * This method ensures all tasks use the await pattern for proper fiber-based concurrency:
227
+ * - All callables are wrapped to ensure their results are awaited
228
+ * - Promise instances are wrapped with await
229
+ * - Other types are wrapped in a callable
230
+ *
231
+ * @param mixed $task The task to wrap
232
+ * @return callable A callable that properly defers execution
233
+ */
234
+ private function wrapTaskForConcurrency (mixed $ task ): callable
235
+ {
236
+ if (is_callable ($ task )) {
237
+ return function () use ($ task ) {
238
+ $ result = $ task ();
239
+ if ($ result instanceof PromiseInterface) {
240
+ return $ this ->awaitHandler ->await ($ result );
241
+ }
242
+ return $ result ;
243
+ };
244
+ }
245
+
246
+ if ($ task instanceof PromiseInterface) {
247
+ return function () use ($ task ) {
248
+ return $ this ->awaitHandler ->await ($ task );
249
+ };
250
+ }
251
+
252
+ return function () use ($ task ) {
253
+ return $ task ;
254
+ };
255
+ }
215
256
}
0 commit comments