#! /usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from mtlib.log import FeedbackDownloader, FeedbackLog, Log
from mtreplay import MTReplay
import fnmatch
import json
import multiprocessing
import os
import random
import re
import traceback
import urllib
15
16
# Prepare the on-disk cache directory that holds downloaded log files.
# Logs are stored in ../cache/logs/ relative to this script's location.
script_dir = os.path.dirname(os.path.realpath(__file__))
log_dir = os.path.join(script_dir, '../cache/logs/')
if not os.path.exists(log_dir):
  # makedirs (rather than mkdir) also creates the intermediate ../cache
  # directory on a fresh checkout, where plain mkdir would raise OSError.
  os.makedirs(log_dir)
22
23
class Query(object):
  """ Base type for searches run over log files.

  A Query is handed to the QueryEngine, which invokes FindMatches on the
  replay results of every file it processes.

  Setting capture_logs to True directs the QueryEngine to attach the
  activity Log object to the QueryResults it returns.
  """
  def __init__(self):
    # Subclasses may flip this on to request Log capture in results.
    self.capture_logs = False

  def FindMatches(self, replay_results, filename):
    """ Search replay_results; returns a list of QueryMatch objects.

    The base implementation matches nothing.
    """
    return []
39
40
class QueryMatch(object):
  """ A single hit produced by a Query, with the context needed to find it. """
  def __init__(self, filename, timestamp, line):
    self.filename = filename    # file the match was found in
    self.timestamp = timestamp  # may be None when no timestamp is known
    self.line = line            # matching line text

  def __str__(self):
    # Omit the timestamp prefix when there is none.
    if not self.timestamp:
      return self.line
    return str(self.timestamp) + ": " + self.line

  def __repr__(self):
    return self.__str__()
56
57
class QueryResult(object):
  """ Outcome of running a Query against one log file.

  Carries every match found in the file and the number of SYN reports
  processed; the activity Log object is attached only when the Query
  requested it."""
  def __init__(self, filename):
    self.filename = filename  # path of the examined log file
    self.syn_count = 0        # number of SYN reports seen in the file
    self.matches = []         # list of QueryMatch objects
    self.log = None           # activity Log object, when captured
