Skip to content

Commit

Permalink
fix: more side effects inside fiber
Browse files Browse the repository at this point in the history
  • Loading branch information
rgrinberg committed Feb 12, 2022
1 parent 762acb8 commit 893a15c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 27 deletions.
36 changes: 19 additions & 17 deletions jsonrpc-fiber/src/jsonrpc_fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,16 @@ struct
(Dyn.to_string (Dyn.list Exn_with_backtrace.to_dyn errors));
loop ()
in
t.running <- true;
let* () =
Fiber.fork_and_join_unit
(fun () ->
let* () = loop () in
Fiber.Pool.stop later)
(fun () -> Fiber.Pool.run later)
in
close t
Fiber.of_thunk (fun () ->
t.running <- true;
let* () =
Fiber.fork_and_join_unit
(fun () ->
let* () = loop () in
Fiber.Pool.stop later)
(fun () -> Fiber.Pool.run later)
in
close t)

let on_notification_fail ctx =
let state = Context.state ctx in
Expand Down Expand Up @@ -288,14 +289,15 @@ struct
| Error `Stopped -> raise (Stopped req)

let request t (req : Message.request) =
check_running t;
let* () =
let req = { req with Message.id = Some req.id } in
Chan.send t.chan [ Message req ]
in
let ivar = Fiber.Ivar.create () in
register_request_ivar t req.id ivar;
read_request_ivar req ivar
Fiber.of_thunk (fun () ->
check_running t;
let* () =
let req = { req with Message.id = Some req.id } in
Chan.send t.chan [ Message req ]
in
let ivar = Fiber.Ivar.create () in
register_request_ivar t req.id ivar;
read_request_ivar req ivar)

module Batch = struct
type response =
Expand Down
21 changes: 11 additions & 10 deletions jsonrpc-fiber/test/jsonrpc_fiber_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,19 @@ let%expect_test "concurrent requests" =
let waiter_in, waitee_out = pipe () in
let waitee = waitee (waitee_in, waitee_out) in
let waiter = waiter (waiter_in, waiter_out) in
let initial_request () =
let request =
Jsonrpc.Message.create ~id:(`String "initial") ~method_:"init" ()
let run () =
let initial_request () =
let request =
Jsonrpc.Message.create ~id:(`String "initial") ~method_:"init" ()
in
print_endline "initial: waitee requests from waiter";
let+ resp = Jrpc.request waitee request in
print_endline "initial request response:";
print (Response resp)
in
print_endline "initial: waitee requests from waiter";
let+ resp = Jrpc.request waitee request in
print_endline "initial request response:";
print (Response resp)
Fiber.all_concurrently_unit
[ Jrpc.run waitee; initial_request (); Jrpc.run waiter ]
in
let all = [ Jrpc.run waiter; Jrpc.run waitee ] in
let all = initial_request () :: all in
let run () = Fiber.parallel_iter all ~f:Fun.id in
Fiber_test.test Dyn.opaque run;
[%expect
{|
Expand Down

0 comments on commit 893a15c

Please sign in to comment.