pjsip_scheduler.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. /*
  2. * Asterisk -- An open source telephony toolkit.
  3. *
  4. * Copyright (C) 2016, Fairview 5 Engineering, LLC
  5. *
  6. * George Joseph <george.joseph@fairview5.com>
  7. *
  8. * See http://www.asterisk.org for more information about
  9. * the Asterisk project. Please do not directly contact
  10. * any of the maintainers of this project for assistance;
  11. * the project provides a web site, mailing lists and IRC
  12. * channels for your use.
  13. *
  14. * This program is free software, distributed under the terms of
  15. * the GNU General Public License Version 2. See the LICENSE file
  16. * at the top of the source tree.
  17. */
  18. /*! \file
  19. *
  20. * \brief res_pjsip Scheduler
  21. *
  22. * \author George Joseph <george.joseph@fairview5.com>
  23. */
  24. #include "asterisk.h"
  25. ASTERISK_REGISTER_FILE()
  26. #include "asterisk/res_pjsip.h"
  27. #include "include/res_pjsip_private.h"
  28. #include "asterisk/res_pjsip_cli.h"
  29. #include "asterisk/taskprocessor.h"
  /*! Number of hash buckets requested for the 'tasks' container */
  #define TASK_BUCKETS 53

  /*! Counter used to build "task_%08x" names for tasks scheduled without a name */
  static int task_count;

  /*! Container holding every currently scheduled task, keyed by task name */
  static struct ao2_container *tasks;

  /*! Scheduler context whose thread fires push_to_serializer() when a task is due */
  static struct ast_sched_context *scheduler_context;
  34. struct ast_sip_sched_task {
  35. /*! The serializer to be used (if any) (Holds a ref) */
  36. struct ast_taskprocessor *serializer;
  37. /*! task data */
  38. void *task_data;
  39. /*! task function */
  40. ast_sip_task task;
  41. /*! the time the task was originally scheduled/queued */
  42. struct timeval when_queued;
  43. /*! the last time the task was started */
  44. struct timeval last_start;
  45. /*! the last time the task was ended */
  46. struct timeval last_end;
  47. /*! When the periodic task is next expected to run */
  48. struct timeval next_periodic;
  49. /*! reschedule interval in milliseconds */
  50. int interval;
  51. /*! ast_sched scheudler id */
  52. int current_scheduler_id;
  53. /*! task is currently running */
  54. int is_running;
  55. /*! times run */
  56. int run_count;
  57. /*! the task reschedule, cleanup and policy flags */
  58. enum ast_sip_scheduler_task_flags flags;
  59. /*! A name to be associated with the task */
  60. char name[0];
  61. };
  /* run_task() re-arms the scheduler with this callback, so declare it up front. */
  static int push_to_serializer(const void *data);

  /*
   * Generate the container callbacks that key a task by its 'name' field:
   * ast_sip_sched_task_sort_fn(), ast_sip_sched_task_cmp_fn() and
   * ast_sip_sched_task_hash_fn().
   */
  AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
  AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
  AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
  66. /*
  67. * This function is run in the context of the serializer.
  68. * It runs the task with a simple call and reschedules based on the result.
  69. */
  70. static int run_task(void *data)
  71. {
  72. RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
  73. int res;
  74. int delay;
  75. if (!schtd->interval) {
  76. /* Task was cancelled while waiting to be executed by the serializer */
  77. return -1;
  78. }
  79. if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
  80. ast_log(LOG_DEBUG, "Sched %p: Running %s\n", schtd, schtd->name);
  81. }
  82. ao2_lock(schtd);
  83. schtd->last_start = ast_tvnow();
  84. schtd->is_running = 1;
  85. ++schtd->run_count;
  86. ao2_unlock(schtd);
  87. res = schtd->task(schtd->task_data);
  88. ao2_lock(schtd);
  89. schtd->is_running = 0;
  90. schtd->last_end = ast_tvnow();
  91. /*
  92. * Don't restart if the task returned <= 0 or if the interval
  93. * was set to 0 while the task was running
  94. */
  95. if (res <= 0 || !schtd->interval) {
  96. schtd->interval = 0;
  97. ao2_unlock(schtd);
  98. ao2_unlink(tasks, schtd);
  99. return -1;
  100. }
  101. if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
  102. schtd->interval = res;
  103. }
  104. if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
  105. delay = schtd->interval;
  106. } else {
  107. int64_t diff;
  108. /* Determine next periodic interval we need to expire. */
  109. do {
  110. schtd->next_periodic = ast_tvadd(schtd->next_periodic,
  111. ast_samp2tv(schtd->interval, 1000));
  112. diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);
  113. } while (diff <= 0);
  114. delay = diff;
  115. }
  116. schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);
  117. if (schtd->current_scheduler_id < 0) {
  118. schtd->interval = 0;
  119. ao2_unlock(schtd);
  120. ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);
  121. ao2_unlink(tasks, schtd);
  122. return -1;
  123. }
  124. ao2_unlock(schtd);
  125. if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
  126. ast_log(LOG_DEBUG, "Sched %p: Rescheduled %s for %d ms\n", schtd, schtd->name,
  127. delay);
  128. }
  129. return 0;
  130. }
  131. /*
  132. * This function is run by the scheduler thread. Its only job is to push the task
  133. * to the serialize and return. It returns 0 so it's not rescheduled.
  134. */
  135. static int push_to_serializer(const void *data)
  136. {
  137. struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
  138. int sched_id;
  139. ao2_lock(schtd);
  140. sched_id = schtd->current_scheduler_id;
  141. schtd->current_scheduler_id = -1;
  142. ao2_unlock(schtd);
  143. if (sched_id < 0) {
  144. /* Task was cancelled while waiting on the lock */
  145. return 0;
  146. }
  147. if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
  148. ast_log(LOG_DEBUG, "Sched %p: Ready to run %s\n", schtd, schtd->name);
  149. }
  150. ao2_t_ref(schtd, +1, "Give ref to run_task()");
  151. if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
  152. /*
  153. * Oh my. Have to cancel the scheduled item because we
  154. * unexpectedly cannot run it anymore.
  155. */
  156. ao2_unlink(tasks, schtd);
  157. ao2_lock(schtd);
  158. schtd->interval = 0;
  159. ao2_unlock(schtd);
  160. ao2_t_ref(schtd, -1, "Failed so release run_task() ref");
  161. }
  162. return 0;
  163. }
  164. int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
  165. {
  166. int res;
  167. int sched_id;
  168. if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
  169. ast_log(LOG_DEBUG, "Sched %p: Canceling %s\n", schtd, schtd->name);
  170. }
  171. /*
  172. * Prevent any tasks in the serializer queue from
  173. * running and restarting the scheduled item on us
  174. * first.
  175. */
  176. ao2_lock(schtd);
  177. schtd->interval = 0;
  178. sched_id = schtd->current_scheduler_id;
  179. schtd->current_scheduler_id = -1;
  180. ao2_unlock(schtd);
  181. res = ast_sched_del(scheduler_context, sched_id);
  182. ao2_unlink(tasks, schtd);
  183. return res;
  184. }
  185. int ast_sip_sched_task_cancel_by_name(const char *name)
  186. {
  187. int res;
  188. struct ast_sip_sched_task *schtd;
  189. if (ast_strlen_zero(name)) {
  190. return -1;
  191. }
  192. schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
  193. if (!schtd) {
  194. return -1;
  195. }
  196. res = ast_sip_sched_task_cancel(schtd);
  197. ao2_ref(schtd, -1);
  198. return res;
  199. }
  200. int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
  201. struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
  202. {
  203. ao2_lock(schtd);
  204. if (queued) {
  205. memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
  206. }
  207. if (last_start) {
  208. memcpy(last_start, &schtd->last_start, sizeof(struct timeval));
  209. }
  210. if (last_end) {
  211. memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
  212. }
  213. ao2_unlock(schtd);
  214. return 0;
  215. }
  216. int ast_sip_sched_task_get_times_by_name(const char *name,
  217. struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
  218. {
  219. int res;
  220. struct ast_sip_sched_task *schtd;
  221. if (ast_strlen_zero(name)) {
  222. return -1;
  223. }
  224. schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
  225. if (!schtd) {
  226. return -1;
  227. }
  228. res = ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
  229. ao2_ref(schtd, -1);
  230. return res;
  231. }
  232. int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
  233. {
  234. if (maxlen <= 0) {
  235. return -1;
  236. }
  237. ao2_lock(schtd);
  238. ast_copy_string(name, schtd->name, maxlen);
  239. ao2_unlock(schtd);
  240. return 0;
  241. }
  242. int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
  243. {
  244. int delay;
  245. struct timeval since_when;
  246. struct timeval now;
  247. ao2_lock(schtd);
  248. if (schtd->interval) {
  249. delay = schtd->interval;
  250. now = ast_tvnow();
  251. if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
  252. since_when = schtd->is_running ? now : schtd->last_end;
  253. } else {
  254. since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
  255. }
  256. delay -= ast_tvdiff_ms(now, since_when);
  257. delay = delay < 0 ? 0 : delay;
  258. } else {
  259. delay = -1;
  260. }
  261. ao2_unlock(schtd);
  262. return delay;
  263. }
  264. int ast_sip_sched_task_get_next_run_by_name(const char *name)
  265. {
  266. int next_run;
  267. struct ast_sip_sched_task *schtd;
  268. if (ast_strlen_zero(name)) {
  269. return -1;
  270. }
  271. schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
  272. if (!schtd) {
  273. return -1;
  274. }
  275. next_run = ast_sip_sched_task_get_next_run(schtd);
  276. ao2_ref(schtd, -1);
  277. return next_run;
  278. }
  279. int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
  280. {
  281. return schtd ? schtd->is_running : 0;
  282. }
  283. int ast_sip_sched_is_task_running_by_name(const char *name)
  284. {
  285. int is_running;
  286. struct ast_sip_sched_task *schtd;
  287. if (ast_strlen_zero(name)) {
  288. return 0;
  289. }
  290. schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
  291. if (!schtd) {
  292. return 0;
  293. }
  294. is_running = schtd->is_running;
  295. ao2_ref(schtd, -1);
  296. return is_running;
  297. }
  298. static void schtd_dtor(void *data)
  299. {
  300. struct ast_sip_sched_task *schtd = data;
  301. if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
  302. ast_log(LOG_DEBUG, "Sched %p: Destructor %s\n", schtd, schtd->name);
  303. }
  304. if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
  305. /* release our own ref, then release the callers if asked to do so */
  306. ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
  307. } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
  308. ast_free(schtd->task_data);
  309. }
  310. ast_taskprocessor_unreference(schtd->serializer);
  311. }
  312. struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
  313. int interval, ast_sip_task sip_task, const char *name, void *task_data,
  314. enum ast_sip_scheduler_task_flags flags)
  315. {
  316. #define ID_LEN 13 /* task_deadbeef */
  317. struct ast_sip_sched_task *schtd;
  318. int res;
  319. if (interval <= 0) {
  320. return NULL;
  321. }
  322. schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),
  323. schtd_dtor);
  324. if (!schtd) {
  325. return NULL;
  326. }
  327. schtd->serializer = ao2_bump(serializer);
  328. schtd->task_data = task_data;
  329. schtd->task = sip_task;
  330. schtd->interval = interval;
  331. schtd->flags = flags;
  332. if (!ast_strlen_zero(name)) {
  333. strcpy(schtd->name, name); /* Safe */
  334. } else {
  335. uint32_t task_id;
  336. task_id = ast_atomic_fetchadd_int(&task_count, 1);
  337. sprintf(schtd->name, "task_%08x", task_id);
  338. }
  339. if (schtd->flags & AST_SIP_SCHED_TASK_TRACK) {
  340. ast_log(LOG_DEBUG, "Sched %p: Scheduling %s for %d ms\n", schtd, schtd->name,
  341. interval);
  342. }
  343. schtd->when_queued = ast_tvnow();
  344. if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {
  345. schtd->next_periodic = ast_tvadd(schtd->when_queued,
  346. ast_samp2tv(schtd->interval, 1000));
  347. }
  348. if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
  349. ao2_ref(task_data, +1);
  350. }
  351. /*
  352. * We must put it in the 'tasks' container before scheduling
  353. * the task because we don't want the push_to_serializer()
  354. * sched task to "remove" it on failure before we even put
  355. * it in. If this happens then nothing would remove it from
  356. * the 'tasks' container.
  357. */
  358. ao2_link(tasks, schtd);
  359. /*
  360. * Lock so we are guaranteed to get the sched id set before
  361. * the push_to_serializer() sched task can clear it.
  362. */
  363. ao2_lock(schtd);
  364. res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);
  365. schtd->current_scheduler_id = res;
  366. ao2_unlock(schtd);
  367. if (res < 0) {
  368. ao2_unlink(tasks, schtd);
  369. ao2_ref(schtd, -1);
  370. return NULL;
  371. }
  372. return schtd;
  373. #undef ID_LEN
  374. }
  375. static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  376. {
  377. struct ao2_iterator iter;
  378. struct ao2_container *sorted_tasks;
  379. struct ast_sip_sched_task *schtd;
  380. const char *log_format;
  381. struct ast_tm tm;
  382. char queued[32];
  383. char last_start[32];
  384. char next_start[32];
  385. int datelen;
  386. struct timeval now;
  387. static const char separator[] = "=============================================";
  388. switch (cmd) {
  389. case CLI_INIT:
  390. e->command = "pjsip show scheduled_tasks";
  391. e->usage = "Usage: pjsip show scheduled_tasks\n"
  392. " Show all scheduled tasks\n";
  393. return NULL;
  394. case CLI_GENERATE:
  395. return NULL;
  396. }
  397. if (a->argc != 3) {
  398. return CLI_SHOWUSAGE;
  399. }
  400. /* Get a sorted snapshot of the scheduled tasks */
  401. sorted_tasks = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
  402. ast_sip_sched_task_sort_fn, NULL);
  403. if (!sorted_tasks) {
  404. return CLI_SUCCESS;
  405. }
  406. if (ao2_container_dup(sorted_tasks, tasks, 0)) {
  407. ao2_ref(sorted_tasks, -1);
  408. return CLI_SUCCESS;
  409. }
  410. now = ast_tvnow();
  411. log_format = ast_logger_get_dateformat();
  412. ast_localtime(&now, &tm, NULL);
  413. datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
  414. ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
  415. ast_cli(a->fd, "%1$-45s %2$-9s %3$-9s %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s %9$7s\n",
  416. "Task Name", "Interval", "Times Run", "State",
  417. datelen, "Queued", "Last Started", "Next Start", "( secs)");
  418. ast_cli(a->fd, "%1$-45.45s %2$-9.9s %3$-9.9s %4$-5.5s %6$-*5$.*5$s %7$-*5$.*5$s %9$-*8$.*8$s\n",
  419. separator, separator, separator, separator,
  420. datelen, separator, separator, datelen + 8, separator);
  421. iter = ao2_iterator_init(sorted_tasks, AO2_ITERATOR_UNLINK);
  422. for (; (schtd = ao2_iterator_next(&iter)); ao2_ref(schtd, -1)) {
  423. int next_run_sec;
  424. struct timeval next;
  425. ao2_lock(schtd);
  426. next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000;
  427. if (next_run_sec < 0) {
  428. /* Scheduled task is now canceled */
  429. ao2_unlock(schtd);
  430. continue;
  431. }
  432. next = ast_tvadd(now, ast_tv(next_run_sec, 0));
  433. ast_localtime(&schtd->when_queued, &tm, NULL);
  434. ast_strftime(queued, sizeof(queued), log_format, &tm);
  435. if (ast_tvzero(schtd->last_start)) {
  436. strcpy(last_start, "not yet started");
  437. } else {
  438. ast_localtime(&schtd->last_start, &tm, NULL);
  439. ast_strftime(last_start, sizeof(last_start), log_format, &tm);
  440. }
  441. ast_localtime(&next, &tm, NULL);
  442. ast_strftime(next_start, sizeof(next_start), log_format, &tm);
  443. ast_cli(a->fd, "%1$-46.46s%2$9.3f %3$9d %4$-5s %6$-*5$s %7$-*5$s %8$-*5$s (%9$5d)\n",
  444. schtd->name,
  445. schtd->interval / 1000.0,
  446. schtd->run_count,
  447. schtd->is_running ? "run" : "wait",
  448. datelen, queued, last_start,
  449. next_start,
  450. next_run_sec);
  451. ao2_unlock(schtd);
  452. }
  453. ao2_iterator_destroy(&iter);
  454. ao2_ref(sorted_tasks, -1);
  455. ast_cli(a->fd, "\n");
  456. return CLI_SUCCESS;
  457. }
  458. static struct ast_cli_entry cli_commands[] = {
  459. AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"),
  460. };
  461. int ast_sip_initialize_scheduler(void)
  462. {
  463. scheduler_context = ast_sched_context_create();
  464. if (!scheduler_context) {
  465. ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
  466. return -1;
  467. }
  468. if (ast_sched_start_thread(scheduler_context)) {
  469. ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
  470. ast_sched_context_destroy(scheduler_context);
  471. return -1;
  472. }
  473. tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
  474. AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, TASK_BUCKETS, ast_sip_sched_task_hash_fn,
  475. ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
  476. if (!tasks) {
  477. ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
  478. ast_sched_context_destroy(scheduler_context);
  479. return -1;
  480. }
  481. ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
  482. return 0;
  483. }
  484. int ast_sip_destroy_scheduler(void)
  485. {
  486. ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
  487. if (scheduler_context) {
  488. if (tasks) {
  489. struct ao2_iterator iter;
  490. struct ast_sip_sched_task *schtd;
  491. /* Cancel all scheduled tasks */
  492. iter = ao2_iterator_init(tasks, 0);
  493. while ((schtd = ao2_iterator_next(&iter))) {
  494. ast_sip_sched_task_cancel(schtd);
  495. ao2_ref(schtd, -1);
  496. }
  497. ao2_iterator_destroy(&iter);
  498. }
  499. ast_sched_context_destroy(scheduler_context);
  500. scheduler_context = NULL;
  501. }
  502. ao2_cleanup(tasks);
  503. tasks = NULL;
  504. return 0;
  505. }