362 行
11 KiB
362 行
11 KiB
"""
|
|
Lightweight, hierarchical timers for profiling sections of code.

Example:

@timed
def foo(t):
    time.sleep(t)

def main():
    for i in range(3):
        foo(i + 1)
    with hierarchical_timer("context"):
        foo(1)

    print(get_timer_tree())

This would produce a timer tree like
    (root)
        "foo"
        "context"
            "foo"

The total time and counts are tracked for each block of code; in this example "foo" and "context.foo" are considered
distinct blocks, and are tracked separately.

The decorator and contextmanager are equivalent; the context manager may be more useful if you want more control
over the timer name, or are splitting up multiple sections of a large function.
|
|
"""
|
|
|
|
import math
|
|
import sys
|
|
import time
|
|
import threading
|
|
|
|
from contextlib import contextmanager
|
|
from typing import Any, Callable, Dict, Generator, Optional, TypeVar
|
|
|
|
# Version string of the timer output format; every TimerStack records it in its
# metadata under the "timer_format_version" key.
TIMER_FORMAT_VERSION = "0.1.0"
|
|
|
|
|
|
class TimerNode:
    """
    One node of the timing tree: accumulated time and call count for a block of code,
    plus the nested blocks that were timed inside it.
    """

    __slots__ = ["children", "total", "count", "is_parallel"]

    def __init__(self):
        # Children are keyed by name, so a node never needs to store its own name.
        self.children: Dict[str, TimerNode] = {}
        self.total: float = 0.0
        self.count: int = 0
        self.is_parallel = False

    def get_child(self, name: str) -> "TimerNode":
        """
        Return the child registered under `name`, creating an empty one on first access.
        """
        try:
            return self.children[name]
        except KeyError:
            created = self.children[name] = TimerNode()
            return created

    def add_time(self, elapsed: float) -> None:
        """
        Record one more execution of this block that took `elapsed` seconds.
        """
        self.count += 1
        self.total += elapsed

    def merge(
        self, other: "TimerNode", root_name: str = None, is_parallel: bool = True
    ) -> None:
        """
        Fold the totals and counts of `other` (and, recursively, of all its children) into this tree.

        :param other: The node whose timings are added.
        :param root_name: If given (and non-empty), `other` is merged into the child of this
            node with that name instead of into this node itself.
        :param is_parallel: Whether or not the merged block was executed in parallel; the flag
            is OR-ed into every node touched by the merge.
        :return:
        """
        target = self.get_child(root_name) if root_name else self

        target.total += other.total
        target.count += other.count
        target.is_parallel |= is_parallel

        # root_name only applies to the top level; descendants are merged name-for-name.
        for child_name, incoming in other.children.items():
            target.get_child(child_name).merge(incoming, is_parallel=is_parallel)
|
|
|
|
|
|
class GaugeNode:
    """
    Holds the latest value reported for a metric, together with the minimum, maximum and
    number of values seen. This is analogous to gauges in statsd.
    """

    __slots__ = ["value", "min_value", "max_value", "count", "_timestamp"]

    def __init__(self, value: float):
        self.count = 1
        self.value = self.min_value = self.max_value = value
        # Wall-clock time of the latest value; merge() uses it to decide which value is newer.
        self._timestamp = time.time()

    def update(self, new_value: float) -> None:
        """
        Record `new_value` as the current value and widen the min/max range if needed.
        """
        if new_value < self.min_value:
            self.min_value = new_value
        if new_value > self.max_value:
            self.max_value = new_value
        self.value = new_value
        self.count += 1
        self._timestamp = time.time()

    def merge(self, other: "GaugeNode") -> None:
        """
        Combine `other` into this gauge: ranges are unioned, counts are summed, and the
        current value is taken from whichever gauge was written most recently.
        """
        if other.min_value < self.min_value:
            self.min_value = other.min_value
        if other.max_value > self.max_value:
            self.max_value = other.max_value
        self.count += other.count

        other_is_newer = self._timestamp < other._timestamp
        if other_is_newer:
            # Keep the "later" value
            self.value, self._timestamp = other.value, other._timestamp

    def as_dict(self) -> Dict[str, float]:
        """
        Return the gauge as a plain dictionary with keys "value", "min", "max" and "count".
        """
        summary: Dict[str, float] = {}
        summary["value"] = self.value
        summary["min"] = self.min_value
        summary["max"] = self.max_value
        summary["count"] = self.count
        return summary
