diff --git a/core/call.ml b/core/call.ml index c6ce74f..4b652e1 100644 --- a/core/call.ml +++ b/core/call.ml @@ -9,18 +9,54 @@ module Batch_stack = struct type t let t : t structure typ = Ctypes.structure "grpc_ocaml_run_batch_stack" - let recv_initial_metadata = Ctypes.field t "recv_initial_metadata" @@ ptr Metadata.raw - let recv_message = Ctypes.field t "recv_message" @@ ptr (ptr Byte_buffer.raw) - let status = Ctypes.field t "recv_status" @@ ptr Status.Code.t - let tr = Ctypes.field t "tr" @@ ptr Metadata.raw - let details = Ctypes.field t "details" @@ ptr Slice.t - let error_message = Ctypes.field t "error_message" @@ ptr Ctypes.string - let cancelled = Ctypes.field t "cancelled" @@ ptr int + let ops = Ctypes.field t "ops" @@ ptr Op.t + let ops_num = Ctypes.field t "ops_num" int + let send_metadata = Ctypes.field t "send_metadata" Metadata.t + let send_trailing_metadata = Ctypes.field t "send_trailing_metadata" Metadata.t + let send_status_details = Ctypes.field t "send_status_details" Slice.t + let recv_initial_metadata = Ctypes.field t "recv_initial_metadata" Metadata.t + let recv_message = Ctypes.field t "recv_message" @@ ptr Byte_buffer.raw + let status = Ctypes.field t "recv_status" @@ Status.Code.t + let recv_trailing_metadata = Ctypes.field t "recv_trailing_metadata" Metadata.t + let recv_status_details = Ctypes.field t "recv_status_details" Slice.t + let error_message = Ctypes.field t "error_message" string + let cancelled = Ctypes.field t "cancelled" int let () = seal t - let destroy = free + + let destroy t = + (* let destroy_md_with_entries md = *) + (* Metadata.destroy_entries *) + (* (md |->* Metadata.metadata) *) + (* (md |->* Metadata.count |> Unsigned.Size_t.to_int); *) + (* Metadata.destroy md *) + (* in *) + (* destroy_md_with_entries (t |-> send_metadata); *) + (* destroy_md_with_entries (t |-> send_trailing_metadata); *) + Metadata.destroy (t |-> recv_initial_metadata); + Metadata.destroy (t |-> recv_trailing_metadata) + ;; + + (* let recv_status_details = t |->* recv_status_details in *) + (* if Slice.start_ptr recv_status_details |> (not <@ is_null) *) + (* then Slice.unref recv_status_details *) + + (* let recv_message = t |->* recv_message in *) + (* if not @@ is_null recv_message then Byte_buffer.destroy recv_message; *) + (* CArray.from_ptr (t |->* ops) (t |->* ops_num) *) + (* |> CArray.iter *) + (* @@ fun op' -> *) + (* match op' @.* Op.op with *) + (* | Op.Type.SEND_MESSAGE -> *) + (* Byte_buffer.destroy *) + (* (op' @. Op.data |-> Op.Data.send_message |->* Op.Data.Send_message.send_message) *) + (* | _ -> () *) let make_tag_pair () = let t = malloc t in + Metadata.init (t |-> send_metadata); + Metadata.init (t |-> send_trailing_metadata); + Metadata.init (t |-> recv_initial_metadata); + Metadata.init (t |-> recv_trailing_metadata); let tag = to_voidp t in t, tag ;; @@ -107,17 +143,10 @@ let make wrap_raw ~cq ~call ~flags () ;; -let run_batch ?(tag = Ctypes.null) t ops = - let ops_size = List.length ops in - let ops_size' = Unsigned.Size_t.of_int ops_size in - let ops = - (CArray.(start <@ of_list T.Op.t) - <@ List.map (fun op -> - op @. T.Op.flags <-@ t.flags; - op)) - (Op.make_ops ops) +let run_batch t ?(tag = null) ops ops_num = + let err = + F.Call.start_batch t.call ops (Unsigned.Size_t.of_int ops_num) tag __reserved__ in - let err = F.Call.start_batch t.call ops ops_size' tag __reserved__ in let () = if err <> Error.OK then failwith @@ Printf.sprintf "prepare call error(%s)" @@ Error.show err @@ -128,20 +157,17 @@ let run_batch ?(tag = Ctypes.null) t ops = then ( let st = ev @.* T.Event.typ in failwith @@ Printf.sprintf "run_batch failed(%s)" @@ Completion_queue.Type.show st) - else ops, ops_size ;; let unary_request ?(metadata = []) ?message t = - let (`Recv_initial_metadata recv_initial_metadata' as rim) = - Op.make_ref_initial_metadata () - in - let (`Recv_message recv_message' as rm) = Op.make_ref_recv_message () in - let (`Recv_status_on_client recv_status_on_client' as rsoc) = - Op.make_ref_recv_status_on_client () - in let ops = let it = - [ `Send_initial_metadata metadata; `Send_close_from_client; rim; rm; rsoc ] + [ `Send_initial_metadata metadata + ; `Send_close_from_client + ; `Recv_initial_metadata + ; `Recv_message + ; `Recv_status_on_client + ] in let add_msg = match message with @@ -150,62 +176,84 @@ let unary_request ?(metadata = []) ?message t = in add_msg it in + let ops' = Op.make_ops ops t.flags in + let ops_num = List.length ops in + let recv_message = + Op.( + get ops' ops_num `Recv_message @. Data.recv_message + |-> Data.Recv_message.recv_message) + in + let recv_initial_metadata = + Op.(get ops' ops_num `Recv_initial_metadata @. Data.recv_initial_metadata) + in + let recv_status_on_client = + Op.(get ops' ops_num `Recv_status_on_client @. Data.recv_status_on_client) + in + let md = + recv_initial_metadata |-> Op.Data.Recv_initial_metadata.recv_initial_metadata + in + let status = recv_status_on_client |-> Op.Data.Recv_status_on_client.status in + let status_details = + recv_status_on_client |-> Op.Data.Recv_status_on_client.status_details + in + let error_string = + recv_status_on_client |-> Op.Data.Recv_status_on_client.error_string + in + let tr = recv_status_on_client |-> Op.Data.Recv_status_on_client.trailing_metadata in let stack, tag = Batch_stack.make_tag_pair () in - Batch_stack.( - stack |-> recv_message <-@ recv_message'.message; - stack |-> recv_initial_metadata <-@ recv_initial_metadata'.metadata; - stack |-> status <-@ recv_status_on_client'.status; - stack |-> details <-@ recv_status_on_client'.details; - stack |-> error_message <-@ recv_status_on_client'.error_message; - stack |-> tr <-@ recv_status_on_client'.metadata); - let ops, len = run_batch ~tag t ops in - let status = !@(recv_status_on_client'.status) in + stack |-> Batch_stack.ops <-@ ops'; + stack |-> Batch_stack.ops_num <-@ ops_num; + recv_message <-@ (stack |-> Batch_stack.recv_message); + md <-@ (stack |-> Batch_stack.recv_initial_metadata); + status <-@ (stack |-> Batch_stack.status); + status_details <-@ (stack |-> Batch_stack.recv_status_details); + error_string <-@ (stack |-> Batch_stack.error_message); + tr <-@ (stack |-> Batch_stack.recv_trailing_metadata); + let () = run_batch ~tag t ops' ops_num in + let status = !@status in let md = - let init_md = Metadata.to_bwd recv_initial_metadata'.metadata in - let md = Metadata.to_bwd recv_status_on_client'.metadata in - md @ init_md + let md = Metadata.to_bwd !@md in + let tr = Metadata.to_bwd !@tr in + tr @ md in - let status = Status.Code.to_bwd status in + let status = Status.Code.to_bwd !@status in + Fun.protect ~finally:(fun () -> Batch_stack.destroy stack) + @@ fun () -> match status with | `OK -> - let recv_message = Byte_buffer.to_string_opt !@(recv_message'.message) in - let () = - Batch_stack.destroy stack; - Op.destroy ops len - in + let recv_message = Byte_buffer.to_string_opt !@(!@recv_message) in Ok (recv_message, md) | #Status.Code.fail_bwd as status -> let details = - let slice = !@(recv_status_on_client'.details) in - if Slice.is_empty slice then None else Some (Slice.to_string slice) - in - let () = - Batch_stack.destroy stack; - Op.destroy ops len + let slice = !@status_details in + if is_null slice then None else Some (Slice.to_string !@slice) in Error (status, details, md) ;; let remote_read t = - let (`Recv_message recv_message as rm) = Op.make_ref_recv_message () in - let ops = [ rm ] in - let stack, tag = Batch_stack.make_tag_pair () in - stack |-> Batch_stack.recv_message <-@ recv_message.message; - let ops, len = run_batch ~tag t ops in - let message = Byte_buffer.to_string_opt !@(recv_message.message) in - let () = - Batch_stack.destroy stack; - Op.destroy ops len + let ops = [ `Recv_message ] in + let ops_num = 1 in + let ops' = Op.make_ops ops t.flags in + let recv_message = + Op.( + get ops' ops_num `Recv_message @. Data.recv_message + |-> Data.Recv_message.recv_message) in + let stack, tag = Batch_stack.make_tag_pair () in + stack |-> Batch_stack.ops <-@ ops'; + stack |-> Batch_stack.ops_num <-@ ops_num; + recv_message <-@ (stack |-> Batch_stack.recv_message); + Fun.protect ~finally:(fun () -> Batch_stack.destroy stack) + @@ fun () -> + let () = run_batch ~tag t ops' ops_num in + let message = Byte_buffer.to_string_opt !@(!@recv_message) in message ;; (** send unary response: run RECV_CLOSE_ON_SERVER, SEND_INITIAL_METADATA and SEND_STATUS_FROM_SERVER ops *) let unary_response ?(code = `OK) ?details ?(md = []) ?(tr = []) t res = let code = Status.Code.of_bwd code in - let (`Recv_close_on_server recv_close_on_server as rcos) = - Op.make_ref_recv_close_on_server () - in let ops = let add_msg = match res with @@ -215,17 +263,44 @@ let unary_response ?(code = `OK) ?details ?(md = []) ?(tr = []) t res = add_msg [ `Send_initial_metadata md ; `Send_status_from_server Status.{ code; details; metadata = tr } - ; rcos + ; `Recv_close_on_server ] in + let ops_num = List.length ops in + let ops' = Op.make_ops ops t.flags in let stack, tag = Batch_stack.make_tag_pair () in - stack |-> Batch_stack.cancelled <-@ recv_close_on_server.cancelled; - let ops, len = run_batch ~tag t ops in - let closed_on_server = deref @@ recv_close_on_server.cancelled in - let () = - Batch_stack.destroy stack; - Op.destroy ops len + let send_initial_metadata = + Op.(get ops' ops_num `Send_initial_metadata @. Data.send_initial_metadata) + in + let send_status_from_server = + Op.(get ops' ops_num `Send_status_from_server @. Data.send_status_from_server) + in + let send_md = send_initial_metadata |-> Op.Data.Send_initial_metadata.metadata in + let send_md_count = send_initial_metadata |-> Op.Data.Send_initial_metadata.count in + let send_metadata = stack |-> Batch_stack.send_metadata in + send_metadata |-> Metadata.metadata <-@ !@send_md; + send_metadata |-> Metadata.count <-@ !@send_md_count; + (* let tr'' = Metadata.make tr in *) + let send_tr = + send_status_from_server |->* Op.Data.Send_status_from_server.trailing_metadata + in + let send_tr_count = + send_status_from_server |->* Op.Data.Send_status_from_server.trailing_metadata_count + in + let send_trailing_metadata = stack |-> Batch_stack.send_trailing_metadata in + send_trailing_metadata |-> Metadata.metadata <-@ send_tr; + send_trailing_metadata |-> Metadata.count <-@ send_tr_count; + let recv_close_on_server = + Op.(get ops' ops_num `Recv_close_on_server @. Data.recv_close_on_server) in + let cancelled = recv_close_on_server |-> Op.Data.Recv_close_on_server.cancelled in + stack |-> Batch_stack.ops <-@ ops'; + stack |-> Batch_stack.ops_num <-@ ops_num; + cancelled <-@ (stack |-> Batch_stack.cancelled); + Fun.protect ~finally:(fun () -> Batch_stack.destroy stack) + @@ fun () -> + let () = run_batch ~tag t ops' ops_num in + let closed_on_server = !@(!@cancelled) in if closed_on_server <= 0 then Log.message __FILE__ __LINE__ `Info "not (properly) closed request by server" ;; diff --git a/core/import.ml b/core/import.ml index 5d6bd5c..af470c7 100644 --- a/core/import.ml +++ b/core/import.ml @@ -17,7 +17,8 @@ let[@inline] malloc ?finalise t = ptr ;; -let[@inline] free t = F.Alloc.free @@ Ctypes.to_voidp t +let[@inline] free v = F.Alloc.free @@ Ctypes.to_voidp v +let[@inline] nullof t = Ctypes.from_voidp t Ctypes.null (** [NULL] *) let __reserved__ = Ctypes.null diff --git a/core/metadata.ml b/core/metadata.ml index 1062ca9..fbeb6ca 100644 --- a/core/metadata.ml +++ b/core/metadata.ml @@ -31,14 +31,16 @@ let destroy_entries md len = let allocate ?(size = 0) ?(count = 0) () = let t = malloc Array.t in let () = init t in + let msize = Unsigned.Size_t.of_int (size * sizeof elem) in let size = Unsigned.Size_t.of_int size in + t |-> Array.metadata <-@ from_voidp elem @@ F.Alloc.zalloc msize; t |-> Array.capacity <-@ size; t |-> Array.count <-@ Unsigned.Size_t.of_int count; t ;; open struct - let fwd1 (k, v) = + let fwd1 (k, v) elem = let k' = Slice.from_string ~copy:true k in let key_is_valid = F.Header.key_is_legal k' > 0 in let () = @@ -55,12 +57,8 @@ open struct Slice.unref v'; failwith @@ Printf.sprintf "invalid value: %s" v) in - let vl = Ctypes.make elem in - let () = - vl @. key <-@ k'; - vl @. value <-@ v' - in - vl + elem @. key <-@ k'; + elem @. value <-@ v' ;; let bwd1 md = @@ -73,11 +71,12 @@ end let make bwd = let len = List.length bwd in let arr = allocate ~size:len ~count:len () in - bwd - |> List.map fwd1 - |> fun l -> - let md = CArray.(of_list elem l |> start) in - arr |-> metadata <-@ md; + let count = ref 0 in + CArray.from_ptr (arr |->* metadata) len + |> CArray.iter (fun elem -> + let kv = List.nth bwd !count in + fwd1 kv elem; + incr count); arr ;; diff --git a/core/op.ml b/core/op.ml index c011716..e078366 100644 --- a/core/op.ml +++ b/core/op.ml @@ -5,61 +5,26 @@ type raw = t let raw = t -type ref_recv_initial_metadata = { metadata : Metadata.raw structure ptr } -type ref_recv_message = { message : Byte_buffer.raw structure ptr ptr } - -type ref_recv_status_on_client = - { status : Status.Code.t ptr - ; metadata : Metadata.raw structure ptr - ; details : Slice.raw structure ptr - ; error_message : string ptr - } - -type ref_recv_close_on_server = { cancelled : int ptr } - type bwd = [ `Send_initial_metadata of Metadata.bwd | `Send_message of string | `Send_status_from_server of Status.t | `Send_close_from_client - | `Recv_initial_metadata of ref_recv_initial_metadata - | `Recv_message of ref_recv_message - | `Recv_status_on_client of ref_recv_status_on_client - | `Recv_close_on_server of ref_recv_close_on_server + | `Recv_initial_metadata + | `Recv_message + | `Recv_status_on_client + | `Recv_close_on_server ] -let make_ref_initial_metadata () = - let metadata = Metadata.allocate () in - `Recv_initial_metadata { metadata } -;; - -let make_ref_recv_message () = - let message = malloc (ptr Byte_buffer.t) in - `Recv_message { message } -;; - -let make_ref_recv_status_on_client () = - let status = Status.allocate () in - let metadata = Metadata.allocate () in - let details = malloc Slice.t in - let error_message = malloc string in - `Recv_status_on_client { status; metadata; details; error_message } -;; - -let make_ref_recv_close_on_server () = - let cancelled = malloc int in - `Recv_close_on_server { cancelled } -;; - let to_op_type = function | `Send_initial_metadata _ -> Type.SEND_INITIAL_METADATA | `Send_message _ -> Type.SEND_MESSAGE | `Send_status_from_server _ -> Type.SEND_STATUS_FROM_SERVER | `Send_close_from_client -> Type.SEND_CLOSE_FROM_CLIENT - | `Recv_initial_metadata _ -> Type.RECV_INITIAL_METADATA - | `Recv_message _ -> Type.RECV_MESSAGE - | `Recv_status_on_client _ -> Type.RECV_STATUS_ON_CLIENT - | `Recv_close_on_server _ -> Type.RECV_CLOSE_ON_SERVER + | `Recv_initial_metadata -> Type.RECV_INITIAL_METADATA + | `Recv_message -> Type.RECV_MESSAGE + | `Recv_status_on_client -> Type.RECV_STATUS_ON_CLIENT + | `Recv_close_on_server -> Type.RECV_CLOSE_ON_SERVER ;; let destroy ops len = @@ -69,10 +34,7 @@ let destroy ops len = let data = op' @. data in let typ = op' @.* op in match typ with - | Type.SEND_CLOSE_FROM_CLIENT | Type.RECV_CLOSE_ON_SERVER -> () - | Type.SEND_MESSAGE -> - let it = data |-> Data.send_message in - Byte_buffer.destroy (it |->* Data.Send_message.send_message) + | Type.SEND_CLOSE_FROM_CLIENT | Type.RECV_CLOSE_ON_SERVER | Type.SEND_MESSAGE -> () | Type.SEND_INITIAL_METADATA -> let it = data |-> Data.send_initial_metadata in Data.Send_initial_metadata.( @@ -87,7 +49,8 @@ let destroy ops len = let len = it |->* trailing_metadata_count |> Unsigned.Size_t.to_int in Metadata.destroy_entries md len in - ignore @@ Option.map (Slice.unref <@ deref) (it |->* status_details)) + let status_details = it |->* status_details in + if not @@ is_null status_details then Slice.unref !@status_details) | Type.RECV_INITIAL_METADATA -> let it = data |-> Data.recv_initial_metadata in let md' = it |->* Data.Recv_initial_metadata.recv_initial_metadata in @@ -97,7 +60,8 @@ let destroy ops len = Metadata.destroy md' | Type.RECV_MESSAGE -> let it = data |-> Data.recv_message in - Byte_buffer.destroy !@(it |->* Data.Recv_message.recv_message) + let recv_message = !@(it |->* Data.Recv_message.recv_message) in + if not @@ is_null recv_message then Byte_buffer.destroy recv_message | Type.RECV_STATUS_ON_CLIENT -> let it = data |-> Data.recv_status_on_client in Data.Recv_status_on_client.( @@ -107,8 +71,10 @@ let destroy ops len = let len = md |->* count |> Unsigned.Size_t.to_int in destroy_entries md' len; destroy md); - ignore @@ Option.map free (it |->* error_string); - ignore @@ Option.map (Slice.unref <@ deref) (it |->* status_details)) + let error_string = it |-> error_string in + F.Alloc.free (to_voidp error_string); + let status_details = it |->* status_details in + if not @@ is_null status_details then Slice.unref !@status_details) ;; let make (bwd : bwd) = @@ -133,7 +99,9 @@ let make (bwd : bwd) = it |-> Data.Send_message.send_message <-@ msg' | `Send_status_from_server st -> let status_details' = - Option.map (Ctypes.addr <@ Slice.from_string ~copy:true) st.Status.details + match st.Status.details with + | Some details -> (Ctypes.addr <@ Slice.from_string ~copy:true) details + | None -> from_voidp T.Slice.t null in let metadata = Metadata.(make st.metadata |->* metadata) in let it = data |-> Data.send_status_from_server in @@ -144,24 +112,55 @@ let make (bwd : bwd) = it |-> trailing_metadata_count <-@ Unsigned.Size_t.of_int @@ List.length st.metadata) - | `Recv_initial_metadata ref -> + | `Recv_initial_metadata -> let it = data |-> Data.recv_initial_metadata in - it |-> Data.Recv_initial_metadata.recv_initial_metadata <-@ ref.metadata - | `Recv_message ref -> - let it = data |-> Data.recv_message in - it |-> Data.Recv_message.recv_message <-@ ref.message - | `Recv_status_on_client ref -> + let metadata = Metadata.allocate () in + it |-> Data.Recv_initial_metadata.recv_initial_metadata <-@ metadata + | `Recv_status_on_client -> let it = data |-> Data.recv_status_on_client in - Data.Recv_status_on_client.( - it |-> trailing_metadata <-@ ref.metadata; - it |-> status <-@ ref.status; - it |-> status_details <-@ Some ref.details; - it |-> error_string <-@ Some ref.error_message) - | `Recv_close_on_server ref -> - let it = data |-> Data.recv_close_on_server in - it |-> Data.Recv_close_on_server.cancelled <-@ ref.cancelled + let tr = Metadata.allocate () in + Data.Recv_status_on_client.(it |-> trailing_metadata <-@ tr) + | _ -> () in t ;; -let make_ops = List.map make +let make_ops ops write_flag = + let ops' = List.map make ops in + List.iter + (fun op' -> + match op' @.* op with + | Type.SEND_MESSAGE -> op' @. flags <-@ write_flag + | _ -> ()) + ops'; + CArray.(start <@ of_list T.Op.t) ops' +;; + +let get ops len typ = + let r = ref None in + let open CArray in + (let exception Break in + try + from_ptr ops len + |> iter + @@ fun op' -> + let data = op' @.* data in + r := Some data; + match op' @.* op with + | Type.SEND_INITIAL_METADATA when typ = `Send_initial_metadata -> raise Break + | Type.SEND_MESSAGE when typ = `Send_message -> + r := Some data; + raise Break + | Type.SEND_STATUS_FROM_SERVER when typ = `Send_status_from_server -> raise Break + | Type.SEND_CLOSE_FROM_CLIENT when typ = `Send_close_from_client -> raise Break + | Type.RECV_INITIAL_METADATA when typ = `Recv_initial_metadata -> raise Break + | Type.RECV_MESSAGE when typ = `Recv_message -> raise Break + | Type.RECV_STATUS_ON_CLIENT when typ = `Recv_status_on_client -> raise Break + | Type.RECV_CLOSE_ON_SERVER when typ = `Recv_close_on_server -> raise Break + | _ -> () + with + | Break -> ()); + match !r with + | Some v -> v + | None -> failwith "failed to get metadata" +;; diff --git a/core/server.ml b/core/server.ml index f414a61..2165f64 100644 --- a/core/server.ml +++ b/core/server.ml @@ -264,10 +264,10 @@ let dispatch t (Rpc.{ methd; host; metadata; call; deadline } as rpc) = (fun () -> try match res with - | Some (Ok (res, md)) -> Call.unary_response call ~md (Some res) - | Some (Error (code, details, md)) -> + | Some (Ok (res, tr)) -> Call.unary_response call ~tr (Some res) + | Some (Error (code, details, tr)) -> let code = (code :> Status.Code.bwd) in - Call.unary_response call ~code ~md ?details None + Call.unary_response call ~code ~tr ?details None (* deadline exceeded *) | None -> () with diff --git a/stub/type_description.ml b/stub/type_description.ml index 7ca5ca9..a1a117f 100644 --- a/stub/type_description.ml +++ b/stub/type_description.ml @@ -839,7 +839,7 @@ module Types (F : Ctypes.TYPE) = struct let trailing_metadata_count = "trailing_metadata_count" <-. size_t let trailing_metadata = "trailing_metadata" <-. ptr Metadata.t let status = "status" <-. Status_code.t - let status_details = "status_details" <-. ptr_opt Slice.t + let status_details = "status_details" <-. ptr Slice.t let () = seal t end @@ -868,8 +868,8 @@ module Types (F : Ctypes.TYPE) = struct let trailing_metadata = "trailing_metadata" <-. ptr Metadata.Array.t let status = "status" <-. ptr Status_code.t - let status_details = "status_details" <-. ptr_opt Slice.t - let error_string = "error_string" <-. ptr_opt string + let status_details = "status_details" <-. ptr Slice.t + let error_string = "error_string" <-. ptr string let () = seal t end