Coverage for pyTooling / CLIAbstraction / __init__.py: 76%

215 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-21 22:22 +0000

1# ==================================================================================================================== # 

2# _____ _ _ ____ _ ___ _ _ _ _ _ # 

3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ # 

4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ # 

5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | # 

6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| # 

7# |_| |___/ |___/ # 

8# ==================================================================================================================== # 

9# Authors: # 

10# Patrick Lehmann # 

11# # 

12# License: # 

13# ==================================================================================================================== # 

14# Copyright 2017-2025 Patrick Lehmann - Bötzingen, Germany # 

15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture # 

16# # 

17# Licensed under the Apache License, Version 2.0 (the "License"); # 

18# you may not use this file except in compliance with the License. # 

19# You may obtain a copy of the License at # 

20# # 

21# http://www.apache.org/licenses/LICENSE-2.0 # 

22# # 

23# Unless required by applicable law or agreed to in writing, software # 

24# distributed under the License is distributed on an "AS IS" BASIS, # 

25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

26# See the License for the specific language governing permissions and # 

27# limitations under the License. # 

28# # 

29# SPDX-License-Identifier: Apache-2.0 # 

30# ==================================================================================================================== # 

31# 

32"""Basic abstraction layer for executables.""" 

33 

34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"] 

35 

36from os import environ as os_environ 

37from pathlib import Path 

38from platform import system 

39from shutil import which as shutil_which 

40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut 

41from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable 

42 

43try: 

44 from pyTooling.Decorators import export, readonly 

45 from pyTooling.MetaClasses import ExtendedType 

46 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException 

47 from pyTooling.Common import getFullyQualifiedName 

48 from pyTooling.Attributes import Attribute 

49 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument 

50 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument 

51 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag 

52except (ImportError, ModuleNotFoundError): # pragma: no cover 

53 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!") 

54 

55 try: 

56 from Decorators import export, readonly 

57 from MetaClasses import ExtendedType 

58 from Exceptions import ToolingException, PlatformNotSupportedException 

59 from Common import getFullyQualifiedName 

60 from Attributes import Attribute 

61 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument 

62 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument 

63 from CLIAbstraction.ValuedFlag import ValuedFlag 

64 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover 

65 print("[pyTooling.CLIAbstraction] Could not import directly!") 

66 raise ex 

67 

68 

69@export 

70class CLIAbstractionException(ToolingException): 

71 pass 

72 

73 

74@export 

75class DryRunException(CLIAbstractionException): 

76 """This exception is raised if an executable is launched while in dry-run mode.""" 

77 

78 

79@export 

80class CLIArgument(Attribute): 

81 """An attribute to annotate nested classes as an CLI argument.""" 

82 

83 

84@export 

85class Environment(metaclass=ExtendedType, slots=True): 

86 _variables: Dict[str, str] 

87 

88 def __init__( 

89 self, 

90 newVariables: Nullable[Mapping[str, str]] = None, 

91 addVariables: Nullable[Mapping[str, str]] = None, 

92 delVariables: Nullable[Iterable[str]] = None 

93 ) -> None: 

94 if newVariables is None: 

95 newVariables = os_environ 

96 

97 self._variables = {name: value for name, value in newVariables.items()} 

98 

99 if delVariables is not None: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true

100 for variableName in delVariables: 

101 del self._variables[variableName] 

102 

103 if addVariables is not None: 

104 self._variables.update(addVariables) 

105 

106 def __len__(self) -> len: 

107 return len(self._variables) 

108 

109 def __contains__(self, name: str) -> bool: 

110 return name in self._variables 

111 

112 def __getitem__(self, name: str) -> str: 

113 return self._variables[name] 

114 

115 def __setitem__(self, name: str, value: str) -> None: 

116 self._variables[name] = value 

117 

118 def __delitem__(self, name: str) -> None: 

119 del self._variables[name] 

120 

121 

122@export 

123class Program(metaclass=ExtendedType, slots=True): 

124 """Represent a simple command line interface (CLI) executable (program or script).""" 

125 

126 _platform: str #: Current platform the executable runs on (Linux, Windows, ...) 

127 _executableNames: ClassVar[Dict[str, str]] #: Dictionary of platform specific executable names. 

128 _executablePath: Path #: The path to the executable (binary, script, ...). 

129 _dryRun: bool #: True, if program shall run in *dry-run mode*. 

130 __cliOptions__: ClassVar[Dict[Type[CommandLineArgument], int]] #: List of all possible CLI options. 

131 __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]] #: List of all CLI parameters (used CLI options). 

132 

133 def __init_subclass__(cls, *args: Any, **kwargs: Any): 

134 """ 

135 Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are 

136 marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary. 

137 """ 

138 super().__init_subclass__(*args, **kwargs) 

139 

140 # register all available CLI options (nested classes marked with attribute 'CLIArgument') 

141 cls.__cliOptions__: Dict[Type[CommandLineArgument], int] = {} 

142 order: int = 0 

