Coverage for pyTooling / Tracing / __init__.py: 90%
202 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-24 00:02 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-24 00:02 +0000
1# ==================================================================================================================== #
2# _____ _ _ _____ _ #
3# _ __ _ |_ _|__ ___ | (_)_ __ __ _|_ _| __ __ _ ___(_)_ __ __ _ #
4# | '_ \| | | || |/ _ \ / _ \| | | '_ \ / _` | | || '__/ _` |/ __| | '_ \ / _` | #
5# | |_) | |_| || | (_) | (_) | | | | | | (_| |_| || | | (_| | (__| | | | | (_| | #
6# | .__/ \__, ||_|\___/ \___/|_|_|_| |_|\__, (_)_||_| \__,_|\___|_|_| |_|\__, | #
7# |_| |___/ |___/ |___/ #
8# ==================================================================================================================== #
9# Authors: #
10# Patrick Lehmann #
11# #
12# License: #
13# ==================================================================================================================== #
14# Copyright 2025-2025 Patrick Lehmann - Bötzingen, Germany #
15# #
16# Licensed under the Apache License, Version 2.0 (the "License"); #
17# you may not use this file except in compliance with the License. #
18# You may obtain a copy of the License at #
19# #
20# http://www.apache.org/licenses/LICENSE-2.0 #
21# #
22# Unless required by applicable law or agreed to in writing, software #
23# distributed under the License is distributed on an "AS IS" BASIS, #
24# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
25# See the License for the specific language governing permissions and #
26# limitations under the License. #
27# #
28# SPDX-License-Identifier: Apache-2.0 #
29# ==================================================================================================================== #
30#
31"""Tools for software execution tracing."""
32from datetime import datetime
33from time import perf_counter_ns
34from threading import local
35from types import TracebackType
36from typing import Optional as Nullable, List, Iterator, Type, Self, Iterable, Dict, Any, Tuple
39try:
40 from pyTooling.Decorators import export, readonly
41 from pyTooling.MetaClasses import ExtendedType
42 from pyTooling.Exceptions import ToolingException
43 from pyTooling.Common import getFullyQualifiedName
44except (ImportError, ModuleNotFoundError): # pragma: no cover
45 print("[pyTooling.Tracing] Could not import from 'pyTooling.*'!")
47 try:
48 from Decorators import export, readonly
49 from MetaClasses import ExtendedType
50 from Exceptions import ToolingException
51 from Common import getFullyQualifiedName
52 except (ImportError, ModuleNotFoundError) as ex: # pragma: no cover
53 print("[pyTooling.Tracing] Could not import directly!")
54 raise ex
# Thread-local storage used by the tracing classes in this module: 'Trace.__enter__' stores the active trace as
# attribute 'currentTrace' and the innermost active span as attribute 'currentSpan'. Both attributes exist only while
# a trace is active in the calling thread.
_threadLocalData = local()
@export
class TracingException(ToolingException):
    """
    This exception is caused by wrong usage of the tracing classes.

    Examples are entering a span while no trace is active, or reading the duration of a span that was never started.
    """
