代码之家  ›  专栏  ›  技术社区  ›  MikeMB

自动清除无符号整数的最低非零位

  •  6
  • MikeMB  · 技术社区  · 6 年前

    问题:
    我正在寻找清除类似无符号原子的最低非零位的最佳方法 std::atomic_uint64_t 以线程安全的方式,而不使用额外的互斥或类似。此外,我还需要知道,哪一个位被清除了。

    例子: 例如,如果存储的当前值是 0b0110 我想知道最低的非零位是位1(0索引),并将变量设置为 0b0100 .

    我想到的最好版本是 this :

    #include <atomic>
    #include <cstdint>
    
    inline uint64_t with_lowest_non_zero_cleared(std::uint64_t v){
        return v-1 & v;
    }
    inline uint64_t only_keep_lowest_non_zero(std::uint64_t v){
        return v & ~with_lowest_non_zero_cleared(v);
    }
    
    uint64_t pop_lowest_non_zero(std::atomic_uint64_t& foo)
    {
        auto expected = foo.load(std::memory_order_relaxed);
        while(!foo.compare_exchange_weak(expected,with_lowest_non_zero_cleared(expected), std::memory_order_seq_cst, std::memory_order_relaxed))
        {;}
        return only_keep_lowest_non_zero(expected);
    }
    

    有更好的解决方案吗?

    笔记:

    • 它不必是最低的非零位-我也会很高兴有一个解决方案可以清除最高的位或设置最高/最低的“零位”,如果这有区别的话。
    • 我更喜欢一个标准的C++(17)版本,但是它将使用一个用于CLAN和MSVC的内含函数的答案。
    • 它应该是可移植的(使用clang或msvc编译x64和aarch64),但我最感兴趣的是使用clang编译时,在最近的Intel和AMD x64体系结构上的性能。
    • 编辑:更新必须是原子的,并且必须保证全局进度,但是正如上面的解决方案一样,它不必是无等待的算法(当然,如果你能给我看一个,我会非常高兴)。
    2 回复  |  直到 6 年前
        1
  •  7
  •   Peter Cordes    6 年前

    在x86上没有对原子清除最低位的直接硬件支持 . BMI1 blsr 仅在内存源形式中可用,而不是内存目标形式; lock blsr [shared_var] 不存在。(编译器知道如何优化 (var-1) & (var) 进入之内 BLSR 当使用编译时用于本地变量 -march=haswell 或者启用假定bmi1支持的代码生成。)因此即使可以假定bmi1支持 它不会让你做任何根本不同的事情。

    在x86上你能做的最好的是 lock cmpxchg 你在问题中提出的重试循环 . 这比在旧版本的变量中找到正确的位然后使用 lock btr 尤其是如果在位扫描和 锁定BTR . 如果另一个线程已经清除了您想要的位,您仍然需要一个重试循环。

    在实践中,CAS重试循环并不坏:重试非常罕见

    (除非您的程序对共享变量有很高的争用,否则即使使用 lock add 在没有尝试的地方,只是硬件仲裁访问缓存线。如果是这样的话,您可能应该重新设计您的无锁算法和/或考虑某种操作系统辅助的睡眠/唤醒,而不是让大量的内核花费大量的CPU时间在同一缓存线上。在低争用情况下,无锁是很好的。)

    CPU丢失要获取的负载之间的缓存线的窗口 expected 和跑步 锁定cmpxchg 通过对该值的几条指令,结果是很小的。 通常它会第一次成功通过 ,因为当 cmpxchg 跑。当缓存线到达时,它(希望)已经处于mesi-exclusive状态,如果CPU看到足够远的距离为它做一个RFO。

    您可以检测cmpxchg重试循环,以查看在实际程序中实际获得的争用量。 . (例如,通过增加循环内的局部值并使用原子松弛 += 一旦成功,或使用线程本地计数器,则返回共享计数器。)

    记住,您真正的代码(希望)在这个位掩码上的原子操作之间做了大量的工作,所以它与所有线程花费所有时间在缓存线上的微基准非常不同。

    编辑:更新必须是原子的,并且 必须保证全球进步 但是,正如上面的解决方案一样,它不必是无等待算法(当然,如果你能给我看一个,我会很高兴的)。

    从技术意义上讲,CAS重试循环(即使在ll/sc机器上编译,请参见下文)是无锁的:只有在其他线程取得进展时,才需要重试。 .

    如果失败,cas将不修改缓存线。在x86上,它会弄脏缓存线(mesi m state),但x86 cmpxchg没有检测到aba,它只会进行比较,因此加载相同缓存线的另一个线程 预期 会成功的。在ll/sc机器上,希望一个核心上的sc失败不会导致其他核心上的sc错误,否则它会激活锁。这是您可以假设的CPU架构师的想法。

    它不是等待自由的,因为理论上一个线程可以重试无限次。


    您的代码编译时使用 gcc8.1 -O3 -march=haswell 到这个ASM( from the Godbolt compiler explorer )

    # gcc's code-gen for x86 with BMI1 looks optimal to me.  No wasted instructions!
    # presumably you'll get something similar when inlining.
    pop_lowest_non_zero(std::atomic<unsigned long>&):
        mov     rax, QWORD PTR [rdi]
    .L2:
        blsr    rdx, rax                      # Bit Lowest Set Reset
        lock cmpxchg    QWORD PTR [rdi], rdx
        jne     .L2                           # fall through on success: cmpxchg sets ZF on equal like regular cmp
        blsi    rax, rax                      # Bit Lowest Set Isolate
        ret
    

    没有bmi1,blsr和blsi就变成了两个指令。考虑到 lock ED指令。

    不幸的是,clang和msvc稍显笨重,其分支处于无争用的快速路径上。(而且clang通过剥离第一次迭代而使函数膨胀。IDK,如果这有助于分支预测或者其他什么,或者它纯粹是一个遗漏的优化。我们可以让clang使用 unlikely() 宏。 How do the likely() and unlikely() macros in the Linux kernel work and what is their benefit? )。


    脚注1:

    除非您正在为一组已知的机器构建二进制文件,否则无论如何都不能假设bmi1可用。所以编译器需要做一些类似的事情 lea rdx, [rax-1] / and rdx, rax 而不是 blsr rdx, rax . 这对这个函数来说是一个微不足道的区别;大部分成本是原子操作,即使是在未争用的情况下,甚至对于热的in-l1d缓存没有争用的情况,查看无序执行吞吐量对周围代码的影响。(例如,对于 锁定cmpxchg 在天湖上( http://agner.org/optimize/ )与保存1个UOP相比 BLSR 而不是其他两条指令。假设前端是瓶颈,而不是内存或其他东西。作为一个完整的内存屏障,也会对周围代码中的加载/存储产生影响,但幸运的是,不会对独立的ALU指令的无序执行产生影响。)


    非x86: LL/SC machines

    大多数非x86机器都使用加载链接/存储条件重试循环执行其所有原子操作。有点遗憾的是,C++ 11不允许您创建定制的LL/SC操作(例如 (x-1) & x 在一个ll/sc中,而不仅仅是使用 fetch_add 但是CAS机器(如x86)不能提供ll/sc提供的ABA检测。因此,还不清楚如何设计一个C++类,它可以在x86上高效编译,也可以直接在ARM和其他L/SC ISAS上进行L/SC重试循环。(见 this discussion )

    所以你只需要用 compare_exchange_weak 如果C++没有提供所需的操作(例如 fetch_or fetch_and )

    从当前的编译器中得到的实践是 比较交换弱 用ll/sc实现,算术运算与之分离。asm实际上在独占加载获取之前执行松弛加载( ldaxr ,而不是仅仅基于 十二烷基硫酸钠 结果。还有额外的分支来检查 预期 在尝试存储之前,上一次尝试与新的加载结果匹配。

    ll/sc窗口可能比在加载和存储之间使用2个依赖的ALU指令要短,以防发生问题。CPU具有预先准备好的所需值,而不依赖于负载结果。(这取决于 以前的 加载结果。)Clang的代码gen将 sub / and 在循环内部,但是依赖于前一次迭代的负载,因此,在无序执行的情况下,它们仍然可以提前准备好。

    但如果这真的是最有效的方法,编译器应该将其用于 获取添加 也。它们不是因为可能不是这样。ll/sc重试很少,就像x86上的cas重试一样。我不知道它是否会对非常高的争用情况产生不同的影响。也许吧,但是编译器不会在编译时放慢优化的速度。 获取添加 .

    我重新命名了您的函数并重新格式化了 while() 为了可读性,因为它对于一行来说已经太长了,而没有用 不太可能() .

    这个版本编译的ASM可能比您的原始版本稍微好一点,带有clang . 我还修正了函数名,所以它实际上可以编译;您的问题不匹配。我选择了与x86的bmi指令名相似的完全不同的名称,因为它们简洁地描述了操作。

    #include <atomic>
    #include <cstdint>
    
    #ifdef __GNUC__
    #define unlikely(expr) __builtin_expect(!!(expr), 0)
    #define likely(expr) __builtin_expect(!!(expr), 1)
    #else
    #define unlikely(expr) (expr)
    #define likely(expr) (expr)
    #endif
    
    inline uint64_t clear_lowest_set(std::uint64_t v){
        return v-1 & v;
    }
    inline uint64_t isolate_lowest_set(std::uint64_t v){
        //return v & ~clear_lowest_set(v);
        return (-v) & v;
        // MSVC optimizes this better for ARM when used separately.
        // The other way (in terms of clear_lowest_set) does still CSE to 2 instructions
        //  when the clear_lowest_set result is already available
    }
    
    uint64_t pop_lowest_non_zero(std::atomic_uint64_t& foo)
    {
        auto expected = foo.load(std::memory_order_relaxed);
        while(unlikely(!foo.compare_exchange_weak(
            expected, clear_lowest_set(expected), 
            std::memory_order_seq_cst, std::memory_order_relaxed)))
            {}
    
        return isolate_lowest_set(expected);
    }
    

    铿锵声 -O3 对于AARCH64( -target aarch64-linux-android -stdlib=libc++ -mcpu=cortex-a57 做了一些奇怪的代码。这是从clang6.0开始的,但是旧版本也有一些奇怪之处,在寄存器中创建一个0/1整数,然后测试它,而不是一开始就跳到正确的位置。

    pop_lowest_non_zero(std::__1::atomic<unsigned long long>&): // @pop_lowest_non_zero(std::__1::atomic<unsigned long long>&)
    
        ldr     x9, [x0]                   @ the relaxed load
        ldaxr   x8, [x0]                   @ the CAS load (a=acquire, x=exclusive: pairs with a later stxr)
        cmp     x8, x9                   @ compare part of the CAS
        b.ne    .LBB0_3
        sub     x10, x9, #1
        and     x10, x10, x9             @ clear_lowest( relaxed load result)
        stlxr   w11, x10, [x0]           @ the CAS store (sequential-release)
        cbnz    w11, .LBB0_4             @ if(SC failed) goto retry loop
    
              # else fall through and eventually reach the epilogue
    
        # this looks insane.  w10 = 0|1, then branch if bit[0] != 0.  Always taken, right?
        orr     w10, wzr, #0x1
        tbnz    w10, #0, .LBB0_5         @ test-bit not-zero will always jump to the epilogue
            b       .LBB0_6              # never reached
    
    
    .LBB0_3:
        clrex                           @ clear the ldrx/stxr transaction
    .LBB0_4:
        # This block is pointless; just should b to .LBB0_6
        mov     w10, wzr
        tbz     w10, #0, .LBB0_6    # go to the retry loop, below the ret (not shown here)
    
    .LBB0_5:               # isolate_lowest_set and return
        neg     x8, x9
        and     x0, x9, x8
        ret
    
    .LBB0_6:
         @ the retry loop.  Only reached if the compare or SC failed.
         ...
    

    所有这些疯狂的分支可能不是一个真正的性能问题,但很明显,这可能更有效(即使没有教clang如何直接使用ll/sc而不是模拟的cas)。 报告为clang/llvm bug 38173]。( https://bugs.llvm.org/show_bug.cgi?id=38173 )

    似乎msvc不知道aarch64的发布存储是顺序发布的(不仅仅是像x86这样的常规发布),因为它仍在使用 dmb ish 之后的指令(完全内存屏障) stlxr . 它减少了浪费的指令,但它的x86偏见正在显现,或者说: 比较交换弱 编译如下 compare_exchange_strong 使用一个可能无用的重试循环,在ll/sc失败时,使用旧的预期/期望值再次尝试CA。这通常是因为另一个线程修改了该行,所以预期将不匹配。(Godbolt在任何旧版本中都没有针对AARCH64的MSVC,因此支持可能是全新的。这可以解释 dmb )

       ## MSVC Pre 2018 -Ox
    |pop_lowest_non_zero| PROC
        ldr         x10,[x0]          # x10 = expected = foo.load(relaxed)
    
    |$LL2@pop_lowest|           @ L2  # top of the while() loop
        sub         x8,x10,#1
        and         x11,x8,x10        # clear_lowest(relaxed load result)
    |$LN76@pop_lowest|          @ LN76
        ldaxr       x8,[x0]
        cmp         x8,x10            # the compare part of CAS
        bne         |$LN77@pop_lowest|
        stlxr       w9,x11,[x0]
        cbnz        w9,|$LN76@pop_lowest|  # retry just the CAS on LL/SC fail; this looks like compare_exchange_strong
         # fall through on LL/SC success
    
    |$LN77@pop_lowest|          @ LN77
        dmb         ish                # full memory barrier: slow
        cmp         x8,x10             # compare again, because apparently MSVC wants to share the `dmb` instruction between the CAS-fail and CAS-success paths.
        beq         |$LN75@pop_lowest| # if(expected matches) goto epilogue
        mov         x10,x8             # else update expected
        b           |$LL2@pop_lowest|  # and goto the top of the while loop
    
    |$LN75@pop_lowest|          @ LN75   # function epilogue
        neg         x8,x10
        and         x0,x8,x10
        ret
    

    对于AARCH64,GCC6.3也会生成奇怪的代码,存储 预期 到堆栈。(Godbolt没有更新的AARCH64 GCC)。

    我对AARCH64编译器非常不满意!我想知道他们为什么不生产这样干净高效的产品 在快速路径上没有执行分支,只有一点行外代码设置为跳转回CAS重试。

    pop_lowest_non_zero ## hand written based on clang
                        # clang could emit this even without turning CAS into LL/SC directly
    
        ldr     x9, [x0]                   @ x9 = expected = foo.load(relaxed)
     .Lcas_retry:
        ldaxr   x8, [x0]                   @ x8 = the CAS load (a=acquire, x=exclusive: pairs with a later stxr)
        cmp     x8, x9                   @ compare part of the CAS
        b.ne    .Lcas_fail
        sub     x10, x9, #1
        and     x10, x10, x9             @ clear_lowest (relaxed load result)
        stlxr   w11, x10, [x0]           @ the CAS store (sequential-release)
        cbnz    w11, .Lllsc_fail
    
        # LL/SC success: isolate_lowest_set and return
    .Lepilogue:
        neg     x8, x9
        and     x0, x9, x8
        ret
    
    .Lcas_fail:
        clrex                           @ clear the ldrx/stxr transaction
    .Lllsc_fail:
        mov     x9, x8                  @ update expected
        b     .Lcas_retry           @ instead of duplicating the loop, jump back to the existing one with x9 = expected
    

    与…比较 .fetch_add :

    Clang确实为 fetch_add() :

        mov     x8, x0
    .LBB1_1:
        ldxr    x0, [x8]                # relaxed exclusive load: I used mo_release
        add     x9, x0, #1
        stlxr   w10, x9, [x8]
        cbnz    w10, .LBB1_1            # retry if LL/SC failed
        ret
    

    而不是 add #1 ,我们想要 sub x9, x8, #1 / and x9, x9, x8 ,所以我们只得到一个ll/sc重试循环。这节省了代码大小,但速度不会显著加快。在大多数情况下,甚至可能不会更快,特别是作为整个程序的一部分,它没有使用一个疯狂的数量。


    备选方案:您甚至需要这个位图操作吗?你能把它分解以减少争用吗?

    你能用原子计数器代替位图吗?需要时能把它映射到位图上吗? ?需要位图的操作可以将计数器映射到具有 uint64_t(~0ULL) << (64-counter) (仅适用于非零计数器)。或者可能 tmp=1ULL << counter; tmp ^= tmp-1; (即x86 xor eax,eax / bts rax, rdi (rax=1设置位,位置0..63)/ blsmsk rax, rax (rax=设置到该位置的所有位)。hmm,对于mask=0仍然需要一个特殊的情况,因为一个连续的位掩码有65种可能的状态:64个位置中的一个位置的最高(或最低)位,或者根本没有设置任何位。

    或者如果位图上有某种图案, x86 BMI2 pdep 可以将连续的位分散到该模式中。获取n个相邻的集合位 (1ULL << counter) - 1 ,对于计数器<64。


    或者它必须是位掩码,但不需要是单个位掩码?

    不知道您的用例是什么,但是这种想法可能有用:

    你…吗 需要 所有线程都必须争用的单个原子位图?也许您可以将它分成多个块,每个块位于单独的缓存行中。(但这使得不可能原子地快照整个映射。)不过,如果每个线程都有一个首选块,并且总是在继续寻找另一个块中的槽之前尝试这样做,那么在一般情况下,您可能会减少争用。

    在ASM(或者用C++中的恐怖联合黑客),你可以尝试减少CMPXCHG失败而不减少争用,通过找到64位整数的正确字节来更新,然后只有 锁定cmpxchg 在上面。这对这种情况实际上没有帮助,因为看到相同整数的两个线程都将尝试清除相同的位。但它可以减少这和在Qword中设置其他位之间的重试。当然,只有在 的正确性问题 锁定cmpxchg 当QWORD的其他字节发生更改时成功。

        2
  •  0
  •   catnip    6 年前

    所以基本上,问题是,我是否更好地循环 compare_exchange_weak 直到我找到/得到我想要的,或者我应该使用 mutex ?好吧,至少我现在理解了这个问题,让我们稍微探讨一下。

    所以,外面的人可能都在尖叫 哪个更快,哪个更快 ?嗯,我个人不太在乎,除非这在实践中是个问题,但是如果 注意,然后你应该做基准测试。你可以得到一阶近似值 Wandbox .

    但还有另一个更微妙的问题:锁定。第一个解决方案是无锁的,但是有一个繁忙的循环。第二个需要一把锁,这会有副作用。

    繁忙的循环可能是相当无害的。它只吸收了一个内核,不太可能长时间运行,但是如果有人怀疑是这样的话,当在现实环境下运行时,人们会想分析代码。

    另一方面,锁可能不会那么无害,因为它会导致 优先级反转 从而导致低优先级线程先于高优先级线程。在以不同优先级运行协作线程的任何应用程序中,这都可能是一个问题,但对我来说尤其如此,因为我编写了实时音频代码。它在Mac上咬了我一口,我就是这样知道这一切的。

    所以,我希望这至少能告诉你一些你想知道的事情。对于你的代表,我不应该试图告诉你如何“写代码”。

    参考文献: https://en.m.wikipedia.org/wiki/Priority_inversion