Skip to content
This repository has been archived by the owner on Jun 28, 2024. It is now read-only.

Commit

Permalink
wip: fix metadata problem
Browse files Browse the repository at this point in the history
  • Loading branch information
Nymphium committed Sep 22, 2022
1 parent 6f0f4f3 commit 61b46f0
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 156 deletions.
215 changes: 145 additions & 70 deletions core/call.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,54 @@ module Batch_stack = struct
type t

let t : t structure typ = Ctypes.structure "grpc_ocaml_run_batch_stack"
let recv_initial_metadata = Ctypes.field t "recv_initial_metadata" @@ ptr Metadata.raw
let recv_message = Ctypes.field t "recv_message" @@ ptr (ptr Byte_buffer.raw)
let status = Ctypes.field t "recv_status" @@ ptr Status.Code.t
let tr = Ctypes.field t "tr" @@ ptr Metadata.raw
let details = Ctypes.field t "details" @@ ptr Slice.t
let error_message = Ctypes.field t "error_message" @@ ptr Ctypes.string
let cancelled = Ctypes.field t "cancelled" @@ ptr int
let ops = Ctypes.field t "ops" @@ ptr Op.t
let ops_num = Ctypes.field t "ops_num" int
let send_metadata = Ctypes.field t "send_metadata" Metadata.t
let send_trailing_metadata = Ctypes.field t "send_trailing_metadata" Metadata.t
let send_status_details = Ctypes.field t "send_status_details" Slice.t
let recv_initial_metadata = Ctypes.field t "recv_initial_metadata" Metadata.t
let recv_message = Ctypes.field t "recv_message" @@ ptr Byte_buffer.raw
let status = Ctypes.field t "recv_status" @@ Status.Code.t
let recv_trailing_metadata = Ctypes.field t "recv_trailing_metadata" Metadata.t
let recv_status_details = Ctypes.field t "recv_status_details" Slice.t
let error_message = Ctypes.field t "error_message" string
let cancelled = Ctypes.field t "cancelled" int
let () = seal t
let destroy = free

let destroy t =
(* let destroy_md_with_entries md = *)
(* Metadata.destroy_entries *)
(* (md |->* Metadata.metadata) *)
(* (md |->* Metadata.count |> Unsigned.Size_t.to_int); *)
(* Metadata.destroy md *)
(* in *)
(* destroy_md_with_entries (t |-> send_metadata); *)
(* destroy_md_with_entries (t |-> send_trailing_metadata); *)
Metadata.destroy (t |-> recv_initial_metadata);
Metadata.destroy (t |-> recv_trailing_metadata)
;;

(* let recv_status_details = t |->* recv_status_details in *)
(* if Slice.start_ptr recv_status_details |> (not <@ is_null) *)
(* then Slice.unref recv_status_details *)

