test_threadpool.c 42 KB


  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012-2013, Digium, Inc.
  5. *
  6. * Mark Michelson <mmichelson@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file
  20. * \brief threadpool unit tests
  21. *
  22. * \author Mark Michelson <mmichelson@digium.com>
  23. *
  24. */
  25. /*** MODULEINFO
  26. <depend>TEST_FRAMEWORK</depend>
  27. <support_level>core</support_level>
  28. ***/
  29. #include "asterisk.h"
  30. #include "asterisk/astobj2.h"
  31. #include "asterisk/lock.h"
  32. #include "asterisk/logger.h"
  33. #include "asterisk/module.h"
  34. #include "asterisk/taskprocessor.h"
  35. #include "asterisk/test.h"
  36. #include "asterisk/threadpool.h"
  37. struct test_listener_data {
  38. int num_active;
  39. int num_idle;
  40. int task_pushed;
  41. int num_tasks;
  42. int empty_notice;
  43. int was_empty;
  44. ast_mutex_t lock;
  45. ast_cond_t cond;
  46. };
  47. static struct test_listener_data *test_alloc(void)
  48. {
  49. struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
  50. if (!tld) {
  51. return NULL;
  52. }
  53. ast_mutex_init(&tld->lock);
  54. ast_cond_init(&tld->cond, NULL);
  55. return tld;
  56. }
  57. static void test_state_changed(struct ast_threadpool *pool,
  58. struct ast_threadpool_listener *listener,
  59. int active_threads,
  60. int idle_threads)
  61. {
  62. struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
  63. SCOPED_MUTEX(lock, &tld->lock);
  64. tld->num_active = active_threads;
  65. tld->num_idle = idle_threads;
  66. ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
  67. ast_cond_signal(&tld->cond);
  68. }
  69. static void test_task_pushed(struct ast_threadpool *pool,
  70. struct ast_threadpool_listener *listener,
  71. int was_empty)
  72. {
  73. struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
  74. SCOPED_MUTEX(lock, &tld->lock);
  75. tld->task_pushed = 1;
  76. ++tld->num_tasks;
  77. tld->was_empty = was_empty;
  78. ast_cond_signal(&tld->cond);
  79. }
  80. static void test_emptied(struct ast_threadpool *pool,
  81. struct ast_threadpool_listener *listener)
  82. {
  83. struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
  84. SCOPED_MUTEX(lock, &tld->lock);
  85. tld->empty_notice = 1;
  86. ast_cond_signal(&tld->cond);
  87. }
  88. static void test_shutdown(struct ast_threadpool_listener *listener)
  89. {
  90. struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
  91. ast_cond_destroy(&tld->cond);
  92. ast_mutex_destroy(&tld->lock);
  93. }
  94. static const struct ast_threadpool_listener_callbacks test_callbacks = {
  95. .state_changed = test_state_changed,
  96. .task_pushed = test_task_pushed,
  97. .emptied = test_emptied,
  98. .shutdown = test_shutdown,
  99. };
  100. struct simple_task_data {
  101. int task_executed;
  102. ast_mutex_t lock;
  103. ast_cond_t cond;
  104. };
  105. static struct simple_task_data *simple_task_data_alloc(void)
  106. {
  107. struct simple_task_data *std = ast_calloc(1, sizeof(*std));
  108. if (!std) {
  109. return NULL;
  110. }
  111. ast_mutex_init(&std->lock);
  112. ast_cond_init(&std->cond, NULL);
  113. return std;
  114. }
  115. static void simple_task_data_free(struct simple_task_data *std)
  116. {
  117. if (!std) {
  118. return;
  119. }
  120. ast_mutex_destroy(&std->lock);
  121. ast_cond_destroy(&std->cond);
  122. ast_free(std);
  123. }
  124. static int simple_task(void *data)
  125. {
  126. struct simple_task_data *std = data;
  127. SCOPED_MUTEX(lock, &std->lock);
  128. std->task_executed = 1;
  129. ast_cond_signal(&std->cond);
  130. return 0;
  131. }
  132. static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
  133. {
  134. struct timeval start = ast_tvnow();
  135. struct timespec end = {
  136. .tv_sec = start.tv_sec + 5,
  137. .tv_nsec = start.tv_usec * 1000
  138. };
  139. enum ast_test_result_state res = AST_TEST_PASS;
  140. SCOPED_MUTEX(lock, &tld->lock);
  141. while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
  142. if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
  143. break;
  144. }
  145. }
  146. if (tld->num_active != num_active && tld->num_idle != num_idle) {
  147. ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
  148. ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
  149. ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
  150. res = AST_TEST_FAIL;
  151. }
  152. return res;
  153. }
  154. static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
  155. {
  156. struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
  157. struct timeval start = ast_tvnow();
  158. struct timespec end = {
  159. .tv_sec = start.tv_sec + 5,
  160. .tv_nsec = start.tv_usec * 1000
  161. };
  162. SCOPED_MUTEX(lock, &tld->lock);
  163. while (!tld->task_pushed) {
  164. if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
  165. break;
  166. }
  167. }
  168. }
  169. static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
  170. {
  171. struct timeval start = ast_tvnow();
  172. struct timespec end = {
  173. .tv_sec = start.tv_sec + 5,
  174. .tv_nsec = start.tv_usec * 1000
  175. };
  176. enum ast_test_result_state res = AST_TEST_PASS;
  177. SCOPED_MUTEX(lock, &std->lock);
  178. while (!std->task_executed) {
  179. if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
  180. break;
  181. }
  182. }
  183. if (!std->task_executed) {
  184. ast_test_status_update(test, "Task execution did not occur\n");
  185. res = AST_TEST_FAIL;
  186. }
  187. return res;
  188. }
  189. static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
  190. {
  191. struct timeval start = ast_tvnow();
  192. struct timespec end = {
  193. .tv_sec = start.tv_sec + 5,
  194. .tv_nsec = start.tv_usec * 1000
  195. };
  196. enum ast_test_result_state res = AST_TEST_PASS;
  197. SCOPED_MUTEX(lock, &tld->lock);
  198. while (!tld->empty_notice) {
  199. if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
  200. break;
  201. }
  202. }
  203. if (!tld->empty_notice) {
  204. ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
  205. res = AST_TEST_FAIL;
  206. }
  207. return res;
  208. }
  209. static enum ast_test_result_state listener_check(
  210. struct ast_test *test,
  211. struct ast_threadpool_listener *listener,
  212. int task_pushed,
  213. int was_empty,
  214. int num_tasks,
  215. int num_active,
  216. int num_idle,
  217. int empty_notice)
  218. {
  219. struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
  220. enum ast_test_result_state res = AST_TEST_PASS;
  221. if (tld->task_pushed != task_pushed) {
  222. ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
  223. task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
  224. res = AST_TEST_FAIL;
  225. }
  226. if (tld->was_empty != was_empty) {
  227. ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
  228. was_empty ? "" : "not ", tld->was_empty ? "" : " not");
  229. res = AST_TEST_FAIL;
  230. }
  231. if (tld->num_tasks!= num_tasks) {
  232. ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
  233. num_tasks, tld->num_tasks);
  234. res = AST_TEST_FAIL;
  235. }
  236. if (tld->num_active != num_active) {
  237. ast_test_status_update(test, "Expected %d active threads, but got %d\n",
  238. num_active, tld->num_active);
  239. res = AST_TEST_FAIL;
  240. }
  241. if (tld->num_idle != num_idle) {
  242. ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
  243. num_idle, tld->num_idle);
  244. res = AST_TEST_FAIL;
  245. }
  246. if (tld->empty_notice != empty_notice) {
  247. ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
  248. was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
  249. res = AST_TEST_FAIL;
  250. }
  251. return res;
  252. }
  253. AST_TEST_DEFINE(threadpool_push)
  254. {
  255. struct ast_threadpool *pool = NULL;
  256. struct ast_threadpool_listener *listener = NULL;
  257. struct simple_task_data *std = NULL;
  258. struct test_listener_data *tld = NULL;
  259. enum ast_test_result_state res = AST_TEST_FAIL;
  260. struct ast_threadpool_options options = {
  261. .version = AST_THREADPOOL_OPTIONS_VERSION,
  262. .idle_timeout = 0,
  263. .auto_increment = 0,
  264. .initial_size = 0,
  265. .max_size = 0,
  266. };
  267. switch (cmd) {
  268. case TEST_INIT:
  269. info->name = "push";
  270. info->category = "/main/threadpool/";
  271. info->summary = "Test task";
  272. info->description =
  273. "Basic threadpool test";
  274. return AST_TEST_NOT_RUN;
  275. case TEST_EXECUTE:
  276. break;
  277. }
  278. tld = test_alloc();
  279. if (!tld) {
  280. return AST_TEST_FAIL;
  281. }
  282. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  283. if (!listener) {
  284. goto end;
  285. }
  286. pool = ast_threadpool_create(info->name, listener, &options);
  287. if (!pool) {
  288. goto end;
  289. }
  290. std = simple_task_data_alloc();
  291. if (!std) {
  292. goto end;
  293. }
  294. if (ast_threadpool_push(pool, simple_task, std)) {
  295. goto end;
  296. }
  297. wait_for_task_pushed(listener);
  298. res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
  299. end:
  300. ast_threadpool_shutdown(pool);
  301. ao2_cleanup(listener);
  302. simple_task_data_free(std);
  303. ast_free(tld);
  304. return res;
  305. }
  306. AST_TEST_DEFINE(threadpool_initial_threads)
  307. {
  308. struct ast_threadpool *pool = NULL;
  309. struct ast_threadpool_listener *listener = NULL;
  310. enum ast_test_result_state res = AST_TEST_FAIL;
  311. struct test_listener_data *tld = NULL;
  312. struct ast_threadpool_options options = {
  313. .version = AST_THREADPOOL_OPTIONS_VERSION,
  314. .idle_timeout = 0,
  315. .auto_increment = 0,
  316. .initial_size = 3,
  317. .max_size = 0,
  318. };
  319. switch (cmd) {
  320. case TEST_INIT:
  321. info->name = "initial_threads";
  322. info->category = "/main/threadpool/";
  323. info->summary = "Test threadpool initialization state";
  324. info->description =
  325. "Ensure that a threadpool created with a specific size contains the\n"
  326. "proper number of idle threads.";
  327. return AST_TEST_NOT_RUN;
  328. case TEST_EXECUTE:
  329. break;
  330. }
  331. tld = test_alloc();
  332. if (!tld) {
  333. return AST_TEST_FAIL;
  334. }
  335. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  336. if (!listener) {
  337. goto end;
  338. }
  339. pool = ast_threadpool_create(info->name, listener, &options);
  340. if (!pool) {
  341. goto end;
  342. }
  343. res = wait_until_thread_state(test, tld, 0, 3);
  344. end:
  345. ast_threadpool_shutdown(pool);
  346. ao2_cleanup(listener);
  347. ast_free(tld);
  348. return res;
  349. }
  350. AST_TEST_DEFINE(threadpool_thread_creation)
  351. {
  352. struct ast_threadpool *pool = NULL;
  353. struct ast_threadpool_listener *listener = NULL;
  354. enum ast_test_result_state res = AST_TEST_FAIL;
  355. struct test_listener_data *tld = NULL;
  356. struct ast_threadpool_options options = {
  357. .version = AST_THREADPOOL_OPTIONS_VERSION,
  358. .idle_timeout = 0,
  359. .auto_increment = 0,
  360. .initial_size = 0,
  361. .max_size = 0,
  362. };
  363. switch (cmd) {
  364. case TEST_INIT:
  365. info->name = "thread_creation";
  366. info->category = "/main/threadpool/";
  367. info->summary = "Test threadpool thread creation";
  368. info->description =
  369. "Ensure that threads can be added to a threadpool";
  370. return AST_TEST_NOT_RUN;
  371. case TEST_EXECUTE:
  372. break;
  373. }
  374. tld = test_alloc();
  375. if (!tld) {
  376. return AST_TEST_FAIL;
  377. }
  378. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  379. if (!listener) {
  380. goto end;
  381. }
  382. pool = ast_threadpool_create(info->name, listener, &options);
  383. if (!pool) {
  384. goto end;
  385. }
  386. /* Now let's create a thread. It should start active, then go
  387. * idle immediately
  388. */
  389. ast_threadpool_set_size(pool, 1);
  390. res = wait_until_thread_state(test, tld, 0, 1);
  391. end:
  392. ast_threadpool_shutdown(pool);
  393. ao2_cleanup(listener);
  394. ast_free(tld);
  395. return res;
  396. }
  397. AST_TEST_DEFINE(threadpool_thread_destruction)
  398. {
  399. struct ast_threadpool *pool = NULL;
  400. struct ast_threadpool_listener *listener = NULL;
  401. enum ast_test_result_state res = AST_TEST_FAIL;
  402. struct test_listener_data *tld = NULL;
  403. struct ast_threadpool_options options = {
  404. .version = AST_THREADPOOL_OPTIONS_VERSION,
  405. .idle_timeout = 0,
  406. .auto_increment = 0,
  407. .initial_size = 0,
  408. .max_size = 0,
  409. };
  410. switch (cmd) {
  411. case TEST_INIT:
  412. info->name = "thread_destruction";
  413. info->category = "/main/threadpool/";
  414. info->summary = "Test threadpool thread destruction";
  415. info->description =
  416. "Ensure that threads are properly destroyed in a threadpool";
  417. return AST_TEST_NOT_RUN;
  418. case TEST_EXECUTE:
  419. break;
  420. }
  421. tld = test_alloc();
  422. if (!tld) {
  423. return AST_TEST_FAIL;
  424. }
  425. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  426. if (!listener) {
  427. goto end;
  428. }
  429. pool = ast_threadpool_create(info->name, listener, &options);
  430. if (!pool) {
  431. goto end;
  432. }
  433. ast_threadpool_set_size(pool, 3);
  434. res = wait_until_thread_state(test, tld, 0, 3);
  435. if (res == AST_TEST_FAIL) {
  436. goto end;
  437. }
  438. res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
  439. if (res == AST_TEST_FAIL) {
  440. goto end;
  441. }
  442. ast_threadpool_set_size(pool, 2);
  443. res = wait_until_thread_state(test, tld, 0, 2);
  444. end:
  445. ast_threadpool_shutdown(pool);
  446. ao2_cleanup(listener);
  447. ast_free(tld);
  448. return res;
  449. }
  450. AST_TEST_DEFINE(threadpool_thread_timeout)
  451. {
  452. struct ast_threadpool *pool = NULL;
  453. struct ast_threadpool_listener *listener = NULL;
  454. enum ast_test_result_state res = AST_TEST_FAIL;
  455. struct test_listener_data *tld = NULL;
  456. struct ast_threadpool_options options = {
  457. .version = AST_THREADPOOL_OPTIONS_VERSION,
  458. .idle_timeout = 2,
  459. .auto_increment = 0,
  460. .initial_size = 0,
  461. .max_size = 0,
  462. };
  463. switch (cmd) {
  464. case TEST_INIT:
  465. info->name = "thread_timeout";
  466. info->category = "/main/threadpool/";
  467. info->summary = "Test threadpool thread timeout";
  468. info->description =
  469. "Ensure that a thread with a two second timeout dies as expected.";
  470. return AST_TEST_NOT_RUN;
  471. case TEST_EXECUTE:
  472. break;
  473. }
  474. tld = test_alloc();
  475. if (!tld) {
  476. return AST_TEST_FAIL;
  477. }
  478. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  479. if (!listener) {
  480. goto end;
  481. }
  482. pool = ast_threadpool_create(info->name, listener, &options);
  483. if (!pool) {
  484. goto end;
  485. }
  486. ast_threadpool_set_size(pool, 1);
  487. res = wait_until_thread_state(test, tld, 0, 1);
  488. if (res == AST_TEST_FAIL) {
  489. goto end;
  490. }
  491. res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
  492. if (res == AST_TEST_FAIL) {
  493. goto end;
  494. }
  495. res = wait_until_thread_state(test, tld, 0, 0);
  496. if (res == AST_TEST_FAIL) {
  497. goto end;
  498. }
  499. res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
  500. end:
  501. ast_threadpool_shutdown(pool);
  502. ao2_cleanup(listener);
  503. ast_free(tld);
  504. return res;
  505. }
  506. AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
  507. {
  508. struct ast_threadpool *pool = NULL;
  509. struct ast_threadpool_listener *listener = NULL;
  510. enum ast_test_result_state res = AST_TEST_FAIL;
  511. struct test_listener_data *tld = NULL;
  512. struct ast_threadpool_options options = {
  513. .version = AST_THREADPOOL_OPTIONS_VERSION,
  514. .idle_timeout = 1,
  515. .auto_increment = 1,
  516. .initial_size = 0,
  517. .max_size = 1,
  518. };
  519. int iteration;
  520. switch (cmd) {
  521. case TEST_INIT:
  522. info->name = "thread_timeout_thrash";
  523. info->category = "/main/threadpool/";
  524. info->summary = "Thrash threadpool thread timeout";
  525. info->description =
  526. "Repeatedly queue a task when a threadpool thread should timeout.";
  527. return AST_TEST_NOT_RUN;
  528. case TEST_EXECUTE:
  529. break;
  530. }
  531. tld = test_alloc();
  532. if (!tld) {
  533. return AST_TEST_FAIL;
  534. }
  535. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  536. if (!listener) {
  537. goto end;
  538. }
  539. pool = ast_threadpool_create(info->name, listener, &options);
  540. if (!pool) {
  541. goto end;
  542. }
  543. ast_threadpool_set_size(pool, 1);
  544. for (iteration = 0; iteration < 30; ++iteration) {
  545. struct simple_task_data *std = NULL;
  546. struct timeval start = ast_tvnow();
  547. struct timespec end = {
  548. .tv_sec = start.tv_sec + options.idle_timeout,
  549. .tv_nsec = start.tv_usec * 1000
  550. };
  551. std = simple_task_data_alloc();
  552. if (!std) {
  553. goto end;
  554. }
  555. /* Wait until the threadpool thread should timeout due to being idle */
  556. ast_mutex_lock(&tld->lock);
  557. while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
  558. /* This purposely left empty as we want to loop waiting for a time out */
  559. }
  560. ast_mutex_unlock(&tld->lock);
  561. if (ast_threadpool_push(pool, simple_task, std)) {
  562. res = AST_TEST_FAIL;
  563. } else {
  564. res = wait_for_completion(test, std);
  565. }
  566. simple_task_data_free(std);
  567. if (res == AST_TEST_FAIL) {
  568. goto end;
  569. }
  570. }
  571. res = wait_until_thread_state(test, tld, 0, 0);
  572. if (res == AST_TEST_FAIL) {
  573. goto end;
  574. }
  575. res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
  576. end:
  577. ast_threadpool_shutdown(pool);
  578. ao2_cleanup(listener);
  579. ast_free(tld);
  580. return res;
  581. }
  582. AST_TEST_DEFINE(threadpool_one_task_one_thread)
  583. {
  584. struct ast_threadpool *pool = NULL;
  585. struct ast_threadpool_listener *listener = NULL;
  586. struct simple_task_data *std = NULL;
  587. enum ast_test_result_state res = AST_TEST_FAIL;
  588. struct test_listener_data *tld = NULL;
  589. struct ast_threadpool_options options = {
  590. .version = AST_THREADPOOL_OPTIONS_VERSION,
  591. .idle_timeout = 0,
  592. .auto_increment = 0,
  593. .initial_size = 0,
  594. .max_size = 0,
  595. };
  596. switch (cmd) {
  597. case TEST_INIT:
  598. info->name = "one_task_one_thread";
  599. info->category = "/main/threadpool/";
  600. info->summary = "Test a single task with a single thread";
  601. info->description =
  602. "Push a task into an empty threadpool, then add a thread to the pool.";
  603. return AST_TEST_NOT_RUN;
  604. case TEST_EXECUTE:
  605. break;
  606. }
  607. tld = test_alloc();
  608. if (!tld) {
  609. return AST_TEST_FAIL;
  610. }
  611. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  612. if (!listener) {
  613. goto end;
  614. }
  615. pool = ast_threadpool_create(info->name, listener, &options);
  616. if (!pool) {
  617. goto end;
  618. }
  619. std = simple_task_data_alloc();
  620. if (!std) {
  621. goto end;
  622. }
  623. if (ast_threadpool_push(pool, simple_task, std)) {
  624. goto end;
  625. }
  626. ast_threadpool_set_size(pool, 1);
  627. /* Threads added to the pool are active when they start,
  628. * so the newly-created thread should immediately execute
  629. * the waiting task.
  630. */
  631. res = wait_for_completion(test, std);
  632. if (res == AST_TEST_FAIL) {
  633. goto end;
  634. }
  635. res = wait_for_empty_notice(test, tld);
  636. if (res == AST_TEST_FAIL) {
  637. goto end;
  638. }
  639. /* After completing the task, the thread should go idle */
  640. res = wait_until_thread_state(test, tld, 0, 1);
  641. if (res == AST_TEST_FAIL) {
  642. goto end;
  643. }
  644. res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
  645. end:
  646. ast_threadpool_shutdown(pool);
  647. ao2_cleanup(listener);
  648. simple_task_data_free(std);
  649. ast_free(tld);
  650. return res;
  651. }
  652. AST_TEST_DEFINE(threadpool_one_thread_one_task)
  653. {
  654. struct ast_threadpool *pool = NULL;
  655. struct ast_threadpool_listener *listener = NULL;
  656. struct simple_task_data *std = NULL;
  657. enum ast_test_result_state res = AST_TEST_FAIL;
  658. struct test_listener_data *tld = NULL;
  659. struct ast_threadpool_options options = {
  660. .version = AST_THREADPOOL_OPTIONS_VERSION,
  661. .idle_timeout = 0,
  662. .auto_increment = 0,
  663. .initial_size = 0,
  664. .max_size = 0,
  665. };
  666. switch (cmd) {
  667. case TEST_INIT:
  668. info->name = "one_thread_one_task";
  669. info->category = "/main/threadpool/";
  670. info->summary = "Test a single thread with a single task";
  671. info->description =
  672. "Add a thread to the pool and then push a task to it.";
  673. return AST_TEST_NOT_RUN;
  674. case TEST_EXECUTE:
  675. break;
  676. }
  677. tld = test_alloc();
  678. if (!tld) {
  679. return AST_TEST_FAIL;
  680. }
  681. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  682. if (!listener) {
  683. goto end;
  684. }
  685. pool = ast_threadpool_create(info->name, listener, &options);
  686. if (!pool) {
  687. goto end;
  688. }
  689. std = simple_task_data_alloc();
  690. if (!std) {
  691. goto end;
  692. }
  693. ast_threadpool_set_size(pool, 1);
  694. res = wait_until_thread_state(test, tld, 0, 1);
  695. if (res == AST_TEST_FAIL) {
  696. goto end;
  697. }
  698. if (ast_threadpool_push(pool, simple_task, std)) {
  699. res = AST_TEST_FAIL;
  700. goto end;
  701. }
  702. res = wait_for_completion(test, std);
  703. if (res == AST_TEST_FAIL) {
  704. goto end;
  705. }
  706. res = wait_for_empty_notice(test, tld);
  707. if (res == AST_TEST_FAIL) {
  708. goto end;
  709. }
  710. /* After completing the task, the thread should go idle */
  711. res = wait_until_thread_state(test, tld, 0, 1);
  712. if (res == AST_TEST_FAIL) {
  713. goto end;
  714. }
  715. res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
  716. end:
  717. ast_threadpool_shutdown(pool);
  718. ao2_cleanup(listener);
  719. simple_task_data_free(std);
  720. ast_free(tld);
  721. return res;
  722. }
  723. AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
  724. {
  725. struct ast_threadpool *pool = NULL;
  726. struct ast_threadpool_listener *listener = NULL;
  727. struct simple_task_data *std1 = NULL;
  728. struct simple_task_data *std2 = NULL;
  729. struct simple_task_data *std3 = NULL;
  730. enum ast_test_result_state res = AST_TEST_FAIL;
  731. struct test_listener_data *tld = NULL;
  732. struct ast_threadpool_options options = {
  733. .version = AST_THREADPOOL_OPTIONS_VERSION,
  734. .idle_timeout = 0,
  735. .auto_increment = 0,
  736. .initial_size = 0,
  737. .max_size = 0,
  738. };
  739. switch (cmd) {
  740. case TEST_INIT:
  741. info->name = "one_thread_multiple_tasks";
  742. info->category = "/main/threadpool/";
  743. info->summary = "Test a single thread with multiple tasks";
  744. info->description =
  745. "Add a thread to the pool and then push three tasks to it.";
  746. return AST_TEST_NOT_RUN;
  747. case TEST_EXECUTE:
  748. break;
  749. }
  750. tld = test_alloc();
  751. if (!tld) {
  752. return AST_TEST_FAIL;
  753. }
  754. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  755. if (!listener) {
  756. goto end;
  757. }
  758. pool = ast_threadpool_create(info->name, listener, &options);
  759. if (!pool) {
  760. goto end;
  761. }
  762. std1 = simple_task_data_alloc();
  763. std2 = simple_task_data_alloc();
  764. std3 = simple_task_data_alloc();
  765. if (!std1 || !std2 || !std3) {
  766. goto end;
  767. }
  768. ast_threadpool_set_size(pool, 1);
  769. res = wait_until_thread_state(test, tld, 0, 1);
  770. if (res == AST_TEST_FAIL) {
  771. goto end;
  772. }
  773. res = AST_TEST_FAIL;
  774. if (ast_threadpool_push(pool, simple_task, std1)) {
  775. goto end;
  776. }
  777. if (ast_threadpool_push(pool, simple_task, std2)) {
  778. goto end;
  779. }
  780. if (ast_threadpool_push(pool, simple_task, std3)) {
  781. goto end;
  782. }
  783. res = wait_for_completion(test, std1);
  784. if (res == AST_TEST_FAIL) {
  785. goto end;
  786. }
  787. res = wait_for_completion(test, std2);
  788. if (res == AST_TEST_FAIL) {
  789. goto end;
  790. }
  791. res = wait_for_completion(test, std3);
  792. if (res == AST_TEST_FAIL) {
  793. goto end;
  794. }
  795. res = wait_for_empty_notice(test, tld);
  796. if (res == AST_TEST_FAIL) {
  797. goto end;
  798. }
  799. res = wait_until_thread_state(test, tld, 0, 1);
  800. if (res == AST_TEST_FAIL) {
  801. goto end;
  802. }
  803. res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
  804. end:
  805. ast_threadpool_shutdown(pool);
  806. ao2_cleanup(listener);
  807. simple_task_data_free(std1);
  808. simple_task_data_free(std2);
  809. simple_task_data_free(std3);
  810. ast_free(tld);
  811. return res;
  812. }
  813. static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test,
  814. struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
  815. {
  816. enum ast_test_result_state res = AST_TEST_PASS;
  817. struct timeval start;
  818. struct timespec end;
  819. res = wait_until_thread_state(test, tld, num_active, num_idle);
  820. if (res == AST_TEST_FAIL) {
  821. return res;
  822. }
  823. start = ast_tvnow();
  824. end.tv_sec = start.tv_sec + 5;
  825. end.tv_nsec = start.tv_usec * 1000;
  826. ast_mutex_lock(&tld->lock);
  827. while (tld->num_tasks != num_tasks) {
  828. if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
  829. break;
  830. }
  831. }
  832. if (tld->num_tasks != num_tasks) {
  833. ast_test_status_update(test, "Number of tasks pushed %d does not match expected %d\n",
  834. tld->num_tasks, num_tasks);
  835. res = AST_TEST_FAIL;
  836. }
  837. ast_mutex_unlock(&tld->lock);
  838. return res;
  839. }
  840. AST_TEST_DEFINE(threadpool_auto_increment)
  841. {
  842. struct ast_threadpool *pool = NULL;
  843. struct ast_threadpool_listener *listener = NULL;
  844. struct simple_task_data *std1 = NULL;
  845. struct simple_task_data *std2 = NULL;
  846. struct simple_task_data *std3 = NULL;
  847. struct simple_task_data *std4 = NULL;
  848. enum ast_test_result_state res = AST_TEST_FAIL;
  849. struct test_listener_data *tld = NULL;
  850. struct ast_threadpool_options options = {
  851. .version = AST_THREADPOOL_OPTIONS_VERSION,
  852. .idle_timeout = 0,
  853. .auto_increment = 3,
  854. .initial_size = 0,
  855. .max_size = 0,
  856. };
  857. switch (cmd) {
  858. case TEST_INIT:
  859. info->name = "auto_increment";
  860. info->category = "/main/threadpool/";
  861. info->summary = "Test that the threadpool grows as tasks are added";
  862. info->description =
  863. "Create an empty threadpool and push a task to it. Once the task is\n"
  864. "pushed, the threadpool should add three threads and be able to\n"
  865. "handle the task. The threads should then go idle";
  866. return AST_TEST_NOT_RUN;
  867. case TEST_EXECUTE:
  868. break;
  869. }
  870. tld = test_alloc();
  871. if (!tld) {
  872. return AST_TEST_FAIL;
  873. }
  874. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  875. if (!listener) {
  876. goto end;
  877. }
  878. pool = ast_threadpool_create(info->name, listener, &options);
  879. if (!pool) {
  880. goto end;
  881. }
  882. std1 = simple_task_data_alloc();
  883. std2 = simple_task_data_alloc();
  884. std3 = simple_task_data_alloc();
  885. std4 = simple_task_data_alloc();
  886. if (!std1 || !std2 || !std3 || !std4) {
  887. goto end;
  888. }
  889. if (ast_threadpool_push(pool, simple_task, std1)) {
  890. goto end;
  891. }
  892. /* Pushing the task should result in the threadpool growing
  893. * by three threads. This will allow the task to actually execute
  894. */
  895. res = wait_for_completion(test, std1);
  896. if (res == AST_TEST_FAIL) {
  897. goto end;
  898. }
  899. res = wait_for_empty_notice(test, tld);
  900. if (res == AST_TEST_FAIL) {
  901. goto end;
  902. }
  903. res = wait_until_thread_state(test, tld, 0, 3);
  904. if (res == AST_TEST_FAIL) {
  905. goto end;
  906. }
  907. /* Now push three tasks into the pool and ensure the pool does not
  908. * grow.
  909. */
  910. res = AST_TEST_FAIL;
  911. if (ast_threadpool_push(pool, simple_task, std2)) {
  912. goto end;
  913. }
  914. if (ast_threadpool_push(pool, simple_task, std3)) {
  915. goto end;
  916. }
  917. if (ast_threadpool_push(pool, simple_task, std4)) {
  918. goto end;
  919. }
  920. res = wait_for_completion(test, std2);
  921. if (res == AST_TEST_FAIL) {
  922. goto end;
  923. }
  924. res = wait_for_completion(test, std3);
  925. if (res == AST_TEST_FAIL) {
  926. goto end;
  927. }
  928. res = wait_for_completion(test, std4);
  929. if (res == AST_TEST_FAIL) {
  930. goto end;
  931. }
  932. res = wait_for_empty_notice(test, tld);
  933. if (res == AST_TEST_FAIL) {
  934. goto end;
  935. }
  936. res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
  937. if (res == AST_TEST_FAIL) {
  938. goto end;
  939. }
  940. end:
  941. ast_threadpool_shutdown(pool);
  942. ao2_cleanup(listener);
  943. simple_task_data_free(std1);
  944. simple_task_data_free(std2);
  945. simple_task_data_free(std3);
  946. simple_task_data_free(std4);
  947. ast_free(tld);
  948. return res;
  949. }
  950. AST_TEST_DEFINE(threadpool_max_size)
  951. {
  952. struct ast_threadpool *pool = NULL;
  953. struct ast_threadpool_listener *listener = NULL;
  954. struct simple_task_data *std = NULL;
  955. enum ast_test_result_state res = AST_TEST_FAIL;
  956. struct test_listener_data *tld = NULL;
  957. struct ast_threadpool_options options = {
  958. .version = AST_THREADPOOL_OPTIONS_VERSION,
  959. .idle_timeout = 0,
  960. .auto_increment = 3,
  961. .initial_size = 0,
  962. .max_size = 2,
  963. };
  964. switch (cmd) {
  965. case TEST_INIT:
  966. info->name = "max_size";
  967. info->category = "/main/threadpool/";
  968. info->summary = "Test that the threadpool does not exceed its maximum size restriction";
  969. info->description =
  970. "Create an empty threadpool and push a task to it. Once the task is\n"
  971. "pushed, the threadpool should attempt to grow by three threads, but the\n"
  972. "pool's restrictions should only allow two threads to be added.";
  973. return AST_TEST_NOT_RUN;
  974. case TEST_EXECUTE:
  975. break;
  976. }
  977. tld = test_alloc();
  978. if (!tld) {
  979. return AST_TEST_FAIL;
  980. }
  981. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  982. if (!listener) {
  983. goto end;
  984. }
  985. pool = ast_threadpool_create(info->name, listener, &options);
  986. if (!pool) {
  987. goto end;
  988. }
  989. std = simple_task_data_alloc();
  990. if (!std) {
  991. goto end;
  992. }
  993. if (ast_threadpool_push(pool, simple_task, std)) {
  994. goto end;
  995. }
  996. res = wait_for_completion(test, std);
  997. if (res == AST_TEST_FAIL) {
  998. goto end;
  999. }
  1000. res = wait_until_thread_state(test, tld, 0, 2);
  1001. if (res == AST_TEST_FAIL) {
  1002. goto end;
  1003. }
  1004. res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
  1005. end:
  1006. ast_threadpool_shutdown(pool);
  1007. ao2_cleanup(listener);
  1008. simple_task_data_free(std);
  1009. ast_free(tld);
  1010. return res;
  1011. }
  1012. AST_TEST_DEFINE(threadpool_reactivation)
  1013. {
  1014. struct ast_threadpool *pool = NULL;
  1015. struct ast_threadpool_listener *listener = NULL;
  1016. struct simple_task_data *std1 = NULL;
  1017. struct simple_task_data *std2 = NULL;
  1018. enum ast_test_result_state res = AST_TEST_FAIL;
  1019. struct test_listener_data *tld = NULL;
  1020. struct ast_threadpool_options options = {
  1021. .version = AST_THREADPOOL_OPTIONS_VERSION,
  1022. .idle_timeout = 0,
  1023. .auto_increment = 0,
  1024. .initial_size = 0,
  1025. .max_size = 0,
  1026. };
  1027. switch (cmd) {
  1028. case TEST_INIT:
  1029. info->name = "reactivation";
  1030. info->category = "/main/threadpool/";
  1031. info->summary = "Test that a threadpool reactivates when work is added";
  1032. info->description =
  1033. "Push a task into a threadpool. Make sure the task executes and the\n"
  1034. "thread goes idle. Then push a second task and ensure that the thread\n"
  1035. "awakens and executes the second task.";
  1036. return AST_TEST_NOT_RUN;
  1037. case TEST_EXECUTE:
  1038. break;
  1039. }
  1040. tld = test_alloc();
  1041. if (!tld) {
  1042. return AST_TEST_FAIL;
  1043. }
  1044. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  1045. if (!listener) {
  1046. goto end;
  1047. }
  1048. pool = ast_threadpool_create(info->name, listener, &options);
  1049. if (!pool) {
  1050. goto end;
  1051. }
  1052. std1 = simple_task_data_alloc();
  1053. std2 = simple_task_data_alloc();
  1054. if (!std1 || !std2) {
  1055. goto end;
  1056. }
  1057. if (ast_threadpool_push(pool, simple_task, std1)) {
  1058. goto end;
  1059. }
  1060. ast_threadpool_set_size(pool, 1);
  1061. res = wait_for_completion(test, std1);
  1062. if (res == AST_TEST_FAIL) {
  1063. goto end;
  1064. }
  1065. res = wait_for_empty_notice(test, tld);
  1066. if (res == AST_TEST_FAIL) {
  1067. goto end;
  1068. }
  1069. res = wait_until_thread_state(test, tld, 0, 1);
  1070. if (res == AST_TEST_FAIL) {
  1071. goto end;
  1072. }
  1073. res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
  1074. if (res == AST_TEST_FAIL) {
  1075. goto end;
  1076. }
  1077. /* Now make sure the threadpool reactivates when we add a second task */
  1078. if (ast_threadpool_push(pool, simple_task, std2)) {
  1079. res = AST_TEST_FAIL;
  1080. goto end;
  1081. }
  1082. res = wait_for_completion(test, std2);
  1083. if (res == AST_TEST_FAIL) {
  1084. goto end;
  1085. }
  1086. res = wait_for_empty_notice(test, tld);
  1087. if (res == AST_TEST_FAIL) {
  1088. goto end;
  1089. }
  1090. res = wait_until_thread_state(test, tld, 0, 1);
  1091. if (res == AST_TEST_FAIL) {
  1092. goto end;
  1093. }
  1094. res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
  1095. end:
  1096. ast_threadpool_shutdown(pool);
  1097. ao2_cleanup(listener);
  1098. simple_task_data_free(std1);
  1099. simple_task_data_free(std2);
  1100. ast_free(tld);
  1101. return res;
  1102. }
  1103. struct complex_task_data {
  1104. int task_started;
  1105. int task_executed;
  1106. int continue_task;
  1107. ast_mutex_t lock;
  1108. ast_cond_t stall_cond;
  1109. ast_cond_t notify_cond;
  1110. };
  1111. static struct complex_task_data *complex_task_data_alloc(void)
  1112. {
  1113. struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
  1114. if (!ctd) {
  1115. return NULL;
  1116. }
  1117. ast_mutex_init(&ctd->lock);
  1118. ast_cond_init(&ctd->stall_cond, NULL);
  1119. ast_cond_init(&ctd->notify_cond, NULL);
  1120. return ctd;
  1121. }
  1122. static void complex_task_data_free(struct complex_task_data *ctd)
  1123. {
  1124. if (!ctd) {
  1125. return;
  1126. }
  1127. ast_mutex_destroy(&ctd->lock);
  1128. ast_cond_destroy(&ctd->stall_cond);
  1129. ast_cond_destroy(&ctd->notify_cond);
  1130. ast_free(ctd);
  1131. }
  1132. static int complex_task(void *data)
  1133. {
  1134. struct complex_task_data *ctd = data;
  1135. SCOPED_MUTEX(lock, &ctd->lock);
  1136. /* Notify that we started */
  1137. ctd->task_started = 1;
  1138. ast_cond_signal(&ctd->notify_cond);
  1139. while (!ctd->continue_task) {
  1140. ast_cond_wait(&ctd->stall_cond, lock);
  1141. }
  1142. /* We got poked. Finish up */
  1143. ctd->task_executed = 1;
  1144. ast_cond_signal(&ctd->notify_cond);
  1145. return 0;
  1146. }
  1147. static void poke_worker(struct complex_task_data *ctd)
  1148. {
  1149. SCOPED_MUTEX(lock, &ctd->lock);
  1150. ctd->continue_task = 1;
  1151. ast_cond_signal(&ctd->stall_cond);
  1152. }
  1153. static int wait_for_complex_start(struct complex_task_data *ctd)
  1154. {
  1155. struct timeval start = ast_tvnow();
  1156. struct timespec end = {
  1157. .tv_sec = start.tv_sec + 5,
  1158. .tv_nsec = start.tv_usec * 1000
  1159. };
  1160. SCOPED_MUTEX(lock, &ctd->lock);
  1161. while (!ctd->task_started) {
  1162. if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
  1163. break;
  1164. }
  1165. }
  1166. return ctd->task_started;
  1167. }
  1168. static int has_complex_started(struct complex_task_data *ctd)
  1169. {
  1170. struct timeval start = ast_tvnow();
  1171. struct timespec end = {
  1172. .tv_sec = start.tv_sec + 1,
  1173. .tv_nsec = start.tv_usec * 1000
  1174. };
  1175. SCOPED_MUTEX(lock, &ctd->lock);
  1176. while (!ctd->task_started) {
  1177. if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
  1178. break;
  1179. }
  1180. }
  1181. return ctd->task_started;
  1182. }
  1183. static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
  1184. {
  1185. struct timeval start = ast_tvnow();
  1186. struct timespec end = {
  1187. .tv_sec = start.tv_sec + 5,
  1188. .tv_nsec = start.tv_usec * 1000
  1189. };
  1190. enum ast_test_result_state res = AST_TEST_PASS;
  1191. SCOPED_MUTEX(lock, &ctd->lock);
  1192. while (!ctd->task_executed) {
  1193. if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
  1194. break;
  1195. }
  1196. }
  1197. if (!ctd->task_executed) {
  1198. res = AST_TEST_FAIL;
  1199. }
  1200. return res;
  1201. }
  1202. AST_TEST_DEFINE(threadpool_task_distribution)
  1203. {
  1204. struct ast_threadpool *pool = NULL;
  1205. struct ast_threadpool_listener *listener = NULL;
  1206. struct complex_task_data *ctd1 = NULL;
  1207. struct complex_task_data *ctd2 = NULL;
  1208. enum ast_test_result_state res = AST_TEST_FAIL;
  1209. struct test_listener_data *tld = NULL;
  1210. struct ast_threadpool_options options = {
  1211. .version = AST_THREADPOOL_OPTIONS_VERSION,
  1212. .idle_timeout = 0,
  1213. .auto_increment = 0,
  1214. .initial_size = 0,
  1215. .max_size = 0,
  1216. };
  1217. switch (cmd) {
  1218. case TEST_INIT:
  1219. info->name = "task_distribution";
  1220. info->category = "/main/threadpool/";
  1221. info->summary = "Test that tasks are evenly distributed to threads";
  1222. info->description =
  1223. "Push two tasks into a threadpool. Ensure that each is handled by\n"
  1224. "a separate thread";
  1225. return AST_TEST_NOT_RUN;
  1226. case TEST_EXECUTE:
  1227. break;
  1228. }
  1229. tld = test_alloc();
  1230. if (!tld) {
  1231. return AST_TEST_FAIL;
  1232. }
  1233. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  1234. if (!listener) {
  1235. goto end;
  1236. }
  1237. pool = ast_threadpool_create(info->name, listener, &options);
  1238. if (!pool) {
  1239. goto end;
  1240. }
  1241. ctd1 = complex_task_data_alloc();
  1242. ctd2 = complex_task_data_alloc();
  1243. if (!ctd1 || !ctd2) {
  1244. goto end;
  1245. }
  1246. if (ast_threadpool_push(pool, complex_task, ctd1)) {
  1247. goto end;
  1248. }
  1249. if (ast_threadpool_push(pool, complex_task, ctd2)) {
  1250. goto end;
  1251. }
  1252. ast_threadpool_set_size(pool, 2);
  1253. res = wait_until_thread_state(test, tld, 2, 0);
  1254. if (res == AST_TEST_FAIL) {
  1255. goto end;
  1256. }
  1257. res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
  1258. if (res == AST_TEST_FAIL) {
  1259. goto end;
  1260. }
  1261. /* The tasks are stalled until we poke them */
  1262. poke_worker(ctd1);
  1263. poke_worker(ctd2);
  1264. res = wait_for_complex_completion(ctd1);
  1265. if (res == AST_TEST_FAIL) {
  1266. goto end;
  1267. }
  1268. res = wait_for_complex_completion(ctd2);
  1269. if (res == AST_TEST_FAIL) {
  1270. goto end;
  1271. }
  1272. res = wait_until_thread_state(test, tld, 0, 2);
  1273. if (res == AST_TEST_FAIL) {
  1274. goto end;
  1275. }
  1276. res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
  1277. end:
  1278. ast_threadpool_shutdown(pool);
  1279. ao2_cleanup(listener);
  1280. complex_task_data_free(ctd1);
  1281. complex_task_data_free(ctd2);
  1282. ast_free(tld);
  1283. return res;
  1284. }
  1285. AST_TEST_DEFINE(threadpool_more_destruction)
  1286. {
  1287. struct ast_threadpool *pool = NULL;
  1288. struct ast_threadpool_listener *listener = NULL;
  1289. struct complex_task_data *ctd1 = NULL;
  1290. struct complex_task_data *ctd2 = NULL;
  1291. enum ast_test_result_state res = AST_TEST_FAIL;
  1292. struct test_listener_data *tld = NULL;
  1293. struct ast_threadpool_options options = {
  1294. .version = AST_THREADPOOL_OPTIONS_VERSION,
  1295. .idle_timeout = 0,
  1296. .auto_increment = 0,
  1297. .initial_size = 0,
  1298. .max_size = 0,
  1299. };
  1300. switch (cmd) {
  1301. case TEST_INIT:
  1302. info->name = "more_destruction";
  1303. info->category = "/main/threadpool/";
  1304. info->summary = "Test that threads are destroyed as expected";
  1305. info->description =
  1306. "Push two tasks into a threadpool. Set the threadpool size to 4\n"
  1307. "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
  1308. "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
  1309. "and ensure that both tasks complete.";
  1310. return AST_TEST_NOT_RUN;
  1311. case TEST_EXECUTE:
  1312. break;
  1313. }
  1314. tld = test_alloc();
  1315. if (!tld) {
  1316. return AST_TEST_FAIL;
  1317. }
  1318. listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
  1319. if (!listener) {
  1320. goto end;
  1321. }
  1322. pool = ast_threadpool_create(info->name, listener, &options);
  1323. if (!pool) {
  1324. goto end;
  1325. }
  1326. ctd1 = complex_task_data_alloc();
  1327. ctd2 = complex_task_data_alloc();
  1328. if (!ctd1 || !ctd2) {
  1329. goto end;
  1330. }
  1331. if (ast_threadpool_push(pool, complex_task, ctd1)) {
  1332. goto end;
  1333. }
  1334. if (ast_threadpool_push(pool, complex_task, ctd2)) {
  1335. goto end;
  1336. }
  1337. ast_threadpool_set_size(pool, 4);
  1338. res = wait_until_thread_state(test, tld, 2, 2);
  1339. if (res == AST_TEST_FAIL) {
  1340. goto end;
  1341. }
  1342. res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
  1343. if (res == AST_TEST_FAIL) {
  1344. goto end;
  1345. }
  1346. ast_threadpool_set_size(pool, 1);
  1347. /* Shrinking the threadpool should kill off the two idle threads
  1348. * and one of the active threads.
  1349. */
  1350. res = wait_until_thread_state(test, tld, 1, 0);
  1351. if (res == AST_TEST_FAIL) {
  1352. goto end;
  1353. }
  1354. res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
  1355. if (res == AST_TEST_FAIL) {
  1356. goto end;
  1357. }
  1358. /* The tasks are stalled until we poke them */
  1359. poke_worker(ctd1);
  1360. poke_worker(ctd2);
  1361. res = wait_for_complex_completion(ctd1);
  1362. if (res == AST_TEST_FAIL) {
  1363. goto end;
  1364. }
  1365. res = wait_for_complex_completion(ctd2);
  1366. if (res == AST_TEST_FAIL) {
  1367. goto end;
  1368. }
  1369. res = wait_until_thread_state(test, tld, 0, 1);
  1370. if (res == AST_TEST_FAIL) {
  1371. goto end;
  1372. }
  1373. res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
  1374. end:
  1375. ast_threadpool_shutdown(pool);
  1376. ao2_cleanup(listener);
  1377. complex_task_data_free(ctd1);
  1378. complex_task_data_free(ctd2);
  1379. ast_free(tld);
  1380. return res;
  1381. }
  1382. AST_TEST_DEFINE(threadpool_serializer)
  1383. {
  1384. int started = 0;
  1385. int finished = 0;
  1386. enum ast_test_result_state res = AST_TEST_FAIL;
  1387. struct ast_threadpool *pool = NULL;
  1388. struct ast_taskprocessor *uut = NULL;
  1389. struct complex_task_data *data1 = NULL;
  1390. struct complex_task_data *data2 = NULL;
  1391. struct complex_task_data *data3 = NULL;
  1392. struct ast_threadpool_options options = {
  1393. .version = AST_THREADPOOL_OPTIONS_VERSION,
  1394. .idle_timeout = 0,
  1395. .auto_increment = 0,
  1396. .initial_size = 2,
  1397. .max_size = 0,
  1398. };
  1399. switch (cmd) {
  1400. case TEST_INIT:
  1401. info->name = "threadpool_serializer";
  1402. info->category = "/main/threadpool/";
  1403. info->summary = "Test that serializers";
  1404. info->description =
  1405. "Ensures that tasks enqueued to a serialize execute in sequence.";
  1406. return AST_TEST_NOT_RUN;
  1407. case TEST_EXECUTE:
  1408. break;
  1409. }
  1410. pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
  1411. if (!pool) {
  1412. ast_test_status_update(test, "Could not create threadpool\n");
  1413. goto end;
  1414. }
  1415. uut = ast_threadpool_serializer("ser1", pool);
  1416. data1 = complex_task_data_alloc();
  1417. data2 = complex_task_data_alloc();
  1418. data3 = complex_task_data_alloc();
  1419. if (!uut || !data1 || !data2 || !data3) {
  1420. ast_test_status_update(test, "Allocation failed\n");
  1421. goto end;
  1422. }
  1423. /* This should start right away */
  1424. if (ast_taskprocessor_push(uut, complex_task, data1)) {
  1425. ast_test_status_update(test, "Failed to enqueue data1\n");
  1426. goto end;
  1427. }
  1428. started = wait_for_complex_start(data1);
  1429. if (!started) {
  1430. ast_test_status_update(test, "Failed to start data1\n");
  1431. goto end;
  1432. }
  1433. /* This should not start until data 1 is complete */
  1434. if (ast_taskprocessor_push(uut, complex_task, data2)) {
  1435. ast_test_status_update(test, "Failed to enqueue data2\n");
  1436. goto end;
  1437. }
  1438. started = has_complex_started(data2);
  1439. if (started) {
  1440. ast_test_status_update(test, "data2 started out of order\n");
  1441. goto end;
  1442. }
  1443. /* But the free thread in the pool can still run */
  1444. if (ast_threadpool_push(pool, complex_task, data3)) {
  1445. ast_test_status_update(test, "Failed to enqueue data3\n");
  1446. }
  1447. started = wait_for_complex_start(data3);
  1448. if (!started) {
  1449. ast_test_status_update(test, "Failed to start data3\n");
  1450. goto end;
  1451. }
  1452. /* Finishing data1 should allow data2 to start */
  1453. poke_worker(data1);
  1454. finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
  1455. if (!finished) {
  1456. ast_test_status_update(test, "data1 couldn't finish\n");
  1457. goto end;
  1458. }
  1459. started = wait_for_complex_start(data2);
  1460. if (!started) {
  1461. ast_test_status_update(test, "Failed to start data2\n");
  1462. goto end;
  1463. }
  1464. /* Finish up */
  1465. poke_worker(data2);
  1466. finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
  1467. if (!finished) {
  1468. ast_test_status_update(test, "data2 couldn't finish\n");
  1469. goto end;
  1470. }
  1471. poke_worker(data3);
  1472. finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
  1473. if (!finished) {
  1474. ast_test_status_update(test, "data3 couldn't finish\n");
  1475. goto end;
  1476. }
  1477. res = AST_TEST_PASS;
  1478. end:
  1479. poke_worker(data1);
  1480. poke_worker(data2);
  1481. poke_worker(data3);
  1482. ast_taskprocessor_unreference(uut);
  1483. ast_threadpool_shutdown(pool);
  1484. complex_task_data_free(data1);
  1485. complex_task_data_free(data2);
  1486. complex_task_data_free(data3);
  1487. return res;
  1488. }
  1489. AST_TEST_DEFINE(threadpool_serializer_dupe)
  1490. {
  1491. enum ast_test_result_state res = AST_TEST_FAIL;
  1492. struct ast_threadpool *pool = NULL;
  1493. struct ast_taskprocessor *uut = NULL;
  1494. struct ast_taskprocessor *there_can_be_only_one = NULL;
  1495. struct ast_threadpool_options options = {
  1496. .version = AST_THREADPOOL_OPTIONS_VERSION,
  1497. .idle_timeout = 0,
  1498. .auto_increment = 0,
  1499. .initial_size = 2,
  1500. .max_size = 0,
  1501. };
  1502. switch (cmd) {
  1503. case TEST_INIT:
  1504. info->name = "threadpool_serializer_dupe";
  1505. info->category = "/main/threadpool/";
  1506. info->summary = "Test that serializers are uniquely named";
  1507. info->description =
  1508. "Creating two serializers with the same name should\n"
  1509. "result in error.";
  1510. return AST_TEST_NOT_RUN;
  1511. case TEST_EXECUTE:
  1512. break;
  1513. }
  1514. pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
  1515. if (!pool) {
  1516. ast_test_status_update(test, "Could not create threadpool\n");
  1517. goto end;
  1518. }
  1519. uut = ast_threadpool_serializer("highlander", pool);
  1520. if (!uut) {
  1521. ast_test_status_update(test, "Allocation failed\n");
  1522. goto end;
  1523. }
  1524. there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
  1525. if (there_can_be_only_one) {
  1526. ast_taskprocessor_unreference(there_can_be_only_one);
  1527. ast_test_status_update(test, "Duplicate name error\n");
  1528. goto end;
  1529. }
  1530. res = AST_TEST_PASS;
  1531. end:
  1532. ast_taskprocessor_unreference(uut);
  1533. ast_threadpool_shutdown(pool);
  1534. return res;
  1535. }
  1536. static int unload_module(void)
  1537. {
  1538. ast_test_unregister(threadpool_push);
  1539. ast_test_unregister(threadpool_initial_threads);
  1540. ast_test_unregister(threadpool_thread_creation);
  1541. ast_test_unregister(threadpool_thread_destruction);
  1542. ast_test_unregister(threadpool_thread_timeout);
  1543. ast_test_unregister(threadpool_thread_timeout_thrash);
  1544. ast_test_unregister(threadpool_one_task_one_thread);
  1545. ast_test_unregister(threadpool_one_thread_one_task);
  1546. ast_test_unregister(threadpool_one_thread_multiple_tasks);
  1547. ast_test_unregister(threadpool_auto_increment);
  1548. ast_test_unregister(threadpool_max_size);
  1549. ast_test_unregister(threadpool_reactivation);
  1550. ast_test_unregister(threadpool_task_distribution);
  1551. ast_test_unregister(threadpool_more_destruction);
  1552. ast_test_unregister(threadpool_serializer);
  1553. ast_test_unregister(threadpool_serializer_dupe);
  1554. return 0;
  1555. }
  1556. static int load_module(void)
  1557. {
  1558. ast_test_register(threadpool_push);
  1559. ast_test_register(threadpool_initial_threads);
  1560. ast_test_register(threadpool_thread_creation);
  1561. ast_test_register(threadpool_thread_destruction);
  1562. ast_test_register(threadpool_thread_timeout);
  1563. ast_test_register(threadpool_thread_timeout_thrash);
  1564. ast_test_register(threadpool_one_task_one_thread);
  1565. ast_test_register(threadpool_one_thread_one_task);
  1566. ast_test_register(threadpool_one_thread_multiple_tasks);
  1567. ast_test_register(threadpool_auto_increment);
  1568. ast_test_register(threadpool_max_size);
  1569. ast_test_register(threadpool_reactivation);
  1570. ast_test_register(threadpool_task_distribution);
  1571. ast_test_register(threadpool_more_destruction);
  1572. ast_test_register(threadpool_serializer);
  1573. ast_test_register(threadpool_serializer_dupe);
  1574. return AST_MODULE_LOAD_SUCCESS;
  1575. }
  1576. AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");