Coverage for pyTooling/CLIAbstraction/__init__.py: 77%

212 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-25 22:22 +0000

1# ==================================================================================================================== # 

2# _____ _ _ ____ _ ___ _ _ _ _ _ # 

3# _ __ _ |_ _|__ ___ | (_)_ __ __ _ / ___| | |_ _| / \ | |__ ___| |_ _ __ __ _ ___| |_(_) ___ _ __ # 

4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` || | | | | | / _ \ | '_ \/ __| __| '__/ _` |/ __| __| |/ _ \| '_ \ # 

5# | |_) | |_| || | (_) | (_) | | | | | | (_| || |___| |___ | | / ___ \| |_) \__ \ |_| | | (_| | (__| |_| | (_) | | | | # 

6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)____|_____|___/_/ \_\_.__/|___/\__|_| \__,_|\___|\__|_|\___/|_| |_| # 

7# |_| |___/ |___/ # 

8# ==================================================================================================================== # 

9# Authors: # 

10# Patrick Lehmann # 

11# # 

12# License: # 

13# ==================================================================================================================== # 

14# Copyright 2017-2025 Patrick Lehmann - Bötzingen, Germany # 

15# Copyright 2014-2016 Technische Universität Dresden - Germany, Chair of VLSI-Design, Diagnostics and Architecture # 

16# # 

17# Licensed under the Apache License, Version 2.0 (the "License"); # 

18# you may not use this file except in compliance with the License. # 

19# You may obtain a copy of the License at # 

20# # 

21# http://www.apache.org/licenses/LICENSE-2.0 # 

22# # 

23# Unless required by applicable law or agreed to in writing, software # 

24# distributed under the License is distributed on an "AS IS" BASIS, # 

25# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

26# See the License for the specific language governing permissions and # 

27# limitations under the License. # 

28# # 

29# SPDX-License-Identifier: Apache-2.0 # 

30# ==================================================================================================================== # 

31# 

32"""Basic abstraction layer for executables.""" 

33 

34# __keywords__ = ["abstract", "executable", "cli", "cli arguments"] 

35 

36from os import environ as os_environ 

37from pathlib import Path 

38from platform import system 

39from shutil import which as shutil_which 

40from subprocess import Popen as Subprocess_Popen, PIPE as Subprocess_Pipe, STDOUT as Subprocess_StdOut 

41from sys import version_info # needed for versions before Python 3.11 

42from typing import Dict, Optional as Nullable, ClassVar, Type, List, Tuple, Iterator, Generator, Any, Mapping, Iterable 

43 

44try: 

45 from pyTooling.Decorators import export, readonly 

46 from pyTooling.MetaClasses import ExtendedType 

47 from pyTooling.Exceptions import ToolingException, PlatformNotSupportedException 

48 from pyTooling.Common import getFullyQualifiedName 

49 from pyTooling.Attributes import Attribute 

50 from pyTooling.CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument 

51 from pyTooling.CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument 

52 from pyTooling.CLIAbstraction.ValuedFlag import ValuedFlag 

53except (ImportError, ModuleNotFoundError): # pragma: no cover 

54 print("[pyTooling.CLIAbstraction] Could not import from 'pyTooling.*'!") 

55 

56 try: 

57 from Decorators import export, readonly 

58 from MetaClasses import ExtendedType 

59 from Exceptions import ToolingException, PlatformNotSupportedException 

60 from Common import getFullyQualifiedName 

61 from Attributes import Attribute 

62 from CLIAbstraction.Argument import CommandLineArgument, ExecutableArgument 

63 from CLIAbstraction.Argument import NamedAndValuedArgument, ValuedArgument, PathArgument, PathListArgument, NamedTupledArgument 

64 from CLIAbstraction.ValuedFlag import ValuedFlag 

65 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover 

66 print("[pyTooling.CLIAbstraction] Could not import directly!") 

67 raise ex 

68 

69 

@export
class CLIAbstractionException(ToolingException):
    """
    Exception raised by the CLI abstraction layer.

    It is raised, for example, when a program cannot be found or when launching a child process fails.
    """

73 

74 

@export
class DryRunException(CLIAbstractionException):
    """Raised when an operation needs a running executable, but the executable is in *dry-run mode*."""

78 

79 

@export
class CLIArgument(Attribute):
    """Attribute used to mark a nested class as a CLI argument of its enclosing program class."""

83 

84 

