Skip to content

Commit

Permalink
fixed errors; now translation goes up to real a/v broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
arut committed Mar 14, 2012
1 parent 8e32bae commit 68c6411
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 77 deletions.
2 changes: 2 additions & 0 deletions TODO
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
- closing session on send error
causes crash because of double-freeing stack

- recognize amf-meta

- shortcuts for big-endian copy

- implement loc confs (=fms apps)
Expand Down
3 changes: 1 addition & 2 deletions ngx_rtmp.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AMF0_CMD]);
*eh = ngx_rtmp_amf0_message_handler;


/* init calls */
h = cmcf->calls.elts;
for(n = 0; n < cmcf->calls.nelts; ++n, ++h) {
Expand All @@ -326,7 +325,7 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)

calls_hash.hash = &cmcf->calls_hash;
calls_hash.key = ngx_hash_key_lc;
calls_hash.max_size = 1024;
calls_hash.max_size = 512;
calls_hash.bucket_size = ngx_cacheline_size;
calls_hash.name = "calls_hash";
calls_hash.pool = cf->pool;
Expand Down
5 changes: 3 additions & 2 deletions ngx_rtmp.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ typedef struct {


typedef struct ngx_rtmp_stream_t {
ngx_rtmp_header_t hdr;
ngx_rtmp_header_t hdr;
ngx_chain_t *in;
} ngx_rtmp_stream_t;

Expand Down Expand Up @@ -257,7 +257,6 @@ typedef struct {


void ngx_rtmp_init_connection(ngx_connection_t *c);
/*void ngx_rtmp_close_session(ngx_rtmp_session_t *s);*/
void ngx_rtmp_close_connection(ngx_connection_t *c);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);

Expand All @@ -272,6 +271,8 @@ ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,

/* Sending messages */
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
ngx_int_t ngx_rtmp_release_shared_buf(ngx_rtmp_session_t *s,
ngx_chain_t *out);
void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *out, uint8_t fmt);
ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);
Expand Down
1 change: 0 additions & 1 deletion ngx_rtmp_amf0.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)

ctx->link = l;
b = l->buf;
b->pos = b->last = b->start;
}

size = b->end - b->last;
Expand Down
227 changes: 197 additions & 30 deletions ngx_rtmp_broadcast_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,30 @@ static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf,
void *parent, void *child);

static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in);


typedef struct {
ngx_str_t name;
ngx_rtmp_call_handler_pt handler;
} ngx_rtmp_broadcast_map_t;


static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = {
{ ngx_string("connect"), ngx_rtmp_broadcast_connect },
{ ngx_string("createStream"), ngx_rtmp_broadcast_create_stream },
{ ngx_string("publish"), ngx_rtmp_broadcast_publish },
{ ngx_string("releaseStream"), ngx_rtmp_broadcast_ok },
{ ngx_string("FCPublish"), ngx_rtmp_broadcast_ok },
};


