Skip to content

Commit

Permalink
Merge pull request alibaba#329 from zhcn381/upstream2
Browse files Browse the repository at this point in the history
nginx-tfs new features and bug-fixs.
  • Loading branch information
monadbobo committed Oct 14, 2013
2 parents 78c4fcb + eb46ef7 commit 5734b40
Show file tree
Hide file tree
Showing 15 changed files with 510 additions and 92 deletions.
97 changes: 73 additions & 24 deletions src/http/modules/tfs/ngx_http_tfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,8 @@ ngx_http_tfs_event_handler(ngx_event_t *ev)
} else {
t->read_event_handler(r, t);
}

ngx_http_run_posted_requests(c);
}


Expand Down Expand Up @@ -715,37 +717,39 @@ ngx_http_tfs_alloc_buf(ngx_http_tfs_t *t)
static ngx_int_t
ngx_http_tfs_process_header(ngx_http_tfs_t *t, ngx_int_t n)
{
ngx_int_t body_size, rc;
ngx_int_t body_size, rc;
ngx_http_tfs_peer_connection_t *tp;

tp = t->tfs_peer;

if (n < t->header_size) {
t->header_buffer.last += n;
t->header_size -= n;
return NGX_AGAIN;
}

t->header_buffer.last += t->header_size;
t->header = (void *) t->header_buffer.pos;

t->header_buffer.last += t->header_size;
body_size = n - t->header_size;
if (body_size > 0) {
tp->body_buffer.last += body_size;
}
if (t->input_filter != NULL) {
rc = t->input_filter(t);
if (rc != NGX_OK) {
/* error or NGX_DONE */
/* error or NGX_AGAIN or NGX_DONE */
return rc;
}
}

if (body_size > 0) {
return body_size;
}

return NGX_DECLINED;
return NGX_OK;
}


