blob: c7f89edc4c70314cb5ce77735d1d5404af09995c [file] [log] [blame]
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001#!/usr/bin/python -u
2# -*- coding: utf-8 -*-
3#
4# Copyright 2015 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8from __future__ import print_function
9
10import argparse
11import ast
12import base64
13import fcntl
14import hashlib
15import httplib
16import json
17import jsonrpclib
18import logging
19import os
20import re
21import select
22import signal
23import socket
24import StringIO
25import struct
26import subprocess
27import sys
28import tempfile
29import termios
30import threading
31import time
32import tty
33import urllib2
34import urlparse
35
36from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
37from jsonrpclib.config import Config
38from ws4py.client import WebSocketBaseClient
39
# Python version >= 2.7.9 enables SSL check by default, bypass it.
# NOTE(review): this disables HTTPS certificate verification for every
# urllib2/httplib user in the process.
try:
  import ssl
  # pylint: disable=W0212
  ssl._create_default_https_context = ssl._create_unverified_context
except (ImportError, AttributeError):
  # No ssl module, or an interpreter older than 2.7.9 that has no
  # verification to bypass.
  pass
47
48
# Escape character of the interactive terminal ("<Enter> ~ ." disconnects).
_ESCAPE = '~'
# Chunk size used for socket and file reads.
_BUFSIZ = 8192

_OVERLORD_PORT = 4455
_OVERLORD_HTTP_PORT = 9000
_OVERLORD_CLIENT_DAEMON_PORT = 4488
_OVERLORD_CLIENT_DAEMON_RPC_ADDR = ('127.0.0.1', _OVERLORD_CLIENT_DAEMON_PORT)

# Timeout, in seconds, passed to urllib2.urlopen.
_DEFAULT_HTTP_TIMEOUT = 30
# Seconds for which the daemon reuses its cached client listing.
_LIST_CACHE_TIMEOUT = 2
_DEFAULT_TERMINAL_WIDTH = 80
# Number of attempts made by the push/pull helpers.
_RETRY_TIMES = 3

# echo -n overlord | md5sum
_HTTP_BOUNDARY_MAGIC = '9246f080c855a69012707ab53489b921'

# Byte values that delimit a control message inside the terminal stream.
_CONTROL_START = 128
_CONTROL_END = 129
_SSH_CONTROL_SOCKET_PREFIX = os.path.join(tempfile.gettempdir(),
                                          'ovl-ssh-control-')

# A string that will always be included in the response of
# GET http://OVERLORD_SERVER:_OVERLORD_HTTP_PORT
_OVERLORD_RESPONSE_KEYWORD = '<html>'
72
73
def GetVersionDigest():
  """Return the sha1sum of the current executing script.

  The file is read in binary mode so the digest is computed over the exact
  bytes on disk (hashlib only accepts bytes on Python 3).

  Returns:
    The hexadecimal SHA-1 digest of the file named by __file__.
  """
  with open(__file__, 'rb') as f:
    return hashlib.sha1(f.read()).hexdigest()
78
79
def KillGraceful(pid, wait_secs=1):
  """Kill a process gracefully.

  SIGTERM is sent first. The process is then polled for up to wait_secs
  seconds; if it is still present afterwards SIGKILL is sent. The function
  returns as soon as the pid can no longer be signalled, instead of always
  sleeping for the whole grace period.

  Args:
    pid: id of the process to kill.
    wait_secs: grace period, in seconds, between SIGTERM and SIGKILL.
  """
  try:
    os.kill(pid, signal.SIGTERM)
    deadline = time.time() + wait_secs
    while True:
      remaining = deadline - time.time()
      if remaining <= 0:
        break
      # Signal 0 only probes for existence; it raises OSError once the
      # process is gone, which ends the wait early.
      os.kill(pid, 0)
      time.sleep(min(0.05, remaining))
    os.kill(pid, signal.SIGKILL)
  except OSError:
    # The process does not exist (any more) or can not be signalled.
    pass
89
90
def AutoRetry(action_name, retries):
  """Decorator for retry function call.

  The decorated function is called up to `retries` times. The first call
  that does not raise ends the loop and its return value is passed back to
  the caller. Every failure is reported on stdout; when all attempts fail a
  final error is printed and None is returned.

  Args:
    action_name: verb used in the final error message (e.g. 'push').
    retries: maximum number of attempts.

  Returns:
    A decorator.
  """
  import functools

  def Wrap(func):
    @functools.wraps(func)
    def Loop(*args, **kwargs):
      # The first positional argument names the target in messages; fall
      # back to the function name when called with keywords only.
      target = args[0] if args else func.__name__
      for unused_i in range(retries):
        try:
          return func(*args, **kwargs)
        except Exception as e:
          print('error: %s: %s: retrying ...' % (target, e))
      print('error: failed to %s %s' % (action_name, target))
      return None
    return Loop
  return Wrap
106
107
def BasicAuthHeader(user, password):
  """Return HTTP basic auth header.

  Args:
    user: user name.
    password: password.

  Returns:
    A (header name, header value) tuple.
  """
  credential = '%s:%s' % (user, password)
  # Credentials may be unicode (e.g. after a JSON-RPC round trip); base64
  # works on bytes, so encode explicitly instead of relying on an implicit
  # ASCII conversion that fails for non-ASCII characters.
  if not isinstance(credential, bytes):
    credential = credential.encode('utf-8')
  encoded = base64.b64encode(credential)
  if not isinstance(encoded, str):
    encoded = encoded.decode('ascii')
  return ('Authorization', 'Basic %s' % encoded)
112
113
def GetTerminalSize():
  """Retrieve terminal window size.

  Returns:
    A (lines, columns) tuple for the terminal attached to file descriptor
    0, or (0, 0) when the size can not be queried (for example when stdin
    is not a terminal).
  """
  ws = struct.pack('HHHH', 0, 0, 0, 0)
  try:
    ws = fcntl.ioctl(0, termios.TIOCGWINSZ, ws)
  except (IOError, OSError):
    # TIOCGWINSZ fails when fd 0 is not a tty; report an unknown size
    # instead of raising.
    return 0, 0
  lines, columns, unused_x, unused_y = struct.unpack('HHHH', ws)
  return lines, columns
