threadpool.c 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012-2013, Digium, Inc.
  5. *
  6. * Mark Michelson <mmmichelson@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. #include "asterisk.h"
  19. #include "asterisk/threadpool.h"
  20. #include "asterisk/taskprocessor.h"
  21. #include "asterisk/astobj2.h"
  22. #include "asterisk/utils.h"
/*
 * Number of hash buckets used for each of the threadpool's worker-thread
 * containers (active, idle and zombie).
 *
 * Needs to stay prime if increased.
 */
#define THREAD_BUCKETS 89
/*!
 * \brief An opaque threadpool structure
 *
 * A threadpool is a collection of threads that execute
 * tasks from a common queue.
 */
struct ast_threadpool {
	/*! Threadpool listener (may be NULL; callers check before use) */
	struct ast_threadpool_listener *listener;
	/*!
	 * \brief The container of active threads.
	 * Active threads are those that are currently running tasks
	 */
	struct ao2_container *active_threads;
	/*!
	 * \brief The container of idle threads.
	 * Idle threads are those that are currently waiting to run tasks
	 */
	struct ao2_container *idle_threads;
	/*!
	 * \brief The container of zombie threads.
	 * Zombie threads may be running tasks, but they are scheduled to die soon
	 */
	struct ao2_container *zombie_threads;
	/*!
	 * \brief The main taskprocessor
	 *
	 * Tasks that are queued in this taskprocessor are
	 * doled out to the worker threads. Worker threads that
	 * execute tasks from the threadpool are executing tasks
	 * in this taskprocessor.
	 *
	 * The threadpool itself is actually the private data for
	 * this taskprocessor's listener. This way, as taskprocessor
	 * changes occur, the threadpool can alert its listeners
	 * appropriately.
	 */
	struct ast_taskprocessor *tps;
	/*!
	 * \brief The control taskprocessor
	 *
	 * This is a standard taskprocessor that uses the default
	 * taskprocessor listener. In other words, all tasks queued to
	 * this taskprocessor have a single thread that executes the
	 * tasks.
	 *
	 * All tasks that modify the state of the threadpool and all tasks
	 * that call out to threadpool listeners are pushed to this
	 * taskprocessor.
	 *
	 * For instance, when the threadpool changes sizes, a task is put
	 * into this taskprocessor to do so. When it comes time to tell the
	 * threadpool listener that worker threads have changed state,
	 * the task is placed in this taskprocessor.
	 *
	 * This is done for three main reasons:
	 * 1) It ensures that listeners are given an accurate portrayal
	 * of the threadpool's current state. In other words, when a listener
	 * gets told a count of active, idle and zombie threads, it does not
	 * need to worry that internal state of the threadpool might be different
	 * from what it has been told.
	 * 2) It minimizes the locking required in both the threadpool and in
	 * threadpool listener's callbacks.
	 * 3) It ensures that listener callbacks are called in the same order
	 * that the threadpool had its state change.
	 */
	struct ast_taskprocessor *control_tps;
	/*!
	 * True if the threadpool is in the process of shutting down.
	 * Readers in this file check it while holding the pool's ao2 lock.
	 */
	int shutting_down;
	/*! Threadpool-specific options */
	struct ast_threadpool_options options;
};
/*!
 * \brief listener for a threadpool
 *
 * The listener is notified of changes in a threadpool. It can
 * react by doing things like increasing the number of threads
 * in the pool.
 */
struct ast_threadpool_listener {
	/*!
	 * Callbacks called by the threadpool. Individual callbacks may be
	 * NULL; the threadpool checks each one before invoking it.
	 */
	const struct ast_threadpool_listener_callbacks *callbacks;
	/*! User data for the listener */
	void *user_data;
};
/*!
 * \brief states for worker threads
 */
enum worker_state {
	/*! The worker is either active or idle */
	ALIVE,
	/*!
	 * The worker has been asked to shut down but
	 * may still be in the process of executing tasks.
	 * This transition happens when the threadpool needs
	 * to shrink and needs to kill active threads in order
	 * to do so.
	 */
	ZOMBIE,
	/*!
	 * The worker has been asked to shut down. Typically
	 * only idle threads go to this state directly, but
	 * active threads may go straight to this state when
	 * the threadpool is shut down.
	 */
	DEAD,
};
/*!
 * \brief A thread that executes threadpool tasks
 *
 * Workers are stored in the pool's active, idle and zombie containers.
 */
struct worker_thread {
	/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
	int id;
	/*! Condition used in conjunction with state changes */
	ast_cond_t cond;
	/*! Lock used alongside the condition for state changes */
	ast_mutex_t lock;
	/*! The actual thread that is executing tasks */
	pthread_t thread;
	/*! A pointer to the threadpool. Needed to be able to execute tasks */
	struct ast_threadpool *pool;
	/*! The current state of the worker thread */
	enum worker_state state;
	/*! A boolean used to determine if an idle thread should become active */
	int wake_up;
	/*! Options for this threadpool */
	struct ast_threadpool_options options;
};
/*
 * Worker thread forward declarations. See definitions for documentation.
 *
 * They are declared here because the threadpool code below refers to
 * several of them before their definitions appear.
 */
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);
static void worker_active(struct worker_thread *worker);
static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);
  164. /*!
  165. * \brief Notify the threadpool listener that the state has changed.
  166. *
  167. * This notifies the threadpool listener via its state_changed callback.
  168. * \param pool The threadpool whose state has changed
  169. */
  170. static void threadpool_send_state_changed(struct ast_threadpool *pool)
  171. {
  172. int active_size = ao2_container_count(pool->active_threads);
  173. int idle_size = ao2_container_count(pool->idle_threads);
  174. if (pool->listener && pool->listener->callbacks->state_changed) {
  175. pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
  176. }
  177. }
