python - How to dynamically iterate over the output of an upstream task to create parallel tasks in airflow?


Consider the following example of a DAG where the first task, get_id_creds, extracts a list of credentials from a database. This operation tells me which users in my database I am able to run further data preprocessing on, and it writes those IDs to the file /tmp/ids.txt. I then scan those IDs back into my DAG and use them to generate a list of upload_transactions tasks that can run in parallel.

My question is: is there a more idiomatically correct, dynamic way to do this with Airflow? What I have here feels clumsy and brittle. How can I directly pass a list of valid IDs from one process to the one that defines the subsequent downstream processes?

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

default_args = {
    'start_date': datetime.now(),
    'schedule_interval': None
}

dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args
)

# First task: pull the valid user IDs out of the database and
# write them to /tmp/ids.txt.
get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=dag)

# Read the IDs back in at DAG-parse time (this is the clumsy part).
with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

# Generate one upload task per ID, each downstream of get_id_creds.
for uid in ids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=dag)
    upload_transactions.set_upstream(get_id_creds)

Consider that Apache Airflow is a workflow management tool, i.e. it determines the dependencies between the tasks that the user defines, in comparison with (as an example) Apache NiFi, which is a dataflow management tool, i.e. the dependencies there are the data transferred through the tasks.

That said, I think your approach is quite right (my comment is based on the code posted), but Airflow offers a concept called XCom. It allows tasks to "cross-communicate" by passing data between them. How big can the passed data be? You should test! It should not be too big. I think it is stored in the form of key/value pairs in the Airflow meta-database, i.e. you can't pass files, for example, but a list of IDs should work.
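For illustration, here is a minimal sketch of that pattern using the same Airflow 1.x PythonOperator API as the question (the task names, the hard-coded ID list, and the callables are hypothetical placeholders for a real database query):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='xcom_example',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None)

def push_ids(**context):
    # Whatever the callable returns is automatically stored as an
    # XCom (under the key 'return_value') in the Airflow meta-database.
    return ['id_001', 'id_002', 'id_003']  # placeholder for a real DB query

def pull_ids(**context):
    # Pull the value that the upstream task returned.
    ids = context['ti'].xcom_pull(task_ids='push_ids')
    print('received ids: %s' % ids)

push = PythonOperator(
    task_id='push_ids',
    python_callable=push_ids,
    provide_context=True,
    dag=dag)

pull = PythonOperator(
    task_id='pull_ids',
    python_callable=pull_ids,
    provide_context=True,
    dag=dag)

pull.set_upstream(push)

Note that the pulling task receives the list at run time, after the upstream task has finished, whereas the file-reading loop in your code runs at DAG-parse time.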

Like I said, you should test it yourself; I would be happy to know about your experience. Here is an example DAG that demonstrates the use of XCom, and here is the necessary documentation. Cheers!

