提交 e08f9ada authored 作者: Eliot Gable's avatar Eliot Gable

Abstract the sql_manager and cache_db stuff so that modules can utilize the core…

Abstract the sql_manager and cache_db stuff so that modules can utilize the core functionality to run their own instance of a cached SQL connection. Includes patch to fix some segs in case ODBC does not have password set or if using PGSQL support.
上级 5cb354dd
......@@ -2193,7 +2193,11 @@ SWITCH_DECLARE(switch_status_t) switch_core_chat_send(const char *dest_proto, sw
SWITCH_DECLARE(switch_status_t) switch_core_chat_deliver(const char *dest_proto, switch_event_t **message_event);
SWITCH_DECLARE(switch_status_t) switch_ivr_preprocess_session(switch_core_session_t *session, const char *cmds);
SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void);
SWITCH_DECLARE(void) switch_sqldb_stop_thread(switch_sql_manager_t *sql_manager);
#define switch_core_sqldb_stop_thread() switch_sqldb_stop_thread(&switch_cache_db_sql_manager)
SWITCH_DECLARE(void) switch_sqldb_start_thread(switch_sql_manager_t *sql_manager, void *(SWITCH_THREAD_FUNC * func) (switch_thread_t *, void *),
char **post_connect_sql, int argc);
SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void);
///\}
......@@ -2240,8 +2244,6 @@ typedef union {
switch_cache_db_pgsql_options_t pgsql_options;
} switch_cache_db_connection_options_t;
struct switch_cache_db_handle;
typedef struct switch_cache_db_handle switch_cache_db_handle_t;
static inline const char *switch_cache_db_type_name(switch_cache_db_handle_type_t type)
{
......@@ -2281,7 +2283,9 @@ SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t
other threads until the allocating thread actually terminates.
\param [in] The handle
*/
SWITCH_DECLARE(void) _switch_cache_db_release_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh);
SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t ** dbh);
/*!
\brief Gets a new cached handle from the pool, potentially creating a new connection.
The connection is bound to the thread until it (the thread) terminates unless
......@@ -2290,12 +2294,19 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t
\param [in] type - ODBC or SQLLITE
\param [in] connection_options (userid, password, etc)
*/
SWITCH_DECLARE(switch_status_t) __switch_cache_db_get_db_handle(switch_sql_manager_t *sql_manager,
switch_cache_db_handle_t ** dbh,
switch_cache_db_handle_type_t type,
switch_cache_db_connection_options_t *connection_options,
const char *file, const char *func, int line);
SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t ** dbh,
switch_cache_db_handle_type_t type,
switch_cache_db_connection_options_t *connection_options,
const char *file, const char *func, int line);
#define switch_cache_db_get_db_handle(_a, _b, _c) _switch_cache_db_get_db_handle(_a, _b, _c, __FILE__, __SWITCH_FUNC__, __LINE__)
/*!
\brief Executes the sql and returns the result as a string
\param [in] dbh The handle
......@@ -2330,15 +2341,21 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_execute_sql_callback(switch_cach
*/
SWITCH_DECLARE(int) switch_cache_db_affected_rows(switch_cache_db_handle_t *dbh);
extern switch_sql_manager_t switch_cache_db_sql_manager;
/*!
\brief Provides some feedback as to the status of the db connection pool
\param [in] stream stream for status
*/
SWITCH_DECLARE(void) _switch_cache_db_status(switch_sql_manager_t *sql_manager, switch_stream_handle_t *stream);
SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream);
SWITCH_DECLARE(switch_status_t) __switch_core_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_db_handle(_a) _switch_core_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(_a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_recovery_db_handle(_a) _switch_core_recovery_db_handle(&switch_cache_db_sql_manager, _a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t ** dbh, const char *file, const char *func, int line);
#define switch_core_persist_db_handle(_a) _switch_core_persist_db_handle(&switch_cache_db_sql_manager, _a, __FILE__, __SWITCH_FUNC__, __LINE__)
SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_handle_t *db,
const char *test_sql, const char *drop_sql, const char *reactive_sql);
......@@ -2347,7 +2364,9 @@ SWITCH_DECLARE(switch_status_t) switch_cache_db_persistant_execute_trans(switch_
SWITCH_DECLARE(void) switch_core_set_signal_handlers(void);
SWITCH_DECLARE(uint32_t) switch_core_debug_level(void);
SWITCH_DECLARE(void) switch_cache_db_flush_handles(void);
SWITCH_DECLARE(void) switch_cache_db_flush_handles(switch_sql_manager_t *sql_manager);
#define switch_core_cache_db_flush_handles() switch_cache_db_flush_handles(&switch_cache_db_sql_manager)
SWITCH_DECLARE(const char *) switch_core_banner(void);
SWITCH_DECLARE(switch_bool_t) switch_core_session_in_thread(switch_core_session_t *session);
SWITCH_DECLARE(uint32_t) switch_default_ptime(const char *name, uint32_t number);
......
......@@ -1836,6 +1836,11 @@ typedef struct switch_odbc_handle switch_odbc_handle_t;
typedef struct switch_pgsql_handle switch_pgsql_handle_t;
typedef struct switch_pgsql_result switch_pgsql_result_t;
struct switch_cache_db_handle;
typedef struct switch_cache_db_handle switch_cache_db_handle_t;
struct switch_sql_manager;
typedef struct switch_sql_manager switch_sql_manager_t;
typedef struct switch_io_routines switch_io_routines_t;
typedef struct switch_speech_handle switch_speech_handle_t;
typedef struct switch_asr_handle switch_asr_handle_t;
......
......@@ -2260,7 +2260,7 @@ SWITCH_DECLARE(int32_t) switch_core_session_ctl(switch_session_ctl_t cmd, void *
switch_time_calibrate_clock();
break;
case SCSC_FLUSH_DB_HANDLES:
switch_cache_db_flush_handles();
switch_core_cache_db_flush_handles();
break;
case SCSC_SEND_SIGHUP:
handle_SIGHUP(1);
......
......@@ -26,6 +26,7 @@
* Anthony Minessale II <anthm@freeswitch.org>
* Michael Jerris <mike@jerris.com>
* Paul D. Tinsley <pdt at jackhammer.org>
* Eliot Gable <egable@gmail.com>
*
*
* switch_core_sqldb.c -- Main Core Library (statistics tracker)
......@@ -55,7 +56,7 @@ struct switch_cache_db_handle {
struct switch_cache_db_handle *next;
};
static struct {
struct switch_sql_manager {
switch_cache_db_handle_t *event_db;
switch_queue_t *sql_queue[2];
switch_memory_pool_t *memory_pool;
......@@ -73,7 +74,9 @@ static struct {
uint32_t total_handles;
uint32_t total_used_handles;
switch_cache_db_handle_t *dbh;
} sql_manager;
};
switch_sql_manager_t switch_cache_db_sql_manager;
static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t type)
......@@ -90,11 +93,11 @@ static switch_cache_db_handle_t *create_handle(switch_cache_db_handle_type_t typ
return new_dbh;
}
static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const char *db_callsite_str, const char *thread_str)
static void add_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t *dbh, const char *db_str, const char *db_callsite_str, const char *thread_str)
{
switch_ssize_t hlen = -1;
switch_mutex_lock(sql_manager.dbh_mutex);
switch_mutex_lock(sql_manager->dbh_mutex);
switch_set_string(dbh->creator, db_callsite_str);
......@@ -103,37 +106,37 @@ static void add_handle(switch_cache_db_handle_t *dbh, const char *db_str, const
dbh->thread_hash = switch_ci_hashfunc_default(thread_str, &hlen);
dbh->use_count++;
sql_manager.total_used_handles++;
dbh->next = sql_manager.handle_pool;
sql_manager->total_used_handles++;
dbh->next = sql_manager->handle_pool;
sql_manager.handle_pool = dbh;
sql_manager.total_handles++;
sql_manager->handle_pool = dbh;
sql_manager->total_handles++;
switch_mutex_lock(dbh->mutex);
switch_mutex_unlock(sql_manager.dbh_mutex);
switch_mutex_unlock(sql_manager->dbh_mutex);
}
static void del_handle(switch_cache_db_handle_t *dbh)
static void del_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t *dbh)
{
switch_cache_db_handle_t *dbh_ptr, *last = NULL;
switch_mutex_lock(sql_manager.dbh_mutex);
for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
switch_mutex_lock(sql_manager->dbh_mutex);
for (dbh_ptr = sql_manager->handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
if (dbh_ptr == dbh) {
if (last) {
last->next = dbh_ptr->next;
} else {
sql_manager.handle_pool = dbh_ptr->next;
sql_manager->handle_pool = dbh_ptr->next;
}
sql_manager.total_handles--;
sql_manager->total_handles--;
break;
}
last = dbh_ptr;
}
switch_mutex_unlock(sql_manager.dbh_mutex);
switch_mutex_unlock(sql_manager->dbh_mutex);
}
static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user_str, const char *thread_str)
static switch_cache_db_handle_t *get_handle(switch_sql_manager_t *sql_manager, const char *db_str, const char *user_str, const char *thread_str)
{
switch_ssize_t hlen = -1;
unsigned long hash = 0, thread_hash = 0;
......@@ -142,9 +145,9 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user
hash = switch_ci_hashfunc_default(db_str, &hlen);
thread_hash = switch_ci_hashfunc_default(thread_str, &hlen);
switch_mutex_lock(sql_manager.dbh_mutex);
switch_mutex_lock(sql_manager->dbh_mutex);
for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
for (dbh_ptr = sql_manager->handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
if (dbh_ptr->thread_hash == thread_hash && dbh_ptr->hash == hash &&
!switch_test_flag(dbh_ptr, CDF_PRUNE) && switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
r = dbh_ptr;
......@@ -152,7 +155,7 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user
}
if (!r) {
for (dbh_ptr = sql_manager.handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
for (dbh_ptr = sql_manager->handle_pool; dbh_ptr; dbh_ptr = dbh_ptr->next) {
if (dbh_ptr->hash == hash && !dbh_ptr->use_count && !switch_test_flag(dbh_ptr, CDF_PRUNE) &&
switch_mutex_trylock(dbh_ptr->mutex) == SWITCH_STATUS_SUCCESS) {
r = dbh_ptr;
......@@ -163,13 +166,13 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user
if (r) {
r->use_count++;
sql_manager.total_used_handles++;
sql_manager->total_used_handles++;
r->hash = switch_ci_hashfunc_default(db_str, &hlen);
r->thread_hash = thread_hash;
switch_set_string(r->last_user, user_str);
}
switch_mutex_unlock(sql_manager.dbh_mutex);
switch_mutex_unlock(sql_manager->dbh_mutex);
return r;
......@@ -181,11 +184,16 @@ static switch_cache_db_handle_t *get_handle(const char *db_str, const char *user
\brief Open the default system database
*/
SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
return __switch_core_db_handle(&switch_cache_db_sql_manager, dbh, file, func, line);
}
SWITCH_DECLARE(switch_status_t) __switch_core_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (!sql_manager.manage) {
if (!sql_manager->manage) {
return SWITCH_STATUS_FALSE;
}
......@@ -199,26 +207,26 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t
} else {
options.core_db_options.db_path = SWITCH_CORE_DB;
}
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
char *dsn;
if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.odbc_dsn;
options.odbc_options.user = runtime.odbc_user;
options.odbc_options.pass = runtime.odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
(*dbh)->io_mutex = sql_manager->io_mutex;
}
*/
......@@ -229,12 +237,12 @@ SWITCH_DECLARE(switch_status_t) _switch_core_db_handle(switch_cache_db_handle_t
/*!
\brief Open the default system database
*/
SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (!sql_manager.manage) {
if (!sql_manager->manage) {
return SWITCH_STATUS_FALSE;
}
......@@ -248,26 +256,26 @@ SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_h
} else {
options.core_db_options.db_path = SWITCH_CORE_PERSIST_DB;
}
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
char *dsn;
if ((dsn = strstr(runtime.odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.odbc_dsn;
options.odbc_options.user = runtime.odbc_user;
options.odbc_options.pass = runtime.odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
(*dbh)->io_mutex = sql_manager->io_mutex;
}
*/
......@@ -276,12 +284,12 @@ SWITCH_DECLARE(switch_status_t) _switch_core_persist_db_handle(switch_cache_db_h
#define SWITCH_CORE_RECOVERY_DB "core_recovery"
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh, const char *file, const char *func, int line)
{
switch_cache_db_connection_options_t options = { {0} };
switch_status_t r;
if (!sql_manager.manage) {
if (!sql_manager->manage) {
return SWITCH_STATUS_FALSE;
}
......@@ -295,26 +303,26 @@ SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_
} else {
options.core_db_options.db_path = SWITCH_CORE_RECOVERY_DB;
}
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_CORE_DB, &options, file, func, line);
} else {
char *dsn;
if ((dsn = strstr(runtime.recovery_odbc_dsn, "pgsql;")) != NULL) {
options.pgsql_options.dsn = (char*)(dsn + 6);
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_PGSQL, &options, file, func, line);
} else {
options.odbc_options.dsn = runtime.recovery_odbc_dsn;
options.odbc_options.user = runtime.recovery_odbc_user;
options.odbc_options.pass = runtime.recovery_odbc_pass;
r = _switch_cache_db_get_db_handle(dbh, SCDB_TYPE_ODBC, &options, file, func, line);
r = __switch_cache_db_get_db_handle(sql_manager, dbh, SCDB_TYPE_ODBC, &options, file, func, line);
}
}
/* I *think* we can do without this now, if not let me know
if (r == SWITCH_STATUS_SUCCESS && !(*dbh)->io_mutex) {
(*dbh)->io_mutex = sql_manager.io_mutex;
(*dbh)->io_mutex = sql_manager->io_mutex;
}
*/
......@@ -326,16 +334,16 @@ SWITCH_DECLARE(switch_status_t) _switch_core_recovery_db_handle(switch_cache_db_
#define SQL_REG_TIMEOUT 15
static void sql_close(time_t prune)
static void sql_close(switch_sql_manager_t *sql_manager, time_t prune)
{
switch_cache_db_handle_t *dbh = NULL;
int locked = 0;
switch_mutex_lock(sql_manager.dbh_mutex);
switch_mutex_lock(sql_manager->dbh_mutex);
top:
locked = 0;
for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) {
for (dbh = sql_manager->handle_pool; dbh; dbh = dbh->next) {
time_t diff = 0;
if (prune > 0 && prune > dbh->last_used) {
......@@ -368,7 +376,7 @@ static void sql_close(time_t prune)
break;
}
del_handle(dbh);
del_handle(sql_manager, dbh);
switch_mutex_unlock(dbh->mutex);
switch_core_destroy_memory_pool(&dbh->pool);
goto top;
......@@ -386,7 +394,7 @@ static void sql_close(time_t prune)
goto top;
}
switch_mutex_unlock(sql_manager.dbh_mutex);
switch_mutex_unlock(sql_manager->dbh_mutex);
}
......@@ -395,16 +403,20 @@ SWITCH_DECLARE(switch_cache_db_handle_type_t) switch_cache_db_get_type(switch_ca
return dbh->type;
}
SWITCH_DECLARE(void) switch_cache_db_flush_handles(void)
SWITCH_DECLARE(void) switch_cache_db_flush_handles(switch_sql_manager_t *sql_manager)
{
sql_close(switch_epoch_time_now(NULL) + SQL_CACHE_TIMEOUT + 1);
sql_close(sql_manager, switch_epoch_time_now(NULL) + SQL_CACHE_TIMEOUT + 1);
}
SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t **dbh)
{
_switch_cache_db_release_db_handle(&switch_cache_db_sql_manager, dbh);
}
SWITCH_DECLARE(void) _switch_cache_db_release_db_handle(switch_sql_manager_t *sql_manager, switch_cache_db_handle_t **dbh)
{
if (dbh && *dbh) {
switch_mutex_lock(sql_manager.dbh_mutex);
switch_mutex_lock(sql_manager->dbh_mutex);
(*dbh)->last_used = switch_epoch_time_now(NULL);
(*dbh)->io_mutex = NULL;
......@@ -415,9 +427,9 @@ SWITCH_DECLARE(void) switch_cache_db_release_db_handle(switch_cache_db_handle_t
}
}
switch_mutex_unlock((*dbh)->mutex);
sql_manager.total_used_handles--;
sql_manager->total_used_handles--;
*dbh = NULL;
switch_mutex_unlock(sql_manager.dbh_mutex);
switch_mutex_unlock(sql_manager->dbh_mutex);
}
}
......@@ -428,7 +440,16 @@ SWITCH_DECLARE(void) switch_cache_db_dismiss_db_handle(switch_cache_db_handle_t
}
SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t **dbh,
SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_handle_t ** dbh,
switch_cache_db_handle_type_t type,
switch_cache_db_connection_options_t *connection_options,
const char *file, const char *func, int line)
{
return __switch_cache_db_get_db_handle(&switch_cache_db_sql_manager, dbh, type, connection_options, file, func, line);
}
SWITCH_DECLARE(switch_status_t) __switch_cache_db_get_db_handle(switch_sql_manager_t *sql_manager,
switch_cache_db_handle_t **dbh,
switch_cache_db_handle_type_t type,
switch_cache_db_connection_options_t *connection_options,
const char *file, const char *func, int line)
......@@ -445,7 +466,7 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
const char *odbc_user = NULL;
const char *odbc_pass = NULL;
while(runtime.max_db_handles && sql_manager.total_handles >= runtime.max_db_handles && sql_manager.total_used_handles >= sql_manager.total_handles) {
while(runtime.max_db_handles && sql_manager->total_handles >= runtime.max_db_handles && sql_manager->total_used_handles >= sql_manager->total_handles) {
if (!waiting++) {
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_WARNING, "Max handles %u exceeded, blocking....\n",
runtime.max_db_handles);
......@@ -496,7 +517,7 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
snprintf(db_callsite_str, sizeof(db_callsite_str) - 1, "%s:%d", file, line);
snprintf(thread_str, sizeof(thread_str) - 1, "thread=\"%lu\"", (unsigned long) (intptr_t) self);
if ((new_dbh = get_handle(db_str, db_callsite_str, thread_str))) {
if ((new_dbh = get_handle(sql_manager, db_str, db_callsite_str, thread_str))) {
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
"Reuse Unused Cached DB handle %s [%s]\n", new_dbh->name, switch_cache_db_type_name(new_dbh->type));
} else {
......@@ -552,9 +573,6 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
goto end;
}
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
"Create Cached DB handle %s [%s] %s:%d\n", new_dbh->name, switch_cache_db_type_name(type), file, line);
new_dbh = create_handle(type);
if (db) {
......@@ -565,7 +583,11 @@ SWITCH_DECLARE(switch_status_t) _switch_cache_db_get_db_handle(switch_cache_db_h
new_dbh->native_handle.pgsql_dbh = pgsql_dbh;
}
add_handle(new_dbh, db_str, db_callsite_str, thread_str);
add_handle(sql_manager, new_dbh, db_str, db_callsite_str, thread_str);
switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, NULL, SWITCH_LOG_DEBUG10,
"Create Cached DB handle %s [%s] %s:%d\n", new_dbh->name, switch_cache_db_type_name(type), file, line);
}
end:
......@@ -638,18 +660,20 @@ static switch_status_t switch_cache_db_execute_sql_real(switch_cache_db_handle_t
return status;
}
static void wake_thread(int force)
static void _wake_thread(switch_sql_manager_t *sql_manager, int force)
{
if (force) {
switch_thread_cond_signal(sql_manager.cond);
switch_thread_cond_signal(sql_manager->cond);
return;
}
if (switch_mutex_trylock(sql_manager.cond_mutex) == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(sql_manager.cond);
switch_mutex_unlock(sql_manager.cond_mutex);
if (switch_mutex_trylock(sql_manager->cond_mutex) == SWITCH_STATUS_SUCCESS) {
switch_thread_cond_signal(sql_manager->cond);
switch_mutex_unlock(sql_manager->cond_mutex);
}
}
#define wake_thread(f) _wake_thread(&switch_cache_db_sql_manager, f)
/**
OMFG you cruel bastards. Who chooses 64k as a max buffer len for a sql statement, have you ever heard of transactions?
......@@ -1173,13 +1197,16 @@ SWITCH_DECLARE(switch_bool_t) switch_cache_db_test_reactive(switch_cache_db_hand
static void *SWITCH_THREAD_FUNC switch_core_sql_db_thread(switch_thread_t *thread, void *obj)
{
int sec = 0, reg_sec = 0;;
int sec = 0, reg_sec = 0;
switch_sql_manager_t *sql_manager;
sql_manager.db_thread_running = 1;
sql_manager = (switch_sql_manager_t*)obj;
while (sql_manager.db_thread_running == 1) {
sql_manager->db_thread_running = 1;
while (sql_manager->db_thread_running == 1) {
if (++sec == SQL_CACHE_TIMEOUT) {
sql_close(switch_epoch_time_now(NULL));
sql_close(sql_manager, switch_epoch_time_now(NULL));
wake_thread(0);
sec = 0;
}
......@@ -1208,44 +1235,47 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
int lc = 0, wrote = 0, do_sleep = 1;
uint32_t sanity = 120;
int auto_pause = 0;
switch_sql_manager_t *sql_manager;
sql_manager = (switch_sql_manager_t*)obj;
switch_assert(sqlbuf);
while (!sql_manager.event_db) {
if (switch_core_db_handle(&sql_manager.event_db) == SWITCH_STATUS_SUCCESS && sql_manager.event_db)
while (!sql_manager->event_db) {
if (switch_core_db_handle(&sql_manager->event_db) == SWITCH_STATUS_SUCCESS && sql_manager->event_db)
break;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error getting core db, Retrying\n");
switch_yield(500000);
sanity--;
}
if (!sql_manager.event_db) {
if (!sql_manager->event_db) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Error getting core db Disabling core sql functionality\n");
return NULL;
}
sql_manager.thread_running = 1;
sql_manager->thread_running = 1;
switch_mutex_lock(sql_manager.cond_mutex);
switch_mutex_lock(sql_manager->cond_mutex);
switch (sql_manager.event_db->type) {
switch (switch_cache_db_sql_manager.event_db->type) {
case SCDB_TYPE_PGSQL:
break;
case SCDB_TYPE_ODBC:
break;
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(sql_manager.event_db, "PRAGMA journal_mode=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(sql_manager->event_db, "PRAGMA journal_mode=OFF;", NULL);
}
break;
}
while (sql_manager.thread_running == 1) {
if (save_sql || switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
while (sql_manager->thread_running == 1) {
if (save_sql || switch_queue_trypop(sql_manager->sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS ||
switch_queue_trypop(sql_manager->sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
if (save_sql) {
sql = save_sql;
......@@ -1268,8 +1298,8 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
sql_len = new_mlen;
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager.sql_queue[0]),
switch_queue_size(sql_manager.sql_queue[1]));
"REALLOC %ld %d %d\n", (long int)sql_len, switch_queue_size(sql_manager->sql_queue[0]),
switch_queue_size(sql_manager->sql_queue[1]));
}
if (!(tmp = realloc(sqlbuf, sql_len))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread ending on mem err\n");
......@@ -1280,7 +1310,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
} else {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"SAVE %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]));
"SAVE %d %d\n", switch_queue_size(sql_manager->sql_queue[0]), switch_queue_size(sql_manager->sql_queue[1]));
}
save_sql = sql;
sql = NULL;
......@@ -1289,7 +1319,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
}
}
iterations++;
iterations++;
sprintf(sqlbuf + len, "%s;\n", sql);
len += newlen;
free(sql);
......@@ -1300,7 +1330,7 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
}
}
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
lc = switch_queue_size(sql_manager->sql_queue[0]) + switch_queue_size(sql_manager->sql_queue[1]);
if (lc > SWITCH_SQL_QUEUE_PAUSE_LEN) {
......@@ -1326,9 +1356,9 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
if (trans && iterations && (iterations > target || !lc)) {
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT,
"RUN %d %d %d\n", switch_queue_size(sql_manager.sql_queue[0]), switch_queue_size(sql_manager.sql_queue[1]), iterations);
"RUN %d %d %d\n", switch_queue_size(sql_manager->sql_queue[0]), switch_queue_size(sql_manager->sql_queue[1]), iterations);
}
if (switch_cache_db_persistant_execute_trans(sql_manager.event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
if (switch_cache_db_persistant_execute_trans(sql_manager->event_db, sqlbuf, 1) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "SQL thread unable to commit transaction, records lost!\n");
}
if (switch_test_flag((&runtime), SCF_DEBUG_SQL)) {
......@@ -1349,10 +1379,10 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
wrote = 1;
}
lc = switch_queue_size(sql_manager.sql_queue[0]) + switch_queue_size(sql_manager.sql_queue[1]);
lc = switch_queue_size(sql_manager->sql_queue[0]) + switch_queue_size(sql_manager->sql_queue[1]);
if (!lc) {
switch_thread_cond_wait(sql_manager.cond, sql_manager.cond_mutex);
switch_thread_cond_wait(sql_manager->cond, sql_manager->cond_mutex);
} else if (wrote) {
if (lc > 2000) {
do_sleep = 0;
......@@ -1364,21 +1394,21 @@ static void *SWITCH_THREAD_FUNC switch_core_sql_thread(switch_thread_t *thread,
}
switch_mutex_unlock(sql_manager.cond_mutex);
switch_mutex_unlock(sql_manager->cond_mutex);
while (switch_queue_trypop(sql_manager.sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
while (switch_queue_trypop(sql_manager->sql_queue[0], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
while (switch_queue_trypop(sql_manager.sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
while (switch_queue_trypop(sql_manager->sql_queue[1], &pop) == SWITCH_STATUS_SUCCESS) {
free(pop);
}
free(sqlbuf);
sql_manager.thread_running = 0;
sql_manager->thread_running = 0;
switch_cache_db_release_db_handle(&sql_manager.event_db);
switch_cache_db_release_db_handle(&sql_manager->event_db);
return NULL;
}
......@@ -1820,9 +1850,9 @@ static void core_event_handler(switch_event_t *event)
for (i = 0; i < sql_idx; i++) {
if (switch_stristr("update channels", sql[i]) || switch_stristr("delete from channels", sql[i])) {
switch_queue_push(sql_manager.sql_queue[1], sql[i]);
switch_queue_push(switch_cache_db_sql_manager.sql_queue[1], sql[i]);
} else {
switch_queue_push(sql_manager.sql_queue[0], sql[i]);
switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql[i]);
}
sql[i] = NULL;
wake_thread(0);
......@@ -2381,7 +2411,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c
user, realm, switch_core_get_switchname());
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql);
if ( !zstr(metadata) ) {
sql = switch_mprintf("insert into registrations (reg_user,realm,token,url,expires,network_ip,network_port,network_proto,hostname,metadata) "
......@@ -2413,7 +2443,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_add_registration(const char *user, c
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql);
return SWITCH_STATUS_SUCCESS;
}
......@@ -2433,7 +2463,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_del_registration(const char *user, c
sql = switch_mprintf("delete from registrations where reg_user='%q' and realm='%q' and hostname='%q'", user, realm, switch_core_get_switchname());
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql);
return SWITCH_STATUS_SUCCESS;
}
......@@ -2456,35 +2486,33 @@ SWITCH_DECLARE(switch_status_t) switch_core_expire_registration(int force)
sql = switch_mprintf("delete from registrations where expires > 0 and expires <= %ld and hostname='%q'", now, switch_core_get_switchname());
}
switch_queue_push(sql_manager.sql_queue[0], sql);
switch_queue_push(switch_cache_db_sql_manager.sql_queue[0], sql);
return SWITCH_STATUS_SUCCESS;
}
switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
void switch_sqldb_init_sql_manager(switch_sql_manager_t *sql_manager, switch_memory_pool_t *pool, switch_bool_t manage)
{
switch_threadattr_t *thd_attr;
uint32_t sanity = 400;
sql_manager.memory_pool = pool;
sql_manager.manage = manage;
switch_mutex_init(&sql_manager.dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.io_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.cond_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_mutex_init(&sql_manager.ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager.memory_pool);
switch_thread_cond_create(&sql_manager.cond, sql_manager.memory_pool);
sql_manager->memory_pool = pool;
sql_manager->manage = manage;
switch_mutex_init(&sql_manager->dbh_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool);
switch_mutex_init(&sql_manager->io_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool);
switch_mutex_init(&sql_manager->cond_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool);
switch_mutex_init(&sql_manager->ctl_mutex, SWITCH_MUTEX_NESTED, sql_manager->memory_pool);
switch_thread_cond_create(&sql_manager->cond, sql_manager->memory_pool);
}
if (!sql_manager.manage) goto skip;
switch_status_t switch_sqldb_connect(switch_sql_manager_t *sql_manager, switch_memory_pool_t *pool)
{
top:
top:
/* Activate SQL database */
if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
if (switch_core_db_handle(&sql_manager->dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
......@@ -2508,8 +2536,25 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Opening DB\n");
return SWITCH_STATUS_SUCCESS;
}
switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_t manage)
{
switch_threadattr_t *thd_attr;
uint32_t sanity = 400;
switch_sql_manager_t *sql_manager = &switch_cache_db_sql_manager;
switch (sql_manager.dbh->type) {
switch_sqldb_init_sql_manager(sql_manager, pool, manage);
if (manage == SWITCH_FALSE) goto skip;
top:
if (switch_sqldb_connect(sql_manager, pool) == SWITCH_STATUS_FALSE) {
return SWITCH_STATUS_FALSE;
}
switch (sql_manager->dbh->type) {
case SCDB_TYPE_PGSQL:
case SCDB_TYPE_ODBC:
if (switch_test_flag((&runtime), SCF_CLEAR_SQL)) {
......@@ -2520,61 +2565,61 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
for (i = 0; tables[i]; i++) {
switch_snprintfv(sql, sizeof(sql), "delete from %q where hostname='%q'", tables[i], hostname);
switch_cache_db_execute_sql(sql_manager.dbh, sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, sql, NULL);
}
}
break;
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql(sql_manager.dbh, "drop table channels", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "drop table calls", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "drop view detailed_calls", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "drop view basic_calls", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "drop table interfaces", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "drop table tasks", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA default_cache_size=8000", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "PRAGMA journal_mode=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "drop table channels", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "drop table calls", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "drop view detailed_calls", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "drop view basic_calls", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "drop table interfaces", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "drop table tasks", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA synchronous=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA count_changes=OFF;", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA default_cache_size=8000", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA temp_store=MEMORY;", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "PRAGMA journal_mode=OFF;", NULL);
}
break;
}
switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'",
switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from aliases", "DROP TABLE aliases", create_alias_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from complete", "DROP TABLE complete", create_complete_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from nat", "DROP TABLE nat", create_nat_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'",
"DROP TABLE registrations", create_registrations_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
switch_cache_db_test_reactive(sql_manager->dbh, "select metadata from registrations", NULL, "ALTER TABLE registrations ADD COLUMN metadata VARCHAR(256)");
switch (sql_manager.dbh->type) {
switch (sql_manager->dbh->type) {
case SCDB_TYPE_PGSQL:
case SCDB_TYPE_ODBC:
{
char *err;
switch_cache_db_test_reactive(sql_manager.dbh, "select call_uuid, read_bit_rate, sent_callee_name from channels", "DROP TABLE channels", create_channels_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select * from detailed_calls where sent_callee_name=''", "DROP VIEW detailed_calls", detailed_calls_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select * from basic_calls where sent_callee_name=''", "DROP VIEW basic_calls", basic_calls_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select call_uuid from calls", "DROP TABLE calls", create_calls_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select call_uuid, read_bit_rate, sent_callee_name from channels", "DROP TABLE channels", create_channels_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select * from detailed_calls where sent_callee_name=''", "DROP VIEW detailed_calls", detailed_calls_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select * from basic_calls where sent_callee_name=''", "DROP VIEW basic_calls", basic_calls_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select call_uuid from calls", "DROP TABLE calls", create_calls_sql);
if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'",
switch_cache_db_test_reactive(sql_manager->dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'",
"DROP TABLE registrations", create_registrations_sql);
} else {
char *tmp = switch_string_replace(create_registrations_sql, "url TEXT", "url VARCHAR(max)");
switch_cache_db_test_reactive(sql_manager.dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'",
switch_cache_db_test_reactive(sql_manager->dbh, "delete from registrations where reg_user='' or network_proto='tcp' or network_proto='tls'",
"DROP TABLE registrations", tmp);
free(tmp);
}
switch_cache_db_test_reactive(sql_manager.dbh, "select ikey from interfaces", "DROP TABLE interfaces", create_interfaces_sql);
switch_cache_db_test_reactive(sql_manager.dbh, "select hostname from tasks", "DROP TABLE tasks", create_tasks_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select ikey from interfaces", "DROP TABLE interfaces", create_interfaces_sql);
switch_cache_db_test_reactive(sql_manager->dbh, "select hostname from tasks", "DROP TABLE tasks", create_tasks_sql);
if (runtime.odbc_dbtype == DBTYPE_DEFAULT) {
switch_cache_db_execute_sql(sql_manager.dbh, "begin;delete from channels where hostname='';delete from channels where hostname='';commit;", &err);
switch_cache_db_execute_sql(sql_manager->dbh, "begin;delete from channels where hostname='';delete from channels where hostname='';commit;", &err);
} else {
switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels where hostname='';delete from channels where hostname='';", &err);
switch_cache_db_execute_sql(sql_manager->dbh, "delete from channels where hostname='';delete from channels where hostname='';", &err);
}
if (err) {
......@@ -2582,7 +2627,7 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
runtime.odbc_user = NULL;
runtime.odbc_pass = NULL;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Transactions not supported on your DB, disabling non-SQLite support; using SQLite\n");
switch_cache_db_release_db_handle(&sql_manager.dbh);
switch_cache_db_release_db_handle(&sql_manager->dbh);
free(err);
goto top;
}
......@@ -2590,49 +2635,49 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
break;
case SCDB_TYPE_CORE_DB:
{
switch_cache_db_execute_sql(sql_manager.dbh, create_channels_sql, NULL);
switch_cache_db_execute_sql(sql_manager.dbh, create_calls_sql, NULL);
switch_cache_db_execute_sql(sql_manager.dbh, create_interfaces_sql, NULL);
switch_cache_db_execute_sql(sql_manager.dbh, create_tasks_sql, NULL);
switch_cache_db_execute_sql(sql_manager.dbh, detailed_calls_sql, NULL);
switch_cache_db_execute_sql(sql_manager.dbh, basic_calls_sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, create_channels_sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, create_calls_sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, create_interfaces_sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, create_tasks_sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, detailed_calls_sql, NULL);
switch_cache_db_execute_sql(sql_manager->dbh, basic_calls_sql, NULL);
}
break;
}
switch_cache_db_execute_sql(sql_manager.dbh, "delete from complete where sticky=0", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "delete from aliases where sticky=0", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "delete from nat where sticky=0", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index alias1 on aliases (alias)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index tasks1 on tasks (hostname,task_id)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete1 on complete (a1,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete2 on complete (a2,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete3 on complete (a3,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete4 on complete (a4,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete5 on complete (a5,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete6 on complete (a6,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete7 on complete (a7,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete8 on complete (a8,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete9 on complete (a9,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete10 on complete (a10,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index complete11 on complete (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index nat_map_port_proto on nat (port,proto,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index channels1 on channels(hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index calls1 on calls(hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index chidx1 on channels (hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index uuindex on channels (uuid)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index uuindex2 on channels (call_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index callsidx1 on calls (hostname)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index eruuindex on calls (caller_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index eeuuindex on calls (callee_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index eeuuindex2 on calls (call_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "create index regindex1 on registrations (reg_user,realm,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "delete from complete where sticky=0", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "delete from aliases where sticky=0", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "delete from nat where sticky=0", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index alias1 on aliases (alias)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index tasks1 on tasks (hostname,task_id)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete1 on complete (a1,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete2 on complete (a2,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete3 on complete (a3,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete4 on complete (a4,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete5 on complete (a5,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete6 on complete (a6,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete7 on complete (a7,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete8 on complete (a8,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete9 on complete (a9,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete10 on complete (a10,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index complete11 on complete (a1,a2,a3,a4,a5,a6,a7,a8,a9,a10,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index nat_map_port_proto on nat (port,proto,hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index channels1 on channels(hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index calls1 on calls(hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index chidx1 on channels (hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index uuindex on channels (uuid)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index uuindex2 on channels (call_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index callsidx1 on calls (hostname)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index eruuindex on calls (caller_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index eeuuindex on calls (callee_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index eeuuindex2 on calls (call_uuid)", NULL);
switch_cache_db_execute_sql(sql_manager->dbh, "create index regindex1 on registrations (reg_user,realm,hostname)", NULL);
skip:
if (sql_manager.manage) {
if (sql_manager->manage) {
#ifdef SWITCH_SQL_BIND_EVERY_EVENT
switch_event_bind("core_db", SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
#else
......@@ -2661,16 +2706,16 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
switch_event_bind("core_db", SWITCH_EVENT_NAT, SWITCH_EVENT_SUBCLASS_ANY, core_event_handler, NULL);
#endif
switch_queue_create(&sql_manager.sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager.sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager.memory_pool);
switch_queue_create(&sql_manager->sql_queue[0], SWITCH_SQL_QUEUE_LEN, sql_manager->memory_pool);
switch_queue_create(&sql_manager->sql_queue[1], SWITCH_SQL_QUEUE_LEN, sql_manager->memory_pool);
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_create(&thd_attr, sql_manager->memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
switch_core_sqldb_start_thread();
switch_thread_create(&sql_manager.db_thread, thd_attr, switch_core_sql_db_thread, NULL, sql_manager.memory_pool);
switch_thread_create(&sql_manager->db_thread, thd_attr, switch_core_sql_db_thread, sql_manager, sql_manager->memory_pool);
while (sql_manager.manage && !sql_manager.thread_running && --sanity) {
while (sql_manager->manage && !sql_manager->thread_running && --sanity) {
switch_yield(10000);
}
}
......@@ -2678,37 +2723,36 @@ switch_status_t switch_core_sqldb_start(switch_memory_pool_t *pool, switch_bool_
}
SWITCH_DECLARE(void) switch_core_sqldb_stop_thread(void)
SWITCH_DECLARE(void) switch_sqldb_stop_thread(switch_sql_manager_t *sql_manager)
{
switch_mutex_lock(sql_manager.ctl_mutex);
if (sql_manager.thread && sql_manager.thread_running) {
switch_mutex_lock(sql_manager->ctl_mutex);
if (sql_manager->thread && sql_manager->thread_running) {
switch_status_t st;
if (sql_manager.manage) {
switch_queue_push(sql_manager.sql_queue[0], NULL);
switch_queue_push(sql_manager.sql_queue[1], NULL);
if (sql_manager->manage) {
switch_queue_push(sql_manager->sql_queue[0], NULL);
switch_queue_push(sql_manager->sql_queue[1], NULL);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Waiting for unfinished SQL transactions\n");
wake_thread(0);
sql_manager.thread_running = -1;
switch_thread_join(&st, sql_manager.thread);
sql_manager.thread = NULL;
switch_cache_db_release_db_handle(&sql_manager.dbh);
sql_manager.dbh = NULL;
sql_manager->thread_running = -1;
switch_thread_join(&st, sql_manager->thread);
sql_manager->thread = NULL;
switch_cache_db_release_db_handle(&sql_manager->dbh);
sql_manager->dbh = NULL;
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL is not enabled\n");
}
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is not running\n");
}
switch_mutex_unlock(sql_manager.ctl_mutex);
switch_mutex_unlock(sql_manager->ctl_mutex);
}
SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
void switch_core_recovery_create_indices(void)
{
switch_cache_db_handle_t *dbh;
switch_mutex_lock(sql_manager.ctl_mutex);
if (switch_core_recovery_db_handle(&dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
......@@ -2728,14 +2772,34 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
switch_cache_db_release_db_handle(&dbh);
}
}
SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
{
char *post_connect_sql[2];
post_connect_sql[0] = "delete from channels";
post_connect_sql[1] = "delete from calls";
switch_core_recovery_create_indices();
switch_sqldb_start_thread(&switch_cache_db_sql_manager, switch_core_sql_thread, post_connect_sql, 2);
}
SWITCH_DECLARE(void) switch_sqldb_start_thread(switch_sql_manager_t *sql_manager, void *(SWITCH_THREAD_FUNC * func) (switch_thread_t *, void *),
char *post_connect_sql[], int argc)
{
int i = 0;
switch_mutex_lock(sql_manager->ctl_mutex);
if (sql_manager.manage) {
if (sql_manager->manage) {
top:
if (!sql_manager.dbh) {
if (!sql_manager->dbh) {
/* Activate SQL database */
if (switch_core_db_handle(&sql_manager.dbh) != SWITCH_STATUS_SUCCESS) {
if (switch_core_db_handle(&sql_manager->dbh) != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error Opening DB!\n");
if (switch_test_flag((&runtime), SCF_CORE_NON_SQLITE_DB_REQ)) {
......@@ -2749,7 +2813,7 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
runtime.odbc_pass = NULL;
runtime.odbc_dbtype = DBTYPE_DEFAULT;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Falling back to core_db.\n");
sql_manager.dbh = NULL;
sql_manager->dbh = NULL;
goto top;
}
......@@ -2758,18 +2822,21 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
goto end;
}
switch_cache_db_execute_sql(sql_manager.dbh, "delete from channels", NULL);
switch_cache_db_execute_sql(sql_manager.dbh, "delete from calls", NULL);
if (argc && post_connect_sql) {
for ( i = 0; i < argc; i++) {
switch_cache_db_execute_sql(sql_manager->dbh, post_connect_sql[i], NULL);
}
}
}
if (!sql_manager.thread) {
if (!sql_manager->thread) {
switch_threadattr_t *thd_attr;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Starting SQL thread.\n");
switch_threadattr_create(&thd_attr, sql_manager.memory_pool);
switch_threadattr_create(&thd_attr, sql_manager->memory_pool);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
switch_threadattr_priority_set(thd_attr, SWITCH_PRI_REALTIME);
switch_thread_create(&sql_manager.thread, thd_attr, switch_core_sql_thread, NULL, sql_manager.memory_pool);
switch_thread_create(&sql_manager->thread, thd_attr, func, sql_manager, sql_manager->memory_pool);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "SQL thread is already running\n");
}
......@@ -2779,28 +2846,38 @@ SWITCH_DECLARE(void) switch_core_sqldb_start_thread(void)
end:
switch_mutex_unlock(sql_manager.ctl_mutex);
switch_mutex_unlock(sql_manager->ctl_mutex);
}
void switch_core_sqldb_stop(void)
void switch_sqldb_stop(switch_sql_manager_t *sql_manager)
{
switch_status_t st;
switch_event_unbind_callback(core_event_handler);
switch_core_sqldb_stop_thread();
switch_sqldb_stop_thread(sql_manager);
if (sql_manager.db_thread && sql_manager.db_thread_running) {
sql_manager.db_thread_running = -1;
switch_thread_join(&st, sql_manager.db_thread);
if (sql_manager->db_thread && sql_manager->db_thread_running) {
sql_manager->db_thread_running = -1;
switch_thread_join(&st, sql_manager->db_thread);
}
switch_cache_db_flush_handles();
sql_close(0);
switch_cache_db_flush_handles(sql_manager);
sql_close(sql_manager, 0);
}
void switch_core_sqldb_stop()
{
switch_sqldb_stop(&switch_cache_db_sql_manager);
}
SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
{
_switch_cache_db_status(&switch_cache_db_sql_manager, stream);
}
SWITCH_DECLARE(void) _switch_cache_db_status(switch_sql_manager_t *sql_manager, switch_stream_handle_t *stream)
{
/* return some status info suitable for the cli */
switch_cache_db_handle_t *dbh = NULL;
......@@ -2811,11 +2888,16 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
char *pos2 = NULL;
int count = 0, used = 0;
switch_mutex_lock(sql_manager.dbh_mutex);
switch_mutex_lock(sql_manager->dbh_mutex);
for (dbh = sql_manager.handle_pool; dbh; dbh = dbh->next) {
char *needle = "pass=\"";
for (dbh = sql_manager->handle_pool; dbh; dbh = dbh->next) {
char *needles[3];
time_t diff = 0;
int i = 0;
needles[0] = "pass=\"";
needles[1] = "password=";
needles[2] = "password='";
diff = now - dbh->last_used;
......@@ -2828,11 +2910,26 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
/* sanitize password */
memset(cleankey_str, 0, sizeof(cleankey_str));
pos1 = strstr(dbh->name, needle) + strlen(needle);
pos2 = strstr(pos1, "\"");
strncpy(cleankey_str, dbh->name, pos1 - dbh->name);
strcpy(&cleankey_str[pos1 - dbh->name], pos2);
for (i = 0; i < 3; i++) {
if((pos1 = strstr(dbh->name, needles[i]))) {
pos1 += strlen(needles[i]);
if (!(pos2 = strstr(pos1, "\""))) {
if (!(pos2 = strstr(pos1, "'"))) {
if (!(pos2 = strstr(pos1, " "))) {
pos2 = pos1 + strlen(pos1);
}
}
}
strncpy(cleankey_str, dbh->name, pos1 - dbh->name);
strcpy(&cleankey_str[pos1 - dbh->name], pos2);
break;
}
}
if (i == 3) {
strncpy(cleankey_str, dbh->name, strlen(dbh->name));
}
count++;
if (dbh->use_count) {
......@@ -2850,7 +2947,7 @@ SWITCH_DECLARE(void) switch_cache_db_status(switch_stream_handle_t *stream)
stream->write_function(stream, "%d total. %d in use.\n", count, used);
switch_mutex_unlock(sql_manager.dbh_mutex);
switch_mutex_unlock(sql_manager->dbh_mutex);
}
SWITCH_DECLARE(char*)switch_sql_concat(void)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论