/*!
 * \brief Struct used for queued operations involving worker state changes
 *
 * Instances are created by thread_worker_pair_alloc(), which takes a
 * reference on the worker, and released by thread_worker_pair_free().
 */
struct thread_worker_pair {
	/*! Threadpool that contains the worker whose state has changed */
	struct ast_threadpool *pool;
	/*! Worker whose state has changed (a reference is held while the pair exists) */
	struct worker_thread *worker;
};
  187. /*!
  188. * \brief Destructor for thread_worker_pair
  189. */
  190. static void thread_worker_pair_free(struct thread_worker_pair *pair)
  191. {
  192. ao2_ref(pair->worker, -1);
  193. ast_free(pair);
  194. }
  195. /*!
  196. * \brief Allocate and initialize a thread_worker_pair
  197. * \param pool Threadpool to assign to the thread_worker_pair
  198. * \param worker Worker thread to assign to the thread_worker_pair
  199. */
  200. static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
  201. struct worker_thread *worker)
  202. {
  203. struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
  204. if (!pair) {
  205. return NULL;
  206. }
  207. pair->pool = pool;
  208. ao2_ref(worker, +1);
  209. pair->worker = worker;
  210. return pair;
  211. }
  212. /*!
  213. * \brief Move a worker thread from the active container to the idle container.
  214. *
  215. * This function is called from the threadpool's control taskprocessor thread.
  216. * \param data A thread_worker_pair containing the threadpool and the worker to move.
  217. * \return 0
  218. */
  219. static int queued_active_thread_idle(void *data)
  220. {
  221. struct thread_worker_pair *pair = data;
  222. ao2_link(pair->pool->idle_threads, pair->worker);
  223. ao2_unlink(pair->pool->active_threads, pair->worker);
  224. threadpool_send_state_changed(pair->pool);
  225. thread_worker_pair_free(pair);
  226. return 0;
  227. }
  228. /*!
  229. * \brief Queue a task to move a thread from the active list to the idle list
  230. *
  231. * This is called by a worker thread when it runs out of tasks to perform and
  232. * goes idle.
  233. * \param pool The threadpool to which the worker belongs
  234. * \param worker The worker thread that has gone idle
  235. */
  236. static void threadpool_active_thread_idle(struct ast_threadpool *pool,
  237. struct worker_thread *worker)
  238. {
  239. struct thread_worker_pair *pair;
  240. SCOPED_AO2LOCK(lock, pool);
  241. if (pool->shutting_down) {
  242. return;
  243. }
  244. pair = thread_worker_pair_alloc(pool, worker);
  245. if (!pair) {
  246. return;
  247. }
  248. if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) {
  249. thread_worker_pair_free(pair);
  250. }
  251. }
  252. /*!
  253. * \brief Kill a zombie thread
  254. *
  255. * This runs from the threadpool's control taskprocessor thread.
  256. *
  257. * \param data A thread_worker_pair containing the threadpool and the zombie thread
  258. * \return 0
  259. */
  260. static int queued_zombie_thread_dead(void *data)
  261. {
  262. struct thread_worker_pair *pair = data;
  263. ao2_unlink(pair->pool->zombie_threads, pair->worker);
  264. threadpool_send_state_changed(pair->pool);
  265. thread_worker_pair_free(pair);
  266. return 0;
  267. }
  268. /*!
  269. * \brief Queue a task to kill a zombie thread
  270. *
  271. * This is called by a worker thread when it acknowledges that it is time for
  272. * it to die.
  273. */
  274. static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
  275. struct worker_thread *worker)
  276. {
  277. struct thread_worker_pair *pair;
  278. SCOPED_AO2LOCK(lock, pool);
  279. if (pool->shutting_down) {
  280. return;
  281. }
  282. pair = thread_worker_pair_alloc(pool, worker);
  283. if (!pair) {
  284. return;
  285. }
  286. if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) {
  287. thread_worker_pair_free(pair);
  288. }
  289. }
  290. static int queued_idle_thread_dead(void *data)
  291. {
  292. struct thread_worker_pair *pair = data;
  293. ao2_unlink(pair->pool->idle_threads, pair->worker);
  294. threadpool_send_state_changed(pair->pool);
  295. thread_worker_pair_free(pair);
  296. return 0;
  297. }
  298. static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
  299. struct worker_thread *worker)
  300. {
  301. struct thread_worker_pair *pair;
  302. SCOPED_AO2LOCK(lock, pool);
  303. if (pool->shutting_down) {
  304. return;
  305. }
  306. pair = thread_worker_pair_alloc(pool, worker);
  307. if (!pair) {
  308. return;
  309. }
  310. if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) {
  311. thread_worker_pair_free(pair);
  312. }
  313. }
  314. /*!
  315. * \brief Execute a task in the threadpool
  316. *
  317. * This is the function that worker threads call in order to execute tasks
  318. * in the threadpool
  319. *
  320. * \param pool The pool to which the tasks belong.
  321. * \retval 0 Either the pool has been shut down or there are no tasks.
  322. * \retval 1 There are still tasks remaining in the pool.
  323. */
  324. static int threadpool_execute(struct ast_threadpool *pool)
  325. {
  326. ao2_lock(pool);
  327. if (!pool->shutting_down) {
  328. ao2_unlock(pool);
  329. return ast_taskprocessor_execute(pool->tps);
  330. }
  331. ao2_unlock(pool);
  332. return 0;
  333. }
  334. /*!
  335. * \brief Destroy a threadpool's components.
  336. *
  337. * This is the destructor called automatically when the threadpool's
  338. * reference count reaches zero. This is not to be confused with
  339. * threadpool_destroy.
  340. *
  341. * By the time this actually gets called, most of the cleanup has already
  342. * been done in the pool. The only thing left to do is to release the
  343. * final reference to the threadpool listener.
  344. *
  345. * \param obj The pool to destroy
  346. */
  347. static void threadpool_destructor(void *obj)
  348. {
  349. struct ast_threadpool *pool = obj;
  350. ao2_cleanup(pool->listener);
  351. }
  352. /*
  353. * \brief Allocate a threadpool
  354. *
  355. * This is implemented as a taskprocessor listener's alloc callback. This
  356. * is because the threadpool exists as the private data on a taskprocessor
  357. * listener.
  358. *
  359. * \param name The name of the threadpool.
  360. * \param options The options the threadpool uses.
  361. * \retval NULL Could not initialize threadpool properly
  362. * \retval non-NULL The newly-allocated threadpool
  363. */
  364. static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
  365. {
  366. RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
  367. struct ast_str *control_tps_name;
  368. pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
  369. control_tps_name = ast_str_create(64);
  370. if (!pool || !control_tps_name) {
  371. ast_free(control_tps_name);
  372. return NULL;
  373. }
  374. ast_str_set(&control_tps_name, 0, "%s-control", name);
  375. pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
  376. ast_free(control_tps_name);
  377. if (!pool->control_tps) {
  378. return NULL;
  379. }
  380. pool->active_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
  381. THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
  382. if (!pool->active_threads) {
  383. return NULL;
  384. }
  385. pool->idle_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
  386. THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
  387. if (!pool->idle_threads) {
  388. return NULL;
  389. }
  390. pool->zombie_threads = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
  391. THREAD_BUCKETS, worker_thread_hash, NULL, worker_thread_cmp);
  392. if (!pool->zombie_threads) {
  393. return NULL;
  394. }
  395. pool->options = *options;
  396. ao2_ref(pool, +1);
  397. return pool;
  398. }
