12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063 |
- /*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2007-2013, Digium, Inc.
- *
- * Dwayne M. Hubbard <dhubbard@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
- /*!
- * \file
- * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
- *
- * \author Dwayne Hubbard <dhubbard@digium.com>
- */
- /*** MODULEINFO
- <support_level>core</support_level>
- ***/
- #include "asterisk.h"
- ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
- #include "asterisk/_private.h"
- #include "asterisk/module.h"
- #include "asterisk/time.h"
- #include "asterisk/astobj2.h"
- #include "asterisk/cli.h"
- #include "asterisk/taskprocessor.h"
- #include "asterisk/sem.h"
- /*!
- * \brief tps_task structure is queued to a taskprocessor
- *
- * tps_tasks are processed in FIFO order and freed by the taskprocessing
- * thread after the task handler returns. The callback function that is assigned
- * to the execute() function pointer is responsible for releasing datap resources if necessary.
- */
- struct tps_task {
- /*! \brief The execute() task callback function pointer */
- union {
- int (*execute)(void *datap);
- int (*execute_local)(struct ast_taskprocessor_local *local);
- } callback;
- /*! \brief The data pointer for the task execute() function */
- void *datap;
- /*! \brief AST_LIST_ENTRY overhead */
- AST_LIST_ENTRY(tps_task) list;
- unsigned int wants_local:1;
- };
/*! \brief Counters kept per taskprocessor for reporting purposes. */
struct tps_taskprocessor_stats {
	/*! \brief Deepest queue recorded so far (the task being executed is counted) */
	unsigned long max_qsize;
	/*! \brief Number of tasks that have finished executing */
	unsigned long _tasks_processed_count;
};
- /*! \brief A ast_taskprocessor structure is a singleton by name */
- struct ast_taskprocessor {
- /*! \brief Taskprocessor statistics */
- struct tps_taskprocessor_stats stats;
- void *local_data;
- /*! \brief Taskprocessor current queue size */
- long tps_queue_size;
- /*! \brief Taskprocessor low water clear alert level */
- long tps_queue_low;
- /*! \brief Taskprocessor high water alert trigger level */
- long tps_queue_high;
- /*! \brief Taskprocessor queue */
- AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
- struct ast_taskprocessor_listener *listener;
- /*! Current thread executing the tasks */
- pthread_t thread;
- /*! Indicates if the taskprocessor is currently executing a task */
- unsigned int executing:1;
- /*! Indicates that a high water warning has been issued on this task processor */
- unsigned int high_water_warned:1;
- /*! Indicates that a high water alert is active on this taskprocessor */
- unsigned int high_water_alert:1;
- /*! Indicates if the taskprocessor is currently suspended */
- unsigned int suspended:1;
- /*! \brief Friendly name of the taskprocessor */
- char name[0];
- };
/*!
 * \brief Observer attached to a taskprocessor.
 *
 * \since 12.0.0
 *
 * The taskprocessor calls into the listener's callbacks when its state
 * changes (for example when a task is pushed), which lets the owning
 * module decide how and when queued tasks get executed.
 */