void
ngx_http_tfs_finalize_state(ngx_http_tfs_t *t, ngx_int_t rc)
{
uint16_t action;
ngx_http_request_t *r;
ngx_peer_connection_t *p;
ngx_http_tfs_peer_connection_t *tp;
Expand Down Expand Up @@ -832,9 +836,22 @@ ngx_http_tfs_finalize_state(ngx_http_tfs_t *t, ngx_int_t rc)
if (rc == NGX_DONE) {
/* need stat data */
if (!t->parent && t->srv_conf->log != NULL) {
action = t->r_ctx.action.code;
if (action == NGX_HTTP_TFS_ACTION_REMOVE_FILE){
if (t->r_ctx.unlink_type == NGX_HTTP_TFS_UNLINK_UNDELETE) {
action = NGX_HTTP_TFS_ACTION_UNDELETE_FILE;

} else if (t->r_ctx.unlink_type == NGX_HTTP_TFS_UNLINK_CONCEAL) {
action = NGX_HTTP_TFS_ACTION_CONCEAL_FILE;

} else if (t->r_ctx.unlink_type == NGX_HTTP_TFS_UNLINK_REVEAL) {
action = NGX_HTTP_TFS_ACTION_REVEAL_FILE;
}
}

ngx_log_error(NGX_LOG_INFO, t->srv_conf->log, 0,
"%d, %uL, %V, %V, %uD, %uL, %uL, %uL, %V",
t->r_ctx.action.code,
action,
t->loc_conf->upstream->enable_rcs ?
t->rc_info_node->app_id : NGX_HTTP_TFS_DEFAULT_APPID,
&t->file_name,
Expand Down Expand Up @@ -911,6 +928,12 @@ ngx_http_tfs_process_upstream_request(ngx_http_request_t *r, ngx_http_tfs_t *t)
tp->peer.name,
tp->peer_addr_text);

/* for test ds retry */
//if (ngx_strncmp(p->name->data, ds_name.data, p->name->len) == 0) {
// ngx_http_tfs_handle_connection_failure(t, tp);
// return;
//}

if (!t->request_sent && ngx_http_tfs_test_connect(c) != NGX_OK) {
ngx_http_tfs_handle_connection_failure(t, tp);
return;
Expand Down Expand Up @@ -977,11 +1000,7 @@ ngx_http_tfs_process_upstream_request(ngx_http_request_t *r, ngx_http_tfs_t *t)
if (t->parse_state == NGX_HTTP_TFS_HEADER) {
rc = ngx_http_tfs_process_header(t, n);

if (rc == NGX_DECLINED) {
t->parse_state = NGX_HTTP_TFS_BODY;
}

if (rc == NGX_AGAIN || rc == NGX_DECLINED) {
if (rc == NGX_AGAIN) {
continue;
}

Expand All @@ -990,10 +1009,10 @@ ngx_http_tfs_process_upstream_request(ngx_http_request_t *r, ngx_http_tfs_t *t)
}

t->parse_state = NGX_HTTP_TFS_BODY;
n = rc;
}

tp->body_buffer.last += n;
} else {
tp->body_buffer.last += n;
}

rc = t->process_request_body(t);

Expand All @@ -1014,6 +1033,7 @@ ngx_http_tfs_send_response(ngx_http_request_t *r, ngx_http_tfs_t *t)
{
int tcp_nodelay;
ngx_int_t rc;
ngx_http_tfs_t *parent_tfs;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;

Expand Down Expand Up @@ -1047,7 +1067,14 @@ ngx_http_tfs_send_response(ngx_http_request_t *r, ngx_http_tfs_t *t)
}
}

if (!r->header_sent) {
if (t->parent == NULL) {
parent_tfs = t;

} else {
parent_tfs = t->parent;
}

if (!parent_tfs->header_sent) {
ngx_http_tfs_set_header_line(t);

rc = ngx_http_send_header(r);
Expand All @@ -1057,6 +1084,8 @@ ngx_http_tfs_send_response(ngx_http_request_t *r, ngx_http_tfs_t *t)
return;
}

parent_tfs->header_sent = 1;

if (t->header_only) {
ngx_http_tfs_finalize_request(r, t, rc);
return;
Expand Down Expand Up @@ -1272,6 +1301,13 @@ ngx_http_tfs_set_header_line(ngx_http_tfs_t *t)
if (t->r_ctx.chk_file_hole && t->json_output) {
r->headers_out.content_type_len = sizeof("application/json")- 1;
ngx_str_set(&r->headers_out.content_type, "application/json");

} else {
if (t->headers_in.content_type != NULL) {
r->headers_out.content_type_len = t->headers_in.content_type->value.len;
r->headers_out.content_type = t->headers_in.content_type->value;
r->headers_out.content_type_lowcase = NULL;
}
}

/* set last-modified if have */
Expand Down Expand Up @@ -1922,7 +1958,7 @@ ngx_http_tfs_misc_ctx_init(ngx_http_tfs_t *t, ngx_http_tfs_rcs_info_t *rc_info)
break;

case NGX_HTTP_TFS_ACTION_WRITE_FILE:
t->state = NGX_HTTP_TFS_STATE_WRITE_CLUSTER_ID_NS;
t->state = NGX_HTTP_TFS_STATE_WRITE_GET_BLK_INFO;
break;

case NGX_HTTP_TFS_ACTION_REMOVE_FILE:
Expand Down Expand Up @@ -1951,13 +1987,17 @@ ngx_http_tfs_misc_ctx_init(ngx_http_tfs_t *t, ngx_http_tfs_rcs_info_t *rc_info)
&t->tfs_peer_servers[NGX_HTTP_TFS_NAME_SERVER], &t->name_server_addr);

logical_cluster = &rc_info->logical_clusters[t->logical_cluster_index];
/* skip get cluster id from ns */
if (t->r_ctx.action.code == NGX_HTTP_TFS_ACTION_WRITE_FILE) {
/* check if need get cluster id from ns */
if (t->r_ctx.action.code == NGX_HTTP_TFS_ACTION_WRITE_FILE
&& t->state == NGX_HTTP_TFS_STATE_WRITE_GET_BLK_INFO)
{
physical_cluster =
&logical_cluster->rw_clusters[t->rw_cluster_index];
if (physical_cluster->cluster_id > 0) {
t->file.cluster_id = physical_cluster->cluster_id;
t->state = NGX_HTTP_TFS_STATE_WRITE_GET_BLK_INFO;

} else {
t->state = NGX_HTTP_TFS_STATE_WRITE_CLUSTER_ID_NS;
}
}

Expand All @@ -1984,6 +2024,13 @@ ngx_http_tfs_misc_ctx_init(ngx_http_tfs_t *t, ngx_http_tfs_rcs_info_t *rc_info)
}

t->file.segment_data[0].data = t->send_body;
/* copy data to orig_data so that we can retry write */
rc = ngx_chain_add_copy_with_buf(t->pool,
&t->file.segment_data[0].orig_data, t->file.segment_data[0].data);
if (rc == NGX_ERROR) {
return NGX_ERROR;
}

t->file.segment_data[0].segment_info.size =
ngx_http_tfs_get_chain_buf_size(t->send_body);
t->file.left_length= t->file.segment_data[0].segment_info.size;
Expand Down Expand Up @@ -2093,6 +2140,7 @@ ngx_http_tfs_misc_ctx_init(ngx_http_tfs_t *t, ngx_http_tfs_rcs_info_t *rc_info)
break;

case NGX_HTTP_TFS_ACTION_WRITE_FILE:
t->group_seq = -1;
if (t->is_large_file || t->r_ctx.version == 2) {
rc = ngx_http_tfs_get_segment_for_write(t);
if (rc == NGX_ERROR) {
Expand All @@ -2109,7 +2157,7 @@ ngx_http_tfs_misc_ctx_init(ngx_http_tfs_t *t, ngx_http_tfs_rcs_info_t *rc_info)
&& !t->r_ctx.no_dedup)
{
/* update is not allowed when using dedup */
if (t->r_ctx.file_path_s.len > 0) {
if (t->r_ctx.is_raw_update > 0) {
return NGX_HTTP_BAD_REQUEST;
}

Expand Down Expand Up @@ -2228,6 +2276,7 @@ ngx_http_tfs_batch_process_start(ngx_http_tfs_t *t)
return NGX_ERROR;
}

st->index = i;
st->sp_callback = ngx_http_tfs_batch_process_next;

/* send(to upstream servers) and output(to client) bufs */
Expand Down
8 changes: 6 additions & 2 deletions src/http/modules/tfs/ngx_http_tfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ struct ngx_http_tfs_segment_data_s {
uint32_t oper_offset;
/* read/write size inside this segment */
uint32_t oper_size;
/* current writing data's crc */
uint32_t curr_crc;
union {
uint64_t write_file_number;
};
ngx_http_tfs_block_info_t block_info;
ngx_uint_t ds_retry;
ngx_uint_t ds_index;
ngx_chain_t *data;
ngx_chain_t *orig_data; /* for write retry */
} NGX_PACKED;


Expand Down Expand Up @@ -321,10 +320,14 @@ struct ngx_http_tfs_s {
ngx_http_tfs_raw_file_info_t file_info;
ngx_buf_t *readv2_rsp_tail_buf;
uint8_t read_ver;
uint8_t retry_count;

/* block cache */
ngx_http_tfs_block_cache_ctx_t block_cache_ctx;

/* read index */
uint16_t index;

/* for parallel write segments */
ngx_http_tfs_t *parent;
ngx_http_tfs_t *next;
Expand All @@ -350,6 +353,7 @@ struct ngx_http_tfs_s {
unsigned request_timeout:1;
unsigned client_abort:1;
unsigned is_rolling_back:1;
unsigned header_sent:1;
};


Expand Down
Loading

0 comments on commit 5734b40

Please sign in to comment.