1414
1515
1616import math
17+ from typing import Optional
1718
1819from geometry_msgs .msg import (PoseWithCovarianceStamped , Quaternion , TransformStamped , Twist ,
1920 TwistStamped , Vector3 )
2021from nav2_simple_commander .line_iterator import LineIterator
2122from nav_msgs .msg import Odometry
2223from nav_msgs .srv import GetMap
24+ import numpy as np
2325import rclpy
2426from rclpy .duration import Duration
2527from rclpy .node import Node
2628from rclpy .qos import DurabilityPolicy , QoSProfile , ReliabilityPolicy
29+ from rclpy .timer import Timer
2730from rosgraph_msgs .msg import Clock
2831from sensor_msgs .msg import LaserScan
2932from tf2_ros import Buffer , TransformBroadcaster , TransformListener
4447
4548class LoopbackSimulator (Node ):
4649
47- def __init__ (self ):
50+ def __init__ (self ) -> None :
4851 super ().__init__ (node_name = 'loopback_simulator' )
4952 self .curr_cmd_vel = None
5053 self .curr_cmd_vel_time = self .get_clock ().now ()
51- self .initial_pose = None
52- self .timer = None
54+ self .initial_pose : PoseWithCovarianceStamped = None
55+ self .timer : Optional [ Timer ] = None
5356 self .setupTimer = None
5457 self .map = None
55- self .mat_base_to_laser = None
58+ self .mat_base_to_laser : Optional [ np . ndarray [ np . float64 , np . dtype [ np . float64 ]]] = None
5659
5760 self .declare_parameter ('update_duration' , 0.01 )
5861 self .update_dur = self .get_parameter ('update_duration' ).get_parameter_value ().double_value
@@ -126,44 +129,44 @@ def __init__(self):
126129
127130 self .info ('Loopback simulator initialized' )
128131
129- def getBaseToLaserTf (self ):
132+ def getBaseToLaserTf (self ) -> None :
130133 try :
131134 # Wait for transform and lookup
132135 transform = self .tf_buffer .lookup_transform (
133136 self .base_frame_id , self .scan_frame_id , rclpy .time .Time ())
134137 self .mat_base_to_laser = transformStampedToMatrix (transform )
135138
136139 except Exception as ex :
137- self .get_logger ().error ('Transform lookup failed: %s' % str (ex ))
140+ self .get_logger ().error (f 'Transform lookup failed: { str (ex )} ' )
138141
139- def setupTimerCallback (self ):
142+ def setupTimerCallback (self ) -> None :
140143 # Publish initial identity odom transform & laser scan to warm up system
141144 self .tf_broadcaster .sendTransform (self .t_odom_to_base_link )
142145 if self .mat_base_to_laser is None :
143146 self .getBaseToLaserTf ()
144147
145- def clockTimerCallback (self ):
148+ def clockTimerCallback (self ) -> None :
146149 msg = Clock ()
147150 msg .clock = self .get_clock ().now ().to_msg ()
148151 self .clock_pub .publish (msg )
149152
150- def cmdVelCallback (self , msg ) :
153+ def cmdVelCallback (self , msg : Twist ) -> None :
151154 self .debug ('Received cmd_vel' )
152155 if self .initial_pose is None :
153156 # Don't consider velocities before the initial pose is set
154157 return
155158 self .curr_cmd_vel = msg
156159 self .curr_cmd_vel_time = self .get_clock ().now ()
157160
158- def cmdVelStampedCallback (self , msg ) :
161+ def cmdVelStampedCallback (self , msg : TwistStamped ) -> None :
159162 self .debug ('Received cmd_vel' )
160163 if self .initial_pose is None :
161164 # Don't consider velocities before the initial pose is set
162165 return
163166 self .curr_cmd_vel = msg .twist
164167 self .curr_cmd_vel_time = rclpy .time .Time .from_msg (msg .header .stamp )
165168
166- def initialPoseCallback (self , msg ) :
169+ def initialPoseCallback (self , msg : PoseWithCovarianceStamped ) -> None :
167170 self .info ('Received initial pose!' )
168171 if self .initial_pose is None :
169172 # Initialize transforms (map->odom as input pose, odom->base_link start from identity)
@@ -202,7 +205,7 @@ def initialPoseCallback(self, msg):
202205 tf_transformations .concatenate_matrices (mat_map_to_base_link , mat_base_link_to_odom )
203206 self .t_map_to_odom .transform = matrixToTransform (mat_map_to_odom )
204207
205- def timerCallback (self ):
208+ def timerCallback (self ) -> None :
206209 # If no data, just republish existing transforms without change
207210 one_sec = Duration (seconds = 1 )
208211 if self .curr_cmd_vel is None or self .get_clock ().now () - self .curr_cmd_vel_time > one_sec :
@@ -227,7 +230,7 @@ def timerCallback(self):
227230 self .publishTransforms (self .t_map_to_odom , self .t_odom_to_base_link )
228231 self .publishOdometry (self .t_odom_to_base_link )
229232
230- def publishLaserScan (self ):
233+ def publishLaserScan (self ) -> None :
231234 # Publish a bogus laser scan for collision monitor
232235 self .scan_msg = LaserScan ()
233236 self .scan_msg .header .stamp = (self .get_clock ().now ()).to_msg ()
@@ -247,15 +250,16 @@ def publishLaserScan(self):
247250 self .getLaserScan (num_samples )
248251 self .scan_pub .publish (self .scan_msg )
249252
250- def publishTransforms (self , map_to_odom , odom_to_base_link ):
253+ def publishTransforms (self , map_to_odom : TransformStamped ,
254+ odom_to_base_link : TransformStamped ) -> None :
251255 map_to_odom .header .stamp = \
252256 (self .get_clock ().now () + Duration (seconds = self .update_dur )).to_msg ()
253257 odom_to_base_link .header .stamp = self .get_clock ().now ().to_msg ()
254258 if self .publish_map_odom_tf :
255259 self .tf_broadcaster .sendTransform (map_to_odom )
256260 self .tf_broadcaster .sendTransform (odom_to_base_link )
257261
258- def publishOdometry (self , odom_to_base_link ) :
262+ def publishOdometry (self , odom_to_base_link : TransformStamped ) -> None :
259263 odom = Odometry ()
260264 odom .header .stamp = self .get_clock ().now ().to_msg ()
261265 odom .header .frame_id = 'odom'
@@ -266,22 +270,23 @@ def publishOdometry(self, odom_to_base_link):
266270 odom .twist .twist = self .curr_cmd_vel
267271 self .odom_pub .publish (odom )
268272
269- def info (self , msg ) :
273+ def info (self , msg : str ) -> None :
270274 self .get_logger ().info (msg )
271275 return
272276
273- def debug (self , msg ) :
277+ def debug (self , msg : str ) -> None :
274278 self .get_logger ().debug (msg )
275279 return
276280
277- def getMap (self ):
281+ def getMap (self ) -> None :
278282 request = GetMap .Request ()
279283 if self .map_client .wait_for_service (timeout_sec = 5.0 ):
280284 # Request to get map
281285 future = self .map_client .call_async (request )
282286 rclpy .spin_until_future_complete (self , future )
283- if future .result () is not None :
284- self .map = future .result ().map
287+ result = future .result ()
288+ if result is not None :
289+ self .map = result .map
285290 self .get_logger ().info ('Laser scan will be populated using map data' )
286291 else :
287292 self .get_logger ().warn (
@@ -294,7 +299,7 @@ def getMap(self):
294299 'Laser scan will be populated using max range'
295300 )
296301
297- def getLaserPose (self ):
302+ def getLaserPose (self ) -> tuple [ float , float , float ] :
298303 mat_map_to_odom = transformStampedToMatrix (self .t_map_to_odom )
299304 mat_odom_to_base = transformStampedToMatrix (self .t_odom_to_base_link )
300305
@@ -316,7 +321,7 @@ def getLaserPose(self):
316321
317322 return x , y , theta
318323
319- def getLaserScan (self , num_samples ) :
324+ def getLaserScan (self , num_samples : int ) -> None :
320325 if self .map is None or self .initial_pose is None or self .mat_base_to_laser is None :
321326 self .scan_msg .ranges = [self .scan_msg .range_max - 0.1 ] * num_samples
322327 return
@@ -358,7 +363,7 @@ def getLaserScan(self, num_samples):
358363 line_iterator .advance ()
359364
360365
361- def main ():
366+ def main () -> None :
362367 rclpy .init ()
363368 loopback_simulator = LoopbackSimulator ()
364369 rclpy .spin (loopback_simulator )
0 commit comments