app.c 42 KB


  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis application support.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. #include "asterisk.h"
  25. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  26. #include "app.h"
  27. #include "control.h"
  28. #include "messaging.h"
  29. #include "asterisk/callerid.h"
  30. #include "asterisk/cli.h"
  31. #include "asterisk/stasis_app.h"
  32. #include "asterisk/stasis_bridges.h"
  33. #include "asterisk/stasis_channels.h"
  34. #include "asterisk/stasis_endpoints.h"
  35. #include "asterisk/stasis_message_router.h"
  36. #define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC"
  37. #define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC"
  38. #define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC"
  39. /*! Global debug flag. No need for locking */
  40. int global_debug;
  41. static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate);
  42. struct stasis_app {
  43. /*! Aggregation topic for this application. */
  44. struct stasis_topic *topic;
  45. /*! Router for handling messages forwarded to \a topic. */
  46. struct stasis_message_router *router;
  47. /*! Router for handling messages to the bridge all \a topic. */
  48. struct stasis_message_router *bridge_router;
  49. /*! Optional router for handling endpoint messages in 'all' subscriptions */
  50. struct stasis_message_router *endpoint_router;
  51. /*! Container of the channel forwards to this app's topic. */
  52. struct ao2_container *forwards;
  53. /*! Callback function for this application. */
  54. stasis_app_cb handler;
  55. /*! Opaque data to hand to callback function. */
  56. void *data;
  57. /*! Subscription model for the application */
  58. enum stasis_app_subscription_model subscription_model;
  59. /*! Whether or not someone wants to see debug messages about this app */
  60. int debug;
  61. /*! Name of the Stasis application */
  62. char name[];
  63. };
  64. enum forward_type {
  65. FORWARD_CHANNEL,
  66. FORWARD_BRIDGE,
  67. FORWARD_ENDPOINT,
  68. };
  69. /*! Subscription info for a particular channel/bridge. */
  70. struct app_forwards {
  71. /*! Count of number of times this channel/bridge has been subscribed */
  72. int interested;
  73. /*! Forward for the regular topic */
  74. struct stasis_forward *topic_forward;
  75. /*! Forward for the caching topic */
  76. struct stasis_forward *topic_cached_forward;
  77. /* Type of object being forwarded */
  78. enum forward_type forward_type;
  79. /*! Unique id of the object being forwarded */
  80. char id[];
  81. };
  82. static void forwards_dtor(void *obj)
  83. {
  84. #ifdef AST_DEVMODE
  85. struct app_forwards *forwards = obj;
  86. #endif /* AST_DEVMODE */
  87. ast_assert(forwards->topic_forward == NULL);
  88. ast_assert(forwards->topic_cached_forward == NULL);
  89. }
  90. static void forwards_unsubscribe(struct app_forwards *forwards)
  91. {
  92. stasis_forward_cancel(forwards->topic_forward);
  93. forwards->topic_forward = NULL;
  94. stasis_forward_cancel(forwards->topic_cached_forward);
  95. forwards->topic_cached_forward = NULL;
  96. }
  97. static struct app_forwards *forwards_create(struct stasis_app *app,
  98. const char *id)
  99. {
  100. struct app_forwards *forwards;
  101. if (!app || ast_strlen_zero(id)) {
  102. return NULL;
  103. }
  104. forwards = ao2_t_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor, id);
  105. if (!forwards) {
  106. return NULL;
  107. }
  108. strcpy(forwards->id, id); /* SAFE */
  109. return forwards;
  110. }
  111. /*! Forward a channel's topics to an app */
  112. static struct app_forwards *forwards_create_channel(struct stasis_app *app,
  113. struct ast_channel *chan)
  114. {
  115. struct app_forwards *forwards;
  116. if (!app) {
  117. return NULL;
  118. }
  119. forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
  120. if (!forwards) {
  121. return NULL;
  122. }
  123. forwards->forward_type = FORWARD_CHANNEL;
  124. if (chan) {
  125. forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
  126. app->topic);
  127. }
  128. forwards->topic_cached_forward = stasis_forward_all(
  129. chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(),
  130. app->topic);
  131. if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) {
  132. /* Half-subscribed is a bad thing */
  133. forwards_unsubscribe(forwards);
  134. ao2_ref(forwards, -1);
  135. return NULL;
  136. }
  137. return forwards;
  138. }
  139. /*! Forward a bridge's topics to an app */
  140. static struct app_forwards *forwards_create_bridge(struct stasis_app *app,
  141. struct ast_bridge *bridge)
  142. {
  143. struct app_forwards *forwards;
  144. if (!app) {
  145. return NULL;
  146. }
  147. forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
  148. if (!forwards) {
  149. return NULL;
  150. }
  151. forwards->forward_type = FORWARD_BRIDGE;
  152. if (bridge) {
  153. forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
  154. app->topic);
  155. }
  156. forwards->topic_cached_forward = stasis_forward_all(
  157. bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(),
  158. app->topic);
  159. if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) {
  160. /* Half-subscribed is a bad thing */
  161. forwards_unsubscribe(forwards);
  162. ao2_ref(forwards, -1);
  163. return NULL;
  164. }
  165. return forwards;
  166. }
  167. static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
  168. struct stasis_message *message)
  169. {
  170. struct stasis_app *app = data;
  171. stasis_publish(app->topic, message);
  172. }
  173. /*! Forward a endpoint's topics to an app */
  174. static struct app_forwards *forwards_create_endpoint(struct stasis_app *app,
  175. struct ast_endpoint *endpoint)
  176. {
  177. struct app_forwards *forwards;
  178. int ret = 0;
  179. if (!app) {
  180. return NULL;
  181. }
  182. forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL);
  183. if (!forwards) {
  184. return NULL;
  185. }
  186. forwards->forward_type = FORWARD_ENDPOINT;
  187. if (endpoint) {
  188. forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
  189. app->topic);
  190. forwards->topic_cached_forward = stasis_forward_all(
  191. ast_endpoint_topic_cached(endpoint), app->topic);
  192. if (!forwards->topic_forward || !forwards->topic_cached_forward) {
  193. /* Half-subscribed is a bad thing */
  194. forwards_unsubscribe(forwards);
  195. ao2_ref(forwards, -1);
  196. return NULL;
  197. }
  198. } else {
  199. /* Since endpoint subscriptions also subscribe to channels, in the case
  200. * of all endpoint subscriptions, we only want messages for the endpoints.
  201. * As such, we route those particular messages and then re-publish them
  202. * on the app's topic.
  203. */
  204. ast_assert(app->endpoint_router == NULL);
  205. app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached());
  206. if (!app->endpoint_router) {
  207. forwards_unsubscribe(forwards);
  208. ao2_ref(forwards, -1);
  209. return NULL;
  210. }
  211. ret |= stasis_message_router_add(app->endpoint_router,
  212. ast_endpoint_state_type(), endpoint_state_cb, app);
  213. ret |= stasis_message_router_add(app->endpoint_router,
  214. ast_endpoint_contact_state_type(), endpoint_state_cb, app);
  215. if (ret) {
  216. ao2_ref(app->endpoint_router, -1);
  217. app->endpoint_router = NULL;
  218. ao2_ref(forwards, -1);
  219. return NULL;
  220. }
  221. }
  222. return forwards;
  223. }
  224. static int forwards_sort(const void *obj_left, const void *obj_right, int flags)
  225. {
  226. const struct app_forwards *object_left = obj_left;
  227. const struct app_forwards *object_right = obj_right;
  228. const char *right_key = obj_right;
  229. int cmp;
  230. switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
  231. case OBJ_POINTER:
  232. right_key = object_right->id;
  233. /* Fall through */
  234. case OBJ_KEY:
  235. cmp = strcmp(object_left->id, right_key);
  236. break;
  237. case OBJ_PARTIAL_KEY:
  238. /*
  239. * We could also use a partial key struct containing a length
  240. * so strlen() does not get called for every comparison instead.
  241. */
  242. cmp = strncmp(object_left->id, right_key, strlen(right_key));
  243. break;
  244. default:
  245. /* Sort can only work on something with a full or partial key. */
  246. ast_assert(0);
  247. cmp = 0;
  248. break;
  249. }
  250. return cmp;
  251. }
  252. static void app_dtor(void *obj)
  253. {
  254. struct stasis_app *app = obj;
  255. size_t size = strlen("stasis-") + strlen(app->name) + 1;
  256. char context_name[size];
  257. ast_verb(1, "Destroying Stasis app %s\n", app->name);
  258. ast_assert(app->router == NULL);
  259. ast_assert(app->bridge_router == NULL);
  260. ast_assert(app->endpoint_router == NULL);
  261. /* If we created a context for this application, remove it */
  262. strcpy(context_name, "stasis-");
  263. strcat(context_name, app->name);
  264. ast_context_destroy_by_name(context_name, "res_stasis");
  265. ao2_cleanup(app->topic);
  266. app->topic = NULL;
  267. ao2_cleanup(app->forwards);
  268. app->forwards = NULL;
  269. ao2_cleanup(app->data);
  270. app->data = NULL;
  271. }
  272. static void call_forwarded_handler(struct stasis_app *app, struct stasis_message *message)
  273. {
  274. struct ast_multi_channel_blob *payload = stasis_message_data(message);
  275. struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
  276. struct ast_channel *chan;
  277. if (!snapshot) {
  278. return;
  279. }
  280. chan = ast_channel_get_by_name(snapshot->uniqueid);
  281. if (!chan) {
  282. return;
  283. }
  284. app_subscribe_channel(app, chan);
  285. ast_channel_unref(chan);
  286. }
  287. static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub,
  288. struct stasis_message *message)
  289. {
  290. struct stasis_app *app = data;
  291. if (stasis_subscription_final_message(sub, message)) {
  292. ao2_cleanup(app);
  293. }
  294. }
  295. static void sub_default_handler(void *data, struct stasis_subscription *sub,
  296. struct stasis_message *message)
  297. {
  298. struct stasis_app *app = data;
  299. struct ast_json *json;
  300. /* The dial type can be converted to JSON so it will always be passed
  301. * here.
  302. */
  303. if (stasis_message_type(message) == ast_channel_dial_type()) {
  304. call_forwarded_handler(app, message);
  305. }
  306. /* By default, send any message that has a JSON representation */
  307. json = stasis_message_to_json(message, stasis_app_get_sanitizer());
  308. if (!json) {
  309. return;
  310. }
  311. app_send(app, json);
  312. ast_json_unref(json);
  313. }
  314. /*! \brief Typedef for callbacks that get called on channel snapshot updates */
  315. typedef struct ast_json *(*channel_snapshot_monitor)(
  316. struct ast_channel_snapshot *old_snapshot,
  317. struct ast_channel_snapshot *new_snapshot,
  318. const struct timeval *tv);
  319. static struct ast_json *simple_channel_event(
  320. const char *type,
  321. struct ast_channel_snapshot *snapshot,
  322. const struct timeval *tv)
  323. {
  324. struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
  325. if (!json_channel) {
  326. return NULL;
  327. }
  328. return ast_json_pack("{s: s, s: o, s: o}",
  329. "type", type,
  330. "timestamp", ast_json_timeval(*tv, NULL),
  331. "channel", json_channel);
  332. }
  333. static struct ast_json *channel_created_event(
  334. struct ast_channel_snapshot *snapshot,
  335. const struct timeval *tv)
  336. {
  337. return simple_channel_event("ChannelCreated", snapshot, tv);
  338. }
  339. static struct ast_json *channel_destroyed_event(
  340. struct ast_channel_snapshot *snapshot,
  341. const struct timeval *tv)
  342. {
  343. struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
  344. if (!json_channel) {
  345. return NULL;
  346. }
  347. return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
  348. "type", "ChannelDestroyed",
  349. "timestamp", ast_json_timeval(*tv, NULL),
  350. "cause", snapshot->hangupcause,
  351. "cause_txt", ast_cause2str(snapshot->hangupcause),
  352. "channel", json_channel);
  353. }
  354. static struct ast_json *channel_state_change_event(
  355. struct ast_channel_snapshot *snapshot,
  356. const struct timeval *tv)
  357. {
  358. return simple_channel_event("ChannelStateChange", snapshot, tv);
  359. }
  360. /*! \brief Handle channel state changes */
  361. static struct ast_json *channel_state(
  362. struct ast_channel_snapshot *old_snapshot,
  363. struct ast_channel_snapshot *new_snapshot,
  364. const struct timeval *tv)
  365. {
  366. struct ast_channel_snapshot *snapshot = new_snapshot ?
  367. new_snapshot : old_snapshot;
  368. if (!old_snapshot) {
  369. return channel_created_event(snapshot, tv);
  370. } else if (!new_snapshot) {
  371. return channel_destroyed_event(snapshot, tv);
  372. } else if (old_snapshot->state != new_snapshot->state) {
  373. return channel_state_change_event(snapshot, tv);
  374. }
  375. return NULL;
  376. }
  377. static struct ast_json *channel_dialplan(
  378. struct ast_channel_snapshot *old_snapshot,
  379. struct ast_channel_snapshot *new_snapshot,
  380. const struct timeval *tv)
  381. {
  382. struct ast_json *json_channel;
  383. /* No Newexten event on cache clear or first event */
  384. if (!old_snapshot || !new_snapshot) {
  385. return NULL;
  386. }
  387. /* Empty application is not valid for a Newexten event */
  388. if (ast_strlen_zero(new_snapshot->appl)) {
  389. return NULL;
  390. }
  391. if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
  392. return NULL;
  393. }
  394. json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
  395. if (!json_channel) {
  396. return NULL;
  397. }
  398. return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
  399. "type", "ChannelDialplan",
  400. "timestamp", ast_json_timeval(*tv, NULL),
  401. "dialplan_app", new_snapshot->appl,
  402. "dialplan_app_data", AST_JSON_UTF8_VALIDATE(new_snapshot->data),
  403. "channel", json_channel);
  404. }
  405. static struct ast_json *channel_callerid(
  406. struct ast_channel_snapshot *old_snapshot,
  407. struct ast_channel_snapshot *new_snapshot,
  408. const struct timeval *tv)
  409. {
  410. struct ast_json *json_channel;
  411. /* No NewCallerid event on cache clear or first event */
  412. if (!old_snapshot || !new_snapshot) {
  413. return NULL;
  414. }
  415. if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
  416. return NULL;
  417. }
  418. json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
  419. if (!json_channel) {
  420. return NULL;
  421. }
  422. return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
  423. "type", "ChannelCallerId",
  424. "timestamp", ast_json_timeval(*tv, NULL),
  425. "caller_presentation", new_snapshot->caller_pres,
  426. "caller_presentation_txt", ast_describe_caller_presentation(
  427. new_snapshot->caller_pres),
  428. "channel", json_channel);
  429. }
  430. static struct ast_json *channel_connected_line(
  431. struct ast_channel_snapshot *old_snapshot,
  432. struct ast_channel_snapshot *new_snapshot,
  433. const struct timeval *tv)
  434. {
  435. struct ast_json *json_channel;
  436. /* No ChannelConnectedLine event on cache clear or first event */
  437. if (!old_snapshot || !new_snapshot) {
  438. return NULL;
  439. }
  440. if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
  441. return NULL;
  442. }
  443. json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
  444. if (!json_channel) {
  445. return NULL;
  446. }
  447. return ast_json_pack("{s: s, s: o, s: o}",
  448. "type", "ChannelConnectedLine",
  449. "timestamp", ast_json_timeval(*tv, NULL),
  450. "channel", json_channel);
  451. }
  452. static channel_snapshot_monitor channel_monitors[] = {
  453. channel_state,
  454. channel_dialplan,
  455. channel_callerid,
  456. channel_connected_line,
  457. };
  458. static void sub_channel_update_handler(void *data,
  459. struct stasis_subscription *sub,
  460. struct stasis_message *message)
  461. {
  462. struct stasis_app *app = data;
  463. struct stasis_cache_update *update;
  464. struct ast_channel_snapshot *new_snapshot;
  465. struct ast_channel_snapshot *old_snapshot;
  466. const struct timeval *tv;
  467. int i;
  468. ast_assert(stasis_message_type(message) == stasis_cache_update_type());
  469. update = stasis_message_data(message);
  470. ast_assert(update->type == ast_channel_snapshot_type());
  471. new_snapshot = stasis_message_data(update->new_snapshot);
  472. old_snapshot = stasis_message_data(update->old_snapshot);
  473. /* Pull timestamp from the new snapshot, or from the update message
  474. * when there isn't one. */
  475. tv = update->new_snapshot ?
  476. stasis_message_timestamp(update->new_snapshot) :
  477. stasis_message_timestamp(message);
  478. for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
  479. struct ast_json *msg;
  480. msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
  481. if (msg) {
  482. app_send(app, msg);
  483. ast_json_unref(msg);
  484. }
  485. }
  486. if (!new_snapshot && old_snapshot) {
  487. unsubscribe(app, "channel", old_snapshot->uniqueid, 1);
  488. }
  489. }
  490. static struct ast_json *simple_endpoint_event(
  491. const char *type,
  492. struct ast_endpoint_snapshot *snapshot,
  493. const struct timeval *tv)
  494. {
  495. struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
  496. if (!json_endpoint) {
  497. return NULL;
  498. }
  499. return ast_json_pack("{s: s, s: o, s: o}",
  500. "type", type,
  501. "timestamp", ast_json_timeval(*tv, NULL),
  502. "endpoint", json_endpoint);
  503. }
  504. static int message_received_handler(const char *endpoint_id, struct ast_json *json_msg, void *pvt)
  505. {
  506. struct ast_endpoint_snapshot *snapshot;
  507. struct ast_json *json_endpoint;
  508. struct ast_json *message;
  509. struct stasis_app *app = pvt;
  510. char *tech;
  511. char *resource;
  512. tech = ast_strdupa(endpoint_id);
  513. resource = strchr(tech, '/');
  514. if (resource) {
  515. resource[0] = '\0';
  516. resource++;
  517. }
  518. if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
  519. return -1;
  520. }
  521. snapshot = ast_endpoint_latest_snapshot(tech, resource);
  522. if (!snapshot) {
  523. return -1;
  524. }
  525. json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
  526. ao2_ref(snapshot, -1);
  527. if (!json_endpoint) {
  528. return -1;
  529. }
  530. message = ast_json_pack("{s: s, s: o, s: o, s: o}",
  531. "type", "TextMessageReceived",
  532. "timestamp", ast_json_timeval(ast_tvnow(), NULL),
  533. "endpoint", json_endpoint,
  534. "message", ast_json_ref(json_msg));
  535. if (message) {
  536. app_send(app, message);
  537. ast_json_unref(message);
  538. }
  539. return 0;
  540. }
  541. static void sub_endpoint_update_handler(void *data,
  542. struct stasis_subscription *sub,
  543. struct stasis_message *message)
  544. {
  545. struct stasis_app *app = data;
  546. struct stasis_cache_update *update;
  547. struct ast_endpoint_snapshot *new_snapshot;
  548. struct ast_endpoint_snapshot *old_snapshot;
  549. const struct timeval *tv;
  550. ast_assert(stasis_message_type(message) == stasis_cache_update_type());
  551. update = stasis_message_data(message);
  552. ast_assert(update->type == ast_endpoint_snapshot_type());
  553. new_snapshot = stasis_message_data(update->new_snapshot);
  554. old_snapshot = stasis_message_data(update->old_snapshot);
  555. if (new_snapshot) {
  556. struct ast_json *json;
  557. tv = stasis_message_timestamp(update->new_snapshot);
  558. json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
  559. if (!json) {
  560. return;
  561. }
  562. app_send(app, json);
  563. ast_json_unref(json);
  564. }
  565. if (!new_snapshot && old_snapshot) {
  566. unsubscribe(app, "endpoint", old_snapshot->id, 1);
  567. }
  568. }
  569. static struct ast_json *simple_bridge_event(
  570. const char *type,
  571. struct ast_bridge_snapshot *snapshot,
  572. const struct timeval *tv)
  573. {
  574. struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
  575. if (!json_bridge) {
  576. return NULL;
  577. }
  578. return ast_json_pack("{s: s, s: o, s: o}",
  579. "type", type,
  580. "timestamp", ast_json_timeval(*tv, NULL),
  581. "bridge", json_bridge);
  582. }
  583. static void sub_bridge_update_handler(void *data,
  584. struct stasis_subscription *sub,
  585. struct stasis_message *message)
  586. {
  587. struct ast_json *json = NULL;
  588. struct stasis_app *app = data;
  589. struct stasis_cache_update *update;
  590. struct ast_bridge_snapshot *new_snapshot;
  591. struct ast_bridge_snapshot *old_snapshot;
  592. const struct timeval *tv;
  593. ast_assert(stasis_message_type(message) == stasis_cache_update_type());
  594. update = stasis_message_data(message);
  595. ast_assert(update->type == ast_bridge_snapshot_type());
  596. new_snapshot = stasis_message_data(update->new_snapshot);
  597. old_snapshot = stasis_message_data(update->old_snapshot);
  598. tv = update->new_snapshot ?
  599. stasis_message_timestamp(update->new_snapshot) :
  600. stasis_message_timestamp(message);
  601. if (!new_snapshot) {
  602. json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
  603. } else if (!old_snapshot) {
  604. json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
  605. } else if (new_snapshot && old_snapshot
  606. && strcmp(new_snapshot->video_source_id, old_snapshot->video_source_id)) {
  607. json = simple_bridge_event("BridgeVideoSourceChanged", new_snapshot, tv);
  608. if (json && !ast_strlen_zero(old_snapshot->video_source_id)) {
  609. ast_json_object_set(json, "old_video_source_id",
  610. ast_json_string_create(old_snapshot->video_source_id));
  611. }
  612. }
  613. if (json) {
  614. app_send(app, json);
  615. ast_json_unref(json);
  616. }
  617. if (!new_snapshot && old_snapshot) {
  618. unsubscribe(app, "bridge", old_snapshot->uniqueid, 1);
  619. }
  620. }
  621. /*! \brief Helper function for determining if the application is subscribed to a given entity */
  622. static int bridge_app_subscribed(struct stasis_app *app, const char *uniqueid)
  623. {
  624. struct app_forwards *forwards = NULL;
  625. forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
  626. if (!forwards) {
  627. return 0;
  628. }
  629. ao2_ref(forwards, -1);
  630. return 1;
  631. }
  632. static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
  633. struct stasis_message *message)
  634. {
  635. struct stasis_app *app = data;
  636. struct ast_bridge_merge_message *merge;
  637. merge = stasis_message_data(message);
  638. /* Find out if we're subscribed to either bridge */
  639. if (bridge_app_subscribed(app, merge->from->uniqueid) ||
  640. bridge_app_subscribed(app, merge->to->uniqueid)) {
  641. /* Forward the message to the app */
  642. stasis_publish(app->topic, message);
  643. }
  644. }
  645. /*! \brief Callback function for checking if channels in a bridge are subscribed to */
  646. static int bridge_app_subscribed_involved(struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
  647. {
  648. int subscribed = 0;
  649. struct ao2_iterator iter;
  650. char *uniqueid;
  651. if (bridge_app_subscribed(app, snapshot->uniqueid)) {
  652. return 1;
  653. }
  654. iter = ao2_iterator_init(snapshot->channels, 0);
  655. for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
  656. if (bridge_app_subscribed(app, uniqueid)) {
  657. subscribed = 1;
  658. ao2_ref(uniqueid, -1);
  659. break;
  660. }
  661. }
  662. ao2_iterator_destroy(&iter);
  663. return subscribed;
  664. }
  665. static void bridge_blind_transfer_handler(void *data, struct stasis_subscription *sub,
  666. struct stasis_message *message)
  667. {
  668. struct stasis_app *app = data;
  669. struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
  670. struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
  671. if (bridge_app_subscribed(app, transfer_msg->transferer->uniqueid) ||
  672. (bridge && bridge_app_subscribed_involved(app, bridge))) {
  673. stasis_publish(app->topic, message);
  674. }
  675. }
  676. static void bridge_attended_transfer_handler(void *data, struct stasis_subscription *sub,
  677. struct stasis_message *message)
  678. {
  679. struct stasis_app *app = data;
  680. struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
  681. int subscribed = 0;
  682. subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
  683. if (!subscribed) {
  684. subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
  685. }
  686. if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
  687. subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
  688. }
  689. if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
  690. subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
  691. }
  692. if (!subscribed) {
  693. switch (transfer_msg->dest_type) {
  694. case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
  695. subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
  696. break;
  697. case AST_ATTENDED_TRANSFER_DEST_LINK:
  698. subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
  699. if (!subscribed) {
  700. subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
  701. }
  702. break;
  703. break;
  704. case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
  705. subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
  706. if (!subscribed) {
  707. subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
  708. }
  709. break;
  710. default:
  711. break;
  712. }
  713. }
  714. if (subscribed) {
  715. stasis_publish(app->topic, message);
  716. }
  717. }
  718. static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub,
  719. struct stasis_message *message)
  720. {
  721. struct stasis_app *app = data;
  722. if (stasis_subscription_final_message(sub, message)) {
  723. ao2_cleanup(app);
  724. }
  725. }
  726. void stasis_app_set_debug(struct stasis_app *app, int debug)
  727. {
  728. if (!app) {
  729. return;
  730. }
  731. app->debug = debug;
  732. }
  733. void stasis_app_set_debug_by_name(const char *app_name, int debug)
  734. {
  735. struct stasis_app *app = stasis_app_get_by_name(app_name);
  736. if (!app) {
  737. return;
  738. }
  739. app->debug = debug;
  740. ao2_cleanup(app);
  741. }
  742. int stasis_app_get_debug(struct stasis_app *app)
  743. {
  744. return (app ? app->debug : 0) || global_debug;
  745. }
  746. int stasis_app_get_debug_by_name(const char *app_name)
  747. {
  748. int debug_enabled = 0;
  749. if (global_debug) {
  750. debug_enabled = 1;
  751. } else {
  752. struct stasis_app *app = stasis_app_get_by_name(app_name);
  753. if (app) {
  754. if (app->debug) {
  755. debug_enabled = 1;
  756. }
  757. ao2_ref(app, -1);
  758. }
  759. }
  760. return debug_enabled;
  761. }
  762. void stasis_app_set_global_debug(int debug)
  763. {
  764. global_debug = debug;
  765. if (!global_debug) {
  766. struct ao2_container *app_names = stasis_app_get_all();
  767. struct ao2_iterator it_app_names;
  768. char *app_name;
  769. struct stasis_app *app;
  770. if (!app_names || !ao2_container_count(app_names)) {
  771. ao2_cleanup(app_names);
  772. return;
  773. }
  774. it_app_names = ao2_iterator_init(app_names, 0);
  775. while ((app_name = ao2_iterator_next(&it_app_names))) {
  776. if ((app = stasis_app_get_by_name(app_name))) {
  777. stasis_app_set_debug(app, 0);
  778. }
  779. ao2_cleanup(app_name);
  780. ao2_cleanup(app);
  781. }
  782. ao2_iterator_cleanup(&it_app_names);
  783. ao2_cleanup(app_names);
  784. }
  785. }
  786. struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model)
  787. {
  788. RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
  789. size_t size;
  790. int res = 0;
  791. size_t context_size = strlen("stasis-") + strlen(name) + 1;
  792. char context_name[context_size];
  793. ast_assert(name != NULL);
  794. ast_assert(handler != NULL);
  795. ast_verb(1, "Creating Stasis app '%s'\n", name);
  796. size = sizeof(*app) + strlen(name) + 1;
  797. app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
  798. if (!app) {
  799. return NULL;
  800. }
  801. app->subscription_model = subscription_model;
  802. app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
  803. AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
  804. forwards_sort, NULL);
  805. if (!app->forwards) {
  806. return NULL;
  807. }
  808. app->topic = stasis_topic_create(name);
  809. if (!app->topic) {
  810. return NULL;
  811. }
  812. app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
  813. if (!app->bridge_router) {
  814. return NULL;
  815. }
  816. res |= stasis_message_router_add(app->bridge_router,
  817. ast_bridge_merge_message_type(), bridge_merge_handler, app);
  818. res |= stasis_message_router_add(app->bridge_router,
  819. ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
  820. res |= stasis_message_router_add(app->bridge_router,
  821. ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
  822. res |= stasis_message_router_add(app->bridge_router,
  823. stasis_subscription_change_type(), bridge_subscription_change_handler, app);
  824. if (res != 0) {
  825. return NULL;
  826. }
  827. /* Bridge router holds a reference */
  828. ao2_ref(app, +1);
  829. app->router = stasis_message_router_create(app->topic);
  830. if (!app->router) {
  831. return NULL;
  832. }
  833. res |= stasis_message_router_add_cache_update(app->router,
  834. ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
  835. res |= stasis_message_router_add_cache_update(app->router,
  836. ast_channel_snapshot_type(), sub_channel_update_handler, app);
  837. res |= stasis_message_router_add_cache_update(app->router,
  838. ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
  839. res |= stasis_message_router_add(app->router,
  840. stasis_subscription_change_type(), sub_subscription_change_handler, app);
  841. stasis_message_router_set_formatters_default(app->router,
  842. sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON);
  843. if (res != 0) {
  844. return NULL;
  845. }
  846. /* Router holds a reference */
  847. ao2_ref(app, +1);
  848. strncpy(app->name, name, size - sizeof(*app));
  849. app->handler = handler;
  850. app->data = ao2_bump(data);
  851. /* Create a context, a match-all extension, and a 'h' extension for this application. Note that
  852. * this should only be done if a context does not already exist. */
  853. strcpy(context_name, "stasis-");
  854. strcat(context_name, name);
  855. if (!ast_context_find(context_name)) {
  856. if (!ast_context_find_or_create(NULL, NULL, context_name, "res_stasis")) {
  857. ast_log(LOG_WARNING, "Could not create context '%s' for Stasis application '%s'\n", context_name, name);
  858. } else {
  859. ast_add_extension(context_name, 0, "_.", 1, NULL, NULL, "Stasis", ast_strdup(name), ast_free_ptr, "res_stasis");
  860. ast_add_extension(context_name, 0, "h", 1, NULL, NULL, "NoOp", NULL, NULL, "res_stasis");
  861. }
  862. } else {
  863. ast_log(LOG_WARNING, "Not creating context '%s' for Stasis application '%s' because it already exists\n",
  864. context_name, name);
  865. }
  866. ao2_ref(app, +1);
  867. return app;
  868. }
  869. struct stasis_topic *ast_app_get_topic(struct stasis_app *app)
  870. {
  871. return app->topic;
  872. }
  873. /*!
  874. * \brief Send a message to the given application.
  875. * \param app App to send the message to.
  876. * \param message Message to send.
  877. */
  878. void app_send(struct stasis_app *app, struct ast_json *message)
  879. {
  880. stasis_app_cb handler;
  881. char eid[20];
  882. void *data;
  883. if (ast_json_object_set(message, "asterisk_id", ast_json_string_create(
  884. ast_eid_to_str(eid, sizeof(eid), &ast_eid_default)))) {
  885. ast_log(AST_LOG_WARNING, "Failed to append EID to outgoing event %s\n",
  886. ast_json_string_get(ast_json_object_get(message, "type")));
  887. }
  888. /* Copy off mutable state with lock held */
  889. ao2_lock(app);
  890. handler = app->handler;
  891. data = ao2_bump(app->data);
  892. ao2_unlock(app);
  893. /* Name is immutable; no need to copy */
  894. if (handler) {
  895. handler(data, app->name, message);
  896. } else {
  897. ast_verb(3,
  898. "Inactive Stasis app '%s' missed message\n", app->name);
  899. }
  900. ao2_cleanup(data);
  901. }
  902. void app_deactivate(struct stasis_app *app)
  903. {
  904. ao2_lock(app);
  905. ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
  906. app->handler = NULL;
  907. ao2_cleanup(app->data);
  908. app->data = NULL;
  909. ao2_unlock(app);
  910. }
  911. void app_shutdown(struct stasis_app *app)
  912. {
  913. ao2_lock(app);
  914. ast_assert(app_is_finished(app));
  915. stasis_message_router_unsubscribe(app->router);
  916. app->router = NULL;
  917. stasis_message_router_unsubscribe(app->bridge_router);
  918. app->bridge_router = NULL;
  919. stasis_message_router_unsubscribe(app->endpoint_router);
  920. app->endpoint_router = NULL;
  921. ao2_unlock(app);
  922. }
  923. int app_is_active(struct stasis_app *app)
  924. {
  925. int ret;
  926. ao2_lock(app);
  927. ret = app->handler != NULL;
  928. ao2_unlock(app);
  929. return ret;
  930. }
  931. int app_is_finished(struct stasis_app *app)
  932. {
  933. int ret;
  934. ao2_lock(app);
  935. ret = app->handler == NULL && ao2_container_count(app->forwards) == 0;
  936. ao2_unlock(app);
  937. return ret;
  938. }
  939. void app_update(struct stasis_app *app, stasis_app_cb handler, void *data)
  940. {
  941. ao2_lock(app);
  942. if (app->handler && app->data) {
  943. struct ast_json *msg;
  944. ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
  945. msg = ast_json_pack("{s: s, s: s}",
  946. "type", "ApplicationReplaced",
  947. "application", app->name);
  948. if (msg) {
  949. app_send(app, msg);
  950. ast_json_unref(msg);
  951. }
  952. } else {
  953. ast_verb(1, "Activating Stasis app '%s'\n", app->name);
  954. }
  955. app->handler = handler;
  956. ao2_replace(app->data, data);
  957. ao2_unlock(app);
  958. }
  959. const char *stasis_app_name(const struct stasis_app *app)
  960. {
  961. return app->name;
  962. }
  963. static int forwards_filter_by_type(void *obj, void *arg, int flags)
  964. {
  965. struct app_forwards *forward = obj;
  966. enum forward_type *forward_type = arg;
  967. if (forward->forward_type == *forward_type) {
  968. return CMP_MATCH;
  969. }
  970. return 0;
  971. }
  972. void stasis_app_to_cli(const struct stasis_app *app, struct ast_cli_args *a)
  973. {
  974. struct ao2_iterator *channels;
  975. struct ao2_iterator *endpoints;
  976. struct ao2_iterator *bridges;
  977. struct app_forwards *forward;
  978. enum forward_type forward_type;
  979. ast_cli(a->fd, "Name: %s\n"
  980. " Debug: %s\n"
  981. " Subscription Model: %s\n",
  982. app->name,
  983. app->debug ? "Yes" : "No",
  984. app->subscription_model == STASIS_APP_SUBSCRIBE_ALL ?
  985. "Global Resource Subscription" :
  986. "Application/Explicit Resource Subscription");
  987. ast_cli(a->fd, " Subscriptions: %d\n", ao2_container_count(app->forwards));
  988. ast_cli(a->fd, " Channels:\n");
  989. forward_type = FORWARD_CHANNEL;
  990. channels = ao2_callback(app->forwards, OBJ_MULTIPLE,
  991. forwards_filter_by_type, &forward_type);
  992. if (channels) {
  993. while ((forward = ao2_iterator_next(channels))) {
  994. ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
  995. ao2_ref(forward, -1);
  996. }
  997. ao2_iterator_destroy(channels);
  998. }
  999. ast_cli(a->fd, " Bridges:\n");
  1000. forward_type = FORWARD_BRIDGE;
  1001. bridges = ao2_callback(app->forwards, OBJ_MULTIPLE,
  1002. forwards_filter_by_type, &forward_type);
  1003. if (bridges) {
  1004. while ((forward = ao2_iterator_next(bridges))) {
  1005. ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
  1006. ao2_ref(forward, -1);
  1007. }
  1008. ao2_iterator_destroy(bridges);
  1009. }
  1010. ast_cli(a->fd, " Endpoints:\n");
  1011. forward_type = FORWARD_ENDPOINT;
  1012. endpoints = ao2_callback(app->forwards, OBJ_MULTIPLE,
  1013. forwards_filter_by_type, &forward_type);
  1014. if (endpoints) {
  1015. while ((forward = ao2_iterator_next(endpoints))) {
  1016. ast_cli(a->fd, " %s (%d)\n", forward->id, forward->interested);
  1017. ao2_ref(forward, -1);
  1018. }
  1019. ao2_iterator_destroy(endpoints);
  1020. }
  1021. }
  1022. struct ast_json *app_to_json(const struct stasis_app *app)
  1023. {
  1024. struct ast_json *json;
  1025. struct ast_json *channels;
  1026. struct ast_json *bridges;
  1027. struct ast_json *endpoints;
  1028. struct ao2_iterator i;
  1029. struct app_forwards *forwards;
  1030. json = ast_json_pack("{s: s, s: [], s: [], s: []}",
  1031. "name", app->name,
  1032. "channel_ids", "bridge_ids", "endpoint_ids");
  1033. if (!json) {
  1034. return NULL;
  1035. }
  1036. channels = ast_json_object_get(json, "channel_ids");
  1037. bridges = ast_json_object_get(json, "bridge_ids");
  1038. endpoints = ast_json_object_get(json, "endpoint_ids");
  1039. i = ao2_iterator_init(app->forwards, 0);
  1040. while ((forwards = ao2_iterator_next(&i))) {
  1041. struct ast_json *array = NULL;
  1042. int append_res;
  1043. switch (forwards->forward_type) {
  1044. case FORWARD_CHANNEL:
  1045. array = channels;
  1046. break;
  1047. case FORWARD_BRIDGE:
  1048. array = bridges;
  1049. break;
  1050. case FORWARD_ENDPOINT:
  1051. array = endpoints;
  1052. break;
  1053. }
  1054. /* If forward_type value is unexpected this will safely return an error. */
  1055. append_res = ast_json_array_append(array, ast_json_string_create(forwards->id));
  1056. ao2_ref(forwards, -1);
  1057. if (append_res != 0) {
  1058. ast_log(LOG_ERROR, "Error building response\n");
  1059. ao2_iterator_destroy(&i);
  1060. ast_json_unref(json);
  1061. return NULL;
  1062. }
  1063. }
  1064. ao2_iterator_destroy(&i);
  1065. return json;
  1066. }
  1067. int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan)
  1068. {
  1069. struct app_forwards *forwards;
  1070. if (!app) {
  1071. return -1;
  1072. }
  1073. ao2_lock(app->forwards);
  1074. /* If subscribed to all, don't subscribe again */
  1075. forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1076. if (forwards) {
  1077. ao2_unlock(app->forwards);
  1078. ao2_ref(forwards, -1);
  1079. return 0;
  1080. }
  1081. forwards = ao2_find(app->forwards,
  1082. chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL,
  1083. OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1084. if (!forwards) {
  1085. int res;
  1086. /* Forwards not found, create one */
  1087. forwards = forwards_create_channel(app, chan);
  1088. if (!forwards) {
  1089. ao2_unlock(app->forwards);
  1090. return -1;
  1091. }
  1092. res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
  1093. if (!res) {
  1094. ao2_unlock(app->forwards);
  1095. ao2_ref(forwards, -1);
  1096. return -1;
  1097. }
  1098. }
  1099. ++forwards->interested;
  1100. ast_debug(3, "Channel '%s' is %d interested in %s\n",
  1101. chan ? ast_channel_uniqueid(chan) : "ALL",
  1102. forwards->interested,
  1103. app->name);
  1104. ao2_unlock(app->forwards);
  1105. ao2_ref(forwards, -1);
  1106. return 0;
  1107. }
  1108. static int subscribe_channel(struct stasis_app *app, void *obj)
  1109. {
  1110. return app_subscribe_channel(app, obj);
  1111. }
  1112. static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate)
  1113. {
  1114. struct app_forwards *forwards;
  1115. if (!id) {
  1116. if (!strcmp(kind, "bridge")) {
  1117. id = BRIDGE_ALL;
  1118. } else if (!strcmp(kind, "channel")) {
  1119. id = CHANNEL_ALL;
  1120. } else if (!strcmp(kind, "endpoint")) {
  1121. id = ENDPOINT_ALL;
  1122. } else {
  1123. ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind);
  1124. return -1;
  1125. }
  1126. }
  1127. ao2_lock(app->forwards);
  1128. forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1129. if (!forwards) {
  1130. ao2_unlock(app->forwards);
  1131. ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
  1132. return -1;
  1133. }
  1134. forwards->interested--;
  1135. ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
  1136. if (forwards->interested == 0 || terminate) {
  1137. /* No one is interested any more; unsubscribe */
  1138. ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
  1139. forwards_unsubscribe(forwards);
  1140. ao2_find(app->forwards, forwards,
  1141. OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
  1142. OBJ_NODATA);
  1143. if (!strcmp(kind, "endpoint")) {
  1144. messaging_app_unsubscribe_endpoint(app->name, id);
  1145. }
  1146. }
  1147. ao2_unlock(app->forwards);
  1148. ao2_ref(forwards, -1);
  1149. return 0;
  1150. }
  1151. int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan)
  1152. {
  1153. if (!app) {
  1154. return -1;
  1155. }
  1156. return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL);
  1157. }
  1158. int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id)
  1159. {
  1160. if (!app) {
  1161. return -1;
  1162. }
  1163. return unsubscribe(app, "channel", channel_id, 0);
  1164. }
  1165. int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id)
  1166. {
  1167. struct app_forwards *forwards;
  1168. if (ast_strlen_zero(channel_id)) {
  1169. channel_id = CHANNEL_ALL;
  1170. }
  1171. forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
  1172. ao2_cleanup(forwards);
  1173. return forwards != NULL;
  1174. }
  1175. static void *channel_find(const struct stasis_app *app, const char *id)
  1176. {
  1177. return ast_channel_get_by_name(id);
  1178. }
  1179. struct stasis_app_event_source channel_event_source = {
  1180. .scheme = "channel:",
  1181. .find = channel_find,
  1182. .subscribe = subscribe_channel,
  1183. .unsubscribe = app_unsubscribe_channel_id,
  1184. .is_subscribed = app_is_subscribed_channel_id
  1185. };
  1186. int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
  1187. {
  1188. struct app_forwards *forwards;
  1189. if (!app) {
  1190. return -1;
  1191. }
  1192. ao2_lock(app->forwards);
  1193. /* If subscribed to all, don't subscribe again */
  1194. forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1195. if (forwards) {
  1196. ao2_unlock(app->forwards);
  1197. ao2_ref(forwards, -1);
  1198. return 0;
  1199. }
  1200. forwards = ao2_find(app->forwards,
  1201. bridge ? bridge->uniqueid : BRIDGE_ALL,
  1202. OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1203. if (!forwards) {
  1204. int res;
  1205. /* Forwards not found, create one */
  1206. forwards = forwards_create_bridge(app, bridge);
  1207. if (!forwards) {
  1208. ao2_unlock(app->forwards);
  1209. return -1;
  1210. }
  1211. res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
  1212. if (!res) {
  1213. ao2_unlock(app->forwards);
  1214. ao2_ref(forwards, -1);
  1215. return -1;
  1216. }
  1217. }
  1218. ++forwards->interested;
  1219. ast_debug(3, "Bridge '%s' is %d interested in %s\n",
  1220. bridge ? bridge->uniqueid : "ALL",
  1221. forwards->interested,
  1222. app->name);
  1223. ao2_unlock(app->forwards);
  1224. ao2_ref(forwards, -1);
  1225. return 0;
  1226. }
  1227. static int subscribe_bridge(struct stasis_app *app, void *obj)
  1228. {
  1229. return app_subscribe_bridge(app, obj);
  1230. }
  1231. int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge)
  1232. {
  1233. if (!app) {
  1234. return -1;
  1235. }
  1236. return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL);
  1237. }
  1238. int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id)
  1239. {
  1240. if (!app) {
  1241. return -1;
  1242. }
  1243. return unsubscribe(app, "bridge", bridge_id, 0);
  1244. }
  1245. int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id)
  1246. {
  1247. struct app_forwards *forwards;
  1248. if (ast_strlen_zero(bridge_id)) {
  1249. bridge_id = BRIDGE_ALL;
  1250. }
  1251. forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
  1252. ao2_cleanup(forwards);
  1253. return forwards != NULL;
  1254. }
  1255. static void *bridge_find(const struct stasis_app *app, const char *id)
  1256. {
  1257. return stasis_app_bridge_find_by_id(id);
  1258. }
  1259. struct stasis_app_event_source bridge_event_source = {
  1260. .scheme = "bridge:",
  1261. .find = bridge_find,
  1262. .subscribe = subscribe_bridge,
  1263. .unsubscribe = app_unsubscribe_bridge_id,
  1264. .is_subscribed = app_is_subscribed_bridge_id
  1265. };
  1266. int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint)
  1267. {
  1268. struct app_forwards *forwards;
  1269. if (!app) {
  1270. return -1;
  1271. }
  1272. ao2_lock(app->forwards);
  1273. /* If subscribed to all, don't subscribe again */
  1274. forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1275. if (forwards) {
  1276. ao2_unlock(app->forwards);
  1277. ao2_ref(forwards, -1);
  1278. return 0;
  1279. }
  1280. forwards = ao2_find(app->forwards,
  1281. endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL,
  1282. OBJ_SEARCH_KEY | OBJ_NOLOCK);
  1283. if (!forwards) {
  1284. int res;
  1285. /* Forwards not found, create one */
  1286. forwards = forwards_create_endpoint(app, endpoint);
  1287. if (!forwards) {
  1288. ao2_unlock(app->forwards);
  1289. return -1;
  1290. }
  1291. res = ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
  1292. if (!res) {
  1293. ao2_unlock(app->forwards);
  1294. ao2_ref(forwards, -1);
  1295. return -1;
  1296. }
  1297. /* Subscribe for messages */
  1298. messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
  1299. }
  1300. ++forwards->interested;
  1301. ast_debug(3, "Endpoint '%s' is %d interested in %s\n",
  1302. endpoint ? ast_endpoint_get_id(endpoint) : "ALL",
  1303. forwards->interested,
  1304. app->name);
  1305. ao2_unlock(app->forwards);
  1306. ao2_ref(forwards, -1);
  1307. return 0;
  1308. }
  1309. static int subscribe_endpoint(struct stasis_app *app, void *obj)
  1310. {
  1311. return app_subscribe_endpoint(app, obj);
  1312. }
  1313. int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id)
  1314. {
  1315. if (!app) {
  1316. return -1;
  1317. }
  1318. return unsubscribe(app, "endpoint", endpoint_id, 0);
  1319. }
  1320. int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id)
  1321. {
  1322. struct app_forwards *forwards;
  1323. if (ast_strlen_zero(endpoint_id)) {
  1324. endpoint_id = ENDPOINT_ALL;
  1325. }
  1326. forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
  1327. ao2_cleanup(forwards);
  1328. return forwards != NULL;
  1329. }
  1330. static void *endpoint_find(const struct stasis_app *app, const char *id)
  1331. {
  1332. return ast_endpoint_find_by_id(id);
  1333. }
  1334. struct stasis_app_event_source endpoint_event_source = {
  1335. .scheme = "endpoint:",
  1336. .find = endpoint_find,
  1337. .subscribe = subscribe_endpoint,
  1338. .unsubscribe = app_unsubscribe_endpoint_id,
  1339. .is_subscribed = app_is_subscribed_endpoint_id
  1340. };
  1341. void stasis_app_register_event_sources(void)
  1342. {
  1343. stasis_app_register_event_source(&channel_event_source);
  1344. stasis_app_register_event_source(&bridge_event_source);
  1345. stasis_app_register_event_source(&endpoint_event_source);
  1346. }
  1347. int stasis_app_is_core_event_source(struct stasis_app_event_source *obj)
  1348. {
  1349. return obj == &endpoint_event_source ||
  1350. obj == &bridge_event_source ||
  1351. obj == &channel_event_source;
  1352. }
  1353. void stasis_app_unregister_event_sources(void)
  1354. {
  1355. stasis_app_unregister_event_source(&endpoint_event_source);
  1356. stasis_app_unregister_event_source(&bridge_event_source);
  1357. stasis_app_unregister_event_source(&channel_event_source);
  1358. }