Skip to content

Commit

Permalink
Call io_uring_prep_accept() right after last accept() completes
Browse files Browse the repository at this point in the history
  • Loading branch information
xxyzz committed Jan 26, 2022
1 parent 8862661 commit ebee738
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 142 deletions.
19 changes: 2 additions & 17 deletions 33/client.c
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
#include "connection.h"
#include <arpa/inet.h> // htons, htonl
#include <netinet/in.h> // sockaddr_in
#include <stdio.h>
#include <string.h> // memset
#include <sys/socket.h> // socket, connect
#include <unistd.h> // close
#include <unistd.h> // close

int main(int argc, char *argv[]) {
if (argc != 2) {
printf("Usage: %s file_path\n", argv[0]);
exit(EXIT_FAILURE);
}
struct sockaddr_in server_addr;
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_error("socket");
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SOCKET_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

if (connect(sfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) != 0)
handle_error("connect");
int sfd = init_socket(0);
if (send(sfd, argv[1], strlen(argv[1]), 0) == -1)
handle_error("send");
char buf[BUFSIZ] = "";
Expand Down
29 changes: 28 additions & 1 deletion 33/connection.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
#include <stdlib.h> // perror, exit, atoi
#include <arpa/inet.h> // htons, htonl
#include <netinet/in.h> // sockaddr_in
#include <stdio.h> // perror
#include <stdlib.h> // exit, atoi
#include <string.h> // memset, memcpy, strerror
#include <sys/socket.h> // socket, bind, listen, AF_INET

#define LISTEN_BACKLOG 80 // maxium length of the pending connections queue
#define SOCKET_PORT 8080
Expand All @@ -7,3 +12,25 @@
perror(msg); \
exit(EXIT_FAILURE); \
} while (0)

int init_socket(int is_server) {
struct sockaddr_in addr;
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_error("socket");
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(SOCKET_PORT);
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
const int optval = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)))
handle_error("setsocketopt");
if (is_server) {
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
handle_error("bind");
if (listen(sfd, LISTEN_BACKLOG) == -1)
handle_error("listen");
} else if (connect(sfd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
handle_error("connect");
return sfd;
}
24 changes: 1 addition & 23 deletions 33/server_epoll.c
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
#include "connection.h"
#include <arpa/inet.h> // htons, htonl
#include <errno.h>
#include <fcntl.h> // open
#include <netinet/in.h> // sockaddr_in
#include <stdio.h>
#include <string.h> // memset
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/socket.h> // socket, bind, listen, accept, recv, AF_INET
#include <sys/stat.h>
#include <unistd.h> // close

Expand All @@ -20,24 +15,7 @@ int main(int argc, char *argv[]) {
exit(EXIT_FAILURE);
}
int numReqs = atoi(argv[1]);

struct sockaddr_in my_addr;
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_error("socket");
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(SOCKET_PORT);
my_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
const int optval = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &optval,
sizeof(optval)))
handle_error("setsocketopt");
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1)
handle_error("bind");
if (listen(sfd, LISTEN_BACKLOG) == -1)
handle_error("listen");

int sfd = init_socket(1);
int epfd = epoll_create1(0);
if (epfd == -1)
handle_error("epoll_create1");
Expand Down
207 changes: 106 additions & 101 deletions 33/server_io_uring.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
#include "connection.h"
#include <fcntl.h> // open
#include <liburing.h>
#include <netinet/in.h> // sockaddr_in
#include <stdio.h>
#include <stdlib.h> // calloc, free
#include <string.h> // memset, memcpy
#include <sys/socket.h> // socket, bind, listen, accept, recv, AF_INET
#include <stdlib.h> // calloc, free
#include <sys/stat.h>
#include <unistd.h> // close, pipe

// https://kernel.dk/io_uring.pdf
// https://github.com/axboe/liburing
// https://github.com/torvalds/linux/blob/master/fs/io_uring.c
// https://github.com/torvalds/linux/blob/master/include/uapi/linux/io_uring.h
// https://lwn.net/Kernel/Index/#io_uring

