stasis.h 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2013, Digium, Inc.
  5. *
  6. * David M. Lee, II <dlee@digium.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. #ifndef _ASTERISK_STASIS_H
  19. #define _ASTERISK_STASIS_H
  20. /*! \file
  21. *
  22. * \brief Stasis Message Bus API. See \ref stasis "Stasis Message Bus API" for
  23. * detailed documentation.
  24. *
  25. * \author David M. Lee, II <dlee@digium.com>
  26. * \since 12
  27. *
  28. * \page stasis Stasis Message Bus API
  29. *
  30. * \par Intro
  31. *
  32. * The Stasis Message Bus is a loosely typed mechanism for distributing messages
  33. * within Asterisk. It is designed to be:
  34. * - Loosely coupled; new message types can be added in separate modules.
  35. * - Easy to use; publishing and subscribing are straightforward operations.
  36. *
  37. * There are three main concepts for using the Stasis Message Bus:
  38. * - \ref stasis_message
  39. * - \ref stasis_topic
  40. * - \ref stasis_subscription
  41. *
  42. * \par stasis_message
  43. *
  44. * Central to the Stasis Message Bus is the \ref stasis_message, the messages
  45. * that are sent on the bus. These messages have:
  46. * - a type (as defined by a \ref stasis_message_type)
  47. * - a value - a \c void pointer to an AO2 object
  48. * - a timestamp when it was created
  49. *
  50. * Once a \ref stasis_message has been created, it is immutable and cannot
  51. * change. The same goes for the value of the message (although this cannot be
  52. * enforced in code). Messages themselves are reference-counted, AO2 objects,
  53. * along with their values. By being both reference counted and immutable,
  54. * messages can be shared throughout the system without any concerns for
  55. * threading.
  56. *
  57. * The type of a message is defined by an instance of \ref stasis_message_type,
  58. * which can be created by calling stasis_message_type_create(). Message types
  59. * are named, which is useful in debugging. It is recommended that the string
  60. * name for a message type match the name of the struct that's stored in the
  61. * message. For example, the name for \ref stasis_cache_update's message type is \c
  62. * "stasis_cache_update".
  63. *
  64. * \par stasis_topic
  65. *
  66. * A \ref stasis_topic is an object to which \ref stasis_subscriber's may be
  67. * subscribed, and \ref stasis_message's may be published. Any message published
  68. * to the topic is dispatched to all of its subscribers. The topic itself may be
  69. * named, which is useful in debugging.
  70. *
  71. * Topics themselves are reference counted objects. Since topics are referred to
  72. * by their subscribers, they will not be freed until all of their subscribers
  73. * have unsubscribed. Topics are also thread safe, so no worries about
  74. * publishing/subscribing/unsubscribing to a topic concurrently from multiple
  75. * threads. It's also designed to handle the case of unsubscribing from a topic
  76. * from within the subscription handler.
  77. *
  78. * \par Forwarding
  79. *
  80. * There is one special case of topics that's worth noting: forwarding
  81. * messages. It's a fairly common use case to want to forward all the messages
  82. * published on one topic to another one (for example, an aggregator topic that
  83. * publishes all the events from a set of other topics). This can be
  84. * accomplished easily using stasis_forward_all(). This sets up the forwarding
  85. * between the two topics, and returns a \ref stasis_subscription, which can be
  86. * unsubscribed to stop the forwarding.
  87. *
  88. * \par Caching
  89. *
  90. * Another common use case is to want to cache certain messages that are
  91. * published on the bus. Usually these events are snapshots of the current state
  92. * in the system, and it's desirable to query that state from the cache without
  93. * locking the original object. It's also desirable for subscribers of the
  94. * caching topic to receive messages that have both the old cache value and the
  95. * new value being put into the cache. For this, we have stasis_cache_create()
  96. * and stasis_caching_topic_create(), providing them with the topic which
  97. * publishes the messages that you wish to cache, and a function that can
  98. * identify cacheable messages.
  99. *
  100. * The \ref stasis_cache is designed so that it may be shared amongst several
  101. * \ref stasis_caching_topic objects. This allows you to have individual caching
  102. * topics per-object (i.e. so you can subscribe to updates for a single object),
  103. * and still have a single cache to query for the state of all objects. While a
  104. * cache may be shared amongst different message types, such a usage is probably
  105. * not a good idea.
  106. *
  107. * The \ref stasis_cache can only be written to by \ref stasis_caching_topic objects.
  108. * It's a thread safe container, so freely use the stasis_cache_get() and
  109. * stasis_cache_dump() to query the cache.
  110. *
  111. * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable
  112. * message is wrapped in a \ref stasis_cache_update message which provides the
  113. * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot
  114. * (or \c NULL if the entry was removed from the cache). A
  115. * stasis_cache_clear_create() message must be sent to the topic in order to
  116. * remove entries from the cache.
  117. *
  118. * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic,
  119. * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref
  120. * stasis_caching_topic will not be freed until after it has been unsubscribed,
  121. * and all other ao2_ref()'s have been cleaned up.
  122. *
  123. * The \ref stasis_cache object is a normal AO2 managed object, which can be
  124. * released with ao2_cleanup().
  125. *
  126. * \par stasis_subscriber
  127. *
  128. * Any topic may be subscribed to by simply providing stasis_subscribe() the
  129. * \ref stasis_topic to subscribe to, a handler function and \c void pointer to
  130. * data that is passed back to the handler. Invocations on the subscription's
  131. * handler are serialized, but different invocations may occur on different
  132. * threads (this usually isn't important unless you use thread locals or
  133. * something similar).
  134. *
  135. * In order to stop receiving messages, call stasis_unsubscribe() with your \ref
  136. * stasis_subscription. Due to cyclic references, the \ref
  137. * stasis_subscription will not be freed until after it has been unsubscribed,
  138. * and all other ao2_ref()'s have been cleaned up.
  139. *
  140. * \par Shutdown
  141. *
  142. * Subscriptions have two options for unsubscribing, depending upon the context
  143. * in which you need to unsubscribe.
  144. *
  145. * If your subscription is owned by a module, and you must unsubscribe from the
  146. * module_unload() function, then you'll want to use the
  147. * stasis_unsubscribe_and_join() function. This will block until the final
  148. * message has been received on the subscription. Otherwise, there's the danger
  149. * of invoking the callback function after it has been unloaded.
  150. *
  151. * If your subscription is owned by an object, then your object should have an
  152. * explicit shutdown() function, which calls stasis_unsubscribe(). In your
  153. * subscription handler, when the stasis_subscription_final_message() has been
  154. * received, decrement the refcount on your object. In your object's destructor,
  155. * you may assert that stasis_subscription_is_done() to validate that the
  156. * subscription's callback will no longer be invoked.
  157. *
  158. * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from
  159. * an object's destructor. While code that does this may work most of the time,
  160. * it's got one big downside. There's a general assumption that object
  161. * destruction is non-blocking. If you block the destruction waiting for the
  162. * subscription to complete, there's the danger that the subscription may
  163. * process a message which will bump the refcount up by one. Then it does
  164. * whatever it does, decrements the refcount, which then proceeds to re-destroy
  165. * the object. Now you've got hard to reproduce bugs that only show up under
  166. * certain loads.
  167. */
  168. #include "asterisk/json.h"
  169. #include "asterisk/manager.h"
  170. #include "asterisk/utils.h"
  171. #include "asterisk/event.h"
  172. /*! @{ */
  173. /*!
  174. * \brief Metadata about a \ref stasis_message.
  175. * \since 12
  176. */
  177. struct stasis_message_type;
  178. /*!
  179. * \brief Opaque type for a Stasis message.
  180. * \since 12
  181. */
  182. struct stasis_message;
  183. /*!
  184. * \brief Opaque type for a Stasis subscription.
  185. * \since 12
  186. */
  187. struct stasis_subscription;
  188. /*!
  189. * \brief Structure containing callbacks for Stasis message sanitization
  190. *
  191. * \note If any of these callbacks is implemented, all of them should be
  192. * implemented, since not all callers may have access to the full snapshot.
  193. */
  194. struct stasis_message_sanitizer {
  195. /*!
  196. * \brief Callback which determines whether a channel should be sanitized from
  197. * a message based on the channel's unique ID
  198. *
  199. * \param channel_id The unique ID of the channel
  200. *
  201. * \retval non-zero if the channel should be left out of the message
  202. * \retval zero if the channel should remain in the message
  203. */
  204. int (*channel_id)(const char *channel_id);
  205. /*!
  206. * \brief Callback which determines whether a channel should be sanitized from
  207. * a message based on the channel's snapshot
  208. *
  209. * \param snapshot A snapshot generated from the channel
  210. *
  211. * \retval non-zero if the channel should be left out of the message
  212. * \retval zero if the channel should remain in the message
  213. */
  214. int (*channel_snapshot)(const struct ast_channel_snapshot *snapshot);
  215. /*!
  216. * \brief Callback which determines whether a channel should be sanitized from
  217. * a message based on the channel itself
  218. *
  219. * \param chan The channel to be checked
  220. *
  221. * \retval non-zero if the channel should be left out of the message
  222. * \retval zero if the channel should remain in the message
  223. */
  224. int (*channel)(const struct ast_channel *chan);
  225. };
  226. /*!
  227. * \brief Virtual table providing methods for messages.
  228. * \since 12
  229. */
  230. struct stasis_message_vtable {
  231. /*!
  232. * \brief Build the JSON representation of the message.
  233. *
  234. * May be \c NULL, or may return \c NULL, to indicate no representation.
  235. * The returned object should be ast_json_unref()'ed.
  236. *
  237. * \param message Message to convert to JSON.
  238. * \param sanitize Snapshot sanitization callback.
  239. *
  240. * \return Newly allocated JSON message.
  241. * \return \c NULL on error.
  242. * \return \c NULL if JSON format is not supported.
  243. */
  244. struct ast_json *(*to_json)(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize);
  245. /*!
  246. * \brief Build the AMI representation of the message.
  247. *
  248. * May be \c NULL, or may return \c NULL, to indicate no representation.
  249. * The returned object should be ao2_cleanup()'ed.
  250. *
  251. * \param message Message to convert to AMI.
  252. * \return Newly allocated \ref ast_manager_event_blob.
  253. * \return \c NULL on error.
  254. * \return \c NULL if AMI format is not supported.
  255. */
  256. struct ast_manager_event_blob *(*to_ami)(
  257. struct stasis_message *message);
  258. /*!
  259. * \since 12.3.0
  260. * \brief Build the \ref ast_event representation of the message.
  261. *
  262. * May be \c NULL, or may return \c NULL, to indicate no representation.
  263. * The returned object should be disposed of via \ref ast_event_destroy.
  264. *
  265. * \param message Message to convert to an \ref ast_event.
  266. * \return Newly allocated \ref ast_event.
  267. * \return \c NULL on error.
  268. * \return \c NULL if \ref ast_event format is not supported.
  269. */
  270. struct ast_event *(*to_event)(
  271. struct stasis_message *message);
  272. };
  273. /*!
  274. * \brief Return code for Stasis message type creation attempts
  275. */
  276. enum stasis_message_type_result {
  277. STASIS_MESSAGE_TYPE_ERROR = -1, /*!< Message type was not created due to allocation failure */
  278. STASIS_MESSAGE_TYPE_SUCCESS, /*!< Message type was created successfully (implicit value 0) */
  279. STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration (implicit value 1) */
  280. };
  281. /*!
  282. * \brief Stasis subscription message filters
  283. */
  284. enum stasis_subscription_message_filter {
  285. STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place; all messages are raised */
  286. STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place and none can be set; all messages are raised */
  287. STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */
  288. };
  289. /*!
  290. * \brief Stasis subscription formatter filters
  291. *
  292. * There should be an entry here for each member of \ref stasis_message_vtable
  293. *
  294. * \since 13.25.0
  295. * \since 16.2.0
  296. */
  297. enum stasis_subscription_message_formatters {
  298. STASIS_SUBSCRIPTION_FORMATTER_NONE = 0, /*!< No formatter flag set */
  299. STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Flag: allow messages with a to_json formatter */
  300. STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Flag: allow messages with a to_ami formatter */
  301. STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Flag: allow messages with a to_event formatter */
  302. };
  303. /*!
  304. * \brief Create a new message type.
  305. *
  306. * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
  307. * with it.
  308. *
  309. * \param name Name of the new type.
  310. * \param vtable Virtual table of message methods. May be \c NULL.
  311. * \param[out] result The location where the new message type will be placed
  312. *
  313. * \note Stasis message type creation may be declined if the message type is disabled
  314. *
  315. * \returns A stasis_message_type_result enum
  316. * \since 12
  317. */
  318. enum stasis_message_type_result stasis_message_type_create(const char *name,
  319. struct stasis_message_vtable *vtable, struct stasis_message_type **result);
  320. /*!
  321. * \brief Gets the name of a given message type
  322. * \param type The type to get.
  323. * \return Name of the type.
  324. * \return \c NULL if \a type is \c NULL.
  325. * \since 12
  326. */
  327. const char *stasis_message_type_name(const struct stasis_message_type *type);
  328. /*!
  329. * \brief Gets the hash of a given message type
  330. * \param type The type to get the hash of.
  331. * \return The hash
  332. * \since 13.24.0
  333. */
  334. unsigned int stasis_message_type_hash(const struct stasis_message_type *type);
  335. /*!
  336. * \brief Gets the id of a given message type
  337. * \param type The type to get the id of.
  338. * \return The id
  339. * \since 17.0.0
  340. */
  341. int stasis_message_type_id(const struct stasis_message_type *type);
  342. /*!
  343. * \brief Check whether a message type is declined
  344. *
  345. * \param name The name of the message type to check
  346. *
  347. * \retval zero The message type is not declined
  348. * \retval non-zero The message type is declined
  349. */
  350. int stasis_message_type_declined(const char *name);
  351. /*!
  352. * \brief Create a new message.
  353. *
  354. * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
  355. * with it. Messages are also immutable, and must not be modified after they
  356. * are initialized. Especially the \a data in the message.
  357. *
  358. * \param type Type of the message
  359. * \param data Immutable data that is the actual contents of the message
  360. *
  361. * \return New message
  362. * \return \c NULL on error
  363. *
  364. * \since 12
  365. */
  366. struct stasis_message *stasis_message_create(struct stasis_message_type *type, void *data);
  367. /*!
  368. * \brief Create a new message for an entity.
  369. *
  370. * This message is an \c ao2 object, and must be ao2_cleanup()'ed when you are done
  371. * with it. Messages are also immutable, and must not be modified after they
  372. * are initialized. Especially the \a data in the message.
  373. *
  374. * \param type Type of the message
  375. * \param data Immutable data that is the actual contents of the message
  376. * \param eid What entity originated this message. (NULL for aggregate)
  377. *
  378. * \note An aggregate message is a combined representation of the local
  379. * and remote entities publishing the message data. e.g., An aggregate
  380. * device state represents the combined device state from the local and
  381. * any remote entities publishing state for a device. e.g., An aggregate
  382. * MWI message is the old/new MWI counts accumulated from the local and
  383. * any remote entities publishing to a mailbox.
  384. *
  385. * \retval New message
  386. * \retval \c NULL on error
  387. *
  388. * \since 12.2.0
  389. */
  390. struct stasis_message *stasis_message_create_full(struct stasis_message_type *type, void *data, const struct ast_eid *eid);
  391. /*!
  392. * \brief Get the entity id for a \ref stasis_message.
  393. * \since 12.2.0
  394. *
  395. * \param msg Message to get eid.
  396. *
  397. * \retval Entity id of \a msg
  398. * \retval \c NULL if \a msg is an aggregate or \a msg is \c NULL.
  399. */
  400. const struct ast_eid *stasis_message_eid(const struct stasis_message *msg);
  401. /*!
  402. * \brief Get the message type for a \ref stasis_message.
  403. * \param msg Message to type
  404. * \return Type of \a msg
  405. * \return \c NULL if \a msg is \c NULL.
  406. * \since 12
  407. */
  408. struct stasis_message_type *stasis_message_type(const struct stasis_message *msg);
  409. /*!
  410. * \brief Get the data contained in a message.
  411. * \param msg Message.
  412. * \return Immutable data pointer
  413. * \return \c NULL if msg is \c NULL.
  414. * \since 12
  415. */
  416. void *stasis_message_data(const struct stasis_message *msg);
  417. /*!
  418. * \brief Get the time when a message was created.
  419. * \param msg Message.
  420. * \return Pointer to the \a timeval when the message was created.
  421. * \return \c NULL if msg is \c NULL.
  422. * \since 12
  423. */
  424. const struct timeval *stasis_message_timestamp(const struct stasis_message *msg);
  425. /*!
  426. * \brief Build the JSON representation of the message.
  427. *
  428. * May return \c NULL, to indicate no representation. The returned object should
  429. * be ast_json_unref()'ed.
  430. *
  431. * \param msg Message to convert to JSON.
  432. * \param sanitize Snapshot sanitization callback.
  433. *
  434. * \return Newly allocated JSON message.
  435. * \return \c NULL on error.
  436. * \return \c NULL if JSON format is not supported.
  437. */
  438. struct ast_json *stasis_message_to_json(struct stasis_message *msg, struct stasis_message_sanitizer *sanitize);
  439. /*!
  440. * \brief Build the AMI representation of the message.
  441. *
  442. * May return \c NULL, to indicate no representation. The returned object should
  443. * be ao2_cleanup()'ed.
  444. *
  445. * \param msg Message to convert to AMI.
  446. * \return \c NULL on error.
  447. * \return \c NULL if AMI format is not supported.
  448. */
  449. struct ast_manager_event_blob *stasis_message_to_ami(struct stasis_message *msg);
  450. /*!
  451. * \brief Determine if the given message can be converted to AMI.
  452. *
  453. * \param msg Message to see if can be converted to AMI.
  454. *
  455. * \retval 0 Cannot be converted
  456. * \retval non-zero Can be converted
  457. */
  458. int stasis_message_can_be_ami(struct stasis_message *msg);
  459. /*!
  460. * \brief Build the \ref AstGenericEvents representation of the message.
  461. *
  462. * May return \c NULL, to indicate no representation. The returned object should
  463. * be disposed of via \ref ast_event_destroy.
  464. *
  465. * \param msg Message to convert to an \ref ast_event.
  466. * \return \c NULL on error.
  467. * \return \c NULL if \ref ast_event format is not supported.
  468. */
  469. struct ast_event *stasis_message_to_event(struct stasis_message *msg);
  470. /*! @} */
  471. /*! @{ */
  472. /*!
  473. * \brief A topic to which messages may be posted, and subscribers, well, subscribe
  474. * \since 12
  475. */
  476. struct stasis_topic;
  477. /*!
  478. * \brief Create a new topic.
  479. * \param name Name of the new topic.
  480. * \return New topic instance.
  481. * \return \c NULL on error.
  482. * \since 12
  483. *
  484. * \note There is no explicit ability to unsubscribe all subscribers
  485. * from a topic and destroy it. As a result the topic can persist until
  486. * the last subscriber unsubscribes itself even if there is no
  487. * publisher.
  488. */
  489. struct stasis_topic *stasis_topic_create(const char *name);
  490. /*!
  491. * \brief Return the name of a topic.
  492. * \param topic Topic.
  493. * \return Name of the topic.
  494. * \return \c NULL if topic is \c NULL.
  495. * \since 12
  496. */
  497. const char *stasis_topic_name(const struct stasis_topic *topic);
  498. /*!
  499. * \brief Return the number of subscribers of a topic.
  500. * \param topic Topic.
  501. * \return Number of subscribers of the topic.
  502. * \since 17.0.0
  503. */
  504. size_t stasis_topic_subscribers(const struct stasis_topic *topic);
  505. /*!
  506. * \brief Publish a message to a topic's subscribers.
  507. * \param topic Topic.
  508. * \param message Message to publish.
  509. *
  510. * This call is asynchronous and will return immediately upon queueing
  511. * the message for delivery to the topic's subscribers.
  512. *
  513. * \since 12
  514. */
  515. void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
  516. /*!
  517. * \brief Publish a message to a topic's subscribers, synchronizing
  518. * on the specified subscriber
  519. * \param sub Subscription to synchronize on.
  520. * \param message Message to publish.
  521. *
  522. * The caller of stasis_publish_sync will block until the specified
  523. * subscriber completes handling of the message.
  524. *
  525. * All other subscribers to the topic the \ref stasis_subscription
  526. * is subscribed to are also delivered the message; this delivery however
  527. * happens asynchronously.
  528. *
  529. * \since 12.1.0
  530. */
  531. void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message);
  532. /*! @} */
  533. /*! @{ */
  534. /*!
  535. * \brief Callback function type for Stasis subscriptions.
  536. * \param data Data field provided with subscription.
  537. * \param message Published message.
  538. * \since 12
  539. */
  540. typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
  541. /*!
  542. * \brief Stasis subscription callback function that does nothing.
  543. *
  544. * \note This callback should be used for events that are not directly processed, but need
  545. * to be generated so data can be retrieved from cache later. Subscriptions with this
  546. * callback can be released with \ref stasis_unsubscribe, even during module unload.
  547. *
  548. * \since 13.5
  549. */
  550. void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, struct stasis_message *message);
  551. /*!
  552. * \brief Create a subscription.
  553. *
  554. * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
  555. * up this reference), the subscription must be explicitly unsubscribed from its
  556. * topic using stasis_unsubscribe().
  557. *
  558. * The invocations of the callback are serialized, but may not always occur on
  559. * the same thread. The invocation order of different subscriptions is
  560. * unspecified.
  561. *
  562. * \param topic Topic to subscribe to.
  563. * \param callback Callback function for subscription messages.
  564. * \param data Data to be passed to the callback, in addition to the message.
  565. * \return New \ref stasis_subscription object.
  566. * \return \c NULL on error.
  567. * \since 12
  568. *
  569. * \note The callback will be invoked with a message indicating that it
  570. * has been subscribed. This occurs immediately before accepted message
  571. * types can be set and the callback must expect to receive it.
  572. */
  573. struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic,
  574. stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
  575. #ifdef AST_DEVMODE
  576. #define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
  577. #else
  578. struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic,
  579. stasis_subscription_cb callback, void *data);
  580. #endif
  581. /*!
  582. * \brief Create a subscription whose callbacks occur on a thread pool
  583. *
  584. * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free
  585. * up this reference), the subscription must be explicitly unsubscribed from its
  586. * topic using stasis_unsubscribe().
  587. *
  588. * The invocations of the callback are serialized, but will almost certainly not
  589. * always happen on the same thread. The invocation order of different subscriptions
  590. * is unspecified.
  591. *
  592. * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to
  593. * dispatch items to its \c callback. This form of subscription should be used
  594. * when many subscriptions may be made to the specified \c topic.
  595. *
  596. * \param topic Topic to subscribe to.
  597. * \param callback Callback function for subscription messages.
  598. * \param data Data to be passed to the callback, in addition to the message.
  599. * \return New \ref stasis_subscription object.
  600. * \return \c NULL on error.
  601. * \since 12.8.0
  602. *
  603. * \note The callback will be invoked with a message indicating that the
  604. * subscription has been made. This occurs immediately before accepted message
  605. * types can be set and the callback must expect to receive it.
  606. */
  607. struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic,
  608. stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func);
  609. #ifdef AST_DEVMODE
  610. #define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__)
  611. #else
  612. struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic,
  613. stasis_subscription_cb callback, void *data);
  614. #endif
  615. /*!
  616. * \brief Indicate to a subscription that we are interested in a message type.
  617. *
  618. * This will cause the subscription to allow the given message type to be
  619. * raised to our subscription callback. This enables internal filtering in
  620. * the stasis message bus to reduce messages.
  621. *
  622. * \param subscription Subscription to add message type to.
  623. * \param type The message type we wish to receive.
  624. * \retval 0 on success
  625. * \retval -1 failure
  626. *
  627. * \since 17.0.0
  628. *
  629. * \note If you want to use stasis_subscription_final_message() you will need to accept
  630. * \ref stasis_subscription_change_type as a message type.
  631. *
  632. * \note Until the subscription is set to selective filtering it is possible for it
  633. * to receive messages of message types that would not normally be accepted.
  634. */
  635. int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
  636. const struct stasis_message_type *type);
  637. /*!
  638. * \brief Indicate to a subscription that we are not interested in a message type.
  639. *
  640. * \param subscription Subscription to remove message type from.
  641. * \param type The message type we don't wish to receive.
  642. * \retval 0 on success
  643. * \retval -1 failure
  644. *
  645. * \since 17.0.0
  646. */
  647. int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
  648. const struct stasis_message_type *type);
  649. /*!
  650. * \brief Set the message type filtering level on a subscription
  651. *
  652. * This will cause the subscription to filter messages according to the
  653. * provided filter level. For example if selective is used then only
  654. * messages matching those provided to \ref stasis_subscription_accept_message_type
  655. * will be raised to the subscription callback.
  656. *
  657. * \param subscription Subscription whose filter level is to be set.
  658. * \param filter What filter to use
  659. * \retval 0 on success
  660. * \retval -1 failure
  661. *
  662. * \since 17.0.0
  663. */
  664. int stasis_subscription_set_filter(struct stasis_subscription *subscription,
  665. enum stasis_subscription_message_filter filter);
  666. /*!
  667. * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
  668. *
  669. * \param subscription Subscription to alter.
  670. * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
  671. *
  672. * \since 13.25.0
  673. * \since 16.2.0
  674. */
  675. void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
  676. enum stasis_subscription_message_formatters formatters);
  677. /*!
  678. * \brief Get a bitmap of available formatters for a message type
  679. *
  680. * \param message_type Message type
  681. * \return A bitmap of \ref stasis_subscription_message_formatters
  682. *
  683. * \since 13.25.0
  684. * \since 16.2.0
  685. */
  686. enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
  687. const struct stasis_message_type *message_type);
  688. /*!
  689. * \brief Cancel a subscription.
  690. *
  691. * Note that in an asynchronous system, there may still be messages queued or
  692. * in transit to the subscription's callback. These will still be delivered.
  693. * There will be a final 'SubscriptionCancelled' message, indicating the
  694. * delivery of the final message.
  695. *
  696. * \param subscription Subscription to cancel.
  697. * \return \c NULL for convenience
  698. * \since 12
  699. */
  700. struct stasis_subscription *stasis_unsubscribe(
  701. struct stasis_subscription *subscription);
  702. /*!
  703. * \brief Set the high and low alert water marks of the stasis subscription.
  704. * \since 13.10.0
  705. *
  706. * \param subscription Pointer to a stasis subscription
  707. * \param low_water New queue low water mark. (-1 to set as 90% of high_water)
  708. * \param high_water New queue high water mark.
  709. *
  710. * \retval 0 on success.
  711. * \retval -1 on error (water marks not changed).
  712. */
  713. int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscription,
  714. long low_water, long high_water);
  715. /*!
  716. * \brief Block until the last message is processed on a subscription.
  717. *
  718. * This function will not return until the \a subscription's callback for the
  719. * stasis_subscription_final_message() completes. This allows cleanup routines
  720. * to run before unblocking the joining thread.
  721. *
  722. * \param subscription Subscription to block on.
  723. * \since 12
  724. */
  725. void stasis_subscription_join(struct stasis_subscription *subscription);
  726. /*!
  727. * \brief Returns whether \a subscription has received its final message.
  728. *
  729. * Note that a subscription is considered done even while the
  730. * stasis_subscription_final_message() is being processed. This allows cleanup
  731. * routines to check the status of the subscription.
  732. *
  733. * \param subscription Subscription.
  734. * \return True (non-zero) if stasis_subscription_final_message() has been
  735. * received.
  736. * \return False (zero) if waiting for the end.
  737. */
  738. int stasis_subscription_is_done(struct stasis_subscription *subscription);
  739. /*!
  740. * \brief Cancel a subscription, blocking until the last message is processed.
  741. *
  742. * While normally it's recommended to stasis_unsubscribe() and wait for
  743. * stasis_subscription_final_message(), there are times (like during a module
  744. * unload) where you have to wait for the final message (otherwise you'll call
  745. * a function in a shared module that no longer exists).
  746. *
  747. * \param subscription Subscription to cancel.
  748. * \return \c NULL for convenience
  749. * \since 12
  750. */
  751. struct stasis_subscription *stasis_unsubscribe_and_join(
  752. struct stasis_subscription *subscription);
  753. struct stasis_forward;
  754. /*!
  755. * \brief Create a subscription which forwards all messages from one topic to
  756. * another.
  757. *
  758. * Note that the \a topic parameter of the invoked callback will be the
  759. * \a topic the message was sent to, not the topic the subscriber subscribed to.
  760. *
  761. * \param from_topic Topic to forward.
  762. * \param to_topic Destination topic of forwarded messages.
  763. * \return New forwarding subscription.
  764. * \return \c NULL on error.
  765. * \since 12
  766. */
  767. struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
  768. struct stasis_topic *to_topic);
  /*!
   * \brief Cancel a forwarding subscription.
   *
   * NOTE(review): this prototype carries no documentation in the header; the
   * description here is inferred from the function name and from the
   * \c stasis_forward type returned by stasis_forward_all() -- confirm
   * against the implementation.
   *
   * \param forward Forwarding subscription to cancel (presumably one obtained
   * from stasis_forward_all()).
   * \return NOTE(review): stasis_unsubscribe() and its siblings return
   * \c NULL for convenience; presumably this does the same -- confirm.
   */
  769. struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
  770. /*!
  771. * \brief Get the unique ID for the subscription.
  772. *
  773. * \param sub Subscription for which to get the unique ID.
  774. * \return Unique ID for the subscription.
  775. * \since 12
  776. */
  777. const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub);
  778. /*!
  779. * \brief Returns whether a subscription is currently subscribed.
  780. *
  781. * Note that there may still be messages queued up to be dispatched to this
  782. * subscription, but the stasis_subscription_final_message() has been enqueued.
  783. *
  784. * \param sub Subscription to check
  785. * \return False (zero) if subscription is not subscribed.
  786. * \return True (non-zero) if still subscribed.
  787. */
  788. int stasis_subscription_is_subscribed(const struct stasis_subscription *sub);
  789. /*!
  790. * \brief Determine whether a message is the final message to be received on a subscription.
  791. *
  792. * \param sub Subscription on which the message was received.
  793. * \param msg Message to check.
  794. * \return zero if the provided message is not the final message.
  795. * \return non-zero if the provided message is the final message.
  796. * \since 12
  797. */
  798. int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg);
  799. /*! \addtogroup StasisTopicsAndMessages
  800. * @{
  801. */
  802. /*!
  803. * \brief Holds details about changes to subscriptions for the specified topic
  804. * \since 12
  805. */
  struct stasis_subscription_change {
      /*! The topic the subscription is/was subscribing to */
      struct stasis_topic *topic;
      /*! The unique ID associated with this subscription */
      char *uniqueid;
      /*!
       * The description of the change to the subscription associated with
       * the uniqueid. Declared as a C99 flexible array member (instead of
       * the GNU zero-length array form): it contributes nothing to
       * sizeof(struct stasis_subscription_change) and must remain the last
       * member.
       */
      char description[];
  };
  811. /*!
  812. * \brief Gets the message type for subscription change notices
  813. * \return The stasis_message_type for subscription change notices
  814. * \since 12
  815. */
  816. struct stasis_message_type *stasis_subscription_change_type(void);
  817. /*! @} */
  818. /*! @{ */
  819. /*!
  820. * \brief Pool for topic aggregation
  821. */
  822. struct stasis_topic_pool;
  823. /*!
  824. * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
  825. * \param pooled_topic Topic to which messages will be routed
  826. * \return the new stasis_topic_pool
  827. * \return \c NULL on failure
  828. */
  829. struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
  830. /*!
  831. * \brief Find or create a topic in the pool
  832. * \param pool Pool for which to get the topic
  833. * \param topic_name Name of the topic to get
  834. * \return The already stored or newly allocated topic
  835. * \return \c NULL if the topic was not found and could not be allocated
  836. */
  837. struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
  838. /*!
  839. * \brief Delete a topic from the topic pool
  840. *
  841. * \param pool Pool from which to delete the topic
  842. * \param topic_name Name of the topic to delete
  843. *
  844. * \since 13.24
  845. * \since 15.6
  846. * \since 16.1
  847. */
  848. void stasis_topic_pool_delete_topic(struct stasis_topic_pool *pool, const char *topic_name);
  849. /*!
  850. * \brief Check if a topic exists in a pool
  851. * \param pool Pool to check
  852. * \param topic_name Name of the topic to check
  853. * \retval 1 exists
  854. * \retval 0 does not exist
  855. * \since 13.23.0
  856. */
  857. int stasis_topic_pool_topic_exists(const struct stasis_topic_pool *pool, const char *topic_name);
  858. /*! @} */
  859. /*! \addtogroup StasisTopicsAndMessages
  860. * @{
  861. */
  862. /*!
  863. * \brief Message type for cache update messages.
  864. * \return Message type for cache update messages.
  865. * \since 12
  866. */
  867. struct stasis_message_type *stasis_cache_update_type(void);
  868. /*!
  869. * \brief Cache update message
  870. * \since 12
  871. */
  struct stasis_cache_update {
      struct stasis_message_type *type;    /*!< Convenience reference to the snapshot type */
      struct stasis_message *old_snapshot; /*!< Old value, as it was in the cache */
      struct stasis_message *new_snapshot; /*!< New value */
  };
  880. /*!
  881. * \brief Message type for clearing a message from a stasis cache.
  882. * \since 12
  883. */
  884. struct stasis_message_type *stasis_cache_clear_type(void);
  885. /*! @} */
  886. /*! @{ */
  887. /*!
  888. * \brief A message cache, for use with \ref stasis_caching_topic.
  889. * \since 12
  890. */
  891. struct stasis_cache;
  892. /*! Cache entry used for calculating the aggregate snapshot. */
  893. struct stasis_cache_entry;
  894. /*!
  895. * \brief A topic wrapper, which caches certain messages.
  896. * \since 12
  897. */
  898. struct stasis_caching_topic;
  899. /*!
  900. * \brief Callback to extract a unique identity from a snapshot message.
  901. *
  902. * This identity is unique to the underlying object of the snapshot, such as the
  903. * UniqueId field of a channel.
  904. *
  905. * \param message Message to extract id from.
  906. * \return String representing the snapshot's id.
  907. * \return \c NULL if the message_type of the message isn't a handled snapshot.
  908. * \since 12
  909. */
  910. typedef const char *(*snapshot_get_id)(struct stasis_message *message);
  911. /*!
  912. * \brief Callback to calculate the aggregate cache entry.
  913. * \since 12.2.0
  914. *
  915. * \param entry Cache entry to calculate a new aggregate snapshot.
  916. * \param new_snapshot The snapshot that is being updated.
  917. *
  918. * \note Return a ref bumped pointer from stasis_cache_entry_get_aggregate()
  919. * if a new aggregate could not be calculated because of error.
  920. *
  921. * \note An aggregate message is a combined representation of the local
  922. * and remote entities publishing the message data. e.g., An aggregate
  923. * device state represents the combined device state from the local and
  924. * any remote entities publishing state for a device. e.g., An aggregate
  925. * MWI message is the old/new MWI counts accumulated from the local and
  926. * any remote entities publishing to a mailbox.
  927. *
  928. * \return New aggregate-snapshot calculated on success.
  929. * Caller has a reference on return.
  930. */
  931. typedef struct stasis_message *(*cache_aggregate_calc_fn)(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot);
  932. /*!
  933. * \brief Callback to publish the aggregate cache entry message.
  934. * \since 12.2.0
  935. *
  936. * \details
  937. * Once an aggregate message is calculated, this callback publishes the
  938. * message so subscribers will know the new value of an aggregated state.
  939. *
  940. * \param topic The aggregate message may be published to this topic.
  941. * It is the topic to which the cache itself is subscribed.
  942. * \param aggregate The aggregate snapshot message to publish.
  943. *
  944. * \note It is up to the function to determine if there is a better topic
  945. * the aggregate message should be published over.
  946. *
  947. * \note An aggregate message is a combined representation of the local
  948. * and remote entities publishing the message data. e.g., An aggregate
  949. * device state represents the combined device state from the local and
  950. * any remote entities publishing state for a device. e.g., An aggregate
  951. * MWI message is the old/new MWI counts accumulated from the local and
  952. * any remote entities publishing to a mailbox.
  953. *
  954. * \return Nothing
  955. */
  956. typedef void (*cache_aggregate_publish_fn)(struct stasis_topic *topic, struct stasis_message *aggregate);
  957. /*!
  958. * \brief Get the aggregate cache entry snapshot.
  959. * \since 12.2.0
  960. *
  961. * \param entry Cache entry to get the aggregate snapshot.
  962. *
  963. * \note A reference is not given to the returned pointer so don't unref it.
  964. *
  965. * \note An aggregate message is a combined representation of the local
  966. * and remote entities publishing the message data. e.g., An aggregate
  967. * device state represents the combined device state from the local and
  968. * any remote entities publishing state for a device. e.g., An aggregate
  969. * MWI message is the old/new MWI counts accumulated from the local and
  970. * any remote entities publishing to a mailbox.
  971. *
  972. * \retval Aggregate-snapshot in cache.
  973. * \retval NULL if not present.
  974. */
  975. struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry);
  976. /*!
  977. * \brief Get the local entity's cache entry snapshot.
  978. * \since 12.2.0
  979. *
  980. * \param entry Cache entry to get the local entity's snapshot.
  981. *
  982. * \note A reference is not given to the returned pointer so don't unref it.
  983. *
  984. * \retval Internal-snapshot in cache.
  985. * \retval NULL if not present.
  986. */
  987. struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry);
  988. /*!
  989. * \brief Get a remote entity's cache entry snapshot by index.
  990. * \since 12.2.0
  991. *
  992. * \param entry Cache entry to get a remote entity's snapshot.
  993. * \param idx Which remote entity's snapshot to get.
  994. *
  995. * \note A reference is not given to the returned pointer so don't unref it.
  996. *
  997. * \retval Remote-entity-snapshot in cache.
  998. * \retval NULL if not present.
  999. */
  1000. struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx);
  1001. /*!
  1002. * \brief Create a cache.
  1003. *
  1004. * This is the backend store for a \ref stasis_caching_topic. The cache is
  1005. * thread safe, allowing concurrent reads and writes.
  1006. *
  1007. * The returned object is AO2 managed, so ao2_cleanup() when you're done.
  1008. *
  1009. * \param id_fn Callback to extract the id from a snapshot message.
  1010. *
  1011. * \retval New cache indexed by \a id_fn.
  1012. * \retval \c NULL on error
  1013. *
  1014. * \since 12
  1015. */
  1016. struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn);
  1017. /*!
  1018. * \brief Create a cache.
  1019. *
  1020. * This is the backend store for a \ref stasis_caching_topic. The cache is
  1021. * thread safe, allowing concurrent reads and writes.
  1022. *
  1023. * The returned object is AO2 managed, so ao2_cleanup() when you're done.
  1024. *
  1025. * \param id_fn Callback to extract the id from a snapshot message.
  1026. * \param aggregate_calc_fn Callback to calculate the aggregate cache entry.
  1027. * \param aggregate_publish_fn Callback to publish the aggregate cache entry.
  1028. *
  1029. * \note An aggregate message is a combined representation of the local
  1030. * and remote entities publishing the message data. e.g., An aggregate
  1031. * device state represents the combined device state from the local and
  1032. * any remote entities publishing state for a device. e.g., An aggregate
  1033. * MWI message is the old/new MWI counts accumulated from the local and
  1034. * any remote entities publishing to a mailbox.
  1035. *
  1036. * \retval New cache indexed by \a id_fn.
  1037. * \retval \c NULL on error
  1038. *
  1039. * \since 12.2.0
  1040. */
  1041. struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn);
  1042. /*!
  1043. * \brief Create a topic which monitors and caches messages from another topic.
  1044. *
  1045. * The idea is that some topics publish 'snapshots' of some other object's state
  1046. * that should be cached. When these snapshot messages are received, the cache
  1047. * is updated, and a stasis_cache_update() message is forwarded, which has both
  1048. * the original snapshot message and the new message.
  1049. *
  1050. * The returned object is AO2 managed, so ao2_cleanup() when done with it.
  1051. *
  1052. * \param original_topic Topic publishing snapshot messages.
  1053. * \param cache Backend cache in which to keep snapshots.
  1054. * \return New topic which changes snapshot messages to stasis_cache_update()
  1055. * messages, and forwards all other messages from the original topic.
  1056. * \return \c NULL on error
  1057. * \since 12
  1058. */
  1059. struct stasis_caching_topic *stasis_caching_topic_create(
  1060. struct stasis_topic *original_topic, struct stasis_cache *cache);
  1061. /*!
  1062. * \brief Unsubscribes a caching topic from its upstream topic.
  1063. *
  1064. * This function returns immediately, so be sure to cleanup when
  1065. * stasis_subscription_final_message() is received.
  1066. *
  1067. * \param caching_topic Caching topic to unsubscribe
  1068. * \return \c NULL for convenience
  1069. * \since 12
  1070. */
  1071. struct stasis_caching_topic *stasis_caching_unsubscribe(
  1072. struct stasis_caching_topic *caching_topic);
  1073. /*!
  1074. * \brief Unsubscribes a caching topic from its upstream topic, blocking until
  1075. * all messages have been forwarded.
  1076. *
  1077. * See stasis_unsubscribe_and_join() for more info on when to use this as
  1078. * opposed to stasis_caching_unsubscribe().
  1079. *
  1080. * \param caching_topic Caching topic to unsubscribe
  1081. * \return \c NULL for convenience
  1082. * \since 12
  1083. */
  1084. struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(
  1085. struct stasis_caching_topic *caching_topic);
  1086. /*!
  1087. * \brief Returns the topic of cached events from a caching topic.
  1088. * \param caching_topic The caching topic.
  1089. * \return The topic that publishes cache update events, along with passthrough
  1090. * events from the underlying topic.
  1091. * \return \c NULL if \a caching_topic is \c NULL.
  1092. * \since 12
  1093. */
  1094. struct stasis_topic *stasis_caching_get_topic(
  1095. struct stasis_caching_topic *caching_topic);
  1096. /*!
  1097. * \brief Indicate to a caching topic that we are interested in a message type.
  1098. *
  1099. * This will cause the caching topic to receive messages of the given message
  1100. * type. This enables internal filtering in the stasis message bus to reduce
  1101. * messages.
  1102. *
  1103. * \param caching_topic The caching topic.
  1104. * \param type The message type we wish to receive.
  1105. * \retval 0 on success
  1106. * \retval -1 failure
  1107. *
  1108. * \since 17.0.0
  1109. */
  1110. int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
  1111. struct stasis_message_type *type);
  1112. /*!
  1113. * \brief Set the message type filtering level on a cache
  1114. *
  1115. * This will cause the underlying subscription to filter messages according to the
  1116. * provided filter level. For example if selective is used then only
  1117. * messages matching those provided to \ref stasis_subscription_accept_message_type
  1118. * will be raised to the subscription callback.
  1119. *
  1120. * \param caching_topic The caching topic.
  1121. * \param filter What filter to use
  1122. * \retval 0 on success
  1123. * \retval -1 failure
  1124. *
  1125. * \since 17.0.0
  1126. */
  1127. int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
  1128. enum stasis_subscription_message_filter filter);
  1129. /*!
  1130. * \brief A message which instructs the caching topic to remove an entry from
  1131. * its cache.
  1132. *
  1133. * \param message Message representative of the cache entry that should be
  1134. * cleared. This will become the data held in the
  1135. * stasis_cache_clear message.
  1136. *
  1137. * \return Message which, when sent to a \ref stasis_caching_topic, will clear
  1138. * the item from the cache.
  1139. * \return \c NULL on error.
  1140. * \since 12
  1141. */
  1142. struct stasis_message *stasis_cache_clear_create(struct stasis_message *message);
  1143. /*!
  1144. * \brief Retrieve an item from the cache for the ast_eid_default entity.
  1145. *
  1146. * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
  1147. *
  1148. * \param cache The cache to query.
  1149. * \param type Type of message to retrieve.
  1150. * \param id Identity of the snapshot to retrieve.
  1151. *
  1152. * \retval Message from the cache.
  1153. * \retval \c NULL if message is not found.
  1154. *
  1155. * \since 12
  1156. */
  1157. struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
  1158. /*!
  1159. * \brief Retrieve an item from the cache for a specific entity.
  1160. *
  1161. * The returned item is AO2 managed, so ao2_cleanup() when you're done with it.
  1162. *
  1163. * \param cache The cache to query.
  1164. * \param type Type of message to retrieve.
  1165. * \param id Identity of the snapshot to retrieve.
  1166. * \param eid Specific entity id to retrieve. NULL for aggregate.
  1167. *
  1168. * \note An aggregate message is a combined representation of the local
  1169. * and remote entities publishing the message data. e.g., An aggregate
  1170. * device state represents the combined device state from the local and
  1171. * any remote entities publishing state for a device. e.g., An aggregate
  1172. * MWI message is the old/new MWI counts accumulated from the local and
  1173. * any remote entities publishing to a mailbox.
  1174. *
  1175. * \retval Message from the cache.
  1176. * \retval \c NULL if message is not found.
  1177. *
  1178. * \since 12.2.0
  1179. */
  1180. struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid);
  1181. /*!
  1182. * \brief Retrieve all matching entity items from the cache.
  1183. * \since 12.2.0
  1184. *
  1185. * \param cache The cache to query.
  1186. * \param type Type of message to retrieve.
  1187. * \param id Identity of the snapshot to retrieve.
  1188. *
  1189. * \retval Container of matching items found.
  1190. * \retval \c NULL if error.
  1191. */
  1192. struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id);
  1193. /*!
  1194. * \brief Dump cached items to a subscription for the ast_eid_default entity.
  1195. *
  1196. * \param cache The cache to query.
  1197. * \param type Type of message to dump (any type if \c NULL).
  1198. *
  1199. * \retval ao2_container containing all matches (must be unreffed by caller)
  1200. * \retval \c NULL on allocation error
  1201. *
  1202. * \since 12
  1203. */
  1204. struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type);
  1205. /*!
  1206. * \brief Dump cached items to a subscription for a specific entity.
  1207. * \since 12.2.0
  1208. *
  1209. * \param cache The cache to query.
  1210. * \param type Type of message to dump (any type if \c NULL).
  1211. * \param eid Specific entity id to retrieve. NULL for aggregate.
  1212. *
  1213. * \retval ao2_container containing all matches (must be unreffed by caller)
  1214. * \retval \c NULL on allocation error
  1215. */
  1216. struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid);
  1217. /*!
  1218. * \brief Dump all entity items from the cache to a subscription.
  1219. * \since 12.2.0
  1220. *
  1221. * \param cache The cache to query.
  1222. * \param type Type of message to dump (any type if \c NULL).
  1223. *
  1224. * \retval ao2_container containing all matches (must be unreffed by caller)
  1225. * \retval \c NULL on allocation error
  1226. */
  1227. struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type);
  1228. /*!
  1229. * \brief Object type code for multi user object snapshots
  1230. */
  enum stasis_user_multi_object_snapshot_type {
      /*! Channel Snapshots */
      STASIS_UMOS_CHANNEL = 0,
      /*! Bridge Snapshots */
      STASIS_UMOS_BRIDGE = 1,
      /*! Endpoint Snapshots */
      STASIS_UMOS_ENDPOINT = 2,
  };
  /*!
   * \brief Number of snapshot types
   *
   * Computed from STASIS_UMOS_ENDPOINT, so that enumerator has to stay the
   * highest-valued one for the count to be right.
   */
  #define STASIS_UMOS_MAX (STASIS_UMOS_ENDPOINT + 1)
  1238. /*!
  1239. * \brief Message type for custom user defined events with multi object blobs
  1240. * \return The stasis_message_type for user event
  1241. * \since 12.3.0
  1242. */
  1243. struct stasis_message_type *ast_multi_user_event_type(void);
  1244. /*!
  1245. * \brief Create a stasis multi object blob
  1246. * \since 12.3.0
  1247. *
  1248. * \details
  1249. * Multi object blob can store a combination of arbitrary json values
  1250. * (the blob) and also snapshots of various other system objects (such
  1251. * as channels, bridges, etc) for delivery through a stasis message.
  1252. * The multi object blob is first created, then optionally objects
  1253. * are added to it, before being attached to a message and delivered
  1254. * to stasis topic.
  1255. *
  1256. * \param blob Json blob
  1257. *
  1258. * \note When used for an ast_multi_user_event_type message, the
  1259. * json blob should contain at minimum {eventname: name}.
  1260. *
  1261. * \retval ast_multi_object_blob* if succeeded
  1262. * \retval NULL if creation failed
  1263. */
  1264. struct ast_multi_object_blob *ast_multi_object_blob_create(struct ast_json *blob);
  1265. /*!
  1266. * \brief Add an object to a multi object blob previously created
  1267. * \since 12.3.0
  1268. *
  1269. * \param multi The multi object blob previously created
  1270. * \param type Type code for the object such as channel, bridge, etc.
  1271. * \param object Snapshot object of the kind given by \a type
  1272. *
  1273. * \return Nothing
  1274. */
  1275. void ast_multi_object_blob_add(struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object);
  1276. /*!
  1277. * \brief Create and publish a stasis message blob on a channel with its snapshot
  1278. * \since 12.3.0
  1279. *
  1280. * \details
  1281. * For compatibility with app_userevent, this creates a multi object
  1282. * blob message, attaches the channel snapshot to it, and publishes it
  1283. * to the channel's topic.
  1284. *
  1285. * \param chan The channel to snapshot and publish event to
  1286. * \param type The message type
  1287. * \param blob A json blob to publish with the snapshot
  1288. *
  1289. * \return Nothing
  1290. */
  1291. void ast_multi_object_blob_single_channel_publish(struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob);
  1292. /*! @} */
  1293. /*! @{ */
  1294. /*!
  1295. * \internal
  1296. * \brief Log a message about invalid attempt to access a type.
  1297. */
  1298. void stasis_log_bad_type_access(const char *name);
  1299. /*!
  1300. * \brief Boiler-plate messaging macro for defining public message types.
  1301. *
  1302. * \code
  1303. * STASIS_MESSAGE_TYPE_DEFN(ast_foo_type,
  1304. * .to_ami = foo_to_ami,
  1305. * .to_json = foo_to_json,
  1306. * .to_event = foo_to_event,
  1307. * );
  1308. * \endcode
  1309. *
  1310. * \param name Name of message type.
  1311. * \param ... Virtual table methods for messages of this type.
  1312. * \since 12
  1313. */
  #define STASIS_MESSAGE_TYPE_DEFN(name, ...) \
      /* Vtable filled from the caller-supplied initializer list. */ \
      static struct stasis_message_vtable _priv_ ## name ## _v = { \
          __VA_ARGS__ \
      }; \
      /* Backing pointer for the type; static storage, so it starts out NULL. */ \
      static struct stasis_message_type *_priv_ ## name; \
      /* Externally visible accessor: reports a NULL pointer, then returns it as-is. */ \
      struct stasis_message_type *name(void) \
      { \
          if (!_priv_ ## name) { \
              stasis_log_bad_type_access(#name); \
          } \
          return _priv_ ## name; \
      }
  /*!
   * \brief Boiler-plate messaging macro for defining local message types.
   *
   * The expansion provides three file-scope definitions, all with internal
   * linkage:
   *  - a static vtable named \c _priv_NAME_v, initialized from the variadic
   *    arguments;
   *  - a static message type pointer named \c _priv_NAME;
   *  - a static accessor function \c NAME() that returns that pointer.
   *
   * When the accessor finds the pointer to be NULL it reports the access via
   * stasis_log_bad_type_access() and then returns the (NULL) pointer.
   *
   * \code
   * STASIS_MESSAGE_TYPE_DEFN_LOCAL(ast_foo_type,
   *     .to_ami = foo_to_ami,
   *     .to_json = foo_to_json,
   *     .to_event = foo_to_event,
   *     );
   * \endcode
   *
   * \param name Name of message type.
   * \param ... Virtual table methods for messages of this type.
   * \since 12
   */
  #define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name, ...) \
      static struct stasis_message_vtable _priv_##name##_v = { \
          __VA_ARGS__ \
      }; \
      static struct stasis_message_type *_priv_##name; \
      static struct stasis_message_type *name(void) \
      { \
          if (!_priv_##name) { \
              stasis_log_bad_type_access(#name); \
          } \
          return _priv_##name; \
      }
  /*!
   * \brief Boiler-plate messaging macro for initializing message types.
   *
   * Written as a statement expression. It first asserts that the type's
   * backing pointer (\c _priv_NAME) is still NULL, then calls
   * stasis_message_type_create() with the stringified name, the type's
   * vtable and the address of the backing pointer. The expression's value
   * is 1 when that call returns STASIS_MESSAGE_TYPE_ERROR and 0 for any
   * other result.
   *
   * \code
   * if (STASIS_MESSAGE_TYPE_INIT(ast_foo_type) != 0) {
   *     return -1;
   * }
   * \endcode
   *
   * \param name Name of message type.
   * \return 0 if initialization is successful.
   * \return Non-zero on failure.
   * \since 12
   */
  #define STASIS_MESSAGE_TYPE_INIT(name) \
      ({ \
          ast_assert(_priv_ ## name == NULL); \
          (stasis_message_type_create(#name, &_priv_##name##_v, &_priv_##name) \
              != STASIS_MESSAGE_TYPE_ERROR) ? 0 : 1; \
      })
  /*!
   * \brief Boiler-plate messaging macro for cleaning up message types.
   *
   * Passes the type's backing pointer (\c _priv_NAME) to ao2_cleanup() and
   * then resets that pointer to NULL.
   *
   * \note For a type defined in core rather than in a loadable module, run
   * the cleanup from an ast_register_cleanup() handler, not from an
   * ast_register_atexit() handler. Loadable modules, which may refer to core
   * message types, are not unloaded during an immediate shutdown, so while
   * the atexit handlers run there is a window in which a module subscription
   * could reference a core message type that has already been cleaned up.
   *
   * \param name Name of message type.
   * \since 12
   */
  #define STASIS_MESSAGE_TYPE_CLEANUP(name) \
      ({ \
          ao2_cleanup(_priv_ ## name); \
          _priv_##name = NULL; \
      })
  1391. /*! @} */
  1392. /*! @{ */
  /*!
   * \brief Initialize the Stasis subsystem.
   *
   * \retval 0 on success.
   * \retval non-zero on error.
   * \since 12
   */
  int stasis_init(void);
  1400. /*! @} */
  1401. /*! @{ */
  /*!
   * \internal
   * \brief Cache initialization step, called by stasis_init().
   *
   * \retval 0 on success.
   * \retval non-zero on error.
   * \since 12
   */
  int stasis_cache_init(void);
  /*!
   * \internal
   * \brief Config initialization step, called by stasis_init().
   *
   * \retval 0 on success.
   * \retval non-zero on error.
   * \since 12
   */
  int stasis_config_init(void);
  1418. /*! @} */
  1419. /*!
  1420. * \defgroup StasisTopicsAndMessages Stasis topics and their messages.
  1421. *
  1422. * This group contains the topics, messages and corresponding message types
  1423. * found within Asterisk.
  1424. */
  1425. #endif /* _ASTERISK_STASIS_H */