120
121
def MakeRequestUrl(state, url):
  """Prefix url with 'https://' when state.ssl is true, else 'http://'."""
  scheme = 'https' if state.ssl else 'http'
  return '%s://%s' % (scheme, url)
124
125
class ProgressBar(object):
  """Single-line progress bar written to stdout.

  Each update rewrites the current line (terminated with a carriage return)
  with the item name, transferred size, average speed, elapsed time and,
  when the terminal is wide enough, a bar with the percentage.
  """
  SIZE_WIDTH = 11
  SPEED_WIDTH = 10
  DURATION_WIDTH = 6
  PERCENTAGE_WIDTH = 8

  _SIZE_UNITS = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
  _SPEED_UNITS = ('B', 'K', 'M', 'G', 'T')

  def __init__(self, name):
    """Create the bar and draw it at 0%.

    Args:
      name: label shown at the start of the line.
    """
    self._start_time = time.time()
    self._name = name
    self._size = 0
    self._width = 0
    self._name_width = 0
    self._name_max = 0
    self._stat_width = 0
    self._max = 0
    self.CalculateSize()
    self.SetProgress(0)

  def CalculateSize(self):
    """Recompute the column layout from the current terminal width."""
    self._width = GetTerminalSize()[1] or _DEFAULT_TERMINAL_WIDTH
    self._name_width = int(self._width * 0.3)
    self._name_max = self._name_width
    self._stat_width = self.SIZE_WIDTH + self.SPEED_WIDTH + self.DURATION_WIDTH
    self._max = (self._width - self._name_width - self._stat_width -
                 self.PERCENTAGE_WIDTH)

  @staticmethod
  def _Scale(value, units):
    """Scale value down by 1024 until it fits the largest possible unit.

    Args:
      value: non-scaled number.
      units: unit names, smallest first; the last one has no upper bound.

    Returns:
      A (scaled value, unit name) tuple.
    """
    index = 0
    while value >= 1024 and index < len(units) - 1:
      value /= 1024.0
      index += 1
    return value, units[index]

  def SizeToHuman(self, size_in_bytes):
    """Format a byte count, e.g. '    2.0 KiB'."""
    return ' %6.1f %3s' % self._Scale(size_in_bytes, self._SIZE_UNITS)

  def SpeedToHuman(self, speed_in_bs):
    """Format a speed given in bytes per second, e.g. '    1.5K/s'."""
    return ' %6.1f%s/s' % self._Scale(speed_in_bs, self._SPEED_UNITS)

  def DurationToClock(self, duration):
    """Format a duration in seconds as ' MM:SS'."""
    minutes, seconds = divmod(int(duration), 60)
    return ' %02d:%02d' % (minutes, seconds)

  def SetProgress(self, percentage, size=None):
    """Redraw the bar.

    Args:
      percentage: completion percentage (0-100).
      size: number of bytes transferred so far; the previous value is kept
        when None.
    """
    current_width = GetTerminalSize()[1]
    if self._width != current_width:
      self.CalculateSize()

    if size is not None:
      self._size = size

    elapse_time = time.time() - self._start_time
    # The first update can happen within the clock resolution of the start
    # time; avoid dividing by zero.
    speed = self._size / float(elapse_time) if elapse_time > 0 else 0.0

    size_str = self.SizeToHuman(self._size)
    speed_str = self.SpeedToHuman(speed)
    elapse_str = self.DurationToClock(elapse_time)

    if len(self._name) <= self._name_max:
      name = self._name
    else:
      name = self._name[:self._name_max - 4] + ' ...'

    bar = ''
    if self._max > 2:
      width = int(self._max * percentage / 100.0)
      bar = (' [' + '#' * width + ' ' * (self._max - width) + ']' +
             '%4d%%' % int(percentage))

    # A negative field width left-justifies the name.
    sys.stdout.write('%*s' % (- self._name_max, name) +
                     size_str + speed_str + elapse_str + bar + '\r')
    sys.stdout.flush()

  def End(self):
    """Draw the bar at 100% and move to the next line."""
    self.SetProgress(100.0)
    sys.stdout.write('\n')
    sys.stdout.flush()
214
215
class DaemonState(object):
  """DaemonState is used for storing Overlord state info."""

  def __init__(self):
    # Digest of the script that created this state.
    self.version_sha1sum = GetVersionDigest()

    # Overlord server connection.
    self.host = None
    self.port = None
    self.ssl = False
    self.username = None
    self.password = None

    # SSH tunnelling.
    self.ssh = False
    self.ssh_pid = None
    self.orig_host = None

    # Client selection, port forwards and the cached client listing.
    self.selected_mid = None
    self.forwards = {}
    self.listing = []
    self.last_list = 0
