Skip to content

Commit

Permalink
Added GLib Hashtables (used like STL sets) to detect and discard dupl…
Browse files Browse the repository at this point in the history
…icate RTP packets

Signed-off-by: Carlos Rafael Giani <[email protected]>
  • Loading branch information
dv1 committed Jul 10, 2012
1 parent 3f759ac commit 551e0f1
Showing 1 changed file with 40 additions and 8 deletions.
48 changes: 40 additions & 8 deletions fecdec.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@
#define RTP_FEC_HEADER_SIZE 12


/* TODO: detect and drop duplicate packets */


struct fec_dec_s
{
guint num_media_packets;
Expand All @@ -46,6 +43,9 @@ struct fec_dec_s
GQueue *fec_packets;
GQueue *recovered_packets;

GHashTable *media_packet_set;
GHashTable *fec_packet_set;

guint32 received_media_packet_mask;
guint num_received_media_packets;
guint num_received_fec_packets;
Expand Down Expand Up @@ -75,6 +75,8 @@ fec_dec* fec_dec_create(guint const num_media_packets, guint const num_fec_packe
dec->media_packets = g_queue_new();
dec->fec_packets = g_queue_new();
dec->recovered_packets = g_queue_new();
dec->media_packet_set = g_hash_table_new(g_direct_hash, g_direct_equal);
dec->fec_packet_set = g_hash_table_new(g_direct_hash, g_direct_equal);
dec->received_media_packet_mask = 0;
dec->num_received_media_packets = 0;
dec->num_received_fec_packets = 0;
Expand All @@ -89,6 +91,8 @@ void fec_dec_destroy(fec_dec *dec)
g_queue_free(dec->media_packets);
g_queue_free(dec->fec_packets);
g_queue_free(dec->recovered_packets);
g_hash_table_destroy(dec->media_packet_set);
g_hash_table_destroy(dec->fec_packet_set);
free(dec);
}

Expand Down Expand Up @@ -122,6 +126,8 @@ static void fec_dec_cleanup(fec_dec *dec)
g_queue_foreach(dec->fec_packets, fec_dec_clear_packet, NULL);
g_queue_clear(dec->media_packets);
g_queue_clear(dec->fec_packets);
g_hash_table_remove_all(dec->media_packet_set);
g_hash_table_remove_all(dec->fec_packet_set);
}


Expand Down Expand Up @@ -255,10 +261,18 @@ static void fec_dec_check_state(fec_dec *dec)

