python 3.x - Dask compute subgraph with futures
I want to submit a Dask task that does the following:
- Build a lazy dask graph using dask.bag (def fakejob).
- Compute the graph from the first step and save it to parquet (I left this part out, but it is the motivation).
I need to do this for multiple inputs, so I've been trying to use the futures feature of dask.distributed.
    import dask.bag
    from dask.distributed import Client

    client = Client(processes=True)

    def fakejob(path):
        return (
            dask.bag
            .read_text(path)
            .to_dataframe()
        )

    futures = client.map(fakejob, [input_path1, input_path2])

The problem is that I keep getting: AssertionError: daemonic processes are not allowed to have children
I've tried following this link and ended up with a second version (it differs from the first in one line), but now the futures stay 'pending' forever.
    import dask.bag
    from dask.distributed import Client

    client = Client(processes=True)

    def fakejob(path):
        with dask.set_options(get=client.get):
            return (
                dask.bag
                .read_text(path)
                .to_dataframe()
            )

    futures = client.map(fakejob, [input_path1, input_path2])

Any clues on how to do this?
Cheers.
The strange and humorous error message comes from trying to construct a dask graph (which is what a bag is) within a worker process, which is where things end up if you call client.map. Your second attempt might work with the local client if you put the whole work-flow within the function, including writing to parquet, and didn't attempt to pass the bag back to the caller.
The solution is much simpler.
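The wording of that error comes from Python's own multiprocessing module, which refuses to start a child from a daemonic process. A minimal standard-library sketch that reproduces the message without dask, assuming the dask worker is such a daemonic process and that building the dataframe inside it tries to start further processes (the function names here are hypothetical, and the fork start method makes this POSIX-only):

```python
import multiprocessing as mp


def _noop():
    """Target for the grandchild process; it is never actually started."""


def _try_to_spawn(queue):
    """Runs inside a daemonic process and tries to start a child of its own."""
    try:
        mp.Process(target=_noop).start()
    except AssertionError as exc:
        queue.put(str(exc))
    else:
        queue.put("child started")


def daemon_spawn_error():
    """Return the message raised when a daemonic process starts a child."""
    ctx = mp.get_context("fork")  # assumption: POSIX system with fork available
    queue = ctx.Queue()
    worker = ctx.Process(target=_try_to_spawn, args=(queue,), daemon=True)
    worker.start()
    message = queue.get(timeout=30)  # wait for the daemonic process to report
    worker.join()
    return message


if __name__ == "__main__":
    print(daemon_spawn_error())
```

The same restriction would apply to any code running inside a daemonic worker, which is why building and computing the collection in the client process sidesteps it.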
    bags = [dask.bag.read_text(path).to_dataframe()
            for path in [input_path1, input_path2]]
    futures = client.compute(bags)  # run in the background on the cluster
    client.gather(futures)  # wait and get the results

Here, bags is a list of dask-bags (converted to dataframes), i.e., work tasks defined but not yet running. You could replace the last two lines with dask.compute(*bags) to get the result without worrying about futures.
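A self-contained sketch of the same pattern, for trying it out locally. It makes some assumptions that are not in the original: the sample files and the helper names are hypothetical, the bags are reduced with count() instead of to_dataframe() so that pandas is not required, and the client uses processes=False to keep everything in one process.

```python
import os
import tempfile

import dask
import dask.bag as db
from dask.distributed import Client


def write_sample_files(directory):
    """Create two small text files (hypothetical stand-ins for the inputs)."""
    paths = []
    for name, text in [("a.txt", "x\ny\nz\n"), ("b.txt", "p\nq\n")]:
        path = os.path.join(directory, name)
        with open(path, "w") as handle:
            handle.write(text)
        paths.append(path)
    return paths


def count_lines(paths):
    """Build lazy line counts in the client process, then compute them."""
    counts = [db.read_text(path).count() for path in paths]  # nothing runs yet
    # Assumption: a threads-only local cluster is enough for this demo.
    with Client(processes=False) as client:
        futures = client.compute(counts)  # start the work in the background
        via_futures = client.gather(futures)  # block until results arrive
        via_compute = dask.compute(*counts)  # same work without explicit futures
    return via_futures, list(via_compute)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(count_lines(write_sample_files(tmp)))
```

The lazy collections are built where the client lives and only the computation is shipped to the workers, so no graph is ever constructed inside a worker process.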