69
70
class QueryEngine(object):
  """ This class allows queries to be executed on a large number of log files.

  It manages a pool of log files, allows more log files to be downloaded and
  can execute queries in parallel on this pool of log files.
  """

  def ExecuteSingle(self, filename, query):
    """ Executes a query on a single log file.

    filename: path of the log file to process.
    query: Query instance whose FindMatches is applied to the replay result.
    Returns a QueryResult. On failure (unknown platform or replay error)
    the result is returned with no matches and log set to None.
    """
    log = Log(filename)
    replay = MTReplay()
    result = QueryResult(filename)

    # find platform for file
    platform = replay.PlatformOf(log)
    if not platform:
      print 'No platform for %s' % os.path.basename(filename)
      return result

    # count the number of syn reports in log file
    result.syn_count = len(tuple(re.finditer("0000 0000 0", log.evdev)))

    # run replay
    try:
      replay_result = replay.Replay(log)
    except:
      # NOTE(review): bare except deliberately turns any replay failure into
      # an empty result (best-effort), but it also swallows KeyboardInterrupt.
      return result

    result.matches = query.FindMatches(replay_result, filename)
    # NOTE(review): the log is captured whenever there are matches;
    # query.capture_logs (documented on Query) is never consulted here —
    # confirm whether that flag is still meant to gate this.
    if result.matches:
      result.log = replay_result.log

    return result

  def Execute(self, filenames, queries, parallel=True):
    """ Executes a query on a list of log files.

    filenames: list of filenames to execute
    queries: either a single query object for all files,
        or a dictionary mapping filenames to query objects.
    parallel: set to False to execute sequentially.
    Returns a dict mapping filename -> QueryResult, containing only files
    that produced at least one match.
    """

    print "Processing %d log files" % len(filenames)

    # Duck-typed check: a single Query exposes FindMatches; expand it into
    # a per-filename dictionary so both call styles are handled uniformly.
    if hasattr(queries, 'FindMatches'):
      queries = dict([(filename, queries) for filename in filenames])

    # arguments for QuerySubprocess
    parameters = [(name, queries[name])
        for name in filenames if name in queries]

    # process all files either in parallel or sequential
    if parallel:
      pool = multiprocessing.Pool()
      results = pool.map(ExecuteSingleSubprocess, parameters)
      pool.terminate()
    else:
      results = map(ExecuteSingleSubprocess, parameters)

    # count syn reports
    syn_count = sum([result.syn_count for result in results])

    # create dict of results by filename
    result_dict = dict([(result.filename, result)
                        for result in results
                        if result.matches])

    # syn reports are coming at approx 60 Hz on most platforms
    syn_per_second = 60.0
    hours = syn_count / syn_per_second / 60.0 / 60.0
    print "Processed ~%.2f hours of interaction" % hours

    return result_dict

  def SelectFiles(self, number, platform=None):
    """ Returns a random selection of files from the pool

    number: number of files to pick, or None to return all candidates.
    platform: optional fnmatch-style pattern; when given, only files
        recorded on a matching platform are considered (slow: requires
        platform detection on every file in the pool).
    """
    # list all log files
    # Downloaded reports are stored under their numeric feedback id,
    # hence the isdigit() filter.
    files = [os.path.abspath(os.path.join(log_dir, f))
             for f in os.listdir(log_dir)
             if f.isdigit()]

    if platform:
      print "Filtering files by platform. This may take a while."
      # NOTE(review): replay is constructed but not used in this method.
      replay = MTReplay()
      pool = multiprocessing.Pool()
      platforms = pool.map(GetPlatformSubprocess, files)
      pool.terminate()

      # (f, p) pairs are (filename, platform-name-or-None).
      # Tuple-unpacking lambda parameters are Python 2 only syntax.
      filtered = filter(lambda (f, p): p and fnmatch.fnmatch(p, platform),
                        platforms)
      files = map(lambda (f, p): f, filtered)
      print "found", len(files), "log files matching", platform

    # randomly select subset of files
    if number is not None:
      files = random.sample(files, number)
    return files

  def DownloadFile(self, id):
    """Download one feedback log into the pool.

    id: feedback report id (string). The report is saved as log_dir/<id>;
    existing, malformed, or event-free reports are skipped.
    """
    # NOTE(review): downloader is never used in this method — FeedbackLog
    # below does the fetching. Confirm whether this line can be dropped.
    downloader = FeedbackDownloader()

    filename = os.path.join(log_dir, id)
    if os.path.exists(filename):
      print 'Skipping existing report', id
      return

    print 'Downloading new report', id
    try:
      # might throw IO/Tar/Zip/etc exceptions
      report = FeedbackLog(id, force_latest='pad')
      # Test parse. Will throw exception on malformed log
      json.loads(report.activity)
    except:
      print 'Invalid report %s' % id
      return

    # check if report contains logs and actual events
    # ('E:' lines are the evdev event records)
    if report.activity and report.evdev and 'E:' in report.evdev:
      report.SaveAs(filename)
    else:
      print 'Invalid report %s' % id

  def Download(self, num, offset=0, parallel=True):
    """Download 'num' new feedback logs into the pool.

    num: how many report ids to request from the feedback search endpoint.
    offset: paging offset into the search results.
    parallel: set to False to download sequentially.
    """
    downloader = FeedbackDownloader()

    # download list of feedback report id's
    params = {
      '$limit': str(num),
      '$offset': str(offset),
      'mapping': ':list',
      'productId': '208'  # ChromeOS
    }
    url = ('https://feedback.corp.google.com/resty/ReportSearch?' +
           urllib.urlencode(params))
    data = downloader.DownloadFile(url)
    data = data[data.find('{'):]  # strip garbage before json data

    reports_json = json.loads(data)
    report_ids = [item['id'] for item in reports_json['results']]

    # Download and check each report
    # NOTE(review): DownloadFile returns None, so 'results' carries no
    # information; it is kept only to drive the map.
    if parallel:
      pool = multiprocessing.Pool()
      results = pool.map(DownloadFileSubprocess, report_ids)
      pool.terminate()
    else:
      results = map(DownloadFileSubprocess, report_ids)
221
Dennis Kempin7432eb02014-03-18 13:41:41 -0700222def GetPlatformSubprocess(filename):
223 replay = MTReplay()
224 log = Log(filename)
225 detected_platform = replay.PlatformOf(log)
226 if detected_platform:
227 print filename + ": " + detected_platform.name
228 return filename, detected_platform.name
229 else:
230 return filename, None
Dennis Kempind5b59022013-07-17 14:12:55 -0700231
def ExecuteSingleSubprocess(args):
  """ Wrapper for subprocesses to run ExecuteSingle.

  args: (filename, query) tuple, as built by QueryEngine.Execute.
  The traceback is printed here because multiprocessing would otherwise
  hide the subprocess stack; the exception is then re-raised for the
  parent process to handle.
  """
  try:
    return QueryEngine().ExecuteSingle(args[0], args[1])
  except Exception:
    traceback.print_exc()
    # Bare raise preserves the original traceback; 'raise e' would
    # restart the traceback at this line.
    raise
239
240
def DownloadFileSubprocess(id):
  """ Wrapper for subprocesses to run DownloadFile.

  id: feedback report id to download.
  The traceback is printed here because multiprocessing would otherwise
  hide the subprocess stack; the exception is then re-raised for the
  parent process to handle.
  """
  try:
    return QueryEngine().DownloadFile(id)
  except Exception:
    traceback.print_exc()
    # Bare raise preserves the original traceback; 'raise e' would
    # restart the traceback at this line.
    raise