python - Airflow default on_failure_callback
In my DAG file, I have defined an on_failure_callback() function that posts to Slack in case of failure.
It works if I specify it on each operator in the DAG: on_failure_callback=on_failure_callback
Is there a way to automate (via default_args, for instance, or via the DAG object) the dispatch to all operators?
I found a way to do that.
You can pass on_failure_callback in default_args:
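from airflow.models import Variable
# The import path depends on the Airflow version: this is the Airflow 1.x location.
# In Airflow 2.x the operator lives in airflow.providers.slack.operators.slack.
from airflow.operators.slack_operator import SlackAPIPostOperator


class foo:
    @staticmethod
    def get_default_args():
        """
        Return default args
        :return: default_args
        """
        default_args = {
            # Pass the function itself, not the result of calling it
            'on_failure_callback': foo.on_failure_callback
        }
        return default_args

    @staticmethod
    def on_failure_callback(context):
        """
        Define the callback to post on Slack if a failure is detected in the workflow
        :return: operator.execute
        """
        operator = SlackAPIPostOperator(
            task_id='failure',
            text=str(context['task_instance']),
            token=Variable.get("slack_access_token"),
            channel=Variable.get("slack_channel")
        )
        return operator.execute(context=context)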
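For completeness, here is a rough sketch of how default_args could be wired into a DAG so that every operator inherits the callback. This is not from the answer above: the names notify_failure, build_failure_message, DEFAULT_ARGS, build_dag, and the DAG and task ids are hypothetical, the Slack call is replaced by a pluggable send function, and the BashOperator import assumes the Airflow 2.x path.

```python
from datetime import datetime


def build_failure_message(context):
    """Format the alert text from the context dict Airflow passes to callbacks."""
    return "Task failed: {}".format(context.get("task_instance"))


def notify_failure(context, send=print):
    """Hypothetical callback; swap `send` for a real Slack call."""
    send(build_failure_message(context))


# Every operator attached to a DAG built with these args inherits the callback,
# unless the operator sets its own on_failure_callback.
DEFAULT_ARGS = {
    "owner": "airflow",
    "start_date": datetime(2024, 1, 1),
    "on_failure_callback": notify_failure,  # the function itself, no parentheses
}


def build_dag():
    # Imported lazily so this sketch can be read and tested without Airflow.
    from airflow import DAG
    from airflow.operators.bash import BashOperator  # assumed Airflow 2.x path

    dag = DAG(dag_id="example_failure_alerts", default_args=DEFAULT_ARGS)
    # No on_failure_callback here: it comes from default_args.
    BashOperator(task_id="will_fail", bash_command="exit 1", dag=dag)
    return dag
```

With this layout, a task can still override the default by passing its own on_failure_callback when it is created.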