-
Notifications
You must be signed in to change notification settings - Fork 1k
/
Copy pathPgSQL_Data_Stream.h
276 lines (233 loc) · 6.53 KB
/
PgSQL_Data_Stream.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#ifndef __CLASS_PGSQL_DATA_STREAM_H
#define __CLASS_PGSQL_DATA_STREAM_H
#include "proxysql.h"
#include "cpp.h"
#include "PgSQL_Protocol.h"
#include "scram.h"
// Provide the `uchar` typedef unless a macro named `uchar` is already defined.
// NOTE(review): presumably required by ma_pvio.h, included right below -- confirm.
#ifndef uchar
typedef unsigned char uchar;
#endif
#include "ma_pvio.h"
// NOTE(review): neither constant is referenced in this header. Judging by the
// names they are, respectively, the default size of a pgsql_queue_t buffer and
// the size of the SSL scratch buffer -- confirm against the implementation file.
#define QUEUE_T_DEFAULT_SIZE 32768
#define MY_SSL_BUFFER 8192
// Queue descriptor; PgSQL_Data_Stream holds two of these (queueIN and queueOUT).
// NOTE(review): how the fields are used is not visible in this header. The
// per-field notes below are inferred from the names only -- confirm against
// the implementation file before relying on them.
typedef struct _pgsql_queue_t {
void* buffer; // presumably the backing storage of the queue -- TODO confirm
unsigned int size; // presumably the capacity of `buffer` -- TODO confirm
unsigned int head; // presumably the write offset into `buffer` -- TODO confirm
unsigned int tail; // presumably the read offset into `buffer` -- TODO confirm
unsigned int partial; // presumably how much of `pkt` has been processed so far -- TODO confirm
PtrSize_t pkt; // presumably the packet currently being assembled or drained -- TODO confirm
mysql_hdr hdr; // NOTE(review): header type named after the MySQL protocol; confirm how it is used for PgSQL
} pgsql_queue_t;
// This class avoids copying data: it keeps hold of the client's packet and
// exposes the query text as a pointer/length pair into that same buffer.
class PgSQL_MyDS_real_query {
public:
	PtrSize_t pkt; // packet coming from the client
	char* QueryPtr; // pointer to beginning of the query
	unsigned int QuerySize; // size of the query

	// Adopt the packet described by *_pkt (pointer and size are copied, the
	// payload is not) and derive QueryPtr/QuerySize by skipping the first
	// 5 bytes of the packet.
	//
	// Any packet previously held is overwritten without being released; the
	// object is expected to be in its reset() state when init() is called
	// (this expectation is not enforced).
	//
	// NOTE(review): the 5-byte prefix is presumably the 1-byte message type
	// plus the 4-byte length of a PostgreSQL Query message -- confirm against
	// the caller.
	void init(PtrSize_t* _pkt) {
		const unsigned int prefix_len = 5;
		pkt.ptr = _pkt->ptr;
		pkt.size = _pkt->size;
		if (pkt.size > prefix_len) {
			QuerySize = pkt.size - prefix_len;
			QueryPtr = (char*)pkt.ptr + prefix_len;
		} else {
			// Nothing follows the prefix, or the packet is shorter than the
			// prefix itself. Report an empty query instead of letting the
			// unsigned subtraction wrap around to a huge size.
			QuerySize = 0;
			QueryPtr = const_cast<char*>("");
		}
	}

	// Release the packet buffer and return to the empty state.
	void end() {
		l_free(pkt.size, pkt.ptr);
		reset();
	}