143 for option in CLIArgument.GetClasses(scope=cls): 

144 cls.__cliOptions__[option] = order 

145 order += 1 

146 

147 def __init__(self, executablePath: Nullable[Path] = None, binaryDirectoryPath: Nullable[Path] = None, dryRun: bool = False) -> None: 

148 self._platform = system() 

149 self._dryRun = dryRun 

150 

151 if executablePath is not None: 

152 if isinstance(executablePath, Path): 

153 if not executablePath.exists(): 

154 if dryRun: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true

155 self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]") 

156 else: 

157 raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath) 

158 else: 

159 ex = TypeError(f"Parameter 'executablePath' is not of type 'Path'.") 

160 ex.add_note(f"Got type '{getFullyQualifiedName(executablePath)}'.") 

161 raise ex 

162 elif binaryDirectoryPath is not None: 

163 if isinstance(binaryDirectoryPath, Path): 

164 if not binaryDirectoryPath.exists(): 

165 if dryRun: 165 ↛ 166line 165 didn't jump to line 166 because the condition on line 165 was never true

166 self.LogDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]") 

167 else: 

168 raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath) 

169 

170 try: 

171 executablePath = binaryDirectoryPath / self.__class__._executableNames[self._platform] 

172 except KeyError: 

173 raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform) 

174 

175 if not executablePath.exists(): 

176 if dryRun: 176 ↛ 177line 176 didn't jump to line 177 because the condition on line 176 was never true

177 self.LogDryRun(f"File check for '{executablePath}' failed. [SKIPPING]") 

178 else: 

179 raise CLIAbstractionException(f"Program '{executablePath}' not found.") from FileNotFoundError(executablePath) 

180 else: 

181 ex = TypeError(f"Parameter 'binaryDirectoryPath' is not of type 'Path'.") 

182 ex.add_note(f"Got type '{getFullyQualifiedName(binaryDirectoryPath)}'.") 

183 raise ex 

184 else: 

185 try: 

186 executablePath = Path(self._executableNames[self._platform]) 

187 except KeyError: 

188 raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform) 

189 

190 resolvedExecutable = shutil_which(str(executablePath)) 

191 if dryRun: 191 ↛ 192line 191 didn't jump to line 192 because the condition on line 191 was never true

192 if resolvedExecutable is None: 

193 pass 

194 # XXX: log executable not found in PATH 

195 # self.LogDryRun(f"Which '{executablePath}' failed. [SKIPPING]") 

196 else: 

197 fullExecutablePath = Path(resolvedExecutable) 

198 if not fullExecutablePath.exists(): 

199 pass 

200 # XXX: log executable not found 

201 # self.LogDryRun(f"File check for '{fullExecutablePath}' failed. [SKIPPING]") 

202 else: 

203 if resolvedExecutable is None: 

204 raise CLIAbstractionException(f"Program could not be found in PATH.") from FileNotFoundError(executablePath) 

205 

206 fullExecutablePath = Path(resolvedExecutable) 

207 if not fullExecutablePath.exists(): 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath) 

209 

210 # TODO: log found executable in PATH 

211 # TODO: check if found executable has execute permissions 

212 # raise ValueError(f"Neither parameter 'executablePath' nor 'binaryDirectoryPath' was set.") 

213 

214 self._executablePath = executablePath 

215 self.__cliParameters__ = {} 

216 

217 @staticmethod 

218 def _NeedsParameterInitialization(key): 

219 return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument)) 

220 

221 def __getitem__(self, key): 

222 """Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used.""" 

223 if not issubclass(key, CommandLineArgument): 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true

224 ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.") 

225 ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.") 

226 raise ex 

227 

228 # TODO: is nested check 

229 return self.__cliParameters__[key] 

230 

231 def __setitem__(self, key, value): 

232 if not issubclass(key, CommandLineArgument): 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 ex = TypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.") 

234 ex.add_note(f"Got type '{getFullyQualifiedName(key)}'.") 

235 raise ex 

236 elif key not in self.__cliOptions__: 

237 raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'") 

238 elif key in self.__cliParameters__: 

239 raise KeyError(f"Option '{key}' is already set to a value.") 

240 

241 if self._NeedsParameterInitialization(key): 

242 self.__cliParameters__[key] = key(value) 

243 else: 

244 self.__cliParameters__[key] = key() 

245 

246 @readonly 

247 def Path(self) -> Path: 

248 return self._executablePath 

249 

250 def ToArgumentList(self) -> List[str]: 

251 result: List[str] = [] 

252 

253 result.append(str(self._executablePath)) 

254 

255 def predicate(item: Tuple[Type[CommandLineArgument], int]) -> int: 

256 return self.__cliOptions__[item[0]] 

257 

258 for key, value in sorted(self.__cliParameters__.items(), key=predicate): 

259 param = value.AsArgument() 

260 if isinstance(param, str): 

261 result.append(param) 

262 elif isinstance(param, (Tuple, List)): 262 ↛ 265line 262 didn't jump to line 265 because the condition on line 262 was always true

