Coverage for pyTooling / Tracing / __init__.py: 90%

202 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-24 00:02 +0000

1# ==================================================================================================================== # 

2# _____ _ _ _____ _ # 

3# _ __ _ |_ _|__ ___ | (_)_ __ __ _|_ _| __ __ _ ___(_)_ __ __ _ # 

4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` | | || '__/ _` |/ __| | '_ \ / _` | # 

5# | |_) | |_| || | (_) | (_) | | | | | | (_| |_| || | | (_| | (__| | | | | (_| | # 

6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)_||_| \__,_|\___|_|_| |_|\__, | # 

7# |_| |___/ |___/ |___/ # 

8# ==================================================================================================================== # 

9# Authors: # 

10# Patrick Lehmann # 

11# # 

12# License: # 

13# ==================================================================================================================== # 

14# Copyright 2025-2025 Patrick Lehmann - Bötzingen, Germany # 

15# # 

16# Licensed under the Apache License, Version 2.0 (the "License"); # 

17# you may not use this file except in compliance with the License. # 

18# You may obtain a copy of the License at # 

19# # 

20# http://www.apache.org/licenses/LICENSE-2.0 # 

21# # 

22# Unless required by applicable law or agreed to in writing, software # 

23# distributed under the License is distributed on an "AS IS" BASIS, # 

24# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 

25# See the License for the specific language governing permissions and # 

26# limitations under the License. # 

27# # 

28# SPDX-License-Identifier: Apache-2.0 # 

29# ==================================================================================================================== # 

30# 

31"""Tools for software execution tracing.""" 

32from datetime import datetime 

33from time import perf_counter_ns 

34from threading import local 

35from types import TracebackType 

36from typing import Optional as Nullable, List, Iterator, Type, Self, Iterable, Dict, Any, Tuple 

37 

38 

39try: 

40 from pyTooling.Decorators import export, readonly 

41 from pyTooling.MetaClasses import ExtendedType 

42 from pyTooling.Exceptions import ToolingException 

43 from pyTooling.Common import getFullyQualifiedName 

44except (ImportError, ModuleNotFoundError): # pragma: no cover 

45 print("[pyTooling.Tracing] Could not import from 'pyTooling.*'!") 

46 

47 try: 

48 from Decorators import export, readonly 

49 from MetaClasses import ExtendedType 

50 from Exceptions import ToolingException 

51 from Common import getFullyQualifiedName 

52 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover 

53 print("[pyTooling.Tracing] Could not import directly!") 

54 raise ex 

55 

56 

57_threadLocalData = local() 

58 

59 

60@export 

61class TracingException(ToolingException): 

62 """This exception is caused by wrong usage of the stopwatch.""" 

63 

64 

65@export 

66class Event(metaclass=ExtendedType, slots=True): 

67 _name: str 

68 _parent: Nullable["Span"] 

69 _time: Nullable[datetime] 

70 _dict: Dict[str, Any] 

71 

72 def __init__(self, name: str, parent: Nullable["Span"] = None) -> None: 

73 if isinstance(name, str): 73 ↛ 76line 73 didn't jump to line 76 because the condition on line 73 was always true

74 self._name = name 

75 else: 

76 ex = TypeError("Parameter 'name' is not of type 'str'.") 

77 ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.") 

78 raise ex 

79 

80 if parent is None: 

81 self._parent = None 

82 elif isinstance(parent, Span): 82 ↛ 86line 82 didn't jump to line 86 because the condition on line 82 was always true

83 self._parent = parent 

84 parent._events.append(self) 

85 else: 

86 ex = TypeError("Parameter 'parent' is not of type 'Span'.") 

87 ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.") 

88 raise ex 

89 

90 self._dict = {} 

91 

92 @readonly 

93 def Name(self) -> str: 

94 return self._name 

95 

96 @readonly 

97 def Parent(self) -> Nullable["Span"]: 

98 return self._parent 

99 

100 def __getitem__(self, key: str) -> Any: 

