概述

相较于Redis4.0,Redis5.0增加了很多新的特性,而streams是其中最重要的特性之一。streams是redis 的一种基本数据结构,它是一个新的强大的支持多播的可持久化的消息队列,在设计上借鉴了kafaka。streams的数据类型本身非常简单,有点类似于hash结构,但是它的额外特性异常强大且复杂:

  • 支持持久化。streams能持久化存储数据,不同于 pub/sub 机制和 list  消息被消费后就会被删除,streams消费过的数据会被持久化的保存在历史中。
  • 支持多播。 这一点跟  pub/sub 有些类似。
  • 支持消费者组。streams 允许同一消费组内的消费者竞争消息,并提供了一系列机制允许消费者查看自己的历史消费消息。并允许监控streams的消费者组信息,消费者组内消费者信息,也可以监控streams内消息的状态。

基础内容

数据 ID

streams 提供了默认的id模式用来唯一标识streams中的每一条数据,由两部分组成:
 <millisecondsTime>-<sequenceNumber> 
millisecondsTime是redis服务所在机器的时间,sequenceNumber用于同一毫秒创建的数据。需要注意的一点是streams的id总是单调增长的,即使redis服务所在的服务器时间异常。如果当前的毫秒数小于以前的毫秒数,就会使用历史记录中最大的毫秒数,然后序列号递增。而这样做的原因是因为streams的机制允许根据时间区间或者某一个时间节点或者某一id查找数据。

向streams插入数据

streams 的基础写命令为 XADD ,其语法为 XADD key ID field value [field value ...]

127.0.0.1:6379> XADD mystream * name dwj age 18
"1574925508730-0"
127.0.0.1:6379>

上面的例子使用 XADD 向名为 mystream 的streams中添加了一条数据,ID使用*表示使用streams使用默认的ID,在本例中redis返回的 1574925508730-0 就是redis为我们插入的数据生成的ID。

另外streams 查看streams长度的命令为 XLEN

127.0.0.1:6379> XLEN mystream
(integer) 3
127.0.0.1:6379>

从streams中读取数据

从streams中读取数据会比写数据复杂很多,用日志文件进行对比,我们可以查看历史日志,可以根据范围查询日志,我们可以通过unix的命令 tail -f 来监听日志,可以多个用户查看到同一份日志,也可以多个用户只能查看到自己有权限查看的那一部分日志。

按范围查询: XRANGE 和 XREVRANGE

首先来介绍一下 根据范围查询,这两种操作都比较简单,以 XRANGE 为例,它的语法格式为 XRANGE key start end [COUNT count] , 我们只需要提供两个id, start 和 end ,返回的将是一个包含 start 和 end 的闭区间。两个特殊的ID - 和 + 分别表示可能的最小ID和最大ID。

127.0.0.1:6379> XRANGE mystream - +
1) 1) "1574835253335-0"
2) 1) "name"
2) "bob"
3) "age"
4) "23"
2) 1) "1574925508730-0"
2) 1) "name"
2) "dwj"
3) "age"
4) "18"
127.0.0.1:6379>

  

我们前边提到过数据id中包含了创建数据的时间信息,这意味着我们可以根据时间范围查询数据,为了根据时间范围查询,我们省略掉ID的序列号部分,如果省略,对于start ID会使用0作为默认的序列号,对于end ID会使用最大序列号作为默认值,这样的话我们使用两个unix时间戳去查询数据就可以得到那个时间区间内所有的数据。

) ) "1574835253335-0"
) ) "name"
) "bob"
) "age"
) ""
127.0.0.1:>

可能还会有同学注意到语法的最后边还有 count 参数,这个参数允许我们一次只返回固定数量的数据,然后根据返回数据的last_id,作为下一次查询的start,这样就允许我们在一个量非常大的streams里批量返回数据。
XREVRANGE命令与XRANGE相同,但是以相反的顺序返回元素,就不重复介绍了。

通过XREAD读取数据

