package concurrency

import (
    v3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
)

// STM is an interface for software transactional memory.
type STM interface {
    // Get returns the value for a key and inserts the key in the txn's read set.
    // If Get fails, it aborts the transaction with an error, never returning.
    Get(key string) string
    // Put adds a value for a key to the write set.
    Put(key, val string, opts ...v3.OpOption)
    // Rev returns the revision of a key in the read set.
    Rev(key string) int64
    // Del deletes a key.
    Del(key string)

    // commit attempts to apply the txn's changes to the server.
    commit() *v3.TxnResponse
    reset()
}

// stmError safely passes STM errors through panic to the STM error channel.
type stmError struct{ err error }

// NewSTMRepeatable initiates new repeatable read transaction; reads within
// the same transaction attempt always return the same data.
func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
    s := &stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
    return runSTM(s, apply)
}

// NewSTMSerializable initiates a new serialized transaction; reads within the
// same transactiona attempt return data from the revision of the first read.
func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
    s := &stmSerializable{
        stm:      stm{client: c, ctx: ctx},
        prefetch: make(map[string]*v3.GetResponse),
    }
    return runSTM(s, apply)
}

// NewSTMReadCommitted initiates a new read committed transaction.
func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
    s := &stmReadCommitted{stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}}
    return runSTM(s, apply)
}

type stmResponse struct {
    resp *v3.TxnResponse
    err  error
}

func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
    outc := make(chan stmResponse, 1)
    go func() {
        defer func() {
            if r := recover(); r != nil {
                e, ok := r.(stmError)
                if !ok {
                    // client apply panicked
                    panic(r)
                }
                outc <- stmResponse{nil, e.err}
            }
        }()
        var out stmResponse
        for {
            s.reset()
            if out.err = apply(s); out.err != nil {
                break
            }
            if out.resp = s.commit(); out.resp != nil {
                break
            }
        }
        outc <- out
    }()
    r := <-outc
    return r.resp, r.err
}

// stm implements repeatable-read software transactional memory over etcd
type stm struct {
    client *v3.Client
    ctx    context.Context
    // rset holds read key values and revisions
    rset map[string]*v3.GetResponse
    // wset holds overwritten keys and their values
    wset map[string]stmPut
    // getOpts are the opts used for gets
    getOpts []v3.OpOption
}

type stmPut struct {
    val string
    op  v3.Op
}

func (s *stm) Get(key string) string {
    if wv, ok := s.wset[key]; ok {
        return wv.val
    }
    return respToValue(s.fetch(key))
}

func (s *stm) Put(key, val string, opts ...v3.OpOption) {
    s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)}
}

func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} }

func (s *stm) Rev(key string) int64 {
    if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 {
        return resp.Kvs[0].ModRevision
    }
    return 0
}

func (s *stm) commit() *v3.TxnResponse {
    txnresp, err := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...).Commit()
    if err != nil {
        panic(stmError{err})
    }
    if txnresp.Succeeded {
        return txnresp
    }
    return nil
}

// cmps guards the txn from updates to read set
func (s *stm) cmps() []v3.Cmp {
    cmps := make([]v3.Cmp, 0, len(s.rset))
    for k, rk := range s.rset {
        cmps = append(cmps, isKeyCurrent(k, rk))
    }
    return cmps
}

func (s *stm) fetch(key string) *v3.GetResponse {
    if resp, ok := s.rset[key]; ok {
        return resp
    }
    resp, err := s.client.Get(s.ctx, key, s.getOpts...)
    if err != nil {
        panic(stmError{err})
    }
    s.rset[key] = resp
    return resp
}

// puts is the list of ops for all pending writes
func (s *stm) puts() []v3.Op {
    puts := make([]v3.Op, 0, len(s.wset))
    for _, v := range s.wset {
        puts = append(puts, v.op)
    }
    return puts
}

func (s *stm) reset() {
    s.rset = make(map[string]*v3.GetResponse)
    s.wset = make(map[string]stmPut)
}