232
233
class OverlordClientDaemon(object):
  """Overlord Client Daemon.

  Keeps the connection state (DaemonState) between invocations of the
  command line client and exposes it through a JSON-RPC server bound to
  _OVERLORD_CLIENT_DAEMON_RPC_ADDR.
  """
  def __init__(self):
    self._state = DaemonState()
    self._server = None

  def Start(self):
    """Start the daemon."""
    self.StartRPCServer()

  def StartRPCServer(self):
    """Create the RPC server and serve requests from a forked child."""
    self._server = SimpleJSONRPCServer(_OVERLORD_CLIENT_DAEMON_RPC_ADDR,
                                       logRequests=False)
    exports = [
        (self.State, 'State'),
        (self.Ping, 'Ping'),
        (self.GetPid, 'GetPid'),
        (self.Connect, 'Connect'),
        (self.Clients, 'Clients'),
        (self.SelectClient, 'SelectClient'),
        (self.AddForward, 'AddForward'),
        (self.RemoveForward, 'RemoveForward'),
        (self.RemoveAllForward, 'RemoveAllForward'),
    ]
    for func, name in exports:
      self._server.register_function(func, name)

    pid = os.fork()
    if pid == 0:
      # Child process: serve until killed. Never return into the caller's
      # code path, otherwise the child would continue running the CLI
      # command alongside the parent.
      try:
        self._server.serve_forever()
      except Exception as e:
        logging.exception(e)
      finally:
        os._exit(1)  # pylint: disable=W0212

  @staticmethod
  def GetRPCServer():
    """Returns the Overlord client daemon RPC server.

    Returns:
      A jsonrpclib.Server proxy, or None when the daemon does not answer
      the Ping call.
    """
    server = jsonrpclib.Server('http://%s:%d' %
                               _OVERLORD_CLIENT_DAEMON_RPC_ADDR)
    try:
      server.Ping()
    except Exception:
      return None
    return server

  def State(self):
    """Return the DaemonState object."""
    return self._state

  def Ping(self):
    """Liveness check; always returns True."""
    return True

  def GetPid(self):
    """Return the pid of the process serving the request."""
    return os.getpid()

  def _UrlOpen(self, url):
    """Wrapper for urllib2.urlopen.

    It selects correct HTTP scheme according to self._state.ssl and add HTTP
    basic auth headers.
    """
    url = MakeRequestUrl(self._state, url)
    request = urllib2.Request(url)
    if self._state.username is not None and self._state.password is not None:
      request.add_header(*BasicAuthHeader(self._state.username,
                                          self._state.password))
    return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)

  def _GetJSON(self, path):
    """GET path from the connected server and decode the JSON response."""
    url = '%s:%d%s' % (self._state.host, self._state.port, path)
    return json.loads(self._UrlOpen(url).read())

  def Connect(self, host, port=_OVERLORD_HTTP_PORT, ssh_pid=None,
              username=None, password=None, orig_host=None):
    """Record the connection parameters and probe the server.

    Args:
      host: host name or IP to connect to.
      port: HTTP port of the server.
      ssh_pid: pid of the SSH tunnel process, if any.
      username: HTTP basic auth user name.
      password: HTTP basic auth password.
      orig_host: host name given by the user.

    Returns:
      True on success, the HTTP status code when the server answers with an
      HTTP error, or the error message string for any other failure.
    """
    self._state.username = username
    self._state.password = password
    self._state.host = host
    self._state.port = port
    self._state.ssl = False
    self._state.orig_host = orig_host
    self._state.ssh_pid = ssh_pid
    self._state.selected_mid = None
    # The cached listing belongs to the previously connected server.
    self._state.listing = []
    self._state.last_list = 0

    try:
      h = self._UrlOpen('%s:%d' % (host, port))
      # Probably not an HTTP server, try HTTPS
      if _OVERLORD_RESPONSE_KEYWORD not in h.read():
        self._state.ssl = True
        self._UrlOpen('%s:%d' % (host, port))
    except urllib2.HTTPError as e:
      logging.exception(e)
      return e.getcode()
    except Exception as e:
      logging.exception(e)
      return str(e)
    return True

  def Clients(self):
    """Return the sorted list of client mids.

    The listing is cached for _LIST_CACHE_TIMEOUT seconds.
    """
    if time.time() - self._state.last_list <= _LIST_CACHE_TIMEOUT:
      return self._state.listing

    mids = [client['mid'] for client in self._GetJSON('/api/agents/list')]
    self._state.listing = sorted(set(mids))
    self._state.last_list = time.time()
    return self._state.listing

  def SelectClient(self, mid):
    """Remember mid as the selected client."""
    self._state.selected_mid = mid

  def AddForward(self, mid, remote, local, pid):
    """Register a port forward served by process pid, keyed by local port."""
    self._state.forwards[local] = (mid, remote, pid)

  def RemoveForward(self, local_port):
    """Kill and forget the forward registered for local_port, if any."""
    try:
      unused_mid, unused_remote, pid = self._state.forwards[local_port]
      KillGraceful(pid)
      del self._state.forwards[local_port]
    except (KeyError, OSError):
      pass

  def RemoveAllForward(self):
    """Kill and forget every registered forward."""
    for unused_mid, unused_remote, pid in self._state.forwards.values():
      try:
        KillGraceful(pid)
      except OSError:
        pass
    self._state.forwards = {}
356
357
class TerminalWebSocketClient(WebSocketBaseClient):
  """WebSocket client that connects the local terminal to a remote TTY.

  Data read from stdin is sent to the server, binary messages received from
  the server are written to stdout. Typing <Enter> ~ . closes the
  connection.
  """
  def __init__(self, mid, *args, **kwargs):
    super(TerminalWebSocketClient, self).__init__(*args, **kwargs)
    self._mid = mid
    self._stdin_fd = sys.stdin.fileno()
    # Terminal attributes saved before switching stdin to raw mode.
    self._old_termios = None

  def handshake_ok(self):
    pass

  def opened(self):
    nonlocals = {'size': (80, 40)}

    def _ResizeWindow():
      """Send a resize control message when the terminal size changed."""
      size = GetTerminalSize()
      if size == nonlocals['size']:  # Size not changed, ignore
        return
      control = {'command': 'resize', 'params': list(size)}
      payload = chr(_CONTROL_START) + json.dumps(control) + chr(_CONTROL_END)
      nonlocals['size'] = size
      try:
        self.send(payload, binary=True)
      except Exception:
        pass

    def _FeedInput():
      """Forward stdin to the server until the escape sequence is typed."""
      flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
      fcntl.fcntl(sys.stdin, fcntl.F_SETFL, flags | os.O_NONBLOCK)

      self._old_termios = termios.tcgetattr(self._stdin_fd)
      tty.setraw(self._stdin_fd)

      READY, ENTER_PRESSED, ESCAPE_PRESSED = range(3)

      try:
        state = READY
        while True:
          rd, unused_w, unused_x = select.select([sys.stdin], [], [], 0.5)

          # We can't install a signal handler in the main thread since it'll
          # interrupt the read/write system call (ws4py performing send/recv).
          # Use polling instead (select's timeout is 0.5 seconds)
          _ResizeWindow()

          if sys.stdin in rd:
            data = sys.stdin.read()

            # Scan for escape sequence
            for x in data:
              if state == READY:
                state = ENTER_PRESSED if x == chr(0x0d) else READY
              elif state == ENTER_PRESSED:
                state = ESCAPE_PRESSED if x == _ESCAPE else READY
              elif state == ESCAPE_PRESSED:
                if x == '.':
                  self.close()
                  raise RuntimeError('quit')
                else:
                  state = READY

            self.send(data)
      except (KeyboardInterrupt, RuntimeError):
        pass

    t = threading.Thread(target=_FeedInput)
    t.daemon = True
    t.start()

  def closed(self, code, reason=None):
    # The attributes are only saved once the input thread has started; the
    # connection may be closed before that happened.
    if self._old_termios is not None:
      termios.tcsetattr(self._stdin_fd, termios.TCSANOW, self._old_termios)
    print('Connection to %s closed.' % self._mid)

  def received_message(self, msg):
    if msg.is_binary:
      sys.stdout.write(msg.data)
      sys.stdout.flush()