	// Forget the packet WITHOUT releasing its buffer.
	void reset() {
		pkt.size = 0;
		QuerySize = 0;
		pkt.ptr = NULL;
		QueryPtr = NULL;
	}
};
// Status codes for the SSL layer; this is the return type of
// PgSQL_Data_Stream::do_ssl_handshake().
// Enumerators rely on implicit numbering, so their order must not change.
enum pgsql_sslstatus {
	PGSQL_SSLSTATUS_OK,      // 0
	PGSQL_SSLSTATUS_WANT_IO, // 1
	PGSQL_SSLSTATUS_FAIL     // 2
};
// State kept for one PgSQL data stream: the IN/OUT queues and packet arrays,
// optional SSL state, client/proxy address information, authentication state
// and the PgSQL_Connection currently attached to it (if any).
class PgSQL_Data_Stream
{
private:
	int array2buffer();
	int buffer2array();
	enum pgsql_sslstatus do_ssl_handshake();
	void queue_encrypted_bytes(const char* buf, size_t len);
public:
	void* operator new(size_t);
	void operator delete(void*);

	pgsql_queue_t queueIN;
	uint64_t pkts_recv; // counter of received packets
	pgsql_queue_t queueOUT;
	uint64_t pkts_sent; // counter of sent packets

	struct {
		PtrSize_t pkt;
		unsigned int partial;
	} CompPktIN;
	struct {
		PtrSize_t pkt;
		unsigned int partial;
	} CompPktOUT;

	PgSQL_Protocol myprot;
	PgSQL_MyDS_real_query mysql_real_query;
	bytes_stats_t bytes_info; // bytes statistics
	PtrSize_t multi_pkt;

	unsigned long long pause_until;
	unsigned long long wait_until;
	unsigned long long killed_at;
	unsigned long long max_connect_time;

	struct {
		unsigned long long questions;
		unsigned long long pgconnpoll_get; // incremented by attach_connection()
		unsigned long long pgconnpoll_put; // incremented by detach_connection()
	} statuses;

	PtrSizeArray* PSarrayIN;
	PtrSizeArray* PSarrayOUT;
	FixedSizeQueue data_packets_history_IN;
	FixedSizeQueue data_packets_history_OUT;
	//PtrSizeArray *PSarrayOUTpending;
	//PtrSizeArray* resultset;
	//unsigned int resultset_length;

	ProxySQL_Poll<PgSQL_Data_Stream>* mypolls;
	//int listener;
	PgSQL_Connection* myconn; // set by attach_connection(), cleared by detach_connection()
	PgSQL_Session* sess; // pointer to the session using this data stream
	PgSQL_Backend* mybe; // if this data stream is a connection to a backend server, this points to a backend structure

	char* x509_subject_alt_name;

	// SSL state. In a fast_forward session attach_connection() may point
	// `ssl` at the SSL object of the attached connection and install the two
	// memory BIOs below on it.
	SSL* ssl;
	BIO* rbio_ssl;
	BIO* wbio_ssl;
	char* ssl_write_buf;
	size_t ssl_write_len;

	struct sockaddr* client_addr;
	struct {
		char* addr;
		int port;
	} addr;
	struct {
		char* addr;
		int port;
	} proxy_addr;

	AUTHENTICATION_METHOD auth_method = AUTHENTICATION_METHOD::NO_PASSWORD;
	uint32_t auth_next_pkt_type = 0;
	bool auth_received_startup = false;
	unsigned char tmp_login_salt[4];
	ScramState* scram_state;

	unsigned int connect_tries;
	int query_retries_on_failure;
	int connect_retries_on_failure;
	enum mysql_data_stream_status DSS;
	enum MySQL_DS_type myds_type;
	socklen_t client_addrlen;
	int fd; // file descriptor
	int poll_fds_idx;
	int active_transaction; // 1 if there is an active transaction
	int active; // data stream is active. If not, shutdown+close needs to be called
	int status; // status . FIXME: make it a ORable variable
	int switching_auth_stage;
	int switching_auth_type;
	unsigned int tmp_charset;
	short revents;
	char kill_type;
	bool encrypted; // see attach_connection() / detach_connection()
	bool net_failure;
	uint8_t pkt_sid;
	bool com_field_list;
	char* com_field_wild;

