blob: 47e664f33bce473b6e68021df3b81dac44df90cd [file] [log] [blame]
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001#!/usr/bin/python -u
2# -*- coding: utf-8 -*-
3#
4# Copyright 2015 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8from __future__ import print_function
9
10import argparse
11import ast
12import base64
13import fcntl
14import hashlib
15import httplib
16import json
17import jsonrpclib
18import logging
19import os
20import re
21import select
22import signal
23import socket
24import StringIO
25import struct
26import subprocess
27import sys
28import tempfile
29import termios
30import threading
31import time
32import tty
33import urllib2
34import urlparse
35
36from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
37from jsonrpclib.config import Config
38from ws4py.client import WebSocketBaseClient
39
40# Python version >= 2.7.9 enables SSL check by default, bypass it.
41try:
42 import ssl
43 # pylint: disable=W0212
44 ssl._create_default_https_context = ssl._create_unverified_context
45except Exception:
46 pass
47
48
49_ESCAPE = '~'
50_BUFSIZ = 8192
51_OVERLORD_PORT = 4455
52_OVERLORD_HTTP_PORT = 9000
53_OVERLORD_CLIENT_DAEMON_PORT = 4488
54_OVERLORD_CLIENT_DAEMON_RPC_ADDR = ('127.0.0.1', _OVERLORD_CLIENT_DAEMON_PORT)
55
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +080056_DEFAULT_HTTP_TIMEOUT = 30
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +080057_LIST_CACHE_TIMEOUT = 2
58_DEFAULT_TERMINAL_WIDTH = 80
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +080059_RETRY_TIMES = 3
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +080060
61# echo -n overlord | md5sum
62_HTTP_BOUNDARY_MAGIC = '9246f080c855a69012707ab53489b921'
63
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +080064# Terminal resize control
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +080065_CONTROL_START = 128
66_CONTROL_END = 129
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +080067
68# Stream control
69_STDIN_CLOSED = '##STDIN_CLOSED##'
70
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +080071_SSH_CONTROL_SOCKET_PREFIX = os.path.join(tempfile.gettempdir(),
72 'ovl-ssh-control-')
73
74# A string that will always be included in the response of
75# GET http://OVERLORD_SERVER:_OVERLORD_HTTP_PORT
76_OVERLORD_RESPONSE_KEYWORD = '<html>'
77
78
79def GetVersionDigest():
80 """Return the sha1sum of the current executing script."""
81 with open(__file__, 'r') as f:
82 return hashlib.sha1(f.read()).hexdigest()
83
84
85def KillGraceful(pid, wait_secs=1):
86 """Kill a process gracefully by first sending SIGTERM, wait for some time,
87 then send SIGKILL to make sure it's killed."""
88 try:
89 os.kill(pid, signal.SIGTERM)
90 time.sleep(wait_secs)
91 os.kill(pid, signal.SIGKILL)
92 except OSError:
93 pass
94
95
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +080096def AutoRetry(action_name, retries):
97 """Decorator for retry function call."""
98 def Wrap(func):
99 def Loop(*args, **kwargs):
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800100 for unused_i in range(retries):
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800101 try:
102 func(*args, **kwargs)
103 except Exception as e:
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800104 print('error: %s: %s: retrying ...' % (args[0], e))
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800105 else:
106 break
107 else:
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800108 print('error: failed to %s %s' % (action_name, args[0]))
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800109 return Loop
110 return Wrap
111
112
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800113def BasicAuthHeader(user, password):
114 """Return HTTP basic auth header."""
115 credential = base64.b64encode('%s:%s' % (user, password))
116 return ('Authorization', 'Basic %s' % credential)
117
118
119def GetTerminalSize():
120 """Retrieve terminal window size."""
121 ws = struct.pack('HHHH', 0, 0, 0, 0)
122 ws = fcntl.ioctl(0, termios.TIOCGWINSZ, ws)
123 lines, columns, unused_x, unused_y = struct.unpack('HHHH', ws)
124 return lines, columns
125
126
127def MakeRequestUrl(state, url):
128 return 'http%s://%s' % ('s' if state.ssl else '', url)
129
130
131class ProgressBar(object):
132 SIZE_WIDTH = 11
133 SPEED_WIDTH = 10
134 DURATION_WIDTH = 6
135 PERCENTAGE_WIDTH = 8
136
137 def __init__(self, name):
138 self._start_time = time.time()
139 self._name = name
140 self._size = 0
141 self._width = 0
142 self._name_width = 0
143 self._name_max = 0
144 self._stat_width = 0
145 self._max = 0
146 self.CalculateSize()
147 self.SetProgress(0)
148
149 def CalculateSize(self):
150 self._width = GetTerminalSize()[1] or _DEFAULT_TERMINAL_WIDTH
151 self._name_width = int(self._width * 0.3)
152 self._name_max = self._name_width
153 self._stat_width = self.SIZE_WIDTH + self.SPEED_WIDTH + self.DURATION_WIDTH
154 self._max = (self._width - self._name_width - self._stat_width -
155 self.PERCENTAGE_WIDTH)
156
157 def SizeToHuman(self, size_in_bytes):
158 if size_in_bytes < 1024:
159 unit = 'B'
160 value = size_in_bytes
161 elif size_in_bytes < 1024 ** 2:
162 unit = 'KiB'
163 value = size_in_bytes / 1024.0
164 elif size_in_bytes < 1024 ** 3:
165 unit = 'MiB'
166 value = size_in_bytes / (1024.0 ** 2)
167 elif size_in_bytes < 1024 ** 4:
168 unit = 'GiB'
169 value = size_in_bytes / (1024.0 ** 3)
170 return ' %6.1f %3s' % (value, unit)
171
172 def SpeedToHuman(self, speed_in_bs):
173 if speed_in_bs < 1024:
174 unit = 'B'
175 value = speed_in_bs
176 elif speed_in_bs < 1024 ** 2:
177 unit = 'K'
178 value = speed_in_bs / 1024.0
179 elif speed_in_bs < 1024 ** 3:
180 unit = 'M'
181 value = speed_in_bs / (1024.0 ** 2)
182 elif speed_in_bs < 1024 ** 4:
183 unit = 'G'
184 value = speed_in_bs / (1024.0 ** 3)
185 return ' %6.1f%s/s' % (value, unit)
186
187 def DurationToClock(self, duration):
188 return ' %02d:%02d' % (duration / 60, duration % 60)
189
190 def SetProgress(self, percentage, size=None):
191 current_width = GetTerminalSize()[1]
192 if self._width != current_width:
193 self.CalculateSize()
194
195 if size is not None:
196 self._size = size
197
198 elapse_time = time.time() - self._start_time
199 speed = self._size / float(elapse_time)
200
201 size_str = self.SizeToHuman(self._size)
202 speed_str = self.SpeedToHuman(speed)
203 elapse_str = self.DurationToClock(elapse_time)
204
205 width = int(self._max * percentage / 100.0)
206 sys.stdout.write(
207 '%*s' % (- self._name_max,
208 self._name if len(self._name) <= self._name_max else
209 self._name[:self._name_max - 4] + ' ...') +
210 size_str + speed_str + elapse_str +
211 ((' [' + '#' * width + ' ' * (self._max - width) + ']' +
212 '%4d%%' % int(percentage)) if self._max > 2 else '') + '\r')
213 sys.stdout.flush()
214
215 def End(self):
216 self.SetProgress(100.0)
217 sys.stdout.write('\n')
218 sys.stdout.flush()
219
220
221class DaemonState(object):
222 """DaemonState is used for storing Overlord state info."""
223 def __init__(self):
224 self.version_sha1sum = GetVersionDigest()
225 self.host = None
226 self.port = None
227 self.ssl = False
228 self.ssh = False
229 self.orig_host = None
230 self.ssh_pid = None
231 self.username = None
232 self.password = None
233 self.selected_mid = None
234 self.forwards = {}
235 self.listing = []
236 self.last_list = 0
237
238
239class OverlordClientDaemon(object):
240 """Overlord Client Daemon."""
241 def __init__(self):
242 self._state = DaemonState()
243 self._server = None
244
245 def Start(self):
246 self.StartRPCServer()
247
248 def StartRPCServer(self):
249 self._server = SimpleJSONRPCServer(_OVERLORD_CLIENT_DAEMON_RPC_ADDR,
250 logRequests=False)
251 exports = [
252 (self.State, 'State'),
253 (self.Ping, 'Ping'),
254 (self.GetPid, 'GetPid'),
255 (self.Connect, 'Connect'),
256 (self.Clients, 'Clients'),
257 (self.SelectClient, 'SelectClient'),
258 (self.AddForward, 'AddForward'),
259 (self.RemoveForward, 'RemoveForward'),
260 (self.RemoveAllForward, 'RemoveAllForward'),
261 ]
262 for func, name in exports:
263 self._server.register_function(func, name)
264
265 pid = os.fork()
266 if pid == 0:
267 self._server.serve_forever()
268
269 @staticmethod
270 def GetRPCServer():
271 """Returns the Overlord client daemon RPC server."""
272 server = jsonrpclib.Server('http://%s:%d' %
273 _OVERLORD_CLIENT_DAEMON_RPC_ADDR)
274 try:
275 server.Ping()
276 except Exception:
277 return None
278 return server
279
280 def State(self):
281 return self._state
282
283 def Ping(self):
284 return True
285
286 def GetPid(self):
287 return os.getpid()
288
289 def _UrlOpen(self, url):
290 """Wrapper for urllib2.urlopen.
291
292 It selects correct HTTP scheme according to self._stat.ssl and add HTTP
293 basic auth headers.
294 """
295 url = MakeRequestUrl(self._state, url)
296 request = urllib2.Request(url)
297 if self._state.username is not None and self._state.password is not None:
298 request.add_header(*BasicAuthHeader(self._state.username,
299 self._state.password))
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800300 return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800301
302 def _GetJSON(self, path):
303 url = '%s:%d%s' % (self._state.host, self._state.port, path)
304 return json.loads(self._UrlOpen(url).read())
305
306 def Connect(self, host, port=_OVERLORD_HTTP_PORT, ssh_pid=None,
307 username=None, password=None, orig_host=None):
308 self._state.username = username
309 self._state.password = password
310 self._state.host = host
311 self._state.port = port
312 self._state.ssl = False
313 self._state.orig_host = orig_host
314 self._state.ssh_pid = ssh_pid
315 self._state.selected_mid = None
316
317 try:
318 h = self._UrlOpen('%s:%d' % (host, port))
319 # Probably not an HTTP server, try HTTPS
320 if _OVERLORD_RESPONSE_KEYWORD not in h.read():
321 self._state.ssl = True
322 self._UrlOpen('%s:%d' % (host, port))
323 except urllib2.HTTPError as e:
324 logging.exception(e)
325 return e.getcode()
326 except Exception as e:
327 logging.exception(e)
328 return str(e)
329 return True
330
331 def Clients(self):
332 if time.time() - self._state.last_list <= _LIST_CACHE_TIMEOUT:
333 return self._state.listing
334
335 mids = [client['mid'] for client in self._GetJSON('/api/agents/list')]
336 self._state.listing = sorted(list(set(mids)))
337 self._state.last_list = time.time()
338 return self._state.listing
339
340 def SelectClient(self, mid):
341 self._state.selected_mid = mid
342
343 def AddForward(self, mid, remote, local, pid):
344 self._state.forwards[local] = (mid, remote, pid)
345
346 def RemoveForward(self, local_port):
347 try:
348 unused_mid, unused_remote, pid = self._state.forwards[local_port]
349 KillGraceful(pid)
350 del self._state.forwards[local_port]
351 except (KeyError, OSError):
352 pass
353
354 def RemoveAllForward(self):
355 for unused_mid, unused_remote, pid in self._state.forwards.values():
356 try:
357 KillGraceful(pid)
358 except OSError:
359 pass
360 self._state.forwards = {}
361
362
363class TerminalWebSocketClient(WebSocketBaseClient):
364 def __init__(self, mid, *args, **kwargs):
365 super(TerminalWebSocketClient, self).__init__(*args, **kwargs)
366 self._mid = mid
367 self._stdin_fd = sys.stdin.fileno()
368 self._old_termios = None
369
370 def handshake_ok(self):
371 pass
372
373 def opened(self):
374 nonlocals = {'size': (80, 40)}
375
376 def _ResizeWindow():
377 size = GetTerminalSize()
378 if size != nonlocals['size']: # Size not changed, ignore
379 control = {'command': 'resize', 'params': list(size)}
380 payload = chr(_CONTROL_START) + json.dumps(control) + chr(_CONTROL_END)
381 nonlocals['size'] = size
382 try:
383 self.send(payload, binary=True)
384 except Exception:
385 pass
386
387 def _FeedInput():
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800388 self._old_termios = termios.tcgetattr(self._stdin_fd)
389 tty.setraw(self._stdin_fd)
390
391 READY, ENTER_PRESSED, ESCAPE_PRESSED = range(3)
392
393 try:
394 state = READY
395 while True:
Wei-Ning Huang85e763d2016-01-21 15:53:18 +0800396 # Check if terminal is resized
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800397 _ResizeWindow()
398
Wei-Ning Huang85e763d2016-01-21 15:53:18 +0800399 ch = sys.stdin.read(1)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800400
Wei-Ning Huang85e763d2016-01-21 15:53:18 +0800401 # Scan for escape sequence
402 if state == READY:
403 state = ENTER_PRESSED if ch == chr(0x0d) else READY
404 elif state == ENTER_PRESSED:
405 state = ESCAPE_PRESSED if ch == _ESCAPE else READY
406 elif state == ESCAPE_PRESSED:
407 if ch == '.':
408 self.close()
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +0800409 break
Wei-Ning Huang85e763d2016-01-21 15:53:18 +0800410 else:
411 state = READY
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800412
Wei-Ning Huang85e763d2016-01-21 15:53:18 +0800413 self.send(ch)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800414 except (KeyboardInterrupt, RuntimeError):
415 pass
416
417 t = threading.Thread(target=_FeedInput)
418 t.daemon = True
419 t.start()
420
421 def closed(self, code, reason=None):
422 termios.tcsetattr(self._stdin_fd, termios.TCSANOW, self._old_termios)
423 print('Connection to %s closed.' % self._mid)
424
425 def received_message(self, msg):
426 if msg.is_binary:
427 sys.stdout.write(msg.data)
428 sys.stdout.flush()
429
430
431class ShellWebSocketClient(WebSocketBaseClient):
432 def __init__(self, output, *args, **kwargs):
433 """Constructor.
434
435 Args:
436 output: output file object.
437 """
438 self.output = output
439 super(ShellWebSocketClient, self).__init__(*args, **kwargs)
440
441 def handshake_ok(self):
442 pass
443
444 def opened(self):
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +0800445 def _FeedInput():
446 try:
447 while True:
448 data = sys.stdin.read(1)
449
450 if len(data) == 0:
451 self.send(_STDIN_CLOSED * 2)
452 break
453 self.send(data, binary=True)
454 except (KeyboardInterrupt, RuntimeError):
455 pass
456
457 t = threading.Thread(target=_FeedInput)
458 t.daemon = True
459 t.start()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800460
461 def closed(self, code, reason=None):
462 pass
463
464 def received_message(self, msg):
465 if msg.is_binary:
466 self.output.write(msg.data)
467 self.output.flush()
468
469
470class ForwarderWebSocketClient(WebSocketBaseClient):
471 def __init__(self, sock, *args, **kwargs):
472 super(ForwarderWebSocketClient, self).__init__(*args, **kwargs)
473 self._sock = sock
474 self._stop = threading.Event()
475
476 def handshake_ok(self):
477 pass
478
479 def opened(self):
480 def _FeedInput():
481 try:
482 self._sock.setblocking(False)
483 while True:
484 rd, unused_w, unused_x = select.select([self._sock], [], [], 0.5)
485 if self._stop.is_set():
486 break
487 if self._sock in rd:
488 data = self._sock.recv(_BUFSIZ)
489 if len(data) == 0:
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +0800490 self.close()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800491 break
492 self.send(data, binary=True)
493 except Exception:
494 pass
495 finally:
496 self._sock.close()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800497
498 t = threading.Thread(target=_FeedInput)
499 t.daemon = True
500 t.start()
501
502 def closed(self, code, reason=None):
503 self._stop.set()
504 sys.exit(0)
505
506 def received_message(self, msg):
507 if msg.is_binary:
508 self._sock.send(msg.data)
509
510
511def Arg(*args, **kwargs):
512 return (args, kwargs)
513
514
515def Command(command, help_msg=None, args=None):
516 """Decorator for adding argparse parameter for a method."""
517 if args is None:
518 args = []
519 def WrapFunc(func):
520 def Wrapped(*args, **kwargs):
521 return func(*args, **kwargs)
522 # pylint: disable=W0212
523 Wrapped.__arg_attr = {'command': command, 'help': help_msg, 'args': args}
524 return Wrapped
525 return WrapFunc
526
527
528def ParseMethodSubCommands(cls):
529 """Decorator for a class using the @Command decorator.
530
531 This decorator retrieve command info from each method and append it in to the
532 SUBCOMMANDS class variable, which is later used to construct parser.
533 """
534 for unused_key, method in cls.__dict__.iteritems():
535 if hasattr(method, '__arg_attr'):
536 cls.SUBCOMMANDS.append(method.__arg_attr) # pylint: disable=W0212
537 return cls
538
539
540@ParseMethodSubCommands
541class OverlordCLIClient(object):
542 """Overlord command line interface client."""
543
544 SUBCOMMANDS = []
545
546 def __init__(self):
547 self._parser = self._BuildParser()
548 self._selected_mid = None
549 self._server = None
550 self._state = None
551
552 def _BuildParser(self):
553 root_parser = argparse.ArgumentParser(prog='ovl')
554 subparsers = root_parser.add_subparsers(help='sub-command')
555
556 root_parser.add_argument('-s', dest='selected_mid', action='store',
557 default=None,
558 help='select target to execute command on')
559 root_parser.add_argument('-S', dest='select_mid_before_action',
560 action='store_true', default=False,
561 help='select target before executing command')
562
563 for attr in self.SUBCOMMANDS:
564 parser = subparsers.add_parser(attr['command'], help=attr['help'])
565 parser.set_defaults(which=attr['command'])
566 for arg in attr['args']:
567 parser.add_argument(*arg[0], **arg[1])
568
569 return root_parser
570
571 def Main(self):
572 # We want to pass the rest of arguments after shell command directly to the
573 # function without parsing it.
574 try:
575 index = sys.argv.index('shell')
576 except ValueError:
577 args = self._parser.parse_args()
578 else:
579 args = self._parser.parse_args(sys.argv[1:index + 1])
580
581 command = args.which
582 self._selected_mid = args.selected_mid
583
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800584 if command == 'start-server':
585 self.StartServer()
586 return
587 elif command == 'kill-server':
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800588 self.KillServer()
589 return
590
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800591 self.CheckDaemon()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800592 if command == 'status':
593 self.Status()
594 return
595 elif command == 'connect':
596 self.Connect(args)
597 return
598
599 # The following command requires connection to the server
600 self.CheckConnection()
601
602 if args.select_mid_before_action:
603 self.SelectClient(store=False)
604
605 if command == 'select':
606 self.SelectClient(args)
607 elif command == 'ls':
608 self.ListClients()
609 elif command == 'shell':
610 command = sys.argv[sys.argv.index('shell') + 1:]
611 self.Shell(command)
612 elif command == 'push':
613 self.Push(args)
614 elif command == 'pull':
615 self.Pull(args)
616 elif command == 'forward':
617 self.Forward(args)
618
619 def _UrlOpen(self, url):
620 url = MakeRequestUrl(self._state, url)
621 request = urllib2.Request(url)
622 if self._state.username is not None and self._state.password is not None:
623 request.add_header(*BasicAuthHeader(self._state.username,
624 self._state.password))
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800625 return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800626
627 def _HTTPPostFile(self, url, filename, progress=None, user=None, passwd=None):
628 """Perform HTTP POST and upload file to Overlord.
629
630 To minimize the external dependencies, we construct the HTTP post request
631 by ourselves.
632 """
633 url = MakeRequestUrl(self._state, url)
634 size = os.stat(filename).st_size
635 boundary = '-----------%s' % _HTTP_BOUNDARY_MAGIC
636 CRLF = '\r\n'
637 parse = urlparse.urlparse(url)
638
639 part_headers = [
640 '--' + boundary,
641 'Content-Disposition: form-data; name="file"; '
642 'filename="%s"' % os.path.basename(filename),
643 'Content-Type: application/octet-stream',
644 '', ''
645 ]
646 part_header = CRLF.join(part_headers)
647 end_part = CRLF + '--' + boundary + '--' + CRLF
648
649 content_length = len(part_header) + size + len(end_part)
650 if parse.scheme == 'http':
651 h = httplib.HTTP(parse.netloc)
652 else:
653 h = httplib.HTTPS(parse.netloc)
654
655 post_path = url[url.index(parse.netloc) + len(parse.netloc):]
656 h.putrequest('POST', post_path)
657 h.putheader('Content-Length', content_length)
658 h.putheader('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
659
660 if user and passwd:
661 h.putheader(*BasicAuthHeader(user, passwd))
662 h.endheaders()
663 h.send(part_header)
664
665 count = 0
666 with open(filename, 'r') as f:
667 while True:
668 data = f.read(_BUFSIZ)
669 if not data:
670 break
671 count += len(data)
672 if progress:
673 progress(int(count * 100.0 / size), count)
674 h.send(data)
675
676 h.send(end_part)
677 progress(100)
678
679 if count != size:
680 logging.warning('file changed during upload, upload may be truncated.')
681
682 errcode, unused_x, unused_y = h.getreply()
683 return errcode == 200
684
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800685 def CheckDaemon(self):
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800686 self._server = OverlordClientDaemon.GetRPCServer()
687 if self._server is None:
688 print('* daemon not running, starting it now on port %d ... *' %
689 _OVERLORD_CLIENT_DAEMON_PORT)
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800690 self.StartServer()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800691
692 self._state = self._server.State()
693 sha1sum = GetVersionDigest()
694
695 if sha1sum != self._state.version_sha1sum:
696 print('ovl server is out of date. killing...')
697 KillGraceful(self._server.GetPid())
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800698 self.StartServer()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800699
700 def GetSSHControlFile(self, host):
701 return _SSH_CONTROL_SOCKET_PREFIX + host
702
703 def SSHTunnel(self, user, host, port):
704 """SSH forward the remote overlord server.
705
706 Overlord server may not have port 9000 open to the public network, in such
707 case we can SSH forward the port to localhost.
708 """
709
710 control_file = self.GetSSHControlFile(host)
711 try:
712 os.unlink(control_file)
713 except Exception:
714 pass
715
716 subprocess.Popen([
717 'ssh', '-Nf',
718 '-M', # Enable master mode
719 '-S', control_file,
720 '-L', '9000:localhost:9000',
721 '-p', str(port),
722 '%s%s' % (user + '@' if user else '', host)
723 ]).wait()
724
725 p = subprocess.Popen([
726 'ssh',
727 '-S', control_file,
728 '-O', 'check', host,
729 ], stderr=subprocess.PIPE)
730 unused_stdout, stderr = p.communicate()
731
732 s = re.search(r'pid=(\d+)', stderr)
733 if s:
734 return int(s.group(1))
735
736 raise RuntimeError('can not establish ssh connection')
737
738 def CheckConnection(self):
739 if self._state.host is None:
740 raise RuntimeError('not connected to any server, abort')
741
742 try:
743 self._server.Clients()
744 except Exception:
745 raise RuntimeError('remote server disconnected, abort')
746
747 if self._state.ssh_pid is not None:
748 ret = subprocess.Popen(['kill', '-0', str(self._state.ssh_pid)],
749 stdout=subprocess.PIPE,
750 stderr=subprocess.PIPE).wait()
751 if ret != 0:
752 raise RuntimeError('ssh tunnel disconnected, please re-connect')
753
754 def CheckClient(self):
755 if self._selected_mid is None:
756 if self._state.selected_mid is None:
757 raise RuntimeError('No client is selected')
758 self._selected_mid = self._state.selected_mid
759
760 if self._selected_mid not in self._server.Clients():
761 raise RuntimeError('client %s disappeared' % self._selected_mid)
762
763 def CheckOutput(self, command):
764 headers = []
765 if self._state.username is not None and self._state.password is not None:
766 headers.append(BasicAuthHeader(self._state.username,
767 self._state.password))
768
769 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
770 sio = StringIO.StringIO()
771 ws = ShellWebSocketClient(sio,
772 scheme + '%s:%d/api/agent/shell/%s?command=%s' %
773 (self._state.host, self._state.port,
774 self._selected_mid, urllib2.quote(command)),
775 headers=headers)
776 ws.connect()
777 ws.run()
778 return sio.getvalue()
779
780 @Command('status', 'show Overlord connection status')
781 def Status(self):
782 if self._state.host is None:
783 print('Not connected to any host.')
784 else:
785 if self._state.ssh_pid is not None:
786 print('Connected to %s with SSH tunneling.' % self._state.orig_host)
787 else:
788 print('Connected to %s:%d.' % (self._state.host, self._state.port))
789
790 if self._selected_mid is None:
791 self._selected_mid = self._state.selected_mid
792
793 if self._selected_mid is None:
794 print('No client is selected.')
795 else:
796 print('Client %s selected.' % self._selected_mid)
797
798 @Command('connect', 'connect to Overlord server', [
799 Arg('host', metavar='HOST', type=str, default='localhost',
800 help='Overlord hostname/IP'),
801 Arg('port', metavar='PORT', type=int,
802 default=_OVERLORD_HTTP_PORT, help='Overlord port'),
803 Arg('-f', '--forward', dest='ssh_forward', default=False,
804 action='store_true',
805 help='connect with SSH forwarding to the host'),
806 Arg('-p', '--ssh-port', dest='ssh_port', default=22,
807 type=int, help='SSH server port for SSH forwarding'),
808 Arg('-l', '--ssh-login', dest='ssh_login', default='',
809 type=str, help='SSH server login name for SSH forwarding'),
810 Arg('-u', '--user', dest='user', default=None,
811 type=str, help='Overlord HTTP auth username'),
812 Arg('-w', '--passwd', dest='passwd', default=None, type=str,
813 help='Overlord HTTP auth password')])
814 def Connect(self, args):
815 ssh_pid = None
816 host = args.host
817 orig_host = args.host
818
819 if args.ssh_forward:
820 # Kill previous SSH tunnel
821 self.KillSSHTunnel()
822
823 ssh_pid = self.SSHTunnel(args.ssh_login, args.host, args.ssh_port)
824 host = 'localhost'
825
826 status = self._server.Connect(host, args.port, ssh_pid, args.user,
827 args.passwd, orig_host)
828 if status is not True:
829 if isinstance(status, int):
830 if status == 401:
831 msg = '401 Unauthorized'
832 else:
833 msg = 'HTTP %d' % status
834 else:
835 msg = status
836 print('can not connect to %s: %s' % (host, msg))
837
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800838 @Command('start-server', 'start overlord CLI client server')
839 def StartServer(self):
840 self._server = OverlordClientDaemon.GetRPCServer()
841 if self._server is None:
842 OverlordClientDaemon().Start()
843 time.sleep(1)
844 self._server = OverlordClientDaemon.GetRPCServer()
845 if self._server is not None:
846 print('* daemon started successfully *')
847
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800848 @Command('kill-server', 'kill overlord CLI client server')
849 def KillServer(self):
850 self._server = OverlordClientDaemon.GetRPCServer()
851 if self._server is None:
852 return
853
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800854 self._state = self._server.State()
855
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800856 # Kill SSH Tunnel
857 self.KillSSHTunnel()
858
859 # Kill server daemon
860 KillGraceful(self._server.GetPid())
861
862 def KillSSHTunnel(self):
863 if self._state.ssh_pid is not None:
864 KillGraceful(self._state.ssh_pid)
865
866 @Command('ls', 'list all clients')
867 def ListClients(self):
868 for client in self._server.Clients():
869 print(client)
870
871 @Command('select', 'select default client', [
872 Arg('mid', metavar='mid', nargs='?', default=None)])
873 def SelectClient(self, args=None, store=True):
874 clients = self._server.Clients()
875
876 mid = args.mid if args is not None else None
877 if mid is None:
878 print('Select from the following clients:')
879 for i, client in enumerate(clients):
880 print(' %d. %s' % (i + 1, client))
881
882 print('\nSelection: ', end='')
883 try:
884 choice = int(raw_input()) - 1
885 mid = clients[choice]
886 except ValueError:
887 raise RuntimeError('select: invalid selection')
888 except IndexError:
889 raise RuntimeError('select: selection out of range')
890 else:
891 if mid not in clients:
892 raise RuntimeError('select: client %s does not exist' % mid)
893
894 self._selected_mid = mid
895 if store:
896 self._server.SelectClient(mid)
897 print('Client %s selected' % mid)
898
899 @Command('shell', 'open a shell or execute a shell command', [
900 Arg('command', metavar='CMD', nargs='?', help='command to execute')])
901 def Shell(self, command=None):
902 if command is None:
903 command = []
904 self.CheckClient()
905
906 headers = []
907 if self._state.username is not None and self._state.password is not None:
908 headers.append(BasicAuthHeader(self._state.username,
909 self._state.password))
910
911 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
912 if len(command) == 0:
913 ws = TerminalWebSocketClient(self._selected_mid,
914 scheme + '%s:%d/api/agent/tty/%s' %
915 (self._state.host, self._state.port,
916 self._selected_mid), headers=headers)
917 else:
918 cmd = ' '.join(command)
919 ws = ShellWebSocketClient(sys.stdout,
920 scheme + '%s:%d/api/agent/shell/%s?command=%s' %
921 (self._state.host, self._state.port,
922 self._selected_mid, urllib2.quote(cmd)),
923 headers=headers)
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +0800924 try:
925 ws.connect()
926 ws.run()
927 except socket.error as e:
928 if e.errno == 32: # Broken pipe
929 pass
930 else:
931 raise
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800932
933 @Command('push', 'push a file or directory to remote', [
Wei-Ning Huang37e61102016-02-07 13:41:07 +0800934 Arg('srcs', nargs='+', metavar='SOURCE'),
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800935 Arg('dst', metavar='DESTINATION')])
936 def Push(self, args):
937 self.CheckClient()
938
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800939 @AutoRetry('push', _RETRY_TIMES)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800940 def _push(src, dst):
941 src_base = os.path.basename(src)
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800942
943 # Local file is a link
944 if os.path.islink(src):
945 pbar = ProgressBar(src_base)
946 link_path = os.readlink(src)
947 self.CheckOutput('mkdir -p %(dirname)s; '
948 'if [ -d "%(dst)s" ]; then '
949 'ln -sf "%(link_path)s" "%(dst)s/%(link_name)s"; '
950 'else ln -sf "%(link_path)s" "%(dst)s"; fi' %
951 dict(dirname=os.path.dirname(dst),
952 link_path=link_path, dst=dst,
953 link_name=src_base))
954 pbar.End()
955 return
956
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800957 mode = '0%o' % (0x1FF & os.stat(src).st_mode)
958 url = ('%s:%d/api/agent/upload/%s?dest=%s&perm=%s' %
959 (self._state.host, self._state.port, self._selected_mid, dst,
960 mode))
961 try:
962 self._UrlOpen(url + '&filename=%s' % src_base)
963 except urllib2.HTTPError as e:
964 msg = json.loads(e.read()).get('error', None)
965 raise RuntimeError('push: %s' % msg)
966
967 pbar = ProgressBar(src_base)
968 self._HTTPPostFile(url, src, pbar.SetProgress,
969 self._state.username, self._state.password)
970 pbar.End()
971
Wei-Ning Huang37e61102016-02-07 13:41:07 +0800972 def _push_single_target(src, dst):
973 if os.path.isdir(src):
974 dst_exists = ast.literal_eval(self.CheckOutput(
975 'stat %s >/dev/null 2>&1 && echo True || echo False' % dst))
976 for root, unused_x, files in os.walk(src):
977 # If destination directory does not exist, we should strip the first
978 # layer of directory. For example: src_dir contains a single file 'A'
979 #
980 # push src_dir dest_dir
981 #
982 # If dest_dir exists, the resulting directory structure should be:
983 # dest_dir/src_dir/A
984 # If dest_dir does not exist, the resulting directory structure should
985 # be:
986 # dest_dir/A
987 dst_root = root if dst_exists else root[len(src):].lstrip('/')
988 for name in files:
989 _push(os.path.join(root, name),
990 os.path.join(dst, dst_root, name))
991 else:
992 _push(src, dst)
993
994 if len(args.srcs) > 1:
995 dst_type = self.CheckOutput('stat \'%s\' --printf \'%%F\' '
996 '2>/dev/null' % args.dst).strip()
997 if not dst_type:
998 raise RuntimeError('push: %s: No such file or directory' % args.dst)
999 if dst_type != 'directory':
1000 raise RuntimeError('push: %s: Not a directory' % args.dst)
1001
1002 for src in args.srcs:
1003 if not os.path.exists(src):
1004 raise RuntimeError('push: can not stat "%s": no such file or directory'
1005 % src)
1006 if not os.access(src, os.R_OK):
1007 raise RuntimeError('push: can not open "%s" for reading' % src)
1008
1009 _push_single_target(src, args.dst)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001010
1011 @Command('pull', 'pull a file or directory from remote', [
1012 Arg('src', metavar='SOURCE'),
1013 Arg('dst', metavar='DESTINATION', default='.', nargs='?')])
1014 def Pull(self, args):
1015 self.CheckClient()
1016
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001017 @AutoRetry('pull', _RETRY_TIMES)
1018 def _pull(src, dst, ftype, perm=0644, link=None):
1019 try:
1020 os.makedirs(os.path.dirname(dst))
1021 except Exception:
1022 pass
1023
1024 src_base = os.path.basename(src)
1025
1026 # Remote file is a link
1027 if ftype == 'l':
1028 pbar = ProgressBar(src_base)
1029 if os.path.exists(dst):
1030 os.remove(dst)
1031 os.symlink(link, dst)
1032 pbar.End()
1033 return
1034
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001035 url = ('%s:%d/api/agent/download/%s?filename=%s' %
1036 (self._state.host, self._state.port, self._selected_mid,
1037 urllib2.quote(src)))
1038 try:
1039 h = self._UrlOpen(url)
1040 except urllib2.HTTPError as e:
1041 msg = json.loads(e.read()).get('error', 'unkown error')
1042 raise RuntimeError('pull: %s' % msg)
1043 except KeyboardInterrupt:
1044 return
1045
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001046 pbar = ProgressBar(src_base)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001047 with open(dst, 'w') as f:
1048 os.fchmod(f.fileno(), perm)
1049 total_size = int(h.headers.get('Content-Length'))
1050 downloaded_size = 0
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001051
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001052 while True:
1053 data = h.read(_BUFSIZ)
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001054 if len(data) == 0:
1055 break
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001056 downloaded_size += len(data)
1057 pbar.SetProgress(float(downloaded_size) * 100 / total_size,
1058 downloaded_size)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001059 f.write(data)
1060 pbar.End()
1061
1062 # Use find to get a listing of all files under a root directory. The 'stat'
1063 # command is used to retrieve the filename and it's filemode.
1064 output = self.CheckOutput(
1065 'cd $HOME; '
1066 'stat "%(src)s" >/dev/null && '
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001067 'find "%(src)s" \'(\' -type f -o -type l \')\' '
1068 '-printf \'%%m\t%%p\t%%y\t%%l\n\''
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001069 % {'src': args.src})
1070
1071 # We got error from the stat command
1072 if output.startswith('stat: '):
1073 sys.stderr.write(output)
1074 return
1075
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001076 entries = output.strip('\n').split('\n')
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001077 common_prefix = os.path.dirname(args.src)
1078
1079 if len(entries) == 1:
1080 entry = entries[0]
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001081 perm, src_path, ftype, link = entry.split('\t', -1)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001082 if os.path.isdir(args.dst):
1083 dst = os.path.join(args.dst, os.path.basename(src_path))
1084 else:
1085 dst = args.dst
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001086 _pull(src_path, dst, ftype, int(perm, base=8), link)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001087 else:
1088 if not os.path.exists(args.dst):
1089 common_prefix = args.src
1090
1091 for entry in entries:
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001092 perm, src_path, ftype, link = entry.split('\t', -1)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001093 rel_dst = src_path[len(common_prefix):].lstrip('/')
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +08001094 _pull(src_path, os.path.join(args.dst, rel_dst), ftype,
1095 int(perm, base=8), link)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001096
1097 @Command('forward', 'forward remote port to local port', [
1098 Arg('--list', dest='list_all', action='store_true', default=False,
1099 help='list all port forwarding sessions'),
1100 Arg('--remove', metavar='LOCAL_PORT', dest='remove', type=int,
1101 default=None,
1102 help='remove port forwarding for local port LOCAL_PORT'),
1103 Arg('--remove-all', dest='remove_all', action='store_true',
1104 default=False, help='remove all port forwarding'),
1105 Arg('remote', metavar='REMOTE_PORT', type=int, nargs='?'),
1106 Arg('local', metavar='LOCAL_PORT', type=int, nargs='?')])
1107 def Forward(self, args):
1108 if args.list_all:
1109 max_len = 10
1110 if len(self._state.forwards):
1111 max_len = max([len(v[0]) for v in self._state.forwards.values()])
1112
1113 print('%-*s %-8s %-8s' % (max_len, 'Client', 'Remote', 'Local'))
1114 for local in sorted(self._state.forwards.keys()):
1115 value = self._state.forwards[local]
1116 print('%-*s %-8s %-8s' % (max_len, value[0], value[1], local))
1117 return
1118
1119 if args.remove_all:
1120 self._server.RemoveAllForward()
1121 return
1122
1123 if args.remove:
1124 self._server.RemoveForward(args.remove)
1125 return
1126
1127 self.CheckClient()
1128
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +08001129 if args.remote is None:
1130 raise RuntimeError('remote port not specified')
1131
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001132 if args.local is None:
1133 args.local = args.remote
1134 remote = int(args.remote)
1135 local = int(args.local)
1136
1137 def HandleConnection(conn):
1138 headers = []
1139 if self._state.username is not None and self._state.password is not None:
1140 headers.append(BasicAuthHeader(self._state.username,
1141 self._state.password))
1142
1143 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
1144 ws = ForwarderWebSocketClient(
1145 conn,
1146 scheme + '%s:%d/api/agent/forward/%s?port=%d' %
1147 (self._state.host, self._state.port, self._selected_mid, remote),
1148 headers=headers)
1149 try:
1150 ws.connect()
1151 ws.run()
1152 except Exception as e:
1153 print('error: %s' % e)
1154 finally:
1155 ws.close()
1156
1157 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1158 server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1159 server.bind(('0.0.0.0', local))
1160 server.listen(5)
1161
1162 pid = os.fork()
1163 if pid == 0:
1164 while True:
1165 conn, unused_addr = server.accept()
1166 t = threading.Thread(target=HandleConnection, args=(conn,))
1167 t.daemon = True
1168 t.start()
1169 else:
1170 self._server.AddForward(self._selected_mid, remote, local, pid)
1171
1172
1173def main():
1174 logging.basicConfig(level=logging.INFO)
1175
1176 # Add DaemonState to JSONRPC lib classes
1177 Config.instance().classes.add(DaemonState)
1178
1179 ovl = OverlordCLIClient()
1180 try:
1181 ovl.Main()
1182 except KeyboardInterrupt:
1183 print('Ctrl-C received, abort')
1184 except Exception as e:
1185 print('error: %s' % e)
1186
1187
1188if __name__ == '__main__':
1189 main()