使用者角度看bionic pthread_mutex和linux futex实现

本文所大篇幅引用的参考文章主要描述针对glibc和pthread实现;而本文的考察代码主要是android的bionic和pthread实现。

1.   futex引入的意义

传统的SYSTEM V IPC机制需要系统调用进入内核态去操作某个内核对象,由内核来仲裁同步,事实上大部分情况下并没有资源竞争即多个申请者不会同时去竞争同步对象,此种情况下仍然进入内核态会显得很浪费,系统开销增加进而造成性能折扣。

Futex(Fast Userspace Mutex)快速用户态互斥体,它是一种由用户态和内核态共同完成的同步机制。创建时是在用户空间通过mmap申请一片共享内存以便多进程间共同访问此futex,用户程序首先访问用户态的futex,只在判断出存在冲突(如一个进程已经拥有此futex,另一个进程申请访问,此时便存在一个冲突)时才进入内核态进行仲裁同步。

用户空间的访问和冲突判断由glibc库完成,冲突仲裁由内核的futex模块完成。

应用判断:根据futex的类型从用户空间的futex_t字中获取futex状态,看futex是否已被占用;

内核仲裁:将用户态的锁置上等待标志表明有锁的等待者存在,并调用schedule()将锁申请者挂起;当锁的拥有者释放锁(由glibc库完成)时,检查发现该锁有等待者就进入内核将等待者唤醒。

2.   mutex使用概述

Glibc库中实现有pthread_mutex_lock()/pthread_mutex_unlock()等用户态锁接口,以提供快速的futex机制。

Bionic 的pthread实现中也提供标准的pthread_mutex_lock()/pthread_mutex_unlock()接口,其实现也使用linux futex机制。

Android Frameworks中经常使用如下代码做线程同步,

Mutex::Autolock lock(MutexClassInstance);

Class Mutex声明在frameworks/native/include/utils/Mutex.h中。

36/*

37 * Simple mutex class.  The implementation is system-dependent.

38 *

39 * The mutex must be unlocked by the thread that locked it.  They are not

40 * recursive, i.e. the same thread can't lock it multiple times.

41 */

42class Mutex {

43public:

44    enum {

45        PRIVATE = 0,

46        SHARED = 1

47    };

48

49                Mutex();

50                Mutex(const char* name);

51                Mutex(int type, const char* name = NULL);

52                ~Mutex();

53

54    // lock or unlock the mutex

55    status_t    lock();

56    void        unlock();

57

58    // lock if possible; returns 0 on success, error otherwise

59    status_t    tryLock();

60

61    // Manages the mutex automatically. It'll be locked when Autolock is

62    // constructed and released when Autolock goes out of scope.

63    class Autolock {

64    public:

65        inline Autolock(Mutex& mutex) : mLock(mutex)  { mLock.lock(); }

66        inline Autolock(Mutex* mutex) : mLock(*mutex) { mLock.lock(); }

67        inline ~Autolock() { mLock.unlock(); }

68    private:

69        Mutex& mLock;

70    };

71

72private:

73    friend class Condition;

74

75    // A mutex cannot be copied

76                Mutex(const Mutex&);

77    Mutex&      operator = (const Mutex&);

78

79#if defined(HAVE_PTHREADS)

80    pthread_mutex_t mMutex;

81#else

82    void    _init();

83    void*   mState;

84#endif

85};

可以看到在有pthread支持的时候,Class Mutex使用pthread_mutex实现。pthread_mutex声明在bionic/libc/include/pthread.h中,

40typedef struct

41{

42    int volatile value;

43} pthread_mutex_t;

因为同一线程组使用同一用户空间,从而组内各线程是可以直接访问value变量的,不需要前文所述的open(“/lock”)和mmap(fd)。

pthread_mutex的常用接口此处只列出下面几个,用途由函数名自解释。

164int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);

166int pthread_mutex_destroy(pthread_mutex_t *mutex);

167int pthread_mutex_lock(pthread_mutex_t *mutex);

168int pthread_mutex_unlock(pthread_mutex_t *mutex);

pthread_mutex类型有三种,可以在pthread_mutex创建时设定,