101 """ 

102 Read an event's attached attributes (key-value-pairs) by key. 

103 

104 :param key: The key to look for. 

105 :returns: The value associated to the given key. 

106 """ 

107 return self._dict[key] 

108 

109 def __setitem__(self, key: str, value: Any) -> None: 

110 """ 

111 Create or update an event's attached attributes (key-value-pairs) by key. 

112 

113 If a key doesn't exist yet, a new key-value-pair is created. 

114 

115 :param key: The key to create or update. 

116 :param value: The value to associate to the given key. 

117 """ 

118 self._dict[key] = value 

119 

120 def __delitem__(self, key: str) -> None: 

121 """ 

122 Remove an entry from event's attached attributes (key-value-pairs) by key. 

123 

124 :param key: The key to remove. 

125 :raises KeyError: If key doesn't exist in the event's attributes. 

126 """ 

127 del self._dict[key] 

128 

129 def __contains__(self, key: str) -> bool: 

130 """ 

131 Checks if the key is an attached attribute (key-value-pairs) on this event. 

132 

133 :param key: The key to check. 

134 :returns: ``True``, if the key is an attached attribute. 

135 """ 

136 return key in self._dict 

137 

138 def __iter__(self) -> Iterator[Tuple[str, Any]]: 

139 return iter(self._dict.items()) 

140 

141 def __len__(self) -> int: 

142 """ 

143 Returns the number of attached attributes (key-value-pairs) on this event. 

144 

145 :returns: Number of attached attributes. 

146 """ 

147 return len(self._dict) 

148 

149 def __str__(self) -> str: 

150 return self._name 

151 

152 

153@export 

154class Span(metaclass=ExtendedType, slots=True): 

155 _name: str 

156 _parent: Nullable["Span"] 

157 

158 _beginTime: Nullable[datetime] 

159 _endTime: Nullable[datetime] 

160 _startTime: Nullable[int] 

161 _stopTime: Nullable[int] 

162 _totalTime: Nullable[int] 

163 

164 _spans: List["Span"] 

165 _events: List[Event] 

166 _dict: Dict[str, Any] 

167 

168 def __init__(self, name: str, parent: Nullable["Span"] = None) -> None: 

169 if isinstance(name, str): 169 ↛ 172line 169 didn't jump to line 172 because the condition on line 169 was always true

170 self._name = name 

171 else: 

172 ex = TypeError("Parameter 'name' is not of type 'str'.") 

173 ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.") 

174 raise ex 

175 

176 if parent is None: 

177 self._parent = None 

178 elif isinstance(parent, Span): 178 ↛ 182line 178 didn't jump to line 182 because the condition on line 178 was always true

179 self._parent = parent 

180 parent._spans.append(self) 

181 else: 

182 ex = TypeError("Parameter 'parent' is not of type 'Span'.") 

183 ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.") 

184 raise ex 

185 

186 self._beginTime = None 

187 self._startTime = None 

188 self._endTime = None 

189 self._stopTime = None 

190 self._totalTime = None 

191 

192 self._spans = [] 

193 self._events = [] 

194 self._dict = {} 

195 

196 @readonly 

197 def Name(self) -> str: 

198 return self._name 

199 

200 @readonly 

201 def Parent(self) -> Nullable["Span"]: 

202 return self._parent 

203 

204 def _AddSpan(self, span: "Span") -> Self: 

205 self._spans.append(span) 

206 span._parent = self 

207 

208 return span 

209 

210 @readonly 

211 def HasSubSpans(self) -> bool: 

212 return len(self._spans) > 0 

213 

214 @readonly 

215 def SubSpanCount(self) -> int: 

216 """ 

217 Return the number of sub-spans within this span. 

218 

219 :return: Number of nested spans. 

220 """ 

221 return len(self._spans) 

222 

223 # iterate subspans with optional predicate 

224 def IterateSubSpans(self) -> Iterator["Span"]: 

225 return iter(self._spans) 

226 

227 @readonly 