XREAD允许我们从某一结点开始从streams中读取数据,它的语法为 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] ,我们在这里主要将的是通过 XREAD 来订阅到达streams新的数据。这种操作可能跟REDIS中原有的 pub/sub 机制或者 阻塞队列 的概念有些类似,都是等待一个key然后获取到新的数据,但是跟这两种有着本质的差别:

  • streams跟 pub/sub 和 阻塞队列 允许多个客户端一起等待数据,默认情况下,streams会把消息推送给所有等待streams数据的客户端,这个能力跟 pub/sub 有点类似,但是streams也允许把消息通过竞争机制推送给其中的一个客户端(这种模式需要用到消费者组的概念,会在后边讲到)。
  • pub/sub 的消息是fire and forget并且从不存储,你只可以订阅到在你订阅时间之后产生的消息,并且消息只会推送给客户端一次,不能查看历史记录。以及使用 阻塞队列 时,当客户端收到消息时,这个元素会从队列中弹出,换句话说,不能查看某个消费者消费消息的历史。而在streams中所有的消息会被无限期的加入到streams中(消息可以被显式的删除并且存在淘汰机制),客户端需要记住收到的最后一条消息,用于获取到节点之后的新消息。
  • Streams 消费者组提供了一种Pub/Sub或者阻塞列表都不能实现的控制级别,同一个Stream不同的群组,显式地确认已经处理的项目,检查待处理的项目的能力,申明未处理的消息,以及每个消费者拥有连贯历史可见性,单个客户端只能查看自己过去的消息历史记录。
    从streams中读取数据
    127.0.0.1:> XREAD COUNT  STREAMS mystream
    ) ) "mystream"
    ) ) ) "1574835253335-0"
    ) ) "name"
    ) "bob"
    ) "age"
    ) ""
    ) ) "1574925508730-0"
    ) ) "name"
    ) "dwj"
    ) "age"
    ) ""
    127.0.0.1:>

    同list结构一样,streams也提供了阻塞读取的命令

    XREAD BLOCK 0 STREAMS mystream

    在上边的命令中指定了BLOCK选项,超时时间为0毫秒(意味着永不会过期)。此外,这个地方使用了特殊的id  $ ,这个特殊的id代表着当前streams中最大的id,这就意味着你只会读取streams中在你监听时间以后的消息。有点类似于Unix的 tail -f 。另外XREAD可以同时监听多个流中的数据。

消费者组

如果我们想要的不是多个客户端处理相同的消息,而是多个客户端从streams中获取到不同的消息进行处理。也就是我们常用的生产者-消费者模型。假如想象我们具有两个生产者p1,p2,三个消费者c1,c2,c3以及7个商品。我们想按照下面的效果进行处理

p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1

为了解决这种场景,redis使用了一个名为消费者的概念,有点类似于kafka,但只是表现上。消费者组就像是一个伪消费者,它从流内读取数据,然后分发给组内的消费者,并记录该消费者组消费了哪些数据,处理了那些数据,并提供了一系列功能。

  1. 每条消息都提供给不同的消费者,因此不可能将相同的消息传递给多个消费者。
  2. 消费者在消费者组中通过名称来识别,该名称是实施消费者的客户必须选择的区分大小写的字符串。这意味着即便断开连接过后,消费者组仍然保留了所有的状态,因为客户端会重新申请成为相同的消费者。 然而,这也意味着由客户端提供唯一的标识符。
  3. 每一个消费者组都有一个第一个ID永远不会被消费的概念,这样一来,当消费者请求新消息时,它能提供以前从未传递过的消息。
  4. 消费消息需要使用特定的命令进行显式确认,表示:这条消息已经被正确处理了,所以可以从消费者组中逐出。
  5. 消费者组跟踪所有当前所有待处理的消息,也就是,消息被传递到消费者组的一些消费者,但是还没有被确认为已处理。由于这个特性,当访问一个Stream的历史消息的时候,每个消费者将只能看到传递给它的消息。

它的模型类似于如下

| consumer_group_name: mygroup           |
| consumer_group_stream: somekey |
| last_delivered_id: - |
| |
| consumers: |
| "consumer-1" with pending messages |
| - |
| - |
| "consumer-42" with pending messages |
| ... (and so forth) |

从上边的模型中我们可以看出消费者组记录处理的最后一条消息,将消息分发给不同的消费者,每个消费者只能看到自己的消息。如果把消费者组看做streams的辅助数据结构,我们可以看出一个streams可以拥有多个消费者组,一个消费者组内可以拥有多个消费者。实际上,一个streams允许客户端使用XREAD读取的同时另一个客户端通过消费者群组读取数据。

