sched.c

Go to the documentation of this file.
00001 /*
00002  * Asterisk -- An open source telephony toolkit.
00003  *
00004  * Copyright (C) 1999 - 2010, Digium, Inc.
00005  *
00006  * Mark Spencer <markster@digium.com>
00007  * Russell Bryant <russell@digium.com>
00008  *
00009  * See http://www.asterisk.org for more information about
00010  * the Asterisk project. Please do not directly contact
00011  * any of the maintainers of this project for assistance;
00012  * the project provides a web site, mailing lists and IRC
00013  * channels for your use.
00014  *
00015  * This program is free software, distributed under the terms of
00016  * the GNU General Public License Version 2. See the LICENSE file
00017  * at the top of the source tree.
00018  */
00019 
00020 /*! \file
00021  *
00022  * \brief Scheduler Routines (from cheops-NG)
00023  *
00024  * \author Mark Spencer <markster@digium.com>
00025  */
00026 
00027 /*** MODULEINFO
00028    <support_level>core</support_level>
00029  ***/
00030 
00031 #include "asterisk.h"
00032 
00033 ASTERISK_FILE_VERSION(__FILE__, "$Revision: 431918 $")
00034 
/*
 * Scheduler trace helper.
 *
 * When DEBUG_SCHEDULER is defined, DEBUG(a) passes its argument to
 * DEBUG_M() only while option_debug is non-zero.  Otherwise DEBUG(a)
 * expands to nothing, so the wrapped statement is compiled out.
 */
#ifdef DEBUG_SCHEDULER
#define DEBUG(a) do { \
   if (option_debug) \
      DEBUG_M(a) \
   } while (0)
#else
#define DEBUG(a)
#endif
00043 
00044 #include <sys/time.h>
00045 
00046 #include "asterisk/sched.h"
00047 #include "asterisk/channel.h"
00048 #include "asterisk/lock.h"
00049 #include "asterisk/utils.h"
00050 #include "asterisk/heap.h"
00051 #include "asterisk/threadstorage.h"
00052 
/*!
 * \brief Maximum number of unused schedule structs kept in the cache
 *
 * \note Released schedule structs are kept on a per-context list for
 * reuse until this many are cached; beyond that they are freed.
 * Undefine to disable schedule structure caching.  (Only disable
 * this on very low memory machines)
 */
#define SCHED_MAX_CACHE 128
00062 
/*!
 * Thread-storage slot holding one int: the ID most recently reported as
 * nonexistent by the delete routine, so that a repeated failed delete of
 * the same ID is not logged again.
 */
AST_THREADSTORAGE(last_del_id);
00064 
/*!
 * \brief A single scheduled task.
 *
 * An entry lives either in a context's heap (while pending), is referenced
 * by the context's currently_executing pointer (while its callback runs),
 * or sits on the context's cache list of unused entries.
 */
