Skip to content
项目
群组
代码片段
帮助
正在加载...
登录
切换导航
F
freeswitch
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
张华
freeswitch
Commits
a7add335
提交
a7add335
authored
2月 15, 2017
作者:
Shane Bryldt
提交者:
Mike Jerris
3月 22, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
FS-9952: Committing to show problem with ks_pool_resize
上级
d6d8ede6
隐藏空白字符变更
内嵌
并排
正在显示
7 个修改的文件
包含
269 行增加
和
136 行删除
+269
-136
blade_identity.c
libs/libblade/src/blade_identity.c
+2
-5
blade_module_wss.c
libs/libblade/src/blade_module_wss.c
+206
-100
blade_stack.c
libs/libblade/src/blade_stack.c
+34
-7
blade_identity.h
libs/libblade/src/include/blade_identity.h
+1
-1
blade_stack.h
libs/libblade/src/include/blade_stack.h
+1
-1
bladec.c
libs/libblade/test/bladec.c
+25
-4
bladec2.cfg
libs/libblade/test/bladec2.cfg
+0
-18
没有找到文件。
libs/libblade/src/blade_identity.c
浏览文件 @
a7add335
...
...
@@ -150,15 +150,12 @@ KS_DECLARE(const char *) blade_identity_uri(blade_identity_t *bi)
return
bi
->
uri
;
}
KS_DECLARE
(
ks_status_t
)
blade_identity_parameter_get
(
blade_identity_t
*
bi
,
const
char
*
key
,
const
char
**
value
)
KS_DECLARE
(
const
char
*
)
blade_identity_parameter_get
(
blade_identity_t
*
bi
,
const
char
*
key
)
{
ks_assert
(
bi
);
ks_assert
(
key
);
ks_assert
(
value
);
*
value
=
(
const
char
*
)
ks_hash_search
(
bi
->
parameters
,
(
void
*
)
key
,
KS_UNLOCKED
);
return
KS_STATUS_SUCCESS
;
return
(
const
char
*
)
ks_hash_search
(
bi
->
parameters
,
(
void
*
)
key
,
KS_UNLOCKED
);
}
...
...
libs/libblade/src/blade_module_wss.c
浏览文件 @
a7add335
...
...
@@ -229,6 +229,38 @@ KS_DECLARE(ks_status_t) blade_module_wss_on_unload(blade_module_t *bm)
return
KS_STATUS_SUCCESS
;
}
ks_status_t
blade_transport_wss_init_create
(
blade_transport_wss_init_t
**
bt_wssiP
,
blade_module_wss_t
*
bm_wss
,
ks_socket_t
sock
)
{
blade_transport_wss_init_t
*
bt_wssi
=
NULL
;
ks_assert
(
bt_wssiP
);
ks_assert
(
bm_wss
);
ks_assert
(
sock
!=
KS_SOCK_INVALID
);
bt_wssi
=
ks_pool_alloc
(
bm_wss
->
pool
,
sizeof
(
blade_transport_wss_init_t
));
bt_wssi
->
module
=
bm_wss
;
bt_wssi
->
pool
=
bm_wss
->
pool
;
bt_wssi
->
sock
=
sock
;
*
bt_wssiP
=
bt_wssi
;
return
KS_STATUS_SUCCESS
;
}
ks_status_t
blade_transport_wss_init_destroy
(
blade_transport_wss_init_t
**
bt_wssiP
)
{
blade_transport_wss_init_t
*
bt_wssi
=
NULL
;
ks_assert
(
bt_wssiP
);
ks_assert
(
*
bt_wssiP
);
bt_wssi
=
*
bt_wssiP
;
ks_pool_free
(
bt_wssi
->
pool
,
bt_wssiP
);
return
KS_STATUS_SUCCESS
;
}
ks_status_t
blade_module_wss_config
(
blade_module_wss_t
*
bm_wss
,
config_setting_t
*
config
)
{
config_setting_t
*
wss
=
NULL
;
...
...
@@ -254,73 +286,71 @@ ks_status_t blade_module_wss_config(blade_module_wss_t *bm_wss, config_setting_t
}
wss
=
config_setting_get_member
(
config
,
"wss"
);
if
(
!
wss
)
{
ks_log
(
KS_LOG_DEBUG
,
"!wss
\n
"
);
return
KS_STATUS_FAIL
;
}
wss_endpoints
=
config_setting_get_member
(
wss
,
"endpoints"
);
if
(
!
wss_endpoints
)
{
ks_log
(
KS_LOG_DEBUG
,
"!wss_endpoints
\n
"
);
return
KS_STATUS_FAIL
;
}
wss_endpoints_ipv4
=
config_lookup_from
(
wss_endpoints
,
"ipv4"
);
wss_endpoints_ipv6
=
config_lookup_from
(
wss_endpoints
,
"ipv6"
);
if
(
wss_endpoints_ipv4
)
{
if
(
config_setting_type
(
wss_endpoints_ipv4
)
!=
CONFIG_TYPE_LIST
)
return
KS_STATUS_FAIL
;
if
((
config_wss_endpoints_ipv4_length
=
config_setting_length
(
wss_endpoints_ipv4
))
>
BLADE_MODULE_WSS_ENDPOINTS_MULTIHOME_MAX
)
if
(
wss
)
{
wss_endpoints
=
config_setting_get_member
(
wss
,
"endpoints"
);
if
(
!
wss_endpoints
)
{
ks_log
(
KS_LOG_DEBUG
,
"!wss_endpoints
\n
"
);
return
KS_STATUS_FAIL
;
for
(
int32_t
index
=
0
;
index
<
config_wss_endpoints_ipv4_length
;
++
index
)
{
element
=
config_setting_get_elem
(
wss_endpoints_ipv4
,
index
);
tmp1
=
config_lookup_from
(
element
,
"address"
);
tmp2
=
config_lookup_from
(
element
,
"port"
);
if
(
!
tmp1
||
!
tmp2
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp1
)
!=
CONFIG_TYPE_STRING
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp2
)
!=
CONFIG_TYPE_INT
)
return
KS_STATUS_FAIL
;
if
(
ks_addr_set
(
&
config_wss_endpoints_ipv4
[
index
],
config_setting_get_string
(
tmp1
),
config_setting_get_int
(
tmp2
),
AF_INET
)
!=
KS_STATUS_SUCCESS
)
return
KS_STATUS_FAIL
;
ks_log
(
KS_LOG_DEBUG
,
"Binding to IPV4 %s on port %d
\n
"
,
ks_addr_get_host
(
&
config_wss_endpoints_ipv4
[
index
]),
ks_addr_get_port
(
&
config_wss_endpoints_ipv4
[
index
]));
}
}
if
(
wss_endpoints_ipv6
)
{
if
(
config_setting_type
(
wss_endpoints_ipv6
)
!=
CONFIG_TYPE_LIST
)
return
KS_STATUS_FAIL
;
if
((
config_wss_endpoints_ipv6_length
=
config_setting_length
(
wss_endpoints_ipv6
))
>
BLADE_MODULE_WSS_ENDPOINTS_MULTIHOME_MAX
)
return
KS_STATUS_FAIL
;
wss_endpoints_ipv4
=
config_lookup_from
(
wss_endpoints
,
"ipv4"
);
wss_endpoints_ipv6
=
config_lookup_from
(
wss_endpoints
,
"ipv6"
);
if
(
wss_endpoints_ipv4
)
{
if
(
config_setting_type
(
wss_endpoints_ipv4
)
!=
CONFIG_TYPE_LIST
)
return
KS_STATUS_FAIL
;
if
((
config_wss_endpoints_ipv4_length
=
config_setting_length
(
wss_endpoints_ipv4
))
>
BLADE_MODULE_WSS_ENDPOINTS_MULTIHOME_MAX
)
return
KS_STATUS_FAIL
;
for
(
int32_t
index
=
0
;
index
<
config_wss_endpoints_ipv6_length
;
++
index
)
{
element
=
config_setting_get_elem
(
wss_endpoints_ipv6
,
index
);
tmp1
=
config_lookup_from
(
element
,
"address"
);
tmp2
=
config_lookup_from
(
element
,
"port"
);
if
(
!
tmp1
||
!
tmp2
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp1
)
!=
CONFIG_TYPE_STRING
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp2
)
!=
CONFIG_TYPE_INT
)
return
KS_STATUS_FAIL
;
if
(
ks_addr_set
(
&
config_wss_endpoints_ipv6
[
index
],
config_setting_get_string
(
tmp1
),
config_setting_get_int
(
tmp2
),
AF_INET6
)
!=
KS_STATUS_SUCCESS
)
return
KS_STATUS_FAIL
;
ks_log
(
KS_LOG_DEBUG
,
"Binding to IPV6 %s on port %d
\n
"
,
ks_addr_get_host
(
&
config_wss_endpoints_ipv6
[
index
]),
ks_addr_get_port
(
&
config_wss_endpoints_ipv6
[
index
]));
for
(
int32_t
index
=
0
;
index
<
config_wss_endpoints_ipv4_length
;
++
index
)
{
element
=
config_setting_get_elem
(
wss_endpoints_ipv4
,
index
);
tmp1
=
config_lookup_from
(
element
,
"address"
);
tmp2
=
config_lookup_from
(
element
,
"port"
);
if
(
!
tmp1
||
!
tmp2
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp1
)
!=
CONFIG_TYPE_STRING
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp2
)
!=
CONFIG_TYPE_INT
)
return
KS_STATUS_FAIL
;
if
(
ks_addr_set
(
&
config_wss_endpoints_ipv4
[
index
],
config_setting_get_string
(
tmp1
),
config_setting_get_int
(
tmp2
),
AF_INET
)
!=
KS_STATUS_SUCCESS
)
return
KS_STATUS_FAIL
;
ks_log
(
KS_LOG_DEBUG
,
"Binding to IPV4 %s on port %d
\n
"
,
ks_addr_get_host
(
&
config_wss_endpoints_ipv4
[
index
]),
ks_addr_get_port
(
&
config_wss_endpoints_ipv4
[
index
]));
}
}
if
(
wss_endpoints_ipv6
)
{
if
(
config_setting_type
(
wss_endpoints_ipv6
)
!=
CONFIG_TYPE_LIST
)
return
KS_STATUS_FAIL
;
if
((
config_wss_endpoints_ipv6_length
=
config_setting_length
(
wss_endpoints_ipv6
))
>
BLADE_MODULE_WSS_ENDPOINTS_MULTIHOME_MAX
)
return
KS_STATUS_FAIL
;
for
(
int32_t
index
=
0
;
index
<
config_wss_endpoints_ipv6_length
;
++
index
)
{
element
=
config_setting_get_elem
(
wss_endpoints_ipv6
,
index
);
tmp1
=
config_lookup_from
(
element
,
"address"
);
tmp2
=
config_lookup_from
(
element
,
"port"
);
if
(
!
tmp1
||
!
tmp2
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp1
)
!=
CONFIG_TYPE_STRING
)
return
KS_STATUS_FAIL
;
if
(
config_setting_type
(
tmp2
)
!=
CONFIG_TYPE_INT
)
return
KS_STATUS_FAIL
;
if
(
ks_addr_set
(
&
config_wss_endpoints_ipv6
[
index
],
config_setting_get_string
(
tmp1
),
config_setting_get_int
(
tmp2
),
AF_INET6
)
!=
KS_STATUS_SUCCESS
)
return
KS_STATUS_FAIL
;
ks_log
(
KS_LOG_DEBUG
,
"Binding to IPV6 %s on port %d
\n
"
,
ks_addr_get_host
(
&
config_wss_endpoints_ipv6
[
index
]),
ks_addr_get_port
(
&
config_wss_endpoints_ipv6
[
index
]));
}
}
if
(
config_wss_endpoints_ipv4_length
+
config_wss_endpoints_ipv6_length
<=
0
)
return
KS_STATUS_FAIL
;
tmp1
=
config_lookup_from
(
wss_endpoints
,
"backlog"
);
if
(
tmp1
)
{
if
(
config_setting_type
(
tmp1
)
!=
CONFIG_TYPE_INT
)
return
KS_STATUS_FAIL
;
config_wss_endpoints_backlog
=
config_setting_get_int
(
tmp1
);
}
wss_ssl
=
config_setting_get_member
(
wss
,
"ssl"
);
if
(
wss_ssl
)
{
// @todo: SSL stuffs from wss_ssl into config_wss_ssl envelope
}
}
if
(
config_wss_endpoints_ipv4_length
+
config_wss_endpoints_ipv6_length
<=
0
)
return
KS_STATUS_FAIL
;
tmp1
=
config_lookup_from
(
wss_endpoints
,
"backlog"
);
if
(
tmp1
)
{
if
(
config_setting_type
(
tmp1
)
!=
CONFIG_TYPE_INT
)
return
KS_STATUS_FAIL
;
config_wss_endpoints_backlog
=
config_setting_get_int
(
tmp1
);
}
wss_ssl
=
config_setting_get_member
(
wss
,
"ssl"
);
if
(
wss_ssl
)
{
// @todo: SSL stuffs from wss_ssl into config_wss_ssl envelope
}
...
...
@@ -454,6 +484,11 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a
goto
done
;
}
ks_log
(
KS_LOG_DEBUG
,
"Listeners Before
\n
"
);
for
(
int
index
=
0
;
index
<
bm_wss
->
listeners_count
;
++
index
)
{
ks_log
(
KS_LOG_DEBUG
,
" Listener %d = %d
\n
"
,
index
,
bm_wss
->
listeners_poll
[
index
].
fd
);
}
listener_index
=
bm_wss
->
listeners_count
++
;
bm_wss
->
listeners_poll
=
(
struct
pollfd
*
)
ks_pool_resize
(
bm_wss
->
pool
,
bm_wss
->
listeners_poll
,
...
...
@@ -462,6 +497,11 @@ ks_status_t blade_module_wss_listen(blade_module_wss_t *bm_wss, ks_sockaddr_t *a
bm_wss
->
listeners_poll
[
listener_index
].
fd
=
listener
;
bm_wss
->
listeners_poll
[
listener_index
].
events
=
POLLIN
|
POLLERR
;
ks_log
(
KS_LOG_DEBUG
,
"Listeners After
\n
"
);
for
(
int
index
=
0
;
index
<
bm_wss
->
listeners_count
;
++
index
)
{
ks_log
(
KS_LOG_DEBUG
,
" Listener %d = %d
\n
"
,
index
,
bm_wss
->
listeners_poll
[
index
].
fd
);
}
done
:
if
(
ret
!=
KS_STATUS_SUCCESS
)
{
if
(
listener
!=
KS_SOCK_INVALID
)
{
...
...
@@ -484,26 +524,30 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
bm_wss
=
(
blade_module_wss_t
*
)
data
;
ks_log
(
KS_LOG_DEBUG
,
"Started
\n
"
);
while
(
!
bm_wss
->
shutdown
)
{
// @todo take exact timeout from a setting in config_wss_endpoints
if
(
ks_poll
(
bm_wss
->
listeners_poll
,
bm_wss
->
listeners_count
,
100
)
>
0
)
{
for
(
int32_t
index
=
0
;
index
<
bm_wss
->
listeners_count
;
++
index
)
{
ks_socket_t
sock
=
KS_SOCK_INVALID
;
if
(
!
(
bm_wss
->
listeners_poll
[
index
].
revents
&
POLLIN
))
continue
;
if
(
bm_wss
->
listeners_poll
[
index
].
revents
&
POLLERR
)
{
// @todo: error handling, just skip the listener for now, it might recover, could skip X times before closing?
ks_log
(
KS_LOG_DEBUG
,
"Listener POLLERR
\n
"
);
continue
;
}
if
(
!
(
bm_wss
->
listeners_poll
[
index
].
revents
&
POLLIN
))
continue
;
if
((
sock
=
accept
(
bm_wss
->
listeners_poll
[
index
].
fd
,
NULL
,
NULL
))
==
KS_SOCK_INVALID
)
{
// @todo: error handling, just skip the socket for now as most causes are because remote side became unreachable
continue
;
}
ks_log
(
KS_LOG_DEBUG
,
"Socket Accepted
\n
"
);
blade_transport_wss_init_create
(
&
bt_wss_init
,
bm_wss
,
sock
);
ks_assert
(
bt_wss_init
);
blade_connection_create
(
&
bc
,
bm_wss
->
handle
,
bt_wss_init
,
bm_wss
->
transport_callbacks
);
ks_assert
(
bc
);
...
...
@@ -529,6 +573,7 @@ void *blade_module_wss_listeners_thread(ks_thread_t *thread, void *data)
if
(
bt_wss
)
blade_transport_wss_destroy
(
&
bt_wss
);
}
}
ks_log
(
KS_LOG_DEBUG
,
"Stopped
\n
"
);
return
NULL
;
}
...
...
@@ -572,16 +617,86 @@ ks_status_t blade_transport_wss_destroy(blade_transport_wss_t **bt_wssP)
ks_status_t
blade_transport_wss_on_connect
(
blade_connection_t
**
bcP
,
blade_module_t
*
bm
,
blade_identity_t
*
target
)
{
ks_status_t
ret
=
KS_STATUS_SUCCESS
;
blade_module_wss_t
*
bm_wss
=
NULL
;
ks_sockaddr_t
addr
;
ks_socket_t
sock
=
KS_SOCK_INVALID
;
int
family
=
AF_INET
;
const
char
*
ip
=
NULL
;
const
char
*
portstr
=
NULL
;
ks_port_t
port
=
1234
;
blade_transport_wss_init_t
*
bt_wss_init
=
NULL
;
blade_connection_t
*
bc
=
NULL
;
ks_assert
(
bcP
);
ks_assert
(
bm
);
ks_assert
(
target
);
bm_wss
=
(
blade_module_wss_t
*
)
blade_module_data_get
(
bm
);
*
bcP
=
NULL
;
// @todo connect-out equivilent of accept
ks_log
(
KS_LOG_DEBUG
,
"Connect Callback: %s
\n
"
,
blade_identity_uri
(
target
));
return
KS_STATUS_SUCCESS
;
// @todo completely rework all of this once more is known about connecting when an identity has no explicit transport details but this transport
// has been choosen anyway
ip
=
blade_identity_parameter_get
(
target
,
"host"
);
portstr
=
blade_identity_parameter_get
(
target
,
"port"
);
if
(
!
ip
)
{
// @todo: temporary, this should fall back on DNS SRV or whatever else can turn "a@b.com" into an ip (and port?) to connect to
// also need to deal with hostname lookup, so identities with wss transport need to have a host parameter that is an IP for the moment
ret
=
KS_STATUS_FAIL
;
goto
done
;
}
// @todo wrap this code to get address family from string IP between IPV4 and IPV6, and put it in libks somewhere
{
ks_size_t
len
=
strlen
(
ip
);
if
(
len
<=
3
)
{
ret
=
KS_STATUS_FAIL
;
goto
done
;
}
if
(
ip
[
1
]
==
'.'
||
ip
[
2
]
==
'.'
||
(
len
>
3
&&
ip
[
3
]
==
'.'
))
family
=
AF_INET
;
else
family
=
AF_INET6
;
}
if
(
portstr
)
{
int
p
=
atoi
(
portstr
);
if
(
p
>
0
&&
p
<=
UINT16_MAX
)
port
=
p
;
}
ks_addr_set
(
&
addr
,
ip
,
port
,
family
);
if
((
sock
=
ks_socket_connect
(
SOCK_STREAM
,
IPPROTO_TCP
,
&
addr
))
==
KS_SOCK_INVALID
)
{
// @todo: error handling, just fail for now as most causes are because remote side became unreachable
ret
=
KS_STATUS_FAIL
;
goto
done
;
}
ks_log
(
KS_LOG_DEBUG
,
"Socket Connected
\n
"
);
blade_transport_wss_init_create
(
&
bt_wss_init
,
bm_wss
,
sock
);
ks_assert
(
bt_wss_init
);
blade_connection_create
(
&
bc
,
bm_wss
->
handle
,
bt_wss_init
,
bm_wss
->
transport_callbacks
);
ks_assert
(
bc
);
if
(
blade_connection_startup
(
bc
,
BLADE_CONNECTION_DIRECTION_OUTBOUND
)
!=
KS_STATUS_SUCCESS
)
{
blade_connection_destroy
(
&
bc
);
blade_transport_wss_init_destroy
(
&
bt_wss_init
);
ks_socket_close
(
&
sock
);
ret
=
KS_STATUS_FAIL
;
goto
done
;
}
// @todo make sure it's sensible to be mixing outbound and inbound connections in the same list, but this allows entering the destruction pipeline
// for module shutdown, disconnects and errors without special considerations
list_append
(
&
bm_wss
->
connected
,
bc
);
*
bcP
=
bc
;
blade_connection_state_set
(
bc
,
BLADE_CONNECTION_STATE_NEW
);
done
:
return
ret
;
}
blade_connection_rank_t
blade_transport_wss_on_rank
(
blade_connection_t
*
bc
,
blade_identity_t
*
target
)
...
...
@@ -723,12 +838,22 @@ blade_connection_state_hook_t blade_transport_wss_on_state_new_inbound(blade_con
blade_connection_state_hook_t
blade_transport_wss_on_state_new_outbound
(
blade_connection_t
*
bc
,
blade_connection_state_condition_t
condition
)
{
blade_transport_wss_t
*
bt_wss
=
NULL
;
blade_transport_wss_init_t
*
bt_wss_init
=
NULL
;
ks_assert
(
bc
);
ks_log
(
KS_LOG_DEBUG
,
"State Callback: %d
\n
"
,
(
int32_t
)
condition
);
if
(
condition
==
BLADE_CONNECTION_STATE_CONDITION_PRE
)
return
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
bt_wss_init
=
(
blade_transport_wss_init_t
*
)
blade_connection_transport_init_get
(
bc
);
blade_transport_wss_create
(
&
bt_wss
,
bt_wss_init
->
module
,
bt_wss_init
->
sock
);
ks_assert
(
bt_wss
);
blade_connection_transport_set
(
bc
,
bt_wss
);
return
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
}
...
...
@@ -755,10 +880,22 @@ blade_connection_state_hook_t blade_transport_wss_on_state_connect_inbound(blade
blade_connection_state_hook_t
blade_transport_wss_on_state_connect_outbound
(
blade_connection_t
*
bc
,
blade_connection_state_condition_t
condition
)
{
blade_transport_wss_t
*
bt_wss
=
NULL
;
ks_assert
(
bc
);
ks_log
(
KS_LOG_DEBUG
,
"State Callback: %d
\n
"
,
(
int32_t
)
condition
);
if
(
condition
==
BLADE_CONNECTION_STATE_CONDITION_PRE
)
return
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
bt_wss
=
(
blade_transport_wss_t
*
)
blade_connection_transport_get
(
bc
);
// @todo: SSL init stuffs based on data from config to pass into kws_init
if
(
kws_init
(
&
bt_wss
->
kws
,
bt_wss
->
sock
,
NULL
,
"/blade:blade.invalid:blade"
,
KWS_BLOCK
,
bt_wss
->
pool
)
!=
KS_STATUS_SUCCESS
)
{
// @todo error logging
return
BLADE_CONNECTION_STATE_HOOK_DISCONNECT
;
}
return
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
}
...
...
@@ -781,7 +918,8 @@ blade_connection_state_hook_t blade_transport_wss_on_state_attach_outbound(blade
ks_log
(
KS_LOG_DEBUG
,
"State Callback: %d
\n
"
,
(
int32_t
)
condition
);
return
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
ks_sleep_ms
(
1000
);
// @todo temporary testing, remove this and return success once negotiations are done
return
BLADE_CONNECTION_STATE_HOOK_BYPASS
;
}
blade_connection_state_hook_t
blade_transport_wss_on_state_detach
(
blade_connection_t
*
bc
,
blade_connection_state_condition_t
condition
)
...
...
@@ -802,38 +940,6 @@ blade_connection_state_hook_t blade_transport_wss_on_state_ready(blade_connectio
return
BLADE_CONNECTION_STATE_HOOK_SUCCESS
;
}
ks_status_t
blade_transport_wss_init_create
(
blade_transport_wss_init_t
**
bt_wssiP
,
blade_module_wss_t
*
bm_wss
,
ks_socket_t
sock
)
{
blade_transport_wss_init_t
*
bt_wssi
=
NULL
;
ks_assert
(
bt_wssiP
);
ks_assert
(
bm_wss
);
ks_assert
(
sock
!=
KS_SOCK_INVALID
);
bt_wssi
=
ks_pool_alloc
(
bm_wss
->
pool
,
sizeof
(
blade_transport_wss_init_t
));
bt_wssi
->
module
=
bm_wss
;
bt_wssi
->
pool
=
bm_wss
->
pool
;
bt_wssi
->
sock
=
sock
;
*
bt_wssiP
=
bt_wssi
;
return
KS_STATUS_SUCCESS
;
}
ks_status_t
blade_transport_wss_init_destroy
(
blade_transport_wss_init_t
**
bt_wssiP
)
{
blade_transport_wss_init_t
*
bt_wssi
=
NULL
;
ks_assert
(
bt_wssiP
);
ks_assert
(
*
bt_wssiP
);
bt_wssi
=
*
bt_wssiP
;
ks_pool_free
(
bt_wssi
->
pool
,
bt_wssiP
);
return
KS_STATUS_SUCCESS
;
}
/* For Emacs:
* Local Variables:
* mode:c
...
...
libs/libblade/src/blade_stack.c
浏览文件 @
a7add335
...
...
@@ -44,11 +44,12 @@ struct blade_handle_s {
ks_pool_t
*
pool
;
ks_thread_pool_t
*
tpool
;
config_setting_t
*
config_
service
;
config_setting_t
*
config_
directory
;
config_setting_t
*
config_datastore
;
ks_hash_t
*
transports
;
blade_identity_t
*
identity
;
blade_datastore_t
*
datastore
;
};
...
...
@@ -164,7 +165,7 @@ KS_DECLARE(ks_status_t) blade_handle_destroy(blade_handle_t **bhP)
ks_status_t
blade_handle_config
(
blade_handle_t
*
bh
,
config_setting_t
*
config
)
{
config_setting_t
*
service
=
NULL
;
config_setting_t
*
directory
=
NULL
;
config_setting_t
*
datastore
=
NULL
;
ks_assert
(
bh
);
...
...
@@ -172,13 +173,13 @@ ks_status_t blade_handle_config(blade_handle_t *bh, config_setting_t *config)
if
(
!
config
)
return
KS_STATUS_FAIL
;
if
(
!
config_setting_is_group
(
config
))
return
KS_STATUS_FAIL
;
service
=
config_setting_get_member
(
config
,
"service
"
);
directory
=
config_setting_get_member
(
config
,
"directory
"
);
datastore
=
config_setting_get_member
(
config
,
"datastore"
);
//if (datastore && !config_setting_is_group(datastore)) return KS_STATUS_FAIL;
bh
->
config_
service
=
service
;
bh
->
config_
directory
=
directory
;
bh
->
config_datastore
=
datastore
;
return
KS_STATUS_SUCCESS
;
...
...
@@ -201,7 +202,11 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
return
KS_STATUS_FAIL
;
}
}
// @todo load DSOs
// @todo call onload and onstartup callbacks for modules from DSOs
return
KS_STATUS_SUCCESS
;
}
...
...
@@ -209,9 +214,12 @@ KS_DECLARE(ks_status_t) blade_handle_shutdown(blade_handle_t *bh)
{
ks_assert
(
bh
);
// @todo cleanup registered transports
// @todo call onshutdown and onunload callbacks for modules from DSOs
// @todo unload DSOs
if
(
blade_handle_datastore_available
(
bh
))
blade_datastore_destroy
(
&
bh
->
datastore
);
return
KS_STATUS_SUCCESS
;
}
...
...
@@ -279,14 +287,33 @@ KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connectio
ks_assert
(
bh
);
ks_assert
(
target
);
// @todo this should take a callback, and push this to a queue to be processed async from another thread on the handle
// which will allow the onconnect callback to block while doing things like DNS lookups without having unknown
// impact depending on the caller thread
ks_hash_read_lock
(
bh
->
transports
);
blade_identity_parameter_get
(
target
,
"transport"
,
&
tname
);
tname
=
blade_identity_parameter_get
(
target
,
"transport"
);
if
(
tname
)
{
bhtr
=
ks_hash_search
(
bh
->
transports
,
(
void
*
)
tname
,
KS_UNLOCKED
);
if
(
!
bhtr
)
{
// @todo error logging, target has an explicit transport that is not available in the local transports registry
// discuss later whether this scenario should still attempt other transports when target is explicit
// @note discussions indicate that by default messages should favor relaying through a master service, unless
// an existing direct connection already exists to the target (which if the target is the master node, then there is
// no conflict of proper routing). This also applies to routing for identities which relate to groups, relaying should
// most often occur through a master service, however there may be scenarios that exist where an existing session
// exists dedicated to faster delivery for a group (IE, through an ampq cluster directly, such as master services
// syncing with each other through a pub/sub). There is also the potential that instead of a separate session, the
// current session with a master service may be able to have another connection attached which represents access through
// amqp, which in turn acts as a preferred router for only group identities
// This information does not directly apply to connecting, but should be noted for the next level up where you simply
// send a message which will not actually connect, only check for existing sessions for the target and master service
// @note relaying by master services should take a slightly different path, when they receive something not for the
// master service itself, it should relay this on to all other master services, which in turn all including original
// receiver pass on to any sessions matching an identity that is part of the group, alternatively they can use a pub/sub
// like amqp to relay between the master services more efficiently than using the websocket to send every master service
// session the message individually
}
}
else
{
for
(
ks_hash_iterator_t
*
it
=
ks_hash_first
(
bh
->
transports
,
KS_UNLOCKED
);
it
;
it
=
ks_hash_next
(
&
it
))
{
...
...
libs/libblade/src/include/blade_identity.h
浏览文件 @
a7add335
...
...
@@ -40,7 +40,7 @@ KS_DECLARE(ks_status_t) blade_identity_create(blade_identity_t **biP, ks_pool_t
KS_DECLARE
(
ks_status_t
)
blade_identity_destroy
(
blade_identity_t
**
biP
);
KS_DECLARE
(
ks_status_t
)
blade_identity_parse
(
blade_identity_t
*
bi
,
const
char
*
uri
);
KS_DECLARE
(
const
char
*
)
blade_identity_uri
(
blade_identity_t
*
bi
);
KS_DECLARE
(
ks_status_t
)
blade_identity_parameter_get
(
blade_identity_t
*
bi
,
const
char
*
key
,
const
char
**
value
);
KS_DECLARE
(
const
char
*
)
blade_identity_parameter_get
(
blade_identity_t
*
bi
,
const
char
*
key
);
KS_END_EXTERN_C
#endif
...
...
libs/libblade/src/include/blade_stack.h
浏览文件 @
a7add335
...
...
@@ -50,7 +50,7 @@ KS_DECLARE(ks_thread_pool_t *) blade_handle_tpool_get(blade_handle_t *bh);
KS_DECLARE
(
ks_status_t
)
blade_handle_transport_register
(
blade_handle_t
*
bh
,
blade_module_t
*
bm
,
const
char
*
name
,
blade_transport_callbacks_t
*
callbacks
);
KS_DECLARE
(
ks_status_t
)
blade_handle_transport_unregister
(
blade_handle_t
*
bh
,
const
char
*
name
);
KS_DECLARE
(
ks_status_t
)
blade_handle_connect
(
blade_handle_t
*
bh
,
blade_connection_t
**
bcP
,
blade_identity_t
*
target
);
KS_DECLARE
(
ks_bool_t
)
blade_handle_datastore_available
(
blade_handle_t
*
bh
);
KS_DECLARE
(
ks_status_t
)
blade_handle_datastore_store
(
blade_handle_t
*
bh
,
const
void
*
key
,
int32_t
key_length
,
const
void
*
data
,
int64_t
data_length
);
...
...
libs/libblade/test/bladec.c
浏览文件 @
a7add335
...
...
@@ -30,12 +30,14 @@ void command_test(blade_handle_t *bh, char *args);
void
command_quit
(
blade_handle_t
*
bh
,
char
*
args
);
void
command_store
(
blade_handle_t
*
bh
,
char
*
args
);
void
command_fetch
(
blade_handle_t
*
bh
,
char
*
args
);
void
command_connect
(
blade_handle_t
*
bh
,
char
*
args
);
static
const
struct
command_def_s
command_defs
[]
=
{
{
"test"
,
command_test
},
{
"quit"
,
command_quit
},
{
"store"
,
command_store
},
{
"fetch"
,
command_fetch
},
{
"connect"
,
command_connect
},
{
NULL
,
NULL
}
};
...
...
@@ -47,6 +49,8 @@ int main(int argc, char **argv)
config_setting_t
*
config_blade
=
NULL
;
blade_module_t
*
mod_wss
=
NULL
;
//blade_identity_t *id = NULL;
const
char
*
cfgpath
=
"bladec.cfg"
;
ks_global_set_default_logger
(
KS_LOG_LEVEL_DEBUG
);
...
...
@@ -54,12 +58,10 @@ int main(int argc, char **argv)
blade_handle_create
(
&
bh
,
NULL
,
NULL
);
//blade_identity_create(&id, blade_handle_pool_get(bh));
//blade_identity_parse(id, "test@domain.com/laptop?transport=wss&host=127.0.0.1&port=1234");
if
(
argc
>
1
)
cfgpath
=
argv
[
1
];
// @todo load config file, and lookup "blade" setting to put into config_blade
config_init
(
&
config
);
if
(
!
config_read_file
(
&
config
,
"bladec.cfg"
))
{
if
(
!
config_read_file
(
&
config
,
cfgpath
))
{
ks_log
(
KS_LOG_ERROR
,
"%s:%d - %s
\n
"
,
config_error_file
(
&
config
),
config_error_line
(
&
config
),
config_error_text
(
&
config
));
config_destroy
(
&
config
);
return
EXIT_FAILURE
;
...
...
@@ -91,6 +93,10 @@ int main(int argc, char **argv)
loop
(
bh
);
blade_module_wss_on_shutdown
(
mod_wss
);
blade_module_wss_on_unload
(
mod_wss
);
blade_handle_destroy
(
&
bh
);
blade_shutdown
();
...
...
@@ -236,3 +242,18 @@ void command_fetch(blade_handle_t *bh, char *args)
blade_handle_datastore_fetch
(
bh
,
blade_datastore_fetch_callback
,
key
,
strlen
(
key
),
bh
);
}
void
command_connect
(
blade_handle_t
*
bh
,
char
*
args
)
{
blade_connection_t
*
bc
=
NULL
;
blade_identity_t
*
target
=
NULL
;
ks_assert
(
bh
);
ks_assert
(
args
);
blade_identity_create
(
&
target
,
blade_handle_pool_get
(
bh
));
if
(
blade_identity_parse
(
target
,
args
)
==
KS_STATUS_SUCCESS
)
blade_handle_connect
(
bh
,
&
bc
,
target
);
blade_identity_destroy
(
&
target
);
}
libs/libblade/test/bladec2.cfg
浏览文件 @
a7add335
blade:
{
identity = "peer@domain";
directory:
{
uris = ( "directory@domain?transport=wss&host=127.0.0.1&port=2100" );
};
datastore:
{
database:
...
...
@@ -12,18 +8,4 @@ blade:
path = ":mem:";
};
};
wss:
{
endpoints:
{
ipv4 = ( { address = "0.0.0.0", port = 2101 } );
ipv6 = ( { address = "::", port = 2101 } );
backlog = 128;
};
# SSL group is optional, disabled when absent
ssl:
{
# todo: server SSL stuffs here
};
};
};
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论