提交 fa769584 authored 作者: Anthony Minessale's avatar Anthony Minessale 提交者: Michael Jerris

FS-7499: improve generic nack and vpx framing

上级 7fc019a9
......@@ -1493,6 +1493,7 @@ SWITCH_DECLARE(switch_hash_index_t *) switch_core_hash_next(_In_ switch_hash_ind
SWITCH_DECLARE(void) switch_core_hash_this(_In_ switch_hash_index_t *hi, _Out_opt_ptrdiff_cap_(klen)
const void **key, _Out_opt_ switch_ssize_t *klen, _Out_ void **val);
SWITCH_DECLARE(void) switch_core_hash_this_val(switch_hash_index_t *hi, void *val);
SWITCH_DECLARE(switch_status_t) switch_core_inthash_init(switch_inthash_t **hash);
SWITCH_DECLARE(switch_status_t) switch_core_inthash_destroy(switch_inthash_t **hash);
......
......@@ -196,6 +196,7 @@ SWITCH_DECLARE(switch_hashtable_iterator_t*) switch_hashtable_first_iter(switch_
#define switch_hashtable_first(_h) switch_hashtable_first_iter(_h, NULL)
SWITCH_DECLARE(switch_hashtable_iterator_t*) switch_hashtable_next(switch_hashtable_iterator_t **iP);
SWITCH_DECLARE(void) switch_hashtable_this(switch_hashtable_iterator_t *i, const void **key, switch_ssize_t *klen, void **val);
SWITCH_DECLARE(void) switch_hashtable_this_val(switch_hashtable_iterator_t *i, void *val);
static inline uint32_t switch_hash_default_int(void *ky) {
uint32_t x = *((uint32_t *)ky);
......
......@@ -33,6 +33,11 @@
#ifndef SWITCH_VIDDERBUFFER_H
#define SWITCH_VIDDERBUFFER_H
typedef enum {
SVB_QUEUE_ONLY = (1 << 0)
} switch_vb_flag_t;
SWITCH_BEGIN_EXTERN_C
SWITCH_DECLARE(switch_status_t) switch_vb_create(switch_vb_t **vbp, uint32_t min_frame_len, uint32_t max_frame_len, switch_memory_pool_t *pool);
SWITCH_DECLARE(switch_status_t) switch_vb_destroy(switch_vb_t **vbp);
......@@ -44,6 +49,8 @@ SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet(switch_vb_t *vb, switch_rtp_packet_t *packet, switch_size_t *len);
SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb);
SWITCH_DECLARE(switch_status_t) switch_vb_get_packet_by_seq(switch_vb_t *vb, uint16_t seq, switch_rtp_packet_t *packet, switch_size_t *len);
SWITCH_DECLARE(void) switch_vb_set_flag(switch_vb_t *vb, switch_vb_flag_t flag);
SWITCH_DECLARE(void) switch_vb_clear_flag(switch_vb_t *vb, switch_vb_flag_t flag);
SWITCH_END_EXTERN_C
#endif
......
......@@ -231,6 +231,11 @@ SWITCH_DECLARE(void) switch_core_hash_this(switch_hash_index_t *hi, const void *
switch_hashtable_this(hi, key, klen, val);
}
SWITCH_DECLARE(void) switch_core_hash_this_val(switch_hash_index_t *hi, void *val)
{
switch_hashtable_this_val(hi, val);
}
SWITCH_DECLARE(switch_status_t) switch_core_inthash_init(switch_inthash_t **hash)
{
......
......@@ -332,7 +332,12 @@ SWITCH_DECLARE(switch_hashtable_iterator_t *) switch_hashtable_first_iter(switch
return switch_hashtable_next(&iterator);
}
SWITCH_DECLARE(void) switch_hashtable_this_val(switch_hashtable_iterator_t *i, void *val)
{
if (i->e) {
i->e->v = val;
}
}
SWITCH_DECLARE(void) switch_hashtable_this(switch_hashtable_iterator_t *i, const void **key, switch_ssize_t *klen, void **val)
{
......
......@@ -958,7 +958,6 @@ static void handle_ice(switch_rtp_t *rtp_session, switch_rtp_ice_t *ice, void *d
if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
switch_core_session_video_reinit(rtp_session->session);
}
switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH);
}
}
......@@ -1109,7 +1108,6 @@ static void handle_ice(switch_rtp_t *rtp_session, switch_rtp_ice_t *ice, void *d
if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
switch_core_session_video_reinit(rtp_session->session);
}
switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH);
}
memset(stunbuf, 0, sizeof(stunbuf));
......@@ -2413,7 +2411,12 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_set_local_address(switch_rtp_t *rtp_s
*err = "Socket Error!";
goto done;
}
if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
switch_socket_opt_set(new_sock, SWITCH_SO_RCVBUF, 786432);
switch_socket_opt_set(new_sock, SWITCH_SO_SNDBUF, 786432);
}
if (switch_socket_bind(new_sock, rtp_session->local_addr) != SWITCH_STATUS_SUCCESS) {
char *em = switch_core_sprintf(rtp_session->pool, "Bind Error! %s:%d", host, port);
*err = em;
......@@ -3961,6 +3964,7 @@ SWITCH_DECLARE(switch_status_t) switch_rtp_activate_ice(switch_rtp_t *rtp_sessio
switch_port_t port = 0;
char bufc[30];
switch_mutex_lock(rtp_session->ice_mutex);
if (proto == IPR_RTP) {
......@@ -4045,7 +4049,9 @@ SWITCH_DECLARE(void) switch_rtp_flush(switch_rtp_t *rtp_session)
return;
}
switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH);
if (!rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
switch_rtp_set_flag(rtp_session, SWITCH_RTP_FLAG_FLUSH);
}
}
SWITCH_DECLARE(void) switch_rtp_video_refresh(switch_rtp_t *rtp_session)
......@@ -4370,6 +4376,10 @@ SWITCH_DECLARE(void) switch_rtp_clear_flags(switch_rtp_t *rtp_session, switch_rt
SWITCH_DECLARE(void) switch_rtp_set_flag(switch_rtp_t *rtp_session, switch_rtp_flag_t flag)
{
if (flag == SWITCH_RTP_FLAG_FLUSH && rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
return;
}
switch_mutex_lock(rtp_session->flag_mutex);
rtp_session->flags[flag] = 1;
switch_mutex_unlock(rtp_session->flag_mutex);
......@@ -4605,6 +4615,7 @@ SWITCH_DECLARE(void) rtp_flush_read_buffer(switch_rtp_t *rtp_session, switch_rtp
{
if (rtp_session->flags[SWITCH_RTP_FLAG_PROXY_MEDIA] ||
rtp_session->flags[SWITCH_RTP_FLAG_VIDEO] ||
rtp_session->flags[SWITCH_RTP_FLAG_UDPTL]) {
return;
}
......@@ -5300,6 +5311,7 @@ static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack)
send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
}
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "RE----SEND %u\n", ntohs(send_msg->header.seq));
switch_rtp_write_raw(rtp_session, (void *) send_msg, &bytes, SWITCH_FALSE);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq));
......@@ -5321,6 +5333,7 @@ static void handle_nack(switch_rtp_t *rtp_session, uint32_t nack)
send_msg->header.pt, ntohl(send_msg->header.ts), ntohs(send_msg->header.seq), send_msg->header.m);
}
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "RE----SEND %u\n", ntohs(send_msg->header.seq));
switch_rtp_write_raw(rtp_session, (void *) &send_msg, &bytes, SWITCH_FALSE);
} else {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(rtp_session->session), SWITCH_LOG_DEBUG, "Cannot send NACK for seq %u\n", ntohs(seq) + i);
......@@ -7005,7 +7018,8 @@ static int rtp_common_write(switch_rtp_t *rtp_session,
if (rtp_session->flags[SWITCH_RTP_FLAG_NACK]) {
if (!rtp_session->vbw) {
switch_vb_create(&rtp_session->vbw, 5, 5, rtp_session->pool);
switch_vb_create(&rtp_session->vbw, 500, 500, rtp_session->pool);
switch_vb_set_flag(rtp_session->vbw, SVB_QUEUE_ONLY);
//switch_vb_debug_level(rtp_session->vbw, 10);
}
switch_vb_put_packet(rtp_session->vbw, (switch_rtp_packet_t *)send_msg, bytes);
......@@ -7027,7 +7041,12 @@ static int rtp_common_write(switch_rtp_t *rtp_session,
}
}
#else
//if (rtp_session->flags[SWITCH_RTP_FLAG_VIDEO]) {
//
// rtp_session->flags[SWITCH_RTP_FLAG_DEBUG_RTP_READ]++;
//
// //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SEND %u\n", ntohs(send_msg->header.seq));
//}
if (switch_socket_sendto(rtp_session->sock_output, rtp_session->remote_addr, 0, (void *) send_msg, &bytes) != SWITCH_STATUS_SUCCESS) {
rtp_session->seq--;
ret = -1;
......
......@@ -34,6 +34,9 @@
#define MAX_MISSING_SEQ 20
#define vb_debug(_vb, _level, _format, ...) if (_vb->debug_level >= _level) switch_log_printf(SWITCH_CHANNEL_LOG_CLEAN, SWITCH_LOG_ALERT, "VB:%p level:%d line:%d ->" _format, (void *) _vb, _level, __LINE__, __VA_ARGS__)
const char *TOKEN_1 = "ONE";
const char *TOKEN_2 = "TWO";
struct switch_vb_s;
typedef struct switch_vb_node_s {
......@@ -66,6 +69,7 @@ struct switch_vb_s {
switch_mutex_t *mutex;
switch_memory_pool_t *pool;
int free_pool;
switch_vb_flag_t flags;
};
static inline switch_vb_node_t *new_node(switch_vb_t *vb)
......@@ -293,6 +297,15 @@ static inline void free_nodes(switch_vb_t *vb)
vb->node_list = NULL;
}
SWITCH_DECLARE(void) switch_vb_set_flag(switch_vb_t *vb, switch_vb_flag_t flag)
{
switch_set_flag(vb, flag);
}
SWITCH_DECLARE(void) switch_vb_clear_flag(switch_vb_t *vb, switch_vb_flag_t flag)
{
switch_clear_flag(vb, flag);
}
SWITCH_DECLARE(int) switch_vb_poll(switch_vb_t *vb)
{
......@@ -394,10 +407,17 @@ SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb)
for (hi = switch_core_hash_first(vb->missing_seq_hash); hi; hi = switch_core_hash_next(&hi)) {
uint16_t seq;
const char *token;
switch_core_hash_this(hi, &var, NULL, &val);
seq = ntohs(*((uint16_t *) var));
token = (const char *) val;
if (token == TOKEN_2) {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "SKIP %u %s\n", ntohs(*((uint16_t *) var)), token);
continue;
}
seq = ntohs(*((uint16_t *) var));
if (!least || seq < least) {
least = seq;
}
......@@ -406,9 +426,12 @@ SWITCH_DECLARE(uint32_t) switch_vb_pop_nack(switch_vb_t *vb)
if (least && switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least))) {
vb_debug(vb, 3, "Found smallest NACKABLE seq %u\n", least);
nack = (uint32_t) htons(least);
switch_core_inthash_insert(vb->missing_seq_hash, nack, (void *) TOKEN_2);
for(i = 0; i < 16; i++) {
if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(least + i + 1))) {
switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(least + i + 1), (void *) TOKEN_2);
vb_debug(vb, 3, "Found addtl NACKABLE seq %u\n", least + i + 1);
blp |= (1 << i);
}
......@@ -435,23 +458,37 @@ SWITCH_DECLARE(switch_status_t) switch_vb_put_packet(switch_vb_t *vb, switch_rtp
{
uint32_t i;
uint16_t want = ntohs(vb->next_seq), got = ntohs(packet->header.seq);
int missing = 0;
switch_mutex_lock(vb->mutex);
if (!want) want = got;
if (got > want) {
vb_debug(vb, 2, "GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1);
for (i = want; i < got; i++) {
switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)SWITCH_TRUE);
}
if (switch_test_flag(vb, SVB_QUEUE_ONLY)) {
vb->next_seq = htons(got + 1);
} else {
switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got));
}
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "WTF %u\n", got);
if (switch_core_inthash_delete(vb->missing_seq_hash, (uint32_t)htons(got))) {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "POPPED RESEND %u\n", got);
missing = 1;
}
if (!missing || want == got) {
if (got > want) {
//vb_debug(vb, 2, "GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "XXXXXXXXXXXXXXXXXX WTF GOT %u WANTED %u; MARK SEQS MISSING %u - %u\n", got, want, want, got - 1);
for (i = want; i < got; i++) {
//switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "MISSING %u\n", i);
switch_core_inthash_insert(vb->missing_seq_hash, (uint32_t)htons(i), (void *)TOKEN_1);
}
}
if (got >= want) {
vb->next_seq = htons(got + 1);
if (got >= want || (want - got) > 1000) {
vb->next_seq = htons(got + 1);
}
}
}
add_node(vb, packet, len);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论