123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- /*
- * Asterisk -- An open source telephony toolkit.
- *
- * Copyright (C) 2013, Digium, Inc.
- *
- * David M. Lee, II <dlee@digium.com>
- *
- * See http://www.asterisk.org for more information about
- * the Asterisk project. Please do not directly contact
- * any of the maintainers of this project for assistance;
- * the project provides a web site, mailing lists and IRC
- * channels for your use.
- *
- * This program is free software, distributed under the terms of
- * the GNU General Public License Version 2. See the LICENSE file
- * at the top of the source tree.
- */
- /*! \file
- *
- * \brief Asterisk endpoint API.
- *
- * \author David M. Lee, II <dlee@digium.com>
- */
- /*** MODULEINFO
- <support_level>core</support_level>
- ***/
- #include "asterisk.h"
- ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
- #include "asterisk/astobj2.h"
- #include "asterisk/endpoints.h"
- #include "asterisk/stasis.h"
- #include "asterisk/stasis_channels.h"
- #include "asterisk/stasis_endpoints.h"
- #include "asterisk/stasis_message_router.h"
- #include "asterisk/stringfields.h"
- #include "asterisk/_private.h"
/*! Hash bucket count for each endpoint's channel-id container. Keep it prime! */
#define ENDPOINT_CHANNEL_BUCKETS 127

/*! Hash bucket count for the container of regular endpoints. Keep it prime! */
#define ENDPOINT_BUCKETS 127

/*! Hash bucket count for the container of technology endpoints. */
#define TECH_ENDPOINT_BUCKETS 11

/*! Container of endpoints that have a resource, allocated by ast_endpoint_init(). */
static struct ao2_container *endpoints;

/*! Container of technology endpoints (no resource), allocated by ast_endpoint_init(). */
static struct ao2_container *tech_endpoints;
- struct ast_endpoint {
- AST_DECLARE_STRING_FIELDS(
- AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
- AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
- AST_STRING_FIELD(id); /*!< tech/resource id */
- );
- /*! Endpoint's current state */
- enum ast_endpoint_state state;
- /*!
- * \brief Max channels for this endpoint. -1 means unlimited or unknown.
- *
- * Note that this simply documents the limits of an endpoint, and does
- * nothing to try to enforce the limit.
- */
- int max_channels;
- /*! Topic for this endpoint's messages */
- struct stasis_cp_single *topics;
- /*! Router for handling this endpoint's messages */
- struct stasis_message_router *router;
- /*! ast_str_container of channels associated with this endpoint */
- struct ao2_container *channel_ids;
- /*! Forwarding subscription from an endpoint to its tech endpoint */
- struct stasis_forward *tech_forward;
- };
- AO2_STRING_FIELD_HASH_FN(ast_endpoint, id)
- AO2_STRING_FIELD_CMP_FN(ast_endpoint, id)
- struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
- {
- struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
- if (!endpoint) {
- endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
- }
- return endpoint;
- }
- struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
- {
- if (!endpoint) {
- return ast_endpoint_topic_all();
- }
- return stasis_cp_single_topic(endpoint->topics);
- }
- struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
- {
- if (!endpoint) {
- return ast_endpoint_topic_all_cached();
- }
- return stasis_cp_single_topic_cached(endpoint->topics);
- }
- const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
- {
- switch (state) {
- case AST_ENDPOINT_UNKNOWN:
- return "unknown";
- case AST_ENDPOINT_OFFLINE:
- return "offline";
- case AST_ENDPOINT_ONLINE:
- return "online";
- }
- return "?";
- }
- static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
- {
- RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
- RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
- ast_assert(endpoint != NULL);
- ast_assert(endpoint->topics != NULL);
- if (!ast_endpoint_snapshot_type()) {
- return;
- }
- snapshot = ast_endpoint_snapshot_create(endpoint);
- if (!snapshot) {
- return;
- }
- message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
- if (!message) {
- return;
- }
- stasis_publish(ast_endpoint_topic(endpoint), message);
- }
- static void endpoint_dtor(void *obj)
- {
- struct ast_endpoint *endpoint = obj;
- /* The router should be shut down already */
- ast_assert(stasis_message_router_is_done(endpoint->router));
- ao2_cleanup(endpoint->router);
- endpoint->router = NULL;
- stasis_cp_single_unsubscribe(endpoint->topics);
- endpoint->topics = NULL;
- ao2_cleanup(endpoint->channel_ids);
- endpoint->channel_ids = NULL;
- ast_string_field_free_memory(endpoint);
- }
- int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
- struct ast_channel *chan)
- {
- ast_assert(chan != NULL);
- ast_assert(endpoint != NULL);
- ast_assert(!ast_strlen_zero(endpoint->resource));
- ast_channel_forward_endpoint(chan, endpoint);
- ao2_lock(endpoint);
- ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
- ao2_unlock(endpoint);
- endpoint_publish_snapshot(endpoint);
- return 0;
- }
- /*! \brief Handler for channel snapshot cache clears */
- static void endpoint_cache_clear(void *data,
- struct stasis_subscription *sub,
- struct stasis_message *message)
- {
- struct ast_endpoint *endpoint = data;
- struct stasis_message *clear_msg = stasis_message_data(message);
- struct ast_channel_snapshot *clear_snapshot;
- if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
- return;
- }
- clear_snapshot = stasis_message_data(clear_msg);
- ast_assert(endpoint != NULL);
- ao2_lock(endpoint);
- ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
- ao2_unlock(endpoint);
- endpoint_publish_snapshot(endpoint);
- }
/*!
 * \brief Router callback for subscription-change messages.
 *
 * When the final message for the router's subscription arrives, one reference
 * to the endpoint is released.
 * NOTE(review): presumably this is the reference the caller handed over via
 * ast_endpoint_shutdown() -- confirm against the public API contract.
 *
 * \param data The endpoint registered with the router.
 * \param sub Subscription the message arrived on.
 * \param message The subscription-change message.
 */