struct sched {
   AST_LIST_ENTRY(sched) list;   /*!< Linkage for the context's cache of unused entries */
   int id;                       /*!< ID number of event */
   struct timeval when;          /*!< Absolute time event should take place */
   int resched;                  /*!< Interval in ms given when the task was added; reused on reschedule */
   int variable;                 /*!< Non-zero: use return value from callback as the next interval */
   const void *data;             /*!< Opaque argument handed to the callback */
   ast_sched_cb callback;        /*!< Function invoked when the event is due */
   ssize_t __heap_index;         /*!< Field whose offset is passed to ast_heap_create() */
   /*!
    * Used to synchronize between thread running a task and thread
    * attempting to delete a task
    */
   ast_cond_t cond;
   /*! Indication that a running task was deleted. */
   unsigned int deleted:1;
};
00082 
/*! \brief State of the optional dedicated thread that services a context. */
struct sched_thread {
   pthread_t thread;       /*!< Thread handle, or AST_PTHREADT_NULL when no thread is running */
   ast_cond_t cond;        /*!< The thread sleeps on this; signalled when tasks are added/deleted or on stop */
   unsigned int stop:1;    /*!< Set (under the context lock) to ask the thread to exit */
};
00088 
/*! \brief A scheduler instance: a heap of pending tasks plus bookkeeping. */
struct ast_sched_context {
   ast_mutex_t lock;                       /*!< Protects the members below */
   unsigned int eventcnt;                  /*!< Next event ID to assign; starts at 1, so eventcnt - 1 events have been added */
   unsigned int highwater;             /*!< Largest heap size observed so far */
   struct ast_heap *sched_heap;            /*!< Pending tasks, ordered by sched_time_cmp() */
   struct sched_thread *sched_thread;      /*!< Dedicated thread state, or NULL if none was started */
   /*! The scheduled task that is currently executing */
   struct sched *currently_executing;

#ifdef SCHED_MAX_CACHE
   AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
   unsigned int schedccnt;
#endif
};
00103 
00104 static void *sched_run(void *data)
00105 {
00106    struct ast_sched_context *con = data;
00107 
00108    while (!con->sched_thread->stop) {
00109       int ms;
00110       struct timespec ts = {
00111          .tv_sec = 0,
00112       };
00113 
00114       ast_mutex_lock(&con->lock);
00115 
00116       if (con->sched_thread->stop) {
00117          ast_mutex_unlock(&con->lock);
00118          return NULL;
00119       }
00120 
00121       ms = ast_sched_wait(con);
00122 
00123       if (ms == -1) {
00124          ast_cond_wait(&con->sched_thread->cond, &con->lock);
00125       } else {
00126          struct timeval tv;
00127          tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
00128          ts.tv_sec = tv.tv_sec;
00129          ts.tv_nsec = tv.tv_usec * 1000;
00130          ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
00131       }
00132 
00133       ast_mutex_unlock(&con->lock);
00134 
00135       if (con->sched_thread->stop) {
00136          return NULL;
00137       }
00138 
00139       ast_sched_runq(con);
00140    }
00141 
00142    return NULL;
00143 }
00144 
00145 static void sched_thread_destroy(struct ast_sched_context *con)
00146 {
00147    if (!con->sched_thread) {
00148       return;
00149    }
00150 
00151    if (con->sched_thread->thread != AST_PTHREADT_NULL) {
00152       ast_mutex_lock(&con->lock);
00153       con->sched_thread->stop = 1;
00154       ast_cond_signal(&con->sched_thread->cond);
00155       ast_mutex_unlock(&con->lock);
00156       pthread_join(con->sched_thread->thread, NULL);
00157       con->sched_thread->thread = AST_PTHREADT_NULL;
00158    }
00159 
00160    ast_cond_destroy(&con->sched_thread->cond);
00161 
00162    ast_free(con->sched_thread);
00163 
00164    con->sched_thread = NULL;
00165 }
00166 
00167 int ast_sched_start_thread(struct ast_sched_context *con)
00168 {
00169    struct sched_thread *st;
00170 
00171    if (con->sched_thread) {
00172       ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
00173       return -1;
00174    }
00175 
00176    if (!(st = ast_calloc(1, sizeof(*st)))) {
00177       return -1;
00178    }
00179 
00180    ast_cond_init(&st->cond, NULL);
00181 
00182    st->thread = AST_PTHREADT_NULL;
00183 
00184    con->sched_thread = st;
00185 
00186    if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
00187       ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
00188       sched_thread_destroy(con);
00189       return -1;
00190    }
00191 
00192    return 0;
00193 }
00194 
00195 static int sched_time_cmp(void *a, void *b)
00196 {
00197    return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
00198 }
00199 
00200 struct ast_sched_context *ast_sched_context_create(void)
00201 {
00202    struct ast_sched_context *tmp;
00203 
00204    if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
00205       return NULL;
00206    }
00207 
00208    ast_mutex_init(&tmp->lock);
00209    tmp->eventcnt = 1;
00210 
00211    if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
00212          offsetof(struct sched, __heap_index)))) {
00213       ast_sched_context_destroy(tmp);
00214       return NULL;
00215    }
00216 
00217    return tmp;
00218 }
00219 
00220 static void sched_free(struct sched *task)
00221 {
00222    ast_cond_destroy(&task->cond);
00223    ast_free(task);
00224 }
00225 
00226 void ast_sched_context_destroy(struct ast_sched_context *con)
00227 {
00228    struct sched *s;
00229 
00230    sched_thread_destroy(con);
00231    con->sched_thread = NULL;
00232 
00233    ast_mutex_lock(&con->lock);
00234 
00235 #ifdef SCHED_MAX_CACHE
00236    while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
00237       sched_free(s);
00238    }
00239 #endif
00240 
00241    if (con->sched_heap) {
00242       while ((s = ast_heap_pop(con->sched_heap))) {
00243          sched_free(s);
00244       }
00245       ast_heap_destroy(con->sched_heap);
00246       con->sched_heap = NULL;
00247    }
00248 
00249    ast_mutex_unlock(&con->lock);
00250    ast_mutex_destroy(&con->lock);
00251 
00252    ast_free(con);
00253 }
00254 
00255 static struct sched *sched_alloc(struct ast_sched_context *con)
00256 {
00257    struct sched *tmp;
00258 
00259    /*
00260     * We keep a small cache of schedule entries
00261     * to minimize the number of necessary malloc()'s
00262     */
00263 #ifdef SCHED_MAX_CACHE
00264    if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
00265       con->schedccnt--;
00266    } else 
00267 #endif
00268    {
00269       tmp = ast_calloc(1, sizeof(*tmp));
00270       ast_cond_init(&tmp->cond, NULL);
00271    }
00272 
00273    return tmp;
00274 }
00275 
/*!
 * \brief Give back a schedule entry that is no longer needed.
 *
 * The entry is put on the context's cache unless the cache already holds
 * SCHED_MAX_CACHE entries (or caching is compiled out), in which case it
 * is freed.
 */