433
434
class ShellWebSocketClient(WebSocketBaseClient):
  """WebSocket client that copies received binary messages to a file object."""

  def __init__(self, output, *args, **kwargs):
    """Constructor.

    Args:
      output: output file object.
    """
    self.output = output
    super(ShellWebSocketClient, self).__init__(*args, **kwargs)

  def handshake_ok(self):
    """Nothing to do once the handshake completed."""

  def opened(self):
    """Nothing to do when the connection opens."""

  def closed(self, code, reason=None):
    """Nothing to do when the connection closes."""

  def received_message(self, msg):
    """Write a binary message to the output file object and flush it."""
    if not msg.is_binary:
      return
    out = self.output
    out.write(msg.data)
    out.flush()
458
459
class ForwarderWebSocketClient(WebSocketBaseClient):
  """WebSocket client that relays data between a local socket and the server.

  Data received on the socket is sent to the server as binary messages;
  binary messages from the server are written to the socket.
  """
  def __init__(self, sock, *args, **kwargs):
    super(ForwarderWebSocketClient, self).__init__(*args, **kwargs)
    self._sock = sock
    self._stop = threading.Event()

  def handshake_ok(self):
    pass

  def opened(self):
    def _FeedInput():
      """Forward socket data to the server until EOF, error or stop."""
      try:
        # The socket is left in blocking mode: select() guards recv(), and
        # received_message() relies on sendall() writing every byte.
        while True:
          rd, unused_w, unused_x = select.select([self._sock], [], [], 0.5)
          if self._stop.is_set():
            break
          if self._sock in rd:
            data = self._sock.recv(_BUFSIZ)
            if len(data) == 0:
              break
            self.send(data, binary=True)
      except Exception:
        pass
      finally:
        self._sock.close()
        self.close()

    t = threading.Thread(target=_FeedInput)
    t.daemon = True
    t.start()

  def closed(self, code, reason=None):
    self._stop.set()
    sys.exit(0)

  def received_message(self, msg):
    if msg.is_binary:
      # send() may write only part of the buffer; sendall() does not drop
      # the remainder.
      self._sock.sendall(msg.data)
499
500
def Arg(*args, **kwargs):
  """Bundle the arguments of an add_argument call into an (args, kwargs) pair."""
  bundle = (args, kwargs)
  return bundle
503
504
def Command(command, help_msg=None, args=None):
  """Decorator for adding argparse parameter for a method.

  The decorated function is wrapped and the wrapper is tagged with an
  '__arg_attr' attribute holding the sub-command description.

  Args:
    command: sub-command name.
    help_msg: help text of the sub-command.
    args: list of (args, kwargs) pairs, one per argument of the sub-command.

  Returns:
    A decorator.
  """
  import functools

  if args is None:
    args = []

  def WrapFunc(func):
    # wraps() keeps the name and docstring of the decorated function.
    @functools.wraps(func)
    def Wrapped(*func_args, **func_kwargs):
      return func(*func_args, **func_kwargs)
    # pylint: disable=W0212
    Wrapped.__arg_attr = {'command': command, 'help': help_msg, 'args': args}
    return Wrapped
  return WrapFunc
516
517
def ParseMethodSubCommands(cls):
  """Decorator for a class using the @Command decorator.

  This decorator retrieves the command info from each method and appends it
  to the SUBCOMMANDS class variable, which is later used to construct the
  parser. The entries are added sorted by command name so that the result
  does not depend on the iteration order of the class dictionary.

  Args:
    cls: class to process; it must define a SUBCOMMANDS list.

  Returns:
    cls itself.
  """
  found = [getattr(member, '__arg_attr')
           for member in vars(cls).values()
           if hasattr(member, '__arg_attr')]
  cls.SUBCOMMANDS.extend(sorted(found, key=lambda attr: attr['command']))
  return cls
528
529
530@ParseMethodSubCommands
531class OverlordCLIClient(object):
532 """Overlord command line interface client."""
533
534 SUBCOMMANDS = []
535
def __init__(self):
  """Build the argument parser; no daemon or client is attached yet."""
  self._selected_mid = None
  self._server = None
  self._state = None
  self._parser = self._BuildParser()
541
def _BuildParser(self):
  """Create the 'ovl' parser with one sub-parser per SUBCOMMANDS entry."""
  root = argparse.ArgumentParser(prog='ovl')
  subparsers = root.add_subparsers(help='sub-command')

  root.add_argument('-s', dest='selected_mid', action='store', default=None,
                    help='select target to execute command on')
  root.add_argument('-S', dest='select_mid_before_action',
                    action='store_true', default=False,
                    help='select target before executing command')

  for spec in self.SUBCOMMANDS:
    name = spec['command']
    sub = subparsers.add_parser(name, help=spec['help'])
    # 'which' records the chosen sub-command in the parsed namespace.
    sub.set_defaults(which=name)
    for positional, keywords in spec['args']:
      sub.add_argument(*positional, **keywords)

  return root
560
def Main(self):
  """Parse the command line and run the requested sub-command."""
  # Everything after the 'shell' sub-command is handed to the remote shell
  # untouched, so only the arguments up to and including it are parsed.
  if 'shell' in sys.argv:
    shell_index = sys.argv.index('shell')
    args = self._parser.parse_args(sys.argv[1:shell_index + 1])
  else:
    args = self._parser.parse_args()

  command = args.which
  self._selected_mid = args.selected_mid

  # Daemon management commands do not need a running daemon.
  if command == 'start-server':
    self.StartServer()
    return
  if command == 'kill-server':
    self.KillServer()
    return

  self.CheckDaemon()
  if command == 'status':
    self.Status()
    return
  if command == 'connect':
    self.Connect(args)
    return

  # The following command requires connection to the server
  self.CheckConnection()

  if args.select_mid_before_action:
    self.SelectClient(store=False)

  handlers = {
      'select': lambda: self.SelectClient(args),
      'ls': lambda: self.ListClients(),
      'shell': lambda: self.Shell(sys.argv[sys.argv.index('shell') + 1:]),
      'push': lambda: self.Push(args),
      'pull': lambda: self.Pull(args),
      'forward': lambda: self.Forward(args),
  }
  handler = handlers.get(command)
  if handler is not None:
    handler()