创建一个消费者群组

我们首先创建一个包含了一些数据的streams

127.0.0.1:> XADD fruit * message apple
"1574935311149-0"
127.0.0.1:> XADD fruit * message banada
"1574935315886-0"
127.0.0.1:> XADD fruit * message pomelo
"1574935323628-0"

然后创建一个消费者组

127.0.0.1:> XGROUP CREATE fruit mygroup $
OK

注意我们需要指定一个id,这里我们使用的是特殊id $ ,我们也可以使用0或者一个unix时间戳,这样,消费者组只会读取这个节点之后的消息。

现在消费者组创建好了,我们可以使用XREADGROUP命令立即开始尝试通过消费者组读取消息。
 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] ,与 XREAD 类似,提供了BLOCK选项。假设指定消费者分别是Alice和Bob,来看看系统会怎样返回不同消息给Alice和Bob。

127.0.0.1:> XREADGROUP GROUP  mygroup Alice COUNT  STREAMS fruit >
) ) "fruit"
) ) ) "1574936034258-0"
) ) "message"
) "apple"
127.0.0.1:>

上边命令代表的信息是:我要通过 mygroup 读取streams  fruit 中的数据,我在群组中的身份是 Alice ,请给我一条数据。  > 操作符只在消费者组的上线文中有效,代表消息到目前为止没有交给其它消费者处理过。
我们也可以使用一个有效的id,在这种情况下,消费者组会告诉我们的历史待处理消息,而不会告诉我们新的消息。这个特性也是很有用的,当消费者因为某些原因重新启动后,我们可以查看自己的历史待处理消息,处理完待处理消息后再去处理新的消息。
我们可以通过 XACK 命令告诉消费者组某条消息已经被正确处理,不要显示在我的历史待处理消息列表中。 XACK 的语法为 XACK key group ID [ID ...]

127.0.0.1:> XACK fruit mygroup -
(integer)

有几件事需要记住:

  1. 消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。
  2. 即使使用XREADGROUP,你也可以同时从多个key中读取,但是要让其工作,你需要给每一个Stream创建一个名称相同的消费者组。这并不是一个常见的需求,但是需要说明的是,这个功能在技术上是可以实现的。
  3. XREADGROUP命令是一个写命令,因为当它从Stream中读取消息时,消费者组被修改了,所以这个命令只能在master节点调用。

从永久失败中恢复

在一个消费者群组中可能存在多个消费者消费消息,但是也可能会存在某一个消费者永久退出消费者群组的情况,这样我们就需要一种机制,把该消费者的待处理消息分配给消费者群组的另一个消费者。这就需要我们具有查看待处理消息的能力以及把某个消息分配给指定消费者的能力。前者是通过一个叫 XPENDING 的命令,它的语法为 XPENDING key group [start end count] [consumer]

127.0.0.1:> XPENDING fruit mygroup
) (integer)
) "1574936042937-0"
) "1574936042937-0"
) ) ) "Alice"
) ""

上述返回结果代表的是消费者群组有1条待处理命令,待处理消息的起始id为 1574936042937-0 ,结束id为 1574936042937-0 ,名为 Alice 的消费者有一个待处理命令,可能有人会好奇我们在前边往 fruit 放入了3个水果,使用 XACK 处理了一个水果,消费者待处理列表中应该有两个水果,而事实上消费者群组的待处理列表为该群组下消费者待处理消息的合集,当有消费者通过群组获取消息的时候会改变消费者群组的状态,这也是前边提到的为什么 XREADGROUP 必须在master节点进行调用。
我们可以使用start end count 参数来查看某个范围内消息的状态

127.0.0.1:> XPENDING fruit mygroup - +  Alice
) ) "1574936042937-0"
) "Alice"
) (integer)
) (integer)
) ) "1574936052018-0"
) "Alice"
) (integer)
) (integer)

