Tech, November 23, 2022

Understanding Linux Condition Variables

1 Introduction


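Example 1: When the system is not busy (this is the condition), run the thread that scans file status.

Example 2: Several threads form a thread pool, and only when the task queue contains a task should one of them be used to execute it. To avoid a thundering herd, the threads in the pool can be synchronized with a condition variable.

2 Usage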



1) pthread_cond_init

2) pthread_cond_signal / pthread_cond_broadcast

3) pthread_cond_wait / pthread_cond_timedwait

4) pthread_cond_destroy
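
The four steps above can be put together in one small program. The following is a minimal sketch rather than a definitive pattern: the names gate_t, gate_waiter and run_gate_demo are hypothetical and exist only for this illustration, and return codes are checked with assert for brevity.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical illustration type: a flag guarded by a mutex and a condition variable. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             open;   /* the condition (predicate), protected by lock */
} gate_t;

static void *gate_waiter(void *arg)
{
    gate_t *g = arg;

    pthread_mutex_lock(&g->lock);
    while (!g->open) {                          /* re-check after every wakeup */
        pthread_cond_wait(&g->cond, &g->lock);  /* step 3: wait */
    }
    pthread_mutex_unlock(&g->lock);
    return NULL;
}

/* Runs the whole life cycle once; returns 1 if the waiter saw the condition. */
int run_gate_demo(void)
{
    gate_t g;
    pthread_t t;
    int rc;

    g.open = 0;
    rc = pthread_mutex_init(&g.lock, NULL);
    assert(rc == 0);
    rc = pthread_cond_init(&g.cond, NULL);      /* step 1: init */
    assert(rc == 0);
    rc = pthread_create(&t, NULL, gate_waiter, &g);
    assert(rc == 0);

    pthread_mutex_lock(&g.lock);
    g.open = 1;                                 /* make the condition true */
    pthread_cond_signal(&g.cond);               /* step 2: signal */
    pthread_mutex_unlock(&g.lock);

    rc = pthread_join(t, NULL);
    assert(rc == 0);
    rc = pthread_cond_destroy(&g.cond);         /* step 4: destroy */
    assert(rc == 0);
    rc = pthread_mutex_destroy(&g.lock);
    assert(rc == 0);
    (void) rc;
    return g.open;
}
```

Because the flag is set and tested under the mutex, the program behaves the same whether the waiter reaches pthread_cond_wait before or after the signal is sent.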


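#include <stdio.h>
#include <sys/time.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
...
/* lock, cond and shutdown are declared in the elided part above.
 * ready is an int flag added here as an assumption; it is expected to be
 * declared next to them. It records that the condition holds, so that a
 * notification sent before B starts waiting is not lost.
 */
void *A_thread_run(void *arg)
{
    ...
    pthread_mutex_lock (&lock);
    /* Condition satisfied: record it and send the notification */
    ready = 1;
    pthread_cond_signal (&cond);
    pthread_mutex_unlock (&lock);
    ...
}

void *B_thread_run(void *arg)
{
    for ( ; ; ) {
        pthread_mutex_lock (&lock);
        /* pthread_cond_wait atomically releases the mutex and blocks on the
         * condition variable. When it returns, the mutex is locked again.
         * A wakeup may be spurious, so the condition is re-checked in a loop.
         *
         * There are two ways to wait: pthread_cond_wait() and the timed
         * pthread_cond_timedwait(). The timed form returns ETIMEDOUT if the
         * condition has not been signaled before the given absolute time.
         * Either form must be paired with a mutex, to prevent a race between
         * several threads calling pthread_cond_wait() (or
         * pthread_cond_timedwait(), likewise below) at the same time.
         * The mutex must be a normal lock (PTHREAD_MUTEX_TIMED_NP) or an
         * adaptive lock (PTHREAD_MUTEX_ADAPTIVE_NP); both are glibc-specific
         * type names. It must be locked by the calling thread
         * (pthread_mutex_lock()) before pthread_cond_wait() is called. It
         * stays locked until the thread has been added to the condition's
         * wait queue, and is unlocked just before the thread is suspended.
         * Before pthread_cond_wait() returns, the mutex is locked again, to
         * match the lock taken before entering pthread_cond_wait().
         *
         * There are two ways to signal: pthread_cond_signal() wakes one of
         * the threads waiting on the condition (when several are waiting,
         * the scheduling policy decides which one, so arrival order is not
         * guaranteed), while pthread_cond_broadcast() wakes all waiting
         * threads (thundering herd).
         */
        while (!ready && !shutdown) {
            pthread_cond_wait (&cond, &lock);
        }
        if (shutdown) {
            break;
        }
        ready = 0;   /* consume the notification */
        /* Unlock */
        pthread_mutex_unlock (&lock);
        /* do your task here */
    }
    pthread_mutex_unlock (&lock);
    pthread_exit (0);
    return NULL;
}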
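
The timed wait mentioned in the comment above takes an absolute deadline, not a relative timeout. The sketch below shows one way to build that deadline, assuming a condition variable with default attributes (which measures time on CLOCK_REALTIME). The helper name wait_flag_ms and its parameters are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical helper: waits at most timeout_ms milliseconds for *flag to
 * become non-zero. The caller must NOT hold the mutex.
 * Returns 0 if the flag is set, otherwise the error from the timed wait
 * (ETIMEDOUT when the deadline passed).
 */
int wait_flag_ms(pthread_mutex_t *lock, pthread_cond_t *cond,
                 const int *flag, long timeout_ms)
{
    struct timespec deadline;
    int rc = 0;

    assert(lock != NULL && cond != NULL && flag != NULL && timeout_ms >= 0);

    /* Absolute deadline = now + timeout, with tv_nsec kept below 1e9. */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec  += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec  += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(lock);
    /* Loop so that a spurious wakeup does not end the wait early; the
     * deadline is absolute, so it does not need to be recomputed. */
    while (!*flag && rc == 0) {
        rc = pthread_cond_timedwait(cond, lock, &deadline);
    }
    if (*flag) {
        rc = 0;   /* the condition wins even if the deadline also passed */
    }
    pthread_mutex_unlock(lock);
    return rc;
}
```

A caller would typically treat ETIMEDOUT as "nothing happened yet" and either retry or do periodic housekeeping before waiting again.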



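3 Avoiding the Thundering Herd

This is an age of too many wolves and too little meat, too many monks and too little porridge. Whenever a piece of meat is thrown to the pack, every wolf rushes for it, yet only one wolf gets it. That is the thundering herd. A real-world version: a teacher puts every question to the whole class but only ever calls on one student to answer, and after a while the students grow weary of the teacher's questions. In a computer, a thundering herd wastes server resources: every awakened thread costs a context switch and competes for the lock, only to find that there is nothing for it to do.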
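
The cost can be made visible by counting wakeups that find nothing to do. The sketch below is an illustration under stated assumptions, not a benchmark: herd_t, herd_worker, herd_wasted_wakeups and N_WORKERS are hypothetical names, one work item is posted to four sleeping workers, and the extra condition variable progress exists only so the main thread can tell when every worker is asleep again.

```c
#include <assert.h>
#include <pthread.h>

#define N_WORKERS 4   /* illustrative pool size */

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  work;      /* workers wait here for items */
    pthread_cond_t  progress;  /* main thread waits here for the workers */
    int items;                 /* queued work items */
    int done;                  /* shutdown flag */
    int parked;                /* how many times a worker went to sleep */
    int consumed;              /* items actually processed */
    int wasted;                /* wakeups that found nothing to do */
} herd_t;

static void *herd_worker(void *arg)
{
    herd_t *h = arg;

    pthread_mutex_lock(&h->lock);
    while (!h->done) {
        if (h->items > 0) {                  /* take one item */
            h->items--;
            h->consumed++;
            continue;
        }
        h->parked++;
        pthread_cond_signal(&h->progress);   /* tell main we are going to sleep */
        pthread_cond_wait(&h->work, &h->lock);
        if (h->items == 0 && !h->done) {
            h->wasted++;                     /* woken up for nothing */
        }
    }
    pthread_mutex_unlock(&h->lock);
    return NULL;
}

/* Posts one item, wakes the workers with signal or broadcast, and returns
 * the number of wasted wakeups that resulted. */
int herd_wasted_wakeups(int use_broadcast)
{
    herd_t h = { .items = 0 };               /* remaining counters start at 0 */
    pthread_t tid[N_WORKERS];
    int i, rc, target;

    pthread_mutex_init(&h.lock, NULL);
    pthread_cond_init(&h.work, NULL);
    pthread_cond_init(&h.progress, NULL);
    for (i = 0; i < N_WORKERS; i++) {
        rc = pthread_create(&tid[i], NULL, herd_worker, &h);
        assert(rc == 0);
    }

    pthread_mutex_lock(&h.lock);
    while (h.parked < N_WORKERS) {           /* wait until every worker sleeps */
        pthread_cond_wait(&h.progress, &h.lock);
    }
    h.items = 1;
    if (use_broadcast) {
        pthread_cond_broadcast(&h.work);     /* wakes all workers */
    } else {
        pthread_cond_signal(&h.work);        /* wakes at least one worker */
    }
    /* Wait until the item is taken and every woken worker sleeps again. */
    target = N_WORKERS + (use_broadcast ? N_WORKERS : 1);
    while (h.consumed < 1 || h.parked < target) {
        pthread_cond_wait(&h.progress, &h.lock);
    }
    h.done = 1;                              /* shutdown wakeups are not counted */
    pthread_cond_broadcast(&h.work);
    pthread_mutex_unlock(&h.lock);

    for (i = 0; i < N_WORKERS; i++) {
        pthread_join(tid[i], NULL);
    }
    assert(h.consumed == 1);
    pthread_cond_destroy(&h.progress);
    pthread_cond_destroy(&h.work);
    pthread_mutex_destroy(&h.lock);
    (void) rc;
    return h.wasted;
}
```

With pthread_cond_broadcast the function returns at least N_WORKERS - 1, because every worker wakes but only one finds the item. With pthread_cond_signal the result is typically 0, although POSIX allows a signal to wake more than one thread and allows spurious wakeups, so the exact number is not guaranteed.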



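4 Thread Pool (threadpool)


Each thread has a thread-specific parameter (thread argument), and each task has task-specific data (task argument). The thread function runs the task function and passes it both the thread argument and the task argument.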
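
How the two kinds of argument reach a task function can be shown without any threads at all. In the sketch below the type and function names (demo_ctx, demo_task, demo_dispatch, add_to_total, demo_sum) are hypothetical and only mirror the idea: the thread argument is a running total owned by one worker, and each task argument is a number to add to it.

```c
#include <assert.h>
#include <stddef.h>

struct demo_task;

/* Per-thread context: the thread's own argument plus the task being run. */
typedef struct demo_ctx {
    void *thread_arg;
    struct demo_task *task;
} demo_ctx;

/* A task: the function to call and the task's own argument. */
typedef struct demo_task {
    void (*function)(demo_ctx *);
    void *argument;
} demo_task;

/* Task function: adds the task's number to the owning thread's total. */
static void add_to_total(demo_ctx *ctx)
{
    long *total = ctx->thread_arg;         /* thread argument */
    const int *n = ctx->task->argument;    /* task argument */

    *total += *n;
}

/* What a worker does for each task: attach it to the context, then call the
 * task function with the context so both arguments are reachable. */
void demo_dispatch(demo_ctx *ctx, demo_task *task)
{
    assert(ctx != NULL && task != NULL && task->function != NULL);
    ctx->task = task;
    task->function(ctx);
}

/* Runs two tasks on one context and returns the accumulated total. */
long demo_sum(void)
{
    long total = 0;
    int two = 2, three = 3;
    demo_ctx ctx = { &total, NULL };
    demo_task t1 = { add_to_total, &two };
    demo_task t2 = { add_to_total, &three };

    demo_dispatch(&ctx, &t1);
    demo_dispatch(&ctx, &t2);
    return total;
}
```

Passing a single context pointer keeps the task function signature fixed while still letting each worker carry its own state, for example a database connection or a scratch buffer.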




/*
 * 2014-06-18: last modified by cheungmine
 *
 * Copyright (c) 2011, Mathias Brossard <mathias@brossard.org>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#include <pthread.h>   /* pthread_t, used by thread_context_t */

#ifndef POOL_MAX_THREADS
#   define POOL_MAX_THREADS  256
#endif

#ifndef POOL_MAX_QUEUES
#   define POOL_MAX_QUEUES  1024
#endif

#ifndef POOL_DEFAULT_THREADS
#   define POOL_DEFAULT_THREADS  32
#endif

#ifndef POOL_DEFAULT_QUEUES
#   define POOL_DEFAULT_QUEUES  256
#endif

/**
 * @file threadpool.h
 * @brief Threadpool Header file
 */

typedef struct threadpool_t threadpool_t;

/**
 * @struct thread_context_t
 * @brief per-thread context
 *   each thread can carry its own argument
 *   added by cheungmine.
 *   2014-06-17
 */
typedef struct thread_context_t
{
    void *pool;
    pthread_t thread;
    void *thread_arg;
    struct threadpool_task_t *task;
} thread_context_t;

/**
 *  @struct threadpool_task
 *  @brief the work struct
 *
 *  @var function Pointer to the function that will perform the task.
 *  @var flags    User-defined value stored with the task.
 *  @var argument Argument to be passed to the function.
 */
typedef struct threadpool_task_t
{
    void (*function)(thread_context_t *);
    int    flags;     /* user defined */
    void * argument;
} threadpool_task_t;

typedef enum
{
    threadpool_invalid        = -1,
    threadpool_lock_failure   = -2,
    threadpool_queue_full     = -3,
    threadpool_shutdown       = -4,
    threadpool_run_failure    = -5,
    threadpool_out_memory     = -6
} threadpool_error_t;

/* Indexed by the negated error code; index 0 means success. */
static const char* threadpool_error_messages[] = {
    "threadpool_success",
    "threadpool_invalid",
    "threadpool_lock_failure",
    "threadpool_queue_full",
    "threadpool_shutdown",
    "threadpool_run_failure",
    "threadpool_out_memory"
};

/**
 * @function threadpool_create
 * @brief Creates a threadpool_t object.
 * @param thread_count Number of worker threads.
 * @param queue_size   Size of the queue.
 * @param thread_args  array of arguments with count of thread_count, NULL if ignored.
 * @param flags        Unused parameter.
 * @return a newly created thread pool or NULL
 */
threadpool_t *threadpool_create (int thread_count, int queue_size, void **thread_args, int flags);

/**
 * @function threadpool_add
 * @brief add a new task in the queue of a thread pool
 * @param pool     Thread pool to which add the task.
 * @param routine  Pointer to the function that will perform the task.
 * @param task_arg Argument to be passed to the function.
 * @param flags    User-defined value stored with the task.
 * @return 0 if all goes well, negative values in case of error (@see
 * threadpool_error_t for codes).
 */
int threadpool_add (threadpool_t *pool, void (*routine)(thread_context_t *), void *task_arg, int flags);

/**
 * @function threadpool_destroy
 * @brief Stops and destroys a thread pool.
 * @param pool  Thread pool to destroy.
 * @param flags Unused parameter.
 */
int threadpool_destroy (threadpool_t *pool, int flags);

#endif /* _THREADPOOL_H_ */


/*
 * 2014-06-18: last modified by cheungmine
 *
 * Copyright (c) 2011, Mathias Brossard <mathias@brossard.org>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file threadpool.c
 * @brief Threadpool implementation file
 */

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "threadpool.h"

/**
 *  @struct threadpool
 *  @brief The threadpool struct
 *
 *  @var lock         Mutex protecting every field below.
 *  @var notify       Condition variable to notify worker threads.
 *  @var head         Index of the first element.
 *  @var tail         Index of the next element.
 *  @var count        Number of pending tasks.
 *  @var shutdown     Flag indicating if the pool is shutting down
 *  @var started      Number of worker threads still running.
 *  @var thread_count Number of threads
 *  @var queue_size   Size of the task queue.
 *  @var queues       Array containing the task queue.
 *  @var thread_ctxs  Array containing the worker thread contexts.
 */
struct threadpool_t {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    int head;
    int tail;
    int count;
    int shutdown;
    int started;
    int thread_count;
    int queue_size;
    threadpool_task_t *queues;
    thread_context_t thread_ctxs[];   /* C99 flexible array member */
};

/**
 * @function void *threadpool_run(void *threadpool)
 * @brief the worker thread
 * @param threadpool the thread_context_t of this worker
 */
static void *threadpool_run (void *threadpool);

int threadpool_free(threadpool_t *pool);

threadpool_t *threadpool_create(int thread_count, int queue_size, void **thread_args, int flags)
{
    int i;
    threadpool_t *pool = NULL;

    (void) flags;   /* unused */

    /* Check thread_count for negative or otherwise very big input parameters */
    if (thread_count < 0 || thread_count > POOL_MAX_THREADS) {
        goto err;
    }
    if (thread_count == 0) {
        thread_count = POOL_DEFAULT_THREADS;
    }

    /* Check queue_size for negative or otherwise very big input parameters */
    if (queue_size < 0 || queue_size > POOL_MAX_QUEUES) {
        goto err;
    }
    if (queue_size == 0) {
        queue_size = POOL_DEFAULT_QUEUES;
    }

    /* create threadpool: header, thread contexts and task queue in one block */
    if ( (pool = (threadpool_t *) malloc (sizeof(threadpool_t) +
        sizeof(thread_context_t) * thread_count +
        sizeof(threadpool_task_t) * queue_size)) == NULL ) {
        goto err;
    }

    /* Initialize */
    pool->thread_count = thread_count;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;
    /* the task queue starts right after the last thread context */
    pool->queues = (threadpool_task_t *) (& pool->thread_ctxs[thread_count]);

    /* Initialize mutex and condition variable first */
    if (pthread_mutex_init (&(pool->lock), NULL) != 0) {
        goto err;
    }
    if (pthread_cond_init (&(pool->notify), NULL) != 0) {
        pthread_mutex_destroy (&(pool->lock));
        goto err;
    }

    /* Start worker threads */
    for (i = 0; i < thread_count; i++) {
        thread_context_t * pctx = & pool->thread_ctxs[i];

        /* set pool to each thread context */
        pctx->pool = (void*) pool;
        pctx->task = NULL;

        /* assign thread argument if valid */
        if (thread_args) {
            pctx->thread_arg = thread_args[i];
        } else {
            pctx->thread_arg = 0;
        }

        if ( pthread_create (& pctx->thread, NULL, threadpool_run, (void*) pctx) != 0) {
            /* join only the threads that were actually created */
            pool->thread_count = i;
            threadpool_destroy (pool, 0);
            return NULL;
        } else {
            pool->started++;
        }
    }

    return pool;

 err:
    /* Reached only before any thread was started and with no live mutex or
       condition variable, so a plain free() is enough (free(NULL) is a no-op). */
    free (pool);
    return NULL;
}

int threadpool_add (threadpool_t *pool, void (*function)(thread_context_t *), void *task_arg, int flags)
{
    int err = 0;
    int next;

    if ( pool == NULL || function == NULL ) {
        return threadpool_invalid;
    }

    if (pthread_mutex_lock (&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    next = pool->tail + 1;
    next = (next == pool->queue_size) ? 0 : next;

    do {
        /* Are we full ? */
        if (pool->count == pool->queue_size) {
            err = threadpool_queue_full;
            break;
        }

        /* Are we shutting down ? */
        if (pool->shutdown) {
            err = threadpool_shutdown;
            break;
        }

        /* Add task to queue */
        pool->queues[pool->tail].function = function;
        pool->queues[pool->tail].argument = task_arg;
        pool->queues[pool->tail].flags = flags;

        pool->tail = next;
        pool->count += 1;

        /* Wake one worker only: pthread_cond_signal instead of
           pthread_cond_broadcast avoids the thundering herd */
        if (pthread_cond_signal (&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
            break;
        }
    } while(0);

    if (pthread_mutex_unlock (&pool->lock) != 0) {
        err = threadpool_lock_failure;
    }

    return err;
}

int threadpool_destroy (threadpool_t *pool, int flags)
{
    int i, err = 0;

    (void) flags;   /* unused */

    if (pool == NULL) {
        return threadpool_invalid;
    }

    if (pthread_mutex_lock (&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    /* Already shutting down */
    if (pool->shutdown) {
        pthread_mutex_unlock (&(pool->lock));
        return threadpool_shutdown;
    }

    pool->shutdown = 1;

    /* Wake up all worker threads, then release the lock exactly once
       so that they can run and exit */
    if (pthread_cond_broadcast (&(pool->notify)) != 0) {
        err = threadpool_lock_failure;
    }
    if (pthread_mutex_unlock (&(pool->lock)) != 0) {
        err = threadpool_lock_failure;
    }
    if (err) {
        return err;
    }

    /* Join all worker threads */
    for (i = 0; i < pool->thread_count; i++) {
        if (pthread_join (pool->thread_ctxs[i].thread, NULL) != 0) {
            err = threadpool_run_failure;
        }
    }

    /* Only if everything went well do we deallocate the pool */
    if (!err) {
        threadpool_free (pool);
    }

    return err;
}

int threadpool_free (threadpool_t *pool)
{
    if (pool == NULL || pool->started > 0) {
        return -1;
    }

    /* Every worker has exited, so nobody holds the lock. The mutex is not
       locked here because destroying a locked mutex is undefined behavior. */
    pthread_mutex_destroy (&(pool->lock));
    pthread_cond_destroy (&(pool->notify));

    free(pool);
    return 0;
}

/**
 * each thread run function
 */
static void *threadpool_run (void * param)
{
    threadpool_task_t task;

    thread_context_t * thread_ctx = (thread_context_t *) param;
    threadpool_t * pool = thread_ctx->pool;

    for ( ; ; ) {
        /* Lock must be taken to wait on condition variable */
        pthread_mutex_lock (&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        while ((pool->count == 0) && (!pool->shutdown)) {
            pthread_cond_wait (&(pool->notify), &(pool->lock));
        }

        /* Shutdown is immediate: tasks still in the queue are not run */
        if (pool->shutdown) {
            break;
        }

        /* Grab our task */
        task.function = pool->queues[pool->head].function;
        task.argument = pool->queues[pool->head].argument;
        task.flags    = pool->queues[pool->head].flags;

        thread_ctx->task = &task;

        pool->head += 1;
        pool->head = (pool->head == pool->queue_size) ? 0 : pool->head;
        pool->count -= 1;

        /* Unlock */
        pthread_mutex_unlock (&(pool->lock));

        /* Get to work */
        (*(task.function)) (thread_ctx);
    }

    /* The loop is left with the lock held */
    pool->started--;

    pthread_mutex_unlock (&(pool->lock));
    pthread_exit (NULL);

    return (NULL);
}
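
The head, tail and count bookkeeping that threadpool_add and threadpool_run perform on the task queue is an ordinary circular buffer. The sketch below isolates that arithmetic in single-threaded form. The names ring_t, ring_push, ring_pop and RING_SIZE are hypothetical, and the elements are plain ints instead of tasks. In the pool itself, every such operation runs with the mutex held.

```c
#include <assert.h>
#include <stddef.h>

#define RING_SIZE 4   /* illustrative capacity */

typedef struct {
    int head;               /* index of the oldest element */
    int tail;               /* index where the next element is stored */
    int count;              /* number of stored elements */
    int slots[RING_SIZE];
} ring_t;

/* Stores value at tail and advances tail with wrap-around.
 * Returns 0 on success, -1 if the ring is full. */
int ring_push(ring_t *r, int value)
{
    assert(r != NULL);
    if (r->count == RING_SIZE) {
        return -1;
    }
    r->slots[r->tail] = value;
    r->tail = (r->tail + 1 == RING_SIZE) ? 0 : r->tail + 1;
    r->count += 1;
    return 0;
}

/* Takes the element at head and advances head with wrap-around.
 * Returns 0 on success, -1 if the ring is empty. */
int ring_pop(ring_t *r, int *out)
{
    assert(r != NULL && out != NULL);
    if (r->count == 0) {
        return -1;
    }
    *out = r->slots[r->head];
    r->head = (r->head + 1 == RING_SIZE) ? 0 : r->head + 1;
    r->count -= 1;
    return 0;
}
```

Keeping an explicit count, as the pool does, is what distinguishes a full ring from an empty one, since head equals tail in both cases.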