project reactor - Why does not the runOn() method execute map operator on the next available thread from the pool when the current executing threads go to wait state? -
i trying execute following code on 4 core machine. have 5 threads in pool , within map operator, put executing thread sleep few seconds.
i expect core put executing thread on sleep , when next event available, should perform map operation on next available thread thread pool, not behavior see.
i see 4 threads pool go on wait 13 seconds , process next event after wait complete.
why runon()
method not executing map operator on next available thread pool when threads go wait state?
i using reactor-core version '3.0.7.release'
countdownlatch latch = new countdownlatch(10); executorservice executorservice = executors.newfixedthreadpool(5); flux<integer> flux = flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); flux.parallel() .runon(schedulers.fromexecutorservice(executorservice)) .map(l -> { logger.log(reactorparalleltest.class, "map1", "inside run waiting 13 seconds"); try { thread.sleep(13000); } catch (interruptedexception e) { e.printstacktrace(); } logger.log(reactorparalleltest.class, "map1", "l=" + l); latch.countdown(); return l; }).subscribe(l -> { logger.log(reactorparalleltest.class, "onnext", "l=" + l); }, error -> system.err.println(error), () -> { logger.log(reactorparalleltest.class, "oncomplete", "inside complete."); executorservice.shutdown(); }); try { latch.await(60, timeunit.seconds); } catch (interruptedexception e) { e.printstacktrace(); }
you blocking rails code. 4 rails started (number of cpus), , request 1 element each source. since block in map
when done, rail cannot request more upstream, in effect 4 elements @ time, block, more, block... parallelism more limited capacity of thread pool. if want put threads use, .parallel(5)
(same configuration thread pool).
on side note, subscribe(lambda)
parallelflux
invoke oncomplete
callback each rail. if want merge single sequence (and single complete), use .sequential()
before .subscribe
.
Comments
Post a Comment