test_taskprocessor.c 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012-2013, Digium, Inc.
  5. *
  6. * Mark Michelson <mmichelson@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file
  20. * \brief taskprocessor unit tests
  21. *
  22. * \author Mark Michelson <mmichelson@digium.com>
  23. *
  24. */
  25. /*** MODULEINFO
  26. <depend>TEST_FRAMEWORK</depend>
  27. <support_level>core</support_level>
  28. ***/
  29. #include "asterisk.h"
  30. #include "asterisk/test.h"
  31. #include "asterisk/taskprocessor.h"
  32. #include "asterisk/module.h"
  33. #include "asterisk/astobj2.h"
  34. /*!
  35. * \brief userdata associated with baseline taskprocessor test
  36. */
  37. struct task_data {
  38. /* Condition used to signal to queuing thread that task was executed */
  39. ast_cond_t cond;
  40. /* Lock protecting the condition */
  41. ast_mutex_t lock;
  42. /*! Boolean indicating that the task was run */
  43. int task_complete;
  44. };
  45. static void task_data_dtor(void *obj)
  46. {
  47. struct task_data *task_data = obj;
  48. ast_mutex_destroy(&task_data->lock);
  49. ast_cond_destroy(&task_data->cond);
  50. }
  51. /*! \brief Create a task_data object */
  52. static struct task_data *task_data_create(void)
  53. {
  54. struct task_data *task_data =
  55. ao2_alloc(sizeof(*task_data), task_data_dtor);
  56. if (!task_data) {
  57. return NULL;
  58. }
  59. ast_cond_init(&task_data->cond, NULL);
  60. ast_mutex_init(&task_data->lock);
  61. task_data->task_complete = 0;
  62. return task_data;
  63. }
  64. /*!
  65. * \brief Queued task for baseline test.
  66. *
  67. * The task simply sets a boolean to indicate the
  68. * task has been run and then signals a condition
  69. * saying it's complete
  70. */
  71. static int task(void *data)
  72. {
  73. struct task_data *task_data = data;
  74. SCOPED_MUTEX(lock, &task_data->lock);
  75. task_data->task_complete = 1;
  76. ast_cond_signal(&task_data->cond);
  77. return 0;
  78. }
  79. /*!
  80. * \brief Wait for a task to execute.
  81. */
  82. static int task_wait(struct task_data *task_data)
  83. {
  84. struct timeval start = ast_tvnow();
  85. struct timespec end;
  86. SCOPED_MUTEX(lock, &task_data->lock);
  87. end.tv_sec = start.tv_sec + 30;
  88. end.tv_nsec = start.tv_usec * 1000;
  89. while (!task_data->task_complete) {
  90. int res;
  91. res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
  92. &end);
  93. if (res == ETIMEDOUT) {
  94. return -1;
  95. }
  96. }
  97. return 0;
  98. }
  99. /*!
  100. * \brief Baseline test for default taskprocessor
  101. *
  102. * This test ensures that when a task is added to a taskprocessor that
  103. * has been allocated with a default listener that the task gets executed
  104. * as expected
  105. */
  106. AST_TEST_DEFINE(default_taskprocessor)
  107. {
  108. RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
  109. RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
  110. int res;
  111. switch (cmd) {
  112. case TEST_INIT:
  113. info->name = "default_taskprocessor";
  114. info->category = "/main/taskprocessor/";
  115. info->summary = "Test of default taskproccesor";
  116. info->description =
  117. "Ensures that a queued task gets executed.";
  118. return AST_TEST_NOT_RUN;
  119. case TEST_EXECUTE:
  120. break;
  121. }
  122. tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
  123. if (!tps) {
  124. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  125. return AST_TEST_FAIL;
  126. }
  127. task_data = task_data_create();
  128. if (!task_data) {
  129. ast_test_status_update(test, "Unable to create task_data\n");
  130. return AST_TEST_FAIL;
  131. }
  132. if (ast_taskprocessor_push(tps, task, task_data)) {
  133. ast_test_status_update(test, "Failed to queue task\n");
  134. return AST_TEST_FAIL;
  135. }
  136. res = task_wait(task_data);
  137. if (res != 0) {
  138. ast_test_status_update(test, "Queued task did not execute!\n");
  139. return AST_TEST_FAIL;
  140. }
  141. return AST_TEST_PASS;
  142. }
  143. #define NUM_TASKS 20000
  144. /*!
  145. * \brief Relevant data associated with taskprocessor load test
  146. */
  147. static struct load_task_data {
  148. /*! Condition used to indicate a task has completed executing */
  149. ast_cond_t cond;
  150. /*! Lock used to protect the condition */
  151. ast_mutex_t lock;
  152. /*! Counter of the number of completed tasks */
  153. int tasks_completed;
  154. /*! Storage for task-specific data */
  155. int task_rand[NUM_TASKS];
  156. } load_task_results;
  157. /*!
  158. * \brief a queued task to be used in the taskprocessor load test
  159. *
  160. * The task increments the number of tasks executed and puts the passed-in
  161. * data into the next slot in the array of random data.
  162. */
  163. static int load_task(void *data)
  164. {
  165. int *randdata = data;
  166. SCOPED_MUTEX(lock, &load_task_results.lock);
  167. load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
  168. ast_cond_signal(&load_task_results.cond);
  169. return 0;
  170. }
  171. /*!
  172. * \brief Load test for taskprocessor with default listener
  173. *
  174. * This test queues a large number of tasks, each with random data associated.
  175. * The test ensures that all of the tasks are run and that the tasks are executed
  176. * in the same order that they were queued
  177. */
  178. AST_TEST_DEFINE(default_taskprocessor_load)
  179. {
  180. struct ast_taskprocessor *tps;
  181. struct timeval start;
  182. struct timespec ts;
  183. enum ast_test_result_state res = AST_TEST_PASS;
  184. int timedwait_res;
  185. int i;
  186. int rand_data[NUM_TASKS];
  187. switch (cmd) {
  188. case TEST_INIT:
  189. info->name = "default_taskprocessor_load";
  190. info->category = "/main/taskprocessor/";
  191. info->summary = "Load test of default taskproccesor";
  192. info->description =
  193. "Ensure that a large number of queued tasks are executed in the proper order.";
  194. return AST_TEST_NOT_RUN;
  195. case TEST_EXECUTE:
  196. break;
  197. }
  198. tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
  199. if (!tps) {
  200. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  201. return AST_TEST_FAIL;
  202. }
  203. start = ast_tvnow();
  204. ts.tv_sec = start.tv_sec + 60;
  205. ts.tv_nsec = start.tv_usec * 1000;
  206. ast_cond_init(&load_task_results.cond, NULL);
  207. ast_mutex_init(&load_task_results.lock);
  208. load_task_results.tasks_completed = 0;
  209. for (i = 0; i < NUM_TASKS; ++i) {
  210. rand_data[i] = ast_random();
  211. if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) {
  212. ast_test_status_update(test, "Failed to queue task\n");
  213. res = AST_TEST_FAIL;
  214. goto test_end;
  215. }
  216. }
  217. ast_mutex_lock(&load_task_results.lock);
  218. while (load_task_results.tasks_completed < NUM_TASKS) {
  219. timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
  220. if (timedwait_res == ETIMEDOUT) {
  221. break;
  222. }
  223. }
  224. ast_mutex_unlock(&load_task_results.lock);
  225. if (load_task_results.tasks_completed != NUM_TASKS) {
  226. ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
  227. NUM_TASKS, load_task_results.tasks_completed);
  228. res = AST_TEST_FAIL;
  229. goto test_end;
  230. }
  231. for (i = 0; i < NUM_TASKS; ++i) {
  232. if (rand_data[i] != load_task_results.task_rand[i]) {
  233. ast_test_status_update(test, "Queued tasks did not execute in order\n");
  234. res = AST_TEST_FAIL;
  235. goto test_end;
  236. }
  237. }
  238. test_end:
  239. tps = ast_taskprocessor_unreference(tps);
  240. ast_mutex_destroy(&load_task_results.lock);
  241. ast_cond_destroy(&load_task_results.cond);
  242. return res;
  243. }
  244. /*!
  245. * \brief Private data for the test taskprocessor listener
  246. */
  247. struct test_listener_pvt {
  248. /* Counter of number of tasks pushed to the queue */
  249. int num_pushed;
  250. /* Counter of number of times the queue was emptied */
  251. int num_emptied;
  252. /* Counter of number of times that a pushed task occurred on an empty queue */
  253. int num_was_empty;
  254. /* Boolean indicating whether the shutdown callback was called */
  255. int shutdown;
  256. };
  257. /*!
  258. * \brief test taskprocessor listener's alloc callback
  259. */
  260. static void *test_listener_pvt_alloc(void)
  261. {
  262. struct test_listener_pvt *pvt;
  263. pvt = ast_calloc(1, sizeof(*pvt));
  264. return pvt;
  265. }
  266. /*!
  267. * \brief test taskprocessor listener's start callback
  268. */
  269. static int test_start(struct ast_taskprocessor_listener *listener)
  270. {
  271. return 0;
  272. }
  273. /*!
  274. * \brief test taskprocessor listener's task_pushed callback
  275. *
  276. * Adjusts private data's stats as indicated by the parameters.
  277. */
  278. static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
  279. {
  280. struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
  281. ++pvt->num_pushed;
  282. if (was_empty) {
  283. ++pvt->num_was_empty;
  284. }
  285. }
  286. /*!
  287. * \brief test taskprocessor listener's emptied callback.
  288. */
  289. static void test_emptied(struct ast_taskprocessor_listener *listener)
  290. {
  291. struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
  292. ++pvt->num_emptied;
  293. }
  294. /*!
  295. * \brief test taskprocessor listener's shutdown callback.
  296. */
  297. static void test_shutdown(struct ast_taskprocessor_listener *listener)
  298. {
  299. struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
  300. pvt->shutdown = 1;
  301. }
  302. static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
  303. .start = test_start,
  304. .task_pushed = test_task_pushed,
  305. .emptied = test_emptied,
  306. .shutdown = test_shutdown,
  307. };
  308. /*!
  309. * \brief Queued task for taskprocessor listener test.
  310. *
  311. * Does nothing.
  312. */
  313. static int listener_test_task(void *ignore)
  314. {
  315. return 0;
  316. }
  317. /*!
  318. * \brief helper to ensure that statistics the listener is keeping are what we expect
  319. *
  320. * \param test The currently-running test
  321. * \param pvt The private data for the taskprocessor listener
  322. * \param num_pushed The expected current number of tasks pushed to the processor
  323. * \param num_emptied The expected current number of times the taskprocessor has become empty
  324. * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
  325. * \retval -1 Stats were not as expected
  326. * \retval 0 Stats were as expected
  327. */
  328. static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
  329. {
  330. if (pvt->num_pushed != num_pushed) {
  331. ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
  332. num_pushed, pvt->num_pushed);
  333. return -1;
  334. }
  335. if (pvt->num_emptied != num_emptied) {
  336. ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
  337. num_emptied, pvt->num_emptied);
  338. return -1;
  339. }
  340. if (pvt->num_was_empty != num_was_empty) {
  341. ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
  342. num_was_empty, pvt->num_emptied);
  343. return -1;
  344. }
  345. return 0;
  346. }
  347. /*!
  348. * \brief Test for a taskprocessor with custom listener.
  349. *
  350. * This test pushes tasks to a taskprocessor with a custom listener, executes the taskss,
  351. * and destroys the taskprocessor.
  352. *
  353. * The test ensures that the listener's callbacks are called when expected and that the data
  354. * being passed in is accurate.
  355. */
  356. AST_TEST_DEFINE(taskprocessor_listener)
  357. {
  358. struct ast_taskprocessor *tps = NULL;
  359. struct ast_taskprocessor_listener *listener = NULL;
  360. struct test_listener_pvt *pvt = NULL;
  361. enum ast_test_result_state res = AST_TEST_PASS;
  362. switch (cmd) {
  363. case TEST_INIT:
  364. info->name = "taskprocessor_listener";
  365. info->category = "/main/taskprocessor/";
  366. info->summary = "Test of taskproccesor listeners";
  367. info->description =
  368. "Ensures that listener callbacks are called when expected.";
  369. return AST_TEST_NOT_RUN;
  370. case TEST_EXECUTE:
  371. break;
  372. }
  373. pvt = test_listener_pvt_alloc();
  374. if (!pvt) {
  375. ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
  376. return AST_TEST_FAIL;
  377. }
  378. listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
  379. if (!listener) {
  380. ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
  381. res = AST_TEST_FAIL;
  382. goto test_exit;
  383. }
  384. tps = ast_taskprocessor_create_with_listener("test_listener", listener);
  385. if (!tps) {
  386. ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
  387. res = AST_TEST_FAIL;
  388. goto test_exit;
  389. }
  390. if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
  391. ast_test_status_update(test, "Failed to queue task\n");
  392. res = AST_TEST_FAIL;
  393. goto test_exit;
  394. }
  395. if (check_stats(test, pvt, 1, 0, 1) < 0) {
  396. res = AST_TEST_FAIL;
  397. goto test_exit;
  398. }
  399. if (ast_taskprocessor_push(tps, listener_test_task, NULL)) {
  400. ast_test_status_update(test, "Failed to queue task\n");
  401. res = AST_TEST_FAIL;
  402. goto test_exit;
  403. }
  404. if (check_stats(test, pvt, 2, 0, 1) < 0) {
  405. res = AST_TEST_FAIL;
  406. goto test_exit;
  407. }
  408. ast_taskprocessor_execute(tps);
  409. if (check_stats(test, pvt, 2, 0, 1) < 0) {
  410. res = AST_TEST_FAIL;
  411. goto test_exit;
  412. }
  413. ast_taskprocessor_execute(tps);
  414. if (check_stats(test, pvt, 2, 1, 1) < 0) {
  415. res = AST_TEST_FAIL;
  416. goto test_exit;
  417. }
  418. tps = ast_taskprocessor_unreference(tps);
  419. if (!pvt->shutdown) {
  420. res = AST_TEST_FAIL;
  421. goto test_exit;
  422. }
  423. test_exit:
  424. ao2_cleanup(listener);
  425. /* This is safe even if tps is NULL */
  426. ast_taskprocessor_unreference(tps);
  427. ast_free(pvt);
  428. return res;
  429. }
  430. struct shutdown_data {
  431. ast_cond_t in;
  432. ast_cond_t out;
  433. ast_mutex_t lock;
  434. int task_complete;
  435. int task_started;
  436. int task_stop_waiting;
  437. };
  438. static void shutdown_data_dtor(void *data)
  439. {
  440. struct shutdown_data *shutdown_data = data;
  441. ast_mutex_destroy(&shutdown_data->lock);
  442. ast_cond_destroy(&shutdown_data->in);
  443. ast_cond_destroy(&shutdown_data->out);
  444. }
  445. static struct shutdown_data *shutdown_data_create(int dont_wait)
  446. {
  447. RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
  448. shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
  449. if (!shutdown_data) {
  450. return NULL;
  451. }
  452. ast_mutex_init(&shutdown_data->lock);
  453. ast_cond_init(&shutdown_data->in, NULL);
  454. ast_cond_init(&shutdown_data->out, NULL);
  455. shutdown_data->task_stop_waiting = dont_wait;
  456. ao2_ref(shutdown_data, +1);
  457. return shutdown_data;
  458. }
  459. static int shutdown_task_exec(void *data)
  460. {
  461. struct shutdown_data *shutdown_data = data;
  462. SCOPED_MUTEX(lock, &shutdown_data->lock);
  463. shutdown_data->task_started = 1;
  464. ast_cond_signal(&shutdown_data->out);
  465. while (!shutdown_data->task_stop_waiting) {
  466. ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
  467. }
  468. shutdown_data->task_complete = 1;
  469. ast_cond_signal(&shutdown_data->out);
  470. return 0;
  471. }
  472. static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
  473. {
  474. struct timeval start = ast_tvnow();
  475. struct timespec end = {
  476. .tv_sec = start.tv_sec + 5,
  477. .tv_nsec = start.tv_usec * 1000
  478. };
  479. SCOPED_MUTEX(lock, &shutdown_data->lock);
  480. while (!shutdown_data->task_complete) {
  481. if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
  482. break;
  483. }
  484. }
  485. return shutdown_data->task_complete;
  486. }
  487. static int shutdown_has_completed(struct shutdown_data *shutdown_data)
  488. {
  489. SCOPED_MUTEX(lock, &shutdown_data->lock);
  490. return shutdown_data->task_complete;
  491. }
  492. static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
  493. {
  494. struct timeval start = ast_tvnow();
  495. struct timespec end = {
  496. .tv_sec = start.tv_sec + 5,
  497. .tv_nsec = start.tv_usec * 1000
  498. };
  499. SCOPED_MUTEX(lock, &shutdown_data->lock);
  500. while (!shutdown_data->task_started) {
  501. if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
  502. break;
  503. }
  504. }
  505. return shutdown_data->task_started;
  506. }
  507. static void shutdown_poke(struct shutdown_data *shutdown_data)
  508. {
  509. SCOPED_MUTEX(lock, &shutdown_data->lock);
  510. shutdown_data->task_stop_waiting = 1;
  511. ast_cond_signal(&shutdown_data->in);
  512. }
  513. static void *tps_shutdown_thread(void *data)
  514. {
  515. struct ast_taskprocessor *tps = data;
  516. ast_taskprocessor_unreference(tps);
  517. return NULL;
  518. }
  519. AST_TEST_DEFINE(taskprocessor_shutdown)
  520. {
  521. RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
  522. RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
  523. RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
  524. int push_res;
  525. int wait_res;
  526. int pthread_res;
  527. pthread_t shutdown_thread;
  528. switch (cmd) {
  529. case TEST_INIT:
  530. info->name = "taskprocessor_shutdown";
  531. info->category = "/main/taskprocessor/";
  532. info->summary = "Test of taskproccesor shutdown sequence";
  533. info->description =
  534. "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
  535. return AST_TEST_NOT_RUN;
  536. case TEST_EXECUTE:
  537. break;
  538. }
  539. tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
  540. task1 = shutdown_data_create(0); /* task1 waits to be poked */
  541. task2 = shutdown_data_create(1); /* task2 waits for nothing */
  542. if (!tps || !task1 || !task2) {
  543. ast_test_status_update(test, "Allocation error\n");
  544. return AST_TEST_FAIL;
  545. }
  546. push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
  547. if (push_res != 0) {
  548. ast_test_status_update(test, "Could not push task1\n");
  549. return AST_TEST_FAIL;
  550. }
  551. push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
  552. if (push_res != 0) {
  553. ast_test_status_update(test, "Could not push task2\n");
  554. return AST_TEST_FAIL;
  555. }
  556. wait_res = shutdown_waitfor_start(task1);
  557. if (!wait_res) {
  558. ast_test_status_update(test, "Task1 didn't start\n");
  559. return AST_TEST_FAIL;
  560. }
  561. pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
  562. if (pthread_res != 0) {
  563. ast_test_status_update(test, "Failed to create shutdown thread\n");
  564. return AST_TEST_FAIL;
  565. }
  566. tps = NULL;
  567. /* Wakeup task1; it should complete */
  568. shutdown_poke(task1);
  569. wait_res = shutdown_waitfor_completion(task1);
  570. if (!wait_res) {
  571. ast_test_status_update(test, "Task1 didn't complete\n");
  572. return AST_TEST_FAIL;
  573. }
  574. /* Wait for shutdown to complete */
  575. pthread_join(shutdown_thread, NULL);
  576. /* Should have also completed task2 */
  577. wait_res = shutdown_has_completed(task2);
  578. if (!wait_res) {
  579. ast_test_status_update(test, "Task2 didn't finish\n");
  580. return AST_TEST_FAIL;
  581. }
  582. return AST_TEST_PASS;
  583. }
  584. static int local_task_exe(struct ast_taskprocessor_local *local)
  585. {
  586. int *local_data = local->local_data;
  587. struct task_data *task_data = local->data;
  588. *local_data = 1;
  589. task(task_data);
  590. return 0;
  591. }
  592. AST_TEST_DEFINE(taskprocessor_push_local)
  593. {
  594. RAII_VAR(struct ast_taskprocessor *, tps, NULL,
  595. ast_taskprocessor_unreference);
  596. RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
  597. int local_data;
  598. int res;
  599. switch (cmd) {
  600. case TEST_INIT:
  601. info->name = __func__;
  602. info->category = "/main/taskprocessor/";
  603. info->summary = "Test of pushing local data";
  604. info->description =
  605. "Ensures that local data is passed along.";
  606. return AST_TEST_NOT_RUN;
  607. case TEST_EXECUTE:
  608. break;
  609. }
  610. tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
  611. if (!tps) {
  612. ast_test_status_update(test, "Unable to create test taskprocessor\n");
  613. return AST_TEST_FAIL;
  614. }
  615. task_data = task_data_create();
  616. if (!task_data) {
  617. ast_test_status_update(test, "Unable to create task_data\n");
  618. return AST_TEST_FAIL;
  619. }
  620. local_data = 0;
  621. ast_taskprocessor_set_local(tps, &local_data);
  622. if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) {
  623. ast_test_status_update(test, "Failed to queue task\n");
  624. return AST_TEST_FAIL;
  625. }
  626. res = task_wait(task_data);
  627. if (res != 0) {
  628. ast_test_status_update(test, "Queued task did not execute!\n");
  629. return AST_TEST_FAIL;
  630. }
  631. if (local_data != 1) {
  632. ast_test_status_update(test,
  633. "Queued task did not set local_data!\n");
  634. return AST_TEST_FAIL;
  635. }
  636. return AST_TEST_PASS;
  637. }
  638. static int unload_module(void)
  639. {
  640. ast_test_unregister(default_taskprocessor);
  641. ast_test_unregister(default_taskprocessor_load);
  642. ast_test_unregister(taskprocessor_listener);
  643. ast_test_unregister(taskprocessor_shutdown);
  644. ast_test_unregister(taskprocessor_push_local);
  645. return 0;
  646. }
  647. static int load_module(void)
  648. {
  649. ast_test_register(default_taskprocessor);
  650. ast_test_register(default_taskprocessor_load);
  651. ast_test_register(taskprocessor_listener);
  652. ast_test_register(taskprocessor_shutdown);
  653. ast_test_register(taskprocessor_push_local);
  654. return AST_MODULE_LOAD_SUCCESS;
  655. }
  656. AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");