I am trying to set up Airflow as a cluster, following this article. I just configured my airflow.cfg file to use the CeleryExecutor, pointed sql_alchemy_conn at a PostgreSQL database running on the same master node, set broker_url to use SQS (I did not set an access_key_id or secret_key since it is running on an EC2 instance and doesn't need those), and set celery_result_backend to my PostgreSQL server as well. I saved the new airflow.cfg changes, ran airflow initdb, and then ran airflow scheduler.
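For reference, these are the specific settings I changed (copied from the full airflow.cfg pasted further down; $password stands in for the real password):

[core]
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://postgres:$password@localhost/datalake_airflow_cluster_v1_master1_database_1

[celery]
broker_url = sqs://
celery_result_backend = db+psycopg2://postgres:$password@localhost/datalake_airflow_cluster_v1_master1_database_1

The scheduler then fails with this error: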
[2018-06-07 21:07:33,420] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-06-07 21:07:33,421] {celery_executor.py:102} ERROR - Can't load plugin: sqlalchemy.dialects:psycopg2
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in sync
    state = async.state
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 433, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 372, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 344, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 53, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 122, in _get_task_meta_for
    session = self.ResultSession()
  File "/usr/local/lib/python3.6/site-packages/celery/backends/database/__init__.py", line 99, in ResultSession
    **self.engine_options)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/database/session.py", line 59, in session_factory
    engine, session = self.create_session(dburi, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/database/session.py", line 45, in create_session
    engine = self.get_engine(dburi, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/database/session.py", line 42, in get_engine
    return create_engine(dburi, poolclass=NullPool)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/__init__.py", line 424, in create_engine
    return strategy.create(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 57, in create
    entrypoint = u._get_entrypoint()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/url.py", line 156, in _get_entrypoint
    cls = registry.load(name)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 221, in load
    (self.group, name))
sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:psycopg2
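If I am reading the traceback correctly, Celery strips the db+ prefix off celery_result_backend and hands the rest of the URI straight to SQLAlchemy, so the same failure can be reproduced outside of Airflow (a minimal sketch, assuming that really is how the URI gets parsed; the credentials and database name here are placeholders):

from sqlalchemy import create_engine

# SQLAlchemy is asked to load a dialect literally named "psycopg2", which is not
# a registered dialect name, so this raises the same exception as the scheduler:
#   sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:psycopg2
engine = create_engine("psycopg2://postgres:password@localhost/some_db")

But I don't know whether that is the actual cause here, or what the connection string for celery_result_backend is supposed to look like.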
Here is my airflow.cfg file:
[core]
airflow_home = /home/ec2-user/airflow
dags_folder = /home/ec2-user/airflow/dags
base_log_folder = /home/ec2-user/airflow/logs
remote_log_conn_id =
encrypt_s3_logs = False
logging_level = INFO
logging_config_class =
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://postgres:$password@localhost/datalake_airflow_cluster_v1_master1_database_1
sql_alchemy_pool_size = 5
sql_alchemy_pool_recycle = 3600
parallelism = 32
dag_concurrency = 16
dags_are_paused_at_creation = True
non_pooled_task_slot_count = 128
max_active_runs_per_dag = 16
load_examples = True
plugins_folder = /home/ec2-user/airflow/plugins
fernet_key = ibwZ5uSASmZGphBmwdJ4BIhd1-5WZXMTTgMF9u1_dGM=
donot_pickle = False
dagbag_import_timeout = 30
task_runner = BashTaskRunner
default_impersonation =
security =
unit_test_mode = False
task_log_reader = file.task
enable_xcom_pickling = True
killed_task_cleanup_time = 60
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[api]
auth_backend = airflow.api.auth.backend.default
[operators]
default_owner = Airflow
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
[webserver]
base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 30
secret_key = temporary_key
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
expose_config = False
authenticate = False
filter_by_owner = False
owner_mode = user
dag_default_view = tree
dag_orientation = LR
demo_mode = False
log_fetch_timeout_sec = 5
hide_paused_dags_by_default = False
page_size = 100
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
smtp_host = localhost
smtp_starttls = True
smtp_ssl = False
smtp_port = 25
smtp_mail_from = airflow@example.com
[celery]
celery_app_name = airflow.executors.celery_executor
celeryd_concurrency = 16
worker_log_server_port = 8793
broker_url = sqs://
celery_result_backend = db+psycopg2://postgres:$password@localhost/datalake_airflow_cluster_v1_master1_database_1
flower_host = 0.0.0.0
flower_port = 5555
default_queue = default
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
[dask]
cluster_address = 127.0.0.1:8786
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
run_duration = -1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 30
child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 0
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
max_threads = 2
authenticate = False
[ldap]
uri =
user_filter = objectClass=*
user_name_attr = uid
group_member_attr = memberOf
superuser_filter =
data_profiler_filter =
bind_user = cn=Manager,dc=example,dc=com
bind_password = insecure
basedn = dc=example,dc=com
cacert = /etc/ca/ldap_ca.crt
search_scope = LEVEL
[mesos]
master = localhost:5050
framework_name = Airflow
task_cpu = 1
task_memory = 256
checkpoint = False
authenticate = False
[kerberos]
ccache = /tmp/airflow_krb5_ccache
principal = airflow
reinit_frequency = 3600
kinit_path = kinit
keytab = airflow.keytab
[github_enterprise]
api_rev = v3
[admin]
hide_sensitive_variable_fields = True
I'm not really sure what is going on here. Do I need to do some additional setup on Celery or something else? I'm also confused about how it knows which SQS queue to use on AWS. Does it create a new queue by itself, or do I need to create the queue on AWS and put its URL somewhere?
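In case it helps clarify the queue question: since broker_url is just sqs:// with nothing after it, my guess (and it is only a guess) is that any extra SQS settings would have to go through celery_config_options by pointing it at a custom dict instead of airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG, something like the sketch below. The module name, region, and queue_name_prefix are placeholders I made up; I have not actually tried this.

# my_celery_config.py -- hypothetical module that celery_config_options could point to
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

# Copy the default Celery config and add SQS transport options on top of it.
CELERY_CONFIG = dict(
    DEFAULT_CELERY_CONFIG,
    broker_transport_options={
        'region': 'us-east-1',            # placeholder AWS region
        'queue_name_prefix': 'airflow-',  # placeholder prefix for the SQS queue names
    },
)

But again, I haven't set any of this up, so I may be completely off base.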