{

…..

pthread_mutexattr_init(&attr);

pthread_mutexattr_settype(&attr, mutex_type)

pthread_mutex_init(&mutex, &attr);

…..

}

pthread_mutex的三种类型是,

53enum {

54    PTHREAD_MUTEX_NORMAL = 0,

55    PTHREAD_MUTEX_RECURSIVE = 1,

56    PTHREAD_MUTEX_ERRORCHECK = 2,

57

58    PTHREAD_MUTEX_ERRORCHECK_NP = PTHREAD_MUTEX_ERRORCHECK,

59    PTHREAD_MUTEX_RECURSIVE_NP  = PTHREAD_MUTEX_RECURSIVE,

60

61    PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL

62};

其中为PTHREAD_MUTEX_NORMAL类型时,pthread_mutex_t::value中只含有mutex是否已被线程获取的状态;后两种会含有mutex的状态,占有mutex的线程tid,重获取次数等;详见pthread.h中的注释。此处关乎到的一个使用处是当程序出现线程同步问题时,想知道哪个线程占有mutex和哪些线程在等待mutex,因此需要加以说明。

对于mutex of PTHREAD_MUTEX_NORMAL只能知道其是否被占用和是否存在竞争,但是无法知道占有锁的线程id,无论从用户空间还是内核状态的相关变量,因为这个信息并没有被保存;从内核数据futex_queues hash table可以知道该线程组内有哪些线程阻塞在该mutex上,需要注意的一点是futex_queues是个hash table。

对于后两种类型的mutex,由pthread_mutex_t::value::bit_field_tid可以知道占有线程id,从内核数据futex_queues hash table可以知道该线程组内有哪些线程阻塞在该mutex上。

在bionic/libc/bionic/pthread.c中,pthread_mutex_lock()/pthread_mutex_unlock()分别使用pthread_mutex_lock_impl(pthread_mutex_t *mutex)/ pthread_mutex_unlock_impl(pthread_mutex_t *mutex)实现,其内部根据mutex类型做不同的分支实现。

#define  MUTEX_TYPE_BITS_NORMAL      MUTEX_TYPE_TO_BITS(MUTEX_TYPE_NORMAL)

#define  MUTEX_TYPE_BITS_RECURSIVE   MUTEX_TYPE_TO_BITS(MUTEX_TYPE_RECURSIVE)

#define  MUTEX_TYPE_BITS_ERRORCHECK  MUTEX_TYPE_TO_BITS(MUTEX_TYPE_ERRORCHECK)

对于MUTEX_TYPE_BITS_NORMAL,走_normal_lock/ _normal_unlock分支;对于后两种尝试获取/释放,增减重占有计数,交换状态;需要和内核交互时,最后都会执行到__futex_wait_ex/ __futex_wake_ex统一入口进行系统调用,

63int  __futex_wake_ex(volatile void *ftx, int pshared, int val)

64{

65    return __futex_syscall3(ftx, pshared ? FUTEX_WAKE : FUTEX_WAKE_PRIVATE, val);

66}

67

68int  __futex_wait_ex(volatile void *ftx, int pshared, int val, const struct timespec *timeout)

69{

70    return __futex_syscall4(ftx, pshared ? FUTEX_WAIT : FUTEX_WAIT_PRIVATE, val, timeout);

71}

详细流程见下节。

#define __NR_futex              240

sys_api int futex (int *uaddr, int op, int val, const struct timespec *timeout,int *uaddr2, int val3);

pthread_mutex三种类型都是使用非优先级继承的futex。不带优先级继承功能的锁,会调用内核流程futex_wait()/futex_wake(),带有优先级继承功能的锁会调用内核流程futex_lock_pi()/futex_unlock_pi();pi锁导致业务性能下降是其模型导致。

3.   glibc futex用户态流程分析

bionic pthread_mutex的仅实现了非优先级继承 的futex。

3.1不带优先级继承(非pi)

pthread_mutex_lock

Ø  尝试获取锁,如果失败则进入内核阻塞

Ø  重复上述过程直到成功,并置持有锁标志

pthread_mutex_unlock

Ø  如果有等待者,则进入内核态唤醒

3.2 带优先级继承(pi)

pthread_mutex_lock