static void sched_release(struct ast_sched_context *con, struct sched *tmp)
{
#ifdef SCHED_MAX_CACHE
   if (con->schedccnt < SCHED_MAX_CACHE) {
      AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
      con->schedccnt++;
      return;
   }
#endif

   sched_free(tmp);
}
00291 
00292 void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
00293 {
00294    int i = 1;
00295    struct sched *current;
00296 
00297    ast_mutex_lock(&con->lock);
00298    while ((current = ast_heap_peek(con->sched_heap, i))) {
00299       if (current->callback != match) {
00300          i++;
00301          continue;
00302       }
00303 
00304       ast_heap_remove(con->sched_heap, current);
00305 
00306       cleanup_cb(current->data);
00307       sched_release(con, current);
00308    }
00309    ast_mutex_unlock(&con->lock);
00310 }
00311 
00312 /*! \brief
00313  * Return the number of milliseconds
00314  * until the next scheduled event
00315  */
00316 int ast_sched_wait(struct ast_sched_context *con)
00317 {
00318    int ms;
00319    struct sched *s;
00320 
00321    DEBUG(ast_debug(1, "ast_sched_wait()\n"));
00322 
00323    ast_mutex_lock(&con->lock);
00324    if ((s = ast_heap_peek(con->sched_heap, 1))) {
00325       ms = ast_tvdiff_ms(s->when, ast_tvnow());
00326       if (ms < 0) {
00327          ms = 0;
00328       }
00329    } else {
00330       ms = -1;
00331    }
00332    ast_mutex_unlock(&con->lock);
00333 
00334    return ms;
00335 }
00336 
00337 
00338 /*! \brief
00339  * Take a sched structure and put it in the
00340  * queue, such that the soonest event is
00341  * first in the list.
00342  */
00343 static void schedule(struct ast_sched_context *con, struct sched *s)
00344 {
00345    ast_heap_push(con->sched_heap, s);
00346 
00347    if (ast_heap_size(con->sched_heap) > con->highwater) {
00348       con->highwater = ast_heap_size(con->sched_heap);
00349    }
00350 }
00351 
00352 /*! \brief
00353  * given the last event *tv and the offset in milliseconds 'when',
00354  * computes the next value,
00355  */
00356 static int sched_settime(struct timeval *t, int when)
00357 {
00358    struct timeval now = ast_tvnow();
00359 
00360    /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
00361    if (ast_tvzero(*t))  /* not supplied, default to now */
00362       *t = now;
00363    *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
00364    if (ast_tvcmp(*t, now) < 0) {
00365       *t = now;
00366    }
00367    return 0;
00368 }
00369 
00370 int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00371 {
00372    /* 0 means the schedule item is new; do not delete */
00373    if (old_id > 0) {
00374       AST_SCHED_DEL(con, old_id);
00375    }
00376    return ast_sched_add_variable(con, when, callback, data, variable);
00377 }
00378 
00379 /*! \brief
00380  * Schedule callback(data) to happen when ms into the future
00381  */
00382 int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
00383 {
00384    struct sched *tmp;
00385    int res = -1;
00386 
00387    DEBUG(ast_debug(1, "ast_sched_add()\n"));
00388 
00389    ast_mutex_lock(&con->lock);
00390    if ((tmp = sched_alloc(con))) {
00391       tmp->id = con->eventcnt++;
00392       tmp->callback = callback;
00393       tmp->data = data;
00394       tmp->resched = when;
00395       tmp->variable = variable;
00396       tmp->when = ast_tv(0, 0);
00397       tmp->deleted = 0;
00398       if (sched_settime(&tmp->when, when)) {
00399          sched_release(con, tmp);
00400       } else {
00401          schedule(con, tmp);
00402          res = tmp->id;
00403       }
00404    }
00405 #ifdef DUMP_SCHEDULER
00406    /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
00407    if (option_debug)
00408       ast_sched_dump(con);
00409 #endif
00410    if (con->sched_thread) {
00411       ast_cond_signal(&con->sched_thread->cond);
00412    }
00413    ast_mutex_unlock(&con->lock);
00414 
00415    return res;
00416 }
00417 
00418 int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
00419 {
00420    if (old_id > -1) {
00421       AST_SCHED_DEL(con, old_id);
00422    }
00423    return ast_sched_add(con, when, callback, data);
00424 }
00425 
00426 int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
00427 {
00428    return ast_sched_add_variable(con, when, callback, data, 0);
00429 }
00430 
00431 static struct sched *sched_find(struct ast_sched_context *con, int id)
00432 {
00433    int x;
00434    size_t heap_size;
00435 
00436    heap_size = ast_heap_size(con->sched_heap);
00437    for (x = 1; x <= heap_size; x++) {
00438       struct sched *cur = ast_heap_peek(con->sched_heap, x);
00439 
00440       if (cur->id == id) {
00441          return cur;
00442       }
00443    }
00444 
00445    return NULL;
00446 }
00447 
00448 const void *ast_sched_find_data(struct ast_sched_context *con, int id)
00449 {
00450    struct sched *s;
00451    const void *data = NULL;
00452 
00453    ast_mutex_lock(&con->lock);
00454 
00455    s = sched_find(con, id);
00456    if (s) {
00457       data = s->data;
00458    }
00459 
00460    ast_mutex_unlock(&con->lock);
00461 
00462    return data;
00463 }
00464 
00465 /*! \brief
00466  * Delete the schedule entry with number
00467  * "id".  It's nearly impossible that there
00468  * would be two or more in the list with that
00469  * id.
00470  */
00471 #ifndef AST_DEVMODE
00472 int ast_sched_del(struct ast_sched_context *con, int id)
00473 #else
00474 int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
00475 #endif
00476 {
00477    struct sched *s = NULL;
00478    int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
00479 
00480    DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
00481 
00482    if (id < 0) {
00483       return 0;
00484    }
00485 
00486    ast_mutex_lock(&con->lock);
00487 
00488    s = sched_find(con, id);
00489    if (s) {
00490       if (!ast_heap_remove(con->sched_heap, s)) {
00491          ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
00492       }
00493       sched_release(con, s);
00494    } else if (con->currently_executing && (id == con->currently_executing->id)) {
00495       s = con->currently_executing;
00496       s->deleted = 1;
00497       /* Wait for executing task to complete so that caller of ast_sched_del() does not
00498        * free memory out from under the task.
00499        */
00500       ast_cond_wait(&s->cond, &con->lock);
00501       /* Do not sched_release() here because ast_sched_runq() will do it */
00502    }
00503 
00504 #ifdef DUMP_SCHEDULER
00505    /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
00506    if (option_debug)
00507       ast_sched_dump(con);
00508 #endif
00509    if (con->sched_thread) {
00510       ast_cond_signal(&con->sched_thread->cond);
00511    }
00512    ast_mutex_unlock(&con->lock);
00513 
00514    if (!s && *last_id != id) {
00515       ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
00516 #ifndef AST_DEVMODE
00517       ast_assert(s != NULL);
00518 #else
00519       {
00520          char buf[100];
00521 
00522          snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
00523          _ast_assert(0, buf, file, line, function);
00524       }
00525 #endif
00526       *last_id = id;
00527       return -1;
00528    } else if (!s) {
00529       return -1;
00530    }
00531 
00532    return 0;
00533 }
00534 
00535 void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
00536 {
00537    int i, x;
00538    struct sched *cur;
00539    int countlist[cbnames->numassocs + 1];
00540    size_t heap_size;
00541 
00542    memset(countlist, 0, sizeof(countlist));
00543    ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));
00544 
00545    ast_mutex_lock(&con->lock);
00546 
00547    heap_size = ast_heap_size(con->sched_heap);
00548    for (x = 1; x <= heap_size; x++) {
00549       cur = ast_heap_peek(con->sched_heap, x);
00550       /* match the callback to the cblist */
00551       for (i = 0; i < cbnames->numassocs; i++) {
00552          if (cur->callback == cbnames->cblist[i]) {
00553             break;
00554          }
00555       }
00556       if (i < cbnames->numassocs) {
00557          countlist[i]++;
00558       } else {
00559          countlist[cbnames->numassocs]++;
00560       }
00561    }
00562 
00563    ast_mutex_unlock(&con->lock);
00564 
00565    for (i = 0; i < cbnames->numassocs; i++) {
00566       ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
00567    }
00568 
00569    ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
00570 }
00571 
00572 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
00573 void ast_sched_dump(struct ast_sched_context *con)
00574 {
00575    struct sched *q;
00576    struct timeval when = ast_tvnow();
00577    int x;
00578    size_t heap_size;
00579 #ifdef SCHED_MAX_CACHE
00580    ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
00581 #else
00582    ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
00583 #endif
00584 
00585    ast_debug(1, "=============================================================\n");
00586    ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
00587    ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
00588    ast_mutex_lock(&con->lock);
00589    heap_size = ast_heap_size(con->sched_heap);
00590    for (x = 1; x <= heap_size; x++) {
00591       struct timeval delta;
00592       q = ast_heap_peek(con->sched_heap, x);
00593       delta = ast_tvsub(q->when, when);
00594       ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
00595          q->id,
00596          q->callback,
00597          q->data,
00598          (long)delta.tv_sec,
00599          (long int)delta.tv_usec);
00600    }
00601    ast_mutex_unlock(&con->lock);
00602    ast_debug(1, "=============================================================\n");
00603 }
00604 
/*! \brief
 * Launch all events which need to be run at this time.
 *
 * Pops every task that is due (or will be within 1ms) off the heap and
 * runs its callback with the context lock released.  A task whose
 * callback returns non-zero, and which was not deleted while running, is
 * rescheduled; otherwise its entry is released.
 *
 * \param con Scheduler context whose queue should be run.
 *
 * \return The number of callbacks that were run.
 */