(* let recv_message = t |->* recv_message in *)
(* if not @@ is_null recv_message then Byte_buffer.destroy recv_message; *)
(* CArray.from_ptr (t |->* ops) (t |->* ops_num) *)
(* |> CArray.iter *)
(* @@ fun op' -> *)
(* match op' @.* Op.op with *)
(* | Op.Type.SEND_MESSAGE -> *)
(* Byte_buffer.destroy *)
(* (op' @. Op.data |-> Op.Data.send_message |->* Op.Data.Send_message.send_message) *)
(* | _ -> () *)

let make_tag_pair () =
let t = malloc t in
Metadata.init (t |-> send_metadata);
Metadata.init (t |-> send_trailing_metadata);
Metadata.init (t |-> recv_initial_metadata);
Metadata.init (t |-> recv_trailing_metadata);
let tag = to_voidp t in
t, tag
;;
Expand Down Expand Up @@ -107,17 +143,10 @@ let make
wrap_raw ~cq ~call ~flags ()
;;

let run_batch ?(tag = Ctypes.null) t ops =
let ops_size = List.length ops in
let ops_size' = Unsigned.Size_t.of_int ops_size in
let ops =
(CArray.(start <@ of_list T.Op.t)
<@ List.map (fun op ->
op @. T.Op.flags <-@ t.flags;
op))
(Op.make_ops ops)
let run_batch t ?(tag = null) ops ops_num =
let err =
F.Call.start_batch t.call ops (Unsigned.Size_t.of_int ops_num) tag __reserved__
in
let err = F.Call.start_batch t.call ops ops_size' tag __reserved__ in
let () =
if err <> Error.OK
then failwith @@ Printf.sprintf "prepare call error(%s)" @@ Error.show err
Expand All @@ -128,20 +157,17 @@ let run_batch ?(tag = Ctypes.null) t ops =
then (
let st = ev @.* T.Event.typ in
failwith @@ Printf.sprintf "run_batch failed(%s)" @@ Completion_queue.Type.show st)
else ops, ops_size
;;

let unary_request ?(metadata = []) ?message t =
let (`Recv_initial_metadata recv_initial_metadata' as rim) =
Op.make_ref_initial_metadata ()
in
let (`Recv_message recv_message' as rm) = Op.make_ref_recv_message () in
let (`Recv_status_on_client recv_status_on_client' as rsoc) =
Op.make_ref_recv_status_on_client ()
in
let ops =
let it =
[ `Send_initial_metadata metadata; `Send_close_from_client; rim; rm; rsoc ]
[ `Send_initial_metadata metadata
; `Send_close_from_client
; `Recv_initial_metadata
; `Recv_message
; `Recv_status_on_client
]
in
let add_msg =
match message with
Expand All @@ -150,62 +176,84 @@ let unary_request ?(metadata = []) ?message t =
in
add_msg it
in
let ops' = Op.make_ops ops t.flags in
let ops_num = List.length ops in
let recv_message =
Op.(
get ops' ops_num `Recv_message @. Data.recv_message
|-> Data.Recv_message.recv_message)
in
let recv_initial_metadata =
Op.(get ops' ops_num `Recv_initial_metadata @. Data.recv_initial_metadata)
in
let recv_status_on_client =
Op.(get ops' ops_num `Recv_status_on_client @. Data.recv_status_on_client)
in
let md =
recv_initial_metadata |-> Op.Data.Recv_initial_metadata.recv_initial_metadata
in
let status = recv_status_on_client |-> Op.Data.Recv_status_on_client.status in
let status_details =
recv_status_on_client |-> Op.Data.Recv_status_on_client.status_details
in
let error_string =
recv_status_on_client |-> Op.Data.Recv_status_on_client.error_string
in
let tr = recv_status_on_client |-> Op.Data.Recv_status_on_client.trailing_metadata in
let stack, tag = Batch_stack.make_tag_pair () in
Batch_stack.(
stack |-> recv_message <-@ recv_message'.message;
stack |-> recv_initial_metadata <-@ recv_initial_metadata'.metadata;
stack |-> status <-@ recv_status_on_client'.status;
stack |-> details <-@ recv_status_on_client'.details;
stack |-> error_message <-@ recv_status_on_client'.error_message;
stack |-> tr <-@ recv_status_on_client'.metadata);
let ops, len = run_batch ~tag t ops in
let status = !@(recv_status_on_client'.status) in
stack |-> Batch_stack.ops <-@ ops';
stack |-> Batch_stack.ops_num <-@ ops_num;
recv_message <-@ (stack |-> Batch_stack.recv_message);
md <-@ (stack |-> Batch_stack.recv_initial_metadata);
status <-@ (stack |-> Batch_stack.status);
status_details <-@ (stack |-> Batch_stack.recv_status_details);
error_string <-@ (stack |-> Batch_stack.error_message);
tr <-@ (stack |-> Batch_stack.recv_trailing_metadata);
let () = run_batch ~tag t ops' ops_num in
let status = !@status in
let md =
let init_md = Metadata.to_bwd recv_initial_metadata'.metadata in
let md = Metadata.to_bwd recv_status_on_client'.metadata in
md @ init_md
let md = Metadata.to_bwd !@md in
let tr = Metadata.to_bwd !@tr in
tr @ md
in
let status = Status.Code.to_bwd status in
let status = Status.Code.to_bwd !@status in
Fun.protect ~finally:(fun () -> Batch_stack.destroy stack)
@@ fun () ->
match status with
| `OK ->
let recv_message = Byte_buffer.to_string_opt !@(recv_message'.message) in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
in
let recv_message = Byte_buffer.to_string_opt !@(!@recv_message) in
Ok (recv_message, md)
| #Status.Code.fail_bwd as status ->
let details =
let slice = !@(recv_status_on_client'.details) in
if Slice.is_empty slice then None else Some (Slice.to_string slice)
in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
let slice = !@status_details in
if is_null slice then None else Some (Slice.to_string !@slice)
in
Error (status, details, md)
;;

let remote_read t =
let (`Recv_message recv_message as rm) = Op.make_ref_recv_message () in
let ops = [ rm ] in
let stack, tag = Batch_stack.make_tag_pair () in
stack |-> Batch_stack.recv_message <-@ recv_message.message;
let ops, len = run_batch ~tag t ops in
let message = Byte_buffer.to_string_opt !@(recv_message.message) in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
let ops = [ `Recv_message ] in
let ops_num = 1 in
let ops' = Op.make_ops ops t.flags in
let recv_message =
Op.(
get ops' ops_num `Recv_message @. Data.recv_message
|-> Data.Recv_message.recv_message)
in
let stack, tag = Batch_stack.make_tag_pair () in
stack |-> Batch_stack.ops <-@ ops';
stack |-> Batch_stack.ops_num <-@ ops_num;
recv_message <-@ (stack |-> Batch_stack.recv_message);
Fun.protect ~finally:(fun () -> Batch_stack.destroy stack)
@@ fun () ->
let () = run_batch ~tag t ops' ops_num in
let message = Byte_buffer.to_string_opt !@(!@recv_message) in
message
;;

(** send unary response: run RECV_CLOSE_ON_SERVER, SEND_INITIAL_METADATA and SEND_STATUS_FROM_SERVER ops *)
let unary_response ?(code = `OK) ?details ?(md = []) ?(tr = []) t res =
let code = Status.Code.of_bwd code in
let (`Recv_close_on_server recv_close_on_server as rcos) =
Op.make_ref_recv_close_on_server ()
in
let ops =
let add_msg =
match res with
Expand All @@ -215,17 +263,44 @@ let unary_response ?(code = `OK) ?details ?(md = []) ?(tr = []) t res =
add_msg
[ `Send_initial_metadata md
; `Send_status_from_server Status.{ code; details; metadata = tr }
; rcos
; `Recv_close_on_server
]
in
let ops_num = List.length ops in
let ops' = Op.make_ops ops t.flags in
let stack, tag = Batch_stack.make_tag_pair () in
stack |-> Batch_stack.cancelled <-@ recv_close_on_server.cancelled;
let ops, len = run_batch ~tag t ops in
let closed_on_server = deref @@ recv_close_on_server.cancelled in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
let send_initial_metadata =
Op.(get ops' ops_num `Send_initial_metadata @. Data.send_initial_metadata)
in
let send_status_from_server =
Op.(get ops' ops_num `Send_status_from_server @. Data.send_status_from_server)
in
let send_md = send_initial_metadata |-> Op.Data.Send_initial_metadata.metadata in
let send_md_count = send_initial_metadata |-> Op.Data.Send_initial_metadata.count in
let send_metadata = stack |-> Batch_stack.send_metadata in
send_metadata |-> Metadata.metadata <-@ !@send_md;
send_metadata |-> Metadata.count <-@ !@send_md_count;
(* let tr'' = Metadata.make tr in *)
let send_tr =
send_status_from_server |->* Op.Data.Send_status_from_server.trailing_metadata
in
let send_tr_count =
send_status_from_server |->* Op.Data.Send_status_from_server.trailing_metadata_count
in
let send_trailing_metadata = stack |-> Batch_stack.send_trailing_metadata in
send_trailing_metadata |-> Metadata.metadata <-@ send_tr;
send_trailing_metadata |-> Metadata.count <-@ send_tr_count;
let recv_close_on_server =
Op.(get ops' ops_num `Recv_close_on_server @. Data.recv_close_on_server)
in
let cancelled = recv_close_on_server |-> Op.Data.Recv_close_on_server.cancelled in
stack |-> Batch_stack.ops <-@ ops';
stack |-> Batch_stack.ops_num <-@ ops_num;
cancelled <-@ (stack |-> Batch_stack.cancelled);
Fun.protect ~finally:(fun () -> Batch_stack.destroy stack)
@@ fun () ->
let () = run_batch ~tag t ops' ops_num in
let closed_on_server = !@(!@cancelled) in
if closed_on_server <= 0
then Log.message __FILE__ __LINE__ `Info "not (properly) closed request by server"
;;
Expand Down
3 changes: 2 additions & 1 deletion core/import.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ let[@inline] malloc ?finalise t =
ptr
;;

let[@inline] free t = F.Alloc.free @@ Ctypes.to_voidp t
let[@inline] free v = F.Alloc.free @@ Ctypes.to_voidp v
let[@inline] nullof t = Ctypes.from_voidp t Ctypes.null

(** [NULL] *)
let __reserved__ = Ctypes.null
Expand Down
23 changes: 11 additions & 12 deletions core/metadata.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ let destroy_entries md len =
let allocate ?(size = 0) ?(count = 0) () =
let t = malloc Array.t in
let () = init t in
let msize = Unsigned.Size_t.of_int (size * sizeof elem) in
let size = Unsigned.Size_t.of_int size in
t |-> Array.metadata <-@ from_voidp elem @@ F.Alloc.zalloc msize;
t |-> Array.capacity <-@ size;
t |-> Array.count <-@ Unsigned.Size_t.of_int count;
t
;;

open struct
let fwd1 (k, v) =
let fwd1 (k, v) elem =
let k' = Slice.from_string ~copy:true k in
let key_is_valid = F.Header.key_is_legal k' > 0 in
let () =
Expand All @@ -55,12 +57,8 @@ open struct
Slice.unref v';
failwith @@ Printf.sprintf "invalid value: %s" v)
in
let vl = Ctypes.make elem in
let () =
vl @. key <-@ k';
vl @. value <-@ v'
in
vl
elem @. key <-@ k';
elem @. value <-@ v'
;;

let bwd1 md =
Expand All @@ -73,11 +71,12 @@ end
let make bwd =
let len = List.length bwd in
let arr = allocate ~size:len ~count:len () in
bwd
|> List.map fwd1
|> fun l ->
let md = CArray.(of_list elem l |> start) in
arr |-> metadata <-@ md;
let count = ref 0 in
CArray.from_ptr (arr |->* metadata) len
|> CArray.iter (fun elem ->
let kv = List.nth bwd !count in
fwd1 kv elem;
incr count);
arr
;;

Expand Down
Loading

0 comments on commit 61b46f0

Please sign in to comment.