代码之家  ›  专栏  ›  技术社区  ›  Kannaiyan

S3事件触发器是否可扩展?

  •  7
  • Kannaiyan  · 技术社区  · 8 年前

    将大约3K个对象(文件)加载到S3。每个文件都有一个事件触发器加载到S3存储桶。

    Lambda仅接收大约300个对象的事件触发器。若我重试(从S3移回S3并将其放回S3),它会为另外400个对象生成事件,其余的事件甚至并没有到达lambda。

    我在这里遗漏了什么,如何缩放创建的任意数量的对象?

    var async = require('async');                                                                                                                                                                                
    var aws = require('aws-sdk');                                                                                                                                                                                
    var s3 = new aws.S3();                                                                                                                                                                                       
    var kinesis = new aws.Kinesis();                                                                                                                                                                             
    var sns = new aws.SNS();                                                                                                                                                                                     
    var config = require('./config.js');                                                                                                                                                                         
    
    
    var logError = function(errormsg) {                                                                                                                                                                          
        sns.publish({                                                                                                                                                                                            
            TopicArn: config.TopicArn,                                                                                                                                                                           
            Message: errormsg                                                                                                                                                                                    
        }, function(err, data) {                                                                                                                                                                                 
            if (err) {                                                                                                                                                                                           
                console.log(errormsg);                                                                                                                                                                           
            }                                                                                                                                                                                                    
        });                                                                                                                                                                                                      
    };                                                                                                                                                                                                           
    
    
    exports.handler = function(event, context, callback) {                                                                                                                                                       
    
        var readS3andSendtoKinesis = function(record, index, cb) {                                                                                                                                               
            var params = {                                                                                                                                                                                       
                Bucket: record.s3.bucket.name,                                                                                                                                                                   
                Key: record.s3.object.key                                                                                                                                                                        
            }; 
            console.log('Received File: ' +  record.s3.object.key);                                                                                                                                                                                                 
            s3.getObject(params, function(err, data) {                                                                                                                                                           
                if (!err) {                                                                                                                                                                                      
                    var kinesisParams = {                                                                                                                                                                        
                        Data: data.Body.toString('utf8'),                                                                                                                                                        
                        PartitionKey: config.PartitionKey,                                                                                                                                                       
                        StreamName: config.StreamName                                                                                                                                                            
                    };                                                                                                                                                                                           
                    kinesis.putRecord(kinesisParams, function(err, data) {                                                                                                                                       
                        if (err) {                                                                                                                                                                               
                            // Handle Kinesis Failures                                                                                                                                                           
                            logError(JSON.stringify(err, null, 2));                                                                                                                                              
                        }                                                                                                                                                                                        
                        cb(null, 'done');                                                                                                                                                                        
                    });                                                                                                                                                                                          
                } else {                                                                                                                                                                                         
                    // Handle S3 Failures                                                                                                                                                                        
                    logError(JSON.stringify(err, null, 2));                                                                                                                                                      
                    cb(null, 'done');                                                                                                                                                                            
                }                                                                                                                                                                                                
            });                                                                                                                                                                                                  
        };                                                                                                                                                                                                       
    
        async.eachOfLimit(event.Records, 1, readS3andSendtoKinesis, function(err) {                                                                                                                              
            callback(null, 'Done');                                                                                                                                                                              
        });                                                                                                                                                                                                      
    }; 
    

    由于每个人都建议查看cloudwatch,在此处共享相关lambda的cloudwatch指标,

    enter image description here

    2 回复  |  直到 7 年前
        1
  •  2
  •   Kannaiyan    7 年前

    我们发现根本原因似乎是资源的另一端出现了故障。S3触发器正在发生,无法扩展到它接收到的巨大触发器。

    要解决,

    尽快返回S3 Lambda触发器,延迟会导致问题。

    如果您花太多时间处理触发器内的业务逻辑,那么在我们的示例中,我们从S3读取数据并写入流。相反,我们只写S3的位置,然后从接收端的S3读取。

    希望有帮助。

        2
  •  0
  •   Efren    7 年前

    AWS Lambda具有节流配置,可避免失控情况。

    对于S3,Lambda调用也依赖于 permissions ,因此您应该检查这些权限。

    由于S3不是基于流的源代码,您可能看到了同步场景,其中 throttling 达到极限,S3不再重试。检查lambdas中的节流和错误429。