Problem
Running airflow db init in my virtualenv gives the following error, even though pip freeze shows that the missing package is installed:
❯ airflow db init
DB: sqlite:////Users/.../airflow.db
[2023-07-24T23:03:08.958+0100] {migration.py:213} INFO - Context impl SQLiteImpl.
[2023-07-24T23:03:08.961+0100] {migration.py:216} INFO - Will assume non-transactional DDL.
[2023-07-24T23:03:09.058+0100] {migration.py:213} INFO - Context impl SQLiteImpl.
[2023-07-24T23:03:09.058+0100] {migration.py:216} INFO - Will assume non-transactional DDL.
[2023-07-24T23:03:09.059+0100] {db.py:1591} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
ERROR [airflow.models.dagbag.DagBag] Failed to import: /Users/.../dags/test_dag.py
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/airflow/models/dagbag.py", line 346, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/.../airflow/dags/test_dag.py", line 4, in <module>
    from airflow_package.components.extractors import PyTrendsKeywordExtractor
  File "/Users/.../dags/airflow_package/components/extractors.py", line 3, in <module>
    from pytrends.request import TrendReq
ModuleNotFoundError: No module named 'pytrends'
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done
❯ pip freeze | grep pytrends
pytrends==4.9.2
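One check that might narrow this down (a sketch, not a confirmed diagnosis): the traceback paths point at /opt/homebrew/lib/python3.11/site-packages, so the airflow command may be running under a different interpreter than the one pip freeze reports on. The helper name describe_module is hypothetical and only used for illustration.

```python
import importlib.util
import sys


def describe_module(name):
    """Return (found, origin) for a top-level module name in the current interpreter."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return (False, None)
    return (True, spec.origin)


if __name__ == "__main__":
    # Which interpreter and environment is executing this script?
    print("interpreter:", sys.executable)
    print("prefix:", sys.prefix)
    # Where (if anywhere) does this interpreter find each module?
    for module_name in ("airflow", "pytrends"):
        found, origin = describe_module(module_name)
        print(module_name, "->", origin if found else "NOT FOUND")
```

Running this once with the virtualenv's python and once with the interpreter that the airflow executable actually uses should show whether both see the same site-packages directory.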
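Context
I have written some custom DAGs that use the external packages pytrends and google-search-results. Using setuptools, I have registered my custom package, airflow_package, which contains classes such as PyTrendsKeywordExtractor, which uses pytrends.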
# setup.py
from setuptools import setup, find_packages

setup(
    name='airflow_package',
    version='1.2',
    description='A useful module',
    author='Man Foo',
    author_email='[email protected]',
    packages=find_packages(),  # same as name
    install_requires=["pytrends", "google-search-results", "python-dotenv", "pandas"],  # external packages as dependencies
)
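# DAG code (test_dag.py)
import json
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow_package.components.extractors import PyTrendsKeywordExtractor  # the import that fails in the traceback

with DAG(dag_id="my_test_dag", schedule="@hourly", start_date=datetime(2023, 7, 24), catchup=False) as dag:

    @task.python(task_id="keyword_extractor")
    def keyword_extractor(**kwargs):
        ti = kwargs['ti']
        extractor = PyTrendsKeywordExtractor(kw_list=["Nike", "Canada Goose", "Sports Direct"], fuzzy_find_key="company")
        keyword_dict = extractor.extract()
        # Push the extracted keywords to XCom as a JSON string
        ti.xcom_push("target_keywords", json.dumps(keyword_dict))

    ...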
I have the following file structure:
airflow/
├── airflow.cfg
├── airflow.db
├── dags
│   ├── __pycache__
│   │   └── test_dag.cpython-311.pyc
│   ├── airflow_package
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   └── __init__.cpython-311.pyc
│   │   ├── components
│   │   │   ├── __init__.py
│   │   │   ├── __pycache__
│   │   │   │   ├── __init__.cpython-311.pyc
│   │   │   │   └── extractors.cpython-311.pyc
│   │   │   ├── base.py
│   │   │   ├── extractors.py
│   │   │   └── new_extractors.py
│   │   └── settings.py
│   └── test_dag.py
├── logs
│   └── scheduler
│       └── 2023-07-24
└── webserver_config.py
Following the documentation:
The Airflow documentation has proven hard to follow; I tried the following:
Module structure:
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/modules_management.html#adding-directories-to-the-pythonpath
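As a rough way to verify what that page describes (Airflow adds the dags, config and plugins folders to sys.path when it runs), the sketch below checks whether a given directory is on the module search path. The function name is_on_sys_path is hypothetical, and the directory passed in would be whatever the local dags folder is; nothing here comes from the Airflow API.

```python
import sys
from pathlib import Path


def is_on_sys_path(directory, search_path=None):
    """Return True if `directory` is one of the entries of the module search path."""
    target = Path(directory).expanduser().resolve()
    entries = sys.path if search_path is None else search_path
    # An empty sys.path entry stands for the current working directory.
    return any(Path(entry or ".").resolve() == target for entry in entries)
```

Calling it from inside a DAG file with the dags folder as the argument would show whether airflow_package is importable because of the folder layout or because of the setup.py install.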