|
|
|
|
|
|
class TimerStack:
    """
    Tracks all the time spent. Users shouldn't use this directly; they should use the contextmanager below to make
    sure that pushes and pops are always matched.
    """

    __slots__ = ["root", "stack", "start_time", "gauges", "metadata"]

    def __init__(self):
        # A freshly constructed stack and a reset stack are in exactly the same state.
        self.reset()

    def reset(self):
        """
        Discard all recorded timings, gauges and metadata, and restart the clock.
        """
        self.root = TimerNode()
        self.stack = [self.root]
        self.start_time = time.perf_counter()
        self.gauges: Dict[str, GaugeNode] = {}
        self.metadata: Dict[str, str] = {}
        self._add_default_metadata()

    def push(self, name: str) -> TimerNode:
        """
        Called when entering a new block of code that is timed (e.g. with a contextmanager).

        :param name: Name of the block; it is looked up (or created) as a child of the current node.
        :return: The node for the block, which is now the top of the stack.
        """
        current_node: TimerNode = self.stack[-1]
        next_node = current_node.get_child(name)
        self.stack.append(next_node)
        return next_node

    def pop(self) -> None:
        """
        Called when exiting a block of code that is timed (e.g. with a contextmanager).

        The root node is never popped, so an unmatched pop is a no-op.
        """
        # reset() inside an open timer block replaces the stack with just the new root; the
        # block's exit would then pop that root and the next push() would fail on an empty
        # stack. Keeping the root in place avoids that.
        if len(self.stack) > 1:
            self.stack.pop()

    def get_root(self) -> TimerNode:
        """
        Update the total time and count of the root node, and return it.

        The root's total is the time elapsed since the stack was created or last reset,
        and its count is always 1.
        """
        root = self.root
        root.total = time.perf_counter() - self.start_time
        root.count = 1
        return root

    def get_timing_tree(self, node: Optional[TimerNode] = None) -> Dict[str, Any]:
        """
        Recursively build a tree of timings, suitable for output/archiving.

        :param node: Node to describe; when omitted the whole tree is built starting at the root.
        :return: A dictionary with "total", "count" and "self" for the node, "children" if it has
            any, "is_parallel" if set, and (root only) "name", "gauges" and "metadata".
        """
        res: Dict[str, Any] = {}
        if node is None:
            # Special case the root - total is time since it was created, and count is 1
            node = self.get_root()
            res["name"] = "root"

            # Only output gauges at top level
            if self.gauges:
                res["gauges"] = self._get_gauges()

            if self.metadata:
                self.metadata["end_time_seconds"] = str(int(time.time()))
                # Return a snapshot so that callers editing the result cannot alter this stack's metadata.
                res["metadata"] = dict(self.metadata)

        res["total"] = node.total
        res["count"] = node.count

        if node.is_parallel:
            # Note when the block ran in parallel, so that it's less confusing that a timer is less than its children.
            res["is_parallel"] = True

        child_total = 0.0
        child_dict = {}
        for child_name, child_node in node.children.items():
            child_res: Dict[str, Any] = self.get_timing_tree(child_node)
            child_dict[child_name] = child_res
            child_total += child_res["total"]

        # "self" time is total time minus all time spent on children (clamped at zero, since
        # children merged from parallel runs can sum to more than the parent).
        res["self"] = max(0.0, node.total - child_total)
        if child_dict:
            res["children"] = child_dict

        return res

    def set_gauge(self, name: str, value: float) -> None:
        """
        Update the gauge called `name` with `value`, creating the gauge on first use.
        NaN values are ignored.
        """
        if math.isnan(value):
            return
        gauge_node = self.gauges.get(name)
        if gauge_node is not None:
            gauge_node.update(value)
        else:
            self.gauges[name] = GaugeNode(value)

    def add_metadata(self, key: str, value: str) -> None:
        """
        Store (or overwrite) a metadata entry that is reported with the timing tree.
        """
        self.metadata[key] = value

    def _get_gauges(self) -> Dict[str, Dict[str, float]]:
        """
        Return every gauge converted to its dictionary form, keyed by gauge name.
        """
        return {
            gauge_name: gauge_node.as_dict()
            for gauge_name, gauge_node in self.gauges.items()
        }

    def _add_default_metadata(self):
        """
        Record the format version, start time, Python version and command line in the metadata.
        """
        self.metadata["timer_format_version"] = TIMER_FORMAT_VERSION
        self.metadata["start_time_seconds"] = str(int(time.time()))
        self.metadata["python_version"] = sys.version
        self.metadata["command_line_arguments"] = " ".join(sys.argv)
|
|
|
|
|
|
# Registry of "global" timers, one per thread and keyed by thread ident, so that threads
# don't accidentally conflict with each other.
_thread_timer_stacks: Dict[int, TimerStack] = {}


def _get_thread_timer() -> TimerStack:
    """
    Return the TimerStack of the calling thread, creating and registering it on first use.
    """
    thread_id = threading.get_ident()
    try:
        return _thread_timer_stacks[thread_id]
    except KeyError:
        created = _thread_timer_stacks[thread_id] = TimerStack()
        return created