/*!
 * \brief Taskprocessor listener start callback
 *
 * The threadpool has nothing to do when its taskprocessor starts, so this
 * is a no-op.
 *
 * \param listener The taskprocessor listener (unused)
 * \return 0
 */
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
{
	return 0;
}
/*!
 * \brief helper used for queued task when tasks are pushed
 *
 * Allocated by task_pushed_data_alloc() and freed by queued_task_pushed()
 * (or by the queuing function if the push fails).
 */
struct task_pushed_data {
	/*! Pool into which a task was pushed */
	struct ast_threadpool *pool;
	/*! Indicator of whether the pool had no tasks prior to the new task being added */
	int was_empty;
};
  412. /*!
  413. * \brief Allocate and initialize a task_pushed_data
  414. * \param pool The threadpool to set in the task_pushed_data
  415. * \param was_empty The was_empty value to set in the task_pushed_data
  416. * \retval NULL Unable to allocate task_pushed_data
  417. * \retval non-NULL The newly-allocated task_pushed_data
  418. */
  419. static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
  420. int was_empty)
  421. {
  422. struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
  423. if (!tpd) {
  424. return NULL;
  425. }
  426. tpd->pool = pool;
  427. tpd->was_empty = was_empty;
  428. return tpd;
  429. }
  430. /*!
  431. * \brief Activate idle threads
  432. *
  433. * This function always returns CMP_MATCH because all workers that this
  434. * function acts on need to be seen as matches so they are unlinked from the
  435. * list of idle threads.
  436. *
  437. * Called as an ao2_callback in the threadpool's control taskprocessor thread.
  438. * \param obj The worker to activate
  439. * \param arg The pool where the worker belongs
  440. * \retval CMP_MATCH
  441. */
  442. static int activate_thread(void *obj, void *arg, int flags)
  443. {
  444. struct worker_thread *worker = obj;
  445. struct ast_threadpool *pool = arg;
  446. if (!ao2_link(pool->active_threads, worker)) {
  447. /* If we can't link the idle thread into the active container, then
  448. * we'll just leave the thread idle and not wake it up.
  449. */
  450. ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
  451. worker->id);
  452. return 0;
  453. }
  454. if (worker_set_state(worker, ALIVE)) {
  455. ast_debug(1, "Failed to activate thread %d. It is dead\n",
  456. worker->id);
  457. /* The worker thread will no longer exist in the active threads or
  458. * idle threads container after this.
  459. */
  460. ao2_unlink(pool->active_threads, worker);
  461. }
  462. return CMP_MATCH;
  463. }
  464. /*!
  465. * \brief Add threads to the threadpool
  466. *
  467. * This function is called from the threadpool's control taskprocessor thread.
  468. * \param pool The pool that is expanding
  469. * \delta The number of threads to add to the pool
  470. */
  471. static void grow(struct ast_threadpool *pool, int delta)
  472. {
  473. int i;
  474. int current_size = ao2_container_count(pool->active_threads) +
  475. ao2_container_count(pool->idle_threads);
  476. if (pool->options.max_size && current_size + delta > pool->options.max_size) {
  477. delta = pool->options.max_size - current_size;
  478. }
  479. ast_debug(3, "Increasing threadpool %s's size by %d\n",
  480. ast_taskprocessor_name(pool->tps), delta);
  481. for (i = 0; i < delta; ++i) {
  482. struct worker_thread *worker = worker_thread_alloc(pool);
  483. if (!worker) {
  484. return;
  485. }
  486. if (ao2_link(pool->idle_threads, worker)) {
  487. if (worker_thread_start(worker)) {
  488. ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
  489. ao2_unlink(pool->active_threads, worker);
  490. }
  491. } else {
  492. ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
  493. }
  494. ao2_ref(worker, -1);
  495. }
  496. }
  497. /*!
  498. * \brief Queued task called when tasks are pushed into the threadpool
  499. *
  500. * This function first calls into the threadpool's listener to let it know
  501. * that a task has been pushed. It then wakes up all idle threads and moves
  502. * them into the active thread container.
  503. * \param data A task_pushed_data
  504. * \return 0
  505. */
  506. static int queued_task_pushed(void *data)
  507. {
  508. struct task_pushed_data *tpd = data;
  509. struct ast_threadpool *pool = tpd->pool;
  510. int was_empty = tpd->was_empty;
  511. unsigned int existing_active;
  512. ast_free(tpd);
  513. if (pool->listener && pool->listener->callbacks->task_pushed) {
  514. pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
  515. }
  516. existing_active = ao2_container_count(pool->active_threads);
  517. /* The first pass transitions any existing idle threads to be active, and
  518. * will also remove any worker threads that have recently entered the dead
  519. * state.
  520. */
  521. ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
  522. activate_thread, pool);
  523. /* If no idle threads could be transitioned to active grow the pool as permitted. */
  524. if (ao2_container_count(pool->active_threads) == existing_active) {
  525. if (!pool->options.auto_increment) {
  526. return 0;
  527. }
  528. grow(pool, pool->options.auto_increment);
  529. /* An optional second pass transitions any newly added threads. */
  530. ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
  531. activate_thread, pool);
  532. }
  533. threadpool_send_state_changed(pool);
  534. return 0;
  535. }
  536. /*!
  537. * \brief Taskprocessor listener callback called when a task is added
  538. *
  539. * The threadpool uses this opportunity to queue a task on its control taskprocessor
  540. * in order to activate idle threads and notify the threadpool listener that the
  541. * task has been pushed.
  542. * \param listener The taskprocessor listener. The threadpool is the listener's private data
  543. * \param was_empty True if the taskprocessor was empty prior to the task being pushed
  544. */
  545. static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
  546. int was_empty)
  547. {
  548. struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
  549. struct task_pushed_data *tpd;
  550. SCOPED_AO2LOCK(lock, pool);
  551. if (pool->shutting_down) {
  552. return;
  553. }
  554. tpd = task_pushed_data_alloc(pool, was_empty);
  555. if (!tpd) {
  556. return;
  557. }
  558. if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) {
  559. ast_free(tpd);
  560. }
  561. }
  562. /*!
  563. * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
  564. *
  565. * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
  566. * \param data The pool that has become empty
  567. * \return 0
  568. */
  569. static int queued_emptied(void *data)
  570. {
  571. struct ast_threadpool *pool = data;
  572. /* We already checked for existence of this callback when this was queued */
  573. pool->listener->callbacks->emptied(pool, pool->listener);
  574. return 0;
  575. }
  576. /*!
  577. * \brief Taskprocessor listener emptied callback
  578. *
  579. * The threadpool queues a task to let the threadpool listener know that
  580. * the threadpool no longer contains any tasks.
  581. * \param listener The taskprocessor listener. The threadpool is the listener's private data.
  582. */
  583. static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
  584. {
  585. struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
  586. SCOPED_AO2LOCK(lock, pool);
  587. if (pool->shutting_down) {
  588. return;
  589. }
  590. if (pool->listener && pool->listener->callbacks->emptied) {
  591. if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) {
  592. /* Nothing to do here but we need the check to keep the compiler happy. */
  593. }
  594. }
  595. }
  596. /*!
  597. * \brief Taskprocessor listener shutdown callback
  598. *
  599. * The threadpool will shut down and destroy all of its worker threads when
  600. * this is called back. By the time this gets called, the taskprocessor's
  601. * control taskprocessor has already been destroyed. Therefore there is no risk
  602. * in outright destroying the worker threads here.
  603. * \param listener The taskprocessor listener. The threadpool is the listener's private data.
  604. */
  605. static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
  606. {
  607. struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
  608. if (pool->listener && pool->listener->callbacks->shutdown) {
  609. pool->listener->callbacks->shutdown(pool->listener);
  610. }
  611. ao2_cleanup(pool->active_threads);
  612. ao2_cleanup(pool->idle_threads);
  613. ao2_cleanup(pool->zombie_threads);
  614. ao2_cleanup(pool);
  615. }
