Wed Oct 28 11:51:08 2009

Asterisk developer's documentation


res_timing_pthread.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 2008, Digium, Inc.
00005  *
00006  * Russell Bryant <russell@digium.com>
00007  *
00008  * See http://www.asterisk.org for more information about
00009  * the Asterisk project. Please do not directly contact
00010  * any of the maintainers of this project for assistance;
00011  * the project provides a web site, mailing lists and IRC
00012  * channels for your use.
00013  *
00014  * This program is free software, distributed under the terms of
00015  * the GNU General Public License Version 2. See the LICENSE file
00016  * at the top of the source tree.
00017  */
00018 
00019 /*!
00020  * \file
00021  * \author Russell Bryant <russell@digium.com>
00022  *
00023  * \brief pthread timing interface
00024  */
00025 
00026 #include "asterisk.h"
00027 
00028 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 199745 $");
00029 
00030 #include <math.h>
00031 #include <sys/select.h>
00032 
00033 #include "asterisk/module.h"
00034 #include "asterisk/timing.h"
00035 #include "asterisk/utils.h"
00036 #include "asterisk/astobj2.h"
00037 #include "asterisk/time.h"
00038 #include "asterisk/lock.h"
00039 
00040 static void *timing_funcs_handle;
00041 
00042 static int pthread_timer_open(void);
00043 static void pthread_timer_close(int handle);
00044 static int pthread_timer_set_rate(int handle, unsigned int rate);
00045 static void pthread_timer_ack(int handle, unsigned int quantity);
00046 static int pthread_timer_enable_continuous(int handle);
00047 static int pthread_timer_disable_continuous(int handle);
00048 static enum ast_timer_event pthread_timer_get_event(int handle);
00049 static unsigned int pthread_timer_get_max_rate(int handle);
00050 
00051 static struct ast_timing_interface pthread_timing = {
00052    .name = "pthread",
00053    .priority = 0, /* use this as a last resort */
00054    .timer_open = pthread_timer_open,
00055    .timer_close = pthread_timer_close,
00056    .timer_set_rate = pthread_timer_set_rate,
00057    .timer_ack = pthread_timer_ack,
00058    .timer_enable_continuous = pthread_timer_enable_continuous,
00059    .timer_disable_continuous = pthread_timer_disable_continuous,
00060    .timer_get_event = pthread_timer_get_event,
00061    .timer_get_max_rate = pthread_timer_get_max_rate,
00062 };
00063 
/*! \brief Highest supported tick rate: 1 tick / 10 ms, i.e. 100 ticks per second */
#define MAX_RATE 100

/*! \brief Number of hash buckets requested for the pthread_timers container */
#define PTHREAD_TIMER_BUCKETS 563

/*! \brief All open timers; allocated in load_module(), keyed by the pipe's read fd */
static struct ao2_container *pthread_timers;
00069 
/*! \brief Indexes into pthread_timer.pipe[], in the order pipe() fills them in */
enum {
	PIPE_READ,  /*!< 0: end that timer users read from; also used as the handle */
	PIPE_WRITE  /*!< 1: end that write_byte() writes ticks to */
};
00074 
/*! \brief Whether a timer currently produces ticks */
enum pthread_timer_state {
	/*! No rate configured; check_timer() never reports a tick */
	TIMER_STATE_IDLE = 0,
	/*! A non-zero rate has been set through pthread_timer_set_rate() */
	TIMER_STATE_TICKING = 1,
};
00079 
00080 struct pthread_timer {
00081    int pipe[2];
00082    enum pthread_timer_state state;
00083    unsigned int rate;
00084    /*! Interval in ms for current rate */
00085    unsigned int interval;
00086    unsigned int tick_count;
00087    unsigned int pending_ticks;
00088    struct timeval start;
00089    unsigned int continuous:1;
00090 };
00091 
/* Internal helpers, defined further down in this file. */
static struct pthread_timer *find_timer(int handle, int unlinkobj);
static void pthread_timer_destructor(void *obj);
static void read_pipe(struct pthread_timer *timer, unsigned int quantity);
static void write_byte(struct pthread_timer *timer);
00096 
00097 /*!
00098  * \brief Data for the timing thread
00099  */
00100 static struct {
00101    pthread_t thread;
00102    ast_mutex_t lock;
00103    ast_cond_t cond;
00104    unsigned int stop:1;
00105 } timing_thread;
00106 
00107 static int pthread_timer_open(void)
00108 {
00109    struct pthread_timer *timer;
00110    int fd;
00111 
00112    if (!(timer = ao2_alloc(sizeof(*timer), pthread_timer_destructor))) {
00113       errno = ENOMEM;
00114       return -1;
00115    }
00116 
00117    timer->pipe[PIPE_READ] = timer->pipe[PIPE_WRITE] = -1;
00118    timer->state = TIMER_STATE_IDLE;
00119 
00120    if (pipe(timer->pipe)) {
00121       ao2_ref(timer, -1);
00122       return -1;
00123    }
00124 
00125    ao2_lock(pthread_timers);
00126    if (!ao2_container_count(pthread_timers)) {
00127       ast_mutex_lock(&timing_thread.lock);
00128       ast_cond_signal(&timing_thread.cond);
00129       ast_mutex_unlock(&timing_thread.lock);
00130    }
00131    ao2_link(pthread_timers, timer);
00132    ao2_unlock(pthread_timers);
00133 
00134    fd = timer->pipe[PIPE_READ];
00135 
00136    ao2_ref(timer, -1);
00137 
00138    return fd;
00139 }
00140 
/*!
 * \brief Close a timer.
 *
 * Looks the timer up with unlinking requested, so it is removed from
 * pthread_timers, then releases the reference returned by the lookup.
 * An unknown handle is ignored.
 *
 * \param handle value previously returned by pthread_timer_open()
 */