608
def _UrlOpen(self, url):
  """Open url with the scheme and HTTP basic auth taken from the state.

  Args:
    url: URL without scheme.

  Returns:
    The object returned by urllib2.urlopen.
  """
  state = self._state
  request = urllib2.Request(MakeRequestUrl(state, url))
  has_credential = state.username is not None and state.password is not None
  if has_credential:
    name, value = BasicAuthHeader(state.username, state.password)
    request.add_header(name, value)
  return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800616
def _HTTPPostFile(self, url, filename, progress=None, user=None, passwd=None):
  """Perform HTTP POST and upload file to Overlord.

  To minimize the external dependencies, we construct the HTTP post request
  by ourselves.

  Args:
    url: destination URL without scheme.
    filename: path of the local file to upload.
    progress: optional callback, called as progress(percentage, bytes_sent)
      while uploading and as progress(100) at the end.
    user: HTTP basic auth user name.
    passwd: HTTP basic auth password.

  Returns:
    True if the server replied with HTTP status 200.
  """
  url = MakeRequestUrl(self._state, url)
  size = os.stat(filename).st_size
  boundary = '-----------%s' % _HTTP_BOUNDARY_MAGIC
  CRLF = '\r\n'
  parse = urlparse.urlparse(url)

  part_headers = [
      '--' + boundary,
      'Content-Disposition: form-data; name="file"; '
      'filename="%s"' % os.path.basename(filename),
      'Content-Type: application/octet-stream',
      '', ''
  ]
  part_header = CRLF.join(part_headers)
  end_part = CRLF + '--' + boundary + '--' + CRLF

  content_length = len(part_header) + size + len(end_part)
  if parse.scheme == 'http':
    h = httplib.HTTP(parse.netloc)
  else:
    h = httplib.HTTPS(parse.netloc)

  post_path = url[url.index(parse.netloc) + len(parse.netloc):]
  h.putrequest('POST', post_path)
  h.putheader('Content-Length', content_length)
  h.putheader('Content-Type', 'multipart/form-data; boundary=%s' % boundary)

  if user and passwd:
    h.putheader(*BasicAuthHeader(user, passwd))
  h.endheaders()
  h.send(part_header)

  count = 0
  # Binary mode: the payload is sent as application/octet-stream and
  # Content-Length was computed from the size on disk.
  with open(filename, 'rb') as f:
    while True:
      data = f.read(_BUFSIZ)
      if not data:
        break
      count += len(data)
      if progress:
        progress(int(count * 100.0 / size), count)
      h.send(data)

  h.send(end_part)
  # progress is optional; only report completion when a callback was given.
  if progress:
    progress(100)

  if count != size:
    logging.warning('file changed during upload, upload may be truncated.')

  errcode, unused_x, unused_y = h.getreply()
  return errcode == 200
674
def CheckDaemon(self):
  """Make sure an up-to-date daemon is running and fetch its state.

  The daemon is started when it does not answer. When the digest of this
  script differs from the one recorded by the daemon, the daemon is killed
  and started again.

  Raises:
    RuntimeError: the daemon could not be started.
  """
  self._server = OverlordClientDaemon.GetRPCServer()
  if self._server is None:
    print('* daemon not running, starting it now on port %d ... *' %
          _OVERLORD_CLIENT_DAEMON_PORT)
    self.StartServer()
    if self._server is None:
      raise RuntimeError('failed to start the ovl daemon')

  self._state = self._server.State()
  sha1sum = GetVersionDigest()

  if sha1sum != self._state.version_sha1sum:
    print('ovl server is out of date. killing...')
    KillGraceful(self._server.GetPid())
    self.StartServer()
    if self._server is None:
      raise RuntimeError('failed to start the ovl daemon')
    # The state fetched above belongs to the daemon that was just killed.
    self._state = self._server.State()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800689
def GetSSHControlFile(self, host):
  """Return the SSH control socket path used for host."""
  return ''.join((_SSH_CONTROL_SOCKET_PREFIX, host))
692
def SSHTunnel(self, user, host, port, overlord_port=_OVERLORD_HTTP_PORT):
  """SSH forward the remote overlord server.

  Overlord server may not have port 9000 open to the public network, in such
  case we can SSH forward the port to localhost.

  Args:
    user: SSH login name; the default login is used when empty.
    host: SSH server host.
    port: SSH server port.
    overlord_port: port forwarded from localhost to the remote localhost.

  Returns:
    The pid reported by 'ssh -O check' for the control master.

  Raises:
    RuntimeError: the control master could not be verified.
  """

  control_file = self.GetSSHControlFile(host)
  try:
    os.unlink(control_file)
  except OSError:
    # No stale control socket to remove.
    pass

  subprocess.Popen([
      'ssh', '-Nf',
      '-M',  # Enable master mode
      '-S', control_file,
      '-L', '%d:localhost:%d' % (overlord_port, overlord_port),
      '-p', str(port),
      '%s%s' % (user + '@' if user else '', host)
  ]).wait()

  p = subprocess.Popen([
      'ssh',
      '-S', control_file,
      '-O', 'check', host,
  ], stderr=subprocess.PIPE)
  unused_stdout, stderr = p.communicate()

  # The pid is parsed out of the stderr output of the check command.
  s = re.search(r'pid=(\d+)', stderr)
  if s:
    return int(s.group(1))

  raise RuntimeError('can not establish ssh connection')
