<返回更多

等待的艺术:从Spin Lock到Token Bucket算法

2022-10-28  CSDN  码洞
加入收藏

前言

CPU (Central Processing Unit)作为整个冯·诺依曼架构的控制与运算中心,终其一生都在执行没有边界的指令,用无差别的计算支撑起智能时代“算力取之不尽用之不竭”的梦。
但这样的计算并不是100%有意义的:糟糕的算法设计造成了大量的重复计算;忽视局部性与连续性的代码用cache miss粗暴地蹂躏着多级缓存,甚至触发频繁的cpu stall;低效的调度和密集的资源竞争拉低了程序的整体运行效率与吞吐...etc
因此在CS的不同领域,不管是kernel,语言运行时,网络,存储...都结合了各自的场景,催生出了无数的策略以最小化“等待”的计算量,让宝贵的算力尽可能服务于真正有价值的指令。本文罗列的的例子也许并不是那么Apple to apple,但它们都从不同的视角给我启发,且有一定的共通。

出于篇幅考虑文章内容可能不尽详细,默认读者对操作系统,网络,concurrency control等前置知识有一定程度的了解。

Spin Lock

在多核处理器的并发编程中,线程/进程(后统一用线程)间的同步原语是永远都绕不开的,spin lock可以说是其中最简单直接的一种实现。说白了spin lock只是一个忙等待的循环,看似低效,但在kernel和各种高性能库中却应用广泛。
歪?并发的资源竞争中,保护临界区的方法无非两种:1. 抢占并持续获得线程所有权;2. 依赖内核的线程调度机制。但内核调度的隐性开销是content switching,在竞争密集时这样的开销会被放大,代价十分可观,但如果临界区长度短小可控,spin的次数也可控,那么显然spin lock会成为一个更优的选择。

// 一个最简单的spin lock实现
std::atomic<bool> lock_{false};
void lock() {
    while(true) {
        while (lock_.load());
        if (!lock_.exchange(true)) break;
    }
}
void unlock() {
    lock_.store(false);
}

但聪明的CPU与编译器并不会直译代码,为了指令最大程度的并行执行通常会对代码进行一定程度的打乱和重排序,你亲手敲下的代码的行为可能会与预期有出入,而约束指令乱序行为依赖于不同等级的memory-order内存序模型。在主流的x86平台中,默认的memory order约束实现为SC(Sequential Consistency),是一种较强的内存序保证。Wiki上的官方定义为:

...the result of any execution is the same as if the operations of all the processors were executed in some sequential order, and the operations of each individual processor appear in this sequence in the order specified by its program...

简而言之,还是会有一定的乱序和重排,但是在程序员的可见视角,最终的执行顺序与代码定义的提交顺序是一致的。上述实现中使用了c++标准库封装的atomic接口,默认的memory order同样也是SC。C++提供了6种标准的memory order实现,我们可以在保证临界区安全性的前提下,通过放松内存序来进一步提升spin lock的性能。


保证临界区安全性前提是:临界区内的代码必须严格遵循sequential consistency。
对照6种内存序列,我们把目光聚焦在通常成对使用的acquire-release模型上:acquire-release模型能够保证原子变量一端store,另一端永远能够load到最新的值,其隐含的意思是:release之前的store()操作,对其他线程acquire之后的任何load()操作都可见!同时,acquire前的代码可以往后乱序(但acquire后的代码不允许往前乱序),release后的代码可以往前乱序(但release前的代码不允许往后乱序)!应用到spin lock中,意味着aquire-release保护的临界区可以在多个线程之间保证SC-level的可见性。

在临界区安全的前提下扩大了乱序执行的区间,通过细粒度的memory order配置进一步提升了并发度,缩短spin等待时间。

// acquire-release模型的spin lock
std::atomic<bool> lock_{false};
void lock() {
    while(true) {
        while (lock_.load(std::memory_order_relaxed));
        if (!lock_.exchange(true, std::memory_order_acquire)) break;
    }
}
void unlock() {
    lock_.store(false, std::memory_order_release);
}

x86平台中,为了进一步压榨spin lock的性能,实现了一种平台独占的pause指令:不同于thread::yield(),pause并没有放弃线程使用权,而是告诉processor这是一个spin-lock,可以把一些指令优化暂时关闭,稍稍减慢线程的执行速度,带来的收益是:减少了锁变量的竞争,免去了潜在的memory order检测,以及保护了当前的cache line不被其他waiter清空,优化的效果显著。通常的使用姿势如下:

// acquire-release模型+pause指令的spin lock
#define PAUSE() asm("pausen")
std::atomic<bool> lock_{false};
void lock() {
    while(true) {
        while (lock_.load(std::memory_order_relaxed)) {
            PAUSE();
        }
        if (!lock_.exchange(true, std::memory_order_acquire)) break;
    }
}
void unlock() {
    lock_.store(false, std::memory_order_release);
}

作为高频使用的利器,spin lock的优化并未止步于此,诸如以下。

MCS spin lock

MCS spin lock是学术界提出的一种spin lock改进版本,已被最新的linux kernel采纳,目的是减少lock在不同cpu core之间的迁移,每个core都持有一个本地标志变量,每个core上的线程只在本地变量自旋,通过一个全局的链表将这些waiters串联起来。申请MCS spin lock时会得到前置锁拥有者的锁结构体指针(其中包含各自的waiting本地标识变量),并把next指针指向自己,在原子操作的保证下获得了一个FIFO的申请顺序。当一个线程执行完临界区的代码后,判断next指针域是否还有别的waiter,如果有的话就按的顺序把它的waiting置0,交出锁的使用权。以这种巧妙的方法实现了锁的传递,大幅减少锁的竞争和跨cpu core传递。

NUMA Aware spin lock

当前主流服务器支持的NUMA(Non-Uniform Memory Access) 架构下,每个cpu socket都被分配了自己的内存空间,极大降低了local socket的内存访问延迟,如果需要访问的数据在其他cpu socket(NUMA node),则需要依赖特殊的remote access协议。在这样的场景下,可以引入分布式一致性机制来减少cache line在不同cpu socket之间的迁移,详见参考资料。
(底层的世界就是分布式系统的微观表现!)

Mutex per core

当线程数>cpu核数时,在spin和CFS调度的共同作用下,大量线程持续不断在争夺锁资源,这时为每个cpu core引入一个mutex,使得每个core上的某个时刻只允许有一个线程参与竞争,大幅减少全局的锁竞争程度。

Golang Runtime

Golang作为云计算时代的"C"语言(见仁见智了),也是笔者目前的主力语言,其重要特色之一是其内置的用户态协程机制——goroutine,在runtime的优秀设计与调度下,goroutine真正实现了高效运行而无需过多关心栈空间、上下文切换、并发上限等问题,just run as you go!
Golang遵循M-P-G的并发模型,将用户态的最小执行单元goroutine映射到内核态的OS线程中,呈M:N的映射关系:

 

M-P-G模型

Runtime的工作,就是基于以上的模型定义让每一个Machine充分负载,无间断地执行用户代码,做一个没有感情的指令执行机。顺利的话,runtime的工作流如下:

  1. P从其维护的本地可运行队列(LRQ)里取出G来执行。为保证公平,也有一定几率(1/61)会从全局可运行队列中(GRQ)取G。
  2. P的LRQ满了(运行队列的本质是一个ring buffer),就把新到达的G放入GRQ,把负载转移给其他相对空闲的P。
  3. P的LRQ空,P就尝试去GRQ中偷任务,把其中的一半G转移到自己的LRQ中。
  4. 如果LRQ与GRQ都为空,则P从随机选择一个其他P,从其LRQ里偷一半的G来执行。
  5. 还有一个单独的daemon goroutine,负责监控其他goroutine的执行时间,把运行时长>10ms的goroutine调度到GRQ中,让短而小的goroutine优先执行并结束。

 

goroutine主要状态迁移

但以上工作流,还不足以做到“无间断地执行用户代码”。Goroutine和POSIX线程一样,主要在3种状态之间转移:Waiting/Runnable/Running,注意LRQ和GRQ存储的是状态为可运行(Runnable)的G,P拿到G后,将它的状态置为Running并分配给M来真正执行代码。但当陷入系统调用(syscall)或同步/异步等待IO时,goroutine会阻塞并进入Waiting状态,这时runtime会有其他机制来保证计算资源被充分利用:

  1. 当陷入系统调用时,或同步等待IO时,当前M被阻塞,M与P暂时解绑但M-G保持关联关系,P会创建一个新的M,即向OS申请一个新的线程,把新到达的G继续分配给新M执行。等阻塞解除,原先的M会被保存起来静待后续复用。
  2. 当陷入异步等待IO,如网络IO时,阻塞的G会被一个成.NETwork poller的结构接手,而M则继续服务别的G,异步IO相关的文件描述符(fd)就绪后,阻塞的G重新进入LRQ。在Linux平台,network poller实际上是对epoll的封装,注册阻塞的fd并监听到达的事件,就绪的fd所在的G就会被重新调度,回到原来的LRQ等待被执行。