Ø  尝试获取锁,如果失败则进入内核阻塞

Ø  从内核返回后(通常表明加锁成功),置持有锁标志

pthread_mutex_unlock

Ø  如果有等待者,则进入内核态唤醒

3.3 流程对比

在加锁阶段,非pi锁可能会在glibc库中多次竞争并多次进入内核态阻塞直到获取锁;而pi锁最多只会进入内核态一次。从此角度可以看出,非pi锁存在竞争的不公平性。

4.   内核对非 PI futex的处理流程

内核函数实现在kernel/kernel/futex.c中。

sys_call后入口sys_futex和do_futex,根据参数op做分支处理。

2677SYSCALL_DEFINE6(futex, u32 __user *, uaddr, int, op, u32, val,

2678           struct timespec __user *, utime, u32 __user *, uaddr2,

2679           u32, val3)

2680{

2681 struct timespec ts;

2682 ktime_t t, *tp = NULL;

2683 u32 val2 = 0;

2684 int cmd = op & FUTEX_CMD_MASK;

2685

2686 if (utime && (cmd == FUTEX_WAIT || cmd == FUTEX_LOCK_PI ||

2687                 cmd == FUTEX_WAIT_BITSET ||

2688                 cmd == FUTEX_WAIT_REQUEUE_PI)) {

2689           if (copy_from_user(&ts, utime, sizeof(ts)) != 0)

2690                    return -EFAULT;

2691           if (!timespec_valid(&ts))

2692                    return -EINVAL;

2693

2694           t = timespec_to_ktime(ts);

2695           if (cmd == FUTEX_WAIT)

2696                    t = ktime_add_safe(ktime_get(), t);

2697           tp = &t;

2698 }

2699 /*

2700 * requeue parameter in 'utime' if cmd == FUTEX_*_REQUEUE_*.

2701 * number of waiters to wake in 'utime' if cmd == FUTEX_WAKE_OP.

2702 */

2703 if (cmd == FUTEX_REQUEUE || cmd == FUTEX_CMP_REQUEUE ||

2704     cmd == FUTEX_CMP_REQUEUE_PI || cmd == FUTEX_WAKE_OP)

2705           val2 = (u32) (unsigned long) utime;

2706

2707 return do_futex(uaddr, op, val, tp, uaddr2, val2, val3);

2708}

2620long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,

2621           u32 __user *uaddr2, u32 val2, u32 val3)

2622{

2623 int cmd = op & FUTEX_CMD_MASK;

2624 unsigned int flags = 0;

2625

2626 if (!(op & FUTEX_PRIVATE_FLAG))

2627           flags |= FLAGS_SHARED;

2628

2629 if (op & FUTEX_CLOCK_REALTIME) {

2630           flags |= FLAGS_CLOCKRT;

2631           if (cmd != FUTEX_WAIT_BITSET && cmd != FUTEX_WAIT_REQUEUE_PI)

2632                    return -ENOSYS;

2633 }

2634

2635 switch (cmd) {

2636 case FUTEX_LOCK_PI:

2637 case FUTEX_UNLOCK_PI:

2638 case FUTEX_TRYLOCK_PI:

2639 case FUTEX_WAIT_REQUEUE_PI:

2640 case FUTEX_CMP_REQUEUE_PI:

2641           if (!futex_cmpxchg_enabled)

2642                    return -ENOSYS;

2643 }

2644

2645 switch (cmd) {

2646 case FUTEX_WAIT:

2647           val3 = FUTEX_BITSET_MATCH_ANY;

2648 case FUTEX_WAIT_BITSET:

2649           return futex_wait(uaddr, flags, val, timeout, val3);

2650 case FUTEX_WAKE:

2651           val3 = FUTEX_BITSET_MATCH_ANY;

2652 case FUTEX_WAKE_BITSET:

2653           return futex_wake(uaddr, flags, val, val3);

2654 case FUTEX_REQUEUE:

2655           return futex_requeue(uaddr, flags, uaddr2, val, val2, NULL, 0);

2656 case FUTEX_CMP_REQUEUE:

2657           return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 0);

2658 case FUTEX_WAKE_OP:

2659           return futex_wake_op(uaddr, flags, uaddr2, val, val2, val3);

2660 case FUTEX_LOCK_PI:

2661           return futex_lock_pi(uaddr, flags, val, timeout, 0);

2662 case FUTEX_UNLOCK_PI:

2663           return futex_unlock_pi(uaddr, flags);

2664 case FUTEX_TRYLOCK_PI:

2665           return futex_lock_pi(uaddr, flags, 0, timeout, 1);

2666 case FUTEX_WAIT_REQUEUE_PI:

2667           val3 = FUTEX_BITSET_MATCH_ANY;

2668           return futex_wait_requeue_pi(uaddr, flags, val, timeout, val3,

2669                                            uaddr2);

2670 case FUTEX_CMP_REQUEUE_PI:

2671           return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);