727
def CheckConnection(self):
  """Verify that the daemon is connected to a reachable Overlord server.

  Raises:
    RuntimeError: no server is configured, the server can not be queried,
      or the recorded SSH tunnel process can not be signalled.
  """
  if self._state.host is None:
    raise RuntimeError('not connected to any server, abort')

  try:
    self._server.Clients()
  except Exception:
    raise RuntimeError('remote server disconnected, abort')

  if self._state.ssh_pid is not None:
    try:
      # Signal 0 only checks that the process can be signalled, like
      # 'kill -0', without spawning a child process.
      os.kill(self._state.ssh_pid, 0)
    except OSError:
      raise RuntimeError('ssh tunnel disconnected, please re-connect')
743
def CheckClient(self):
  """Resolve the target client and make sure it is still listed.

  The client given on the command line wins; otherwise the one stored in
  the daemon state is used.

  Raises:
    RuntimeError: no client is selected, or the client is not listed.
  """
  if self._selected_mid is None:
    stored_mid = self._state.selected_mid
    if stored_mid is None:
      raise RuntimeError('No client is selected')
    self._selected_mid = stored_mid

  known_clients = self._server.Clients()
  if self._selected_mid not in known_clients:
    raise RuntimeError('client %s disappeared' % self._selected_mid)
752
def CheckOutput(self, command):
  """Run command on the selected client and return what it printed.

  Args:
    command: shell command line.

  Returns:
    The data received over the shell WebSocket, as a string.
  """
  state = self._state
  headers = []
  if state.username is not None and state.password is not None:
    headers.append(BasicAuthHeader(state.username, state.password))

  scheme = 'ws%s://' % ('s' if state.ssl else '')
  url = '%s%s:%d/api/agent/shell/%s?command=%s' % (
      scheme, state.host, state.port, self._selected_mid,
      urllib2.quote(command))

  captured = StringIO.StringIO()
  ws = ShellWebSocketClient(captured, url, headers=headers)
  ws.connect()
  ws.run()
  return captured.getvalue()
769
@Command('status', 'show Overlord connection status')
def Status(self):
  """Print the connected server and the selected client."""
  state = self._state
  if state.host is None:
    print('Not connected to any host.')
  elif state.ssh_pid is not None:
    print('Connected to %s with SSH tunneling.' % state.orig_host)
  else:
    print('Connected to %s:%d.' % (state.host, state.port))

  # Fall back to the client stored in the daemon state.
  if self._selected_mid is None:
    self._selected_mid = state.selected_mid

  if self._selected_mid is not None:
    print('Client %s selected.' % self._selected_mid)
  else:
    print('No client is selected.')
787
# host and port are optional positionals (nargs='?'); without it argparse
# requires them and the declared defaults could never apply.
@Command('connect', 'connect to Overlord server', [
    Arg('host', metavar='HOST', type=str, nargs='?', default='localhost',
        help='Overlord hostname/IP'),
    Arg('port', metavar='PORT', type=int, nargs='?',
        default=_OVERLORD_HTTP_PORT, help='Overlord port'),
    Arg('-f', '--forward', dest='ssh_forward', default=False,
        action='store_true',
        help='connect with SSH forwarding to the host'),
    Arg('-p', '--ssh-port', dest='ssh_port', default=22,
        type=int, help='SSH server port for SSH forwarding'),
    Arg('-l', '--ssh-login', dest='ssh_login', default='',
        type=str, help='SSH server login name for SSH forwarding'),
    Arg('-u', '--user', dest='user', default=None,
        type=str, help='Overlord HTTP auth username'),
    Arg('-w', '--passwd', dest='passwd', default=None, type=str,
        help='Overlord HTTP auth password')])
def Connect(self, args):
  """Connect the daemon to an Overlord server, optionally through SSH.

  Args:
    args: parsed arguments of the 'connect' sub-command.
  """
  ssh_pid = None
  host = args.host
  orig_host = args.host

  if args.ssh_forward:
    # Kill previous SSH tunnel
    self.KillSSHTunnel()

    ssh_pid = self.SSHTunnel(args.ssh_login, args.host, args.ssh_port)
    # The server is reached through the local end of the tunnel.
    host = 'localhost'

  status = self._server.Connect(host, args.port, ssh_pid, args.user,
                                args.passwd, orig_host)
  # The daemon returns True on success, an HTTP status code, or an error
  # message.
  if status is not True:
    if isinstance(status, int):
      if status == 401:
        msg = '401 Unauthorized'
      else:
        msg = 'HTTP %d' % status
    else:
      msg = status
    print('can not connect to %s: %s' % (host, msg))
827
@Command('start-server', 'start overlord CLI client server')
def StartServer(self):
  """Start the client daemon unless it is already running.

  self._server is set to the RPC proxy of the daemon, or to None when the
  daemon does not answer after being started.
  """
  self._server = OverlordClientDaemon.GetRPCServer()
  if self._server is None:
    OverlordClientDaemon().Start()
    # Give the forked daemon time to start serving.
    time.sleep(1)
    self._server = OverlordClientDaemon.GetRPCServer()
    if self._server is not None:
      print('* daemon started successfully *')
    else:
      print('error: failed to start daemon')
837
@Command('kill-server', 'kill overlord CLI client server')
def KillServer(self):
  """Kill the SSH tunnel and the client daemon, if the daemon is running."""
  daemon = OverlordClientDaemon.GetRPCServer()
  self._server = daemon
  if daemon is None:
    return

  # The state is needed to find the pid of the SSH tunnel.
  self._state = daemon.State()

  # Kill SSH Tunnel
  self.KillSSHTunnel()

  # Kill server daemon
  daemon_pid = daemon.GetPid()
  KillGraceful(daemon_pid)
851
def KillSSHTunnel(self):
  """Kill the SSH tunnel process recorded in the state, if any."""
  tunnel_pid = self._state.ssh_pid
  if tunnel_pid is None:
    return
  KillGraceful(tunnel_pid)
855
@Command('ls', 'list all clients')
def ListClients(self):
  """Print the clients reported by the daemon, one per line."""
  mids = self._server.Clients()
  for mid in mids:
    print(mid)
860
@Command('select', 'select default client', [
    Arg('mid', metavar='mid', nargs='?', default=None)])