至此,基本实现了golang runtime的高效调度模型,实际上用户态的协程切换除了上下文切换的代价小,还有其他诸如CPU缓存友好等隐性收益。而很多内置的语言特性及标准库实现都基于这个模型,:

浅析channel

把channel单独拎出来说说,它是golang最为推崇的并发通信哲学的实现:

Don't communicate by sharing memory; share memory by communicating.

扒一下channel的定义:

// channel runtime struct
type hchan struct {
    // 过滤部分重点成员变量
    qcount   uint
    dataqsiz uint
    buf      unsafe.Pointer
    sendx    uint  
    recvx    uint
    recvq    waitq
    sendq    waitq
    // ...
}

// waitq代表channel收/发的两个队列
// sudog是对goroutine结构的封装,channel持有相关等待goroutine的引用,
// sudog中也同样持有对channel结构的指针
type waitq struct {
    first *sudog
    last  *sudog
}

可以看到实现跨线程消息收发,主要依赖了channel结构中的ring buffer和用waitq表示的收/发两个goroutine队列。
channel也分为unbuffered chan和buffered chan,通过初始化时是否制定了capacity来区分,unbuffered chan是标准的同步阻塞IO模型,而buffered chan则给予了一定限度的异步非阻塞空间。

 

goroutine channel

下面从一些关键的代码片段来看看这两种chan在runtime的实现中具体有什么区别:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    // ...

    // 已有chan在别的goroutine中等待接受,可以直接发送并返回
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // ...

    // 当前为buffered chan,且buffer还有富余,直接将变量拷贝到buffer,
    // 更新相关标记位后直接可以返回
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // ...

    // 最后剩下阻塞发送的逻辑,既不是buffered chan且其他goroutine中暂无
    // 就绪的接收方,或者buffered chan已满,都会执行阻塞发送,以下代码仍
    // 有所精简
    if !block {
        unlock(&c.lock)
        return false
    }

    // 主要的动作是获取当前goroutine,和channel一起与sudog绑定并设置到
    // channel的发送队列中,同时通过goparkunlock函数让出当前的goroutine
    // 执行权,等待接收方的唤醒

    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

    // ...
    // 被接收方唤醒后,该goroutine重新获得执行权,继续扫尾工作
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

以上是channel发送端的主要逻辑,在一段逻辑中区分开了3种case,比较有意思的是函数并不是一次性执行完的,发送前的准备工作就绪后即让出当前的执行权,等待接收方唤醒后进行扫尾工作,函数正常退出。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    // 从nil chan中试图接收,直接让出执行权
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // ...

    // chan已被关闭,且buffer中没有残留元素,清除相关堆栈信息后返回
    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // 对于同步阻塞的chan发送方,执行recv,接收并拷贝对应的变量值,然后通过goready
    // 函数唤醒发送方的goroutine
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 对于buffered chan,从buffer的对应索引处取出元素并消费后直接返回,这里不需要
    // 调用goready,因为对于成功发送的发送方本就是一个异步操作,无需唤醒
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    // 最后落到了和sendchan类似的逻辑,接收方等待接收但当前没有可用的发送方,于是陷入
    // 阻塞等待,获取当前goroutine,连同channel一起与sudog绑定,设置到channel的
    // 接受队列中,并让出执行权
    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // ...

    // 被发送方唤醒后,该goroutine重新获得执行权,继续扫尾工作
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

以上是channel接收端的主要逻辑,细分了5个case,大致的工作流与发送端类似。已经做了精简但两段代码看着还是比较长,不过好在逻辑清晰,应该很容易能看懂。
为了在channel与goroutine之间建立联系,runtime封装了sudog结构,引用了当前阻塞的channel与goroutine,并追加到发送/接收队列中,经过调度获得执行权后又可以解引用来唤醒对端执行数据的收与发,读与写,巧妙地实现了并发通信机制。限于篇幅,还是有很多细节没有铺开,感兴趣的读者可以从参考资料中获取更多信息。