@export
class Environment(metaclass=ExtendedType, slots=True):
    """
    A mutable set of environment variables, stored as a private copy.

    The variables can be inspected and modified with the usual container syntax (``len(env)``, ``name in env``,
    ``env[name]``, ``env[name] = value``, ``del env[name]``).
    """

    _variables: Dict[str, str]  #: Private copy of the environment variables (name -> value).

    def __init__(
        self,
        newVariables: Nullable[Mapping[str, str]] = None,
        addVariables: Nullable[Mapping[str, str]] = None,
        delVariables: Nullable[Iterable[str]] = None
    ) -> None:
        """
        Initialize the environment.

        :param newVariables: Variables to start from. If ``None``, the current process environment (``os.environ``) is copied.
        :param addVariables: Variables added to (or overwriting) the initial variables.
        :param delVariables: Names of variables removed from the initial variables.
        :raises KeyError:    If a name listed in ``delVariables`` is not part of the initial variables.
        """
        if newVariables is None:
            newVariables = os_environ

        # Copy the mapping, so later modifications don't leak into the caller's mapping (or into os.environ).
        self._variables = dict(newVariables)

        # Deletion happens before addition: a name listed in both parameters ends up being set.
        if delVariables is not None:
            for variableName in delVariables:
                del self._variables[variableName]

        if addVariables is not None:
            self._variables.update(addVariables)

    def __len__(self) -> int:
        """Return the number of environment variables."""
        return len(self._variables)

    def __contains__(self, name: str) -> bool:
        """Return True, if a variable called ``name`` exists."""
        return name in self._variables

    def __getitem__(self, name: str) -> str:
        """Return the value of variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        return self._variables[name]

    def __setitem__(self, name: str, value: str) -> None:
        """Create or overwrite variable ``name`` with ``value``."""
        self._variables[name] = value

    def __delitem__(self, name: str) -> None:
        """Remove variable ``name``; raises :exc:`KeyError` if it doesn't exist."""
        del self._variables[name]

121 

122 

