| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100 |
- import contextlib
- import os
- import sys
- import tracemalloc
- import unittest
- from unittest.mock import patch
- from test.support.script_helper import (assert_python_ok, assert_python_failure,
- interpreter_requires_environment)
- from test import support
- from test.support import os_helper
- try:
- import _testcapi
- except ImportError:
- _testcapi = None
- EMPTY_STRING_SIZE = sys.getsizeof(b'')
- INVALID_NFRAME = (-1, 2**30)
- def get_frames(nframe, lineno_delta):
- frames = []
- frame = sys._getframe(1)
- for index in range(nframe):
- code = frame.f_code
- lineno = frame.f_lineno + lineno_delta
- frames.append((code.co_filename, lineno))
- lineno_delta = 0
- frame = frame.f_back
- if frame is None:
- break
- return tuple(frames)
- def allocate_bytes(size):
- nframe = tracemalloc.get_traceback_limit()
- bytes_len = (size - EMPTY_STRING_SIZE)
- frames = get_frames(nframe, 1)
- data = b'x' * bytes_len
- return data, tracemalloc.Traceback(frames, min(len(frames), nframe))
- def create_snapshots():
- traceback_limit = 2
- # _tracemalloc._get_traces() returns a list of (domain, size,
- # traceback_frames) tuples. traceback_frames is a tuple of (filename,
- # line_number) tuples.
- raw_traces = [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (1, 2, (('a.py', 5), ('b.py', 4)), 3),
- (2, 66, (('b.py', 1),), 1),
- (3, 7, (('<unknown>', 0),), 1),
- ]
- snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
- raw_traces2 = [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (2, 2, (('a.py', 5), ('b.py', 4)), 3),
- (2, 5000, (('a.py', 5), ('b.py', 4)), 3),
- (4, 400, (('c.py', 578),), 1),
- ]
- snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
- return (snapshot, snapshot2)
- def frame(filename, lineno):
- return tracemalloc._Frame((filename, lineno))
- def traceback(*frames):
- return tracemalloc.Traceback(frames)
- def traceback_lineno(filename, lineno):
- return traceback((filename, lineno))
- def traceback_filename(filename):
- return traceback_lineno(filename, 0)
- class TestTraceback(unittest.TestCase):
- def test_repr(self):
- def get_repr(*args) -> str:
- return repr(tracemalloc.Traceback(*args))
- self.assertEqual(get_repr(()), "<Traceback ()>")
- self.assertEqual(get_repr((), 0), "<Traceback () total_nframe=0>")
- frames = (("f1", 1), ("f2", 2))
- exp_repr_frames = (
- "(<Frame filename='f2' lineno=2>,"
- " <Frame filename='f1' lineno=1>)"
- )
- self.assertEqual(get_repr(frames),
- f"<Traceback {exp_repr_frames}>")
- self.assertEqual(get_repr(frames, 2),
- f"<Traceback {exp_repr_frames} total_nframe=2>")
- class TestTracemallocEnabled(unittest.TestCase):
- def setUp(self):
- if tracemalloc.is_tracing():
- self.skipTest("tracemalloc must be stopped before the test")
- tracemalloc.start(1)
- def tearDown(self):
- tracemalloc.stop()
- def test_get_tracemalloc_memory(self):
- data = [allocate_bytes(123) for count in range(1000)]
- size = tracemalloc.get_tracemalloc_memory()
- self.assertGreaterEqual(size, 0)
- tracemalloc.clear_traces()
- size2 = tracemalloc.get_tracemalloc_memory()
- self.assertGreaterEqual(size2, 0)
- self.assertLessEqual(size2, size)
- def test_get_object_traceback(self):
- tracemalloc.clear_traces()
- obj_size = 12345
- obj, obj_traceback = allocate_bytes(obj_size)
- traceback = tracemalloc.get_object_traceback(obj)
- self.assertEqual(traceback, obj_traceback)
- def test_new_reference(self):
- tracemalloc.clear_traces()
- # gc.collect() indirectly calls PyList_ClearFreeList()
- support.gc_collect()
- # Create a list and "destroy it": put it in the PyListObject free list
- obj = []
- obj = None
- # Create a list which should reuse the previously created empty list
- obj = []
- nframe = tracemalloc.get_traceback_limit()
- frames = get_frames(nframe, -3)
- obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))
- traceback = tracemalloc.get_object_traceback(obj)
- self.assertIsNotNone(traceback)
- self.assertEqual(traceback, obj_traceback)
- def test_set_traceback_limit(self):
- obj_size = 10
- tracemalloc.stop()
- self.assertRaises(ValueError, tracemalloc.start, -1)
- tracemalloc.stop()
- tracemalloc.start(10)
- obj2, obj2_traceback = allocate_bytes(obj_size)
- traceback = tracemalloc.get_object_traceback(obj2)
- self.assertEqual(len(traceback), 10)
- self.assertEqual(traceback, obj2_traceback)
- tracemalloc.stop()
- tracemalloc.start(1)
- obj, obj_traceback = allocate_bytes(obj_size)
- traceback = tracemalloc.get_object_traceback(obj)
- self.assertEqual(len(traceback), 1)
- self.assertEqual(traceback, obj_traceback)
- def find_trace(self, traces, traceback):
- for trace in traces:
- if trace[2] == traceback._frames:
- return trace
- self.fail("trace not found")
- def test_get_traces(self):
- tracemalloc.clear_traces()
- obj_size = 12345
- obj, obj_traceback = allocate_bytes(obj_size)
- traces = tracemalloc._get_traces()
- trace = self.find_trace(traces, obj_traceback)
- self.assertIsInstance(trace, tuple)
- domain, size, traceback, length = trace
- self.assertEqual(size, obj_size)
- self.assertEqual(traceback, obj_traceback._frames)
- tracemalloc.stop()
- self.assertEqual(tracemalloc._get_traces(), [])
- def test_get_traces_intern_traceback(self):
- # dummy wrappers to get more useful and identical frames in the traceback
- def allocate_bytes2(size):
- return allocate_bytes(size)
- def allocate_bytes3(size):
- return allocate_bytes2(size)
- def allocate_bytes4(size):
- return allocate_bytes3(size)
- # Ensure that two identical tracebacks are not duplicated
- tracemalloc.stop()
- tracemalloc.start(4)
- obj_size = 123
- obj1, obj1_traceback = allocate_bytes4(obj_size)
- obj2, obj2_traceback = allocate_bytes4(obj_size)
- traces = tracemalloc._get_traces()
- obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
- obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))
- trace1 = self.find_trace(traces, obj1_traceback)
- trace2 = self.find_trace(traces, obj2_traceback)
- domain1, size1, traceback1, length1 = trace1
- domain2, size2, traceback2, length2 = trace2
- self.assertIs(traceback2, traceback1)
- def test_get_traced_memory(self):
- # Python allocates some internals objects, so the test must tolerate
- # a small difference between the expected size and the real usage
- max_error = 2048
- # allocate one object
- obj_size = 1024 * 1024
- tracemalloc.clear_traces()
- obj, obj_traceback = allocate_bytes(obj_size)
- size, peak_size = tracemalloc.get_traced_memory()
- self.assertGreaterEqual(size, obj_size)
- self.assertGreaterEqual(peak_size, size)
- self.assertLessEqual(size - obj_size, max_error)
- self.assertLessEqual(peak_size - size, max_error)
- # destroy the object
- obj = None
- size2, peak_size2 = tracemalloc.get_traced_memory()
- self.assertLess(size2, size)
- self.assertGreaterEqual(size - size2, obj_size - max_error)
- self.assertGreaterEqual(peak_size2, peak_size)
- # clear_traces() must reset traced memory counters
- tracemalloc.clear_traces()
- self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
- # allocate another object
- obj, obj_traceback = allocate_bytes(obj_size)
- size, peak_size = tracemalloc.get_traced_memory()
- self.assertGreaterEqual(size, obj_size)
- # stop() also resets traced memory counters
- tracemalloc.stop()
- self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
- def test_clear_traces(self):
- obj, obj_traceback = allocate_bytes(123)
- traceback = tracemalloc.get_object_traceback(obj)
- self.assertIsNotNone(traceback)
- tracemalloc.clear_traces()
- traceback2 = tracemalloc.get_object_traceback(obj)
- self.assertIsNone(traceback2)
- def test_reset_peak(self):
- # Python allocates some internals objects, so the test must tolerate
- # a small difference between the expected size and the real usage
- tracemalloc.clear_traces()
- # Example: allocate a large piece of memory, temporarily
- large_sum = sum(list(range(100000)))
- size1, peak1 = tracemalloc.get_traced_memory()
- # reset_peak() resets peak to traced memory: peak2 < peak1
- tracemalloc.reset_peak()
- size2, peak2 = tracemalloc.get_traced_memory()
- self.assertGreaterEqual(peak2, size2)
- self.assertLess(peak2, peak1)
- # check that peak continue to be updated if new memory is allocated:
- # peak3 > peak2
- obj_size = 1024 * 1024
- obj, obj_traceback = allocate_bytes(obj_size)
- size3, peak3 = tracemalloc.get_traced_memory()
- self.assertGreaterEqual(peak3, size3)
- self.assertGreater(peak3, peak2)
- self.assertGreaterEqual(peak3 - peak2, obj_size)
- def test_is_tracing(self):
- tracemalloc.stop()
- self.assertFalse(tracemalloc.is_tracing())
- tracemalloc.start()
- self.assertTrue(tracemalloc.is_tracing())
- def test_snapshot(self):
- obj, source = allocate_bytes(123)
- # take a snapshot
- snapshot = tracemalloc.take_snapshot()
- # This can vary
- self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)
- # write on disk
- snapshot.dump(os_helper.TESTFN)
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- # load from disk
- snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
- self.assertEqual(snapshot2.traces, snapshot.traces)
- # tracemalloc must be tracing memory allocations to take a snapshot
- tracemalloc.stop()
- with self.assertRaises(RuntimeError) as cm:
- tracemalloc.take_snapshot()
- self.assertEqual(str(cm.exception),
- "the tracemalloc module must be tracing memory "
- "allocations to take a snapshot")
- def test_snapshot_save_attr(self):
- # take a snapshot with a new attribute
- snapshot = tracemalloc.take_snapshot()
- snapshot.test_attr = "new"
- snapshot.dump(os_helper.TESTFN)
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- # load() should recreate the attribute
- snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
- self.assertEqual(snapshot2.test_attr, "new")
- def fork_child(self):
- if not tracemalloc.is_tracing():
- return 2
- obj_size = 12345
- obj, obj_traceback = allocate_bytes(obj_size)
- traceback = tracemalloc.get_object_traceback(obj)
- if traceback is None:
- return 3
- # everything is fine
- return 0
- @support.requires_fork()
- def test_fork(self):
- # check that tracemalloc is still working after fork
- pid = os.fork()
- if not pid:
- # child
- exitcode = 1
- try:
- exitcode = self.fork_child()
- finally:
- os._exit(exitcode)
- else:
- support.wait_process(pid, exitcode=0)
- def test_no_incomplete_frames(self):
- tracemalloc.stop()
- tracemalloc.start(8)
- def f(x):
- def g():
- return x
- return g
- obj = f(0).__closure__[0]
- traceback = tracemalloc.get_object_traceback(obj)
- self.assertIn("test_tracemalloc", traceback[-1].filename)
- self.assertNotIn("test_tracemalloc", traceback[-2].filename)
- class TestSnapshot(unittest.TestCase):
- maxDiff = 4000
- def test_create_snapshot(self):
- raw_traces = [(0, 5, (('a.py', 2),), 10)]
- with contextlib.ExitStack() as stack:
- stack.enter_context(patch.object(tracemalloc, 'is_tracing',
- return_value=True))
- stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
- return_value=5))
- stack.enter_context(patch.object(tracemalloc, '_get_traces',
- return_value=raw_traces))
- snapshot = tracemalloc.take_snapshot()
- self.assertEqual(snapshot.traceback_limit, 5)
- self.assertEqual(len(snapshot.traces), 1)
- trace = snapshot.traces[0]
- self.assertEqual(trace.size, 5)
- self.assertEqual(trace.traceback.total_nframe, 10)
- self.assertEqual(len(trace.traceback), 1)
- self.assertEqual(trace.traceback[0].filename, 'a.py')
- self.assertEqual(trace.traceback[0].lineno, 2)
- def test_filter_traces(self):
- snapshot, snapshot2 = create_snapshots()
- filter1 = tracemalloc.Filter(False, "b.py")
- filter2 = tracemalloc.Filter(True, "a.py", 2)
- filter3 = tracemalloc.Filter(True, "a.py", 5)
- original_traces = list(snapshot.traces._traces)
- # exclude b.py
- snapshot3 = snapshot.filter_traces((filter1,))
- self.assertEqual(snapshot3.traces._traces, [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (1, 2, (('a.py', 5), ('b.py', 4)), 3),
- (3, 7, (('<unknown>', 0),), 1),
- ])
- # filter_traces() must not touch the original snapshot
- self.assertEqual(snapshot.traces._traces, original_traces)
- # only include two lines of a.py
- snapshot4 = snapshot3.filter_traces((filter2, filter3))
- self.assertEqual(snapshot4.traces._traces, [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (1, 2, (('a.py', 5), ('b.py', 4)), 3),
- ])
- # No filter: just duplicate the snapshot
- snapshot5 = snapshot.filter_traces(())
- self.assertIsNot(snapshot5, snapshot)
- self.assertIsNot(snapshot5.traces, snapshot.traces)
- self.assertEqual(snapshot5.traces, snapshot.traces)
- self.assertRaises(TypeError, snapshot.filter_traces, filter1)
- def test_filter_traces_domain(self):
- snapshot, snapshot2 = create_snapshots()
- filter1 = tracemalloc.Filter(False, "a.py", domain=1)
- filter2 = tracemalloc.Filter(True, "a.py", domain=1)
- original_traces = list(snapshot.traces._traces)
- # exclude a.py of domain 1
- snapshot3 = snapshot.filter_traces((filter1,))
- self.assertEqual(snapshot3.traces._traces, [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (2, 66, (('b.py', 1),), 1),
- (3, 7, (('<unknown>', 0),), 1),
- ])
- # include domain 1
- snapshot3 = snapshot.filter_traces((filter1,))
- self.assertEqual(snapshot3.traces._traces, [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (2, 66, (('b.py', 1),), 1),
- (3, 7, (('<unknown>', 0),), 1),
- ])
- def test_filter_traces_domain_filter(self):
- snapshot, snapshot2 = create_snapshots()
- filter1 = tracemalloc.DomainFilter(False, domain=3)
- filter2 = tracemalloc.DomainFilter(True, domain=3)
- # exclude domain 2
- snapshot3 = snapshot.filter_traces((filter1,))
- self.assertEqual(snapshot3.traces._traces, [
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (0, 10, (('a.py', 2), ('b.py', 4)), 3),
- (1, 2, (('a.py', 5), ('b.py', 4)), 3),
- (2, 66, (('b.py', 1),), 1),
- ])
- # include domain 2
- snapshot3 = snapshot.filter_traces((filter2,))
- self.assertEqual(snapshot3.traces._traces, [
- (3, 7, (('<unknown>', 0),), 1),
- ])
- def test_snapshot_group_by_line(self):
- snapshot, snapshot2 = create_snapshots()
- tb_0 = traceback_lineno('<unknown>', 0)
- tb_a_2 = traceback_lineno('a.py', 2)
- tb_a_5 = traceback_lineno('a.py', 5)
- tb_b_1 = traceback_lineno('b.py', 1)
- tb_c_578 = traceback_lineno('c.py', 578)
- # stats per file and line
- stats1 = snapshot.statistics('lineno')
- self.assertEqual(stats1, [
- tracemalloc.Statistic(tb_b_1, 66, 1),
- tracemalloc.Statistic(tb_a_2, 30, 3),
- tracemalloc.Statistic(tb_0, 7, 1),
- tracemalloc.Statistic(tb_a_5, 2, 1),
- ])
- # stats per file and line (2)
- stats2 = snapshot2.statistics('lineno')
- self.assertEqual(stats2, [
- tracemalloc.Statistic(tb_a_5, 5002, 2),
- tracemalloc.Statistic(tb_c_578, 400, 1),
- tracemalloc.Statistic(tb_a_2, 30, 3),
- ])
- # stats diff per file and line
- statistics = snapshot2.compare_to(snapshot, 'lineno')
- self.assertEqual(statistics, [
- tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
- tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
- tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
- tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
- tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
- ])
- def test_snapshot_group_by_file(self):
- snapshot, snapshot2 = create_snapshots()
- tb_0 = traceback_filename('<unknown>')
- tb_a = traceback_filename('a.py')
- tb_b = traceback_filename('b.py')
- tb_c = traceback_filename('c.py')
- # stats per file
- stats1 = snapshot.statistics('filename')
- self.assertEqual(stats1, [
- tracemalloc.Statistic(tb_b, 66, 1),
- tracemalloc.Statistic(tb_a, 32, 4),
- tracemalloc.Statistic(tb_0, 7, 1),
- ])
- # stats per file (2)
- stats2 = snapshot2.statistics('filename')
- self.assertEqual(stats2, [
- tracemalloc.Statistic(tb_a, 5032, 5),
- tracemalloc.Statistic(tb_c, 400, 1),
- ])
- # stats diff per file
- diff = snapshot2.compare_to(snapshot, 'filename')
- self.assertEqual(diff, [
- tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
- tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
- tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
- tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
- ])
- def test_snapshot_group_by_traceback(self):
- snapshot, snapshot2 = create_snapshots()
- # stats per file
- tb1 = traceback(('a.py', 2), ('b.py', 4))
- tb2 = traceback(('a.py', 5), ('b.py', 4))
- tb3 = traceback(('b.py', 1))
- tb4 = traceback(('<unknown>', 0))
- stats1 = snapshot.statistics('traceback')
- self.assertEqual(stats1, [
- tracemalloc.Statistic(tb3, 66, 1),
- tracemalloc.Statistic(tb1, 30, 3),
- tracemalloc.Statistic(tb4, 7, 1),
- tracemalloc.Statistic(tb2, 2, 1),
- ])
- # stats per file (2)
- tb5 = traceback(('c.py', 578))
- stats2 = snapshot2.statistics('traceback')
- self.assertEqual(stats2, [
- tracemalloc.Statistic(tb2, 5002, 2),
- tracemalloc.Statistic(tb5, 400, 1),
- tracemalloc.Statistic(tb1, 30, 3),
- ])
- # stats diff per file
- diff = snapshot2.compare_to(snapshot, 'traceback')
- self.assertEqual(diff, [
- tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
- tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
- tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
- tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
- tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
- ])
- self.assertRaises(ValueError,
- snapshot.statistics, 'traceback', cumulative=True)
- def test_snapshot_group_by_cumulative(self):
- snapshot, snapshot2 = create_snapshots()
- tb_0 = traceback_filename('<unknown>')
- tb_a = traceback_filename('a.py')
- tb_b = traceback_filename('b.py')
- tb_a_2 = traceback_lineno('a.py', 2)
- tb_a_5 = traceback_lineno('a.py', 5)
- tb_b_1 = traceback_lineno('b.py', 1)
- tb_b_4 = traceback_lineno('b.py', 4)
- # per file
- stats = snapshot.statistics('filename', True)
- self.assertEqual(stats, [
- tracemalloc.Statistic(tb_b, 98, 5),
- tracemalloc.Statistic(tb_a, 32, 4),
- tracemalloc.Statistic(tb_0, 7, 1),
- ])
- # per line
- stats = snapshot.statistics('lineno', True)
- self.assertEqual(stats, [
- tracemalloc.Statistic(tb_b_1, 66, 1),
- tracemalloc.Statistic(tb_b_4, 32, 4),
- tracemalloc.Statistic(tb_a_2, 30, 3),
- tracemalloc.Statistic(tb_0, 7, 1),
- tracemalloc.Statistic(tb_a_5, 2, 1),
- ])
- def test_trace_format(self):
- snapshot, snapshot2 = create_snapshots()
- trace = snapshot.traces[0]
- self.assertEqual(str(trace), 'b.py:4: 10 B')
- traceback = trace.traceback
- self.assertEqual(str(traceback), 'b.py:4')
- frame = traceback[0]
- self.assertEqual(str(frame), 'b.py:4')
- def test_statistic_format(self):
- snapshot, snapshot2 = create_snapshots()
- stats = snapshot.statistics('lineno')
- stat = stats[0]
- self.assertEqual(str(stat),
- 'b.py:1: size=66 B, count=1, average=66 B')
- def test_statistic_diff_format(self):
- snapshot, snapshot2 = create_snapshots()
- stats = snapshot2.compare_to(snapshot, 'lineno')
- stat = stats[0]
- self.assertEqual(str(stat),
- 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
- def test_slices(self):
- snapshot, snapshot2 = create_snapshots()
- self.assertEqual(snapshot.traces[:2],
- (snapshot.traces[0], snapshot.traces[1]))
- traceback = snapshot.traces[0].traceback
- self.assertEqual(traceback[:2],
- (traceback[0], traceback[1]))
- def test_format_traceback(self):
- snapshot, snapshot2 = create_snapshots()
- def getline(filename, lineno):
- return ' <%s, %s>' % (filename, lineno)
- with unittest.mock.patch('tracemalloc.linecache.getline',
- side_effect=getline):
- tb = snapshot.traces[0].traceback
- self.assertEqual(tb.format(),
- [' File "b.py", line 4',
- ' <b.py, 4>',
- ' File "a.py", line 2',
- ' <a.py, 2>'])
- self.assertEqual(tb.format(limit=1),
- [' File "a.py", line 2',
- ' <a.py, 2>'])
- self.assertEqual(tb.format(limit=-1),
- [' File "b.py", line 4',
- ' <b.py, 4>'])
- self.assertEqual(tb.format(most_recent_first=True),
- [' File "a.py", line 2',
- ' <a.py, 2>',
- ' File "b.py", line 4',
- ' <b.py, 4>'])
- self.assertEqual(tb.format(limit=1, most_recent_first=True),
- [' File "a.py", line 2',
- ' <a.py, 2>'])
- self.assertEqual(tb.format(limit=-1, most_recent_first=True),
- [' File "b.py", line 4',
- ' <b.py, 4>'])
- class TestFilters(unittest.TestCase):
- maxDiff = 2048
- def test_filter_attributes(self):
- # test default values
- f = tracemalloc.Filter(True, "abc")
- self.assertEqual(f.inclusive, True)
- self.assertEqual(f.filename_pattern, "abc")
- self.assertIsNone(f.lineno)
- self.assertEqual(f.all_frames, False)
- # test custom values
- f = tracemalloc.Filter(False, "test.py", 123, True)
- self.assertEqual(f.inclusive, False)
- self.assertEqual(f.filename_pattern, "test.py")
- self.assertEqual(f.lineno, 123)
- self.assertEqual(f.all_frames, True)
- # parameters passed by keyword
- f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
- self.assertEqual(f.inclusive, False)
- self.assertEqual(f.filename_pattern, "test.py")
- self.assertEqual(f.lineno, 123)
- self.assertEqual(f.all_frames, True)
- # read-only attribute
- self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
- def test_filter_match(self):
- # filter without line number
- f = tracemalloc.Filter(True, "abc")
- self.assertTrue(f._match_frame("abc", 0))
- self.assertTrue(f._match_frame("abc", 5))
- self.assertTrue(f._match_frame("abc", 10))
- self.assertFalse(f._match_frame("12356", 0))
- self.assertFalse(f._match_frame("12356", 5))
- self.assertFalse(f._match_frame("12356", 10))
- f = tracemalloc.Filter(False, "abc")
- self.assertFalse(f._match_frame("abc", 0))
- self.assertFalse(f._match_frame("abc", 5))
- self.assertFalse(f._match_frame("abc", 10))
- self.assertTrue(f._match_frame("12356", 0))
- self.assertTrue(f._match_frame("12356", 5))
- self.assertTrue(f._match_frame("12356", 10))
- # filter with line number > 0
- f = tracemalloc.Filter(True, "abc", 5)
- self.assertFalse(f._match_frame("abc", 0))
- self.assertTrue(f._match_frame("abc", 5))
- self.assertFalse(f._match_frame("abc", 10))
- self.assertFalse(f._match_frame("12356", 0))
- self.assertFalse(f._match_frame("12356", 5))
- self.assertFalse(f._match_frame("12356", 10))
- f = tracemalloc.Filter(False, "abc", 5)
- self.assertTrue(f._match_frame("abc", 0))
- self.assertFalse(f._match_frame("abc", 5))
- self.assertTrue(f._match_frame("abc", 10))
- self.assertTrue(f._match_frame("12356", 0))
- self.assertTrue(f._match_frame("12356", 5))
- self.assertTrue(f._match_frame("12356", 10))
- # filter with line number 0
- f = tracemalloc.Filter(True, "abc", 0)
- self.assertTrue(f._match_frame("abc", 0))
- self.assertFalse(f._match_frame("abc", 5))
- self.assertFalse(f._match_frame("abc", 10))
- self.assertFalse(f._match_frame("12356", 0))
- self.assertFalse(f._match_frame("12356", 5))
- self.assertFalse(f._match_frame("12356", 10))
- f = tracemalloc.Filter(False, "abc", 0)
- self.assertFalse(f._match_frame("abc", 0))
- self.assertTrue(f._match_frame("abc", 5))
- self.assertTrue(f._match_frame("abc", 10))
- self.assertTrue(f._match_frame("12356", 0))
- self.assertTrue(f._match_frame("12356", 5))
- self.assertTrue(f._match_frame("12356", 10))
- def test_filter_match_filename(self):
- def fnmatch(inclusive, filename, pattern):
- f = tracemalloc.Filter(inclusive, pattern)
- return f._match_frame(filename, 0)
- self.assertTrue(fnmatch(True, "abc", "abc"))
- self.assertFalse(fnmatch(True, "12356", "abc"))
- self.assertFalse(fnmatch(True, "<unknown>", "abc"))
- self.assertFalse(fnmatch(False, "abc", "abc"))
- self.assertTrue(fnmatch(False, "12356", "abc"))
- self.assertTrue(fnmatch(False, "<unknown>", "abc"))
- def test_filter_match_filename_joker(self):
- def fnmatch(filename, pattern):
- filter = tracemalloc.Filter(True, pattern)
- return filter._match_frame(filename, 0)
- # empty string
- self.assertFalse(fnmatch('abc', ''))
- self.assertFalse(fnmatch('', 'abc'))
- self.assertTrue(fnmatch('', ''))
- self.assertTrue(fnmatch('', '*'))
- # no *
- self.assertTrue(fnmatch('abc', 'abc'))
- self.assertFalse(fnmatch('abc', 'abcd'))
- self.assertFalse(fnmatch('abc', 'def'))
- # a*
- self.assertTrue(fnmatch('abc', 'a*'))
- self.assertTrue(fnmatch('abc', 'abc*'))
- self.assertFalse(fnmatch('abc', 'b*'))
- self.assertFalse(fnmatch('abc', 'abcd*'))
- # a*b
- self.assertTrue(fnmatch('abc', 'a*c'))
- self.assertTrue(fnmatch('abcdcx', 'a*cx'))
- self.assertFalse(fnmatch('abb', 'a*c'))
- self.assertFalse(fnmatch('abcdce', 'a*cx'))
- # a*b*c
- self.assertTrue(fnmatch('abcde', 'a*c*e'))
- self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
- self.assertFalse(fnmatch('abcdd', 'a*c*e'))
- self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
- # replace .pyc suffix with .py
- self.assertTrue(fnmatch('a.pyc', 'a.py'))
- self.assertTrue(fnmatch('a.py', 'a.pyc'))
- if os.name == 'nt':
- # case insensitive
- self.assertTrue(fnmatch('aBC', 'ABc'))
- self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
- self.assertTrue(fnmatch('a.pyc', 'a.PY'))
- self.assertTrue(fnmatch('a.py', 'a.PYC'))
- else:
- # case sensitive
- self.assertFalse(fnmatch('aBC', 'ABc'))
- self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
- self.assertFalse(fnmatch('a.pyc', 'a.PY'))
- self.assertFalse(fnmatch('a.py', 'a.PYC'))
- if os.name == 'nt':
- # normalize alternate separator "/" to the standard separator "\"
- self.assertTrue(fnmatch(r'a/b', r'a\b'))
- self.assertTrue(fnmatch(r'a\b', r'a/b'))
- self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
- self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
- else:
- # there is no alternate separator
- self.assertFalse(fnmatch(r'a/b', r'a\b'))
- self.assertFalse(fnmatch(r'a\b', r'a/b'))
- self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
- self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
- # as of 3.5, .pyo is no longer munged to .py
- self.assertFalse(fnmatch('a.pyo', 'a.py'))
- def test_filter_match_trace(self):
- t1 = (("a.py", 2), ("b.py", 3))
- t2 = (("b.py", 4), ("b.py", 5))
- t3 = (("c.py", 5), ('<unknown>', 0))
- unknown = (('<unknown>', 0),)
- f = tracemalloc.Filter(True, "b.py", all_frames=True)
- self.assertTrue(f._match_traceback(t1))
- self.assertTrue(f._match_traceback(t2))
- self.assertFalse(f._match_traceback(t3))
- self.assertFalse(f._match_traceback(unknown))
- f = tracemalloc.Filter(True, "b.py", all_frames=False)
- self.assertFalse(f._match_traceback(t1))
- self.assertTrue(f._match_traceback(t2))
- self.assertFalse(f._match_traceback(t3))
- self.assertFalse(f._match_traceback(unknown))
- f = tracemalloc.Filter(False, "b.py", all_frames=True)
- self.assertFalse(f._match_traceback(t1))
- self.assertFalse(f._match_traceback(t2))
- self.assertTrue(f._match_traceback(t3))
- self.assertTrue(f._match_traceback(unknown))
- f = tracemalloc.Filter(False, "b.py", all_frames=False)
- self.assertTrue(f._match_traceback(t1))
- self.assertFalse(f._match_traceback(t2))
- self.assertTrue(f._match_traceback(t3))
- self.assertTrue(f._match_traceback(unknown))
- f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
- self.assertTrue(f._match_traceback(t1))
- self.assertTrue(f._match_traceback(t2))
- self.assertTrue(f._match_traceback(t3))
- self.assertFalse(f._match_traceback(unknown))
- f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
- self.assertFalse(f._match_traceback(t1))
- self.assertFalse(f._match_traceback(t2))
- self.assertTrue(f._match_traceback(t3))
- self.assertTrue(f._match_traceback(unknown))
- f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
- self.assertTrue(f._match_traceback(t1))
- self.assertTrue(f._match_traceback(t2))
- self.assertFalse(f._match_traceback(t3))
- self.assertFalse(f._match_traceback(unknown))
- class TestCommandLine(unittest.TestCase):
- def test_env_var_disabled_by_default(self):
- # not tracing by default
- code = 'import tracemalloc; print(tracemalloc.is_tracing())'
- ok, stdout, stderr = assert_python_ok('-c', code)
- stdout = stdout.rstrip()
- self.assertEqual(stdout, b'False')
- @unittest.skipIf(interpreter_requires_environment(),
- 'Cannot run -E tests when PYTHON env vars are required.')
- def test_env_var_ignored_with_E(self):
- """PYTHON* environment variables must be ignored when -E is present."""
- code = 'import tracemalloc; print(tracemalloc.is_tracing())'
- ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
- stdout = stdout.rstrip()
- self.assertEqual(stdout, b'False')
- def test_env_var_disabled(self):
- # tracing at startup
- code = 'import tracemalloc; print(tracemalloc.is_tracing())'
- ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0')
- stdout = stdout.rstrip()
- self.assertEqual(stdout, b'False')
- def test_env_var_enabled_at_startup(self):
- # tracing at startup
- code = 'import tracemalloc; print(tracemalloc.is_tracing())'
- ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
- stdout = stdout.rstrip()
- self.assertEqual(stdout, b'True')
- def test_env_limit(self):
- # start and set the number of frames
- code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
- ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
- stdout = stdout.rstrip()
- self.assertEqual(stdout, b'10')
- def check_env_var_invalid(self, nframe):
- with support.SuppressCrashReport():
- ok, stdout, stderr = assert_python_failure(
- '-c', 'pass',
- PYTHONTRACEMALLOC=str(nframe))
- if b'ValueError: the number of frames must be in range' in stderr:
- return
- if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr:
- return
- self.fail(f"unexpected output: {stderr!a}")
- def test_env_var_invalid(self):
- for nframe in INVALID_NFRAME:
- with self.subTest(nframe=nframe):
- self.check_env_var_invalid(nframe)
- def test_sys_xoptions(self):
- for xoptions, nframe in (
- ('tracemalloc', 1),
- ('tracemalloc=1', 1),
- ('tracemalloc=15', 15),
- ):
- with self.subTest(xoptions=xoptions, nframe=nframe):
- code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
- ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
- stdout = stdout.rstrip()
- self.assertEqual(stdout, str(nframe).encode('ascii'))
- def check_sys_xoptions_invalid(self, nframe):
- args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
- with support.SuppressCrashReport():
- ok, stdout, stderr = assert_python_failure(*args)
- if b'ValueError: the number of frames must be in range' in stderr:
- return
- if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr:
- return
- self.fail(f"unexpected output: {stderr!a}")
- def test_sys_xoptions_invalid(self):
- for nframe in INVALID_NFRAME:
- with self.subTest(nframe=nframe):
- self.check_sys_xoptions_invalid(nframe)
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- def test_pymem_alloc0(self):
- # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
- # does not crash.
- code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
- assert_python_ok('-X', 'tracemalloc', '-c', code)
- @unittest.skipIf(_testcapi is None, 'need _testcapi')
- class TestCAPI(unittest.TestCase):
- maxDiff = 80 * 20
- def setUp(self):
- if tracemalloc.is_tracing():
- self.skipTest("tracemalloc must be stopped before the test")
- self.domain = 5
- self.size = 123
- self.obj = allocate_bytes(self.size)[0]
- # for the type "object", id(obj) is the address of its memory block.
- # This type is not tracked by the garbage collector
- self.ptr = id(self.obj)
- def tearDown(self):
- tracemalloc.stop()
- def get_traceback(self):
- frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
- if frames is not None:
- return tracemalloc.Traceback(frames)
- else:
- return None
- def track(self, release_gil=False, nframe=1):
- frames = get_frames(nframe, 1)
- _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
- release_gil)
- return frames
- def untrack(self):
- _testcapi.tracemalloc_untrack(self.domain, self.ptr)
- def get_traced_memory(self):
- # Get the traced size in the domain
- snapshot = tracemalloc.take_snapshot()
- domain_filter = tracemalloc.DomainFilter(True, self.domain)
- snapshot = snapshot.filter_traces([domain_filter])
- return sum(trace.size for trace in snapshot.traces)
- def check_track(self, release_gil):
- nframe = 5
- tracemalloc.start(nframe)
- size = tracemalloc.get_traced_memory()[0]
- frames = self.track(release_gil, nframe)
- self.assertEqual(self.get_traceback(),
- tracemalloc.Traceback(frames))
- self.assertEqual(self.get_traced_memory(), self.size)
- def test_track(self):
- self.check_track(False)
- def test_track_without_gil(self):
- # check that calling _PyTraceMalloc_Track() without holding the GIL
- # works too
- self.check_track(True)
- def test_track_already_tracked(self):
- nframe = 5
- tracemalloc.start(nframe)
- # track a first time
- self.track()
- # calling _PyTraceMalloc_Track() must remove the old trace and add
- # a new trace with the new traceback
- frames = self.track(nframe=nframe)
- self.assertEqual(self.get_traceback(),
- tracemalloc.Traceback(frames))
- def test_untrack(self):
- tracemalloc.start()
- self.track()
- self.assertIsNotNone(self.get_traceback())
- self.assertEqual(self.get_traced_memory(), self.size)
- # untrack must remove the trace
- self.untrack()
- self.assertIsNone(self.get_traceback())
- self.assertEqual(self.get_traced_memory(), 0)
- # calling _PyTraceMalloc_Untrack() multiple times must not crash
- self.untrack()
- self.untrack()
- def test_stop_track(self):
- tracemalloc.start()
- tracemalloc.stop()
- with self.assertRaises(RuntimeError):
- self.track()
- self.assertIsNone(self.get_traceback())
- def test_stop_untrack(self):
- tracemalloc.start()
- self.track()
- tracemalloc.stop()
- with self.assertRaises(RuntimeError):
- self.untrack()
- if __name__ == "__main__":
- unittest.main()
|