提交 9d4b72b2 authored 作者: Anthony Minessale's avatar Anthony Minessale

fold the last round of sql manager changes back into the core and use it for the…

fold the last round of sql manager changes back into the core and use it for the central db and simplify the algorithm
上级 3ef548ee
......@@ -256,8 +256,6 @@ struct switch_runtime {
switch_profile_timer_t *profile_timer;
double profile_time;
double min_idle_time;
int sql_buffer_len;
int max_sql_buffer_len;
switch_dbtype_t odbc_dbtype;
char hostname[256];
char *switchname;
......
......@@ -2205,8 +2205,9 @@ SWITCH_DECLARE(switch_status_t) switch_core_chat_send(const char *dest_proto, sw
SWITCH_DECLARE(switch_status_t) switch_core_chat_deliver(const char *dest_proto, switch_event_t **message_event);
SWITCH_DECLARE(switch_status_t) switch_ivr_preprocess_session(switch_core_session_t *session, const char *cmds);
SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void);
SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void);
SWITCH_DECLARE(void) switch_core_sqldb_pause(void);
SWITCH_DECLARE(void) switch_core_sqldb_resume(void);
///\}
......
......@@ -1470,11 +1470,10 @@ SWITCH_DECLARE(switch_status_t) switch_core_init(switch_core_flag_t flags, switc
runtime.db_handle_timeout = 5000000;
runtime.runlevel++;
runtime.sql_buffer_len = 1024 * 32;
runtime.max_sql_buffer_len = 1024 * 1024;
runtime.dummy_cng_frame.data = runtime.dummy_data;
runtime.dummy_cng_frame.datalen = sizeof(runtime.dummy_data);
runtime.dummy_cng_frame.buflen = sizeof(runtime.dummy_data);
runtime.dbname = "core";
switch_set_flag((&runtime.dummy_cng_frame), SFF_CNG);
switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
switch_set_flag((&runtime), SCF_CLEAR_SQL);
......@@ -1754,37 +1753,6 @@ static void switch_load_core_config(const char *file)
} else if (!strcasecmp(var, "multiple-registrations")) {
runtime.multiple_registrations = switch_true(val);
} else if (!strcasecmp(var, "sql-buffer-len")) {
int tmp = atoi(val);
if (end_of(val) == 'k') {
tmp *= 1024;
} else if (end_of(val) == 'm') {
tmp *= (1024 * 1024);
}
if (tmp >= 32000 && tmp < 10500000) {
runtime.sql_buffer_len = tmp;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "sql-buffer-len: Value is not within rage 32k to 10m\n");
}
} else if (!strcasecmp(var, "max-sql-buffer-len")) {
int tmp = atoi(val);
if (end_of(val) == 'k') {
tmp *= 1024;
} else if (end_of(val) == 'm') {
tmp *= (1024 * 1024);
}
if (tmp < runtime.sql_buffer_len) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Value is not larger than sql-buffer-len\n");
} else if (tmp >= 32000 && tmp < 10500000) {
runtime.max_sql_buffer_len = tmp;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "max-sql-buffer-len: Value is not within rage 32k to 10m\n");
}
} else if (!strcasecmp(var, "auto-create-schemas")) {
if (switch_true(val)) {
switch_set_flag((&runtime), SCF_AUTO_SCHEMAS);
......@@ -2256,9 +2224,9 @@ SWITCH_DECLARE(int32_t) switch_core_session_ctl(switch_session_ctl_t cmd, void *
break;
case SCSC_SQL:
if (oldintval) {
switch_core_sqldb_start_thread();
switch_core_sqldb_resume();
} else {
switch_core_sqldb_stop_thread();
switch_core_sqldb_pause();
}
break;
case SCSC_PAUSE_ALL:
......
......@@ -56,26 +56,25 @@ struct switch_cache_db_handle {
};
static struct {
switch_cache_db_handle_t *event_db;
switch_queue_t *sql_queue[4];
switch_memory_pool_t *memory_pool;
switch_thread_t *thread;
switch_thread_t *db_thread;
int thread_running;
int db_thread_running;
switch_bool_t manage;
switch_mutex_t *io_mutex;
switch_mutex_t *dbh_mutex;
switch_mutex_t *ctl_mutex;
switch_cache_db_handle_t *handle_pool;
switch_thread_cond_t *cond;
switch_mutex_t *cond_mutex;
uint32_t total_handles;
uint32_t total_used_handles;
switch_cache_db_handle_t *dbh;
switch_sql_queue_manager_t *qm;
int paused;
} sql_manager;
static void switch_core_sqldb_start_thread(void);
static void switch_core_sqldb_stop_thread(void);
static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type)
{
switch_cache_db_handle_t *new_dbh = NULL;
......@@ -575,19 +574,6 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
return status;
}
static void wake_thread(int force)
{
if (force) {
switch_thread_cond_signal(sql_manager.cond);
return;
}
if (switch_mutex_trylock(sql_manager.cond_mutex) == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(sql_manager.cond);
switch_mutex_unlock(sql_manager.cond_mutex);
}
}
/**
OMFG you cruel bastards. Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
**/
......@@ -1195,7 +1181,6 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *threa
while (sql_manager.db_thread_running == 1) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
wake_thread(0);
sec = 0;
}
......@@ -1217,7 +1202,6 @@ struct switch_sql_queue_manager {
const char *name;
switch_cache_db_handle_t *event_db;
switch_queue_t **sql_queue;
uint32_t *pre_written;
uint32_t *written;
uint32_t numq;
char *dsn;
......@@ -1281,6 +1265,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_stop(switch_sql_queue_m
}
if (qm->thread) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Stopping SQL thread.\n", qm->name);
switch_thread_join(&status, qm->thread);
qm->thread = NULL;
status = SWITCH_STATUS_SUCCESS;
......@@ -1294,7 +1279,7 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
switch_threadattr_t *thd_attr;
if (!qm->thread_running) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Starting SQL thread.\n", qm->name);
switch_threadattr_create(&thd_attr, qm->pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_NORMAL);
......@@ -1306,24 +1291,40 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_start(switch_sql_queue_
}
static void do_flush(switch_queue_t *q, switch_cache_db_handle_t *dbh)
{
void *pop = NULL;
while (switch_queue_trypop(q, &pop) == SWITCH_STATUS_SUCCESS) {
if (pop) {
if (dbh) {
switch_cache_db_execute_sql(dbh, (char *) pop, NULL);
}
free(pop);
}
}
}
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queue_manager_t **qmp)
{
switch_sql_queue_manager_t *qm;
switch_status_t status = SWITCH_STATUS_SUCCESS;
switch_memory_pool_t *pool;
void *pop;
uint32_t i;
switch_assert(qmp);
qm = *qmp;
*qmp = NULL;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "%s Destroying SQL queue.\n", qm->name);
switch_sql_queue_manager_stop(qm);
for(i = 0; i < qm->numq; i++) {
while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
switch_safe_free(pop);
}
do_flush(qm->sql_queue[i], NULL);
}
pool = qm->pool;
......@@ -1335,11 +1336,14 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_destroy(switch_sql_queu
SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push(switch_sql_queue_manager_t *qm, const char *sql, uint32_t pos, switch_bool_t dup)
{
if (!qm->thread_running) {
return SWITCH_STATUS_FALSE;
if (sql_manager.paused) {
if (!dup) free((char *)sql);
qm_wake(qm);
return SWITCH_STATUS_SUCCESS;
}
if (sql_manager.thread_running != 1) {
if (!qm->thread_running) {
if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
......@@ -1362,11 +1366,14 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_push_confirm(switch_sql
int size, x = 0, sanity = 0;
uint32_t written, want;
if (!qm->thread_running) {
return SWITCH_STATUS_FALSE;
if (sql_manager.paused) {
if (!dup) free((char *)sql);
qm_wake(qm);
return SWITCH_STATUS_SUCCESS;
}
if (sql_manager.thread_running != 1) {
if (!qm->thread_running) {
if (!dup) free((char *)sql);
return SWITCH_STATUS_FALSE;
}
......@@ -1430,7 +1437,6 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
qm->sql_queue = switch_core_alloc(qm->pool, sizeof(switch_queue_t *) * numq);
qm->written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
qm->pre_written = switch_core_alloc(qm->pool, sizeof(uint32_t) * numq);
for (i = 0; i < qm->numq; i++) {
switch_queue_create(&qm->sql_queue[i], SWITCH_SQL_QUEUE_LEN, qm->pool);
......@@ -1448,479 +1454,213 @@ SWITCH_DECLARE(switch_status_t) switch_sql_queue_manager_init_name(const char *n
return SWITCH_STATUS_SUCCESS;
}
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
{
void *pop = NULL;
uint32_t iterations = 0;
uint8_t trans = 0;
uint32_t target = 20000;
switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
char *tmp, *sqlbuf = (char *) malloc(sql_len);
char *sql = NULL, *save_sql = NULL;
switch_size_t newlen;
int lc = 0, wrote = 0, do_sleep = 1;
uint32_t sanity = 120;
int auto_pause = 0;
switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
uint32_t i;
switch_assert(sqlbuf);
while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
switch_yield(500000);
sanity--;
}
static uint32_t do_trans(switch_cache_db_handle_t *dbh,
switch_queue_t *q,
switch_mutex_t *mutex,
const char *pre_trans_execute,
const char *post_trans_execute,
const char *inner_pre_trans_execute,
const char *inner_post_trans_execute)
{
char *errmsg = NULL;
void *pop;
switch_status_t status;
uint32_t ttl = 0;
if (!qm->event_db) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
return NULL;
if (!switch_queue_size(q)) {
return 0;
}
qm->thread_running = 1;
switch_mutex_lock(qm->cond_mutex);
switch (qm->event_db->type) {
case SCDB_TYPE_PGSQL:
break;
case SCDB_TYPE_ODBC:
break;
switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
}
break;
}
while (qm->thread_running == 1) {
int proceed = !!save_sql;
int pindex = -1;
if (!proceed) {
for (i = 0; i < qm->numq; i++) {
switch_status_t status;
switch_mutex_lock(qm->mutex);
status = switch_queue_trypop(qm->sql_queue[i], &pop);
switch_mutex_unlock(qm->mutex);
if (status == SWITCH_STATUS_SUCCESS) {
if (sql_manager.thread_running != 1) {
if (pop) {
switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
free(pop);
pop = NULL;
switch_cache_db_execute_sql_real(dbh, "BEGIN", &errmsg);
}
} else {
pindex = i;
proceed = 1;
break;
}
}
}
}
if (proceed) {
if (save_sql) {
sql = save_sql;
save_sql = NULL;
} else if ((sql = (char *) pop)) {
pop = NULL;
}
if (sql) {
newlen = strlen(sql) + 2;
if (iterations == 0) {
trans = 1;
}
if (len + newlen + 1 > sql_len) {
int new_mlen = len + newlen + 10240;
if (new_mlen < runtime.max_sql_buffer_len) {
sql_len = new_mlen;
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"%s REALLOC QUEUE %ld %d %d\n",
qm->name,
(long int)sql_len,
i,
switch_queue_size(qm->sql_queue[i]));
case SCDB_TYPE_ODBC:
{
switch_odbc_status_t result;
if ((result = switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 0)) != SWITCH_ODBC_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
}
}
if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread ending on mem err\n", qm->name);
abort();
break;
}
sqlbuf = tmp;
} else {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
for (i = 0; i < qm->numq; i++) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"%s SAVE QUEUE %d %d\n",
qm->name,
i,
switch_queue_size(qm->sql_queue[i]));
case SCDB_TYPE_PGSQL:
{
switch_pgsql_status_t result;
if ((result = switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 0)) != SWITCH_PGSQL_SUCCESS) {
char tmp[100];
switch_snprintfv(tmp, sizeof(tmp), "%q-%i", "Unable to Set AutoCommit Off", result);
errmsg = strdup(tmp);
}
}
save_sql = sql;
sql = NULL;
lc = 0;
goto skip;
}
}
switch_mutex_lock(qm->mutex);
qm->pre_written[pindex]++;
switch_mutex_unlock(qm->mutex);
iterations++;
sprintf(sqlbuf + len, "%s;\n", sql);
len += newlen;
free(sql);
sql = NULL;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "%s, SQL thread ending\n", qm->name);
break;
}
}
lc = qm_ttl(qm);
if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
if (!auto_pause) {
auto_pause = 1;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue overflowing [%d], Pausing calls.\n", qm->name, lc);
}
} else {
if (auto_pause && lc < 1000) {
auto_pause = 0;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s, SQL Queue back to normal size, resuming..\n", qm->name);
}
if (errmsg) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "ERROR [%s]\n", errmsg);
free(errmsg);
goto end;
}
skip:
wrote = 0;
for(;;) {
if (mutex) switch_mutex_lock(mutex);
status = switch_queue_trypop(q, &pop);
if (mutex) switch_mutex_unlock(mutex);
if (trans && iterations && (iterations > target || !lc)) {
if (status != SWITCH_STATUS_SUCCESS || !pop) break;
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
char line[128] = "";
int l;
switch_snprintf(line, sizeof(line), "%s RUN QUEUE ", qm->name);
for (i = 0; i < qm->numq; i++) {
l = strlen(line);
switch_snprintf(line + l, sizeof(line) - l, "%d:%d ", i, switch_queue_size(qm->sql_queue[i]));
if ((status = switch_cache_db_execute_sql(dbh, (char *) pop, NULL)) == SWITCH_STATUS_SUCCESS) {
ttl++;
}
free(pop);
l = strlen(line);
switch_snprintf(line + l, sizeof(line) - l, "%d\n", iterations);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
}
if (switch_cache_db_persistant_execute_trans_full(qm->event_db, sqlbuf, 1,
qm->pre_trans_execute,
qm->post_trans_execute,
qm->inner_pre_trans_execute,
qm->inner_post_trans_execute
) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s SQL thread unable to commit transaction, records lost!\n", qm->name);
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s DONE\n", qm->name);
if (status != SWITCH_STATUS_SUCCESS) break;
}
iterations = 0;
trans = 0;
len = 0;
*sqlbuf = '\0';
lc = 0;
if (do_sleep) {
switch_yield(200000);
} else {
switch_yield(1000);
}
wrote = 1;
}
lc = qm_ttl(qm);
switch_mutex_lock(qm->mutex);
for (i = 0; i < qm->numq; i++) {
qm->written[i] += qm->pre_written[i];
qm->pre_written[i] = 0;
}
switch_mutex_unlock(qm->mutex);
end:
if (!lc) {
switch_thread_cond_wait(qm->cond, qm->cond_mutex);
} else if (wrote) {
if (lc > 2000) {
do_sleep = 0;
} else {
do_sleep = 1;
}
}
switch(dbh->type) {
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql_real(dbh, "COMMIT", NULL);
}
switch_mutex_unlock(qm->cond_mutex);
for(i = 0; i < qm->numq; i++) {
while (switch_queue_trypop(qm->sql_queue[i], &pop) == SWITCH_STATUS_SUCCESS) {
if (pop) {
switch_cache_db_execute_sql(qm->event_db, (char *) pop, NULL);
free(pop);
break;
case SCDB_TYPE_ODBC:
{
switch_odbc_SQLEndTran(dbh->native_handle.odbc_dbh, 1);
switch_odbc_SQLSetAutoCommitAttr(dbh->native_handle.odbc_dbh, 1);
}
break;
case SCDB_TYPE_PGSQL:
{
switch_pgsql_SQLEndTran(dbh->native_handle.pgsql_dbh, 1);
switch_pgsql_SQLSetAutoCommitAttr(dbh->native_handle.pgsql_dbh, 1);
switch_pgsql_finish_results(dbh->native_handle.pgsql_dbh);
}
break;
}
free(sqlbuf);
qm->thread_running = 0;
switch_cache_db_release_db_handle(&qm->event_db);
return NULL;
return ttl;
}
static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread, void *obj)
static void *SWITCH_THREAD_FUNC switch_user_sql_thread(switch_thread_t *thread, void *obj)
{
void *pop = NULL;
uint32_t iterations = 0;
uint8_t trans = 0;
uint32_t target = 20000;
switch_size_t len = 0, sql_len = runtime.sql_buffer_len;
char *tmp, *sqlbuf = (char *) malloc(sql_len);
char *sql = NULL, *save_sql = NULL;
switch_size_t newlen;
int lc = 0, wrote = 0, do_sleep = 1;
uint32_t sanity = 120;
int auto_pause = 0;
switch_assert(sqlbuf);
uint32_t sanity = 120;
switch_sql_queue_manager_t *qm = (switch_sql_queue_manager_t *) obj;
uint32_t i;
while (!sql_manager.event_db) {
if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db)
while (!qm->event_db) {
if (switch_cache_db_get_db_handle_dsn(&qm->event_db, qm->dsn) == SWITCH_STATUS_SUCCESS && qm->event_db)
break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "%s Error getting db handle, Retrying\n", qm->name);
switch_yield(500000);
sanity--;
}
if (!sql_manager.event_db) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db Disabling core sql functionality\n");
if (!qm->event_db) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s Error getting db handle\n", qm->name);
return NULL;
}
sql_manager.thread_running = 1;
qm->thread_running = 1;
switch_mutex_lock(sql_manager.cond_mutex);
switch_mutex_lock(qm->cond_mutex);
switch (sql_manager.event_db->type) {
switch (qm->event_db->type) {
case SCDB_TYPE_PGSQL:
break;
case SCDB_TYPE_ODBC:
break;
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA journal_mode=OFF;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(qm->event_db, "PRAGMA journal_mode=OFF;", NULL);
}
break;
}
while (sql_manager.thread_running == 1) {
if (save_sql ||
switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS
) {
if (save_sql) {
sql = save_sql;
save_sql = NULL;
} else if ((sql = (char *) pop)) {
pop = NULL;
}
if (sql) {
newlen = strlen(sql) + 2;
if (iterations == 0) {
trans = 1;
}
if (len + newlen + 1 > sql_len) {
int new_mlen = len + newlen + 10240;
if (new_mlen < runtime.max_sql_buffer_len) {
sql_len = new_mlen;
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]));
}
if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
abort();
break;
}
sqlbuf = tmp;
} else {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"SAVE %d %d %d %d\n",
switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]),
switch_queue_size(sql_manager.sql_queue[2]),
switch_queue_size(sql_manager.sql_queue[3])
);
}
save_sql = sql;
sql = NULL;
lc = 0;
goto skip;
}
}
while (qm->thread_running == 1) {
int lc;
int i;
uint32_t iterations = 0;
iterations++;
sprintf(sqlbuf + len, "%s;\n", sql);
len += newlen;
free(sql);
sql = NULL;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "SQL thread ending\n");
break;
if (sql_manager.paused) {
for (i = 0; i < qm->numq; i++) {
do_flush(qm->sql_queue[i], NULL);
}
goto check;
}
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
for (i = 0; i < qm->numq; i++) {
uint32_t written = do_trans(qm->event_db, qm->sql_queue[i], NULL,
qm->pre_trans_execute,
qm->post_trans_execute,
qm->inner_pre_trans_execute,
qm->inner_post_trans_execute);
iterations += written;
if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
if (!auto_pause) {
auto_pause = 1;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue overflowing [%d], Pausing calls.\n", lc);
}
} else {
if (auto_pause && lc < 1000) {
auto_pause = 0;
switch_core_session_ctl(SCSC_PAUSE_INBOUND, &auto_pause);
auto_pause = 0;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL Queue back to normal size, resuming..\n");
}
switch_mutex_lock(qm->mutex);
qm->written[i] += written;
switch_mutex_unlock(qm->mutex);
}
skip:
wrote = 0;
if (trans && iterations && (iterations > target || !lc)) {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"RUN %d %d %d %d %d\n",
switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]),
switch_queue_size(sql_manager.sql_queue[2]),
switch_queue_size(sql_manager.sql_queue[3]),
iterations);
}
if (switch_cache_db_persistant_execute_trans_full(sql_manager.event_db, sqlbuf, 1,
runtime.core_db_pre_trans_execute,
runtime.core_db_post_trans_execute,
runtime.core_db_inner_pre_trans_execute,
runtime.core_db_inner_post_trans_execute
) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "DONE\n");
}
char line[128] = "";
int l;
switch_snprintf(line, sizeof(line), "%s RUN QUEUE [", qm->name);
iterations = 0;
trans = 0;
len = 0;
*sqlbuf = '\0';
lc = 0;
if (do_sleep) {
switch_yield(200000);
} else {
switch_yield(1000);
}
wrote = 1;
for (i = 0; i < qm->numq; i++) {
l = strlen(line);
switch_snprintf(line + l, sizeof(line) - l, "%d%s", switch_queue_size(qm->sql_queue[i]), i == qm->numq - 1 ? "" : "|");
}
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]) +
switch_queue_size(sql_manager.sql_queue[2]) + switch_queue_size(sql_manager.sql_queue[3]);
if (!lc) {
switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
} else if (wrote) {
if (lc > 2000) {
do_sleep = 0;
} else {
do_sleep = 1;
}
}
l = strlen(line);
switch_snprintf(line + l, sizeof(line) - l, "]--[%d]\n", iterations);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "%s", line);
}
switch_mutex_unlock(sql_manager.cond_mutex);
check:
while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
lc = qm_ttl(qm);
while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
if (!lc) {
switch_thread_cond_wait(qm->cond, qm->cond_mutex);
} else if (lc < 2000) {
switch_yield(200000);
}
while (switch_queue_trypop(sql_manager.sql_queue[2], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
while (switch_queue_trypop(sql_manager.sql_queue[3], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
switch_mutex_unlock(qm->cond_mutex);
free(sqlbuf);
for(i = 0; i < qm->numq; i++) {
do_flush(qm->sql_queue[i], qm->event_db);
}
sql_manager.thread_running = 0;
qm->thread_running = 0;
switch_cache_db_release_db_handle(&sql_manager.event_db);
switch_cache_db_release_db_handle(&qm->event_db);
return NULL;
}
static char *parse_presence_data_cols(switch_event_t *event)
{
char *cols[128] = { 0 };
......@@ -2388,12 +2128,11 @@ static void core_event_handler(switch_event_t *event)
for (i = 0; i < sql_idx; i++) {
if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
switch_queue_push(sql_manager.sql_queue[1], sql[i]);
switch_sql_queue_manager_push(sql_manager.qm, sql[i], 1, SWITCH_FALSE);
} else {
switch_queue_push(sql_manager.sql_queue[0], sql[i]);
switch_sql_queue_manager_push(sql_manager.qm, sql[i], 0, SWITCH_FALSE);
}
sql[i] = NULL;
wake_thread(0);
}
}
}
......@@ -2770,6 +2509,11 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
switch_cache_db_handle_t *dbh;
int r = 0;
if (!sql_manager.manage) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "DATABASE NOT AVAIALBLE, REVCOVERY NOT POSSIBLE\n");
return 0;
}
if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
return 0;
......@@ -2839,16 +2583,21 @@ SWITCH_DECLARE(int) switch_core_recovery_recover(const char *technology, const c
SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_core_dbtype(void)
{
return sql_manager.event_db->type;
return sql_manager.qm ? sql_manager.qm->event_db->type : SCDB_TYPE_CORE_DB;
}
SWITCH_DECLARE(void) switch_core_sql_exec(const char *sql)
{
if (!sql_manager.manage) {
return;
}
if (!switch_test_flag((&runtime), SCF_USE_SQL)) {
return;
}
switch_queue_push(sql_manager.sql_queue[3], strdup(sql));
switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_TRUE);
}
SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session, switch_bool_t force)
......@@ -2856,6 +2605,10 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
char *sql = NULL;
switch_channel_t *channel = switch_core_session_get_channel(session);
if (!sql_manager.manage) {
return;
}
if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
return;
}
......@@ -2878,7 +2631,7 @@ SWITCH_DECLARE(void) switch_core_recovery_untrack(switch_core_session_t *session
switch_core_get_uuid(), switch_core_session_get_uuid(session));
}
switch_queue_push(sql_manager.sql_queue[3], sql);
switch_sql_queue_manager_push(sql_manager.qm, sql, 3, SWITCH_FALSE);
switch_channel_clear_flag(channel, CF_TRACKED);
}
......@@ -2894,6 +2647,9 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
const char *profile_name;
const char *technology;
if (!sql_manager.manage) {
return;
}
if (!switch_channel_test_flag(channel, CF_ANSWERED) || switch_channel_get_state(channel) < CS_SOFT_EXECUTE) {
return;
......@@ -2921,7 +2677,7 @@ SWITCH_DECLARE(void) switch_core_recovery_track(switch_core_session_t *session)
switch_str_nil(profile_name), switch_core_get_hostname(), switch_core_session_get_uuid(session), xml_cdr_text);
}
switch_queue_push(sql_manager.sql_queue[2], sql);
switch_sql_queue_manager_push(sql_manager.qm, sql, 2, SWITCH_FALSE);
free(xml_cdr_text);
switch_channel_set_flag(channel, CF_TRACKED);
......@@ -2950,7 +2706,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c
user, realm, switch_core_get_switchname());
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
if ( !zstr(metadata) ) {
sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) "
......@@ -2982,7 +2738,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
return SWITCH_STATUS_SUCCESS;
}
......@@ -3002,7 +2758,8 @@ SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, c
sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname());
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
return SWITCH_STATUS_SUCCESS;
}
......@@ -3025,7 +2782,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname());
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_sql_queue_manager_push(sql_manager.qm, sql, 0, SWITCH_FALSE);
return SWITCH_STATUS_SUCCESS;
......@@ -3034,20 +2791,14 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
{
switch_threadattr_t *thd_attr;
uint32_t sanity = 400;
sql_manager.memory_pool = pool;
sql_manager.manage = manage;
switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool);
if (!sql_manager.manage) goto skip;
top:
......@@ -3117,6 +2868,15 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery1 on recovery(technology)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery2 on recovery(profile_name)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(uuid)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
switch (sql_manager.dbh->type) {
case SCDB_TYPE_PGSQL:
case SCDB_TYPE_ODBC:
......@@ -3227,126 +2987,76 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
#endif
switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[2], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[3], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
switch_core_sqldb_start_thread();
switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
while (sql_manager.manage && !sql_manager.thread_running && --sanity) {
switch_yield(10000);
}
}
return SWITCH_STATUS_SUCCESS;
}
SWITCH_DECLARE(void) switch_core_sqldb_pause(void)
{
if (sql_manager.paused) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already paused.\n");
}
sql_manager.paused = 1;
}
SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
SWITCH_DECLARE(void) switch_core_sqldb_resume(void)
{
switch_mutex_lock(sql_manager.ctl_mutex);
if (sql_manager.thread && sql_manager.thread_running) {
switch_status_t st;
if (!sql_manager.paused) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SQL is already running.\n");
}
sql_manager.paused = 0;
}
static void switch_core_sqldb_stop_thread(void)
{
switch_mutex_lock(sql_manager.ctl_mutex);
if (sql_manager.manage) {
switch_queue_push(sql_manager.sql_queue[0], NULL);
switch_queue_push(sql_manager.sql_queue[1], NULL);
switch_queue_push(sql_manager.sql_queue[2], NULL);
switch_queue_push(sql_manager.sql_queue[3], NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
wake_thread(0);
sql_manager.thread_running = -1;
switch_thread_join(&st, sql_manager.thread);
sql_manager.thread = NULL;
switch_cache_db_release_db_handle(&sql_manager.dbh);
sql_manager.dbh = NULL;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
if (sql_manager.qm) {
switch_sql_queue_manager_destroy(&sql_manager.qm);
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is not running\n");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
switch_mutex_unlock(sql_manager.ctl_mutex);
}
SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
static void switch_core_sqldb_start_thread(void)
{
switch_cache_db_handle_t *dbh;
switch_mutex_lock(sql_manager.ctl_mutex);
if (switch_core_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
int arg = 1;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
switch_core_session_ctl(SCSC_SHUTDOWN_NOW, &arg);
}
} else {
switch_cache_db_test_reactive(dbh, "select hostname from recovery", "DROP TABLE recovery", recovery_sql);
switch_cache_db_execute_sql(dbh, "create index recovery1 on recovery(technology)", NULL);
switch_cache_db_execute_sql(dbh, "create index recovery2 on recovery(profile_name)", NULL);
switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(uuid)", NULL);
switch_cache_db_execute_sql(dbh, "create index recovery3 on recovery(runtime_uuid)", NULL);
switch_cache_db_release_db_handle(&dbh);
}
if (sql_manager.manage) {
if (!sql_manager.qm) {
char *dbname = runtime.odbc_dsn;
top:
if (!sql_manager.dbh) {
/* Activate SQL database */
if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Failure! ODBC OR PGSQL IS REQUIRED!\n");
goto end;
if (zstr(dbname)) {
dbname = runtime.dbname;
if (zstr(dbname)) {
dbname = "core";
}
if (runtime.odbc_dsn) {
runtime.odbc_dsn = NULL;
runtime.odbc_dbtype = DBTYPE_DEFAULT;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
sql_manager.dbh = NULL;
goto top;
}
switch_clear_flag((&runtime), SCF_USE_SQL);
goto end;
}
switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "delete from calls", NULL);
}
switch_sql_queue_manager_init_name("CORE",
&sql_manager.qm,
4,
dbname,
runtime.core_db_pre_trans_execute,
runtime.core_db_post_trans_execute,
runtime.core_db_inner_pre_trans_execute,
runtime.core_db_inner_post_trans_execute);
if (!sql_manager.thread) {
switch_threadattr_t *thd_attr;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is already running\n");
}
switch_sql_queue_manager_start(sql_manager.qm);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
end:
switch_mutex_unlock(sql_manager.ctl_mutex);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论