@export
class Program(metaclass=ExtendedType, slots=True):
    """
    Represent a simple command line interface (CLI) executable (program or script).

    The executable is located either by an explicit path, by a directory containing the platform specific executable
    name, or by searching the platform specific executable name in ``PATH``. CLI options are declared as nested classes
    marked with :class:`CLIArgument` and are set via ``program[OptionClass] = value``.
    """

    _platform:         str                                                             #: Current platform the executable runs on (Linux, Windows, ...)
    _executableNames:  ClassVar[Dict[str, str]]                                        #: Dictionary of platform specific executable names.
    _executablePath:   Path                                                            #: The path to the executable (binary, script, ...).
    _dryRun:           bool                                                            #: True, if program shall run in *dry-run mode*.
    __cliOptions__:    ClassVar[Dict[Type[CommandLineArgument], int]]                  #: List of all possible CLI options.
    __cliParameters__: Dict[Type[CommandLineArgument], Nullable[CommandLineArgument]]  #: List of all CLI parameters (used CLI options).

    def __init_subclass__(cls, *args: Any, **kwargs: Any):
        """
        Whenever a subclass is derived from :class:``Program``, all nested classes declared within ``Program`` and which are
        marked with attribute ``CLIArgument`` are collected and then listed in the ``__cliOptions__`` dictionary.
        """
        super().__init_subclass__(*args, **kwargs)

        # Register all available CLI options (nested classes marked with attribute 'CLIArgument').
        # The stored number is the discovery order; it is used by 'ToArgumentList' to sort the arguments.
        cls.__cliOptions__: Dict[Type[CommandLineArgument], int] = {}
        for order, option in enumerate(CLIArgument.GetClasses(scope=cls)):
            cls.__cliOptions__[option] = order

    def __init__(self, executablePath: Nullable[Path] = None, binaryDirectoryPath: Nullable[Path] = None, dryRun: bool = False) -> None:
        """
        Locate the executable and initialize an empty set of CLI parameters.

        :param executablePath:          Path to the executable. Takes precedence over ``binaryDirectoryPath``.
        :param binaryDirectoryPath:     Directory containing the executable; the platform specific name from
                                        ``_executableNames`` is appended.
        :param dryRun:                  If true, failed existence checks are reported instead of raising an exception.
        :raises TypeError:              If ``executablePath`` or ``binaryDirectoryPath`` is not a :class:`~pathlib.Path`.
        :raises CLIAbstractionException: If the executable or binary directory doesn't exist (not in dry-run mode), or
                                        if no executable name is registered for the current platform.
        """
        self._platform = system()
        self._dryRun = dryRun

        if executablePath is not None:
            if not isinstance(executablePath, Path):
                raise self._CreateTypeError("Parameter 'executablePath' is not of type 'Path'.", executablePath)

            self._CheckProgramExists(executablePath)
        elif binaryDirectoryPath is not None:
            if not isinstance(binaryDirectoryPath, Path):
                raise self._CreateTypeError("Parameter 'binaryDirectoryPath' is not of type 'Path'.", binaryDirectoryPath)

            if not binaryDirectoryPath.exists():
                if dryRun:
                    self._ReportDryRun(f"Directory check for '{binaryDirectoryPath}' failed. [SKIPPING]")
                else:
                    raise CLIAbstractionException(f"Binary directory '{binaryDirectoryPath}' not found.") from FileNotFoundError(binaryDirectoryPath)

            executablePath = binaryDirectoryPath / self._GetPlatformExecutableName()
            self._CheckProgramExists(executablePath)
        else:
            # Neither a path nor a directory was given: search the platform specific name in PATH.
            executablePath = Path(self._GetPlatformExecutableName())
            resolvedExecutable = shutil_which(str(executablePath))

            # In dry-run mode a missing executable is tolerated.
            # TODO: report a failed PATH lookup in dry-run mode
            if not dryRun:
                if resolvedExecutable is None:
                    raise CLIAbstractionException("Program could not be found in PATH.") from FileNotFoundError(executablePath)

                fullExecutablePath = Path(resolvedExecutable)
                if not fullExecutablePath.exists():
                    raise CLIAbstractionException(f"Program '{fullExecutablePath}' not found.") from FileNotFoundError(fullExecutablePath)

            # TODO: log found executable in PATH
            # TODO: check if found executable has execute permissions

        # NOTE: in the PATH lookup case, the unresolved executable name is stored (not the resolved full path).
        self._executablePath = executablePath
        self.__cliParameters__ = {}

    @staticmethod
    def _CreateTypeError(message: str, obj: Any) -> TypeError:
        """Create a :exc:`TypeError` with ``message``; on Python 3.11+ a note naming the type of ``obj`` is attached."""
        ex = TypeError(message)
        if version_info >= (3, 11):  # pragma: no cover
            ex.add_note(f"Got type '{getFullyQualifiedName(obj)}'.")
        return ex

    def _ReportDryRun(self, message: str) -> None:
        """
        Report a message while in dry-run mode.

        This class doesn't define ``LogDryRun`` itself. If a derived class or mixin provides it, it is used; otherwise
        the message is printed, so dry-run mode doesn't fail with an :exc:`AttributeError`.
        """
        logDryRun = getattr(self, "LogDryRun", None)
        if callable(logDryRun):
            logDryRun(message)
        else:
            print(f"DRYRUN: {message}")

    def _CheckProgramExists(self, path: Path) -> None:
        """Check that ``path`` exists; report it in dry-run mode, otherwise raise :exc:`CLIAbstractionException`."""
        if path.exists():
            return

        if self._dryRun:
            self._ReportDryRun(f"File check for '{path}' failed. [SKIPPING]")
        else:
            raise CLIAbstractionException(f"Program '{path}' not found.") from FileNotFoundError(path)

    def _GetPlatformExecutableName(self) -> str:
        """Return the executable name registered in ``_executableNames`` for the current platform."""
        try:
            return self.__class__._executableNames[self._platform]
        except KeyError:
            raise CLIAbstractionException(f"Program is not supported on platform '{self._platform}'.") from PlatformNotSupportedException(self._platform)

    @staticmethod
    def _NeedsParameterInitialization(key):
        """Return True, if the CLI option class ``key`` takes a value when it is instantiated."""
        return issubclass(key, (ValuedFlag, ValuedArgument, NamedAndValuedArgument, NamedTupledArgument, PathArgument, PathListArgument))

    def __getitem__(self, key):
        """Access to a CLI parameter by CLI option (key must be of type :class:`CommandLineArgument`), which is already used."""
        # 'issubclass' itself raises a generic TypeError for non-class keys, so check for a class first.
        if not (isinstance(key, type) and issubclass(key, CommandLineArgument)):
            raise self._CreateTypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.", key)

        # TODO: is nested check
        return self.__cliParameters__[key]

    def __setitem__(self, key, value):
        """
        Use a CLI option (``key``) as a CLI parameter with the given ``value``.

        For option classes without a value, ``value`` is ignored.

        :raises TypeError: If ``key`` is not a subclass of :class:`CommandLineArgument`.
        :raises KeyError:  If ``key`` is not an option of this program, or if the option is already set.
        """
        if not (isinstance(key, type) and issubclass(key, CommandLineArgument)):
            raise self._CreateTypeError(f"Key '{key}' is not a subclass of 'CommandLineArgument'.", key)
        elif key not in self.__cliOptions__:
            raise KeyError(f"Option '{key}' is not allowed on executable '{self.__class__.__name__}'")
        elif key in self.__cliParameters__:
            raise KeyError(f"Option '{key}' is already set to a value.")

        if self._NeedsParameterInitialization(key):
            self.__cliParameters__[key] = key(value)
        else:
            self.__cliParameters__[key] = key()

    # NOTE: From here on, the name 'Path' in the class body refers to this property and no longer to 'pathlib.Path'.
    #       Annotations evaluated in the class body below this point must not use 'Path'.
    @readonly
    def Path(self) -> Path:
        """Read-only property returning the path to the executable."""
        return self._executablePath

    def ToArgumentList(self) -> List[str]:
        """
        Convert the executable and all used CLI parameters to a list of strings.

        The first element is the executable path, followed by the parameters sorted by the order stored in
        ``__cliOptions__``.

        :raises TypeError: If a parameter's ``AsArgument()`` returns neither a string nor a tuple or list.
        """
        result: List[str] = [str(self._executablePath)]

        def optionOrder(item: Tuple[Type[CommandLineArgument], Nullable[CommandLineArgument]]) -> int:
            return self.__cliOptions__[item[0]]

        for key, value in sorted(self.__cliParameters__.items(), key=optionOrder):
            param = value.AsArgument()
            if isinstance(param, str):
                result.append(param)
            elif isinstance(param, (tuple, list)):
                result.extend(param)
            else:
                raise TypeError(f"Argument '{key.__name__}' returned an unsupported type '{type(param).__name__}' from 'AsArgument()'. Expected 'str', 'tuple' or 'list'.")

        return result

    def __repr__(self):
        """Return the argument list formatted like a list of double-quoted strings."""
        return "[" + ", ".join([f"\"{item}\"" for item in self.ToArgumentList()]) + "]"

    def __str__(self):
        """Return the argument list as space separated, double-quoted strings."""
        return " ".join([f"\"{item}\"" for item in self.ToArgumentList()])

