123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450 |
- /*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * David M. Lee, II <dlee@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
- /*! \file
- *
- * \brief Stasis message router implementation.
- *
- * \author David M. Lee, II <dlee@digium.com>
- */
- /*** MODULEINFO
- <support_level>core</support_level>
- ***/
- #include "asterisk.h"
- ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
- #include "asterisk/astobj2.h"
- #include "asterisk/stasis_message_router.h"
- #include "asterisk/vector.h"
- /*! \internal */
- struct stasis_message_route {
- /*! Message type handle by this route. */
- struct stasis_message_type *message_type;
- /*! Callback function for incoming message processing. */
- stasis_subscription_cb callback;
- /*! Data pointer to be handed to the callback. */
- void *data;
- };
- AST_VECTOR(route_table, struct stasis_message_route);
- static struct stasis_message_route *route_table_find(struct route_table *table,
- struct stasis_message_type *message_type)
- {
- size_t idx;
- struct stasis_message_route *route;
- /* While a linear search for routes may seem very inefficient, most
- * route tables have six routes or less. For such small data, it's
- * hard to beat a linear search. If we start having larger route
- * tables, then we can look into containers with more efficient
- * lookups.
- */
- for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
- route = AST_VECTOR_GET_ADDR(table, idx);
- if (route->message_type == message_type) {
- return route;
- }
- }
- return NULL;
- }
- /*!
- * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
- *
- * \param elem Element to compare against
- * \param value Value to compare with the vector element.
- *
- * \return 0 if element does not match.
- * \return Non-zero if element matches.
- */
- #define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
- /*!
- * \brief route_table vector element cleanup.
- *
- * \param elem Element to cleanup
- *
- * \return Nothing
- */
- #define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
- static int route_table_remove(struct route_table *table,
- struct stasis_message_type *message_type)
- {
- return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
- ROUTE_TABLE_ELEM_CLEANUP);
- }
- static int route_table_add(struct route_table *table,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback, void *data)
- {
- struct stasis_message_route route;
- int res;
- ast_assert(callback != NULL);
- ast_assert(route_table_find(table, message_type) == NULL);
- route.message_type = ao2_bump(message_type);
- route.callback = callback;
- route.data = data;
- res = AST_VECTOR_APPEND(table, route);
- if (res) {
- ROUTE_TABLE_ELEM_CLEANUP(route);
- }
- return res;
- }
- static void route_table_dtor(struct route_table *table)
- {
- size_t idx;
- struct stasis_message_route *route;
- for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
- route = AST_VECTOR_GET_ADDR(table, idx);
- ROUTE_TABLE_ELEM_CLEANUP(*route);
- }
- AST_VECTOR_FREE(table);
- }
- /*! \internal */
- struct stasis_message_router {
- /*! Subscription to the upstream topic */
- struct stasis_subscription *subscription;
- /*! Subscribed routes */
- struct route_table routes;
- /*! Subscribed routes for \ref stasis_cache_update messages */
- struct route_table cache_routes;
- /*! Route of last resort */
- struct stasis_message_route default_route;
- };
- static void router_dtor(void *obj)
- {
- struct stasis_message_router *router = obj;
- ast_assert(!stasis_subscription_is_subscribed(router->subscription));
- ast_assert(stasis_subscription_is_done(router->subscription));
- router->subscription = NULL;
- route_table_dtor(&router->routes);
- route_table_dtor(&router->cache_routes);
- }
- static int find_route(
- struct stasis_message_router *router,
- struct stasis_message *message,
- struct stasis_message_route *route_out)
- {
- struct stasis_message_route *route = NULL;
- struct stasis_message_type *type = stasis_message_type(message);
- SCOPED_AO2LOCK(lock, router);
- ast_assert(route_out != NULL);
- if (type == stasis_cache_update_type()) {
- /* Find a cache route */
- struct stasis_cache_update *update =
- stasis_message_data(message);
- route = route_table_find(&router->cache_routes, update->type);
- }
- if (route == NULL) {
- /* Find a regular route */
- route = route_table_find(&router->routes, type);
- }
- if (route == NULL && router->default_route.callback) {
- /* Maybe the default route, then? */
- route = &router->default_route;
- }
- if (!route) {
- return -1;
- }
- *route_out = *route;
- return 0;
- }
- static void router_dispatch(void *data,
- struct stasis_subscription *sub,
- struct stasis_message *message)
- {
- struct stasis_message_router *router = data;
- struct stasis_message_route route;
- if (find_route(router, message, &route) == 0) {
- route.callback(route.data, sub, message);
- }
- if (stasis_subscription_final_message(sub, message)) {
- ao2_cleanup(router);
- }
- }
- static struct stasis_message_router *stasis_message_router_create_internal(
- struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
- const char *func)
- {
- int res;
- struct stasis_message_router *router;
- router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
- if (!router) {
- return NULL;
- }
- res = 0;
- res |= AST_VECTOR_INIT(&router->routes, 0);
- res |= AST_VECTOR_INIT(&router->cache_routes, 0);
- if (res) {
- ao2_ref(router, -1);
- return NULL;
- }
- if (use_thread_pool) {
- router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
- } else {
- router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
- }
- if (!router->subscription) {
- ao2_ref(router, -1);
- return NULL;
- }
- /* We need to receive subscription change messages so we know when our subscription goes away */
- stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
- return router;
- }
- struct stasis_message_router *__stasis_message_router_create(
- struct stasis_topic *topic, const char *file, int lineno, const char *func)
- {
- return stasis_message_router_create_internal(topic, 0, file, lineno, func);
- }
- struct stasis_message_router *__stasis_message_router_create_pool(
- struct stasis_topic *topic, const char *file, int lineno, const char *func)
- {
- return stasis_message_router_create_internal(topic, 1, file, lineno, func);
- }
- void stasis_message_router_unsubscribe(struct stasis_message_router *router)
- {
- if (!router) {
- return;
- }
- ao2_lock(router);
- router->subscription = stasis_unsubscribe(router->subscription);
- ao2_unlock(router);
- }
- void stasis_message_router_unsubscribe_and_join(
- struct stasis_message_router *router)
- {
- if (!router) {
- return;
- }
- stasis_unsubscribe_and_join(router->subscription);
- }
- int stasis_message_router_is_done(struct stasis_message_router *router)
- {
- if (!router) {
- /* Null router is about as done as you can get */
- return 1;
- }
- return stasis_subscription_is_done(router->subscription);
- }
- void stasis_message_router_publish_sync(struct stasis_message_router *router,
- struct stasis_message *message)
- {
- ast_assert(router != NULL);
- ao2_bump(router);
- stasis_publish_sync(router->subscription, message);
- ao2_cleanup(router);
- }
- int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
- long low_water, long high_water)
- {
- int res = -1;
- if (router) {
- res = stasis_subscription_set_congestion_limits(router->subscription,
- low_water, high_water);
- }
- return res;
- }
- int stasis_message_router_add(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback, void *data)
- {
- int res;
- ast_assert(router != NULL);
- if (!message_type) {
- /* Cannot route to NULL type. */
- return -1;
- }
- ao2_lock(router);
- res = route_table_add(&router->routes, message_type, callback, data);
- if (!res) {
- stasis_subscription_accept_message_type(router->subscription, message_type);
- /* Until a specific message type was added we would already drop the message, so being
- * selective now doesn't harm us. If we have a default route then we are already forced
- * to filter nothing and messages will come in regardless.
- */
- stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
- }
- ao2_unlock(router);
- return res;
- }
- int stasis_message_router_add_cache_update(struct stasis_message_router *router,
- struct stasis_message_type *message_type,
- stasis_subscription_cb callback, void *data)
- {
- int res;
- ast_assert(router != NULL);
- if (!message_type) {
- /* Cannot cache a route to NULL type. */
- return -1;
- }
- ao2_lock(router);
- res = route_table_add(&router->cache_routes, message_type, callback, data);
- if (!res) {
- stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
- stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
- }
- ao2_unlock(router);
- return res;
- }
- void stasis_message_router_remove(struct stasis_message_router *router,
- struct stasis_message_type *message_type)
- {
- ast_assert(router != NULL);
- if (!message_type) {
- /* Cannot remove a NULL type. */
- return;
- }
- ao2_lock(router);
- route_table_remove(&router->routes, message_type);
- ao2_unlock(router);
- }
- void stasis_message_router_remove_cache_update(
- struct stasis_message_router *router,
- struct stasis_message_type *message_type)
- {
- ast_assert(router != NULL);
- if (!message_type) {
- /* Cannot remove a NULL type. */
- return;
- }
- ao2_lock(router);
- route_table_remove(&router->cache_routes, message_type);
- ao2_unlock(router);
- }
- int stasis_message_router_set_default(struct stasis_message_router *router,
- stasis_subscription_cb callback,
- void *data)
- {
- stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
- /* While this implementation can never fail, it used to be able to */
- return 0;
- }
- void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
- stasis_subscription_cb callback,
- void *data,
- enum stasis_subscription_message_formatters formatters)
- {
- ast_assert(router != NULL);
- ast_assert(callback != NULL);
- stasis_subscription_accept_formatters(router->subscription, formatters);
- ao2_lock(router);
- router->default_route.callback = callback;
- router->default_route.data = data;
- ao2_unlock(router);
- if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
- /* Formatters govern what messages the default callback get, so it is only if none is
- * specified that we accept all messages regardless.
- */
- stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
- }
- }
- void stasis_message_router_accept_formatters(struct stasis_message_router *router,
- enum stasis_subscription_message_formatters formatters)
- {
- ast_assert(router != NULL);
- stasis_subscription_accept_formatters(router->subscription, formatters);
- return;
- }
- #ifdef AST_DEVMODE
- #undef stasis_message_router_create
- struct stasis_message_router *stasis_message_router_create(
- struct stasis_topic *topic);
- #undef stasis_message_router_create_pool
- struct stasis_message_router *stasis_message_router_create_pool(
- struct stasis_topic *topic);
- #endif
- struct stasis_message_router *stasis_message_router_create(
- struct stasis_topic *topic)
- {
- return stasis_message_router_create_internal(topic, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__);
- }
- struct stasis_message_router *stasis_message_router_create_pool(
- struct stasis_topic *topic)
- {
- return stasis_message_router_create_internal(topic, 1, __FILE__, __LINE__, __PRETTY_FUNCTION__);
- }
|