Skip to content

Commit

Permalink
fix conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
kedixa committed Jun 9, 2021
2 parents cde9fbb + 3d92b8b commit faffe4d
Show file tree
Hide file tree
Showing 34 changed files with 622 additions and 228 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ $ make
* Asynchronous File IO tasks
* [Http server with file IO:http\_file\_server](docs/en/tutorial-09-http_file_server.md)
* User-defined protocol
* [A simple user-defined portocol: client/server](docs/en/tutorial-10-user_defined_protocol.md)
* [A simple user-defined protocol: client/server](docs/en/tutorial-10-user_defined_protocol.md)
* Timing tasks and counting tasks
* [About timer](docs/en/about-timer.md)
* [About counter](docs/en/about-counter.md)
Expand Down Expand Up @@ -141,7 +141,9 @@ Memory reclamation mechanism
* When a series is a branch of a parallel, it will be recycled after the callback of the parallel that it belongs to.
* This project doesn’t use `std::shared_ptr` to manage memory.
#### More design documents
#### Any other questions?
To be continued...
You may check the [FAQ](https://github.com/sogou/workflow/issues/406) and [issues](https://github.com/sogou/workflow/issues) list first to see if you can find the answer.
You are very welcome to send the problems you encounter in use to [issues](https://github.com/sogou/workflow/issues), and we will answer them as soon as possible. At the same time, more issues will also help new users.
4 changes: 4 additions & 0 deletions docs/en/tutorial-12-mysql_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ After startup, you can directly enter MySQL command in the terminal to interact

mysql://username:password@host:port/dbname?character\_set=charset&character\_set\_results=charset

- set scheme to be **mysqls://** for accessing MySQL with SSL connnection (MySQL server 5.7 or above is required).

- fill in the username and the password for the MySQL database;

- the default port number is 3306;
Expand All @@ -34,6 +36,8 @@ mysql://root:[email protected]

mysql://@test.mysql.com:3306/db1?character\_set=utf8&character_set_results=utf8

mysqls://localhost/db1?character\_set=big5

# Creating and starting a MySQL task

You can use WFTaskFactory to create a MySQL task. The usage of creating interface and callback functions are similar to other tasks in workflow:
Expand Down
4 changes: 4 additions & 0 deletions docs/tutorial-12-mysql_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

mysql://username:password@host:port/dbname?character_set=charset&character_set_results=charset

- 如果以SSL连接访问MySQL,则scheme设为**mysqls://**。MySQL server 5.7及以上支持;

- username和password按需填写;

- port默认为3306;
Expand All @@ -33,6 +35,8 @@ mysql://root:[email protected]

mysql://@test.mysql.com:3306/db1?character_set=utf8&character_set_results=utf8

mysqls://localhost/db1?character\_set=big5

# 创建并启动MySQL任务

用户可以使用WFTaskFactory创建MySQL任务,创建接口与回调函数的用法都与workflow其他任务类似:
Expand Down
1 change: 1 addition & 0 deletions src/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ if (KAFKA STREQUAL "y")
WFKafkaClient.cc
)
add_library("client_kafka" OBJECT ${SRC})
set_property(SOURCE WFKafkaClient.cc APPEND PROPERTY COMPILE_OPTIONS "-fno-rtti")
endif ()
3 changes: 3 additions & 0 deletions src/client/WFKafkaClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ void ComplexKafkaTask::kafka_timer_callback(WFTimerTask *task)
kafka_heartbeat_callback);

