forked from yedf2/handy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkqueue.cc
132 lines (123 loc) · 4.42 KB
/
kqueue.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* 编译:c++ -o kqueue kqueue.cc
* 运行: ./kqueue
* 测试:echo abc | nc localhost 2099
* 结果:abc
* 例子的echo返回了 abc
*/
#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#define exit_if(r, ...) if(r) {printf(__VA_ARGS__); printf("error no: %d error msg %s\n", errno, strerror(errno)); exit(1);}
const int kReadEvent = 1;
const int kWriteEvent = 2;
void setNonBlock(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
exit_if(flags<0, "fcntl failed");
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
exit_if(r<0, "fcntl failed");
}
void updateEvents(int efd, int fd, int events, bool modify) {
struct kevent ev[2];
int n = 0;
if (events & kReadEvent) {
EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
} else if (modify){
EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
}
if (events & kWriteEvent) {
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
} else if (modify){
EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
}
printf("%s fd %d events read %d write %d\n",
modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
int r = kevent(efd, ev, n, NULL, 0, NULL);
exit_if(r, "kevent failed ");
}
void handleAccept(int efd, int fd) {
struct sockaddr_in raddr;
socklen_t rsz = sizeof(raddr);
int cfd = accept(fd,(struct sockaddr *)&raddr,&rsz);
exit_if(cfd<0, "accept failed");
sockaddr_in peer, local;
socklen_t alen = sizeof(peer);
int r = getpeername(cfd, (sockaddr*)&peer, &alen);
exit_if(r<0, "getpeername failed");
printf("accept a connection from %s\n", inet_ntoa(raddr.sin_addr));
setNonBlock(cfd);
updateEvents(efd, cfd, kReadEvent|kWriteEvent, false);
}
void handleRead(int efd, int fd) {
char buf[4096];
int n = 0;
while ((n=::read(fd, buf, sizeof buf)) > 0) {
printf("read %d bytes\n", n);
int r = ::write(fd, buf, n); //写出读取的数据
//实际应用中,写出数据可能会返回EAGAIN,此时应当监听可写事件,当可写时再把数据写出
exit_if(r<=0, "write error");
}
if (n<0 && (errno == EAGAIN || errno == EWOULDBLOCK))
return;
exit_if(n<0, "read error"); //实际应用中,n<0应当检查各类错误,如EINTR
printf("fd %d closed\n", fd);
close(fd);
}
void handleWrite(int efd, int fd) {
//实际应用应当实现可写时写出数据,无数据可写才关闭可写事件
updateEvents(efd, fd, kReadEvent, true);
}
void loop_once(int efd, int lfd, int waitms) {
struct timespec timeout;
timeout.tv_sec = waitms / 1000;
timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
const int kMaxEvents = 20;
struct kevent activeEvs[kMaxEvents];
int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
printf("kqueue return %d\n", n);
for (int i = 0; i < n; i ++) {
int fd = (int)(intptr_t)activeEvs[i].udata;
int events = activeEvs[i].filter;
if (events == EVFILT_READ) {
if (fd == lfd) {
handleAccept(efd, fd);
} else {
handleRead(efd, fd);
}
} else if (events == EVFILT_WRITE) {
handleWrite(efd, fd);
} else {
exit_if(1, "unknown event");
}
}
}
int main() {
short port = 2099;
int epollfd = kqueue();
exit_if(epollfd < 0, "kqueue failed");
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
exit_if(listenfd < 0, "socket failed");
struct sockaddr_in addr;
memset(&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
int r = ::bind(listenfd,(struct sockaddr *)&addr, sizeof(struct sockaddr));
exit_if(r, "bind to 0.0.0.0:%d failed %d %s", port, errno, strerror(errno));
r = listen(listenfd, 20);
exit_if(r, "listen failed %d %s", errno, strerror(errno));
printf("fd %d listening at %d\n", listenfd, port);
setNonBlock(listenfd);
updateEvents(epollfd, listenfd, kReadEvent, false);
for (;;) { //实际应用应当注册信号处理函数,退出时清理资源
loop_once(epollfd, listenfd, 10000);
}
return 0;
}