1:nsq的流量控制 RDY

  • 消息中间件的实现无非两种套路,一种让客户端pull,典型的比如kafka便是如此,而另一种则是push,也就是让客户端不需要做任何操作,只需要做好conn便可以源源不断收到服务端的推送,典型的代表就是我们今天介绍的nsq。

    pull的优势在于客户端可以自己做流控,比如客户端想什么时候pull就什么时候pull,不会因为服务端的强迫而接受,但劣势也很明显,如果服务端的生产速度很慢,客户端需要不断的轮询会让cpu处于繁忙且无用的状态。

    push的优势则在于能够不受限于客户端的速度,可以让服务端更快的、批量的把数据push给客户端,因此大部分push实现的消息中间件都是属于内存型,而nsq比较特殊,它实际上是内存+磁盘的一个消息中间件。

  • 上面也说了,pull流的优势在于可以让客户端自由控制消息的速度,但是push流不一样,push流不管客户端是否多繁忙都会推送消息,如果没有一个流控机制,很容易让客户端最终因为消费速度跟不上导致产生各种性能问题。nsq其实也考虑到这一点,于是采用了一个RDY的状态字段来表示流控。简单来说,就是客户端连接上nsqd之后,会告诉nsqd它的可接受的消息数量是多少,每当nsqd给客户端推送一条消息这个RDY就会减一,而客户端消费完毕并且发送一个FIN之后,这个RDY又会加一(其实这个设计有点类似tcp中的用来控制流量的窗口机制)。

	// ConnectToNSQD 部分代码
// pre-emptive signal to existing connections to lower their RDY count
r.connections[addr] = conn for _, c := range r.conns() {
r.maybeUpdateRDY(c)
}
  • 可以看到 go-nsq 的Consumer 结构体中有一个字段 connections,它是一个 map,key是nsqd addr,value是连接成功的Conn。
  • 上面的代码表示会遍历这个Customer的所有 nsqd conn(customer可以同时连接多个nsqd),然后调用 maybeUpdateRDY 方法。
func (r *Consumer) maybeUpdateRDY(conn *Conn) {
inBackoff := r.inBackoff()
inBackoffTimeout := r.inBackoffTimeout()
if inBackoff || inBackoffTimeout {
r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
conn, inBackoff, inBackoffTimeout)
return
} count := r.perConnMaxInFlight()
r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
r.updateRDY(conn, count)
}
  • 计算平均每一个连接的最大 InFlight 数量。
func (r *Consumer) updateRDY(c *Conn, count int64) error {
if c.IsClosing() {
return ErrClosing
} // never exceed the nsqd's configured max RDY count
if count > c.MaxRDY() {
count = c.MaxRDY()
} // stop any pending retry of an old RDY update
r.rdyRetryMtx.Lock()
if timer, ok := r.rdyRetryTimers[c.String()]; ok {
timer.Stop()
delete(r.rdyRetryTimers, c.String())
}
r.rdyRetryMtx.Unlock() // never exceed our global max in flight. truncate if possible.
// this could help a new connection get partial max-in-flight
rdyCount := c.RDY()
maxPossibleRdy := int64(r.getMaxInFlight()) - atomic.LoadInt64(&r.totalRdyCount) + rdyCount
if maxPossibleRdy > 0 && maxPossibleRdy < count {
count = maxPossibleRdy
}
if maxPossibleRdy <= 0 && count > 0 {
if rdyCount == 0 {
// we wanted to exit a zero RDY count but we couldn't send it...
// in order to prevent eternal starvation we reschedule this attempt
// (if any other RDY update succeeds this timer will be stopped)
r.rdyRetryMtx.Lock()
r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
func() {
r.updateRDY(c, count)
})
r.rdyRetryMtx.Unlock()
}
return ErrOverMaxInFlight
} return r.sendRDY(c, count)
}
  • 此处主要是对 rdy计数count 进行了预处理。
func (r *Consumer) sendRDY(c *Conn, count int64) error {
if count == 0 && c.LastRDY() == 0 {
// no need to send. It's already that RDY count
return nil
} atomic.AddInt64(&r.totalRdyCount, count-c.RDY())
c.SetRDY(count)
err := c.WriteCommand(Ready(int(count)))
if err != nil {
r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err)
return err
}
return nil
}
  • 组装成 RDY 命令,通过当前连接的 WriteCommand方法告诉给 nsqd。
// NewConsumer
func (r *Consumer) rdyLoop() {
redistributeTicker := time.NewTicker(r.config.RDYRedistributeInterval) for {
select {
case <-redistributeTicker.C:
r.redistributeRDY()
case <-r.exitChan:
goto exit
}
} exit:
redistributeTicker.Stop()
r.log(LogLevelInfo, "rdyLoop exiting")
r.wg.Done()
}
  • 在连接成功之后,也会单独开启一个 goroutine 在后台不断去调整这个rdycount

2.nsq消息传输的可靠性与持久化

消息传输的可靠性

