
Hangfire, .NET Core and Entity Framework: concurrency exception

  • Nitin  ·  7 years ago

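    I am developing a .NET Core application that uses Hangfire, and I am facing the following exception:

    A second operation started on this context before a previous operation completed. Any instance members are not guaranteed to be thread safe.

    I use Hangfire to schedule a job at 1-hour intervals. The problem above occurs when a new process/job starts before an earlier job has finished its processing.

    How can we run multiple Hangfire processes/jobs (multiple workers) so that they work in parallel? (This is now resolved by using the default AspNetCoreJobActivator.)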

    var scopeFactory = serviceProvider.GetService<IServiceScopeFactory>();
    if (scopeFactory != null)
        GlobalConfiguration.Configuration.UseActivator(new AspNetCoreJobActivator(scopeFactory));
    

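    Now I get the following exception in CreateOrderData.cs:

    /* System.InvalidOperationException: An exception has been raised that is likely due to a transient failure. If you are connecting to a SQL Azure database consider using SqlAzureExecutionStrategy. ---> Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred while updating the entries. See the inner exception for details. ---> System.Data.SqlClient.SqlException: Transaction (Process ID 103) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction. */

    I schedule the Hangfire cron job as follows: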

    RecurringJob.AddOrUpdate<IS2SScheduledJobs>(x => x.ProcessInputXML(), Cron.MinuteInterval(1));
    

    Startup.cs

    public void ConfigureServices(IServiceCollection services)
    {
        string hangFireConnection = Configuration["ConnectionStrings:HangFire"];
        GlobalConfiguration.Configuration.UseSqlServerStorage(hangFireConnection);
    
        var config = new AutoMapper.MapperConfiguration(cfg =>
        {
           cfg.AddProfile(new AutoMapperProfileConfiguration());
        });
    
        var mapper = config.CreateMapper();
        services.AddSingleton(mapper);
    
        services.AddScoped<IHangFireJob, HangFireJob>();
        services.AddScoped<IScheduledJobs, ScheduledJobs>();
        services.AddScoped<BusinessLogic>();
        services.AddHangfire(opt => 
             opt.UseSqlServerStorage(Configuration["ConnectionStrings:HangFire"]));
    
        services.AddEntityFrameworkSqlServer().AddDbContext<ABCContext>(options => 
             options.UseSqlServer(Configuration["ConnectionStrings:ABC"]));
    }
    
    public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
    {
        GlobalConfiguration.Configuration.UseActivator(new HangFireActivator(serviceProvider));
    
        //hangFireJob.Jobs();
    
        // add NLog to ASP.NET Core
        loggerFactory.AddConsole(Configuration.GetSection("Logging"));
        loggerFactory.AddDebug();
        loggerFactory.AddNLog();
        // app.UseCors("AllowSpecificOrigin");
    
        foreach (DatabaseTarget target in LogManager.Configuration.AllTargets.Where(t => t is DatabaseTarget))
        {
            target.ConnectionString = Configuration.GetConnectionString("Logging");
        }
    
        LogManager.ReconfigExistingLoggers();
    }
    

    HangFireJob.cs

    public class HangFireJob : IHangFireJob
    {
            private ABCContext _abcContext;
            private IScheduledJobs scheduledJobs;
    
            public HangFireJob(ABCContext abcContext, IScheduledJobs scheduledJobs)
            {
                _abcContext = abcContext;
                this.scheduledJobs = scheduledJobs;           
            }
    
            public void Jobs()
            {
                 RecurringJob.AddOrUpdate<IScheduledJobs>(x => x.ProcessInputXML(), Cron.HourInterval(1));
            }
    }
    

    ScheduledJobs.cs

    public class S2SScheduledJobs : IS2SScheduledJobs
    {
        private BusinessLogic _businessLogic;
    
        public S2SScheduledJobs(BusinessLogic businessLogic)
        {
            _businessLogic = businessLogic;
        }
    
        public async Task ProcessInputXML()
        {
            await _businessLogic.ProcessXML();
        }
    }
    

    BusinessLogic.cs

    public class BusinessLogic
    {
        private ABCContext _abcContext;
    
        public BusinessLogic(ABCContext abcContext) : base(abcContext)
        {
                _abcContext = abcContext;
        }
    
        public async Task ProcessXML()
        {
           var batchRepository = new BatchRepository(_abcContext);
           var unprocessedBatchRecords = await batchRepository.GetUnprocessedBatch();
    
           foreach (var batchRecord in unprocessedBatchRecords)
           {
             try
             {
               int orderId = await LoadDataToOrderTable(batchRecord.BatchId);  
               await UpdateBatchProcessedStatus(batchRecord.BatchId);
    
               if (orderId > 0)
               {
                    await CreateOrderData(orderId);
               }
             }
             catch(Exception ex)
             {
             }
           }
        }
    

    CreateOrderData.cs

    public async Task<int> CreateOrderData(int orderId)
    {
      try
      {
        await OrderRepo.InsertOrder(order);
        await _abcContext.SaveChangesAsync();   
      }
      catch(Exception ex)
      {
        /*System.InvalidOperationException: An exception has been raised that is likely due to a transient failure. If you are connecting to a SQL Azure database consider using SqlAzureExecutionStrategy. ---> Microsoft.EntityFrameworkCore.DbUpdateException: An error occurred while updating the entries. See the inner exception for details. ---> System.Data.SqlClient.SqlException: Transaction (Process ID 103) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction. */ 
      }
    }
    

    InsertOrder.cs

    public async Task InsertOrder(Order o)
    {
        // creation of a large number of entities (more than 50) to be inserted into the database
        woRepo.Insert(p);
        poRepo.Insert(y);
     //and many more like above
    
        Insert(order);
    }
    

    Insert.cs

    public virtual void Insert(TEntity entity)
        {
            entity.ObjectState = ObjectState.Added;
            if (entity is IXYZEntity xyzEntity)
            {
                xyzEntity.CreatedDate = DateTime.Now;
                xyzEntity.UpdatedDate = xyzEntity.CreatedDate;
                xyzEntity.CreatedBy = _context.UserName ?? string.Empty;
                xyzEntity.UpdatedBy = _context.UserName ?? string.Empty;
            }
            else if (entity is IxyzEntityNull xyzEntityNull)
            {
                xyzEntityNull.CreatedDate = DateTime.Now;
                xyzEntityNull.UpdatedDate = xyzEntityNull.CreatedDate;
                xyzEntityNull.CreatedBy = _context.UserName;
                xyzEntityNull.UpdatedBy = _context.UserName;
            }
            _dbSet.Add(entity);
            _context.SyncObjectState(entity);
        }
    

    LoadDataToOrder.cs

    public async Task<int> LoadDataToOrder(int batchId)
    {
            //  using (var unitOfWork = new UnitOfWork(_abcContext))
            //  {
            var orderRepo = new OrderRepository(_abcContext);
            Entities.Order order = new Entities.Order();
    
            order.Guid = Guid.NewGuid();
            order.BatchId = batchId;
            order.VendorId = null;
    
            orderRepo.Insert(order);
            //unitOfWork.SaveChanges();
            await _abcContext.SaveChangesAsync();
            return order.OrderId;
            //  }
    }
    }
    

    HangfireActivator.cs

    public class HangFireActivator : Hangfire.JobActivator
    {
            private readonly IServiceProvider _serviceProvider;
    
            public HangFireActivator(IServiceProvider serviceProvider)
            {
                _serviceProvider = serviceProvider;
            }
    
            public override object ActivateJob(Type type)
            {
                return _serviceProvider.GetService(type);
            }
    }
    

    Please advise.

    Thanks

    1 answer
  •   bfontaine cat-walk  ·  7 years ago

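    The following solutions resolved both problems:

    1. Running multiple Hangfire processes/jobs (multiple workers) in parallel. Answer: this was resolved when I used the built-in AspNetCoreJobActivator instead, which is available out of the box, i.e. I removed the HangfireActivator class and removed the call to the UseActivator method.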

      var scopeFactory = serviceProvider.GetService<IServiceScopeFactory>();
      if (scopeFactory != null)
          GlobalConfiguration.Configuration.UseActivator(new AspNetCoreJobActivator(scopeFactory));
      
    2. The SqlAzureExecutionStrategy exception in CreateOrder.cs (transaction was deadlocked)

    Answer: this was resolved by automatically retrying the query when a deadlock occurs.
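
    The retry code itself is not shown here. One possible way to get this behaviour, as a minimal sketch and not necessarily what was actually used, is EF Core's built-in retrying execution strategy for SQL Server, enabled where ABCContext is registered in Startup.ConfigureServices. The retry count and delay below are illustrative assumptions. 1205 is the SQL Server error number for "chosen as the deadlock victim"; passing it explicitly should be harmless if the provider version already treats it as transient.

    ```csharp
    using System;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.DependencyInjection;

    // Inside Startup.ConfigureServices (replaces the existing ABCContext registration).
    services.AddEntityFrameworkSqlServer().AddDbContext<ABCContext>(options =>
        options.UseSqlServer(
            Configuration["ConnectionStrings:ABC"],
            sqlOptions => sqlOptions.EnableRetryOnFailure(
                maxRetryCount: 3,                        // assumed value, tune as needed
                maxRetryDelay: TimeSpan.FromSeconds(5),  // assumed value, tune as needed
                errorNumbersToAdd: new[] { 1205 })));    // 1205 = deadlock victim
    ```

    With a retrying strategy enabled, EF Core expects any explicitly started transaction (BeginTransaction) to be run through Database.CreateExecutionStrategy().Execute(...), so that the whole unit of work can be replayed on retry. A plain SaveChangesAsync call, as in CreateOrderData, needs no extra wrapping.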

    Thanks to odinserj for the suggestion.