代码之家  ›  专栏  ›  技术社区  ›  Muhammad Aadil Banaras

如果数据库关闭一段时间,MongoDB更改流超时

  •  0
  • Muhammad Aadil Banaras  · 技术社区  · 6 年前

    我正在nodejs中使用mongoDB change stream,一切正常,但是如果数据库关闭花费了超过10-5秒的时间来获取向上的change stream抛出超时错误,下面是我的change stream观察程序代码

    Service.prototype.watcher = function( db ){
    
    let collection = db.collection('tokens');
    let changeStream = collection.watch({ fullDocument: 'updateLookup' });
    let resumeToken, newChangeStream;
    
    changeStream.on('change', next => {
        resumeToken = next._id;
        console.log('data is ', JSON.stringify(next))
        changeStream.close();
        // console.log('resumeToken is ', JSON.stringify(resumeToken))
        newChangeStream = collection.watch({ resumeAfter : resumeToken });
        newChangeStream.on('change', next => {
            console.log('insert called ', JSON.stringify( next ))
        });
    });
    

    但是在数据库端,我已经处理了它,即如果数据库关闭或使用此代码重新连接

     this.db.on('reconnected', function () {
        console.info('MongoDB reconnected!');
    });
    this.db.on('disconnected', function() {
        console.warn('MongoDB disconnected!');
    });
    

    但我无法处理更改流监视程序,以便在数据库关闭时停止它,并在数据库重新连接时重新启动它,或者是否有其他更好的方法来执行此操作?

    2 回复  |  直到 6 年前
        1
  •  3
  •   kevinadi    6 年前

    您要做的是封装 watch() 调用函数。然后,此函数将在出错时调用自身,以使用以前保存的恢复令牌重新发送集合。代码中缺少的是错误处理程序。例如:

    const MongoClient = require('mongodb').MongoClient
    const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
    var resume_token = null
    
    run()
    
    function watch_collection(con, db, coll) {
      console.log(new Date() + ' watching: ' + coll)
      con.db(db).collection(coll).watch({resumeAfter: resume_token})
        .on('change', data => {
          console.log(data)
          resume_token = data._id
        })
        .on('error', err => {
          console.log(new Date() + ' error: ' + err)
          watch_collection(con, coll)
        })
    }
    
    async function run() {
      con = await MongoClient.connect(uri, {"useNewUrlParser": true})
      watch_collection(con, 'test', 'test')
    }
    

    请注意 watch_collection() 包含 方法及其处理程序。更改时,它将打印更改并存储恢复令牌。出错时,它将调用自身以再次重写集合。

        2
  •  0
  •   Muhammad Aadil Banaras    6 年前

    这是我开发的解决方案,只需添加stream.on(error)函数,这样当出现错误时它不会崩溃,因为在数据库重新连接时重新启动stream,还为每个事件在文件中保存resume token,这在应用程序崩溃或停止时很有帮助,并且在这段时间内,如果添加了x条记录,因此,在应用程序重启时,只需从文件中获取最后一个resume标记,然后从中启动watcher,它将在此后插入所有记录,因此不会丢失任何记录,下面是代码

    var rsToken ;
        try {
            rsToken = await this.getResumetoken()
        } catch (error) {
            rsToken = null ;
        }
    
        if (!rsToken)
            changeStream = collection.watch({ fullDocument: 'updateLookup' });
        else 
            changeStream = collection.watch({ fullDocument: 'updateLookup', resumeAfter : rsToken  });
    
        changeStream.on('change', next => {
    
            resumeToken = next._id;
            THIS.saveTokenInfile(resumeToken)
    
            cs_processor.process( next )
    
    
        });  
        changeStream.on('error', err => {
            console.log('changestream error ')
        })