stasis.h File Reference

Stasis Message Bus API. See Stasis Message Bus API for detailed documentation. More...

#include "asterisk/json.h"
#include "asterisk/manager.h"
#include "asterisk/utils.h"
#include "asterisk/event.h"

Include dependency graph for stasis.h:

Go to the source code of this file.

Data Structures

struct  stasis_cache_update
 Cache update message. More...
struct  stasis_message_sanitizer
 Structure containing callbacks for Stasis message sanitization. More...
struct  stasis_message_vtable
 Virtual table providing methods for messages. More...
struct  stasis_subscription_change
 Holds details about changes to subscriptions for the specified topic. More...
#define STASIS_MESSAGE_TYPE_CLEANUP(name)
 Boiler-plate messaging macro for cleaning up message types.
#define STASIS_MESSAGE_TYPE_DEFN(name,...)
 Boiler-plate messaging macro for defining public message types.
#define STASIS_MESSAGE_TYPE_DEFN_LOCAL(name,...)
 Boiler-plate messaging macro for defining local message types.
#define STASIS_MESSAGE_TYPE_INIT(name)
 Boiler-plate messaging macro for initializing message types.
void stasis_log_bad_type_access (const char *name)
#define STASIS_UMOS_MAX   (STASIS_UMOS_ENDPOINT + 1)
 Number of snapshot types.
enum  stasis_user_multi_object_snapshot_type { STASIS_UMOS_CHANNEL = 0, STASIS_UMOS_BRIDGE, STASIS_UMOS_ENDPOINT }
 Object type code for multi user object snapshots. More...
