res/stasis/app.c File Reference

Stasis application support. More...

#include "asterisk.h"
#include "app.h"
#include "control.h"
#include "messaging.h"
#include "asterisk/callerid.h"
#include "asterisk/stasis_app.h"
#include "asterisk/stasis_bridges.h"
#include "asterisk/stasis_channels.h"
#include "asterisk/stasis_endpoints.h"
#include "asterisk/stasis_message_router.h"

Include dependency graph for res/stasis/app.c:

Go to the source code of this file.

Data Structures

struct  app_forwards
struct  stasis_app

Typedefs

typedef struct ast_json *(* channel_snapshot_monitor )(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
 Typedef for callbacks that get called on channel snapshot updates.

Enumerations

enum  forward_type { FORWARD_CHANNEL, FORWARD_BRIDGE, FORWARD_ENDPOINT }

Functions

struct stasis_app * app_create (const char *name, stasis_app_cb handler, void *data)
 Create a res_stasis application.
void app_deactivate (struct stasis_app *app)
 Deactivates an application.
static void app_dtor (void *obj)
int app_is_active (struct stasis_app *app)
 Checks whether an app is active.
int app_is_finished (struct stasis_app *app)
 Checks whether a deactivated app has no channels.
int app_is_subscribed_bridge_id (struct stasis_app *app, const char *bridge_id)
 Test if an app is subscribed to a bridge.
int app_is_subscribed_channel_id (struct stasis_app *app, const char *channel_id)
 Test if an app is subscribed to a channel.
int app_is_subscribed_endpoint_id (struct stasis_app *app, const char *endpoint_id)
 Test if an app is subscribed to an endpoint.
const char * app_name (const struct stasis_app *app)
 Return an application's name.
void app_send (struct stasis_app *app, struct ast_json *message)
 Send a message to the given application.
void app_shutdown (struct stasis_app *app)
 Tears down an application.
int app_subscribe_bridge (struct stasis_app *app, struct ast_bridge *bridge)
 Add a bridge subscription to an existing channel subscription.
int app_subscribe_channel (struct stasis_app *app, struct ast_channel *chan)
 Subscribes an application to a channel.
int app_subscribe_endpoint (struct stasis_app *app, struct ast_endpoint *endpoint)
 Subscribes an application to an endpoint.
struct ast_json * app_to_json (const struct stasis_app *app)
int app_unsubscribe_bridge (struct stasis_app *app, struct ast_bridge *bridge)
 Cancel the bridge subscription for an application.
int app_unsubscribe_bridge_id (struct stasis_app *app, const char *bridge_id)
 Cancel the subscription an app has for a bridge.
int app_unsubscribe_channel (struct stasis_app *app, struct ast_channel *chan)
 Cancel the subscription an app has for a channel.
int app_unsubscribe_channel_id (struct stasis_app *app, const char *channel_id)
 Cancel the subscription an app has for a channel.
int app_unsubscribe_endpoint_id (struct stasis_app *app, const char *endpoint_id)
 Cancel the subscription an app has for an endpoint.
void app_update (struct stasis_app *app, stasis_app_cb handler, void *data)
 Update the handler and data for a res_stasis application.
struct stasis_topic * ast_app_get_topic (struct stasis_app *app)
 Returns the stasis topic for an app.
static int bridge_app_subscribed (struct stasis_app *app, const char *uniqueid)
 Helper function for determining if the application is subscribed to a given entity.
static int bridge_app_subscribed_involved (struct stasis_app *app, struct ast_bridge_snapshot *snapshot)
 Callback function for checking if channels in a bridge are subscribed to.
static void bridge_attended_transfer_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void bridge_blind_transfer_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void bridge_default_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void * bridge_find (const struct stasis_app *app, const char *id)
static void bridge_merge_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void call_forwarded_handler (struct stasis_app *app, struct stasis_message *message)
static struct ast_json * channel_callerid (struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static struct ast_json * channel_connected_line (struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static struct ast_json * channel_created_event (struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static struct ast_json * channel_destroyed_event (struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static struct ast_json * channel_dialplan (struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
static void * channel_find (const struct stasis_app *app, const char *id)
static struct ast_json * channel_state (struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)
 Handle channel state changes.
static struct ast_json * channel_state_change_event (struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static void * endpoint_find (const struct stasis_app *app, const char *id)
static struct app_forwards * forwards_create (struct stasis_app *app, const char *id)
static struct app_forwards * forwards_create_bridge (struct stasis_app *app, struct ast_bridge *bridge)
static struct app_forwards * forwards_create_channel (struct stasis_app *app, struct ast_channel *chan)
static struct app_forwards * forwards_create_endpoint (struct stasis_app *app, struct ast_endpoint *endpoint)
static void forwards_dtor (void *obj)
static int forwards_sort (const void *obj_left, const void *obj_right, int flags)
static void forwards_unsubscribe (struct app_forwards *forwards)
static int message_received_handler (const char *endpoint_id, struct ast_json *json_msg, void *pvt)
static struct ast_json * simple_bridge_event (const char *type, struct ast_bridge_snapshot *snapshot, const struct timeval *tv)
static struct ast_json * simple_channel_event (const char *type, struct ast_channel_snapshot *snapshot, const struct timeval *tv)
static struct ast_json * simple_endpoint_event (const char *type, struct ast_endpoint_snapshot *snapshot, const struct timeval *tv)
int stasis_app_is_core_event_source (struct stasis_app_event_source *obj)
 Checks to see if the given object is a core event source.
void stasis_app_register_event_sources (void)
 Register core event sources.
void stasis_app_unregister_event_sources (void)
 Unregister core event sources.
static void sub_bridge_update_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void sub_channel_update_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void sub_default_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static void sub_endpoint_update_handler (void *data, struct stasis_subscription *sub, struct stasis_message *message)
static int subscribe_bridge (struct stasis_app *app, void *obj)
static int subscribe_channel (struct stasis_app *app, void *obj)
static int subscribe_endpoint (struct stasis_app *app, void *obj)
static int unsubscribe (struct stasis_app *app, const char *kind, const char *id, int terminate)

Variables

struct stasis_app_event_source bridge_event_source
struct stasis_app_event_source channel_event_source
static channel_snapshot_monitor channel_monitors []
struct stasis_app_event_source endpoint_event_source


Detailed Description

Stasis application support.

Author:
David M. Lee, II <dlee@digium.com>

Definition in file res/stasis/app.c.


Typedef Documentation

typedef struct ast_json*(* channel_snapshot_monitor)(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot, const struct timeval *tv)

Typedef for callbacks that get called on channel snapshot updates.

Definition at line 315 of file res/stasis/app.c.


Enumeration Type Documentation

Enumerator:
FORWARD_CHANNEL 
FORWARD_BRIDGE 
FORWARD_ENDPOINT 

Definition at line 60 of file res/stasis/app.c.

00060                   {
00061    FORWARD_CHANNEL,
00062    FORWARD_BRIDGE,
00063    FORWARD_ENDPOINT,
00064 };


Function Documentation

struct stasis_app* app_create ( const char *  name,
stasis_app_cb  handler,
void *  data 
) [read]

Create a res_stasis application.

Parameters:
name Name of the application.
handler Callback for messages sent to the application.
data Data pointer provided to the callback.
Returns:
New res_stasis application.

NULL on error.

Definition at line 796 of file res/stasis/app.c.

References AO2_ALLOC_OPT_LOCK_MUTEX, ao2_alloc_options, ao2_cleanup, AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, ao2_container_alloc_rbtree, ao2_ref, app, app_dtor(), ast_assert, ast_attended_transfer_type(), ast_blind_transfer_type(), ast_bridge_merge_message_type(), ast_bridge_snapshot_type(), ast_bridge_topic_all(), ast_channel_snapshot_type(), ast_endpoint_snapshot_type(), ast_verb, bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_default_handler(), bridge_merge_handler(), forwards_sort(), NULL, RAII_VAR, stasis_message_router_add(), stasis_message_router_add_cache_update(), stasis_message_router_create(), stasis_message_router_set_default(), stasis_topic_create(), sub_bridge_update_handler(), sub_channel_update_handler(), sub_default_handler(), and sub_endpoint_update_handler().

Referenced by stasis_app_register().

00797 {
00798    RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup);
00799    size_t size;
00800    int res = 0;
00801 
00802    ast_assert(name != NULL);
00803    ast_assert(handler != NULL);
00804 
00805    ast_verb(1, "Creating Stasis app '%s'\n", name);
00806 
00807    size = sizeof(*app) + strlen(name) + 1;
00808    app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
00809 
00810    if (!app) {
00811       return NULL;
00812    }
00813 
00814    app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX,
00815       AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT,
00816       forwards_sort, NULL);
00817    if (!app->forwards) {
00818       return NULL;
00819    }
00820 
00821    app->topic = stasis_topic_create(name);
00822    if (!app->topic) {
00823       return NULL;
00824    }
00825 
00826    app->bridge_router = stasis_message_router_create(ast_bridge_topic_all());
00827    if (!app->bridge_router) {
00828       return NULL;
00829    }
00830 
00831    res |= stasis_message_router_add(app->bridge_router,
00832       ast_bridge_merge_message_type(), bridge_merge_handler, app);
00833 
00834    res |= stasis_message_router_add(app->bridge_router,
00835       ast_blind_transfer_type(), bridge_blind_transfer_handler, app);
00836 
00837    res |= stasis_message_router_add(app->bridge_router,
00838       ast_attended_transfer_type(), bridge_attended_transfer_handler, app);
00839 
00840    res |= stasis_message_router_set_default(app->bridge_router,
00841       bridge_default_handler, app);
00842 
00843    if (res != 0) {
00844       return NULL;
00845    }
00846    /* Bridge router holds a reference */
00847    ao2_ref(app, +1);
00848 
00849    app->router = stasis_message_router_create(app->topic);
00850    if (!app->router) {
00851       return NULL;
00852    }
00853 
00854    res |= stasis_message_router_add_cache_update(app->router,
00855       ast_bridge_snapshot_type(), sub_bridge_update_handler, app);
00856 
00857    res |= stasis_message_router_add_cache_update(app->router,
00858       ast_channel_snapshot_type(), sub_channel_update_handler, app);
00859 
00860    res |= stasis_message_router_add_cache_update(app->router,
00861       ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app);
00862 
00863    res |= stasis_message_router_set_default(app->router,
00864       sub_default_handler, app);
00865 
00866    if (res != 0) {
00867       return NULL;
00868    }
00869    /* Router holds a reference */
00870    ao2_ref(app, +1);
00871 
00872    strncpy(app->name, name, size - sizeof(*app));
00873    app->handler = handler;
00874    ao2_ref(data, +1);
00875    app->data = data;
00876 
00877    ao2_ref(app, +1);
00878    return app;
00879 }

void app_deactivate ( struct stasis_app *  app  ) 

Deactivates an application.

Any channels currently in the application remain active (since the app might come back), but new channels are rejected.

Parameters:
app Application to deactivate.

Definition at line 915 of file res/stasis/app.c.

References ao2_cleanup, ast_verb, stasis_app::data, stasis_app::handler, lock, stasis_app::name, NULL, and SCOPED_AO2LOCK.

Referenced by stasis_app_unregister().

00916 {
00917    SCOPED_AO2LOCK(lock, app);
00918    ast_verb(1, "Deactivating Stasis app '%s'\n", app->name);
00919    app->handler = NULL;
00920    ao2_cleanup(app->data);
00921    app->data = NULL;
00922 }

static void app_dtor ( void *  obj  )  [static]

Definition at line 255 of file res/stasis/app.c.

References ao2_cleanup, app, ast_assert, ast_verb, stasis_app::bridge_router, stasis_app::data, stasis_app::forwards, stasis_app::name, NULL, stasis_app::router, and stasis_app::topic.

Referenced by app_create().

00256 {
00257    struct stasis_app *app = obj;
00258 
00259    ast_verb(1, "Destroying Stasis app %s\n", app->name);
00260 
00261    ast_assert(app->router == NULL);
00262    ast_assert(app->bridge_router == NULL);
00263 
00264    ao2_cleanup(app->topic);
00265    app->topic = NULL;
00266    ao2_cleanup(app->forwards);
00267    app->forwards = NULL;
00268    ao2_cleanup(app->data);
00269    app->data = NULL;
00270 }

int app_is_active ( struct stasis_app *  app  ) 

Checks whether an app is active.

Parameters:
app Application to check.
Returns:
True (non-zero) if app is active.

False (zero) if app has been deactivated.

Definition at line 936 of file res/stasis/app.c.

References stasis_app::handler, lock, NULL, and SCOPED_AO2LOCK.

Referenced by stasis_app_exec().

00937 {
00938    SCOPED_AO2LOCK(lock, app);
00939    return app->handler != NULL;
00940 }

int app_is_finished ( struct stasis_app *  app  ) 

Checks whether a deactivated app has no channels.

Parameters:
app Application to check.
Returns:
True (non-zero) if app is deactivated, and has no associated channels.

False (zero) otherwise.

Definition at line 942 of file res/stasis/app.c.

References ao2_container_count(), stasis_app::forwards, stasis_app::handler, lock, NULL, and SCOPED_AO2LOCK.

Referenced by app_shutdown(), and cleanup_cb().

00943 {
00944    SCOPED_AO2LOCK(lock, app);
00945 
00946    return app->handler == NULL && ao2_container_count(app->forwards) == 0;
00947 }

int app_is_subscribed_bridge_id ( struct stasis_app *  app,
const char *  bridge_id 
)

Test if an app is subscribed to a bridge.

Parameters:
app Subscribing application.
bridge_id Id of bridge to check.
Returns:
True (non-zero) if app is subscribed to the bridge.

False (zero) if app is not subscribed to the bridge.

Definition at line 1184 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, stasis_app::forwards, NULL, OBJ_SEARCH_KEY, and RAII_VAR.

01185 {
01186    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
01187    forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY);
01188    return forwards != NULL;
01189 }

int app_is_subscribed_channel_id ( struct stasis_app *  app,
const char *  channel_id 
)

Test if an app is subscribed to a channel.

Parameters:
app Subscribing application.
channel_id Id of channel to check.
Returns:
True (non-zero) if app is subscribed to the channel.

False (zero) if app is not subscribed to the channel.

Definition at line 1115 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, stasis_app::forwards, NULL, OBJ_SEARCH_KEY, and RAII_VAR.

01116 {
01117    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
01118    forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY);
01119    return forwards != NULL;
01120 }

int app_is_subscribed_endpoint_id ( struct stasis_app *  app,
const char *  endpoint_id 
)

Test if an app is subscribed to an endpoint.

Parameters:
app Subscribing application.
endpoint_id Id of endpoint to check.
Returns:
True (non-zero) if app is subscribed to the endpoint.

False (zero) if app is not subscribed to the endpoint.

Definition at line 1247 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, stasis_app::forwards, NULL, OBJ_SEARCH_KEY, and RAII_VAR.

01248 {
01249    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
01250    forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY);
01251    return forwards != NULL;
01252 }

const char* app_name ( const struct stasis_app *  app  ) 

Return an application's name.

Parameters:
app Application.
Returns:
Name of the application.

NULL if app is NULL.

Definition at line 976 of file res/stasis/app.c.

References stasis_app::name.

Referenced by app_exec(), app_send_end_msg(), app_subscribe(), app_unsubscribe(), AST_TEST_DEFINE(), bridge_stasis_push_peek(), bridge_stasis_run_cb(), device_state_subscription_create(), device_to_json_cb(), lua_pbx_exec(), lua_pbx_findapp(), send_start_msg_snapshots(), and stasis_app_name().

00977 {
00978    return app->name;
00979 }

void app_send ( struct stasis_app *  app,
struct ast_json *  message 
)

Send a message to the given application.

Send a message to an application.

Parameters:
app App to send the message to.
message Message to send.

Definition at line 890 of file res/stasis/app.c.

References ao2_cleanup, ao2_ref, ast_verb, stasis_app::data, stasis_app::handler, handler(), lock, stasis_app::name, NULL, RAII_VAR, and SCOPED_AO2LOCK.

Referenced by app_update(), message_received_handler(), stasis_app_send(), sub_bridge_update_handler(), sub_channel_update_handler(), sub_default_handler(), and sub_endpoint_update_handler().

00891 {
00892    stasis_app_cb handler;
00893    RAII_VAR(void *, data, NULL, ao2_cleanup);
00894 
00895    /* Copy off mutable state with lock held */
00896    {
00897       SCOPED_AO2LOCK(lock, app);
00898       handler = app->handler;
00899       if (app->data) {
00900          ao2_ref(app->data, +1);
00901          data = app->data;
00902       }
00903       /* Name is immutable; no need to copy */
00904    }
00905 
00906    if (!handler) {
00907       ast_verb(3,
00908          "Inactive Stasis app '%s' missed message\n", app->name);
00909       return;
00910    }
00911 
00912    handler(data, app->name, message);
00913 }

void app_shutdown ( struct stasis_app *  app  ) 

Tears down an application.

It should be finished before calling this.

Parameters:
app Application to unsubscribe.

Definition at line 924 of file res/stasis/app.c.

References app_is_finished(), ast_assert, stasis_app::bridge_router, lock, NULL, stasis_app::router, SCOPED_AO2LOCK, and stasis_message_router_unsubscribe().

Referenced by cleanup_cb().

00925 {
00926    SCOPED_AO2LOCK(lock, app);
00927 
00928    ast_assert(app_is_finished(app));
00929 
00930    stasis_message_router_unsubscribe(app->router);
00931    app->router = NULL;
00932    stasis_message_router_unsubscribe(app->bridge_router);
00933    app->bridge_router = NULL;
00934 }

int app_subscribe_bridge ( struct stasis_app *  app,
struct ast_bridge *  bridge 
)

Add a bridge subscription to an existing channel subscription.

Parameters:
app Application.
bridge Bridge to subscribe to.
Returns:
0 on success.

Non-zero on error.

Definition at line 1135 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, ao2_link_flags, ast_debug, stasis_app::forwards, forwards_create_bridge(), lock, stasis_app::name, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, SCOPED_AO2LOCK, and ast_bridge::uniqueid.

Referenced by control_add_channel_to_bridge(), stasis_app_exec(), and subscribe_bridge().

01136 {
01137    if (!app || !bridge) {
01138       return -1;
01139    } else {
01140       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
01141       SCOPED_AO2LOCK(lock, app->forwards);
01142 
01143       forwards = ao2_find(app->forwards, bridge->uniqueid,
01144          OBJ_SEARCH_KEY | OBJ_NOLOCK);
01145 
01146       if (!forwards) {
01147          /* Forwards not found, create one */
01148          forwards = forwards_create_bridge(app, bridge);
01149          if (!forwards) {
01150             return -1;
01151          }
01152          ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
01153       }
01154 
01155       ++forwards->interested;
01156       ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name);
01157       return 0;
01158    }
01159 }

int app_subscribe_channel ( struct stasis_app *  app,
struct ast_channel *  chan 
)

Subscribes an application to a channel.

Parameters:
app Application.
chan Channel to subscribe to.
Returns:
0 on success.

Non-zero on error.

Definition at line 1031 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, ao2_link_flags, ast_channel_uniqueid(), ast_debug, stasis_app::forwards, forwards_create_channel(), lock, stasis_app::name, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, and SCOPED_AO2LOCK.

Referenced by call_forwarded_handler(), send_start_msg_snapshots(), stasis_app_subscribe_channel(), and subscribe_channel().

01032 {
01033    int res;
01034 
01035    if (!app || !chan) {
01036       return -1;
01037    } else {
01038       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
01039       SCOPED_AO2LOCK(lock, app->forwards);
01040 
01041       forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan),
01042          OBJ_SEARCH_KEY | OBJ_NOLOCK);
01043       if (!forwards) {
01044          /* Forwards not found, create one */
01045          forwards = forwards_create_channel(app, chan);
01046          if (!forwards) {
01047             return -1;
01048          }
01049 
01050          res = ao2_link_flags(app->forwards, forwards,
01051             OBJ_NOLOCK);
01052          if (!res) {
01053             return -1;
01054          }
01055       }
01056 
01057       ++forwards->interested;
01058       ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name);
01059       return 0;
01060    }
01061 }

