stasis_message_router.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis message router implementation.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/stasis_message_router.h"
  31. #include "asterisk/vector.h"
  32. /*! \internal */
  33. struct stasis_message_route {
  34. /*! Message type handle by this route. */
  35. struct stasis_message_type *message_type;
  36. /*! Callback function for incoming message processing. */
  37. stasis_subscription_cb callback;
  38. /*! Data pointer to be handed to the callback. */
  39. void *data;
  40. };
  41. AST_VECTOR(route_table, struct stasis_message_route);
  42. static struct stasis_message_route *route_table_find(struct route_table *table,
  43. struct stasis_message_type *message_type)
  44. {
  45. size_t idx;
  46. struct stasis_message_route *route;
  47. /* While a linear search for routes may seem very inefficient, most
  48. * route tables have six routes or less. For such small data, it's
  49. * hard to beat a linear search. If we start having larger route
  50. * tables, then we can look into containers with more efficient
  51. * lookups.
  52. */
  53. for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
  54. route = AST_VECTOR_GET_ADDR(table, idx);
  55. if (route->message_type == message_type) {
  56. return route;
  57. }
  58. }
  59. return NULL;
  60. }
  61. /*!
  62. * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
  63. *
  64. * \param elem Element to compare against
  65. * \param value Value to compare with the vector element.
  66. *
  67. * \return 0 if element does not match.
  68. * \return Non-zero if element matches.
  69. */
  70. #define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
  71. /*!
  72. * \brief route_table vector element cleanup.
  73. *
  74. * \param elem Element to cleanup
  75. *
  76. * \return Nothing
  77. */
  78. #define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
  79. static int route_table_remove(struct route_table *table,
  80. struct stasis_message_type *message_type)
  81. {
  82. return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
  83. ROUTE_TABLE_ELEM_CLEANUP);
  84. }
  85. static int route_table_add(struct route_table *table,
  86. struct stasis_message_type *message_type,
  87. stasis_subscription_cb callback, void *data)
  88. {
  89. struct stasis_message_route route;
  90. int res;
  91. ast_assert(callback != NULL);
  92. ast_assert(route_table_find(table, message_type) == NULL);
  93. route.message_type = ao2_bump(message_type);
  94. route.callback = callback;
  95. route.data = data;
  96. res = AST_VECTOR_APPEND(table, route);
  97. if (res) {
  98. ROUTE_TABLE_ELEM_CLEANUP(route);
  99. }
  100. return res;
  101. }
  102. static void route_table_dtor(struct route_table *table)
  103. {
  104. size_t idx;
  105. struct stasis_message_route *route;
  106. for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
  107. route = AST_VECTOR_GET_ADDR(table, idx);
  108. ROUTE_TABLE_ELEM_CLEANUP(*route);
  109. }
  110. AST_VECTOR_FREE(table);
  111. }
  112. /*! \internal */
  113. struct stasis_message_router {
  114. /*! Subscription to the upstream topic */
  115. struct stasis_subscription *subscription;
  116. /*! Subscribed routes */
  117. struct route_table routes;
  118. /*! Subscribed routes for \ref stasis_cache_update messages */
  119. struct route_table cache_routes;
  120. /*! Route of last resort */
  121. struct stasis_message_route default_route;
  122. };
  123. static void router_dtor(void *obj)
  124. {
  125. struct stasis_message_router *router = obj;
  126. ast_assert(!stasis_subscription_is_subscribed(router->subscription));
  127. ast_assert(stasis_subscription_is_done(router->subscription));
  128. router->subscription = NULL;
  129. route_table_dtor(&router->routes);
  130. route_table_dtor(&router->cache_routes);
  131. }
  132. static int find_route(
  133. struct stasis_message_router *router,
  134. struct stasis_message *message,
  135. struct stasis_message_route *route_out)
  136. {
  137. struct stasis_message_route *route = NULL;
  138. struct stasis_message_type *type = stasis_message_type(message);
  139. SCOPED_AO2LOCK(lock, router);
  140. ast_assert(route_out != NULL);
  141. if (type == stasis_cache_update_type()) {
  142. /* Find a cache route */
  143. struct stasis_cache_update *update =
  144. stasis_message_data(message);
  145. route = route_table_find(&router->cache_routes, update->type);
  146. }
  147. if (route == NULL) {
  148. /* Find a regular route */
  149. route = route_table_find(&router->routes, type);
  150. }
  151. if (route == NULL && router->default_route.callback) {
  152. /* Maybe the default route, then? */
  153. route = &router->default_route;
  154. }
  155. if (!route) {
  156. return -1;
  157. }
  158. *route_out = *route;
  159. return 0;
  160. }
  161. static void router_dispatch(void *data,
  162. struct stasis_subscription *sub,
  163. struct stasis_message *message)
  164. {
  165. struct stasis_message_router *router = data;
  166. struct stasis_message_route route;
  167. if (find_route(router, message, &route) == 0) {
  168. route.callback(route.data, sub, message);
  169. }
  170. if (stasis_subscription_final_message(sub, message)) {
  171. ao2_cleanup(router);
  172. }
  173. }
  174. static struct stasis_message_router *stasis_message_router_create_internal(
  175. struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
  176. const char *func)
  177. {
  178. int res;
  179. struct stasis_message_router *router;
  180. router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
  181. if (!router) {
  182. return NULL;
  183. }
  184. res = 0;
  185. res |= AST_VECTOR_INIT(&router->routes, 0);
  186. res |= AST_VECTOR_INIT(&router->cache_routes, 0);
  187. if (res) {
  188. ao2_ref(router, -1);
  189. return NULL;
  190. }
  191. if (use_thread_pool) {
  192. router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
  193. } else {
  194. router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
  195. }
  196. if (!router->subscription) {
  197. ao2_ref(router, -1);
  198. return NULL;
  199. }
  200. /* We need to receive subscription change messages so we know when our subscription goes away */
  201. stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
  202. return router;
  203. }
  204. struct stasis_message_router *__stasis_message_router_create(
  205. struct stasis_topic *topic, const char *file, int lineno, const char *func)
  206. {
  207. return stasis_message_router_create_internal(topic, 0, file, lineno, func);
  208. }
  209. struct stasis_message_router *__stasis_message_router_create_pool(
  210. struct stasis_topic *topic, const char *file, int lineno, const char *func)
  211. {
  212. return stasis_message_router_create_internal(topic, 1, file, lineno, func);
  213. }
  214. void stasis_message_router_unsubscribe(struct stasis_message_router *router)
  215. {
  216. if (!router) {
  217. return;
  218. }
  219. ao2_lock(router);
  220. router->subscription = stasis_unsubscribe(router->subscription);
  221. ao2_unlock(router);
  222. }
  223. void stasis_message_router_unsubscribe_and_join(
  224. struct stasis_message_router *router)
  225. {
  226. if (!router) {
  227. return;
  228. }
  229. stasis_unsubscribe_and_join(router->subscription);
  230. }
  231. int stasis_message_router_is_done(struct stasis_message_router *router)
  232. {
  233. if (!router) {
  234. /* Null router is about as done as you can get */
  235. return 1;
  236. }
  237. return stasis_subscription_is_done(router->subscription);
  238. }
  239. void stasis_message_router_publish_sync(struct stasis_message_router *router,
  240. struct stasis_message *message)
  241. {
  242. ast_assert(router != NULL);
  243. ao2_bump(router);
  244. stasis_publish_sync(router->subscription, message);
  245. ao2_cleanup(router);
  246. }
  247. int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
  248. long low_water, long high_water)
  249. {
  250. int res = -1;
  251. if (router) {
  252. res = stasis_subscription_set_congestion_limits(router->subscription,
  253. low_water, high_water);
  254. }
  255. return res;
  256. }
  257. int stasis_message_router_add(struct stasis_message_router *router,
  258. struct stasis_message_type *message_type,
  259. stasis_subscription_cb callback, void *data)
  260. {
  261. int res;
  262. ast_assert(router != NULL);
  263. if (!message_type) {
  264. /* Cannot route to NULL type. */
  265. return -1;
  266. }
  267. ao2_lock(router);
  268. res = route_table_add(&router->routes, message_type, callback, data);
  269. if (!res) {
  270. stasis_subscription_accept_message_type(router->subscription, message_type);
  271. /* Until a specific message type was added we would already drop the message, so being
  272. * selective now doesn't harm us. If we have a default route then we are already forced
  273. * to filter nothing and messages will come in regardless.
  274. */
  275. stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
  276. }
  277. ao2_unlock(router);
  278. return res;
  279. }
  280. int stasis_message_router_add_cache_update(struct stasis_message_router *router,
  281. struct stasis_message_type *message_type,
  282. stasis_subscription_cb callback, void *data)
  283. {
  284. int res;
  285. ast_assert(router != NULL);
  286. if (!message_type) {
  287. /* Cannot cache a route to NULL type. */
  288. return -1;
  289. }
  290. ao2_lock(router);
  291. res = route_table_add(&router->cache_routes, message_type, callback, data);
  292. if (!res) {
  293. stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
  294. stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
  295. }
  296. ao2_unlock(router);
  297. return res;
  298. }
  299. void stasis_message_router_remove(struct stasis_message_router *router,
  300. struct stasis_message_type *message_type)
  301. {
  302. ast_assert(router != NULL);
  303. if (!message_type) {
  304. /* Cannot remove a NULL type. */
  305. return;
  306. }
  307. ao2_lock(router);
  308. route_table_remove(&router->routes, message_type);
  309. ao2_unlock(router);
  310. }
  311. void stasis_message_router_remove_cache_update(
  312. struct stasis_message_router *router,
  313. struct stasis_message_type *message_type)
  314. {
  315. ast_assert(router != NULL);
  316. if (!message_type) {
  317. /* Cannot remove a NULL type. */
  318. return;
  319. }
  320. ao2_lock(router);
  321. route_table_remove(&router->cache_routes, message_type);
  322. ao2_unlock(router);
  323. }
  324. int stasis_message_router_set_default(struct stasis_message_router *router,
  325. stasis_subscription_cb callback,
  326. void *data)
  327. {
  328. stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
  329. /* While this implementation can never fail, it used to be able to */
  330. return 0;
  331. }
  332. void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
  333. stasis_subscription_cb callback,
  334. void *data,
  335. enum stasis_subscription_message_formatters formatters)
  336. {
  337. ast_assert(router != NULL);
  338. ast_assert(callback != NULL);
  339. stasis_subscription_accept_formatters(router->subscription, formatters);
  340. ao2_lock(router);
  341. router->default_route.callback = callback;
  342. router->default_route.data = data;
  343. ao2_unlock(router);
  344. if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
  345. /* Formatters govern what messages the default callback get, so it is only if none is
  346. * specified that we accept all messages regardless.
  347. */
  348. stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
  349. }
  350. }
  351. void stasis_message_router_accept_formatters(struct stasis_message_router *router,
  352. enum stasis_subscription_message_formatters formatters)
  353. {
  354. ast_assert(router != NULL);
  355. stasis_subscription_accept_formatters(router->subscription, formatters);
  356. return;
  357. }
  358. #ifdef AST_DEVMODE
  359. #undef stasis_message_router_create
  360. struct stasis_message_router *stasis_message_router_create(
  361. struct stasis_topic *topic);
  362. #undef stasis_message_router_create_pool
  363. struct stasis_message_router *stasis_message_router_create_pool(
  364. struct stasis_topic *topic);
  365. #endif
  366. struct stasis_message_router *stasis_message_router_create(
  367. struct stasis_topic *topic)
  368. {
  369. return stasis_message_router_create_internal(topic, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
  370. }
  371. struct stasis_message_router *stasis_message_router_create_pool(
  372. struct stasis_topic *topic)
  373. {
  374. return stasis_message_router_create_internal(topic, 1, __FILE__, __LINE__, __PRETTY_FUNCTION__);
  375. }