279 

280 

@export
class Executable(Program):  # (ILogable):
    """Represent a CLI executable derived from :class:`Program`, that adds an abstraction of :class:`subprocess.Popen`."""

    _BOUNDARY: ClassVar[str] = "====== BOUNDARY pyTooling.CLIAbstraction BOUNDARY ======"

    _workingDirectory: Nullable[Path]              #: Working directory of the child process (``None``: inherit).
    _environment:      Nullable[Environment]       #: Environment variables of the child process.
    _process:          Nullable[Subprocess_Popen]  #: The child process; ``None`` until :meth:`StartProcess` launched it.
    _iterator:         Nullable[Iterator]

    def __init__(
        self,
        executablePath: Nullable[Path] = None,
        binaryDirectoryPath: Nullable[Path] = None,
        workingDirectory: Nullable[Path] = None,
        environment: Nullable[Environment] = None,
        dryRun: bool = False
    ):
        """
        Initialize the executable; no process is started yet.

        :param executablePath:      See :class:`Program`.
        :param binaryDirectoryPath: See :class:`Program`.
        :param workingDirectory:    Working directory handed to the child process as ``cwd``.
        :param environment:         Environment variables used for the child process.
        :param dryRun:              See :class:`Program`.
        """
        super().__init__(executablePath, binaryDirectoryPath, dryRun)

        # The parameter was previously discarded (always set to None), so 'cwd' could never be configured.
        self._workingDirectory = workingDirectory
        self._environment = environment
        self._process = None
        self._iterator = None

    def StartProcess(
        self,
        environment: Nullable[Environment] = None
    ):
        """
        Start the executable as a child process with piped STDIN and STDOUT (STDERR is merged into STDOUT).

        In dry-run mode, the command line is only reported and no process is started.

        :param environment:              Environment used for the child process, if no environment was handed over at
                                         construction time.
        :raises CLIAbstractionException: If launching the process fails with an :exc:`OSError`.
        """
        if self._dryRun:
            message = f"Start process: {self!r}"
            # 'LogDryRun' isn't defined by this class hierarchy; use it if a derived class or mixin provides it.
            logDryRun = getattr(self, "LogDryRun", None)
            if callable(logDryRun):
                logDryRun(message)
            else:
                print(f"DRYRUN: {message}")
            return

        # NOTE(review): the environment given at construction time wins over the one passed to this call - confirm
        #               that this precedence is intended.
        if self._environment is not None:
            envVariables = self._environment._variables
        elif environment is not None:
            envVariables = environment._variables
        else:
            envVariables = None

        # FIXME: verbose log start process
        # FIXME: debug log - parameter list
        try:
            self._process = Subprocess_Popen(
                self.ToArgumentList(),
                stdin=Subprocess_Pipe,
                stdout=Subprocess_Pipe,
                stderr=Subprocess_StdOut,
                cwd=self._workingDirectory,
                env=envVariables,
                universal_newlines=True,
                bufsize=256
            )
        except OSError as ex:
            raise CLIAbstractionException(f"Error while launching a process for '{self._executablePath}'.") from ex

    def Send(self, line: str, end: str = "\n") -> None:
        """
        Write ``line`` followed by ``end`` to the child process' STDIN and flush it.

        :raises CLIAbstractionException: If no process was started or if writing to the process fails.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus no data can be sent.")

        try:
            self._process.stdin.write(line + end)
            self._process.stdin.flush()
        except Exception as ex:
            raise CLIAbstractionException(f"Error while sending data to the process of '{self._executablePath}'.") from ex

    # This is TCL specific ...
    # def SendBoundary(self):
    #     self.Send("puts \"{0}\"".format(self._pyIPCMI_BOUNDARY))

    def GetLineReader(self) -> Generator[str, None, None]:
        """
        Return a generator yielding the child process' output line by line, without the trailing newline.

        :raises DryRunException:         If the executable is in dry-run mode.
        :raises CLIAbstractionException: If reading from the process fails (e.g. no process was started).
        """
        if self._dryRun:
            raise DryRunException("No process output available, because the executable is in dry-run mode.")

        try:
            for line in iter(self._process.stdout.readline, ""):  # FIXME: can it be improved?
                # Only strip a real line ending; the last line might end without a newline.
                yield line[:-1] if line.endswith("\n") else line
        except Exception as ex:
            raise CLIAbstractionException(f"Error while reading the output of '{self._executablePath}'.") from ex
        # finally:
        #     self._process.terminate()

    def Terminate(self):
        """
        Terminate the child process.

        :raises CLIAbstractionException: If no process was started.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus it can't be terminated.")

        self._process.terminate()

    @readonly
    def ExitCode(self) -> Nullable[int]:
        """
        Read-only property returning the child process' exit code; ``None`` while the process is still running.

        :raises CLIAbstractionException: If no process was started.
        """
        if self._process is None:
            raise CLIAbstractionException("Process not yet started, thus no exit code.")

        # 'returncode' is only updated by poll()/wait()/communicate(); poll() refreshes it without blocking.
        return self._process.poll()

    # This is TCL specific
    # def ReadUntilBoundary(self, indent=0):
    #     __indent = "  " * indent
    #     if self._iterator is None:
    #         self._iterator = iter(self.GetReader())
    #
    #     for line in self._iterator:
    #         print(__indent + line)
    #         if self._pyIPCMI_BOUNDARY in line:
    #             break
    #     self.LogDebug("Quartus II is ready")

