代码之家  ›  专栏  ›  技术社区  ›  Michael

与pandas和gnu parallel并行将数据导入mysql

  •  1
  • Michael  · 技术社区  · 6 年前

    我有几千个目录,我想从中导入数据到MySQL。我创建了一个python脚本,它从单个目录中读取数据并将其放到数据库中。以下是将数据发送到数据库的部分:

    host = 'localhost'
    engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
    conn = engine.connect()
    trans = conn.begin()
    try:
        conn.execute('delete from tests where ml="%s"' % ml)
        tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
        data.to_sql(con=conn, name='data', if_exists='append', index=False)
        trans.commit()
        print(CGRE + ml + ': OK' + CEND)
    except:
        trans.rollback()
        print(CRED + ml + ': database error!' + CEND)
        raise
    conn.close()
    

    一个线程执行效果很好,但速度太慢:

    parallel -j 1 "[[ -d {} ]] && (cd {} && data_to_db.py) || echo {} >> ~/Data/failed_db" ::: *
    

    现在我想启动几个进程:

    parallel -j 8 .........
    

    有时在执行过程中,我会得到这个错误:

    sqlAlchemy.exc.internalError:(pymysql.err.internalError)(1213,'尝试获取锁时发现死锁;尝试重新启动事务')

    是否有一种方法可以增加事务的等待时间或以其他方式解决事务,因为如果没有并行执行,导入所有数据将花费太长的时间?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Michael    6 年前

    多亏了@romanperekhrest,这里有一个 solution 来自mysql手册 LOCK/UNLOCK TABLES .

    engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
    conn = engine.connect()
    trans = conn.begin()
    try:
        conn.execute('set autocommit=0')
        conn.execute('lock tables tests write, data write')
        conn.execute('delete from tests where ml="%s"' % ml)
        tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
        data.to_sql(con=conn, name='data', if_exists='append', index=False)
        trans.commit()
        conn.execute('unlock tables')
        print(CGRE + ml + ': OK' + CEND)
    except:
        trans.rollback()
        conn.execute('unlock tables')
        conn.close()
        print(CRED + ml + ': database error!' + CEND)
        raise
    conn.close()