int app_subscribe_endpoint ( struct stasis_app *  app,
struct ast_endpoint *  endpoint 
)

Subscribes an application to an endpoint.

Parameters:
app Application.
endpoint Endpoint to subscribe to.
Returns:
0 on success.

Non-zero on error.

Definition at line 1204 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, ao2_link_flags, ast_debug, ast_endpoint_get_id(), stasis_app::forwards, forwards_create_endpoint(), lock, message_received_handler(), messaging_app_subscribe_endpoint(), stasis_app::name, NULL, OBJ_NOLOCK, OBJ_SEARCH_KEY, RAII_VAR, and SCOPED_AO2LOCK.

Referenced by subscribe_endpoint().

01205 {
01206    if (!app || !endpoint) {
01207       return -1;
01208    } else {
01209       RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
01210       SCOPED_AO2LOCK(lock, app->forwards);
01211 
01212       forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint),
01213          OBJ_SEARCH_KEY | OBJ_NOLOCK);
01214 
01215       if (!forwards) {
01216          /* Forwards not found, create one */
01217          forwards = forwards_create_endpoint(app, endpoint);
01218          if (!forwards) {
01219             return -1;
01220          }
01221          ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK);
01222 
01223          /* Subscribe for messages */
01224          messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app);
01225       }
01226 
01227       ++forwards->interested;
01228       ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name);
01229       return 0;
01230    }
01231 }