/*!
 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
 *
 * start is a no-op; task_pushed and emptied queue work onto the control
 * taskprocessor; shutdown releases the pool's thread containers.
 */
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
	.start = threadpool_tps_start,
	.task_pushed = threadpool_tps_task_pushed,
	.emptied = threadpool_tps_emptied,
	.shutdown = threadpool_tps_shutdown,
};
  625. /*!
  626. * \brief ao2 callback to kill a set number of threads.
  627. *
  628. * Threads will be unlinked from the container as long as the
  629. * counter has not reached zero. The counter is decremented with
  630. * each thread that is removed.
  631. * \param obj The worker thread up for possible destruction
  632. * \param arg The counter
  633. * \param flags Unused
  634. * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
  635. * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
  636. */
  637. static int kill_threads(void *obj, void *arg, int flags)
  638. {
  639. int *num_to_kill = arg;
  640. if (*num_to_kill > 0) {
  641. --(*num_to_kill);
  642. return CMP_MATCH;
  643. } else {
  644. return CMP_STOP;
  645. }
  646. }
  647. /*!
  648. * \brief ao2 callback to zombify a set number of threads.
  649. *
  650. * Threads will be zombified as long as the counter has not reached
  651. * zero. The counter is decremented with each thread that is zombified.
  652. *
  653. * Zombifying a thread involves removing it from its current container,
  654. * adding it to the zombie container, and changing the state of the
  655. * worker to a zombie
  656. *
  657. * This callback is called from the threadpool control taskprocessor thread.
  658. *
  659. * \param obj The worker thread that may be zombified
  660. * \param arg The pool to which the worker belongs
  661. * \param data The counter
  662. * \param flags Unused
  663. * \retval CMP_MATCH The zombified thread should be removed from its current container
  664. * \retval CMP_STOP Stop attempting to zombify threads
  665. */
  666. static int zombify_threads(void *obj, void *arg, void *data, int flags)
  667. {
  668. struct worker_thread *worker = obj;
  669. struct ast_threadpool *pool = arg;
  670. int *num_to_zombify = data;
  671. if ((*num_to_zombify)-- > 0) {
  672. if (!ao2_link(pool->zombie_threads, worker)) {
  673. ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
  674. return 0;
  675. }
  676. worker_set_state(worker, ZOMBIE);
  677. return CMP_MATCH;
  678. } else {
  679. return CMP_STOP;
  680. }
  681. }
  682. /*!
  683. * \brief Remove threads from the threadpool
  684. *
  685. * The preference is to kill idle threads. However, if there are
  686. * more threads to remove than there are idle threads, then active
  687. * threads will be zombified instead.
  688. *
  689. * This function is called from the threadpool control taskprocessor thread.
  690. *
  691. * \param pool The threadpool to remove threads from
  692. * \param delta The number of threads to remove
  693. */
  694. static void shrink(struct ast_threadpool *pool, int delta)
  695. {
  696. /*
  697. * Preference is to kill idle threads, but
  698. * we'll move on to deactivating active threads
  699. * if we have to
  700. */
  701. int idle_threads = ao2_container_count(pool->idle_threads);
  702. int idle_threads_to_kill = MIN(delta, idle_threads);
  703. int active_threads_to_zombify = delta - idle_threads_to_kill;
  704. ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
  705. ast_taskprocessor_name(pool->tps));
  706. ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
  707. kill_threads, &idle_threads_to_kill);
  708. ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
  709. ast_taskprocessor_name(pool->tps));
  710. ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
  711. zombify_threads, pool, &active_threads_to_zombify);
  712. }
  /*!
   * \brief Payload for a queued request to resize the threadpool
   *
   * Allocated by set_size_data_alloc() and released by queued_set_size()
   * (or by ast_threadpool_set_size() if the request cannot be queued).
   */
  struct set_size_data {
      /*! Threadpool that is to be resized */
      struct ast_threadpool *pool;
      /*! Number of threads the pool should end up with */
      unsigned int size;
  };
  722. /*!
  723. * \brief Allocate and initialize a set_size_data
  724. * \param pool The pool for the set_size_data
  725. * \param size The size to store in the set_size_data
  726. */
  727. static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
  728. unsigned int size)
  729. {
  730. struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
  731. if (!ssd) {
  732. return NULL;
  733. }
  734. ssd->pool = pool;
  735. ssd->size = size;
  736. return ssd;
  737. }
  738. /*!
  739. * \brief Change the size of the threadpool
  740. *
  741. * This can either result in shrinking or growing the threadpool depending
  742. * on the new desired size and the current size.
  743. *
  744. * This function is run from the threadpool control taskprocessor thread
  745. *
  746. * \param data A set_size_data used for determining how to act
  747. * \return 0
  748. */
  749. static int queued_set_size(void *data)
  750. {
  751. struct set_size_data *ssd = data;
  752. struct ast_threadpool *pool = ssd->pool;
  753. unsigned int num_threads = ssd->size;
  754. /* We don't count zombie threads as being "live" when potentially resizing */
  755. unsigned int current_size = ao2_container_count(pool->active_threads) +
  756. ao2_container_count(pool->idle_threads);
  757. ast_free(ssd);
  758. if (current_size == num_threads) {
  759. ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
  760. num_threads, current_size);
  761. return 0;
  762. }
  763. if (current_size < num_threads) {
  764. ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
  765. activate_thread, pool);
  766. /* As the above may have altered the number of current threads update it */
  767. current_size = ao2_container_count(pool->active_threads) +
  768. ao2_container_count(pool->idle_threads);
  769. grow(pool, num_threads - current_size);
  770. ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
  771. activate_thread, pool);
  772. } else {
  773. shrink(pool, current_size - num_threads);
  774. }
  775. threadpool_send_state_changed(pool);
  776. return 0;
  777. }
  778. void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
  779. {
  780. struct set_size_data *ssd;
  781. SCOPED_AO2LOCK(lock, pool);
  782. if (pool->shutting_down) {
  783. return;
  784. }
  785. ssd = set_size_data_alloc(pool, size);
  786. if (!ssd) {
  787. return;
  788. }
  789. if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) {
  790. ast_free(ssd);
  791. }
  792. }
  793. struct ast_threadpool_listener *ast_threadpool_listener_alloc(
  794. const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
  795. {
  796. struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
  797. if (!listener) {
  798. return NULL;
  799. }
  800. listener->callbacks = callbacks;
  801. listener->user_data = user_data;
  802. return listener;
  803. }
  804. void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
  805. {
  806. return listener->user_data;
  807. }
  808. struct pool_options_pair {
  809. struct ast_threadpool *pool;
  810. struct ast_threadpool_options options;
  811. };
  812. struct ast_threadpool *ast_threadpool_create(const char *name,
  813. struct ast_threadpool_listener *listener,
  814. const struct ast_threadpool_options *options)
  815. {
  816. struct ast_taskprocessor *tps;
  817. RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
  818. RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
  819. pool = threadpool_alloc(name, options);
  820. if (!pool) {
  821. return NULL;
  822. }
  823. tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
  824. if (!tps_listener) {
  825. return NULL;
  826. }
  827. if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
  828. ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
  829. return NULL;
  830. }
  831. tps = ast_taskprocessor_create_with_listener(name, tps_listener);
  832. if (!tps) {
  833. return NULL;
  834. }
  835. pool->tps = tps;
  836. if (listener) {
  837. ao2_ref(listener, +1);
  838. pool->listener = listener;
  839. }
  840. ast_threadpool_set_size(pool, pool->options.initial_size);
  841. ao2_ref(pool, +1);
  842. return pool;
  843. }
  844. int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
  845. {
  846. SCOPED_AO2LOCK(lock, pool);
  847. if (!pool->shutting_down) {
  848. return ast_taskprocessor_push(pool->tps, task, data);
  849. }
  850. return -1;
  851. }
  852. void ast_threadpool_shutdown(struct ast_threadpool *pool)
  853. {
  854. if (!pool) {
  855. return;
  856. }
  857. /* Shut down the taskprocessors and everything else just
  858. * takes care of itself via the taskprocessor callbacks
  859. */
  860. ao2_lock(pool);
  861. pool->shutting_down = 1;
  862. ao2_unlock(pool);
  863. ast_taskprocessor_unreference(pool->control_tps);
  864. ast_taskprocessor_unreference(pool->tps);
  865. }
  /*!
   * Source of worker thread identifiers. worker_thread_alloc() atomically
   * fetches the current value and adds one for every worker it creates.
   */
  static int worker_id_counter;
  871. static int worker_thread_hash(const void *obj, int flags)
  872. {
  873. const struct worker_thread *worker = obj;
  874. return worker->id;
  875. }
  876. static int worker_thread_cmp(void *obj, void *arg, int flags)
  877. {
  878. struct worker_thread *worker1 = obj;
  879. struct worker_thread *worker2 = arg;
  880. return worker1->id == worker2->id ? CMP_MATCH : 0;
  881. }
  882. /*!
  883. * \brief shut a worker thread down
  884. *
  885. * Set the worker dead and then wait for its thread
  886. * to finish executing.
  887. *
  888. * \param worker The worker thread to shut down
  889. */
  890. static void worker_shutdown(struct worker_thread *worker)
  891. {
  892. worker_set_state(worker, DEAD);
  893. if (worker->thread != AST_PTHREADT_NULL) {
  894. pthread_join(worker->thread, NULL);
  895. worker->thread = AST_PTHREADT_NULL;
  896. }
  897. }
  898. /*!
  899. * \brief Worker thread destructor
  900. *
  901. * Called automatically when refcount reaches 0. Shuts
  902. * down the worker thread and destroys its component
  903. * parts
  904. */
  905. static void worker_thread_destroy(void *obj)
  906. {
  907. struct worker_thread *worker = obj;
  908. ast_debug(3, "Destroying worker thread %d\n", worker->id);
  909. worker_shutdown(worker);
  910. ast_mutex_destroy(&worker->lock);
  911. ast_cond_destroy(&worker->cond);
  912. }
  913. /*!
  914. * \brief start point for worker threads
  915. *
  916. * Worker threads start in the active state but may
  917. * immediately go idle if there is no work to be
  918. * done
  919. *
  920. * \param arg The worker thread
  921. * \retval NULL
  922. */
  923. static void *worker_start(void *arg)
  924. {
  925. struct worker_thread *worker = arg;
  926. enum worker_state saved_state;
  927. if (worker->options.thread_start) {
  928. worker->options.thread_start();
  929. }
  930. ast_mutex_lock(&worker->lock);
  931. while (worker_idle(worker)) {
  932. ast_mutex_unlock(&worker->lock);
  933. worker_active(worker);
  934. ast_mutex_lock(&worker->lock);
  935. if (worker->state != ALIVE) {
  936. break;
  937. }
  938. threadpool_active_thread_idle(worker->pool, worker);
  939. }
  940. saved_state = worker->state;
  941. ast_mutex_unlock(&worker->lock);
  942. /* Reaching this portion means the thread is
  943. * on death's door. It may have been killed while
  944. * it was idle, in which case it can just die
  945. * peacefully. If it's a zombie, though, then
  946. * it needs to let the pool know so
  947. * that the thread can be removed from the
  948. * list of zombie threads.
  949. */
  950. if (saved_state == ZOMBIE) {
  951. threadpool_zombie_thread_dead(worker->pool, worker);
  952. }
  953. if (worker->options.thread_end) {
  954. worker->options.thread_end();
  955. }
  956. return NULL;
  957. }
  958. /*!
  959. * \brief Allocate and initialize a new worker thread
  960. *
  961. * This will create, initialize, and start the thread.
  962. *
  963. * \param pool The threadpool to which the worker will be added
  964. * \retval NULL Failed to allocate or start the worker thread
  965. * \retval non-NULL The newly-created worker thread
  966. */
  967. static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
  968. {
  969. struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
  970. if (!worker) {
  971. return NULL;
  972. }
  973. worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
  974. ast_mutex_init(&worker->lock);
  975. ast_cond_init(&worker->cond, NULL);
  976. worker->pool = pool;
  977. worker->thread = AST_PTHREADT_NULL;
  978. worker->state = ALIVE;
  979. worker->options = pool->options;
  980. return worker;
  981. }
  982. static int worker_thread_start(struct worker_thread *worker)
  983. {
  984. return ast_pthread_create(&worker->thread, NULL, worker_start, worker);
  985. }
  986. /*!
  987. * \brief Active loop for worker threads
  988. *
  989. * The worker will stay in this loop for its lifetime,
  990. * executing tasks as they become available. If there
  991. * are no tasks currently available, then the thread
  992. * will go idle.
  993. *
  994. * \param worker The worker thread executing tasks.
  995. */
  996. static void worker_active(struct worker_thread *worker)
  997. {
  998. int alive;
  999. /* The following is equivalent to
  1000. *
  1001. * while (threadpool_execute(worker->pool));
  1002. *
  1003. * However, reviewers have suggested in the past
  1004. * doing that can cause optimizers to (wrongly)
  1005. * optimize the code away.
  1006. */
  1007. do {
  1008. alive = threadpool_execute(worker->pool);
  1009. } while (alive);
  1010. }
  1011. /*!
  1012. * \brief Idle function for worker threads
  1013. *
  1014. * The worker waits here until it gets told by the threadpool
  1015. * to wake up.
  1016. *
  1017. * worker is locked before entering this function.
  1018. *
  1019. * \param worker The idle worker
  1020. * \retval 0 The thread is being woken up so that it can conclude.
  1021. * \retval non-zero The thread is being woken up to do more work.
  1022. */
  1023. static int worker_idle(struct worker_thread *worker)
  1024. {
  1025. struct timeval start = ast_tvnow();
  1026. struct timespec end = {
  1027. .tv_sec = start.tv_sec + worker->options.idle_timeout,
  1028. .tv_nsec = start.tv_usec * 1000,
  1029. };
  1030. while (!worker->wake_up) {
  1031. if (worker->options.idle_timeout <= 0) {
  1032. ast_cond_wait(&worker->cond, &worker->lock);
  1033. } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
  1034. break;
  1035. }
  1036. }
  1037. if (!worker->wake_up) {
  1038. ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
  1039. threadpool_idle_thread_dead(worker->pool, worker);
  1040. worker->state = DEAD;
  1041. }
  1042. worker->wake_up = 0;
  1043. return worker->state == ALIVE;
  1044. }
  1045. /*!
  1046. * \brief Change a worker's state
  1047. *
  1048. * The threadpool calls into this function in order to let a worker know
  1049. * how it should proceed.
  1050. *
  1051. * \retval -1 failure (state transition not permitted)
  1052. * \retval 0 success
  1053. */
  1054. static int worker_set_state(struct worker_thread *worker, enum worker_state state)
  1055. {
  1056. SCOPED_MUTEX(lock, &worker->lock);
  1057. switch (state) {
  1058. case ALIVE:
  1059. /* This can occur due to a race condition between being told to go active
  1060. * and an idle timeout happening.
  1061. */
  1062. if (worker->state == DEAD) {
  1063. return -1;
  1064. }
  1065. ast_assert(worker->state != ZOMBIE);
  1066. break;
  1067. case DEAD:
  1068. break;
  1069. case ZOMBIE:
  1070. ast_assert(worker->state != DEAD);
  1071. break;
  1072. }
  1073. worker->state = state;
  1074. worker->wake_up = 1;
  1075. ast_cond_signal(&worker->cond);
  1076. return 0;
  1077. }
  1078. /*! Serializer group shutdown control object. */
  1079. struct ast_serializer_shutdown_group {
  1080. /*! Shutdown thread waits on this conditional. */
  1081. ast_cond_t cond;
  1082. /*! Count of serializers needing to shutdown. */
  1083. int count;
  1084. };
  1085. static void serializer_shutdown_group_dtor(void *vdoomed)
  1086. {
  1087. struct ast_serializer_shutdown_group *doomed = vdoomed;
  1088. ast_cond_destroy(&doomed->cond);
  1089. }
  1090. struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void)
  1091. {
  1092. struct ast_serializer_shutdown_group *shutdown_group;
  1093. shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor);
  1094. if (!shutdown_group) {
  1095. return NULL;
  1096. }
  1097. ast_cond_init(&shutdown_group->cond, NULL);
  1098. return shutdown_group;
  1099. }
  1100. int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout)
  1101. {
  1102. int remaining;
  1103. ast_mutex_t *lock;
  1104. if (!shutdown_group) {
  1105. return 0;
  1106. }
  1107. lock = ao2_object_get_lockaddr(shutdown_group);
  1108. ast_assert(lock != NULL);
  1109. ao2_lock(shutdown_group);
  1110. if (timeout) {
  1111. struct timeval start;
  1112. struct timespec end;
  1113. start = ast_tvnow();
  1114. end.tv_sec = start.tv_sec + timeout;
  1115. end.tv_nsec = start.tv_usec * 1000;
  1116. while (shutdown_group->count) {
  1117. if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) {
  1118. /* Error or timed out waiting for the count to reach zero. */
  1119. break;
  1120. }
  1121. }
  1122. } else {
  1123. while (shutdown_group->count) {
  1124. if (ast_cond_wait(&shutdown_group->cond, lock)) {
  1125. /* Error */
  1126. break;
  1127. }
  1128. }
  1129. }
  1130. remaining = shutdown_group->count;
  1131. ao2_unlock(shutdown_group);
  1132. return remaining;
  1133. }
  1134. /*!
  1135. * \internal
  1136. * \brief Increment the number of serializer members in the group.
  1137. * \since 13.5.0
  1138. *
  1139. * \param shutdown_group Group shutdown controller.
  1140. *
  1141. * \return Nothing
  1142. */
  1143. static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group)
  1144. {
  1145. ao2_lock(shutdown_group);
  1146. ++shutdown_group->count;
  1147. ao2_unlock(shutdown_group);
  1148. }
  1149. /*!
  1150. * \internal
  1151. * \brief Decrement the number of serializer members in the group.
  1152. * \since 13.5.0
  1153. *
  1154. * \param shutdown_group Group shutdown controller.
  1155. *
  1156. * \return Nothing
  1157. */
  1158. static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group)
  1159. {
  1160. ao2_lock(shutdown_group);
  1161. --shutdown_group->count;
  1162. if (!shutdown_group->count) {
  1163. ast_cond_signal(&shutdown_group->cond);
  1164. }
  1165. ao2_unlock(shutdown_group);
  1166. }
  /*! Per-serializer data attached to the serializer's taskprocessor listener. */
  struct serializer {
      /*! Threadpool that runs this serializer's queued tasks. */
      struct ast_threadpool *pool;
      /*! Optional group that is told when this serializer shuts down. */
      struct ast_serializer_shutdown_group *shutdown_group;
  };
  1173. static void serializer_dtor(void *obj)
  1174. {
  1175. struct serializer *ser = obj;
  1176. ao2_cleanup(ser->pool);
  1177. ser->pool = NULL;
  1178. ao2_cleanup(ser->shutdown_group);
  1179. ser->shutdown_group = NULL;
  1180. }
  1181. static struct serializer *serializer_create(struct ast_threadpool *pool,
  1182. struct ast_serializer_shutdown_group *shutdown_group)
  1183. {
  1184. struct serializer *ser;
  1185. ser = ao2_alloc_options(sizeof(*ser), serializer_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
  1186. if (!ser) {
  1187. return NULL;
  1188. }
  1189. ao2_ref(pool, +1);
  1190. ser->pool = pool;
  1191. ser->shutdown_group = ao2_bump(shutdown_group);
  1192. return ser;
  1193. }
  1194. AST_THREADSTORAGE_RAW(current_serializer);
  1195. static int execute_tasks(void *data)
  1196. {
  1197. struct ast_taskprocessor *tps = data;
  1198. ast_threadstorage_set_ptr(&current_serializer, tps);
  1199. while (ast_taskprocessor_execute(tps)) {
  1200. /* No-op */
  1201. }
  1202. ast_threadstorage_set_ptr(&current_serializer, NULL);
  1203. ast_taskprocessor_unreference(tps);
  1204. return 0;
  1205. }
  1206. static void serializer_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
  1207. {
  1208. if (was_empty) {
  1209. struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
  1210. struct ast_taskprocessor *tps = ast_taskprocessor_listener_get_tps(listener);
  1211. if (ast_threadpool_push(ser->pool, execute_tasks, tps)) {
  1212. ast_taskprocessor_unreference(tps);
  1213. }
  1214. }
  1215. }
  /*!
   * \brief Taskprocessor callback: serializer start
   *
   * \param listener Unused
   * \return 0 always
   */
  static int serializer_start(struct ast_taskprocessor_listener *listener)
  {
      /* A serializer needs no start-up work. */
      return 0;
  }
  1221. static void serializer_shutdown(struct ast_taskprocessor_listener *listener)
  1222. {
  1223. struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener);
  1224. if (ser->shutdown_group) {
  1225. serializer_shutdown_group_dec(ser->shutdown_group);
  1226. }
  1227. ao2_cleanup(ser);
  1228. }
  1229. static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callbacks = {
  1230. .task_pushed = serializer_task_pushed,
  1231. .start = serializer_start,
  1232. .shutdown = serializer_shutdown,
  1233. };
  1234. struct ast_taskprocessor *ast_threadpool_serializer_get_current(void)
  1235. {
  1236. return ast_threadstorage_get_ptr(&current_serializer);
  1237. }
  1238. struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name,
  1239. struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group)
  1240. {
  1241. struct serializer *ser;
  1242. struct ast_taskprocessor_listener *listener;
  1243. struct ast_taskprocessor *tps;
  1244. ser = serializer_create(pool, shutdown_group);
  1245. if (!ser) {
  1246. return NULL;
  1247. }
  1248. listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser);
  1249. if (!listener) {
  1250. ao2_ref(ser, -1);
  1251. return NULL;
  1252. }
  1253. tps = ast_taskprocessor_create_with_listener(name, listener);
  1254. if (!tps) {
  1255. /* ser ref transferred to listener but not cleaned without tps */
  1256. ao2_ref(ser, -1);
  1257. } else if (shutdown_group) {
  1258. serializer_shutdown_group_inc(shutdown_group);
  1259. }
  1260. ao2_ref(listener, -1);
  1261. return tps;
  1262. }
  1263. struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool)
  1264. {
  1265. return ast_threadpool_serializer_group(name, pool, NULL);
  1266. }
  1267. long ast_threadpool_queue_size(struct ast_threadpool *pool)
  1268. {
  1269. return ast_taskprocessor_size(pool->tps);
  1270. }