2672 }

2673 return -ENOSYS;

2674}

对非PI futex的处理,涉及到的futex内核数据结构只有struct futex_q和static struct futex_hash_bucket futex_queues hash table,其具体用途参见下节。非PI的futex只需要将futex_q挂入到/摘除掉futex_queues hash table entry。对应的futex_q::pi_state=NULL。

futex_wait

Ø  取__lock的值,与传进来的val参数比较,如果不等,则直接返回;

Ø  将自己加入到等待队列中,然后调用schedule将自己调度出去。

1870static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,

1871                 ktime_t *abs_time, u32 bitset)

1872{

1873 struct hrtimer_sleeper timeout, *to = NULL;

1874 struct restart_block *restart;

1875 struct futex_hash_bucket *hb;

1876 struct futex_q q = futex_q_init;

1877 int ret;

1878

1879 if (!bitset)

1880           return -EINVAL;

1881 q.bitset = bitset;

1882

1883 if (abs_time) {

1884           to = &timeout;

1885

1886           hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?

1887                                   CLOCK_REALTIME : CLOCK_MONOTONIC,

1888                                   HRTIMER_MODE_ABS);

1889           hrtimer_init_sleeper(to, current);

1890           hrtimer_set_expires_range_ns(&to->timer, *abs_time,

1891                                            current->timer_slack_ns);

1892 }

1893

1894retry:

1895 /*

1896 * Prepare to wait on uaddr. On success, holds hb lock and increments

1897 * q.key refs.

1898 */

1899 ret = futex_wait_setup(uaddr, val, flags, &q, &hb);

1900 if (ret)

1901           goto out;

1902

1903 /* queue_me and wait for wakeup, timeout, or a signal. */

1904 futex_wait_queue_me(hb, &q, to);

1905

1906 /* If we were woken (and unqueued), we succeeded, whatever.*/

1907 ret = 0;

1908 /* unqueue_me() drops q.key ref */

1909 if (!unqueue_me(&q))

1910           goto out;

1911 ret = -ETIMEDOUT;

1912 if (to && !to->task)

1913           goto out;

1914

1915 /*

1916 * We expect signal_pending(current), but we might be the

1917 * victim of a spurious wakeup as well.

1918 */

1919 if (!signal_pending(current))

1920           goto retry;

1921

1922 ret = -ERESTARTSYS;

1923 if (!abs_time)

1924           goto out;

1925

1926 restart = &current_thread_info()->restart_block;

1927 restart->fn = futex_wait_restart;

1928 restart->futex.uaddr = uaddr;

1929 restart->futex.val = val;

1930 restart->futex.time = abs_time->tv64;

1931 restart->futex.bitset = bitset;

1932 restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

1933

1934 ret = -ERESTART_RESTARTBLOCK;

1935

1936out:

1937 if (to) {

1938           hrtimer_cancel(&to->timer);

1939           destroy_hrtimer_on_stack(&to->timer);

1940 }

1941 return ret;

1942}

1753/**

1754 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal

1755 * @hb:               the futex hash bucket, must be locked by the caller

1756 * @q:                  the futex_q to queue up on

1757 * @timeout:       the prepared hrtimer_sleeper, or null for no timeout

1758 */

1759static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,

1760                             struct hrtimer_sleeper *timeout)

