Skip to content

Commit

Permalink
feat(router): stream files to socket
Browse files Browse the repository at this point in the history
  • Loading branch information
mookums committed Nov 16, 2024
1 parent fd16c0e commit 51296f2
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 108 deletions.
2 changes: 1 addition & 1 deletion src/core/job.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const TaskFn = @import("tardy").TaskFn;

pub const AfterType = union(enum) {
recv,
sse: struct {
other: struct {
func: *const anyopaque,
ctx: *anyopaque,
},
Expand Down
105 changes: 77 additions & 28 deletions src/http/context.zig
Original file line number Diff line number Diff line change
Expand Up @@ -38,60 +38,109 @@ pub fn Context(comptime Server: type) type {
triggered: bool = false,

pub fn to_sse(self: *Self, then: TaskFn(bool, *SSE)) !void {
assert(!self.triggered);
self.triggered = true;

self.response.set(.{
.status = .OK,
.body = "",
.mime = Mime{
.extension = ".sse",
.description = "Server-Sent Events",
.content_type = "text/event-stream",
},
});

const headers = try self.provision.response.headers_into_buffer(
self.provision.buffer,
null,
);

const sse = try self.allocator.create(SSE);
sse.* = .{
.context = self,
.runtime = self.runtime,
.allocator = self.allocator,
};

const pslice = Pseudoslice.init(headers, "", self.provision.buffer);
try self.respond_headers_only(
.{
.status = .OK,
.body = "",
.mime = Mime{
.extension = ".sse",
.description = "Server-Sent Events",
.content_type = "text/event-stream",
},
},
null,
sse,
then,
);
}

pub fn close(self: *Self) !void {
self.provision.job = .close;
try self.runtime.net.close(
self.provision,
Server.close_task,
self.provision.socket,
);
}

pub fn send_then(
self: *Self,
data: []const u8,
ctx: anytype,
then: TaskFn(bool, @TypeOf(ctx)),
) !void {
const pslice = Pseudoslice.init(data, "", self.provision.buffer);

const first_chunk = try Server.prepare_send(
self.runtime,
self.provision,
.{ .sse = .{
.func = then,
.ctx = sse,
} },
.{
.other = .{
.func = then,
.ctx = ctx,
},
},
pslice,
);

try self.runtime.net.send(
self.provision,
Server.send_then_sse_task,
Server.send_then_other_task,
self.provision.socket,
first_chunk,
);
}

pub fn close(self: *Self) !void {
self.provision.job = .close;
try self.runtime.net.close(
pub fn send_then_recv(self: *Self, data: []const u8) !void {
const pslice = Pseudoslice.init(data, "", self.provision.buffer);

const first_chunk = try Server.prepare_send(
self.runtime,
self.provision,
Server.close_task,
.recv,
pslice,
);

try self.runtime.net.send(
self.provision,
Server.send_then_recv_task,
self.provision.socket,
first_chunk,
);
}

// This will respond with the headers only.
// You will be in charge of sending the body.
pub fn respond_headers_only(
self: *Self,
options: ResponseSetOptions,
content_length: ?usize,
ctx: anytype,
then: TaskFn(bool, @TypeOf(ctx)),
) !void {
assert(!self.triggered);
self.triggered = true;

// the body should not be set.
assert(options.body == null);
self.response.set(options);

const headers = try self.provision.response.headers_into_buffer(
self.provision.buffer,
content_length,
);

try self.send_then(headers, ctx, then);
}

/// This is your standard response.
pub fn respond(self: *Self, options: ResponseSetOptions) !void {
assert(!self.triggered);
self.triggered = true;
Expand Down
2 changes: 1 addition & 1 deletion src/http/response.zig
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub const Response = struct {
}
}

pub fn headers_into_buffer(self: *Response, buffer: []u8, content_length: ?u32) ![]u8 {
pub fn headers_into_buffer(self: *Response, buffer: []u8, content_length: ?usize) ![]u8 {
var index: usize = 0;

// Status Line
Expand Down
163 changes: 111 additions & 52 deletions src/http/router.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const QueryMap = @import("routing_trie.zig").QueryMap;

const Runtime = @import("tardy").Runtime;
const Task = @import("tardy").Task;
const Stat = @import("tardy").Stat;

pub fn Router(comptime Server: type) type {
return struct {
Expand Down Expand Up @@ -50,9 +51,11 @@ pub fn Router(comptime Server: type) type {
const FileProvision = struct {
mime: Mime,
context: *Context,
request: *const Request,
response: *Response,
fd: std.posix.fd_t,
offset: usize,
list: std.ArrayList(u8),
file_size: u64,
rd_offset: usize,
buffer: []u8,
};

Expand All @@ -73,72 +76,132 @@ pub fn Router(comptime Server: type) type {
}
provision.fd = fd;

// TODO: If we have a If-None-Match by this point, we should fire off a stat request
// that way we can check the last modified time and compare that with our ETag.
// We generally avoid using the HTTP Date stuff since it can be so slow.
//
// If we have a matching etag, we can just respond with Not Modified.
// If we don't then we continue doing what we normally do.
try rt.fs.stat(provision, stat_file_task, fd);
}

fn stat_file_task(_: *Runtime, stat: Stat, provision: *FileProvision) !void {
errdefer provision.context.respond(.{
.status = .@"Internal Server Error",
.mime = Mime.HTML,
.body = "",
}) catch unreachable;

// Set file size.
provision.file_size = stat.size;
log.err("file size: {d}", .{provision.file_size});

// generate the etag and attach it to the response.
var hash = std.hash.Wyhash.init(0);
hash.update(std.mem.asBytes(&stat.size));
if (stat.modified) |modified| {
hash.update(std.mem.asBytes(&modified.seconds));
hash.update(std.mem.asBytes(&modified.nanos));
}
const etag_hash = hash.final();

const calc_etag = try std.fmt.allocPrint(
provision.context.allocator,
"\"{d}\"",
.{etag_hash},
);

try provision.response.headers.add("ETag", calc_etag);

// If we have an ETag on the request...
if (provision.request.headers.get("If-None-Match")) |etag| {
if (std.mem.eql(u8, etag, calc_etag)) {
// If the ETag matches.
try provision.context.respond(.{
.status = .@"Not Modified",
.mime = Mime.HTML,
.body = "",
});
return;
}
}

try provision.context.respond_headers_only(.{
.status = .OK,
.mime = provision.mime,
}, stat.size, provision, start_stream_file_task);
}

fn start_stream_file_task(rt: *Runtime, success: bool, provision: *FileProvision) !void {
errdefer {
std.posix.close(provision.fd);
provision.context.close() catch unreachable;
}

if (!success) {
log.warn("starting file stream failed!", .{});
try provision.context.close();
return;
}

// start streaming...
try rt.fs.read(
provision,
read_file_task,
fd,
provision.fd,
provision.buffer,
0,
provision.rd_offset,
);
}

fn read_file_task(rt: *Runtime, result: i32, provision: *FileProvision) !void {
errdefer provision.context.respond(.{
.status = .@"Internal Server Error",
.mime = Mime.HTML,
.body = "",
}) catch unreachable;
fn read_file_task(_: *Runtime, result: i32, provision: *FileProvision) !void {
errdefer {
std.posix.close(provision.fd);
provision.context.close() catch unreachable;
}

if (result <= 0) {
// If we are done reading...
try rt.fs.close(
provision,
close_file_task,
provision.fd,
);
if (result <= -1) {
log.warn("read file task failed", .{});
std.posix.close(provision.fd);
try provision.context.close();
return;
}

const length: usize = @intCast(result);
try provision.list.appendSlice(provision.buffer[0..length]);
provision.rd_offset += length;
log.debug("current offset: {d} | fd: {}", .{ provision.rd_offset, provision.fd });

if (provision.rd_offset >= provision.file_size) {
log.debug("done streaming file | rd off: {d} | f size: {d} | result: {d}", .{
provision.rd_offset,
provision.file_size,
result,
});

// TODO: This needs to be a setting you pass in to the router.
//
//if (provision.list.items.len > 1024 * 1024 * 4) {
// provision.context.respond(.{
// .status = .@"Content Too Large",
// .mime = Mime.HTML,
// .body = "File Too Large",
// });
// return;
//}
std.posix.close(provision.fd);
try provision.context.send_then_recv(provision.buffer[0..length]);
} else {
try provision.context.send_then(provision.buffer[0..length], provision, send_file_task);
}
}

fn send_file_task(rt: *Runtime, success: bool, provision: *FileProvision) !void {
errdefer {
std.posix.close(provision.fd);
provision.context.close() catch unreachable;
}

provision.offset += length;
if (!success) {
log.warn("send file stream failed!", .{});
std.posix.close(provision.fd);
try provision.context.close();
return;
}

// continue streaming..
try rt.fs.read(
provision,
read_file_task,
provision.fd,
provision.buffer,
provision.offset,
provision.rd_offset,
);
}

fn close_file_task(_: *Runtime, _: void, provision: *FileProvision) !void {
try provision.context.respond(.{
.status = .OK,
.mime = provision.mime,
.body = provision.list.items[0..],
});
}

pub fn serve_fs_dir(self: *Self, comptime url_path: []const u8, comptime dir_path: []const u8) !void {
assert(!self.locked);
const arena = self.arena.allocator();
Expand All @@ -149,10 +212,6 @@ pub fn Router(comptime Server: type) type {

const route = Route.init().get(slice, struct {
fn handler_fn(ctx: *Context, real_dir: *const []const u8) !void {
// TODO: Add caching support. We shouldn't need to resend files
// all the time, especially if the user has gotten them before
// and has an ETag.

if (ctx.captures.len == 0) {
try ctx.respond(.{
.status = .@"Not Found",
Expand Down Expand Up @@ -197,14 +256,14 @@ pub fn Router(comptime Server: type) type {
provision.* = .{
.mime = mime,
.context = ctx,
.request = ctx.request,
.response = ctx.response,
.fd = -1,
.offset = 0,
.list = std.ArrayList(u8).init(ctx.allocator),
.file_size = 0,
.rd_offset = 0,
.buffer = ctx.provision.buffer,
};

// We also need to support chunked encoding.
// It makes a lot more sense for files atleast.
try ctx.runtime.fs.open(
provision,
open_file_task,
Expand Down
Loading

0 comments on commit 51296f2

Please sign in to comment.