kafka_task->user_data = t;
kafka_task->get_req()->set_config(*t->get_config());
kafka_task->get_req()->set_api(Kafka_Heartbeat);
kafka_task->get_req()->set_cgroup(*t->get_cgroup());
kafka_task->get_req()->set_broker(*coordinator);
Expand Down Expand Up @@ -772,6 +773,7 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
t->retry_max,
kafka_heartbeat_callback);
kafka_task->user_data = hb;
kafka_task->get_req()->set_config(t->config);
kafka_task->get_req()->set_api(Kafka_Heartbeat);
kafka_task->get_req()->set_cgroup(t->cgroup);
kafka_task->get_req()->set_broker(*coordinator);
Expand Down Expand Up @@ -1166,6 +1168,7 @@ void ComplexKafkaTask::dispatch()
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
0, kafka_leavegroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api(Kafka_LeaveGroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_cgroup(this->cgroup);
Expand Down
1 change: 1 addition & 0 deletions src/factory/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ if (KAFKA STREQUAL "y")
KafkaTaskImpl.cc
)
add_library("factory_kafka" OBJECT ${SRC})
set_property(SOURCE KafkaTaskImpl.cc APPEND PROPERTY COMPILE_OPTIONS "-fno-rtti")
endif ()

121 changes: 62 additions & 59 deletions src/factory/HttpTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,13 @@ class ComplexHttpProxyTask : public ComplexHttpTask
ComplexHttpProxyTask(int redirect_max,
int retry_max,
http_callback_t&& callback):
ComplexHttpTask(redirect_max, retry_max, std::move(callback))
ComplexHttpTask(redirect_max, retry_max, std::move(callback)),
is_user_request_(true)
{ }

void set_user_uri(ParsedURI&& uri) { user_uri_ = std::move(uri); }
void set_user_uri(const ParsedURI& uri) { user_uri_ = uri; }

virtual WFConnection *get_connection() const;
virtual const ParsedURI *get_current_uri() const { return &user_uri_; }

protected:
Expand All @@ -449,44 +449,52 @@ class ComplexHttpProxyTask : public ComplexHttpTask
virtual bool init_success();
virtual bool finish_once();

protected:
virtual WFConnection *get_connection() const
{
WFConnection *conn = this->ComplexHttpTask::get_connection();

if (conn && is_ssl_)
return (SSLConnection *)conn->get_context();

return conn;
}

private:
struct ConnContext : public WFConnection
struct SSLConnection : public WFConnection
{
SSL *ssl;
SSL *ssl_;
SSLHandshaker handshaker_;
SSLWrapper wrapper_;
SSLConnection(SSL *ssl) : handshaker_(ssl), wrapper_(NULL, ssl)
{
ssl_ = ssl;
}
};

SSL *get_ssl() const;
int set_ssl();
SSLHandshaker *get_ssl_handshaker() const
{
return &((SSLConnection *)this->get_connection())->handshaker_;
}

SSLWrapper *get_ssl_wrapper(ProtocolMessage *msg) const
{
SSLConnection *conn = (SSLConnection *)this->get_connection();
conn->wrapper_ = SSLWrapper(msg, conn->ssl_);
return &conn->wrapper_;
}

int init_ssl_connection();

std::string proxy_auth_;
ParsedURI user_uri_;
bool is_ssl_;
bool is_user_request_;
int state_;
short state_;
int error_;
};

WFConnection *ComplexHttpProxyTask::get_connection() const
{
WFConnection *conn = this->WFComplexClientTask::get_connection();

if (conn && is_ssl_)
return (ConnContext *)conn->get_context();

return conn;
}

SSL *ComplexHttpProxyTask::get_ssl() const
{
WFConnection *conn = this->WFComplexClientTask::get_connection();

if (conn && is_ssl_)
return ((ConnContext *)conn->get_context())->ssl;

return NULL;
}