struct user_data {
char buf[BUFSIZ];
Expand All @@ -23,142 +20,150 @@ struct user_data {
int file_fd;
};

void prep_accept(struct io_uring *ring, struct io_uring_sqe *sqe, int sfd) {
int numAccepts = 0, numReqs = 0;

void prep_accept(struct io_uring *ring, int sfd) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (sqe == NULL)
handle_error("io_uring_get_sqe");
struct user_data *data = malloc(sizeof(struct user_data));
if (data == NULL)
handle_error("calloc");
handle_error("malloc");
data->io_op = IORING_OP_ACCEPT;
io_uring_prep_accept(sqe, sfd, NULL, NULL, 0);
// https://github.com/axboe/liburing/commit/8ecd3fd959634df81d66af8b3a69c16202a014e8
io_uring_sqe_set_data(sqe, data);
if (io_uring_submit(ring) < 0) {
free(data);
handle_error("io_muring_submit");
handle_error("io_uring_submit");
}
numAccepts--;
}

void prep_recv(struct io_uring *ring, int sfd, int cfd) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (sqe == NULL)
handle_error("io_uring_get_sqe");
struct user_data *data = calloc(1, sizeof(struct user_data));
if (data == NULL)
handle_error("calloc");
data->io_op = IORING_OP_RECV;
data->socket_fd = cfd;
io_uring_prep_recv(sqe, cfd, data->buf, BUFSIZ, 0);
io_uring_sqe_set_data(sqe, data);
if (numAccepts > 0)
prep_accept(ring, sfd);
else if (io_uring_submit(ring) < 0) {
free(data);
handle_error("io_uring_submit");
}
}

void prep_first_splice(struct io_uring *ring, struct user_data *data) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (sqe == NULL)
handle_error("io_uring_get_sqe");

int file_fd = open(data->buf, O_RDONLY);
if (file_fd == -1)
handle_error("open");
struct stat statbuf;
if (fstat(file_fd, &statbuf) == -1)
handle_error("fstat");
int pipefd[2];
if (pipe(pipefd) == -1)
handle_error("pipe");

struct user_data *new_data = malloc(sizeof(struct user_data));
if (new_data == NULL)
handle_error("malloc");
new_data->io_op = IORING_OP_SPLICE;
new_data->socket_fd = data->socket_fd;
new_data->file_fd = file_fd;
new_data->size = statbuf.st_size;
memcpy(new_data->pipefd, pipefd, sizeof(pipefd));
// https://github.com/axboe/liburing/blob/29ff69397fa13478b5619201347c51159874279e/src/include/liburing.h#L289-L307
io_uring_prep_splice(sqe, file_fd, -1, pipefd[1], -1, statbuf.st_size, 0);
io_uring_sqe_set_data(sqe, new_data);
if (io_uring_submit(ring) < 0) {
free(new_data);
handle_error("io_uring_submit");
}
}

void prep_second_splice(struct io_uring *ring, struct user_data *data) {
if (data->size != -1) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (sqe == NULL)
handle_error("io_uring_get_sqe");

close(data->file_fd);
close(data->pipefd[1]);
struct user_data *new_data = malloc(sizeof(struct user_data));
if (new_data == NULL)
handle_error("malloc");
memcpy(new_data, data, sizeof(struct user_data));
new_data->size = -1;
io_uring_prep_splice(sqe, data->pipefd[0], -1, data->socket_fd, -1,
data->size, 0);
io_uring_sqe_set_data(sqe, new_data);
if (io_uring_submit(ring) < 0) {
free(new_data);
handle_error("io_uring_submit");
}
} else {
close(data->pipefd[0]);
close(data->socket_fd);
numReqs--;
}
}