@export
class Event(metaclass=ExtendedType, slots=True):
    """
    A named event that can be attached to a span.

    An event carries arbitrary attributes (key-value-pairs), which are accessed via the mapping protocol
    (``event[key]``, ``key in event``, ``len(event)``, iteration).
    """

    _name:   str                  # Name of the event.
    _parent: Nullable["Span"]     # Span this event is attached to, otherwise ``None``.
    _time:   Nullable[datetime]   # Timestamp slot; nothing in this class records a time, so it stays ``None``.
    _dict:   Dict[str, Any]       # Attached attributes (key-value-pairs).

    def __init__(self, name: str, parent: Nullable["Span"] = None) -> None:
        """
        Initialize an event and optionally attach it to a parent span.

        :param name:       Name of the event.
        :param parent:     Span the event is appended to. If ``None``, the event isn't attached to any span.
        :raises TypeError: If parameter 'name' is not a string.
        :raises TypeError: If parameter 'parent' is neither ``None`` nor a span.
        """
        if not isinstance(name, str):
            ex = TypeError("Parameter 'name' is not of type 'str'.")
            # The note must describe the offending parameter 'name' (it formerly reported the type of 'parent').
            ex.add_note(f"Got type '{getFullyQualifiedName(name)}'.")
            raise ex

        if parent is not None and not isinstance(parent, Span):
            ex = TypeError("Parameter 'parent' is not of type 'Span'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.")
            raise ex

        self._name = name
        self._parent = parent
        # Initialize the slot, so reading it doesn't raise an AttributeError.
        self._time = None
        self._dict = {}

        # Register at the parent last, so the parent never references a partially initialized event.
        if parent is not None:
            parent._events.append(self)

    @readonly
    def Name(self) -> str:
        """
        Read-only property returning the event's name.

        :returns: Name of the event.
        """
        return self._name

    @readonly
    def Parent(self) -> Nullable["Span"]:
        """
        Read-only property returning the span this event is attached to.

        :returns: The parent span, otherwise ``None``.
        """
        return self._parent

    def __getitem__(self, key: str) -> Any:
        """
        Read an event's attached attributes (key-value-pairs) by key.

        :param key:       The key to look for.
        :returns:         The value associated to the given key.
        :raises KeyError: If key doesn't exist in the event's attributes.
        """
        return self._dict[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """
        Create or update an event's attached attributes (key-value-pairs) by key.

        If a key doesn't exist yet, a new key-value-pair is created.

        :param key:   The key to create or update.
        :param value: The value to associate to the given key.
        """
        self._dict[key] = value

    def __delitem__(self, key: str) -> None:
        """
        Remove an entry from event's attached attributes (key-value-pairs) by key.

        :param key:       The key to remove.
        :raises KeyError: If key doesn't exist in the event's attributes.
        """
        del self._dict[key]

    def __contains__(self, key: str) -> bool:
        """
        Checks if the key is an attached attribute (key-value-pairs) on this event.

        :param key: The key to check.
        :returns:   ``True``, if the key is an attached attribute.
        """
        return key in self._dict

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        """
        Iterate all attached attributes of this event.

        :returns: An iterator of ``(key, value)`` tuples.
        """
        return iter(self._dict.items())

    def __len__(self) -> int:
        """
        Returns the number of attached attributes (key-value-pairs) on this event.

        :returns: Number of attached attributes.
        """
        return len(self._dict)

    def __str__(self) -> str:
        """
        Return the event's name.

        :returns: Name of the event.
        """
        return self._name
@export
class Span(metaclass=ExtendedType, slots=True):
    """
    A named and timed section of program execution, used as a context manager.

    Spans form a tree: a span holds nested sub-spans and events. In addition, arbitrary attributes (key-value-pairs)
    can be attached, which are accessed via the mapping protocol (``span[key]``, ``key in span``, ``len(span)``,
    iteration).

    Entering a span requires an active trace in the current thread, see ``__enter__``.
    """

    _name:      str                  # Name of the span.
    _parent:    Nullable["Span"]     # Enclosing span, otherwise ``None``.

    _beginTime: Nullable[datetime]   # Absolute time (``datetime.now()``) when the span was entered.
    _endTime:   Nullable[datetime]   # Absolute time (``datetime.now()``) when the span was exited.
    _startTime: Nullable[int]        # ``perf_counter_ns()`` value when the span was entered.
    _stopTime:  Nullable[int]        # ``perf_counter_ns()`` value when the span was exited.
    _totalTime: Nullable[int]        # Difference of stop and start time in nanoseconds.

    _spans:     List["Span"]         # Nested sub-spans.
    _events:    List[Event]          # Events attached to this span.
    _dict:      Dict[str, Any]       # Attached attributes (key-value-pairs).

    def __init__(self, name: str, parent: Nullable["Span"] = None) -> None:
        """
        Initialize a span and optionally register it as a sub-span of a parent span.

        :param name:       Name of the span.
        :param parent:     Span this span is appended to as a sub-span. If ``None``, the span has no parent yet.
        :raises TypeError: If parameter 'name' is not a string.
        :raises TypeError: If parameter 'parent' is neither ``None`` nor a span.
        """
        if not isinstance(name, str):
            ex = TypeError("Parameter 'name' is not of type 'str'.")
            # The note must describe the offending parameter 'name' (it formerly reported the type of 'parent').
            ex.add_note(f"Got type '{getFullyQualifiedName(name)}'.")
            raise ex

        if parent is not None and not isinstance(parent, Span):
            ex = TypeError("Parameter 'parent' is not of type 'Span'.")
            ex.add_note(f"Got type '{getFullyQualifiedName(parent)}'.")
            raise ex

        self._name = name
        self._parent = parent

        self._beginTime = None
        self._startTime = None
        self._endTime = None
        self._stopTime = None
        self._totalTime = None

        self._spans = []
        self._events = []
        self._dict = {}

        # Register at the parent last, so the parent never references a partially initialized span.
        if parent is not None:
            parent._spans.append(self)

    @readonly
    def Name(self) -> str:
        """
        Read-only property returning the span's name.

        :returns: Name of the span.
        """
        return self._name

    @readonly
    def Parent(self) -> Nullable["Span"]:
        """
        Read-only property returning the enclosing span.

        :returns: The parent span, otherwise ``None``.
        """
        return self._parent

    def _AddSpan(self, span: "Span") -> "Span":
        """
        Register a span as sub-span of this span.

        A span whose parent is already this span (constructed with ``parent=...`` or registered by an earlier call)
        is not appended a second time.

        :param span: The span to register as a sub-span.
        :returns:    The given sub-span.
        """
        # The parent reference is only ever assigned together with an append to the parent's list of sub-spans, so an
        # identical parent means the span is already listed.
        if span._parent is not self:
            self._spans.append(span)
            span._parent = self

        return span

    @readonly
    def HasSubSpans(self) -> bool:
        """
        Read-only property to check if this span has sub-spans.

        :returns: ``True``, if at least one sub-span exists.
        """
        return len(self._spans) > 0

    @readonly
    def SubSpanCount(self) -> int:
        """
        Return the number of sub-spans within this span.

        :return: Number of nested spans.
        """
        return len(self._spans)

    # TODO: iterate subspans with optional predicate
    def IterateSubSpans(self) -> Iterator["Span"]:
        """
        Iterate the direct sub-spans of this span.

        :returns: An iterator of sub-spans.
        """
        return iter(self._spans)

    @readonly
    def HasEvents(self) -> bool:
        """
        Read-only property to check if this span has events.

        :returns: ``True``, if at least one event exists.
        """
        return len(self._events) > 0

    @readonly
    def EventCount(self) -> int:
        """
        Return the number of events within this span.

        :return: Number of events.
        """
        return len(self._events)

    # TODO: iterate events with optional predicate
    def IterateEvents(self) -> Iterator[Event]:
        """
        Iterate the events attached to this span.

        :returns: An iterator of events.
        """
        return iter(self._events)

    @readonly
    def StartTime(self) -> Nullable[datetime]:
        """
        Read-only property returning the absolute time when the span was started.

        :return: The time when the span was entered, otherwise None.
        """
        return self._beginTime

    @readonly
    def StopTime(self) -> Nullable[datetime]:
        """
        Read-only property returning the absolute time when the span was stopped.

        :return: The time when the span was exited, otherwise None.
        """
        return self._endTime

    @readonly
    def Duration(self) -> float:
        """
        Read-only property returning the duration from start operation to stop operation.

        If the span is not yet stopped, the duration from start to now is returned.

        :return:                  Duration since the span was started in seconds.
        :raises TracingException: If the span was never started.
        """
        if self._startTime is None:
            raise TracingException(f"{self.__class__.__name__} was never started.")

        if self._stopTime is None:
            elapsed = perf_counter_ns() - self._startTime
        else:
            elapsed = self._totalTime

        # Times are measured in nanoseconds.
        return elapsed / 1e9

    @classmethod
    def CurrentSpan(cls) -> Nullable["Span"]:
        """
        Return the innermost active span of the current thread.

        :returns: The currently active span, otherwise ``None`` if no trace is active in this thread.
        """
        return getattr(_threadLocalData, "currentSpan", None)

    def __enter__(self) -> Self:
        """
        Implementation of the :ref:`context manager protocol's <context-managers>` ``__enter__(...)`` method.

        A span will be started.

        :return:                  The span itself.
        :raises TracingException: If no trace is active in the current thread.
        """
        try:
            currentSpan = _threadLocalData.currentSpan
        except AttributeError:
            ex = TracingException("Can't setup span. No active trace.")
            ex.add_note("Use with-statement using 'Trace()' to setup software execution tracing.")
            raise ex

        # Nest this span into the currently active span and make it the new innermost span.
        _threadLocalData.currentSpan = currentSpan._AddSpan(self)

        self._beginTime = datetime.now()
        self._startTime = perf_counter_ns()

        return self

    def __exit__(
        self,
        exc_type: Nullable[Type[BaseException]] = None,
        exc_val: Nullable[BaseException] = None,
        exc_tb: Nullable[TracebackType] = None
    ) -> Nullable[bool]:
        """
        Implementation of the :ref:`context manager protocol's <context-managers>` ``__exit__(...)`` method.

        An active span will be stopped and the parent of the currently active span becomes the active span again.

        :param exc_type: Exception type
        :param exc_val:  Exception instance
        :param exc_tb:   Exception's traceback.
        :returns:        ``None``
        """
        self._stopTime = perf_counter_ns()
        self._endTime = datetime.now()
        self._totalTime = self._stopTime - self._startTime

        currentSpan = _threadLocalData.currentSpan
        _threadLocalData.currentSpan = currentSpan._parent

        return None

    def __getitem__(self, key: str) -> Any:
        """
        Read a span's attached attributes (key-value-pairs) by key.

        :param key:       The key to look for.
        :returns:         The value associated to the given key.
        :raises KeyError: If key doesn't exist in the span's attributes.
        """
        return self._dict[key]

    def __setitem__(self, key: str, value: Any) -> None:
        """
        Create or update a span's attached attributes (key-value-pairs) by key.

        If a key doesn't exist yet, a new key-value-pair is created.

        :param key:   The key to create or update.
        :param value: The value to associate to the given key.
        """
        self._dict[key] = value

    def __delitem__(self, key: str) -> None:
        """
        Remove an entry from span's attached attributes (key-value-pairs) by key.

        :param key:       The key to remove.
        :raises KeyError: If key doesn't exist in the span's attributes.
        """
        del self._dict[key]

    def __contains__(self, key: str) -> bool:
        """
        Checks if the key is an attached attribute (key-value-pairs) on this span.

        :param key: The key to check.
        :returns:   ``True``, if the key is an attached attribute.
        """
        return key in self._dict

    def __iter__(self) -> Iterator[Tuple[str, Any]]:
        """
        Iterate all attached attributes of this span.

        :returns: An iterator of ``(key, value)`` tuples.
        """
        return iter(self._dict.items())

    def __len__(self) -> int:
        """
        Returns the number of attached attributes (key-value-pairs) on this span.

        :returns: Number of attached attributes.
        """
        return len(self._dict)

    def Format(self, indent: int = 1, columnSize: int = 25) -> Iterable[str]:
        """
        Format this span and all nested sub-spans as lines of text.

        Each line contains the span's name and its total time in milliseconds. A span without a total time (still
        running or never entered) is reported as ``n/a`` instead of failing.

        :param indent:     Indentation level of this span.
        :param columnSize: Width of the name column.
        :returns:          A list of formatted lines, one per span.
        """
        # NOTE(review): the indentation unit inside the f-string is reproduced as found - confirm its width against
        #               the '2 * indent' column correction.
        if self._totalTime is None:
            duration = f"{'n/a':>8}"
        else:
            # Total time is measured in nanoseconds and reported in milliseconds.
            duration = f"{self._totalTime / 1e6:8.3f}"

        result = [f"{' ' * indent}🕑{self._name:<{columnSize - 2 * indent}} {duration} ms"]
        for span in self._spans:
            result.extend(span.Format(indent + 1, columnSize))

        return result

    def __repr__(self) -> str:
        """
        Return the span's name followed by the representation of its parent chain.

        :returns: A string like ``name -> parent -> ... -> None``.
        """
        return f"{self._name} -> {self._parent!r}"

    def __str__(self) -> str:
        """
        Return the span's name.

        :returns: Name of the span.
        """
        return self._name
@export
class Trace(Span):
    """
    The root span of a software execution trace, used as a context manager.

    Entering a trace registers it as the active trace and as the active span of the current thread. Exiting the trace
    removes both registrations.
    """

    def __init__(self, name: str) -> None:
        """
        Initialize a trace. A trace has no parent span.

        :param name:       Name of the trace.
        :raises TypeError: If parameter 'name' is not a string.
        """
        super().__init__(name)

    def __enter__(self) -> Self:
        """
        Implementation of the :ref:`context manager protocol's <context-managers>` ``__enter__(...)`` method.

        The trace is registered as active trace and active span of the current thread and its time measurement is
        started.

        :return: The trace itself.
        """
        # TODO: check if a trace is already setup
        _threadLocalData.currentTrace = self
        _threadLocalData.currentSpan = self

        self._beginTime = datetime.now()
        self._startTime = perf_counter_ns()

        return self

    def __exit__(
        self,
        exc_type: Nullable[Type[BaseException]] = None,
        exc_val: Nullable[BaseException] = None,
        exc_tb: Nullable[TracebackType] = None
    ) -> Nullable[bool]:
        """
        Implementation of the :ref:`context manager protocol's <context-managers>` ``__exit__(...)`` method.

        The time measurement is stopped and the trace is unregistered from the current thread.

        :param exc_type: Exception type
        :param exc_val:  Exception instance
        :param exc_tb:   Exception's traceback.
        :returns:        ``None``
        """
        self._stopTime = perf_counter_ns()
        self._endTime = datetime.now()
        self._totalTime = self._stopTime - self._startTime

        del _threadLocalData.currentTrace
        del _threadLocalData.currentSpan

        return None

    @classmethod
    def CurrentTrace(cls) -> Nullable["Trace"]:
        """
        Return the active trace of the current thread.

        :returns: The currently active trace, otherwise ``None`` if no trace is active in this thread.
        """
        return getattr(_threadLocalData, "currentTrace", None)

    def Format(self, indent: int = 0, columnSize: int = 25) -> Iterable[str]:
        """
        Format this trace and all nested spans as lines of text.

        The first line is a headline, the second line reports the trace itself, followed by the lines of all nested
        spans. A trace without a total time (still running or never entered) is reported as ``n/a`` instead of
        failing.

        :param indent:     Indentation level of the trace.
        :param columnSize: Width of the name column.
        :returns:          A list of formatted lines.
        """
        # NOTE(review): whitespace inside the f-strings is reproduced as found - confirm the indentation unit and the
        #               spacing after 'Software Execution Trace:'.
        if self._totalTime is None:
            duration = f"{'n/a':>8}"
        else:
            # Total time is measured in nanoseconds and reported in milliseconds.
            duration = f"{self._totalTime / 1e6:8.3f}"

        result = [
            f"{' ' * indent}Software Execution Trace: {duration} ms",
            f"{' ' * indent}📉{self._name:<{columnSize - 2}} {duration} ms",
        ]
        for span in self._spans:
            result.extend(span.Format(indent + 1, columnSize - 2))

        return result