struct ast_json* app_to_json ( const struct stasis_app *  app  )  [read]

Definition at line 981 of file res/stasis/app.c.

References ao2_cleanup, ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ast_json_array_append(), ast_json_object_get(), ast_json_pack(), ast_json_ref(), ast_json_string_create(), ast_json_unref(), ast_log, bridges, channels, endpoints, FORWARD_BRIDGE, FORWARD_CHANNEL, FORWARD_ENDPOINT, stasis_app::forwards, LOG_ERROR, stasis_app::name, NULL, and RAII_VAR.

Referenced by stasis_app_object_to_json().

00982 {
00983    RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
00984    struct ast_json *channels;
00985    struct ast_json *bridges;
00986    struct ast_json *endpoints;
00987    struct ao2_iterator i;
00988    void *obj;
00989 
00990    json = ast_json_pack("{s: s, s: [], s: [], s: []}",
00991       "name", app->name,
00992       "channel_ids", "bridge_ids", "endpoint_ids");
00993    channels = ast_json_object_get(json, "channel_ids");
00994    bridges = ast_json_object_get(json, "bridge_ids");
00995    endpoints = ast_json_object_get(json, "endpoint_ids");
00996 
00997    i = ao2_iterator_init(app->forwards, 0);
00998    while ((obj = ao2_iterator_next(&i))) {
00999       RAII_VAR(struct app_forwards *, forwards, obj, ao2_cleanup);
01000       RAII_VAR(struct ast_json *, id, NULL, ast_json_unref);
01001       int append_res = -1;
01002 
01003       id = ast_json_string_create(forwards->id);
01004 
01005       switch (forwards->forward_type) {
01006       case FORWARD_CHANNEL:
01007          append_res = ast_json_array_append(channels,
01008             ast_json_ref(id));
01009          break;
01010       case FORWARD_BRIDGE:
01011          append_res = ast_json_array_append(bridges,
01012             ast_json_ref(id));
01013          break;
01014       case FORWARD_ENDPOINT:
01015          append_res = ast_json_array_append(endpoints,
01016             ast_json_ref(id));
01017          break;
01018       }
01019 
01020       if (append_res != 0) {
01021          ast_log(LOG_ERROR, "Error building response\n");
01022          ao2_iterator_destroy(&i);
01023          return NULL;
01024       }
01025    }
01026    ao2_iterator_destroy(&i);
01027 
01028    return ast_json_ref(json);
01029 }

int app_unsubscribe_bridge ( struct stasis_app *  app,
struct ast_bridge *  bridge 
)

Cancel the bridge subscription for an application.

Parameters:
app Subscribing application.
bridge Bridge to unsubscribe from.
Returns:
0 on success.

Non-zero on error.

Definition at line 1166 of file res/stasis/app.c.

References app_unsubscribe_bridge_id(), and ast_bridge::uniqueid.

Referenced by bridge_after_cb(), and stasis_app_exec().

01167 {
01168    if (!app || !bridge) {
01169       return -1;
01170    }
01171 
01172    return app_unsubscribe_bridge_id(app, bridge->uniqueid);
01173 }

int app_unsubscribe_bridge_id ( struct stasis_app *  app,
const char *  bridge_id 
)

Cancel the subscription an app has for a bridge.

Parameters:
app Subscribing application.
bridge_id Id of bridge to unsubscribe from.
Returns:
0 on success.

Non-zero on error.

Definition at line 1175 of file res/stasis/app.c.

References unsubscribe().

Referenced by app_unsubscribe_bridge().

01176 {
01177    if (!app || !bridge_id) {
01178       return -1;
01179    }
01180 
01181    return unsubscribe(app, "bridge", bridge_id, 0);
01182 }

int app_unsubscribe_channel ( struct stasis_app *  app,
struct ast_channel *  chan 
)

Cancel the subscription an app has for a channel.

Parameters:
app Subscribing application.
chan Channel to unsubscribe from.
Returns:
0 on success.

Non-zero on error.

Definition at line 1097 of file res/stasis/app.c.

References app_unsubscribe_channel_id(), and ast_channel_uniqueid().

Referenced by app_send_end_msg().

01098 {
01099    if (!app || !chan) {
01100       return -1;
01101    }
01102 
01103    return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan));
01104 }

int app_unsubscribe_channel_id ( struct stasis_app *  app,
const char *  channel_id 
)

Cancel the subscription an app has for a channel.

Parameters:
app Subscribing application.
channel_id Id of channel to unsubscribe from.
Returns:
0 on success.

Non-zero on error.

Definition at line 1106 of file res/stasis/app.c.

References unsubscribe().

Referenced by app_unsubscribe_channel(), and send_start_msg_snapshots().

01107 {
01108    if (!app || !channel_id) {
01109       return -1;
01110    }
01111 
01112    return unsubscribe(app, "channel", channel_id, 0);
01113 }

int app_unsubscribe_endpoint_id ( struct stasis_app *  app,
const char *  endpoint_id 
)

Cancel the subscription an app has for an endpoint.

Parameters:
app Subscribing application.
endpoint_id Id of endpoint to unsubscribe from.
Returns:
0 on success.

Non-zero on error.

Definition at line 1238 of file res/stasis/app.c.

References unsubscribe().

01239 {
01240    if (!app || !endpoint_id) {
01241       return -1;
01242    }
01243 
01244    return unsubscribe(app, "endpoint", endpoint_id, 0);
01245 }

void app_update ( struct stasis_app *  app,
stasis_app_cb  handler,
void *  data 
)

Update the handler and data for a res_stasis application.

If app has been deactivated, this will reactivate it.

Parameters:
app Application to update.
handler New application callback.
data New data pointer for the callback.

Definition at line 949 of file res/stasis/app.c.

References ao2_cleanup, ao2_ref, app_send(), ast_json_pack(), ast_json_unref(), ast_verb, stasis_app::data, stasis_app::handler, lock, stasis_app::name, NULL, RAII_VAR, and SCOPED_AO2LOCK.

Referenced by stasis_app_register().