思考下面的问题:

  • 网络传输的不确定性,比如超时。
  • 客户端处理消息时崩溃,消息如何重传。
  • 如何标识消息被客户端成功处理完毕。
  • 消息的持久化,nsq服务端重新启动时消息不丢失。
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool)
  • 客户端处理消息的逻辑:

    • 在上述函数中,服务端会定时检查client端的连接状态,读取客户端发过来的各种命令,发送心跳等。
    • 每一个连接最终的目的就是监听channel的消息,发送给客户端进行消费。
    • 在发送给客户端之前,把这个消息设置为在飞翔中。
    • 对发送给客户端信息设置为在飞翔中,如果在如果处理成功就把这个消息从飞翔中的状态中去掉,如果在规定的时间内没有收到客户端的反馈,则认为这个消息超时,然后重新归队。
func (n *NSQD) queueScanLoop()
  • 服务端处理消息的逻辑:

    • 使用协程定时去扫描随机的channel里是否有过期数据。
    • 在扫描channel的时候,如果发现有过期数据后,会重新放回到队列,进行重发操作。

客户端对消息的处理和响应

func (r *Consumer) handlerLoop(handler Handler)
  • 在服务端发送消息给客户端后,如果在处理业务逻辑时,如果发生错误则给服务器发送Requeue命令告诉服务器,重新发送消息进处理。
  • 如果处理成功,则发送Finish命令。
  • 服务端收到命令后,对飞翔中的消息进行处理,如果成功则去掉,如果是Requeue则执行归队和重发操作,或者进行defer队列处理。

消息的持久化

  • 虽然系统支持消息持久化存储在磁盘中(通过 --mem-queue-size ),不过默认情况下消息都在内存中。
  • 如果将 --mem-queue-size(一个channel的容量) 设置为 0,所有的消息将会存储到磁盘。我们不用担心消息会丢失,nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。
  • NSQ 没有内置的复制机制,却有各种各样的方法管理这种权衡,比如部署拓扑结构和技术,在容错的时候从属并持久化内容到磁盘。
  • diskqueue 是nsq自己实现的一个先进先出的消息文件队列,go-diskqueue是把消息爆出到本地文件内。

3.消息的负载处理

​ 实际应用中,一部分服务集群可能会同时订阅同一个topic,并且处于同一个channel下。当nsqd有消息需要发送给订阅客户端去处理时,发给哪个客户端是需要考虑的,也就是消息的负载。

​ 如果不考虑负载情况,把随机的把消息发送到某一个客服端去处理消息,如果机器的性能不同,可能发生的情况就是某一个或几个客户端处理速度慢,但还有大量新的消息需要处理,其他的客户端处于空闲状态。理想的状态是,找到当前相对空闲的客户端去处理消息。

  • nsq的处理方式是客户端主动向nsqd报告自已的可处理消息数量(也就是RDY命令)。nsqd根据每个连接的客户端的可处理消息的状态来随机把消息发送到可用的客户端,来进行消息处理。
  • MaxInFlight 用来设置最大的处理中的消息数量,会根据这个变量计算是否需要更新RDY
  • 初始化的时候 客户端会向连接的nsqd服务端来发送updateRDY来设置最大处理数。
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddInt64(&r.totalRdyCount, -1)
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
r.maybeUpdateRDY(c)
}
  • 当有消息从nsqd发送过来后也会调用maybeUpdateRDY方法,计算是否需要发送RDY命令。

  • 上面就是主要的处理逻辑,但还有一些逻辑,就是当消息处理发生错误时,nsq有自己的退避算法backoff也会更新RDY 简单来说就是当发现有处理错误时,来进行重试和指数退避,在退避期间RDY会为0,重试时会先放尝试RDY为1看有没有错误,如果没有错误则全部放开,具体处理逻辑这里就不细说了。

参考资料:

https://studygolang.com/articles/21065?fr=sidebar

https://www.cnblogs.com/li-peng/p/11868123.html

https://github.com/nsqio/go-diskqueue

最新文章

  1. 4.修改更新源sources.list,提高软件下载安装速度(提供Kali 2.0 更新源)
  2. 【转】WPF 给DataGridTextColumn统一加上ToolTip
  3. 智能车学习(十三)&mdash;&mdash;角度控制
  4. Caffe 源碼閱讀(一) Blob.hpp
  5. final和static
  6. dict内部方法
  7. DBA_Oracle Erp升级时如何确定具体的Patch ID(案例)
  8. js 对象与函数的区别
  9. Unity3D的几种坐标系
  10. Spark0.8.0的安装配置
  11. 常用webservice接口案例
  12. cocos2dx新建工程分析
  13. asp.net core mvc剖析:mvc执行过程(一)
  14. python+selenium自动化软件测试(第11章):持续集成jenkins和GitHub的使用
  15. 在ssm框架中前后台数据交互均使用json格式
  16. 开源项目-网上公开http代理爬取、简单分类
  17. 【Python】 上下文管理器和contextlib
  18. Flutter去除右上角Debug标签
  19. U3D一些使用
  20. [学习笔记]JS 数组Array push相关问题

热门文章

  1. Shapefile导入Oracle
  2. Vue学习笔记之Vue-Router
  3. No.2.4
  4. Vulnhub:maskcrafter-1.1靶机
  5. AWK nr nf
  6. 【Direct3D 12】配置编译环境
  7. Docker之Elastic Search&amp;Kibana保姆级别安装
  8. HTML网址集合
  9. ubuntu 安装SVN
  10. Docker私服(Registry)