def SelectClient(self, args=None, store=True):
  """Select the client to operate on.

  When no mid is given the clients are listed and the user is asked to
  pick one by number.

  Args:
    args: parsed arguments of the 'select' sub-command, or None.
    store: also store the selection in the daemon.

  Raises:
    RuntimeError: the selection is invalid or the client does not exist.
  """
  clients = self._server.Clients()

  mid = args.mid if args is not None else None
  if mid is None:
    print('Select from the following clients:')
    for i, client in enumerate(clients):
      print('    %d. %s' % (i + 1, client))

    print('\nSelection: ', end='')
    try:
      choice = int(raw_input()) - 1
    except ValueError:
      raise RuntimeError('select: invalid selection')
    # Explicit range check: a zero or negative answer must not wrap around
    # and pick a client from the end of the list.
    if not 0 <= choice < len(clients):
      raise RuntimeError('select: selection out of range')
    mid = clients[choice]
  else:
    if mid not in clients:
      raise RuntimeError('select: client %s does not exist' % mid)

  self._selected_mid = mid
  if store:
    self._server.SelectClient(mid)
    print('Client %s selected' % mid)
888
@Command('shell', 'open a shell or execute a shell command', [
    Arg('command', metavar='CMD', nargs='?', help='command to execute')])
def Shell(self, command=None):
  """Open an interactive terminal, or run a command, on the selected client.

  Args:
    command: list of command words; an interactive terminal is opened when
      it is empty or None.
  """
  command = [] if command is None else command
  self.CheckClient()

  state = self._state
  headers = []
  if state.username is not None and state.password is not None:
    headers.append(BasicAuthHeader(state.username, state.password))

  scheme = 'ws%s://' % ('s' if state.ssl else '')
  if command:
    url = '%s%s:%d/api/agent/shell/%s?command=%s' % (
        scheme, state.host, state.port, self._selected_mid,
        urllib2.quote(' '.join(command)))
    ws = ShellWebSocketClient(sys.stdout, url, headers=headers)
  else:
    url = '%s%s:%d/api/agent/tty/%s' % (
        scheme, state.host, state.port, self._selected_mid)
    ws = TerminalWebSocketClient(self._selected_mid, url, headers=headers)
  ws.connect()
  ws.run()
916
@Command('push', 'push a file or directory to remote', [
    Arg('src', metavar='SOURCE'),
    Arg('dst', metavar='DESTINATION')])
def Push(self, args):
  """Upload a local file, symlink or directory tree to the selected client.

  Args:
    args: parsed arguments of the 'push' sub-command (src, dst).

  Raises:
    RuntimeError: the source does not exist or is not readable.
  """
  self.CheckClient()

  if not os.path.exists(args.src):
    raise RuntimeError('push: can not stat "%s": no such file or directory'
                       % args.src)

  if not os.access(args.src, os.R_OK):
    raise RuntimeError('push: can not open "%s" for reading' % args.src)

  @AutoRetry('push', _RETRY_TIMES)
  def _push(src, dst):
    """Upload a single file or recreate a single symlink."""
    src_base = os.path.basename(src)

    # Local file is a link
    if os.path.islink(src):
      pbar = ProgressBar(src_base)
      link_path = os.readlink(src)
      self.CheckOutput('mkdir -p %(dirname)s; '
                       'if [ -d "%(dst)s" ]; then '
                       'ln -sf "%(link_path)s" "%(dst)s/%(link_name)s"; '
                       'else ln -sf "%(link_path)s" "%(dst)s"; fi' %
                       dict(dirname=os.path.dirname(dst),
                            link_path=link_path, dst=dst,
                            link_name=src_base))
      pbar.End()
      return

    mode = '0%o' % (0x1FF & os.stat(src).st_mode)
    # Quote the path components so that spaces, '&', '#' etc. do not
    # corrupt the query string.
    url = ('%s:%d/api/agent/upload/%s?dest=%s&perm=%s' %
           (self._state.host, self._state.port, self._selected_mid,
            urllib2.quote(dst), mode))
    try:
      self._UrlOpen(url + '&filename=%s' % urllib2.quote(src_base))
    except urllib2.HTTPError as e:
      msg = json.loads(e.read()).get('error', None)
      raise RuntimeError('push: %s' % msg)

    pbar = ProgressBar(src_base)
    uploaded = self._HTTPPostFile(url, src, pbar.SetProgress,
                                  self._state.username, self._state.password)
    pbar.End()
    if not uploaded:
      # Raise so that a rejected upload is retried and reported.
      raise RuntimeError('push: upload failed')

  if os.path.isdir(args.src):
    dst_exists = self.CheckOutput(
        'stat %s >/dev/null 2>&1 && echo True || echo False'
        % args.dst).strip() == 'True'

    # If destination directory does not exist, we should strip the first
    # layer of directory. For example: src_dir contains a single file 'A'
    #
    #   push src_dir dest_dir
    #
    # If dest_dir exists, the resulting directory structure should be:
    #   dest_dir/src_dir/A
    # If dest_dir does not exist, the resulting directory structure should
    # be:
    #   dest_dir/A
    src_root = os.path.normpath(args.src)
    base = os.path.dirname(src_root) if dst_exists else src_root
    for root, unused_x, files in os.walk(args.src):
      # Path of the walked directory relative to base, so that leading
      # directories of the local path are never reproduced remotely.
      dst_root = os.path.relpath(root, base or os.curdir)
      if dst_root == os.curdir:
        dst_root = ''
      for name in files:
        _push(os.path.join(root, name),
              os.path.join(args.dst, dst_root, name))
  else:
    _push(args.src, args.dst)
983
@Command('pull', 'pull a file or directory from remote', [
    Arg('src', metavar='SOURCE'),
    Arg('dst', metavar='DESTINATION', default='.', nargs='?')])
