blob: 3726f03532a866f47d714bcf10dcf7dc023371a4 [file] [log] [blame]
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import logging, os, re, utils
6
7from autotest_lib.client.bin import utils
8from autotest_lib.client.common_lib import error
9
class KernelTrace(object):
    r"""Allows access and control to Kernel tracing facilities.

    Example code snippet:
        trace = KernelTrace(events=['mali_dvfs:mali_dvfs_set_clock'])
        results = trace.read(regexp=r'frequency=(\d+)')

    Public methods:
        on          : Enables tracing
        off         : Disables tracing
        event_on    : Enables a single trace event
        event_off   : Disables a single trace event
        flush       : Flushes trace buffer
        read        : Reads trace buffer and returns the accumulated list of
                        - first regexp match per matching line (a string, or
                          a tuple when regexp has several groups) if regexp
                          is provided
                        - else every raw line
        uptime_secs : Returns float of current uptime.

    Private functions:
        _onoff       : Disable/enable tracing
        _event_onoff : Disable/enable events

    Private attributes:
        _buffer     : list to hold parsed results from trace buffer
        _buffer_ptr : integer file position of the trace file after the
                      last read
        _events     : list of events enabled by the constructor
        _on         : boolean, whether the constructor was asked to enable
                      tracing

    TODO(tbroch): List of potential enhancements
       - currently only supports trace events.  Add other tracers.
    """
    _TRACE_ROOT = '/sys/kernel/debug/tracing'
    # NOTE(review): _onoff() writes to this 'tracing_enabled' file but
    # verifies the result by reading 'tracing_on'.  Confirm both files exist
    # and stay in sync on the targeted kernels.
    _TRACE_EN_PATH = os.path.join(_TRACE_ROOT, 'tracing_enabled')

    def __init__(self, flush=True, events=None, on=True):
        """Constructor for KernelTrace class

        Arguments:
          flush: boolean, flush the trace buffer before enabling anything
          events: iterable of event names to enable, or None for no events
          on: boolean, enable tracing once the events are set up

        Raises:
          error.TestFail: propagated from flush(), event_on() or on()
        """
        self._buffer = []
        self._buffer_ptr = 0
        # Normalise to a list so the default (None) is iterable both here
        # and later in __del__.
        self._events = list(events) if events else []
        self._on = on

        if flush:
            self.flush()
        for event in self._events:
            self.event_on(event)
        if on:
            self.on()


    def __del__(self, flush=True, events=None, on=True):
        """Deconstructor for KernelTrace class

        Disables every event enabled by the constructor and turns tracing
        off again if the constructor turned it on.  The keyword arguments
        are unused; they are kept only so the signature stays unchanged.
        """
        # getattr() guards against a partially constructed instance, e.g.
        # one created without running __init__.
        for event in getattr(self, '_events', None) or []:
            self.event_off(event)
        if getattr(self, '_on', False):
            self.off()


    def _onoff(self, val):
        """Enable/Disable tracing.

        Arguments:
          val: integer, 1 for on, 0 for off

        Raises:
          error.TestFail: If the value read back from 'tracing_on' does not
            equal val
        """
        utils.write_one_line(self._TRACE_EN_PATH, val)
        fname = os.path.join(self._TRACE_ROOT, 'tracing_on')
        result = int(utils.read_one_line(fname).strip())
        if result != val:
            # Parentheses are required: '%' binds tighter than the
            # conditional expression.
            raise error.TestFail("Unable to %sable tracing" %
                                 ('en' if val == 1 else 'dis'))


    def on(self):
        """Enable tracing."""
        self._onoff(1)


    def off(self):
        """Disable tracing."""
        self._onoff(0)


    def _event_onoff(self, event, val):
        """Enable/Disable tracing event.

        TODO(tbroch) Consider allowing wild card enabling of trace events via
            /sys/kernel/debug/tracing/set_event although it makes filling
            buffer really easy

        Arguments:
          event: string, name of the event; ':' separators are mapped to
              directory separators under <trace root>/events.
              See kernel(Documentation/trace/events.txt) for formatting.
          val: integer, 1 for on, 0 for off

        Raises:
          error.TestFail: If the event's 'enable' file does not exist, or if
            'set_event' does not reflect the requested state afterwards
        """
        logging.debug("event_onoff: event:%s val:%d", event, val)
        event_path = event.replace(':', '/')
        fname = os.path.join(self._TRACE_ROOT, 'events', event_path, 'enable')

        if not os.path.exists(fname):
            raise error.TestFail("Unable to locate tracing event %s" % fname)
        utils.write_one_line(fname, val)

        # Verify by looking for the event name among the 'set_event' lines.
        fname = os.path.join(self._TRACE_ROOT, "set_event")
        found = False
        with open(fname) as fd:
            for ln in fd:
                logging.debug("set_event ln:%s", ln)
                # Plain substring test: the event name is a literal, not a
                # regular expression.
                if event in ln:
                    found = True
                    break

        if val == 1 and not found:
            raise error.TestFail("Event %s not enabled" % event)
        if val == 0 and found:
            raise error.TestFail("Event %s not disabled" % event)


    def event_on(self, event):
        """Enable a single tracing event; see _event_onoff."""
        self._event_onoff(event, 1)


    def event_off(self, event):
        """Disable a single tracing event; see _event_onoff."""
        self._event_onoff(event, 0)


    def flush(self):
        """Flush trace buffer.

        Turns tracing off, writes 1 to 'free_buffer' and resets the read
        position used by read().

        Returns:
          boolean, True if 'buffer_size_kb' reads back as '0', else False

        Raises:
          error.TestFail: If tracing could not be disabled first
        """
        self.off()
        fname = os.path.join(self._TRACE_ROOT, 'free_buffer')
        utils.write_one_line(fname, 1)
        self._buffer_ptr = 0

        fname = os.path.join(self._TRACE_ROOT, 'buffer_size_kb')
        result = utils.read_one_line(fname).strip()
        # Compare by value; 'is' on a string literal tests identity.
        return result == '0'


    def read(self, regexp=None):
        """Read new lines from the trace file into the result buffer.

        Reading resumes at the position where the previous read() stopped.
        Results accumulate across calls in self._buffer.

        Arguments:
          regexp: regular expression applied to each new line with
              re.findall; the first match of each matching line is stored.
              If None, every new line is stored unmodified.

        Returns:
          list, the accumulated buffer (the same list object on every call)
        """
        fname = os.path.join(self._TRACE_ROOT, 'trace')
        # 'with' guarantees the file is closed even if parsing raises.
        with open(fname) as fd:
            fd.seek(self._buffer_ptr)
            for ln in fd.readlines():
                if regexp is None:
                    self._buffer.append(ln)
                    continue
                results = re.findall(regexp, ln)
                if results:
                    logging.debug(ln)
                    self._buffer.append(results[0])
            self._buffer_ptr = fd.tell()
        return self._buffer


    @staticmethod
    def uptime_secs():
        """Return the first field of /proc/uptime as a float."""
        results = utils.read_one_line("/proc/uptime")
        return float(results.split()[0])