分布式id生成器

有时我们需要能够生成类似MySQL自增ID这样不断增大,同时又不会重复的id。以支持业务中的高并发场景。比较典型的,电商促销时,短时间内会有大量的订单涌入到系统,比如每秒10w+。明星出轨时,会有大量热情的粉丝发微博以表心意,同样会在短时间内产生大量的消息。

在插入数据库之前,我们需要给这些消息、订单先打上一个ID,然后再插入到我们的数据库。对这个id的要求是希望其中能带有一些时间信息,这样即使我们后端的系统对消息进行了分库分表,也能够以时间顺序对这些消息进行排序。

Twitter的snowflake算法是这种场景下的一个典型解法。先来看看snowflake是怎么一回事

snowflake中的比特位分布

首先确定我们的数值是64位,int64类型,被划分为四部分,不含开头的第一个bit,因为这个bit是符号位。用41位来表示收到请求时的时间戳,单位为毫秒,然后五位来表示数据中心的id,然后再五位来表示机器的实例id,最后是12位的循环自增id(到达1111,1111,1111后会归0)。

这样的机制可以支持我们在同一台机器上,同一毫秒内产生2 ^ 12 = 4096条消息。一秒共409.6万条消息。从值域上来讲完全够用了。

数据中心加上实例id共有10位,可以支持我们每数据中心部署32台机器,所有数据中心共1024台实例。

表示timestamp的41位,可以支持我们使用69年。当然,我们的时间毫秒计数不会真的从1970年开始记,那样我们的系统跑到2039/9/7 23:47:35就不能用了,所以这里的timestamp实际上只是相对于某个时间的增量,比如我们的系统上线是2018-08-01,那么我们可以把这个timestamp当作是从2018-08-01 00:00:00.000的偏移量。

worker_id分配

timestampdatacenter_idworker_idsequence_id这四个字段中,timestampsequence_id是由程序在运行期生成的。但datacenter_idworker_id需要我们在部署阶段就能够获取得到,并且一旦程序启动之后,就是不可更改的了(想想,如果可以随意更改,可能被不慎修改,造成最终生成的id有冲突)。

一般不同数据中心的机器,会提供对应的获取数据中心id的API,所以datacenter_id我们可以在部署阶段轻松地获取到。而worker_id是我们逻辑上给机器分配的一个id,这个要怎么办呢?比较简单的想法是由能够提供这种自增id功能的工具来支持,比如MySQL:

mysql> insert into a (ip) values("10.1.2.101");
Query OK, 1 row affected (0.00 sec) mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
| 2 |
+------------------+
1 row in set (0.00 sec)

从MySQL中获取到worker_id之后,就把这个worker_id直接持久化到本地,以避免每次上线时都需要获取新的worker_id。让单实例的worker_id可以始终保持不变。

当然,使用MySQL相当于给我们简单的id生成服务增加了一个外部依赖。依赖越多,我们的服务的可运维性就越差。

考虑到集群中即使有单个id生成服务的实例挂了,也就是损失一段时间的一部分id,所以我们也可以更简单暴力一些,把worker_id直接写在worker的配置中,上线时,由部署脚本完成worker_id字段替换。

开源实例

标准snowflake实现

github.com/bwmarrin/snowflake 是一个相当轻量化的snowflake的Go实现。其文档对各位使用的定义见图 6-2所示。

图 6-2 snowflake库

和标准的snowflake完全一致。使用上比较简单:

package main

import (
"fmt"
"os" "github.com/bwmarrin/snowflake"
) func main() {
n, err := snowflake.NewNode(1)
if err != nil {
println(err)
os.Exit(1)
} for i := 0; i < 3; i++ {
id := n.Generate()
fmt.Println("id", id)
fmt.Println(
"node: ", id.Node(),
"step: ", id.Step(),
"time: ", id.Time(),
"\n",
)
}
}

当然,这个库也给我们留好了定制的后路,其中预留了一些可定制字段:

    // Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
// You may customize this to set a different epoch for your application.
Epoch int64 = 1288834974657 // Number of bits to use for Node
// Remember, you have a total 22 bits to share between Node/Step
NodeBits uint8 = 10 // Number of bits to use for Step
// Remember, you have a total 22 bits to share between Node/Step
StepBits uint8 = 12

