endpoints.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2013, Digium, Inc.
 *
 * David M. Lee, II <dlee@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

/*! \file
 *
 * \brief Asterisk endpoint API.
 *
 * \author David M. Lee, II <dlee@digium.com>
 */

/*** MODULEINFO
	<support_level>core</support_level>
 ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/endpoints.h"
  31. #include "asterisk/stasis.h"
  32. #include "asterisk/stasis_channels.h"
  33. #include "asterisk/stasis_endpoints.h"
  34. #include "asterisk/stasis_message_router.h"
  35. #include "asterisk/stringfields.h"
  36. #include "asterisk/_private.h"
  37. /*! Buckets for endpoint->channel mappings. Keep it prime! */
  38. #define ENDPOINT_CHANNEL_BUCKETS 127
  39. /*! Buckets for endpoint hash. Keep it prime! */
  40. #define ENDPOINT_BUCKETS 127
  41. /*! Buckets for technology endpoints. */
  42. #define TECH_ENDPOINT_BUCKETS 11
  43. static struct ao2_container *endpoints;
  44. static struct ao2_container *tech_endpoints;
  45. struct ast_endpoint {
  46. AST_DECLARE_STRING_FIELDS(
  47. AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
  48. AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
  49. AST_STRING_FIELD(id); /*!< tech/resource id */
  50. );
  51. /*! Endpoint's current state */
  52. enum ast_endpoint_state state;
  53. /*!
  54. * \brief Max channels for this endpoint. -1 means unlimited or unknown.
  55. *
  56. * Note that this simply documents the limits of an endpoint, and does
  57. * nothing to try to enforce the limit.
  58. */
  59. int max_channels;
  60. /*! Topic for this endpoint's messages */
  61. struct stasis_cp_single *topics;
  62. /*! Router for handling this endpoint's messages */
  63. struct stasis_message_router *router;
  64. /*! ast_str_container of channels associated with this endpoint */
  65. struct ao2_container *channel_ids;
  66. /*! Forwarding subscription from an endpoint to its tech endpoint */
  67. struct stasis_forward *tech_forward;
  68. };
  69. AO2_STRING_FIELD_HASH_FN(ast_endpoint, id)
  70. AO2_STRING_FIELD_CMP_FN(ast_endpoint, id)
  71. struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
  72. {
  73. struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
  74. if (!endpoint) {
  75. endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
  76. }
  77. return endpoint;
  78. }
  79. struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
  80. {
  81. if (!endpoint) {
  82. return ast_endpoint_topic_all();
  83. }
  84. return stasis_cp_single_topic(endpoint->topics);
  85. }
  86. struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
  87. {
  88. if (!endpoint) {
  89. return ast_endpoint_topic_all_cached();
  90. }
  91. return stasis_cp_single_topic_cached(endpoint->topics);
  92. }
  93. const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
  94. {
  95. switch (state) {
  96. case AST_ENDPOINT_UNKNOWN:
  97. return "unknown";
  98. case AST_ENDPOINT_OFFLINE:
  99. return "offline";
  100. case AST_ENDPOINT_ONLINE:
  101. return "online";
  102. }
  103. return "?";
  104. }
  105. static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
  106. {
  107. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  108. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  109. ast_assert(endpoint != NULL);
  110. ast_assert(endpoint->topics != NULL);
  111. if (!ast_endpoint_snapshot_type()) {
  112. return;
  113. }
  114. snapshot = ast_endpoint_snapshot_create(endpoint);
  115. if (!snapshot) {
  116. return;
  117. }
  118. message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
  119. if (!message) {
  120. return;
  121. }
  122. stasis_publish(ast_endpoint_topic(endpoint), message);
  123. }
  124. static void endpoint_dtor(void *obj)
  125. {
  126. struct ast_endpoint *endpoint = obj;
  127. /* The router should be shut down already */
  128. ast_assert(stasis_message_router_is_done(endpoint->router));
  129. ao2_cleanup(endpoint->router);
  130. endpoint->router = NULL;
  131. stasis_cp_single_unsubscribe(endpoint->topics);
  132. endpoint->topics = NULL;
  133. ao2_cleanup(endpoint->channel_ids);
  134. endpoint->channel_ids = NULL;
  135. ast_string_field_free_memory(endpoint);
  136. }
  137. int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
  138. struct ast_channel *chan)
  139. {
  140. ast_assert(chan != NULL);
  141. ast_assert(endpoint != NULL);
  142. ast_assert(!ast_strlen_zero(endpoint->resource));
  143. ast_channel_forward_endpoint(chan, endpoint);
  144. ao2_lock(endpoint);
  145. ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
  146. ao2_unlock(endpoint);
  147. endpoint_publish_snapshot(endpoint);
  148. return 0;
  149. }
  150. /*! \brief Handler for channel snapshot cache clears */
  151. static void endpoint_cache_clear(void *data,
  152. struct stasis_subscription *sub,
  153. struct stasis_message *message)
  154. {
  155. struct ast_endpoint *endpoint = data;
  156. struct stasis_message *clear_msg = stasis_message_data(message);
  157. struct ast_channel_snapshot *clear_snapshot;
  158. if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
  159. return;
  160. }
  161. clear_snapshot = stasis_message_data(clear_msg);
  162. ast_assert(endpoint != NULL);
  163. ao2_lock(endpoint);
  164. ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
  165. ao2_unlock(endpoint);
  166. endpoint_publish_snapshot(endpoint);
  167. }
  168. static void endpoint_subscription_change(void *data,
  169. struct stasis_subscription *sub,
  170. struct stasis_message *message)
  171. {
  172. struct stasis_endpoint *endpoint = data;
  173. if (stasis_subscription_final_message(sub, message)) {
  174. ao2_cleanup(endpoint);
  175. }
  176. }
  177. static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
  178. {
  179. RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
  180. RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
  181. int r = 0;
  182. /* Get/create the technology endpoint */
  183. if (!ast_strlen_zero(resource)) {
  184. tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
  185. if (!tech_endpoint) {
  186. tech_endpoint = endpoint_internal_create(tech, NULL);
  187. if (!tech_endpoint) {
  188. return NULL;
  189. }
  190. }
  191. }
  192. endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
  193. if (!endpoint) {
  194. return NULL;
  195. }
  196. endpoint->max_channels = -1;
  197. endpoint->state = AST_ENDPOINT_UNKNOWN;
  198. if (ast_string_field_init(endpoint, 80) != 0) {
  199. return NULL;
  200. }
  201. ast_string_field_set(endpoint, tech, tech);
  202. ast_string_field_set(endpoint, resource, S_OR(resource, ""));
  203. ast_string_field_build(endpoint, id, "%s%s%s",
  204. tech,
  205. !ast_strlen_zero(resource) ? "/" : "",
  206. S_OR(resource, ""));
  207. /* All access to channel_ids should be covered by the endpoint's
  208. * lock; no extra lock needed. */
  209. endpoint->channel_ids = ast_str_container_alloc_options(
  210. AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
  211. if (!endpoint->channel_ids) {
  212. return NULL;
  213. }
  214. if (!ast_strlen_zero(resource)) {
  215. endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
  216. endpoint->id);
  217. if (!endpoint->topics) {
  218. return NULL;
  219. }
  220. stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
  221. stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
  222. endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
  223. if (!endpoint->router) {
  224. return NULL;
  225. }
  226. r |= stasis_message_router_add(endpoint->router,
  227. stasis_cache_clear_type(), endpoint_cache_clear,
  228. endpoint);
  229. r |= stasis_message_router_add(endpoint->router,
  230. stasis_subscription_change_type(), endpoint_subscription_change,
  231. endpoint);
  232. if (r) {
  233. return NULL;
  234. }
  235. endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
  236. stasis_cp_single_topic(tech_endpoint->topics));
  237. endpoint_publish_snapshot(endpoint);
  238. ao2_link(endpoints, endpoint);
  239. } else {
  240. endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
  241. endpoint->id);
  242. if (!endpoint->topics) {
  243. return NULL;
  244. }
  245. stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
  246. stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
  247. ao2_link(tech_endpoints, endpoint);
  248. }
  249. ao2_ref(endpoint, +1);
  250. return endpoint;
  251. }
  252. struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
  253. {
  254. if (ast_strlen_zero(tech)) {
  255. ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
  256. return NULL;
  257. }
  258. if (ast_strlen_zero(resource)) {
  259. ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
  260. return NULL;
  261. }
  262. return endpoint_internal_create(tech, resource);
  263. }
  264. static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
  265. {
  266. RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
  267. if (!ast_endpoint_snapshot_type()) {
  268. return NULL;
  269. }
  270. snapshot = ast_endpoint_snapshot_create(endpoint);
  271. if (!snapshot) {
  272. return NULL;
  273. }
  274. return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
  275. }
  276. void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
  277. {
  278. RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
  279. if (endpoint == NULL) {
  280. return;
  281. }
  282. ao2_unlink(endpoints, endpoint);
  283. endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
  284. clear_msg = create_endpoint_snapshot_message(endpoint);
  285. if (clear_msg) {
  286. RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
  287. message = stasis_cache_clear_create(clear_msg);
  288. if (message) {
  289. stasis_publish(ast_endpoint_topic(endpoint), message);
  290. }
  291. }
  292. /* Bump refcount to hold on to the router */
  293. ao2_ref(endpoint->router, +1);
  294. stasis_message_router_unsubscribe(endpoint->router);
  295. }
  296. const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
  297. {
  298. if (!endpoint) {
  299. return NULL;
  300. }
  301. return endpoint->tech;
  302. }
  303. const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
  304. {
  305. if (!endpoint) {
  306. return NULL;
  307. }
  308. return endpoint->resource;
  309. }
  310. const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
  311. {
  312. if (!endpoint) {
  313. return NULL;
  314. }
  315. return endpoint->id;
  316. }
  317. enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
  318. {
  319. if (!endpoint) {
  320. return AST_ENDPOINT_UNKNOWN;
  321. }
  322. return endpoint->state;
  323. }
  324. void ast_endpoint_set_state(struct ast_endpoint *endpoint,
  325. enum ast_endpoint_state state)
  326. {
  327. ast_assert(endpoint != NULL);
  328. ast_assert(!ast_strlen_zero(endpoint->resource));
  329. ao2_lock(endpoint);
  330. endpoint->state = state;
  331. ao2_unlock(endpoint);
  332. endpoint_publish_snapshot(endpoint);
  333. }
  334. void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
  335. int max_channels)
  336. {
  337. ast_assert(endpoint != NULL);
  338. ast_assert(!ast_strlen_zero(endpoint->resource));
  339. ao2_lock(endpoint);
  340. endpoint->max_channels = max_channels;
  341. ao2_unlock(endpoint);
  342. endpoint_publish_snapshot(endpoint);
  343. }
  344. static void endpoint_snapshot_dtor(void *obj)
  345. {
  346. struct ast_endpoint_snapshot *snapshot = obj;
  347. int channel;
  348. ast_assert(snapshot != NULL);
  349. for (channel = 0; channel < snapshot->num_channels; channel++) {
  350. ao2_ref(snapshot->channel_ids[channel], -1);
  351. }
  352. ast_string_field_free_memory(snapshot);
  353. }
  354. struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
  355. struct ast_endpoint *endpoint)
  356. {
  357. struct ast_endpoint_snapshot *snapshot;
  358. int channel_count;
  359. struct ao2_iterator i;
  360. void *obj;
  361. SCOPED_AO2LOCK(lock, endpoint);
  362. ast_assert(endpoint != NULL);
  363. ast_assert(!ast_strlen_zero(endpoint->resource));
  364. channel_count = ao2_container_count(endpoint->channel_ids);
  365. snapshot = ao2_alloc_options(
  366. sizeof(*snapshot) + channel_count * sizeof(char *),
  367. endpoint_snapshot_dtor,
  368. AO2_ALLOC_OPT_LOCK_NOLOCK);
  369. if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
  370. ao2_cleanup(snapshot);
  371. return NULL;
  372. }
  373. ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
  374. endpoint->resource);
  375. ast_string_field_set(snapshot, tech, endpoint->tech);
  376. ast_string_field_set(snapshot, resource, endpoint->resource);
  377. snapshot->state = endpoint->state;
  378. snapshot->max_channels = endpoint->max_channels;
  379. i = ao2_iterator_init(endpoint->channel_ids, 0);
  380. while ((obj = ao2_iterator_next(&i))) {
  381. /* The reference is kept so the channel id does not go away until the snapshot is gone */
  382. snapshot->channel_ids[snapshot->num_channels++] = obj;
  383. }
  384. ao2_iterator_destroy(&i);
  385. return snapshot;
  386. }
  387. static void endpoint_cleanup(void)
  388. {
  389. ao2_cleanup(endpoints);
  390. endpoints = NULL;
  391. ao2_cleanup(tech_endpoints);
  392. tech_endpoints = NULL;
  393. }
  394. int ast_endpoint_init(void)
  395. {
  396. ast_register_cleanup(endpoint_cleanup);
  397. endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, ENDPOINT_BUCKETS,
  398. ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
  399. if (!endpoints) {
  400. return -1;
  401. }
  402. tech_endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
  403. TECH_ENDPOINT_BUCKETS, ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
  404. if (!tech_endpoints) {
  405. return -1;
  406. }
  407. return 0;
  408. }