Skip to content

Commit

Permalink
refactor(jsonrpc): remove Jsonrpc.Message.t
Browse files Browse the repository at this point in the history
Signed-off-by: Rudi Grinberg <[email protected]>

ps-id: 3A978DCB-5013-478D-9849-22E2A3F7E4A0
  • Loading branch information
rgrinberg committed Jun 11, 2022
1 parent cccc3c7 commit 5fdb9b9
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 183 deletions.
4 changes: 1 addition & 3 deletions jsonrpc-fiber/src/import.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ end
include struct
open Jsonrpc
module Id = Id
module Message = Message
module Response = Response
module Request = Request
module Notification = Notification

type nonrec packet = packet
module Packet = Packet
end

module Json = struct
Expand Down
44 changes: 15 additions & 29 deletions jsonrpc-fiber/src/jsonrpc_fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,18 @@ end
module Make (Chan : sig
type t

val send : t -> packet list -> unit Fiber.t
val send : t -> Packet.t list -> unit Fiber.t

val recv : t -> packet option Fiber.t
val recv : t -> Packet.t option Fiber.t

val close : t -> [ `Read | `Write ] -> unit Fiber.t
end) =
struct
type 'state t =
{ chan : Chan.t
; on_request : ('state, Id.t) context -> (Reply.t * 'state) Fiber.t
; on_notification : ('state, unit) context -> (Notify.t * 'state) Fiber.t
; on_request : ('state, Request.t) context -> (Reply.t * 'state) Fiber.t
; on_notification :
('state, Notification.t) context -> (Notify.t * 'state) Fiber.t
; pending : (Response.t, [ `Stopped ]) result Fiber.Ivar.t Id.Table.t
; stopped : unit Fiber.Ivar.t
; name : string
Expand All @@ -78,7 +79,7 @@ struct
; mutable pending_requests_stopped : bool
}

and ('a, 'id) context = 'a t * 'id Message.t
and ('a, 'message) context = 'a t * 'message

module Context = struct
type nonrec ('a, 'id) t = ('a, 'id) context
Expand Down Expand Up @@ -175,21 +176,11 @@ struct
| None -> Fiber.return ()
| Some packet -> (
match packet with
| Message r -> on_message r
| Notification r -> on_notification r
| Request r -> on_request r
| Response r ->
let* () = Fiber.Pool.task later ~f:(fun () -> on_response r) in
loop ())
and on_message (r : _ Message.t) =
log t (fun () ->
let what =
match r.id with
| None -> "notification"
| Some _ -> "request"
in
Log.msg ("received " ^ what) [ ("r", Message.yojson_of_either r) ]);
match r.id with
| Some id -> on_request { r with id }
| None -> on_notification { r with id = () }
and on_response r =
let log (what : string) =
log t (fun () ->
Expand All @@ -203,7 +194,7 @@ struct
log "acknowledged";
Id.Table.remove t.pending r.id;
Fiber.Ivar.fill ivar (Ok r)
and on_request (r : Id.t Message.t) =
and on_request (r : Request.t) =
let* result =
let sent = ref false in
Fiber.map_reduce_errors
Expand Down Expand Up @@ -241,7 +232,7 @@ struct
| Error () -> ())
in
loop ()
and on_notification (r : unit Message.t) : unit Fiber.t =
and on_notification (r : Notification.t) : unit Fiber.t =
let* res = Fiber.collect_errors (fun () -> t.on_notification (t, r)) in
match res with
| Ok (next, state) -> (
Expand All @@ -252,7 +243,7 @@ struct
| Error errors ->
Format.eprintf
"Uncaught error when handling notification:@.%[email protected]:@.%s@." Json.pp
(Notification.yojson_of_t (Notification.of_message r))
(Notification.yojson_of_t r)
(Dyn.to_string (Dyn.list Exn_with_backtrace.to_dyn errors));
loop ()
in
Expand All @@ -274,7 +265,7 @@ struct
let notification t (n : Notification.t) =
Fiber.of_thunk (fun () ->
check_running t;
Chan.send t.chan [ Message (Jsonrpc.Notification.to_message_either n) ])
Chan.send t.chan [ Notification n ])

let register_request_ivar t id ivar =
match Id.Table.find_opt t.pending id with
Expand All @@ -290,10 +281,7 @@ struct
let request t (req : Request.t) =
Fiber.of_thunk (fun () ->
check_running t;
let* () =
let req = { req with Message.id = Some req.id } in
Chan.send t.chan [ Message req ]
in
let* () = Chan.send t.chan [ Request req ] in
let ivar = Fiber.Ivar.create () in
register_request_ivar t req.id ivar;
read_request_ivar req ivar)
Expand Down Expand Up @@ -326,11 +314,9 @@ struct
List.fold_left pending ~init:([], []) ~f:(fun (pending, ivars) ->
function
| `Notification n ->
( Jsonrpc.Message (Notification.to_message_either n) :: pending
, ivars )
(Jsonrpc.Packet.Notification n :: pending, ivars)
| `Request ((r : Request.t), ivar) ->
( Jsonrpc.Message { r with Message.id = Some r.id } :: pending
, (r.id, ivar) :: ivars ))
(Jsonrpc.Packet.Request r :: pending, (r.id, ivar) :: ivars))
in
List.iter ivars ~f:(fun (id, ivar) -> register_request_ivar t id ivar);
Chan.send t.chan pending)
Expand Down
13 changes: 7 additions & 6 deletions jsonrpc-fiber/src/jsonrpc_fiber.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ exception Stopped of Jsonrpc.Request.t
module Make (Chan : sig
type t

val send : t -> Jsonrpc.packet list -> unit Fiber.t
val send : t -> Jsonrpc.Packet.t list -> unit Fiber.t

val recv : t -> Jsonrpc.packet option Fiber.t
val recv : t -> Jsonrpc.Packet.t option Fiber.t

val close : t -> [ `Read | `Write ] -> unit Fiber.t
end) : sig
type 'state t

module Context : sig
type ('state, 'req) t
type ('state, 'message) t

type 'a session

val message : (_, 'req) t -> 'req Jsonrpc.Message.t
val message : (_, 'message) t -> 'message

val state : ('a, _) t -> 'a

Expand All @@ -43,9 +43,10 @@ end) : sig

val create :
?on_request:
(('state, Jsonrpc.Id.t) Context.t -> (Reply.t * 'state) Fiber.t)
(('state, Jsonrpc.Request.t) Context.t -> (Reply.t * 'state) Fiber.t)
-> ?on_notification:
(('state, unit) Context.t -> (Notify.t * 'state) Fiber.t)
( ('state, Jsonrpc.Notification.t) Context.t
-> (Notify.t * 'state) Fiber.t)
-> name:string
-> Chan.t
-> 'state
Expand Down
54 changes: 24 additions & 30 deletions jsonrpc-fiber/test/jsonrpc_fiber_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ open Fiber.O
open Fiber.Stream

module Stream_chan = struct
type t = Jsonrpc.packet In.t * Jsonrpc.packet Out.t
type t = Jsonrpc.Packet.t In.t * Jsonrpc.Packet.t Out.t

let close (_, o) what =
match what with
Expand Down Expand Up @@ -45,17 +45,16 @@ let%expect_test "start and stop server" =

let%expect_test "server accepts notifications" =
let notif =
{ Jsonrpc.Message.id = None
; method_ = "method"
{ Jsonrpc.Notification.method_ = "method"
; params = Some (`List [ `String "bar" ])
}
in
let run () =
let in_ = In.of_list [ Jsonrpc.Message notif ] in
let in_ = In.of_list [ Jsonrpc.Packet.Notification notif ] in
let on_notification c =
let n = Context.message c in
let state = Context.state c in
assert (notif = { n with id = None });
assert (notif = n);
print_endline "received notification";
Fiber.return (Notify.Stop, state)
in
Expand All @@ -79,27 +78,24 @@ let of_ref ref =
let%expect_test "serving requests" =
let id = `Int 1 in
let request =
{ Jsonrpc.Message.id = Some id
; method_ = "bla"
; params = Some (`List [ `Int 100 ])
}
{ Jsonrpc.Request.id; method_ = "bla"; params = Some (`List [ `Int 100 ]) }
in
let response_data = `String "response" in
let run () =
let responses = ref [] in
let in_ = In.of_list [ Jsonrpc.Message request ] in
let in_ = In.of_list [ Jsonrpc.Packet.Request request ] in
let on_request c =
let r = Context.message c in
let state = Context.state c in
assert (r = { request with id = r.id });
assert (r = request);
let response = Jsonrpc.Response.ok r.id response_data in
Fiber.return (Reply.now response, state)
in
let out = of_ref responses in
let jrpc = Jrpc.create ~name:"test" ~on_request (in_, out) () in
let+ () = Jrpc.run jrpc in
List.iter !responses ~f:(fun resp ->
let json = Jsonrpc.yojson_of_packet resp in
let json = Jsonrpc.Packet.yojson_of_t resp in
print_endline (Yojson.Safe.pretty_to_string ~std:false json))
in
Fiber_test.test Dyn.opaque run;
Expand All @@ -114,14 +110,14 @@ let%expect_test "concurrent requests" =
let print packet =
print_endline
(Yojson.Safe.pretty_to_string ~std:false
(Jsonrpc.yojson_of_packet packet))
(Jsonrpc.Packet.yojson_of_t packet))
in
let waiter chan =
let on_request c =
let self = Context.session c in
let request = Context.message c in
print_endline "waiter: received request";
print (Message { request with id = Some request.id });
print (Request request);
let response =
Reply.later (fun send ->
print_endline "waiter: sending response";
Expand All @@ -148,7 +144,7 @@ let%expect_test "concurrent requests" =
let on_request c =
print_endline "waitee: received request";
let request = Context.message c in
print (Message { request with id = Some request.id });
print (Request request);
let response =
Reply.later (fun send ->
let* () = send (Jsonrpc.Response.ok request.id (`Int 42)) in
Expand Down Expand Up @@ -208,33 +204,31 @@ let%expect_test "test from jsonrpc_test.ml" =
`Int !i
in
let on_request ctx =
let req = Context.message ctx in
let req : Jsonrpc.Request.t = Context.message ctx in
let state = Context.state ctx in
Fiber.return (Reply.now (Jsonrpc.Response.ok req.id (response ())), state)
in
let on_notification ctx =
let n = Context.message ctx in
let n : Jsonrpc.Notification.t = Context.message ctx in
if n.method_ = "raise" then failwith "special failure";
let json = Notification.yojson_of_t (Notification.of_message n) in
let json = Notification.yojson_of_t n in
print_endline ">> received notification";
print_json json;
Fiber.return (Jsonrpc_fiber.Notify.Continue, ())
in
let responses = ref [] in
let initial_requests =
let request ?params id method_ =
Jsonrpc.Request.create ?params ~id ~method_ ()
|> Jsonrpc.Request.to_message_either
let request ?params id method_ : Jsonrpc.Packet.t =
Request (Jsonrpc.Request.create ?params ~id ~method_ ())
in
let notification ?params method_ =
Jsonrpc.Notification.create ?params ~method_ ()
|> Jsonrpc.Notification.to_message_either
let notification ?params method_ : Jsonrpc.Packet.t =
Notification (Jsonrpc.Notification.create ?params ~method_ ())
in
[ Message (request (`Int 10) "foo")
; Message (request (`String "testing") "bar")
; Message (notification "notif1")
; Message (notification "notif2")
; Message (notification "raise")
[ request (`Int 10) "foo"
; request (`String "testing") "bar"
; notification "notif1"
; notification "notif2"
; notification "raise"
]
in
let reqs_in, reqs_out = pipe () in
Expand All @@ -254,7 +248,7 @@ let%expect_test "test from jsonrpc_test.ml" =
Fiber.fork_and_join_unit write_reqs (fun () -> Jrpc.run session));
List.rev !responses
|> List.iter ~f:(fun packet ->
let json = Jsonrpc.yojson_of_packet packet in
let json = Jsonrpc.Packet.yojson_of_t packet in
print_json json);
[%expect
{|
Expand Down
Loading

0 comments on commit 5fdb9b9

Please sign in to comment.