diff --git a/src/aioquic/h3/connection.py b/src/aioquic/h3/connection.py index 13b456f40..310e1a2f1 100644 --- a/src/aioquic/h3/connection.py +++ b/src/aioquic/h3/connection.py @@ -338,6 +338,9 @@ def create_webtransport_stream( self._quic.send_stream_data(stream_id, encode_uint_var(session_id)) else: stream_id = self._quic.get_next_available_stream_id() + self._log_stream_type( + stream_id=stream_id, stream_type=StreamType.WEBTRANSPORT + ) self._quic.send_stream_data( stream_id, encode_uint_var(FrameType.WEBTRANSPORT_STREAM) @@ -409,7 +412,7 @@ def send_push_promise(self, stream_id: int, headers: Headers) -> int: ) #  create push stream - push_stream_id = self._create_uni_stream(StreamType.PUSH) + push_stream_id = self._create_uni_stream(StreamType.PUSH, push_id=push_id) self._quic.send_stream_data(push_stream_id, encode_uint_var(push_id)) return push_stream_id @@ -485,11 +488,16 @@ def send_headers( stream_id, encode_frame(FrameType.HEADERS, frame_data), end_stream ) - def _create_uni_stream(self, stream_type: int) -> int: + def _create_uni_stream( + self, stream_type: int, push_id: Optional[int] = None + ) -> int: """ Create an unidirectional stream of the given type. """ stream_id = self._quic.get_next_available_stream_id(is_unidirectional=True) + self._log_stream_type( + push_id=push_id, stream_id=stream_id, stream_type=stream_type + ) self._quic.send_stream_data(stream_id, encode_uint_var(stream_type)) return stream_id @@ -710,6 +718,28 @@ def _init_connection(self) -> None: StreamType.QPACK_DECODER ) + def _log_stream_type( + self, stream_id: int, stream_type: int, push_id: Optional[int] = None + ) -> None: + if self._quic_logger is not None: + type_name = { + 0: "control", + 1: "push", + 2: "qpack_encoder", + 3: "qpack_decoder", + 0x54: "webtransport", # NOTE: not standardized yet + }.get(stream_type, "unknown") + + data = {"new": type_name, "stream_id": stream_id} + if push_id is not None: + data["associated_push_id"] = push_id + + self._quic_logger.log_event( + category="http", + event="stream_type_set", + data=data, + ) + def _receive_datagram(self, data: bytes) -> List[H3Event]: """ Handle a datagram. @@ -802,6 +832,10 @@ def _receive_request_or_push_data( frame_data = stream.buffer[consumed:] stream.buffer = b"" + self._log_stream_type( + stream_id=stream.stream_id, stream_type=StreamType.WEBTRANSPORT + ) + if frame_data or stream_ended: http_events.append( WebTransportStreamDataReceived( @@ -905,6 +939,12 @@ def _receive_stream_data_uni( ) self._peer_encoder_stream_id = stream.stream_id + # for PUSH, logging is performed once the push_id is known + if stream.stream_type != StreamType.PUSH: + self._log_stream_type( + stream_id=stream.stream_id, stream_type=stream.stream_type + ) + if stream.stream_type == StreamType.CONTROL: if stream_ended: raise ClosedCriticalStream("Closing control stream is not allowed") @@ -928,6 +968,12 @@ def _receive_stream_data_uni( break consumed = buf.tell() + self._log_stream_type( + push_id=stream.push_id, + stream_id=stream.stream_id, + stream_type=stream.stream_type, + ) + # remove processed data from buffer stream.buffer = stream.buffer[consumed:]