
Airflow db init not picking up external dependencies

  • John Phillips  · Tech Community  · 2 years ago

    Problem

    Running airflow db init in my virtualenv gives the following error about a missing package, although I can see with pip freeze that the package is installed:

    ❯ airflow db init
    DB: sqlite:////Users/.../airflow.db
    [2023-07-24T23:03:08.958+0100] {migration.py:213} INFO - Context impl SQLiteImpl.
    [2023-07-24T23:03:08.961+0100] {migration.py:216} INFO - Will assume non-transactional DDL.
    [2023-07-24T23:03:09.058+0100] {migration.py:213} INFO - Context impl SQLiteImpl.
    [2023-07-24T23:03:09.058+0100] {migration.py:216} INFO - Will assume non-transactional DDL.
    [2023-07-24T23:03:09.059+0100] {db.py:1591} INFO - Creating tables
    INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
    INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
    ERROR [airflow.models.dagbag.DagBag] Failed to import: /Users/.../dags/test_dag.py
    Traceback (most recent call last):
      File "/opt/homebrew/lib/python3.11/site-packages/airflow/models/dagbag.py", line 346, in parse
        loader.exec_module(new_module)
      File "<frozen importlib._bootstrap_external>", line 940, in exec_module
      File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
      File "/Users/.../airflow/dags/test_dag.py", line 4, in <module>
        from airflow_package.components.extractors import PyTrendsKeywordExtractor
      File "/Users/.../dags/airflow_package/components/extractors.py", line 3, in <module>
        from pytrends.request import TrendReq
    ModuleNotFoundError: No module named 'pytrends'
    WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
    Initialization done
    ❯ pip freeze | grep pytrends
    pytrends==4.9.2
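
One detail in the traceback is worth checking: Airflow is loaded from /opt/homebrew/lib/python3.11/site-packages, which may not be the same environment that `pip freeze` inspects. The following is a minimal sketch, using only the standard library, that reports which interpreter is running and where `airflow` and `pytrends` would be imported from. The helper name `describe_module` is hypothetical, and the script is only informative when run with the same interpreter that the `airflow` command uses.

```python
import importlib.util
import sys


def describe_module(name: str) -> str:
    """Return where `name` would be imported from, or a note that it is missing."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return f"{name}: not importable from {sys.executable}"
    return f"{name}: {spec.origin}"


# Show the interpreter first, then the import location of each module.
print("interpreter:", sys.executable)
for module_name in ("airflow", "pytrends"):
    print(describe_module(module_name))
```

If the interpreter path or the `airflow` location points outside the virtualenv, the `airflow` executable being run is probably not the one installed in that virtualenv.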
    

    Context

    I wrote some custom DAGs that use the external packages pytrends and google-search-results. Using setuptools, I have registered my custom package, airflow_package, which contains a class, e.g. PyTrendsKeywordExtractor, that uses pytrends.

    #setup.py
    from setuptools import setup, find_packages
    
    setup(
       name='airflow_package',
       version='1.2',
       description='A useful module',
       author='Man Foo',
       author_email='[email protected]',
       packages=find_packages(),  # discover every package under this directory
       install_requires=["pytrends", "google-search-results", "python-dotenv", "pandas"],  # external packages pip installs as dependencies
    )
    
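    # DAG code (test_dag.py)
    import json
    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow_package.components.extractors import PyTrendsKeywordExtractor

    with DAG(dag_id="my_test_dag", schedule="@hourly", start_date=datetime(2023, 7, 24), catchup=False) as dag: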
    
    
        @task.python(task_id="keyword_extractor")
        def keyword_extractor(**kwargs):
            ti = kwargs['ti']
            extractor = PyTrendsKeywordExtractor(kw_list=["Nike", "Canada Goose", "Sports Direct"], fuzzy_find_key="company")
            keyword_dict = extractor.extract()
            ti.xcom_push("target_keywords", json.dumps(keyword_dict))
        ...
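
Note that `install_requires` is only acted on when pip installs the package (for example with `pip install .`). A package that is merely importable from the dags folder does not get its dependencies installed for it. The sketch below checks whether a distribution named `airflow_package` is registered in the environment of the running interpreter and what it declares. The helper name `declared_requirements` is hypothetical.

```python
from importlib import metadata
from typing import List, Optional


def declared_requirements(dist_name: str) -> Optional[List[str]]:
    """Return the requirement strings declared by an installed distribution.

    Returns None when no distribution with that name is installed in the
    environment of the running interpreter.
    """
    try:
        return list(metadata.requires(dist_name) or [])
    except metadata.PackageNotFoundError:
        return None


requirements = declared_requirements("airflow_package")
if requirements is None:
    print("airflow_package is not installed as a distribution here")
else:
    print("airflow_package declares:", requirements)
```

A `None` result would suggest that the DAG imports `airflow_package` straight from the dags directory, in which case pytrends has to be installed separately into the environment that runs Airflow.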
    

    I have the following file structure:

    airflow/
    ├── airflow.cfg
    ├── airflow.db
    ├── dags
    │   ├── __pycache__
    │   │   └── test_dag.cpython-311.pyc
    │   ├── airflow_package
    │   │   ├── __init__.py
    │   │   ├── __pycache__
    │   │   │   └── __init__.cpython-311.pyc
    │   │   ├── components
    │   │   │   ├── __init__.py
    │   │   │   ├── __pycache__
    │   │   │   │   ├── __init__.cpython-311.pyc
    │   │   │   │   └── extractors.cpython-311.pyc
    │   │   │   ├── base.py
    │   │   │   ├── extractors.py
    │   │   │   └── new_extractors.py
    │   │   └── settings.py
    │   └── test_dag.py
    ├── logs
    │   └── scheduler
    │       ├── 2023-07-24
    │ 
    └── webserver_config.py
    

    Following the documentation:

    The Airflow documentation has proven hard to follow; I tried the following:

    Module structure: https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/modules_management.html#adding-directories-to-the-pythonpath
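
When experimenting with the PYTHONPATH approach from that page, it can help to confirm what the interpreter actually searches. This is a small sketch, assuming it is run with the same interpreter as Airflow. The helper name `on_sys_path` is hypothetical.

```python
import os
import sys
from pathlib import Path


def on_sys_path(directory: str) -> bool:
    """True if `directory` resolves to the same location as some sys.path entry."""
    target = Path(directory).resolve()
    # An empty sys.path entry means the current working directory.
    return any(Path(entry or ".").resolve() == target for entry in sys.path)


print("PYTHONPATH =", os.environ.get("PYTHONPATH", "<unset>"))
for entry in sys.path:
    print("  ", entry)
```

Calling `on_sys_path` with the dags directory or a site-packages directory shows whether that location is searched at import time.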
