代码之家  ›  专栏  ›  技术社区  ›  Andrew Cassidy

Python3.6 Airflow,带有需要2.7的运算符

  •  2
  • Andrew Cassidy  · 技术社区  · 6 年前

    我目前正在Python3.6.5上运行一个Airflow(1.9.0)实例。我有一个手动工作流,我想移到DAG。这个手动工作流现在需要用python 2和3编写代码。让我们将我的DAG简化为3个步骤:

    1. 为机器处理数据和设置数据的数据流作业 学习训练
    2. Tensorflow ML培训工作
    3. 其他 我用python 3代码编写的pythonoperators

    dataflow作业是用python 2.7编写的(google需要),tensorflow模型代码是用python 3编写的。查看Airflow1.9.0中的“mlenginetrainingoperator”,有一个python_version参数,它设置“训练中使用的python版本”。

    问题:

    • 我可以在工作进程中动态指定特定的python版本吗 环境?
    • 我必须在Python2.7上安装Airflow才能运行步骤1吗?
    • 我能在Python3中有TensorFlow模型代码吗?这些代码只是通过运行在Python2上的MLengineTraining打包并提交的?
    • 我必须用python 2重写我的3)操作符吗?
    2 回复  |  直到 6 年前
        1
  •  3
  •   cwurtz    6 年前

    没有办法在工作机上动态指定python版本。但是,如果使用芹菜执行器,则可以在不同的服务器/vm上或在不同的虚拟环境中运行多个worker。

    您可以让一个worker运行python 3,一个worker运行2.7,并让每个worker监听不同的队列。这可以通过三种不同的方式实现:

    • 启动辅助进程时,可以添加 -q [queue-name] 标志
    • 设置环境 AIRFLOW__CELERY__DEFAULT_QUEUE
    • 更新 default_queue 在下面 [celery] 在airlow.cfg中。

    然后在任务定义中指定 queue 参数,根据任务需要运行的python版本更改队列。

    我不熟悉mlengioperator,但您可以指定 python_version 在pythonoperator中,它应该在那个版本的virtualenv中运行。另外,您可以使用bash操作符,编写代码以在不同的文件中运行,并指定python命令以使用要使用的python版本的绝对路径来运行它。

    不管任务是如何运行的,您只需要确保dag本身与运行它的python版本兼容。例如,如果要在不同的python版本中启动airlow worker,dag文件本身需要与python 2&3兼容。DAG可以具有它使用的附加文件依赖项,这些依赖项具有版本不兼容性。

        2
  •  0
  •   Andrew Cassidy    6 年前

    好吧,开箱即用,您不能在常规python 3气流集群上运行python 2气流工作程序:

    Airflow使用sqlalchemy(我相信可以将有关DAG的元数据读写到数据库中)。当你在一个工人身上运行一个dag时,它会从数据库中读取关于该dag的pickled信息。如果其他非工作组件在python 3中,则它们将在pickle 4中写入db,而工作组件将尝试从python 2中的db读取数据。

    特别是在sqlalchemy中查看sqltypes.py:

    class PickleType(TypeDecorator):
        """Holds Python objects, which are serialized using pickle.
    
        PickleType builds upon the Binary type to apply Python's
        ``pickle.dumps()`` to incoming objects, and ``pickle.loads()`` on
        the way out, allowing any pickleable Python object to be stored as
        a serialized binary field.
    
        To allow ORM change events to propagate for elements associated
        with :class:`.PickleType`, see :ref:`mutable_toplevel`.
    
        """
    
        impl = LargeBinary
    
        def __init__(self, protocol=pickle.HIGHEST_PROTOCOL,
                     pickler=None, comparator=None): 
    

    然后在compat.py中,最终在sqltypes.py中进行酸洗。

    py36 = sys.version_info >= (3, 6)
    py33 = sys.version_info >= (3, 3)
    py35 = sys.version_info >= (3, 5)
    py32 = sys.version_info >= (3, 2)
    py3k = sys.version_info >= (3, 0)
    py2k = sys.version_info < (3, 0)
    py265 = sys.version_info >= (2, 6, 5)
    jython = sys.platform.startswith('java')
    pypy = hasattr(sys, 'pypy_version_info')
    win32 = sys.platform.startswith('win')
    cpython = not pypy and not jython  # TODO: something better for this ?
    
    import collections
    next = next
    
    if py3k:
        import pickle
    else:
        try:
            import cPickle as pickle
        except ImportError:
            import pickle
    

    同样,pickle=true在气流中似乎不会对此产生影响???也许是因为根据 here 它只与回填有关?