def Pull(self, args):
  """Download a file, symlink or directory tree from the selected client.

  Args:
    args: parsed arguments of the 'pull' sub-command (src, dst).
  """
  self.CheckClient()

  @AutoRetry('pull', _RETRY_TIMES)
  def _pull(src, dst, ftype, perm=0o644, link=None):
    """Download a single file or recreate a single symlink."""
    try:
      os.makedirs(os.path.dirname(dst))
    except OSError:
      # The directory exists already or dst has no directory part; any
      # other problem surfaces when dst is created below.
      pass

    src_base = os.path.basename(src)

    # Remote file is a link
    if ftype == 'l':
      pbar = ProgressBar(src_base)
      # lexists() also detects a dangling symlink left at dst.
      if os.path.lexists(dst):
        os.remove(dst)
      os.symlink(link, dst)
      pbar.End()
      return

    url = ('%s:%d/api/agent/download/%s?filename=%s' %
           (self._state.host, self._state.port, self._selected_mid,
            urllib2.quote(src)))
    try:
      h = self._UrlOpen(url)
    except urllib2.HTTPError as e:
      msg = json.loads(e.read()).get('error', 'unknown error')
      raise RuntimeError('pull: %s' % msg)
    except KeyboardInterrupt:
      return

    pbar = ProgressBar(src_base)
    with open(dst, 'wb') as f:
      os.fchmod(f.fileno(), perm)
      # The total size is unknown when the header is missing; the bar then
      # stays at 0% until the download is complete.
      length = h.headers.get('Content-Length')
      total_size = int(length) if length is not None else 0
      downloaded_size = 0

      while True:
        data = h.read(_BUFSIZ)
        if len(data) == 0:
          break
        downloaded_size += len(data)
        if total_size > 0:
          percentage = min(100.0, float(downloaded_size) * 100 / total_size)
        else:
          percentage = 0
        pbar.SetProgress(percentage, downloaded_size)
        f.write(data)
    pbar.End()

  # Use find to get a listing of all files under a root directory. The 'stat'
  # command is used to retrieve the filename and it's filemode.
  output = self.CheckOutput(
      'cd $HOME; '
      'stat "%(src)s" >/dev/null && '
      'find "%(src)s" \'(\' -type f -o -type l \')\' '
      '-printf \'%%m\t%%p\t%%y\t%%l\n\''
      % {'src': args.src})

  # We got error from the stat command
  if output.startswith('stat: '):
    sys.stderr.write(output)
    return

  # Each listing line is "<mode>\t<path>\t<type>\t<link target>".
  entries = []
  for line in output.strip('\n').split('\n'):
    if not line:
      continue
    fields = line.split('\t')
    if len(fields) != 4:
      # Not a listing line (e.g. a diagnostic from find); pass it on.
      sys.stderr.write(line + '\n')
      continue
    entries.append(fields)

  if not entries:
    sys.stderr.write('pull: nothing to pull from "%s"\n' % args.src)
    return

  common_prefix = os.path.dirname(args.src)

  if len(entries) == 1:
    perm, src_path, ftype, link = entries[0]
    if os.path.isdir(args.dst):
      dst = os.path.join(args.dst, os.path.basename(src_path))
    else:
      dst = args.dst
    _pull(src_path, dst, ftype, int(perm, base=8), link)
  else:
    # When the destination does not exist the content of src is placed
    # directly into it, so the whole src path is stripped.
    if not os.path.exists(args.dst):
      common_prefix = args.src

    for perm, src_path, ftype, link in entries:
      rel_dst = src_path[len(common_prefix):].lstrip('/')
      _pull(src_path, os.path.join(args.dst, rel_dst), ftype,
            int(perm, base=8), link)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001069
1070 @Command('forward', 'forward remote port to local port', [
1071 Arg('--list', dest='list_all', action='store_true', default=False,
1072 help='list all port forwarding sessions'),
1073 Arg('--remove', metavar='LOCAL_PORT', dest='remove', type=int,
1074 default=None,
1075 help='remove port forwarding for local port LOCAL_PORT'),
1076 Arg('--remove-all', dest='remove_all', action='store_true',
1077 default=False, help='remove all port forwarding'),
1078 Arg('remote', metavar='REMOTE_PORT', type=int, nargs='?'),
1079 Arg('local', metavar='LOCAL_PORT', type=int, nargs='?')])
1080 def Forward(self, args):
1081 if args.list_all:
1082 max_len = 10
1083 if len(self._state.forwards):
1084 max_len = max([len(v[0]) for v in self._state.forwards.values()])
1085
1086 print('%-*s %-8s %-8s' % (max_len, 'Client', 'Remote', 'Local'))
1087 for local in sorted(self._state.forwards.keys()):
1088 value = self._state.forwards[local]
1089 print('%-*s %-8s %-8s' % (max_len, value[0], value[1], local))
1090 return
1091
1092 if args.remove_all:
1093 self._server.RemoveAllForward()
1094 return
1095
1096 if args.remove:
1097 self._server.RemoveForward(args.remove)
1098 return
1099
1100 self.CheckClient()
1101
1102 if args.local is None:
1103 args.local = args.remote
1104 remote = int(args.remote)
1105 local = int(args.local)
1106
1107 def HandleConnection(conn):
1108 headers = []
1109 if self._state.username is not None and self._state.password is not None:
1110 headers.append(BasicAuthHeader(self._state.username,
1111 self._state.password))
1112
1113 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
1114 ws = ForwarderWebSocketClient(
1115 conn,
1116 scheme + '%s:%d/api/agent/forward/%s?port=%d' %
1117 (self._state.host, self._state.port, self._selected_mid, remote),
1118 headers=headers)
1119 try:
1120 ws.connect()
1121 ws.run()
1122 except Exception as e:
1123 print('error: %s' % e)
1124 finally:
1125 ws.close()
1126
1127 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1128 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1129 server.bind(('0.0.0.0', local))
1130 server.listen(5)
1131
1132 pid = os.fork()
1133 if pid == 0:
1134 while True:
1135 conn, unused_addr = server.accept()
1136 t = threading.Thread(target=HandleConnection, args=(conn,))
1137 t.daemon = True
1138 t.start()
1139 else:
1140 self._server.AddForward(self._selected_mid, remote, local, pid)
1141
1142
def main():
  """Script entry point: sets up logging and runs the Overlord CLI client.

  Ctrl-C and any other exception raised by the client are reported with a
  one-line message on stdout instead of a traceback.
  """
  logging.basicConfig(level=logging.INFO)

  # Add DaemonState to the classes known to the JSON-RPC library.
  rpc_config = Config.instance()
  rpc_config.classes.add(DaemonState)

  client = OverlordCLIClient()
  try:
    client.Main()
  except KeyboardInterrupt:
    print('Ctrl-C received, abort')
  except Exception as err:
    print('error: %s' % err)
1156
1157
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
  main()