228 def HasEvents(self) -> bool: 

229 return len(self._events) > 0 

230 

231 @readonly 

232 def EventCount(self) -> int: 

233 """ 

234 Return the number of events within this span. 

235 

236 :return: Number of events. 

237 """ 

238 return len(self._events) 

239 

240 # iterate events with optional predicate 

241 def IterateEvents(self) -> Iterator[Event]: 

242 return iter(self._events) 

243 

244 @readonly 

245 def StartTime(self) -> Nullable[datetime]: 

246 """ 

247 Read-only property returning the absolute time when the span was started. 

248 

249 :return: The time when the span was entered, otherwise None. 

250 """ 

251 return self._beginTime 

252 

253 @readonly 

254 def StopTime(self) -> Nullable[datetime]: 

255 """ 

256 Read-only property returning the absolute time when the span was stopped. 

257 

258 :return: The time when the span was exited, otherwise None. 

259 """ 

260 return self._endTime 

261 

262 @readonly 

263 def Duration(self) -> float: 

264 """ 

265 Read-only property returning the duration from start operation to stop operation. 

266 

267 If the stopwatch is not yet stopped, the duration from start to now is returned. 

268 

269 :return: Duration since stopwatch was started in seconds. If the stopwatch was never started, the return value will 

270 be 0.0. 

271 """ 

272 if self._startTime is None: 

273 raise TracingException(f"{self.__class__.__name__} was never started.") 

274 

275 return ((perf_counter_ns() - self._startTime) if self._stopTime is None else self._totalTime) / 1e9 

276 

277 @classmethod 

278 def CurrentSpan(cls) -> "Span": 

279 global _threadLocalData 

280 

281 try: 

282 currentSpan = _threadLocalData.currentSpan 

283 except AttributeError as ex: 

284 currentSpan = None 

285 

286 return currentSpan 

287 

288 def __enter__(self) -> Self: 

289 """ 

290 Implementation of the :ref:`context manager protocol's <context-managers>` ``__enter__(...)`` method. 

291 

292 A span will be started. 

293 

294 :return: The span itself. 

295 """ 

296 global _threadLocalData 

297 

298 try: 

299 currentSpan = _threadLocalData.currentSpan 

300 except AttributeError: 

301 ex = TracingException("Can't setup span. No active trace.") 

302 ex.add_note("Use with-statement using 'Trace()' to setup software execution tracing.") 

303 raise ex 

304 

305 _threadLocalData.currentSpan = currentSpan._AddSpan(self) 

306 

307 self._beginTime = datetime.now() 

308 self._startTime = perf_counter_ns() 

309 

310 return self 

311 

312 def __exit__( 

313 self, 

314 exc_type: Nullable[Type[BaseException]] = None, 

315 exc_val: Nullable[BaseException] = None, 

316 exc_tb: Nullable[TracebackType] = None 

317 ) -> Nullable[bool]: 

318 """ 

319 Implementation of the :ref:`context manager protocol's <context-managers>` ``__exit__(...)`` method. 

320 

321 An active span will be stopped. 

322 

323 Exit the context and ...... 

324 

325 :param exc_type: Exception type 

326 :param exc_val: Exception instance 

327 :param exc_tb: Exception's traceback. 

328 :returns: ``None`` 

329 """ 

330 global _threadLocalData 

331 

332 self._stopTime = perf_counter_ns() 

333 self._endTime = datetime.now() 

334 self._totalTime = self._stopTime - self._startTime 

335 

336 currentSpan = _threadLocalData.currentSpan 

337 _threadLocalData.currentSpan = currentSpan._parent 

338 

339 def __getitem__(self, key: str) -> Any: 

340 """ 

341 Read an event's attached attributes (key-value-pairs) by key. 

342 

343 :param key: The key to look for. 

344 :returns: The value associated to the given key. 

345 """ 

346 return self._dict[key] 

347 

348 def __setitem__(self, key: str, value: Any) -> None: 

