Skip to content

Commit

Permalink
Extend C++ RecordingStream with global/thread_local/flush/save/connec…
Browse files Browse the repository at this point in the history
…t APIs & introduce C++ SDK tests (rerun-io#2890)

* Part of rerun-io#2516
(SDK! Not codegen! :))
* Next in the cpp series after rerun-io#2874

### What

Adds a test dependency to the
[Catch2](https://github.com/catchorg/Catch2/) testing framework in order
to start testing all the new RecordingStream features added here.

C++ tests can be conveniently run via
`./rerun_cpp/build_and_run_tests.sh` now!

For quick api overview start with the `rerun.h` and `recording_stream.h`
headers.

Fixes a range of compiler warnings as a consequence of improving some of
the CMake setup, more to do there!
Adds lots more documentation to RecordingStream as well.


Next steps:
* Add C++ to ci (linting, running this test suite)
* Add roundtrip tests
*  Add codegen custom code injection
    * Improve API usability in varous places, including 
* rerun-io#2873
* add other tests
* serialize unions
* serialize datatypes nested in unions, structs and lists
* more testing & roundtripping

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/2890) (if
applicable)

- [PR Build Summary](https://build.rerun.io/pr/2890)
- [Docs
preview](https://rerun.io/preview/pr%3Aandreas%2Fcpp-api%2Fbetter-recording-stream/docs)
- [Examples
preview](https://rerun.io/preview/pr%3Aandreas%2Fcpp-api%2Fbetter-recording-stream/examples)
  • Loading branch information
Wumpf authored Aug 2, 2023
1 parent 6dcbcc0 commit 09653f0
Show file tree
Hide file tree
Showing 20 changed files with 806 additions and 240 deletions.
20 changes: 19 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
cmake_minimum_required(VERSION 3.16)
cmake_minimum_required(VERSION 3.19)

project(rerun_cpp_proj LANGUAGES CXX)

function(set_default_warning_settings target)
if(MSVC)
target_compile_options(${target} PRIVATE /W4 /WX)
else()
target_compile_options(${target} PRIVATE -Wall -Wextra -Wpedantic -Wcast-align -Wcast-qual -Wformat=2 -Wmissing-include-dirs -Wnull-dereference -Woverloaded-virtual -Wpointer-arith -Wshadow -Wswitch-enum -Wvla -Wno-sign-compare -Wconversion -Wunused -Wold-style-cast -Wno-missing-braces)

# arrow has a bunch of unused parameters in its headers.
target_compile_options(${target} PRIVATE -Wno-unused-parameter)
endif()

# Enable this to fail on warnings.
# set_property(TARGET ${target} PROPERTY COMPILE_WARNING_AS_ERROR ON)
endfunction()

if(NOT DEFINED CMAKE_GENERATOR AND UNIX)
set(CMAKE_GENERATOR "Unix Makefiles")
endif()

add_subdirectory(rerun_cpp) # The Rerun C++ SDK library
add_subdirectory(examples/cpp/minimal)
1 change: 1 addition & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ impl RecordingStreamBuilder {
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all
/// previous data sent by the calling thread has been recorded; no more, no less.
/// (e.g. it does not mean that all file caches are flushed)
///
/// ## Shutdown
///
Expand Down
2 changes: 1 addition & 1 deletion crates/re_types/source_hash.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions crates/re_types_builder/src/codegen/cpp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,15 @@ impl QuotedObject {
// Builder methods for all optional components.
for obj_field in obj.fields.iter().filter(|field| field.is_nullable) {
let field_ident = format_ident!("{}", obj_field.name);
// C++ compilers give warnings for re-using the same name as the member variable.
let parameter_ident = format_ident!("_{}", obj_field.name);
let method_ident = format_ident!("with_{}", obj_field.name);
let non_nullable = ObjectField {
is_nullable: false,
..obj_field.clone()
};
let parameter_declaration =
quote_variable(&mut hpp_includes, &non_nullable, &field_ident);
quote_variable(&mut hpp_includes, &non_nullable, &parameter_ident);
methods.push(Method {
docs: obj_field.docs.clone().into(),
declaration: MethodDeclaration {
Expand All @@ -341,7 +343,7 @@ impl QuotedObject {
},
},
definition_body: quote! {
this->#field_ident = std::move(#field_ident);
#field_ident = std::move(#parameter_ident);
return *this;
},
inline: true,
Expand Down
157 changes: 109 additions & 48 deletions crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@ use once_cell::sync::Lazy;
use parking_lot::Mutex;

use re_sdk::{
external::re_log_types::{self, StoreInfo, StoreSource},
external::re_log_types::{self},
log::{DataCell, DataRow},
sink::TcpSink,
time::Time,
ApplicationId, ComponentName, EntityPath, RecordingStream, StoreId, StoreKind,
ComponentName, EntityPath, RecordingStream, RecordingStreamBuilder, StoreKind,
};

// ----------------------------------------------------------------------------
// Types:

type CRecStreamId = u32;

#[repr(u32)]
#[derive(Debug)]
#[repr(i32)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CStoreKind {
/// A recording of user-data.
Recording = 1,
Expand All @@ -34,14 +32,23 @@ pub enum CStoreKind {
Blueprint = 2,
}

/// Simple C version of [`StoreInfo`]
impl From<CStoreKind> for StoreKind {
fn from(kind: CStoreKind) -> Self {
match kind {
CStoreKind::Recording => StoreKind::Recording,
CStoreKind::Blueprint => StoreKind::Blueprint,
}
}
}

/// Simple C version of [`CStoreInfo`]
#[repr(C)]
#[derive(Debug)]
pub struct CStoreInfo {
/// The user-chosen name of the application doing the logging.
pub application_id: *const c_char,

pub store_kind: u32, // CStoreKind
pub store_kind: CStoreKind,
}

#[repr(C)]
Expand All @@ -66,6 +73,9 @@ pub struct CDataRow {
// ----------------------------------------------------------------------------
// Global data:

const RERUN_REC_STREAM_CURRENT_RECORDING: CRecStreamId = 0xFFFFFFFF;
const RERUN_REC_STREAM_CURRENT_BLUEPRINT: CRecStreamId = 0xFFFFFFFE;

#[derive(Default)]
pub struct RecStreams {
next_id: CRecStreamId,
Expand All @@ -79,6 +89,21 @@ impl RecStreams {
self.streams.insert(id, stream);
id
}

fn get(&self, id: CRecStreamId) -> Option<RecordingStream> {
match id {
RERUN_REC_STREAM_CURRENT_RECORDING => RecordingStream::get(StoreKind::Recording, None),
RERUN_REC_STREAM_CURRENT_BLUEPRINT => RecordingStream::get(StoreKind::Blueprint, None),
_ => self.streams.get(&id).cloned(),
}
}

fn remove(&mut self, id: CRecStreamId) -> Option<RecordingStream> {
match id {
RERUN_REC_STREAM_CURRENT_BLUEPRINT | RERUN_REC_STREAM_CURRENT_RECORDING => None,
_ => self.streams.remove(&id),
}
}
}

/// All recording streams created from C.
Expand All @@ -100,10 +125,7 @@ pub extern "C" fn rr_version_string() -> *const c_char {

#[allow(unsafe_code)]
#[no_mangle]
pub unsafe extern "C" fn rr_recording_stream_new(
cstore_info: *const CStoreInfo,
tcp_addr: *const c_char,
) -> CRecStreamId {
pub unsafe extern "C" fn rr_recording_stream_new(cstore_info: *const CStoreInfo) -> CRecStreamId {
initialize_logging();

let cstore_info = unsafe { &*cstore_info };
Expand All @@ -114,54 +136,96 @@ pub unsafe extern "C" fn rr_recording_stream_new(
} = *cstore_info;
let application_id = unsafe { CStr::from_ptr(application_id) };

let application_id =
ApplicationId::from(application_id.to_str().expect("invalid application_id"));
let mut rec_stream_builder =
RecordingStreamBuilder::new(application_id.to_str().expect("invalid application_id"))
//.is_official_example(is_official_example) // TODO(andreas): Is there a meaningful way to expose this?
//.store_id(recording_id.clone()) // TODO(andreas): Expose store id.
.store_source(re_log_types::StoreSource::CSdk);

let store_kind = match store_kind {
1 => StoreKind::Recording,
2 => StoreKind::Blueprint,
_ => panic!("invalid store_kind: expected 1 or 2, got {store_kind}"),
};
if store_kind == CStoreKind::Blueprint {
rec_stream_builder = rec_stream_builder.blueprint();
}

let store_info = StoreInfo {
application_id,
store_id: StoreId::random(store_kind),
is_official_example: false,
started: Time::now(),
store_source: StoreSource::CSdk,
store_kind,
};
let rec_stream = rec_stream_builder
.buffered()
.expect("Failed to create recording stream");

let batcher_config = Default::default();
RECORDING_STREAMS.lock().insert(rec_stream)
}

assert!(!tcp_addr.is_null());
let tcp_addr = unsafe { CStr::from_ptr(tcp_addr) };
let tcp_addr = tcp_addr
.to_str()
.expect("invalid tcp_addr string")
.parse()
.expect("invalid tcp_addr");
let sink = Box::new(TcpSink::new(tcp_addr, re_sdk::default_flush_timeout()));
#[allow(unsafe_code)]
#[no_mangle]
pub extern "C" fn rr_recording_stream_free(id: CRecStreamId) {
if let Some(stream) = RECORDING_STREAMS.lock().remove(id) {
stream.disconnect();
}
}

let rec_stream = RecordingStream::new(store_info, batcher_config, sink).unwrap();
#[allow(unsafe_code)]
#[no_mangle]
pub extern "C" fn rr_recording_stream_set_global(id: CRecStreamId, store_kind: CStoreKind) {
let stream = RECORDING_STREAMS.lock().get(id);
RecordingStream::set_global(store_kind.into(), stream);
}

RECORDING_STREAMS.lock().insert(rec_stream)
#[allow(unsafe_code)]
#[no_mangle]
pub extern "C" fn rr_recording_stream_set_thread_local(id: CRecStreamId, store_kind: CStoreKind) {
let stream = RECORDING_STREAMS.lock().get(id);
RecordingStream::set_thread_local(store_kind.into(), stream);
}

#[allow(unsafe_code)]
#[no_mangle]
pub extern "C" fn rr_recording_stream_free(id: CRecStreamId) {
let mut lock = RECORDING_STREAMS.lock();
if let Some(sink) = lock.streams.remove(&id) {
sink.disconnect();
pub extern "C" fn rr_recording_stream_flush_blocking(id: CRecStreamId) {
if let Some(stream) = RECORDING_STREAMS.lock().remove(id) {
stream.flush_blocking();
}
}

#[allow(unsafe_code)]
#[no_mangle]
pub unsafe extern "C" fn rr_log(stream: CRecStreamId, data_row: *const CDataRow) {
let lock = RECORDING_STREAMS.lock();
let Some(stream) = lock.streams.get(&stream) else {
pub unsafe extern "C" fn rr_recording_stream_connect(
id: CRecStreamId,
tcp_addr: *const c_char,
flush_timeout_sec: f32,
) {
let Some(stream) = RECORDING_STREAMS.lock().get(id) else {
return;
};

let tcp_addr = unsafe { CStr::from_ptr(tcp_addr) };
let tcp_addr = tcp_addr.to_str().expect("invalid tcp_addr");
let tcp_addr = tcp_addr.parse().expect("failed to parse tcp_addr");

let flush_timeout = if flush_timeout_sec >= 0.0 {
Some(std::time::Duration::from_secs_f32(flush_timeout_sec))
} else {
None
};

stream.connect(tcp_addr, flush_timeout);
}

#[allow(unsafe_code)]
#[no_mangle]
pub unsafe extern "C" fn rr_recording_stream_save(id: CRecStreamId, path: *const c_char) {
let Some(stream) = RECORDING_STREAMS.lock().get(id) else {
return;
};

let path = unsafe { CStr::from_ptr(path) };
let path = path.to_str().expect("invalid path");

stream
.save(path)
.expect("Failed to save recording stream to file");
}

#[allow(unsafe_code)]
#[no_mangle]
pub unsafe extern "C" fn rr_log(id: CRecStreamId, data_row: *const CDataRow, inject_time: bool) {
let Some(stream) = RECORDING_STREAMS.lock().get(id) else {
return;
};

Expand Down Expand Up @@ -211,7 +275,6 @@ pub unsafe extern "C" fn rr_log(stream: CRecStreamId, data_row: *const CDataRow)
cells: re_log_types::DataCellRow(cells),
};

let inject_time = true;
stream.record_row(data_row, inject_time);
}

Expand Down Expand Up @@ -273,5 +336,3 @@ fn parse_arrow_ipc_encapsulated_message(

Ok(arrays.into_iter().next().unwrap())
}

// ----------------------------------------------------------------------------
Loading

0 comments on commit 09653f0

Please sign in to comment.