struct ast_taskprocessor_listener {
	/*! Callback table invoked by the taskprocessor */
	const struct ast_taskprocessor_listener_callbacks *callbacks;
	/*! Taskprocessor this listener is attached to */
	struct ast_taskprocessor *tps;
	/*! Listener-private data supplied at allocation time */
	void *user_data;
};
/*! \brief Number of buckets in the tps_singletons container (reduced when LOW_MEMORY is defined). */
#if defined(LOW_MEMORY)
#define TPS_MAX_BUCKETS 61
#else
#define TPS_MAX_BUCKETS 1567
#endif
- /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
- static struct ao2_container *tps_singletons;
- /*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition */
- static ast_cond_t cli_ping_cond;
- /*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition lock */
- AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
- /*! \brief The astobj2 hash callback for taskprocessors */
- static int tps_hash_cb(const void *obj, const int flags);
- /*! \brief The astobj2 compare callback for taskprocessors */
- static int tps_cmp_cb(void *obj, void *arg, int flags);
- /*! \brief CLI <example>taskprocessor ping <blah></example> handler function */
- static int tps_ping_handler(void *datap);
- static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
- static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
- static struct ast_cli_entry taskprocessor_clis[] = {
- AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
- AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
- };
- struct default_taskprocessor_listener_pvt {
- pthread_t poll_thread;
- int dead;
- struct ast_sem sem;
- };
- static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
- {
- ast_assert(pvt->dead);
- ast_sem_destroy(&pvt->sem);
- ast_free(pvt);
- }
- static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
- {
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- default_listener_pvt_destroy(pvt);
- listener->user_data = NULL;
- }
- /*!
- * \brief Function that processes tasks in the taskprocessor
- * \internal
- */
- static void *default_tps_processing_function(void *data)
- {
- struct ast_taskprocessor_listener *listener = data;
- struct ast_taskprocessor *tps = listener->tps;
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- int sem_value;
- int res;
- while (!pvt->dead) {
- res = ast_sem_wait(&pvt->sem);
- if (res != 0 && errno != EINTR) {
- ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
- strerror(errno));
- /* Just give up */
- break;
- }
- ast_taskprocessor_execute(tps);
- }
- /* No posting to a dead taskprocessor! */
- res = ast_sem_getvalue(&pvt->sem, &sem_value);
- ast_assert(res == 0 && sem_value == 0);
- /* Free the shutdown reference (see default_listener_shutdown) */
- ao2_t_ref(listener->tps, -1, "tps-shutdown");
- return NULL;
- }
- static int default_listener_start(struct ast_taskprocessor_listener *listener)
- {
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
- return -1;
- }
- return 0;
- }
- static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
- {
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- if (ast_sem_post(&pvt->sem) != 0) {
- ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
- strerror(errno));
- }
- }
- static int default_listener_die(void *data)
- {
- struct default_taskprocessor_listener_pvt *pvt = data;
- pvt->dead = 1;
- return 0;
- }
- static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
- {
- struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
- int res;
- /* Hold a reference during shutdown */
- ao2_t_ref(listener->tps, +1, "tps-shutdown");
- if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
- /* This will cause the thread to exit early without completing tasks already
- * in the queue. This is probably the least bad option in this situation. */
- default_listener_die(pvt);
- }
- ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
- if (pthread_equal(pthread_self(), pvt->poll_thread)) {
- res = pthread_detach(pvt->poll_thread);
- if (res != 0) {
- ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
- }
- } else {
- res = pthread_join(pvt->poll_thread, NULL);
- if (res != 0) {
- ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
- }
- }
- pvt->poll_thread = AST_PTHREADT_NULL;
- }
- static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
- .start = default_listener_start,
- .task_pushed = default_task_pushed,
- .shutdown = default_listener_shutdown,
- .dtor = default_listener_pvt_dtor,
- };
- /*!
- * \internal
- * \brief Clean up resources on Asterisk shutdown
- */
- static void tps_shutdown(void)
- {
- ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
- ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
- tps_singletons = NULL;
- }
- /* initialize the taskprocessor container and register CLI operations */
- int ast_tps_init(void)
- {
- tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
- TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb);
- if (!tps_singletons) {
- ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
- return -1;
- }
- ast_cond_init(&cli_ping_cond, NULL);
- ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
- ast_register_cleanup(tps_shutdown);
- return 0;
- }
- /* allocate resources for the task */
- static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
- {
- struct tps_task *t;
- if (!task_exe) {
- ast_log(LOG_ERROR, "task_exe is NULL!\n");
- return NULL;
- }
- t = ast_calloc(1, sizeof(*t));
- if (!t) {
- ast_log(LOG_ERROR, "failed to allocate task!\n");
- return NULL;
- }
- t->callback.execute = task_exe;
- t->datap = datap;
- return t;
- }
- static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
- {
- struct tps_task *t;
- if (!task_exe) {
- ast_log(LOG_ERROR, "task_exe is NULL!\n");
- return NULL;
- }
- t = ast_calloc(1, sizeof(*t));
- if (!t) {
- ast_log(LOG_ERROR, "failed to allocate task!\n");
- return NULL;
- }
- t->callback.execute_local = task_exe;
- t->datap = datap;
- t->wants_local = 1;
- return t;
- }
- /* release task resources */
- static void *tps_task_free(struct tps_task *task)
- {
- ast_free(task);
- return NULL;
- }
- /* taskprocessor tab completion */
- static char *tps_taskprocessor_tab_complete(struct ast_cli_args *a)
- {
- int tklen;
- struct ast_taskprocessor *p;
- struct ao2_iterator i;
- if (a->pos != 3) {
- return NULL;
- }
- tklen = strlen(a->word);
- i = ao2_iterator_init(tps_singletons, 0);
- while ((p = ao2_iterator_next(&i))) {
- if (!strncasecmp(a->word, p->name, tklen)) {
- if (ast_cli_completion_add(ast_strdup(p->name))) {
- ast_taskprocessor_unreference(p);
- break;
- }
- }
- ast_taskprocessor_unreference(p);
- }
- ao2_iterator_destroy(&i);
- return NULL;
- }
- /* ping task handling function */
- static int tps_ping_handler(void *datap)
- {
- ast_mutex_lock(&cli_ping_cond_lock);
- ast_cond_signal(&cli_ping_cond);
- ast_mutex_unlock(&cli_ping_cond_lock);
- return 0;
- }
- /* ping the specified taskprocessor and display the ping time on the CLI */
- static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
- {
- struct timeval begin, end, delta;
- const char *name;
- struct timeval when;
- struct timespec ts;
- struct ast_taskprocessor *tps;
- switch (cmd) {
- case CLI_INIT:
- e->command = "core ping taskprocessor";
- e->usage =
- "Usage: core ping taskprocessor <taskprocessor>\n"
- " Displays the time required for a task to be processed\n";
- return NULL;
- case CLI_GENERATE:
- return tps_taskprocessor_tab_complete(a);
- }
- if (a->argc != 4)
- return CLI_SHOWUSAGE;
- name = a->argv[3];
- if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
- ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
- return CLI_SUCCESS;
- }
- ast_cli(a->fd, "\npinging %s ...", name);
- /*
- * Wait up to 5 seconds for a ping reply.
- *
- * On a very busy system it could take awhile to get a
- * ping response from some taskprocessors.
- */
- begin = ast_tvnow();
- when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
- ts.tv_sec = when.tv_sec;
- ts.tv_nsec = when.tv_usec * 1000;
- ast_mutex_lock(&cli_ping_cond_lock);
- if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
- ast_mutex_unlock(&cli_ping_cond_lock);
- ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
- ast_taskprocessor_unreference(tps);
- return CLI_FAILURE;
- }
- ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
- ast_mutex_unlock(&cli_ping_cond_lock);
- end = ast_tvnow();
- delta = ast_tvsub(end, begin);
- ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
- ast_taskprocessor_unreference(tps);
- return CLI_SUCCESS;
- }
- /*!
- * \internal
- * \brief Taskprocessor ao2 container sort function.
- * \since 13.8.0
- *
- * \param obj_left pointer to the (user-defined part) of an object.
- * \param obj_right pointer to the (user-defined part) of an object.
- * \param flags flags from ao2_callback()
- * OBJ_SEARCH_OBJECT - if set, 'obj_right', is an object.
- * OBJ_SEARCH_KEY - if set, 'obj_right', is a search key item that is not an object.
- * OBJ_SEARCH_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
- *
- * \retval <0 if obj_left < obj_right
- * \retval =0 if obj_left == obj_right
- * \retval >0 if obj_left > obj_right
- */
- static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
- {
- const struct ast_taskprocessor *tps_left = obj_left;
- const struct ast_taskprocessor *tps_right = obj_right;
- const char *right_key = obj_right;
- int cmp;
- switch (flags & OBJ_SEARCH_MASK) {
- default:
- case OBJ_SEARCH_OBJECT:
- right_key = tps_right->name;
- /* Fall through */
- case OBJ_SEARCH_KEY:
- cmp = strcasecmp(tps_left->name, right_key);
- break;
- case OBJ_SEARCH_PARTIAL_KEY:
- cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
- break;
- }
- return cmp;
- }
- static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
- {
- char name[256];
- int tcount;
- unsigned long qsize;
- unsigned long maxqsize;
- unsigned long processed;
- struct ao2_container *sorted_tps;
- struct ast_taskprocessor *tps;
- struct ao2_iterator iter;
- #define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
- #define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
- switch (cmd) {
- case CLI_INIT:
- e->command = "core show taskprocessors";
- e->usage =
- "Usage: core show taskprocessors\n"
- " Shows a list of instantiated task processors and their statistics\n";
- return NULL;
- case CLI_GENERATE:
- return NULL;
- }
- if (a->argc != e->args) {
- return CLI_SHOWUSAGE;
- }
- sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
- NULL);
- if (!sorted_tps
- || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
- ao2_cleanup(sorted_tps);
- return CLI_FAILURE;
- }
- ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
- tcount = 0;
- iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
- while ((tps = ao2_iterator_next(&iter))) {
- ast_copy_string(name, tps->name, sizeof(name));
- qsize = tps->tps_queue_size;
- maxqsize = tps->stats.max_qsize;
- processed = tps->stats._tasks_processed_count;
- ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
- tps->tps_queue_low, tps->tps_queue_high);
- ast_taskprocessor_unreference(tps);
- ++tcount;
- }
- ao2_iterator_destroy(&iter);
- ast_cli(a->fd, "\n%d taskprocessors\n\n", tcount);
- ao2_ref(sorted_tps, -1);
- return CLI_SUCCESS;
- }
- /* hash callback for astobj2 */
- static int tps_hash_cb(const void *obj, const int flags)
- {
- const struct ast_taskprocessor *tps = obj;
- const char *name = flags & OBJ_KEY ? obj : tps->name;
- return ast_str_case_hash(name);
- }
- /* compare callback for astobj2 */
- static int tps_cmp_cb(void *obj, void *arg, int flags)
- {
- struct ast_taskprocessor *lhs = obj, *rhs = arg;
- const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
- return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
- }
- /*! Count of the number of taskprocessors in high water alert. */
- static unsigned int tps_alert_count;
- /*! Access protection for tps_alert_count */
- AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
- /*!
- * \internal
- * \brief Add a delta to tps_alert_count with protection.
- * \since 13.10.0
- *
- * \param tps Taskprocessor updating queue water mark alert trigger.
- * \param delta The amount to add to tps_alert_count.
- *
- * \return Nothing
- */
- static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
- {
- unsigned int old;
- ast_rwlock_wrlock(&tps_alert_lock);
- old = tps_alert_count;
- tps_alert_count += delta;
- if (DEBUG_ATLEAST(3)
- /* and tps_alert_count becomes zero or non-zero */
- && !old != !tps_alert_count) {
- ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
- tps->name, tps_alert_count ? "triggered" : "cleared");
- }
- ast_rwlock_unlock(&tps_alert_lock);
- }
- unsigned int ast_taskprocessor_alert_get(void)
- {
- unsigned int count;
- ast_rwlock_rdlock(&tps_alert_lock);
- count = tps_alert_count;
- ast_rwlock_unlock(&tps_alert_lock);
- return count;
- }
- int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
- {
- if (!tps || high_water < 0 || high_water < low_water) {
- return -1;
- }
- if (low_water < 0) {
- /* Set low water level to 90% of high water level */
- low_water = (high_water * 9) / 10;
- }
- ao2_lock(tps);
- tps->tps_queue_low = low_water;
- tps->tps_queue_high = high_water;
- if (tps->high_water_alert) {
- if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
- /* Update water mark alert immediately */
- tps->high_water_alert = 0;
- tps_alert_add(tps, -1);
- }
- } else {
- if (high_water < tps->tps_queue_size) {
- /* Update water mark alert immediately */
- tps->high_water_alert = 1;
- tps_alert_add(tps, +1);
- }
- }
- ao2_unlock(tps);
- return 0;
- }
- /* destroy the taskprocessor */
- static void tps_taskprocessor_dtor(void *tps)
- {
- struct ast_taskprocessor *t = tps;
- struct tps_task *task;
- while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
- tps_task_free(task);
- }
- t->tps_queue_size = 0;
- if (t->high_water_alert) {
- t->high_water_alert = 0;
- tps_alert_add(t, -1);
- }
- ao2_cleanup(t->listener);
- t->listener = NULL;
- }
- /* pop the front task and return it */
- static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
- {
- struct tps_task *task;
- if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
- --tps->tps_queue_size;
- if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
- tps->high_water_alert = 0;
- tps_alert_add(tps, -1);
- }
- }
- return task;
- }
- long ast_taskprocessor_size(struct ast_taskprocessor *tps)
- {
- return (tps) ? tps->tps_queue_size : -1;
- }
- /* taskprocessor name accessor */
- const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
- {
- if (!tps) {
- ast_log(LOG_ERROR, "no taskprocessor specified!\n");
- return NULL;
- }
- return tps->name;
- }
- static void listener_shutdown(struct ast_taskprocessor_listener *listener)
- {
- listener->callbacks->shutdown(listener);
- ao2_ref(listener->tps, -1);
- }
- static void taskprocessor_listener_dtor(void *obj)
- {
- struct ast_taskprocessor_listener *listener = obj;
- if (listener->callbacks->dtor) {
- listener->callbacks->dtor(listener);
- }
- }
- struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
- {
- struct ast_taskprocessor_listener *listener;
- listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
- if (!listener) {
- return NULL;
- }
- listener->callbacks = callbacks;
- listener->user_data = user_data;
- return listener;
- }
- struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
- {
- ao2_ref(listener->tps, +1);
- return listener->tps;
- }
- void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
- {
- return listener->user_data;
- }
- static void *default_listener_pvt_alloc(void)
- {
- struct default_taskprocessor_listener_pvt *pvt;
- pvt = ast_calloc(1, sizeof(*pvt));
- if (!pvt) {
- return NULL;
- }
- pvt->poll_thread = AST_PTHREADT_NULL;
- if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
- ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
- ast_free(pvt);
- return NULL;
- }
- return pvt;
- }
- /*!
- * \internal
- * \brief Allocate a task processor structure
- *
- * \param name Name of the task processor.
- * \param listener Listener to associate with the task processor.
- *
- * \return The newly allocated task processor.
- *
- * \pre tps_singletons must be locked by the caller.
- */
- static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
- {
- struct ast_taskprocessor *p;
- p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
- if (!p) {
- ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
- return NULL;
- }
- /* Set default congestion water level alert triggers. */
- p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
- p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
- strcpy(p->name, name); /*SAFE*/
- ao2_ref(listener, +1);
- p->listener = listener;
- p->thread = AST_PTHREADT_NULL;
- ao2_ref(p, +1);
- listener->tps = p;
- if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
- ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
- listener->tps = NULL;
- ao2_ref(p, -2);
- return NULL;
- }
- return p;
- }
- static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p)
- {
- if (p && p->listener->callbacks->start(p->listener)) {
- ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
- p->name);
- ast_taskprocessor_unreference(p);
- return NULL;
- }
- return p;
- }
- /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
- * create the taskprocessor if we were told via ast_tps_options to return a reference only
- * if it already exists */
- struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
- {
- struct ast_taskprocessor *p;
- struct ast_taskprocessor_listener *listener;
- struct default_taskprocessor_listener_pvt *pvt;
- if (ast_strlen_zero(name)) {
- ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
- return NULL;
- }
- ao2_lock(tps_singletons);
- p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
- if (p || (create & TPS_REF_IF_EXISTS)) {
- /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
- ao2_unlock(tps_singletons);
- return p;
- }
- /* Create a new taskprocessor. Start by creating a default listener */
- pvt = default_listener_pvt_alloc();
- if (!pvt) {
- ao2_unlock(tps_singletons);
- return NULL;
- }
- listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
- if (!listener) {
- ao2_unlock(tps_singletons);
- default_listener_pvt_destroy(pvt);
- return NULL;
- }
- p = __allocate_taskprocessor(name, listener);
- ao2_unlock(tps_singletons);
- p = __start_taskprocessor(p);
- ao2_ref(listener, -1);
- return p;
- }
- struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
- {
- struct ast_taskprocessor *p;
- ao2_lock(tps_singletons);
- p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
- if (p) {
- ao2_unlock(tps_singletons);
- ast_taskprocessor_unreference(p);
- return NULL;
- }
- p = __allocate_taskprocessor(name, listener);
- ao2_unlock(tps_singletons);
- return __start_taskprocessor(p);
- }
- void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
- void *local_data)
- {
- SCOPED_AO2LOCK(lock, tps);
- tps->local_data = local_data;
- }
- /* decrement the taskprocessor reference count and unlink from the container if necessary */
- void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
- {
- if (!tps) {
- return NULL;
- }
- /* To prevent another thread from finding and getting a reference to this
- * taskprocessor we hold the singletons lock. If we didn't do this then
- * they may acquire it and find that the listener has been shut down.
- */
- ao2_lock(tps_singletons);
- if (ao2_ref(tps, -1) > 3) {
- ao2_unlock(tps_singletons);
- return NULL;
- }
- /* If we're down to 3 references, then those must be:
- * 1. The reference we just got rid of
- * 2. The container
- * 3. The listener
- */
- ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
- ao2_unlock(tps_singletons);
- listener_shutdown(tps->listener);
- return NULL;
- }
- /* push the task into the taskprocessor queue */
- static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
- {
- int previous_size;
- int was_empty;
- if (!tps) {
- ast_log(LOG_ERROR, "tps is NULL!\n");
- return -1;
- }
- if (!t) {
- ast_log(LOG_ERROR, "t is NULL!\n");
- return -1;
- }
- ao2_lock(tps);
- AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
- previous_size = tps->tps_queue_size++;
- if (tps->tps_queue_high <= tps->tps_queue_size) {
- if (!tps->high_water_alert) {
- ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
- tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
- tps->high_water_warned = 1;
- tps->high_water_alert = 1;
- tps_alert_add(tps, +1);
- }
- }
- /* The currently executing task counts as still in queue */
- was_empty = tps->executing ? 0 : previous_size == 0;
- ao2_unlock(tps);
- tps->listener->callbacks->task_pushed(tps->listener, was_empty);
- return 0;
- }
/*!
 * \brief Queue a task on a taskprocessor.
 *
 * \param tps Taskprocessor to push to.
 * \param task_exe Callback to execute.
 * \param datap Argument passed to \a task_exe.
 *
 * \retval 0 success
 * \retval -1 failure
 */
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
	struct tps_task *task = tps_task_alloc(task_exe, datap);

	return taskprocessor_push(tps, task);
}
/*!
 * \brief Queue a task whose callback also receives the taskprocessor's local data.
 *
 * \param tps Taskprocessor to push to.
 * \param task_exe Callback to execute.
 * \param datap Task-specific data made available to \a task_exe.
 *
 * \retval 0 success
 * \retval -1 failure
 */
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
{
	struct tps_task *task = tps_task_alloc_local(task_exe, datap);

	return taskprocessor_push(tps, task);
}
- int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
- {
- if (tps) {
- ao2_lock(tps);
- tps->suspended = 1;
- ao2_unlock(tps);
- return 0;
- }
- return -1;
- }
- int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
- {
- if (tps) {
- ao2_lock(tps);
- tps->suspended = 0;
- ao2_unlock(tps);
- return 0;
- }
- return -1;
- }
- int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
- {
- return tps ? tps->suspended : -1;
- }
- int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
- {
- struct ast_taskprocessor_local local;
- struct tps_task *t;
- long size;
- ao2_lock(tps);
- t = tps_taskprocessor_pop(tps);
- if (!t) {
- ao2_unlock(tps);
- return 0;
- }
- tps->thread = pthread_self();
- tps->executing = 1;
- if (t->wants_local) {
- local.local_data = tps->local_data;
- local.data = t->datap;
- }
- ao2_unlock(tps);
- if (t->wants_local) {
- t->callback.execute_local(&local);
- } else {
- t->callback.execute(t->datap);
- }
- tps_task_free(t);
- ao2_lock(tps);
- tps->thread = AST_PTHREADT_NULL;
- /* We need to check size in the same critical section where we reset the
- * executing bit. Avoids a race condition where a task is pushed right
- * after we pop an empty stack.
- */
- tps->executing = 0;
- size = ast_taskprocessor_size(tps);
- /* Update the stats */
- ++tps->stats._tasks_processed_count;
- /* Include the task we just executed as part of the queue size. */
- if (size >= tps->stats.max_qsize) {
- tps->stats.max_qsize = size + 1;
- }
- ao2_unlock(tps);
- /* If we executed a task, check for the transition to empty */
- if (size == 0 && tps->listener->callbacks->emptied) {
- tps->listener->callbacks->emptied(tps->listener);
- }
- return size > 0;
- }
- int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
- {
- int is_task;
- ao2_lock(tps);
- is_task = pthread_equal(tps->thread, pthread_self());
- ao2_unlock(tps);
- return is_task;
- }
/*!
 * \brief Hand out the next value of a global sequence counter.
 *
 * \return The value returned by the atomic fetch-and-add of 1 on the counter.
 */
