代码之家  ›  专栏  ›  技术社区  ›  Lutaaya Huzaifah Idris

更新查询不适用于Celery和FastAPI

  •  0
  • Lutaaya Huzaifah Idris  · 技术社区  · 1 年前

    我使用的是FastAPI存储库模式,它在Celery中运行得很好,但我有一个更新查询,它没有执行。(有趣的是,其他查询工作得很好)

    这是我的问题;

    UPDATE_REMINDER_SENT_AT_QUERY = """
    UPDATE public.trainings
    SET reminder_sent_at = :next_reminder_date  AT TIME ZONE 'utc'
    WHERE id = :training_id
    RETURNING id;
    """
    

    这是我的存储库;

     async def update_reminder_sent_at(
            self, training_id: UUID, next_reminder_date
        ):
            async with self.db.transaction():
                record: int = await self.db.fetch_val(
                    query=UPDATE_REMINDER_SENT_AT_QUERY,
                    values={
                        "training_id": training_id,
                        "next_reminder_date": next_reminder_date,
                    },
                )
                logger.info(f'record {record}')
    

    以及服务:

    async def update_reminder_sent_at(
        self, training_id: UUID, next_reminder_date
    ):
        await self.repository.update_reminder_sent_at(training_id, next_reminder_date)
    

    最后叫了一个芹菜任务;

    @app_celery.task
    def task_update(
        training_id: UUID, user_id: UUID, tenant_id: UUID
    ):
        """Send email reminders for a specific training and user."""
    
        async def send_reminder(training_id: UUID, user_id: UUID, tenant_id: UUID):
             next_reminder_date = datetime.utcnow() + timedelta(
                        days=training.reminder.schedule_in_days
                    )
                    await training_service.update_reminder_sent_at(
                        training_id, next_reminder_date
                    )
    
          asyncio.run(send_reminder(training_id, user_id, tenant_id))
    

    因此,当我登录到存储库logger.info(f'record{record}')时,我会得到这样的信息:

    celery_worker       | 2024-06-15 12:00:00.388 |INFO     | api.trainings.repository:update_reminder_sent_at:163 - record d1d33ada-1e98-40e7-982c-f553b09bcaa0 
    

    令人惊讶的是,检查数据库时,没有对字段产生任何影响。(该字段根本没有更新)。

    laas_api=# select id, reminder_sent_at  from trainings
    laas_api-# ;
                      id                  | reminder_sent_at
    --------------------------------------+------------------
     b8b6b8d3-20ca-4ced-9720-4b6585f116d9 |
     3f9e08af-dacf-4979-97be-bbe3f86c5979 |
     fba1a12d-a8d9-418d-851a-a55e29c02996 |
     679596a0-264c-4dad-aca2-37c197534621 |
     24d3469d-bfa7-43f0-95b9-996105af6df0 |
     45213723-0711-4d86-97ef-d9c7528bcebd |
     d1d33ada-1e98-40e7-982c-f553b09bcaa0 |
    (7 rows)
    

    注意:

    以下是我的数据库的声明方式;

    @asynccontextmanager
    async def get_db():
        database = Database(
            settings.database_url, force_rollback=True, min_size=3, max_size=20
        )
        await database.connect()
        try:
            yield database
        finally:
            await database.disconnect()
    
    1 回复  |  直到 1 年前
        1
  •  1
  •   lord_haffi    1 年前

    我们在评论中解决了这个问题,但对于任何遇到类似问题的人,以下是答案:

    为了将数据持久化到数据库中,您总是必须提交数据。在您的 get_db 函数你不做任何提交。此外,您使用配置连接对象 force_rollback=True 这会导致您的所有更改在关闭时回滚。

    来自 documentation :

    这将确保所有数据库连接都在事务中运行,一旦数据库断开连接,事务就会回滚。

    因此,在断开连接之前删除此标志并提交应该可以解决您的问题。