static void pthread_timer_close(int handle)
{
	struct pthread_timer *timer = find_timer(handle, 1);

	if (timer) {
		ao2_ref(timer, -1);
	}
}
00151 
00152 static int pthread_timer_set_rate(int handle, unsigned int rate)
00153 {
00154    struct pthread_timer *timer;
00155 
00156    if (!(timer = find_timer(handle, 0))) {
00157       errno = EINVAL;
00158       return -1;
00159    }
00160 
00161    if (rate > MAX_RATE) {
00162       ast_log(LOG_ERROR, "res_timing_pthread only supports timers at a "
00163             "max rate of %d / sec\n", MAX_RATE);
00164       errno = EINVAL;
00165       return -1;
00166    }
00167 
00168    ao2_lock(timer);
00169 
00170    if ((timer->rate = rate)) {
00171       timer->interval = roundf(1000.0 / ((float) rate));
00172       timer->start = ast_tvnow();
00173       timer->state = TIMER_STATE_TICKING;
00174    } else {
00175       timer->interval = 0;
00176       timer->start = ast_tv(0, 0);
00177       timer->state = TIMER_STATE_IDLE;
00178    }
00179    timer->tick_count = 0;
00180 
00181    ao2_unlock(timer);
00182 
00183    ao2_ref(timer, -1);
00184 
00185    return 0;
00186 }
00187 
/*!
 * \brief Acknowledge ticks on a timer.
 *
 * Consumes up to \a quantity pending ticks from the timer's pipe by calling
 * read_pipe() with the timer locked.  An unknown handle is ignored.
 *
 * \param handle   value previously returned by pthread_timer_open()
 * \param quantity number of ticks to acknowledge; must be greater than zero
 */