1761{

1762 /*

1763 * The task state is guaranteed to be set before another task can

1764 * wake it. set_current_state() is implemented using set_mb() and

1765 * queue_me() calls spin_unlock() upon completion, both serializing

1766 * access to the hash list and forcing another memory barrier.

1767 */

1768 set_current_state(TASK_INTERRUPTIBLE);

1769 queue_me(q, hb);

1770

1771 /* Arm the timer */

1772 if (timeout) {

1773           hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);

1774           if (!hrtimer_active(&timeout->timer))

1775                    timeout->task = NULL;

1776 }

1777

1778 /*

1779 * If we have been removed from the hash list, then another task

1780 * has tried to wake us, and we can skip the call to schedule().

1781 */

1782 if (likely(!plist_node_empty(&q->list))) {

1783           /*

1784           * If the timer has already expired, current will already be

1785           * flagged for rescheduling. Only call schedule if there

1786           * is no timeout, or if it has yet to expire.

1787           */

1788           if (!timeout || timeout->task)

1789                    schedule();

1790 }

1791 __set_current_state(TASK_RUNNING);

1792}

futex_wake

Ø  遍历hash链表,找到对应的futex,调用wake_futex唤醒对应阻塞的线程。因为系统调用传进来的nr_wake参数为1, 实际上只唤醒1个线程就退出,优先唤醒优先级高的。

970/*

971 * Wake up waiters matching bitset queued on this futex (uaddr).

972 */

973static int

974futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)

975{

976   struct futex_hash_bucket *hb;

977   struct futex_q *this, *next;

978   struct plist_head *head;

979   union futex_key key = FUTEX_KEY_INIT;

980   int ret;

981

982   if (!bitset)

983             return -EINVAL;

984

985   ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);

986   if (unlikely(ret != 0))

987             goto out;

988

989   hb = hash_futex(&key);

990   spin_lock(&hb->lock);

991   head = &hb->chain;

992

993   plist_for_each_entry_safe(this, next, head, list) {

994             if (match_futex (&this->key, &key)) {

995                      if (this->pi_state || this->rt_waiter) {

996                               ret = -EINVAL;

997                               break;

998                      }

999

1000                    /* Check if one of the bits is set in both bitsets */

1001                    if (!(this->bitset & bitset))

1002                             continue;

1003

1004                    wake_futex(this);

1005                    if (++ret >= nr_wake)

1006                             break;

1007           }

1008 }

1009

1010 spin_unlock(&hb->lock);

1011 put_futex_key(&key);

1012out:

1013 return ret;

1014}

835/*

836 * The hash bucket lock must be held when this is called.

837 * Afterwards, the futex_q must not be accessed.

838 */

839static void wake_futex(struct futex_q *q)

840{

841   struct task_struct *p = q->task;

842

843   /*

844   * We set q->lock_ptr = NULL _before_ we wake up the task. If

845   * a non-futex wake up happens on another CPU then the task

846   * might exit and p would dereference a non-existing task

847   * struct. Prevent this by holding a reference on p across the

848   * wake up.

849   */

850   get_task_struct(p);

851

852   __unqueue_futex(q);

853   /*

854   * The waiting task can free the futex_q as soon as

855   * q->lock_ptr = NULL is written, without taking any locks. A

856   * memory barrier is required here to prevent the following

857   * store to lock_ptr from getting ahead of the plist_del.

858   */

859   smp_wmb();

860   q->lock_ptr = NULL;

861

862   wake_up_state(p, TASK_NORMAL);

863   put_task_struct(p);

864}

注意:

queue_me()的注释可以看出,对PI rt_mutex的rt_thread,首先唤醒;对non rt_thread使用FIFO规则唤醒。所以对rt_thread,会将其对应的futex_q链入prio_list和node_list;而对non rt_thread,其优先级都使用MAX_RT_PRIO,只链入node_list,其prio_list字段是empty(指向该list_head自己)。这是在plist_add中实现的。

5.   内核PI futex相关的数据结构

在继续论述PI futex之前,对PI futex涉及到的背景和引入其解决的问题--优先级翻转和优先级继承做简略叙述。

Linux 内核信号量操作中存在两种优先级倒转现象:(1)等待队列中的进程是按照“先进先出”的原则被唤醒进入临界区的,进程的优先级并没有起作用; (2)信号量所保护的临界区内允许进程调度,临界区内的低优先级进程一旦受阻进入睡眠就很难再得到机会运行,从而阻塞了等待队列中的高优先级进程。这种阻塞可能是有限的,也可能是无限的。