Epoch就是本节开头讲的起始时间,NodeBits指的是机器编号的位长,StepBits指的是自增序列的位长。

sonyflake

sonyflake是Sony公司的一个开源项目,基本思路和snowflake差不多,不过位分配上稍有不同,见图 6-3

图 6-3 sonyflake

这里的时间只用了39个bit,但时间的单位变成了10ms,所以理论上比41位表示的时间还要久(174年)。

Sequence ID和之前的定义一致,Machine ID其实就是节点id。sonyflake与众不同的地方在于其在启动阶段的配置参数:

func NewSonyflake(st Settings) *Sonyflake

Settings数据结构如下:

type Settings struct {
StartTime time.Time
MachineID func() (uint16, error)
CheckMachineID func(uint16) bool
}

StartTime选项和我们之前的Epoch差不多,如果不设置的话,默认是从2014-09-01 00:00:00 +0000 UTC开始。

MachineID可以由用户自定义的函数,如果用户不定义的话,会默认将本机IP的低16位作为machine id

CheckMachineID是由用户提供的检查MachineID是否冲突的函数。这里的设计还是比较巧妙的,如果有另外的中心化存储并支持检查重复的存储,那我们就可以按照自己的想法随意定制这个检查MachineID是否冲突的逻辑。如果公司有现成的Redis集群,那么我们可以很轻松地用Redis的集合类型来检查冲突。

redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
(integer) 1
redis 127.0.0.1:6379> SADD base64_encoding_of_last16bits MzI0Mgo=
(integer) 0

使用起来也比较简单,有一些逻辑简单的函数就略去实现了:

package main

import (
"fmt"
"os"
"time" "github.com/sony/sonyflake"
) func getMachineID() (uint16, error) {
var machineID uint16
var err error
machineID = readMachineIDFromLocalFile()
if machineID == 0 {
machineID, err = generateMachineID()
if err != nil {
return 0, err
}
} return machineID, nil
} func checkMachineID(machineID uint16) bool {
saddResult, err := saddMachineIDToRedisSet()
if err != nil || saddResult == 0 {
return true
} err := saveMachineIDToLocalFile(machineID)
if err != nil {
return true
} return false
} func main() {
t, _ := time.Parse("2006-01-02", "2018-01-01")
settings := sonyflake.Settings{
StartTime: t,
MachineID: getMachineID,
CheckMachineID: checkMachineID,
} sf := sonyflake.NewSonyflake(settings)
id, err := sf.NextID()
if err != nil {
fmt.Println(err)
os.Exit(1)
} fmt.Println(id)
}

分布式锁

在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。为什么需要加锁呢?我们看看在不加锁的情况下并发计数会发生什么情况:

package main

import (
"sync"
) // 全局变量
var counter int func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++
}()
} wg.Wait()
println(counter)
}

多次运行会得到不同的结果:

❯❯❯ go run local_lock.go
945
❯❯❯ go run local_lock.go
937
❯❯❯ go run local_lock.go
959

基于Redis的setnx

在分布式场景下,我们也需要这种“抢占”的逻辑,这时候怎么办呢?我们可以使用Redis提供的setnx命令:

package main

import (
"fmt"
"time" "github.com/go-redis/redis"
"github.com/gofrs/uuid"
) // 声明一个全局的rdb变量
var rdb *redis.Client // 初始化连接
func initClient() (err error) {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "zhy1996", // no password set
DB: 0, // use default DB
}) _, err = rdb.Ping().Result()
if err != nil {
return err
}
return nil
} var unlock_lua = `
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
` func Lock(key, value string, expiration time.Duration) (bool, error) {
is, err := rdb.SetNX(key, value, expiration).Result()
if err != nil {
return false, fmt.Errorf("redis setnx failed")
}
return is, nil
} func UnLock(key, value string) (bool, error) {
res, err := rdb.Eval(unlock_lua, []string{key}, value).Result()
if err != nil {
return false, err
}
v, ok := res.(int64)
if !ok {
return false, fmt.Errorf("lua script return is not int")
}
if v == 0 {
return false, nil
}
return true, nil
} func main() {
err := initClient()
if err != nil {
fmt.Println(err)
} ul, _ := uuid.NewV4()
value := ul.String() for i := 0; i < 10; i++ {
is, err := Lock("lock_1", value, time.Second)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("是否拿到锁:", is)
} res, err := UnLock("lock_1", value)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("解锁:", res)
}