typedef struct stasis_message *(* cache_aggregate_calc_fn )(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
 Callback to calculate the aggregate cache entry.
typedef void(* cache_aggregate_publish_fn )(struct stasis_topic *topic, struct stasis_message *aggregate)
 Callback to publish the aggregate cache entry message.
typedef const char *(* snapshot_get_id )(struct stasis_message *message)
 Callback extract a unique identity from a snapshot message.
void ast_multi_object_blob_add (struct ast_multi_object_blob *multi, enum stasis_user_multi_object_snapshot_type type, void *object)
 Add an object to a multi object blob previously created.
struct ast_multi_object_blobast_multi_object_blob_create (struct ast_json *blob)
 Create a stasis multi object blob.
void ast_multi_object_blob_single_channel_publish (struct ast_channel *chan, struct stasis_message_type *type, struct ast_json *blob)
 Create and publish a stasis message blob on a channel with it's snapshot.
struct stasis_message_typeast_multi_user_event_type (void)
 Message type for custom user defined events with multi object blobs.
struct stasis_messagestasis_cache_clear_create (struct stasis_message *message)
 A message which instructs the caching topic to remove an entry from its cache.
struct stasis_cachestasis_cache_create (snapshot_get_id id_fn)
 Create a cache.
struct stasis_cachestasis_cache_create_full (snapshot_get_id id_fn, cache_aggregate_calc_fn aggregate_calc_fn, cache_aggregate_publish_fn aggregate_publish_fn)
 Create a cache.
struct ao2_containerstasis_cache_dump (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump cached items to a subscription for the ast_eid_default entity.
struct ao2_containerstasis_cache_dump_all (struct stasis_cache *cache, struct stasis_message_type *type)
 Dump all entity items from the cache to a subscription.
struct ao2_containerstasis_cache_dump_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
 Dump cached items to a subscription for a specific entity.
struct stasis_messagestasis_cache_entry_get_aggregate (struct stasis_cache_entry *entry)
 Get the aggregate cache entry snapshot.
struct stasis_messagestasis_cache_entry_get_local (struct stasis_cache_entry *entry)
 Get the local entity's cache entry snapshot.
struct stasis_messagestasis_cache_entry_get_remote (struct stasis_cache_entry *entry, int idx)
 Get a remote entity's cache entry snapshot by index.
struct stasis_messagestasis_cache_get (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve an item from the cache for the ast_eid_default entity.
struct ao2_containerstasis_cache_get_all (struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
 Retrieve all matching entity items from the cache.
struct stasis_messagestasis_cache_get_by_eid (struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
 Retrieve an item from the cache for a specific entity.
struct stasis_topicstasis_caching_get_topic (struct stasis_caching_topic *caching_topic)
 Returns the topic of cached events from a caching topics.
struct stasis_caching_topicstasis_caching_topic_create (struct stasis_topic *original_topic, struct stasis_cache *cache)
 Create a topic which monitors and caches messages from another topic.
struct stasis_caching_topicstasis_caching_unsubscribe (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic.
struct stasis_caching_topicstasis_caching_unsubscribe_and_join (struct stasis_caching_topic *caching_topic)
 Unsubscribes a caching topic from its upstream topic, blocking until all messages have been forwarded.
enum  stasis_message_type_result { STASIS_MESSAGE_TYPE_ERROR = -1, STASIS_MESSAGE_TYPE_SUCCESS, STASIS_MESSAGE_TYPE_DECLINED }
 Return code for Stasis message type creation attempts. More...
struct stasis_messagestasis_message_create (struct stasis_message_type *type, void *data)
 Create a new message.
struct stasis_messagestasis_message_create_full (struct stasis_message_type *type, void *data, const struct ast_eid *eid)
 Create a new message for an entity.
void * stasis_message_data (const struct stasis_message *msg)
 Get the data contained in a message.
struct ast_eidstasis_message_eid (const struct stasis_message *msg)
 Get the entity id for a stasis_message.
struct timeval * stasis_message_timestamp (const struct stasis_message *msg)
 Get the time when a message was created.
struct ast_manager_event_blobstasis_message_to_ami (struct stasis_message *message)
 Build the AMI representation of the message.
struct ast_eventstasis_message_to_event (struct stasis_message *message)
 Build the Generic event system representation of the message.
struct ast_jsonstasis_message_to_json (struct stasis_message *message, struct stasis_message_sanitizer *sanitize)
 Build the JSON representation of the message.
struct stasis_message_typestasis_message_type (const struct stasis_message *msg)
 Get the message type for a stasis_message.
enum stasis_message_type_result stasis_message_type_create (const char *name, struct stasis_message_vtable *vtable, struct stasis_message_type **result)
 Create a new message type.
int stasis_message_type_declined (const char *name)
 Check whether a message type is declined.
const char * stasis_message_type_name (const struct stasis_message_type *type)
 Gets the name of a given message type.
typedef void(* stasis_subscription_cb )(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 Callback function type for Stasis subscriptions.
struct stasis_forwardstasis_forward_all (struct stasis_topic *from_topic, struct stasis_topic *to_topic)
 Create a subscription which forwards all messages from one topic to another.
struct stasis_forwardstasis_forward_cancel (struct stasis_forward *forward)
struct stasis_subscriptionstasis_subscribe (struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
 Create a subscription.
struct stasis_subscriptionstasis_subscribe_pool (struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
 Create a subscription whose callbacks occur on a thread pool.
struct stasis_message_typestasis_subscription_change_type (void)
 Gets the message type for subscription change notices.
int stasis_subscription_final_message (struct stasis_subscription *sub, struct stasis_message *msg)
 Determine whether a message is the final message to be received on a subscription.
int stasis_subscription_is_done (struct stasis_subscription *subscription)
 Returns whether subscription has received its final message.
int stasis_subscription_is_subscribed (const struct stasis_subscription *sub)
 Returns whether a subscription is currently subscribed.
void stasis_subscription_join (struct stasis_subscription *subscription)
 Block until the last message is processed on a subscription.
const char * stasis_subscription_uniqueid (const struct stasis_subscription *sub)
 Get the unique ID for the subscription.
struct stasis_subscriptionstasis_unsubscribe (struct stasis_subscription *subscription)
 Cancel a subscription.
struct stasis_subscriptionstasis_unsubscribe_and_join (struct stasis_subscription *subscription)
 Cancel a subscription, blocking until the last message is processed.

Functions

struct stasis_message_typestasis_cache_clear_type (void)
 Message type for clearing a message from a stasis cache.
struct stasis_message_typestasis_cache_update_type (void)
 Message type for cache update messages.
int stasis_cache_init (void)
int stasis_config_init (void)
int stasis_init (void)
 Initialize the Stasis subsystem.
void stasis_publish (struct stasis_topic *topic, struct stasis_message *message)
 Publish a message to a topic's subscribers.
void stasis_publish_sync (struct stasis_subscription *sub, struct stasis_message *message)
 Publish a message to a topic's subscribers, synchronizing on the specified subscriber.
struct stasis_topicstasis_topic_create (const char *name)
 Create a new topic.
const char * stasis_topic_name (const struct stasis_topic *topic)
 Return the name of a topic.
struct stasis_topic_poolstasis_topic_pool_create (struct stasis_topic *pooled_topic)
 Create a topic pool that routes messages from dynamically generated topics to the given topic.
struct stasis_topicstasis_topic_pool_get_topic (struct stasis_topic_pool *pool, const char *topic_name)
 Find or create a topic in the pool.


Detailed Description

Stasis Message Bus API. See Stasis Message Bus API for detailed documentation.

Author:
David M. Lee, II <dlee@digium.com>
Since:
12

Definition in file stasis.h.


Typedef Documentation

typedef void(* stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message)

Callback function type for Stasis subscriptions.

Parameters:
data Data field provided with subscription.
message Published message.
Since:
12

Definition at line 521 of file stasis.h.


Enumeration Type Documentation

Return code for Stasis message type creation attempts.

Enumerator:
STASIS_MESSAGE_TYPE_ERROR  Message type was not created due to allocation failure
STASIS_MESSAGE_TYPE_SUCCESS  Message type was created successfully
STASIS_MESSAGE_TYPE_DECLINED  Message type was not created due to configuration

Definition at line 290 of file stasis.h.

00290                                 {
00291    STASIS_MESSAGE_TYPE_ERROR = -1,  /*!< Message type was not created due to allocation failure */
00292    STASIS_MESSAGE_TYPE_SUCCESS,  /*!< Message type was created successfully */
00293    STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */
00294 };


Function Documentation

struct stasis_forward* stasis_forward_all ( struct stasis_topic from_topic,
struct stasis_topic to_topic 
) [read]

Create a subscription which forwards all messages from one topic to another.

Note that the topic parameter of the invoked callback will the be the topic the message was sent to, not the topic the subscriber subscribed to.

Parameters:
from_topic Topic to forward.
to_topic Destination topic of forwarded messages.
Returns:
New forwarding subscription.

NULL on error.

Since:
12

Definition at line 884 of file stasis.c.

References AO2_ALLOC_OPT_LOCK_NOLOCK, ao2_alloc_options, ao2_bump, ao2_cleanup, ao2_unlock, AST_VECTOR_APPEND, AST_VECTOR_GET, AST_VECTOR_SIZE, forward_dtor(), NULL, RAII_VAR, topic_add_subscription(), and topic_lock_both.

Referenced by __init_manager(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), AST_TEST_DEFINE(), create_subscriptions(), endpoint_internal_create(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), load_general_config(), load_module(), manager_bridging_init(), manager_channels_init(), manager_mwi_init(), manager_subscriptions_init(), manager_system_init(), stasis_cp_all_create(), stasis_cp_single_create(), and stasis_topic_pool_get_topic().

00886 {
00887    int res;
00888    size_t idx;
00889    RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
00890 
00891    if (!from_topic || !to_topic) {
00892       return NULL;
00893    }
00894 
00895    forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
00896    if (!forward) {
00897       return NULL;
00898    }
00899 
00900    /* Forwards to ourselves are implicit. */
00901    if (to_topic == from_topic) {
00902       return ao2_bump(forward);
00903    }
00904 
00905    forward->from_topic = ao2_bump(from_topic);
00906    forward->to_topic = ao2_bump(to_topic);
00907 
00908    topic_lock_both(to_topic, from_topic);
00909    res = AST_VECTOR_APPEND(&to_topic->upstream_topics, from_topic);
00910    if (res != 0) {
00911       ao2_unlock(from_topic);
00912       ao2_unlock(to_topic);
00913       return NULL;
00914    }
00915 
00916    for (idx = 0; idx < AST_VECTOR_SIZE(&to_topic->subscribers); ++idx) {
00917       topic_add_subscription(from_topic, AST_VECTOR_GET(&to_topic->subscribers, idx));
00918    }
00919    ao2_unlock(from_topic);
00920    ao2_unlock(to_topic);
00921 
00922    return ao2_bump(forward);
00923 }

struct stasis_forward* stasis_forward_cancel ( struct stasis_forward forward  )  [read]

Definition at line 854 of file stasis.c.

References ao2_cleanup, ao2_unlock, AST_VECTOR_ELEM_CLEANUP_NOOP, AST_VECTOR_GET, AST_VECTOR_REMOVE_ELEM_UNORDERED, AST_VECTOR_SIZE, stasis_forward::from_topic, NULL, stasis_forward::to_topic, topic_lock_both, and topic_remove_subscription().

Referenced by all_dtor(), ari_bridges_play_new(), ast_ari_bridges_record(), ast_channel_forward_endpoint(), ast_channel_internal_cleanup(), ast_endpoint_shutdown(), AST_TEST_DEFINE(), bridge_channel_control_thread(), cleanup_module(), destroy_subscriptions(), forwards_create_bridge(), forwards_create_channel(), forwards_create_endpoint(), forwards_unsubscribe(), load_general_config(), load_module(), manager_bridging_cleanup(), manager_channels_shutdown(), manager_mwi_shutdown(), manager_shutdown(), manager_system_shutdown(), stasis_cp_single_unsubscribe(), topic_pool_entry_dtor(), and unload_module().

00855 {
00856    int idx;
00857    struct stasis_topic *from;
00858    struct stasis_topic *to;
00859 
00860    if (!forward) {
00861       return NULL;
00862    }
00863 
00864    from = forward->from_topic;
00865    to = forward->to_topic;
00866 
00867    if (from && to) {
00868       topic_lock_both(to, from);
00869       AST_VECTOR_REMOVE_ELEM_UNORDERED(&to->upstream_topics, from,
00870          AST_VECTOR_ELEM_CLEANUP_NOOP);
00871 
00872       for (idx = 0; idx < AST_VECTOR_SIZE(&to->subscribers); ++idx) {
00873          topic_remove_subscription(from, AST_VECTOR_GET(&to->subscribers, idx));
00874       }
00875       ao2_unlock(from);
00876       ao2_unlock(to);
00877    }
00878 
00879    ao2_cleanup(forward);
00880 
00881    return NULL;
00882 }

struct stasis_message* stasis_message_create ( struct stasis_message_type type,
void *  data 
) [read]

Create a new message.

This message is an ao2 object, and must be ao2_cleanup()'ed when you are done with it. Messages are also immutable, and must not be modified after they are initialized. Especially the data in the message.

Parameters:
type Type of the message
data Immutable data that is the actual contents of the message
Returns:
New message

NULL on error

Since:
12
Examples:
/tmp/asterisk-trunk/trunk/main/app.c.

Definition at line 136 of file stasis_message.c.

References ast_eid_default, and stasis_message_create_full().

Referenced by aoc_publish_blob(), ast_bridge_blob_create(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cdr_engine_term(), ast_channel_publish_dial_internal(), ast_channel_publish_snapshot(), ast_endpoint_blob_create(), ast_manager_publish_event(), ast_multi_object_blob_single_channel_publish(), ast_mwi_blob_create(), ast_publish_channel_state(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_publish_state_from_blob(), cc_publish(), cdr_prop_write(), cdr_read(), cdr_write(), create_bridge_snapshot_message(), create_channel_blob_message(), create_channel_snapshot_message(), create_endpoint_snapshot_message(), endpoint_publish_snapshot(), forkcdr_exec(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), presence_state_event(), publish_acl_change(), publish_app_cdr_message(), publish_chanspy_message(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_local_bridge_message(), publish_parked_call(), publish_parked_call_failure(), publish_reload_message(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), realtime_exec(), send_call_pickup_stasis_message(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_user_event(), stasis_cache_clear_create(), stasis_test_message_create(), stun_monitor_request(), and update_create().

00137 {
00138    return stasis_message_create_full(type, data, &ast_eid_default);
00139 }

struct stasis_message* stasis_message_create_full ( struct stasis_message_type type,
void *  data,
const struct ast_eid eid 
) [read]

Create a new message for an entity.

This message is an ao2 object, and must be ao2_cleanup()'ed when you are done with it. Messages are also immutable, and must not be modified after they are initialized. Especially the data in the message.

Parameters:
type Type of the message
data Immutable data that is the actual contents of the message
eid What entity originated this message. (NULL for aggregate)
Note:
An aggregate message is a combined representation of the local and remote entities publishing the message data. e.g., An aggregate device state represents the combined device state from the local and any remote entities publishing state for a device. e.g., An aggregate MWI message is the old/new MWI counts accumulated from the local and any remote entities publishing to a mailbox.
Return values:
New message
\c NULL on error
Since:
12.2.0
Examples:
/tmp/asterisk-trunk/trunk/main/app.c.

Definition at line 110 of file stasis_message.c.

References ao2_ref, ao2_t_alloc, ast_tvnow(), stasis_message::data, stasis_message::eid, stasis_message::eid_ptr, stasis_message_type::name, NULL, stasis_message_dtor(), stasis_message::timestamp, and stasis_message::type.

Referenced by ast_publish_device_state_full(), AST_TEST_DEFINE(), cache_test_message_create_full(), device_state_aggregate_calc(), mwi_state_create_message(), and stasis_message_create().

00111 {
00112    struct stasis_message *message;
00113 
00114    if (type == NULL || data == NULL) {
00115       return NULL;
00116    }
00117 
00118    message = ao2_t_alloc(sizeof(*message), stasis_message_dtor, type->name);
00119    if (message == NULL) {
00120       return NULL;
00121    }
00122 
00123    message->timestamp = ast_tvnow();
00124    ao2_ref(type, +1);
00125    message->type = type;
00126    ao2_ref(data, +1);
00127    message->data = data;
00128    if (eid) {
00129       message->eid_ptr = &message->eid;
00130       message->eid = *eid;
00131    }
00132 
00133    return message;
00134 }

void* stasis_message_data ( const struct stasis_message msg  ) 

Get the data contained in a message.

Parameters:
msg Message.
Returns:
Immutable data pointer

NULL if msg is NULL.

Since:
12
Examples:
/tmp/asterisk-trunk/trunk/main/app.c.

Definition at line 157 of file stasis_message.c.

References stasis_message::data, and NULL.

Referenced by action_coreshowchannels(), agent_login_to_ami(), agent_logoff_to_ami(), agi_channel_to_ami(), aoc_to_ami(), appcdr_callback(), ast_ari_bridges_list(), ast_ari_channels_get(), ast_ari_channels_list(), ast_ari_endpoints_list(), ast_ari_endpoints_list_by_tech(), ast_bridge_merge_message_to_json(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_snapshot_get_latest(), ast_channel_entered_bridge_to_json(), ast_channel_left_bridge_to_json(), ast_channel_snapshot_get_latest(), ast_channel_snapshot_get_latest_by_name(), ast_complete_channels(), ast_delete_mwi_state_full(), ast_endpoint_latest_snapshot(), ast_publish_mwi_state_full(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), attended_transfer_to_ami(), attended_transfer_to_json(), blind_transfer_to_ami(), blind_transfer_to_json(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_cb(), bridge_merge_handler(), bridge_show_specific_print_channel(), bridge_snapshot_get_id(), bridge_snapshot_update(), cache_test_aggregate_calc_fn(), cache_test_data_id(), cache_update(), caching_topic_exec(), call_forwarded_handler(), call_pickup_to_ami(), cc_available_to_ami(), cc_callerrecalling_to_ami(), cc_callerstartmonitoring_to_ami(), cc_callerstopmonitoring_to_ami(), cc_failure_to_ami(), cc_monitorfailed_to_ami(), cc_offertimerstart_to_ami(), cc_recallcomplete_to_ami(), cc_requestacknowledged_to_ami(), cc_requested_to_ami(), cdr_prop_write_callback(), cdr_read_callback(), cdr_write_callback(), cel_attended_transfer_cb(), cel_blind_transfer_cb(), cel_bridge_enter_cb(), cel_bridge_leave_cb(), cel_dial_cb(), cel_generic_cb(), cel_local_cb(), cel_parking_cb(), cel_pickup_cb(), cel_snapshot_update_cb(), chan_pjsip_devicestate(), channel_blob_to_json(), channel_chanspy_start_cb(), channel_chanspy_stop_cb(), channel_dial_cb(), channel_dtmf_begin_cb(), channel_dtmf_end_cb(), channel_enter_cb(), channel_fax_cb(), channel_hangup_handler_cb(), channel_hangup_request_cb(), channel_hold_cb(), channel_leave_cb(), channel_moh_start_cb(), channel_moh_stop_cb(), channel_monitor_start_cb(), channel_monitor_stop_cb(), channel_snapshot_get_id(), channel_snapshot_get_name(), channel_snapshot_update(), channel_unhold_cb(), check_cache_aggregate(), complete_bridge_stasis(), confbridge_publish_manager_event(), confbridge_talking_cb(), consumer_exec(), corosync_ping_to_event(), dahdichannel_to_ami(), device_state_aggregate_calc(), device_state_cb(), device_state_get_id(), devstate_cached(), devstate_change_cb(), devstate_to_ami(), devstate_to_event(), dial_to_json(), dtmf_end_to_json(), endpoint_cache_clear(), endpoint_snapshot_get_id(), fake_ami(), fake_json(), filter_bridge_type_cb(), find_route(), forkcdr_callback(), generic_agent_devstate_cb(), generic_monitor_devstate_cb(), get_admin_header(), get_cached_mwi(), get_message_count(), handle_attended_transfer(), handle_blind_transfer(), handle_bridge_enter(), handle_bridge_enter_message(), handle_bridge_leave_message(), handle_bridge_show_all(), handle_bridge_show_specific(), handle_chanlist(), handle_channel_cache_message(), handle_channelstatus(), handle_dial_message(), handle_hangup(), handle_local_optimization_begin(), handle_local_optimization_end(), handle_parked_call_message(), has_voicemail(), hold_to_json(), local_message_to_ami(), manager_bridge_info(), manager_generic_msg_cb(), meetme_stasis_cb(), moh_post_start(), moh_post_stop(), multi_user_event_to_ami(), multi_user_event_to_json(), mwi_app_event_cb(), mwi_event_cb(), mwi_state_create_message(), mwi_state_get_id(), mwi_to_event(), mwi_update_cb(), park_announce_update_cb(), parker_update_cb(), parking_event_cb(), peerstatus_to_ami(), playback_to_json(), presence_state_cached(), presence_state_cb(), presence_state_get_id(), presence_state_to_ami(), queue_agent_cb(), queue_channel_to_ami(), queue_member_to_ami(), queue_multi_channel_to_ami(), recording_to_json(), refer_progress_bridge(), remove_device_states_cb(), rtcp_report_to_ami(), rtcp_report_to_json(), security_event_to_ami(), security_stasis_cb(), send_bridge_info_item_cb(), send_bridge_list_item_cb(), session_timeout_to_ami(), stasis_app_control_get_snapshot(), stasis_end_to_json(), stasis_start_to_json(), stasis_subscription_final_message(), sub_bridge_update_handler(), sub_channel_update_handler(), sub_endpoint_update_handler(), subscription_persistence_event_cb(), system_registry_to_ami(), talking_start_to_ami(), talking_stop_to_ami(), unhold_to_json(), unistim_send_mwi_to_peer(), update_registry(), updates(), varset_to_ami(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

00158 {
00159    if (msg == NULL) {
00160       return NULL;
00161    }
00162    return msg->data;
00163 }

struct ast_eid* stasis_message_eid ( const struct stasis_message msg  )  [read]

Get the entity id for a stasis_message.

Since:
12.2.0
Parameters:
msg Message to get eid.
Return values:
Entity id of msg
\c NULL if msg is an aggregate or msg is NULL.

Definition at line 141 of file stasis_message.c.

References stasis_message::eid_ptr, and NULL.

Referenced by AST_TEST_DEFINE(), cache_entry_by_eid(), cache_entry_create(), cache_remove(), cache_udpate(), and caching_topic_exec().

00142 {
00143    if (msg == NULL) {
00144       return NULL;
00145    }
00146    return msg->eid_ptr;
00147 }

struct timeval* stasis_message_timestamp ( const struct stasis_message msg  )  [read]

struct ast_manager_event_blob* stasis_message_to_ami ( struct stasis_message message  )  [read]

Build the AMI representation of the message.

May return NULL, to indicate no representation. The returned object should be ao2_cleanup()'ed.

Parameters:
message Message to convert to AMI.
Returns:
NULL on error.

NULL if AMI format is not supported.

Definition at line 186 of file stasis_message.c.

References INVOKE_VIRTUAL, and to_ami().

Referenced by action_devicestatelist(), action_presencestatelist(), AST_TEST_DEFINE(), and manager_default_msg_cb().

00187 {
00188    return INVOKE_VIRTUAL(to_ami, msg);
00189 }

struct ast_event* stasis_message_to_event ( struct stasis_message message  )  [read]

Build the Generic event system representation of the message.

May return NULL, to indicate no representation. The returned object should be disposed of via ast_event_destroy.

Parameters:
message Message to convert to AMI.
Returns:
NULL on error.

NULL if AMI format is not supported.

Definition at line 198 of file stasis_message.c.

References INVOKE_VIRTUAL.

Referenced by publish_to_corosync().

00199 {
00200    return INVOKE_VIRTUAL(to_event, msg);
00201 }

struct ast_json* stasis_message_to_json ( struct stasis_message message,
struct stasis_message_sanitizer sanitize 
) [read]

Build the JSON representation of the message.

May return NULL, to indicate no representation. The returned object should be ast_json_unref()'ed.

Parameters:
message Message to convert to JSON string.
sanitize Snapshot sanitization callback.
Returns:
Newly allocated string with JSON message.

NULL on error.

NULL if JSON format is not supported.

Definition at line 191 of file stasis_message.c.

References INVOKE_VIRTUAL.

Referenced by AST_TEST_DEFINE(), rtcp_message_handler(), and sub_default_handler().

00194 {
00195    return INVOKE_VIRTUAL(to_json, msg, sanitize);
00196 }

struct stasis_message_type* stasis_message_type ( const struct stasis_message msg  )  [read]

Get the message type for a stasis_message.

Parameters:
msg Message to type
Returns:
Type of msg

NULL if msg is NULL.

Since:
12
Examples:
/tmp/asterisk-trunk/trunk/main/app.c.

Definition at line 149 of file stasis_message.c.

References NULL, and stasis_message::type.

Referenced by acl_change_stasis_cb(), appcdr_callback(), AST_TEST_DEFINE(), asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), bridge_snapshot_get_id(), cache_put(), cache_simple(), cache_test_aggregate_calc_fn(), cache_test_data_id(), cache_update(), caching_topic_exec(), cdr_prop_write_callback(), cdr_read_callback(), cdr_write_callback(), channel_snapshot_get_id(), channel_snapshot_get_name(), device_state_cb(), device_state_get_id(), devstate_change_cb(), endpoint_cache_clear(), endpoint_snapshot_get_id(), find_route(), forkcdr_callback(), generic_agent_devstate_cb(), generic_monitor_devstate_cb(), local_message_to_ami(), meetme_stasis_cb(), message_sink_cb(), mwi_event_cb(), mwi_stasis_cb(), mwi_state_get_id(), mwi_update_cb(), network_change_stasis_cb(), parker_update_cb(), parking_event_cb(), presence_state_cb(), presence_state_get_id(), queue_agent_cb(), refer_progress_bridge(), rtcp_report_to_ami(), rtp_topic_handler(), security_event_to_ami(), security_stasis_cb(), stasis_subscription_final_message(), statsmaker(), sub_bridge_update_handler(), sub_channel_update_handler(), sub_default_handler(), sub_endpoint_update_handler(), subscription_persistence_event_cb(), update_create(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

00150 {
00151    if (msg == NULL) {
00152       return NULL;
00153    }
00154    return msg->type;
00155 }

enum stasis_message_type_result stasis_message_type_create ( const char *  name,
struct stasis_message_vtable vtable,
struct stasis_message_type **  result 
)

Create a new message type.

stasis_message_type is an AO2 object, so ao2_cleanup() when you're done with it.

Parameters:
name Name of the new type.
vtable Virtual table of message methods. May be NULL.
[out] result The location where the new message type will be placed
Note:
Stasis message type creation may be declined if the message type is disabled
Returns:
A stasis_message_type_result enum
Since:
12

Definition at line 53 of file stasis_message.c.

References ao2_cleanup, ao2_t_alloc, ast_strdup, message_type_dtor(), stasis_message_type::name, STASIS_MESSAGE_TYPE_DECLINED, stasis_message_type_declined(), STASIS_MESSAGE_TYPE_ERROR, STASIS_MESSAGE_TYPE_SUCCESS, type, and stasis_message_type::vtable.

Referenced by AST_TEST_DEFINE().

00056 {
00057    struct stasis_message_type *type;
00058 
00059    /* Check for declination */
00060    if (name && stasis_message_type_declined(name)) {
00061       return STASIS_MESSAGE_TYPE_DECLINED;
00062    }
00063 
00064    type = ao2_t_alloc(sizeof(*type), message_type_dtor, name);
00065    if (!type) {
00066       return STASIS_MESSAGE_TYPE_ERROR;
00067    }
00068    if (!vtable) {
00069       /* Null object pattern, FTW! */
00070       vtable = &null_vtable;
00071    }
00072 
00073    type->name = ast_strdup(name);
00074    if (!type->name) {
00075       ao2_cleanup(type);
00076       return STASIS_MESSAGE_TYPE_ERROR;
00077    }
00078    type->vtable = vtable;
00079    *result = type;
00080 
00081    return STASIS_MESSAGE_TYPE_SUCCESS;
00082 }

int stasis_message_type_declined ( const char *  name  ) 

Check whether a message type is declined.

Parameters:
name The name of the message type to check
Return values:
zero The message type is not declined
non-zero The message type is declined

Definition at line 1491 of file stasis.c.

References ao2_cleanup, ao2_find, ao2_global_obj_ref, ast_log, globals, LOG_NOTICE, OBJ_SEARCH_KEY, and RAII_VAR.

Referenced by stasis_message_type_create().

01492 {
01493    RAII_VAR(struct stasis_config *, cfg, ao2_global_obj_ref(globals), ao2_cleanup);
01494    char *name_in_declined;
01495    int res;
01496 
01497    if (!cfg || !cfg->declined_message_types) {
01498       return 0;
01499    }
01500 
01501    name_in_declined = ao2_find(cfg->declined_message_types->declined, name, OBJ_SEARCH_KEY);
01502    res = name_in_declined ? 1 : 0;
01503    ao2_cleanup(name_in_declined);
01504    if (res) {
01505       ast_log(LOG_NOTICE, "Declining to allocate Stasis message type '%s' due to configuration\n", name);
01506    }
01507    return res;
01508 }

const char* stasis_message_type_name ( const struct stasis_message_type type  ) 

Gets the name of a given message type.

Parameters:
type The type to get.
Returns:
Name of the type.

NULL if type is NULL.

Since:
12

Definition at line 84 of file stasis_message.c.

References stasis_message_type::name.

Referenced by AST_TEST_DEFINE(), cache_entry_compute_hash(), cache_find(), cache_simple(), cache_test_data_id(), caching_topic_exec(), and statsmaker().

00085 {
00086    return type->name;
00087 }

void stasis_publish ( struct stasis_topic topic,
struct stasis_message message 
)

Publish a message to a topic's subscribers.

Parameters:
topic Topic.
message Message to publish.
This call is asynchronous and will return immediately upon queueing the message for delivery to the topic's subscribers.

Since:
12
Examples:
/tmp/asterisk-trunk/trunk/main/app.c.

Definition at line 817 of file stasis.c.

References NULL, and publish_msg().

Referenced by aoc_publish_blob(), app_send_end_msg(), ast_bridge_publish_attended_transfer(), ast_bridge_publish_blind_transfer(), ast_bridge_publish_enter(), ast_bridge_publish_leave(), ast_bridge_publish_merge(), ast_bridge_publish_state(), ast_cel_publish_event(), ast_channel_publish_blob(), ast_channel_publish_cached_blob(), ast_channel_publish_snapshot(), ast_delete_mwi_state_full(), ast_device_state_clear_cache(), ast_endpoint_blob_publish(), ast_endpoint_shutdown(), ast_manager_publish_event(), ast_monitor_start(), ast_monitor_stop(), ast_multi_object_blob_single_channel_publish(), ast_publish_channel_state(), ast_publish_device_state_full(), ast_publish_mwi_state_full(), ast_rtp_publish_rtcp_message(), ast_system_publish_registry(), AST_TEST_DEFINE(), bridge_attended_transfer_handler(), bridge_blind_transfer_handler(), bridge_merge_handler(), bridge_publish_state_from_blob(), cache_test_aggregate_publish_fn(), caching_topic_exec(), cc_publish(), destroy_bridge(), device_state_aggregate_publish(), endpoint_publish_snapshot(), endpoint_state_cb(), handle_security_event(), local_optimization_finished_cb(), local_optimization_started_cb(), meetme_stasis_generate_msg(), moh_post_start(), moh_post_stop(), notify_new_message(), phase_e_handler(), presence_state_event(), publish_acl_change(), publish_cache_clear(), publish_chanspy_message(), publish_corosync_ping_to_stasis(), publish_format_update(), publish_local_bridge_message(), publish_message_for_channel_topics(), publish_parked_call(), publish_parked_call_failure(), publish_reload_message(), queue_publish_member_blob(), queue_publish_multi_channel_snapshot_blob(), realtime_exec(), remove_device_states_cb(), report_fax_status(), report_receive_fax_status(), report_send_fax_status(), send_call_pickup_stasis_message(), send_conf_stasis(), send_start_msg_snapshots(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_app_control_publish(), stasis_app_user_event(), stun_monitor_request(), and talk_detect_audiohook_cb().

00818 {
00819    publish_msg(topic, message, NULL);
00820 }

void stasis_publish_sync ( struct stasis_subscription sub,
struct stasis_message message 
)

Publish a message to a topic's subscribers, synchronizing on the specified subscriber.

Parameters:
sub Subscription to synchronize on.
message Message to publish.
The caller of stasis_publish_sync will block until the specified subscriber completes handling of the message.

All other subscribers to the topic the stasis_subpscription is subscribed to are also delivered the message; this delivery however happens asynchronously.

Since:
12.1.0

Definition at line 822 of file stasis.c.

References ast_assert, NULL, publish_msg(), and stasis_subscription::topic.

Referenced by AST_TEST_DEFINE(), and stasis_message_router_publish_sync().

00823 {
00824    ast_assert(sub != NULL);
00825 
00826    publish_msg(sub->topic, message, sub);
00827 }

struct stasis_subscription* stasis_subscribe ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data 
) [read]

Create a subscription.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but may not always occur on the same thread. The invocation order of different subscriptions is unspecified.

Parameters:
topic Topic to subscribe to.
callback Callback function for subscription messages.
data Data to be passed to the callback, in addition to the message.
Returns:
New stasis_subscription object.

NULL on error.

Since:
12

Definition at line 501 of file stasis.c.

References internal_stasis_subscribe().

Referenced by acl_change_stasis_subscribe(), AST_TEST_DEFINE(), asterisk_start_devicestate_publishing(), asterisk_start_mwi_publishing(), cc_generic_agent_start_monitoring(), create_new_generic_list(), devstate_init(), load_module(), load_pbx(), network_change_stasis_subscribe(), parking_manager_enable_stasis(), start_poll_thread(), stasis_message_router_create_internal(), and xmpp_init_event_distribution().

00505 {
00506    return internal_stasis_subscribe(topic, callback, data, 1, 0);
00507 }

struct stasis_subscription* stasis_subscribe_pool ( struct stasis_topic topic,
stasis_subscription_cb  callback,
void *  data 
) [read]

Create a subscription whose callbacks occur on a thread pool.

In addition to being AO2 managed memory (requiring an ao2_cleanup() to free up this reference), the subscription must be explicitly unsubscribed from its topic using stasis_unsubscribe().

The invocations of the callback are serialized, but will almost certainly not always happen on the same thread. The invocation order of different subscriptions is unspecified.

Unlike stasis_subscribe, this function will explicitly use a threadpool to dispatch items to its callback. This form of subscription should be used when many subscriptions may be made to the specified topic.

Parameters:
topic Topic to subscribe to.
callback Callback function for subscription messages.
data Data to be passed to the callback, in addition to the message.
Returns:
New stasis_subscription object.

NULL on error.

Since:
12.8.0

Definition at line 509 of file stasis.c.

References internal_stasis_subscribe().

Referenced by add_peer_mwi_subs(), AST_TEST_DEFINE(), build_gateway(), build_peer(), config_line(), create_parked_subscription_full(), load_module(), mkintf(), mwi_stasis_subscription_alloc(), park_and_announce_app_exec(), refer_blind_callback(), stasis_message_router_create_internal(), subscribe_device_state(), and xmpp_init_event_distribution().

00513 {
00514    return internal_stasis_subscribe(topic, callback, data, 1, 1);
00515 }

int stasis_subscription_final_message ( struct stasis_subscription sub,
struct stasis_message msg 
)

Determine whether a message is the final message to be received on a subscription.

Parameters:
sub Subscription on which the message was received.
msg Message to check.
Returns:
zero if the provided message is not the final message.

non-zero if the provided message is the final message.

Since:
12

Definition at line 619 of file stasis.c.

References stasis_subscription_change::description, stasis_message_data(), stasis_message_type(), stasis_subscription_change_type(), stasis_subscription_uniqueid(), and stasis_subscription_change::uniqueid.

Referenced by bridge_default_handler(), caching_topic_exec(), consumer_exec(), consumer_exec_sync(), consumer_finalize(), default_route(), endpoint_default(), generic_agent_devstate_cb(), message_sink_cb(), mwi_event_cb(), mwi_stasis_cb(), park_announce_update_cb(), parker_update_cb(), queue_bridge_cb(), queue_channel_cb(), refer_progress_bridge(), router_dispatch(), statsmaker(), sub_default_handler(), and subscription_invoke().

00620 {
00621    struct stasis_subscription_change *change;
00622 
00623    if (stasis_message_type(msg) != stasis_subscription_change_type()) {
00624       return 0;
00625    }
00626 
00627    change = stasis_message_data(msg);
00628    if (strcmp("Unsubscribe", change->description)) {
00629       return 0;
00630    }
00631 
00632    if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
00633       return 0;
00634    }
00635 
00636    return 1;
00637 }

int stasis_subscription_is_done ( struct stasis_subscription subscription  ) 

Returns whether subscription has received its final message.

Note that a subscription is considered done even while the stasis_subscription_final_message() is being processed. This allows cleanup routines to check the status of the subscription.

Parameters:
subscription Subscription.
Returns:
True (non-zero) if stasis_subscription_final_message() has been received.

False (zero) if waiting for the end.

Definition at line 569 of file stasis.c.

References stasis_subscription::final_message_rxed, lock, and SCOPED_AO2LOCK.

Referenced by router_dtor(), stasis_caching_topic_dtor(), stasis_message_router_is_done(), and subscription_dtor().

00570 {
00571    if (subscription) {
00572       SCOPED_AO2LOCK(lock, subscription);
00573 
00574       return subscription->final_message_rxed;
00575    }
00576 
00577    /* Null subscription is about as done as you can get */
00578    return 1;
00579 }

int stasis_subscription_is_subscribed ( const struct stasis_subscription sub  ) 

Returns whether a subscription is currently subscribed.

Note that there may still be messages queued up to be dispatched to this subscription, but the stasis_subscription_final_message() has been enqueued.

Parameters:
sub Subscription to check
Returns:
False (zero) if subscription is not subscribed.

True (non-zero) if still subscribed.

Definition at line 597 of file stasis.c.

References AST_VECTOR_GET, AST_VECTOR_SIZE, SCOPED_AO2LOCK, and stasis_subscription::topic.

Referenced by asterisk_publisher_devstate_cb(), asterisk_publisher_mwistate_cb(), router_dtor(), send_subscription_subscribe(), send_subscription_unsubscribe(), stasis_caching_topic_dtor(), stasis_caching_unsubscribe(), subscription_dtor(), xmpp_pubsub_devstate_cb(), and xmpp_pubsub_mwi_cb().

00598 {
00599    if (sub) {
00600       size_t i;
00601       struct stasis_topic *topic = sub->topic;
00602       SCOPED_AO2LOCK(lock_topic, topic);
00603 
00604       for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) {
00605          if (AST_VECTOR_GET(&topic->subscribers, i) == sub) {
00606             return 1;
00607          }
00608       }
00609    }
00610 
00611    return 0;
00612 }

void stasis_subscription_join ( struct stasis_subscription subscription  ) 

Block until the last message is processed on a subscription.

This function will not return until the subscription's callback for the stasis_subscription_final_message() completes. This allows cleanup routines to run before unblocking the joining thread.

Parameters:
subscription Subscription to block on.
Since:
12

Definition at line 556 of file stasis.c.

References ao2_object_get_lockaddr(), ast_cond_wait, stasis_subscription::final_message_processed, stasis_subscription::join_cond, lock, and SCOPED_AO2LOCK.

Referenced by stasis_caching_unsubscribe_and_join(), and stasis_unsubscribe_and_join().

00557 {
00558    if (subscription) {
00559       SCOPED_AO2LOCK(lock, subscription);
00560 
00561       /* Wait until the processed flag has been set */
00562       while (!subscription->final_message_processed) {
00563          ast_cond_wait(&subscription->join_cond,
00564             ao2_object_get_lockaddr(subscription));
00565       }
00566    }
00567 }

const char* stasis_subscription_uniqueid ( const struct stasis_subscription sub  ) 

Get the unique ID for the subscription.

Parameters:
sub Subscription for which to get the unique ID.
Returns:
Unique ID for the subscription.
Since:
12

Definition at line 614 of file stasis.c.

References stasis_subscription::uniqueid.

Referenced by AST_TEST_DEFINE(), and stasis_subscription_final_message().

00615 {
00616    return sub->uniqueid;
00617 }

struct stasis_topic* stasis_topic_create ( const char *  name  )  [read]

Create a new topic.

Parameters:
name Name of the new topic.
Returns:
New topic instance.

NULL on error.

Since:
12
Examples:
/tmp/asterisk-trunk/trunk/main/app.c.

Definition at line 348 of file stasis.c.

References ao2_cleanup, ao2_t_alloc, ast_strdup, AST_VECTOR_INIT, INITIAL_SUBSCRIBERS_MAX, stasis_topic::name, NULL, and topic_dtor().

Referenced by __init_manager(), app_create(), app_init(), ast_cdr_engine_init(), ast_parking_stasis_init(), ast_presence_state_engine_init(), ast_rtp_engine_init(), ast_security_stasis_init(), ast_stasis_system_init(), AST_TEST_DEFINE(), ast_test_init(), create_subscriptions(), devstate_init(), load_module(), stasis_caching_topic_create(), stasis_cp_all_create(), stasis_cp_single_create(), and stasis_topic_pool_get_topic().

00349 {
00350    struct stasis_topic *topic;
00351    int res = 0;
00352 
00353    topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
00354    if (!topic) {
00355       return NULL;
00356    }
00357 
00358    topic->name = ast_strdup(name);
00359    res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
00360    res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
00361    if (!topic->name || res) {
00362       ao2_cleanup(topic);
00363       return NULL;
00364    }
00365 
00366    return topic;
00367 }

const char* stasis_topic_name ( const struct stasis_topic topic  ) 

Return the name of a topic.

Parameters:
topic Topic.
Returns:
Name of the topic.

NULL if topic is NULL.

Since:
12

Definition at line 369 of file stasis.c.

References stasis_topic::name.

Referenced by caching_topic_exec(), mwi_sub_event_cb(), stasis_caching_topic_create(), stasis_message_router_create_internal(), topic_pool_entry_cmp(), and topic_pool_entry_hash().

00370 {
00371    return topic->name;
00372 }

struct stasis_subscription* stasis_unsubscribe ( struct stasis_subscription subscription  )  [read]

Cancel a subscription.

Note that in an asynchronous system, there may still be messages queued or in transit to the subscription's callback. These will still be delivered. There will be a final 'SubscriptionCancelled' message, indicating the delivery of the final message.

Parameters:
subscription Subscription to cancel.
Returns:
NULL for convenience
Since:
12

Definition at line 524 of file stasis.c.

References ao2_bump, ao2_cleanup, ast_log, ast_taskprocessor_push(), LOG_ERROR, stasis_subscription::mailbox, NULL, RAII_VAR, send_subscription_unsubscribe(), sub_cleanup(), stasis_subscription::topic, and topic_remove_subscription().

Referenced by add_peer_mwi_subs(), AST_TEST_DEFINE(), ast_xmpp_client_disconnect(), cc_generic_agent_destructor(), destroy_dahdi_pvt(), destroy_endpoint(), destroy_mailbox(), device_state_subscription_destroy(), generic_agent_devstate_cb(), generic_monitor_instance_list_destructor(), park_and_announce_app_exec(), parked_subscription_datastore_destroy(), peer_destructor(), refer_progress_bridge(), refer_progress_destroy(), refer_progress_framehook_destroy(), skinny_reload(), stasis_caching_unsubscribe(), stasis_message_router_unsubscribe(), stasis_unsubscribe_and_join(), subscription_persistence_event_cb(), unload_module(), unsubscribe_stasis(), and xmpp_init_event_distribution().

00525 {
00526    /* The subscription may be the last ref to this topic. Hold
00527     * the topic ref open until after the unlock. */
00528    RAII_VAR(struct stasis_topic *, topic,
00529       ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
00530 
00531    if (!sub) {
00532       return NULL;
00533    }
00534 
00535    /* We have to remove the subscription first, to ensure the unsubscribe
00536     * is the final message */
00537    if (topic_remove_subscription(sub->topic, sub) != 0) {
00538       ast_log(LOG_ERROR,
00539          "Internal error: subscription has invalid topic\n");
00540       return NULL;
00541    }
00542 
00543    /* Now let everyone know about the unsubscribe */
00544    send_subscription_unsubscribe(topic, sub);
00545 
00546    /* When all that's done, remove the ref the mailbox has on the sub */
00547    if (sub->mailbox) {
00548       ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
00549    }
00550 
00551    /* Unsubscribing unrefs the subscription */
00552    ao2_cleanup(sub);
00553    return NULL;
00554 }

struct stasis_subscription* stasis_unsubscribe_and_join ( struct stasis_subscription subscription  )  [read]

Cancel a subscription, blocking until the last message is processed.

While normally it's recommended to stasis_unsubscribe() and wait for stasis_subscription_final_message(), there are times (like during a module unload) where you have to wait for the final message (otherwise you'll call a function in a shared module that no longer exists).

Parameters:
subscription Subscription to cancel.
Returns:
NULL for convenience
Since:
12

Definition at line 581 of file stasis.c.

References ao2_cleanup, ao2_ref, NULL, stasis_subscription_join(), and stasis_unsubscribe().

Referenced by acl_change_event_stasis_unsubscribe(), acl_change_stasis_unsubscribe(), asterisk_stop_devicestate_publishing(), asterisk_stop_mwi_publishing(), devstate_cleanup(), network_change_stasis_unsubscribe(), parking_manager_disable_stasis(), stasis_message_router_unsubscribe_and_join(), stop_poll_thread(), unload_module(), and unload_pbx().

00583 {
00584    if (!subscription) {
00585       return NULL;
00586    }
00587 
00588    /* Bump refcount to hold it past the unsubscribe */
00589    ao2_ref(subscription, +1);
00590    stasis_unsubscribe(subscription);
00591    stasis_subscription_join(subscription);
00592    /* Now decrement the refcount back */
00593    ao2_cleanup(subscription);
00594    return NULL;
00595 }


Generated on Thu Apr 16 06:33:55 2015 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6