stasis_endpoints.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief Stasis endpoint API.
  21. *
  22. * \author David M. Lee, II <dlee@digium.com>
  23. */
  24. /*** MODULEINFO
  25. <support_level>core</support_level>
  26. ***/
  27. #include "asterisk.h"
  28. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  29. #include "asterisk/astobj2.h"
  30. #include "asterisk/stasis.h"
  31. #include "asterisk/stasis_endpoints.h"
  32. /*** DOCUMENTATION
  33. <managerEvent language="en_US" name="PeerStatus">
  34. <managerEventInstance class="EVENT_FLAG_SYSTEM">
  35. <synopsis>Raised when the state of a peer changes.</synopsis>
  36. <syntax>
  37. <parameter name="ChannelType">
  38. <para>The channel technology of the peer.</para>
  39. </parameter>
  40. <parameter name="Peer">
  41. <para>The name of the peer (including channel technology).</para>
  42. </parameter>
  43. <parameter name="PeerStatus">
  44. <para>New status of the peer.</para>
  45. <enumlist>
  46. <enum name="Unknown"/>
  47. <enum name="Registered"/>
  48. <enum name="Unregistered"/>
  49. <enum name="Rejected"/>
  50. <enum name="Lagged"/>
  51. </enumlist>
  52. </parameter>
  53. <parameter name="Cause">
  54. <para>The reason the status has changed.</para>
  55. </parameter>
  56. <parameter name="Address">
  57. <para>New address of the peer.</para>
  58. </parameter>
  59. <parameter name="Port">
  60. <para>New port for the peer.</para>
  61. </parameter>
  62. <parameter name="Time">
  63. <para>Time it takes to reach the peer and receive a response.</para>
  64. </parameter>
  65. </syntax>
  66. </managerEventInstance>
  67. </managerEvent>
  68. <managerEvent language="en_US" name="ContactStatus">
  69. <managerEventInstance class="EVENT_FLAG_SYSTEM">
  70. <synopsis>Raised when the state of a contact changes.</synopsis>
  71. <syntax>
  72. <parameter name="URI">
  73. <para>This contact's URI.</para>
  74. </parameter>
  75. <parameter name="ContactStatus">
  76. <para>New status of the contact.</para>
  77. <enumlist>
  78. <enum name="Unknown"/>
  79. <enum name="Unreachable"/>
  80. <enum name="Reachable"/>
  81. <enum name="Created"/>
  82. <enum name="Removed"/>
  83. <enum name="Updated"/>
  84. </enumlist>
  85. </parameter>
  86. <parameter name="AOR">
  87. <para>The name of the associated aor.</para>
  88. </parameter>
  89. <parameter name="EndpointName">
  90. <para>The name of the associated endpoint.</para>
  91. </parameter>
  92. <parameter name="RoundtripUsec">
  93. <para>The RTT measured during the last qualify.</para>
  94. </parameter>
  95. </syntax>
  96. </managerEventInstance>
  97. </managerEvent>
  98. ***/
  /*!
   * \brief Module-wide caching-pattern object for endpoints.
   *
   * Assigned by ast_endpoint_stasis_init() and released and reset to \c NULL
   * by endpoints_stasis_cleanup().
   */
  static struct stasis_cp_all *endpoint_cache_all;
  100. struct stasis_cp_all *ast_endpoint_cache_all(void)
  101. {
  102. return endpoint_cache_all;
  103. }
  104. struct stasis_cache *ast_endpoint_cache(void)
  105. {
  106. return stasis_cp_all_cache(endpoint_cache_all);
  107. }
  108. struct stasis_topic *ast_endpoint_topic_all(void)
  109. {
  110. return stasis_cp_all_topic(endpoint_cache_all);
  111. }
  112. struct stasis_topic *ast_endpoint_topic_all_cached(void)
  113. {
  114. return stasis_cp_all_topic_cached(endpoint_cache_all);
  115. }
  /*!
   * \brief Message type for endpoint snapshots.
   *
   * Defined without any formatter callbacks; it is initialised in
   * ast_endpoint_stasis_init() and torn down in endpoints_stasis_cleanup().
   */
  STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_snapshot_type);
  117. static struct ast_manager_event_blob *peerstatus_to_ami(struct stasis_message *msg)
  118. {
  119. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  120. RAII_VAR(struct ast_str *, peerstatus_event_string, ast_str_create(64), ast_free);
  121. const char *value;
  122. /* peer_status is the only *required* thing */
  123. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "peer_status")))) {
  124. return NULL;
  125. }
  126. ast_str_append(&peerstatus_event_string, 0, "PeerStatus: %s\r\n", value);
  127. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "cause")))) {
  128. ast_str_append(&peerstatus_event_string, 0, "Cause: %s\r\n", value);
  129. }
  130. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "address")))) {
  131. ast_str_append(&peerstatus_event_string, 0, "Address: %s\r\n", value);
  132. }
  133. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "port")))) {
  134. ast_str_append(&peerstatus_event_string, 0, "Port: %s\r\n", value);
  135. }
  136. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "time")))) {
  137. ast_str_append(&peerstatus_event_string, 0, "Time: %s\r\n", value);
  138. }
  139. return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "PeerStatus",
  140. "ChannelType: %s\r\n"
  141. "Peer: %s/%s\r\n"
  142. "%s",
  143. obj->snapshot->tech,
  144. obj->snapshot->tech,
  145. obj->snapshot->resource,
  146. ast_str_buffer(peerstatus_event_string));
  147. }
  148. static struct ast_json *peerstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
  149. {
  150. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  151. struct ast_json *json_endpoint;
  152. struct ast_json *json_peer;
  153. struct ast_json *json_final;
  154. const struct timeval *tv = stasis_message_timestamp(msg);
  155. json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
  156. if (!json_endpoint) {
  157. return NULL;
  158. }
  159. json_peer = ast_json_object_create();
  160. if (!json_peer) {
  161. ast_json_unref(json_endpoint);
  162. return NULL;
  163. }
  164. /* Copy all fields from the blob */
  165. ast_json_object_update(json_peer, obj->blob);
  166. json_final = ast_json_pack("{s: s, s: o, s: o, s: o }",
  167. "type", "PeerStatusChange",
  168. "timestamp", ast_json_timeval(*tv, NULL),
  169. "endpoint", json_endpoint,
  170. "peer", json_peer);
  171. if (!json_final) {
  172. ast_json_unref(json_endpoint);
  173. ast_json_unref(json_peer);
  174. }
  175. return json_final;
  176. }
  177. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_state_type,
  178. .to_ami = peerstatus_to_ami,
  179. .to_json = peerstatus_to_json,
  180. );
  181. static struct ast_manager_event_blob *contactstatus_to_ami(struct stasis_message *msg)
  182. {
  183. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  184. RAII_VAR(struct ast_str *, contactstatus_event_string, ast_str_create(64), ast_free);
  185. const char *value;
  186. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "uri")))) {
  187. return NULL;
  188. }
  189. ast_str_append(&contactstatus_event_string, 0, "URI: %s\r\n", value);
  190. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "contact_status")))) {
  191. return NULL;
  192. }
  193. ast_str_append(&contactstatus_event_string, 0, "ContactStatus: %s\r\n", value);
  194. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "aor")))) {
  195. return NULL;
  196. }
  197. ast_str_append(&contactstatus_event_string, 0, "AOR: %s\r\n", value);
  198. if (!(value = ast_json_string_get(ast_json_object_get(obj->blob, "endpoint_name")))) {
  199. return NULL;
  200. }
  201. ast_str_append(&contactstatus_event_string, 0, "EndpointName: %s\r\n", value);
  202. if ((value = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec")))) {
  203. ast_str_append(&contactstatus_event_string, 0, "RoundtripUsec: %s\r\n", value);
  204. }
  205. return ast_manager_event_blob_create(EVENT_FLAG_SYSTEM, "ContactStatus",
  206. "%s", ast_str_buffer(contactstatus_event_string));
  207. }
  208. static struct ast_json *contactstatus_to_json(struct stasis_message *msg, const struct stasis_message_sanitizer *sanitize)
  209. {
  210. struct ast_endpoint_blob *obj = stasis_message_data(msg);
  211. struct ast_json *json_endpoint;
  212. struct ast_json *json_final;
  213. const char *rtt;
  214. const struct timeval *tv = stasis_message_timestamp(msg);
  215. json_endpoint = ast_endpoint_snapshot_to_json(obj->snapshot, NULL);
  216. if (!json_endpoint) {
  217. return NULL;
  218. }
  219. /* The roundtrip time is optional. */
  220. rtt = ast_json_string_get(ast_json_object_get(obj->blob, "roundtrip_usec"));
  221. if (!ast_strlen_zero(rtt)) {
  222. json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s, s: s } } ",
  223. "type", "ContactStatusChange",
  224. "timestamp", ast_json_timeval(*tv, NULL),
  225. "endpoint", json_endpoint,
  226. "contact_info",
  227. "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
  228. "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
  229. "contact_status")),
  230. "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")),
  231. "roundtrip_usec", rtt);
  232. } else {
  233. json_final = ast_json_pack("{s: s, s: o, s: o, s: { s: s, s: s, s: s } } ",
  234. "type", "ContactStatusChange",
  235. "timestamp", ast_json_timeval(*tv, NULL),
  236. "endpoint", json_endpoint,
  237. "contact_info",
  238. "uri", ast_json_string_get(ast_json_object_get(obj->blob, "uri")),
  239. "contact_status", ast_json_string_get(ast_json_object_get(obj->blob,
  240. "contact_status")),
  241. "aor", ast_json_string_get(ast_json_object_get(obj->blob, "aor")));
  242. }
  243. if (!json_final) {
  244. ast_json_unref(json_endpoint);
  245. }
  246. return json_final;
  247. }
  248. STASIS_MESSAGE_TYPE_DEFN(ast_endpoint_contact_state_type,
  249. .to_ami = contactstatus_to_ami,
  250. .to_json = contactstatus_to_json
  251. );
  252. static void endpoint_blob_dtor(void *obj)
  253. {
  254. struct ast_endpoint_blob *event = obj;
  255. ao2_cleanup(event->snapshot);
  256. ast_json_unref(event->blob);
  257. }
  258. struct stasis_message *ast_endpoint_blob_create(struct ast_endpoint *endpoint,
  259. struct stasis_message_type *type, struct ast_json *blob)
  260. {
  261. struct ast_endpoint_blob *obj;
  262. struct stasis_message *msg;
  263. if (!type) {
  264. return NULL;
  265. }
  266. if (!blob) {
  267. blob = ast_json_null();
  268. }
  269. if (!(obj = ao2_alloc(sizeof(*obj), endpoint_blob_dtor))) {
  270. return NULL;
  271. }
  272. if (endpoint) {
  273. if (!(obj->snapshot = ast_endpoint_snapshot_create(endpoint))) {
  274. ao2_ref(obj, -1);
  275. return NULL;
  276. }
  277. }
  278. obj->blob = ast_json_ref(blob);
  279. msg = stasis_message_create(type, obj);
  280. ao2_ref(obj, -1);
  281. return msg;
  282. }
  283. void ast_endpoint_blob_publish(struct ast_endpoint *endpoint, struct stasis_message_type *type,
  284. struct ast_json *blob)
  285. {
  286. struct stasis_message *message;
  287. if (!blob) {
  288. return;
  289. }
  290. message = ast_endpoint_blob_create(endpoint, type, blob);
  291. if (message) {
  292. stasis_publish(ast_endpoint_topic(endpoint), message);
  293. ao2_ref(message, -1);
  294. }
  295. }
  296. struct ast_endpoint_snapshot *ast_endpoint_latest_snapshot(const char *tech,
  297. const char *name)
  298. {
  299. char *id = NULL;
  300. struct stasis_message *msg;
  301. struct ast_endpoint_snapshot *snapshot;
  302. if (ast_strlen_zero(name)) {
  303. ast_asprintf(&id, "%s", tech);
  304. } else {
  305. ast_asprintf(&id, "%s/%s", tech, name);
  306. }
  307. if (!id) {
  308. return NULL;
  309. }
  310. ast_tech_to_upper(id);
  311. msg = stasis_cache_get(ast_endpoint_cache(), ast_endpoint_snapshot_type(), id);
  312. ast_free(id);
  313. if (!msg) {
  314. return NULL;
  315. }
  316. snapshot = stasis_message_data(msg);
  317. ast_assert(snapshot != NULL);
  318. ao2_ref(snapshot, +1);
  319. ao2_ref(msg, -1);
  320. return snapshot;
  321. }
  322. /*!
  323. * \brief Callback extract a unique identity from a snapshot message.
  324. *
  325. * This identity is unique to the underlying object of the snapshot, such as the
  326. * UniqueId field of a channel.
  327. *
  328. * \param message Message to extract id from.
  329. * \return String representing the snapshot's id.
  330. * \return \c NULL if the message_type of the message isn't a handled snapshot.
  331. * \since 12
  332. */
  333. static const char *endpoint_snapshot_get_id(struct stasis_message *message)
  334. {
  335. struct ast_endpoint_snapshot *snapshot;
  336. if (ast_endpoint_snapshot_type() != stasis_message_type(message)) {
  337. return NULL;
  338. }
  339. snapshot = stasis_message_data(message);
  340. return snapshot->id;
  341. }
  342. struct ast_json *ast_endpoint_snapshot_to_json(
  343. const struct ast_endpoint_snapshot *snapshot,
  344. const struct stasis_message_sanitizer *sanitize)
  345. {
  346. struct ast_json *json;
  347. struct ast_json *channel_array;
  348. int i;
  349. json = ast_json_pack("{s: s, s: s, s: s, s: []}",
  350. "technology", snapshot->tech,
  351. "resource", snapshot->resource,
  352. "state", ast_endpoint_state_to_string(snapshot->state),
  353. "channel_ids");
  354. if (json == NULL) {
  355. return NULL;
  356. }
  357. if (snapshot->max_channels != -1) {
  358. int res = ast_json_object_set(json, "max_channels",
  359. ast_json_integer_create(snapshot->max_channels));
  360. if (res != 0) {
  361. ast_json_unref(json);
  362. return NULL;
  363. }
  364. }
  365. channel_array = ast_json_object_get(json, "channel_ids");
  366. ast_assert(channel_array != NULL);
  367. for (i = 0; i < snapshot->num_channels; ++i) {
  368. int res;
  369. if (sanitize && sanitize->channel_id
  370. && sanitize->channel_id(snapshot->channel_ids[i])) {
  371. continue;
  372. }
  373. res = ast_json_array_append(channel_array,
  374. ast_json_string_create(snapshot->channel_ids[i]));
  375. if (res != 0) {
  376. ast_json_unref(json);
  377. return NULL;
  378. }
  379. }
  380. return json;
  381. }
  382. static void endpoints_stasis_cleanup(void)
  383. {
  384. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_snapshot_type);
  385. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_state_type);
  386. STASIS_MESSAGE_TYPE_CLEANUP(ast_endpoint_contact_state_type);
  387. ao2_cleanup(endpoint_cache_all);
  388. endpoint_cache_all = NULL;
  389. }
  390. int ast_endpoint_stasis_init(void)
  391. {
  392. int res = 0;
  393. ast_register_cleanup(endpoints_stasis_cleanup);
  394. endpoint_cache_all = stasis_cp_all_create("endpoint_topic_all",
  395. endpoint_snapshot_get_id);
  396. if (!endpoint_cache_all) {
  397. return -1;
  398. }
  399. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_snapshot_type);
  400. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_state_type);
  401. res |= STASIS_MESSAGE_TYPE_INIT(ast_endpoint_contact_state_type);
  402. return res;
  403. }