你可以创建一个
PythonOperator
查询气流数据库以查找所需信息的任务。这样做的另一个好处是可以传递查询所需数据的信息:
from contextlib import closing
from airflow import models, settings
from airflow.utils.state import State
def your_python_operator_callable(**context):
with closing(settings.Session()) as session:
print("There are {} failed tasks in this execution".format(
session.query(
models.TaskInstance
).filter(
models.TaskInstance.dag_id == context["dag"].dag_id,
models.TaskInstance.execution_date == context["execution_date"],
models.TaskInstance.state == State.FAILED).count()
)
然后将任务添加到
DAG
用一个
蟒蛇
.
(
我没有测试过上面的内容,但希望能让你走上正确的道路
)