值得一提的是,经过不断的版本迭代,go runtime的STW(Stop The World)时间已被控制的十分优秀,这也是"优化等待"的一个重要主题。

Token Bucket

网络也是计算机科学中的宏大主题,是所谓“万物互联”的根基,而网络的可用性基本决定了一个复杂分布式系统可用性的下限。出于全局可用性的考虑,诞生了各种各样的控制算法,如我们非常熟悉的TCP/IP协议中的基于滑动窗口的的流量控制算法,基于慢开始、快速重传、快速恢复等机制的拥塞控制算法。其中在流量整形(Traffic Shaping)与限流(Rate Limiting)中,Token Bucket算法被广泛应用。
例如:在kubernetes非常普遍的controller pattern中,通常会持有一个RateLimitingQueue将其监听到的events先入队,然后由多个goroutine去消费events,而events出队的速率就是由Token Bucket算法保证的。除此之外,在多租户网络限流,路由限速,全局流控等方面,TokenBucket都有妙用,通过局部的限流整形措施,来达到全局的更优。
朴素的TokenBucket算法简单直观:

 

Token Bucket

当然在具体的实现上,间隔interval补充token并不意味着真的要有一个工作线程去定时触发,可以记录相关的tick offset(而不是直接时间戳,tick才是时间维度的最小单位),把补充token的动作延迟到消费token,通过对比时间差去动态的调整token数量。


bucket的大小依据一般是突发流量最长可容忍时长*每一帧数据包的平均大小,超过限流阈值的流量就会因为token不足而触发管制措施:丢包或阻塞等待,至于阻塞等待的时长则会根据缺少的token数量换算出来,并重置bucket。
但网络是个大黑盒,你不会知道下一秒到达的数据包payload有多大,也无法100%准确预测流量的峰值速率,这时候如果只是粗暴地丢包或者等待,对稳定性或性能都存在潜在的风险。于是在朴素TokenBucket算法的基础上,RFC扩展了对于数据包处理的行为:单速率三色标记算法(single rate threecolor marker,srTCM)和双速率三色标记算法(two rate threecolor marker,trTCM)。两者都将数据包分为红/黄/绿三种颜色等级,不同颜色对应不同的处理行为。

单速率三色标记算法

srTCM引入了一个新bucket:Excess Bucket,简称E桶,原先正常使用的bucket为Committed Bucket,简称C桶,双桶共同协作。单速率指的是这两个桶的信息速率相同,即有相同的interval去补充token,每次补充的数量也相同,补充的顺序为先C后E。但两桶的容量可以不相同,E桶的大小即允许的最大超额大小,称为Te,C桶大小为Tc,Te > Tc。
正常情况下并不会用到E桶,只有C桶中未消费完的token,以及C桶满了以后新到达的token会被移交入E桶作备用。当有突发的大尺寸数据包到达,且C桶内的token不足时,就会消费E桶内的token而不会触发限流。
若TokenBucket用于发送端的限流,三色标记就派上用场了,红色代表丢弃数据包,黄色代表阻塞等待,绿色代表合法。同时还有色盲(color-blind)与感色(color-aware)两种工作模式:

单速率三色标记算法更关注处理突发的数据包尺寸。

双速率三色标记算法

trTCM与srTCM的区别主要就在于双桶的信息速率可以不同,即interval与每次补充的token量都可以不同。双速率三色标记算法更注重突发的数据速率。
在trTCM中,第二个桶被称为P(Peak)桶,与srTCM不同,两个桶的token是互相隔离的,C桶中的token并不会转移到P桶。另一个重要不同是每一次都会先比较是否为突发速率的流量,再判断是否为正常速率,因为数据速率并不是对单个数据包的测量。trTCM同样有色盲与感色模式:

关于TokenBucket的实现,推荐参考juju/ratelimit和folly/tokenbucket。

总结

与“等待”博弈的过程就是与性能,全局最优,熵增博弈的过程,不同领域既有自成一套的方法论,也有数不尽的共通之处,最终说白了就是如何用有限的时间片去创造更大的价值。

原文链接:
https://mp.weixin.qq.com/s/c0l-6hcXFACHtS6U57LPqQ

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>