387 

388 

@export
class OutputFilteredExecutable(Executable):
    """An :class:`Executable` that carries flags describing what was found in the output stream."""

    _hasOutput:   bool
    _hasWarnings: bool
    _hasErrors:   bool
    _hasFatals:   bool

    def __init__(self, platform, dryrun, executablePath):  #, environment=None, logger=None) -> None:
        """
        Initialize the executable with all flags cleared.

        :param platform:       Unused; kept for backward compatibility. The base class determines the platform itself.
        :param dryrun:         True, if the program shall run in dry-run mode.
        :param executablePath: Path to the executable.
        """
        # Pass by keyword: the parent's positional order is (executablePath, binaryDirectoryPath, workingDirectory, ...),
        # so passing (platform, dryrun, executablePath) positionally would bind every value to the wrong parameter.
        super().__init__(executablePath=executablePath, dryRun=dryrun)  #, environment=environment, logger=logger)

        self._hasOutput = False
        self._hasWarnings = False
        self._hasErrors = False
        self._hasFatals = False

    @readonly
    def HasWarnings(self):
        """True if warnings were found while processing the output stream."""
        return self._hasWarnings

    @readonly
    def HasErrors(self):
        """True if errors were found while processing the output stream."""
        return self._hasErrors

    @readonly
    def HasFatals(self):
        """True if fatals were found while processing the output stream."""
        return self._hasFatals