resource_events.c 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2012 - 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief /api-docs/events.{format} implementation- WebSocket resource
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <depend type="module">res_http_websocket</depend>
  26. <support_level>core</support_level>
  27. ***/
  28. #include "asterisk.h"
  29. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  30. #include "asterisk/astobj2.h"
  31. #include "asterisk/stasis_app.h"
  32. #include "resource_events.h"
/*!
 * \brief Number of buckets for the Stasis application hash table.
 *
 * Remember to keep it a prime number!
 */
#define APPS_NUM_BUCKETS 7
/*! \brief A connection to the event WebSocket */
struct event_session {
	/*! WebSocket that events are written to; set to NULL by session_shutdown() */
	struct ast_ari_websocket_session *ws_session;
	/*! String container holding the names of the apps registered for this session */
	struct ao2_container *websocket_apps;
};
  42. /*!
  43. * \brief Explicitly shutdown a session.
  44. *
  45. * An explicit shutdown is necessary, since stasis-app has a reference to this
  46. * session. We also need to be sure to null out the \c ws_session field, since
  47. * the websocket is about to go away.
  48. *
  49. * \param session Session info struct.
  50. */
  51. static void session_shutdown(struct event_session *session)
  52. {
  53. struct ao2_iterator i;
  54. char *app;
  55. SCOPED_AO2LOCK(lock, session);
  56. i = ao2_iterator_init(session->websocket_apps, 0);
  57. while ((app = ao2_iterator_next(&i))) {
  58. stasis_app_unregister(app);
  59. ao2_cleanup(app);
  60. }
  61. ao2_iterator_destroy(&i);
  62. ao2_cleanup(session->websocket_apps);
  63. session->websocket_apps = NULL;
  64. session->ws_session = NULL;
  65. }
  66. static void session_dtor(void *obj)
  67. {
  68. #ifdef AST_DEVMODE /* Avoid unused variable warning */
  69. struct event_session *session = obj;
  70. #endif
  71. /* session_shutdown should have been called before */
  72. ast_assert(session->ws_session == NULL);
  73. ast_assert(session->websocket_apps == NULL);
  74. }
  75. static void session_cleanup(struct event_session *session)
  76. {
  77. session_shutdown(session);
  78. ao2_cleanup(session);
  79. }
  80. static struct event_session *session_create(
  81. struct ast_ari_websocket_session *ws_session)
  82. {
  83. RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
  84. session = ao2_alloc(sizeof(*session), session_dtor);
  85. session->ws_session = ws_session;
  86. session->websocket_apps =
  87. ast_str_container_alloc(APPS_NUM_BUCKETS);
  88. if (!session->websocket_apps) {
  89. return NULL;
  90. }
  91. ao2_ref(session, +1);
  92. return session;
  93. }
  94. /*!
  95. * \brief Callback handler for Stasis application messages.
  96. */
  97. static void app_handler(void *data, const char *app_name,
  98. struct ast_json *message)
  99. {
  100. struct event_session *session = data;
  101. int res;
  102. const char *msg_type = S_OR(
  103. ast_json_string_get(ast_json_object_get(message, "type")),
  104. "");
  105. const char *msg_application = S_OR(
  106. ast_json_string_get(ast_json_object_get(message, "application")),
  107. "");
  108. if (!session) {
  109. return;
  110. }
  111. /* Determine if we've been replaced */
  112. if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
  113. strcmp(msg_application, app_name) == 0) {
  114. ao2_find(session->websocket_apps, msg_application,
  115. OBJ_UNLINK | OBJ_NODATA);
  116. }
  117. res = ast_json_object_set(message, "application",
  118. ast_json_string_create(app_name));
  119. if(res != 0) {
  120. return;
  121. }
  122. ao2_lock(session);
  123. if (session->ws_session) {
  124. if (stasis_app_get_debug_by_name(app_name)) {
  125. char *str = ast_json_dump_string_format(message, ast_ari_json_format());
  126. ast_verbose("<--- Sending ARI event to %s --->\n%s\n",
  127. ast_sockaddr_stringify(ast_ari_websocket_session_get_remote_addr(session->ws_session)),
  128. str);
  129. ast_json_free(str);
  130. }
  131. ast_ari_websocket_session_write(session->ws_session, message);
  132. }
  133. ao2_unlock(session);
  134. }
  135. /*!
  136. * \brief Register for all of the apps given.
  137. * \param session Session info struct.
  138. * \param app_name Name of application to register.
  139. * \param register_handler Pointer to the application registration handler
  140. */
  141. static int session_register_app(struct event_session *session,
  142. const char *app_name,
  143. int (* register_handler)(const char *, stasis_app_cb handler, void *data))
  144. {
  145. SCOPED_AO2LOCK(lock, session);
  146. ast_assert(session->ws_session != NULL);
  147. ast_assert(session->websocket_apps != NULL);
  148. if (ast_strlen_zero(app_name)) {
  149. return -1;
  150. }
  151. if (ast_str_container_add(session->websocket_apps, app_name)) {
  152. ast_ari_websocket_session_write(session->ws_session,
  153. ast_ari_oom_json());
  154. return -1;
  155. }
  156. register_handler(app_name, app_handler, session);
  157. return 0;
  158. }
  159. int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser,
  160. struct ast_variable *headers,
  161. struct ast_ari_events_event_websocket_args *args)
  162. {
  163. int res = 0;
  164. size_t i, j;
  165. int (* register_handler)(const char *, stasis_app_cb handler, void *data);
  166. ast_debug(3, "/events WebSocket attempted\n");
  167. if (args->app_count == 0) {
  168. ast_http_error(ser, 400, "Bad Request", "Missing param 'app'");
  169. return -1;
  170. }
  171. if (args->subscribe_all) {
  172. register_handler = &stasis_app_register_all;
  173. } else {
  174. register_handler = &stasis_app_register;
  175. }
  176. for (i = 0; i < args->app_count; ++i) {
  177. if (ast_strlen_zero(args->app[i])) {
  178. res = -1;
  179. break;
  180. }
  181. res |= register_handler(args->app[i], app_handler, NULL);
  182. }
  183. if (res) {
  184. for (j = 0; j < i; ++j) {
  185. stasis_app_unregister(args->app[j]);
  186. }
  187. ast_http_error(ser, 400, "Bad Request", "Invalid application provided in param 'app'.");
  188. }
  189. return res;
  190. }
  191. void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *ws_session,
  192. struct ast_variable *headers,
  193. struct ast_ari_events_event_websocket_args *args)
  194. {
  195. RAII_VAR(struct event_session *, session, NULL, session_cleanup);
  196. struct ast_json *msg;
  197. int res;
  198. size_t i;
  199. int (* register_handler)(const char *, stasis_app_cb handler, void *data);
  200. ast_debug(3, "/events WebSocket connection\n");
  201. session = session_create(ws_session);
  202. if (!session) {
  203. ast_ari_websocket_session_write(ws_session, ast_ari_oom_json());
  204. return;
  205. }
  206. if (args->subscribe_all) {
  207. register_handler = &stasis_app_register_all;
  208. } else {
  209. register_handler = &stasis_app_register;
  210. }
  211. res = 0;
  212. for (i = 0; i < args->app_count; ++i) {
  213. if (ast_strlen_zero(args->app[i])) {
  214. continue;
  215. }
  216. res |= session_register_app(session, args->app[i], register_handler);
  217. }
  218. if (ao2_container_count(session->websocket_apps) == 0) {
  219. RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
  220. msg = ast_json_pack("{s: s, s: [s]}",
  221. "type", "MissingParams",
  222. "params", "app");
  223. if (!msg) {
  224. msg = ast_json_ref(ast_ari_oom_json());
  225. }
  226. ast_ari_websocket_session_write(session->ws_session, msg);
  227. return;
  228. }
  229. if (res != 0) {
  230. ast_ari_websocket_session_write(ws_session, ast_ari_oom_json());
  231. return;
  232. }
  233. /* We don't process any input, but we'll consume it waiting for EOF */
  234. while ((msg = ast_ari_websocket_session_read(ws_session))) {
  235. ast_json_unref(msg);
  236. }
  237. }
  238. void ast_ari_events_user_event(struct ast_variable *headers,
  239. struct ast_ari_events_user_event_args *args,
  240. struct ast_ari_response *response)
  241. {
  242. enum stasis_app_user_event_res res;
  243. struct ast_json *json_variables = NULL;
  244. if (args->variables) {
  245. ast_ari_events_user_event_parse_body(args->variables, args);
  246. json_variables = ast_json_object_get(args->variables, "variables");
  247. }
  248. if (ast_strlen_zero(args->application)) {
  249. ast_ari_response_error(response, 400, "Bad Request",
  250. "Missing parameter application");
  251. return;
  252. }
  253. res = stasis_app_user_event(args->application,
  254. args->event_name,
  255. args->source, args->source_count,
  256. json_variables);
  257. switch (res) {
  258. case STASIS_APP_USER_OK:
  259. ast_ari_response_no_content(response);
  260. break;
  261. case STASIS_APP_USER_APP_NOT_FOUND:
  262. ast_ari_response_error(response, 404, "Not Found",
  263. "Application not found");
  264. break;
  265. case STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND:
  266. ast_ari_response_error(response, 422, "Unprocessable Entity",
  267. "Event source was not found");
  268. break;
  269. case STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME:
  270. ast_ari_response_error(response, 400, "Bad Request",
  271. "Invalid event source URI scheme");
  272. break;
  273. case STASIS_APP_USER_USEREVENT_INVALID:
  274. ast_ari_response_error(response, 400, "Bad Request",
  275. "Invalid userevnet data");
  276. break;
  277. case STASIS_APP_USER_INTERNAL_ERROR:
  278. default:
  279. ast_ari_response_error(response, 500, "Internal Server Error",
  280. "Error processing request");
  281. }
  282. }