#! /usr/bin/env python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
from mtlib.log import FeedbackDownloader, FeedbackLog, Log
from mtreplay import MTReplay
import json
import multiprocessing
import os
import random
import re
import traceback
import urllib


# prepare the folder for cached log files; use makedirs so the intermediate
# ../cache directory is created as well if it does not exist yet
script_dir = os.path.dirname(os.path.realpath(__file__))
log_dir = os.path.join(script_dir, '../cache/logs/')
if not os.path.exists(log_dir):
  os.makedirs(log_dir)


class Query(object):
  """ Abstract base class for queries.

  Query objects are applied to log files by the QueryEngine, which
  calls FindMatches to execute the search on the replay results.

  Set capture_logs to True to direct the QueryEngine to include the
  Log object in the QueryResult.
  """
  def __init__(self):
    self.capture_logs = False

  def FindMatches(self, replay_results, filename):
    """ Returns a list of QueryMatch objects """
    return []
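

# A minimal example of a concrete query, shown for illustration only (it is
# not part of the original API). It reports every line of the replayed
# activity log that contains a substring. The replay_results.log.activity
# attribute is an assumption based on how ExecuteSingle and DownloadFile use
# Log objects below; adapt it if the real MTReplay result structure differs.
class SubstringQuery(Query):
  """ Example query matching activity log lines containing a substring """
  def __init__(self, pattern):
    super(SubstringQuery, self).__init__()
    self.pattern = pattern
    self.capture_logs = True

  def FindMatches(self, replay_results, filename):
    return [QueryMatch(filename, None, line)
            for line in replay_results.log.activity.splitlines()
            if self.pattern in line]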


class QueryMatch(object):
  """ Describes a match and contains information on how to locate it """
  def __init__(self, filename, timestamp, line):
    self.filename = filename
    self.timestamp = timestamp
    self.line = line

  def __str__(self):
    if self.timestamp:
      return str(self.timestamp) + ": " + self.line
    else:
      return self.line

  def __repr__(self):
    return str(self)


class QueryResult(object):
  """ Describes the results of a query on a single file.

  This includes all matches found in the file, the number of
  SYN reports processed, and optionally the activity Log object,
  if requested by the Query.
  """
  def __init__(self, filename):
    self.filename = filename
    self.syn_count = 0
    self.matches = []
    self.log = None


class QueryEngine(object):
  """ Executes queries on a large number of log files.

  It manages a pool of log files, allows additional log files to be
  downloaded, and can execute queries on this pool in parallel.
  """

  def ExecuteSingle(self, filename, query):
    """ Executes a query on a single log file """
    log = Log(filename)
    replay = MTReplay()
    result = QueryResult(filename)

    # find the platform required to replay this log file
    platform = replay.PlatformOf(log)
    if not platform:
      print 'No platform for %s' % os.path.basename(filename)
      return result

    # count the number of SYN reports in the log file
    result.syn_count = len(tuple(re.finditer("0000 0000 0", log.evdev)))

    # run the replay; if it fails, return the result without matches
    try:
      replay_result = replay.Replay(log)
    except Exception:
      return result

    result.matches = query.FindMatches(replay_result, filename)
    # attach the activity log only when the query asked for it
    if result.matches and query.capture_logs:
      result.log = replay_result.log

    return result

  def Execute(self, filenames, queries, parallel=True):
    """ Executes queries on a list of log files.

    filenames: list of log files to process
    queries: either a single query object to apply to all files,
             or a dictionary mapping filenames to query objects.
    parallel: set to False to execute sequentially.
    """

    print "Processing %d log files" % len(filenames)

    # a single query object was passed; apply it to all files
    if hasattr(queries, 'FindMatches'):
      queries = dict([(filename, queries) for filename in filenames])

    # arguments for ExecuteSingleSubprocess
    parameters = [(name, queries[name])
                  for name in filenames if name in queries]

    # process all files, either in parallel or sequentially
    if parallel:
      pool = multiprocessing.Pool()
      results = pool.map(ExecuteSingleSubprocess, parameters)
      pool.terminate()
    else:
      results = map(ExecuteSingleSubprocess, parameters)

    # count SYN reports across all files
    syn_count = sum([result.syn_count for result in results])

    # create a dict of results by filename, keeping only files with matches
    result_dict = dict([(result.filename, result)
                        for result in results
                        if result.matches])

    # SYN reports arrive at roughly 60 Hz on most platforms, so the SYN
    # count gives an estimate of the total interaction time processed
    syn_per_second = 60.0
    hours = syn_count / syn_per_second / 60.0 / 60.0
    print "Processed ~%.2f hours of interaction" % hours

    return result_dict

  def SelectFiles(self, number):
    """ Returns a random selection of files from the pool """
    # list all cached log files (filenames are numeric report IDs)
    files = [os.path.abspath(os.path.join(log_dir, f))
             for f in os.listdir(log_dir)
             if f.isdigit()]

    # randomly select a subset of files
    if number is not None:
      files = random.sample(files, number)
    return files

  def DownloadFile(self, id):
    """Download one feedback log into the pool."""
    filename = os.path.join(log_dir, id)
    if os.path.exists(filename):
      print 'Skipping existing report', id
      return

    print 'Downloading new report', id
    try:
      # might throw IO/Tar/Zip/etc exceptions
      report = FeedbackLog(id, force_latest='pad')
      # test parse; will throw an exception on malformed logs
      json.loads(report.activity)
    except Exception:
      print 'Invalid report %s' % id
      return

    # only keep reports that contain activity logs and actual evdev events
    if report.activity and report.evdev and 'E:' in report.evdev:
      report.SaveAs(filename)
    else:
      print 'Report %s contains no usable logs' % id

  def Download(self, num, offset=0, parallel=True):
    """Download 'num' new feedback logs into the pool."""
    downloader = FeedbackDownloader()

    # download the list of feedback report IDs
    params = {
      '$limit': str(num),
      '$offset': str(offset),
      'mapping': ':list',
      'productId': '208'  # ChromeOS
    }
    url = ('https://feedback.corp.google.com/resty/ReportSearch?' +
           urllib.urlencode(params))
    data = downloader.DownloadFile(url)
    data = data[data.find('{'):]  # strip leading garbage before the JSON data

    reports_json = json.loads(data)
    report_ids = [item['id'] for item in reports_json['results']]

    # download and check each report
    if parallel:
      pool = multiprocessing.Pool()
      pool.map(DownloadFileSubprocess, report_ids)
      pool.terminate()
    else:
      map(DownloadFileSubprocess, report_ids)
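

# Illustrative usage sketch (a hypothetical helper, not part of the original
# script): download a batch of reports, then run the example query defined
# above over a random sample of the cached logs. The query pattern is an
# arbitrary placeholder.
def ExampleUsage():
  engine = QueryEngine()
  engine.Download(50)
  files = engine.SelectFiles(50)
  results = engine.Execute(files, SubstringQuery('Fling'))
  for filename, result in results.items():
    print '%s: %d matches' % (filename, len(result.matches))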


def ExecuteSingleSubprocess(args):
  """ Wrapper for subprocesses to run ExecuteSingle """
  try:
    return QueryEngine().ExecuteSingle(args[0], args[1])
  except Exception:
    # print the traceback here, as multiprocessing may otherwise swallow it;
    # a bare raise preserves the original traceback, unlike 'raise e'
    traceback.print_exc()
    raise


def DownloadFileSubprocess(id):
  """ Wrapper for subprocesses to run DownloadFile """
  try:
    return QueryEngine().DownloadFile(id)
  except Exception:
    # print the traceback here, as multiprocessing may otherwise swallow it
    traceback.print_exc()
    raise