stasis_cache_pattern.c 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Typical cache pattern for Stasis topics.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/stasis_cache_pattern.h"
  /*!
   * \brief Aggregate ("all") half of the cache pattern.
   *
   * Every member is filled in by stasis_cp_all_create() and released by
   * all_dtor().
   */
  struct stasis_cp_all {
      /*! Topic created under the caller-supplied name. */
      struct stasis_topic *topic;
      /*! Second topic, created under the derived "-cached" name. */
      struct stasis_topic *topic_cached;
      /*! Cache created from the caller-supplied snapshot id function. */
      struct stasis_cache *cache;
      /*! Forward set up from \c topic to \c topic_cached. */
      struct stasis_forward *forward_all_to_cached;
  };
  /*!
   * \brief Per-item ("single") half of the cache pattern.
   *
   * \c topic_cached and both forwards are expected to be NULL again by the
   * time the object is destroyed; one_dtor() asserts this.
   */
  struct stasis_cp_single {
      /*! Topic created under the caller-supplied name. */
      struct stasis_topic *topic;
      /*! Caching topic created over \c topic and the aggregate's cache. */
      struct stasis_caching_topic *topic_cached;
      /*! Forward from \c topic to the aggregate's topic (unset for sinks). */
      struct stasis_forward *forward_topic_to_all;
      /*! Forward from the caching topic's topic to the aggregate's cached topic (unset for sinks). */
      struct stasis_forward *forward_cached_to_all;
  };
  43. static void all_dtor(void *obj)
  44. {
  45. struct stasis_cp_all *all = obj;
  46. ao2_cleanup(all->topic);
  47. all->topic = NULL;
  48. ao2_cleanup(all->topic_cached);
  49. all->topic_cached = NULL;
  50. ao2_cleanup(all->cache);
  51. all->cache = NULL;
  52. stasis_forward_cancel(all->forward_all_to_cached);
  53. all->forward_all_to_cached = NULL;
  54. }
  55. struct stasis_cp_all *stasis_cp_all_create(const char *name,
  56. snapshot_get_id id_fn)
  57. {
  58. char *cached_name = NULL;
  59. struct stasis_cp_all *all;
  60. all = ao2_t_alloc(sizeof(*all), all_dtor, name);
  61. if (!all) {
  62. return NULL;
  63. }
  64. ast_asprintf(&cached_name, "%s-cached", name);
  65. if (!cached_name) {
  66. ao2_ref(all, -1);
  67. return NULL;
  68. }
  69. all->topic = stasis_topic_create(name);
  70. all->topic_cached = stasis_topic_create(cached_name);
  71. ast_free(cached_name);
  72. all->cache = stasis_cache_create(id_fn);
  73. all->forward_all_to_cached =
  74. stasis_forward_all(all->topic, all->topic_cached);
  75. if (!all->topic || !all->topic_cached || !all->cache ||
  76. !all->forward_all_to_cached) {
  77. ao2_ref(all, -1);
  78. return NULL;
  79. }
  80. return all;
  81. }
  82. struct stasis_topic *stasis_cp_all_topic(struct stasis_cp_all *all)
  83. {
  84. if (!all) {
  85. return NULL;
  86. }
  87. return all->topic;
  88. }
  89. struct stasis_topic *stasis_cp_all_topic_cached(
  90. struct stasis_cp_all *all)
  91. {
  92. if (!all) {
  93. return NULL;
  94. }
  95. return all->topic_cached;
  96. }
  97. struct stasis_cache *stasis_cp_all_cache(struct stasis_cp_all *all)
  98. {
  99. if (!all) {
  100. return NULL;
  101. }
  102. return all->cache;
  103. }
  104. static void one_dtor(void *obj)
  105. {
  106. struct stasis_cp_single *one = obj;
  107. /* Should already be unsubscribed */
  108. ast_assert(one->topic_cached == NULL);
  109. ast_assert(one->forward_topic_to_all == NULL);
  110. ast_assert(one->forward_cached_to_all == NULL);
  111. ao2_cleanup(one->topic);
  112. one->topic = NULL;
  113. }
  114. struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all,
  115. const char *name)
  116. {
  117. struct stasis_cp_single *one;
  118. one = stasis_cp_sink_create(all, name);
  119. if (!one) {
  120. return NULL;
  121. }
  122. one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic);
  123. one->forward_cached_to_all = stasis_forward_all(
  124. stasis_caching_get_topic(one->topic_cached), all->topic_cached);
  125. if (!one->forward_topic_to_all || !one->forward_cached_to_all) {
  126. ao2_ref(one, -1);
  127. return NULL;
  128. }
  129. return one;
  130. }
  131. struct stasis_cp_single *stasis_cp_sink_create(struct stasis_cp_all *all,
  132. const char *name)
  133. {
  134. struct stasis_cp_single *one;
  135. one = ao2_t_alloc(sizeof(*one), one_dtor, name);
  136. if (!one) {
  137. return NULL;
  138. }
  139. one->topic = stasis_topic_create(name);
  140. if (!one->topic) {
  141. ao2_ref(one, -1);
  142. return NULL;
  143. }
  144. one->topic_cached = stasis_caching_topic_create(one->topic, all->cache);
  145. if (!one->topic_cached) {
  146. ao2_ref(one, -1);
  147. return NULL;
  148. }
  149. return one;
  150. }
  151. void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
  152. {
  153. if (!one) {
  154. return;
  155. }
  156. stasis_forward_cancel(one->forward_topic_to_all);
  157. one->forward_topic_to_all = NULL;
  158. stasis_forward_cancel(one->forward_cached_to_all);
  159. one->forward_cached_to_all = NULL;
  160. stasis_caching_unsubscribe(one->topic_cached);
  161. one->topic_cached = NULL;
  162. ao2_cleanup(one);
  163. }
  164. struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one)
  165. {
  166. if (!one) {
  167. return NULL;
  168. }
  169. return one->topic;
  170. }
  171. struct stasis_topic *stasis_cp_single_topic_cached(
  172. struct stasis_cp_single *one)
  173. {
  174. if (!one) {
  175. return NULL;
  176. }
  177. return stasis_caching_get_topic(one->topic_cached);
  178. }
  179. int stasis_cp_single_accept_message_type(struct stasis_cp_single *one,
  180. struct stasis_message_type *type)
  181. {
  182. if (!one) {
  183. return -1;
  184. }
  185. return stasis_caching_accept_message_type(one->topic_cached, type);
  186. }
  187. int stasis_cp_single_set_filter(struct stasis_cp_single *one,
  188. enum stasis_subscription_message_filter filter)
  189. {
  190. if (!one) {
  191. return -1;
  192. }
  193. return stasis_caching_set_filter(one->topic_cached, filter);
  194. }