00950 {
00951    SCOPED_AO2LOCK(lock, app);
00952 
00953    if (app->handler) {
00954       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
00955 
00956       ast_verb(1, "Replacing Stasis app '%s'\n", app->name);
00957 
00958       msg = ast_json_pack("{s: s, s: s}",
00959          "type", "ApplicationReplaced",
00960          "application", app->name);
00961       if (msg) {
00962          app_send(app, msg);
00963       }
00964    } else {
00965       ast_verb(1, "Activating Stasis app '%s'\n", app->name);
00966    }
00967 
00968    app->handler = handler;
00969    ao2_cleanup(app->data);
00970    if (data) {
00971       ao2_ref(data, +1);
00972    }
00973    app->data = data;
00974 }

struct stasis_topic* ast_app_get_topic ( struct stasis_app *  app  )  [read]

Returns the stasis topic for an app.

Parameters:
app Stasis app to get topic of

Definition at line 881 of file res/stasis/app.c.

References stasis_app::topic.

Referenced by app_send_end_msg(), send_start_msg_snapshots(), and stasis_app_user_event().

00881                                                                {
00882    return app->topic;
00883 }

static int bridge_app_subscribed ( struct stasis_app *  app,
const char *  uniqueid 
) [static]

Helper function for determining if the application is subscribed to a given entity.

Definition at line 674 of file res/stasis/app.c.

References ao2_find, ao2_ref, stasis_app::forwards, NULL, and OBJ_SEARCH_KEY.

Referenced by bridge_app_subscribed_involved(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), and bridge_merge_handler().

00675 {
00676    struct app_forwards *forwards = NULL;
00677 
00678    forwards = ao2_find(app->forwards, uniqueid, OBJ_SEARCH_KEY);
00679    if (!forwards) {
00680       return 0;
00681    }
00682 
00683    ao2_ref(forwards, -1);
00684    return 1;
00685 }

static int bridge_app_subscribed_involved ( struct stasis_app *  app,
struct ast_bridge_snapshot *  snapshot 
) [static]

Callback function for checking if channels in a bridge are subscribed to.

Definition at line 704 of file res/stasis/app.c.

References ao2_iterator_destroy(), ao2_iterator_init(), ao2_iterator_next, ao2_ref, bridge_app_subscribed(), ast_bridge_snapshot::channels, subscribed, and ast_bridge_snapshot::uniqueid.

Referenced by bridge_attended_transfer_handler(), and bridge_blind_transfer_handler().

/*
 * Report whether the app is subscribed to the bridge described by snapshot
 * (checked by snapshot->uniqueid) or to any id found while iterating
 * snapshot->channels. Returns 1 if any of them matches, 0 otherwise.
 */
00705 {
00706    int subscribed = 0;
00707    struct ao2_iterator iter;
00708    char *uniqueid;
00709 
00710    if (bridge_app_subscribed(app, snapshot->uniqueid)) {
00711       return 1;
00712    }
00713 
00714    iter = ao2_iterator_init(snapshot->channels, 0);
/* The loop's step expression releases each uniqueid once its iteration is done. */
00715    for (; (uniqueid = ao2_iterator_next(&iter)); ao2_ref(uniqueid, -1)) {
00716       if (bridge_app_subscribed(app, uniqueid)) {
00717          subscribed = 1;
00718          ao2_ref(uniqueid, -1); /* break skips the step expression, so release here */
00719          break;
00720       }
00721    }
00722    ao2_iterator_destroy(&iter);
00723 
00724    return subscribed;
00725 }

static void bridge_attended_transfer_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 740 of file res/stasis/app.c.

References app, AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE, AST_ATTENDED_TRANSFER_DEST_LINK, AST_ATTENDED_TRANSFER_DEST_THREEWAY, ast_attended_transfer_message::bridge, bridge_app_subscribed(), bridge_app_subscribed_involved(), ast_bridge_channel_snapshot_pair::bridge_snapshot, ast_bridge_channel_snapshot_pair::channel_snapshot, ast_attended_transfer_message::dest, ast_attended_transfer_message::dest_type, ast_attended_transfer_message::links, stasis_message_data(), stasis_publish(), subscribed, ast_attended_transfer_message::threeway, ast_attended_transfer_message::to_transfer_target, ast_attended_transfer_message::to_transferee, stasis_app::topic, and ast_channel_snapshot::uniqueid.

Referenced by app_create().

/*
 * Stasis callback for attended transfer messages; data is the stasis_app.
 *
 * The message is republished on app->topic when the app is subscribed to
 * any party involved in the transfer. The checks run in this order and stop
 * at the first match:
 *   1. the transferee-side and transfer-target-side channels,
 *   2. the bridges on either side (and the channels in them), when present,
 *   3. the transfer destination, depending on dest_type.
 *
 * sub is not used.
 */
00742 {
00743    struct stasis_app *app = data;
00744    struct ast_attended_transfer_message *transfer_msg = stasis_message_data(message);
00745    int subscribed = 0;
00746 
00747    subscribed = bridge_app_subscribed(app, transfer_msg->to_transferee.channel_snapshot->uniqueid);
00748    if (!subscribed) {
00749       subscribed = bridge_app_subscribed(app, transfer_msg->to_transfer_target.channel_snapshot->uniqueid);
00750    }
00751    if (!subscribed && transfer_msg->to_transferee.bridge_snapshot) {
00752       subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transferee.bridge_snapshot);
00753    }
00754    if (!subscribed && transfer_msg->to_transfer_target.bridge_snapshot) {
00755       subscribed = bridge_app_subscribed_involved(app, transfer_msg->to_transfer_target.bridge_snapshot);
00756    }
00757 
00758    if (!subscribed) {
00759       switch (transfer_msg->dest_type) {
00760       case AST_ATTENDED_TRANSFER_DEST_BRIDGE_MERGE:
00761          subscribed = bridge_app_subscribed(app, transfer_msg->dest.bridge);
00762          break;
00763       case AST_ATTENDED_TRANSFER_DEST_LINK:
00764          subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[0]->uniqueid);
00765          if (!subscribed) {
00766             subscribed = bridge_app_subscribed(app, transfer_msg->dest.links[1]->uniqueid);
00767          }
00768          break;
00769 
00770       case AST_ATTENDED_TRANSFER_DEST_THREEWAY:
00771          subscribed = bridge_app_subscribed_involved(app, transfer_msg->dest.threeway.bridge_snapshot);
00772          if (!subscribed) {
00773             subscribed = bridge_app_subscribed(app, transfer_msg->dest.threeway.channel_snapshot->uniqueid);
00774          }
00775          break;
00776       default:
00777          break;
00778       }
00779    }
00780 
00781    if (subscribed) {
00782       stasis_publish(app->topic, message);
00783    }
00784 }

static void bridge_blind_transfer_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 727 of file res/stasis/app.c.

References app, ast_blind_transfer_message::bridge, bridge_app_subscribed(), bridge_app_subscribed_involved(), stasis_message_data(), stasis_publish(), stasis_app::topic, ast_blind_transfer_message::transferer, and ast_channel_snapshot::uniqueid.

Referenced by app_create().

/*
 * Stasis callback for blind transfer messages; data is the stasis_app.
 * Republishes the message on app->topic when the app is subscribed to the
 * transferer channel, or to the transfer's bridge or a channel in it.
 * sub is not used.
 */
00729 {
00730    struct stasis_app *app = data;
00731    struct ast_blind_transfer_message *transfer_msg = stasis_message_data(message);
00732    struct ast_bridge_snapshot *bridge = transfer_msg->bridge;
00733 
00734    if (bridge_app_subscribed(app, transfer_msg->transferer->uniqueid) ||
00735       (bridge && bridge_app_subscribed_involved(app, bridge))) { /* bridge may be NULL; skip that check then */
00736       stasis_publish(app->topic, message);
00737    }
00738 }

static void bridge_default_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 786 of file res/stasis/app.c.

References ao2_cleanup, app, and stasis_subscription_final_message().

Referenced by app_create().

/*
 * Fallback Stasis callback; data is the stasis_app.
 * The only action taken is on the subscription's final message, where one
 * reference to the app is released. All other messages are ignored.
 * NOTE(review): presumably that is the reference taken for this
 * subscription when it was created - confirm in app_create().
 */
00788 {
00789    struct stasis_app *app = data;
00790 
00791    if (stasis_subscription_final_message(sub, message)) {
00792       ao2_cleanup(app);
00793    }
00794 }

static void* bridge_find ( const struct stasis_app app,
const char *  id 
) [static]

Definition at line 1191 of file res/stasis/app.c.

References stasis_app_bridge_find_by_id().

/* Look up a bridge by id via stasis_app_bridge_find_by_id(); app is unused. */
01192 {
01193    return stasis_app_bridge_find_by_id(id);
01194 }

static void bridge_merge_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 687 of file res/stasis/app.c.

References app, bridge_app_subscribed(), ast_bridge_merge_message::from, stasis_message_data(), stasis_publish(), ast_bridge_merge_message::to, stasis_app::topic, and ast_bridge_snapshot::uniqueid.

Referenced by app_create().

/*
 * Stasis callback for bridge merge messages; data is the stasis_app.
 * Republishes the message on app->topic when the app is subscribed to the
 * source (from) or destination (to) bridge of the merge. sub is not used.
 */
00689 {
00690    struct stasis_app *app = data;
00691    struct ast_bridge_merge_message *merge;
00692 
00693    merge = stasis_message_data(message);
00694 
00695    /* Find out if we're subscribed to either bridge */
00696    if (bridge_app_subscribed(app, merge->from->uniqueid) ||
00697       bridge_app_subscribed(app, merge->to->uniqueid)) {
00698       /* Forward the message to the app */
00699       stasis_publish(app->topic, message);
00700    }
00701 }

