res_stasis_test.c 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \file \brief Test infrastructure for dealing with Stasis.
  20. *
  21. * \author David M. Lee, II <dlee@digium.com>
  22. */
  23. /*** MODULEINFO
  24. <depend>TEST_FRAMEWORK</depend>
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/module.h"
  31. #include "asterisk/stasis_test.h"
  32. STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type);
  33. static void stasis_message_sink_dtor(void *obj)
  34. {
  35. struct stasis_message_sink *sink = obj;
  36. {
  37. SCOPED_MUTEX(lock, &sink->lock);
  38. while (!sink->is_done) {
  39. /* Normally waiting forever is bad, but if we're not
  40. * done, we're not done. */
  41. ast_cond_wait(&sink->cond, &sink->lock);
  42. }
  43. }
  44. ast_mutex_destroy(&sink->lock);
  45. ast_cond_destroy(&sink->cond);
  46. while (sink->num_messages > 0) {
  47. ao2_cleanup(sink->messages[--sink->num_messages]);
  48. }
  49. ast_free(sink->messages);
  50. sink->messages = NULL;
  51. sink->max_messages = 0;
  52. }
  53. static struct timespec make_deadline(int timeout_millis)
  54. {
  55. struct timeval start = ast_tvnow();
  56. struct timeval delta = {
  57. .tv_sec = timeout_millis / 1000,
  58. .tv_usec = (timeout_millis % 1000) * 1000,
  59. };
  60. struct timeval deadline_tv = ast_tvadd(start, delta);
  61. struct timespec deadline = {
  62. .tv_sec = deadline_tv.tv_sec,
  63. .tv_nsec = 1000 * deadline_tv.tv_usec,
  64. };
  65. return deadline;
  66. }
  67. struct stasis_message_sink *stasis_message_sink_create(void)
  68. {
  69. RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
  70. sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
  71. if (!sink) {
  72. return NULL;
  73. }
  74. ast_mutex_init(&sink->lock);
  75. ast_cond_init(&sink->cond, NULL);
  76. sink->max_messages = 4;
  77. sink->messages =
  78. ast_malloc(sizeof(*sink->messages) * sink->max_messages);
  79. if (!sink->messages) {
  80. return NULL;
  81. }
  82. ao2_ref(sink, +1);
  83. return sink;
  84. }
  85. /*!
  86. * \brief Implementation of the stasis_message_sink_cb() callback.
  87. *
  88. * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
  89. * it has to do with how we load modules.
  90. *
  91. * Modules have their own metadata compiled into them in the module info block
  92. * at the end of the file.
  93. *
  94. * Asterisk loads the module, inspects the field, then loads any needed
  95. * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
  96. * dlopen(), which defers binding function references until they are called.
  97. *
  98. * But when you take the address of a function, that function needs to be
  99. * available at load time. So if some module used the address of
  100. * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
  101. * that module would fail to load.
  102. *
  103. * The stasis_message_sink_cb() function gives us a layer of indirection so that
  104. * the initial lazy binding will still work as expected.
  105. */
  106. static void message_sink_cb(void *data, struct stasis_subscription *sub,
  107. struct stasis_message *message)
  108. {
  109. struct stasis_message_sink *sink = data;
  110. SCOPED_MUTEX(lock, &sink->lock);
  111. if (stasis_subscription_final_message(sub, message)) {
  112. sink->is_done = 1;
  113. ast_cond_signal(&sink->cond);
  114. return;
  115. }
  116. if (stasis_subscription_change_type() == stasis_message_type(message)) {
  117. /* Ignore subscription changes */
  118. return;
  119. }
  120. if (sink->num_messages == sink->max_messages) {
  121. size_t new_max_messages = sink->max_messages * 2;
  122. struct stasis_message **new_messages = ast_realloc(
  123. sink->messages,
  124. sizeof(*new_messages) * new_max_messages);
  125. if (!new_messages) {
  126. return;
  127. }
  128. sink->max_messages = new_max_messages;
  129. sink->messages = new_messages;
  130. }
  131. ao2_ref(message, +1);
  132. sink->messages[sink->num_messages++] = message;
  133. ast_cond_signal(&sink->cond);
  134. }
  135. stasis_subscription_cb stasis_message_sink_cb(void)
  136. {
  137. return message_sink_cb;
  138. }
  139. int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
  140. int num_messages, int timeout_millis)
  141. {
  142. struct timespec deadline = make_deadline(timeout_millis);
  143. SCOPED_MUTEX(lock, &sink->lock);
  144. while (sink->num_messages < num_messages) {
  145. int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
  146. if (r == ETIMEDOUT) {
  147. break;
  148. }
  149. if (r != 0) {
  150. ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
  151. strerror(r));
  152. break;
  153. }
  154. }
  155. return sink->num_messages;
  156. }
  157. int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
  158. int num_messages, int timeout_millis)
  159. {
  160. struct timespec deadline = make_deadline(timeout_millis);
  161. SCOPED_MUTEX(lock, &sink->lock);
  162. while (sink->num_messages == num_messages) {
  163. int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
  164. if (r == ETIMEDOUT) {
  165. break;
  166. }
  167. if (r != 0) {
  168. ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
  169. strerror(r));
  170. break;
  171. }
  172. }
  173. return sink->num_messages;
  174. }
  175. int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
  176. stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
  177. {
  178. struct timespec deadline = make_deadline(timeout_millis);
  179. SCOPED_MUTEX(lock, &sink->lock);
  180. /* wait for the start */
  181. while (sink->num_messages < start + 1) {
  182. int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
  183. if (r == ETIMEDOUT) {
  184. /* Timed out waiting for the start */
  185. return -1;
  186. }
  187. if (r != 0) {
  188. ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
  189. strerror(r));
  190. return -2;
  191. }
  192. }
  193. while (!cmp_cb(sink->messages[start], data)) {
  194. ++start;
  195. while (sink->num_messages < start + 1) {
  196. int r = ast_cond_timedwait(&sink->cond,
  197. &sink->lock, &deadline);
  198. if (r == ETIMEDOUT) {
  199. return -1;
  200. }
  201. if (r != 0) {
  202. ast_log(LOG_ERROR,
  203. "Unexpected condition error: %s\n",
  204. strerror(r));
  205. return -2;
  206. }
  207. }
  208. }
  209. return start;
  210. }
  211. struct stasis_message *stasis_test_message_create(void)
  212. {
  213. RAII_VAR(void *, data, NULL, ao2_cleanup);
  214. if (!stasis_test_message_type()) {
  215. return NULL;
  216. }
  217. /* We just need the unique pointer; don't care what's in it */
  218. data = ao2_alloc(1, NULL);
  219. if (!data) {
  220. return NULL;
  221. }
  222. return stasis_message_create(stasis_test_message_type(), data);
  223. }
  224. static int unload_module(void)
  225. {
  226. STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
  227. return 0;
  228. }
  229. static int load_module(void)
  230. {
  231. if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
  232. return AST_MODULE_LOAD_DECLINE;
  233. }
  234. return AST_MODULE_LOAD_SUCCESS;
  235. }
  236. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis test utilities",
  237. .support_level = AST_MODULE_SUPPORT_CORE,
  238. .load = load_module,
  239. .unload = unload_module,
  240. .load_pri = AST_MODPRI_APP_DEPEND,
  241. );