代码之家  ›  专栏  ›  技术社区  ›  wandermonk

提高Apache Spark到Redis的写性能

  •  0
  • wandermonk  · 技术社区  · 7 年前

    我有一个应用程序,它使用Apache Spark将密钥、值数据写入Redis。应用程序工作正常,没有任何问题。但是,应用程序要慢得多。我在这里寻找一些建议,以提高写入吞吐量和提高并行性,同时将数据写入Redis。

    这是密码

    Dataset<Row> rowkeyMapping = services.select(regexp_replace(col("rowkey"), "_", "").as("rowkey"),struct(regexp_replace(col("name"), "\\[", ","), regexp_replace(col("oname"), "\\[", ","), col("cid")).as("detailsinfo"));
    
    rowkeyMapping.foreach(obj -> {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(5000);
        JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
        Jedis jedis = pool.getResource();
        ObjectMapper om = new ObjectMapper();
        String[] rowArray = obj.mkString()
            .replaceAll("[\\[]", ",")
            .split(",");
        String key = rowArray[0];
        DetailInfo detail = new DetailInfo();
        detail.setName(rowArray[1]);
        detail.setOName(rowArray[2]);
        detail.setCid(rowArray[3]);
    
        String value = om.writeValueAsString(detail);
        logger.info("writing key value pairs to Redis cache (Key) :: " + key);
        jedis.set(key, value);
        jedis.quit();
    });
    

    我对Redis管道系统知之甚少。但是,我认为流水线更多的是对命令进行批处理。在这里,就我而言,我正在处理数百万的数据。我不确定流水线是否最适合。

    2 回复  |  直到 7 年前
        1
  •  0
  •   Amir Kost    7 年前

    我对Spark和Redis都不是专家,但我认为以下几行应该在foreach循环之外:

    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setMaxTotal(5000);
    JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
    
        2
  •  0
  •   for_stack    7 年前

    正如@Amir Kost在他的回答中提到的,您的问题是,当您设置单个键值对时,会创建一个新连接。为了提高性能,应该对一批键值对重用连接。

    foreachPartition 方法 Dataset<Row> foreach . 前庭 ForeachPartitionFunction<T> 整个分区的函数。所以您可以创建一个连接,并对分区中的所有项重复使用它。检查 doc

    还有,用 前庭 ,可以在分区中获取一批项,然后使用Redis Pipline获得更好的性能。检查管道 doc