static void call_forwarded_handler ( struct stasis_app app,
struct stasis_message message 
) [static]

Definition at line 272 of file res/stasis/app.c.

References app_subscribe_channel(), ast_channel_get_by_name(), ast_channel_unref, ast_multi_channel_blob_get_channel(), stasis_message_data(), and ast_channel_snapshot::uniqueid.

Referenced by sub_default_handler().

/*
 * Subscribe the app to the channel stored under the "forwarded" role of a
 * multi-channel message payload. Returns without doing anything when the
 * payload has no such snapshot or the channel can no longer be found.
 */
00273 {
00274    struct ast_multi_channel_blob *payload = stasis_message_data(message);
00275    struct ast_channel_snapshot *snapshot = ast_multi_channel_blob_get_channel(payload, "forwarded");
00276    struct ast_channel *chan;
00277 
00278    if (!snapshot) {
00279       return;
00280    }
00281 
00282    chan = ast_channel_get_by_name(snapshot->uniqueid); /* the lookup is done with the snapshot's uniqueid */
00283    if (!chan) {
00284       return;
00285    }
00286 
00287    app_subscribe_channel(app, chan); /* result is not checked */
00288    ast_channel_unref(chan);
00289 }

static struct ast_json* channel_callerid ( struct ast_channel_snapshot old_snapshot,
struct ast_channel_snapshot new_snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 423 of file res/stasis/app.c.

References ast_channel_snapshot_caller_id_equal(), ast_channel_snapshot_to_json(), ast_describe_caller_presentation(), ast_json_pack(), ast_json_timeval(), ast_channel_snapshot::caller_pres, NULL, and stasis_app_get_sanitizer().

/*
 * Channel snapshot monitor: build a "ChannelCallerId" JSON event when the
 * caller id differs between old_snapshot and new_snapshot.
 * Returns NULL when either snapshot is missing, when the caller id is
 * unchanged, or when the channel cannot be converted to JSON; otherwise
 * returns the result of ast_json_pack().
 */
00427 {
00428    struct ast_json *json_channel;
00429 
00430    /* No NewCallerid event on cache clear or first event */
00431    if (!old_snapshot || !new_snapshot) {
00432       return NULL;
00433    }
00434 
00435    if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) {
00436       return NULL;
00437    }
00438 
00439    json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
00440    if (!json_channel) {
00441       return NULL;
00442    }
00443 
00444    return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
00445       "type", "ChannelCallerId",
00446       "timestamp", ast_json_timeval(*tv, NULL),
00447       "caller_presentation", new_snapshot->caller_pres,
00448       "caller_presentation_txt", ast_describe_caller_presentation(
00449          new_snapshot->caller_pres),
00450       "channel", json_channel);
00451 }

static struct ast_json* channel_connected_line ( struct ast_channel_snapshot old_snapshot,
struct ast_channel_snapshot new_snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 453 of file res/stasis/app.c.

References ast_channel_snapshot_connected_line_equal(), ast_channel_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), NULL, and stasis_app_get_sanitizer().

/*
 * Channel snapshot monitor: build a "ChannelConnectedLine" JSON event when
 * the connected line differs between old_snapshot and new_snapshot.
 * Returns NULL when either snapshot is missing, when the connected line is
 * unchanged, or when the channel cannot be converted to JSON; otherwise
 * returns the result of ast_json_pack().
 */
00457 {
00458    struct ast_json *json_channel;
00459 
00460    /* No ChannelConnectedLine event on cache clear or first event */
00461    if (!old_snapshot || !new_snapshot) {
00462       return NULL;
00463    }
00464 
00465    if (ast_channel_snapshot_connected_line_equal(old_snapshot, new_snapshot)) {
00466       return NULL;
00467    }
00468 
00469    json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
00470    if (!json_channel) {
00471       return NULL;
00472    }
00473 
00474    return ast_json_pack("{s: s, s: o, s: o}",
00475       "type", "ChannelConnectedLine",
00476       "timestamp", ast_json_timeval(*tv, NULL),
00477       "channel", json_channel);
00478 }

static struct ast_json* channel_created_event ( struct ast_channel_snapshot snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 337 of file res/stasis/app.c.

References simple_channel_event().

Referenced by channel_state().

/* Build a "ChannelCreated" event for snapshot via simple_channel_event(). */
00340 {
00341    return simple_channel_event("ChannelCreated", snapshot, tv);
00342 }

static struct ast_json* channel_destroyed_event ( struct ast_channel_snapshot snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 344 of file res/stasis/app.c.

References ast_cause2str(), ast_channel_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), ast_channel_snapshot::hangupcause, NULL, and stasis_app_get_sanitizer().

Referenced by channel_state().

/*
 * Build a "ChannelDestroyed" JSON event for snapshot, carrying the hangup
 * cause both as a number ("cause") and as text ("cause_txt").
 * Returns NULL when the channel cannot be converted to JSON; otherwise
 * returns the result of ast_json_pack().
 */
00347 {
00348    struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
00349 
00350    if (!json_channel) {
00351       return NULL;
00352    }
00353 
00354    return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}",
00355       "type", "ChannelDestroyed",
00356       "timestamp", ast_json_timeval(*tv, NULL),
00357       "cause", snapshot->hangupcause,
00358       "cause_txt", ast_cause2str(snapshot->hangupcause),
00359       "channel", json_channel);
00360 }

static struct ast_json* channel_dialplan ( struct ast_channel_snapshot old_snapshot,
struct ast_channel_snapshot new_snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 389 of file res/stasis/app.c.

References ast_channel_snapshot::appl, ast_channel_snapshot_cep_equal(), ast_channel_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), ast_strlen_zero, ast_channel_snapshot::data, NULL, and stasis_app_get_sanitizer().

/*
 * Channel snapshot monitor: build a "ChannelDialplan" JSON event when the
 * dialplan position differs between old_snapshot and new_snapshot (as
 * decided by ast_channel_snapshot_cep_equal()).
 * Returns NULL when either snapshot is missing, when the new snapshot has
 * no application name, when nothing changed, or when the channel cannot be
 * converted to JSON; otherwise returns the result of ast_json_pack().
 */
00393 {
00394    struct ast_json *json_channel;
00395 
00396    /* No Newexten event on cache clear or first event */
00397    if (!old_snapshot || !new_snapshot) {
00398       return NULL;
00399    }
00400 
00401    /* Empty application is not valid for a Newexten event */
00402    if (ast_strlen_zero(new_snapshot->appl)) {
00403       return NULL;
00404    }
00405 
00406    if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) {
00407       return NULL;
00408    }
00409 
00410    json_channel = ast_channel_snapshot_to_json(new_snapshot, stasis_app_get_sanitizer());
00411    if (!json_channel) {
00412       return NULL;
00413    }
00414 
00415    return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}",
00416       "type", "ChannelDialplan",
00417       "timestamp", ast_json_timeval(*tv, NULL),
00418       "dialplan_app", new_snapshot->appl,
00419       "dialplan_app_data", new_snapshot->data,
00420       "channel", json_channel);
00421 }

static void* channel_find ( const struct stasis_app app,
const char *  id 
) [static]

Definition at line 1122 of file res/stasis/app.c.

References ast_channel_get_by_name().

/* Look up a channel by id via ast_channel_get_by_name(); app is unused. */
01123 {
01124    return ast_channel_get_by_name(id);
01125 }

static struct ast_json* channel_state ( struct ast_channel_snapshot old_snapshot,
struct ast_channel_snapshot new_snapshot,
const struct timeval *  tv 
) [static, read]

Handle channel state changes.

Definition at line 370 of file res/stasis/app.c.

References channel_created_event(), channel_destroyed_event(), channel_state_change_event(), NULL, and ast_channel_snapshot::state.

/*
 * Channel snapshot monitor for lifecycle and state changes.
 *   - no old snapshot:          returns a ChannelCreated event
 *   - no new snapshot:          returns a ChannelDestroyed event
 *   - state differs:            returns a ChannelStateChange event
 *   - otherwise:                returns NULL
 * The event is built from the new snapshot when there is one, else from the
 * old snapshot.
 */
00374 {
00375    struct ast_channel_snapshot *snapshot = new_snapshot ?
00376       new_snapshot : old_snapshot;
00377 
00378    if (!old_snapshot) {
00379       return channel_created_event(snapshot, tv);
00380    } else if (!new_snapshot) {
00381       return channel_destroyed_event(snapshot, tv);
00382    } else if (old_snapshot->state != new_snapshot->state) {
00383       return channel_state_change_event(snapshot, tv);
00384    }
00385 
00386    return NULL;
00387 }

