python 3.x - Dask compute subgraph with futures
I want to submit a dask task that does the following:
- build a lazy dask graph using dask.bag (see def fakejob below)
- compute the graph and save it to parquet (I've left that part out, but it's the motivation)

I need to do this for multiple inputs, so I've been trying to use the futures feature of dask.distributed:
from dask.distributed import Client
import dask.bag

client = Client(processes=True)

def fakejob(path):
    return (
        dask.bag
        .read_text(path)
        .to_dataframe()
    )

futures = client.map(fakejob, [input_path1, input_path2])
The problem is that I keep getting:

AssertionError: daemonic processes are not allowed to have children
I've tried following this link and ended up with a second version (which differs from the first in one line), but the futures stay 'pending' forever.
from dask.distributed import Client
import dask.bag

client = Client(processes=True)

def fakejob(path):
    with dask.set_options(get=client.get):
        return (
            dask.bag
            .read_text(path)
            .to_dataframe()
        )

futures = client.map(fakejob, [input_path1, input_path2])
Any clues on how to do this?

Cheers.
The strange and humorous error message comes from trying to construct a dask graph (which a bag is) within a worker process, which is where things end up when called via client.map. The second attempt would work with a local client if you put the whole work-flow inside the function, including writing the parquet, and didn't attempt to pass the bag back to the caller.
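For illustration only, a minimal sketch of what that variant could look like, assuming current dask/distributed APIs and hypothetical output paths (out_path1, out_path2): the whole graph is built, computed with the local threaded scheduler, and written to parquet inside the mapped function, and only a small value is returned.

import dask
import dask.bag
from dask.distributed import Client

client = Client(processes=True)

def fakejob(path, out_path):
    # Build the graph inside the worker-side function...
    df = dask.bag.read_text(path).to_dataframe()
    # ...and compute it right here with the local threaded scheduler,
    # writing parquet instead of returning the lazy collection.
    with dask.config.set(scheduler="threads"):
        df.to_parquet(out_path)
    return out_path  # return something small, not the graph

# out_path1/out_path2 are hypothetical output locations
futures = client.map(fakejob, [input_path1, input_path2], [out_path1, out_path2])
client.gather(futures)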
The solution is simpler.
bags = [dask.bag.read_text(path).to_dataframe()
        for path in [input_path1, input_path2]]
futures = client.compute(bags)  # run in the background on the cluster
client.gather(futures)          # wait for and get the results
Here, bags is a list of dask bags, i.e., the work is defined as tasks but not yet running. You could replace the last two lines with dask.compute(*bags) to get the result without worrying about futures.
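For completeness, a sketch of that futures-free variant (same placeholder input paths as in the question):

import dask
import dask.bag

bags = [dask.bag.read_text(path).to_dataframe()
        for path in [input_path1, input_path2]]
# dask.compute blocks until both graphs have run (on the cluster, since
# creating the Client above makes it the default scheduler) and returns
# the concrete results directly, with no futures involved.
results = dask.compute(*bags)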