我的目标是通过AWS Lambda使用Snowpipe REST API进行自动化,如文档中所述。
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-lambda
我创建了具有适当密钥对身份验证和S3连接的函数,并在AWS上部署了lambda函数。
对于每个lambda函数,都有一个处理程序,用于描述S3位置和其他内容。为了将S3上上传的文件接收到Snowflake,我们在Snowflak中创建了一个外部阶段,显示哪些文件已准备好加载到Snowf薄片。我测试了我的连接,一切正常。所有连接都正确。
处理程序函数为:
def handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print("Bucket: " + bucket + " Key: " + key)
# List of files in the stage specified in the pipe definition
# wrapped into a class
staged_file_list = ['customer_1.csv']
staged_file_list.append(StagedFile(key, None))
print('Pushing file list to ingest REST API')
resp = ingest_manager.ingest_files(staged_file_list)
在上面你可以看到
staged_file_list
其中列出了要加载的文件,并给出了执行此操作的注释:
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-lambda
关键字是“relative”-现在整个URL看起来像-
当我试图监视lambda函数的日志时,这对我来说是正确的,在那里你可以看到预期的正确的bucket和键值,但我面临着一个错误,我无法找到文档或解决方案。错误如下:
我试图重新构造目录,以正确地指向正确的S3存储桶和文件名,但没有成功。在我看来,这就像是摄入管理器中的一个bug。我期待你对我可能出错的地方提供意见。
期望-我将“customer_1.csv”上传到我的S3,这会触发lambda函数,该函数依次将文件加载到Snowflake表。