为了将Linux 内核用于实时系统,应该对Linux内核信号量操作中存在的优先级倒转问题加以解决。对于第一种优先级倒转现象,只需要修改等待队列的排队规则,让进程按照优先级排队;而对于第二种优先级倒转现象,则需要实施优先级继承。

关于优先级继承原理可参考相关论文。叙述到此,以后部分仅供专题研究者参考。

下面针对recursive pi类型的futex以及线程间共享锁的情况,讲解内核中的futex_lock_pi()流程,展示所涉及的几个核心数据结构。

PI futex与realtime相关,所以futex对应的内核数据结构名称前冠以“rt”---- struct rt_mutex。

全局变量static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];

futex_queues做为一个桥梁作用,glibc系统调用进入内核时,第一件事情就是通过它来找到内核的rt_mutex对象;利用当前task_struct->mm->mmap_sem和用户态lock字段的地址生成key,用此key哈希到此变量数组的某个成员,从而建立起二者的联系。

数据结构查找关系:

futex_lock_pi():

1.uaddr(即用户态lock字段的地址),  mmap_sem->key;

2.栈上分配futex_q, 将futex_q挂入futex_queues[hash(key)]中;

3.futex_q->futex_pi_state->rt_mutex,查找或分配futex_pi_state进而获得rt_mutex;

4.栈上分配rt_mutex_waiter,将rt_mutex_waiter挂入当前task和步骤3的rt_mutex中。

futex_unlock_pi():

1.key->futex_queues[hash(key)]->futex_q->futex_pi_state->rt_mutex;

2.从rt_mutex ->rt_mutex_waiter->task,找到等待唤醒的任务。

数据结构关系图:

注意:

最新的内核版本,数据结构已经有很大变化。以下所列仅作参考,具体以代码为准。

已根据内核2.6.38版本对原文做了修正,futex主要数据结构定义在文件kernel/kernel/futex.c中,其中对各主要数据结构都有比较详细的注释。

数据结构描述(同种颜色相互对应)

struct task_struct {

spinlock_t pi_lock;

struct plist_head pi_waiters; /* 只链入某rtmutex中优先级最高的rt_mutex_waiter */

/* Deadlock detection and priority inheritance handling */

struct rt_mutex_waiter *pi_blocked_on;

struct list_head pi_state_list;

struct futex_pi_state *pi_state_cache; /* use as a cache after futex_pi_state allocated*/

}

struct futex_hash_bucket {

spinlock_t lock;

struct plist_head chain;

};

static struct futex_hash_bucket futex_queues[1<<FUTEX_HASHBITS];

struct futex_q {

struct plist_node list; /* 根据当前线程的nomal_prio值链入 futex_hash_bucket

的chain中:plist_add(&q->list, &hb->chain); */

struct task_struct *task;

spinlock_t *lock_ptr; /* 指向futex_hash_bucket中的lock */

union futex_key key; /* futex_hash_bucket table hash key */

struct futex_pi_state *pi_state; /*指向task_struct::pi_state_cache, 不需每次分配释放 */

struct rt_mutex_waiter *rt_waiter;

union futex_key *requeue_pi_key;

u32 bitset;

};

struct futex_pi_state {

struct list_head list;  /* 链入task_struct中的pi_state_list */  <---(cross-refer)----+

|

/* the PI object*/ /*-Notice, it’s an OBJECT-*/                                                            |

struct rt_mutex pi_mutex;                                                                                             |

struct task_struct *owner; /* futex_pi_state的拥有者,通常是第一个waiter*/ ---

atomic_t refcount;

union futex_key key; /* 拷贝futex_q中的 key */

};

struct rt_mutex {

spinlock_t             wait_lock;

struct plist_head wait_list; /*所有的该锁的rt_mutex_waiter */

struct task_struct  *owner;  /* the holder of the lock */

}

struct rt_mutex_waiter {

struct plist_node list_entry;  /* 链入rt_mutex中的wait_list */

struct plist_node pi_list_entry; /*链入task_struct中的pi_waiters */

struct task_struct  *task;

struct rt_mutex   *lock;

}