这样我们就看到了一条消息的详细信息,id为 1574936042937-0 的消息的消费者为 Alice ,它的pending时间为 903655 ,这个消息被分配了1次。
我们会发现第一条消息的处理时间有点长,我们怀疑 Alice 已经不能处理这条消息了,于是我们想把这条消息分配给 Bob ,这种场景下就需要用到了 XCLAIM 命令,它的语法为 XCLAIM ... ,其中min-idle-time为消息的最小空闲时间,只有消息的空闲时间大于这个值消息才会被分配,因为消息被分配的时候会重置消息的空闲时间,如果有同时把一条消息分配给两个客户端,只会第一条命令生效,因为当消息分配给第一个客户端的时候重置空闲时间,第二条命令则会失效。
我们也可以使用一个独立的进程来不断寻找超时的消息,并把它分配给活跃的消费者,不过需要注意的是,如果消息的分配次数达到某个阙值,不应该把消息再分配出去,而是应该放到别的地方。

streams的可观察性

streams具有不错的可观察性,前边的 XPENDING 命令允许我们查看streams在某个消费者群组内待处理消息的状态。但是我们想看的更多,比如在这个streams下有多少个group, 在这个group下有多少消费者。这就要用到 XINFO 命令:
查看 streams 信息:

127.0.0.1:> XINFO STREAM mystream
) "length"
) (integer)
) "radix-tree-keys"
) (integer)
) "radix-tree-nodes"
) (integer)
) "groups"
) (integer)
) "last-generated-id"
) "1574925508730-0"
) "first-entry"
) ) "1574835253335-0"
) ) "name"
) "bob"
) "age"
) ""
) "last-entry"
) ) "1574925508730-0"
) ) "name"
) "dwj"
) "age"
) ""

输出中会告诉我们streams的长度,群组数量,第一条和最后一条信息的详情。下面看一下streams下群组的信息:

127.0.0.1:> XINFO GROUPS fruit
) ) "name"
) "mygroup"
) "consumers"
) (integer)
) "pending"
) (integer)
) "last-delivered-id"
) "1574936052018-0"
) ) "name"
) "mygroup-1"
) "consumers"
) (integer)
) "pending"
) (integer)
) "last-delivered-id"
) "0-0"

我们可以从输出中看到 fruit 下有两个群组,群组的名称以及待处理消息的数量,处理的最后一条消息。我们可以在详细的查看下消费者群组内消费者的状态。

127.0.0.1:> XINFO CONSUMERS fruit mygroup
) ) "name"
) "Alice"
) "pending"
) (integer)
) "idle"
) (integer)
) ) "name"
) "Bob"
) "pending"
) (integer)
) "idle"
) (integer)
从输出中可以看到消费者待处理消息的数量以及消费者的闲置时间。

设置streams上限

如果从streams可以查看到历史记录,我们可能会有疑惑,如果streams无限期的加入内存会不会够用,一旦消息数量达到上限,将消息永久删除或者持久化到数据库都是有必要的,redis也提供了诸如此类场景的支持。
一种方法是我们使用 XADD 的时候指定streams的最大长度, XADD mystream MAXLEN ~ 1000 其中的数值前可以加上 ~ 标识不需要精确的将长度保持在1000,比1000多一些也可以接受。如果不使用该标识,性能会差一些。另一种方法是使用 XTRIM ,该命令也是使用 MAXLEN 选项, > XTRIM mystream MAXLEN ~ 10

一些特殊的id

前面提到了在streams API里边存在一些特殊的id。
首先是 - 和 + ,这两个ID在 XRANGE 命令中使用,分别代表最小的id和最大的id。 - 代表 0-1 , + 代表 18446744073709551615-18446744073709551615 ,从使用上方便了很多。在 XPENDING 等范围查询中都可以使用。
 $ 代表streams中当前存在的最大的id,在 XREAD 和 XGROUP 中代表只获取新到的消息。需要注意的是 $ 跟 + 的含义并不一致。
还有一个特殊的id是 > ,这个id只能够在 XREADGROUP 命令中使用,意味着在这个消费者群组中,从来没有分配给其他的消费者,所以总是使用 > 作为群组中的 last delivered ID 。

持久化,复制和消息安全性

