提交 bbdcd33b authored 作者: Anthony Minessale's avatar Anthony Minessale

performance tweaks for sip message parsing and event system

上级 b117a65c
......@@ -2329,6 +2329,8 @@ SWITCH_DECLARE(void) switch_say_file(switch_say_file_handle_t *sh, const char *f
SWITCH_DECLARE(int) switch_max_file_desc(void);
SWITCH_DECLARE(void) switch_close_extra_files(int *keep, int keep_ttl);
SWITCH_DECLARE(switch_status_t) switch_core_thread_set_cpu_affinity(int cpu);
SWITCH_DECLARE(void) switch_os_yield(void);
SWITCH_END_EXTERN_C
#endif
/* For Emacs:
......
......@@ -5445,7 +5445,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_sofia_load)
switch_yield(1500000);
mod_sofia_globals.cpu_count = switch_core_cpu_count();
mod_sofia_globals.max_msg_queues = mod_sofia_globals.cpu_count + 1;
mod_sofia_globals.max_msg_queues = (mod_sofia_globals.cpu_count / 2) + 1;
if (mod_sofia_globals.max_msg_queues < 2) {
mod_sofia_globals.max_msg_queues = 2;
}
if (mod_sofia_globals.max_msg_queues > SOFIA_MAX_MSG_QUEUE) {
mod_sofia_globals.max_msg_queues = SOFIA_MAX_MSG_QUEUE;
......@@ -5627,11 +5630,12 @@ SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_sofia_shutdown)
}
for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
switch_queue_push(mod_sofia_globals.msg_queue[i], NULL);
for (i = 0; mod_sofia_globals.msg_queue_thread[i]; i++) {
switch_queue_push(mod_sofia_globals.msg_queue, NULL);
}
for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
for (i = 0; mod_sofia_globals.msg_queue_thread[i]; i++) {
switch_status_t st;
switch_thread_join(&st, mod_sofia_globals.msg_queue_thread[i]);
}
......
......@@ -342,7 +342,7 @@ typedef enum {
} TFLAGS;
#define SOFIA_MAX_MSG_QUEUE 64
#define SOFIA_MSG_QUEUE_SIZE 250
#define SOFIA_MSG_QUEUE_SIZE 100
struct mod_sofia_globals {
switch_memory_pool_t *pool;
......@@ -359,7 +359,7 @@ struct mod_sofia_globals {
char hostname[512];
switch_queue_t *presence_queue;
switch_queue_t *mwi_queue;
switch_queue_t *msg_queue[SOFIA_MAX_MSG_QUEUE];
switch_queue_t *msg_queue;
switch_thread_t *msg_queue_thread[SOFIA_MAX_MSG_QUEUE];
int msg_queue_len;
struct sofia_private destroy_private;
......
......@@ -1347,34 +1347,56 @@ void sofia_process_dispatch_event(sofia_dispatch_event_t **dep)
nua_handle_unref(nh);
nua_stack_unref(nua);
switch_os_yield();
}
static int msg_queue_threads = 0;
//static int count = 0;
void *SWITCH_THREAD_FUNC sofia_msg_thread_run(switch_thread_t *thread, void *obj)
{
void *pop;
switch_queue_t *q = (switch_queue_t *) obj;
int my_id;
for (my_id = 0; my_id < mod_sofia_globals.msg_queue_len; my_id++) {
if (mod_sofia_globals.msg_queue[my_id] == q) {
if (mod_sofia_globals.msg_queue_thread[my_id] == thread) {
break;
}
}
switch_mutex_lock(mod_sofia_globals.mutex);
msg_queue_threads++;
switch_mutex_unlock(mod_sofia_globals.mutex);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread %d Started\n", my_id);
switch_core_thread_set_cpu_affinity(my_id);
while(switch_queue_pop(q, &pop) == SWITCH_STATUS_SUCCESS && pop) {
sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
sofia_process_dispatch_event(&de);
switch_cond_next();
for(;;) {
if (switch_queue_pop(q, &pop) != SWITCH_STATUS_SUCCESS) {
switch_cond_next();
continue;
}
if (pop) {
sofia_dispatch_event_t *de = (sofia_dispatch_event_t *) pop;
sofia_process_dispatch_event(&de);
switch_os_yield();
} else {
break;
}
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "MSG Thread Ended\n");
switch_mutex_lock(mod_sofia_globals.mutex);
msg_queue_threads--;
switch_mutex_unlock(mod_sofia_globals.mutex);
return NULL;
}
......@@ -1392,19 +1414,22 @@ void sofia_msg_thread_start(int idx)
int i;
mod_sofia_globals.msg_queue_len = idx + 1;
if (!mod_sofia_globals.msg_queue) {
switch_queue_create(&mod_sofia_globals.msg_queue, SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.cpu_count, mod_sofia_globals.pool);
}
for (i = 0; i < mod_sofia_globals.msg_queue_len; i++) {
if (!mod_sofia_globals.msg_queue[i]) {
if (!mod_sofia_globals.msg_queue_thread[i]) {
switch_threadattr_t *thd_attr = NULL;
switch_queue_create(&mod_sofia_globals.msg_queue[i], SOFIA_MSG_QUEUE_SIZE, mod_sofia_globals.pool);
switch_threadattr_create(&thd_attr, mod_sofia_globals.pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_increase(thd_attr);
switch_thread_create(&mod_sofia_globals.msg_queue_thread[i],
thd_attr,
sofia_msg_thread_run,
mod_sofia_globals.msg_queue[i],
mod_sofia_globals.msg_queue,
mod_sofia_globals.pool);
}
}
......@@ -1413,12 +1438,12 @@ void sofia_msg_thread_start(int idx)
switch_mutex_unlock(mod_sofia_globals.mutex);
}
//static int foo = 0;
static void sofia_queue_message(sofia_dispatch_event_t *de)
{
int idx = 0, queued = 0;
int launch = 0;
if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue[0]) {
if (mod_sofia_globals.running == 0 || !mod_sofia_globals.msg_queue) {
sofia_process_dispatch_event(&de);
return;
}
......@@ -1430,25 +1455,18 @@ static void sofia_queue_message(sofia_dispatch_event_t *de)
}
again:
for (idx = 0; idx < mod_sofia_globals.msg_queue_len; idx++) {
if (switch_queue_trypush(mod_sofia_globals.msg_queue[idx], de) == SWITCH_STATUS_SUCCESS) {
queued++;
break;
}
if ((switch_queue_size(mod_sofia_globals.msg_queue) > (SOFIA_MSG_QUEUE_SIZE * msg_queue_threads))) {
launch++;
}
if (!queued) {
if (launch) {
if (mod_sofia_globals.msg_queue_len < mod_sofia_globals.max_msg_queues) {
sofia_msg_thread_start(mod_sofia_globals.msg_queue_len + 1);
goto again;
}
switch_queue_push(mod_sofia_globals.msg_queue[0], de);
}
switch_queue_push(mod_sofia_globals.msg_queue, de);
}
......@@ -1468,6 +1486,7 @@ void sofia_event_callback(nua_event_t event,
return;
}
switch_mutex_lock(profile->flag_mutex);
profile->queued_events++;
......@@ -1483,6 +1502,13 @@ void sofia_event_callback(nua_event_t event,
de->nua = nua_stack_ref(nua);
if (event == nua_i_invite && !sofia_private) {
int critical = (((SOFIA_MSG_QUEUE_SIZE * mod_sofia_globals.max_msg_queues) * 900) / 1000);
if (switch_queue_size(mod_sofia_globals.msg_queue) > critical) {
nua_respond(nh, 503, "Maximum Calls In Progress", SIPTAG_RETRY_AFTER_STR("300"), TAG_END());
return;
}
if (!(sofia_private = su_alloc(nh->nh_home, sizeof(*sofia_private)))) {
abort();
}
......@@ -1516,8 +1542,8 @@ void sofia_event_callback(nua_event_t event,
}
}
sofia_queue_message(de);
switch_os_yield();
}
......
......@@ -1415,7 +1415,7 @@ SWITCH_DECLARE(void) switch_channel_wait_for_state(switch_channel_t *channel, sw
(other_channel && switch_channel_down_nosig(other_channel)) || switch_channel_down(channel)) {
break;
}
switch_yield(20000);
switch_cond_next();
}
}
......
差异被折叠。
......@@ -140,7 +140,7 @@ typedef struct timer_matrix timer_matrix_t;
static timer_matrix_t TIMER_MATRIX[MAX_ELEMENTS + 1];
static void os_yield(void)
SWITCH_DECLARE(void) switch_os_yield(void)
{
#if defined(WIN32)
SwitchToThread();
......@@ -467,7 +467,7 @@ SWITCH_DECLARE(void) switch_sleep(switch_interval_time_t t)
SWITCH_DECLARE(void) switch_cond_next(void)
{
if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) {
os_yield();
switch_os_yield();
return;
}
#ifdef DISABLE_1MS_COND
......@@ -633,7 +633,7 @@ static switch_status_t timer_next(switch_timer_t *timer)
check_roll();
if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) {
os_yield();
switch_os_yield();
globals.use_cond_yield = 0;
} else {
if (globals.use_cond_yield == 1) {
......@@ -884,7 +884,7 @@ SWITCH_MODULE_RUNTIME_FUNCTION(softtimer_runtime)
}
if (runtime.tipping_point && globals.timer_count >= runtime.tipping_point) {
os_yield();
switch_os_yield();
} else {
if (tfd > -1 && globals.RUNNING == 1) {
uint64_t exp;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论