Skip to content

Commit

Permalink
http_client_send_async
Browse files Browse the repository at this point in the history
  • Loading branch information
ithewei committed Nov 30, 2020
1 parent 5a900d7 commit cd30f2e
Show file tree
Hide file tree
Showing 13 changed files with 390 additions and 116 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,17 @@ httpd: prepare
$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server examples/httpd"

curl: prepare
$(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client" SRCS="examples/curl.cpp"
# $(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes DEFINES="CURL_STATICLIB"
$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/curl.cpp"
# $(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/curl.cpp" WITH_CURL=yes DEFINES="CURL_STATICLIB"

http_server_test: prepare
$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/server" SRCS="examples/http_server_test.cpp"

http_client_test: prepare
$(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client" SRCS="examples/http_client_test.cpp"
$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client" SRCS="examples/http_client_test.cpp"

consul_cli: prepare
$(MAKEF) TARGET=$@ SRCDIRS=". base utils http http/client consul" SRCS="examples/consul_cli.cpp" DEFINES="PRINT_DEBUG"
$(MAKEF) TARGET=$@ SRCDIRS=". base utils event http http/client consul" SRCS="examples/consul_cli.cpp" DEFINES="PRINT_DEBUG"

unittest: prepare
$(CC) -g -Wall -std=c99 -I. -Ibase -o bin/mkdir_p unittest/mkdir_test.c base/hbase.c
Expand Down
5 changes: 3 additions & 2 deletions Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ endif

# CFLAGS, CXXFLAGS, ARFLAGS
ifeq ($(BUILD_TYPE), DEBUG)
DEFAULT_CFLAGS = -g -Wall
DEFAULT_CFLAGS = -g -Wall -O0
else
DEFAULT_CFLAGS += -O2
endif
DEFAULT_CFLAGS += -O2

CFLAGS ?= $(DEFAULT_CFLAGS)
CXXFLAGS ?= $(DEFAULT_CFLAGS)
Expand Down
44 changes: 21 additions & 23 deletions event/hloop.c
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,16 @@ static void hloop_cleanup(hloop_t* loop) {
loop->pendings[i] = NULL;
}

// ios
printd("cleanup ios...\n");
for (int i = 0; i < loop->ios.maxsize; ++i) {
hio_t* io = loop->ios.ptr[i];
if (io) {
hio_free(io);
}
}
io_array_cleanup(&loop->ios);

// idles
printd("cleanup idles...\n");
struct list_node* node = loop->idles.next;
Expand All @@ -223,16 +233,6 @@ static void hloop_cleanup(hloop_t* loop) {
}
heap_init(&loop->timers, NULL);

// ios
printd("cleanup ios...\n");
for (int i = 0; i < loop->ios.maxsize; ++i) {
hio_t* io = loop->ios.ptr[i];
if (io) {
hio_free(io);
}
}
io_array_cleanup(&loop->ios);

// readbuf
if (loop->readbuf.base && loop->readbuf.len) {
HV_FREE(loop->readbuf.base);
Expand Down Expand Up @@ -660,7 +660,7 @@ int hio_del(hio_t* io, int events) {

hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {
hio_t* io = hio_get(loop, fd);
if (io == NULL) return NULL;
assert(io != NULL);
io->readbuf.base = (char*)buf;
io->readbuf.len = len;
if (read_cb) {
Expand All @@ -672,7 +672,7 @@ hio_t* hread(hloop_t* loop, int fd, void* buf, size_t len, hread_cb read_cb) {

hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb write_cb) {
hio_t* io = hio_get(loop, fd);
if (io == NULL) return NULL;
assert(io != NULL);
if (write_cb) {
io->write_cb = write_cb;
}
Expand All @@ -682,8 +682,7 @@ hio_t* hwrite(hloop_t* loop, int fd, const void* buf, size_t len, hwrite_cb writ

hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {
hio_t* io = hio_get(loop, listenfd);
if (io == NULL) return NULL;
io->accept = 1;
assert(io != NULL);
if (accept_cb) {
io->accept_cb = accept_cb;
}
Expand All @@ -693,8 +692,7 @@ hio_t* haccept(hloop_t* loop, int listenfd, haccept_cb accept_cb) {

hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {
hio_t* io = hio_get(loop, connfd);
if (io == NULL) return NULL;
io->connect = 1;
assert(io != NULL);
if (connect_cb) {
io->connect_cb = connect_cb;
}
Expand All @@ -704,13 +702,13 @@ hio_t* hconnect (hloop_t* loop, int connfd, hconnect_cb connect_cb) {

void hclose (hloop_t* loop, int fd) {
hio_t* io = hio_get(loop, fd);
if (io == NULL) return;
assert(io != NULL);
hio_close(io);
}

hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb) {
//hio_t* io = hio_get(loop, connfd);
//if (io == NULL) return NULL;
//assert(io != NULL);
//io->recv = 1;
//if (io->io_type != HIO_TYPE_SSL) {
//io->io_type = HIO_TYPE_TCP;
Expand All @@ -720,7 +718,7 @@ hio_t* hrecv (hloop_t* loop, int connfd, void* buf, size_t len, hread_cb read_cb

hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb write_cb) {
//hio_t* io = hio_get(loop, connfd);
//if (io == NULL) return NULL;
//assert(io != NULL);
//io->send = 1;
//if (io->io_type != HIO_TYPE_SSL) {
//io->io_type = HIO_TYPE_TCP;
Expand All @@ -730,15 +728,15 @@ hio_t* hsend (hloop_t* loop, int connfd, const void* buf, size_t len, hwrite_cb

hio_t* hrecvfrom (hloop_t* loop, int sockfd, void* buf, size_t len, hread_cb read_cb) {
//hio_t* io = hio_get(loop, sockfd);
//if (io == NULL) return NULL;
//assert(io != NULL);
//io->recvfrom = 1;
//io->io_type = HIO_TYPE_UDP;
return hread(loop, sockfd, buf, len, read_cb);
}

hio_t* hsendto (hloop_t* loop, int sockfd, const void* buf, size_t len, hwrite_cb write_cb) {
//hio_t* io = hio_get(loop, sockfd);
//if (io == NULL) return NULL;
//assert(io != NULL);
//io->sendto = 1;
//io->io_type = HIO_TYPE_UDP;
return hwrite(loop, sockfd, buf, len, write_cb);
Expand Down Expand Up @@ -771,7 +769,7 @@ hio_t* hloop_create_tcp_client (hloop_t* loop, const char* host, int port, hconn
}

hio_t* io = hio_get(loop, connfd);
if (io == NULL) return NULL;
assert(io != NULL);
hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
hconnect(loop, connfd, connect_cb);
return io;
Expand Down Expand Up @@ -803,7 +801,7 @@ hio_t* hloop_create_udp_client(hloop_t* loop, const char* host, int port) {
}

hio_t* io = hio_get(loop, sockfd);
if (io == NULL) return NULL;
assert(io != NULL);
hio_set_peeraddr(io, &peeraddr.sa, sockaddr_len(&peeraddr));
return io;
}
Expand Down
2 changes: 2 additions & 0 deletions event/nio.c
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ static void hio_handle_events(hio_t* io) {
}

int hio_accept(hio_t* io) {
io->accept = 1;
hio_add(io, hio_handle_events, HV_READ);
return 0;
}
Expand All @@ -483,6 +484,7 @@ int hio_connect(hio_t* io) {
int timeout = io->connect_timeout ? io->connect_timeout : HIO_DEFAULT_CONNECT_TIMEOUT;
io->connect_timer = htimer_add(io->loop, __connect_timeout_cb, timeout, 1);
io->connect_timer->privdata = io;
io->connect = 1;
return hio_add(io, hio_handle_events, HV_WRITE);
}

Expand Down
2 changes: 2 additions & 0 deletions event/overlapio.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ int hio_accept (hio_t* io) {
for (int i = 0; i < ACCEPTEX_NUM; ++i) {
post_acceptex(io, NULL);
}
io->accept = 1;
return hio_add(io, hio_handle_events, HV_READ);
}

Expand Down Expand Up @@ -286,6 +287,7 @@ int hio_connect (hio_t* io) {
goto error;
}
}
io->connect = 1;
return hio_add(io, hio_handle_events, HV_WRITE);
error:
hio_close(io);
Expand Down
48 changes: 46 additions & 2 deletions examples/http_client_test.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,38 @@
#include "requests.h"

int main() {
auto resp = requests::get("http://www.example.com");
#include "hthread.h" // import hv_gettid

static void onResponse(int state, HttpRequestPtr req, HttpResponsePtr resp, void* userdata) {
printf("test_http_async_client response thread tid=%ld\n", hv_gettid());
if (state != 0) {
printf("onError: %s:%d\n", http_client_strerror(state), state);
} else {
printf("onSuccess\n");
printf("%d %s\r\n", resp->status_code, resp->status_message());
printf("%s\n", resp->body.c_str());
}

int* finished = (int*)userdata;
*finished = 1;
}

static void test_http_async_client(int* finished) {
printf("test_http_async_client request thread tid=%ld\n", hv_gettid());
HttpRequestPtr req = HttpRequestPtr(new HttpRequest);
HttpResponsePtr resp = HttpResponsePtr(new HttpResponse);
req->method = HTTP_POST;
req->url = "127.0.0.1:8080/echo";
req->body = "this is an async request.";
req->timeout = 10;
int ret = http_client_send_async(req, resp, onResponse, (void*)finished);
if (ret != 0) {
printf("http_client_send_async error: %s:%d\n", http_client_strerror(ret), ret);
*finished = 1;
}
}

static void test_http_sync_client() {
auto resp = requests::get("http://127.0.0.1:8080/ping");
if (resp == NULL) {
printf("request failed!\n");
} else {
Expand All @@ -16,6 +47,19 @@ int main() {
printf("%d %s\r\n", resp2->status_code, resp2->status_message());
printf("%s\n", resp2->body.c_str());
}
}

int main() {
int finished = 0;
test_http_async_client(&finished);

test_http_sync_client();

// demo wait async ResponseCallback
while (!finished) {
hv_delay(100);
}
printf("finished!\n");

return 0;
}
10 changes: 9 additions & 1 deletion http/HttpMessage.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef HTTP_MESSAGE_H_
#define HTTP_MESSAGE_H_

#include <memory>
#include <string>
#include <map>

Expand Down Expand Up @@ -150,7 +151,8 @@ class HV_EXPORT HttpRequest : public HttpMessage {
std::string path;
QueryParams query_params;
// client_addr
HNetAddr client_addr;
HNetAddr client_addr; // for http server save client addr of request
int timeout; // for http client timeout

HttpRequest() : HttpMessage() {
type = HTTP_REQUEST;
Expand All @@ -165,6 +167,7 @@ class HV_EXPORT HttpRequest : public HttpMessage {
host = "127.0.0.1";
port = DEFAULT_HTTP_PORT;
path = "/";
timeout = 0;
}

virtual void Reset() {
Expand Down Expand Up @@ -214,4 +217,9 @@ class HV_EXPORT HttpResponse : public HttpMessage {
virtual std::string Dump(bool is_dump_headers = true, bool is_dump_body = false);
};

typedef std::shared_ptr<HttpRequest> HttpRequestPtr;
typedef std::shared_ptr<HttpResponse> HttpResponsePtr;
// state: 0 onSucceed other onError
typedef void (*HttpResponseCallback)(int state, HttpRequestPtr req, HttpResponsePtr resp, void* userdata);

#endif // HTTP_MESSAGE_H_
2 changes: 2 additions & 0 deletions http/HttpParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ class HV_EXPORT HttpParser {
virtual const char* StrError(int error) = 0;
};

typedef std::shared_ptr<HttpParser> HttpParserPtr;

#endif // HTTP_PARSER_H_
Loading

0 comments on commit cd30f2e

Please sign in to comment.