blob: b24a6b724fd6605ce8e5013e1496601092663c2e [file] [log] [blame]
Dennis Kempind5b59022013-07-17 14:12:55 -07001#! /usr/bin/env python
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5from mtlib.log import FeedbackDownloader, FeedbackLog, Log
6from mtreplay import MTReplay
Dennis Kempin7432eb02014-03-18 13:41:41 -07007import fnmatch
Dennis Kempind5b59022013-07-17 14:12:55 -07008import json
9import multiprocessing
10import os
11import random
12import re
13import traceback
14import urllib
Sean O'Brienfd204da2017-05-02 15:13:11 -070015import datetime
Dennis Kempind5b59022013-07-17 14:12:55 -070016
17
# Prepare the on-disk pool of downloaded log files, kept in
# ../cache/logs relative to this script.
script_dir = os.path.dirname(os.path.realpath(__file__))
log_dir = os.path.join(script_dir, '../cache/logs/')
# File listing feedback report ids known to contain invalid logs.
invalid_filename = os.path.join(log_dir, 'invalid')
if not os.path.exists(log_dir):
  os.mkdir(log_dir)
24
25
class Query(object):
  """ Base interface for queries.

  Query objects are handed to the QueryEngine, which invokes
  FindMatches on the replay results of every processed file.

  Set capture_logs to True to direct the QueryEngine to attach the
  Log object to the QueryResults it returns.
  """
  def __init__(self):
    # Subclasses may flip this on to request log capture.
    self.capture_logs = False

  def FindMatches(self, replay_results, filename):
    """ Returns a list of QueryMatch objects """
    return []
41
42
class QueryMatch(object):
  """ A single search hit plus the information needed to locate it. """
  def __init__(self, filename, timestamp, line):
    self.filename = filename    # log file the match was found in
    self.timestamp = timestamp  # event timestamp; may be None
    self.line = line            # matching line of text

  def __str__(self):
    # Prefix with the timestamp only when one is present.
    if not self.timestamp:
      return self.line
    return '%s: %s' % (self.timestamp, self.line)

  def __repr__(self):
    return self.__str__()
58
59
class QueryResult(object):
  """ Holds the outcome of running a query against one file.

  Collects every match found in the file and the number of SYN
  reports processed; the activity Log object is attached only when
  the Query asked for it via capture_logs.
  """
  def __init__(self, filename):
    self.filename = filename  # file the query ran against
    self.syn_count = 0        # number of SYN reports counted in the log
    self.matches = []         # list of QueryMatch objects
    self.log = None           # Log object, when requested by the Query
