project reactor - Why does not the runOn() method execute map operator on the next available thread from the pool when the current executing threads go to wait state? -


i trying execute following code on 4 core machine. have 5 threads in pool , within map operator, put executing thread sleep few seconds.

i expect core put executing thread on sleep , when next event available, should perform map operation on next available thread thread pool, not behavior see.

i see 4 threads pool go on wait 13 seconds , process next event after wait complete.

why runon() method not executing map operator on next available thread pool when threads go wait state?

i using reactor-core version '3.0.7.release'

countdownlatch latch = new countdownlatch(10);  executorservice executorservice = executors.newfixedthreadpool(5);  flux<integer> flux = flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); flux.parallel() .runon(schedulers.fromexecutorservice(executorservice)) .map(l -> {     logger.log(reactorparalleltest.class, "map1", "inside run waiting 13 seconds");     try {         thread.sleep(13000);     } catch (interruptedexception e) {         e.printstacktrace();     }     logger.log(reactorparalleltest.class, "map1", "l=" + l);     latch.countdown();     return l; }).subscribe(l -> {     logger.log(reactorparalleltest.class, "onnext", "l=" + l); }, error -> system.err.println(error),         () -> {             logger.log(reactorparalleltest.class, "oncomplete", "inside complete.");             executorservice.shutdown();         });  try {     latch.await(60, timeunit.seconds); } catch (interruptedexception e) {     e.printstacktrace(); } 

you blocking rails code. 4 rails started (number of cpus), , request 1 element each source. since block in map when done, rail cannot request more upstream, in effect 4 elements @ time, block, more, block... parallelism more limited capacity of thread pool. if want put threads use, .parallel(5) (same configuration thread pool).

on side note, subscribe(lambda) parallelflux invoke oncomplete callback each rail. if want merge single sequence (and single complete), use .sequential() before .subscribe.


Comments

Popular posts from this blog

node.js - Node js - Trying to send POST request, but it is not loading javascript content -

javascript - Replicate keyboard event with html button -

javascript - Web audio api 5.1 surround example not working in firefox -