Skip to content

Commit 310cc23

Browse files
authored
Increase buffer in mapPar, add more tests, improve performance (#350)
1 parent f087751 commit 310cc23

File tree

2 files changed

+127
-6
lines changed

2 files changed

+127
-6
lines changed

core/src/main/scala/ox/flow/FlowOps.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,16 @@ class FlowOps[+T]:
192192
*/
193193
def mapPar[U](parallelism: Int)(f: T => U)(using BufferCapacity): Flow[U] = Flow.usingEmitInline: emit =>
194194
val s = new Semaphore(parallelism)
195-
val inProgress = Channel.withCapacity[Fork[Option[U]]](parallelism)
195+
// providing extra capacity in the `inProgress` channel (but still limiting it so that processing is bounded):
196+
// 1. starting more forks than parallelism, so that they are read to do their work immediately after a permit becomes available
197+
// 2. allowing for some slack after the mapping is completed, but its result not yet received; then, new mappings can already be started
198+
val inProgress = Channel.withCapacity[Fork[Option[U]]](parallelism * 4)
196199
val results = BufferCapacity.newChannel[U]
197200

198201
def forkMapping(t: T)(using OxUnsupervised): Fork[Option[U]] =
199202
forkUnsupervised:
200203
try
204+
s.acquire()
201205
val u = f(t)
202206
s.release() // not in finally, as in case of an exception, no point in starting subsequent forks
203207
Some(u)
@@ -214,11 +218,7 @@ class FlowOps[+T]:
214218
// notifying only the `results` channels, as it will cause the scope to end, and any other forks to be
215219
// interrupted, including the inProgress-fork, which might be waiting on a join()
216220
forkPropagate(results):
217-
last.run(
218-
FlowEmit.fromInline: t =>
219-
s.acquire()
220-
inProgress.sendOrClosed(forkMapping(t)).discard
221-
)
221+
last.run(FlowEmit.fromInline(t => inProgress.sendOrClosed(forkMapping(t)).discard))
222222
inProgress.doneOrClosed().discard
223223

224224
// a fork in which we wait for the created forks to finish (in sequence), and forward the mapped values to `results`

core/src/test/scala/ox/flow/FlowOpsMapParTest.scala

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,125 @@ class FlowOpsMapParTest extends AnyFlatSpec with Matchers with Eventually:
112112
// checking if the forks aren't left running
113113
sleep(200.millis)
114114
trail.get shouldBe Vector("done", "done", "exception") // TODO: 3 isn't cancelled because it's already taken off the queue
115+
116+
// Edge Cases
117+
it should "handle empty flow" in supervised:
118+
// given
119+
val flow = Flow.fromIterable(List.empty[Int])
120+
val processedCount = new AtomicInteger(0)
121+
122+
// when
123+
val result = flow.mapPar(5): i =>
124+
processedCount.incrementAndGet()
125+
i * 2
126+
127+
// then
128+
result.runToList() shouldBe List.empty
129+
processedCount.get() shouldBe 0
130+
131+
it should "handle flow with exactly parallelism number of elements" in supervised:
132+
// given
133+
val parallelism = 3
134+
val flow = Flow.fromIterable(1 to parallelism)
135+
val running = new AtomicInteger(0)
136+
val maxRunning = new AtomicInteger(0)
137+
138+
def f(i: Int) =
139+
val current = running.incrementAndGet()
140+
maxRunning.updateAndGet(current.max)
141+
try
142+
sleep(100.millis)
143+
i * 2
144+
finally running.decrementAndGet().discard
145+
end try
146+
end f
147+
148+
// when
149+
val result = flow.mapPar(parallelism)(f).runToList()
150+
151+
// then
152+
result shouldBe List(2, 4, 6)
153+
maxRunning.get() shouldBe parallelism
154+
155+
it should "handle flow with less than parallelism number of elements" in supervised:
156+
// given
157+
val flow = Flow.fromIterable(1 to 2)
158+
val running = new AtomicInteger(0)
159+
val maxRunning = new AtomicInteger(0)
160+
161+
def f(i: Int) =
162+
val current = running.incrementAndGet()
163+
maxRunning.updateAndGet(current.max)
164+
try
165+
sleep(100.millis)
166+
i * 2
167+
finally running.decrementAndGet().discard
168+
end try
169+
end f
170+
171+
// when
172+
val result = flow.mapPar(5)(f).runToList()
173+
174+
// then
175+
result shouldBe List(2, 4)
176+
maxRunning.get() shouldBe 2 // should never exceed actual number of elements
177+
178+
// Order Preservation Tests
179+
it should "preserve order even with varying processing times" in supervised:
180+
// given
181+
val flow = Flow.fromIterable(1 to 10)
182+
183+
def f(i: Int) =
184+
// Later elements finish faster to test order preservation
185+
val delay = if i <= 5 then (6 - i) * 50 else 50
186+
sleep(delay.millis)
187+
i * 2
188+
189+
// when
190+
val result = flow.mapPar(3)(f).runToList()
191+
192+
// then
193+
result shouldBe List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
194+
195+
it should "preserve order with random processing times" in supervised:
196+
// given
197+
val elements = 1 to 20
198+
val flow = Flow.fromIterable(elements)
199+
200+
def f(i: Int) =
201+
// Random delay to test order preservation
202+
val delay = scala.util.Random.nextInt(100) + 10
203+
sleep(delay.millis)
204+
i
205+
206+
// when
207+
val result = flow.mapPar(5)(f).runToList()
208+
209+
// then
210+
result shouldBe elements.toList
211+
212+
// Other
213+
it should "work with very high parallelism values" in supervised:
214+
// given
215+
val flow = Flow.fromIterable(1 to 5)
216+
val running = new AtomicInteger(0)
217+
val maxRunning = new AtomicInteger(0)
218+
219+
def f(i: Int) =
220+
val current = running.incrementAndGet()
221+
maxRunning.updateAndGet(current.max)
222+
try
223+
sleep(50.millis)
224+
i * 2
225+
finally running.decrementAndGet().discard
226+
end try
227+
end f
228+
229+
// when
230+
val result = flow.mapPar(1000)(f).runToList()
231+
232+
// then
233+
result shouldBe List(2, 4, 6, 8, 10)
234+
maxRunning.get() shouldBe 5 // Should not exceed actual number of elements
235+
115236
end FlowOpsMapParTest

0 commit comments

Comments
 (0)