71
72
class QueryEngine(object):
  """ This class allows queries to be executed on a large number of log files.

  It manages a pool of log files, allows more log files to be downloaded and
  can execute queries in parallel on this pool of log files.
  """

  def ExecuteSingle(self, filename, query):
    """ Executes a query on a single log file.

    Returns a QueryResult. If the file's platform cannot be determined
    or the replay fails, the result simply carries no matches.
    """
    log = Log(filename)
    replay = MTReplay()
    result = QueryResult(filename)

    # find platform for file
    platform = replay.PlatformOf(log)
    if not platform:
      print('No platform for %s' % os.path.basename(filename))
      return result

    # count the number of syn reports in log file
    result.syn_count = len(tuple(re.finditer("0000 0000 0", log.evdev)))

    # Replays are best-effort: some logs are expected to fail, so return
    # an empty result on error. (Was a bare except, which also swallowed
    # KeyboardInterrupt/SystemExit.)
    try:
      replay_result = replay.Replay(log)
    except Exception:
      return result

    result.matches = query.FindMatches(replay_result, filename)
    if result.matches:
      result.log = replay_result.log

    return result

  def Execute(self, filenames, queries, parallel=True):
    """ Executes a query on a list of log files.

    filenames: list of filenames to execute
    queries: either a single query object for all files,
             or a dictionary mapping filenames to query objects.
    parallel: set to False to execute sequentially.

    Returns a dictionary mapping filenames to QueryResults for all
    files that produced at least one match.
    """
    print("Processing %d log files" % len(filenames))

    # A single query object is applied to every file.
    if hasattr(queries, 'FindMatches'):
      queries = dict.fromkeys(filenames, queries)

    # arguments for ExecuteSingleSubprocess
    parameters = [(name, queries[name])
                  for name in filenames if name in queries]

    # process all files either in parallel or sequential
    if parallel:
      pool = multiprocessing.Pool()
      results = pool.map(ExecuteSingleSubprocess, parameters)
      pool.terminate()
    else:
      # List comprehension instead of map() so the results can be
      # iterated twice below on Python 3 as well, where map() returns
      # a single-use iterator.
      results = [ExecuteSingleSubprocess(p) for p in parameters]

    # count syn reports
    syn_count = sum(result.syn_count for result in results)

    # create dict of results by filename, keeping only files with matches
    result_dict = dict((result.filename, result)
                       for result in results
                       if result.matches)

    # syn reports are coming at approx 60 Hz on most platforms
    syn_per_second = 60.0
    hours = syn_count / syn_per_second / 60.0 / 60.0
    print("Processed ~%.2f hours of interaction" % hours)

    return result_dict

  def SelectFiles(self, number, platform=None):
    """ Returns a random selection of files from the pool.

    number: how many files to pick; None selects all files.
    platform: optional fnmatch pattern; only files whose detected
              platform name matches the pattern are considered.
    """
    # list all log files (downloaded reports have all-digit names)
    files = [os.path.abspath(os.path.join(log_dir, f))
             for f in os.listdir(log_dir)
             if f.isdigit()]

    if platform:
      print("Filtering files by platform. This may take a while.")
      pool = multiprocessing.Pool()
      platforms = pool.map(GetPlatformSubprocess, files)
      pool.terminate()

      # Keep files whose detected platform matches the pattern.
      # (Comprehension instead of filter/map with tuple-unpacking
      # lambdas, which are invalid syntax on Python 3.)
      files = [f for f, p in platforms if p and fnmatch.fnmatch(p, platform)]
      print("found %d log files matching %s" % (len(files), platform))

    # Randomly select a subset of files; cap at the pool size so
    # random.sample does not raise ValueError when fewer are available.
    if number is not None:
      files = random.sample(files, min(number, len(files)))
    return files

  def GetInvalidIDs(self):
    """Returns the list of feedback IDs known to have invalid logs."""
    if not os.path.exists(invalid_filename):
      return []
    # Context manager ensures the file handle is closed promptly.
    with open(invalid_filename) as f:
      return [line.strip() for line in f]

  def _MarkInvalid(self, id):
    """Records a feedback id as invalid so later runs can skip it."""
    print('Invalid report %s' % id)
    with open(invalid_filename, 'a') as f:
      f.write(id + '\n')

  def DownloadFile(self, id, downloader, invalid_ids):
    """Download one feedback log into the pool.

    id: feedback report id (parameter name shadows the builtin; kept
        for backward compatibility with existing callers).
    downloader: FeedbackDownloader used to fetch the report.
    invalid_ids: list of ids known to be invalid; these are skipped.

    Return 1 if successful, 0 if not.
    """
    filename = os.path.join(log_dir, id)
    if os.path.exists(filename):
      print('Skipping existing report %s' % id)
      return 0
    if id in invalid_ids:
      print('Skipping invalid report %s' % id)
      return 0

    print('Downloading new report %s' % id)
    try:
      # might throw IO/Tar/Zip/etc exceptions
      report = FeedbackLog(id, force_latest='pad', downloader=downloader)
      # Test parse. Will throw exception on malformed log
      json.loads(report.activity)
    except Exception:
      # Was a bare except; Exception still covers all parse/IO errors
      # without eating KeyboardInterrupt.
      self._MarkInvalid(id)
      return 0

    # check if report contains logs and actual events
    if report.activity and report.evdev and 'E:' in report.evdev:
      report.SaveAs(filename)
      return 1
    self._MarkInvalid(id)
    return 0

  def Download(self, num_to_download, parallel=True):
    """Download 'num_to_download' new feedback logs into the pool.

    Keeps requesting batches of report ids (paging back from now) until
    enough valid new reports have been stored locally.
    """
    downloader = FeedbackDownloader()

    # Timestamp of "now" to page backwards from. Floor division keeps
    # the Python 2 integer semantics on Python 3.
    # NOTE(review): microseconds // 10 yields units of 10us, not
    # milliseconds (that would be // 1000); preserved as-is -- confirm
    # against the resolution DownloadIDs expects.
    dt = datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)
    end_time = (((dt.days * 24 * 60 * 60 + dt.seconds) * 1000) +
                (dt.microseconds // 10))
    num_to_download = int(num_to_download)
    num_downloaded = 0
    invalid_ids = self.GetInvalidIDs()
    page_token = ''

    while num_to_download > num_downloaded:
      # Download list of feedback report id's. Request more ids than
      # still needed since many will be invalid or already cached.
      num_this_iteration = min((num_to_download - num_downloaded) * 5, 500)
      page_token, report_ids = downloader.DownloadIDs(
          num_this_iteration, end_time, page_token)
      if not report_ids:
        # No more reports available; avoid looping forever.
        break

      # Download and check each report
      parameters = [(r_id, downloader, invalid_ids) for r_id in report_ids]
      if parallel:
        pool = multiprocessing.Pool()
        results = sum(pool.map(DownloadFileSubprocess, parameters))
        pool.terminate()
      else:
        # List comprehension keeps this working on Python 3 as well.
        results = sum([DownloadFileSubprocess(p) for p in parameters])
      num_downloaded += results
      print("--------------------")
      print("%d/%d reports found" % (num_downloaded, num_to_download))
      print("--------------------")
243
Dennis Kempind5b59022013-07-17 14:12:55 -0700244
def GetPlatformSubprocess(filename):
  """ Detects the platform of a single log file (for use via Pool.map).

  Returns a (filename, platform_name) tuple; platform_name is None when
  no platform matches the log.
  """
  replay = MTReplay()
  log = Log(filename)
  detected_platform = replay.PlatformOf(log)
  if detected_platform:
    print(filename + ": " + detected_platform.name)
    return filename, detected_platform.name
  return filename, None
Dennis Kempind5b59022013-07-17 14:12:55 -0700254
def ExecuteSingleSubprocess(args):
  """ Wrapper for subprocesses to run ExecuteSingle.

  args: (filename, query) tuple, as built by QueryEngine.Execute.
  Prints the traceback before re-raising so failures inside worker
  processes are visible on the parent's console.
  """
  try:
    return QueryEngine().ExecuteSingle(args[0], args[1])
  except Exception:
    traceback.print_exc()
    # Bare raise preserves the original traceback ('raise e' does not)
    # and is valid on both Python 2 and 3.
    raise
262
def DownloadFileSubprocess(args):
  """ Wrapper for subprocesses to run DownloadFile.

  args: (id, downloader, invalid_ids) tuple, as built by
  QueryEngine.Download. Prints the traceback before re-raising so
  failures inside worker processes are visible on the parent's console.
  """
  try:
    return QueryEngine().DownloadFile(args[0], args[1], args[2])
  except Exception:
    traceback.print_exc()
    # Bare raise preserves the original traceback ('raise e' does not)
    # and is valid on both Python 2 and 3.
    raise
Sean O'Brienfd204da2017-05-02 15:13:11 -0700269 raise e