看看运行结果:

是否拿到锁: true
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
是否拿到锁: false
解锁: true

通过代码和执行结果可以看到,我们远程调用setnx实际上和单机的trylock非常相似,如果获取锁失败,那么相关的任务逻辑就不应该继续向前执行。

setnx很适合在高并发场景下,用来争抢一些“唯一”的资源。比如交易撮合系统中卖家发起订单,而多个买家会对其进行并发争抢。这种场景我们没有办法依赖具体的时间来判断先后,因为不管是用户设备的时间,还是分布式场景下的各台机器的时间,都是没有办法在合并后保证正确的时序的。哪怕是我们同一个机房的集群,不同的机器的系统时间可能也会有细微的差别。

所以,我们需要依赖于这些请求到达Redis节点的顺序来做正确的抢锁操作。如果用户的网络环境比较差,那也只能自求多福了。

基于etcd

etcd是分布式系统中,功能上与ZooKeeper类似的组件,这两年越来越火了。上面基于ZooKeeper我们实现了分布式阻塞锁,基于etcd,也可以实现类似的功能:

package main

import (
"log" "github.com/zieckey/etcdsync"
) func main() {
m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
if m == nil || err != nil {
log.Printf("etcdsync.New failed")
return
}
err = m.Lock()
if err != nil {
log.Printf("etcdsync.Lock failed")
return
} log.Printf("etcdsync.Lock OK")
log.Printf("Get the lock. Do something here.") err = m.Unlock()
if err != nil {
log.Printf("etcdsync.Unlock failed")
} else {
log.Printf("etcdsync.Unlock OK")
}
}

etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:

  1. 先检查/lock路径下是否有值,如果有值,说明锁已经被别人抢了
  2. 如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3
  3. watch /lock下的事件,此时陷入阻塞
  4. /lock路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。

值得一提的是,在etcdv3的API中官方已经提供了可以直接使用的锁API,读者可以查阅etcd的文档做进一步的学习。

参考文章:《go语言高级编程》

最新文章

  1. CSS三个定位——常规、浮动、绝对定位
  2. sql 判断 函数 存储过程是否存在的方法
  3. table td 文字超出显示省略号
  4. 百度富文本编辑器UEDITOR
  5. ListFragment创建及其生命周期
  6. MapReduce实例-基于内容的推荐(一)
  7. JavaScript笔记基础篇(二)
  8. HTML DOM appendChild() 方法
  9. ie下不显示图片
  10. Asp.net Core WebApi 返回JSON自动驼峰格式化问题
  11. Android开发学习总结(一)——搭建最新版本的Android开发环境
  12. IOS 页面之间的传值(主讲delegate)
  13. js随机出现2个数字
  14. 网络协议 终章 - GTP 协议:复杂的移动网络
  15. kubernetes安装
  16. Vue 组件&amp;组件之间的通信 之 父组件向子组件传值
  17. 水晶报表使用IEnumerable&lt;T&gt;数据源
  18. Hibernate学习(二)———— 一级缓存和三种状态解析
  19. Anaconda 安装、使用
  20. Shell中怎么获取当前日期和时间

热门文章

  1. mysql 版本在springboot 中定义位置
  2. Drupal 远程代码执行漏洞(CVE-2019-6339)
  3. 租了一台华为云耀云服务器,却直接被封公网ip,而且是官方的bug导致!
  4. JavaScript关于返回数据类型一个小小的笔记
  5. openstack June all-in-one 安装手册
  6. HTML5(十一)——WebSocket 基础教程
  7. 拦截器HandlerInterceptorAdapter的postHandle和afterCompletion无法获取response返回值问题
  8. Android系统编程入门系列之服务Service中的进程间通信
  9. CobaltStrike去除流量特征
  10. npm 基本命令的使用