void fec_dec_push_media_packet(fec_dec *dec, GstBuffer *packet)
{
guint32 original_seqnum;
original_seqnum = gst_rtp_buffer_get_seq(packet);

if (g_hash_table_lookup_extended(dec->media_packet_set, GINT_TO_POINTER(original_seqnum), NULL, NULL))
{
GST_DEBUG("Media packet with seqnum %u is already in queue - discarding duplicate", original_seqnum);
return;
}

if (dec->has_snbase)
{
guint32 original_seqnum, corrected_seqnum;
original_seqnum = gst_rtp_buffer_get_seq(packet);
guint32 corrected_seqnum;
corrected_seqnum = fec_dec_correct_seqnum(dec, original_seqnum);

GST_DEBUG("Pushing media packet with seqnum %u, current snbase is %u", original_seqnum, dec->cur_snbase);
Expand All @@ -271,15 +285,18 @@ void fec_dec_push_media_packet(fec_dec *dec, GstBuffer *packet)
dec->blacklisted_snbase = dec->cur_snbase;
g_queue_foreach(dec->fec_packets, fec_dec_clear_packet, NULL);
g_queue_clear(dec->fec_packets);
g_hash_table_remove_all(dec->fec_packet_set);
dec->num_received_fec_packets = 0;

GST_DEBUG("Pushing media packet with seqnum %u, no current snbase set", original_seqnum);
g_queue_push_tail(dec->media_packets, gst_buffer_ref(packet));
g_hash_table_add(dec->media_packet_set, GINT_TO_POINTER(original_seqnum));
++dec->num_received_media_packets;
}
else if ((corrected_seqnum >= (guint32)(dec->cur_snbase)) && (corrected_seqnum <= ((guint32)(dec->cur_snbase) + ((guint32)(dec->num_media_packets)) - 1)))
{
g_queue_push_tail(dec->media_packets, gst_buffer_ref(packet));
g_hash_table_add(dec->media_packet_set, GINT_TO_POINTER(original_seqnum));
++dec->num_received_media_packets;
dec->received_media_packet_mask |= (1ul << (corrected_seqnum - dec->cur_snbase));
dec->max_packet_size = MAX(dec->max_packet_size, GST_BUFFER_SIZE(packet));
Expand All @@ -293,9 +310,9 @@ void fec_dec_push_media_packet(fec_dec *dec, GstBuffer *packet)
}
else
{
guint16 seqnum = gst_rtp_buffer_get_seq(packet);
GST_DEBUG("Pushing media packet with seqnum %u, no current snbase set", seqnum);
GST_DEBUG("Pushing media packet with seqnum %u, no current snbase set", original_seqnum);
g_queue_push_tail(dec->media_packets, gst_buffer_ref(packet));
g_hash_table_add(dec->media_packet_set, GINT_TO_POINTER(original_seqnum));
++dec->num_received_media_packets;
}

Expand All @@ -307,7 +324,13 @@ void fec_dec_push_media_packet(fec_dec *dec, GstBuffer *packet)

for (i = dec->num_media_packets; i < dec->num_received_media_packets; ++i)
{
GstBuffer *media_packet = g_queue_pop_head(dec->media_packets);
GstBuffer *media_packet;
guint16 seqnum;

media_packet = g_queue_pop_head(dec->media_packets);
seqnum = gst_rtp_buffer_get_seq(packet);
g_hash_table_remove(dec->media_packet_set, GINT_TO_POINTER(seqnum));

gst_buffer_unref(media_packet);
}

Expand Down Expand Up @@ -335,11 +358,18 @@ void fec_dec_push_fec_packet(fec_dec *dec, GstBuffer *packet)
return;
}

if (g_hash_table_lookup_extended(dec->fec_packet_set, GINT_TO_POINTER(seqnum), NULL, NULL))
{
GST_DEBUG("FEC packet with seqnum %u is already in queue - discarding duplicate", seqnum);
return;
}

if (dec->cur_snbase != snbase)
{
GST_DEBUG("snbase changed from %u to %u - purging FEC queue (%u FEC packets and %u media packets present)", dec->cur_snbase, snbase, dec->num_received_fec_packets, dec->num_received_media_packets);
g_queue_foreach(dec->fec_packets, fec_dec_clear_packet, NULL);
g_queue_clear(dec->fec_packets);
g_hash_table_remove_all(dec->fec_packet_set);
dec->num_received_fec_packets = 0;
}

Expand All @@ -349,6 +379,7 @@ void fec_dec_push_fec_packet(fec_dec *dec, GstBuffer *packet)
dec->num_received_media_packets = 0;
dec->max_packet_size = 0;
g_queue_push_tail(dec->fec_packets, gst_buffer_ref(packet));
g_hash_table_add(dec->fec_packet_set, GINT_TO_POINTER(seqnum));
++dec->num_received_fec_packets;

for (link = g_queue_peek_head_link(dec->media_packets); link != NULL;)
Expand All @@ -365,6 +396,7 @@ void fec_dec_push_fec_packet(fec_dec *dec, GstBuffer *packet)
GList *old_link;

GST_DEBUG("Found media packet with seqnum %u outside bounds [%u, %u] - purging", seqnum, snbase, snbase + dec->num_media_packets - 1);
g_hash_table_remove(dec->media_packet_set, GINT_TO_POINTER(seqnum));
gst_buffer_unref(media_packet);

old_link = link;
Expand Down

0 comments on commit 551e0f1

Please sign in to comment.