代码之家  ›  专栏  ›  技术社区  ›  Fusionist

Airflow 2.0.1/Python 3.7.9自定义挂钩的ModuleNotFoundError

  •  0
  • Fusionist  · 技术社区  · 3 年前

    我的原生气流结构如下

    dags/cust_dag.py dags/jhook.py——包含类UtilTriggers,在该类下有多个方法

    在cust_dag代码中,我将该钩子/模块调用为:

    从jhook导入UtilTriggers作为触发器

    当我检查Airflow UI时,我收到了客户数据的坏数据,其中提到了错误 ModuleNotFoundError:没有名为jhook的模块

    同样的代码也在composer 1.9上运行,目前我正在原生气流上运行。

    此外,我还尝试添加 初始化 .py文件,并创建了一个新文件夹job_trigger,在该文件夹下我添加了仍然无法工作的文件。

    我已经尝试了这个问题中提到的解决方案 Apache Airflow DAG cannot import local module

    即在hook自定义模块和dag文件中添加以下代码行 导入系统 sys.path.insert(0,os.path.abspath(os.path.dirname( 文件 )))

    当一切正常时,请指导我是什么原因导致了这个ModuleNotFound错误。

    0 回复  |  直到 3 年前
        1
  •  1
  •   Sandeep Mohanty    3 年前

    根据您的评论,您收到的错误消息是导入错误,似乎问题仅与python有关。

    dag1.py

    from __future__ import print_function
    
    import datetime
    
    from airflow import models
    from airflow.operators import bash_operator
    from airflow.operators import python_operator
    
    from new1 import hi as h1
    
    default_dag_args = {
      # The start_date describes when a DAG is valid / can be run. Set this to a
      # fixed point in time rather than dynamically, since it is evaluated every
      # time a DAG is parsed. See:
      # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
      'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
          'demo_run',
          schedule_interval=datetime.timedelta(days=1),
          default_args=default_dag_args) as dag:
    
      # An instance of an operator is called a task. In this case, the
      # hello_python task calls the "greeting" Python function.
      hello_python = python_operator.PythonOperator(
          task_id='hello_world',
          python_callable=h1.funt,
          op_kwargs={"x" : "python"})
    
      # Likewise, the goodbye_bash task calls a Bash script.
      goodbye_bash = bash_operator.BashOperator(
          task_id='bye',
          bash_command='echo Goodbye.')
    
    
    

    new1.py

    
    class hi:
       @staticmethod
       def funt(x):
           return x + " is a programming language"
    
    
    
    1. 由于您使用的所有方法都是静态方法,因此没有必要将self传递给您的方法。start方法中的self关键字指的是对象。因为可以在不创建对象的情况下调用静态方法,所以它们没有self关键字。
    2. 如果在方法中传递一些参数,请确保这些参数也传递给DAG任务,方法是提供 op_args and op_kwargs arguments .

    为了回答您的问题,如果这将是一个kubernetes的问题,因为它托管在那里。 这个问题与kubernetes无关,因为

    • 当我们创建Composer环境时,Composer服务会为每个环境创建一个GKE集群。集群是自动命名和标记的,用户不应手动删除。集群是通过部署管理器创建和管理的。
    • 如果删除了群集,则环境将无法修复,需要重新创建。Kubernetes错误如下 Http错误状态代码:400 Http错误消息:BAD REQUEST