endpoints.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2013, Digium, Inc.
00005  *
00006  * David M. Lee, II <dlee@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*! \file
00020  *
00021  * \brief Asterisk endpoint API.
00022  *
00023  * \author David M. Lee, II <dlee@digium.com>
00024  */
00025 
00026 /*** MODULEINFO
00027    <support_level>core</support_level>
00028  ***/
00029 
00030 #include "asterisk.h"
00031 
00032 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 433065 $")
00033 
00034 #include "asterisk/astobj2.h"
00035 #include "asterisk/endpoints.h"
00036 #include "asterisk/stasis.h"
00037 #include "asterisk/stasis_channels.h"
00038 #include "asterisk/stasis_endpoints.h"
00039 #include "asterisk/stasis_message_router.h"
00040 #include "asterisk/stringfields.h"
00041 #include "asterisk/_private.h"
00042 
00043 /*! Buckets for endpoint->channel mappings. Keep it prime! */
00044 #define ENDPOINT_CHANNEL_BUCKETS 127
00045 
00046 /*! Buckets for endpoint hash. Keep it prime! */
00047 #define ENDPOINT_BUCKETS 127
00048 
00049 /*! Buckets for technology endpoints. */
00050 #define TECH_ENDPOINT_BUCKETS 11
00051 
00052 static struct ao2_container *endpoints;
00053 
00054 static struct ao2_container *tech_endpoints;
00055 
00056 struct ast_endpoint {
00057    AST_DECLARE_STRING_FIELDS(
00058       AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
00059       AST_STRING_FIELD(resource);   /*!< Name, unique to the tech. */
00060       AST_STRING_FIELD(id);   /*!< tech/resource id */
00061       );
00062    /*! Endpoint's current state */
00063    enum ast_endpoint_state state;
00064    /*!
00065     * \brief Max channels for this endpoint. -1 means unlimited or unknown.
00066     *
00067     * Note that this simply documents the limits of an endpoint, and does
00068     * nothing to try to enforce the limit.
00069     */
00070    int max_channels;
00071    /*! Topic for this endpoint's messages */
00072    struct stasis_cp_single *topics;
00073    /*! Router for handling this endpoint's messages */
00074    struct stasis_message_router *router;
00075    /*! ast_str_container of channels associated with this endpoint */
00076    struct ao2_container *channel_ids;
00077    /*! Forwarding subscription from an endpoint to its tech endpoint */
00078    struct stasis_forward *tech_forward;
00079 };
00080 
00081 static int endpoint_hash(const void *obj, int flags)
00082 {
00083    const struct ast_endpoint *endpoint;
00084    const char *key;
00085 
00086    switch (flags & OBJ_SEARCH_MASK) {
00087    case OBJ_SEARCH_KEY:
00088       key = obj;
00089       return ast_str_hash(key);
00090    case OBJ_SEARCH_OBJECT:
00091       endpoint = obj;
00092       return ast_str_hash(endpoint->id);
00093    default:
00094       /* Hash can only work on something with a full key. */
00095       ast_assert(0);
00096       return 0;
00097    }
00098 }
00099 
00100 static int endpoint_cmp(void *obj, void *arg, int flags)
00101 {
00102    const struct ast_endpoint *left = obj;
00103    const struct ast_endpoint *right = arg;
00104    const char *right_key = arg;
00105    int cmp;
00106 
00107    switch (flags & OBJ_SEARCH_MASK) {
00108    case OBJ_SEARCH_OBJECT:
00109       right_key = right->id;
00110       /* Fall through */
00111    case OBJ_SEARCH_KEY:
00112       cmp = strcmp(left->id, right_key);
00113       break;
00114    case OBJ_SEARCH_PARTIAL_KEY:
00115       cmp = strncmp(left->id, right_key, strlen(right_key));
00116       break;
00117    default:
00118       ast_assert(0);
00119       cmp = 0;
00120       break;
00121    }
00122    if (cmp) {
00123       return 0;
00124    }
00125 
00126    return CMP_MATCH;
00127 }
00128 
00129 struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
00130 {
00131    struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
00132 
00133    if (!endpoint) {
00134       endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
00135    }
00136 
00137    return endpoint;
00138 }
00139 
00140 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
00141 {
00142    if (!endpoint) {
00143       return ast_endpoint_topic_all();
00144    }
00145    return stasis_cp_single_topic(endpoint->topics);
00146 }
00147 
00148 struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
00149 {
00150    if (!endpoint) {
00151       return ast_endpoint_topic_all_cached();
00152    }
00153    return stasis_cp_single_topic_cached(endpoint->topics);
00154 }
00155 
00156 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
00157 {
00158    switch (state) {
00159    case AST_ENDPOINT_UNKNOWN:
00160       return "unknown";
00161    case AST_ENDPOINT_OFFLINE:
00162       return "offline";
00163    case AST_ENDPOINT_ONLINE:
00164       return "online";
00165    }
00166    return "?";
00167 }
00168 
00169 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
00170 {
00171    RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
00172    RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
00173 
00174    ast_assert(endpoint != NULL);
00175    ast_assert(endpoint->topics != NULL);
00176 
00177    if (!ast_endpoint_snapshot_type()) {
00178       return;
00179    }
00180 
00181    snapshot = ast_endpoint_snapshot_create(endpoint);
00182    if (!snapshot) {
00183       return;
00184    }
00185    message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
00186    if (!message) {
00187       return;
00188    }
00189    stasis_publish(ast_endpoint_topic(endpoint), message);
00190 }
00191 
00192 static void endpoint_dtor(void *obj)
00193 {
00194    struct ast_endpoint *endpoint = obj;
00195 
00196    /* The router should be shut down already */
00197    ast_assert(stasis_message_router_is_done(endpoint->router));
00198    ao2_cleanup(endpoint->router);
00199    endpoint->router = NULL;
00200 
00201    stasis_cp_single_unsubscribe(endpoint->topics);
00202    endpoint->topics = NULL;
00203 
00204    ao2_cleanup(endpoint->channel_ids);
00205    endpoint->channel_ids = NULL;
00206 
00207    ast_string_field_free_memory(endpoint);
00208 }
00209 
00210 
00211 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
00212    struct ast_channel *chan)
00213 {
00214    ast_assert(chan != NULL);
00215    ast_assert(endpoint != NULL);
00216    ast_assert(!ast_strlen_zero(endpoint->resource));
00217 
00218    ast_channel_forward_endpoint(chan, endpoint);
00219 
00220    ao2_lock(endpoint);
00221    ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
00222    ao2_unlock(endpoint);
00223 
00224    endpoint_publish_snapshot(endpoint);
00225 
00226    return 0;
00227 }
00228 
00229 /*! \brief Handler for channel snapshot cache clears */
00230 static void endpoint_cache_clear(void *data,
00231    struct stasis_subscription *sub,
00232    struct stasis_message *message)
00233 {
00234    struct ast_endpoint *endpoint = data;
00235    struct stasis_message *clear_msg = stasis_message_data(message);
00236    struct ast_channel_snapshot *clear_snapshot;
00237 
00238    if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
00239       return;
00240    }
00241 
00242    clear_snapshot = stasis_message_data(clear_msg);
00243 
00244    ast_assert(endpoint != NULL);
00245 
00246    ao2_lock(endpoint);
00247    ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
00248    ao2_unlock(endpoint);
00249    endpoint_publish_snapshot(endpoint);
00250 }
00251 
00252 static void endpoint_default(void *data,
00253    struct stasis_subscription *sub,
00254    struct stasis_message *message)
00255 {
00256    struct stasis_endpoint *endpoint = data;
00257 
00258    if (stasis_subscription_final_message(sub, message)) {
00259       ao2_cleanup(endpoint);
00260    }
00261 }
00262 
00263 static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
00264 {
00265    RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
00266    RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
00267    int r = 0;
00268 
00269    /* Get/create the technology endpoint */
00270    if (!ast_strlen_zero(resource)) {
00271       tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
00272       if (!tech_endpoint) {
00273          tech_endpoint = endpoint_internal_create(tech, NULL);
00274          if (!tech_endpoint) {
00275             return NULL;
00276          }
00277       }
00278    }
00279 
00280    endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
00281    if (!endpoint) {
00282       return NULL;
00283    }
00284 
00285    endpoint->max_channels = -1;
00286    endpoint->state = AST_ENDPOINT_UNKNOWN;
00287 
00288    if (ast_string_field_init(endpoint, 80) != 0) {
00289       return NULL;
00290    }
00291    ast_string_field_set(endpoint, tech, tech);
00292    ast_string_field_set(endpoint, resource, S_OR(resource, ""));
00293    ast_string_field_build(endpoint, id, "%s%s%s",
00294       tech,
00295       !ast_strlen_zero(resource) ? "/" : "",
00296       S_OR(resource, ""));
00297 
00298    /* All access to channel_ids should be covered by the endpoint's
00299     * lock; no extra lock needed. */
00300    endpoint->channel_ids = ast_str_container_alloc_options(
00301       AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
00302    if (!endpoint->channel_ids) {
00303       return NULL;
00304    }
00305 
00306    endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
00307       endpoint->id);
00308    if (!endpoint->topics) {
00309       return NULL;
00310    }
00311 
00312    if (!ast_strlen_zero(resource)) {
00313       endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
00314       if (!endpoint->router) {
00315          return NULL;
00316       }
00317       r |= stasis_message_router_add(endpoint->router,
00318          stasis_cache_clear_type(), endpoint_cache_clear,
00319          endpoint);
00320       r |= stasis_message_router_set_default(endpoint->router,
00321          endpoint_default, endpoint);
00322       if (r) {
00323          return NULL;
00324       }
00325 
00326       endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
00327          stasis_cp_single_topic(tech_endpoint->topics));
00328       endpoint_publish_snapshot(endpoint);
00329       ao2_link(endpoints, endpoint);
00330    } else {
00331       ao2_link(tech_endpoints, endpoint);
00332    }
00333 
00334    ao2_ref(endpoint, +1);
00335    return endpoint;
00336 }
00337 
00338 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
00339 {
00340    if (ast_strlen_zero(tech)) {
00341       ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
00342       return NULL;
00343    }
00344 
00345    if (ast_strlen_zero(resource)) {
00346       ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
00347       return NULL;
00348    }
00349 
00350    return endpoint_internal_create(tech, resource);
00351 }
00352 
00353 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
00354 {
00355    RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
00356 
00357    if (!ast_endpoint_snapshot_type()) {
00358       return NULL;
00359    }
00360 
00361    snapshot = ast_endpoint_snapshot_create(endpoint);
00362    if (!snapshot) {
00363       return NULL;
00364    }
00365 
00366    return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
00367 }
00368 
00369 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
00370 {
00371    RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
00372 
00373    if (endpoint == NULL) {
00374       return;
00375    }
00376 
00377    ao2_unlink(endpoints, endpoint);
00378    endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
00379 
00380    clear_msg = create_endpoint_snapshot_message(endpoint);
00381    if (clear_msg) {
00382       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
00383       message = stasis_cache_clear_create(clear_msg);
00384       if (message) {
00385          stasis_publish(ast_endpoint_topic(endpoint), message);
00386       }
00387    }
00388 
00389    /* Bump refcount to hold on to the router */
00390    ao2_ref(endpoint->router, +1);
00391    stasis_message_router_unsubscribe(endpoint->router);
00392 }
00393 
00394 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
00395 {
00396    if (!endpoint) {
00397       return NULL;
00398    }
00399    return endpoint->tech;
00400 }
00401 
00402 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
00403 {
00404    if (!endpoint) {
00405       return NULL;
00406    }
00407    return endpoint->resource;
00408 }
00409 
00410 const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
00411 {
00412    if (!endpoint) {
00413       return NULL;
00414    }
00415    return endpoint->id;
00416 }
00417 
00418 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
00419    enum ast_endpoint_state state)
00420 {
00421    ast_assert(endpoint != NULL);
00422    ast_assert(!ast_strlen_zero(endpoint->resource));
00423 
00424    ao2_lock(endpoint);
00425    endpoint->state = state;
00426    ao2_unlock(endpoint);
00427    endpoint_publish_snapshot(endpoint);
00428 }
00429 
00430 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
00431    int max_channels)
00432 {
00433    ast_assert(endpoint != NULL);
00434    ast_assert(!ast_strlen_zero(endpoint->resource));
00435 
00436    ao2_lock(endpoint);
00437    endpoint->max_channels = max_channels;
00438    ao2_unlock(endpoint);
00439    endpoint_publish_snapshot(endpoint);
00440 }
00441 
00442 static void endpoint_snapshot_dtor(void *obj)
00443 {
00444    struct ast_endpoint_snapshot *snapshot = obj;
00445    int channel;
00446 
00447    ast_assert(snapshot != NULL);
00448 
00449    for (channel = 0; channel < snapshot->num_channels; channel++) {
00450       ao2_ref(snapshot->channel_ids[channel], -1);
00451    }
00452 
00453    ast_string_field_free_memory(snapshot);
00454 }
00455 
00456 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
00457    struct ast_endpoint *endpoint)
00458 {
00459    RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
00460    int channel_count;
00461    struct ao2_iterator i;
00462    void *obj;
00463    SCOPED_AO2LOCK(lock, endpoint);
00464 
00465    ast_assert(endpoint != NULL);
00466    ast_assert(!ast_strlen_zero(endpoint->resource));
00467 
00468    channel_count = ao2_container_count(endpoint->channel_ids);
00469 
00470    snapshot = ao2_alloc_options(
00471       sizeof(*snapshot) + channel_count * sizeof(char *),
00472       endpoint_snapshot_dtor,
00473       AO2_ALLOC_OPT_LOCK_NOLOCK);
00474 
00475    if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
00476       ao2_cleanup(snapshot);
00477       return NULL;
00478    }
00479 
00480    ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
00481       endpoint->resource);
00482    ast_string_field_set(snapshot, tech, endpoint->tech);
00483    ast_string_field_set(snapshot, resource, endpoint->resource);
00484 
00485    snapshot->state = endpoint->state;
00486    snapshot->max_channels = endpoint->max_channels;
00487 
00488    i = ao2_iterator_init(endpoint->channel_ids, 0);
00489    while ((obj = ao2_iterator_next(&i))) {
00490       /* The reference is kept so the channel id does not go away until the snapshot is gone */
00491       snapshot->channel_ids[snapshot->num_channels++] = obj;
00492    }
00493    ao2_iterator_destroy(&i);
00494 
00495    ao2_ref(snapshot, +1);
00496    return snapshot;
00497 }
00498 
00499 static void endpoint_cleanup(void)
00500 {
00501    ao2_cleanup(endpoints);
00502    endpoints = NULL;
00503 
00504    ao2_cleanup(tech_endpoints);
00505    tech_endpoints = NULL;
00506 }
00507 
00508 int ast_endpoint_init(void)
00509 {
00510    ast_register_cleanup(endpoint_cleanup);
00511 
00512    endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
00513       endpoint_cmp);
00514    if (!endpoints) {
00515       return -1;
00516    }
00517 
00518    tech_endpoints = ao2_container_alloc(TECH_ENDPOINT_BUCKETS, endpoint_hash,
00519       endpoint_cmp);
00520    if (!tech_endpoints) {
00521       return -1;
00522    }
00523 
00524    return 0;
00525 }

Generated on Thu Apr 16 06:27:33 2015 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6