int ast_sched_runq(struct ast_sched_context *con)
{
   struct sched *current;
   struct timeval when;
   int numevents;
   int res;

   DEBUG(ast_debug(1, "ast_sched_runq()\n"));

   ast_mutex_lock(&con->lock);

   /* Cut-off time: now plus 1000 microseconds. */
   when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
   for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
      /* schedule all events which are going to expire within 1ms.
       * We only care about millisecond accuracy anyway, so this will
       * help us get more than one event at one time if they are very
       * close together.
       */
      if (ast_tvcmp(current->when, when) != -1) {
         break;
      }

      current = ast_heap_pop(con->sched_heap);

      /*
       * At this point, the schedule queue is still intact.  We
       * have removed the first event and the rest is still there,
       * so it's permissible for the callback to add new events, but
       * trying to delete itself won't work because it isn't in
       * the schedule queue.  If that's what it wants to do, it
       * should return 0.
       *
       * NOTE(review): a delete request for the executing task's ID
       * waits on current->cond, which is only signalled below after
       * the callback has returned, so a callback deleting its own ID
       * from this thread would appear to block -- confirm.
       */

      /*
       * Publish the running task so a concurrent delete can find it,
       * then drop the lock for the duration of the callback.
       */
      con->currently_executing = current;
      ast_mutex_unlock(&con->lock);
      res = current->callback(current->data);
      ast_mutex_lock(&con->lock);
      con->currently_executing = NULL;
      /* Wake a thread waiting in the delete path for this task to finish. */
      ast_cond_signal(&current->cond);

      if (res && !current->deleted) {
         /*
          * If they return non-zero, we should schedule them to be
          * run again.  Variable tasks use the return value as the next
          * interval, others reuse the interval they were added with.
          */
         if (sched_settime(&current->when, current->variable? res : current->resched)) {
            sched_release(con, current);
         } else {
            schedule(con, current);
         }
      } else {
         /* No longer needed, so release it */
         sched_release(con, current);
      }
   }

   ast_mutex_unlock(&con->lock);

   return numevents;
}
00668 
00669 long ast_sched_when(struct ast_sched_context *con,int id)
00670 {
00671    struct sched *s;
00672    long secs = -1;
00673    DEBUG(ast_debug(1, "ast_sched_when()\n"));
00674 
00675    ast_mutex_lock(&con->lock);
00676 
00677    s = sched_find(con, id);
00678    if (s) {
00679       struct timeval now = ast_tvnow();
00680       secs = s->when.tv_sec - now.tv_sec;
00681    }
00682 
00683    ast_mutex_unlock(&con->lock);
00684 
00685    return secs;
00686 }

Generated on Thu Apr 16 06:27:54 2015 for Asterisk - The Open Source Telephony Project by  doxygen 1.5.6