代码之家  ›  专栏  ›  技术社区  ›  Christopher Armstrong

我如何处理AWS Glue中映射函数的错误?

  •  1
  • Christopher Armstrong  · 技术社区  · 7 年前

    我用的是 map 动态框架方法 Map.apply 方法)。我注意到传递给这些函数的函数中的任何错误都会被静默地忽略,并导致返回的dynamicframe为空。

    假设我有这样的工作脚本:

    import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.transforms import *
    
    glueContext = GlueContext(SparkContext.getOrCreate())
    dyF = glueContext.create_dynamic_frame.from_catalog(database="radixdemo", table_name="census_csv")
    
    def my_mapper(rec):
        import logging
        logging.error("[RADIX] An error-log from in the mapper!")
        print "[RADIX] from in the mapper!"
        raise Exception("[RADIX] A bug!")
    dyF = dyF.map(my_mapper, 'my_mapper')
    
    print "Count:  ", dyF.count()
    dyF.printSchema()
    dyF.toDF().show()
    

    如果我在glue dev端点中运行此脚本, gluepython ,我得到如下输出:

    [glue@ip-172-31-83-196 ~]$ gluepython gluejob.py
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/share/aws/glue/etl/jars/glue-assembly.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/05/23 20:56:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
    ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
    Count:   0
    root
    
    ++
    ||
    ++
    ++
    

    有关此输出的说明:

    • 我看不出 print 声明或 logging.error 声明。
    • 没有迹象表明 my_mapper 引发异常。
    • 这个 printSchema 调用显示生成的dynamicframe上没有架构元数据
    • 这个 show 方法也没有生成任何输出,这表明所有行都已不存在。

    同样,当我在AWS Glue控制台中将此脚本保存为作业并运行它时,该作业并不表示发生了任何错误——作业状态为“成功”。值得注意的是,我 获取 打印 声明和 日志记录错误 调用输出到作业日志,但只在常规的“日志”中,而不是“错误日志”。

    我想要的是能够指出我的工作失败了,并且能够很容易地找到这些错误日志。最重要的是指出它已经失败了。

    是否有一种方法可以在映射函数中记录错误,使glue将其作为“错误日志”(并将其放在单独的AWS CloudWatch日志路径中)?如果发生这种情况,它会自动将整个作业标记为失败吗?或者是否有其他方法从映射函数中显式地使作业失败?

    (如果有记录错误和/或将作业标记为失败的方法,我的计划是创建一个装饰器或其他实用程序函数,该函数将自动捕获映射函数中的异常,并确保它们被记录并标记为失败)。

    1 回复  |  直到 7 年前
        1
  •  3
  •   Christopher Armstrong    6 年前

    我发现使粘合作业显示为“失败”的唯一方法是从主脚本引发异常( 在一个映射器或过滤器函数中,这些函数似乎被旋转到数据处理单元中)。

    幸运的是,那里 检测映射或筛选函数内部是否发生异常的方法:使用 DynamicFrame.stageErrorsCount() 方法。它将返回一个数字,指示在运行最近的转换时引发了多少异常。

    所以解决所有问题的正确方法是:

    • 确保映射或转换功能 明确地 记录它内部发生的任何异常。最好使用decorator函数或其他可重用机制,而不是依赖 try/except 在您编写的每个函数中都有语句。
    • 在每次希望捕获错误的转换之后,调用 stageErrorsCount() 方法并检查它是否大于0。如果您想中止该作业,只需引发一个异常。

    例如:

    import logging
    
    def log_errors(inner):
        def wrapper(*args, **kwargs):
            try:
                inner(*args, **kwargs)
            except Exception as e:
                logging.exception('Error in function: {}'.format(inner))
                raise
        return wrapper
    
    @log_errors
    def foo(record):
        1 / 0
    

    然后,在你的工作中,你会做一些类似的事情:

    df = df.map(foo, "foo")
    if df.stageErrorsCount() > 0:
        raise Exception("Error in job! See the log!")
    

    注意,即使打电话 logging.exception 从mapper函数内部仍然不将日志写入 错误 出于某种原因,请登录AWS CloudWatch日志。它会被写入常规的成功日志中。但是,使用这种技术,您至少可以看到作业失败,并且能够在日志中找到信息。另一个警告:开发人员端点似乎没有显示来自映射器或过滤器函数的任何日志。