threadpool.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2012-2013, Digium, Inc.
00005  *
00006  * Mark Michelson <mmmichelson@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 
00020 #include "asterisk.h"
00021 
00022 #include "asterisk/threadpool.h"
00023 #include "asterisk/taskprocessor.h"
00024 #include "asterisk/astobj2.h"
00025 #include "asterisk/utils.h"
00026 
00027 /* Needs to stay prime if increased */
00028 #define THREAD_BUCKETS 89
00029 
00030 /*!
00031  * \brief An opaque threadpool structure
00032  *
00033  * A threadpool is a collection of threads that execute
00034  * tasks from a common queue.
00035  */
00036 struct ast_threadpool {
00037    /*! Threadpool listener */
00038    struct ast_threadpool_listener *listener;
00039    /*!
00040     * \brief The container of active threads.
00041     * Active threads are those that are currently running tasks
00042     */
00043    struct ao2_container *active_threads;
00044    /*!
00045     * \brief The container of idle threads.
00046     * Idle threads are those that are currenly waiting to run tasks
00047     */
00048    struct ao2_container *idle_threads;
00049    /*!
00050     * \brief The container of zombie threads.
00051     * Zombie threads may be running tasks, but they are scheduled to die soon
00052     */
00053    struct ao2_container *zombie_threads;
00054    /*!
00055     * \brief The main taskprocessor
00056     *
00057     * Tasks that are queued in this taskprocessor are
00058     * doled out to the worker threads. Worker threads that
00059     * execute tasks from the threadpool are executing tasks
00060     * in this taskprocessor.
00061     *
00062     * The threadpool itself is actually the private data for
00063     * this taskprocessor's listener. This way, as taskprocessor
00064     * changes occur, the threadpool can alert its listeners
00065     * appropriately.
00066     */
00067    struct ast_taskprocessor *tps;
00068    /*!
00069     * \brief The control taskprocessor
00070     *
00071     * This is a standard taskprocessor that uses the default
00072     * taskprocessor listener. In other words, all tasks queued to
00073     * this taskprocessor have a single thread that executes the
00074     * tasks.
00075     *
00076     * All tasks that modify the state of the threadpool and all tasks
00077     * that call out to threadpool listeners are pushed to this
00078     * taskprocessor.
00079     *
00080     * For instance, when the threadpool changes sizes, a task is put
00081     * into this taskprocessor to do so. When it comes time to tell the
00082     * threadpool listener that worker threads have changed state,
00083     * the task is placed in this taskprocessor.
00084     *
00085     * This is done for three main reasons
00086     * 1) It ensures that listeners are given an accurate portrayal
00087     * of the threadpool's current state. In other words, when a listener
00088     * gets told a count of active, idle and zombie threads, it does not
00089     * need to worry that internal state of the threadpool might be different
00090     * from what it has been told.
00091     * 2) It minimizes the locking required in both the threadpool and in
00092     * threadpool listener's callbacks.
00093     * 3) It ensures that listener callbacks are called in the same order
00094     * that the threadpool had its state change.
00095     */
00096    struct ast_taskprocessor *control_tps;
00097    /*! True if the threadpool is in the process of shutting down */
00098    int shutting_down;
00099    /*! Threadpool-specific options */
00100    struct ast_threadpool_options options;
00101 };
00102 
00103 /*!
00104  * \brief listener for a threadpool
00105  *
00106  * The listener is notified of changes in a threadpool. It can
00107  * react by doing things like increasing the number of threads
00108  * in the pool
00109  */
00110 struct ast_threadpool_listener {
00111    /*! Callbacks called by the threadpool */
00112    const struct ast_threadpool_listener_callbacks *callbacks;
00113    /*! User data for the listener */
00114    void *user_data;
00115 };
00116 
00117 /*!
00118  * \brief states for worker threads
00119  */
00120 enum worker_state {
00121    /*! The worker is either active or idle */
00122    ALIVE,
00123    /*!
00124     * The worker has been asked to shut down but
00125     * may still be in the process of executing tasks.
00126     * This transition happens when the threadpool needs
00127     * to shrink and needs to kill active threads in order
00128     * to do so.
00129     */
00130    ZOMBIE,
00131    /*!
00132     * The worker has been asked to shut down. Typically
00133     * only idle threads go to this state directly, but
00134     * active threads may go straight to this state when
00135     * the threadpool is shut down.
00136     */
00137    DEAD,
00138 };
00139 
00140 /*!
00141  * A thread that executes threadpool tasks
00142  */
00143 struct worker_thread {
00144    /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
00145    int id;
00146    /*! Condition used in conjunction with state changes */
00147    ast_cond_t cond;
00148    /*! Lock used alongside the condition for state changes */
00149    ast_mutex_t lock;
00150    /*! The actual thread that is executing tasks */
00151    pthread_t thread;
00152    /*! A pointer to the threadpool. Needed to be able to execute tasks */
00153    struct ast_threadpool *pool;
00154    /*! The current state of the worker thread */
00155    enum worker_state state;
00156    /*! A boolean used to determine if an idle thread should become active */
00157    int wake_up;
00158    /*! Options for this threadpool */
00159    struct ast_threadpool_options options;
00160 };
00161 
00162 /* Worker thread forward declarations. See definitions for documentation */
00163 static int worker_thread_hash(const void *obj, int flags);
00164 static int worker_thread_cmp(void *obj, void *arg, int flags);
00165 static void worker_thread_destroy(void *obj);
00166 static void worker_active(struct worker_thread *worker);
00167 static void *worker_start(void *arg);
00168 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
00169 static int worker_thread_start(struct worker_thread *worker);
00170 static int worker_idle(struct worker_thread *worker);
00171 static void worker_set_state(struct worker_thread *worker, enum worker_state state);
00172 static void worker_shutdown(struct worker_thread *worker);
00173 
00174 /*!
00175  * \brief Notify the threadpool listener that the state has changed.
00176  *
00177  * This notifies the threadpool listener via its state_changed callback.
00178  * \param pool The threadpool whose state has changed
00179  */
00180 static void threadpool_send_state_changed(struct ast_threadpool *pool)
00181 {
00182    int active_size = ao2_container_count(pool->active_threads);
00183    int idle_size = ao2_container_count(pool->idle_threads);
00184 
00185    if (pool->listener && pool->listener->callbacks->state_changed) {
00186       pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
00187    }
00188 }
00189 
00190 /*!
00191  * \brief Struct used for queued operations involving worker state changes
00192  */
00193 struct thread_worker_pair {
00194    /*! Threadpool that contains the worker whose state has changed */
00195    struct ast_threadpool *pool;
00196    /*! Worker whose state has changed */
00197    struct worker_thread *worker;
00198 };
00199 
00200 /*!
00201  * \brief Destructor for thread_worker_pair
00202  */
00203 static void thread_worker_pair_destructor(void *obj)
00204 {
00205    struct thread_worker_pair *pair = obj;
00206    ao2_ref(pair->worker, -1);
00207 }
00208 
00209 /*!
00210  * \brief Allocate and initialize a thread_worker_pair
00211  * \param pool Threadpool to assign to the thread_worker_pair
00212  * \param worker Worker thread to assign to the thread_worker_pair
00213  */
00214 static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
00215       struct worker_thread *worker)
00216 {
00217    struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
00218    if (!pair) {
00219       return NULL;
00220    }
00221    pair->pool = pool;
00222    ao2_ref(worker, +1);
00223    pair->worker = worker;
00224    return pair;
00225 }
00226 
00227 /*!
00228  * \brief Move a worker thread from the active container to the idle container.
00229  *
00230  * This function is called from the threadpool's control taskprocessor thread.
00231  * \param data A thread_worker_pair containing the threadpool and the worker to move.
00232  * \return 0
00233  */
00234 static int queued_active_thread_idle(void *data)
00235 {
00236    struct thread_worker_pair *pair = data;
00237 
00238    ao2_link(pair->pool->idle_threads, pair->worker);
00239    ao2_unlink(pair->pool->active_threads, pair->worker);
00240 
00241    threadpool_send_state_changed(pair->pool);
00242 
00243    ao2_ref(pair, -1);
00244    return 0;
00245 }
00246 
00247 /*!
00248  * \brief Queue a task to move a thread from the active list to the idle list
00249  *
00250  * This is called by a worker thread when it runs out of tasks to perform and
00251  * goes idle.
00252  * \param pool The threadpool to which the worker belongs
00253  * \param worker The worker thread that has gone idle
00254  */
00255 static void threadpool_active_thread_idle(struct ast_threadpool *pool,
00256       struct worker_thread *worker)
00257 {
00258    struct thread_worker_pair *pair;
00259    SCOPED_AO2LOCK(lock, pool);
00260    if (pool->shutting_down) {
00261       return;
00262    }
00263    pair = thread_worker_pair_alloc(pool, worker);
00264    if (!pair) {
00265       return;
00266    }
00267    ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
00268 }
00269 
00270 /*!
00271  * \brief Kill a zombie thread
00272  *
00273  * This runs from the threadpool's control taskprocessor thread.
00274  *
00275  * \param data A thread_worker_pair containing the threadpool and the zombie thread
00276  * \return 0
00277  */
00278 static int queued_zombie_thread_dead(void *data)
00279 {
00280    struct thread_worker_pair *pair = data;
00281 
00282    ao2_unlink(pair->pool->zombie_threads, pair->worker);
00283    threadpool_send_state_changed(pair->pool);
00284 
00285    ao2_ref(pair, -1);
00286    return 0;
00287 }
00288 
00289 /*!
00290  * \brief Queue a task to kill a zombie thread
00291  *
00292  * This is called by a worker thread when it acknowledges that it is time for
00293  * it to die.
00294  */
00295 static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
00296       struct worker_thread *worker)
00297 {
00298    struct thread_worker_pair *pair;
00299    SCOPED_AO2LOCK(lock, pool);
00300    if (pool->shutting_down) {
00301       return;
00302    }
00303    pair = thread_worker_pair_alloc(pool, worker);
00304    if (!pair) {
00305       return;
00306    }
00307    ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
00308 }
00309 
00310 static int queued_idle_thread_dead(void *data)
00311 {
00312    struct thread_worker_pair *pair = data;
00313 
00314    ao2_unlink(pair->pool->idle_threads, pair->worker);
00315    threadpool_send_state_changed(pair->pool);
00316 
00317    ao2_ref(pair, -1);
00318    return 0;
00319 }
00320 
00321 static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
00322       struct worker_thread *worker)
00323 {
00324    struct thread_worker_pair *pair;
00325    SCOPED_AO2LOCK(lock, pool);
00326    if (pool->shutting_down) {
00327       return;
00328    }
00329    pair = thread_worker_pair_alloc(pool, worker);
00330    if (!pair) {
00331       return;
00332    }
00333    ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
00334 }
00335 
00336 /*!
00337  * \brief Execute a task in the threadpool
00338  *
00339  * This is the function that worker threads call in order to execute tasks
00340  * in the threadpool
00341  *
00342  * \param pool The pool to which the tasks belong.
00343  * \retval 0 Either the pool has been shut down or there are no tasks.
00344  * \retval 1 There are still tasks remaining in the pool.
00345  */
00346 static int threadpool_execute(struct ast_threadpool *pool)
00347 {
00348    ao2_lock(pool);
00349    if (!pool->shutting_down) {
00350       ao2_unlock(pool);
00351       return ast_taskprocessor_execute(pool->tps);
00352    }
00353    ao2_unlock(pool);
00354    return 0;
00355 }
00356 
00357 /*!
00358  * \brief Destroy a threadpool's components.
00359  *
00360  * This is the destructor called automatically when the threadpool's
00361  * reference count reaches zero. This is not to be confused with
00362  * threadpool_destroy.
00363  *
00364  * By the time this actually gets called, most of the cleanup has already
00365  * been done in the pool. The only thing left to do is to release the
00366  * final reference to the threadpool listener.
00367  *
00368  * \param obj The pool to destroy
00369  */
00370 static void threadpool_destructor(void *obj)
00371 {
00372    struct ast_threadpool *pool = obj;
00373    ao2_cleanup(pool->listener);
00374 }
00375 
00376 /*
00377  * \brief Allocate a threadpool
00378  *
00379  * This is implemented as a taskprocessor listener's alloc callback. This
00380  * is because the threadpool exists as the private data on a taskprocessor
00381  * listener.
00382  *
00383  * \param name The name of the threadpool.
00384  * \param options The options the threadpool uses.
00385  * \retval NULL Could not initialize threadpool properly
00386  * \retval non-NULL The newly-allocated threadpool
00387  */
00388 static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
00389 {
00390    RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
00391    struct ast_str *control_tps_name;
00392 
00393    pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
00394    control_tps_name = ast_str_create(64);
00395    if (!pool || !control_tps_name) {
00396       ast_free(control_tps_name);
00397       return NULL;
00398    }
00399 
00400    ast_str_set(&control_tps_name, 0, "%s-control", name);
00401 
00402    pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
00403    ast_free(control_tps_name);
00404    if (!pool->control_tps) {
00405       return NULL;
00406    }
00407    pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
00408    if (!pool->active_threads) {
00409       return NULL;
00410    }
00411    pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
00412    if (!pool->idle_threads) {
00413       return NULL;
00414    }
00415    pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
00416    if (!pool->zombie_threads) {
00417       return NULL;
00418    }
00419    pool->options = *options;
00420 
00421    ao2_ref(pool, +1);
00422    return pool;
00423 }
00424 
00425 static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
00426 {
00427    return 0;
00428 }
00429 
00430 /*!
00431  * \brief helper used for queued task when tasks are pushed
00432  */
00433 struct task_pushed_data {
00434    /*! Pool into which a task was pushed */
00435    struct ast_threadpool *pool;
00436    /*! Indicator of whether the pool had no tasks prior to the new task being added */
00437    int was_empty;
00438 };
00439 
00440 /*!
00441  * \brief Allocate and initialize a task_pushed_data
00442  * \param pool The threadpool to set in the task_pushed_data
00443  * \param was_empty The was_empty value to set in the task_pushed_data
00444  * \retval NULL Unable to allocate task_pushed_data
00445  * \retval non-NULL The newly-allocated task_pushed_data
00446  */
00447 static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
00448       int was_empty)
00449 {
00450    struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL);
00451 
00452    if (!tpd) {
00453       return NULL;
00454    }
00455    tpd->pool = pool;
00456    tpd->was_empty = was_empty;
00457    return tpd;
00458 }
00459 
00460 /*!
00461  * \brief Activate idle threads
00462  *
00463  * This function always returns CMP_MATCH because all workers that this
00464  * function acts on need to be seen as matches so they are unlinked from the
00465  * list of idle threads.
00466  *
00467  * Called as an ao2_callback in the threadpool's control taskprocessor thread.
00468  * \param obj The worker to activate
00469  * \param arg The pool where the worker belongs
00470  * \retval CMP_MATCH
00471  */
00472 static int activate_thread(void *obj, void *arg, int flags)
00473 {
00474    struct worker_thread *worker = obj;
00475    struct ast_threadpool *pool = arg;
00476 
00477    if (!ao2_link(pool->active_threads, worker)) {
00478       /* If we can't link the idle thread into the active container, then
00479        * we'll just leave the thread idle and not wake it up.
00480        */
00481       ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
00482             worker->id);
00483       return 0;
00484    }
00485    worker_set_state(worker, ALIVE);
00486    return CMP_MATCH;
00487 }
00488 
00489 /*!
00490  * \brief Add threads to the threadpool
00491  *
00492  * This function is called from the threadpool's control taskprocessor thread.
00493  * \param pool The pool that is expanding
00494  * \delta The number of threads to add to the pool
00495  */
00496 static void grow(struct ast_threadpool *pool, int delta)
00497 {
00498    int i;
00499 
00500    int current_size = ao2_container_count(pool->active_threads) +
00501       ao2_container_count(pool->idle_threads);
00502 
00503    if (pool->options.max_size && current_size + delta > pool->options.max_size) {
00504       delta = pool->options.max_size - current_size;
00505    }
00506 
00507    ast_debug(3, "Increasing threadpool %s's size by %d\n",
00508          ast_taskprocessor_name(pool->tps), delta);
00509 
00510    for (i = 0; i < delta; ++i) {
00511       struct worker_thread *worker = worker_thread_alloc(pool);
00512       if (!worker) {
00513          return;
00514       }
00515       if (ao2_link(pool->idle_threads, worker)) {
00516          if (worker_thread_start(worker)) {
00517             ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
00518             ao2_unlink(pool->active_threads, worker);
00519          }
00520       } else {
00521          ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
00522       }
00523       ao2_ref(worker, -1);
00524    }
00525 }
00526 
00527 /*!
00528  * \brief Queued task called when tasks are pushed into the threadpool
00529  *
00530  * This function first calls into the threadpool's listener to let it know
00531  * that a task has been pushed. It then wakes up all idle threads and moves
00532  * them into the active thread container.
00533  * \param data A task_pushed_data
00534  * \return 0
00535  */
00536 static int queued_task_pushed(void *data)
00537 {
00538    struct task_pushed_data *tpd = data;
00539    struct ast_threadpool *pool = tpd->pool;
00540    int was_empty = tpd->was_empty;
00541 
00542    if (pool->listener && pool->listener->callbacks->task_pushed) {
00543       pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
00544    }
00545    if (ao2_container_count(pool->idle_threads) == 0) {
00546       if (!pool->options.auto_increment) {
00547          return 0;
00548       }
00549       grow(pool, pool->options.auto_increment);
00550    }
00551 
00552    ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
00553          activate_thread, pool);
00554 
00555    threadpool_send_state_changed(pool);
00556    ao2_ref(tpd, -1);
00557    return 0;
00558 }
00559 
00560 /*!
00561  * \brief Taskprocessor listener callback called when a task is added
00562  *
00563  * The threadpool uses this opportunity to queue a task on its control taskprocessor
00564  * in order to activate idle threads and notify the threadpool listener that the
00565  * task has been pushed.
00566  * \param listener The taskprocessor listener. The threadpool is the listener's private data
00567  * \param was_empty True if the taskprocessor was empty prior to the task being pushed
00568  */
00569 static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
00570       int was_empty)
00571 {
00572    struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
00573    struct task_pushed_data *tpd;
00574    SCOPED_AO2LOCK(lock, pool);
00575 
00576    if (pool->shutting_down) {
00577       return;
00578    }
00579    tpd = task_pushed_data_alloc(pool, was_empty);
00580    if (!tpd) {
00581       return;
00582    }
00583 
00584    ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd);
00585 }
00586 
00587 /*!
00588  * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
00589  *
00590  * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
00591  * \param data The pool that has become empty
00592  * \return 0
00593  */
00594 static int queued_emptied(void *data)
00595 {
00596    struct ast_threadpool *pool = data;
00597 
00598    /* We already checked for existence of this callback when this was queued */
00599    pool->listener->callbacks->emptied(pool, pool->listener);
00600    return 0;
00601 }
00602 
00603 /*!
00604  * \brief Taskprocessor listener emptied callback
00605  *
00606  * The threadpool queues a task to let the threadpool listener know that
00607  * the threadpool no longer contains any tasks.
00608  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
00609  */
00610 static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
00611 {
00612    struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
00613    SCOPED_AO2LOCK(lock, pool);
00614 
00615    if (pool->shutting_down) {
00616       return;
00617    }
00618 
00619    if (pool->listener && pool->listener->callbacks->emptied) {
00620       ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
00621    }
00622 }
00623 
00624 /*!
00625  * \brief Taskprocessor listener shutdown callback
00626  *
00627  * The threadpool will shut down and destroy all of its worker threads when
00628  * this is called back. By the time this gets called, the taskprocessor's
00629  * control taskprocessor has already been destroyed. Therefore there is no risk
00630  * in outright destroying the worker threads here.
00631  * \param listener The taskprocessor listener. The threadpool is the listener's private data.
00632  */
00633 static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
00634 {
00635    struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
00636 
00637    if (pool->listener && pool->listener->callbacks->shutdown) {
00638       pool->listener->callbacks->shutdown(pool->listener);
00639    }
00640    ao2_cleanup(pool->active_threads);
00641    ao2_cleanup(pool->idle_threads);
00642    ao2_cleanup(pool->zombie_threads);
00643    ao2_cleanup(pool);
00644 }
00645 
00646 /*!
00647  * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
00648  */
00649 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
00650    .start = threadpool_tps_start,
00651    .task_pushed = threadpool_tps_task_pushed,
00652    .emptied = threadpool_tps_emptied,
00653    .shutdown = threadpool_tps_shutdown,
00654 };
00655 
00656 /*!
00657  * \brief ao2 callback to kill a set number of threads.
00658  *
00659  * Threads will be unlinked from the container as long as the
00660  * counter has not reached zero. The counter is decremented with
00661  * each thread that is removed.
00662  * \param obj The worker thread up for possible destruction
00663  * \param arg The counter
00664  * \param flags Unused
00665  * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
00666  * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
00667  */
00668 static int kill_threads(void *obj, void *arg, int flags)
00669 {
00670    int *num_to_kill = arg;
00671 
00672    if (*num_to_kill > 0) {
00673       --(*num_to_kill);
00674       return CMP_MATCH;
00675    } else {
00676       return CMP_STOP;
00677    }
00678 }
00679 
00680 /*!
00681  * \brief ao2 callback to zombify a set number of threads.
00682  *
00683  * Threads will be zombified as long as as the counter has not reached
00684  * zero. The counter is decremented with each thread that is zombified.
00685  *
00686  * Zombifying a thread involves removing it from its current container,
00687  * adding it to the zombie container, and changing the state of the
00688  * worker to a zombie
00689  *
00690  * This callback is called from the threadpool control taskprocessor thread.
00691  *
00692  * \param obj The worker thread that may be zombified
00693  * \param arg The pool to which the worker belongs
00694  * \param data The counter
00695  * \param flags Unused
00696  * \retval CMP_MATCH The zombified thread should be removed from its current container
00697  * \retval CMP_STOP Stop attempting to zombify threads
00698  */
00699 static int zombify_threads(void *obj, void *arg, void *data, int flags)
00700 {
00701    struct worker_thread *worker = obj;
00702    struct ast_threadpool *pool = arg;
00703    int *num_to_zombify = data;
00704 
00705    if ((*num_to_zombify)-- > 0) {
00706       if (!ao2_link(pool->zombie_threads, worker)) {
00707          ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
00708          return 0;
00709       }
00710       worker_set_state(worker, ZOMBIE);
00711       return CMP_MATCH;
00712    } else {
00713       return CMP_STOP;
00714    }
00715 }
00716 
00717 /*!
00718  * \brief Remove threads from the threadpool
00719  *
00720  * The preference is to kill idle threads. However, if there are
00721  * more threads to remove than there are idle threads, then active
00722  * threads will be zombified instead.
00723  *
00724  * This function is called from the threadpool control taskprocessor thread.
00725  *
00726  * \param pool The threadpool to remove threads from
00727  * \param delta The number of threads to remove
00728  */
00729 static void shrink(struct ast_threadpool *pool, int delta)
00730 {
00731    /*
00732     * Preference is to kill idle threads, but
00733     * we'll move on to deactivating active threads
00734     * if we have to
00735     */
00736    int idle_threads = ao2_container_count(pool->idle_threads);
00737    int idle_threads_to_kill = MIN(delta, idle_threads);
00738    int active_threads_to_zombify = delta - idle_threads_to_kill;
00739 
00740    ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
00741          ast_taskprocessor_name(pool->tps));
00742 
00743    ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
00744          kill_threads, &idle_threads_to_kill);
00745 
00746    ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
00747          ast_taskprocessor_name(pool->tps));
00748 
00749    ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
00750          zombify_threads, pool, &active_threads_to_zombify);
00751 }
00752 
00753 /*!
00754  * \brief Helper struct used for queued operations that change the size of the threadpool
00755  */
00756 struct set_size_data {
00757    /*! The pool whose size is to change */
00758    struct ast_threadpool *pool;
00759    /*! The requested new size of the pool */
00760    unsigned int size;
00761 };
00762 
00763 /*!
00764  * \brief Allocate and initialize a set_size_data
00765  * \param pool The pool for the set_size_data
00766  * \param size The size to store in the set_size_data
00767  */
00768 static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
00769       unsigned int size)
00770 {
00771    struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL);
00772    if (!ssd) {
00773       return NULL;
00774    }
00775 
00776    ssd->pool = pool;
00777    ssd->size = size;
00778    return ssd;
00779 }
00780 
00781 /*!
00782  * \brief Change the size of the threadpool
00783  *
00784  * This can either result in shrinking or growing the threadpool depending
00785  * on the new desired size and the current size.
00786  *
00787  * This function is run from the threadpool control taskprocessor thread
00788  *
00789  * \param data A set_size_data used for determining how to act
00790  * \return 0
00791  */
00792 static int queued_set_size(void *data)
00793 {
00794    RAII_VAR(struct set_size_data *, ssd, data, ao2_cleanup);
00795    struct ast_threadpool *pool = ssd->pool;
00796    unsigned int num_threads = ssd->size;
00797 
00798    /* We don't count zombie threads as being "live" when potentially resizing */
00799    unsigned int current_size = ao2_container_count(pool->active_threads) +
00800       ao2_container_count(pool->idle_threads);
00801 
00802    if (current_size == num_threads) {
00803       ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
00804            num_threads, current_size);
00805       return 0;
00806    }
00807 
00808    if (current_size < num_threads) {
00809       grow(pool, num_threads - current_size);
00810       ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
00811             activate_thread, pool);
00812    } else {
00813       shrink(pool, current_size - num_threads);
00814    }
00815 
00816    threadpool_send_state_changed(pool);
00817    return 0;
00818 }
00819 
00820 void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
00821 {
00822    struct set_size_data *ssd;
00823    SCOPED_AO2LOCK(lock, pool);
00824    if (pool->shutting_down) {
00825       return;
00826    }
00827 
00828    ssd = set_size_data_alloc(pool, size);
00829    if (!ssd) {
00830       return;
00831    }
00832 
00833    ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
00834 }
00835 
00836 struct ast_threadpool_listener *ast_threadpool_listener_alloc(
00837       const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
00838 {
00839    struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
00840    if (!listener) {
00841       return NULL;
00842    }
00843    listener->callbacks = callbacks;
00844    listener->user_data = user_data;
00845    return listener;
00846 }
00847 
00848 void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
00849 {
00850    return listener->user_data;
00851 }
00852 
00853 struct pool_options_pair {
00854    struct ast_threadpool *pool;
00855    struct ast_threadpool_options options;
00856 };
00857 
00858 struct ast_threadpool *ast_threadpool_create(const char *name,
00859       struct ast_threadpool_listener *listener,
00860       const struct ast_threadpool_options *options)
00861 {
00862    struct ast_taskprocessor *tps;
00863    RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
00864    RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
00865 
00866    pool = threadpool_alloc(name, options);
00867    if (!pool) {
00868       return NULL;
00869    }
00870 
00871    tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
00872    if (!tps_listener) {
00873       return NULL;
00874    }
00875 
00876    if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
00877       ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
00878       return NULL;
00879    }
00880 
00881    tps = ast_taskprocessor_create_with_listener(name, tps_listener);
00882    if (!tps) {
00883       return NULL;
00884    }
00885 
00886    pool->tps = tps;
00887    if (listener) {
00888       ao2_ref(listener, +1);
00889       pool->listener = listener;
00890    }
00891    ast_threadpool_set_size(pool, pool->options.initial_size);
00892    ao2_ref(pool, +1);
00893    return pool;
00894 }
00895 
00896 int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
00897 {
00898    SCOPED_AO2LOCK(lock, pool);
00899    if (!pool->shutting_down) {
00900       return ast_taskprocessor_push(pool->tps, task, data);
00901    }
00902    return -1;
00903 }
00904 
00905 void ast_threadpool_shutdown(struct ast_threadpool *pool)
00906 {
00907    if (!pool) {
00908       return;
00909    }
00910    /* Shut down the taskprocessors and everything else just
00911     * takes care of itself via the taskprocessor callbacks
00912     */
00913    ao2_lock(pool);
00914    pool->shutting_down = 1;
00915    ao2_unlock(pool);
00916    ast_taskprocessor_unreference(pool->control_tps);
00917    ast_taskprocessor_unreference(pool->tps);
00918 }
00919 
00920 /*!
00921  * A monotonically increasing integer used for worker
00922  * thread identification.
00923  */
00924 static int worker_id_counter;
00925 
00926 static int worker_thread_hash(const void *obj, int flags)
00927 {
00928    const struct worker_thread *worker = obj;
00929 
00930    return worker->id;
00931 }
00932 
00933 static int worker_thread_cmp(void *obj, void *arg, int flags)
00934 {
00935    struct worker_thread *worker1 = obj;
00936    struct worker_thread *worker2 = arg;
00937 
00938    return worker1->id == worker2->id ? CMP_MATCH : 0;
00939 }
00940 
00941 /*!
00942  * \brief shut a worker thread down
00943  *
00944  * Set the worker dead and then wait for its thread
00945  * to finish executing.
00946  *
00947  * \param worker The worker thread to shut down
00948  */
00949 static void worker_shutdown(struct worker_thread *worker)
00950 {
00951    worker_set_state(worker, DEAD);
00952    if (worker->thread != AST_PTHREADT_NULL) {
00953       pthread_join(worker->thread, NULL);
00954       worker->thread = AST_PTHREADT_NULL;
00955    }
00956 }
00957 
00958 /*!
00959  * \brief Worker thread destructor
00960  *
00961  * Called automatically when refcount reaches 0. Shuts
00962  * down the worker thread and destroys its component
00963  * parts
00964  */
00965 static void worker_thread_destroy(void *obj)
00966 {
00967    struct worker_thread *worker = obj;
00968    ast_debug(3, "Destroying worker thread %d\n", worker->id);
00969    worker_shutdown(worker);
00970    ast_mutex_destroy(&worker->lock);
00971    ast_cond_destroy(&worker->cond);
00972 }
00973 
00974 /*!
00975  * \brief start point for worker threads
00976  *
00977  * Worker threads start in the active state but may
00978  * immediately go idle if there is no work to be
00979  * done
00980  *
00981  * \param arg The worker thread
00982  * \retval NULL
00983  */
00984 static void *worker_start(void *arg)
00985 {
00986    struct worker_thread *worker = arg;
00987 
00988    if (worker->options.thread_start) {
00989       worker->options.thread_start();
00990    }
00991 
00992    ast_mutex_lock(&worker->lock);
00993    while (worker_idle(worker)) {
00994       ast_mutex_unlock(&worker->lock);
00995       worker_active(worker);
00996       ast_mutex_lock(&worker->lock);
00997       if (worker->state != ALIVE) {
00998          break;
00999       }
01000       threadpool_active_thread_idle(worker->pool, worker);
01001    }
01002    ast_mutex_unlock(&worker->lock);
01003 
01004    /* Reaching this portion means the thread is
01005     * on death's door. It may have been killed while
01006     * it was idle, in which case it can just die
01007     * peacefully. If it's a zombie, though, then
01008     * it needs to let the pool know so
01009     * that the thread can be removed from the
01010     * list of zombie threads.
01011     */
01012    if (worker->state == ZOMBIE) {
01013       threadpool_zombie_thread_dead(worker->pool, worker);
01014    }
01015 
01016    if (worker->options.thread_end) {
01017       worker->options.thread_end();
01018    }
01019    return NULL;
01020 }
01021 
01022 /*!
01023  * \brief Allocate and initialize a new worker thread
01024  *
01025  * This will create, initialize, and start the thread.
01026  *
01027  * \param pool The threadpool to which the worker will be added
01028  * \retval NULL Failed to allocate or start the worker thread
01029  * \retval non-NULL The newly-created worker thread
01030  */
01031 static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
01032 {
01033    struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
01034    if (!worker) {
01035       return NULL;
01036    }
01037    worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
01038    ast_mutex_init(&worker->lock);
01039    ast_cond_init(&worker->cond, NULL);
01040    worker->pool = pool;
01041    worker->thread = AST_PTHREADT_NULL;
01042    worker->state = ALIVE;
01043    worker->options = pool->options;
01044    return worker;
01045 }
01046 
01047 static int worker_thread_start(struct worker_thread *worker)
01048 {
01049    return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
01050 }
01051 
01052 /*!
01053  * \brief Active loop for worker threads
01054  *
01055  * The worker will stay in this loop for its lifetime,
01056  * executing tasks as they become available. If there
01057  * are no tasks currently available, then the thread
01058  * will go idle.
01059  *
01060  * \param worker The worker thread executing tasks.
01061  */
01062 static void worker_active(struct worker_thread *worker)
01063 {
01064    int alive;
01065 
01066    /* The following is equivalent to 
01067     *
01068     * while (threadpool_execute(worker->pool));
01069     *
01070     * However, reviewers have suggested in the past
01071     * doing that can cause optimizers to (wrongly)
01072     * optimize the code away.
01073     */
01074    do {
01075       alive = threadpool_execute(worker->pool);
01076    } while (alive);
01077 }
01078 
01079 /*!
01080  * \brief Idle function for worker threads
01081  *
01082  * The worker waits here until it gets told by the threadpool
01083  * to wake up.
01084  *
01085  * worker is locked before entering this function.
01086  *
01087  * \param worker The idle worker
01088  * \retval 0 The thread is being woken up so that it can conclude.
01089  * \retval non-zero The thread is being woken up to do more work.
01090  */
01091 static int worker_idle(struct worker_thread *worker)
01092 {
01093    struct timeval start = ast_tvnow();
01094    struct timespec end = {
01095       .tv_sec = start.tv_sec + worker->options.idle_timeout,
01096       .tv_nsec = start.tv_usec * 1000,
01097    };
01098    while (!worker->wake_up) {
01099       if (worker->options.idle_timeout <= 0) {
01100          ast_cond_wait(&worker->cond, &worker->lock);
01101       } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
01102          break;
01103       }
01104    }
01105 
01106    if (!worker->wake_up) {
01107       ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
01108       threadpool_idle_thread_dead(worker->pool, worker);
01109       worker->state = DEAD;
01110    }
01111    worker->wake_up = 0;
01112    return worker->state == ALIVE;
01113 }
01114 
01115 /*!
01116  * \brief Change a worker's state
01117  *
01118  * The threadpool calls into this function in order to let a worker know
01119  * how it should proceed.
01120  */
01121 static void worker_set_state(struct worker_thread *worker, enum worker_state state)
01122 {
01123    SCOPED_MUTEX(lock, &worker->lock);
01124    worker->state = state;
01125    worker->wake_up = 1;
01126    ast_cond_signal(&worker->cond);
01127 }
01128 
01129 struct serializer {
01130    struct ast_threadpool *pool;
01131 };
01132 
01133 static void serializer_dtor(void *obj)
01134 {
01135    struct serializer *ser = obj;
01136    ao2_cleanup(ser->pool);
01137    ser->pool = NULL;
01138 }
01139 
01140 static struct serializer *serializer_create(struct ast_threadpool *pool)
01141 {
01142    struct serializer *ser;
01143 
01144    ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
01145    if (!ser) {
01146       return NULL;
01147    }
01148    ao2_ref(pool, +1);
01149    ser->pool = pool;
01150    return ser;
01151 }
01152 
01153 static int execute_tasks(void *data)
01154 {
01155    struct ast_taskprocessor *tps = data;
01156 
01157    while (ast_taskprocessor_execute(tps)) {
01158       /* No-op */
01159    }
01160 
01161    ast_taskprocessor_unreference(tps);
01162    return 0;
01163 }
01164 
01165 static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
01166 {
01167    if (was_empty) {
01168       struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
01169       struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
01170 
01171       if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
01172          ast_taskprocessor_unreference(tps);
01173       }
01174    }
01175 }
01176 
01177 static int serializer_start(struct ast_taskprocessor_listener *listener)
01178 {
01179    /* No-op */
01180    return 0;
01181 }
01182 
01183 static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
01184 {
01185    struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
01186    ao2_cleanup(ser);
01187 }
01188 
01189 static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
01190    .task_pushed = serializer_task_pushed,
01191    .start = serializer_start,
01192    .shutdown = serializer_shutdown,
01193 };
01194 
01195 struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
01196 {
01197    RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup);
01198    RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup);
01199    struct ast_taskprocessor *tps = NULL;
01200 
01201    ser = serializer_create(pool);
01202    if (!ser) {
01203       return NULL;
01204    }
01205 
01206    listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
01207    if (!listener) {
01208       return NULL;
01209    }
01210    ser = NULL; /* ownership transferred to listener */
01211 
01212    tps = ast_taskprocessor_create_with_listener(name, listener);
01213    if (!tps) {
01214       return NULL;
01215    }
01216 
01217    return tps;
01218 }

Generated on Thu Apr 16 06:28:01 2015 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6