diff --git a/Cargo.toml b/Cargo.toml index e177ce3..4188870 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ pretty_env_logger = "0.4" galvanic-assert = "0.8" chrono = "0.4" memoffset = "0.6.4" +dyn-clone = "1.0.4" [dev-dependencies] tempfile = "3.1" diff --git a/examples/basic_publisher.rs b/examples/basic_publisher.rs index 4ad9909..f7b65b3 100644 --- a/examples/basic_publisher.rs +++ b/examples/basic_publisher.rs @@ -108,8 +108,8 @@ fn main() { println!("Using CnC file: {}", context.cnc_file_name()); - context.set_new_publication_handler(on_new_publication_handler); - context.set_error_handler(error_handler); + context.set_new_publication_handler(Box::new(on_new_publication_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let aeron = Aeron::new(context); diff --git a/examples/basic_subscriber.rs b/examples/basic_subscriber.rs index 5a015da..b83e07e 100644 --- a/examples/basic_subscriber.rs +++ b/examples/basic_subscriber.rs @@ -65,10 +65,6 @@ fn parse_cmd_line() -> Settings { Settings::new() } -fn on_new_subscription_handler(channel: CString, stream_id: i32, correlation_id: i64) { - println!("Subscription: {} {} {}", channel.to_str().unwrap(), stream_id, correlation_id); -} - fn available_image_handler(image: &Image) { println!( "Available image correlation_id={} session_id={} at position={} from {}", @@ -132,10 +128,12 @@ fn main() { println!("Using CnC file: {}", context.cnc_file_name()); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_available_image_handler(available_image_handler); - context.set_unavailable_image_handler(unavailable_image_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(|channel: CString, stream_id: i32, correlation_id: i64| { + println!("Subscription: {} {} {}", channel.to_str().unwrap(), stream_id, correlation_id) + })); + context.set_available_image_handler(Box::new(available_image_handler)); + context.set_unavailable_image_handler(Box::new(unavailable_image_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let aeron = Aeron::new(context); diff --git a/examples/ping.rs b/examples/ping.rs index 707f6e2..2e023c3 100644 --- a/examples/ping.rs +++ b/examples/ping.rs @@ -209,11 +209,11 @@ fn main() { println!("Using CnC file: {}", context.cnc_file_name()); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_new_publication_handler(on_new_publication_handler); - context.set_available_image_handler(available_image_handler); - context.set_unavailable_image_handler(unavailable_image_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(on_new_subscription_handler)); + context.set_new_publication_handler(Box::new(on_new_publication_handler)); + context.set_available_image_handler(Box::new(available_image_handler)); + context.set_unavailable_image_handler(Box::new(unavailable_image_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); //context.set_use_conductor_agent_invoker(true); // start it in one thread for debugging diff --git a/examples/throughput.rs b/examples/throughput.rs index 184bc99..d429df8 100644 --- a/examples/throughput.rs +++ b/examples/throughput.rs @@ -161,11 +161,11 @@ fn main() { println!("Using CnC file: {}", context.cnc_file_name()); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_new_publication_handler(on_new_publication_handler); - context.set_available_image_handler(available_image_handler); - context.set_unavailable_image_handler(unavailable_image_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(on_new_subscription_handler)); + context.set_new_publication_handler(Box::new(on_new_publication_handler)); + context.set_available_image_handler(Box::new(available_image_handler)); + context.set_unavailable_image_handler(Box::new(unavailable_image_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); //context.set_use_conductor_agent_invoker(true); // start it in one thread for debugging diff --git a/src/aeron.rs b/src/aeron.rs index 56f308d..953bf42 100644 --- a/src/aeron.rs +++ b/src/aeron.rs @@ -309,8 +309,8 @@ impl Aeron { &mut self, channel: CString, stream_id: i32, - on_available_image_handler: OnAvailableImage, - on_unavailable_image_handler: OnUnavailableImage, + on_available_image_handler: Box, + on_unavailable_image_handler: Box, ) -> Result { self.conductor.lock().expect("Mutex poisoned").add_subscription( channel, @@ -401,7 +401,7 @@ impl Aeron { * * @param handler to be added to the available counters list. */ - pub fn add_available_counter_handler(&mut self, handler: OnAvailableCounter) { + pub fn add_available_counter_handler(&mut self, handler: Box) { let _ignored = self .conductor .lock() @@ -414,7 +414,7 @@ impl Aeron { * * @param handler to be removed from the available counters list. */ - pub fn remove_available_counter_handler(&mut self, handler: OnAvailableCounter) { + pub fn remove_available_counter_handler(&mut self, handler: Box) { let _ignored = self .conductor .lock() @@ -427,7 +427,7 @@ impl Aeron { * * @param handler to be added to the unavailable counters list. */ - pub fn add_unavailable_counter_handler(&mut self, handler: OnUnavailableCounter) { + pub fn add_unavailable_counter_handler(&mut self, handler: Box) { let _ignored = self .conductor .lock() @@ -440,7 +440,7 @@ impl Aeron { * * @param handler to be removed from the unavailable counters list. */ - pub fn remove_unavailable_counter_handler(&mut self, handler: OnUnavailableCounter) { + pub fn remove_unavailable_counter_handler(&mut self, handler: Box) { let _ignored = self .conductor .lock() @@ -453,7 +453,7 @@ impl Aeron { * * @param handler to be added to the close client handlers list. */ - pub fn add_close_client_handler(&mut self, handler: OnCloseClient) { + pub fn add_close_client_handler(&mut self, handler: Box) { let _ignored = self .conductor .lock() @@ -466,7 +466,7 @@ impl Aeron { * * @param handler to be removed from the close client handlers list. */ - pub fn remove_close_client_handler(&mut self, handler: OnCloseClient) { + pub fn remove_close_client_handler(&mut self, handler: Box) { let _ignored = self .conductor .lock() diff --git a/src/client_conductor.rs b/src/client_conductor.rs index e291eae..5d2c0e9 100644 --- a/src/client_conductor.rs +++ b/src/client_conductor.rs @@ -145,8 +145,8 @@ struct SubscriptionStateDefn { error_message: CString, subscription_cache: Option>>, subscription: Option>>, - on_available_image_handler: OnAvailableImage, - on_unavailable_image_handler: OnUnavailableImage, + on_available_image_handler: Box, + on_unavailable_image_handler: Box, channel: CString, registration_id: i64, time_of_registration_ms: Moment, @@ -161,8 +161,8 @@ impl SubscriptionStateDefn { registration_id: i64, stream_id: i32, now_ms: Moment, - on_available_image_handler: OnAvailableImage, - on_unavailable_image_handler: OnUnavailableImage, + on_available_image_handler: Box, + on_unavailable_image_handler: Box, ) -> Self { Self { error_message: CString::new("").unwrap(), @@ -276,14 +276,14 @@ pub struct ClientConductor { counters_reader: Arc, counter_values_buffer: AtomicBuffer, - on_new_publication_handler: OnNewPublication, - on_new_exclusive_publication_handler: OnNewPublication, - on_new_subscription_handler: OnNewSubscription, - error_handler: ErrorHandler, + on_new_publication_handler: Box, + on_new_exclusive_publication_handler: Box, + on_new_subscription_handler: Box, + error_handler: Box, - on_available_counter_handlers: Vec, - on_unavailable_counter_handlers: Vec, - on_close_client_handlers: Vec, + on_available_counter_handlers: Vec>, + on_unavailable_counter_handlers: Vec>, + on_close_client_handlers: Vec>, epoch_clock: Box Moment>, driver_timeout_ms: Moment, @@ -313,13 +313,13 @@ impl ClientConductor { broadcast_receiver: Arc>, counter_metadata_buffer: AtomicBuffer, counter_values_buffer: AtomicBuffer, - on_new_publication_handler: OnNewPublication, - on_new_exclusive_publication_handler: OnNewPublication, - on_new_subscription_handler: OnNewSubscription, - error_handler: ErrorHandler, - on_available_counter_handler: OnAvailableCounter, - on_unavailable_counter_handler: OnUnavailableCounter, - on_close_client_handler: OnCloseClient, + on_new_publication_handler: Box, + on_new_exclusive_publication_handler: Box, + on_new_subscription_handler: Box, + error_handler: Box, + on_available_counter_handler: Box, + on_unavailable_counter_handler: Box, + on_close_client_handler: Box, driver_timeout_ms: Moment, resource_linger_timeout_ms: Moment, inter_service_timeout_ns: Moment, @@ -378,23 +378,23 @@ impl ClientConductor { self.epoch_clock = new_provider; } - pub fn set_error_handler(&mut self, new_handler: ErrorHandler) { + pub fn set_error_handler(&mut self, new_handler: Box) { self.error_handler = new_handler; } - pub fn set_on_new_publication_handler(&mut self, new_handler: OnNewPublication) { + pub fn set_on_new_publication_handler(&mut self, new_handler: Box) { self.on_new_publication_handler = new_handler; } - pub fn set_on_new_subscription_handler(&mut self, new_handler: OnNewSubscription) { + pub fn set_on_new_subscription_handler(&mut self, new_handler: Box) { self.on_new_subscription_handler = new_handler; } - pub fn add_on_available_counter_handler(&mut self, handler: OnAvailableCounter) { + pub fn add_on_available_counter_handler(&mut self, handler: Box) { self.on_available_counter_handlers.push(handler); } - pub fn add_on_unavailable_counter_handler(&mut self, handler: OnUnavailableCounter) { + pub fn add_on_unavailable_counter_handler(&mut self, handler: Box) { self.on_unavailable_counter_handlers.push(handler); } @@ -441,7 +441,7 @@ impl ClientConductor { ttrace!("on_heartbeat_check_timeouts: {:?}", &err); - (self.error_handler)(err); + self.error_handler.call(err); } self.time_of_last_do_work_ms = now_ms; @@ -460,7 +460,7 @@ impl ClientConductor { ttrace!("on_heartbeat_check_timeouts: {:?}", &err); - (self.error_handler)(err); + self.error_handler.call(err); } let client_id = self.driver_proxy.client_id(); @@ -479,7 +479,7 @@ impl ClientConductor { ttrace!("on_heartbeat_check_timeouts: {:?}", &err); - (self.error_handler)(err); + self.error_handler.call(err); } } else { let counter_id = heartbeat_timestamp::find_counter_id_by_registration_id( @@ -519,14 +519,14 @@ impl ClientConductor { pub fn verify_driver_is_active_via_error_handler(&self) { if !self.driver_active.load(Ordering::SeqCst) { let err = DriverInteractionError::Inactive.into(); - (self.error_handler)(err); + self.error_handler.call(err); } } pub fn ensure_not_reentrant(&self) { if self.is_in_callback { let err = AeronError::ReentrantException; - (self.error_handler)(err); + self.error_handler.call(err); } } @@ -858,8 +858,8 @@ impl ClientConductor { &mut self, channel: CString, stream_id: i32, - on_available_image_handler: OnAvailableImage, - on_unavailable_image_handler: OnUnavailableImage, + on_available_image_handler: Box, + on_unavailable_image_handler: Box, ) -> Result { ttrace!( "add_subscription: on channel:{} stream:{}", @@ -961,7 +961,7 @@ impl ClientConductor { image.close(); let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (subscription.on_unavailable_image_handler)(image); + subscription.on_unavailable_image_handler.call(image); } } else { ttrace!( @@ -1239,7 +1239,7 @@ impl ClientConductor { result } - pub fn add_available_counter_handler(&mut self, handler: OnAvailableCounter) -> Result<(), AeronError> { + pub fn add_available_counter_handler(&mut self, handler: Box) -> Result<(), AeronError> { self.ensure_not_reentrant(); self.ensure_open()?; @@ -1247,7 +1247,7 @@ impl ClientConductor { Ok(()) } - pub fn remove_available_counter_handler(&mut self, _handler: OnAvailableCounter) -> Result<(), AeronError> { + pub fn remove_available_counter_handler(&mut self, _handler: Box) -> Result<(), AeronError> { self.ensure_not_reentrant(); self.ensure_open()?; @@ -1255,7 +1255,7 @@ impl ClientConductor { Ok(()) } - pub fn add_unavailable_counter_handler(&mut self, handler: OnUnavailableCounter) -> Result<(), AeronError> { + pub fn add_unavailable_counter_handler(&mut self, handler: Box) -> Result<(), AeronError> { self.ensure_not_reentrant(); self.ensure_open()?; @@ -1263,14 +1263,14 @@ impl ClientConductor { Ok(()) } - pub fn remove_unavailable_counter_handler(&mut self, _handler: OnUnavailableCounter) -> Result<(), AeronError> { + pub fn remove_unavailable_counter_handler(&mut self, _handler: Box) -> Result<(), AeronError> { self.ensure_not_reentrant(); self.ensure_open()?; //self.on_unavailable_counter_handlers.retain(|item| item != handler); FIXME Ok(()) } - pub fn add_close_client_handler(&mut self, handler: OnCloseClient) -> Result<(), AeronError> { + pub fn add_close_client_handler(&mut self, handler: Box) -> Result<(), AeronError> { self.ensure_not_reentrant(); self.ensure_open()?; @@ -1278,7 +1278,7 @@ impl ClientConductor { Ok(()) } - pub fn remove_close_client_handler(&mut self, _handler: OnCloseClient) -> Result<(), AeronError> { + pub fn remove_close_client_handler(&mut self, _handler: Box) -> Result<(), AeronError> { self.ensure_not_reentrant(); self.ensure_open()?; @@ -1321,7 +1321,7 @@ impl ClientConductor { image.close(); let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (sub_defn.on_unavailable_image_handler)(image); + sub_defn.on_unavailable_image_handler.call(image); } images_to_linger.push(images); } @@ -1351,7 +1351,7 @@ impl ClientConductor { for handler in &self.on_unavailable_counter_handlers { let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - handler(&self.counters_reader, registration_id, counter_id); + handler.call(&self.counters_reader, registration_id, counter_id); } if let Some(cache) = &cnt_defn.counter_cache { @@ -1365,7 +1365,7 @@ impl ClientConductor { for handler in &self.on_close_client_handlers { let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - handler(); + handler.call(); } } @@ -1480,7 +1480,8 @@ impl DriverListener for ClientConductor { state.original_registration_id = original_registration_id; let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (self.on_new_publication_handler)(state.channel.clone(), stream_id, session_id, registration_id); + self.on_new_publication_handler + .call(state.channel.clone(), stream_id, session_id, registration_id); } } @@ -1534,7 +1535,8 @@ impl DriverListener for ClientConductor { state.buffers = log_buffers; let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (self.on_new_exclusive_publication_handler)(state.channel.clone(), stream_id, session_id, registration_id); + self.on_new_exclusive_publication_handler + .call(state.channel.clone(), stream_id, session_id, registration_id); } } @@ -1559,7 +1561,8 @@ impl DriverListener for ClientConductor { state.subscription = Some(Arc::downgrade(&subscr)); let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (self.on_new_subscription_handler)(state.channel.clone(), state.stream_id, registration_id); + self.on_new_subscription_handler + .call(state.channel.clone(), state.stream_id, registration_id); } } @@ -1584,7 +1587,7 @@ impl DriverListener for ClientConductor { if subscription.channel_status_id() == offending_command_correlation_id as i32 { ttrace!("on_channel_endpoint_error_response: for subscription, offending_command_correlation_id {}, error_message {}", offending_command_correlation_id, error_message.to_str().unwrap()); - (self.error_handler)(ChannelEndpointException( + self.error_handler.call(ChannelEndpointException( offending_command_correlation_id, String::from(error_message.to_str().expect("CString conversion error")), )); @@ -1594,7 +1597,7 @@ impl DriverListener for ClientConductor { image.close(); let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (subscr_defn.on_unavailable_image_handler)(image); + subscr_defn.on_unavailable_image_handler.call(image); } linger_images.push(images); subscription_to_remove.push(*reg_id); @@ -1622,7 +1625,7 @@ impl DriverListener for ClientConductor { { ttrace!("on_channel_endpoint_error_response: for publication, offending_command_correlation_id {}, error_message {}", offending_command_correlation_id, error_message.to_str().unwrap()); - (self.error_handler)(ChannelEndpointException( + self.error_handler.call(ChannelEndpointException( offending_command_correlation_id, String::from(error_message.to_str().expect("CString conversion error")), )); @@ -1644,7 +1647,7 @@ impl DriverListener for ClientConductor { if publication.lock().expect("Mutex on pub poisoned").channel_status_id() == offending_command_correlation_id as i32 { - (self.error_handler)(ChannelEndpointException( + self.error_handler.call(ChannelEndpointException( offending_command_correlation_id, String::from(error_message.to_str().expect("CString conversion error")), )); @@ -1788,11 +1791,11 @@ impl DriverListener for ClientConductor { source_identity, &subscriber_position, log_buffers.unwrap(), - self.error_handler, + dyn_clone::clone_box(&*self.error_handler), ); let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (subscr_defn.on_available_image_handler)(&image); + subscr_defn.on_available_image_handler.call(&image); linger_images = Some(subscription.lock().expect("Mutex poisoned").add_image(image)); } @@ -1823,9 +1826,9 @@ impl DriverListener for ClientConductor { subscription.lock().expect("Mutex poisoned").remove_image(correlation_id) { let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - (subscr_defn.on_unavailable_image_handler)( - old_image_array.get(index as usize).expect("Bug in image handling"), - ); + subscr_defn + .on_unavailable_image_handler + .call(old_image_array.get(index as usize).expect("Bug in image handling")); linger_images = Some(old_image_array); } } @@ -1862,7 +1865,7 @@ impl DriverListener for ClientConductor { // Handler are called for all counters (not only those created by this Aeron client) for handler in &self.on_available_counter_handlers { let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - handler(&self.counters_reader, registration_id, counter_id); + handler.call(&self.counters_reader, registration_id, counter_id); } } @@ -1875,7 +1878,7 @@ impl DriverListener for ClientConductor { for handler in &self.on_unavailable_counter_handlers { let _callback_guard = CallbackGuard::new(&mut self.is_in_callback); - handler(&self.counters_reader, registration_id, counter_id); + handler.call(&self.counters_reader, registration_id, counter_id); } } @@ -1884,7 +1887,7 @@ impl DriverListener for ClientConductor { ttrace!("on_client_timeout client_id {}. Closing all resources.", client_id,); self.close_all_resources((self.epoch_clock)()); - (self.error_handler)(AeronError::ClientTimeoutException); + self.error_handler.call(AeronError::ClientTimeoutException); } } } @@ -2027,13 +2030,13 @@ mod tests { local_copy_broadcast_receiver, counters_metadata_buffer, counters_values_buffer, - on_new_publication_handler, - on_new_exclusive_publication_handler, - on_new_subscription_handler, - error_handler, - on_available_counter_handler, - on_unavailable_counter_handler, - on_close_client_handler, + Box::new(on_new_publication_handler), + Box::new(on_new_exclusive_publication_handler), + Box::new(on_new_subscription_handler), + Box::new(error_handler), + Box::new(on_available_counter_handler), + Box::new(on_unavailable_counter_handler), + Box::new(on_close_client_handler), DRIVER_TIMEOUT_MS, RESOURCE_LINGER_TIMEOUT_MS, INTER_SERVICE_TIMEOUT_NS, @@ -2640,8 +2643,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2662,8 +2665,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2694,8 +2697,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2724,8 +2727,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2768,8 +2771,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); let id2 = test @@ -2779,8 +2782,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2798,8 +2801,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2827,8 +2830,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); let id2 = test @@ -2838,8 +2841,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2871,8 +2874,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2897,8 +2900,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2926,8 +2929,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -2961,7 +2964,7 @@ mod tests { fn should_call_error_handler_when_inter_service_timeout_exceeded() { let test = ClientConductorTest::new(); - test.conductor.lock().unwrap().set_error_handler(error_handler1); + test.conductor.lock().unwrap().set_error_handler(Box::new(error_handler1)); test.conductor .lock() .unwrap() @@ -2991,7 +2994,7 @@ mod tests { fn should_call_error_handler_when_driver_inactive_on_idle() { let mut test = ClientConductorTest::new(); - test.conductor.lock().unwrap().set_error_handler(error_handler3); + test.conductor.lock().unwrap().set_error_handler(Box::new(error_handler3)); test.do_work_until_driver_timeout(); @@ -3012,7 +3015,7 @@ mod tests { fn should_exception_when_add_publication_after_driver_inactive() { let mut test = ClientConductorTest::new(); - test.conductor.lock().unwrap().set_error_handler(error_handler4); + test.conductor.lock().unwrap().set_error_handler(Box::new(error_handler4)); test.do_work_until_driver_timeout(); @@ -3037,7 +3040,7 @@ mod tests { fn should_exception_when_release_publication_after_driver_inactive() { let mut test = ClientConductorTest::new(); - test.conductor.lock().unwrap().set_error_handler(error_handler5); + test.conductor.lock().unwrap().set_error_handler(Box::new(error_handler5)); test.do_work_until_driver_timeout(); let called: bool = ERR_HANDLER_CALLED5.load(Ordering::SeqCst); @@ -3061,7 +3064,7 @@ mod tests { fn should_exception_when_add_subscription_after_driver_inactive() { let mut test = ClientConductorTest::new(); - test.conductor.lock().unwrap().set_error_handler(error_handler6); + test.conductor.lock().unwrap().set_error_handler(Box::new(error_handler6)); test.do_work_until_driver_timeout(); let called: bool = ERR_HANDLER_CALLED6.load(Ordering::SeqCst); @@ -3070,17 +3073,12 @@ mod tests { let result = test.conductor.lock().unwrap().add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ); assert_that!(&result.err().unwrap(), has_structure!(AeronError::DriverTimeout[any_value()])); } - fn error_handler7(error: AeronError) { - ERR_HANDLER_CALLED7.store(true, Ordering::SeqCst); - assert_that!(&error, has_structure!(AeronError::DriverTimeout[any_value()])); - } - lazy_static! { pub static ref ERR_HANDLER_CALLED7: AtomicBool = AtomicBool::from(false); } @@ -3089,7 +3087,10 @@ mod tests { fn should_exception_when_release_subscription_after_driver_inactive() { let mut test = ClientConductorTest::new(); - test.conductor.lock().unwrap().set_error_handler(error_handler7); + test.conductor.lock().unwrap().set_error_handler(Box::new(|error| { + ERR_HANDLER_CALLED7.store(true, Ordering::SeqCst); + assert_that!(&error, has_structure!(AeronError::DriverTimeout[any_value()])); + })); test.do_work_until_driver_timeout(); let called: bool = ERR_HANDLER_CALLED7.load(Ordering::SeqCst); @@ -3125,7 +3126,7 @@ mod tests { test.conductor .lock() .unwrap() - .set_on_new_publication_handler(on_new_publication_handler1); + .set_on_new_publication_handler(Box::new(on_new_publication_handler1)); test.conductor.lock().unwrap().on_new_publication( id, @@ -3163,15 +3164,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler1); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler1)); test.conductor .lock() @@ -3210,8 +3211,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler2, - on_unavailable_image_handler, + Box::new(on_available_image_handler2), + Box::new(on_unavailable_image_handler), ) .unwrap(); let correlation_id = id + 1; @@ -3219,7 +3220,7 @@ mod tests { test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler2); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler2)); test.conductor .lock() @@ -3272,15 +3273,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler3, - on_unavailable_image_handler, + Box::new(on_available_image_handler3), + Box::new(on_unavailable_image_handler), ) .unwrap(); let correlation_id = id + 1; test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler3); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler3)); // must be able to handle newImage even if find_subscription not called test.conductor.lock().unwrap().on_available_image( @@ -3328,15 +3329,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler4, - on_unavailable_image_handler, + Box::new(on_available_image_handler4), + Box::new(on_unavailable_image_handler), ) .unwrap(); let correlation_id = id + 1; test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler4); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler4)); test.conductor .lock() @@ -3394,15 +3395,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler5, - on_unavailable_image_handler5, + Box::new(on_available_image_handler5), + Box::new(on_unavailable_image_handler5), ) .unwrap(); let correlation_id = id + 1; test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler5); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler5)); test.conductor .lock() @@ -3460,15 +3461,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler6, - on_unavailable_image_handler6, + Box::new(on_available_image_handler6), + Box::new(on_unavailable_image_handler6), ) .unwrap(); let correlation_id = id + 1; test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler6); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler6)); // must be able to handle newImage even if find_subscription not called test.conductor.lock().unwrap().on_available_image( @@ -3521,15 +3522,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler7, - on_unavailable_image_handler7, + Box::new(on_available_image_handler7), + Box::new(on_unavailable_image_handler7), ) .unwrap(); let correlation_id = id + 1; test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler7); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler7)); test.conductor .lock() @@ -3588,15 +3589,15 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler8, - on_unavailable_image_handler8, + Box::new(on_available_image_handler8), + Box::new(on_unavailable_image_handler8), ) .unwrap(); let correlation_id = id + 1; test.conductor .lock() .unwrap() - .set_on_new_subscription_handler(on_new_subscription_handler8); + .set_on_new_subscription_handler(Box::new(on_new_subscription_handler8)); test.conductor .lock() @@ -3699,8 +3700,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -3744,8 +3745,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); @@ -3804,8 +3805,8 @@ mod tests { .add_subscription( str_to_c(CHANNEL), STREAM_ID, - on_available_image_handler, - on_unavailable_image_handler, + Box::new(on_available_image_handler), + Box::new(on_unavailable_image_handler), ) .unwrap(); let correlation_id = id + 1; @@ -3911,7 +3912,7 @@ mod tests { test.conductor .lock() .unwrap() - .add_on_available_counter_handler(on_available_counter1); + .add_on_available_counter_handler(Box::new(on_available_counter1)); let no_key_buffer = Vec::with_capacity(1); let id = test @@ -4042,7 +4043,7 @@ mod tests { test.conductor .lock() .unwrap() - .add_on_available_counter_handler(on_available_counter2); + .add_on_available_counter_handler(Box::new(on_available_counter2)); test.conductor.lock().unwrap().on_available_counter(id1, COUNTER_ID); test.conductor.lock().unwrap().on_available_counter(id2, COUNTER_ID); @@ -4142,7 +4143,7 @@ mod tests { test.conductor .lock() .unwrap() - .add_on_unavailable_counter_handler(on_unavailable_counter1); + .add_on_unavailable_counter_handler(Box::new(on_unavailable_counter1)); test.conductor.lock().unwrap().on_unavailable_counter(id, COUNTER_ID); diff --git a/src/concurrent/agent_invoker.rs b/src/concurrent/agent_invoker.rs index 20b1d55..f16e329 100644 --- a/src/concurrent/agent_invoker.rs +++ b/src/concurrent/agent_invoker.rs @@ -19,14 +19,14 @@ use crate::concurrent::{agent_runner::Agent, logbuffer::term_reader::ErrorHandle pub struct AgentInvoker { agent: Arc>, - exception_handler: ErrorHandler, + exception_handler: Box, is_started: bool, is_running: bool, is_closed: bool, } impl AgentInvoker { - pub fn new(agent: Arc>, exception_handler: ErrorHandler) -> Self { + pub fn new(agent: Arc>, exception_handler: Box) -> Self { Self { agent, exception_handler, @@ -73,7 +73,7 @@ impl AgentInvoker { self.is_started = true; let on_start_result = self.agent.lock().expect("Mutex poisoned").on_start(); if let Err(error) = on_start_result { - (self.exception_handler)(error); + self.exception_handler.call(error); self.close(); } else { self.is_running = true; @@ -93,7 +93,7 @@ impl AgentInvoker { if self.is_running { match self.agent.lock().expect("Mutex poisoned").do_work() { - Err(error) => (self.exception_handler)(error), + Err(error) => self.exception_handler.call(error), Ok(wrk_cnt) => work_count = wrk_cnt, } } @@ -111,7 +111,7 @@ impl AgentInvoker { self.is_running = false; self.is_closed = true; if let Err(error) = self.agent.lock().expect("Mutex poisoned").on_close() { - (self.exception_handler)(error); + self.exception_handler.call(error); } } } diff --git a/src/concurrent/agent_runner.rs b/src/concurrent/agent_runner.rs index 52be36e..20a6eb3 100644 --- a/src/concurrent/agent_runner.rs +++ b/src/concurrent/agent_runner.rs @@ -58,7 +58,7 @@ pub struct AgentRunner< > { agent: Arc>, // need mutable Agent here as AgentRunner will change Agent state while running it idle_strategy: Arc, - exception_handler: ErrorHandler, + exception_handler: Box, name: String, } @@ -67,7 +67,12 @@ impl< I: 'static + std::marker::Send + std::marker::Sync + Strategy, > AgentRunner { - pub fn new(agent: Arc>, idle_strategy: Arc, exception_handler: ErrorHandler, name: &str) -> Self { + pub fn new( + agent: Arc>, + idle_strategy: Arc, + exception_handler: Box, + name: &str, + ) -> Self { Self { agent, idle_strategy, @@ -114,7 +119,7 @@ impl< */ pub fn run(&mut self, stop_rx: Receiver) { if let Err(error) = self.agent.lock().expect("Mutex poisoned").on_start() { - (self.exception_handler)(error); + self.exception_handler.call(error); } loop { @@ -127,12 +132,12 @@ impl< match self.agent.lock().expect("Mutex poisoned").do_work() { Ok(work_cnt) => self.idle_strategy.idle_opt(work_cnt), - Err(error) => (self.exception_handler)(error), + Err(error) => self.exception_handler.call(error), } } if let Err(error) = self.agent.lock().expect("Mutex poisoned").on_close() { - (self.exception_handler)(error); + self.exception_handler.call(error); } } } diff --git a/src/concurrent/logbuffer/term_reader.rs b/src/concurrent/logbuffer/term_reader.rs index 9fa9b2f..6e78eaf 100644 --- a/src/concurrent/logbuffer/term_reader.rs +++ b/src/concurrent/logbuffer/term_reader.rs @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use dyn_clone::DynClone; use crate::{ concurrent::{ @@ -25,7 +26,18 @@ use crate::{ utils::{bit_utils, errors::AeronError, types::Index}, }; -pub type ErrorHandler = fn(AeronError); +pub trait ErrorHandler: DynClone { + fn call(&self, error: AeronError); +} + +impl ErrorHandler for F +where + F: Fn(AeronError) + Clone + Send, +{ + fn call(&self, error: AeronError) { + self(error) + } +} /** * Fn(&AtomicBuffer, Index, Index, &Header) -> Result<(), AeronError>; @@ -55,7 +67,6 @@ pub fn read( data_handler: &mut impl FnMut(&AtomicBuffer, Index, Index, &Header), fragments_limit: i32, header: &mut Header, - _exception_handler: impl Fn(AeronError), ) -> ReadOutcome { let mut outcome = ReadOutcome { offset: term_offset, @@ -118,10 +129,6 @@ mod tests { }; } - fn error_handler(error: AeronError) { - println!("Error in term reader test: {:?}", error); - } - fn data_handler(_buf: &AtomicBuffer, offset: Index, length: Index, _header: &Header) { println!("Data of length={} received at offset={}", length, offset); } @@ -140,14 +147,7 @@ mod tests { log.put_ordered::(frame_descriptor::length_offset(aligned_frame_length), 0); - let read_outcome = term_reader::read( - log, - term_offset, - &mut data_handler, - INT_MAX, - &mut fragment_header, - error_handler, - ); + let read_outcome = term_reader::read(log, term_offset, &mut data_handler, INT_MAX, &mut fragment_header); assert_eq!(read_outcome.offset, aligned_frame_length); assert_eq!(read_outcome.fragments_read, 1); @@ -161,14 +161,7 @@ mod tests { log.put_ordered::(frame_descriptor::length_offset(0), 0); - let read_outcome = term_reader::read( - log, - term_offset, - &mut data_handler, - INT_MAX, - &mut fragment_header, - error_handler, - ); + let read_outcome = term_reader::read(log, term_offset, &mut data_handler, INT_MAX, &mut fragment_header); assert_eq!(read_outcome.offset, term_offset); assert_eq!(read_outcome.fragments_read, 0); @@ -187,7 +180,7 @@ mod tests { log.put::(frame_descriptor::type_offset(0), data_frame_header::HDR_TYPE_DATA); - let read_outcome = term_reader::read(log, term_offset, &mut data_handler, 1, &mut fragment_header, error_handler); + let read_outcome = term_reader::read(log, term_offset, &mut data_handler, 1, &mut fragment_header); assert_eq!(read_outcome.offset, aligned_frame_length); assert_eq!(read_outcome.fragments_read, 1); @@ -213,14 +206,7 @@ mod tests { log.put_ordered::(frame_descriptor::length_offset(aligned_frame_length * 2), 0); - let read_outcome = term_reader::read( - log, - term_offset, - &mut data_handler, - INT_MAX, - &mut fragment_header, - error_handler, - ); + let read_outcome = term_reader::read(log, term_offset, &mut data_handler, INT_MAX, &mut fragment_header); assert_eq!(read_outcome.offset, aligned_frame_length * 2); assert_eq!(read_outcome.fragments_read, 2); @@ -241,14 +227,7 @@ mod tests { data_frame_header::HDR_TYPE_DATA, ); - let read_outcome = term_reader::read( - log, - start_of_message, - &mut data_handler, - INT_MAX, - &mut fragment_header, - error_handler, - ); + let read_outcome = term_reader::read(log, start_of_message, &mut data_handler, INT_MAX, &mut fragment_header); assert_eq!(read_outcome.offset, LOG_BUFFER_CAPACITY); assert_eq!(read_outcome.fragments_read, 1); @@ -269,14 +248,7 @@ mod tests { data_frame_header::HDR_TYPE_PAD, ); - let read_outcome = term_reader::read( - log, - start_of_message, - &mut data_handler, - INT_MAX, - &mut fragment_header, - error_handler, - ); + let read_outcome = term_reader::read(log, start_of_message, &mut data_handler, INT_MAX, &mut fragment_header); assert_eq!(read_outcome.offset, LOG_BUFFER_CAPACITY); assert_eq!(read_outcome.fragments_read, 0); diff --git a/src/context.rs b/src/context.rs index 5a786c9..65cfd9a 100644 --- a/src/context.rs +++ b/src/context.rs @@ -18,6 +18,8 @@ use std::env; use std::ffi::CString; use std::sync::Arc; +use dyn_clone::DynClone; + use crate::utils::errors::GenericError; use crate::{ cnc_file_descriptor, @@ -51,7 +53,18 @@ pub const NULL_VALUE: i32 = -1; // TODO replace on Option * * @param image that has become available. */ -pub type OnAvailableImage = fn(image: &Image); +pub trait OnAvailableImage: DynClone { + fn call(&self, image: &Image); +} + +impl OnAvailableImage for F +where + F: Fn(&Image) + Clone, +{ + fn call(&self, image: &Image) { + self(image) + } +} /** * Function called by Aeron to deliver notification that an Image has become unavailable for polling. @@ -63,7 +76,18 @@ pub type OnAvailableImage = fn(image: &Image); * * @param image that has become unavailable */ -pub type OnUnavailableImage = fn(image: &Image); +pub trait OnUnavailableImage: DynClone { + fn call(&self, image: &Image); +} + +impl OnUnavailableImage for F +where + F: Fn(&Image) + Clone, +{ + fn call(&self, image: &Image) { + self(image) + } +} /** * Function called by Aeron to deliver notification that the media driver has added a Publication successfully. @@ -76,7 +100,18 @@ pub type OnUnavailableImage = fn(image: &Image); * @param session_id of the Publication * @param correlation_id used by the Publication for adding. Aka the registration_id returned by Aeron::add_publication */ -pub type OnNewPublication = fn(channel: CString, stream_id: i32, session_id: i32, correlation_id: i64); +pub trait OnNewPublication: DynClone { + fn call(&self, channel: CString, stream_id: i32, session_id: i32, correlation_id: i64); +} + +impl OnNewPublication for F +where + F: Fn(CString, i32, i32, i64) + Clone, +{ + fn call(&self, channel: CString, stream_id: i32, session_id: i32, correlation_id: i64) { + self(channel, stream_id, session_id, correlation_id) + } +} /** * Function called by Aeron to deliver notification that the media driver has added a Subscription successfully. @@ -88,7 +123,18 @@ pub type OnNewPublication = fn(channel: CString, stream_id: i32, session_id: i32 * @param stream_id within the channel of the Subscription * @param correlation_id used by the Subscription for adding. Aka the registration_id returned by Aeron::add_subscription */ -pub type OnNewSubscription = fn(channel: CString, stream_id: i32, correlation_id: i64); +pub trait OnNewSubscription: DynClone { + fn call(&self, channel: CString, stream_id: i32, correlation_id: i64); +} + +impl OnNewSubscription for F +where + F: Fn(CString, i32, i64) + Clone, +{ + fn call(&self, channel: CString, stream_id: i32, correlation_id: i64) { + self(channel, stream_id, correlation_id) + } +} /** * Function called by Aeron to deliver notification of a Counter being available. @@ -100,8 +146,18 @@ pub type OnNewSubscription = fn(channel: CString, stream_id: i32, correlation_id * @param registration_id for the counter. * @param counter_id that is available. */ +pub trait OnAvailableCounter: DynClone { + fn call(&self, counters_reader: &CountersReader, registration_id: i64, counter_id: i32); +} -pub type OnAvailableCounter = fn(counters_reader: &CountersReader, registration_id: i64, counter_id: i32); +impl OnAvailableCounter for F +where + F: Fn(&CountersReader, i64, i32) + Clone, +{ + fn call(&self, counters_reader: &CountersReader, registration_id: i64, counter_id: i32) { + self(counters_reader, registration_id, counter_id) + } +} /** * Function called by Aeron to deliver notification of counter being removed. @@ -113,13 +169,35 @@ pub type OnAvailableCounter = fn(counters_reader: &CountersReader, registration_ * @param registration_id for the counter. * @param counter_id that is unavailable. */ -pub type OnUnavailableCounter = fn(counters_reader: &CountersReader, registration_id: i64, counter_id: i32); +pub trait OnUnavailableCounter: DynClone { + fn call(&self, counters_reader: &CountersReader, registration_id: i64, counter_id: i32); +} + +impl OnUnavailableCounter for F +where + F: Fn(&CountersReader, i64, i32) + Clone, +{ + fn call(&self, counters_reader: &CountersReader, registration_id: i64, counter_id: i32) { + self(counters_reader, registration_id, counter_id) + } +} /** * Function called when the Aeron client is closed to notify that the client or any of it associated resources * should not be used after this event. */ -pub type OnCloseClient = fn(); +pub trait OnCloseClient: DynClone { + fn call(&self); +} + +impl OnCloseClient for F +where + F: Fn() + Clone, +{ + fn call(&self) { + self() + } +} const DEFAULT_MEDIA_DRIVER_TIMEOUT_MS: Moment = 10000; const DEFAULT_RESOURCE_LINGER_MS: Moment = 5000; @@ -157,18 +235,17 @@ fn default_on_close_client_handler() {} * It can also set up error handling as well as application callbacks for connection information from the * Media Driver. */ -#[derive(Clone)] pub struct Context { dir_name: String, - error_handler: ErrorHandler, - on_new_publication_handler: OnNewPublication, - on_new_exclusive_publication_handler: OnNewPublication, - on_new_subscription_handler: OnNewSubscription, - on_available_image_handler: OnAvailableImage, - on_unavailable_image_handler: OnUnavailableImage, - on_available_counter_handler: OnAvailableCounter, - on_unavailable_counter_handler: OnUnavailableCounter, - on_close_client_handler: OnCloseClient, + error_handler: Box, + on_new_publication_handler: Box, + on_new_exclusive_publication_handler: Box, + on_new_subscription_handler: Box, + on_available_image_handler: Box, + on_unavailable_image_handler: Box, + on_available_counter_handler: Box, + on_unavailable_counter_handler: Box, + on_close_client_handler: Box, media_driver_timeout: Moment, resource_linger_timeout: Moment, use_conductor_agent_invoker: bool, @@ -177,6 +254,29 @@ pub struct Context { agent_name: String, } +impl Clone for Context { + fn clone(&self) -> Self { + Self { + dir_name: self.dir_name.clone(), + error_handler: dyn_clone::clone_box(&*self.error_handler), + on_new_publication_handler: dyn_clone::clone_box(&*self.on_new_publication_handler), + on_new_exclusive_publication_handler: dyn_clone::clone_box(&*self.on_new_exclusive_publication_handler), + on_new_subscription_handler: dyn_clone::clone_box(&*self.on_new_subscription_handler), + on_available_image_handler: dyn_clone::clone_box(&*self.on_available_image_handler), + on_unavailable_image_handler: dyn_clone::clone_box(&*self.on_unavailable_image_handler), + on_available_counter_handler: dyn_clone::clone_box(&*self.on_available_counter_handler), + on_unavailable_counter_handler: dyn_clone::clone_box(&*self.on_unavailable_counter_handler), + on_close_client_handler: dyn_clone::clone_box(&*self.on_close_client_handler), + media_driver_timeout: self.media_driver_timeout, + resource_linger_timeout: self.resource_linger_timeout, + use_conductor_agent_invoker: self.use_conductor_agent_invoker, + is_on_new_exclusive_publication_handler_set: self.is_on_new_exclusive_publication_handler_set, + pre_touch_mapped_memory: self.pre_touch_mapped_memory, + agent_name: self.agent_name.clone(), + } + } +} + impl Default for Context { fn default() -> Self { Self::new() @@ -187,15 +287,15 @@ impl Context { pub fn new() -> Self { Self { dir_name: Context::default_aeron_path(), - error_handler: default_error_handler, - on_new_publication_handler: default_on_new_publication_handler, - on_new_exclusive_publication_handler: default_on_new_publication_handler, - on_new_subscription_handler: default_on_new_subscription_handler, - on_available_image_handler: default_on_available_image_handler, - on_unavailable_image_handler: default_on_unavailable_image_handler, - on_available_counter_handler: default_on_available_counter_handler, - on_unavailable_counter_handler: default_on_unavailable_counter_handler, - on_close_client_handler: default_on_close_client_handler, + error_handler: Box::new(default_error_handler), + on_new_publication_handler: Box::new(default_on_new_publication_handler), + on_new_exclusive_publication_handler: Box::new(default_on_new_publication_handler), + on_new_subscription_handler: Box::new(default_on_new_subscription_handler), + on_available_image_handler: Box::new(default_on_available_image_handler), + on_unavailable_image_handler: Box::new(default_on_unavailable_image_handler), + on_available_counter_handler: Box::new(default_on_available_counter_handler), + on_unavailable_counter_handler: Box::new(default_on_unavailable_counter_handler), + on_close_client_handler: Box::new(default_on_close_client_handler), media_driver_timeout: DEFAULT_MEDIA_DRIVER_TIMEOUT_MS, resource_linger_timeout: DEFAULT_RESOURCE_LINGER_MS, use_conductor_agent_invoker: false, @@ -207,7 +307,7 @@ impl Context { pub fn conclude(&mut self) -> &Self { if !self.is_on_new_exclusive_publication_handler_set { - self.on_new_exclusive_publication_handler = self.on_new_publication_handler; + self.on_new_exclusive_publication_handler = dyn_clone::clone_box(&*self.on_new_publication_handler); } self @@ -253,13 +353,13 @@ impl Context { * * @see default_error_handler for how the default behavior is handled */ - pub fn set_error_handler(&mut self, handler: ErrorHandler) -> &Self { + pub fn set_error_handler(&mut self, handler: Box) -> &Self { self.error_handler = handler; self } - pub fn error_handler(&self) -> ErrorHandler { - self.error_handler + pub fn error_handler(&self) -> Box { + dyn_clone::clone_box(&*self.error_handler) } /** @@ -268,13 +368,13 @@ impl Context { * @param handler called when add is completed successfully * @return reference to this Context instance */ - pub fn set_new_publication_handler(&mut self, handler: OnNewPublication) -> &Self { + pub fn set_new_publication_handler(&mut self, handler: Box) -> &Self { self.on_new_publication_handler = handler; self } - pub fn new_publication_handler(&self) -> OnNewPublication { - self.on_new_publication_handler + pub fn new_publication_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_new_publication_handler) } /** @@ -285,14 +385,14 @@ impl Context { * @param handler called when add is completed successfully * @return reference to this Context instance */ - pub fn set_new_exclusive_publication_handler(&mut self, handler: OnNewPublication) -> &Self { + pub fn set_new_exclusive_publication_handler(&mut self, handler: Box) -> &Self { self.on_new_exclusive_publication_handler = handler; self.is_on_new_exclusive_publication_handler_set = true; self } - pub fn new_exclusive_publication_handler(&self) -> OnNewPublication { - self.on_new_exclusive_publication_handler + pub fn new_exclusive_publication_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_new_exclusive_publication_handler) } /** @@ -301,13 +401,13 @@ impl Context { * @param handler called when add is completed successfully * @return reference to this Context instance */ - pub fn set_new_subscription_handler(&mut self, handler: OnNewSubscription) -> &Self { + pub fn set_new_subscription_handler(&mut self, handler: Box) -> &Self { self.on_new_subscription_handler = handler; self } - pub fn new_subscription_handler(&self) -> OnNewSubscription { - self.on_new_subscription_handler + pub fn new_subscription_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_new_subscription_handler) } /** @@ -316,13 +416,13 @@ impl Context { * @param handler called when event occurs * @return reference to this Context instance */ - pub fn set_available_image_handler(&mut self, handler: OnAvailableImage) -> &Self { + pub fn set_available_image_handler(&mut self, handler: Box) -> &Self { self.on_available_image_handler = handler; self } - pub fn available_image_handler(&self) -> OnAvailableImage { - self.on_available_image_handler + pub fn available_image_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_available_image_handler) } /** @@ -331,13 +431,13 @@ impl Context { * @param handler called when event occurs * @return reference to this Context instance */ - pub fn set_unavailable_image_handler(&mut self, handler: OnUnavailableImage) -> &Self { + pub fn set_unavailable_image_handler(&mut self, handler: Box) -> &Self { self.on_unavailable_image_handler = handler; self } - pub fn unavailable_image_handler(&self) -> OnUnavailableImage { - self.on_unavailable_image_handler + pub fn unavailable_image_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_unavailable_image_handler) } /** @@ -346,13 +446,13 @@ impl Context { * @param handler called when event occurs * @return reference to this Context instance */ - pub fn set_available_counter_handler(&mut self, handler: OnAvailableCounter) -> &Self { + pub fn set_available_counter_handler(&mut self, handler: Box) -> &Self { self.on_available_counter_handler = handler; self } - pub fn available_counter_handler(&self) -> OnAvailableCounter { - self.on_available_counter_handler + pub fn available_counter_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_available_counter_handler) } /** @@ -361,13 +461,13 @@ impl Context { * @param handler called when event occurs * @return reference to this Context instance */ - pub fn set_unavailable_counter_handler(&mut self, handler: OnUnavailableCounter) -> &Self { + pub fn set_unavailable_counter_handler(&mut self, handler: Box) -> &Self { self.on_unavailable_counter_handler = handler; self } - pub fn unavailable_counter_handler(&self) -> OnUnavailableCounter { - self.on_unavailable_counter_handler + pub fn unavailable_counter_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_unavailable_counter_handler) } /** @@ -376,13 +476,13 @@ impl Context { * @param handler to be called when the Aeron client is closed. * @return reference to this Context instance. */ - pub fn set_close_client_handler(&mut self, handler: OnCloseClient) -> &Self { + pub fn set_close_client_handler(&mut self, handler: Box) -> &Self { self.on_close_client_handler = handler; self } - pub fn close_client_handler(&self) -> OnCloseClient { - self.on_close_client_handler + pub fn close_client_handler(&self) -> Box { + dyn_clone::clone_box(&*self.on_close_client_handler) } /** diff --git a/src/exclusive_publication.rs b/src/exclusive_publication.rs index 04ffb27..f71793c 100644 --- a/src/exclusive_publication.rs +++ b/src/exclusive_publication.rs @@ -784,13 +784,13 @@ mod tests { local_copy_broadcast_receiver, counters_metadata_buffer, counters_values_buffer, - on_new_publication_handler, - on_new_exclusive_publication_handler, - on_new_subscription_handler, - error_handler, - on_available_counter_handler, - on_unavailable_counter_handler, - on_close_client_handler, + Box::new(on_new_publication_handler), + Box::new(on_new_exclusive_publication_handler), + Box::new(on_new_subscription_handler), + Box::new(error_handler), + Box::new(on_available_counter_handler), + Box::new(on_unavailable_counter_handler), + Box::new(on_close_client_handler), DRIVER_TIMEOUT_MS, RESOURCE_LINGER_TIMEOUT_MS, INTER_SERVICE_TIMEOUT_MS, diff --git a/src/image.rs b/src/image.rs index 092379e..c944b3c 100644 --- a/src/image.rs +++ b/src/image.rs @@ -77,12 +77,10 @@ pub enum ControlledPollAction { * @param header representing the meta data for the data. * @return The action to be taken with regard to the stream position after the callback. */ - -#[derive(Clone)] pub struct Image { source_identity: CString, log_buffers: Arc, - exception_handler: ErrorHandler, + exception_handler: Box, term_buffers: Vec, subscriber_position: UnsafeBufferPosition, header: Header, @@ -97,6 +95,28 @@ pub struct Image { correlation_id: i64, } +impl Clone for Image { + fn clone(&self) -> Self { + Self { + source_identity: self.source_identity.clone(), + log_buffers: self.log_buffers.clone(), + exception_handler: dyn_clone::clone_box(&*self.exception_handler), + term_buffers: self.term_buffers.clone(), + subscriber_position: self.subscriber_position.clone(), + header: self.header.clone(), + is_closed: self.is_closed.clone(), + is_eos: self.is_eos, + term_length_mask: self.term_length_mask, + position_bits_to_shift: self.position_bits_to_shift, + session_id: self.session_id, + join_position: self.join_position, + final_position: self.final_position, + subscription_registration_id: self.subscription_registration_id, + correlation_id: self.correlation_id, + } + } +} + unsafe impl Send for Image {} unsafe impl Sync for Image {} @@ -111,7 +131,7 @@ impl Image { source_identity: CString, subscriber_position: &UnsafeBufferPosition, log_buffers: Arc, - exception_handler: ErrorHandler, + exception_handler: Box, ) -> Image { let header = Header::new( log_buffer_descriptor::initial_term_id( @@ -328,14 +348,8 @@ impl Image { assert!((0..log_buffer_descriptor::PARTITION_COUNT).contains(&index)); let term_buffer = self.term_buffers[index as usize]; - let read_outcome: ReadOutcome = term_reader::read( - term_buffer, - term_offset, - fragment_handler, - fragment_limit, - &mut self.header, - self.exception_handler, - ); + let read_outcome: ReadOutcome = + term_reader::read(term_buffer, term_offset, fragment_handler, fragment_limit, &mut self.header); if read_outcome.fragments_read > 0 { ttrace!("Image {} poll returned: {:?}", self.correlation_id, read_outcome); @@ -978,7 +992,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image.initial_term_id(), INITIAL_TERM_ID); @@ -1008,7 +1022,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image.term_buffer_length(), TERM_LENGTH); @@ -1042,7 +1056,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1084,7 +1098,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1126,7 +1140,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1167,7 +1181,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); image.close(); @@ -1198,7 +1212,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); image.close(); @@ -1230,7 +1244,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1270,7 +1284,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1310,7 +1324,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1344,7 +1358,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1378,7 +1392,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1417,7 +1431,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1452,7 +1466,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1493,7 +1507,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1531,7 +1545,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1573,7 +1587,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1615,7 +1629,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1658,7 +1672,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1698,7 +1712,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1738,7 +1752,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1773,7 +1787,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); @@ -1808,7 +1822,7 @@ mod tests { CString::new(SOURCE_IDENTITY).unwrap(), &image_test.subscriber_position, image_test.log_buffers.clone(), - error_handler, + Box::new(error_handler), ); assert_eq!(image_test.subscriber_position.get(), initial_position); diff --git a/src/publication.rs b/src/publication.rs index 1732a97..6d229c1 100644 --- a/src/publication.rs +++ b/src/publication.rs @@ -882,13 +882,13 @@ mod tests { local_copy_broadcast_receiver, counters_metadata_buffer, counters_values_buffer, - on_new_publication_handler, - on_new_exclusive_publication_handler, - on_new_subscription_handler, - error_handler, - on_available_counter_handler, - on_unavailable_counter_handler, - on_close_client_handler, + Box::new(on_new_publication_handler), + Box::new(on_new_exclusive_publication_handler), + Box::new(on_new_subscription_handler), + Box::new(error_handler), + Box::new(on_available_counter_handler), + Box::new(on_unavailable_counter_handler), + Box::new(on_close_client_handler), DRIVER_TIMEOUT_MS, RESOURCE_LINGER_TIMEOUT_MS, INTER_SERVICE_TIMEOUT_MS, diff --git a/tests/counters.rs b/tests/counters.rs index e283d75..7384156 100644 --- a/tests/counters.rs +++ b/tests/counters.rs @@ -94,12 +94,12 @@ fn test_counter_create() { let mut context_b = Context::new(); context_a.set_agent_name("Client A"); - context_a.set_available_counter_handler(counter_handler_a); - context_a.set_unavailable_counter_handler(gone_counter_handler_a); + context_a.set_available_counter_handler(Box::new(counter_handler_a)); + context_a.set_unavailable_counter_handler(Box::new(gone_counter_handler_a)); context_b.set_agent_name("Client B"); - context_b.set_available_counter_handler(counter_handler_b); - context_b.set_unavailable_counter_handler(gone_counter_handler_b); + context_b.set_available_counter_handler(Box::new(counter_handler_b)); + context_b.set_unavailable_counter_handler(Box::new(gone_counter_handler_b)); let mut aeron_a = Aeron::new(context_a).expect("Error creating Aeron A instance"); let aeron_b = Aeron::new(context_b).expect("Error creating Aeron B instance"); diff --git a/tests/publish_subscribe.rs b/tests/publish_subscribe.rs index 50eedb7..f290b34 100644 --- a/tests/publish_subscribe.rs +++ b/tests/publish_subscribe.rs @@ -66,8 +66,8 @@ fn test_publication_create() { let mut context = Context::new(); - context.set_new_publication_handler(on_new_publication_handler); - context.set_error_handler(error_handler); + context.set_new_publication_handler(Box::new(on_new_publication_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let mut aeron = Aeron::new(context).expect("Error creating Aeron instance"); @@ -100,8 +100,8 @@ fn test_subscription_create() { let mut context = Context::new(); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(on_new_subscription_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let mut aeron = Aeron::new(context).expect("Error creating Aeron instance"); @@ -149,8 +149,8 @@ fn test_unfragmented_msg() { let mut context = Context::new(); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(on_new_subscription_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let mut aeron = Aeron::new(context).expect("Error creating Aeron instance"); @@ -230,8 +230,8 @@ fn test_fragmented_msg() { let mut context = Context::new(); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(on_new_subscription_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let mut aeron = Aeron::new(context).expect("Error creating Aeron instance"); @@ -344,8 +344,8 @@ fn test_sequential_consistency() { let mut context = Context::new(); - context.set_new_subscription_handler(on_new_subscription_handler); - context.set_error_handler(error_handler); + context.set_new_subscription_handler(Box::new(on_new_subscription_handler)); + context.set_error_handler(Box::new(error_handler)); context.set_pre_touch_mapped_memory(true); let mut aeron = Aeron::new(context).expect("Error creating Aeron instance");