rt_mutex_waiter是rt_mutex和task_struct的ER object,所以其含有二者指针,并且链入二者的链表。

struct plist_head {

struct list_head prio_list;

struct list_head node_list;

/* WARNING: DO NOT EDIT, AUTO-GENERATED CODE - SEE TOP FOR INSTRUCTIONS */

};

struct plist_node {

int prio;

struct plist_head plist;

/* WARNING: DO NOT EDIT, AUTO-GENERATED CODE - SEE TOP FOR INSTRUCTIONS */

};

struct rt_mutex锁的owner的低2位作为如下标志使用:

#define RT_MUTEX_OWNER_PENDING    1UL  /* 唤醒时会将实时锁置上此标志,用于try_to_steal_lock() */

#define RT_MUTEX_HAS_WAITERS     2UL  /* 有线程等待实时锁置上此标志 */

#define RT_MUTEX_OWNER_MASKALL  3UL

6.      内核PI futex处理流程

第一个waiter want to lock时,rt_mutex未必存在。

futex_lock_pi

Ø  使用queue_lock获取spin_lock锁,保证后面对信号量相关的操作都是安全的。

Ø  再次使用cmpxchg_futex_value_locked原子指令试图将__lock字段改为tid,如果能修改成功,表明锁拥有者释放了该锁。加锁成功直接返回。

Ø  将__lock字段的bit 31置1,表明现在开始有线程将阻塞到该锁上。

Ø  从内核维护的相关锁信息pi_state中,找到对应的内核实时信号量;将自己放到用户态信号量等待队列,并调用rt_mutex_timed_lock阻塞到内核的实时信号量上。

Ø  从rt_mutex_timed_lock返回时,可能失败,也可能是真正获取到了信号量;这期间可能会导致pi_state相关信息不一致,如果不一致,则修正。

Ø  必要时对锁拥有者线程进行优先级的提升。

Ø  返回rt_mutex_timed_lock的返回结果。

futex_lock_pi()具体流程:

栈上分配struct futex_q q;

分配当前线程的pi_state_cache内存;

根据glibc中的用户态futex锁uaddr的地址和task_struct->mm-> mmap_sem生成q. key;

根据key找到hash_bucket, 初始化q.lock_ptr,并上锁;

如果锁的拥有都为当前线程,释放q.lock_ptr锁后退出;

将uaddr置位FUTEX_WAITERS;

正在持有futex锁uaddr的线程p,第一次会以uaddr&0x0fff_ffff为pid找到;

lookup_pi_state():遍历hash_bucket-> chain,根据q.key值找到相应的pi_state;如果找不到则返回之前分配好的当前线程的pi_state_cache,并初始化pi_state:拷贝q.key,owner指向线程p,初始化pi_mutex(初始化wait_lock,将owner指向线程p),将list链入线程p的pi_state_list(注pi_state始终链入锁的拥有者线程中);

__queue_me():初始化q,将q.list链入hash_bucket->chain中,q.task指向当前线程,并释放q.lock_ptr锁;

如果是高优先级的线程则可以偷锁,偷锁成功后会调用fixup_pi_state_owner()对pi_state以及uaddr的值进行修正;

rt_mutex_timed_lock()->rt_mutex_slowlock()->{task_blocks_on_rt_mutex(lock, &waiter, detect) + schedule_rt_mutex(lock)-> __rt_mutex_slowlock()(defined as schedule())};

其中waiter为栈上分配的rt_mutex_waiter结构;设定当前线程为TASK_INTERRUPTIBLE状态;初始化waiter,task指向当前线程,lock指向当前rt_mutex锁,并使用当前线程的prio初始化两个plist_node结构;waiter->list_entry链入lock->wait_list, waiter->pi_list_entry链入owner ->pi_waiters(owner为拥有锁的线程即p);当前线程的pi_blocked_on指向waiter(用于检测死锁和PI处理,会在被唤醒时检查并置NULL),最后__rt_mutex_adjust_prio(owner)进行优先级继承操作,如果存在锁链(一个低优先级rtmutex1的拥有者会继承高优先级的waiter的优先级, 谓之PI;如果此线程阻塞在另外的rtmutex2,这个优先级会传播到rtmutex2的拥有者,依次类推传播,默认允许传播1024个;此种情形暂称之为存在锁链),则调用rt_mutex_adjust_prio_chain()进行优先级的传播;涉及到的锁为:lock->wait_lock和task->pi_lock;