|
|
|
|
|
|
def get_timer_stack_for_thread(t: threading.Thread) -> Optional[TimerStack]:
    """
    Look up the TimerStack registered for thread `t`.

    Returns None when the thread has no ident or when no stack has been registered for it.
    """
    thread_id = t.ident
    # A missing ident means the thread hasn't started, which shouldn't ever happen.
    return None if thread_id is None else _thread_timer_stacks.get(thread_id)
|
|
|
|
|
|
@contextmanager
def hierarchical_timer(name: str, timer_stack: TimerStack = None) -> Generator:
    """
    Context manager that times the enclosed block under `name`.

    The block's node is pushed onto `timer_stack` (the calling thread's stack when none is
    given) and yielded to the caller; on exit the elapsed time is added to the node and the
    node is popped again.
    """
    if not timer_stack:
        timer_stack = _get_thread_timer()
    node = timer_stack.push(name)
    began = time.perf_counter()

    try:
        # Control passes to the body of the `with` statement here.
        yield node
    finally:
        # Runs on normal exit and when the body raises; in the latter case the time is
        # still recorded and the exception continues to propagate afterwards.
        node.add_time(time.perf_counter() - began)
        timer_stack.pop()
|
|
|
|
|
|
# This is used to ensure the signature of the decorated function is preserved
# See also https://github.com/python/mypy/issues/3157
FuncT = TypeVar("FuncT", bound=Callable[..., Any])


def timed(func: FuncT) -> FuncT:
    """
    Decorator for timing a function or method. The name of the timer will be the qualified name of the function.
    Usage:
        @timed
        def my_func(x, y):
            return x + y
    Note that because this doesn't take arguments, the global timer stack is always used.

    :param func: The function or method to time.
    :return: A wrapper that runs `func` inside a hierarchical_timer block and returns its result.
        The wrapper keeps the name, docstring and other metadata of `func`.
    """
    # Imported here so the decorator does not rely on a module-level import of functools.
    import functools

    # Without wraps() the decorated function would be reported as "wrapped" and lose its docstring.
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        with hierarchical_timer(func.__qualname__):
            return func(*args, **kwargs)

    return wrapped  # type: ignore
|
|
|
|
|
|
def set_gauge(name: str, value: float, timer_stack: TimerStack = None) -> None:
    """
    Record `value` for the gauge called `name`, creating the gauge if this is its first value.
    The calling thread's TimerStack is used when `timer_stack` is not given.
    """
    if not timer_stack:
        timer_stack = _get_thread_timer()
    timer_stack.set_gauge(name, value)
|
|
|
|
|
|
def merge_gauges(
    gauges: Dict[str, GaugeNode], timer_stack: Optional[TimerStack] = None
) -> None:
    """
    Merge the gauges from another TimerStack with the provided one (or the
    current thread's stack if none is provided).

    :param gauges: Gauges to merge in, keyed by gauge name. They are not modified.
    :param timer_stack: The stack that receives the gauges.
    :return:
    """
    # Imported here so the function does not rely on a module-level import of copy.
    import copy

    timer_stack = timer_stack or _get_thread_timer()
    target = timer_stack.gauges
    for gauge_name, incoming in gauges.items():
        existing = target.get(gauge_name)
        if existing is not None:
            existing.merge(incoming)
        else:
            # Store a copy rather than the caller's node: sharing one object would let later
            # updates on either side leak into the other, and merging the same source again
            # would merge the node with itself and double its count.
            target[gauge_name] = copy.copy(incoming)
|
|
|
|
|
|
def add_metadata(key: str, value: str, timer_stack: TimerStack = None) -> None:
    """
    Store the metadata entry `key` -> `value` on the timer_stack (or the current thread's
    TimerStack if not specified).
    """
    (timer_stack or _get_thread_timer()).add_metadata(key, value)
|
|
|
|
|
|
def get_timer_tree(timer_stack: TimerStack = None) -> Dict[str, Any]:
    """
    Build and return the dictionary form of the timing tree held by the timer_stack
    (or by the current thread's TimerStack if not specified).
    """
    stack = timer_stack if timer_stack else _get_thread_timer()
    return stack.get_timing_tree()
|
|
|
|
|
|
def get_timer_root(timer_stack: TimerStack = None) -> TimerNode:
    """
    Return the root TimerNode of the timer_stack (or of the current thread's TimerStack
    if not specified), with its total time and count brought up to date.
    """
    if not timer_stack:
        timer_stack = _get_thread_timer()
    root_node = timer_stack.get_root()
    return root_node
|
|
|
|
|
|
def reset_timers(timer_stack: TimerStack = None) -> None:
    """
    Clear everything recorded on the timer_stack (or on the current thread's TimerStack
    if not specified) and restart its clock.
    """
    stack = timer_stack or _get_thread_timer()
    stack.reset()
|