static void pthread_timer_ack(int handle, unsigned int quantity)
{
	struct pthread_timer *timer;

	ast_assert(quantity > 0);

	timer = find_timer(handle, 0);
	if (timer) {
		ao2_lock(timer);
		read_pipe(timer, quantity);
		ao2_unlock(timer);

		/* Release the reference returned by find_timer(). */
		ao2_ref(timer, -1);
	}
}
00204 
00205 static int pthread_timer_enable_continuous(int handle)
00206 {
00207    struct pthread_timer *timer;
00208 
00209    if (!(timer = find_timer(handle, 0))) {
00210       errno = EINVAL;
00211       return -1;
00212    }
00213 
00214    ao2_lock(timer);
00215    if (!timer->continuous) {
00216       timer->continuous = 1;
00217       write_byte(timer);
00218    }
00219    ao2_unlock(timer);
00220 
00221    ao2_ref(timer, -1);
00222 
00223    return 0;
00224 }
00225 
00226 static int pthread_timer_disable_continuous(int handle)
00227 {
00228    struct pthread_timer *timer;
00229 
00230    if (!(timer = find_timer(handle, 0))) {
00231       errno = EINVAL;
00232       return -1;
00233    }
00234 
00235    ao2_lock(timer);
00236    if (timer->continuous) {
00237       timer->continuous = 0;
00238       read_pipe(timer, 1);
00239    }
00240    ao2_unlock(timer);
00241 
00242    ao2_ref(timer, -1);
00243 
00244    return 0;
00245 }
00246 
00247 static enum ast_timer_event pthread_timer_get_event(int handle)
00248 {
00249    struct pthread_timer *timer;
00250    enum ast_timer_event res = AST_TIMING_EVENT_EXPIRED;
00251 
00252    if (!(timer = find_timer(handle, 0))) {
00253       return res;
00254    }
00255 
00256    ao2_lock(timer);
00257    if (timer->continuous && timer->pending_ticks == 1) {
00258       res = AST_TIMING_EVENT_CONTINUOUS;
00259    }
00260    ao2_unlock(timer);
00261 
00262    ao2_ref(timer, -1);
00263 
00264    return res;
00265 }
00266 
00267 static unsigned int pthread_timer_get_max_rate(int handle)
00268 {
00269    return MAX_RATE;
00270 }
00271 
00272 static struct pthread_timer *find_timer(int handle, int unlinkobj)
00273 {
00274    struct pthread_timer *timer;
00275    struct pthread_timer tmp_timer;
00276    int flags = OBJ_POINTER;
00277 
00278    tmp_timer.pipe[PIPE_READ] = handle;
00279 
00280    if (unlinkobj) {
00281       flags |= OBJ_UNLINK;
00282    }
00283 
00284    if (!(timer = ao2_find(pthread_timers, &tmp_timer, flags))) {
00285       ast_assert(timer != NULL);
00286       return NULL;
00287    }
00288 
00289    return timer;
00290 }
00291 
00292 static void pthread_timer_destructor(void *obj)
00293 {
00294    struct pthread_timer *timer = obj;
00295 
00296    if (timer->pipe[PIPE_READ] > -1) {
00297       close(timer->pipe[PIPE_READ]);
00298       timer->pipe[PIPE_READ] = -1;
00299    }
00300 
00301    if (timer->pipe[PIPE_WRITE] > -1) {
00302       close(timer->pipe[PIPE_WRITE]);
00303       timer->pipe[PIPE_WRITE] = -1;
00304    }
00305 }
00306 
00307 /*!
00308  * \note only PIPE_READ is guaranteed valid
00309  */
00310 static int pthread_timer_hash(const void *obj, const int flags)
00311 {
00312    const struct pthread_timer *timer = obj;
00313 
00314    return timer->pipe[PIPE_READ];
00315 }
00316 
00317 /*!
00318  * \note only PIPE_READ is guaranteed valid
00319  */
00320 static int pthread_timer_cmp(void *obj, void *arg, int flags)
00321 {
00322    struct pthread_timer *timer1 = obj, *timer2 = arg;
00323 
00324    return (timer1->pipe[PIPE_READ] == timer2->pipe[PIPE_READ]) ? CMP_MATCH | CMP_STOP : 0;
00325 }
00326 
00327 /*!
00328  * \retval 0 no timer tick needed
00329  * \retval non-zero write to the timing pipe needed
00330  */
00331 static int check_timer(struct pthread_timer *timer)
00332 {
00333    struct timeval now;
00334 
00335    if (timer->state == TIMER_STATE_IDLE) {
00336       return 0;
00337    }
00338 
00339    now = ast_tvnow();
00340 
00341    if (timer->tick_count < (ast_tvdiff_ms(now, timer->start) / timer->interval)) {
00342       timer->tick_count++;
00343       if (!timer->tick_count) {
00344          /* Handle overflow. */
00345          timer->start = now;
00346       }
00347       return 1;
00348    }
00349 
00350    return 0;
00351 }
00352 
00353 /*!
00354  * \internal
00355  * \pre timer is locked
00356  */
00357 static void read_pipe(struct pthread_timer *timer, unsigned int quantity)
00358 {
00359    int rd_fd = timer->pipe[PIPE_READ];
00360    int pending_ticks = timer->pending_ticks;
00361 
00362    ast_assert(quantity);
00363 
00364    if (timer->continuous && pending_ticks) {
00365       pending_ticks--;
00366    }
00367 
00368    if (quantity > pending_ticks) {
00369       quantity = pending_ticks;
00370    }
00371 
00372    if (!quantity) {
00373       return;
00374    }
00375 
00376    do {
00377       unsigned char buf[1024];
00378       ssize_t res;
00379       fd_set rfds;
00380       struct timeval timeout = {
00381          .tv_sec = 0,
00382       };
00383 
00384       /* Make sure there is data to read */
00385       FD_ZERO(&rfds);
00386       FD_SET(rd_fd, &rfds);
00387 
00388       if (select(rd_fd + 1, &rfds, NULL, NULL, &timeout) != 1) {
00389          ast_debug(1, "Reading not available on timing pipe, "
00390                "quantity: %u\n", quantity);
00391          break;
00392       }
00393 
00394       res = read(rd_fd, buf,
00395          (quantity < sizeof(buf)) ? quantity : sizeof(buf));
00396 
00397       if (res == -1) {
00398          if (errno == EAGAIN) {
00399             continue;
00400          }
00401          ast_log(LOG_ERROR, "read failed on timing pipe: %s\n",
00402                strerror(errno));
00403          break;
00404       }
00405 
00406       quantity -= res;
00407       timer->pending_ticks -= res;
00408    } while (quantity);
00409 }
00410 
00411 /*!
00412  * \internal
00413  * \pre timer is locked
00414  */
00415 static void write_byte(struct pthread_timer *timer)
00416 {
00417    ssize_t res;
00418    unsigned char x = 42;
00419 
00420    do {
00421       res = write(timer->pipe[PIPE_WRITE], &x, 1);
00422    } while (res == -1 && errno == EAGAIN);
00423 
00424    if (res == -1) {
00425       ast_log(LOG_ERROR, "Error writing to timing pipe: %s\n",
00426             strerror(errno));
00427    } else {
00428       timer->pending_ticks++;
00429    }
00430 }
00431 
00432 static int run_timer(void *obj, void *arg, int flags)
00433 {
00434    struct pthread_timer *timer = obj;
00435 
00436    if (timer->state == TIMER_STATE_IDLE) {
00437       return 0;
00438    }
00439 
00440    ao2_lock(timer);
00441    if (check_timer(timer)) {
00442       write_byte(timer);
00443    }
00444    ao2_unlock(timer);
00445 
00446    return 0;
00447 }
00448 
00449 static void *do_timing(void *arg)
00450 {
00451    struct timeval next_wakeup = ast_tvnow();
00452 
00453    while (!timing_thread.stop) {
00454       struct timespec ts = { 0, };
00455 
00456       ao2_callback(pthread_timers, OBJ_NODATA, run_timer, NULL);
00457 
00458       next_wakeup = ast_tvadd(next_wakeup, ast_tv(0, 5000));
00459 
00460       ts.tv_sec = next_wakeup.tv_sec;
00461       ts.tv_nsec = next_wakeup.tv_usec * 1000;
00462 
00463       ast_mutex_lock(&timing_thread.lock);
00464       if (!timing_thread.stop) {
00465          if (ao2_container_count(pthread_timers)) {
00466             ast_cond_timedwait(&timing_thread.cond, &timing_thread.lock, &ts);
00467          } else {
00468             ast_cond_wait(&timing_thread.cond, &timing_thread.lock);
00469          }
00470       }
00471       ast_mutex_unlock(&timing_thread.lock);
00472    }
00473 
00474    return NULL;
00475 }
00476 
00477 static int init_timing_thread(void)
00478 {
00479    ast_mutex_init(&timing_thread.lock);
00480    ast_cond_init(&timing_thread.cond, NULL);
00481 
00482    if (ast_pthread_create_background(&timing_thread.thread, NULL, do_timing, NULL)) {
00483       ast_log(LOG_ERROR, "Unable to start timing thread.\n");
00484       return -1;
00485    }
00486 
00487    return 0;
00488 }
00489 
00490 static int load_module(void)
00491 {
00492    if (!(pthread_timers = ao2_container_alloc(PTHREAD_TIMER_BUCKETS,
00493       pthread_timer_hash, pthread_timer_cmp))) {
00494       return AST_MODULE_LOAD_DECLINE;
00495    }
00496 
00497    if (init_timing_thread()) {
00498       ao2_ref(pthread_timers, -1);
00499       pthread_timers = NULL;
00500       return AST_MODULE_LOAD_DECLINE;
00501    }
00502 
00503    return (timing_funcs_handle = ast_register_timing_interface(&pthread_timing)) ?
00504       AST_MODULE_LOAD_SUCCESS : AST_MODULE_LOAD_DECLINE;
00505 }
00506 
00507 static int unload_module(void)
00508 {
00509    int res;
00510 
00511    ast_mutex_lock(&timing_thread.lock);
00512    timing_thread.stop = 1;
00513    ast_cond_signal(&timing_thread.cond);
00514    ast_mutex_unlock(&timing_thread.lock);
00515    pthread_join(timing_thread.thread, NULL);
00516 
00517    if (!(res = ast_unregister_timing_interface(timing_funcs_handle))) {
00518       ao2_ref(pthread_timers, -1);
00519       pthread_timers = NULL;
00520    }
00521 
00522    return res;
00523 }
00524 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "pthread Timing Interface",
00525       .load = load_module,
00526       .unload = unload_module,
00527       .load_pri = 10,
00528       );

Generated on Wed Oct 28 11:51:08 2009 for Asterisk - the Open Source PBX by  doxygen 1.5.6