taskprocessor.c 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2007-2013, Digium, Inc.
  5. *
  6. * Dwayne M. Hubbard <dhubbard@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file
  20. * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
  21. *
  22. * \author Dwayne Hubbard <dhubbard@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/_private.h"
  30. #include "asterisk/module.h"
  31. #include "asterisk/time.h"
  32. #include "asterisk/astobj2.h"
  33. #include "asterisk/cli.h"
  34. #include "asterisk/taskprocessor.h"
  35. #include "asterisk/sem.h"
  36. /*!
  37. * \brief tps_task structure is queued to a taskprocessor
  38. *
  39. * tps_tasks are processed in FIFO order and freed by the taskprocessing
  40. * thread after the task handler returns. The callback function that is assigned
  41. * to the execute() function pointer is responsible for releasing datap resources if necessary.
  42. */
  43. struct tps_task {
  44. /*! \brief The execute() task callback function pointer */
  45. union {
  46. int (*execute)(void *datap);
  47. int (*execute_local)(struct ast_taskprocessor_local *local);
  48. } callback;
  49. /*! \brief The data pointer for the task execute() function */
  50. void *datap;
  51. /*! \brief AST_LIST_ENTRY overhead */
  52. AST_LIST_ENTRY(tps_task) list;
  53. unsigned int wants_local:1;
  54. };
  /*! \brief Counters kept for each taskprocessor and shown by the CLI report. */
  struct tps_taskprocessor_stats {
      /*! \brief Largest number of tasks that were queued at any one time */
      unsigned long max_qsize;
      /*! \brief Running count of tasks processed so far */
      unsigned long _tasks_processed_count;
  };
  62. /*! \brief A ast_taskprocessor structure is a singleton by name */
  63. struct ast_taskprocessor {
  64. /*! \brief Taskprocessor statistics */
  65. struct tps_taskprocessor_stats stats;
  66. void *local_data;
  67. /*! \brief Taskprocessor current queue size */
  68. long tps_queue_size;
  69. /*! \brief Taskprocessor low water clear alert level */
  70. long tps_queue_low;
  71. /*! \brief Taskprocessor high water alert trigger level */
  72. long tps_queue_high;
  73. /*! \brief Taskprocessor queue */
  74. AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
  75. struct ast_taskprocessor_listener *listener;
  76. /*! Current thread executing the tasks */
  77. pthread_t thread;
  78. /*! Indicates if the taskprocessor is currently executing a task */
  79. unsigned int executing:1;
  80. /*! Indicates that a high water warning has been issued on this task processor */
  81. unsigned int high_water_warned:1;
  82. /*! Indicates that a high water alert is active on this taskprocessor */
  83. unsigned int high_water_alert:1;
  84. /*! Indicates if the taskprocessor is currently suspended */
  85. unsigned int suspended:1;
  86. /*! \brief Friendly name of the taskprocessor */
  87. char name[0];
  88. };
  /*!
   * \brief A listener for taskprocessors
   *
   * \since 12.0.0
   *
   * The taskprocessor calls into the listener's callbacks when its state
   * changes (for example when a task is pushed or when it is shut down), so
   * the module that owns the taskprocessor can react in whatever way suits it.
   */
  struct ast_taskprocessor_listener {
      /*! Callback table invoked by the taskprocessor on state changes */
      const struct ast_taskprocessor_listener_callbacks *callbacks;
      /*! Taskprocessor this listener is attached to */
      struct ast_taskprocessor *tps;
      /*! Opaque data owned by the listener implementation */
      void *user_data;
  };
  /*! \brief Number of buckets in the tps_singletons container (smaller for LOW_MEMORY builds). */
  #ifndef LOW_MEMORY
  #define TPS_MAX_BUCKETS 1567
  #else
  #define TPS_MAX_BUCKETS 61
  #endif
  113. /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
  114. static struct ao2_container *tps_singletons;
  115. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
  116. static ast_cond_t cli_ping_cond;
  117. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
  118. AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
  119. /*! \brief The astobj2 hash callback for taskprocessors */
  120. static int tps_hash_cb(const void *obj, const int flags);
  121. /*! \brief The astobj2 compare callback for taskprocessors */
  122. static int tps_cmp_cb(void *obj, void *arg, int flags);
  123. /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
  124. static int tps_ping_handler(void *datap);
  125. static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
  126. static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
  127. static struct ast_cli_entry taskprocessor_clis[] = {
  128. AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
  129. AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
  130. };
  131. struct default_taskprocessor_listener_pvt {
  132. pthread_t poll_thread;
  133. int dead;
  134. struct ast_sem sem;
  135. };
  136. static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
  137. {
  138. ast_assert(pvt->dead);
  139. ast_sem_destroy(&pvt->sem);
  140. ast_free(pvt);
  141. }
  142. static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
  143. {
  144. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  145. default_listener_pvt_destroy(pvt);
  146. listener->user_data = NULL;
  147. }
  148. /*!
  149. * \brief Function that processes tasks in the taskprocessor
  150. * \internal
  151. */
  152. static void *default_tps_processing_function(void *data)
  153. {
  154. struct ast_taskprocessor_listener *listener = data;
  155. struct ast_taskprocessor *tps = listener->tps;
  156. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  157. int sem_value;
  158. int res;
  159. while (!pvt->dead) {
  160. res = ast_sem_wait(&pvt->sem);
  161. if (res != 0 && errno != EINTR) {
  162. ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
  163. strerror(errno));
  164. /* Just give up */
  165. break;
  166. }
  167. ast_taskprocessor_execute(tps);
  168. }
  169. /* No posting to a dead taskprocessor! */
  170. res = ast_sem_getvalue(&pvt->sem, &sem_value);
  171. ast_assert(res == 0 && sem_value == 0);
  172. /* Free the shutdown reference (see default_listener_shutdown) */
  173. ao2_t_ref(listener->tps, -1, "tps-shutdown");
  174. return NULL;
  175. }
  176. static int default_listener_start(struct ast_taskprocessor_listener *listener)
  177. {
  178. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  179. if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
  180. return -1;
  181. }
  182. return 0;
  183. }
  184. static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
  185. {
  186. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  187. if (ast_sem_post(&pvt->sem) != 0) {
  188. ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
  189. strerror(errno));
  190. }
  191. }
  192. static int default_listener_die(void *data)
  193. {
  194. struct default_taskprocessor_listener_pvt *pvt = data;
  195. pvt->dead = 1;
  196. return 0;
  197. }
  198. static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
  199. {
  200. struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
  201. int res;
  202. /* Hold a reference during shutdown */
  203. ao2_t_ref(listener->tps, +1, "tps-shutdown");
  204. if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
  205. /* This will cause the thread to exit early without completing tasks already
  206. * in the queue. This is probably the least bad option in this situation. */
  207. default_listener_die(pvt);
  208. }
  209. ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
  210. if (pthread_equal(pthread_self(), pvt->poll_thread)) {
  211. res = pthread_detach(pvt->poll_thread);
  212. if (res != 0) {
  213. ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
  214. }
  215. } else {
  216. res = pthread_join(pvt->poll_thread, NULL);
  217. if (res != 0) {
  218. ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
  219. }
  220. }
  221. pvt->poll_thread = AST_PTHREADT_NULL;
  222. }
  223. static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
  224. .start = default_listener_start,
  225. .task_pushed = default_task_pushed,
  226. .shutdown = default_listener_shutdown,
  227. .dtor = default_listener_pvt_dtor,
  228. };
  229. /*!
  230. * \internal
  231. * \brief Clean up resources on Asterisk shutdown
  232. */
  233. static void tps_shutdown(void)
  234. {
  235. ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
  236. ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
  237. tps_singletons = NULL;
  238. }
  239. /* initialize the taskprocessor container and register CLI operations */
  240. int ast_tps_init(void)
  241. {
  242. tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
  243. TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb);
  244. if (!tps_singletons) {
  245. ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
  246. return -1;
  247. }
  248. ast_cond_init(&cli_ping_cond, NULL);
  249. ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
  250. ast_register_cleanup(tps_shutdown);
  251. return 0;
  252. }
  253. /* allocate resources for the task */
  254. static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
  255. {
  256. struct tps_task *t;
  257. if (!task_exe) {
  258. ast_log(LOG_ERROR, "task_exe is NULL!\n");
  259. return NULL;
  260. }
  261. t = ast_calloc(1, sizeof(*t));
  262. if (!t) {
  263. ast_log(LOG_ERROR, "failed to allocate task!\n");
  264. return NULL;
  265. }
  266. t->callback.execute = task_exe;
  267. t->datap = datap;
  268. return t;
  269. }
  270. static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
  271. {
  272. struct tps_task *t;
  273. if (!task_exe) {
  274. ast_log(LOG_ERROR, "task_exe is NULL!\n");
  275. return NULL;
  276. }
  277. t = ast_calloc(1, sizeof(*t));
  278. if (!t) {
  279. ast_log(LOG_ERROR, "failed to allocate task!\n");
  280. return NULL;
  281. }
  282. t->callback.execute_local = task_exe;
  283. t->datap = datap;
  284. t->wants_local = 1;
  285. return t;
  286. }
  /*!
   * \internal
   * \brief Release a task structure.
   *
   * \param task Task to free; whatever its datap refers to is not touched.
   *
   * \return Always NULL.
   */
  static void *tps_task_free(struct tps_task *task)
  {
      ast_free(task);

      return NULL;
  }
  293. /* taskprocessor tab completion */
  294. static char *tps_taskprocessor_tab_complete(struct ast_cli_args *a)
  295. {
  296. int tklen;
  297. struct ast_taskprocessor *p;
  298. struct ao2_iterator i;
  299. if (a->pos != 3) {
  300. return NULL;
  301. }
  302. tklen = strlen(a->word);
  303. i = ao2_iterator_init(tps_singletons, 0);
  304. while ((p = ao2_iterator_next(&i))) {
  305. if (!strncasecmp(a->word, p->name, tklen)) {
  306. if (ast_cli_completion_add(ast_strdup(p->name))) {
  307. ast_taskprocessor_unreference(p);
  308. break;
  309. }
  310. }
  311. ast_taskprocessor_unreference(p);
  312. }
  313. ao2_iterator_destroy(&i);
  314. return NULL;
  315. }
  316. /* ping task handling function */
  317. static int tps_ping_handler(void *datap)
  318. {
  319. ast_mutex_lock(&cli_ping_cond_lock);
  320. ast_cond_signal(&cli_ping_cond);
  321. ast_mutex_unlock(&cli_ping_cond_lock);
  322. return 0;
  323. }
  324. /* ping the specified taskprocessor and display the ping time on the CLI */
  325. static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  326. {
  327. struct timeval begin, end, delta;
  328. const char *name;
  329. struct timeval when;
  330. struct timespec ts;
  331. struct ast_taskprocessor *tps;
  332. switch (cmd) {
  333. case CLI_INIT:
  334. e->command = "core ping taskprocessor";
  335. e->usage =
  336. "Usage: core ping taskprocessor <taskprocessor>\n"
  337. " Displays the time required for a task to be processed\n";
  338. return NULL;
  339. case CLI_GENERATE:
  340. return tps_taskprocessor_tab_complete(a);
  341. }
  342. if (a->argc != 4)
  343. return CLI_SHOWUSAGE;
  344. name = a->argv[3];
  345. if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
  346. ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
  347. return CLI_SUCCESS;
  348. }
  349. ast_cli(a->fd, "\npinging %s ...", name);
  350. /*
  351. * Wait up to 5 seconds for a ping reply.
  352. *
  353. * On a very busy system it could take awhile to get a
  354. * ping response from some taskprocessors.
  355. */
  356. begin = ast_tvnow();
  357. when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
  358. ts.tv_sec = when.tv_sec;
  359. ts.tv_nsec = when.tv_usec * 1000;
  360. ast_mutex_lock(&cli_ping_cond_lock);
  361. if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
  362. ast_mutex_unlock(&cli_ping_cond_lock);
  363. ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
  364. ast_taskprocessor_unreference(tps);
  365. return CLI_FAILURE;
  366. }
  367. ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
  368. ast_mutex_unlock(&cli_ping_cond_lock);
  369. end = ast_tvnow();
  370. delta = ast_tvsub(end, begin);
  371. ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
  372. ast_taskprocessor_unreference(tps);
  373. return CLI_SUCCESS;
  374. }
  375. /*!
  376. * \internal
  377. * \brief Taskprocessor ao2 container sort function.
  378. * \since 13.8.0
  379. *
  380. * \param obj_left pointer to the (user-defined part) of an object.
  381. * \param obj_right pointer to the (user-defined part) of an object.
  382. * \param flags flags from ao2_callback()
  383. * OBJ_SEARCH_OBJECT - if set, 'obj_right', is an object.
  384. * OBJ_SEARCH_KEY - if set, 'obj_right', is a search key item that is not an object.
  385. * OBJ_SEARCH_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
  386. *
  387. * \retval <0 if obj_left < obj_right
  388. * \retval =0 if obj_left == obj_right
  389. * \retval >0 if obj_left > obj_right
  390. */
  391. static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
  392. {
  393. const struct ast_taskprocessor *tps_left = obj_left;
  394. const struct ast_taskprocessor *tps_right = obj_right;
  395. const char *right_key = obj_right;
  396. int cmp;
  397. switch (flags & OBJ_SEARCH_MASK) {
  398. default:
  399. case OBJ_SEARCH_OBJECT:
  400. right_key = tps_right->name;
  401. /* Fall through */
  402. case OBJ_SEARCH_KEY:
  403. cmp = strcasecmp(tps_left->name, right_key);
  404. break;
  405. case OBJ_SEARCH_PARTIAL_KEY:
  406. cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
  407. break;
  408. }
  409. return cmp;
  410. }
  411. static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  412. {
  413. char name[256];
  414. int tcount;
  415. unsigned long qsize;
  416. unsigned long maxqsize;
  417. unsigned long processed;
  418. struct ao2_container *sorted_tps;
  419. struct ast_taskprocessor *tps;
  420. struct ao2_iterator iter;
  421. #define FMT_HEADERS "%-45s %10s %10s %10s %10s %10s\n"
  422. #define FMT_FIELDS "%-45s %10lu %10lu %10lu %10lu %10lu\n"
  423. switch (cmd) {
  424. case CLI_INIT:
  425. e->command = "core show taskprocessors";
  426. e->usage =
  427. "Usage: core show taskprocessors\n"
  428. " Shows a list of instantiated task processors and their statistics\n";
  429. return NULL;
  430. case CLI_GENERATE:
  431. return NULL;
  432. }
  433. if (a->argc != e->args) {
  434. return CLI_SHOWUSAGE;
  435. }
  436. sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
  437. NULL);
  438. if (!sorted_tps
  439. || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
  440. ao2_cleanup(sorted_tps);
  441. return CLI_FAILURE;
  442. }
  443. ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
  444. tcount = 0;
  445. iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
  446. while ((tps = ao2_iterator_next(&iter))) {
  447. ast_copy_string(name, tps->name, sizeof(name));
  448. qsize = tps->tps_queue_size;
  449. maxqsize = tps->stats.max_qsize;
  450. processed = tps->stats._tasks_processed_count;
  451. ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
  452. tps->tps_queue_low, tps->tps_queue_high);
  453. ast_taskprocessor_unreference(tps);
  454. ++tcount;
  455. }
  456. ao2_iterator_destroy(&iter);
  457. ast_cli(a->fd, "\n%d taskprocessors\n\n", tcount);
  458. ao2_ref(sorted_tps, -1);
  459. return CLI_SUCCESS;
  460. }
  461. /* hash callback for astobj2 */
  462. static int tps_hash_cb(const void *obj, const int flags)
  463. {
  464. const struct ast_taskprocessor *tps = obj;
  465. const char *name = flags & OBJ_KEY ? obj : tps->name;
  466. return ast_str_case_hash(name);
  467. }
  468. /* compare callback for astobj2 */
  469. static int tps_cmp_cb(void *obj, void *arg, int flags)
  470. {
  471. struct ast_taskprocessor *lhs = obj, *rhs = arg;
  472. const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
  473. return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
  474. }
  /*! Number of taskprocessors currently in high water alert. */
  static unsigned int tps_alert_count;

  /*! Read/write lock guarding tps_alert_count. */
  AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
  479. /*!
  480. * \internal
  481. * \brief Add a delta to tps_alert_count with protection.
  482. * \since 13.10.0
  483. *
  484. * \param tps Taskprocessor updating queue water mark alert trigger.
  485. * \param delta The amount to add to tps_alert_count.
  486. *
  487. * \return Nothing
  488. */
  489. static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
  490. {
  491. unsigned int old;
  492. ast_rwlock_wrlock(&tps_alert_lock);
  493. old = tps_alert_count;
  494. tps_alert_count += delta;
  495. if (DEBUG_ATLEAST(3)
  496. /* and tps_alert_count becomes zero or non-zero */
  497. && !old != !tps_alert_count) {
  498. ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
  499. tps->name, tps_alert_count ? "triggered" : "cleared");
  500. }
  501. ast_rwlock_unlock(&tps_alert_lock);
  502. }
  503. unsigned int ast_taskprocessor_alert_get(void)
  504. {
  505. unsigned int count;
  506. ast_rwlock_rdlock(&tps_alert_lock);
  507. count = tps_alert_count;
  508. ast_rwlock_unlock(&tps_alert_lock);
  509. return count;
  510. }
  511. int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
  512. {
  513. if (!tps || high_water < 0 || high_water < low_water) {
  514. return -1;
  515. }
  516. if (low_water < 0) {
  517. /* Set low water level to 90% of high water level */
  518. low_water = (high_water * 9) / 10;
  519. }
  520. ao2_lock(tps);
  521. tps->tps_queue_low = low_water;
  522. tps->tps_queue_high = high_water;
  523. if (tps->high_water_alert) {
  524. if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
  525. /* Update water mark alert immediately */
  526. tps->high_water_alert = 0;
  527. tps_alert_add(tps, -1);
  528. }
  529. } else {
  530. if (high_water < tps->tps_queue_size) {
  531. /* Update water mark alert immediately */
  532. tps->high_water_alert = 1;
  533. tps_alert_add(tps, +1);
  534. }
  535. }
  536. ao2_unlock(tps);
  537. return 0;
  538. }
  539. /* destroy the taskprocessor */
  540. static void tps_taskprocessor_dtor(void *tps)
  541. {
  542. struct ast_taskprocessor *t = tps;
  543. struct tps_task *task;
  544. while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
  545. tps_task_free(task);
  546. }
  547. t->tps_queue_size = 0;
  548. if (t->high_water_alert) {
  549. t->high_water_alert = 0;
  550. tps_alert_add(t, -1);
  551. }
  552. ao2_cleanup(t->listener);
  553. t->listener = NULL;
  554. }
  555. /* pop the front task and return it */
  556. static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
  557. {
  558. struct tps_task *task;
  559. if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
  560. --tps->tps_queue_size;
  561. if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
  562. tps->high_water_alert = 0;
  563. tps_alert_add(tps, -1);
  564. }
  565. }
  566. return task;
  567. }
  568. long ast_taskprocessor_size(struct ast_taskprocessor *tps)
  569. {
  570. return (tps) ? tps->tps_queue_size : -1;
  571. }
  572. /* taskprocessor name accessor */
  573. const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
  574. {
  575. if (!tps) {
  576. ast_log(LOG_ERROR, "no taskprocessor specified!\n");
  577. return NULL;
  578. }
  579. return tps->name;
  580. }
  581. static void listener_shutdown(struct ast_taskprocessor_listener *listener)
  582. {
  583. listener->callbacks->shutdown(listener);
  584. ao2_ref(listener->tps, -1);
  585. }
  586. static void taskprocessor_listener_dtor(void *obj)
  587. {
  588. struct ast_taskprocessor_listener *listener = obj;
  589. if (listener->callbacks->dtor) {
  590. listener->callbacks->dtor(listener);
  591. }
  592. }
  593. struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
  594. {
  595. struct ast_taskprocessor_listener *listener;
  596. listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
  597. if (!listener) {
  598. return NULL;
  599. }
  600. listener->callbacks = callbacks;
  601. listener->user_data = user_data;
  602. return listener;
  603. }
  604. struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
  605. {
  606. ao2_ref(listener->tps, +1);
  607. return listener->tps;
  608. }
  609. void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
  610. {
  611. return listener->user_data;
  612. }
  613. static void *default_listener_pvt_alloc(void)
  614. {
  615. struct default_taskprocessor_listener_pvt *pvt;
  616. pvt = ast_calloc(1, sizeof(*pvt));
  617. if (!pvt) {
  618. return NULL;
  619. }
  620. pvt->poll_thread = AST_PTHREADT_NULL;
  621. if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
  622. ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
  623. ast_free(pvt);
  624. return NULL;
  625. }
  626. return pvt;
  627. }
  628. /*!
  629. * \internal
  630. * \brief Allocate a task processor structure
  631. *
  632. * \param name Name of the task processor.
  633. * \param listener Listener to associate with the task processor.
  634. *
  635. * \return The newly allocated task processor.
  636. *
  637. * \pre tps_singletons must be locked by the caller.
  638. */
  639. static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
  640. {
  641. struct ast_taskprocessor *p;
  642. p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
  643. if (!p) {
  644. ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
  645. return NULL;
  646. }
  647. /* Set default congestion water level alert triggers. */
  648. p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
  649. p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
  650. strcpy(p->name, name); /*SAFE*/
  651. ao2_ref(listener, +1);
  652. p->listener = listener;
  653. p->thread = AST_PTHREADT_NULL;
  654. ao2_ref(p, +1);
  655. listener->tps = p;
  656. if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
  657. ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
  658. listener->tps = NULL;
  659. ao2_ref(p, -2);
  660. return NULL;
  661. }
  662. return p;
  663. }
  664. static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p)
  665. {
  666. if (p && p->listener->callbacks->start(p->listener)) {
  667. ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
  668. p->name);
  669. ast_taskprocessor_unreference(p);
  670. return NULL;
  671. }
  672. return p;
  673. }
  674. /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't
  675. * create the taskprocessor if we were told via ast_tps_options to return a reference only
  676. * if it already exists */
  677. struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
  678. {
  679. struct ast_taskprocessor *p;
  680. struct ast_taskprocessor_listener *listener;
  681. struct default_taskprocessor_listener_pvt *pvt;
  682. if (ast_strlen_zero(name)) {
  683. ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
  684. return NULL;
  685. }
  686. ao2_lock(tps_singletons);
  687. p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
  688. if (p || (create & TPS_REF_IF_EXISTS)) {
  689. /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
  690. ao2_unlock(tps_singletons);
  691. return p;
  692. }
  693. /* Create a new taskprocessor. Start by creating a default listener */
  694. pvt = default_listener_pvt_alloc();
  695. if (!pvt) {
  696. ao2_unlock(tps_singletons);
  697. return NULL;
  698. }
  699. listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
  700. if (!listener) {
  701. ao2_unlock(tps_singletons);
  702. default_listener_pvt_destroy(pvt);
  703. return NULL;
  704. }
  705. p = __allocate_taskprocessor(name, listener);
  706. ao2_unlock(tps_singletons);
  707. p = __start_taskprocessor(p);
  708. ao2_ref(listener, -1);
  709. return p;
  710. }
  711. struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
  712. {
  713. struct ast_taskprocessor *p;
  714. ao2_lock(tps_singletons);
  715. p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
  716. if (p) {
  717. ao2_unlock(tps_singletons);
  718. ast_taskprocessor_unreference(p);
  719. return NULL;
  720. }
  721. p = __allocate_taskprocessor(name, listener);
  722. ao2_unlock(tps_singletons);
  723. return __start_taskprocessor(p);
  724. }
  725. void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
  726. void *local_data)
  727. {
  728. SCOPED_AO2LOCK(lock, tps);
  729. tps->local_data = local_data;
  730. }
  731. /* decrement the taskprocessor reference count and unlink from the container if necessary */
  732. void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
  733. {
  734. if (!tps) {
  735. return NULL;
  736. }
  737. /* To prevent another thread from finding and getting a reference to this
  738. * taskprocessor we hold the singletons lock. If we didn't do this then
  739. * they may acquire it and find that the listener has been shut down.
  740. */
  741. ao2_lock(tps_singletons);
  742. if (ao2_ref(tps, -1) > 3) {
  743. ao2_unlock(tps_singletons);
  744. return NULL;
  745. }
  746. /* If we're down to 3 references, then those must be:
  747. * 1. The reference we just got rid of
  748. * 2. The container
  749. * 3. The listener
  750. */
  751. ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
  752. ao2_unlock(tps_singletons);
  753. listener_shutdown(tps->listener);
  754. return NULL;
  755. }
  756. /* push the task into the taskprocessor queue */
  757. static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
  758. {
  759. int previous_size;
  760. int was_empty;
  761. if (!tps) {
  762. ast_log(LOG_ERROR, "tps is NULL!\n");
  763. return -1;
  764. }
  765. if (!t) {
  766. ast_log(LOG_ERROR, "t is NULL!\n");
  767. return -1;
  768. }
  769. ao2_lock(tps);
  770. AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
  771. previous_size = tps->tps_queue_size++;
  772. if (tps->tps_queue_high <= tps->tps_queue_size) {
  773. if (!tps->high_water_alert) {
  774. ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
  775. tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
  776. tps->high_water_warned = 1;
  777. tps->high_water_alert = 1;
  778. tps_alert_add(tps, +1);
  779. }
  780. }
  781. /* The currently executing task counts as still in queue */
  782. was_empty = tps->executing ? 0 : previous_size == 0;
  783. ao2_unlock(tps);
  784. tps->listener->callbacks->task_pushed(tps->listener, was_empty);
  785. return 0;
  786. }
  /*
   * Wrap 'task_exe' and 'datap' in a task via tps_task_alloc() and queue it
   * on 'tps'. Returns the result of taskprocessor_push().
   */
  int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
  {
      struct tps_task *task = tps_task_alloc(task_exe, datap);

      return taskprocessor_push(tps, task);
  }
  /*
   * Wrap a callback that takes a struct ast_taskprocessor_local in a task
   * via tps_task_alloc_local() and queue it on 'tps'. Returns the result
   * of taskprocessor_push().
   */
  int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
  {
      struct tps_task *task = tps_task_alloc_local(task_exe, datap);

      return taskprocessor_push(tps, task);
  }
  795. int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
  796. {
  797. if (tps) {
  798. ao2_lock(tps);
  799. tps->suspended = 1;
  800. ao2_unlock(tps);
  801. return 0;
  802. }
  803. return -1;
  804. }
  805. int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
  806. {
  807. if (tps) {
  808. ao2_lock(tps);
  809. tps->suspended = 0;
  810. ao2_unlock(tps);
  811. return 0;
  812. }
  813. return -1;
  814. }
  815. int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
  816. {
  817. return tps ? tps->suspended : -1;
  818. }
  819. int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
  820. {
  821. struct ast_taskprocessor_local local;
  822. struct tps_task *t;
  823. long size;
  824. ao2_lock(tps);
  825. t = tps_taskprocessor_pop(tps);
  826. if (!t) {
  827. ao2_unlock(tps);
  828. return 0;
  829. }
  830. tps->thread = pthread_self();
  831. tps->executing = 1;
  832. if (t->wants_local) {
  833. local.local_data = tps->local_data;
  834. local.data = t->datap;
  835. }
  836. ao2_unlock(tps);
  837. if (t->wants_local) {
  838. t->callback.execute_local(&local);
  839. } else {
  840. t->callback.execute(t->datap);
  841. }
  842. tps_task_free(t);
  843. ao2_lock(tps);
  844. tps->thread = AST_PTHREADT_NULL;
  845. /* We need to check size in the same critical section where we reset the
  846. * executing bit. Avoids a race condition where a task is pushed right
  847. * after we pop an empty stack.
  848. */
  849. tps->executing = 0;
  850. size = ast_taskprocessor_size(tps);
  851. /* Update the stats */
  852. ++tps->stats._tasks_processed_count;
  853. /* Include the task we just executed as part of the queue size. */
  854. if (size >= tps->stats.max_qsize) {
  855. tps->stats.max_qsize = size + 1;
  856. }
  857. ao2_unlock(tps);
  858. /* If we executed a task, check for the transition to empty */
  859. if (size == 0 && tps->listener->callbacks->emptied) {
  860. tps->listener->callbacks->emptied(tps->listener);
  861. }
  862. return size > 0;
  863. }
  864. int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
  865. {
  866. int is_task;
  867. ao2_lock(tps);
  868. is_task = pthread_equal(tps->thread, pthread_self());
  869. ao2_unlock(tps);
  870. return is_task;
  871. }
  /*
   * Hand out the next value of a function-local static counter.
   *
   * The counter is advanced by one through ast_atomic_fetchadd_int() and
   * the value that call returns is converted to unsigned int.
   */
  unsigned int ast_taskprocessor_seq_num(void)
  {
      static int next_seq;
      int issued;

      issued = ast_atomic_fetchadd_int(&next_seq, +1);
      return (unsigned int) issued;
  }
  /*
   * Build a taskprocessor name of the form "<user part>-<8 hex digits>".
   *
   * buf    - destination buffer; must hold at least SEQ_STR_SIZE bytes.
   * size   - total size of buf in bytes.
   * format - printf-style format (plus arguments) for the user part.
   *
   * The user part is truncated as needed so that the sequence suffix always
   * fits. If buf is NULL or size is smaller than SEQ_STR_SIZE the request
   * cannot be satisfied: the asserts flag it, and the runtime guard then
   * leaves buf empty (when possible) instead of writing out of bounds.
   */
  void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
  {
      va_list ap;
      int user_size;
  #define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */

      ast_assert(buf != NULL);
      ast_assert(SEQ_STR_SIZE <= size);

      /*
       * Runtime guard in case the asserts above are compiled out. Without
       * it, a too-small size would make the unsigned subtractions below
       * wrap around and let the formatters write past the end of buf.
       */
      if (!buf || size < SEQ_STR_SIZE) {
          if (buf && size) {
              buf[0] = '\0';
          }
          return;
      }

      /* Format the user part, reserving room for the dash and hex digits. */
      va_start(ap, format);
      user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
      va_end(ap);

      if (user_size < 0) {
          /*
           * Wow!  We got an output error to a memory buffer.
           * Assume no user part of name written.
           */
          user_size = 0;
      } else if ((unsigned int) user_size > size - SEQ_STR_SIZE) {
          /*
           * Truncate user part of name to make sequence number fit.
           * Comparing against (size - SEQ_STR_SIZE) avoids the signed
           * overflow that adding to user_size could cause.
           */
          user_size = size - SEQ_STR_SIZE;
      }

      /* Append sequence number to end of user name. */
      snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
  }