Skip to content

Commit

Permalink
* FIX [mqtt_quic] Remove the finish for aios to notify disconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
wanghaEMQ authored and JaylinYu committed Jul 9, 2023
1 parent 4a37177 commit 8edffd8
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 132 deletions.
15 changes: 1 addition & 14 deletions src/mqtt/protocol/mqtt/mqtt_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ mqtt_quic_sock_fini(void *arg)
{
mqtt_sock_t *s = arg;
nni_aio *aio;
nni_msg *tmsg = NULL, *msg = NULL;
nni_msg *msg = NULL;
size_t count = 0;
/*
#if defined(NNG_SUPP_SQLITE) && defined(NNG_HAVE_MQTT_BROKER)
Expand All @@ -1247,19 +1247,6 @@ mqtt_quic_sock_fini(void *arg)
nni_lmq_fini(s->ack_lmq);
nng_free(s->ack_lmq, sizeof(nni_lmq));
}
// emulate disconnect notify msg as a normal publish
while ((aio = nni_list_first(&s->recv_queue)) != NULL) {
// Pipe was closed. just push an error back to the
// entire socket, because we only have one pipe
nni_list_remove(&s->recv_queue, aio);
nni_aio_set_msg(aio, tmsg);
// only return pipe closed error once for notification
// sync action to avoid NULL conn param
count == 0 ? nni_aio_finish_sync(aio, NNG_ECONNSHUT, 0)
: nni_aio_finish_error(aio, NNG_ECLOSED);
// there should be no msg waiting
count++;
}
while ((aio = nni_list_first(&s->send_queue)) != NULL) {
nni_list_remove(&s->send_queue, aio);
msg = nni_aio_get_msg(aio);
Expand Down
119 changes: 1 addition & 118 deletions src/mqtt/protocol/mqtt/mqttv5_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,10 +682,10 @@ mqtt_quic_data_strm_recv_cb(void *arg)
}
if (nni_atomic_get_bool(&p->closed)) {
//free msg and dont return data when pipe is closed.
nni_mtx_unlock(&p->lk);
if (msg) {
nni_msg_free(msg);
}
nni_mtx_unlock(&p->lk);
return;
}
nni_mqtt_msg_proto_data_alloc(msg);
Expand Down Expand Up @@ -1257,19 +1257,6 @@ mqtt_quic_sock_fini(void *arg)
nni_lmq_fini(s->ack_lmq);
nng_free(s->ack_lmq, sizeof(nni_lmq));
}
// emulate disconnect notify msg as a normal publish
while ((aio = nni_list_first(&s->recv_queue)) != NULL) {
// Pipe was closed. just push an error back to the
// entire socket, because we only have one pipe
nni_list_remove(&s->recv_queue, aio);
nni_aio_set_msg(aio, tmsg);
// only return pipe closed error once for notification
// sync action to avoid NULL conn param
count == 0 ? nni_aio_finish_sync(aio, NNG_ECONNSHUT, 0)
: nni_aio_finish_error(aio, NNG_ECLOSED);
// there should be no msg waiting
count++;
}
while ((aio = nni_list_first(&s->send_queue)) != NULL) {
nni_list_remove(&s->send_queue, aio);
msg = nni_aio_get_msg(aio);
Expand Down Expand Up @@ -1593,11 +1580,6 @@ quic_mqtt_stream_stop(void *arg)
nni_aio_fini(&p->recv_aio);
nni_aio_fini(&p->rep_aio);

/*
#if defined(NNG_HAVE_MQTT_BROKER) && defined(NNG_SUPP_SQLITE)
nni_id_map_fini(&p->sent_unack);
#endif
*/
nni_id_map_fini(&p->recv_unack);
nni_id_map_fini(&p->sent_unack);
if (s->multi_stream)
Expand Down Expand Up @@ -1943,105 +1925,6 @@ nng_mqttv5_quic_client_open_conf(nng_socket *sock, const char *url, conf_quic *c
return rv;
}

/**
* init an AIO for Acknoledgement message only, in order to make QoS/connect truly asychrounous
* For QoS 0 message, we do not care the result of sending
* valid with Connack + puback + pubcomp
* return 0 if set callback sucessfully
*/
// int
// nng_mqtt_quic_ack_callback_set(nng_socket *sock, void (*cb)(void *), void *arg)
// {
// nni_sock *nsock = NULL;
// nni_aio *aio;
//
// nni_sock_find(&nsock, sock->id);
// if (nsock) {
// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock);
// if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
// return (NNG_ENOMEM);
// }
// nni_aio_init(aio, (nni_cb) cb, aio);
// nni_aio_set_prov_data(aio, arg);
// mqtt_sock->ack_aio = aio;
// mqtt_sock->ack_lmq = nni_alloc(sizeof(nni_lmq));
// nni_lmq_init(mqtt_sock->ack_lmq, NNG_MAX_RECV_LMQ);
// } else {
// nni_sock_rele(nsock);
// return -1;
// }
// nni_sock_rele(nsock);
// return 0;
// }

// int
// nng_mqtt_quic_set_connect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
// {
// nni_sock *nsock = NULL;
//
// nni_sock_find(&nsock, sock->id);
// if (nsock) {
// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock);
// mqtt_sock->cb.connect_cb = cb;
// mqtt_sock->cb.connarg = arg;
// } else {
// return -1;
// }
// nni_sock_rele(nsock);
// return 0;
// }

// int
// nng_mqtt_quic_set_disconnect_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
// {
// nni_sock *nsock = NULL;
//
// nni_sock_find(&nsock, sock->id);
// if (nsock) {
// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock);
// mqtt_sock->cb.disconnect_cb = cb;
// mqtt_sock->cb.discarg = arg;
// } else {
// return -1;
// }
// nni_sock_rele(nsock);
// return 0;
// }

// int
// nng_mqtt_quic_set_msg_recv_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
// {
// nni_sock *nsock = NULL;
//
// nni_sock_find(&nsock, sock->id);
// if (nsock) {
// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock);
// mqtt_sock->cb.msg_recv_cb = cb;
// mqtt_sock->cb.recvarg = arg;
// } else {
// return -1;
// }
// nni_sock_rele(nsock);
// return 0;
// }
//
// int
// nng_mqtt_quic_set_msg_send_cb(nng_socket *sock, int (*cb)(void *, void *), void *arg)
// {
// nni_sock *nsock = NULL;
//
// nni_sock_find(&nsock, sock->id);
// if (nsock) {
// mqtt_sock_t *mqtt_sock = nni_sock_proto_data(nsock);
// mqtt_sock->cb.msg_send_cb = cb;
// mqtt_sock->cb.sendarg = arg;
// } else {
// return -1;
// }
// nni_sock_rele(nsock);
// return 0;
// }

static int
nng_mqtt_quic_set_config(nng_socket *sock, void *node)
{
Expand Down

0 comments on commit 8edffd8

Please sign in to comment.