代码之家  ›  专栏  ›  技术社区  ›  Vickie Jack

使用java的cassandra中的高频插入会丢失一些数据

  •  2
  • Vickie Jack  · 技术社区  · 7 年前

    我有5000000个插入查询文件。我想从文件中读取它们,并使用java驱动程序和executeAsync方法,在循环语句中写入cassandra,如以下代码所示:

    public static void main(String[] args) {
            FileReader fr = null;
            try {
                fr = new FileReader("the-file-name.txt");
                BufferedReader br = new BufferedReader(fr);
                String sCurrentLine;
                long time1 = System.currentTimeMillis();
                while ((sCurrentLine = br.readLine()) != null) {
                    session.executeAsync(sCurrentLine);
                }
    
                System.out.println(System.currentTimeMillis() - time1);
                fr.close();
                br.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        } 
    

    我的表定义是:

    CREATE TABLE test.climate (
        city text,
        date text,
        time text,
        temprature int,
        PRIMARY KEY ((city, date), time)
    ) WITH CLUSTERING ORDER BY (time ASC)
        AND bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99PERCENTILE';
    

    但运行程序后,表中的行数为2569725

    cqlsh:test> select count(*) from climate ;
    
     count
    ---------
     2569725
    

    我测试了10多次,每次select count(*)的结果都在2400,00到2600000之间

    1 回复  |  直到 7 年前
        1
  •  1
  •   Chris Lohfink    7 年前

    您正在以比执行更快的速度发出异步插入,因此它们最终会超过队列大小并失败。您可以增加队列大小,这会起作用,但之后您只会对内存施加背压,而不是对生产者施加背压,并且仍然可能遇到问题。尝试限制飞行中的查询,如:

    public static void main2(String[] args) {
        FileReader fr = null;
        int permits = 256;
        Semaphore l = new Semaphore(permits);
        try {
            fr = new FileReader("the-file-name.txt");
            BufferedReader br = new BufferedReader(fr);
            String sCurrentLine;
            long time1 = System.currentTimeMillis();
            while ((sCurrentLine = br.readLine()) != null) {
                l.acquire();
                session.executeAsync(sCurrentLine)
                    .addListener(()->l.release(), MoreExecutors.directExecutor());
            }
            l.acquire(permits);
    
            System.out.println(System.currentTimeMillis() - time1);
            fr.close();
            br.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    

    它可能会运行得同样快,只需要找到合适大小的信号量。还要注意阻塞,直到所有许可都返回为止(在末尾获取max),否则您可以在发送所有可能在队列中的请求之前关闭jvm。

    免责声明:我没有测试上述代码