typedef struct {
/* use hash-map
Expand Down Expand Up @@ -119,6 +143,20 @@ ngx_rtmp_broadcast_get_head(ngx_rtmp_session_t *s)
}


static void
ngx_rtmp_broadcast_set_flags(ngx_rtmp_session_t *s, ngx_uint_t flags)
{
ngx_rtmp_broadcast_ctx_t *ctx;

ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
if (ctx == NULL) {
return;
}

ctx->flags |= flags;
}


static void
ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream,
ngx_uint_t flags)
Expand Down Expand Up @@ -187,8 +225,9 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
{
ngx_connection_t *c;
ngx_rtmp_broadcast_ctx_t *ctx, *cctx;
ngx_chain_t *out, *l;
ngx_chain_t *out, *l, **ll;
u_char *p;
size_t nsubs;

c = s->connection;

Expand All @@ -206,22 +245,25 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,

/* copy data to output stream */
out = NULL;
ll = &out;
p = in->buf->pos;

for(;;) {
for ( ;; ) {
l = ngx_rtmp_alloc_shared_buf(s);
if (l == NULL || l->buf == NULL) {
return NGX_ERROR;
}

if (out == NULL) {
out = l;
}
*ll = l;
ll = &l->next;

while (l->buf->end - l->buf->last > in->buf->last - p) {
while (l->buf->end - l->buf->last >= in->buf->last - p) {
l->buf->last = ngx_cpymem(l->buf->last, p,
in->buf->last - p);
in = in->next;
if (in == NULL) {
goto done;
}
p = in->buf->pos;
}

Expand All @@ -230,10 +272,14 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
p += (l->buf->end - l->buf->last);
}

done:
*ll = NULL;

ngx_rtmp_prepare_message(s, h, out, 0/*fmt*/);

/* broadcast to all subscribers */
for(cctx = *ngx_rtmp_broadcast_get_head(s);
nsubs = 0;
for (cctx = *ngx_rtmp_broadcast_get_head(s);
cctx; cctx = cctx->next)
{
if (cctx != ctx
Expand All @@ -245,9 +291,17 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
if (ngx_rtmp_send_message(s, out) != NGX_OK) {
return NGX_ERROR;
}
++nsubs;
}
}

/* no one subscriber? */
if (!nsubs
&& ngx_rtmp_release_shared_buf(s, out) != NGX_OK)
{
return NGX_ERROR;
}

return NGX_OK;
}

Expand Down Expand Up @@ -298,60 +352,173 @@ ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans,
"connect() called; app='%s' url='%s'",
app, url);

if (0) {
app_str.data = app;
app_str.len = ngx_strlen(app);
ngx_rtmp_broadcast_join(s, &app_str, 0);
}

return ngx_rtmp_send_ack_size(s, 65536)
|| ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT)
|| ngx_rtmp_send_user_stream_begin(s, 1)
|| ngx_rtmp_send_amf0(s, 3, 1, out_elts,
/*FIXME: app_str allocation!!!!!!! */
/*FIXME: add memsetting input data */
/* join stream */
ngx_str_set(&app_str, "preved");
/*
app_str.data = app;
app_str.len = ngx_strlen(app);
*/
ngx_rtmp_broadcast_join(s, &app_str, 0);

return ngx_rtmp_send_ack_size(s, 2500000)
|| ngx_rtmp_send_bandwidth(s, 2500000, NGX_RTMP_LIMIT_DYNAMIC)
|| ngx_rtmp_send_user_stream_begin(s, 0)
|| ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
? NGX_ERROR
: NGX_OK;
}


static ngx_int_t
ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, double in_trans,
ngx_chain_t *in)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_hash_key_t *h;
ngx_rtmp_disconnect_handler_pt *dh;
ngx_rtmp_event_handler_pt *avh;
static double trans;
static double stream;

cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &stream, sizeof(stream) },
};

ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"createStream() called");

/* add connect() handler */
h = ngx_array_push(&cmcf->calls);
trans = in_trans;
stream = 1;

if (h == NULL) {
return ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}


static ngx_int_t
ngx_rtmp_broadcast_publish(ngx_rtmp_session_t *s, double in_trans,
ngx_chain_t *in)
{
static double trans;
static u_char pub_name[1024];
static u_char pub_type[1024];

static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};

static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
{ NGX_RTMP_AMF0_STRING, NULL, pub_name, sizeof(pub_name) },
{ NGX_RTMP_AMF0_STRING, NULL, pub_type, sizeof(pub_type) },
};

static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};


if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}

ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"publish() called; pubName='%s' pubType='%s'",
pub_name, pub_type);

ngx_rtmp_broadcast_set_flags(s, NGX_RTMP_BROADCAST_PUBLISHER);

trans = in_trans;
ngx_str_set(&out_inf[0], "NetStream.Publish.Start");
ngx_str_set(&out_inf[1], "status");
ngx_str_set(&out_inf[2], "Publish succeeded.");

if (ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK)
{
return NGX_ERROR;
}

ngx_str_set(&h->key, "connect");
h->value = ngx_rtmp_broadcast_connect;
return NGX_OK;
}


static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
double in_trans, ngx_chain_t *in)
{
static double trans;

static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};

static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "onStatus", sizeof("onStatus") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};


trans = in_trans;

return ngx_rtmp_send_amf0(s, 3, 0, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}


static ngx_int_t
ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_hash_key_t *h;
ngx_rtmp_disconnect_handler_pt *dh;
ngx_rtmp_event_handler_pt *avh;
ngx_rtmp_broadcast_map_t *bm;
size_t n, ncalls;

cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);

/* register audio/video broadcast handler */
avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);
*avh = ngx_rtmp_broadcast_av;

avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
*avh = ngx_rtmp_broadcast_av;


/* add disconnect handler */
/* register disconnect handler */
dh = ngx_array_push(&cmcf->disconnect);

if (dh == NULL) {
return NGX_ERROR;
}

*dh = ngx_rtmp_broadcast_leave;

/* register AMF0 call handlers */
ncalls = sizeof(ngx_rtmp_broadcast_map)
/ sizeof(ngx_rtmp_broadcast_map[0]);
h = ngx_array_push_n(&cmcf->calls, ncalls);
if (h == NULL) {
return NGX_ERROR;
}

bm = ngx_rtmp_broadcast_map;
for(n = 0; n < ncalls; ++n, ++h, ++bm) {
h->key = bm->name;
h->value = bm->handler;
}

return NGX_OK;
}
Loading

0 comments on commit 68c6411

Please sign in to comment.