res_pjsip_transport_websocket.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * Jason Parker <jparker@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*!
  19. * \brief WebSocket transport module
  20. */
  21. /*** MODULEINFO
  22. <depend>pjproject</depend>
  23. <depend>res_pjsip</depend>
  24. <depend>res_http_websocket</depend>
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. #include <pjsip.h>
  29. #include <pjsip_ua.h>
  30. #include "asterisk/module.h"
  31. #include "asterisk/http_websocket.h"
  32. #include "asterisk/res_pjsip.h"
  33. #include "asterisk/res_pjsip_session.h"
  34. #include "asterisk/taskprocessor.h"
  35. static int transport_type_wss;
  36. static int transport_type_wss_ipv6;
  37. /*!
  38. * Used to ensure uniqueness among WS transport names
  39. */
  40. static int ws_obj_name_serial;
  41. /*!
  42. * \brief Wrapper for pjsip_transport, for storing the WebSocket session
  43. */
  44. struct ws_transport {
  45. pjsip_transport transport;
  46. pjsip_rx_data rdata;
  47. struct ast_websocket *ws_session;
  48. };
  49. /*!
  50. * \brief Send a message over the WebSocket connection.
  51. *
  52. * Called by pjsip transport manager.
  53. */
  54. static pj_status_t ws_send_msg(pjsip_transport *transport,
  55. pjsip_tx_data *tdata,
  56. const pj_sockaddr_t *rem_addr,
  57. int addr_len,
  58. void *token,
  59. pjsip_transport_callback callback)
  60. {
  61. struct ws_transport *wstransport = (struct ws_transport *)transport;
  62. uint64_t len = tdata->buf.cur - tdata->buf.start;
  63. if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, len)) {
  64. return PJ_EUNKNOWN;
  65. }
  66. return PJ_SUCCESS;
  67. }
  68. /*!
  69. * \brief Destroy the pjsip transport.
  70. *
  71. * Called by pjsip transport manager.
  72. */
  73. static pj_status_t ws_destroy(pjsip_transport *transport)
  74. {
  75. struct ws_transport *wstransport = (struct ws_transport *)transport;
  76. int fd = ast_websocket_fd(wstransport->ws_session);
  77. if (fd > 0) {
  78. ast_websocket_close(wstransport->ws_session, 1000);
  79. shutdown(fd, SHUT_RDWR);
  80. }
  81. ao2_ref(wstransport, -1);
  82. return PJ_SUCCESS;
  83. }
  84. static void transport_dtor(void *arg)
  85. {
  86. struct ws_transport *wstransport = arg;
  87. if (wstransport->ws_session) {
  88. ast_websocket_unref(wstransport->ws_session);
  89. }
  90. if (wstransport->transport.ref_cnt) {
  91. pj_atomic_destroy(wstransport->transport.ref_cnt);
  92. }
  93. if (wstransport->transport.lock) {
  94. pj_lock_destroy(wstransport->transport.lock);
  95. }
  96. if (wstransport->transport.endpt && wstransport->transport.pool) {
  97. pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
  98. }
  99. if (wstransport->rdata.tp_info.pool) {
  100. pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
  101. }
  102. }
  103. static int transport_shutdown(void *data)
  104. {
  105. struct ws_transport *wstransport = data;
  106. if (!wstransport->transport.is_shutdown && !wstransport->transport.is_destroying) {
  107. pjsip_transport_shutdown(&wstransport->transport);
  108. }
  109. /* Note that the destructor calls PJSIP functions,
  110. * therefore it must be called in a PJSIP thread.
  111. */
  112. ao2_ref(wstransport, -1);
  113. return 0;
  114. }
  115. struct transport_create_data {
  116. struct ws_transport *transport;
  117. struct ast_websocket *ws_session;
  118. };
  119. /*!
  120. * \brief Create a pjsip transport.
  121. */
  122. static int transport_create(void *data)
  123. {
  124. struct transport_create_data *create_data = data;
  125. struct ws_transport *newtransport = NULL;
  126. pjsip_tp_state_callback state_cb;
  127. pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
  128. struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
  129. char *ws_addr_str;
  130. pj_pool_t *pool;
  131. pj_str_t buf;
  132. pj_status_t status;
  133. newtransport = ao2_t_alloc_options(sizeof(*newtransport), transport_dtor,
  134. AO2_ALLOC_OPT_LOCK_NOLOCK, "pjsip websocket transport");
  135. if (!newtransport) {
  136. ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
  137. goto on_error;
  138. }
  139. /* Give websocket transport a unique name for its lifetime */
  140. snprintf(newtransport->transport.obj_name, PJ_MAX_OBJ_NAME, "ws%p-%d",
  141. &newtransport->transport, ast_atomic_fetchadd_int(&ws_obj_name_serial, 1));
  142. newtransport->transport.endpt = endpt;
  143. if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
  144. ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
  145. goto on_error;
  146. }
  147. newtransport->transport.pool = pool;
  148. newtransport->ws_session = create_data->ws_session;
  149. /* Keep the session until transport dies */
  150. ast_websocket_ref(newtransport->ws_session);
  151. status = pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
  152. if (status != PJ_SUCCESS) {
  153. goto on_error;
  154. }
  155. status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
  156. if (status != PJ_SUCCESS) {
  157. goto on_error;
  158. }
  159. /*
  160. * The type_name here is mostly used by log messages eihter in
  161. * pjproject or Asterisk. Other places are reconstituting subscriptions
  162. * after a restart (which could never work for a websocket connection anyway),
  163. * received MESSAGE requests to set PJSIP_TRANSPORT, and most importantly
  164. * by pjproject when generating the Via header.
  165. */
  166. newtransport->transport.type_name = ast_websocket_is_secure(newtransport->ws_session)
  167. ? "WSS" : "WS";
  168. ws_addr_str = ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session));
  169. ast_debug(4, "Creating websocket transport for %s:%s\n",
  170. newtransport->transport.type_name, ws_addr_str);
  171. newtransport->transport.info = (char *) pj_pool_alloc(newtransport->transport.pool,
  172. strlen(newtransport->transport.type_name) + strlen(ws_addr_str) + sizeof(" to "));
  173. sprintf(newtransport->transport.info, "%s to %s", newtransport->transport.type_name, ws_addr_str);
  174. pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.key.rem_addr);
  175. if (newtransport->transport.key.rem_addr.addr.sa_family == pj_AF_INET6()) {
  176. newtransport->transport.key.type = transport_type_wss_ipv6;
  177. } else {
  178. newtransport->transport.key.type = transport_type_wss;
  179. }
  180. newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
  181. ws_addr_str = ast_sockaddr_stringify(ast_websocket_local_address(newtransport->ws_session));
  182. pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.local_addr);
  183. pj_strdup2(pool, &newtransport->transport.local_name.host, ast_sockaddr_stringify_host(ast_websocket_local_address(newtransport->ws_session)));
  184. newtransport->transport.local_name.port = ast_sockaddr_port(ast_websocket_local_address(newtransport->ws_session));
  185. newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
  186. newtransport->transport.dir = PJSIP_TP_DIR_INCOMING;
  187. newtransport->transport.tpmgr = tpmgr;
  188. newtransport->transport.send_msg = &ws_send_msg;
  189. newtransport->transport.destroy = &ws_destroy;
  190. status = pjsip_transport_register(newtransport->transport.tpmgr,
  191. (pjsip_transport *)newtransport);
  192. if (status != PJ_SUCCESS) {
  193. goto on_error;
  194. }
  195. /* Add a reference for pjsip transport manager */
  196. ao2_ref(newtransport, +1);
  197. newtransport->rdata.tp_info.transport = &newtransport->transport;
  198. newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p",
  199. PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
  200. if (!newtransport->rdata.tp_info.pool) {
  201. ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n");
  202. pjsip_transport_destroy((pjsip_transport *)newtransport);
  203. goto on_error;
  204. }
  205. create_data->transport = newtransport;
  206. /* Notify application of transport state */
  207. state_cb = pjsip_tpmgr_get_state_cb(newtransport->transport.tpmgr);
  208. if (state_cb) {
  209. pjsip_transport_state_info state_info;
  210. memset(&state_info, 0, sizeof(state_info));
  211. state_cb(&newtransport->transport, PJSIP_TP_STATE_CONNECTED, &state_info);
  212. }
  213. return 0;
  214. on_error:
  215. ao2_cleanup(newtransport);
  216. return -1;
  217. }
  218. struct transport_read_data {
  219. struct ws_transport *transport;
  220. char *payload;
  221. uint64_t payload_len;
  222. };
  223. /*!
  224. * \brief Pass WebSocket data into pjsip transport manager.
  225. */
  226. static int transport_read(void *data)
  227. {
  228. struct transport_read_data *read_data = data;
  229. struct ws_transport *newtransport = read_data->transport;
  230. struct ast_websocket *session = newtransport->ws_session;
  231. pjsip_rx_data *rdata = &newtransport->rdata;
  232. int recvd;
  233. pj_str_t buf;
  234. int pjsip_pkt_len;
  235. pj_gettimeofday(&rdata->pkt_info.timestamp);
  236. pjsip_pkt_len = PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len;
  237. pj_memcpy(rdata->pkt_info.packet, read_data->payload, pjsip_pkt_len);
  238. rdata->pkt_info.len = pjsip_pkt_len;
  239. rdata->pkt_info.zero = 0;
  240. pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
  241. rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
  242. pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_addr(ast_websocket_remote_address(session)));
  243. rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
  244. recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
  245. pj_pool_reset(rdata->tp_info.pool);
  246. return (read_data->payload_len == recvd) ? 0 : -1;
  247. }
  248. static int get_write_timeout(void)
  249. {
  250. int write_timeout = -1;
  251. struct ao2_container *transport_states;
  252. transport_states = ast_sip_get_transport_states();
  253. if (transport_states) {
  254. struct ao2_iterator it_transport_states = ao2_iterator_init(transport_states, 0);
  255. struct ast_sip_transport_state *transport_state;
  256. for (; (transport_state = ao2_iterator_next(&it_transport_states)); ao2_cleanup(transport_state)) {
  257. struct ast_sip_transport *transport;
  258. if (transport_state->type != AST_TRANSPORT_WS && transport_state->type != AST_TRANSPORT_WSS) {
  259. continue;
  260. }
  261. transport = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "transport", transport_state->id);
  262. if (!transport) {
  263. continue;
  264. }
  265. ast_debug(5, "Found %s transport with write timeout: %d\n",
  266. transport->type == AST_TRANSPORT_WS ? "WS" : "WSS",
  267. transport->write_timeout);
  268. write_timeout = MAX(write_timeout, transport->write_timeout);
  269. }
  270. ao2_iterator_destroy(&it_transport_states);
  271. ao2_cleanup(transport_states);
  272. }
  273. if (write_timeout < 0) {
  274. write_timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
  275. }
  276. ast_debug(1, "Write timeout for WS/WSS transports: %d\n", write_timeout);
  277. return write_timeout;
  278. }
  279. static struct ast_taskprocessor *create_websocket_serializer(void)
  280. {
  281. char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
  282. /* Create name with seq number appended. */
  283. ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/websocket");
  284. return ast_sip_create_serializer_named(tps_name);
  285. }
  286. /*! \brief WebSocket connection handler. */
  287. static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
  288. {
  289. struct ast_taskprocessor *serializer;
  290. struct transport_create_data create_data;
  291. struct ws_transport *transport;
  292. struct transport_read_data read_data;
  293. if (ast_websocket_set_nonblock(session)) {
  294. ast_websocket_unref(session);
  295. return;
  296. }
  297. if (ast_websocket_set_timeout(session, get_write_timeout())) {
  298. ast_websocket_unref(session);
  299. return;
  300. }
  301. serializer = create_websocket_serializer();
  302. if (!serializer) {
  303. ast_websocket_unref(session);
  304. return;
  305. }
  306. create_data.ws_session = session;
  307. if (ast_sip_push_task_wait_serializer(serializer, transport_create, &create_data)) {
  308. ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
  309. ast_taskprocessor_unreference(serializer);
  310. ast_websocket_unref(session);
  311. return;
  312. }
  313. transport = create_data.transport;
  314. read_data.transport = transport;
  315. while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
  316. enum ast_websocket_opcode opcode;
  317. int fragmented;
  318. if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
  319. break;
  320. }
  321. if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
  322. if (read_data.payload_len) {
  323. ast_sip_push_task_wait_serializer(serializer, transport_read, &read_data);
  324. }
  325. } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
  326. break;
  327. }
  328. }
  329. ast_sip_push_task_wait_serializer(serializer, transport_shutdown, transport);
  330. ast_taskprocessor_unreference(serializer);
  331. ast_websocket_unref(session);
  332. }
  333. /*!
  334. * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
  335. */
  336. static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
  337. {
  338. static const pj_str_t STR_WS = { "ws", 2 };
  339. pjsip_contact_hdr *contact;
  340. long type = rdata->tp_info.transport->key.type;
  341. if (type != (long) transport_type_wss && type != (long) transport_type_wss_ipv6) {
  342. return PJ_FALSE;
  343. }
  344. contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
  345. if (contact
  346. && !contact->star
  347. && (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
  348. pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
  349. const pj_str_t *txp_str = &STR_WS;
  350. if (DEBUG_ATLEAST(4)) {
  351. char src_addr_buffer[AST_SOCKADDR_BUFLEN];
  352. const char *ipv6_s = "", *ipv6_e = "";
  353. if (pj_strchr(&uri->host, ':')) {
  354. ipv6_s = "[";
  355. ipv6_e = "]";
  356. }
  357. ast_log(LOG_DEBUG, "%s re-writing Contact URI from %s%.*s%s:%d%s%.*s to %s;transport=%s\n",
  358. pjsip_rx_data_get_info(rdata),
  359. ipv6_s, (int) pj_strlen(&uri->host), pj_strbuf(&uri->host), ipv6_e, uri->port,
  360. pj_strlen(&uri->transport_param) ? ";transport=" : "",
  361. (int) pj_strlen(&uri->transport_param), pj_strbuf(&uri->transport_param),
  362. pj_sockaddr_print(&rdata->pkt_info.src_addr, src_addr_buffer, sizeof(src_addr_buffer), 3),
  363. pj_strbuf(txp_str));
  364. }
  365. pj_cstr(&uri->host, rdata->pkt_info.src_name);
  366. uri->port = rdata->pkt_info.src_port;
  367. pj_strdup(rdata->tp_info.pool, &uri->transport_param, txp_str);
  368. }
  369. rdata->msg_info.via->rport_param = 0;
  370. return PJ_FALSE;
  371. }
  372. static pjsip_module websocket_module = {
  373. .name = { "WebSocket Transport Module", 26 },
  374. .id = -1,
  375. .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
  376. .on_rx_request = websocket_on_rx_msg,
  377. .on_rx_response = websocket_on_rx_msg,
  378. };
  379. /*! \brief Function called when an INVITE goes out */
  380. static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
  381. {
  382. if (session->inv_session->state == PJSIP_INV_STATE_NULL) {
  383. pjsip_dlg_add_usage(session->inv_session->dlg, &websocket_module, NULL);
  384. }
  385. }
  386. /*! \brief Supplement for adding Websocket functionality to dialog */
  387. static struct ast_sip_session_supplement websocket_supplement = {
  388. .method = "INVITE",
  389. .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
  390. .outgoing_request = websocket_outgoing_invite_request,
  391. };
  392. static int load_module(void)
  393. {
  394. CHECK_PJSIP_MODULE_LOADED();
  395. /*
  396. * We only need one transport type name (ws) defined. Firefox
  397. * and Chrome do not support anything other than secure websockets
  398. * anymore.
  399. *
  400. * Also we really cannot have two transports with the same name
  401. * and address family because it would be ambiguous. Outgoing
  402. * requests may try to find the transport by name and pjproject
  403. * only finds the first one registered.
  404. */
  405. pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE, "ws", 5060, &transport_type_wss);
  406. pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE | PJSIP_TRANSPORT_IPV6, "ws", 5060, &transport_type_wss_ipv6);
  407. if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
  408. return AST_MODULE_LOAD_DECLINE;
  409. }
  410. if (ast_sip_session_register_supplement(&websocket_supplement)) {
  411. ast_sip_unregister_service(&websocket_module);
  412. return AST_MODULE_LOAD_DECLINE;
  413. }
  414. if (ast_websocket_add_protocol("sip", websocket_cb)) {
  415. ast_sip_session_unregister_supplement(&websocket_supplement);
  416. ast_sip_unregister_service(&websocket_module);
  417. return AST_MODULE_LOAD_DECLINE;
  418. }
  419. return AST_MODULE_LOAD_SUCCESS;
  420. }
  421. static int unload_module(void)
  422. {
  423. ast_sip_unregister_service(&websocket_module);
  424. ast_sip_session_unregister_supplement(&websocket_supplement);
  425. ast_websocket_remove_protocol("sip", websocket_cb);
  426. return 0;
  427. }
  428. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
  429. .support_level = AST_MODULE_SUPPORT_CORE,
  430. .load = load_module,
  431. .unload = unload_module,
  432. .load_pri = AST_MODPRI_APP_DEPEND,
  433. );