NSQ简介

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

NSQ 由 3 个守护进程组成: 
nsqd 是接收、保存和传送消息到客户端的守护进程。 
nsqlookupd 是管理的拓扑信息,维护着所有nsqd的状态,并提供了最终一致发现服务的守护进程。 
nsqadmin 是一个 Web UI 来实时监控集群和执行各种管理任务。 

这篇文章介绍主要介绍nsqd的实现。

Topic与Channel

Topic与Channel是NSQ中重要的两个概念。 
生产者将消息写到Topic中,一个Topic下可以有多个Channel,每个Channel都是Topic的完整副本。 
消费者从Channel处订阅消息,如果有多个消费者订阅同一个Channel,Channel中的消息将被传递到一个随机的消费者。

要理解Topic Channel中各种chan的作用,关键是要理解golang中如何在并发环境下如何操作一个结构体(多个goroutine同时操作topic),与C/C++多线程操作同一个结构体时加锁(mutex,rwmutex)不同,go语言中一般是为这个结构体(topic,channel)开启一个主goroutine(messagePump函数),所有对该结构体的改变的操作都应是该主goroutine完成的,也就不存在并发的问题了,其它goroutine如果想要改变这个结构体则应该向结构体提供的chan中发送消息(msgchan)或者通知(exitchan,updatechan),主goroutine会一直监听所有的chan,当有消息或者通知到来时做相应的处理。

数据的持久化

了解数据的持久化之前,我们先来看两个问题? 
1. 往Topic中写入消息就是将消息发送到Topic.memoryMsgChan中,但是memoryMsgChan是一个固定内存大小的内存队列,如果队列满了怎么办呢?会阻塞吗? 
2. 如果消息都存放在memoryMsgChan这个内存队列中,程序退出了消息就全部丢失了吗?

NSQ是如何解决的呢,nsq在创建Topic、Channel的时候都会创建一个DiskQueue,DiskQueue负责向磁盘文件中写入消息、从磁盘文件中读取消息,是NSQ实现数据持久化的最重要结构。 
以Topic为例,如果向Topic.memoryMsgChan写入消息但是memoryMsgChan已满时,nsq会将消息写到topic.DiskQueue中,DiskQueue会负责将消息内存同步到磁盘上。 
如果从Topic.memoryMsgChan中读取消息时,但是memoryMsgChan并没有消息时,就从topic.DiskQueue中取出同步到磁盘文件中的消息。

我们看到topic.backend(diskQueue)负责将消息写到磁盘并从磁盘中读取消息,diskQueue提供了两个chan供外部使用:readChan与writeChan。 
我们来看下diskQueue实现中的几个要点。

  1. diskQueue在创建时会开启一个goroutine,从磁盘文件中读取消息写到readChan中,外部goroutine可以从readChan中获取消息;随时监听writeChan,当有消息时从wirtechan中取出消息,写到本地磁盘文件。
  2. diskQueue既要提供文件的读服务又要提供文件的写服务,所以要记录下文件的读位置(readIndex),写位置(writeIndex)。每次从文件中读取消息时使用file.Seek(readindex)定位到文件读位置然后读取消息信息,每次往文件中写入消息时都要file.Seek(writeIndex)定位到写位置再将消息写入。
  3. readIndex,writeIndex很重要,程序退出时要将这些信息(meta data)写到另外的磁盘文件(元信息文件)中,程序启动时首先读取元信息文件,在根据元信息文件中的readIndex writeIndex操作存储信息的文件。
  4. 由于操作系统层也有缓存,调用file.Write()写入的信息,也可能只是存在缓存中并没有同步到磁盘,需要显示调用file.sync()才可以强制要求操作系统把缓存同步到磁盘。可以通过指定创建diskQueue时传入的syncEvery,syncTimeout来控制调用file.sync()的频率。syncTimeout是指每隔syncTimeout秒调用一次file.sync(),syncEvery是指每当写入syncEvery个消息后调用一次file.sync()。这两个参数都可以在启动nsqd程序时通过命令行指定。