int ComplexHttpProxyTask::set_ssl()
int ComplexHttpProxyTask::init_ssl_connection()
{
SSL *ssl = __create_ssl(WFGlobal::get_ssl_client_ctx());
WFConnection *conn;
Expand All @@ -497,26 +505,23 @@ int ComplexHttpProxyTask::set_ssl()
SSL_set_tlsext_host_name(ssl, user_uri_.host);
SSL_set_connect_state(ssl);

conn = this->WFComplexClientTask::get_connection();
ConnContext *ctx = new ConnContext;
ctx->ssl = ssl;
conn = this->ComplexHttpTask::get_connection();
SSLConnection *ssl_conn = new SSLConnection(ssl);

auto&& deleter = [] (void *c)
auto&& deleter = [] (void *ctx)
{
ConnContext *ctx = (ConnContext *)c;
SSL_free(ctx->ssl);
delete ctx;
SSLConnection *ssl_conn = (SSLConnection *)ctx;
SSL_free(ssl_conn->ssl_);
delete ssl_conn;
};
conn->set_context(ctx, std::move(deleter));
conn->set_context(ssl_conn, std::move(deleter));
return 0;
}

CommMessageOut *ComplexHttpProxyTask::message_out()
{
long long seqid = this->get_seq();

is_user_request_ = false;

if (seqid == 0) // CONNECT
{
HttpRequest *conn_req = new HttpRequest;
Expand All @@ -536,17 +541,17 @@ CommMessageOut *ComplexHttpProxyTask::message_out()
if (!proxy_auth_.empty())
conn_req->add_header_pair("Proxy-Authorization", proxy_auth_);

is_user_request_ = false;
return conn_req;
}
else if (seqid == 1 && is_ssl_) // HANDSHAKE
return new SSLHandshaker(this->get_ssl());
{
is_user_request_ = false;
return get_ssl_handshaker();
}

auto *msg = (ProtocolMessage *)this->ComplexHttpTask::message_out();
if (is_ssl_)
return new SSLWrapper(msg, this->get_ssl());

is_user_request_ = true;
return msg;
return is_ssl_ ? get_ssl_wrapper(msg) : msg;
}

CommMessageIn *ComplexHttpProxyTask::message_in()
Expand All @@ -560,11 +565,11 @@ CommMessageIn *ComplexHttpProxyTask::message_in()
return conn_resp;
}
else if (seqid == 1 && is_ssl_)
return new SSLHandshaker(this->get_ssl());
return get_ssl_handshaker();

auto *msg = (ProtocolMessage *)this->ComplexHttpTask::message_in();
if (is_ssl_)
return new SSLWrapper(msg, this->get_ssl());
return get_ssl_wrapper(msg);

return msg;
}
Expand All @@ -573,6 +578,8 @@ int ComplexHttpProxyTask::keep_alive_timeout()
{
long long seqid = this->get_seq();

state_ = WFT_STATE_SUCCESS;
error_ = 0;
if (seqid == 0)
{
HttpResponse *resp = this->get_resp();
Expand All @@ -597,7 +604,7 @@ int ComplexHttpProxyTask::keep_alive_timeout()

this->clear_resp();

if (is_ssl_ && set_ssl() < 0)
if (is_ssl_ && init_ssl_connection() < 0)
{
state_ = WFT_STATE_SYS_ERROR;
error_ = errno;
Expand Down Expand Up @@ -707,35 +714,31 @@ bool ComplexHttpProxyTask::init_success()
header_host += uri_.port;
}

state_ = WFT_STATE_SUCCESS;
error_ = 0;
is_user_request_ = true;

HttpRequest *client_req = this->get_req();
client_req->set_request_uri(request_uri.c_str());
client_req->set_header_pair("Host", header_host.c_str());
this->WFComplexClientTask::set_transport_type(TT_TCP);

return true;
}

bool ComplexHttpProxyTask::finish_once()
{
if (!is_user_request_)
{
long long seqid = this->get_seq();

delete this->get_message_in();
delete this->get_message_out();

if (this->state == WFT_STATE_SUCCESS && state_ != WFT_STATE_SUCCESS)
{
this->state = state_;
this->error = error_;
}

if (seqid == 0 || (seqid == 1 && is_ssl_))
return false;
if (this->get_seq() == 0)
{
delete this->get_message_in();
delete this->get_message_out();
}

is_user_request_ = true;
return false;
}

if (this->state == WFT_STATE_SUCCESS)
Expand Down
Loading

0 comments on commit faffe4d

Please sign in to comment.