res_stasis_device_state.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * Kevin Harwell <kharwell@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*** MODULEINFO
  19. <depend type="module">res_stasis</depend>
  20. <support_level>core</support_level>
  21. ***/
  22. #include "asterisk.h"
  23. ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  24. #include "asterisk/astdb.h"
  25. #include "asterisk/astobj2.h"
  26. #include "asterisk/module.h"
  27. #include "asterisk/stasis_app_impl.h"
  28. #include "asterisk/stasis_app_device_state.h"
  29. #define DEVICE_STATE_SIZE 64
  30. /*! astdb family name */
  31. #define DEVICE_STATE_FAMILY "StasisDeviceState"
  32. /*! Stasis device state provider */
  33. #define DEVICE_STATE_PROVIDER_STASIS "Stasis"
  34. /*! Scheme for custom device states */
  35. #define DEVICE_STATE_SCHEME_STASIS "Stasis:"
  36. /*! Scheme for device state subscriptions */
  37. #define DEVICE_STATE_SCHEME_SUB "deviceState:"
  38. /*! Number of hash buckets for device state subscriptions */
  39. #define DEVICE_STATE_BUCKETS 37
  40. /*! The key used for tracking a subscription to all device states */
  41. #define DEVICE_STATE_ALL "__AST_DEVICE_STATE_ALL_TOPIC"
  42. /*! Container for subscribed device states */
  43. static struct ao2_container *device_state_subscriptions;
  44. /*!
  45. * \brief Device state subscription object.
  46. */
  47. struct device_state_subscription {
  48. AST_DECLARE_STRING_FIELDS(
  49. AST_STRING_FIELD(app_name);
  50. AST_STRING_FIELD(device_name);
  51. );
  52. /*! The subscription object */
  53. struct stasis_subscription *sub;
  54. };
  55. static int device_state_subscriptions_hash(const void *obj, const int flags)
  56. {
  57. const struct device_state_subscription *object;
  58. switch (flags & OBJ_SEARCH_MASK) {
  59. case OBJ_SEARCH_OBJECT:
  60. object = obj;
  61. return ast_str_hash(object->device_name);
  62. case OBJ_SEARCH_KEY:
  63. default:
  64. /* Hash can only work on something with a full key. */
  65. ast_assert(0);
  66. return 0;
  67. }
  68. }
  69. static int device_state_subscriptions_cmp(void *obj, void *arg, int flags)
  70. {
  71. const struct device_state_subscription *object_left = obj;
  72. const struct device_state_subscription *object_right = arg;
  73. int cmp;
  74. switch (flags & OBJ_SEARCH_MASK) {
  75. case OBJ_SEARCH_OBJECT:
  76. /* find objects matching both device and app names */
  77. if (strcmp(object_left->device_name,
  78. object_right->device_name)) {
  79. return 0;
  80. }
  81. cmp = strcmp(object_left->app_name, object_right->app_name);
  82. break;
  83. case OBJ_SEARCH_KEY:
  84. case OBJ_SEARCH_PARTIAL_KEY:
  85. ast_assert(0); /* not supported by container */
  86. /* fall through */
  87. default:
  88. cmp = 0;
  89. break;
  90. }
  91. return cmp ? 0 : CMP_MATCH | CMP_STOP;
  92. }
  93. static void device_state_subscription_destroy(void *obj)
  94. {
  95. struct device_state_subscription *sub = obj;
  96. ast_string_field_free_memory(sub);
  97. }
  98. static struct device_state_subscription *device_state_subscription_create(
  99. const struct stasis_app *app, const char *device_name)
  100. {
  101. struct device_state_subscription *sub;
  102. const char *app_name = stasis_app_name(app);
  103. size_t size;
  104. if (ast_strlen_zero(device_name)) {
  105. device_name = DEVICE_STATE_ALL;
  106. }
  107. size = strlen(device_name) + strlen(app_name) + 2;
  108. sub = ao2_alloc(sizeof(*sub), device_state_subscription_destroy);
  109. if (!sub) {
  110. return NULL;
  111. }
  112. if (ast_string_field_init(sub, size)) {
  113. ao2_ref(sub, -1);
  114. return NULL;
  115. }
  116. ast_string_field_set(sub, app_name, app_name);
  117. ast_string_field_set(sub, device_name, device_name);
  118. return sub;
  119. }
  120. static struct device_state_subscription *find_device_state_subscription(
  121. struct stasis_app *app, const char *name)
  122. {
  123. struct device_state_subscription dummy_sub = {
  124. .app_name = stasis_app_name(app),
  125. .device_name = name
  126. };
  127. return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
  128. }
  129. static void remove_device_state_subscription(
  130. struct device_state_subscription *sub)
  131. {
  132. if (sub->sub) {
  133. sub->sub = stasis_unsubscribe_and_join(sub->sub);
  134. }
  135. ao2_unlink_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
  136. }
  137. struct ast_json *stasis_app_device_state_to_json(
  138. const char *name, enum ast_device_state state)
  139. {
  140. return ast_json_pack("{s: s, s: s}",
  141. "name", name,
  142. "state", ast_devstate_str(state));
  143. }
  144. struct ast_json *stasis_app_device_states_to_json(void)
  145. {
  146. struct ast_json *array = ast_json_array_create();
  147. struct ast_db_entry *tree;
  148. struct ast_db_entry *entry;
  149. tree = ast_db_gettree(DEVICE_STATE_FAMILY, NULL);
  150. for (entry = tree; entry; entry = entry->next) {
  151. const char *name = strrchr(entry->key, '/');
  152. if (!ast_strlen_zero(name)) {
  153. char device[DEVICE_STATE_SIZE];
  154. snprintf(device, sizeof(device), "%s%s", DEVICE_STATE_SCHEME_STASIS, ++name);
  155. ast_json_array_append(array,
  156. stasis_app_device_state_to_json(device, ast_device_state(device)));
  157. }
  158. }
  159. ast_db_freetree(tree);
  160. return array;
  161. }
  162. static void send_device_state(struct device_state_subscription *sub,
  163. const char *name, enum ast_device_state state)
  164. {
  165. RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
  166. json = ast_json_pack("{s:s, s:s, s:o, s:o}",
  167. "type", "DeviceStateChanged",
  168. "application", sub->app_name,
  169. "timestamp", ast_json_timeval(ast_tvnow(), NULL),
  170. "device_state", stasis_app_device_state_to_json(
  171. name, state));
  172. if (!json) {
  173. ast_log(LOG_ERROR, "Unable to create device state json object\n");
  174. return;
  175. }
  176. stasis_app_send(sub->app_name, json);
  177. }
  178. enum stasis_device_state_result stasis_app_device_state_update(
  179. const char *name, const char *value)
  180. {
  181. size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
  182. enum ast_device_state state;
  183. ast_debug(3, "Updating device name = %s, value = %s", name, value);
  184. if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
  185. ast_log(LOG_ERROR, "Update can only be used to set "
  186. "'%s' device state!\n", DEVICE_STATE_SCHEME_STASIS);
  187. return STASIS_DEVICE_STATE_NOT_CONTROLLED;
  188. }
  189. name += size;
  190. if (ast_strlen_zero(name)) {
  191. ast_log(LOG_ERROR, "Update requires custom device name!\n");
  192. return STASIS_DEVICE_STATE_MISSING;
  193. }
  194. if (!value || (state = ast_devstate_val(value)) == AST_DEVICE_UNKNOWN) {
  195. ast_log(LOG_ERROR, "Unknown device state "
  196. "value '%s'\n", value);
  197. return STASIS_DEVICE_STATE_UNKNOWN;
  198. }
  199. ast_db_put(DEVICE_STATE_FAMILY, name, value);
  200. ast_devstate_changed(state, AST_DEVSTATE_CACHABLE, "%s%s",
  201. DEVICE_STATE_SCHEME_STASIS, name);
  202. return STASIS_DEVICE_STATE_OK;
  203. }
  204. enum stasis_device_state_result stasis_app_device_state_delete(const char *name)
  205. {
  206. const char *full_name = name;
  207. size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
  208. if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
  209. ast_log(LOG_ERROR, "Can only delete '%s' device states!\n",
  210. DEVICE_STATE_SCHEME_STASIS);
  211. return STASIS_DEVICE_STATE_NOT_CONTROLLED;
  212. }
  213. name += size;
  214. if (ast_strlen_zero(name)) {
  215. ast_log(LOG_ERROR, "Delete requires a device name!\n");
  216. return STASIS_DEVICE_STATE_MISSING;
  217. }
  218. if (ast_device_state_clear_cache(full_name)) {
  219. return STASIS_DEVICE_STATE_UNKNOWN;
  220. }
  221. ast_db_del(DEVICE_STATE_FAMILY, name);
  222. /* send state change for delete */
  223. ast_devstate_changed(
  224. AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "%s%s",
  225. DEVICE_STATE_SCHEME_STASIS, name);
  226. return STASIS_DEVICE_STATE_OK;
  227. }
  228. static void populate_cache(void)
  229. {
  230. RAII_VAR(struct ast_db_entry *, tree,
  231. ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
  232. struct ast_db_entry *entry;
  233. for (entry = tree; entry; entry = entry->next) {
  234. const char *name = strrchr(entry->key, '/');
  235. if (!ast_strlen_zero(name)) {
  236. ast_devstate_changed(
  237. ast_devstate_val(entry->data),
  238. AST_DEVSTATE_CACHABLE, "%s%s\n",
  239. DEVICE_STATE_SCHEME_STASIS, name + 1);
  240. }
  241. }
  242. }
  243. static enum ast_device_state stasis_device_state_cb(const char *data)
  244. {
  245. char buf[DEVICE_STATE_SIZE];
  246. ast_db_get(DEVICE_STATE_FAMILY, data, buf, sizeof(buf));
  247. return ast_devstate_val(buf);
  248. }
  249. static void device_state_cb(void *data, struct stasis_subscription *sub,
  250. struct stasis_message *msg)
  251. {
  252. struct ast_device_state_message *device_state;
  253. if (stasis_subscription_final_message(sub, msg)) {
  254. /* Remove stasis subscription's reference to device_state_subscription */
  255. ao2_ref(data, -1);
  256. return;
  257. }
  258. if (ast_device_state_message_type() != stasis_message_type(msg)) {
  259. return;
  260. }
  261. device_state = stasis_message_data(msg);
  262. if (device_state->eid) {
  263. /* ignore non-aggregate states */
  264. return;
  265. }
  266. send_device_state(data, device_state->device, device_state->state);
  267. }
  268. static void *find_device_state(const struct stasis_app *app, const char *name)
  269. {
  270. return device_state_subscription_create(app, name);
  271. }
  272. static int is_subscribed_device_state(struct stasis_app *app, const char *name)
  273. {
  274. struct device_state_subscription *sub;
  275. sub = find_device_state_subscription(app, DEVICE_STATE_ALL);
  276. if (sub) {
  277. ao2_ref(sub, -1);
  278. return 1;
  279. }
  280. sub = find_device_state_subscription(app, name);
  281. if (sub) {
  282. ao2_ref(sub, -1);
  283. return 1;
  284. }
  285. return 0;
  286. }
  287. static int is_subscribed_device_state_lock(struct stasis_app *app, const char *name)
  288. {
  289. int is_subscribed;
  290. ao2_lock(device_state_subscriptions);
  291. is_subscribed = is_subscribed_device_state(app, name);
  292. ao2_unlock(device_state_subscriptions);
  293. return is_subscribed;
  294. }
  295. static int subscribe_device_state(struct stasis_app *app, void *obj)
  296. {
  297. struct device_state_subscription *sub = obj;
  298. struct stasis_topic *topic;
  299. if (!sub) {
  300. sub = device_state_subscription_create(app, NULL);
  301. if (!sub) {
  302. return -1;
  303. }
  304. }
  305. if (strcmp(sub->device_name, DEVICE_STATE_ALL)) {
  306. topic = ast_device_state_topic(sub->device_name);
  307. } else {
  308. topic = ast_device_state_topic_all();
  309. }
  310. ao2_lock(device_state_subscriptions);
  311. if (is_subscribed_device_state(app, sub->device_name)) {
  312. ao2_unlock(device_state_subscriptions);
  313. ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
  314. return 0;
  315. }
  316. ast_debug(3, "Subscribing to device %s\n", sub->device_name);
  317. sub->sub = stasis_subscribe_pool(topic, device_state_cb, ao2_bump(sub));
  318. if (!sub->sub) {
  319. ao2_unlock(device_state_subscriptions);
  320. ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
  321. sub->device_name);
  322. /* Reference we added when attempting to stasis_subscribe_pool */
  323. ao2_ref(sub, -1);
  324. return -1;
  325. }
  326. stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type());
  327. stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type());
  328. stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
  329. ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
  330. ao2_unlock(device_state_subscriptions);
  331. return 0;
  332. }
  333. static int unsubscribe_device_state(struct stasis_app *app, const char *name)
  334. {
  335. struct device_state_subscription *sub;
  336. ao2_lock(device_state_subscriptions);
  337. sub = find_device_state_subscription(app, name);
  338. if (sub) {
  339. remove_device_state_subscription(sub);
  340. }
  341. ao2_unlock(device_state_subscriptions);
  342. ao2_cleanup(sub);
  343. return 0;
  344. }
  345. static int device_to_json_cb(void *obj, void *arg, void *data, int flags)
  346. {
  347. struct device_state_subscription *sub = obj;
  348. const char *app_name = arg;
  349. struct ast_json *array = data;
  350. if (strcmp(sub->app_name, app_name)) {
  351. return 0;
  352. }
  353. ast_json_array_append(
  354. array, ast_json_string_create(sub->device_name));
  355. return 0;
  356. }
  357. static void devices_to_json(const struct stasis_app *app, struct ast_json *json)
  358. {
  359. struct ast_json *array = ast_json_array_create();
  360. ao2_callback_data(device_state_subscriptions, OBJ_NODATA,
  361. device_to_json_cb, (void *)stasis_app_name(app), array);
  362. ast_json_object_set(json, "device_names", array);
  363. }
  364. struct stasis_app_event_source device_state_event_source = {
  365. .scheme = DEVICE_STATE_SCHEME_SUB,
  366. .find = find_device_state,
  367. .subscribe = subscribe_device_state,
  368. .unsubscribe = unsubscribe_device_state,
  369. .is_subscribed = is_subscribed_device_state_lock,
  370. .to_json = devices_to_json
  371. };
  372. static int load_module(void)
  373. {
  374. populate_cache();
  375. if (ast_devstate_prov_add(DEVICE_STATE_PROVIDER_STASIS,
  376. stasis_device_state_cb)) {
  377. return AST_MODULE_LOAD_DECLINE;
  378. }
  379. device_state_subscriptions = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
  380. DEVICE_STATE_BUCKETS, device_state_subscriptions_hash, NULL,
  381. device_state_subscriptions_cmp);
  382. if (!device_state_subscriptions) {
  383. ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
  384. return AST_MODULE_LOAD_DECLINE;
  385. }
  386. stasis_app_register_event_source(&device_state_event_source);
  387. return AST_MODULE_LOAD_SUCCESS;
  388. }
  389. static int unload_module(void)
  390. {
  391. ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
  392. stasis_app_unregister_event_source(&device_state_event_source);
  393. ao2_cleanup(device_state_subscriptions);
  394. device_state_subscriptions = NULL;
  395. return 0;
  396. }
  397. AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application device state support",
  398. .support_level = AST_MODULE_SUPPORT_CORE,
  399. .load = load_module,
  400. .unload = unload_module,
  401. );