代码之家  ›  专栏  ›  技术社区  ›  John

在Async Promise内批量加载时Grails DuplicateKeyException/NonUniqueObjectException

  •  0
  • John  · 技术社区  · 10 年前

    我有大量数据,希望使用GORM加载到数据库中。

    class DbLoadingService {
    
        static transactional = false    
        // these are used to expedite the batch loading process
        def sessionFactory
        def propertyInstanceMap = org.codehaus.groovy.grails.plugins.DomainClassGrailsPlugin.PROPERTY_INSTANCE_MAP
    
        // these are example services that will assist in the parsing of the input data    
        def auxLoadingServiceA
        def auxLoadingServiceB
    
        def handleInputFile(String filename) {
            def inputFile = new File(filename)
            // parse each line and process according to record type
            inputFile.eachLine { line, lineNumber ->
                this.handleLine(line, lineNumber)
            }
        }
    
    
        @Transactional
        def handleLine(String line, int lineNumber) {
            // do some further parsing of the line, based on its content
            // example here is based on 1st 2 chars of line
            switch (line[0..1]) {
                case 'AA':
                    auxLoadingServiceA.doSomethingWithLine(line)
                    break;
    
                case 'BB':
                    auxLoadingServiceB.doSomethingElseWithLine(line)
                    break;
    
                default:
                    break;
    
            }
            if (lineNumber % 100 == 0) cleanUpGorm()
        }
    
        def cleanUpGorm() {
            def session = sessionFactory.getCurrentSession()
            session.flush()
            session.clear()
            propertyInstanceMap.get().clear()
        }
    
    }
    
    class AuxLoadingServiceA {
        static transactional = false
    
        doSomethingWithLine(String line) {
            // do something here
        }
    }
    
    class AuxLoadingServiceB {
        static transactional = false
    
        doSomethingElseWithLine(String line) {
            // do something else here
        }
    }
    

    我特意使顶级服务仅针对每行的负载进行事务处理。实际上,顶层下有相当多的服务级别,而不仅仅是单个AuxA&B服务层。因此,我不希望有多层事务的开销:我认为我只需要1。

    加载到DB中的数据模型包括两个具有Many/belongsTo关系的域对象。与域对象的这种交互是在子层中完成的,为了保持示例的大小可管理,我的代码中没有显示。

    似乎导致问题的域对象与以下内容类似:

    class Parent {
        static hasMany = [children: Child]
        static mapping = {
            children lazy: false
            cache true
        }
    }
    
    class Child {
        String someValue
        // also contains some other sub-objects
    
        static belongsTo = [parent : Parent]
    
        static mapping = {
            parent index: 'parent_idx'
            cache true
        }
    }
    

    所示的cleanupGorm()方法是必需的,否则服务会在大量的行之后完全停止。

    当我启动数据库加载时,所有工作都完全按照预期进行:

    // Called from with a service / controller
    dbLoadingService.handleInputFile("someFile.txt")
    

    但是,一旦我将负载移动到异步进程中,如下所示:

    def promise = task {
        dbLoadingService.handleInputFile("someFile.txt")
    }
    

    我得到了DuplicateKeyException/NonUniqueObjectException:

    error details: org.springframework.dao.DuplicateKeyException: A different object with the same identifier value was already associated with the session : [com.example.SampleDomainObject#1]; nested exception is org.hibernate.NonUniqueObjectException: A different object with the same identifier value was already associated with the session : [com.example.SampleDomainObject#1]
    

    所以,我的问题是,在将大量数据异步加载到Grails DB中方面,有哪些最佳实践?为了确保内存中的对象在会话中保持一致,是否需要对刷新/清除会话进行一些操作?缓存对象时需要做什么吗?

    1 回复  |  直到 10 年前
        1
  •  0
  •   John    10 年前

    解决方案是按照@JoshuaMoore的建议,使用新的会话。此外,还有一个对域对象的引用,该对象是从一个事务外部引用的,而该事务在新会话中没有调用merge(),从而导致错误。

    def obj = DomainObject.findBySomeProperty('xyz')
    
    // now start new session
    
    obj.someProperty // causes exception
    obj = obj.merge()
    obj.someProperty // doesn't cause an exception
    

    Joshua的评论促使我深入了解Hibernate的文档( https://docs.jboss.org/hibernate/orm/3.6/reference/en-US/html/transactions.html )

    具体而言,从第13章:

    SessionFactory是一个昂贵的线程安全对象, 旨在由所有应用线程共享。它被创建一次, 通常在应用程序启动时从Configuration实例启动。

    会话是一个廉价的非线程安全对象,应该使用 一次,然后丢弃:单个请求、会话或 单个工作单元。会话将无法获得JDBC连接或 数据源,除非需要。它不会消耗任何资源 直到使用。

    其他人可能感兴趣的是,随着解析对象数量的增加,我看到批处理加载的性能逐渐下降,即使Burt Beckwith建议进行性能优化 here :并由Ted Naleid进一步详细解释 here .

    因此,使用文档中的提示,性能问题的答案不是尝试在所有处理中使用会话,而是使用它进行少量处理,然后丢弃它并创建一个新的会话。

    当我删除问题中的cleanupGorm()方法并用以下方法替换它时,我得到了 性能提高6倍 ,即使在解析了数以千万计的记录之后,加载时间也不会随批处理大小而增加:

    // somewhere in the service method that is doing the batch parse
    def currentSession = sessionFactory.openSession()
    
    // start some form of batch parse, perhaps in a loop
    
        // do work here
        // periodically, perhaps in the %N way shown above
        currentSession.flush()
        currentSession.close()
        currentSession = sessionFactory.openSession()
    
    // end of loop
    

    当我需要在跨越服务的事务中包装东西时,我做了以下操作:

    currentSession = sessionFactory.openSession()
    currentSession.beginTransaction()
    
    // start loop
    // do work
    
    // when we want to commit
    def tx = currentSession?.getTransaction()
    if (tx?.isActive()) tx.commit()
    currentSession?.close()
    
    // if we're in a loop and we need a new transaction
    currentSession = sessionFactory.openSession()
    currentSession.beginTransaction()
    

    虽然我承认使用SpringBatch这样的东西可能更好,但这可能会涉及丢弃大量代码,否则这些代码将按预期工作。我将在下一次需要这样做时研究这一点,但同时,希望这可能对其他需要使用Grails进行大规模批处理的人有用,并且发现批处理大小会导致性能下降。

    约书亚注意:非常感谢你的帮助,非常感谢!

    推荐文章