Skip to content

Commit 7f551be

Browse files
Provide subscription handler interface (adnanademovic#186)
1 parent dbb156e commit 7f551be

File tree

9 files changed

+115
-50
lines changed

9 files changed

+115
-50
lines changed

‎rosrust/src/api/handlers.rs‎

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use crate::Message;
2+
use std::collections::HashMap;
3+
use std::marker::PhantomData;
4+
5+
/// Handles all calls involved with a subscription
6+
///
7+
/// Each subscription is done in one thread, so there is no synchronization necessary between
8+
/// these calls.
9+
pub trait SubscriptionHandler<T>: Send + 'static {
10+
/// Called before any message is accepted from a certain caller ID
11+
///
12+
/// Contains the headers for handling the specific connection
13+
fn connection(&mut self, headers: HashMap<String, String>);
14+
15+
/// Called upon receiving any message
16+
fn message(&mut self, message: T, callerid: &str);
17+
}
18+
19+
pub struct CallbackSubscriptionHandler<T, F, G> {
20+
on_message: F,
21+
on_connect: G,
22+
_phantom: PhantomData<T>,
23+
}
24+
25+
impl<T, F, G> CallbackSubscriptionHandler<T, F, G>
26+
where
27+
T: Message,
28+
F: Fn(T, &str) + Send + 'static,
29+
G: Fn(HashMap<String, String>) + Send + 'static,
30+
{
31+
pub fn new(on_message: F, on_connect: G) -> Self {
32+
Self {
33+
on_message,
34+
on_connect,
35+
_phantom: PhantomData,
36+
}
37+
}
38+
}
39+
40+
impl<T, F, G> SubscriptionHandler<T> for CallbackSubscriptionHandler<T, F, G>
41+
where
42+
T: Message,
43+
F: Fn(T, &str) + Send + 'static,
44+
G: Fn(HashMap<String, String>) + Send + 'static,
45+
{
46+
fn connection(&mut self, headers: HashMap<String, String>) {
47+
(self.on_connect)(headers)
48+
}
49+
50+
fn message(&mut self, message: T, callerid: &str) {
51+
(self.on_message)(message, callerid)
52+
}
53+
}

‎rosrust/src/api/mod.rs‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod raii;
1111
pub mod resolve;
1212
mod ros;
1313
mod slave;
14+
pub mod handlers;
1415

