做网站有哪些语言,定制型网站开发,做网站公司怎么开拓更多业务,网店推广软文范例目录
基本介绍
数据结构
消息
消费组
消费者
基本使用命令
概述
xadd 命令
xtrim 命令 xdel 命令
xlen 命令
xrange 命令
xread 命令
xgroup 命令
xreadgroup 命令
xack 命令 基本介绍
Redis stream#xff08;流#xff09;是一种数据结构#xff0c;其…目录
基本介绍
数据结构
消息
消费组
消费者
基本使用命令
概述
xadd 命令
xtrim 命令 xdel 命令
xlen 命令
xrange 命令
xread 命令
xgroup 命令
xreadgroup 命令
xack 命令 基本介绍
Redis stream流是一种数据结构其作用类似于仅追加日志但也实现了多个操作来克服典型仅追加日志的一些限制。其中包括O1时间的随机访问和复杂的消费策略如消费者群体。 您可以使用流实时记录和同时联合事件。
Redis 为每个stream流条目生成一个唯一的 ID。可以在以后使用这些 ID 检索其关联的条目或读取和处理流中的所有后续条目。 Redis Stream 主要用于消息队列MQMessage QueueRedis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能但它有个缺点就是消息无法持久化如果出现网络断开、Redis 宕机等消息就会被丢弃。Redis Stream 提供了消息的持久化和主备复制功能可以让任何客户端访问任何时刻的数据并且能记住每一个客户端的访问位置还能保证消息不丢失。stream 有一个消息链表将所有加入的消息都串起来每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的Redis 重启后内容还在。 数据结构
Redis Stream 的结构如下所示它有一个消息链表将所有加入的消息都串起来每个消息都有一个唯一的 ID 和对应的内容 消息 每个 Stream 都有唯一的名称它就是 Redis 的 key在我们首次使用 xadd 指令追加消息时自动创建。XADD key ID field value[field value ...]
消息 ID消息 ID 的形式是 timestampInMillis-sequence例如 1527846880572-5它表示当前的消息在毫米时间戳 1527846880572 时产生并且是该毫秒内产生的第5条消息。消息 ID 可以由服务器自动生成也可以由客户端自己指定但是形式必须是整数-整数而且必须是后面加入的消息的 ID 要大于前面的消息 ID。消息内容消息内容就是键值对形如 hash 结构的键值对。
消费组
每个 Stream 都可以挂多个消费组Consumer Group每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称消费组不会自动创建它需要单独的指令xgroup create进行创建需要指定从 Stream 的某个消息ID开始消费这个 ID 用来初始化 last_delivered_id 变量。每个消费组的状态都是独立的相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。同一个消费组可以挂接多个消费者Consumer这些消费者之间是竞争关系任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。
消费者
消费者内部会有一个状态变量pending_ids维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息但是还没有ack(Acknowledge character确认字符。如果客户端没有 ack这个变量里面的消息 ID 就会越来越多一旦某个消息被 ack它就开始减少。这个 pending_ids 变量在 Redis 官方被称为 PEL也就是 Pending Entries List这是一个核心的数据结构它用来确保客户端至少消费了消息一次而不会在网络传输的中途丢失了而没被处理。
基本使用命令
概述
消息队列相关命令
XADD - 添加消息到末尾XTRIM - 对流进行修剪限制长度XDEL - 删除消息XLEN - 获取流包含的元素数量即消息长度XRANGE - 获取消息列表会自动过滤已经删除的消息XREVRANGE - 反向获取消息列表ID 从大到小XREAD - 以阻塞或非阻塞方式获取消息列表
消费者组相关命令
XGROUP CREATE - 创建消费者组XREADGROUP GROUP - 读取消费者组中的消息XACK - 将消息标记为已处理XGROUP SETID - 为消费者组设置新的最后递送消息IDXGROUP DELCONSUMER - 删除消费者XGROUP DESTROY - 删除消费者组XPENDING - 显示待处理消息的相关信息XCLAIM - 转移消息的归属权XINFO - 查看流和消费者组的相关信息XINFO GROUPS - 打印消费者组的信息XINFO STREAM - 打印流信息
xadd 命令
XADD 命令将指定的流条目追加到指定 key 的流中。如果 key 不存在将使用流的条目自动创建 key。 一个条目是由一组键值对组成的它基本上是一个小的字典。键值对以用户给定的顺序存储并且读取流的命令如 XRANGE 或者 XREAD可以保证按照通过 XADD 添加的顺序返回。 XADD key ID field value [field value ...]
key 队列名称如果不存在就创建ID 消息 id我们使用 * 表示由 redis 生成可以自定义但是要自己保证递增性。field value 记录。
redis XADD mystream * name Sara surname OConnor
1601372323627-0
redis XADD mystream * field1 value1 field2 value2 field3 value3
1601372323627-1
redis XLEN mystream
(integer) 2
redis XRANGE mystream -
1) 1) 1601372323627-02) 1) name2) Sara3) surname4) OConnor
2) 1) 1601372323627-12) 1) field12) value13) field24) value25) field36) value3 返回值该命令返回添加的条目的 ID。如果 ID 参数传的是*那么 ID 是自动生成的否则命令仅返回用户在插入期间指定的相同的 ID。 xtrim 命令
XTRIM 将流裁剪为指定数量的项目如有需要将驱逐旧的项目ID较小的项目。
XTRIM key MAXLEN [~] count
key 队列名称MAXLEN 长度count 数量
127.0.0.1:6379 XADD mystream * field1 A field2 B field3 C field4 D
1601372434568-0
127.0.0.1:6379 XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379 XRANGE mystream -
1) 1) 1601372434568-02) 1) field12) A3) field24) B5) field36) C7) field48) D 返回值返回从流中删除的条目数。 xdel 命令
从指定流中移除指定的条目并返回成功删除的条目的数量。在传递的ID不存在的情况下返回的数量可能与传递的ID数量不同。
XDEL key ID[ID ...]key队列名称。ID消息 ID XADD mystream * a 1
1538561698944-0XADD mystream * b 2
1538561700640-0XADD mystream * c 3
1538561701744-0XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379 XRANGE mystream -
1) 1) 1538561698944-02) 1) a2) 1
2) 1) 1538561701744-02) 1) c2) 3 返回值删除的条目数量。 xlen 命令
返回流中的条目数。如果指定的key不存在则此命令返回0就好像该流为空。 与其他的Redis类型不同零长度流是可能的所以你应该调用TYPE或者EXISTS来检查一个key是否存在。一旦内部没有任何的条目例如调用XDEL后流不会被自动删除因为可能还存在与其相关联的消费者组。 redis XADD mystream * item 1
1601372563177-0
redis XADD mystream * item 2
1601372563178-0
redis XADD mystream * item 3
1601372563178-1
redis XLEN mystream
(integer) 3 返回值流中包含的条目数量 xrange 命令
xrange命令返回流中满足给定ID范围的条目。范围由最小和最大ID指定。所有ID在指定的两个ID之间或与其中一个ID相等闭合区间的条目将会被返回。
XRANGE key start end [COUNT count]
key 队列名start 开始值 - 表示最小值end 结束值 表示最大值count 数量
redis XADD writers * name Virginia surname Woolf
1601372577811-0
redis XADD writers * name Jane surname Austen
1601372577811-1
redis XADD writers * name Toni surname Morrison
1601372577811-2
redis XADD writers * name Agatha surname Christie
1601372577812-0
redis XADD writers * name Ngozi surname Adichie
1601372577812-1
redis XLEN writers
(integer) 5
redis XRANGE writers - COUNT 2
1) 1) 1601372577811-02) 1) name2) Virginia3) surname4) Woolf
2) 1) 1601372577811-12) 1) name2) Jane3) surname4) Austen 返回值该命令返回ID与指定范围匹配的条目。返回的条目是完整的这意味着ID和所有组成条目的字段都将返回。此外返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。 xread 命令
从一个或者多个流中读取数据仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项用于等待可用的项目类似于BRPOP或者BZPOPMIN等等。
XREAD[COUNT count][BLOCK milliseconds] STREAMS key[key ...]id[id ...]
count数量。milliseconds可选阻塞毫秒数没有设置就是非阻塞模式。key 队列名。id消息 ID。
redis XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) mystream2) 1) 1) 1526984818136-02) 1) duration2) 15323) event-id4) 55) user-id6) 77828132) 1) 1526999352406-02) 1) duration2) 8123) event-id4) 95) user-id6) 388234
2) 1) writers2) 1) 1) 1526985676425-02) 1) name2) Virginia3) surname4) Woolf2) 1) 1526985685298-02) 1) name2) Jane3) surname4) Austen 返回值该命令返回一个结果数组返回数组的每个元素都是一个由两个元素组成的数组键名和为该键报告的条目。报告的条目是完整的流条目具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。 当使用BLOCK时超时时将返回一个空回复nil。 xgroup 命令
使用 XGROUP CREATE 创建消费者组语法格式
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key 队列名称如果不存在就创建groupname 组名。$ 表示从尾部开始消费只接受新消息当前 Stream 消息会全部忽略。
从头开始消费:
XGROUP CREATE mystream consumer-group-name 0-0
从尾部开始消费:
XGROUP CREATE mystream consumer-group-name $
xreadgroup 命令
使用 XREADGROUP GROUP 读取消费组中的消息语法格式
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group 消费组名consumer 消费者名。count 读取数量。milliseconds 阻塞毫秒数。key 队列名。ID 消息 ID。
xack 命令
XACK命令用于从流的消费者组的待处理条目列表简称PEL中删除一条或多条消息。当一条消息交付到某个消费者时它将被存储在PEL中等待处理这通常出现在作为调用XREADGROUP命令的副作用或者一个消费者通过调用XCLAIM命令接管消息的时候。
一旦消费者成功地处理完一条消息它应该调用XACK这样这个消息就不会被再次处理且作为一个副作用关于此消息的PEL条目也会被清除从Redis服务器释放内存。
XACK key group ID[ID ...] 返回值该命令返回成功确认的消息数。某些消息ID可能不再是PEL的一部分例如因为它们已经被确认而且XACK不会把他们算到成功确认的数量中。