unsigned int ast_taskprocessor_seq_num(void)
{
	static int seq_num;
	int fetched;

	fetched = ast_atomic_fetchadd_int(&seq_num, +1);

	return (unsigned int) fetched;
}
/*!
 * \brief Build a taskprocessor name with a sequence number appended.
 *
 * The user supplied part is formatted first and, if necessary, truncated
 * so that "-" followed by eight hex digits always fits behind it.
 *
 * \param buf Destination buffer; must not be NULL.
 * \param size Size of \a buf; must be at least SEQ_STR_SIZE (10) bytes.
 * \param format printf style format for the user part of the name.
 *
 * \note If the preconditions are violated in a build where assertions are
 *       compiled out, nothing is formatted; \a buf is set to an empty
 *       string when that is possible.
 */
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
{
	va_list ap;
	int user_size;
#define SEQ_STR_SIZE (1 + 8 + 1)	/* Dash plus 8 hex digits plus null terminator */

	ast_assert(buf != NULL);
	ast_assert(SEQ_STR_SIZE <= size);

	/*
	 * Runtime guard for builds without assertions: with a smaller size
	 * the unsigned subtraction below would wrap around and allow writes
	 * past the end of the buffer.
	 */
	if (!buf || size < SEQ_STR_SIZE) {
		if (buf && size) {
			buf[0] = '\0';
		}
		return;
	}

	va_start(ap, format);
	user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
	va_end(ap);
	if (user_size < 0) {
		/*
		 * Wow!  We got an output error to a memory buffer.
		 * Assume no user part of name written.
		 */
		user_size = 0;
	} else if (size < (unsigned int) user_size + SEQ_STR_SIZE) {
		/* Truncate user part of name to make sequence number fit. */
		user_size = size - SEQ_STR_SIZE;
	}

	/* Append sequence number to end of user name. */
	snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
}
|