263 result += param 

264 else: 

265 raise TypeError(f"") # XXX: needs error message 

266 

267 return result 

268 

269 def __repr__(self): 

270 return "[" + ", ".join([f"\"{item}\"" for item in self.ToArgumentList()]) + "]" 

271 

272 def __str__(self): 

273 return " ".join([f"\"{item}\"" for item in self.ToArgumentList()]) 

274 

275 

276@export 

277class Executable(Program): # (ILogable): 

278 """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`.""" 

279 

280 _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======" 

281 

282 _workingDirectory: Nullable[Path] 

283 _environment: Nullable[Environment] 

284 _process: Nullable[Subprocess_Popen] 

285 _iterator: Nullable[Iterator] 

286 

287 def __init__( 

288 self, 

289 executablePath: Path = None, 

290 binaryDirectoryPath: Path = None, 

291 workingDirectory: Path = None, 

292 environment: Nullable[Environment] = None, 

293 dryRun: bool = False 

294 ): 

295 super().__init__(executablePath, binaryDirectoryPath, dryRun) 

296 

297 self._workingDirectory = None 

298 self._environment = environment 

299 self._process = None 

300 self._iterator = None 

301 

302 def StartProcess( 

303 self, 

304 environment: Nullable[Environment] = None 

305 ): 

306 # start child process 

307 

308 if self._dryRun: 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true

309 self.LogDryRun(f"Start process: {self!r}") 

310 return 

311 

312 if self._environment is not None: 312 ↛ 313line 312 didn't jump to line 313 because the condition on line 312 was never true

313 envVariables = self._environment._variables 

314 elif environment is not None: 314 ↛ 317line 314 didn't jump to line 317 because the condition on line 314 was always true

315 envVariables = environment._variables 

316 else: 

317 envVariables = None 

318 

319 # FIXME: verbose log start process 

320 # FIXME: debug log - parameter list 

321 try: 

322 self._process = Subprocess_Popen( 

323 self.ToArgumentList(), 

324 stdin=Subprocess_Pipe, 

325 stdout=Subprocess_Pipe, 

326 stderr=Subprocess_StdOut, 

327 cwd=self._workingDirectory, 

328 env=envVariables, 

329 universal_newlines=True, 

330 bufsize=256 

331 ) 

332 

333 except OSError as ex: 

334 raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex 

335 

336 def Send(self, line: str, end: str = "\n") -> None: 

337 try: 

338 self._process.stdin.write(line + end) 

339 self._process.stdin.flush() 

340 except Exception as ex: 

341 raise CLIAbstractionException(f"") from ex # XXX: need error message 

342 

343 # This is TCL specific ... 

344 # def SendBoundary(self): 

345 # self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY)) 

346 

347 def GetLineReader(self) -> Generator[str, None, None]: 

348 if self._dryRun: 348 ↛ 349line 348 didn't jump to line 349 because the condition on line 348 was never true

349 raise DryRunException() # XXX: needs a message 

350 

351 try: 

352 for line in iter(self._process.stdout.readline, ""): # FIXME: can it be improved? 

353 yield line[:-1] 

354 except Exception as ex: 

355 raise CLIAbstractionException() from ex # XXX: need error message 

356 # finally: 

357 # self._process.terminate() 

358 

359 def Terminate(self): 

360 self._process.terminate() 

361 

362 @readonly 

363 def ExitCode(self) -> int: 

364 if self._process is None: 

365 raise CLIAbstractionException(f"Process not yet started, thus no exit code.") 

366 

367 # TODO: check if process is still running 

368 

369 return self._process.returncode 

370 

371 # This is TCL specific 

372 # def ReadUntilBoundary(self, indent=0): 

373 # __indent = " " * indent 

374 # if self._iterator is None: 

375 # self._iterator = iter(self.GetReader()) 

376 # 

377 # for line in self._iterator: 

378 # print(__indent + line) 

379 # if self._pyIPCMI_BOUNDARY in line: 

380 # break 

381 # self.LogDebug("Quartus II is ready") 

382 

383 

384@export 

385class OutputFilteredExecutable(Executable): 

386 _hasOutput: bool 

387 _hasWarnings: bool 

388 _hasErrors: bool 

389 _hasFatals: bool 

390 

391 def __init__(self, platform, dryrun, executablePath): #, environment=None, logger=None) -> None: 

392 super().__init__(platform, dryrun, executablePath) #, environment=environment, logger=logger) 

393 

394 self._hasOutput = False 

395 self._hasWarnings = False 

396 self._hasErrors = False 

397 self._hasFatals = False 

398 

399 @readonly 

400 def HasWarnings(self): 

401 """True if warnings were found while processing the output stream.""" 

402 return self._hasWarnings 

403 

404 @readonly 

405 def HasErrors(self): 

406 """True if errors were found while processing the output stream.""" 

407 return self._hasErrors 

408 

409 @readonly 

410 def HasFatals(self): 

411 """True if fatals were found while processing the output stream.""" 

412 return self._hasErrors