static void endpoint_subscription_change(void *data,
	struct stasis_subscription *sub,
	struct stasis_message *message)
{
	/* The router is registered with a struct ast_endpoint as its data. */
	struct ast_endpoint *endpoint = data;

	if (stasis_subscription_final_message(sub, message)) {
		ao2_cleanup(endpoint);
	}
}
- static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
- {
- RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
- RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
- int r = 0;
- /* Get/create the technology endpoint */
- if (!ast_strlen_zero(resource)) {
- tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
- if (!tech_endpoint) {
- tech_endpoint = endpoint_internal_create(tech, NULL);
- if (!tech_endpoint) {
- return NULL;
- }
- }
- }
- endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
- if (!endpoint) {
- return NULL;
- }
- endpoint->max_channels = -1;
- endpoint->state = AST_ENDPOINT_UNKNOWN;
- if (ast_string_field_init(endpoint, 80) != 0) {
- return NULL;
- }
- ast_string_field_set(endpoint, tech, tech);
- ast_string_field_set(endpoint, resource, S_OR(resource, ""));
- ast_string_field_build(endpoint, id, "%s%s%s",
- tech,
- !ast_strlen_zero(resource) ? "/" : "",
- S_OR(resource, ""));
- /* All access to channel_ids should be covered by the endpoint's
- * lock; no extra lock needed. */
- endpoint->channel_ids = ast_str_container_alloc_options(
- AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
- if (!endpoint->channel_ids) {
- return NULL;
- }
- if (!ast_strlen_zero(resource)) {
- endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
- endpoint->id);
- if (!endpoint->topics) {
- return NULL;
- }
- stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
- stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
- endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
- if (!endpoint->router) {
- return NULL;
- }
- r |= stasis_message_router_add(endpoint->router,
- stasis_cache_clear_type(), endpoint_cache_clear,
- endpoint);
- r |= stasis_message_router_add(endpoint->router,
- stasis_subscription_change_type(), endpoint_subscription_change,
- endpoint);
- if (r) {
- return NULL;
- }
- endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
- stasis_cp_single_topic(tech_endpoint->topics));
- endpoint_publish_snapshot(endpoint);
- ao2_link(endpoints, endpoint);
- } else {
- endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
- endpoint->id);
- if (!endpoint->topics) {
- return NULL;
- }
- stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
- stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
- ao2_link(tech_endpoints, endpoint);
- }
- ao2_ref(endpoint, +1);
- return endpoint;
- }
- struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
- {
- if (ast_strlen_zero(tech)) {
- ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
- return NULL;
- }
- if (ast_strlen_zero(resource)) {
- ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
- return NULL;
- }
- return endpoint_internal_create(tech, resource);
- }
- static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
- {
- RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
- if (!ast_endpoint_snapshot_type()) {
- return NULL;
- }
- snapshot = ast_endpoint_snapshot_create(endpoint);
- if (!snapshot) {
- return NULL;
- }
- return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
- }
- void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
- {
- RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
- if (endpoint == NULL) {
- return;
- }
- ao2_unlink(endpoints, endpoint);
- endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
- clear_msg = create_endpoint_snapshot_message(endpoint);
- if (clear_msg) {
- RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
- message = stasis_cache_clear_create(clear_msg);
- if (message) {
- stasis_publish(ast_endpoint_topic(endpoint), message);
- }
- }
- /* Bump refcount to hold on to the router */
- ao2_ref(endpoint->router, +1);
- stasis_message_router_unsubscribe(endpoint->router);
- }
- const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
- {
- if (!endpoint) {
- return NULL;
- }
- return endpoint->tech;
- }
- const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
- {
- if (!endpoint) {
- return NULL;
- }
- return endpoint->resource;
- }
- const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
- {
- if (!endpoint) {
- return NULL;
- }
- return endpoint->id;
- }
- enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
- {
- if (!endpoint) {
- return AST_ENDPOINT_UNKNOWN;
- }
- return endpoint->state;
- }
- void ast_endpoint_set_state(struct ast_endpoint *endpoint,
- enum ast_endpoint_state state)
- {
- ast_assert(endpoint != NULL);
- ast_assert(!ast_strlen_zero(endpoint->resource));
- ao2_lock(endpoint);
- endpoint->state = state;
- ao2_unlock(endpoint);
- endpoint_publish_snapshot(endpoint);
- }
- void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
- int max_channels)
- {
- ast_assert(endpoint != NULL);
- ast_assert(!ast_strlen_zero(endpoint->resource));
- ao2_lock(endpoint);
- endpoint->max_channels = max_channels;
- ao2_unlock(endpoint);
- endpoint_publish_snapshot(endpoint);
- }
- static void endpoint_snapshot_dtor(void *obj)
- {
- struct ast_endpoint_snapshot *snapshot = obj;
- int channel;
- ast_assert(snapshot != NULL);
- for (channel = 0; channel < snapshot->num_channels; channel++) {
- ao2_ref(snapshot->channel_ids[channel], -1);
- }
- ast_string_field_free_memory(snapshot);
- }
- struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
- struct ast_endpoint *endpoint)
- {
- struct ast_endpoint_snapshot *snapshot;
- int channel_count;
- struct ao2_iterator i;
- void *obj;
- SCOPED_AO2LOCK(lock, endpoint);
- ast_assert(endpoint != NULL);
- ast_assert(!ast_strlen_zero(endpoint->resource));
- channel_count = ao2_container_count(endpoint->channel_ids);
- snapshot = ao2_alloc_options(
- sizeof(*snapshot) + channel_count * sizeof(char *),
- endpoint_snapshot_dtor,
- AO2_ALLOC_OPT_LOCK_NOLOCK);
- if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
- ao2_cleanup(snapshot);
- return NULL;
- }
- ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
- endpoint->resource);
- ast_string_field_set(snapshot, tech, endpoint->tech);
- ast_string_field_set(snapshot, resource, endpoint->resource);
- snapshot->state = endpoint->state;
- snapshot->max_channels = endpoint->max_channels;
- i = ao2_iterator_init(endpoint->channel_ids, 0);
- while ((obj = ao2_iterator_next(&i))) {
- /* The reference is kept so the channel id does not go away until the snapshot is gone */
- snapshot->channel_ids[snapshot->num_channels++] = obj;
- }
- ao2_iterator_destroy(&i);
- return snapshot;
- }
- static void endpoint_cleanup(void)
- {
- ao2_cleanup(endpoints);
- endpoints = NULL;
- ao2_cleanup(tech_endpoints);
- tech_endpoints = NULL;
- }
- int ast_endpoint_init(void)
- {
- ast_register_cleanup(endpoint_cleanup);
- endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, ENDPOINT_BUCKETS,
- ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
- if (!endpoints) {
- return -1;
- }
- tech_endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
- TECH_ENDPOINT_BUCKETS, ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
- if (!tech_endpoints) {
- return -1;
- }
- return 0;
- }
|