Skip to content
This repository has been archived by the owner on Jun 28, 2024. It is now read-only.

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Nymphium committed Aug 23, 2022
1 parent 7d83a71 commit f481497
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 282 deletions.
2 changes: 1 addition & 1 deletion bin/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ proto="test/proto.proto"
service="grpc_test.Echo"
rpc="Greet"

cat <<EOL | ghz -r "${RATE:-0}" -c "${CONCURRENCY:-50}" -n "${NUMBER:-200}" --proto "${proto}" --import-paths test --call "${service}/${rpc}" --data-file=/dev/stdin --insecure localhost:$GRPC_PORT ${DEBUG}
cat <<EOL | ghz -r "${RATE:-0}" -c "${CONCURRENCY:-50}" -n "${NUMBER:-200}" --proto "${proto}" -m '{ "x-req-md": "hello" }' --import-paths test --call "${service}/${rpc}" --data-file=/dev/stdin --insecure localhost:$GRPC_PORT ${DEBUG}
{
"message": "hello"
}
Expand Down
4 changes: 1 addition & 3 deletions client/client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ let make ~host ~port ?credentials ?(args = []) () =
Option.(value ~default:id @@ map (fun id -> Headers.add "request_id" id) request_id)
in
fun ?(metadata = []) req ->
ignore timeout;
(* TODO: correctly send metadata *)
(* let metadata = Headers.(metadata |> Timeout.set_second timeout) |> set_request_id in *)
let metadata = Headers.(metadata |> Timeout.set_second timeout) |> set_request_id in
let body = PB.Writer.contents @@ decoder req in
let call = Call.make ~channel ~methd:path () in
Lwt.return @@ Call.unary_request call ~metadata ~message:body
Expand Down
44 changes: 24 additions & 20 deletions core/byte_buffer.ml
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
open Import
include T.Byte_buffer
include F.Byte_buffer

open struct
module M = struct
module T = T.Byte_buffer
module F = F.Byte_buffer
end
module Reader = struct
include T.Byte_buffer.Reader
include F.Byte_buffer.Reader

type raw = t

let raw = t
end

type raw = t

let raw = t

let allocate () =
let finalise t = F.Byte_buffer.destroy @@ Ctypes.addr t in
let t = Ctypes.make ~finalise T.Byte_buffer.t in
t @. T.Byte_buffer.reserved <-@ __reserved__;
let finalise t = destroy @@ Ctypes.addr t in
let t = Ctypes.make ~finalise raw in
t @. reserved <-@ __reserved__;
Ctypes.addr t
;;

let length b = Unsigned.Size_t.to_int @@ F.Byte_buffer.length b
let destroy = F.Byte_buffer.destroy
let length = Unsigned.Size_t.to_int <@ length

let from_string ?(copy = false) s =
let slice_of_string =
if copy then F.Slice.from_copied_string else F.Slice.from_static_string
in
let slice = slice_of_string s in
let buf = F.Byte_buffer.create_raw (Ctypes.addr slice) (Unsigned.Size_t.of_int 1) in
let () = F.Slice.unref slice in
let from_string ?copy s =
let slice = Slice.from_string ?copy s in
let buf = create_raw (Ctypes.addr slice) (Unsigned.Size_t.of_int 1) in
let () = Slice.unref slice in
buf
;;

let to_string_opt b =
if Ctypes.is_null b
then None
else (
let reader = malloc T.Byte_buffer.Reader.t in
let is_buf_reader_initialized = 0 < F.Byte_buffer.Reader.init reader b in
let reader = malloc Reader.t in
let is_buf_reader_initialized = 0 < Reader.init reader b in
if not is_buf_reader_initialized
then failwith "Error initializing byte_buffer_reader"
else (
let b = F.Byte_buffer.Reader.readall reader in
let b = Reader.readall reader in
Some (Slice.to_string b)))
;;
120 changes: 63 additions & 57 deletions core/call.ml
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
open Import
include T.Call

open struct
module M = T.Call
end
type raw = t

let raw = t

module Batch_stack = struct
type t

let t : t structure typ = Ctypes.structure "grpc_ocaml_run_batch_stack"

let recv_initial_metadata =
Ctypes.field t "recv_initial_metadata" @@ ptr T.Metadata.Array.t
;;

let recv_message = Ctypes.field t "recv_message" @@ ptr (ptr T.Byte_buffer.t)
let status = Ctypes.field t "recv_status" @@ ptr T.Status_code.t
let tr = Ctypes.field t "tr" @@ ptr T.Metadata.Array.t
let details = Ctypes.field t "details" @@ ptr T.Slice.t
let recv_initial_metadata = Ctypes.field t "recv_initial_metadata" @@ ptr Metadata.raw
let recv_message = Ctypes.field t "recv_message" @@ ptr (ptr Byte_buffer.raw)
let status = Ctypes.field t "recv_status" @@ ptr Status.Code.t
let tr = Ctypes.field t "tr" @@ ptr Metadata.raw
let details = Ctypes.field t "details" @@ ptr Slice.t
let error_message = Ctypes.field t "error_message" @@ ptr Ctypes.string
let cancelled = Ctypes.field t "cancelled" @@ ptr int
let () = seal t
Expand All @@ -30,21 +27,23 @@ module Batch_stack = struct
end

