Skip to content

Commit

Permalink
[http3-server] pass request body to ASGI app
Browse files Browse the repository at this point in the history
  • Loading branch information
jlaine committed Aug 10, 2019
1 parent 3f38ca1 commit d48e26a
Showing 1 changed file with 95 additions and 81 deletions.
176 changes: 95 additions & 81 deletions examples/http3-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from aioquic.buffer import Buffer
from aioquic.h0.connection import H0Connection
from aioquic.h3.connection import H3Connection
from aioquic.h3.events import RequestReceived
from aioquic.h3.events import DataReceived, Event, RequestReceived
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.connection import NetworkAddress, QuicConnection
from aioquic.quic.events import QuicEvent
Expand All @@ -40,6 +40,48 @@
HttpConnection = Union[H0Connection, H3Connection]


class HttpRequestHandler:
def __init__(
self,
*,
connection: HttpConnection,
scope: Dict,
send_pending: Callable[[], None],
stream_id: int,
):
self.connection = connection
self.queue: asyncio.Queue[Dict] = asyncio.Queue()
self.scope = scope
self.send_pending = send_pending
self.stream_id = stream_id

async def run_asgi(self, app: AsgiApplication) -> None:
await application(self.scope, self.receive, self.send)

self.connection.send_data(stream_id=self.stream_id, data=b"", end_stream=True)
self.send_pending()

async def receive(self) -> Dict:
return await self.queue.get()

async def send(self, message: Dict):
if message["type"] == "http.response.start":
self.connection.send_headers(
stream_id=self.stream_id,
headers=[
(b":status", str(message["status"]).encode("ascii")),
(b"server", b"aioquic"),
(b"date", formatdate(time.time(), usegmt=True).encode()),
]
+ [(k, v) for k, v in message["headers"]],
)
elif message["type"] == "http.response.body":
self.connection.send_data(
stream_id=self.stream_id, data=message["body"], end_stream=False
)
self.send_pending()


class HttpServer(asyncio.DatagramProtocol):
def __init__(
self,
Expand Down Expand Up @@ -141,6 +183,7 @@ def datagram_received(self, data: Union[bytes, Text], addr: NetworkAddress) -> N
class HttpServerProtocol(QuicConnectionProtocol):
def __init__(self, quic: QuicConnection, server: HttpServer):
super().__init__(quic)
self._handlers: Dict[int, HttpRequestHandler] = {}
self._http: Optional[HttpConnection] = None
self._server = server

Expand All @@ -165,15 +208,57 @@ def _handle_event(self, event: QuicEvent):
#  pass event to the HTTP layer
if self._http is not None:
for http_event in self._http.handle_event(event):
if isinstance(http_event, RequestReceived):
asyncio.ensure_future(
handle_http_request(
self._server._application,
self._http,
http_event,
self._send_pending,
)
)
self.handle_http_event(http_event)

def handle_http_event(self, event: Event) -> None:
if isinstance(event, RequestReceived):
headers = []
raw_path = b""
method = ""
for header, value in event.headers:
if header == b":authority":
headers.append((b"host", value))
elif header == b":method":
method = value.decode("utf8")
elif header == b":path":
raw_path = value
elif header and not header.startswith(b":"):
headers.append((header, value))

if b"?" in raw_path:
path_bytes, query_string = raw_path.split(b"?", maxsplit=1)
else:
path_bytes, query_string = raw_path, b""

scope = {
"headers": headers,
"http_version": "0.9" if isinstance(self._http, H0Connection) else "3",
"method": method,
"path": path_bytes.decode("utf8"),
"query_string": query_string,
"raw_path": raw_path,
"root_path": "",
"scheme": "https",
"type": "http",
}

handler = HttpRequestHandler(
connection=self._http,
scope=scope,
send_pending=self._send_pending,
stream_id=event.stream_id,
)
self._handlers[event.stream_id] = handler
asyncio.ensure_future(handler.run_asgi(self._server._application))
elif isinstance(event, DataReceived):
handler = self._handlers[event.stream_id]
handler.queue.put_nowait(
{
"type": "http.request",
"body": event.data,
"more_body": not event.stream_ended,
}
)


class SessionTicketStore:
Expand All @@ -191,77 +276,6 @@ def pop(self, label: bytes) -> Optional[SessionTicket]:
return self.tickets.pop(label, None)


async def handle_http_request(
application: AsgiApplication,
connection: HttpConnection,
event: aioquic.h3.events.RequestReceived,
send_pending: Callable[[], None],
) -> None:
"""
Pass HTTP requests to the ASGI application.
"""
stream_id = event.stream_id

headers = []
raw_path = b""
method = ""
for header, value in event.headers:
if header == b":authority":
headers.append((b"host", value))
elif header == b":method":
method = value.decode("utf8")
elif header == b":path":
raw_path = value
elif header and not header.startswith(b":"):
headers.append((header, value))

if b"?" in raw_path:
path_bytes, query_string = raw_path.split(b"?", maxsplit=1)
else:
path_bytes, query_string = raw_path, b""

scope = {
"headers": headers,
"http_version": "0.9" if isinstance(connection, H0Connection) else "3",
"method": method,
"path": path_bytes.decode("utf8"),
"query_string": query_string,
"raw_path": raw_path,
"root_path": "",
"scheme": "https",
"type": "http",
}

# FIXME: actually handle request body
queue: asyncio.Queue[Dict] = asyncio.Queue()
queue.put_nowait({"type": "http.request", "body": b"", "more_body": False})

async def receive():
return await queue.get()

async def send(event):
if event["type"] == "http.response.start":
connection.send_headers(
stream_id=stream_id,
headers=[
(b":status", str(event["status"]).encode("ascii")),
(b"server", b"aioquic"),
(b"date", formatdate(time.time(), usegmt=True).encode()),
]
+ [(k, v) for k, v in event["headers"]],
)
elif event["type"] == "http.response.body":
connection.send_data(
stream_id=stream_id, data=event["body"], end_stream=False
)
send_pending()

await application(scope, receive, send)

connection.send_data(stream_id=stream_id, data=b"", end_stream=True)
send_pending()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="QUIC server")
parser.add_argument(
Expand Down

0 comments on commit d48e26a

Please sign in to comment.