1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ from collections .abc import Generator
1516import tempfile
16- from typing import Dict , List , Optional , Text
17+ from typing import Optional , TypeAlias , Union
1718
1819import launch
1920import yaml
2021
22+ YamlValue : TypeAlias = Union [str , int , float , bool ]
23+
2124
2225class DictItemReference :
2326
24- def __init__ (self , dictionary , key ):
27+ def __init__ (self , dictionary : dict [ str , YamlValue ], key : str ):
2528 self .dictionary = dictionary
2629 self .dictKey = key
2730
28- def key (self ):
31+ def key (self ) -> str :
2932 return self .dictKey
3033
31- def setValue (self , value ) :
34+ def setValue (self , value : YamlValue ) -> None :
3235 self .dictionary [self .dictKey ] = value
3336
3437
35- class RewrittenYaml (launch .Substitution ):
38+ class RewrittenYaml (launch .Substitution ): # type: ignore[misc]
3639 """
3740 Substitution that modifies the given YAML file.
3841
@@ -42,10 +45,10 @@ class RewrittenYaml(launch.Substitution):
4245 def __init__ (
4346 self ,
4447 source_file : launch .SomeSubstitutionsType ,
45- param_rewrites : Dict ,
48+ param_rewrites : dict [ str , launch . SomeSubstitutionsType ] ,
4649 root_key : Optional [launch .SomeSubstitutionsType ] = None ,
47- key_rewrites : Optional [Dict ] = None ,
48- convert_types = False ,
50+ key_rewrites : Optional [dict [ str , launch . SomeSubstitutionsType ] ] = None ,
51+ convert_types : bool = False ,
4952 ) -> None :
5053 super ().__init__ ()
5154 """
@@ -61,7 +64,8 @@ def __init__(
6164 # import here to avoid loop
6265 from launch .utilities import normalize_to_list_of_substitutions
6366
64- self .__source_file = normalize_to_list_of_substitutions (source_file )
67+ self .__source_file : list [launch .Substitution ] = \
68+ normalize_to_list_of_substitutions (source_file )
6569 self .__param_rewrites = {}
6670 self .__key_rewrites = {}
6771 self .__convert_types = convert_types
@@ -79,19 +83,19 @@ def __init__(
7983 self .__root_key = normalize_to_list_of_substitutions (root_key )
8084
8185 @property
82- def name (self ) -> List [launch .Substitution ]:
86+ def name (self ) -> list [launch .Substitution ]:
8387 """Getter for name."""
8488 return self .__source_file
8589
86- def describe (self ) -> Text :
90+ def describe (self ) -> str :
8791 """Return a description of this substitution as a string."""
8892 return ''
8993
90- def perform (self , context : launch .LaunchContext ) -> Text :
94+ def perform (self , context : launch .LaunchContext ) -> str :
9195 yaml_filename = launch .utilities .perform_substitutions (context , self .name )
9296 rewritten_yaml = tempfile .NamedTemporaryFile (mode = 'w' , delete = False )
9397 param_rewrites , keys_rewrites = self .resolve_rewrites (context )
94- data = yaml .safe_load (open (yaml_filename , 'r' ))
98+ data = yaml .safe_load (open (yaml_filename ))
9599 self .substitute_params (data , param_rewrites )
96100 self .add_params (data , param_rewrites )
97101 self .substitute_keys (data , keys_rewrites )
@@ -103,7 +107,8 @@ def perform(self, context: launch.LaunchContext) -> Text:
103107 rewritten_yaml .close ()
104108 return rewritten_yaml .name
105109
106- def resolve_rewrites (self , context ):
110+ def resolve_rewrites (self , context : launch .LaunchContext ) -> \
111+ tuple [dict [str , str ], dict [str , str ]]:
107112 resolved_params = {}
108113 for key in self .__param_rewrites :
109114 resolved_params [key ] = launch .utilities .perform_substitutions (
@@ -116,7 +121,8 @@ def resolve_rewrites(self, context):
116121 )
117122 return resolved_params , resolved_keys
118123
119- def substitute_params (self , yaml , param_rewrites ):
124+ def substitute_params (self , yaml : dict [str , YamlValue ],
125+ param_rewrites : dict [str , str ]) -> None :
120126 # substitute leaf-only parameters
121127 for key in self .getYamlLeafKeys (yaml ):
122128 if key .key () in param_rewrites :
@@ -132,7 +138,8 @@ def substitute_params(self, yaml, param_rewrites):
132138 yaml_keys = path .split ('.' )
133139 yaml = self .updateYamlPathVals (yaml , yaml_keys , rewrite_val )
134140
135- def add_params (self , yaml , param_rewrites ):
141+ def add_params (self , yaml : dict [str , YamlValue ],
142+ param_rewrites : dict [str , str ]) -> None :
136143 # add new total path parameters
137144 yaml_paths = self .pathify (yaml )
138145 for path in param_rewrites :
@@ -142,7 +149,10 @@ def add_params(self, yaml, param_rewrites):
142149 if 'ros__parameters' in yaml_keys :
143150 yaml = self .updateYamlPathVals (yaml , yaml_keys , new_val )
144151
145- def updateYamlPathVals (self , yaml , yaml_key_list , rewrite_val ):
152+ def updateYamlPathVals (
153+ self , yaml : dict [str , YamlValue ],
154+ yaml_key_list : list [str ], rewrite_val : YamlValue ) -> dict [str , YamlValue ]:
155+
146156 for key in yaml_key_list :
147157 if key == yaml_key_list [- 1 ]:
148158 yaml [key ] = rewrite_val
@@ -153,12 +163,15 @@ def updateYamlPathVals(self, yaml, yaml_key_list, rewrite_val):
153163 yaml [int (key )], yaml_key_list , rewrite_val
154164 )
155165 else :
156- yaml [key ] = self .updateYamlPathVals (
157- yaml .get (key , {}), yaml_key_list , rewrite_val
166+ yaml [key ] = self .updateYamlPathVals ( # type: ignore[assignment]
167+ yaml .get (key , {}), # type: ignore[arg-type]
168+ yaml_key_list ,
169+ rewrite_val
158170 )
159171 return yaml
160172
161- def substitute_keys (self , yaml , key_rewrites ):
173+ def substitute_keys (
174+ self , yaml : dict [str , YamlValue ], key_rewrites : dict [str , str ]) -> None :
162175 if len (key_rewrites ) != 0 :
163176 for key in list (yaml .keys ()):
164177 val = yaml [key ]
@@ -169,20 +182,31 @@ def substitute_keys(self, yaml, key_rewrites):
169182 if isinstance (val , dict ):
170183 self .substitute_keys (val , key_rewrites )
171184
172- def getYamlLeafKeys (self , yamlData ):
173- try :
174- for key in yamlData .keys ():
175- for k in self .getYamlLeafKeys (yamlData [key ]):
176- yield k
177- yield DictItemReference (yamlData , key )
178- except AttributeError :
185+ def getYamlLeafKeys (self , yamlData : dict [str , YamlValue ]) -> \
186+ Generator [DictItemReference , None , None ]:
187+ if not isinstance (yamlData , dict ):
179188 return
180189
181- def pathify (self , d , p = None , paths = None , joinchar = '.' ):
190+ for key in yamlData .keys ():
191+ child = yamlData [key ]
192+
193+ if isinstance (child , dict ):
194+ # Recursively process nested dictionaries
195+ yield from self .getYamlLeafKeys (child )
196+
197+ yield DictItemReference (yamlData , key )
198+
199+ def pathify (
200+ self , d : Union [dict [str , YamlValue ], list [YamlValue ], YamlValue ],
201+ p : Optional [str ] = None ,
202+ paths : Optional [dict [str , YamlValue ]] = None ,
203+ joinchar : str = '.' ) -> dict [str , YamlValue ]:
182204 if p is None :
183205 paths = {}
184206 self .pathify (d , '' , paths , joinchar = joinchar )
185207 return paths
208+
209+ assert paths is not None
186210 pn = p
187211 if p != '' :
188212 pn += joinchar
@@ -195,8 +219,9 @@ def pathify(self, d, p=None, paths=None, joinchar='.'):
195219 self .pathify (e , pn + str (idx ), paths , joinchar = joinchar )
196220 else :
197221 paths [p ] = d
222+ return paths
198223
199- def convert (self , text_value ) :
224+ def convert (self , text_value : str ) -> YamlValue :
200225 if self .__convert_types :
201226 # try converting to int or float
202227 try :
0 commit comments