代码之家  ›  专栏  ›  技术社区  ›  Kyle Bridenstine

除周末外,每天都有空气流通?

  •  1
  • Kyle Bridenstine  · 技术社区  · 6 年前

    是否可以创建除周六和周日外每天运行的气流DAG?这似乎不可能,因为你只有一个 开始日期 还有一个 时间表\间隔 .

    我正在建立一个工作流程,每天早上处理一批文件。虽然只有星期一到星期五,但这些文件不会在周末出现。我可以简单地使用24小时的超时设置,这基本上会导致周六和周日的超时,因为文件不会在那些天出现,但这会将DAG标记为在这两天失败,这将是非常愉快的。

    3 回复  |  直到 6 年前
        1
  •  5
  •   Zack    6 年前

    'schedule_interval': '0 0 * * 1-5' 从星期一到星期五,每天00:00运行。

        2
  •  6
  •   Taylor D. Edmiston    6 年前

    扎克的答案已经有了一个工作日的cron时间表,可以做你想做的事情。( 0 0 * * 1-5 ,但是我想添加一个关于常见cron调度的站点的答案, crontab表达式 .

    我经常把这个和气流结合起来设计一个DAG schedule_interval .

    帮助您交互设计cron时间表的主要应用程序位于 crontab.guru .

    仅在工作日日程上的示例- https://crontab.guru/every-weekday

    更常见的例子(例如每半小时、每季度等) https://crontab.guru/examples.html

        3
  •  1
  •   Ben Gregory    6 年前

    我有一个类似的需求,最后把它放在我的DAG的开头——它类似于短路器。

    import logging
    from airflow.models import SkipMixin, BaseOperator
    from airflow.utils.decorators import apply_defaults
    
    
    class pull_file_decision_operator(BaseOperator, SkipMixin):
    
       template_fields = ('execution_date',)
    
       @apply_defaults
       def __init__(self,
                    day_of_week, 
                    hour_of_day,
                    execution_date,
                    **kwargs):
    
           self.day_of_week = day_of_week
           self.hour_of_week = hour_of_day
           self.execution_date = execution_date
    
       def execute(self, context):
           # https://docs.python.org/3/library/datetime.html#datetime.date.weekday
    
           run_dt = self.execution_date
           dow = self.day_of_week
           hod = self.hour_of_day
    
           if run_dt.weekday() == dow and run_dt.hour == hod:
              return True
           else:
              downstream_tasks = context['task'].get_flat_relatives(upstream=False)
              logging.info('Skipping downstream tasks...')
              logging.info("Downstream task_ids %s", downstream_tasks)
    
              if downstream_tasks:
                 self.skip(context['dag_run'],
                           context['ti'].execution_date,
                           downstream_tasks)