module Details = struct
include T.Call_details

let init = F.Call.Details.init

let allocate () =
let ptr = malloc ~finalise:F.Call.Details.destroy T.Call_details.t in
init ptr;
ptr
let t = malloc t in
init t;
t
;;

let destroy = F.Call.Details.destroy
end

(** wrapped call data *)
type t =
{ call : M.t
; cq : T.Completion.Queue.t
{ call : raw
; cq : Completion_queue.raw
; flags : T.Flags.Write.t
}

Expand All @@ -53,14 +52,13 @@ type write_flag =
| `No_compress
]

let wrap_raw ?(flags = Propagation_bits.defaults) ~cq ~call () = { call; cq; flags }

let destroy { call; cq; _ } =
F.Completion_queue.destroy cq;
F.Call.unref call
F.Call.unref call;
F.Completion_queue.destroy cq
;;

let allocate () = malloc M.t
let wrap_raw ?(flags = Propagation_bits.defaults) ~cq ~call () = { call; cq; flags }
let allocate () = malloc t

(** make call to ["${host}/${method}"], which [host] is [get_target channel] by default.
If [parent] is not empty, the call is a child call of [parent].
Expand Down Expand Up @@ -106,8 +104,6 @@ let make
@@ Printf.sprintf "cannot create call with method %s"
@@ Slice.to_string methd_slice
in
let () = Slice.unref methd_slice in
let () = Slice.unref host_slice in
wrap_raw ~cq ~call ~flags ()
;;

Expand All @@ -123,24 +119,24 @@ let run_batch ?(tag = Ctypes.null) t ops =
in
let err = F.Call.start_batch t.call ops ops_size' tag __reserved__ in
let () =
if err <> M.Error.OK
then failwith @@ Printf.sprintf "prepare call error(%s)" @@ M.Error.show err
if err <> Error.OK
then failwith @@ Printf.sprintf "prepare call error(%s)" @@ Error.show err
in
let inf = Timespec.(inf_future Clock_type.realtime) in
let ev = Completion_queue.pluck t.cq inf tag in
if ev @.* T.Event.success < 1
then (
let st = ev @.* T.Event.typ in
failwith @@ Printf.sprintf "run_batch failed(%s)" @@ T.Completion.Type.show st)
failwith @@ Printf.sprintf "run_batch failed(%s)" @@ Completion_queue.Type.show st)
else ops, ops_size
;;

let unary_request ?(metadata = []) ?message t =
let (`Recv_initial_metadata recv_initial_metadata as rim) =
let (`Recv_initial_metadata recv_initial_metadata' as rim) =
Op.make_ref_initial_metadata ()
in
let (`Recv_message recv_message as rm) = Op.make_ref_recv_message () in
let (`Recv_status_on_client recv_status_on_client as rsoc) =
let (`Recv_message recv_message' as rm) = Op.make_ref_recv_message () in
let (`Recv_status_on_client recv_status_on_client' as rsoc) =
Op.make_ref_recv_status_on_client ()
in
let ops =
Expand All @@ -155,45 +151,53 @@ let unary_request ?(metadata = []) ?message t =
add_msg it
in
let stack, tag = Batch_stack.make_tag_pair () in
stack |-> Batch_stack.recv_message <-@ recv_message.message;
stack |-> Batch_stack.recv_initial_metadata <-@ recv_initial_metadata.metadata;
stack |-> Batch_stack.status <-@ recv_status_on_client.status;
stack |-> Batch_stack.details <-@ recv_status_on_client.details;
stack |-> Batch_stack.error_message <-@ recv_status_on_client.error_message;
stack |-> Batch_stack.tr <-@ recv_status_on_client.metadata;
let _o = run_batch ~tag t ops in
let status = !@(recv_status_on_client.status) in
Batch_stack.(
stack |-> recv_message <-@ recv_message'.message;
stack |-> recv_initial_metadata <-@ recv_initial_metadata'.metadata;
stack |-> status <-@ recv_status_on_client'.status;
stack |-> details <-@ recv_status_on_client'.details;
stack |-> error_message <-@ recv_status_on_client'.error_message;
stack |-> tr <-@ recv_status_on_client'.metadata);
let ops, len = run_batch ~tag t ops in
let status = !@(recv_status_on_client'.status) in
let md =
let init_md = Metadata.to_bwd recv_initial_metadata.metadata in
let md = Metadata.to_bwd recv_status_on_client.metadata in
let init_md = Metadata.to_bwd recv_initial_metadata'.metadata in
let md = Metadata.to_bwd recv_status_on_client'.metadata in
md @ init_md
in
let status = Status.Code.to_bwd status in
match status with
| `OK ->
let recv_message = Byte_buffer.to_string_opt !@(recv_message.message) in
let recv_message = Byte_buffer.to_string_opt !@(recv_message'.message) in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
in
Ok (recv_message, md)
| #Status.Code.fail_bwd as status ->
let details =
let slice = !@(recv_status_on_client.details) in
if is_null @@ Slice.start_ptr slice then None else Some (Slice.to_string slice)
let slice = !@(recv_status_on_client'.details) in
if Slice.is_empty slice then None else Some (Slice.to_string slice)
in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
in
Error (status, details, md)
;;

let remote_read t is_metadata_received =
let remote_read t =
let (`Recv_message recv_message as rm) = Op.make_ref_recv_message () in
let (`Recv_initial_metadata recv_initial_metadata as rim) =
Op.make_ref_initial_metadata ()
in
let ops = if is_metadata_received then [ rm; rim ] else [ rm ] in
let ops = [ rm ] in
let stack, tag = Batch_stack.make_tag_pair () in
stack |-> Batch_stack.recv_message <-@ recv_message.message;
stack |-> Batch_stack.recv_initial_metadata <-@ recv_initial_metadata.metadata;
let _ = run_batch ~tag t ops in
let () = Batch_stack.destroy stack in
let md = Metadata.to_bwd recv_initial_metadata.metadata in
Byte_buffer.to_string_opt !@(recv_message.message), md
let ops, len = run_batch ~tag t ops in
let message = Byte_buffer.to_string_opt !@(recv_message.message) in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
in
message
;;

(** send unary response: run RECV_CLOSE_ON_SERVER, SEND_INITIAL_METADATA and SEND_STATUS_FROM_SERVER ops *)
Expand All @@ -214,12 +218,14 @@ let unary_response ?(code = `OK) ?details ?(md = []) ?(tr = []) t res =
; rcos
]
in
let stack = malloc Batch_stack.t in
let tag = Ctypes.to_voidp @@ stack in
let stack, tag = Batch_stack.make_tag_pair () in
stack |-> Batch_stack.cancelled <-@ recv_close_on_server.cancelled;
let _ = run_batch ~tag t ops in
let () = Batch_stack.destroy stack in
let ops, len = run_batch ~tag t ops in
let closed_on_server = deref @@ recv_close_on_server.cancelled in
let () =
Batch_stack.destroy stack;
Op.destroy ops len
in
if closed_on_server <= 0
then Log.message __FILE__ __LINE__ `Info "not (properly) closed request by server"
;;
Expand Down
6 changes: 1 addition & 5 deletions core/channel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ module Credentials = struct
type t = M.Credentials.t

let free crd = F.Channel.Credentials.release crd

let make_insecure () =
let () = Top.init () in
F.Channel.Credentials.create_insecure ()
;;
let make_insecure () = F.Channel.Credentials.create_insecure ()
end

let free_internal = F.Channel.destroy
Expand Down
12 changes: 9 additions & 3 deletions core/completion_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@ open struct
module M = T.Completion.Queue
end

module Type = T.Completion.Type

type raw = M.t

let t = M.t

(** 1 tic 20 msec by default *)
let tic : Timespec.t' ref = ref (`Millis 20L)

let create_for_pluck () = F.Completion_queue.create_for_pluck __reserved__

let destroy t =
F.Completion_queue.shutdown t;
F.Completion_queue.destroy t
;;

let create_for_pluck () = F.Completion_queue.create_for_pluck __reserved__

let pluck cq timeout tag =
let next_tic () = Timespec.now_after !tic in
let rec go () =
let deadline = next_tic () in
let ev = F.Completion_queue.pluck cq tag deadline __reserved__ in
let is_not_timeout = Event.type_of ev <> T.Completion.Type.QUEUE_TIMEOUT in
let is_not_timeout = Event.type_of ev <> Type.QUEUE_TIMEOUT in
let is_deadline = Timespec.cmp deadline timeout > 0 in
if is_not_timeout || is_deadline then ev else go ()
in
Expand Down
8 changes: 4 additions & 4 deletions core/log.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ let severity_of_log_level = function
| INFO -> `Info
;;

let init =
let it = lazy (F.Log.init ()) in
fun () -> Lazy.force it
;;
(* let init = *)
(* let it = lazy (F.Log.init ()) in *)
(* fun () -> Lazy.force it *)
(* ;; *)

let should_log log_level =
let severity = log_level_of_severity log_level in
Expand Down
Loading

0 comments on commit f481497

Please sign in to comment.