test_stasis.c 86 KB


  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file \brief Test Stasis message bus.
  20. *
  21. * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
  22. *
  23. * \ingroup tests
  24. */
  25. /*** MODULEINFO
  26. <depend>TEST_FRAMEWORK</depend>
  27. <support_level>core</support_level>
  28. ***/
  29. #include "asterisk.h"
  30. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  31. #include "asterisk/astobj2.h"
  32. #include "asterisk/module.h"
  33. #include "asterisk/stasis.h"
  34. #include "asterisk/stasis_message_router.h"
  35. #include "asterisk/test.h"
  36. #define test_category "/stasis/core/"
  37. static struct ast_event *fake_event(struct stasis_message *message)
  38. {
  39. return ast_event_new(AST_EVENT_CUSTOM,
  40. AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
  41. }
  42. static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
  43. {
  44. const char *text = stasis_message_data(message);
  45. return ast_json_string_create(text);
  46. }
  47. static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
  48. {
  49. RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
  50. const char *text = stasis_message_data(message);
  51. res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
  52. "Message: %s\r\n", text);
  53. if (res == NULL) {
  54. return NULL;
  55. }
  56. ao2_ref(res, +1);
  57. return res;
  58. }
  59. static struct stasis_message_vtable fake_vtable = {
  60. .to_json = fake_json,
  61. .to_ami = fake_ami
  62. };
  63. AST_TEST_DEFINE(message_type)
  64. {
  65. RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
  66. switch (cmd) {
  67. case TEST_INIT:
  68. info->name = __func__;
  69. info->category = test_category;
  70. info->summary = "Test basic message_type functions";
  71. info->description = "Test basic message_type functions";
  72. return AST_TEST_NOT_RUN;
  73. case TEST_EXECUTE:
  74. break;
  75. }
  76. ast_test_validate(test, stasis_message_type_create(NULL, NULL, NULL) == STASIS_MESSAGE_TYPE_ERROR);
  77. ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
  78. ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
  79. return AST_TEST_PASS;
  80. }
  81. AST_TEST_DEFINE(message)
  82. {
  83. RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
  84. RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
  85. RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
  86. RAII_VAR(char *, data, NULL, ao2_cleanup);
  87. char *expected = "SomeData";
  88. struct timeval expected_timestamp;
  89. struct timeval time_diff;
  90. struct ast_eid foreign_eid;
  91. switch (cmd) {
  92. case TEST_INIT:
  93. info->name = __func__;
  94. info->category = test_category;
  95. info->summary = "Test basic message functions";
  96. info->description = "Test basic message functions";
  97. return AST_TEST_NOT_RUN;
  98. case TEST_EXECUTE:
  99. break;
  100. }
  101. memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
  102. ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
  103. ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
  104. ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
  105. data = ao2_alloc(strlen(expected) + 1, NULL);
  106. strcpy(data, expected);/* Safe */
  107. expected_timestamp = ast_tvnow();
  108. uut1 = stasis_message_create_full(type, data, &foreign_eid);
  109. uut2 = stasis_message_create_full(type, data, NULL);
  110. ast_test_validate(test, NULL != uut1);
  111. ast_test_validate(test, NULL != uut2);
  112. ast_test_validate(test, type == stasis_message_type(uut1));
  113. ast_test_validate(test, type == stasis_message_type(uut2));
  114. ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
  115. ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
  116. ast_test_validate(test, NULL != stasis_message_eid(uut1));
  117. ast_test_validate(test, NULL == stasis_message_eid(uut2));
  118. ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
  119. ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
  120. time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
  121. /* 10ms is certainly long enough for the two calls to complete */
  122. ast_test_validate(test, time_diff.tv_sec == 0);
  123. ast_test_validate(test, time_diff.tv_usec < 10000);
  124. ao2_ref(uut1, -1);
  125. uut1 = NULL;
  126. ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
  127. ao2_ref(uut2, -1);
  128. uut2 = NULL;
  129. ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
  130. return AST_TEST_PASS;
  131. }
  132. struct consumer {
  133. ast_cond_t out;
  134. struct stasis_message **messages_rxed;
  135. size_t messages_rxed_len;
  136. int ignore_subscriptions;
  137. int complete;
  138. };
  139. static void consumer_dtor(void *obj)
  140. {
  141. struct consumer *consumer = obj;
  142. ast_cond_destroy(&consumer->out);
  143. while (consumer->messages_rxed_len > 0) {
  144. ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
  145. }
  146. ast_free(consumer->messages_rxed);
  147. consumer->messages_rxed = NULL;
  148. }
  149. static struct consumer *consumer_create(int ignore_subscriptions)
  150. {
  151. struct consumer *consumer;
  152. consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
  153. if (!consumer) {
  154. return NULL;
  155. }
  156. consumer->ignore_subscriptions = ignore_subscriptions;
  157. consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
  158. if (!consumer->messages_rxed) {
  159. ao2_cleanup(consumer);
  160. return NULL;
  161. }
  162. ast_cond_init(&consumer->out, NULL);
  163. return consumer;
  164. }
  165. static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
  166. {
  167. struct consumer *consumer = data;
  168. RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
  169. SCOPED_AO2LOCK(lock, consumer);
  170. if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
  171. ++consumer->messages_rxed_len;
  172. consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
  173. ast_assert(consumer->messages_rxed != NULL);
  174. consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
  175. ao2_ref(message, +1);
  176. }
  177. if (stasis_subscription_final_message(sub, message)) {
  178. consumer->complete = 1;
  179. consumer_needs_cleanup = consumer;
  180. }
  181. ast_cond_signal(&consumer->out);
  182. }
  183. static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
  184. {
  185. struct consumer *consumer = data;
  186. RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
  187. SCOPED_AO2LOCK(lock, consumer);
  188. if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
  189. ++consumer->messages_rxed_len;
  190. consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
  191. ast_assert(consumer->messages_rxed != NULL);
  192. consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
  193. ao2_ref(message, +1);
  194. }
  195. if (stasis_subscription_final_message(sub, message)) {
  196. consumer->complete = 1;
  197. consumer_needs_cleanup = consumer;
  198. }
  199. }
  200. static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
  201. {
  202. struct timeval start = ast_tvnow();
  203. struct timespec end = {
  204. .tv_sec = start.tv_sec + 30,
  205. .tv_nsec = start.tv_usec * 1000
  206. };
  207. SCOPED_AO2LOCK(lock, consumer);
  208. while (consumer->messages_rxed_len < expected_len) {
  209. int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
  210. if (r == ETIMEDOUT) {
  211. break;
  212. }
  213. ast_assert(r == 0); /* Not expecting any othet types of errors */
  214. }
  215. return consumer->messages_rxed_len;
  216. }
  217. static int consumer_wait_for_completion(struct consumer *consumer)
  218. {
  219. struct timeval start = ast_tvnow();
  220. struct timespec end = {
  221. .tv_sec = start.tv_sec + 3,
  222. .tv_nsec = start.tv_usec * 1000
  223. };
  224. SCOPED_AO2LOCK(lock, consumer);
  225. while (!consumer->complete) {
  226. int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
  227. if (r == ETIMEDOUT) {
  228. break;
  229. }
  230. ast_assert(r == 0); /* Not expecting any othet types of errors */
  231. }
  232. return consumer->complete;
  233. }
  234. static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
  235. {
  236. struct timeval start = ast_tvnow();
  237. struct timeval diff = {
  238. .tv_sec = 0,
  239. .tv_usec = 100000 /* wait for 100ms */
  240. };
  241. struct timeval end_tv = ast_tvadd(start, diff);
  242. struct timespec end = {
  243. .tv_sec = end_tv.tv_sec,
  244. .tv_nsec = end_tv.tv_usec * 1000
  245. };
  246. SCOPED_AO2LOCK(lock, consumer);
  247. while (consumer->messages_rxed_len == expected_len) {
  248. int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
  249. if (r == ETIMEDOUT) {
  250. break;
  251. }
  252. ast_assert(r == 0); /* Not expecting any othet types of errors */
  253. }
  254. return consumer->messages_rxed_len;
  255. }
  256. AST_TEST_DEFINE(subscription_messages)
  257. {
  258. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  259. RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
  260. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  261. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  262. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  263. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  264. RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
  265. int complete;
  266. struct stasis_subscription_change *change;
  267. switch (cmd) {
  268. case TEST_INIT:
  269. info->name = __func__;
  270. info->category = test_category;
  271. info->summary = "Test subscribe/unsubscribe messages";
  272. info->description = "Test subscribe/unsubscribe messages";
  273. return AST_TEST_NOT_RUN;
  274. case TEST_EXECUTE:
  275. break;
  276. }
  277. topic = stasis_topic_create("TestTopic");
  278. ast_test_validate(test, NULL != topic);
  279. consumer = consumer_create(0);
  280. ast_test_validate(test, NULL != consumer);
  281. uut = stasis_subscribe(topic, consumer_exec, consumer);
  282. ast_test_validate(test, NULL != uut);
  283. ao2_ref(consumer, +1);
  284. expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
  285. uut = stasis_unsubscribe(uut);
  286. complete = consumer_wait_for_completion(consumer);
  287. ast_test_validate(test, 1 == complete);
  288. ast_test_validate(test, 2 == consumer->messages_rxed_len);
  289. ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
  290. ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
  291. change = stasis_message_data(consumer->messages_rxed[0]);
  292. ast_test_validate(test, topic == change->topic);
  293. ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
  294. ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
  295. change = stasis_message_data(consumer->messages_rxed[1]);
  296. ast_test_validate(test, topic == change->topic);
  297. ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
  298. ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
  299. return AST_TEST_PASS;
  300. }
  301. AST_TEST_DEFINE(subscription_pool_messages)
  302. {
  303. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  304. RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
  305. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  306. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  307. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  308. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  309. RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
  310. int complete;
  311. struct stasis_subscription_change *change;
  312. switch (cmd) {
  313. case TEST_INIT:
  314. info->name = __func__;
  315. info->category = test_category;
  316. info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
  317. info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
  318. return AST_TEST_NOT_RUN;
  319. case TEST_EXECUTE:
  320. break;
  321. }
  322. topic = stasis_topic_create("TestTopic");
  323. ast_test_validate(test, NULL != topic);
  324. consumer = consumer_create(0);
  325. ast_test_validate(test, NULL != consumer);
  326. uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
  327. ast_test_validate(test, NULL != uut);
  328. ao2_ref(consumer, +1);
  329. expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
  330. uut = stasis_unsubscribe(uut);
  331. complete = consumer_wait_for_completion(consumer);
  332. ast_test_validate(test, 1 == complete);
  333. ast_test_validate(test, 2 == consumer->messages_rxed_len);
  334. ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
  335. ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
  336. change = stasis_message_data(consumer->messages_rxed[0]);
  337. ast_test_validate(test, topic == change->topic);
  338. ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
  339. ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
  340. change = stasis_message_data(consumer->messages_rxed[1]);
  341. ast_test_validate(test, topic == change->topic);
  342. ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
  343. ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
  344. return AST_TEST_PASS;
  345. }
  346. AST_TEST_DEFINE(publish)
  347. {
  348. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  349. RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
  350. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  351. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  352. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  353. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  354. int actual_len;
  355. const char *actual;
  356. switch (cmd) {
  357. case TEST_INIT:
  358. info->name = __func__;
  359. info->category = test_category;
  360. info->summary = "Test publishing";
  361. info->description = "Test publishing";
  362. return AST_TEST_NOT_RUN;
  363. case TEST_EXECUTE:
  364. break;
  365. }
  366. topic = stasis_topic_create("TestTopic");
  367. ast_test_validate(test, NULL != topic);
  368. consumer = consumer_create(1);
  369. ast_test_validate(test, NULL != consumer);
  370. uut = stasis_subscribe(topic, consumer_exec, consumer);
  371. ast_test_validate(test, NULL != uut);
  372. ao2_ref(consumer, +1);
  373. test_data = ao2_alloc(1, NULL);
  374. ast_test_validate(test, NULL != test_data);
  375. ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  376. test_message = stasis_message_create(test_message_type, test_data);
  377. stasis_publish(topic, test_message);
  378. actual_len = consumer_wait_for(consumer, 1);
  379. ast_test_validate(test, 1 == actual_len);
  380. actual = stasis_message_data(consumer->messages_rxed[0]);
  381. ast_test_validate(test, test_data == actual);
  382. return AST_TEST_PASS;
  383. }
  384. AST_TEST_DEFINE(publish_sync)
  385. {
  386. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  387. RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
  388. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  389. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  390. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  391. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  392. int actual_len;
  393. const char *actual;
  394. switch (cmd) {
  395. case TEST_INIT:
  396. info->name = __func__;
  397. info->category = test_category;
  398. info->summary = "Test synchronous publishing";
  399. info->description = "Test synchronous publishing";
  400. return AST_TEST_NOT_RUN;
  401. case TEST_EXECUTE:
  402. break;
  403. }
  404. topic = stasis_topic_create("TestTopic");
  405. ast_test_validate(test, NULL != topic);
  406. consumer = consumer_create(1);
  407. ast_test_validate(test, NULL != consumer);
  408. uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
  409. ast_test_validate(test, NULL != uut);
  410. ao2_ref(consumer, +1);
  411. test_data = ao2_alloc(1, NULL);
  412. ast_test_validate(test, NULL != test_data);
  413. ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  414. test_message = stasis_message_create(test_message_type, test_data);
  415. stasis_publish_sync(uut, test_message);
  416. actual_len = consumer->messages_rxed_len;
  417. ast_test_validate(test, 1 == actual_len);
  418. actual = stasis_message_data(consumer->messages_rxed[0]);
  419. ast_test_validate(test, test_data == actual);
  420. return AST_TEST_PASS;
  421. }
  422. AST_TEST_DEFINE(publish_pool)
  423. {
  424. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  425. RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
  426. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  427. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  428. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  429. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  430. int actual_len;
  431. const char *actual;
  432. switch (cmd) {
  433. case TEST_INIT:
  434. info->name = __func__;
  435. info->category = test_category;
  436. info->summary = "Test publishing with a threadpool";
  437. info->description = "Test publishing to a subscriber whose\n"
  438. "subscription dictates messages are received through a\n"
  439. "threadpool.";
  440. return AST_TEST_NOT_RUN;
  441. case TEST_EXECUTE:
  442. break;
  443. }
  444. topic = stasis_topic_create("TestTopic");
  445. ast_test_validate(test, NULL != topic);
  446. consumer = consumer_create(1);
  447. ast_test_validate(test, NULL != consumer);
  448. uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
  449. ast_test_validate(test, NULL != uut);
  450. ao2_ref(consumer, +1);
  451. test_data = ao2_alloc(1, NULL);
  452. ast_test_validate(test, NULL != test_data);
  453. ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  454. test_message = stasis_message_create(test_message_type, test_data);
  455. stasis_publish(topic, test_message);
  456. actual_len = consumer_wait_for(consumer, 1);
  457. ast_test_validate(test, 1 == actual_len);
  458. actual = stasis_message_data(consumer->messages_rxed[0]);
  459. ast_test_validate(test, test_data == actual);
  460. return AST_TEST_PASS;
  461. }
  462. AST_TEST_DEFINE(unsubscribe_stops_messages)
  463. {
  464. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  465. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  466. RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
  467. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  468. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  469. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  470. int actual_len;
  471. switch (cmd) {
  472. case TEST_INIT:
  473. info->name = __func__;
  474. info->category = test_category;
  475. info->summary = "Test simple subscriptions";
  476. info->description = "Test simple subscriptions";
  477. return AST_TEST_NOT_RUN;
  478. case TEST_EXECUTE:
  479. break;
  480. }
  481. topic = stasis_topic_create("TestTopic");
  482. ast_test_validate(test, NULL != topic);
  483. consumer = consumer_create(1);
  484. ast_test_validate(test, NULL != consumer);
  485. uut = stasis_subscribe(topic, consumer_exec, consumer);
  486. ast_test_validate(test, NULL != uut);
  487. ao2_ref(consumer, +1);
  488. uut = stasis_unsubscribe(uut);
  489. test_data = ao2_alloc(1, NULL);
  490. ast_test_validate(test, NULL != test_data);
  491. ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  492. test_message = stasis_message_create(test_message_type, test_data);
  493. stasis_publish(topic, test_message);
  494. actual_len = consumer_should_stay(consumer, 0);
  495. ast_test_validate(test, 0 == actual_len);
  496. return AST_TEST_PASS;
  497. }
  498. AST_TEST_DEFINE(forward)
  499. {
  500. RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
  501. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  502. RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
  503. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  504. RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
  505. RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
  506. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  507. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  508. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  509. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  510. int actual_len;
  511. switch (cmd) {
  512. case TEST_INIT:
  513. info->name = __func__;
  514. info->category = test_category;
  515. info->summary = "Test sending events to a parent topic";
  516. info->description = "Test sending events to a parent topic.\n"
  517. "This test creates three topics (one parent, two children)\n"
  518. "and publishes a message to one child, and verifies it's\n"
  519. "only seen by that child and the parent";
  520. return AST_TEST_NOT_RUN;
  521. case TEST_EXECUTE:
  522. break;
  523. }
  524. parent_topic = stasis_topic_create("ParentTestTopic");
  525. ast_test_validate(test, NULL != parent_topic);
  526. topic = stasis_topic_create("TestTopic");
  527. ast_test_validate(test, NULL != topic);
  528. forward_sub = stasis_forward_all(topic, parent_topic);
  529. ast_test_validate(test, NULL != forward_sub);
  530. parent_consumer = consumer_create(1);
  531. ast_test_validate(test, NULL != parent_consumer);
  532. consumer = consumer_create(1);
  533. ast_test_validate(test, NULL != consumer);
  534. parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
  535. ast_test_validate(test, NULL != parent_sub);
  536. ao2_ref(parent_consumer, +1);
  537. sub = stasis_subscribe(topic, consumer_exec, consumer);
  538. ast_test_validate(test, NULL != sub);
  539. ao2_ref(consumer, +1);
  540. test_data = ao2_alloc(1, NULL);
  541. ast_test_validate(test, NULL != test_data);
  542. ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  543. test_message = stasis_message_create(test_message_type, test_data);
  544. stasis_publish(topic, test_message);
  545. actual_len = consumer_wait_for(consumer, 1);
  546. ast_test_validate(test, 1 == actual_len);
  547. actual_len = consumer_wait_for(parent_consumer, 1);
  548. ast_test_validate(test, 1 == actual_len);
  549. return AST_TEST_PASS;
  550. }
  551. AST_TEST_DEFINE(interleaving)
  552. {
  553. RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
  554. RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
  555. RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
  556. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  557. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  558. RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
  559. RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
  560. RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
  561. RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
  562. RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
  563. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  564. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  565. int actual_len;
  566. switch (cmd) {
  567. case TEST_INIT:
  568. info->name = __func__;
  569. info->category = test_category;
  570. info->summary = "Test sending interleaved events to a parent topic";
  571. info->description = "Test sending events to a parent topic.\n"
  572. "This test creates three topics (one parent, two children)\n"
  573. "and publishes messages alternately between the children.\n"
  574. "It verifies that the messages are received in the expected\n"
  575. "order.";
  576. return AST_TEST_NOT_RUN;
  577. case TEST_EXECUTE:
  578. break;
  579. }
  580. ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  581. ast_test_validate(test, NULL != test_message_type);
  582. test_data = ao2_alloc(1, NULL);
  583. ast_test_validate(test, NULL != test_data);
  584. test_message1 = stasis_message_create(test_message_type, test_data);
  585. ast_test_validate(test, NULL != test_message1);
  586. test_message2 = stasis_message_create(test_message_type, test_data);
  587. ast_test_validate(test, NULL != test_message2);
  588. test_message3 = stasis_message_create(test_message_type, test_data);
  589. ast_test_validate(test, NULL != test_message3);
  590. parent_topic = stasis_topic_create("ParentTestTopic");
  591. ast_test_validate(test, NULL != parent_topic);
  592. topic1 = stasis_topic_create("Topic1");
  593. ast_test_validate(test, NULL != topic1);
  594. topic2 = stasis_topic_create("Topic2");
  595. ast_test_validate(test, NULL != topic2);
  596. forward_sub1 = stasis_forward_all(topic1, parent_topic);
  597. ast_test_validate(test, NULL != forward_sub1);
  598. forward_sub2 = stasis_forward_all(topic2, parent_topic);
  599. ast_test_validate(test, NULL != forward_sub2);
  600. consumer = consumer_create(1);
  601. ast_test_validate(test, NULL != consumer);
  602. sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
  603. ast_test_validate(test, NULL != sub);
  604. ao2_ref(consumer, +1);
  605. stasis_publish(topic1, test_message1);
  606. stasis_publish(topic2, test_message2);
  607. stasis_publish(topic1, test_message3);
  608. actual_len = consumer_wait_for(consumer, 3);
  609. ast_test_validate(test, 3 == actual_len);
  610. ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
  611. ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
  612. ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
  613. return AST_TEST_PASS;
  614. }
  615. AST_TEST_DEFINE(subscription_interleaving)
  616. {
  617. RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
  618. RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
  619. RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
  620. RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
  621. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  622. RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
  623. RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
  624. RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
  625. RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
  626. RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
  627. RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
  628. RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
  629. RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
  630. RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
  631. int actual_len;
  632. switch (cmd) {
  633. case TEST_INIT:
  634. info->name = __func__;
  635. info->category = test_category;
  636. info->summary = "Test sending interleaved events to a parent topic with different subscribers";
  637. info->description = "Test sending events to a parent topic.\n"
  638. "This test creates three topics (one parent, two children)\n"
  639. "and publishes messages alternately between the children.\n"
  640. "It verifies that the messages are received in the expected\n"
  641. "order, for different subscription types: one with a dedicated\n"
  642. "thread, the other on the Stasis threadpool.";
  643. return AST_TEST_NOT_RUN;
  644. case TEST_EXECUTE:
  645. break;
  646. }
  647. ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  648. ast_test_validate(test, NULL != test_message_type);
  649. test_data = ao2_alloc(1, NULL);
  650. ast_test_validate(test, NULL != test_data);
  651. test_message1 = stasis_message_create(test_message_type, test_data);
  652. ast_test_validate(test, NULL != test_message1);
  653. test_message2 = stasis_message_create(test_message_type, test_data);
  654. ast_test_validate(test, NULL != test_message2);
  655. test_message3 = stasis_message_create(test_message_type, test_data);
  656. ast_test_validate(test, NULL != test_message3);
  657. parent_topic = stasis_topic_create("ParentTestTopic");
  658. ast_test_validate(test, NULL != parent_topic);
  659. topic1 = stasis_topic_create("Topic1");
  660. ast_test_validate(test, NULL != topic1);
  661. topic2 = stasis_topic_create("Topic2");
  662. ast_test_validate(test, NULL != topic2);
  663. forward_sub1 = stasis_forward_all(topic1, parent_topic);
  664. ast_test_validate(test, NULL != forward_sub1);
  665. forward_sub2 = stasis_forward_all(topic2, parent_topic);
  666. ast_test_validate(test, NULL != forward_sub2);
  667. consumer1 = consumer_create(1);
  668. ast_test_validate(test, NULL != consumer1);
  669. consumer2 = consumer_create(1);
  670. ast_test_validate(test, NULL != consumer2);
  671. sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
  672. ast_test_validate(test, NULL != sub1);
  673. ao2_ref(consumer1, +1);
  674. sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
  675. ast_test_validate(test, NULL != sub2);
  676. ao2_ref(consumer2, +1);
  677. stasis_publish(topic1, test_message1);
  678. stasis_publish(topic2, test_message2);
  679. stasis_publish(topic1, test_message3);
  680. actual_len = consumer_wait_for(consumer1, 3);
  681. ast_test_validate(test, 3 == actual_len);
  682. actual_len = consumer_wait_for(consumer2, 3);
  683. ast_test_validate(test, 3 == actual_len);
  684. ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
  685. ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
  686. ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
  687. ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
  688. ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
  689. ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
  690. return AST_TEST_PASS;
  691. }
  692. struct cache_test_data {
  693. char *id;
  694. char *value;
  695. };
  696. static void cache_test_data_dtor(void *obj)
  697. {
  698. struct cache_test_data *data = obj;
  699. ast_free(data->id);
  700. ast_free(data->value);
  701. }
  702. static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
  703. {
  704. RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
  705. data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
  706. if (data == NULL) {
  707. return NULL;
  708. }
  709. ast_assert(name != NULL);
  710. ast_assert(value != NULL);
  711. data->id = ast_strdup(name);
  712. data->value = ast_strdup(value);
  713. if (!data->id || !data->value) {
  714. return NULL;
  715. }
  716. return stasis_message_create_full(type, data, eid);
  717. }
  718. static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
  719. {
  720. return cache_test_message_create_full(type, name, value, &ast_eid_default);
  721. }
  722. static const char *cache_test_data_id(struct stasis_message *message)
  723. {
  724. struct cache_test_data *cachable = stasis_message_data(message);
  725. if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
  726. return NULL;
  727. }
  728. return cachable->id;
  729. }
  730. static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
  731. {
  732. struct stasis_message *aggregate_snapshot;
  733. struct stasis_message *snapshot;
  734. struct stasis_message_type *type = NULL;
  735. struct cache_test_data *test_data = NULL;
  736. int idx;
  737. int accumulated = 0;
  738. char aggregate_str[30];
  739. /* Accumulate the aggregate value. */
  740. snapshot = stasis_cache_entry_get_local(entry);
  741. if (snapshot) {
  742. type = stasis_message_type(snapshot);
  743. test_data = stasis_message_data(snapshot);
  744. accumulated += atoi(test_data->value);
  745. }
  746. for (idx = 0; ; ++idx) {
  747. snapshot = stasis_cache_entry_get_remote(entry, idx);
  748. if (!snapshot) {
  749. break;
  750. }
  751. type = stasis_message_type(snapshot);
  752. test_data = stasis_message_data(snapshot);
  753. accumulated += atoi(test_data->value);
  754. }
  755. if (!test_data) {
  756. /* There are no test entries cached. Delete the aggregate. */
  757. return NULL;
  758. }
  759. snapshot = stasis_cache_entry_get_aggregate(entry);
  760. if (snapshot) {
  761. type = stasis_message_type(snapshot);
  762. test_data = stasis_message_data(snapshot);
  763. if (accumulated == atoi(test_data->value)) {
  764. /* Aggregate test entry did not change. */
  765. return ao2_bump(snapshot);
  766. }
  767. }
  768. snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
  769. aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
  770. if (!aggregate_snapshot) {
  771. /* Bummer. We have to keep the old aggregate snapshot. */
  772. ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
  773. return ao2_bump(snapshot);
  774. }
  775. return aggregate_snapshot;
  776. }
  777. static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
  778. {
  779. stasis_publish(topic, aggregate);
  780. }
  781. static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
  782. {
  783. RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
  784. struct cache_test_data *test_data;
  785. aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
  786. if (!aggregate) {
  787. /* No aggregate, return true if given no value. */
  788. return !value;
  789. }
  790. /* Return true if the given value matches the aggregate value. */
  791. test_data = stasis_message_data(aggregate);
  792. return value && !strcmp(value, test_data->value);
  793. }
  794. AST_TEST_DEFINE(cache_filter)
  795. {
  796. RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
  797. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  798. RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
  799. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
  800. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  801. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  802. RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
  803. int actual_len;
  804. switch (cmd) {
  805. case TEST_INIT:
  806. info->name = __func__;
  807. info->category = test_category;
  808. info->summary = "Test caching topics only forward cache_update messages.";
  809. info->description = "Test caching topics only forward cache_update messages.";
  810. return AST_TEST_NOT_RUN;
  811. case TEST_EXECUTE:
  812. break;
  813. }
  814. ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  815. ast_test_validate(test, NULL != non_cache_type);
  816. topic = stasis_topic_create("SomeTopic");
  817. ast_test_validate(test, NULL != topic);
  818. cache = stasis_cache_create(cache_test_data_id);
  819. ast_test_validate(test, NULL != cache);
  820. caching_topic = stasis_caching_topic_create(topic, cache);
  821. ast_test_validate(test, NULL != caching_topic);
  822. consumer = consumer_create(1);
  823. ast_test_validate(test, NULL != consumer);
  824. sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
  825. ast_test_validate(test, NULL != sub);
  826. ao2_ref(consumer, +1);
  827. test_message = cache_test_message_create(non_cache_type, "1", "1");
  828. ast_test_validate(test, NULL != test_message);
  829. stasis_publish(topic, test_message);
  830. actual_len = consumer_should_stay(consumer, 0);
  831. ast_test_validate(test, 0 == actual_len);
  832. return AST_TEST_PASS;
  833. }
  834. AST_TEST_DEFINE(cache)
  835. {
  836. RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
  837. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  838. RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
  839. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
  840. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  841. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  842. RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
  843. RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
  844. RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
  845. RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
  846. int actual_len;
  847. struct stasis_cache_update *actual_update;
  848. switch (cmd) {
  849. case TEST_INIT:
  850. info->name = __func__;
  851. info->category = test_category;
  852. info->summary = "Test passing messages through cache topic unscathed.";
  853. info->description = "Test passing messages through cache topic unscathed.";
  854. return AST_TEST_NOT_RUN;
  855. case TEST_EXECUTE:
  856. break;
  857. }
  858. ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  859. ast_test_validate(test, NULL != cache_type);
  860. topic = stasis_topic_create("SomeTopic");
  861. ast_test_validate(test, NULL != topic);
  862. cache = stasis_cache_create(cache_test_data_id);
  863. ast_test_validate(test, NULL != cache);
  864. caching_topic = stasis_caching_topic_create(topic, cache);
  865. ast_test_validate(test, NULL != caching_topic);
  866. consumer = consumer_create(1);
  867. ast_test_validate(test, NULL != consumer);
  868. sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
  869. ast_test_validate(test, NULL != sub);
  870. ao2_ref(consumer, +1);
  871. test_message1_1 = cache_test_message_create(cache_type, "1", "1");
  872. ast_test_validate(test, NULL != test_message1_1);
  873. test_message2_1 = cache_test_message_create(cache_type, "2", "1");
  874. ast_test_validate(test, NULL != test_message2_1);
  875. /* Post a couple of snapshots */
  876. stasis_publish(topic, test_message1_1);
  877. stasis_publish(topic, test_message2_1);
  878. actual_len = consumer_wait_for(consumer, 2);
  879. ast_test_validate(test, 2 == actual_len);
  880. /* Check for new snapshot messages */
  881. ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
  882. actual_update = stasis_message_data(consumer->messages_rxed[0]);
  883. ast_test_validate(test, NULL == actual_update->old_snapshot);
  884. ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
  885. ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
  886. /* stasis_cache_get returned a ref, so unref test_message1_1 */
  887. ao2_ref(test_message1_1, -1);
  888. ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
  889. actual_update = stasis_message_data(consumer->messages_rxed[1]);
  890. ast_test_validate(test, NULL == actual_update->old_snapshot);
  891. ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
  892. ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
  893. /* stasis_cache_get returned a ref, so unref test_message2_1 */
  894. ao2_ref(test_message2_1, -1);
  895. /* Update snapshot 2 */
  896. test_message2_2 = cache_test_message_create(cache_type, "2", "2");
  897. ast_test_validate(test, NULL != test_message2_2);
  898. stasis_publish(topic, test_message2_2);
  899. actual_len = consumer_wait_for(consumer, 3);
  900. ast_test_validate(test, 3 == actual_len);
  901. actual_update = stasis_message_data(consumer->messages_rxed[2]);
  902. ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
  903. ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
  904. ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
  905. /* stasis_cache_get returned a ref, so unref test_message2_2 */
  906. ao2_ref(test_message2_2, -1);
  907. /* Clear snapshot 1 */
  908. test_message1_clear = stasis_cache_clear_create(test_message1_1);
  909. ast_test_validate(test, NULL != test_message1_clear);
  910. stasis_publish(topic, test_message1_clear);
  911. actual_len = consumer_wait_for(consumer, 4);
  912. ast_test_validate(test, 4 == actual_len);
  913. actual_update = stasis_message_data(consumer->messages_rxed[3]);
  914. ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
  915. ast_test_validate(test, NULL == actual_update->new_snapshot);
  916. ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
  917. return AST_TEST_PASS;
  918. }
  919. AST_TEST_DEFINE(cache_dump)
  920. {
  921. RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
  922. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  923. RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
  924. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
  925. RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
  926. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  927. RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
  928. RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
  929. RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
  930. RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
  931. RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
  932. int actual_len;
  933. struct ao2_iterator i;
  934. void *obj;
  935. switch (cmd) {
  936. case TEST_INIT:
  937. info->name = __func__;
  938. info->category = test_category;
  939. info->summary = "Test cache dump routines.";
  940. info->description = "Test cache dump routines.";
  941. return AST_TEST_NOT_RUN;
  942. case TEST_EXECUTE:
  943. break;
  944. }
  945. ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  946. ast_test_validate(test, NULL != cache_type);
  947. topic = stasis_topic_create("SomeTopic");
  948. ast_test_validate(test, NULL != topic);
  949. cache = stasis_cache_create(cache_test_data_id);
  950. ast_test_validate(test, NULL != cache);
  951. caching_topic = stasis_caching_topic_create(topic, cache);
  952. ast_test_validate(test, NULL != caching_topic);
  953. consumer = consumer_create(1);
  954. ast_test_validate(test, NULL != consumer);
  955. sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
  956. ast_test_validate(test, NULL != sub);
  957. ao2_ref(consumer, +1);
  958. test_message1_1 = cache_test_message_create(cache_type, "1", "1");
  959. ast_test_validate(test, NULL != test_message1_1);
  960. test_message2_1 = cache_test_message_create(cache_type, "2", "1");
  961. ast_test_validate(test, NULL != test_message2_1);
  962. /* Post a couple of snapshots */
  963. stasis_publish(topic, test_message1_1);
  964. stasis_publish(topic, test_message2_1);
  965. actual_len = consumer_wait_for(consumer, 2);
  966. ast_test_validate(test, 2 == actual_len);
  967. /* Check the cache */
  968. ao2_cleanup(cache_dump);
  969. cache_dump = stasis_cache_dump(cache, NULL);
  970. ast_test_validate(test, NULL != cache_dump);
  971. ast_test_validate(test, 2 == ao2_container_count(cache_dump));
  972. i = ao2_iterator_init(cache_dump, 0);
  973. while ((obj = ao2_iterator_next(&i))) {
  974. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  975. ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
  976. }
  977. ao2_iterator_destroy(&i);
  978. /* Update snapshot 2 */
  979. test_message2_2 = cache_test_message_create(cache_type, "2", "2");
  980. ast_test_validate(test, NULL != test_message2_2);
  981. stasis_publish(topic, test_message2_2);
  982. actual_len = consumer_wait_for(consumer, 3);
  983. ast_test_validate(test, 3 == actual_len);
  984. /* Check the cache */
  985. ao2_cleanup(cache_dump);
  986. cache_dump = stasis_cache_dump(cache, NULL);
  987. ast_test_validate(test, NULL != cache_dump);
  988. ast_test_validate(test, 2 == ao2_container_count(cache_dump));
  989. i = ao2_iterator_init(cache_dump, 0);
  990. while ((obj = ao2_iterator_next(&i))) {
  991. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  992. ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
  993. }
  994. ao2_iterator_destroy(&i);
  995. /* Clear snapshot 1 */
  996. test_message1_clear = stasis_cache_clear_create(test_message1_1);
  997. ast_test_validate(test, NULL != test_message1_clear);
  998. stasis_publish(topic, test_message1_clear);
  999. actual_len = consumer_wait_for(consumer, 4);
  1000. ast_test_validate(test, 4 == actual_len);
  1001. /* Check the cache */
  1002. ao2_cleanup(cache_dump);
  1003. cache_dump = stasis_cache_dump(cache, NULL);
  1004. ast_test_validate(test, NULL != cache_dump);
  1005. ast_test_validate(test, 1 == ao2_container_count(cache_dump));
  1006. i = ao2_iterator_init(cache_dump, 0);
  1007. while ((obj = ao2_iterator_next(&i))) {
  1008. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1009. ast_test_validate(test, actual_cache_entry == test_message2_2);
  1010. }
  1011. ao2_iterator_destroy(&i);
  1012. /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
  1013. ao2_cleanup(cache_dump);
  1014. cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
  1015. ast_test_validate(test, 0 == ao2_container_count(cache_dump));
  1016. return AST_TEST_PASS;
  1017. }
  1018. AST_TEST_DEFINE(cache_eid_aggregate)
  1019. {
  1020. RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
  1021. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  1022. RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
  1023. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
  1024. RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
  1025. RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
  1026. RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
  1027. RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
  1028. RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
  1029. RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
  1030. RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
  1031. RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
  1032. RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
  1033. RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
  1034. RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
  1035. RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
  1036. int actual_len;
  1037. struct ao2_iterator i;
  1038. void *obj;
  1039. struct ast_eid foreign_eid1;
  1040. struct ast_eid foreign_eid2;
  1041. switch (cmd) {
  1042. case TEST_INIT:
  1043. info->name = __func__;
  1044. info->category = test_category;
  1045. info->summary = "Test cache eid and aggregate support.";
  1046. info->description = "Test cache eid and aggregate support.";
  1047. return AST_TEST_NOT_RUN;
  1048. case TEST_EXECUTE:
  1049. break;
  1050. }
  1051. memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
  1052. memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
  1053. ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
  1054. ast_test_validate(test, NULL != cache_type);
  1055. topic = stasis_topic_create("SomeTopic");
  1056. ast_test_validate(test, NULL != topic);
  1057. /* To consume events published to the topic. */
  1058. topic_consumer = consumer_create(1);
  1059. ast_test_validate(test, NULL != topic_consumer);
  1060. topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
  1061. ast_test_validate(test, NULL != topic_sub);
  1062. ao2_ref(topic_consumer, +1);
  1063. cache = stasis_cache_create_full(cache_test_data_id,
  1064. cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
  1065. ast_test_validate(test, NULL != cache);
  1066. caching_topic = stasis_caching_topic_create(topic, cache);
  1067. ast_test_validate(test, NULL != caching_topic);
  1068. /* To consume update events published to the caching_topic. */
  1069. cache_consumer = consumer_create(1);
  1070. ast_test_validate(test, NULL != cache_consumer);
  1071. cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
  1072. ast_test_validate(test, NULL != cache_sub);
  1073. ao2_ref(cache_consumer, +1);
  1074. /* Create test messages. */
  1075. test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
  1076. ast_test_validate(test, NULL != test_message1_1);
  1077. test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
  1078. ast_test_validate(test, NULL != test_message2_1);
  1079. test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
  1080. ast_test_validate(test, NULL != test_message2_2);
  1081. test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
  1082. ast_test_validate(test, NULL != test_message2_3);
  1083. test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
  1084. ast_test_validate(test, NULL != test_message2_4);
  1085. /* Post some snapshots */
  1086. stasis_publish(topic, test_message1_1);
  1087. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
  1088. stasis_publish(topic, test_message2_1);
  1089. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
  1090. stasis_publish(topic, test_message2_2);
  1091. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
  1092. actual_len = consumer_wait_for(cache_consumer, 6);
  1093. ast_test_validate(test, 6 == actual_len);
  1094. actual_len = consumer_wait_for(topic_consumer, 6);
  1095. ast_test_validate(test, 6 == actual_len);
  1096. /* Check the cache */
  1097. ao2_cleanup(cache_dump);
  1098. cache_dump = stasis_cache_dump_all(cache, NULL);
  1099. ast_test_validate(test, NULL != cache_dump);
  1100. ast_test_validate(test, 3 == ao2_container_count(cache_dump));
  1101. i = ao2_iterator_init(cache_dump, 0);
  1102. while ((obj = ao2_iterator_next(&i))) {
  1103. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1104. ast_test_validate(test,
  1105. actual_cache_entry == test_message1_1
  1106. || actual_cache_entry == test_message2_1
  1107. || actual_cache_entry == test_message2_2);
  1108. }
  1109. ao2_iterator_destroy(&i);
  1110. /* Check the local cached items */
  1111. ao2_cleanup(cache_dump);
  1112. cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
  1113. ast_test_validate(test, NULL != cache_dump);
  1114. ast_test_validate(test, 2 == ao2_container_count(cache_dump));
  1115. i = ao2_iterator_init(cache_dump, 0);
  1116. while ((obj = ao2_iterator_next(&i))) {
  1117. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1118. ast_test_validate(test,
  1119. actual_cache_entry == test_message1_1
  1120. || actual_cache_entry == test_message2_1);
  1121. }
  1122. ao2_iterator_destroy(&i);
  1123. /* Post snapshot 2 from another eid. */
  1124. stasis_publish(topic, test_message2_3);
  1125. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
  1126. actual_len = consumer_wait_for(cache_consumer, 8);
  1127. ast_test_validate(test, 8 == actual_len);
  1128. actual_len = consumer_wait_for(topic_consumer, 8);
  1129. ast_test_validate(test, 8 == actual_len);
  1130. /* Check the cache */
  1131. ao2_cleanup(cache_dump);
  1132. cache_dump = stasis_cache_dump_all(cache, NULL);
  1133. ast_test_validate(test, NULL != cache_dump);
  1134. ast_test_validate(test, 4 == ao2_container_count(cache_dump));
  1135. i = ao2_iterator_init(cache_dump, 0);
  1136. while ((obj = ao2_iterator_next(&i))) {
  1137. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1138. ast_test_validate(test,
  1139. actual_cache_entry == test_message1_1
  1140. || actual_cache_entry == test_message2_1
  1141. || actual_cache_entry == test_message2_2
  1142. || actual_cache_entry == test_message2_3);
  1143. }
  1144. ao2_iterator_destroy(&i);
  1145. /* Check the remote cached items */
  1146. ao2_cleanup(cache_dump);
  1147. cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
  1148. ast_test_validate(test, NULL != cache_dump);
  1149. ast_test_validate(test, 1 == ao2_container_count(cache_dump));
  1150. i = ao2_iterator_init(cache_dump, 0);
  1151. while ((obj = ao2_iterator_next(&i))) {
  1152. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1153. ast_test_validate(test, actual_cache_entry == test_message2_2);
  1154. }
  1155. ao2_iterator_destroy(&i);
  1156. /* Post snapshot 2 from a repeated eid. */
  1157. stasis_publish(topic, test_message2_4);
  1158. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
  1159. actual_len = consumer_wait_for(cache_consumer, 10);
  1160. ast_test_validate(test, 10 == actual_len);
  1161. actual_len = consumer_wait_for(topic_consumer, 10);
  1162. ast_test_validate(test, 10 == actual_len);
  1163. /* Check the cache */
  1164. ao2_cleanup(cache_dump);
  1165. cache_dump = stasis_cache_dump_all(cache, NULL);
  1166. ast_test_validate(test, NULL != cache_dump);
  1167. ast_test_validate(test, 4 == ao2_container_count(cache_dump));
  1168. i = ao2_iterator_init(cache_dump, 0);
  1169. while ((obj = ao2_iterator_next(&i))) {
  1170. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1171. ast_test_validate(test,
  1172. actual_cache_entry == test_message1_1
  1173. || actual_cache_entry == test_message2_1
  1174. || actual_cache_entry == test_message2_2
  1175. || actual_cache_entry == test_message2_4);
  1176. }
  1177. ao2_iterator_destroy(&i);
  1178. /* Check all snapshot 2 cache entries. */
  1179. ao2_cleanup(cache_dump);
  1180. cache_dump = stasis_cache_get_all(cache, cache_type, "2");
  1181. ast_test_validate(test, NULL != cache_dump);
  1182. ast_test_validate(test, 3 == ao2_container_count(cache_dump));
  1183. i = ao2_iterator_init(cache_dump, 0);
  1184. while ((obj = ao2_iterator_next(&i))) {
  1185. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1186. ast_test_validate(test,
  1187. actual_cache_entry == test_message2_1
  1188. || actual_cache_entry == test_message2_2
  1189. || actual_cache_entry == test_message2_4);
  1190. }
  1191. ao2_iterator_destroy(&i);
  1192. /* Clear snapshot 1 */
  1193. test_message1_clear = stasis_cache_clear_create(test_message1_1);
  1194. ast_test_validate(test, NULL != test_message1_clear);
  1195. stasis_publish(topic, test_message1_clear);
  1196. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
  1197. actual_len = consumer_wait_for(cache_consumer, 12);
  1198. ast_test_validate(test, 12 == actual_len);
  1199. actual_len = consumer_wait_for(topic_consumer, 11);
  1200. ast_test_validate(test, 11 == actual_len);
  1201. /* Check the cache */
  1202. ao2_cleanup(cache_dump);
  1203. cache_dump = stasis_cache_dump_all(cache, NULL);
  1204. ast_test_validate(test, NULL != cache_dump);
  1205. ast_test_validate(test, 3 == ao2_container_count(cache_dump));
  1206. i = ao2_iterator_init(cache_dump, 0);
  1207. while ((obj = ao2_iterator_next(&i))) {
  1208. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1209. ast_test_validate(test,
  1210. actual_cache_entry == test_message2_1
  1211. || actual_cache_entry == test_message2_2
  1212. || actual_cache_entry == test_message2_4);
  1213. }
  1214. ao2_iterator_destroy(&i);
  1215. /* Clear snapshot 2 from a remote eid */
  1216. test_message2_clear = stasis_cache_clear_create(test_message2_2);
  1217. ast_test_validate(test, NULL != test_message2_clear);
  1218. stasis_publish(topic, test_message2_clear);
  1219. ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
  1220. actual_len = consumer_wait_for(cache_consumer, 14);
  1221. ast_test_validate(test, 14 == actual_len);
  1222. actual_len = consumer_wait_for(topic_consumer, 13);
  1223. ast_test_validate(test, 13 == actual_len);
  1224. /* Check the cache */
  1225. ao2_cleanup(cache_dump);
  1226. cache_dump = stasis_cache_dump_all(cache, NULL);
  1227. ast_test_validate(test, NULL != cache_dump);
  1228. ast_test_validate(test, 2 == ao2_container_count(cache_dump));
  1229. i = ao2_iterator_init(cache_dump, 0);
  1230. while ((obj = ao2_iterator_next(&i))) {
  1231. RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
  1232. ast_test_validate(test,
  1233. actual_cache_entry == test_message2_1
  1234. || actual_cache_entry == test_message2_4);
  1235. }
  1236. ao2_iterator_destroy(&i);
  1237. return AST_TEST_PASS;
  1238. }
  1239. AST_TEST_DEFINE(router)
  1240. {
  1241. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  1242. RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
  1243. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  1244. RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
  1245. RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
  1246. RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
  1247. RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
  1248. RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
  1249. RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
  1250. RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
  1251. RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
  1252. RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
  1253. int actual_len, ret;
  1254. struct stasis_message *actual;
  1255. switch (cmd) {
  1256. case TEST_INIT:
  1257. info->name = __func__;
  1258. info->category = test_category;
  1259. info->summary = "Test simple message routing";
  1260. info->description = "Test simple message routing";
  1261. return AST_TEST_NOT_RUN;
  1262. case TEST_EXECUTE:
  1263. break;
  1264. }
  1265. topic = stasis_topic_create("TestTopic");
  1266. ast_test_validate(test, NULL != topic);
  1267. consumer1 = consumer_create(1);
  1268. ast_test_validate(test, NULL != consumer1);
  1269. consumer2 = consumer_create(1);
  1270. ast_test_validate(test, NULL != consumer2);
  1271. consumer3 = consumer_create(1);
  1272. ast_test_validate(test, NULL != consumer3);
  1273. ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
  1274. ast_test_validate(test, NULL != test_message_type1);
  1275. ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
  1276. ast_test_validate(test, NULL != test_message_type2);
  1277. ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
  1278. ast_test_validate(test, NULL != test_message_type3);
  1279. uut = stasis_message_router_create(topic);
  1280. ast_test_validate(test, NULL != uut);
  1281. ret = stasis_message_router_add(
  1282. uut, test_message_type1, consumer_exec, consumer1);
  1283. ast_test_validate(test, 0 == ret);
  1284. ao2_ref(consumer1, +1);
  1285. ret = stasis_message_router_add(
  1286. uut, test_message_type2, consumer_exec, consumer2);
  1287. ast_test_validate(test, 0 == ret);
  1288. ao2_ref(consumer2, +1);
  1289. ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
  1290. ast_test_validate(test, 0 == ret);
  1291. ao2_ref(consumer3, +1);
  1292. test_data = ao2_alloc(1, NULL);
  1293. ast_test_validate(test, NULL != test_data);
  1294. test_message1 = stasis_message_create(test_message_type1, test_data);
  1295. ast_test_validate(test, NULL != test_message1);
  1296. test_message2 = stasis_message_create(test_message_type2, test_data);
  1297. ast_test_validate(test, NULL != test_message2);
  1298. test_message3 = stasis_message_create(test_message_type3, test_data);
  1299. ast_test_validate(test, NULL != test_message3);
  1300. stasis_publish(topic, test_message1);
  1301. stasis_publish(topic, test_message2);
  1302. stasis_publish(topic, test_message3);
  1303. actual_len = consumer_wait_for(consumer1, 1);
  1304. ast_test_validate(test, 1 == actual_len);
  1305. actual_len = consumer_wait_for(consumer2, 1);
  1306. ast_test_validate(test, 1 == actual_len);
  1307. actual_len = consumer_wait_for(consumer3, 1);
  1308. ast_test_validate(test, 1 == actual_len);
  1309. actual = consumer1->messages_rxed[0];
  1310. ast_test_validate(test, test_message1 == actual);
  1311. actual = consumer2->messages_rxed[0];
  1312. ast_test_validate(test, test_message2 == actual);
  1313. actual = consumer3->messages_rxed[0];
  1314. ast_test_validate(test, test_message3 == actual);
  1315. /* consumer1 and consumer2 do not get the final message. */
  1316. ao2_cleanup(consumer1);
  1317. ao2_cleanup(consumer2);
  1318. return AST_TEST_PASS;
  1319. }
  1320. AST_TEST_DEFINE(router_pool)
  1321. {
  1322. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  1323. RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
  1324. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  1325. RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
  1326. RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
  1327. RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
  1328. RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
  1329. RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
  1330. RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
  1331. RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
  1332. RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
  1333. RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
  1334. int actual_len, ret;
  1335. struct stasis_message *actual;
  1336. switch (cmd) {
  1337. case TEST_INIT:
  1338. info->name = __func__;
  1339. info->category = test_category;
  1340. info->summary = "Test message routing via threadpool";
  1341. info->description = "Test simple message routing when\n"
  1342. "the subscriptions dictate usage of the Stasis\n"
  1343. "threadpool.";
  1344. return AST_TEST_NOT_RUN;
  1345. case TEST_EXECUTE:
  1346. break;
  1347. }
  1348. topic = stasis_topic_create("TestTopic");
  1349. ast_test_validate(test, NULL != topic);
  1350. consumer1 = consumer_create(1);
  1351. ast_test_validate(test, NULL != consumer1);
  1352. consumer2 = consumer_create(1);
  1353. ast_test_validate(test, NULL != consumer2);
  1354. consumer3 = consumer_create(1);
  1355. ast_test_validate(test, NULL != consumer3);
  1356. ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
  1357. ast_test_validate(test, NULL != test_message_type1);
  1358. ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
  1359. ast_test_validate(test, NULL != test_message_type2);
  1360. ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
  1361. ast_test_validate(test, NULL != test_message_type3);
  1362. uut = stasis_message_router_create_pool(topic);
  1363. ast_test_validate(test, NULL != uut);
  1364. ret = stasis_message_router_add(
  1365. uut, test_message_type1, consumer_exec, consumer1);
  1366. ast_test_validate(test, 0 == ret);
  1367. ao2_ref(consumer1, +1);
  1368. ret = stasis_message_router_add(
  1369. uut, test_message_type2, consumer_exec, consumer2);
  1370. ast_test_validate(test, 0 == ret);
  1371. ao2_ref(consumer2, +1);
  1372. ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
  1373. ast_test_validate(test, 0 == ret);
  1374. ao2_ref(consumer3, +1);
  1375. test_data = ao2_alloc(1, NULL);
  1376. ast_test_validate(test, NULL != test_data);
  1377. test_message1 = stasis_message_create(test_message_type1, test_data);
  1378. ast_test_validate(test, NULL != test_message1);
  1379. test_message2 = stasis_message_create(test_message_type2, test_data);
  1380. ast_test_validate(test, NULL != test_message2);
  1381. test_message3 = stasis_message_create(test_message_type3, test_data);
  1382. ast_test_validate(test, NULL != test_message3);
  1383. stasis_publish(topic, test_message1);
  1384. stasis_publish(topic, test_message2);
  1385. stasis_publish(topic, test_message3);
  1386. actual_len = consumer_wait_for(consumer1, 1);
  1387. ast_test_validate(test, 1 == actual_len);
  1388. actual_len = consumer_wait_for(consumer2, 1);
  1389. ast_test_validate(test, 1 == actual_len);
  1390. actual_len = consumer_wait_for(consumer3, 1);
  1391. ast_test_validate(test, 1 == actual_len);
  1392. actual = consumer1->messages_rxed[0];
  1393. ast_test_validate(test, test_message1 == actual);
  1394. actual = consumer2->messages_rxed[0];
  1395. ast_test_validate(test, test_message2 == actual);
  1396. actual = consumer3->messages_rxed[0];
  1397. ast_test_validate(test, test_message3 == actual);
  1398. /* consumer1 and consumer2 do not get the final message. */
  1399. ao2_cleanup(consumer1);
  1400. ao2_cleanup(consumer2);
  1401. return AST_TEST_PASS;
  1402. }
  1403. static const char *cache_simple(struct stasis_message *message)
  1404. {
  1405. const char *type_name =
  1406. stasis_message_type_name(stasis_message_type(message));
  1407. if (!ast_begins_with(type_name, "Cache")) {
  1408. return NULL;
  1409. }
  1410. return "cached";
  1411. }
  1412. AST_TEST_DEFINE(router_cache_updates)
  1413. {
  1414. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  1415. RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
  1416. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
  1417. RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
  1418. RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
  1419. RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
  1420. RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
  1421. RAII_VAR(char *, test_data, NULL, ao2_cleanup);
  1422. RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
  1423. RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
  1424. RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
  1425. RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
  1426. RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
  1427. RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
  1428. RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
  1429. RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
  1430. struct stasis_cache_update *update;
  1431. int actual_len, ret;
  1432. struct stasis_message *actual;
  1433. switch (cmd) {
  1434. case TEST_INIT:
  1435. info->name = __func__;
  1436. info->category = test_category;
  1437. info->summary = "Test special handling cache_update messages";
  1438. info->description = "Test special handling cache_update messages";
  1439. return AST_TEST_NOT_RUN;
  1440. case TEST_EXECUTE:
  1441. break;
  1442. }
  1443. topic = stasis_topic_create("TestTopic");
  1444. ast_test_validate(test, NULL != topic);
  1445. cache = stasis_cache_create(cache_simple);
  1446. ast_test_validate(test, NULL != cache);
  1447. caching_topic = stasis_caching_topic_create(topic, cache);
  1448. ast_test_validate(test, NULL != caching_topic);
  1449. consumer1 = consumer_create(1);
  1450. ast_test_validate(test, NULL != consumer1);
  1451. consumer2 = consumer_create(1);
  1452. ast_test_validate(test, NULL != consumer2);
  1453. consumer3 = consumer_create(1);
  1454. ast_test_validate(test, NULL != consumer3);
  1455. ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
  1456. ast_test_validate(test, NULL != test_message_type1);
  1457. ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
  1458. ast_test_validate(test, NULL != test_message_type2);
  1459. ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
  1460. ast_test_validate(test, NULL != test_message_type3);
  1461. uut = stasis_message_router_create(
  1462. stasis_caching_get_topic(caching_topic));
  1463. ast_test_validate(test, NULL != uut);
  1464. ret = stasis_message_router_add_cache_update(
  1465. uut, test_message_type1, consumer_exec, consumer1);
  1466. ast_test_validate(test, 0 == ret);
  1467. ao2_ref(consumer1, +1);
  1468. ret = stasis_message_router_add(
  1469. uut, stasis_cache_update_type(), consumer_exec, consumer2);
  1470. ast_test_validate(test, 0 == ret);
  1471. ao2_ref(consumer2, +1);
  1472. ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
  1473. ast_test_validate(test, 0 == ret);
  1474. ao2_ref(consumer3, +1);
  1475. test_data = ao2_alloc(1, NULL);
  1476. ast_test_validate(test, NULL != test_data);
  1477. test_message1 = stasis_message_create(test_message_type1, test_data);
  1478. ast_test_validate(test, NULL != test_message1);
  1479. test_message2 = stasis_message_create(test_message_type2, test_data);
  1480. ast_test_validate(test, NULL != test_message2);
  1481. test_message3 = stasis_message_create(test_message_type3, test_data);
  1482. ast_test_validate(test, NULL != test_message3);
  1483. stasis_publish(topic, test_message1);
  1484. stasis_publish(topic, test_message2);
  1485. stasis_publish(topic, test_message3);
  1486. actual_len = consumer_wait_for(consumer1, 1);
  1487. ast_test_validate(test, 1 == actual_len);
  1488. actual_len = consumer_wait_for(consumer2, 1);
  1489. ast_test_validate(test, 1 == actual_len);
  1490. /* Uncacheable message should not be passed through */
  1491. actual_len = consumer_should_stay(consumer3, 0);
  1492. ast_test_validate(test, 0 == actual_len);
  1493. actual = consumer1->messages_rxed[0];
  1494. ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
  1495. update = stasis_message_data(actual);
  1496. ast_test_validate(test, test_message_type1 == update->type);
  1497. ast_test_validate(test, test_message1 == update->new_snapshot);
  1498. actual = consumer2->messages_rxed[0];
  1499. ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
  1500. update = stasis_message_data(actual);
  1501. ast_test_validate(test, test_message_type2 == update->type);
  1502. ast_test_validate(test, test_message2 == update->new_snapshot);
  1503. /* consumer1 and consumer2 do not get the final message. */
  1504. ao2_cleanup(consumer1);
  1505. ao2_cleanup(consumer2);
  1506. return AST_TEST_PASS;
  1507. }
  1508. AST_TEST_DEFINE(no_to_json)
  1509. {
  1510. RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
  1511. RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
  1512. RAII_VAR(char *, data, NULL, ao2_cleanup);
  1513. RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
  1514. char *expected = "SomeData";
  1515. switch (cmd) {
  1516. case TEST_INIT:
  1517. info->name = __func__;
  1518. info->category = test_category;
  1519. info->summary = "Test message to_json function";
  1520. info->description = "Test message to_json function";
  1521. return AST_TEST_NOT_RUN;
  1522. case TEST_EXECUTE:
  1523. break;
  1524. }
  1525. /* Test NULL */
  1526. actual = stasis_message_to_json(NULL, NULL);
  1527. ast_test_validate(test, NULL == actual);
  1528. /* Test message with NULL to_json function */
  1529. ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
  1530. data = ao2_alloc(strlen(expected) + 1, NULL);
  1531. strcpy(data, expected);
  1532. uut = stasis_message_create(type, data);
  1533. ast_test_validate(test, NULL != uut);
  1534. actual = stasis_message_to_json(uut, NULL);
  1535. ast_test_validate(test, NULL == actual);
  1536. return AST_TEST_PASS;
  1537. }
  1538. AST_TEST_DEFINE(to_json)
  1539. {
  1540. RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
  1541. RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
  1542. RAII_VAR(char *, data, NULL, ao2_cleanup);
  1543. RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
  1544. const char *expected_text = "SomeData";
  1545. RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
  1546. switch (cmd) {
  1547. case TEST_INIT:
  1548. info->name = __func__;
  1549. info->category = test_category;
  1550. info->summary = "Test message to_json function when NULL";
  1551. info->description = "Test message to_json function when NULL";
  1552. return AST_TEST_NOT_RUN;
  1553. case TEST_EXECUTE:
  1554. break;
  1555. }
  1556. ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
  1557. data = ao2_alloc(strlen(expected_text) + 1, NULL);
  1558. strcpy(data, expected_text);
  1559. uut = stasis_message_create(type, data);
  1560. ast_test_validate(test, NULL != uut);
  1561. expected = ast_json_string_create(expected_text);
  1562. actual = stasis_message_to_json(uut, NULL);
  1563. ast_test_validate(test, ast_json_equal(expected, actual));
  1564. return AST_TEST_PASS;
  1565. }
  1566. AST_TEST_DEFINE(no_to_ami)
  1567. {
  1568. RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
  1569. RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
  1570. RAII_VAR(char *, data, NULL, ao2_cleanup);
  1571. RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
  1572. char *expected = "SomeData";
  1573. switch (cmd) {
  1574. case TEST_INIT:
  1575. info->name = __func__;
  1576. info->category = test_category;
  1577. info->summary = "Test message to_ami function when NULL";
  1578. info->description = "Test message to_ami function when NULL";
  1579. return AST_TEST_NOT_RUN;
  1580. case TEST_EXECUTE:
  1581. break;
  1582. }
  1583. /* Test NULL */
  1584. actual = stasis_message_to_ami(NULL);
  1585. ast_test_validate(test, NULL == actual);
  1586. /* Test message with NULL to_ami function */
  1587. ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
  1588. data = ao2_alloc(strlen(expected) + 1, NULL);
  1589. strcpy(data, expected);
  1590. uut = stasis_message_create(type, data);
  1591. ast_test_validate(test, NULL != uut);
  1592. actual = stasis_message_to_ami(uut);
  1593. ast_test_validate(test, NULL == actual);
  1594. return AST_TEST_PASS;
  1595. }
  1596. AST_TEST_DEFINE(to_ami)
  1597. {
  1598. RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
  1599. RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
  1600. RAII_VAR(char *, data, NULL, ao2_cleanup);
  1601. RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
  1602. const char *expected_text = "SomeData";
  1603. const char *expected = "Message: SomeData\r\n";
  1604. switch (cmd) {
  1605. case TEST_INIT:
  1606. info->name = __func__;
  1607. info->category = test_category;
  1608. info->summary = "Test message to_ami function";
  1609. info->description = "Test message to_ami function";
  1610. return AST_TEST_NOT_RUN;
  1611. case TEST_EXECUTE:
  1612. break;
  1613. }
  1614. ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
  1615. data = ao2_alloc(strlen(expected_text) + 1, NULL);
  1616. strcpy(data, expected_text);
  1617. uut = stasis_message_create(type, data);
  1618. ast_test_validate(test, NULL != uut);
  1619. actual = stasis_message_to_ami(uut);
  1620. ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
  1621. return AST_TEST_PASS;
  1622. }
  1623. static void noop(void *data, struct stasis_subscription *sub,
  1624. struct stasis_message *message)
  1625. {
  1626. /* no-op */
  1627. }
  1628. AST_TEST_DEFINE(dtor_order)
  1629. {
  1630. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  1631. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  1632. switch (cmd) {
  1633. case TEST_INIT:
  1634. info->name = __func__;
  1635. info->category = test_category;
  1636. info->summary = "Test that destruction order doesn't bomb stuff";
  1637. info->description = "Test that destruction order doesn't bomb stuff";
  1638. return AST_TEST_NOT_RUN;
  1639. case TEST_EXECUTE:
  1640. break;
  1641. }
  1642. topic = stasis_topic_create("test-topic");
  1643. ast_test_validate(test, NULL != topic);
  1644. sub = stasis_subscribe(topic, noop, NULL);
  1645. ast_test_validate(test, NULL != sub);
  1646. /* With any luck, this won't completely blow everything up */
  1647. ao2_cleanup(topic);
  1648. stasis_unsubscribe(sub);
  1649. /* These refs were cleaned up manually */
  1650. topic = NULL;
  1651. sub = NULL;
  1652. return AST_TEST_PASS;
  1653. }
  1654. static const char *noop_get_id(struct stasis_message *message)
  1655. {
  1656. return NULL;
  1657. }
  1658. AST_TEST_DEFINE(caching_dtor_order)
  1659. {
  1660. RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
  1661. RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
  1662. RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
  1663. stasis_caching_unsubscribe);
  1664. RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
  1665. switch (cmd) {
  1666. case TEST_INIT:
  1667. info->name = __func__;
  1668. info->category = test_category;
  1669. info->summary = "Test that destruction order doesn't bomb stuff";
  1670. info->description = "Test that destruction order doesn't bomb stuff";
  1671. return AST_TEST_NOT_RUN;
  1672. case TEST_EXECUTE:
  1673. break;
  1674. }
  1675. cache = stasis_cache_create(noop_get_id);
  1676. ast_test_validate(test, NULL != cache);
  1677. topic = stasis_topic_create("test-topic");
  1678. ast_test_validate(test, NULL != topic);
  1679. caching_topic = stasis_caching_topic_create(topic, cache);
  1680. ast_test_validate(test, NULL != caching_topic);
  1681. sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
  1682. NULL);
  1683. ast_test_validate(test, NULL != sub);
  1684. /* With any luck, this won't completely blow everything up */
  1685. ao2_cleanup(cache);
  1686. ao2_cleanup(topic);
  1687. stasis_caching_unsubscribe(caching_topic);
  1688. stasis_unsubscribe(sub);
  1689. /* These refs were cleaned up manually */
  1690. cache = NULL;
  1691. topic = NULL;
  1692. caching_topic = NULL;
  1693. sub = NULL;
  1694. return AST_TEST_PASS;
  1695. }
  1696. struct test_message_types {
  1697. struct stasis_message_type *none;
  1698. struct stasis_message_type *ami;
  1699. struct stasis_message_type *json;
  1700. struct stasis_message_type *event;
  1701. struct stasis_message_type *amievent;
  1702. struct stasis_message_type *type1;
  1703. struct stasis_message_type *type2;
  1704. struct stasis_message_type *type3;
  1705. struct stasis_message_type *change;
  1706. };
  1707. static void destroy_message_types(void *obj)
  1708. {
  1709. struct test_message_types *types = obj;
  1710. ao2_cleanup(types->none);
  1711. ao2_cleanup(types->ami);
  1712. ao2_cleanup(types->json);
  1713. ao2_cleanup(types->event);
  1714. ao2_cleanup(types->amievent);
  1715. ao2_cleanup(types->type1);
  1716. ao2_cleanup(types->type2);
  1717. ao2_cleanup(types->type3);
  1718. /* N.B. Don't cleanup types->change! */
  1719. }
  1720. static struct test_message_types *create_message_types(struct ast_test *test)
  1721. {
  1722. struct stasis_message_vtable vtable = { 0 };
  1723. struct test_message_types *types;
  1724. enum ast_test_result_state __attribute__ ((unused)) rc;
  1725. types = ao2_alloc(sizeof(*types), destroy_message_types);
  1726. if (!types) {
  1727. return NULL;
  1728. }
  1729. ast_test_validate_cleanup(test,
  1730. stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
  1731. rc, cleanup);
  1732. vtable.to_ami = fake_ami;
  1733. ast_test_validate_cleanup(test,
  1734. stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
  1735. rc, cleanup);
  1736. vtable.to_ami = NULL;
  1737. vtable.to_json = fake_json;
  1738. ast_test_validate_cleanup(test,
  1739. stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
  1740. rc, cleanup);
  1741. vtable.to_ami = NULL;
  1742. vtable.to_json = NULL;
  1743. vtable.to_event = fake_event;
  1744. ast_test_validate_cleanup(test,
  1745. stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
  1746. rc, cleanup);
  1747. vtable.to_ami = fake_ami;
  1748. ast_test_validate_cleanup(test,
  1749. stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
  1750. rc, cleanup);
  1751. ast_test_validate_cleanup(test,
  1752. stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
  1753. rc, cleanup);
  1754. ast_test_validate_cleanup(test,
  1755. stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
  1756. rc, cleanup);
  1757. ast_test_validate_cleanup(test,
  1758. stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
  1759. rc, cleanup);
  1760. types->change = stasis_subscription_change_type();
  1761. return types;
  1762. cleanup:
  1763. ao2_cleanup(types);
  1764. return NULL;
  1765. }
  1766. struct cts {
  1767. struct consumer *consumer;
  1768. struct stasis_topic *topic;
  1769. struct stasis_subscription *sub;
  1770. };
  1771. static void destroy_cts(void *obj)
  1772. {
  1773. struct cts *c = obj;
  1774. stasis_unsubscribe(c->sub);
  1775. ao2_cleanup(c->topic);
  1776. ao2_cleanup(c->consumer);
  1777. }
  1778. static struct cts *create_cts(struct ast_test *test)
  1779. {
  1780. struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
  1781. enum ast_test_result_state __attribute__ ((unused)) rc;
  1782. ast_test_validate_cleanup(test, cts, rc, cleanup);
  1783. cts->topic = stasis_topic_create("TestTopic");
  1784. ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
  1785. cts->consumer = consumer_create(0);
  1786. ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
  1787. ao2_ref(cts->consumer, +1);
  1788. cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
  1789. ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
  1790. return cts;
  1791. cleanup:
  1792. ao2_cleanup(cts);
  1793. return NULL;
  1794. }
  1795. static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
  1796. {
  1797. struct stasis_subscription_change *msg_data = stasis_message_data(msg);
  1798. if (stasis_message_type(msg) != mtype) {
  1799. return 0;
  1800. }
  1801. if (data) {
  1802. return (strcmp(data, msg_data->description) == 0);
  1803. }
  1804. return 1;
  1805. }
  1806. static void dump_consumer(struct ast_test *test, struct cts *cts)
  1807. {
  1808. int i;
  1809. struct stasis_subscription_change *data;
  1810. ast_test_status_update(test, "Messages received: %ld Final? %s\n", cts->consumer->messages_rxed_len,
  1811. cts->consumer->complete ? "yes" : "no");
  1812. for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
  1813. data = stasis_message_data(cts->consumer->messages_rxed[i]);
  1814. ast_test_status_update(test, "Message type received: %s %s\n",
  1815. stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
  1816. data && data->description ? data->description : "no data");
  1817. }
  1818. }
  1819. static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
  1820. const char *data)
  1821. {
  1822. struct stasis_message *msg;
  1823. struct stasis_subscription_change *test_data =
  1824. ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
  1825. if (!test_data) {
  1826. return 0;
  1827. }
  1828. strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
  1829. msg = stasis_message_create(msg_type, test_data);
  1830. ao2_ref(test_data, -1);
  1831. if (!msg) {
  1832. ast_test_status_update(test, "Unable to create %s message\n",
  1833. stasis_message_type_name(msg_type));
  1834. return 0;
  1835. }
  1836. stasis_publish(cts->topic, msg);
  1837. ao2_ref(msg, -1);
  1838. return 1;
  1839. }
  1840. AST_TEST_DEFINE(type_filters)
  1841. {
  1842. RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
  1843. RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
  1844. int ix = 0;
  1845. switch (cmd) {
  1846. case TEST_INIT:
  1847. info->name = __func__;
  1848. info->category = test_category "filtering/";
  1849. info->summary = "Test message filtering by type";
  1850. info->description = "Test message filtering by type";
  1851. return AST_TEST_NOT_RUN;
  1852. case TEST_EXECUTE:
  1853. break;
  1854. }
  1855. types = create_message_types(test);
  1856. ast_test_validate(test, NULL != types);
  1857. cts = create_cts(test);
  1858. ast_test_validate(test, NULL != cts);
  1859. ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
  1860. ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
  1861. ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
  1862. ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
  1863. /* We should get these */
  1864. ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
  1865. ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
  1866. /* ... but not this one */
  1867. ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
  1868. /* Wait for change(subscribe) and "Pass" messages */
  1869. consumer_wait_for(cts->consumer, 3);
  1870. /* Remove type 1 */
  1871. ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
  1872. /* We should now NOT get this one */
  1873. ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
  1874. /* We should get this one (again) */
  1875. ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
  1876. /* We still should NOT get this one */
  1877. ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
  1878. /* We should now have a second type2 */
  1879. consumer_wait_for(cts->consumer, 4);
  1880. stasis_unsubscribe(cts->sub);
  1881. cts->sub = NULL;
  1882. consumer_wait_for_completion(cts->consumer);
  1883. dump_consumer(test, cts);
  1884. ast_test_validate(test, 1 == cts->consumer->complete);
  1885. ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
  1886. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
  1887. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
  1888. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
  1889. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
  1890. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
  1891. return AST_TEST_PASS;
  1892. }
  1893. AST_TEST_DEFINE(formatter_filters)
  1894. {
  1895. RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
  1896. RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
  1897. int ix = 0;
  1898. switch (cmd) {
  1899. case TEST_INIT:
  1900. info->name = __func__;
  1901. info->category = test_category "filtering/";
  1902. info->summary = "Test message filtering by formatter";
  1903. info->description = "Test message filtering by formatter";
  1904. return AST_TEST_NOT_RUN;
  1905. case TEST_EXECUTE:
  1906. break;
  1907. }
  1908. types = create_message_types(test);
  1909. ast_test_validate(test, NULL != types);
  1910. cts = create_cts(test);
  1911. ast_test_validate(test, NULL != cts);
  1912. stasis_subscription_accept_formatters(cts->sub,
  1913. STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
  1914. /* We should get these */
  1915. ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
  1916. ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
  1917. ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
  1918. /* ... but not these */
  1919. ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
  1920. ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
  1921. ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
  1922. /* Wait for change(subscribe) and the "Pass" messages */
  1923. consumer_wait_for(cts->consumer, 4);
  1924. /* Change the subscription to accept only event formatters */
  1925. stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT);
  1926. /* We should NOT get these now */
  1927. ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
  1928. ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
  1929. /* ... but we should still get this one */
  1930. ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
  1931. /* ... and this one should be new */
  1932. ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
  1933. /* We should now have a second amievent */
  1934. consumer_wait_for(cts->consumer, 6);
  1935. stasis_unsubscribe(cts->sub);
  1936. cts->sub = NULL;
  1937. consumer_wait_for_completion(cts->consumer);
  1938. dump_consumer(test, cts);
  1939. ast_test_validate(test, 1 == cts->consumer->complete);
  1940. ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
  1941. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
  1942. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
  1943. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
  1944. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
  1945. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
  1946. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
  1947. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
  1948. return AST_TEST_PASS;
  1949. }
  1950. AST_TEST_DEFINE(combo_filters)
  1951. {
  1952. RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
  1953. RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
  1954. int ix = 0;
  1955. switch (cmd) {
  1956. case TEST_INIT:
  1957. info->name = __func__;
  1958. info->category = test_category "filtering/";
  1959. info->summary = "Test message filtering by type and formatter";
  1960. info->description = "Test message filtering by type and formatter";
  1961. return AST_TEST_NOT_RUN;
  1962. case TEST_EXECUTE:
  1963. break;
  1964. }
  1965. types = create_message_types(test);
  1966. ast_test_validate(test, NULL != types);
  1967. cts = create_cts(test);
  1968. ast_test_validate(test, NULL != cts);
  1969. ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
  1970. ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
  1971. ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
  1972. ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
  1973. stasis_subscription_accept_formatters(cts->sub,
  1974. STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
  1975. /* We should get these */
  1976. ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
  1977. ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
  1978. ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
  1979. ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
  1980. ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
  1981. /* ... but not these */
  1982. ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
  1983. ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
  1984. /* Wait for change(subscribe) and the "Pass" messages */
  1985. consumer_wait_for(cts->consumer, 6);
  1986. stasis_unsubscribe(cts->sub);
  1987. cts->sub = NULL;
  1988. consumer_wait_for_completion(cts->consumer);
  1989. dump_consumer(test, cts);
  1990. ast_test_validate(test, 1 == cts->consumer->complete);
  1991. ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
  1992. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
  1993. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
  1994. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
  1995. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
  1996. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
  1997. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
  1998. ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
  1999. return AST_TEST_PASS;
  2000. }
  2001. static int unload_module(void)
  2002. {
  2003. AST_TEST_UNREGISTER(message_type);
  2004. AST_TEST_UNREGISTER(message);
  2005. AST_TEST_UNREGISTER(subscription_messages);
  2006. AST_TEST_UNREGISTER(subscription_pool_messages);
  2007. AST_TEST_UNREGISTER(publish);
  2008. AST_TEST_UNREGISTER(publish_sync);
  2009. AST_TEST_UNREGISTER(publish_pool);
  2010. AST_TEST_UNREGISTER(unsubscribe_stops_messages);
  2011. AST_TEST_UNREGISTER(forward);
  2012. AST_TEST_UNREGISTER(cache_filter);
  2013. AST_TEST_UNREGISTER(cache);
  2014. AST_TEST_UNREGISTER(cache_dump);
  2015. AST_TEST_UNREGISTER(cache_eid_aggregate);
  2016. AST_TEST_UNREGISTER(router);
  2017. AST_TEST_UNREGISTER(router_pool);
  2018. AST_TEST_UNREGISTER(router_cache_updates);
  2019. AST_TEST_UNREGISTER(interleaving);
  2020. AST_TEST_UNREGISTER(subscription_interleaving);
  2021. AST_TEST_UNREGISTER(no_to_json);
  2022. AST_TEST_UNREGISTER(to_json);
  2023. AST_TEST_UNREGISTER(no_to_ami);
  2024. AST_TEST_UNREGISTER(to_ami);
  2025. AST_TEST_UNREGISTER(dtor_order);
  2026. AST_TEST_UNREGISTER(caching_dtor_order);
  2027. AST_TEST_UNREGISTER(type_filters);
  2028. AST_TEST_UNREGISTER(formatter_filters);
  2029. AST_TEST_UNREGISTER(combo_filters);
  2030. return 0;
  2031. }
  2032. static int load_module(void)
  2033. {
  2034. AST_TEST_REGISTER(message_type);
  2035. AST_TEST_REGISTER(message);
  2036. AST_TEST_REGISTER(subscription_messages);
  2037. AST_TEST_REGISTER(subscription_pool_messages);
  2038. AST_TEST_REGISTER(publish);
  2039. AST_TEST_REGISTER(publish_sync);
  2040. AST_TEST_REGISTER(publish_pool);
  2041. AST_TEST_REGISTER(unsubscribe_stops_messages);
  2042. AST_TEST_REGISTER(forward);
  2043. AST_TEST_REGISTER(cache_filter);
  2044. AST_TEST_REGISTER(cache);
  2045. AST_TEST_REGISTER(cache_dump);
  2046. AST_TEST_REGISTER(cache_eid_aggregate);
  2047. AST_TEST_REGISTER(router);
  2048. AST_TEST_REGISTER(router_pool);
  2049. AST_TEST_REGISTER(router_cache_updates);
  2050. AST_TEST_REGISTER(interleaving);
  2051. AST_TEST_REGISTER(subscription_interleaving);
  2052. AST_TEST_REGISTER(no_to_json);
  2053. AST_TEST_REGISTER(to_json);
  2054. AST_TEST_REGISTER(no_to_ami);
  2055. AST_TEST_REGISTER(to_ami);
  2056. AST_TEST_REGISTER(dtor_order);
  2057. AST_TEST_REGISTER(caching_dtor_order);
  2058. AST_TEST_REGISTER(type_filters);
  2059. AST_TEST_REGISTER(formatter_filters);
  2060. AST_TEST_REGISTER(combo_filters);
  2061. return AST_MODULE_LOAD_SUCCESS;
  2062. }
  2063. AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
  2064. .support_level = AST_MODULE_SUPPORT_CORE,
  2065. .load = load_module,
  2066. .unload = unload_module
  2067. );