static struct ast_json* channel_state_change_event ( struct ast_channel_snapshot snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 362 of file res/stasis/app.c.

References simple_channel_event().

Referenced by channel_state().

/* Build a "ChannelStateChange" event for snapshot via simple_channel_event(). */
00365 {
00366    return simple_channel_event("ChannelStateChange", snapshot, tv);
00367 }

static void* endpoint_find ( const struct stasis_app app,
const char *  id 
) [static]

Definition at line 1254 of file res/stasis/app.c.

References ast_endpoint_find_by_id().

/* Look up an endpoint by id via ast_endpoint_find_by_id(); app is unused. */
01255 {
01256    return ast_endpoint_find_by_id(id);
01257 }

static struct app_forwards* forwards_create ( struct stasis_app app,
const char *  id 
) [static, read]

Definition at line 100 of file res/stasis/app.c.

References ao2_alloc, ao2_cleanup, ao2_ref, ast_strlen_zero, forwards_dtor(), NULL, and RAII_VAR.

Referenced by forwards_create_bridge(), forwards_create_channel(), and forwards_create_endpoint().

/*
 * Allocate an app_forwards object whose id field holds a copy of id.
 * Returns NULL when app is NULL, id is empty, or allocation fails.
 * No topic forwards are set up here; the forwards_create_*() callers do
 * that.
 */
00102 {
00103    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
00104 
00105    if (!app || ast_strlen_zero(id)) {
00106       return NULL;
00107    }
00108 
00109    forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor); /* room for the struct plus id and its NUL */
00110    if (!forwards) {
00111       return NULL;
00112    }
00113 
00114    strcpy(forwards->id, id); /* fits: the allocation above was sized from strlen(id) + 1 */
00115 
00116    ao2_ref(forwards, +1); /* NOTE(review): presumably offsets the ao2_cleanup that RAII_VAR runs on return - confirm */
00117    return forwards;
00118 }

static struct app_forwards* forwards_create_bridge ( struct stasis_app app,
struct ast_bridge bridge 
) [static, read]

Forward a bridge's topics to an app

Definition at line 156 of file res/stasis/app.c.

References ao2_cleanup, ao2_ref, ast_bridge_topic(), ast_bridge_topic_cached(), FORWARD_BRIDGE, forwards_create(), NULL, RAII_VAR, stasis_forward_all(), stasis_forward_cancel(), stasis_app::topic, and ast_bridge::uniqueid.

Referenced by app_subscribe_bridge().

/*
 * Create an app_forwards for a bridge, keyed by bridge->uniqueid, and
 * forward both the bridge's topic and its cached topic to app->topic.
 * Returns NULL when app or bridge is NULL, when the object cannot be
 * created, or when either forward cannot be set up.
 */
00158 {
00159    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
00160 
00161    if (!app || !bridge) {
00162       return NULL;
00163    }
00164 
00165    forwards = forwards_create(app, bridge->uniqueid);
00166    if (!forwards) {
00167       return NULL;
00168    }
00169 
00170    forwards->forward_type = FORWARD_BRIDGE;
00171    forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge),
00172       app->topic);
00173    if (!forwards->topic_forward) {
00174       return NULL;
00175    }
00176 
00177    forwards->topic_cached_forward = stasis_forward_all(
00178       ast_bridge_topic_cached(bridge), app->topic);
00179    if (!forwards->topic_cached_forward) {
00180       /* Half-subscribed is a bad thing */
00181       stasis_forward_cancel(forwards->topic_forward);
00182       forwards->topic_forward = NULL; /* forwards_dtor asserts that both forwards are NULL */
00183       return NULL;
00184    }
00185 
00186    ao2_ref(forwards, +1); /* NOTE(review): presumably offsets the ao2_cleanup that RAII_VAR runs on return - confirm */
00187    return forwards;
00188 }

static struct app_forwards* forwards_create_channel ( struct stasis_app app,
struct ast_channel chan 
) [static, read]

Forward a channel's topics to an app

Definition at line 121 of file res/stasis/app.c.

References ao2_cleanup, ao2_ref, ast_channel_topic(), ast_channel_topic_cached(), ast_channel_uniqueid(), FORWARD_CHANNEL, forwards_create(), NULL, RAII_VAR, stasis_forward_all(), stasis_forward_cancel(), and stasis_app::topic.

Referenced by app_subscribe_channel().

/*
 * Create an app_forwards for a channel, keyed by the channel's uniqueid,
 * and forward both the channel's topic and its cached topic to app->topic.
 * Returns NULL when app or chan is NULL, when the object cannot be
 * created, or when either forward cannot be set up.
 */
00123 {
00124    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
00125 
00126    if (!app || !chan) {
00127       return NULL;
00128    }
00129 
00130    forwards = forwards_create(app, ast_channel_uniqueid(chan));
00131    if (!forwards) {
00132       return NULL;
00133    }
00134 
00135    forwards->forward_type = FORWARD_CHANNEL;
00136    forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan),
00137       app->topic);
00138    if (!forwards->topic_forward) {
00139       return NULL;
00140    }
00141 
00142    forwards->topic_cached_forward = stasis_forward_all(
00143       ast_channel_topic_cached(chan), app->topic);
00144    if (!forwards->topic_cached_forward) {
00145       /* Half-subscribed is a bad thing */
00146       stasis_forward_cancel(forwards->topic_forward);
00147       forwards->topic_forward = NULL; /* forwards_dtor asserts that both forwards are NULL */
00148       return NULL;
00149    }
00150 
00151    ao2_ref(forwards, +1); /* NOTE(review): presumably offsets the ao2_cleanup that RAII_VAR runs on return - confirm */
00152    return forwards;
00153 }

static struct app_forwards* forwards_create_endpoint ( struct stasis_app app,
struct ast_endpoint endpoint 
) [static, read]

Forward an endpoint's topics to an app

Definition at line 191 of file res/stasis/app.c.

References ao2_cleanup, ao2_ref, ast_endpoint_get_id(), ast_endpoint_topic(), ast_endpoint_topic_cached(), FORWARD_ENDPOINT, forwards_create(), NULL, RAII_VAR, stasis_forward_all(), stasis_forward_cancel(), and stasis_app::topic.

Referenced by app_subscribe_endpoint().

/*
 * Create an app_forwards for an endpoint, keyed by the endpoint's id, and
 * forward both the endpoint's topic and its cached topic to app->topic.
 * Returns NULL when app or endpoint is NULL, when the object cannot be
 * created, or when either forward cannot be set up.
 */
00193 {
00194    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
00195 
00196    if (!app || !endpoint) {
00197       return NULL;
00198    }
00199 
00200    forwards = forwards_create(app, ast_endpoint_get_id(endpoint));
00201    if (!forwards) {
00202       return NULL;
00203    }
00204 
00205    forwards->forward_type = FORWARD_ENDPOINT;
00206    forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint),
00207       app->topic);
00208    if (!forwards->topic_forward) {
00209       return NULL;
00210    }
00211 
00212    forwards->topic_cached_forward = stasis_forward_all(
00213       ast_endpoint_topic_cached(endpoint), app->topic);
00214    if (!forwards->topic_cached_forward) {
00215       /* Half-subscribed is a bad thing */
00216       stasis_forward_cancel(forwards->topic_forward);
00217       forwards->topic_forward = NULL; /* forwards_dtor asserts that both forwards are NULL */
00218       return NULL;
00219    }
00220 
00221    ao2_ref(forwards, +1); /* NOTE(review): presumably offsets the ao2_cleanup that RAII_VAR runs on return - confirm */
00222    return forwards;
00223 }

static void forwards_dtor ( void *  obj  )  [static]

Definition at line 82 of file res/stasis/app.c.

References ast_assert, NULL, app_forwards::topic_cached_forward, and app_forwards::topic_forward.

Referenced by forwards_create().

/*
 * Destructor for app_forwards objects. It frees nothing itself; it only
 * asserts that both topic forwards were already cancelled and cleared.
 * NOTE(review): forwards is declared only under AST_DEVMODE, so ast_assert
 * presumably expands to nothing in other builds - confirm.
 */
00083 {
00084 #ifdef AST_DEVMODE
00085    struct app_forwards *forwards = obj;
00086 #endif /* AST_DEVMODE */
00087 
00088    ast_assert(forwards->topic_forward == NULL);
00089    ast_assert(forwards->topic_cached_forward == NULL);
00090 }

static int forwards_sort ( const void *  obj_left,
const void *  obj_right,
int  flags 
) [static]

Definition at line 225 of file res/stasis/app.c.

References ast_assert, app_forwards::id, OBJ_KEY, OBJ_PARTIAL_KEY, and OBJ_POINTER.

Referenced by app_create().

/*
 * Ordering callback for app_forwards objects, comparing by id.
 * obj_left is always an app_forwards. How obj_right is read depends on
 * flags:
 *   OBJ_POINTER      - an app_forwards; its id is compared in full
 *   OBJ_KEY          - a C string compared in full against the left id
 *   OBJ_PARTIAL_KEY  - a C string compared as a prefix of the left id
 * Returns the strcmp()/strncmp() result; any other flag combination
 * asserts and yields 0.
 */
00226 {
00227    const struct app_forwards *object_left = obj_left;
00228    const struct app_forwards *object_right = obj_right;
00229    const char *right_key = obj_right;
00230    int cmp;
00231 
00232    switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) {
00233    case OBJ_POINTER:
00234       right_key = object_right->id;
00235       /* Fall through */
00236    case OBJ_KEY:
00237       cmp = strcmp(object_left->id, right_key);
00238       break;
00239    case OBJ_PARTIAL_KEY:
00240       /*
00241        * We could also use a partial key struct containing a length
00242        * so strlen() does not get called for every comparison instead.
00243        */
00244       cmp = strncmp(object_left->id, right_key, strlen(right_key));
00245       break;
00246    default:
00247       /* Sort can only work on something with a full or partial key. */
00248       ast_assert(0);
00249       cmp = 0;
00250       break;
00251    }
00252    return cmp;
00253 }

static void forwards_unsubscribe ( struct app_forwards forwards  )  [static]

