blob: 4f57aa582ea1226507f6bb52455a76ed00334166 [file] [log] [blame]
Wei-Ning Huang1cea6112015-03-02 12:45:34 +08001#!/usr/bin/python -u
2# -*- coding: utf-8 -*-
3#
4# Copyright 2015 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
Wei-Ning Huang7d029b12015-03-06 10:32:15 +08008import argparse
Wei-Ning Huang1cea6112015-03-02 12:45:34 +08009import fcntl
10import json
11import logging
12import os
13import Queue
14import select
15import socket
16import subprocess
17import sys
18import threading
19import time
20import uuid
21
22
23_OVERLORD_PORT = 4455
24_OVERLORD_LAN_DISCOVERY_PORT = 4456
25
26_BUFSIZE = 8192
27_RETRY_INTERVAL = 2
28_SEPARATOR = '\r\n'
29_PING_TIMEOUT = 3
30_PING_INTERVAL = 5
31_REQUEST_TIMEOUT_SECS = 60
32_SHELL = os.getenv('SHELL', '/bin/bash')
33
34RESPONSE_SUCCESS = 'success'
35RESPONSE_FAILED = 'failed'
36
37
38class PingTimeoutError(Exception):
39 pass
40
41
42class RequestError(Exception):
43 pass
44
45
46class Ghost(object):
47 """Ghost implements the client protocol of Overlord.
48
49 Ghost provide terminal/shell/logcat functionality and manages the client
50 side connectivity.
51 """
52 NONE, AGENT, SHELL, LOGCAT, SLOGCAT = range(5)
53
54 MODE_NAME = {
55 NONE: 'NONE',
56 AGENT: 'Agent',
57 SHELL: 'Shell',
58 LOGCAT: 'Logcat',
59 SLOGCAT: 'Simple-Logcat'
60 }
61
62 def __init__(self, overlord_addrs, mode=AGENT, sid=None, filename=None):
63 """Constructor.
64
65 Args:
66 overlord_addrs: a list of possible address of overlord.
67 mode: client mode, either AGENT, SHELL or LOGCAT
68 sid: session id. If the connection is requested by overlord, sid should
69 be set to the corresponding session id assigned by overlord.
70 filename: the filename to cat when we are in LOGCAT mode.
71 """
72 assert mode in [Ghost.AGENT, Ghost.SHELL, Ghost.LOGCAT]
73 if mode == Ghost.LOGCAT:
74 assert filename is not None
75
76 self._overlord_addrs = overlord_addrs
77 self._mode = mode
78 self._sock = None
79 self._machine_id = self.GetMachineID()
80 self._client_id = sid if sid is not None else str(uuid.uuid4())
Wei-Ning Huang7d029b12015-03-06 10:32:15 +080081 self._properties = {}
Wei-Ning Huang1cea6112015-03-02 12:45:34 +080082 self._logcat_filename = filename
83 self._buf = ''
84 self._requests = {}
85 self._reset = False
86 self._last_ping = 0
87 self._queue = Queue.Queue()
88
Wei-Ning Huang7d029b12015-03-06 10:32:15 +080089 def LoadPropertiesFromFile(self, filename):
90 try:
91 with open(filename, 'r') as f:
92 self._properties = json.loads(f.read())
93 except Exception as e:
94 logging.exception('LoadPropertiesFromFile: ' + str(e))
95
Wei-Ning Huang1cea6112015-03-02 12:45:34 +080096 def SpawnGhost(self, mode, sid, filename=None):
97 """Spawn a child ghost with specific mode.
98
99 Returns:
100 The spawned child process pid.
101 """
102 pid = os.fork()
103 if pid == 0:
104 g = Ghost(self._overlord_addrs, mode, sid, filename)
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800105 g.Start(True)
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800106 sys.exit(0)
107 else:
108 return pid
109
110 def Timestamp(self):
111 return int(time.time())
112
113 def GetGateWayIP(self):
114 with open('/proc/net/route', 'r') as f:
115 lines = f.readlines()
116
117 ips = []
118 for line in lines:
119 parts = line.split('\t')
120 if parts[2] == '00000000':
121 continue
122
123 try:
124 h = parts[2].decode('hex')
125 ips.append('%d.%d.%d.%d' % tuple(ord(x) for x in reversed(h)))
126 except TypeError:
127 pass
128
129 return ips
130
131 def GetMachineID(self):
132 """Generates machine-dependent ID string for a machine.
133 There are many ways to generate a machine ID:
134 1. factory device-data
135 2. /sys/class/dmi/id/product_uuid (only available on intel machines)
136 3. MAC address
137 We follow the listed order to generate machine ID, and fallback to the next
138 alternative if the previous doesn't work.
139 """
140 try:
141 p = subprocess.Popen('factory device-data | grep mlb_serial_number | '
142 'cut -d " " -f 2', stdout=subprocess.PIPE,
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800143 stderr=subprocess.PIPE, shell=True)
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800144 stdout, _ = p.communicate()
145 if stdout == '':
146 raise RuntimeError("empty mlb number")
147 return stdout.strip()
148 except Exception:
149 pass
150
151 try:
152 with open('/sys/class/dmi/id/product_uuid', 'r') as f:
153 return f.read().strip()
154 except Exception:
155 pass
156
157 try:
158 macs = []
159 ifaces = sorted(os.listdir('/sys/class/net'))
160 for iface in ifaces:
161 if iface == 'lo':
162 continue
163
164 with open('/sys/class/net/%s/address' % iface, 'r') as f:
165 macs.append(f.read().strip())
166
167 return ';'.join(macs)
168 except Exception:
169 pass
170
171 raise RuntimeError("can't generate machine ID")
172
173 def Reset(self):
174 """Reset state and clear request handlers."""
175 self._reset = False
176 self._buf = ""
177 self._last_ping = 0
178 self._requests = {}
179
180 def SendMessage(self, msg):
181 """Serialize the message and send it through the socket."""
182 self._sock.send(json.dumps(msg) + _SEPARATOR)
183
184 def SendRequest(self, name, args, handler=None,
185 timeout=_REQUEST_TIMEOUT_SECS):
186 if handler and not callable(handler):
187 raise RequestError('Invalid requiest handler for msg "%s"' % name)
188
189 rid = str(uuid.uuid4())
190 msg = {'rid': rid, 'timeout': timeout, 'name': name, 'params': args}
191 self._requests[rid] = [self.Timestamp(), timeout, handler]
192 self.SendMessage(msg)
193
194 def SendResponse(self, omsg, status, params=None):
195 msg = {'rid': omsg['rid'], 'response': status, 'params': params}
196 self.SendMessage(msg)
197
198 def SpawnPTYServer(self, _):
199 """Spawn a PTY server and forward I/O to the TCP socket."""
200 logging.info('SpawnPTYServer: started')
201
202 pid, fd = os.forkpty()
203 if pid == 0:
204 env = os.environ.copy()
205 env['USER'] = os.getenv('USER', 'root')
206 env['HOME'] = os.getenv('HOME', '/root')
207 os.chdir(env['HOME'])
208 os.execve(_SHELL, [_SHELL], env)
209 else:
210 try:
211 while True:
212 rd, _, _ = select.select([self._sock, fd], [], [])
213
214 if fd in rd:
215 self._sock.send(os.read(fd, _BUFSIZE))
216
217 if self._sock in rd:
218 ret = self._sock.recv(_BUFSIZE)
219 if len(ret) == 0:
220 raise RuntimeError("socket closed")
221 os.write(fd, ret)
222 except (OSError, socket.error, RuntimeError):
223 self._sock.close()
224 logging.info('SpawnPTYServer: terminated')
225 sys.exit(0)
226
227 def SpawnLogcatServer(self, _):
228 """Spawn a Logcat server and forward output to the TCP socket."""
229 logging.info('SpawnLogcatServer: started')
230
231 p = subprocess.Popen('tail -n +0 -f "%s"' % self._logcat_filename,
232 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
233 shell=True)
234
235 def make_non_block(fd):
236 fl = fcntl.fcntl(fd, fcntl.F_GETFL)
237 fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
238
239 make_non_block(p.stdout)
240 make_non_block(p.stderr)
241
242 try:
243 while True:
244 rd, _, _ = select.select([p.stdout, p.stderr, self._sock], [], [])
245
246 if p.stdout in rd:
247 self._sock.send(p.stdout.read(_BUFSIZE))
248
249 if p.stderr in rd:
250 self._sock.send(p.stderr.read(_BUFSIZE))
251
252 if self._sock in rd:
253 ret = self._sock.recv(_BUFSIZE)
254 if len(ret) == 0:
255 raise RuntimeError("socket closed")
256 except (OSError, socket.error, RuntimeError):
257 self._sock.close()
258 logging.info('SpawnLogcatServer: terminated')
259 sys.exit(0)
260
261
262 def Ping(self):
263 def timeout_handler(x):
264 if x is None:
265 raise PingTimeoutError
266
267 self._last_ping = self.Timestamp()
268 self.SendRequest('ping', {}, timeout_handler, 5)
269
270 def HandleShellRequest(self, msg):
271 params = msg['params']
272 stdout = stderr = err_msg = ""
273 try:
274 p = subprocess.Popen([params['cmd']] + params['args'],
275 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
276 stdout, stderr = p.communicate()
277 except Exception as e:
278 err_msg = str(e)
279
280 self.SendResponse(msg, RESPONSE_SUCCESS,
281 {'output': stdout + stderr, 'err_msg': err_msg})
282
283 def HandleRequest(self, msg):
284 if msg['name'] == 'shell':
285 self.HandleShellRequest(msg)
286 elif msg['name'] == 'terminal':
287 self.SpawnGhost(self.SHELL, msg['params']['sid'])
288 self.SendResponse(msg, RESPONSE_SUCCESS)
289 elif msg['name'] == 'logcat':
290 self.SpawnGhost(self.LOGCAT, msg['params']['sid'],
291 msg['params']['filename'])
292 self.SendResponse(msg, RESPONSE_SUCCESS)
293
294 def HandleResponse(self, response):
295 rid = str(response['rid'])
296 if rid in self._requests:
297 handler = self._requests[rid][2]
298 del self._requests[rid]
299 if callable(handler):
300 handler(response)
301 else:
302 print(response, self._requests.keys())
303 logging.warning('Recvied unsolicited response, ignored')
304
305 def ParseMessage(self):
306 msgs_json = self._buf.split(_SEPARATOR)
307 self._buf = msgs_json.pop()
308
309 for msg_json in msgs_json:
310 try:
311 msg = json.loads(msg_json)
312 except ValueError:
313 # Ignore mal-formed message.
314 continue
315
316 if 'name' in msg:
317 self.HandleRequest(msg)
318 elif 'response' in msg:
319 self.HandleResponse(msg)
320 else: # Ingnore mal-formed message.
321 pass
322
323 def ScanForTimeoutRequests(self):
324 for rid in self._requests.keys()[:]:
325 request_time, timeout, handler = self._requests[rid]
326 if self.Timestamp() - request_time > timeout:
327 handler(None)
328 del self._requests[rid]
329
330 def Listen(self):
331 try:
332 while True:
333 rds, _, _ = select.select([self._sock], [], [], _PING_INTERVAL / 2)
334
335 if self._sock in rds:
336 self._buf += self._sock.recv(_BUFSIZE)
337 self.ParseMessage()
338
339 if self.Timestamp() - self._last_ping > _PING_INTERVAL:
340 self.Ping()
341 self.ScanForTimeoutRequests()
342
343 if self._reset:
344 self.Reset()
345 break
346 except socket.error:
347 raise RuntimeError('Connection dropped')
348 except PingTimeoutError:
349 raise RuntimeError('Connection timeout')
350 finally:
351 self._sock.close()
352
353 self._queue.put('resume')
354
355 if self._mode != Ghost.AGENT:
356 sys.exit(1)
357
358 def Register(self):
359 non_local = {}
360 for addr in self._overlord_addrs:
361 non_local['addr'] = addr
362 def registered(response):
363 if response is None:
364 self._reset = True
365 raise RuntimeError('Register request timeout')
366 logging.info('Registered with Overlord at %s:%d', *non_local['addr'])
367 self._queue.put("pause", True)
368
369 try:
370 logging.info('Trying %s:%d ...', *addr)
371 self.Reset()
372 self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
373 self._sock.settimeout(_PING_TIMEOUT)
374 self._sock.connect(addr)
375
376 logging.info('Connection established, registering...')
377 handler = {
378 Ghost.AGENT: registered,
379 Ghost.SHELL: self.SpawnPTYServer,
380 Ghost.LOGCAT: self.SpawnLogcatServer
381 }[self._mode]
382
383 # Machine ID may change if MAC address is used (USB-ethernet dongle
384 # plugged/unplugged)
385 self._machine_id = self.GetMachineID()
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800386 self.SendRequest('register',
387 {'mode': self._mode, 'mid': self._machine_id,
388 'cid': self._client_id,
389 'properties': self._properties}, handler)
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800390 except socket.error:
391 pass
392 else:
393 self._sock.settimeout(None)
394 self.Listen()
395
396 raise RuntimeError("Cannot connect to any server")
397
398 def StartLanDiscovery(self):
399 """Start to listen to LAN discovery packet at
400 _OVERLORD_LAN_DISCOVERY_PORT."""
401 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
402 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
403 s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
404 try:
405 s.bind(('0.0.0.0', _OVERLORD_LAN_DISCOVERY_PORT))
406 except socket.error as e:
407 logging.error("LAN discovery: %s, abort", e)
408 return
409
410 logging.info('LAN Discovery: started')
411 while True:
412 rd, _, _ = select.select([s], [], [], 1)
413
414 if s in rd:
415 data, source_addr = s.recvfrom(_BUFSIZE)
416 parts = data.split()
417 if parts[0] == 'OVERLORD':
418 ip = source_addr[0]
419 port = int(parts[1].lstrip(':'))
420 addr = (ip, port)
421 self._queue.put(addr, True)
422
423 try:
424 obj = self._queue.get(False)
425 except Queue.Empty:
426 pass
427 else:
428 if type(obj) is not str:
429 self._queue.put(obj)
430 elif obj == 'pause':
431 logging.info('LAN Discovery: paused')
432 while True:
433 obj = self._queue.get(True)
434 if obj == 'resume':
435 logging.info('LAN Discovery: resumed')
436 break
437
438 def ScanGateway(self):
439 for addr in [(x, _OVERLORD_PORT) for x in self.GetGateWayIP()]:
440 if addr not in self._overlord_addrs:
441 self._overlord_addrs.append(addr)
442
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800443 def Start(self, no_lan_disc=False):
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800444 logging.info('%s started', self.MODE_NAME[self._mode])
445 logging.info('MID: %s', self._machine_id)
446 logging.info('CID: %s', self._client_id)
447
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800448 if not no_lan_disc:
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800449 t = threading.Thread(target=self.StartLanDiscovery)
450 t.daemon = True
451 t.start()
452
453 try:
454 while True:
455 try:
456 addr = self._queue.get(False)
457 except Queue.Empty:
458 pass
459 else:
460 if type(addr) == tuple and addr not in self._overlord_addrs:
461 logging.info('LAN Discovery: got overlord address %s:%d', *addr)
462 self._overlord_addrs.append(addr)
463
464 try:
465 self.ScanGateway()
466 self.Register()
467 except Exception as e:
468 logging.info(str(e) + ', retrying in %ds' % _RETRY_INTERVAL)
469 time.sleep(_RETRY_INTERVAL)
470
471 self.Reset()
472 except KeyboardInterrupt:
473 logging.error('Received keyboard interrupt, quit')
474 sys.exit(0)
475
476
477def main():
478 logger = logging.getLogger()
479 logger.setLevel(logging.INFO)
480
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800481 parser = argparse.ArgumentParser()
482 parser.add_argument('--no-lan-disc', dest='no_lan_disc', action='store_true',
483 default=False, help='disable LAN discovery')
484 parser.add_argument("--prop-file", dest="prop_file", type=str, default=None,
485 help='file containing the JSON representation of client '
486 'properties')
487 parser.add_argument('overlord_ip', metavar='OVERLORD_IP', type=str,
488 nargs='*', help='overlord server address')
489 args = parser.parse_args()
490
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800491 addrs = [('localhost', _OVERLORD_PORT)]
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800492 addrs += [(x, _OVERLORD_PORT) for x in args.overlord_ip]
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800493
494 g = Ghost(addrs)
Wei-Ning Huang7d029b12015-03-06 10:32:15 +0800495 if args.prop_file:
496 g.LoadPropertiesFromFile(args.prop_file)
497 g.Start(args.no_lan_disc)
Wei-Ning Huang1cea6112015-03-02 12:45:34 +0800498
499
500if __name__ == '__main__':
501 main()