提交 6ae73314 authored 作者: Andrew Thompson's avatar Andrew Thompson

Rework how spawned outbound pids communicate their pid back to the outbound call…

Rework how spawned outbound pids communicate their pid back to the outbound call process, try to improve some memory management


git-svn-id: http://svn.freeswitch.org/svn/freeswitch/trunk@12475 d0543943-73ff-0310-b7d9-9358b9ac24b2
上级 d7cdbeca
...@@ -131,7 +131,6 @@ int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *mo ...@@ -131,7 +131,6 @@ int ei_pid_from_rpc(struct ei_cnode_s *ec, int sockfd, erlang_ref *ref, char *mo
ei_x_buff buf; ei_x_buff buf;
ei_x_new(&buf); ei_x_new(&buf);
ei_x_encode_list_header(&buf, 1); ei_x_encode_list_header(&buf, 1);
ei_init_ref(ec, ref);
ei_x_encode_ref(&buf, ref); ei_x_encode_ref(&buf, ref);
ei_x_encode_empty_list(&buf); ei_x_encode_empty_list(&buf);
......
...@@ -700,7 +700,8 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e ...@@ -700,7 +700,8 @@ static switch_status_t handle_msg_atom(listener_t *listener, erlang_msg *msg, e
static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf) static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff *rbuf)
{ {
erlang_ref ref; erlang_ref ref;
erlang_pid *pid2, *pid = switch_core_alloc(listener->pool, sizeof(erlang_pid)); erlang_pid *pid;/* = switch_core_alloc(listener->pool, sizeof(erlang_pid));*/
void *p;
char hash[100]; char hash[100];
int arity; int arity;
...@@ -711,6 +712,14 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e ...@@ -711,6 +712,14 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
} }
if (!(pid = malloc(sizeof(erlang_pid)))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error\n");
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "badmem");
return SWITCH_STATUS_SUCCESS;
}
if (ei_decode_pid(buf->buff, &buf->index, pid)) { if (ei_decode_pid(buf->buff, &buf->index, pid)) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Invalid pid in a reference/pid tuple\n");
return SWITCH_STATUS_FALSE; return SWITCH_STATUS_FALSE;
...@@ -720,19 +729,36 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e ...@@ -720,19 +729,36 @@ static switch_status_t handle_ref_tuple(listener_t *listener, erlang_msg *msg, e
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Hashed ref to %s\n", hash);
if ((pid2 = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) { if ((p = switch_core_hash_find(listener->spawn_pid_hash, hash))) {
if (pid2 == NULL) { if (p == &globals.TIMEOUT) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found unfilled slot for %s\n", hash); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Handler for %s timed out\n", hash);
switch_core_hash_delete(listener->spawn_pid_hash, hash);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "timeout");
} else if (p == &globals.WAITING) {
/* update the key to point at a pid */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found waiting slot for %s\n", hash);
switch_core_hash_delete(listener->spawn_pid_hash, hash);
switch_core_hash_insert(listener->spawn_pid_hash, hash, pid);
return SWITCH_STATUS_FALSE; /*no reply */
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Found filled slot for %s\n", hash);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "duplicate_response");
} }
} else { } else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "No slot for %s\n", hash); /* nothin in the hash */
switch_core_hash_insert(listener->spawn_pid_hash, hash, pid); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Empty slot for %s\n", hash);
ei_x_encode_tuple_header(rbuf, 2);
ei_x_encode_atom(rbuf, "error");
ei_x_encode_atom(rbuf, "invalid_ref");
} }
/* no reply */ free(pid); /* don't need it */
return SWITCH_STATUS_FALSE;
return SWITCH_STATUS_SUCCESS;
} }
...@@ -758,8 +784,7 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff ...@@ -758,8 +784,7 @@ int handle_msg(listener_t *listener, erlang_msg *msg, ei_x_buff *buf, ei_x_buff
break; break;
case ERL_REFERENCE_EXT : case ERL_REFERENCE_EXT :
case ERL_NEW_REFERENCE_EXT : case ERL_NEW_REFERENCE_EXT :
handle_ref_tuple(listener, msg, buf, rbuf); ret = handle_ref_tuple(listener, msg, buf, rbuf);
return 0;
default : default :
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d\n", type); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "WEEEEEEEE %d\n", type);
/* some other kind of erlang term */ /* some other kind of erlang term */
......
...@@ -413,7 +413,12 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c ...@@ -413,7 +413,12 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
if (type != ERL_STRING_EXT && type != ERL_BINARY_EXT) /* XXX no unicode or character codes > 255 */ if (type != ERL_STRING_EXT && type != ERL_BINARY_EXT) /* XXX no unicode or character codes > 255 */
return NULL; return NULL;
char *xmlstr = switch_core_alloc(ptr->listener->pool, size + 1); char *xmlstr;
if (!(xmlstr = malloc(size + 1))) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Memory Error\n");
return NULL;
}
ei_decode_string_or_binary(rep->buff, &rep->index, size, xmlstr); ei_decode_string_or_binary(rep->buff, &rep->index, size, xmlstr);
...@@ -431,7 +436,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c ...@@ -431,7 +436,7 @@ static switch_xml_t erlang_fetch(const char *sectionstr, const char *tag_name, c
/*switch_safe_free(rep->buff);*/ /*switch_safe_free(rep->buff);*/
/*switch_safe_free(rep);*/ /*switch_safe_free(rep);*/
/*switch_safe_free(xmlstr);*/ free(xmlstr);
return xml; return xml;
} }
...@@ -811,7 +816,6 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj) ...@@ -811,7 +816,6 @@ static void *SWITCH_THREAD_FUNC listener_run(switch_thread_t *thread, void *obj)
switch_mutex_unlock(globals.listener_mutex); switch_mutex_unlock(globals.listener_mutex);
switch_core_hash_init(&listener->fetch_reply_hash, listener->pool); switch_core_hash_init(&listener->fetch_reply_hash, listener->pool);
switch_core_hash_init(&listener->spawn_pid_hash, listener->pool);
switch_assert(listener != NULL); switch_assert(listener != NULL);
...@@ -1071,6 +1075,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul ...@@ -1071,6 +1075,7 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul
else { else {
char hash[100]; char hash[100];
int i = 0; int i = 0;
void *p = NULL;
session_element->session = session; session_element->session = session;
erlang_pid *pid; erlang_pid *pid;
erlang_ref ref; erlang_ref ref;
...@@ -1081,12 +1086,16 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul ...@@ -1081,12 +1086,16 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul
/* attach the session to the listener */ /* attach the session to the listener */
add_session_elem_to_listener(listener,session_element); add_session_elem_to_listener(listener,session_element);
ei_init_ref(listener->ec, &ref);
ei_hash_ref(&ref, hash);
/* insert the waiting marker */
switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.WAITING);
if (!strcmp(function, "!")) { if (!strcmp(function, "!")) {
/* send a message to request a pid */ /* send a message to request a pid */
ei_x_buff rbuf; ei_x_buff rbuf;
ei_x_new_with_version(&rbuf); ei_x_new_with_version(&rbuf);
ei_init_ref(listener->ec, &ref);
ei_x_encode_tuple_header(&rbuf, 3); ei_x_encode_tuple_header(&rbuf, 3);
ei_x_encode_atom(&rbuf, "new_pid"); ei_x_encode_atom(&rbuf, "new_pid");
ei_x_encode_ref(&rbuf, &ref); ei_x_encode_ref(&rbuf, &ref);
...@@ -1107,23 +1116,27 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul ...@@ -1107,23 +1116,27 @@ session_elem_t* attach_call_to_spawned_process(listener_t* listener, char *modul
*/ */
} }
ei_hash_ref(&ref, hash); /* loop until either we timeout or we get a value that's not the waiting marker */
while (!(p = switch_core_hash_find(listener->spawn_pid_hash, hash)) || p == &globals.WAITING) {
while (!(pid = (erlang_pid *) switch_core_hash_find(listener->spawn_pid_hash, hash))) {
if (i > 50) { /* half a second timeout */ if (i > 50) { /* half a second timeout */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Timed out when waiting for outbound pid\n");
switch_core_session_rwunlock(session); switch_core_session_rwunlock(session);
remove_session_elem_from_listener(listener,session_element); remove_session_elem_from_listener(listener,session_element);
switch_core_hash_insert(listener->spawn_pid_hash, hash, &globals.TIMEOUT); /* TODO lock this? */
return NULL; return NULL;
} }
i++; i++;
switch_yield(10000); /* 10ms */ switch_yield(10000); /* 10ms */
} }
switch_core_hash_delete(listener->spawn_pid_hash, hash);
pid = (erlang_pid *) p;
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "got pid!\n");
session_element->process.type = ERLANG_PID; session_element->process.type = ERLANG_PID;
memcpy(&session_element->process.pid, pid, sizeof(erlang_pid)); memcpy(&session_element->process.pid, pid, sizeof(erlang_pid));
free(pid); /* malloced in handle_ref_tuple */
switch_set_flag(session_element, LFLAG_SESSION_ALIVE); switch_set_flag(session_element, LFLAG_SESSION_ALIVE);
switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT); switch_clear_flag(session_element, LFLAG_OUTBOUND_INIT);
switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID); switch_clear_flag(session_element, LFLAG_WAITING_FOR_PID);
...@@ -1222,6 +1235,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function) ...@@ -1222,6 +1235,7 @@ SWITCH_STANDARD_APP(erlang_outbound_function)
} }
if (module && function) { if (module && function) {
switch_core_hash_init(&listener->spawn_pid_hash, listener->pool);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n"); switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Creating new spawned session for listener\n");
session_element=attach_call_to_spawned_process(listener, module, function, session); session_element=attach_call_to_spawned_process(listener, module, function, session);
} else { } else {
......
...@@ -136,6 +136,8 @@ struct globals_struct { ...@@ -136,6 +136,8 @@ struct globals_struct {
unsigned int reference0; unsigned int reference0;
unsigned int reference1; unsigned int reference1;
unsigned int reference2; unsigned int reference2;
char TIMEOUT; /* marker for a timed out request */
char WAITING; /* marker for a request waiting for a response */
switch_mutex_t *ref_mutex; switch_mutex_t *ref_mutex;
}; };
typedef struct globals_struct globals_t; typedef struct globals_struct globals_t;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论