schedule();/* unlock pi时会调用wakeup_next_waiter()唤醒此线程 */

正常流程调用try_to_steal_lock()成功返回:当成功后清除RT_MUTEX_OWNER_PENDING标志。

当rt_mutex_timed_lock()调用失败时(如信号唤醒),如果实时锁的拥有者恰好为当前线程则进行rt_mutex_trylock()获取锁;其它情况进行相应参数修正退出;

把q.list从hash_bucket中摘除,根据情况把q.pi_state从task_struct中摘除,把q.pi_state ->pi_mutex的owner置NULL(如果没有等待者),清除pi_state资源等。

futex_unlock_pi

Ø  如果__lock中的tid不是自己,返回错误。

Ø  使用cmpxchg_futex_value_locked原子操作,如果__lock等于当前的tid,则将其改为0,然后返回。

Ø  如果用户态信号量的等待队列中还有线程阻塞,则使用wake_futex_pi函数挑选优先级最高的线程为新的owner,修改__lock的tid属性为新owner的tid,并唤醒。

Ø  如果等待队列中没有线程阻塞,则在unlock_futex_pi函数中将__lock值改为0。

futex_unlock_pi()具体流程:

从略。

PI futex处理流程对比

整个加锁/解锁流程中,主要有三点区别:

1. pi锁远比非pi锁复杂,并使用rt_mutex内核对象进行线程的管理和唤醒,因此pi锁在内核中的执行时间比非pi锁要长得多;

2. pi锁直接参于锁的管理,非pi锁只是简单的挂起和唤醒线程(在glibc中管理锁,见3.1);

3. pi锁会改变锁当前持有者的优先级(优先级继承,以避免优先级反转)。

注:2.6.21内核存在许多pi futex的BUG。例如,在频繁fork的时候没有处理cmpxchg_futex_value_locked返回-EFAULT的情况,还是用高版本内核为好。

参考资料:

linux-2.6.21 pi futex关键数据结构关系图及lock流程 ---- http://blog.chinaunix.net/uid-7295895-id-3011238.html

最新文章

  1. PhoneGap--001 入门 安装
  2. 哈希 poj 2002
  3. CSS DIV HOVER
  4. MySQL数据库迁移(转)
  5. paip.php 与js 的相似性以及为什么它们这么烂还很流行。。
  6. JAVA基础学习day14--集合一
  7. 编写高质量代码改善C#程序的157个建议[勿选List&lt;T&gt;做基类、迭代器是只读的、慎用集合可写属性]
  8. linux nandflash驱动之MTD层
  9. PHP dirname() 函数
  10. 密钥public/private key登陆linux
  11. ACE首页更改
  12. Hibernate错误:Exception in thread &quot;main&quot; org.hibernate.exception.SQLGrammarException: Could not execute JDBC batch update
  13. python几种常见语法
  14. 工厂方法模式(Method Factory),理解多态应用的好例子.
  15. php程序员招聘
  16. Linux基础命令---sar显示系统活动信息
  17. java中全角半角字符的相互转换的代码
  18. Windows批处理命令学习中遇到的坑--持续更新中
  19. http 连接 analysis service (ssas)
  20. iOS 网络编程 TCP/UDP HTTP

热门文章

  1. Jenkins - 持续集成部署
  2. i8浏览器不支持placeholder属性解决办法,以及解决后,文字不居中问题
  3. cocos2d-x 场景切换
  4. React远程服务
  5. 转 Using $.ajaxPrefilter() To Configure AJAX Requests In jQuery 1.5
  6. Elasticsearch 监控和部署
  7. js调用本地office打开服务器的office文件预览
  8. 输入的是util包下面的 时间, 接受的是java.sql.date 或者 java.util.date类型
  9. hdu 2199 Can you solve this equation? (二分法)
  10. poj 2226 Muddy Fields (二分匹配)