1516
pub struct ShutdownManager {
1617
should_shutdown: AtomicBool,

‎rosrust/src/api/raii.rs‎

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@ use super::master::Master;
44
use super::slave::Slave;
55
use crate::rosxmlrpc::Response;
66
use crate::tcpros::{Message, PublisherStream, ServicePair, ServiceResult};
7-
use crate::RawMessageDescription;
7+
use crate::{RawMessageDescription, SubscriptionHandler};
88
use log::error;
9-
use std::collections::HashMap;
109
use std::sync::atomic::AtomicUsize;
1110
use std::sync::Arc;
1211

@@ -101,20 +100,18 @@ pub struct Subscriber {
101100
}
102101

103102
impl Subscriber {
104-
pub(crate) fn new<T, F, G>(
103+
pub(crate) fn new<T, H>(
105104
master: Arc<Master>,
106105
slave: Arc<Slave>,
107106
name: &str,
108107
queue_size: usize,
109-
on_message: F,
110-
on_connect: G,
108+
handler: H,
111109
) -> Result<Self>
112110
where
113111
T: Message,
114-
F: Fn(T, &str) + Send + 'static,
115-
G: Fn(HashMap<String, String>) + Send + 'static,
112+
H: SubscriptionHandler<T>,
116113
{
117-
let id = slave.add_subscription::<T, F, G>(name, queue_size, on_message, on_connect)?;
114+
let id = slave.add_subscription::<T, H>(name, queue_size, handler)?;
118115

119116
let info = Arc::new(InteractorRaii::new(SubscriberInfo {
120117
master,

‎rosrust/src/api/ros.rs‎

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ use super::raii::{Publisher, Service, Subscriber};
77
use super::resolve;
88
use super::slave::Slave;
99
use crate::api::clock::Delay;
10+
use crate::api::handlers::CallbackSubscriptionHandler;
1011
use crate::api::ShutdownManager;
1112
use crate::msg::rosgraph_msgs::{Clock as ClockMsg, Log};
1213
use crate::msg::std_msgs::Header;
1314
use crate::tcpros::{Client, Message, ServicePair, ServiceResult};
14-
use crate::{RawMessage, RawMessageDescription};
15+
use crate::{RawMessage, RawMessageDescription, SubscriptionHandler};
1516
use error_chain::bail;
1617
use log::error;
1718
use ros_message::{Duration, Time};
@@ -293,13 +294,35 @@ impl Ros {
293294
queue_size = usize::max_value();
294295
}
295296
let name = self.resolver.translate(topic)?;
296-
Subscriber::new::<T, F, G>(
297+
Subscriber::new::<T, _>(
297298
Arc::clone(&self.master),
298299
Arc::clone(&self.slave),
299300
&name,
300301
queue_size,
301-
on_message,
302-
on_connect,
302+
CallbackSubscriptionHandler::new(on_message, on_connect),
303+
)
304+
}
305+
306+
pub fn subscribe_with<T, H>(
307+
&self,
308+
topic: &str,
309+
mut queue_size: usize,
310+
handler: H,
311+
) -> Result<Subscriber>
312+
where
313+
T: Message,
314+
H: SubscriptionHandler<T>,
315+
{
316+
if queue_size == 0 {
317+
queue_size = usize::max_value();
318+
}
319+
let name = self.resolver.translate(topic)?;
320+
Subscriber::new::<T, H>(
321+
Arc::clone(&self.master),
322+
Arc::clone(&self.slave),
323+
&name,
324+
queue_size,
325+
handler,
303326
)
304327
}
305328

‎rosrust/src/api/slave/mod.rs‎

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use super::error::{self, ErrorKind, Result};
77
use crate::api::ShutdownManager;
88
use crate::tcpros::{Message, PublisherStream, Service, ServicePair, ServiceResult};
99
use crate::util::{kill, FAILED_TO_LOCK};
10-
use crate::RawMessageDescription;
10+
use crate::{RawMessageDescription, SubscriptionHandler};
1111
use crossbeam::channel::TryRecvError;
1212
use error_chain::bail;
1313
use log::error;
@@ -148,20 +148,18 @@ impl Slave {
148148
}
149149

150150
#[inline]
151-
pub fn add_subscription<T, F, G>(
151+
pub fn add_subscription<T, H>(
152152
&self,
153153
topic: &str,
154154
queue_size: usize,
155-
on_message: F,
156-
on_connect: G,
155+
handler: H,
157156
) -> Result<usize>
158157
where
159158
T: Message,
160-
F: Fn(T, &str) + Send + 'static,
161-
G: Fn(HashMap<String, String>) + Send + 'static,
159+
H: SubscriptionHandler<T>,
162160
{
163161
self.subscriptions
164-
.add(&self.name, topic, queue_size, on_message, on_connect)
162+
.add(&self.name, topic, queue_size, handler)
165163
}
166164

167165
#[inline]

‎rosrust/src/api/slave/subscriptions.rs‎

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::api::error::{self, ErrorKind, Result};
22
use crate::tcpros::{SubscriberRosConnection, Topic};
33
use crate::util::FAILED_TO_LOCK;
4-
use crate::Message;
4+
use crate::{Message, SubscriptionHandler};
55
use error_chain::bail;
66
use log::error;
77
use std::collections::{BTreeSet, HashMap};
@@ -51,18 +51,10 @@ impl SubscriptionsTracker {
5151
.collect()
5252
}
5353

54-
pub fn add<T, F, G>(
55-
&self,
56-
name: &str,
57-
topic: &str,
58-
queue_size: usize,
59-
on_message: F,
60-
on_connect: G,
61-
) -> Result<usize>
54+
pub fn add<T, H>(&self, name: &str, topic: &str, queue_size: usize, handler: H) -> Result<usize>
6255
where
6356
T: Message,
64-
F: Fn(T, &str) + Send + 'static,
65-
G: Fn(HashMap<String, String>) + Send + 'static,
57+
H: SubscriptionHandler<T>,
6658
{
6759
let msg_definition = T::msg_definition();
6860
let msg_type = T::msg_type();
@@ -92,7 +84,7 @@ impl SubscriptionsTracker {
9284
)
9385
.into())
9486
} else {
95-
Ok(connection.add_subscriber(queue_size, on_message, on_connect))
87+
Ok(connection.add_subscriber(queue_size, handler))
9688
}
9789
}
9890

‎rosrust/src/lib.rs‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#![recursion_limit = "1024"]
22

33
pub use crate::api::raii::{Publisher, Service, Subscriber};
4+
pub use crate::api::handlers::{SubscriptionHandler};
45
pub use crate::api::{error, Clock, Parameter};
56
pub use crate::raw_message::{RawMessage, RawMessageDescription};
67
#[doc(hidden)]

‎rosrust/src/singleton.rs‎

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::error::{ErrorKind, Result};
55
use crate::rosxmlrpc::Response;
66
use crate::tcpros::{Client, Message, ServicePair, ServiceResult};
77
use crate::util::FAILED_TO_LOCK;
8-
use crate::RawMessageDescription;
8+
use crate::{RawMessageDescription, SubscriptionHandler};
99
use crossbeam::sync::ShardedLock;
1010
use ctrlc;
1111
use error_chain::bail;
@@ -198,6 +198,15 @@ where
198198
ros!().subscribe_with_ids_and_headers::<T, F, G>(topic, queue_size, on_message, on_connect)
199199
}
200200

201+
#[inline]
202+
pub fn subscribe_with<T, H>(topic: &str, queue_size: usize, handler: H) -> Result<Subscriber>
203+
where
204+
T: Message,
205+
H: SubscriptionHandler<T>,
206+
{
207+
ros!().subscribe_with::<T, H>(topic, queue_size, handler)
208+
}
209+
201210
#[inline]
202211
pub fn publish<T>(topic: &str, queue_size: usize) -> Result<Publisher<T>>
203212
where

‎rosrust/src/tcpros/subscriber.rs‎

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use super::header::{decode, encode, match_field};
33
use super::{Message, Topic};
44
use crate::rosmsg::RosMsg;
55
use crate::util::lossy_channel::{lossy_channel, LossyReceiver, LossySender};
6+
use crate::SubscriptionHandler;
67
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
78
use crossbeam::channel::{bounded, select, Receiver, Sender, TrySendError};
89
use log::error;
@@ -77,16 +78,10 @@ impl SubscriberRosConnection {
7778
// This creates a new thread to call on_message. Next API change should
7879
// allow subscribing with either callback or inline handler of the queue.
7980
// The queue is lossy, so it wouldn't be blocking.
80-
pub fn add_subscriber<T, F, G>(
81-
&mut self,
82-
queue_size: usize,
83-
on_message: F,
84-
on_connect: G,
85-
) -> usize
81+
pub fn add_subscriber<T, H>(&mut self, queue_size: usize, handler: H) -> usize
8682
where
8783
T: Message,
88-
F: Fn(T, &str) + Send + 'static,
89-
G: Fn(HashMap<String, String>) + Send + 'static,
84+
H: SubscriptionHandler<T>,
9085
{
9186
let data_stream_id = self.next_data_stream_id;
9287
self.connected_ids.insert(data_stream_id);
@@ -105,9 +100,7 @@ impl SubscriberRosConnection {
105100
// TODO: we might want to panic here
106101
error!("Subscriber failed to connect to data stream");
107102
}
108-
thread::spawn(move || {
109-
handle_data::<T, F, G>(data_rx, connection_rx, on_message, on_connect)
110-
});
103+
thread::spawn(move || handle_data::<T, H>(data_rx, connection_rx, handler));
111104
data_stream_id
112105
}
113106

@@ -176,29 +169,27 @@ impl SubscriberRosConnection {
176169
}
177170
}
178171

179-
fn handle_data<T, F, G>(
172+
fn handle_data<T, H>(
180173
data: LossyReceiver<MessageInfo>,
181174
connections: Receiver<HashMap<String, String>>,
182-
on_message: F,
183-
on_connect: G,
175+
mut handler: H,
184176
) where
185177
T: Message,
186-
F: Fn(T, &str),
187-
G: Fn(HashMap<String, String>) + Send + 'static,
178+
H: SubscriptionHandler<T>,
188179
{
189180
loop {
190181
select! {
191182
recv(data.kill_rx.kill_rx) -> _ => break,
192183
recv(data.data_rx) -> msg => match msg {
193184
Err(_) => break,
194185
Ok(buffer) => match RosMsg::decode_slice(&buffer.data) {
195-
Ok(value) => on_message(value, &buffer.caller_id),
186+
Ok(value) => handler.message(value, &buffer.caller_id),
196187
Err(err) => error!("Failed to decode message: {}", err),
197188
},
198189
},
199190
recv(connections) -> msg => match msg {
200191
Err(_) => break,
201-
Ok(conn) => on_connect(conn),
192+
Ok(conn) => handler.connection(conn),
202193
},
203194
}
204195
}

0 commit comments

Comments
 (0)