代码之家  ›  专栏  ›  技术社区  ›  calyxofheld

工人内部的sidekiq执行(延迟)忽略延迟

  •  1
  • calyxofheld  · 技术社区  · 7 年前

    我的应用程序中的用户创建 Transactions ,我需要这些事务(以及为将事务状态更改为而创建的关联作业) ignored 当用户在特定时间内没有响应时)取消自己,除非用户执行 pay 行动

    我在一个示例中使用的方法使用 perform_async 状态更改为 approved ,如果未及时响应,则取消:

    Class Transaction < ApplicationRecord
     #when approved
     def create_worker
      MyWorker.perform_async(self.id)
     end
    
     #if user responds in time, cancel the jobs and update the record to `paid` etc
     def cancel_worker
      jid = MyWorker.perform_async(self.id)
      MyWorker.cancel! jid
     end
    end
    

    按照建议 here here ,我在worker中添加了关于何时取消的附加功能。它看起来像这样:

    class MyWorker
     include Sidekiq::Worker
    
     def perform(transaction_id)
      return if paid?
      transaction = Transaction.find transaction_id
      self.class.perform_in(1.minutes, transaction.ignore!)
     end
    
     def paid?
      Sidekiq.redis { |c| c.exists("paid-#{jid}") }
     end
    
     def self.cancel! jid
      Sidekiq.redis { |c| c.setex("paid-#{jid}", 86400, 1) }
     end
    end
    

    此代码产生以下端子输出:

    2018-12-16T01:40:50.645Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: start
    Changing transaction 4 approved to ignored (event: ignore!)
    2018-12-16T01:40:50.884Z 30530 TID-oxm547nes MyWorker JID-6c97e448fe30998235dee95d INFO: done: 0.239 sec
    2018-12-16T01:41:56.122Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: start
    2018-12-16T01:41:56.125Z 30530 TID-oxm547oag MyWorker JID-b46bb3b002e00f480a04be16 INFO: fail: 0.003 sec
    2018-12-16T01:41:56.126Z 30530 TID-oxm547oag WARN: {"context":"Job raised exception","job":{"class":"MyWorker","args":[true],"retry":true,"queue":"default","jid":"b46bb3b002e00f480a04be16","created_at":1544924450.884224,"enqueued_at":1544924516.107598,"error_message":"Couldn't find Transaction with 'id'=true","error_class":"ActiveRecord::RecordNotFound","failed_at":1544924516.125679,"retry_count":0},"jobstr":"{\"class\":\"MyWorker\",\"args\":[true],\"retry\":true,\"queue\":\"default\",\"jid\":\"b46bb3b002e00f480a04be16\",\"created_at\":1544924450.884224,\"enqueued_at\":1544924516.107598}"}
    

    6c97e448fe30998235dee95d 并立即将事务设置为 忽略 ,然后是一个有点 b46bb3b002e00f480a04be16 这正好超过了工人们提前返回的时间 perform 函数(因为它不使用与第一个作业相同的jid)。

    我可以猜测为什么这不能按我所希望的方式工作的一个原因是 MyWorker.cancel! 不能得到我的工人的圣杯 希望

    创建数据库迁移以包含工作人员的jid是确保在操作之间可以访问jid的首选方法吗?这是怎么回事 id=true 进去吗?正如上面的错误所说: Couldn't find Transaction with 'id'=true"

    1 回复  |  直到 7 年前
        1
  •  2
  •   rewritten    7 年前

    好的,让我们一件一件地来。

    1. self.class.perform_in(1.minute, transaction.ignore!)
      

      正在传递 ignore! 方法(在本例中, true )作为作业的参数,这将导致异常。

      您应该确保传递正确的参数:

      self.class.perform_in(1.minute, transaction.tap(&:ignore!).id)
      
    2. 每次你打电话 MyWorker.perform_async (或任何其他表演类方法)你正在创造一份新的工作,因此你没有得到同样的工作也就不足为奇了 jid

      吉德 在交易表中,然后在付款时检索以取消。否则,作业id将丢失。另一种方法是实际使用相同的redis来存储paid标志,但由交易输入。 c.exists("paid-#{transaction.id}")

    3. 您的代码不会等待1分钟来忽略事务,它只是立即忽略事务,并将自身设置为在1分钟内再次执行。

      你可能想打电话

      jid = MyWorker.perform_in(1.minute, transaction.id)
      

      直接从 create_worker


    如果像我想象的那样,您正在使用某种持久状态机,那么更容易“忽略,除非完成”,而忘记取消作业

    class Transaction
      # I'm inventing a DSL here
      include SomeStateMachine
    
      state :accepted do
        event :ignore, to: :ignored
        event :confirm, to: :confirmed
      end
      state :ignored
      state :confirmed
    
      def create_worker
        # no need to track it
        MyWorker.perform_in(1.minute, id)
      end
    end
    
    class MyWorker
      include Sidekiq::Worker
    
      def perform(id)
        transaction = Transaction.find(id)
        transaction.ignore! if transaction.can_ignore?
      end
    end
    

    您可以让作业运行,它将很高兴地跳过任何不可忽略的事务。