网络架构

nsq是一个可靠的、高性能的服务端网络程序,通过阅读nsqd的源码来学习如何搭建一个可靠的网络服务端程序。

客户端已成功的与服务器建立链接了,每一个客户端建立连接后,nsqd都会创建一个Client接口体,该结构体内保存一些client的状态信息。 
每一个Client都会有两个goroutine,一个goroutine负责读取客户端主动发送的各种命令,解析命令,处理命令并将处理结果回复给客户端。 
另一个goutine负责定时发送心跳信息给客户端,如果客户端订阅某个channel的话则将channel中的将消息通过网络发送给客户端。

如果服务端不需要主动推送大量消息给客户端,一个连接只需要开一个goroutine处理请求并发送回复就可以了,这是最简单的方式。开启两个goroutine操作同一个conn的话就需要注意加锁了。

我们来看下NSQ中几个比较重要的命令:

  • NOP 心跳回复,没有实际意义
  • PUB 发布一个消息到 话题(topic) 
    PUB <topic_name>\n [ 四字节消息的大小 ][ 消息的内容 ]
  • SUB 订阅话题(topic) /通道(channel) 
    SUB <topic_name> <channel_name>\n
  • RDY 更新 RDY 状态 (表示客户端已经准备好接收N 消息) 
    RDY <count>\n
  • FIN 完成一个消息 (表示成功处理) 
    FIN <message_id>\n

生产者产生消息的过程比较简单,就是一个PUB命令,先读取四字节的消息大小,然后根据消息大小读取消息内容,然后将内容写到topic.MessageChan中。 
我们重点来看下消费者是如何从nsq中读取消息的。 
1. 消费者首先需要发送SUB命令,告诉nsqd它想订阅哪个Channel,然后nsqd将该Client与Channel建立对应关系。 
2. 消费者发送RDY命令,告诉服务端它以准备好接受count个消息,服务端则向消费者发送count个消息,如果消费者想继续接受消息就需要不断发送RDY命令告诉服务端自己准备好接受消息(类似TCP协议中滑动窗口的概念,消费者并不是按照顺序一个个的消费消息,NSQD最多可以同时count个消息给消费者,每推送给消费者一个消息count数目减一,当消费者处理完消息回复FIN指令时count+1)。


最新文章

  1. 记一次.NET代码重构
  2. eclipse 启动到loading workbench... 自动关闭
  3. xpath爬取网页评论,网址的的调用方法,数据库特殊字符的替换
  4. git的简单使用
  5. JBPM的引擎内核学习
  6. 深入浅出Spring(五) SpringMVC
  7. vijosP1359 Superprime
  8. 深入理解BFC和Margin Collapse
  9. c语言typedef与define的相同
  10. java模式之模板模式——抽象类
  11. 【NPR】铅笔画
  12. 有关mysql索引
  13. Elasticsearch -- 索引管理
  14. 20165237 预备作业3 Linux安装及学习
  15. Git在eclipse中的使用,克隆导入eclipse项目
  16. 使用nginx搭建rtmp服务器
  17. Python3+pyshark捕获数据包并保存为文件
  18. S5PV210初始化系统时钟
  19. 阿里云oss上传文件如何支持https?
  20. Codeforces Beta Round #14 (Div. 2) D. Two Paths 树形dp

热门文章

  1. 深入浅出web服务器与python应用程序之间的联系
  2. 重定向和servlet生命周期
  3. java向上转型和向下转型1
  4. Day4_迭代器
  5. Django error信息邮件通知功能配置部署
  6. 初探Apache Beam
  7. Java 面试知识点解析(一)——基础知识篇
  8. Java枚举enum以及应用:枚举实现单例模式
  9. CentOS 6 安装HBase集群教程
  10. 深入NGINX:nginx高性能的实现原理