与redis的其它数据结构一样,streams会异步复制到从节点,并持久化到AOF和RDB文件中,并且消费者群组的状态也会按照此机制进行持久化。
需要注意的几点是:

  • 如果消息的持久化以及状态很重要,则AOF必须使用强fsync配合(AOF记录每一条更改redis数据的命令,有很多种持久化机制,在这个地方要用到的是 appendfsync always  这样会严重降低Redis的速度)
  • 默认情况下,异步复制不能保证从节点的数据与主节点保持一致,在故障转移以后可能会丢失一些内容,这跟从节点从主节点接受数据的能力有关。
  • WAIT 命令可以用于强制将更改传输到一组从节点上。虽然这使得数据不太可能会丢失,但是redis的Sentinel和cluster在进行故障转移的时候不一定会使用具有最新数据的从节点,在一些特殊故障下,反而会使用缺少一些数据的从节点。
    因此在使用redis streams和消费者群组在设计程序的时候,确保了解你的应用程序在故障期间的应对策略,并进行相应地配置,评估它对你的程序是否足够安全。

从streams中删除数据

删除streams中的数据使用 XDEL 命令,其语法为 XDEL key ID [ID ...] ,需要注意的是在当前的实现中,在宏节点完全为空之前,内存并没有真正回收,所以你不应该滥用这个特性。

streams的性能

streams的不阻塞命令,比如 XRANGE 或者不使用BLOCK选项的 XREAD 和 XREADGROUP 跟redis普通命令一致,所以没有必要讨论。如果有兴趣的话可以在redis的文档中查看到对应命令的时间复杂度。streams命令的速度在一定范围内跟 set 是一致的, XADD 命令的速度非常快,在一个普通的机器上,一秒钟可以插入50w~100w条数据。
我们感兴趣的是在消费者群组的阻塞场景下,从通过 XADD 命令向streams中插入一条数据,到消费者通过群组读取到这条消息的性能。
为了测试消息从产生到消费间的延迟,我们使用ruby程序进行测试,将消息的产生时间作为消息的一个字段,然后把消息推送到streams中,客户端收到消息后使用当前时间跟生产时间进行对比,从而计算出消息的延迟时间。这个程序未进行性能优化,运行在一个双核的机器上,同时redis也运行在这台机器上,以此来模拟不是理想条件下的场景。消息每秒钟产生1w条,群组内有10个消费者消费数据。测试结果如下:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

99.9%的请求的延迟小于等于2毫秒,而且异常值非常接近平均值。另外需要注意的两点:

  1. 消费者每次处理1w条消息,这样增加了一些延迟,这样做是为了消费速度较慢的消费者能够保持保持消息流。
  2. 用来做测试的系统相比于现在的系统非常慢。

原文链接: https://redis.io/topics/streams-intro

Worktile官网:www.worktile.com

本文作者:Worktile 工程师 杜文杰

文章首发于「Worktile官方博客」,转载请注明来源。

最新文章

  1. 从Unity3D编译器升级聊起Mono
  2. 架构实例之Demo_JSP_JavaBean_Servlet
  3. Java final 修饰符知识点总结
  4. 纯css用图片代替checkbox和radio,无js实现方法
  5. MDX 占比同比环比
  6. Swift面向对象基础(中)——Swift中的方法
  7. busybox sz rz命令
  8. ubunut 12.04 (64bit) android编译环境搭建
  9. samba 问题Windows能看到文件夹但是不能打开
  10. Powershell创建数组
  11. navigationBar 背景色
  12. JQuery - 点击,滚动回到顶部 / 底部刷新回到顶部
  13. iOS开发tips-UINavigationBar的切换
  14. 一些java方面面试题,没事做做看看(带答案)
  15. JavaScript中闭包实现的私有属性的getter()和setter()方法
  16. 南阳理工oj_The Triangle
  17. 设置PATH和CLASSPATH
  18. Hibernate3 Criteria对象详解
  19. Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列
  20. markdown语法测试集合

热门文章

  1. docker安装sshd
  2. 死磕 java线程系列之线程池深入解析——未来任务执行流程
  3. zabbix导入数据库报错1046 (3D000) : No database selected
  4. SVD分解
  5. [Luogu5384][Cnoi2019] 雪松果树
  6. CSPS_101
  7. javaScipt类定义和实现
  8. win7/win10系列的office安装与激活
  9. java-optional-快速使用-教程
  10. Python 基础之 线程与进程