349 """ 

350 Create or update an event's attached attributes (key-value-pairs) by key. 

351 

352 If a key doesn't exist yet, a new key-value-pair is created. 

353 

354 :param key: The key to create or update. 

355 :param value: The value to associate to the given key. 

356 """ 

357 self._dict[key] = value 

358 

359 def __delitem__(self, key: str) -> None: 

360 """ 

361 Remove an entry from event's attached attributes (key-value-pairs) by key. 

362 

363 :param key: The key to remove. 

364 :raises KeyError: If key doesn't exist in the event's attributes. 

365 """ 

366 del self._dict[key] 

367 

368 def __contains__(self, key: str) -> bool: 

369 """ 

370 Checks if the key is an attached attribute (key-value-pairs) on this event. 

371 

372 :param key: The key to check. 

373 :returns: ``True``, if the key is an attached attribute. 

374 """ 

375 return key in self._dict 

376 

377 def __iter__(self) -> Iterator[Tuple[str, Any]]: 

378 return iter(self._dict.items()) 

379 

380 def __len__(self) -> int: 

381 """ 

382 Returns the number of attached attributes (key-value-pairs) on this event. 

383 

384 :returns: Number of attached attributes. 

385 """ 

386 return len(self._dict) 

387 

388 def Format(self, indent: int = 1, columnSize: int = 25) -> Iterable[str]: 

389 result = [] 

390 result.append(f"{' ' * indent}🕑{self._name:<{columnSize - 2 * indent}} {self._totalTime/1e6:8.3f} ms") 

391 for span in self._spans: 

392 result.extend(span.Format(indent + 1, columnSize)) 

393 

394 return result 

395 

396 def __repr__(self) -> str: 

397 return f"{self._name} -> {self._parent!r}" 

398 

399 def __str__(self) -> str: 

400 return self._name 

401 

402 

403@export 

404class Trace(Span): 

405 def __init__(self, name: str) -> None: 

406 super().__init__(name) 

407 

408 def __enter__(self) -> Self: 

409 global _threadLocalData 

410 

411 # TODO: check if a trace is already setup 

412 # try: 

413 # currentTrace = _threadLocalData.currentTrace 

414 # except AttributeError: 

415 # pass 

416 

417 _threadLocalData.currentTrace = self 

418 _threadLocalData.currentSpan = self 

419 

420 self._beginTime = datetime.now() 

421 self._startTime = perf_counter_ns() 

422 

423 return self 

424 

425 def __exit__( 

426 self, 

427 exc_type: Nullable[Type[BaseException]] = None, 

428 exc_val: Nullable[BaseException] = None, 

429 exc_tb: Nullable[TracebackType] = None 

430 ) -> Nullable[bool]: 

431 """ 

432 Exit the context and ...... 

433 

434 :param exc_type: Exception type 

435 :param exc_val: Exception instance 

436 :param exc_tb: Exception's traceback. 

437 :returns: ``None`` 

438 """ 

439 global _threadLocalData 

440 

441 self._stopTime = perf_counter_ns() 

442 self._endTime = datetime.now() 

443 self._totalTime = self._stopTime - self._startTime 

444 

445 del _threadLocalData.currentTrace 

446 del _threadLocalData.currentSpan 

447 

448 return None 

449 

450 @classmethod 

451 def CurrentTrace(cls) -> "Trace": 

452 try: 

453 currentTrace = _threadLocalData.currentTrace 

454 except AttributeError: 

455 currentTrace = None 

456 

457 return currentTrace 

458 

459 def Format(self, indent: int = 0, columnSize: int = 25) -> Iterable[str]: 

460 result = [] 

461 result.append(f"{' ' * indent}Software Execution Trace: {self._totalTime/1e6:8.3f} ms") 

462 result.append(f"{' ' * indent}📉{self._name:<{columnSize - 2}} {self._totalTime/1e6:8.3f} ms") 

463 for span in self._spans: 

464 result.extend(span.Format(indent + 1, columnSize - 2)) 

465 

466 return result