Definition at line 92 of file res/stasis/app.c.

References NULL, stasis_forward_cancel(), app_forwards::topic_cached_forward, and app_forwards::topic_forward.

Referenced by unsubscribe().

/*
 * Cancel both topic forwards held by forwards and clear the pointers, which
 * is the state forwards_dtor asserts on.
 */
00093 {
00094    stasis_forward_cancel(forwards->topic_forward);
00095    forwards->topic_forward = NULL;
00096    stasis_forward_cancel(forwards->topic_cached_forward);
00097    forwards->topic_cached_forward = NULL;
00098 }

static int message_received_handler ( const char *  endpoint_id,
struct ast_json json_msg,
void *  pvt 
) [static]

Definition at line 544 of file res/stasis/app.c.

References ao2_cleanup, app, app_send(), ast_endpoint_latest_snapshot(), ast_endpoint_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), ast_strdupa, ast_strlen_zero, ast_tvnow(), NULL, RAII_VAR, and stasis_app_get_sanitizer().

Referenced by app_subscribe_endpoint().

/*
 * Messaging callback: wrap an incoming text message in a
 * "TextMessageReceived" event and send it to the app passed as pvt.
 *
 * endpoint_id is split at the first '/' into technology and resource.
 * json_msg is embedded with the "O" pack format, so the caller's own
 * reference to it is left untouched.
 *
 * Returns 0 once the event has been handed to app_send(), or -1 when
 * endpoint_id cannot be split into two non-empty parts, when no endpoint
 * snapshot or JSON for it can be obtained, or when the event cannot be
 * built.
 */
00545 {
00546    RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
00547    RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
00548    struct ast_json *json_endpoint;
00549    struct stasis_app *app = pvt;
00550    char *tech;
00551    char *resource;
00552 
00553    tech = ast_strdupa(endpoint_id);
00554    resource = strchr(tech, '/');
00555    if (resource) {
00556       resource[0] = '\0';
00557       resource++;
00558    }
00559 
00560    if (ast_strlen_zero(tech) || ast_strlen_zero(resource)) {
00561       return -1;
00562    }
00563 
00564    snapshot = ast_endpoint_latest_snapshot(tech, resource);
00565    if (!snapshot) {
00566       return -1;
00567    }
00568 
00569    json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
00570    if (!json_endpoint) {
00571       return -1;
00572    }
00573 
00574    /*
00575     * Keep the event in an RAII_VAR, like the other app_send() callers in
00576     * this file, so it is released on return instead of being leaked.
00577     */
00578    event = ast_json_pack("{s: s, s: o, s: o, s: O}",
00579       "type", "TextMessageReceived",
00580       "timestamp", ast_json_timeval(ast_tvnow(), NULL),
00581       "endpoint", json_endpoint,
00582       "message", json_msg);
00583    if (!event) {
00584       /* Nothing to deliver; do not hand NULL to app_send() */
00585       return -1;
00586    }
00587 
00588    app_send(app, event);
00589 
00590    return 0;
00591 }

static struct ast_json* simple_bridge_event ( const char *  type,
struct ast_bridge_snapshot snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 618 of file res/stasis/app.c.

References ast_bridge_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), NULL, and stasis_app_get_sanitizer().

Referenced by sub_bridge_update_handler().

/*
 * Build a JSON event of the given type containing a timestamp taken from
 * *tv and the bridge snapshot under "bridge".
 * Returns NULL when the snapshot cannot be converted to JSON; otherwise
 * returns the result of ast_json_pack().
 */
00622 {
00623    struct ast_json *json_bridge = ast_bridge_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
00624    if (!json_bridge) {
00625       return NULL;
00626    }
00627 
00628    return ast_json_pack("{s: s, s: o, s: o}",
00629       "type", type,
00630       "timestamp", ast_json_timeval(*tv, NULL),
00631       "bridge", json_bridge);
00632 }

static struct ast_json* simple_channel_event ( const char *  type,
struct ast_channel_snapshot snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 320 of file res/stasis/app.c.

References ast_channel_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), NULL, and stasis_app_get_sanitizer().

Referenced by channel_created_event(), and channel_state_change_event().

/*
 * Build a JSON event of the given type containing a timestamp taken from
 * *tv and the channel snapshot under "channel".
 * Returns NULL when the snapshot cannot be converted to JSON; otherwise
 * returns the result of ast_json_pack().
 */
00324 {
00325    struct ast_json *json_channel = ast_channel_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
00326 
00327    if (!json_channel) {
00328       return NULL;
00329    }
00330 
00331    return ast_json_pack("{s: s, s: o, s: o}",
00332       "type", type,
00333       "timestamp", ast_json_timeval(*tv, NULL),
00334       "channel", json_channel);
00335 }

static struct ast_json* simple_endpoint_event ( const char *  type,
struct ast_endpoint_snapshot snapshot,
const struct timeval *  tv 
) [static, read]

Definition at line 527 of file res/stasis/app.c.

References ast_endpoint_snapshot_to_json(), ast_json_pack(), ast_json_timeval(), NULL, and stasis_app_get_sanitizer().

Referenced by sub_endpoint_update_handler().

/*
 * Build a JSON event of the given type containing a timestamp taken from
 * *tv and the endpoint snapshot under "endpoint".
 * Returns NULL when the snapshot cannot be converted to JSON; otherwise
 * returns the result of ast_json_pack().
 */
00531 {
00532    struct ast_json *json_endpoint = ast_endpoint_snapshot_to_json(snapshot, stasis_app_get_sanitizer());
00533 
00534    if (!json_endpoint) {
00535       return NULL;
00536    }
00537 
00538    return ast_json_pack("{s: s, s: o, s: o}",
00539       "type", type,
00540       "timestamp", ast_json_timeval(*tv, NULL),
00541       "endpoint", json_endpoint);
00542 }

int stasis_app_is_core_event_source ( struct stasis_app_event_source obj  ) 

Checks to see if the given object is a core event source.

Note:
core event sources are currently only endpoint, bridge, and channel.
Parameters:
obj event source object to check
Returns:
non-zero if core event source, otherwise 0 (false)

Definition at line 1274 of file res/stasis/app.c.

Referenced by stasis_app_register_event_source(), and stasis_app_unregister_event_source().

/*
 * Identity check: non-zero only when obj is the address of one of the three
 * event sources named here (endpoint, bridge, channel). Contents are not
 * compared.
 */
01275 {
01276    return obj == &endpoint_event_source ||
01277       obj == &bridge_event_source ||
01278       obj == &channel_event_source;
01279 }

void stasis_app_register_event_sources ( void   ) 

void stasis_app_unregister_event_sources ( void   ) 

static void sub_bridge_update_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 634 of file res/stasis/app.c.

References app, app_send(), ast_assert, ast_bridge_snapshot_type(), ast_json_unref(), stasis_cache_update::new_snapshot, NULL, stasis_cache_update::old_snapshot, RAII_VAR, simple_bridge_event(), stasis_cache_update_type(), stasis_message_data(), stasis_message_timestamp(), stasis_message_type(), tv, stasis_cache_update::type, ast_bridge_snapshot::uniqueid, unsubscribe(), and update().

Referenced by app_create().

/*
 * Stasis callback for bridge snapshot cache updates; data is the
 * stasis_app.
 * Sends "BridgeDestroyed" when the update has no new snapshot and
 * "BridgeCreated" when it has no old snapshot; updates carrying both
 * produce no event here. When a bridge is destroyed the app is also
 * unsubscribed from it.
 */
00637 {
00638    RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
00639    struct stasis_app *app = data;
00640    struct stasis_cache_update *update;
00641    struct ast_bridge_snapshot *new_snapshot;
00642    struct ast_bridge_snapshot *old_snapshot;
00643    const struct timeval *tv;
00644 
00645    ast_assert(stasis_message_type(message) == stasis_cache_update_type());
00646 
00647    update = stasis_message_data(message);
00648 
00649    ast_assert(update->type == ast_bridge_snapshot_type());
00650 
00651    new_snapshot = stasis_message_data(update->new_snapshot);
00652    old_snapshot = stasis_message_data(update->old_snapshot);
/* Timestamp comes from the new snapshot message, or from the update itself when there is none. */
00653    tv = update->new_snapshot ?
00654       stasis_message_timestamp(update->new_snapshot) :
00655       stasis_message_timestamp(message);
00656 
00657    if (!new_snapshot) {
00658       json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv);
00659    } else if (!old_snapshot) {
00660       json = simple_bridge_event("BridgeCreated", new_snapshot, tv);
00661    }
00662 
00663    if (json) {
00664       app_send(app, json);
00665    }
00666 
00667    if (!new_snapshot && old_snapshot) {
00668       unsubscribe(app, "bridge", old_snapshot->uniqueid, 1); /* last argument is unsubscribe()'s terminate flag */
00669    }
00670 }

static void sub_channel_update_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 487 of file res/stasis/app.c.

References app, app_send(), ARRAY_LEN, ast_assert, ast_channel_snapshot_type(), ast_json_unref(), stasis_cache_update::new_snapshot, NULL, stasis_cache_update::old_snapshot, RAII_VAR, stasis_cache_update_type(), stasis_message_data(), stasis_message_timestamp(), stasis_message_type(), tv, stasis_cache_update::type, ast_channel_snapshot::uniqueid, unsubscribe(), and update().

Referenced by app_create().

