123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437 |
- /*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2012-2013, Digium, Inc.
- *
- * Mark Michelson <mmmichelson@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
- #include "asterisk.h"
- #include "asterisk/threadpool.h"
- #include "asterisk/taskprocessor.h"
- #include "asterisk/astobj2.h"
- #include "asterisk/utils.h"
- /* Needs to stay prime if increased */
- #define THREAD_BUCKETS 89
- /*!
- * \brief An opaque threadpool structure
- *
- * A threadpool is a collection of threads that execute
- * tasks from a common queue.
- */
- struct ast_threadpool {
- /*! Threadpool listener */
- struct ast_threadpool_listener *listener;
- /*!
- * \brief The container of active threads.
- * Active threads are those that are currently running tasks
- */
- struct ao2_container *active_threads;
- /*!
- * \brief The container of idle threads.
- * Idle threads are those that are currenly waiting to run tasks
- */
- struct ao2_container *idle_threads;
- /*!
- * \brief The container of zombie threads.
- * Zombie threads may be running tasks, but they are scheduled to die soon
- */
- struct ao2_container *zombie_threads;
- /*!
- * \brief The main taskprocessor
- *
- * Tasks that are queued in this taskprocessor are
- * doled out to the worker threads. Worker threads that
- * execute tasks from the threadpool are executing tasks
- * in this taskprocessor.
- *
- * The threadpool itself is actually the private data for
- * this taskprocessor's listener. This way, as taskprocessor
- * changes occur, the threadpool can alert its listeners
- * appropriately.
- */
- struct ast_taskprocessor *tps;
- /*!
- * \brief The control taskprocessor
- *
- * This is a standard taskprocessor that uses the default
- * taskprocessor listener. In other words, all tasks queued to
- * this taskprocessor have a single thread that executes the
- * tasks.
- *
- * All tasks that modify the state of the threadpool and all tasks
- * that call out to threadpool listeners are pushed to this
- * taskprocessor.
- *
- * For instance, when the threadpool changes sizes, a task is put
- * into this taskprocessor to do so. When it comes time to tell the
- * threadpool listener that worker threads have changed state,
- * the task is placed in this taskprocessor.
- *
- * This is done for three main reasons
- * 1) It ensures that listeners are given an accurate portrayal
- * of the threadpool's current state. In other words, when a listener
- * gets told a count of active, idle and zombie threads, it does not
- * need to worry that internal state of the threadpool might be different
- * from what it has been told.
- * 2) It minimizes the locking required in both the threadpool and in
- * threadpool listener's callbacks.
- * 3) It ensures that listener callbacks are called in the same order
- * that the threadpool had its state change.
- */
- struct ast_taskprocessor *control_tps;
- /*! True if the threadpool is in the process of shutting down */
- int shutting_down;
- /*! Threadpool-specific options */
- struct ast_threadpool_options options;
- };
- /*!
- * \brief listener for a threadpool
- *
- * The listener is notified of changes in a threadpool. It can
- * react by doing things like increasing the number of threads
- * in the pool
- */
- struct ast_threadpool_listener {
- /*! Callbacks called by the threadpool */
- const struct ast_threadpool_listener_callbacks *callbacks;
- /*! User data for the listener */
- void *user_data;
- };
- /*!
- * \brief states for worker threads
- */
- enum worker_state {
- /*! The worker is either active or idle */
- ALIVE,
- /*!
- * The worker has been asked to shut down but
- * may still be in the process of executing tasks.
- * This transition happens when the threadpool needs
- * to shrink and needs to kill active threads in order
- * to do so.
- */
- ZOMBIE,
- /*!
- * The worker has been asked to shut down. Typically
- * only idle threads go to this state directly, but
- * active threads may go straight to this state when
- * the threadpool is shut down.
- */
- DEAD,
- };
- /*!
- * A thread that executes threadpool tasks
- */
- struct worker_thread {
- /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
- int id;
- /*! Condition used in conjunction with state changes */
- ast_cond_t cond;
- /*! Lock used alongside the condition for state changes */
- ast_mutex_t lock;
- /*! The actual thread that is executing tasks */
- pthread_t thread;
- /*! A pointer to the threadpool. Needed to be able to execute tasks */
- struct ast_threadpool *pool;
- /*! The current state of the worker thread */
- enum worker_state state;
- /*! A boolean used to determine if an idle thread should become active */
- int wake_up;
- /*! Options for this threadpool */
- struct ast_threadpool_options options;
- };
- /* Worker thread forward declarations. See definitions for documentation */
- static int worker_thread_hash(const void *obj, int flags);
- static int worker_thread_cmp(void *obj, void *arg, int flags);
- static void worker_thread_destroy(void *obj);
- static void worker_active(struct worker_thread *worker);
- static void *worker_start(void *arg);
- static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
- static int worker_thread_start(struct worker_thread *worker);
- static int worker_idle(struct worker_thread *worker);
- static int worker_set_state(struct worker_thread *worker, enum worker_state state);
- static void worker_shutdown(struct worker_thread *worker);
- /*!
- * \brief Notify the threadpool listener that the state has changed.
- *
- * This notifies the threadpool listener via its state_changed callback.
- * \param pool The threadpool whose state has changed
- */
- static void threadpool_send_state_changed(struct ast_threadpool *pool)
- {
- int active_size = ao2_container_count(pool->active_threads);
- int idle_size = ao2_container_count(pool->idle_threads);
- if (pool->listener && pool->listener->callbacks->state_changed) {
- pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
- }
- }
- /*!
- * \brief Struct used for queued operations involving worker state changes
- */
- struct thread_worker_pair {
- /*! Threadpool that contains the worker whose state has changed */
- struct ast_threadpool *pool;
- /*! Worker whose state has changed */
- struct worker_thread *worker;
- };
- /*!
- * \brief Destructor for thread_worker_pair
- */
- static void thread_worker_pair_free(struct thread_worker_pair *pair)
- {
- ao2_ref(pair->worker, -1);
- ast_free(pair);
- }
- /*!
- * \brief Allocate and initialize a thread_worker_pair
- * \param pool Threadpool to assign to the thread_worker_pair
- * \param worker Worker thread to assign to the thread_worker_pair
- */
- static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
- if (!pair) {
- return NULL;
- }
- pair->pool = pool;
- ao2_ref(worker, +1);
- pair->worker = worker;
- return pair;
- }
- /*!
- * \brief Move a worker thread from the active container to the idle container.
- *
- * This function is called from the threadpool's control taskprocessor thread.
- * \param data A thread_worker_pair containing the threadpool and the worker to move.
- * \return 0
- */
- static int queued_active_thread_idle(void *data)
- {
- struct thread_worker_pair *pair = data;
- ao2_link(pair->pool->idle_threads, pair->worker);
- ao2_unlink(pair->pool->active_threads, pair->worker);
- threadpool_send_state_changed(pair->pool);
- thread_worker_pair_free(pair);
- return 0;
- }
- /*!
- * \brief Queue a task to move a thread from the active list to the idle list
- *
- * This is called by a worker thread when it runs out of tasks to perform and
- * goes idle.
- * \param pool The threadpool to which the worker belongs
- * \param worker The worker thread that has gone idle
- */
- static void threadpool_active_thread_idle(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- pair = thread_worker_pair_alloc(pool, worker);
- if (!pair) {
- return;
- }
- if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) {
- thread_worker_pair_free(pair);
- }
- }
- /*!
- * \brief Kill a zombie thread
- *
- * This runs from the threadpool's control taskprocessor thread.
- *
- * \param data A thread_worker_pair containing the threadpool and the zombie thread
- * \return 0
- */
- static int queued_zombie_thread_dead(void *data)
- {
- struct thread_worker_pair *pair = data;
- ao2_unlink(pair->pool->zombie_threads, pair->worker);
- threadpool_send_state_changed(pair->pool);
- thread_worker_pair_free(pair);
- return 0;
- }
- /*!
- * \brief Queue a task to kill a zombie thread
- *
- * This is called by a worker thread when it acknowledges that it is time for
- * it to die.
- */
- static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- pair = thread_worker_pair_alloc(pool, worker);
- if (!pair) {
- return;
- }
- if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) {
- thread_worker_pair_free(pair);
- }
- }
- static int queued_idle_thread_dead(void *data)
- {
- struct thread_worker_pair *pair = data;
- ao2_unlink(pair->pool->idle_threads, pair->worker);
- threadpool_send_state_changed(pair->pool);
- thread_worker_pair_free(pair);
- return 0;
- }
- static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
- struct worker_thread *worker)
- {
- struct thread_worker_pair *pair;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- pair = thread_worker_pair_alloc(pool, worker);
- if (!pair) {
- return;
- }
- if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) {
- thread_worker_pair_free(pair);
- }
- }
- /*!
- * \brief Execute a task in the threadpool
- *
- * This is the function that worker threads call in order to execute tasks
- * in the threadpool
- *
- * \param pool The pool to which the tasks belong.
- * \retval 0 Either the pool has been shut down or there are no tasks.
- * \retval 1 There are still tasks remaining in the pool.
- */
- static int threadpool_execute(struct ast_threadpool *pool)
- {
- ao2_lock(pool);
- if (!pool->shutting_down) {
- ao2_unlock(pool);
- return ast_taskprocessor_execute(pool->tps);
- }
- ao2_unlock(pool);
- return 0;
- }
- /*!
- * \brief Destroy a threadpool's components.
- *
- * This is the destructor called automatically when the threadpool's
- * reference count reaches zero. This is not to be confused with
- * threadpool_destroy.
- *
- * By the time this actually gets called, most of the cleanup has already
- * been done in the pool. The only thing left to do is to release the
- * final reference to the threadpool listener.
- *
- * \param obj The pool to destroy
- */
- static void threadpool_destructor(void *obj)
- {
- struct ast_threadpool *pool = obj;
- ao2_cleanup(pool->listener);
- }
- /*
- * \brief Allocate a threadpool
- *
- * This is implemented as a taskprocessor listener's alloc callback. This
- * is because the threadpool exists as the private data on a taskprocessor
- * listener.
- *
- * \param name The name of the threadpool.
- * \param options The options the threadpool uses.
- * \retval NULL Could not initialize threadpool properly
- * \retval non-NULL The newly-allocated threadpool
- */
- static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
- {
- RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
- struct ast_str *control_tps_name;
- pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
- control_tps_name = ast_str_create(64);
- if (!pool || !control_tps_name) {
- ast_free(control_tps_name);
- return NULL;
- }
- ast_str_set(&control_tps_name, 0, "%s-control", name);
- pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
- ast_free(control_tps_name);
- if (!pool->control_tps) {
- return NULL;
- }
- pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
- THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
- if (!pool->active_threads) {
- return NULL;
- }
- pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
- THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
- if (!pool->idle_threads) {
- return NULL;
- }
- pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
- THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
- if (!pool->zombie_threads) {
- return NULL;
- }
- pool->options = *options;
- ao2_ref(pool, +1);
- return pool;
- }
- static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
- {
- return 0;
- }
- /*!
- * \brief helper used for queued task when tasks are pushed
- */
- struct task_pushed_data {
- /*! Pool into which a task was pushed */
- struct ast_threadpool *pool;
- /*! Indicator of whether the pool had no tasks prior to the new task being added */
- int was_empty;
- };
- /*!
- * \brief Allocate and initialize a task_pushed_data
- * \param pool The threadpool to set in the task_pushed_data
- * \param was_empty The was_empty value to set in the task_pushed_data
- * \retval NULL Unable to allocate task_pushed_data
- * \retval non-NULL The newly-allocated task_pushed_data
- */
- static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
- int was_empty)
- {
- struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
- if (!tpd) {
- return NULL;
- }
- tpd->pool = pool;
- tpd->was_empty = was_empty;
- return tpd;
- }
- /*!
- * \brief Activate idle threads
- *
- * This function always returns CMP_MATCH because all workers that this
- * function acts on need to be seen as matches so they are unlinked from the
- * list of idle threads.
- *
- * Called as an ao2_callback in the threadpool's control taskprocessor thread.
- * \param obj The worker to activate
- * \param arg The pool where the worker belongs
- * \retval CMP_MATCH
- */
- static int activate_thread(void *obj, void *arg, int flags)
- {
- struct worker_thread *worker = obj;
- struct ast_threadpool *pool = arg;
- if (!ao2_link(pool->active_threads, worker)) {
- /* If we can't link the idle thread into the active container, then
- * we'll just leave the thread idle and not wake it up.
- */
- ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
- worker->id);
- return 0;
- }
- if (worker_set_state(worker, ALIVE)) {
- ast_debug(1, "Failed to activate thread %d. It is dead\n",
- worker->id);
- /* The worker thread will no longer exist in the active threads or
- * idle threads container after this.
- */
- ao2_unlink(pool->active_threads, worker);
- }
- return CMP_MATCH;
- }
- /*!
- * \brief Add threads to the threadpool
- *
- * This function is called from the threadpool's control taskprocessor thread.
- * \param pool The pool that is expanding
- * \delta The number of threads to add to the pool
- */
- static void grow(struct ast_threadpool *pool, int delta)
- {
- int i;
- int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
- if (pool->options.max_size && current_size + delta > pool->options.max_size) {
- delta = pool->options.max_size - current_size;
- }
- ast_debug(3, "Increasing threadpool %s's size by %d\n",
- ast_taskprocessor_name(pool->tps), delta);
- for (i = 0; i < delta; ++i) {
- struct worker_thread *worker = worker_thread_alloc(pool);
- if (!worker) {
- return;
- }
- if (ao2_link(pool->idle_threads, worker)) {
- if (worker_thread_start(worker)) {
- ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
- ao2_unlink(pool->active_threads, worker);
- }
- } else {
- ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
- }
- ao2_ref(worker, -1);
- }
- }
- /*!
- * \brief Queued task called when tasks are pushed into the threadpool
- *
- * This function first calls into the threadpool's listener to let it know
- * that a task has been pushed. It then wakes up all idle threads and moves
- * them into the active thread container.
- * \param data A task_pushed_data
- * \return 0
- */
- static int queued_task_pushed(void *data)
- {
- struct task_pushed_data *tpd = data;
- struct ast_threadpool *pool = tpd->pool;
- int was_empty = tpd->was_empty;
- unsigned int existing_active;
- ast_free(tpd);
- if (pool->listener && pool->listener->callbacks->task_pushed) {
- pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
- }
- existing_active = ao2_container_count(pool->active_threads);
- /* The first pass transitions any existing idle threads to be active, and
- * will also remove any worker threads that have recently entered the dead
- * state.
- */
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
- /* If no idle threads could be transitioned to active grow the pool as permitted. */
- if (ao2_container_count(pool->active_threads) == existing_active) {
- if (!pool->options.auto_increment) {
- return 0;
- }
- grow(pool, pool->options.auto_increment);
- /* An optional second pass transitions any newly added threads. */
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
- activate_thread, pool);
- }
- threadpool_send_state_changed(pool);
- return 0;
- }
- /*!
- * \brief Taskprocessor listener callback called when a task is added
- *
- * The threadpool uses this opportunity to queue a task on its control taskprocessor
- * in order to activate idle threads and notify the threadpool listener that the
- * task has been pushed.
- * \param listener The taskprocessor listener. The threadpool is the listener's private data
- * \param was_empty True if the taskprocessor was empty prior to the task being pushed
- */
- static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
- int was_empty)
- {
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
- struct task_pushed_data *tpd;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- tpd = task_pushed_data_alloc(pool, was_empty);
- if (!tpd) {
- return;
- }
- if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) {
- ast_free(tpd);
- }
- }
- /*!
- * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
- *
- * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
- * \param data The pool that has become empty
- * \return 0
- */
- static int queued_emptied(void *data)
- {
- struct ast_threadpool *pool = data;
- /* We already checked for existence of this callback when this was queued */
- pool->listener->callbacks->emptied(pool, pool->listener);
- return 0;
- }
- /*!
- * \brief Taskprocessor listener emptied callback
- *
- * The threadpool queues a task to let the threadpool listener know that
- * the threadpool no longer contains any tasks.
- * \param listener The taskprocessor listener. The threadpool is the listener's private data.
- */
- static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
- {
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- if (pool->listener && pool->listener->callbacks->emptied) {
- if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
- /* Nothing to do here but we need the check to keep the compiler happy. */
- }
- }
- }
- /*!
- * \brief Taskprocessor listener shutdown callback
- *
- * The threadpool will shut down and destroy all of its worker threads when
- * this is called back. By the time this gets called, the taskprocessor's
- * control taskprocessor has already been destroyed. Therefore there is no risk
- * in outright destroying the worker threads here.
- * \param listener The taskprocessor listener. The threadpool is the listener's private data.
- */
- static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
- {
- struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
- if (pool->listener && pool->listener->callbacks->shutdown) {
- pool->listener->callbacks->shutdown(pool->listener);
- }
- ao2_cleanup(pool->active_threads);
- ao2_cleanup(pool->idle_threads);
- ao2_cleanup(pool->zombie_threads);
- ao2_cleanup(pool);
- }
- /*!
- * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
- */
- static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
- .start = threadpool_tps_start,
- .task_pushed = threadpool_tps_task_pushed,
- .emptied = threadpool_tps_emptied,
- .shutdown = threadpool_tps_shutdown,
- };
- /*!
- * \brief ao2 callback to kill a set number of threads.
- *
- * Threads will be unlinked from the container as long as the
- * counter has not reached zero. The counter is decremented with
- * each thread that is removed.
- * \param obj The worker thread up for possible destruction
- * \param arg The counter
- * \param flags Unused
- * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
- * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
- */
- static int kill_threads(void *obj, void *arg, int flags)
- {
- int *num_to_kill = arg;
- if (*num_to_kill > 0) {
- --(*num_to_kill);
- return CMP_MATCH;
- } else {
- return CMP_STOP;
- }
- }
- /*!
- * \brief ao2 callback to zombify a set number of threads.
- *
- * Threads will be zombified as long as the counter has not reached
- * zero. The counter is decremented with each thread that is zombified.
- *
- * Zombifying a thread involves removing it from its current container,
- * adding it to the zombie container, and changing the state of the
- * worker to a zombie
- *
- * This callback is called from the threadpool control taskprocessor thread.
- *
- * \param obj The worker thread that may be zombified
- * \param arg The pool to which the worker belongs
- * \param data The counter
- * \param flags Unused
- * \retval CMP_MATCH The zombified thread should be removed from its current container
- * \retval CMP_STOP Stop attempting to zombify threads
- */
- static int zombify_threads(void *obj, void *arg, void *data, int flags)
- {
- struct worker_thread *worker = obj;
- struct ast_threadpool *pool = arg;
- int *num_to_zombify = data;
- if ((*num_to_zombify)-- > 0) {
- if (!ao2_link(pool->zombie_threads, worker)) {
- ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
- return 0;
- }
- worker_set_state(worker, ZOMBIE);
- return CMP_MATCH;
- } else {
- return CMP_STOP;
- }
- }
- /*!
- * \brief Remove threads from the threadpool
- *
- * The preference is to kill idle threads. However, if there are
- * more threads to remove than there are idle threads, then active
- * threads will be zombified instead.
- *
- * This function is called from the threadpool control taskprocessor thread.
- *
- * \param pool The threadpool to remove threads from
- * \param delta The number of threads to remove
- */
- static void shrink(struct ast_threadpool *pool, int delta)
- {
- /*
- * Preference is to kill idle threads, but
- * we'll move on to deactivating active threads
- * if we have to
- */
- int idle_threads = ao2_container_count(pool->idle_threads);
- int idle_threads_to_kill = MIN(delta, idle_threads);
- int active_threads_to_zombify = delta - idle_threads_to_kill;
- ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
- ast_taskprocessor_name(pool->tps));
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- kill_threads, &idle_threads_to_kill);
- ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
- ast_taskprocessor_name(pool->tps));
- ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- zombify_threads, pool, &active_threads_to_zombify);
- }
- /*!
- * \brief Helper struct used for queued operations that change the size of the threadpool
- */
- struct set_size_data {
- /*! The pool whose size is to change */
- struct ast_threadpool *pool;
- /*! The requested new size of the pool */
- unsigned int size;
- };
- /*!
- * \brief Allocate and initialize a set_size_data
- * \param pool The pool for the set_size_data
- * \param size The size to store in the set_size_data
- */
- static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
- unsigned int size)
- {
- struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
- if (!ssd) {
- return NULL;
- }
- ssd->pool = pool;
- ssd->size = size;
- return ssd;
- }
- /*!
- * \brief Change the size of the threadpool
- *
- * This can either result in shrinking or growing the threadpool depending
- * on the new desired size and the current size.
- *
- * This function is run from the threadpool control taskprocessor thread
- *
- * \param data A set_size_data used for determining how to act
- * \return 0
- */
- static int queued_set_size(void *data)
- {
- struct set_size_data *ssd = data;
- struct ast_threadpool *pool = ssd->pool;
- unsigned int num_threads = ssd->size;
- /* We don't count zombie threads as being "live" when potentially resizing */
- unsigned int current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
- ast_free(ssd);
- if (current_size == num_threads) {
- ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
- num_threads, current_size);
- return 0;
- }
- if (current_size < num_threads) {
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- activate_thread, pool);
- /* As the above may have altered the number of current threads update it */
- current_size = ao2_container_count(pool->active_threads) +
- ao2_container_count(pool->idle_threads);
- grow(pool, num_threads - current_size);
- ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
- activate_thread, pool);
- } else {
- shrink(pool, current_size - num_threads);
- }
- threadpool_send_state_changed(pool);
- return 0;
- }
- void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
- {
- struct set_size_data *ssd;
- SCOPED_AO2LOCK(lock, pool);
- if (pool->shutting_down) {
- return;
- }
- ssd = set_size_data_alloc(pool, size);
- if (!ssd) {
- return;
- }
- if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) {
- ast_free(ssd);
- }
- }
- struct ast_threadpool_listener *ast_threadpool_listener_alloc(
- const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
- {
- struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
- if (!listener) {
- return NULL;
- }
- listener->callbacks = callbacks;
- listener->user_data = user_data;
- return listener;
- }
- void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
- {
- return listener->user_data;
- }
- struct pool_options_pair {
- struct ast_threadpool *pool;
- struct ast_threadpool_options options;
- };
- struct ast_threadpool *ast_threadpool_create(const char *name,
- struct ast_threadpool_listener *listener,
- const struct ast_threadpool_options *options)
- {
- struct ast_taskprocessor *tps;
- RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
- RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
- pool = threadpool_alloc(name, options);
- if (!pool) {
- return NULL;
- }
- tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
- if (!tps_listener) {
- return NULL;
- }
- if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
- ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
- return NULL;
- }
- tps = ast_taskprocessor_create_with_listener(name, tps_listener);
- if (!tps) {
- return NULL;
- }
- pool->tps = tps;
- if (listener) {
- ao2_ref(listener, +1);
- pool->listener = listener;
- }
- ast_threadpool_set_size(pool, pool->options.initial_size);
- ao2_ref(pool, +1);
- return pool;
- }
- int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
- {
- SCOPED_AO2LOCK(lock, pool);
- if (!pool->shutting_down) {
- return ast_taskprocessor_push(pool->tps, task, data);
- }
- return -1;
- }
- void ast_threadpool_shutdown(struct ast_threadpool *pool)
- {
- if (!pool) {
- return;
- }
- /* Shut down the taskprocessors and everything else just
- * takes care of itself via the taskprocessor callbacks
- */
- ao2_lock(pool);
- pool->shutting_down = 1;
- ao2_unlock(pool);
- ast_taskprocessor_unreference(pool->control_tps);
- ast_taskprocessor_unreference(pool->tps);
- }
- /*!
- * A monotonically increasing integer used for worker
- * thread identification.
- */
- static int worker_id_counter;
- static int worker_thread_hash(const void *obj, int flags)
- {
- const struct worker_thread *worker = obj;
- return worker->id;
- }
- static int worker_thread_cmp(void *obj, void *arg, int flags)
- {
- struct worker_thread *worker1 = obj;
- struct worker_thread *worker2 = arg;
- return worker1->id == worker2->id ? CMP_MATCH : 0;
- }
- /*!
- * \brief shut a worker thread down
- *
- * Set the worker dead and then wait for its thread
- * to finish executing.
- *
- * \param worker The worker thread to shut down
- */
- static void worker_shutdown(struct worker_thread *worker)
- {
- worker_set_state(worker, DEAD);
- if (worker->thread != AST_PTHREADT_NULL) {
- pthread_join(worker->thread, NULL);
- worker->thread = AST_PTHREADT_NULL;
- }
- }
- /*!
- * \brief Worker thread destructor
- *
- * Called automatically when refcount reaches 0. Shuts
- * down the worker thread and destroys its component
- * parts
- */
- static void worker_thread_destroy(void *obj)
- {
- struct worker_thread *worker = obj;
- ast_debug(3, "Destroying worker thread %d\n", worker->id);
- worker_shutdown(worker);
- ast_mutex_destroy(&worker->lock);
- ast_cond_destroy(&worker->cond);
- }
- /*!
- * \brief start point for worker threads
- *
- * Worker threads start in the active state but may
- * immediately go idle if there is no work to be
- * done
- *
- * \param arg The worker thread
- * \retval NULL
- */
- static void *worker_start(void *arg)
- {
- struct worker_thread *worker = arg;
- enum worker_state saved_state;
- if (worker->options.thread_start) {
- worker->options.thread_start();
- }
- ast_mutex_lock(&worker->lock);
- while (worker_idle(worker)) {
- ast_mutex_unlock(&worker->lock);
- worker_active(worker);
- ast_mutex_lock(&worker->lock);
- if (worker->state != ALIVE) {
- break;
- }
- threadpool_active_thread_idle(worker->pool, worker);
- }
- saved_state = worker->state;
- ast_mutex_unlock(&worker->lock);
- /* Reaching this portion means the thread is
- * on death's door. It may have been killed while
- * it was idle, in which case it can just die
- * peacefully. If it's a zombie, though, then
- * it needs to let the pool know so
- * that the thread can be removed from the
- * list of zombie threads.
- */
- if (saved_state == ZOMBIE) {
- threadpool_zombie_thread_dead(worker->pool, worker);
- }
- if (worker->options.thread_end) {
- worker->options.thread_end();
- }
- return NULL;
- }
- /*!
- * \brief Allocate and initialize a new worker thread
- *
- * This will create, initialize, and start the thread.
- *
- * \param pool The threadpool to which the worker will be added
- * \retval NULL Failed to allocate or start the worker thread
- * \retval non-NULL The newly-created worker thread
- */
- static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
- {
- struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
- if (!worker) {
- return NULL;
- }
- worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
- ast_mutex_init(&worker->lock);
- ast_cond_init(&worker->cond, NULL);
- worker->pool = pool;
- worker->thread = AST_PTHREADT_NULL;
- worker->state = ALIVE;
- worker->options = pool->options;
- return worker;
- }
- static int worker_thread_start(struct worker_thread *worker)
- {
- return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
- }
- /*!
- * \brief Active loop for worker threads
- *
- * The worker will stay in this loop for its lifetime,
- * executing tasks as they become available. If there
- * are no tasks currently available, then the thread
- * will go idle.
- *
- * \param worker The worker thread executing tasks.
- */
- static void worker_active(struct worker_thread *worker)
- {
- int alive;
- /* The following is equivalent to
- *
- * while (threadpool_execute(worker->pool));
- *
- * However, reviewers have suggested in the past
- * doing that can cause optimizers to (wrongly)
- * optimize the code away.
- */
- do {
- alive = threadpool_execute(worker->pool);
- } while (alive);
- }
- /*!
- * \brief Idle function for worker threads
- *
- * The worker waits here until it gets told by the threadpool
- * to wake up.
- *
- * worker is locked before entering this function.
- *
- * \param worker The idle worker
- * \retval 0 The thread is being woken up so that it can conclude.
- * \retval non-zero The thread is being woken up to do more work.
- */
- static int worker_idle(struct worker_thread *worker)
- {
- struct timeval start = ast_tvnow();
- struct timespec end = {
- .tv_sec = start.tv_sec + worker->options.idle_timeout,
- .tv_nsec = start.tv_usec * 1000,
- };
- while (!worker->wake_up) {
- if (worker->options.idle_timeout <= 0) {
- ast_cond_wait(&worker->cond, &worker->lock);
- } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
- break;
- }
- }
- if (!worker->wake_up) {
- ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
- threadpool_idle_thread_dead(worker->pool, worker);
- worker->state = DEAD;
- }
- worker->wake_up = 0;
- return worker->state == ALIVE;
- }
- /*!
- * \brief Change a worker's state
- *
- * The threadpool calls into this function in order to let a worker know
- * how it should proceed.
- *
- * \retval -1 failure (state transition not permitted)
- * \retval 0 success
- */
- static int worker_set_state(struct worker_thread *worker, enum worker_state state)
- {
- SCOPED_MUTEX(lock, &worker->lock);
- switch (state) {
- case ALIVE:
- /* This can occur due to a race condition between being told to go active
- * and an idle timeout happening.
- */
- if (worker->state == DEAD) {
- return -1;
- }
- ast_assert(worker->state != ZOMBIE);
- break;
- case DEAD:
- break;
- case ZOMBIE:
- ast_assert(worker->state != DEAD);
- break;
- }
- worker->state = state;
- worker->wake_up = 1;
- ast_cond_signal(&worker->cond);
- return 0;
- }
- /*! Serializer group shutdown control object. */
- struct ast_serializer_shutdown_group {
- /*! Shutdown thread waits on this conditional. */
- ast_cond_t cond;
- /*! Count of serializers needing to shutdown. */
- int count;
- };
- static void serializer_shutdown_group_dtor(void *vdoomed)
- {
- struct ast_serializer_shutdown_group *doomed = vdoomed;
- ast_cond_destroy(&doomed->cond);
- }
- struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
- {
- struct ast_serializer_shutdown_group *shutdown_group;
- shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
- if (!shutdown_group) {
- return NULL;
- }
- ast_cond_init(&shutdown_group->cond, NULL);
- return shutdown_group;
- }
- int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
- {
- int remaining;
- ast_mutex_t *lock;
- if (!shutdown_group) {
- return 0;
- }
- lock = ao2_object_get_lockaddr(shutdown_group);
- ast_assert(lock != NULL);
- ao2_lock(shutdown_group);
- if (timeout) {
- struct timeval start;
- struct timespec end;
- start = ast_tvnow();
- end.tv_sec = start.tv_sec + timeout;
- end.tv_nsec = start.tv_usec * 1000;
- while (shutdown_group->count) {
- if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
- /* Error or timed out waiting for the count to reach zero. */
- break;
- }
- }
- } else {
- while (shutdown_group->count) {
- if (ast_cond_wait(&shutdown_group->cond, lock)) {
- /* Error */
- break;
- }
- }
- }
- remaining = shutdown_group->count;
- ao2_unlock(shutdown_group);
- return remaining;
- }
- /*!
- * \internal
- * \brief Increment the number of serializer members in the group.
- * \since 13.5.0
- *
- * \param shutdown_group Group shutdown controller.
- *
- * \return Nothing
- */
- static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
- {
- ao2_lock(shutdown_group);
- ++shutdown_group->count;
- ao2_unlock(shutdown_group);
- }
- /*!
- * \internal
- * \brief Decrement the number of serializer members in the group.
- * \since 13.5.0
- *
- * \param shutdown_group Group shutdown controller.
- *
- * \return Nothing
- */
- static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
- {
- ao2_lock(shutdown_group);
- --shutdown_group->count;
- if (!shutdown_group->count) {
- ast_cond_signal(&shutdown_group->cond);
- }
- ao2_unlock(shutdown_group);
- }
- struct serializer {
- /*! Threadpool the serializer will use to process the jobs. */
- struct ast_threadpool *pool;
- /*! Which group will wait for this serializer to shutdown. */
- struct ast_serializer_shutdown_group *shutdown_group;
- };
- static void serializer_dtor(void *obj)
- {
- struct serializer *ser = obj;
- ao2_cleanup(ser->pool);
- ser->pool = NULL;
- ao2_cleanup(ser->shutdown_group);
- ser->shutdown_group = NULL;
- }
- static struct serializer *serializer_create(struct ast_threadpool *pool,
- struct ast_serializer_shutdown_group *shutdown_group)
- {
- struct serializer *ser;
- ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!ser) {
- return NULL;
- }
- ao2_ref(pool, +1);
- ser->pool = pool;
- ser->shutdown_group = ao2_bump(shutdown_group);
- return ser;
- }
- AST_THREADSTORAGE_RAW(current_serializer);
- static int execute_tasks(void *data)
- {
- struct ast_taskprocessor *tps = data;
- ast_threadstorage_set_ptr(¤t_serializer, tps);
- while (ast_taskprocessor_execute(tps)) {
- /* No-op */
- }
- ast_threadstorage_set_ptr(¤t_serializer, NULL);
- ast_taskprocessor_unreference(tps);
- return 0;
- }
- static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
- {
- if (was_empty) {
- struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
- struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
- if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
- ast_taskprocessor_unreference(tps);
- }
- }
- }
- static int serializer_start(struct ast_taskprocessor_listener *listener)
- {
- /* No-op */
- return 0;
- }
- static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
- {
- struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
- if (ser->shutdown_group) {
- serializer_shutdown_group_dec(ser->shutdown_group);
- }
- ao2_cleanup(ser);
- }
- static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
- .task_pushed = serializer_task_pushed,
- .start = serializer_start,
- .shutdown = serializer_shutdown,
- };
- struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
- {
- return ast_threadstorage_get_ptr(¤t_serializer);
- }
- struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
- struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
- {
- struct serializer *ser;
- struct ast_taskprocessor_listener *listener;
- struct ast_taskprocessor *tps;
- ser = serializer_create(pool, shutdown_group);
- if (!ser) {
- return NULL;
- }
- listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
- if (!listener) {
- ao2_ref(ser, -1);
- return NULL;
- }
- tps = ast_taskprocessor_create_with_listener(name, listener);
- if (!tps) {
- /* ser ref transferred to listener but not cleaned without tps */
- ao2_ref(ser, -1);
- } else if (shutdown_group) {
- serializer_shutdown_group_inc(shutdown_group);
- }
- ao2_ref(listener, -1);
- return tps;
- }
- struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
- {
- return ast_threadpool_serializer_group(name, pool, NULL);
- }
- long ast_threadpool_queue_size(struct ast_threadpool *pool)
- {
- return ast_taskprocessor_size(pool->tps);
- }
|