type stmSerializable struct {
    stm
    prefetch map[string]*v3.GetResponse
}

func (s *stmSerializable) Get(key string) string {
    if wv, ok := s.wset[key]; ok {
        return wv.val
    }
    firstRead := len(s.rset) == 0
    if resp, ok := s.prefetch[key]; ok {
        delete(s.prefetch, key)
        s.rset[key] = resp
    }
    resp := s.stm.fetch(key)
    if firstRead {
        // txn's base revision is defined by the first read
        s.getOpts = []v3.OpOption{
            v3.WithRev(resp.Header.Revision),
            v3.WithSerializable(),
        }
    }
    return respToValue(resp)
}

func (s *stmSerializable) Rev(key string) int64 {
    s.Get(key)
    return s.stm.Rev(key)
}

func (s *stmSerializable) gets() ([]string, []v3.Op) {
    keys := make([]string, 0, len(s.rset))
    ops := make([]v3.Op, 0, len(s.rset))
    for k := range s.rset {
        keys = append(keys, k)
        ops = append(ops, v3.OpGet(k))
    }
    return keys, ops
}

func (s *stmSerializable) commit() *v3.TxnResponse {
    keys, getops := s.gets()
    txn := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...)
    // use Else to prefetch keys in case of conflict to save a round trip
    txnresp, err := txn.Else(getops...).Commit()
    if err != nil {
        panic(stmError{err})
    }
    if txnresp.Succeeded {
        return txnresp
    }
    // load prefetch with Else data
    for i := range keys {
        resp := txnresp.Responses[i].GetResponseRange()
        s.rset[keys[i]] = (*v3.GetResponse)(resp)
    }
    s.prefetch = s.rset
    s.getOpts = nil
    return nil
}

type stmReadCommitted struct{ stm }

// commit always goes through when read committed
func (s *stmReadCommitted) commit() *v3.TxnResponse {
    s.rset = nil
    return s.stm.commit()
}

func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
    rev := r.Header.Revision + 1
    if len(r.Kvs) != 0 {
        rev = r.Kvs[0].ModRevision + 1
    }
    return v3.Compare(v3.ModRevision(k), "<", rev)
}

func respToValue(resp *v3.GetResponse) string {
    if len(resp.Kvs) == 0 {
        return ""
    }
    return string(resp.Kvs[0].Value)
}

最新文章

  1. 使用js_md5加密密码
  2. jquery ajax异步请求
  3. 常用shell命令操作
  4. ADF_ManagedBean的概念和管理(概念)
  5. iOS时间那点事儿–NSTimeZone
  6. TP快捷函数
  7. 初学Hibernate
  8. 一张图让你学会LVM
  9. grails的插件
  10. [转载]Div和Table的区别
  11. Tomcat源码学习一
  12. c/c++ 算法之快速排序法 冒泡排序法,选择排序法,插入排序法
  13. android log4j日志管理的使用
  14. 结队编程--基于GUI的四则运算
  15. Python学习笔记-SQLSERVER的大批量导入以及日常操作(比executemany快3倍)
  16. Python给照片换底色(蓝底换红底)
  17. HDU 3333 Turing Tree 离线 线段树/树状数组 区间求和单点修改
  18. Codeforces 1132E (看题解)
  19. layui upload 后台获取不到值
  20. HDU 6047 17多校 Maximum Sequence(优先队列)

热门文章

  1. SQLSERVER 执行过的语句查询
  2. 我的摸索过程之IIS下配置asp.net 的注意事项
  3. InnoDB存储引擎的总览
  4. Day9 进程同步锁 进程队列 进程池 生产消费模型 进程池 paramike模块
  5. pg_dump命令帮助信息
  6. Ocelot中文文档-入门
  7. git push The requested URL returned error: 403 Forbidden while accessing
  8. Jmeter——HTTP协议的接口压力测试环境搭建
  9. 国内各大支付平台的API地址
  10. 如何书写一篇能看懂的html和CSS代码