代码之家  ›  专栏  ›  技术社区  ›  Dawid

spark:优化向sql server写入数据帧

  •  0
  • Dawid  · 技术社区  · 6 年前

    我正在使用下面的代码将一个包含43列和大约2000000行的数据帧写入SQL Server中的一个表中:

    dataFrame
      .write
      .format("jdbc")
      .mode("overwrite")
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("url", url)
      .option("dbtable", tablename)
      .option("user", user)
      .option("password", password)
      .save()
    

    不幸的是,虽然它确实适用于小数据帧,但它要么非常慢,要么对于大数据帧超时。有什么关于如何优化它的提示吗?

    我试过设置 rewriteBatchedStatements=true

    谢谢。

    0 回复  |  直到 6 年前
        1
  •  6
  •   Jasper-M    6 年前

    我们使用 azure-sqldb-spark 库而不是spark的默认内置导出功能。这个图书馆给你 bulkCopyToSqlDB 方法是 真实的 批量插入和转到 很多 更快。它的实用性比内置的功能要差一点,但以我的经验来看还是值得的。

    我们差不多是这样使用的:

    import com.microsoft.azure.sqldb.spark.config.Config
    import com.microsoft.azure.sqldb.spark.connect._
    import com.microsoft.azure.sqldb.spark.query._
    
    val options = Map(
      "url"          -> "***",
      "databaseName" -> "***",
      "user"         -> "***",
      "password"     -> "***",
      "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    )
    
    // first make sure the table exists, with the correct column types
    // and is properly cleaned up if necessary
    val query = dropAndCreateQuery(df, "myTable")
    val createConfig = Config(options ++ Map("QueryCustom" -> query))
    spark.sqlContext.sqlDBQuery(createConfig)
    
    val bulkConfig = Config(options ++ Map(
      "dbTable"           -> "myTable",
      "bulkCopyBatchSize" -> "20000",
      "bulkCopyTableLock" -> "true",
      "bulkCopyTimeout"   -> "600"
    ))
    
    df.bulkCopyToSqlDB(bulkConfig)
    

    正如你所看到的,我们生成 CREATE TABLE 扪心自问。你 可以 让库创建表,但它只会 dataFrame.limit(0).write.sqlDB(config) 这仍然是非常低效的,可能需要你缓存 DataFrame ,它不允许您选择 SaveMode .

    也可能很有趣:我们不得不使用 ExclusionRule 当将此库添加到sbt构建时,或 assembly 任务将失败。

    libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
      ExclusionRule(organization = "org.apache.spark")
    )
    
        2
  •  6
  •   notNull    6 年前

    尝试添加 batchsize 你至少可以选择 > 10000 (相应地更改此值以获得更好的性能)并再次执行写操作。

    From spark docs:

    JDBC批处理大小,它决定 每个要插入多少行 往返行程 . 这有助于提高jdbc驱动程序的性能。这个选项 只适用于写作。它 默认为1000 .

    同样值得一看的是:

    • numPartitions option 增加并行性(这也决定了并发jdbc连接的最大数量)

    • queryTimeout 选项 增加写入选项的超时。

        3
  •  1
  •   Chakshu Goyal    6 年前

    是否可以将数据转换为csv文件并复制这些csv文件? 我们已经为更大的表自动化了这个过程,并以csv格式传输gcp格式的表。而不是通过jdbc阅读。