	PgSQL_Data_Stream();
	virtual ~PgSQL_Data_Stream();
	int array2buffer_full();
	void init(); // initialize the data stream
	void init(enum MySQL_DS_type, PgSQL_Session*, int); // initialize with arguments
	void shut_soft();
	void shut_hard();
	int read_from_net();
	int write_to_net();
	int write_to_net_poll();
	bool available_data_out();
	void remove_pollout();
	void set_pollout();
	void mysql_free();
	void set_net_failure();
	void setDSS_STATE_QUERY_SENT_NET();
	// Set the data stream status.
	void setDSS(enum mysql_data_stream_status dss) {
		DSS = dss;
	}
	int read_pkts();
	int write_pkts();
	void unplug_backend();
	void check_data_flow();
	int assign_fd_from_mysql_conn();
	static unsigned char* copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, bool del);
	static void copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size,
		char current_transaction_state);

	// Safe way to attach a PgSQL Connection.
	//
	// Links `mc` and this data stream to each other and bumps the
	// pgconnpoll_get counters on both. `mc` must not be NULL.
	// `encrypted` is reset to false and only switched back on for the
	// fast_forward + SSL case described below.
	void attach_connection(PgSQL_Connection* mc) {
		// Same sanity check detach_connection() applies to `myconn`: fail
		// with a clear assertion rather than a NULL dereference below.
		assert(mc);
		statuses.pgconnpoll_get++;
		myconn = mc;
		myconn->statuses.pgconnpoll_get++;
		mc->myds = this;
		encrypted = false; // this is the default

		// PMC-10005
		// we handle encryption for backend
		//
		// we have a similar code in MySQL_Connection
		// in case of ASYNC_CONNECT_SUCCESSFUL
		if (sess == NULL || !sess->session_fast_forward) {
			return;
		}
		// if frontend and backend connection use SSL we will set
		// encrypted = true and we will start using the SSL structure
		// directly from PGconn SSL structure.
		//
		// For further details:
		// - without ssl: we use the file descriptor from pgsql connection
		// - with ssl: we use the SSL structure from pgsql connection
		if (!myconn->is_connected() || !myconn->get_pg_ssl_in_use()) {
			return;
		}
		if (ssl != NULL) {
			// an SSL structure is already in place: leave everything as is
			return;
		}
		encrypted = true;
		SSL* ssl_obj = myconn->get_pg_ssl_object();
		assert(ssl_obj != NULL); // Should not be null
		ssl = ssl_obj;
		rbio_ssl = BIO_new(BIO_s_mem());
		wbio_ssl = BIO_new(BIO_s_mem());
		// BIO_new() returns NULL on failure; never install NULL BIOs on the
		// connection's SSL object.
		assert(rbio_ssl != NULL && wbio_ssl != NULL);
		SSL_set_bio(ssl, rbio_ssl, wbio_ssl);
	}

	// Safe way to detach a PgSQL Connection.
	//
	// Requires a connection to be attached. Unlinks the connection and this
	// data stream from each other and bumps the pgconnpoll_put counters on
	// both.
	void detach_connection() {
		assert(myconn);
		myconn->statuses.pgconnpoll_put++;
		statuses.pgconnpoll_put++;
		myconn->myds = NULL;
		myconn = NULL;
		if (encrypted && sess != NULL && sess->session_fast_forward) {
			// it seems we are a connection with SSL on a fast_forward session.
			// See attach_connection() for more details .
			// We now disable SSL metadata from the Data Stream
			//
			// NOTE(review): rbio_ssl / wbio_ssl are left untouched here and
			// still point at the BIOs handed to SSL_set_bio() in
			// attach_connection() -- confirm nothing uses them after detach.
			encrypted = false;
			ssl = NULL;
		}
	}

	void return_MySQL_Connection_To_Pool();
	void destroy_MySQL_Connection_From_Pool(bool sq);
	void free_mysql_real_query();
	void reinit_queues();
	void destroy_queues();
	bool data_in_rbio();
	void reset_connection();
};
#endif /* __CLASS_PGSQL_DATA_STREAM_H */