/*
 * Stasis callback for channel snapshot cache updates; data is the
 * stasis_app.
 * Every entry of channel_monitors is called with the old and new snapshot,
 * and each non-NULL event it returns is sent to the app. When the update
 * has an old snapshot but no new one, the app is also unsubscribed from
 * that channel.
 */
00490 {
00491    struct stasis_app *app = data;
00492    struct stasis_cache_update *update;
00493    struct ast_channel_snapshot *new_snapshot;
00494    struct ast_channel_snapshot *old_snapshot;
00495    const struct timeval *tv;
00496    int i;
00497 
00498    ast_assert(stasis_message_type(message) == stasis_cache_update_type());
00499 
00500    update = stasis_message_data(message);
00501 
00502    ast_assert(update->type == ast_channel_snapshot_type());
00503 
00504    new_snapshot = stasis_message_data(update->new_snapshot);
00505    old_snapshot = stasis_message_data(update->old_snapshot);
00506 
00507    /* Pull timestamp from the new snapshot, or from the update message
00508     * when there isn't one. */
00509    tv = update->new_snapshot ?
00510       stasis_message_timestamp(update->new_snapshot) :
00511       stasis_message_timestamp(message);
00512 
00513    for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) {
00514       RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); /* declared per iteration with ast_json_unref as its cleanup */
00515 
00516       msg = channel_monitors[i](old_snapshot, new_snapshot, tv);
00517       if (msg) {
00518          app_send(app, msg);
00519       }
00520    }
00521 
00522    if (!new_snapshot && old_snapshot) {
00523       unsubscribe(app, "channel", old_snapshot->uniqueid, 1); /* last argument is unsubscribe()'s terminate flag */
00524    }
00525 }

static void sub_default_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 291 of file res/stasis/app.c.

References ao2_cleanup, app, app_send(), ast_channel_dial_type(), ast_json_unref(), call_forwarded_handler(), NULL, RAII_VAR, stasis_app_get_sanitizer(), stasis_message_to_json(), stasis_message_type(), and stasis_subscription_final_message().

Referenced by app_create().

/*
 * Fallback Stasis callback; data is the stasis_app.
 *
 * Dial messages are first passed to call_forwarded_handler(). Any message
 * that has a JSON representation is then sent to the app. On the
 * subscription's final message one reference to the app is released, and
 * this is done last so that app is never used after being released.
 */
00293 {
00294    struct stasis_app *app = data;
00295    RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
00296 
00297    if (stasis_message_type(message) == ast_channel_dial_type()) {
00298       call_forwarded_handler(app, message);
00299    }
00300 
00301    /* By default, send any message that has a JSON representation */
00302    json = stasis_message_to_json(message, stasis_app_get_sanitizer());
00303    if (json) {
00304       app_send(app, json);
00305    }
00306 
00307    /* Release the app only after its last use above */
00308    if (stasis_subscription_final_message(sub, message)) {
00309       ao2_cleanup(app);
00310    }
00311 }

static void sub_endpoint_update_handler ( void *  data,
struct stasis_subscription sub,
struct stasis_message message 
) [static]

Definition at line 582 of file res/stasis/app.c.

References app, app_send(), ast_assert, ast_endpoint_snapshot_type(), ast_json_unref(), ast_endpoint_snapshot::id, stasis_cache_update::new_snapshot, NULL, stasis_cache_update::old_snapshot, RAII_VAR, simple_endpoint_event(), stasis_cache_update_type(), stasis_message_data(), stasis_message_timestamp(), stasis_message_type(), tv, stasis_cache_update::type, unsubscribe(), and update().

Referenced by app_create().

/*
 * Stasis callback for endpoint snapshot cache updates; data is the
 * stasis_app.
 * Whenever the update carries a new snapshot, an "EndpointStateChange"
 * event built from it is sent to the app. When the update has an old
 * snapshot but no new one, the app is unsubscribed from that endpoint.
 */
00585 {
00586    RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
00587    struct stasis_app *app = data;
00588    struct stasis_cache_update *update;
00589    struct ast_endpoint_snapshot *new_snapshot;
00590    struct ast_endpoint_snapshot *old_snapshot;
00591    const struct timeval *tv;
00592 
00593    ast_assert(stasis_message_type(message) == stasis_cache_update_type());
00594 
00595    update = stasis_message_data(message);
00596 
00597    ast_assert(update->type == ast_endpoint_snapshot_type());
00598 
00599    new_snapshot = stasis_message_data(update->new_snapshot);
00600    old_snapshot = stasis_message_data(update->old_snapshot);
00601 
00602    if (new_snapshot) {
00603       tv = stasis_message_timestamp(update->new_snapshot);
00604 
00605       json = simple_endpoint_event("EndpointStateChange", new_snapshot, tv);
00606       if (!json) {
00607          return; /* the unsubscribe below needs !new_snapshot, so nothing is skipped by returning here */
00608       }
00609 
00610       app_send(app, json);
00611    }
00612 
00613    if (!new_snapshot && old_snapshot) {
00614       unsubscribe(app, "endpoint", old_snapshot->id, 1); /* last argument is unsubscribe()'s terminate flag */
00615    }
00616 }

static int subscribe_bridge ( struct stasis_app *  app,
void *  obj 
) [static]

Definition at line 1161 of file res/stasis/app.c.

References app_subscribe_bridge().

/*
 * Adapter that forwards to app_subscribe_bridge(), passing the untyped obj
 * through as the bridge argument, and returns its result unchanged.
 * NOTE(review): presumably exists to fit a generic (app, void *) callback
 * signature -- confirm at the point of use.
 */
01162 {
01163    return app_subscribe_bridge(app, obj);
01164 }

static int subscribe_channel ( struct stasis_app *  app,
void *  obj 
) [static]

Definition at line 1063 of file res/stasis/app.c.

References app_subscribe_channel().

/*
 * Adapter that forwards to app_subscribe_channel(), passing the untyped obj
 * through unchanged, and returns its result unchanged.
 * NOTE(review): presumably exists to fit a generic (app, void *) callback
 * signature -- confirm at the point of use.
 */
01064 {
01065    return app_subscribe_channel(app, obj);
01066 }

static int subscribe_endpoint ( struct stasis_app *  app,
void *  obj 
) [static]

Definition at line 1233 of file res/stasis/app.c.

References app_subscribe_endpoint().

/*
 * Adapter that forwards to app_subscribe_endpoint(), passing the untyped obj
 * through unchanged, and returns its result unchanged.
 * NOTE(review): presumably exists to fit a generic (app, void *) callback
 * signature -- confirm at the point of use.
 */
01234 {
01235    return app_subscribe_endpoint(app, obj);
01236 }

static int unsubscribe ( struct stasis_app *  app,
const char *  kind,
const char *  id,
int  terminate 
) [static]

Definition at line 1068 of file res/stasis/app.c.

References ao2_cleanup, ao2_find, ast_debug, stasis_app::forwards, forwards_unsubscribe(), lock, messaging_app_unsubscribe_endpoint(), stasis_app::name, NULL, OBJ_NODATA, OBJ_NOLOCK, OBJ_POINTER, OBJ_SEARCH_KEY, OBJ_UNLINK, RAII_VAR, and SCOPED_AO2LOCK.

/*
 * Drop one unit of interest in the forwards entry keyed by id, and remove
 * the entry when interest reaches zero or terminate is non-zero.
 *
 * app       - application whose forwards container is searched.
 * kind      - label used in debug output; the value "endpoint" additionally
 *             triggers messaging_app_unsubscribe_endpoint().
 * id        - key used to look up the forwards entry.
 * terminate - non-zero forces removal even if interest is still non-zero.
 *
 * Returns -1 if no entry for id is found in app->forwards, 0 otherwise.
 */
01069 {
   /* forwards is declared via RAII_VAR with ao2_cleanup as its cleanup function. */
01070    RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup);
   /* The container is locked here, so the lookups below pass OBJ_NOLOCK. */
01071    SCOPED_AO2LOCK(lock, app->forwards);
01072 
01073    forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
01074    if (!forwards) {
01075       ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id);
01076       return -1;
01077    }
   /*
    * NOTE(review): decremented unconditionally; if interested were already 0
    * it would go negative and the == 0 test below would not fire unless
    * terminate is set -- confirm callers never over-unsubscribe.
    */
01078    forwards->interested--;
01079 
01080    ast_debug(3, "%s '%s': is %d interested in %s\n", kind, id, forwards->interested, app->name);
01081    if (forwards->interested == 0 || terminate) {
01082       /* No one is interested any more; unsubscribe */
01083       ast_debug(3, "%s '%s' unsubscribed from %s\n", kind, id, app->name);
01084       forwards_unsubscribe(forwards);
      /* Unlink the entry from the container; OBJ_NODATA means no object is returned. */
01085       ao2_find(app->forwards, forwards,
01086          OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK |
01087          OBJ_NODATA);
01088 
01089       if (!strcmp(kind, "endpoint")) {
01090          messaging_app_unsubscribe_endpoint(app->name, id);
01091       }
01092    }
01093 
01094    return 0;
01095 }


Variable Documentation

Definition at line 1196 of file res/stasis/app.c.

Definition at line 1127 of file res/stasis/app.c.

Definition at line 480 of file res/stasis/app.c.

Definition at line 1259 of file res/stasis/app.c.


Generated on Thu Apr 16 06:28:15 2015 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6