int main(int argc, char *argv[]) {
if (argc != 2) {
printf("Usage: %s numReqs", argv[0]);
fprintf(stderr, "Usage: %s numReqs\n", argv[0]);
exit(EXIT_FAILURE);
}
int numReqs = atoi(argv[1]);

struct sockaddr_in my_addr;
int sfd = socket(AF_INET, SOCK_STREAM, 0);
if (sfd == -1)
handle_error("socket");
memset(&my_addr, 0, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(SOCKET_PORT);
my_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
const int optval = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &optval,
sizeof(optval)))
handle_error("setsocketopt");
if (bind(sfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1)
handle_error("bind");
if (listen(sfd, LISTEN_BACKLOG) == -1)
handle_error("listen");

numReqs = atoi(argv[1]);
if (numReqs <= 0) {
fprintf(stderr, "Get out\n");
exit(EXIT_FAILURE);
}
numAccepts = numReqs;
int sfd = init_socket(1);
struct io_uring ring;
if (io_uring_queue_init(LISTEN_BACKLOG, &ring, 0))
handle_error("io_uring_queue_init");
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (sqe == NULL)
handle_error("io_uring_get_sqe");
prep_accept(&ring, sqe, sfd);
prep_accept(&ring, sfd);

while (numReqs > 0) {
struct io_uring_cqe *cqe;
if (io_uring_wait_cqe(&ring, &cqe))
handle_error("io_uring_wait_cqe");
if (cqe->res < 0) {
fprintf(stderr, "I/O error: %s\n", strerror(-cqe->res));
exit(EXIT_FAILURE);
}
struct user_data *data = io_uring_cqe_get_data(cqe);
if (data == NULL) {
fprintf(stderr, "cqe->user_data is NULL\n");
exit(EXIT_FAILURE);
}
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
if (sqe == NULL)
handle_error("io_uring_get_sqe");

switch (data->io_op) {
case IORING_OP_ACCEPT:
if (cqe->res == -1)
handle_error("accept");
struct user_data *new_data = calloc(1, sizeof(struct user_data));
if (new_data == NULL)
handle_error("calloc");
new_data->io_op = IORING_OP_RECV;
new_data->socket_fd = cqe->res;
io_uring_prep_recv(sqe, cqe->res, new_data->buf, BUFSIZ, 0);
io_uring_sqe_set_data(sqe, new_data);
if (io_uring_submit(&ring) < 0) {
free(new_data);
handle_error("io_uring_submit");
}
prep_recv(&ring, sfd, cqe->res);
break;
case IORING_OP_RECV:
if (cqe->res == -1)
handle_error("recv");

int file_fd = open(data->buf, O_RDONLY);
if (file_fd == -1)
handle_error("open");
struct stat statbuf;
if (fstat(file_fd, &statbuf) == -1)
handle_error("fstat");
int pipefd[2];
if (pipe(pipefd) == -1)
handle_error("pipe");

struct user_data *new_user_data = malloc(sizeof(struct user_data));
if (new_user_data == NULL)
handle_error("malloc");
new_user_data->io_op = IORING_OP_SPLICE;
new_user_data->socket_fd = data->socket_fd;
new_user_data->file_fd = file_fd;
new_user_data->size = statbuf.st_size;
memcpy(new_user_data->pipefd, pipefd, sizeof(pipefd));
// https://github.com/axboe/liburing/blob/29ff69397fa13478b5619201347c51159874279e/src/include/liburing.h#L289-L307
io_uring_prep_splice(sqe, file_fd, -1, pipefd[1], -1, statbuf.st_size, 0);
io_uring_sqe_set_data(sqe, new_user_data);
if (io_uring_submit(&ring) < 0) {
free(new_user_data);
handle_error("io_uring_submit");
}
prep_first_splice(&ring, data);
break;
case IORING_OP_SPLICE:
if (cqe->res == -1)
handle_error("splice");

if (data->size != -1) {
close(data->file_fd);
close(data->pipefd[1]);
struct user_data *new_data = malloc(sizeof(struct user_data));
if (new_data == NULL)
handle_error("malloc");
memcpy(new_data, data, sizeof(struct user_data));
new_data->size = -1;
io_uring_prep_splice(sqe, data->pipefd[0], -1, data->socket_fd, -1,
data->size, 0);
io_uring_sqe_set_data(sqe, new_data);
if (io_uring_submit(&ring) < 0) {
free(new_data);
handle_error("io_uring_submit");
}
} else {
close(data->pipefd[0]);
close(data->socket_fd);
if (--numReqs > 0)
prep_accept(&ring, sqe, sfd);
}
prep_second_splice(&ring, data);
break;
default:
handle_error("Unknown I/O");
}
io_uring_cqe_seen(&ring, cqe);
free(data);
io_uring_cqe_seen(&ring, cqe);
}
io_uring_queue_exit(&ring);
close(sfd);
Expand Down

0 comments on commit ebee738

Please sign in to comment.