Based on your comments, the error message you are getting is an import error, so the problem appears to be purely Python-related.
dag1.py
from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

from new1 import hi as h1

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'demo_run',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello_world',
        python_callable=h1.funt,
        op_kwargs={"x": "python"})

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')
new1.py
class hi:
    @staticmethod
    def funt(x):
        return x + " is a programming language"
- Since all the methods you are using are static methods, there is no need to pass self to them. The self keyword in your start method refers to the object; because static methods can be called without creating an object, they have no self.
- If your callable takes parameters, make sure those parameters are also passed to the DAG task by supplying the op_args and op_kwargs arguments (see the sketch after this list).
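As a minimal sketch (the util class, the funt2 function, and the argument values are hypothetical, only for illustration), this shows how op_args supplies positional arguments and op_kwargs supplies keyword arguments to a static callable that takes no self:

from airflow.operators import python_operator

class util:
    @staticmethod
    def funt2(x, suffix):
        # No self: static methods are called on the class itself.
        return x + suffix

# Inside the `with models.DAG(...) as dag:` block:
describe_python = python_operator.PythonOperator(
    task_id='describe_python',
    python_callable=util.funt2,
    # Positional arguments go through op_args, keyword arguments through op_kwargs.
    op_args=["python"],
    op_kwargs={"suffix": " is a programming language"})

Airflow only stores the callable and these arguments when the DAG is parsed; they are applied to the callable when the task actually runs.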
To answer your question about whether this could be a Kubernetes issue because the environment is hosted there: this problem is not related to Kubernetes, because
- When we create a Composer environment, the Composer service creates a GKE cluster for each environment. The cluster is named and labeled automatically and should not be deleted manually by users; it is created and managed through Deployment Manager.
- If the cluster is deleted, the environment cannot be repaired and has to be recreated. The Kubernetes errors then look like:
Http error status code: 400 Http error message: BAD REQUEST
and so on.