#!/usr/bin/python -u
# -*- coding: utf-8 -*-
#
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Source blob: d31cb0f572d588b13b665a062dd8522e5e7ee5b5
7
8from __future__ import print_function
9
10import argparse
11import ast
12import base64
13import fcntl
14import hashlib
15import httplib
16import json
17import jsonrpclib
18import logging
19import os
20import re
21import select
22import signal
23import socket
24import StringIO
25import struct
26import subprocess
27import sys
28import tempfile
29import termios
30import threading
31import time
32import tty
33import urllib2
34import urlparse
35
36from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
37from jsonrpclib.config import Config
38from ws4py.client import WebSocketBaseClient
39
# Python version >= 2.7.9 enables SSL certificate checking by default; bypass
# it.
# NOTE(review): this turns off HTTPS certificate verification for the whole
# process, which exposes every HTTPS request to man-in-the-middle attacks.
# It is flagged here rather than changed; presumably it exists so that servers
# with self-signed certificates keep working -- confirm before removing.
try:
  import ssl
  # pylint: disable=W0212
  ssl._create_default_https_context = ssl._create_unverified_context
except (ImportError, AttributeError):
  # Either there is no ssl module, or the interpreter predates 2.7.9 and has
  # no _create_unverified_context (and does not verify certificates anyway).
  pass
47
48
# Escape character: typing <Enter> ~ . closes an interactive terminal session.
_ESCAPE = '~'
# Chunk size, in bytes, used for socket and file reads.
_BUFSIZ = 8192
_OVERLORD_PORT = 4455
# Default HTTP port used when connecting to an Overlord server.
_OVERLORD_HTTP_PORT = 9000
# Port and address of the local client daemon's JSON-RPC server.
_OVERLORD_CLIENT_DAEMON_PORT = 4488
_OVERLORD_CLIENT_DAEMON_RPC_ADDR = ('127.0.0.1', _OVERLORD_CLIENT_DAEMON_PORT)

# Timeout, in seconds, passed to urllib2.urlopen.
_DEFAULT_HTTP_TIMEOUT = 30
# Number of seconds for which the daemon reuses its cached client listing.
_LIST_CACHE_TIMEOUT = 2
# Width assumed when the terminal reports a width of 0.
_DEFAULT_TERMINAL_WIDTH = 80
# Number of attempts made for each file by push and pull.
_RETRY_TIMES = 3

# echo -n overlord | md5sum
_HTTP_BOUNDARY_MAGIC = '9246f080c855a69012707ab53489b921'

# Terminal resize control: byte values that bracket a JSON control message.
_CONTROL_START = 128
_CONTROL_END = 129

# Stream control: marker sent when local stdin reaches end-of-file.
_STDIN_CLOSED = '##STDIN_CLOSED##'

# The SSH control socket path is this prefix followed by the host name.
_SSH_CONTROL_SOCKET_PREFIX = os.path.join(tempfile.gettempdir(),
                                          'ovl-ssh-control-')

# A string that will always be included in the response of
# GET http://OVERLORD_SERVER:_OVERLORD_HTTP_PORT
_OVERLORD_RESPONSE_KEYWORD = '<html>'
77
78
def GetVersionDigest():
  """Return the SHA-1 hex digest of the currently executing script.

  Returns:
    A 40-character hexadecimal string computed over the bytes of __file__.
  """
  # Binary mode so that the digest covers the exact bytes on disk and the
  # data handed to hashlib is always a byte string.
  with open(__file__, 'rb') as f:
    return hashlib.sha1(f.read()).hexdigest()
83
84
def KillGraceful(pid, wait_secs=1):
  """Terminate a process politely, then forcefully.

  SIGTERM is sent first; after wait_secs seconds SIGKILL follows to make sure
  the process is gone.  An OSError from either signal ends the sequence
  silently.

  Args:
    pid: id of the process to kill.
    wait_secs: seconds to wait between SIGTERM and SIGKILL.
  """
  try:
    for step, signum in enumerate((signal.SIGTERM, signal.SIGKILL)):
      if step:
        # Give the process time to act on SIGTERM before escalating.
        time.sleep(wait_secs)
      os.kill(pid, signum)
  except OSError:
    pass
94
95
def AutoRetry(action_name, retries):
  """Decorator factory that retries a failing call.

  Args:
    action_name: verb used in the final failure message, e.g. 'push'.
    retries: maximum number of attempts.

  Returns:
    A decorator.  The decorated function returns the wrapped function's result
    from the first attempt that does not raise, or None (after printing an
    error) once every attempt has raised.
  """
  import functools

  def Wrap(func):
    @functools.wraps(func)
    def Loop(*args, **kwargs):
      # The first positional argument identifies the item in messages; fall
      # back to the function name when the call has no positional arguments.
      subject = args[0] if args else func.__name__
      for unused_i in range(retries):
        try:
          return func(*args, **kwargs)
        except Exception as e:  # pylint: disable=W0703
          print('error: %s: %s: retrying ...' % (subject, e))
      print('error: failed to %s %s' % (action_name, subject))
      return None
    return Loop
  return Wrap
111
112
def BasicAuthHeader(user, password):
  """Return the HTTP basic auth header.

  Args:
    user: user name.
    password: password.

  Returns:
    An ('Authorization', 'Basic <base64 of user:password>') tuple.
  """
  raw = '%s:%s' % (user, password)
  # base64 operates on bytes: text (unicode) credentials are encoded as UTF-8
  # first so that non-ASCII characters do not raise.
  if not isinstance(raw, bytes):
    raw = raw.encode('utf-8')
  credential = base64.b64encode(raw)
  # Keep the header value a native str whatever b64encode returned.
  if not isinstance(credential, str):
    credential = credential.decode('ascii')
  return ('Authorization', 'Basic %s' % credential)
117
118
def GetTerminalSize():
  """Retrieve the window size of the terminal attached to stdin.

  Returns:
    A (lines, columns) tuple.  (0, 0) is returned when the size cannot be
    queried, e.g. because stdin is not a terminal.
  """
  ws = struct.pack('HHHH', 0, 0, 0, 0)
  try:
    ws = fcntl.ioctl(0, termios.TIOCGWINSZ, ws)
  except (IOError, OSError):
    # stdin is redirected or closed; report an unknown size instead of
    # crashing so that callers can fall back to a default width.
    return 0, 0
  lines, columns, unused_x, unused_y = struct.unpack('HHHH', ws)
  return lines, columns
125
126
def MakeRequestUrl(state, url):
  """Prefix url with 'https://' when state.ssl is true, else 'http://'."""
  scheme_suffix = 's' if state.ssl else ''
  return 'http%s://%s' % (scheme_suffix, url)
129
130
class ProgressBar(object):
  """Single-line text progress bar written to stdout.

  Every update redraws one line made of the (possibly truncated) name, the
  transferred size, the average speed, the elapsed time and, when there is
  room, a '#' bar followed by the percentage.
  """
  SIZE_WIDTH = 11
  SPEED_WIDTH = 10
  DURATION_WIDTH = 6
  PERCENTAGE_WIDTH = 8

  # Unit suffixes in ascending order; each step is a factor of 1024.  Values
  # beyond the last unit are still expressed in the last unit.
  _SIZE_UNITS = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
  _SPEED_UNITS = ('B', 'K', 'M', 'G', 'T')

  def __init__(self, name):
    """Start timing and draw the bar at 0%.

    Args:
      name: label shown at the start of the line.
    """
    self._start_time = time.time()
    self._name = name
    self._size = 0
    self._width = 0
    self._name_width = 0
    self._name_max = 0
    self._stat_width = 0
    self._max = 0
    self.CalculateSize()
    self.SetProgress(0)

  def CalculateSize(self):
    """Recompute the layout from the current terminal width."""
    self._width = GetTerminalSize()[1] or _DEFAULT_TERMINAL_WIDTH
    self._name_width = int(self._width * 0.3)
    self._name_max = self._name_width
    self._stat_width = self.SIZE_WIDTH + self.SPEED_WIDTH + self.DURATION_WIDTH
    self._max = (self._width - self._name_width - self._stat_width -
                 self.PERCENTAGE_WIDTH)

  @staticmethod
  def _Scale(value, units):
    """Return (scaled value, unit) for the largest unit not exceeding value."""
    index = 0
    while value >= 1024 and index < len(units) - 1:
      value /= 1024.0
      index += 1
    return value, units[index]

  def SizeToHuman(self, size_in_bytes):
    """Format a byte count, e.g. 2048 -> '    2.0 KiB'."""
    return ' %6.1f %3s' % self._Scale(size_in_bytes, self._SIZE_UNITS)

  def SpeedToHuman(self, speed_in_bs):
    """Format a speed in bytes per second, e.g. 1536.0 -> '    1.5K/s'."""
    return ' %6.1f%s/s' % self._Scale(speed_in_bs, self._SPEED_UNITS)

  def DurationToClock(self, duration):
    """Format a duration in seconds as ' MM:SS'."""
    return ' %02d:%02d' % (duration / 60, duration % 60)

  def SetProgress(self, percentage, size=None):
    """Redraw the bar.

    Args:
      percentage: completion percentage, 0-100.
      size: number of bytes transferred so far; the previous value is kept
        when None.
    """
    current_width = GetTerminalSize()[1]
    if self._width != current_width:
      self.CalculateSize()

    if size is not None:
      self._size = size

    elapse_time = time.time() - self._start_time
    # The first update can happen within the clock's resolution of the start
    # time; report a speed of zero rather than dividing by zero.
    speed = self._size / float(elapse_time) if elapse_time > 0 else 0.0

    size_str = self.SizeToHuman(self._size)
    speed_str = self.SpeedToHuman(speed)
    elapse_str = self.DurationToClock(elapse_time)

    if len(self._name) <= self._name_max:
      name = self._name
    else:
      name = self._name[:self._name_max - 4] + ' ...'

    # A negative field width left-justifies the name.
    line = '%*s' % (- self._name_max, name) + size_str + speed_str + elapse_str
    if self._max > 2:
      width = int(self._max * percentage / 100.0)
      line += (' [' + '#' * width + ' ' * (self._max - width) + ']' +
               '%4d%%' % int(percentage))
    # '\r' returns to the start of the line so the next update overwrites it.
    sys.stdout.write(line + '\r')
    sys.stdout.flush()

  def End(self):
    """Draw the bar at 100% and move to the next line."""
    self.SetProgress(100.0)
    sys.stdout.write('\n')
    sys.stdout.flush()
219
220
class DaemonState(object):
  """Holds the Overlord connection state kept by the client daemon."""

  def __init__(self):
    # Digest of the script that created this state; the CLI compares it with
    # its own digest to detect an out-of-date daemon.
    self.version_sha1sum = GetVersionDigest()
    # Server endpoint used for requests.
    self.host = self.port = None
    # ssl selects https/wss over http/ws.
    self.ssl = self.ssh = False
    # Host originally given by the user and pid of the SSH tunnel, if any.
    self.orig_host = self.ssh_pid = None
    # HTTP basic auth credentials.
    self.username = self.password = None
    # Machine id of the default client.
    self.selected_mid = None
    # Maps a local port to a (mid, remote, pid) tuple.
    self.forwards = {}
    # Cached client listing and the time it was fetched.
    self.listing = []
    self.last_list = 0
237
238
class OverlordClientDaemon(object):
  """Overlord Client Daemon.

  Keeps a DaemonState and exposes it, together with a few operations on it,
  through a JSON-RPC server bound to _OVERLORD_CLIENT_DAEMON_RPC_ADDR.
  """

  def __init__(self):
    self._state = DaemonState()
    self._server = None

  def Start(self):
    """Start the daemon."""
    self.StartRPCServer()

  def StartRPCServer(self):
    """Create the JSON-RPC server and serve it from a forked child process."""
    rpc_server = SimpleJSONRPCServer(_OVERLORD_CLIENT_DAEMON_RPC_ADDR,
                                     logRequests=False)
    self._server = rpc_server

    # Each exported method is registered under its own name.
    for name in ('State', 'Ping', 'GetPid', 'Connect', 'Clients',
                 'SelectClient', 'AddForward', 'RemoveForward',
                 'RemoveAllForward'):
      rpc_server.register_function(getattr(self, name), name)

    # Only the child serves requests; the parent returns to its caller.
    if os.fork() == 0:
      rpc_server.serve_forever()

  @staticmethod
  def GetRPCServer():
    """Returns the Overlord client daemon RPC server.

    Returns:
      A jsonrpclib proxy for the daemon, or None when the Ping call fails.
    """
    proxy = jsonrpclib.Server('http://%s:%d' %
                              _OVERLORD_CLIENT_DAEMON_RPC_ADDR)
    try:
      proxy.Ping()
    except Exception:
      return None
    else:
      return proxy

  def State(self):
    """Return the DaemonState object."""
    return self._state

  def Ping(self):
    """Return True; used to probe whether the daemon answers."""
    return True

  def GetPid(self):
    """Return the pid of the process serving the request."""
    return os.getpid()

  def _UrlOpen(self, url):
    """Wrapper for urllib2.urlopen.

    It selects the HTTP scheme according to self._state.ssl and adds the HTTP
    basic auth header when both username and password are set.

    Args:
      url: URL without the scheme part.
    """
    state = self._state
    request = urllib2.Request(MakeRequestUrl(state, url))
    if not (state.username is None or state.password is None):
      request.add_header(*BasicAuthHeader(state.username, state.password))
    return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)

  def _GetJSON(self, path):
    """GET path from the connected server and decode the body as JSON."""
    address = '%s:%d%s' % (self._state.host, self._state.port, path)
    body = self._UrlOpen(address).read()
    return json.loads(body)

  def Connect(self, host, port=_OVERLORD_HTTP_PORT, ssh_pid=None,
              username=None, password=None, orig_host=None):
    """Record the connection parameters and probe the server.

    Returns:
      True on success, the HTTP status code when the server answers with an
      HTTP error, or the error message for any other failure.
    """
    state = self._state
    state.username = username
    state.password = password
    state.host = host
    state.port = port
    state.ssl = False
    state.orig_host = orig_host
    state.ssh_pid = ssh_pid
    state.selected_mid = None

    try:
      response = self._UrlOpen('%s:%d' % (host, port))
      # Probably not an HTTP server, try HTTPS
      if _OVERLORD_RESPONSE_KEYWORD not in response.read():
        state.ssl = True
        self._UrlOpen('%s:%d' % (host, port))
    except urllib2.HTTPError as e:
      logging.exception(e)
      return e.getcode()
    except Exception as e:
      logging.exception(e)
      return str(e)
    return True

  def Clients(self):
    """Return the sorted list of unique client mids.

    The listing is fetched again only when the cached one is older than
    _LIST_CACHE_TIMEOUT seconds.
    """
    state = self._state
    if time.time() - state.last_list > _LIST_CACHE_TIMEOUT:
      agents = self._GetJSON('/api/agents/list')
      state.listing = sorted(set([agent['mid'] for agent in agents]))
      state.last_list = time.time()
    return state.listing

  def SelectClient(self, mid):
    """Remember mid as the default client."""
    self._state.selected_mid = mid

  def AddForward(self, mid, remote, local, pid):
    """Record a port forward keyed by its local port."""
    self._state.forwards[local] = (mid, remote, pid)

  def RemoveForward(self, local_port):
    """Kill the forwarder of local_port and forget it; unknown ports are
    ignored."""
    forwards = self._state.forwards
    try:
      unused_mid, unused_remote, pid = forwards[local_port]
      KillGraceful(pid)
      del forwards[local_port]
    except (KeyError, OSError):
      pass

  def RemoveAllForward(self):
    """Kill every recorded forwarder and clear the table."""
    for unused_mid, unused_remote, pid in self._state.forwards.values():
      try:
        KillGraceful(pid)
      except OSError:
        pass
    self._state.forwards = {}
361
362
class TerminalWebSocketClient(WebSocketBaseClient):
  """WebSocket client that connects the local terminal to a remote TTY.

  Keystrokes read from stdin are sent to the server and binary messages from
  the server are written to stdout.  Typing <Enter> ~ . closes the session.
  """

  def __init__(self, mid, *args, **kwargs):
    """Constructor.

    Args:
      mid: machine id of the client, used in the closing message.
    """
    super(TerminalWebSocketClient, self).__init__(*args, **kwargs)
    self._mid = mid
    self._stdin_fd = sys.stdin.fileno()
    # Terminal attributes saved before entering raw mode; None until then.
    self._old_termios = None

  def handshake_ok(self):
    pass

  def opened(self):
    """Start a daemon thread that feeds stdin to the websocket."""
    # Mutable holder for the last reported size, shared with the closure.
    nonlocals = {'size': (80, 40)}

    def _ResizeWindow():
      size = GetTerminalSize()
      if size != nonlocals['size']:  # Only report when the size changed
        control = {'command': 'resize', 'params': list(size)}
        payload = chr(_CONTROL_START) + json.dumps(control) + chr(_CONTROL_END)
        nonlocals['size'] = size
        try:
          self.send(payload, binary=True)
        except Exception:
          pass

    def _FeedInput():
      self._old_termios = termios.tcgetattr(self._stdin_fd)
      tty.setraw(self._stdin_fd)

      READY, ENTER_PRESSED, ESCAPE_PRESSED = range(3)

      try:
        state = READY
        while True:
          # Check if terminal is resized
          _ResizeWindow()

          ch = sys.stdin.read(1)

          # Scan for escape sequence
          if state == READY:
            state = ENTER_PRESSED if ch == chr(0x0d) else READY
          elif state == ENTER_PRESSED:
            state = ESCAPE_PRESSED if ch == _ESCAPE else READY
          elif state == ESCAPE_PRESSED:
            if ch == '.':
              self.close()
              break
            else:
              state = READY

          self.send(ch)
      except (KeyboardInterrupt, RuntimeError):
        pass

    t = threading.Thread(target=_FeedInput)
    t.daemon = True
    t.start()

  def closed(self, code, reason=None):
    """Restore the terminal attributes and report that the session ended."""
    # The connection can close before the input thread has saved the
    # attributes; there is nothing to restore in that case.
    if self._old_termios is not None:
      termios.tcsetattr(self._stdin_fd, termios.TCSANOW, self._old_termios)
    print('Connection to %s closed.' % self._mid)

  def received_message(self, msg):
    """Write binary messages from the server to stdout."""
    if msg.is_binary:
      sys.stdout.write(msg.data)
      sys.stdout.flush()
429
430
class ShellWebSocketClient(WebSocketBaseClient):
  """WebSocket client that streams stdin to a remote command and collects
  the command's output."""

  def __init__(self, output, *args, **kwargs):
    """Constructor.

    Args:
      output: output file object.
    """
    self.output = output
    super(ShellWebSocketClient, self).__init__(*args, **kwargs)

  def handshake_ok(self):
    pass

  def opened(self):
    """Start a daemon thread that forwards stdin one character at a time."""
    def _FeedInput():
      try:
        # iter() stops at the empty string, i.e. at end-of-file on stdin.
        for data in iter(lambda: sys.stdin.read(1), ''):
          self.send(data, binary=True)
        self.send(_STDIN_CLOSED * 2)
      except (KeyboardInterrupt, RuntimeError):
        pass

    feeder = threading.Thread(target=_FeedInput)
    feeder.daemon = True
    feeder.start()

  def closed(self, code, reason=None):
    pass

  def received_message(self, msg):
    """Write binary messages to the output file object and flush it."""
    if not msg.is_binary:
      return
    self.output.write(msg.data)
    self.output.flush()
468
469
class ForwarderWebSocketClient(WebSocketBaseClient):
  """WebSocket client that relays data between a socket and the websocket."""

  def __init__(self, sock, *args, **kwargs):
    """Constructor.

    Args:
      sock: connected socket whose traffic is relayed.
    """
    super(ForwarderWebSocketClient, self).__init__(*args, **kwargs)
    self._sock = sock
    # Set when the websocket is closed so the feeder thread stops.
    self._stop = threading.Event()

  def handshake_ok(self):
    pass

  def opened(self):
    """Start a daemon thread that sends socket data over the websocket."""
    def _FeedInput():
      try:
        self._sock.setblocking(False)
        while True:
          # Wake up every 0.5s so that the stop event is noticed.
          rd, unused_w, unused_x = select.select([self._sock], [], [], 0.5)
          if self._stop.is_set():
            break
          if self._sock in rd:
            data = self._sock.recv(_BUFSIZ)
            # An empty read means the peer closed the socket.
            if len(data) == 0:
              self.close()
              break
            self.send(data, binary=True)
      except Exception:
        pass
      finally:
        self._sock.close()

    t = threading.Thread(target=_FeedInput)
    t.daemon = True
    t.start()

  def closed(self, code, reason=None):
    """Stop the feeder thread and exit."""
    self._stop.set()
    sys.exit(0)

  def received_message(self, msg):
    """Write binary messages from the websocket to the socket."""
    if msg.is_binary:
      self._SendAll(msg.data)

  def _SendAll(self, data):
    """Send all of data on the (non-blocking) socket.

    socket.send() may accept only part of the buffer, so the remainder is
    retried until everything has been written.
    """
    import errno

    remaining = data
    while remaining:
      try:
        sent = self._sock.send(remaining)
      except socket.error as e:
        if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
          raise
        # The send buffer is full; wait until the socket is writable again.
        select.select([], [self._sock], [])
        continue
      remaining = remaining[sent:]
509
510
def Arg(*args, **kwargs):
  """Pack positional and keyword arguments into an (args, kwargs) tuple.

  The tuple is later unpacked into an argparse add_argument() call.
  """
  spec = (args, kwargs)
  return spec
513
514
def Command(command, help_msg=None, args=None):
  """Decorator for adding argparse parameter for a method.

  Args:
    command: name of the sub-command.
    help_msg: help text of the sub-command.
    args: list of (args, kwargs) tuples as returned by Arg(); defaults to an
      empty list.

  Returns:
    A decorator.  The decorated function behaves like the original one and
    carries the sub-command description in its '__arg_attr' attribute.
  """
  import functools

  arg_specs = [] if args is None else args

  def WrapFunc(func):
    # wraps() keeps the name and docstring of the decorated method.
    @functools.wraps(func)
    def Wrapped(*func_args, **func_kwargs):
      return func(*func_args, **func_kwargs)
    # pylint: disable=W0212
    Wrapped.__arg_attr = {'command': command, 'help': help_msg,
                          'args': arg_specs}
    return Wrapped
  return WrapFunc
526
527
def ParseMethodSubCommands(cls):
  """Decorator for a class using the @Command decorator.

  This decorator retrieves the command info from each method and appends it
  to the SUBCOMMANDS class variable, which is later used to construct the
  parser.

  Args:
    cls: class to process; it must define a SUBCOMMANDS list.

  Returns:
    cls itself.
  """
  # values() exists on Python 2 and 3 alike (iteritems() is Python 2 only).
  for method in cls.__dict__.values():
    if hasattr(method, '__arg_attr'):
      cls.SUBCOMMANDS.append(method.__arg_attr)  # pylint: disable=W0212
  return cls
538
539
540@ParseMethodSubCommands
541class OverlordCLIClient(object):
542 """Overlord command line interface client."""
543
544 SUBCOMMANDS = []
545
546 def __init__(self):
547 self._parser = self._BuildParser()
548 self._selected_mid = None
549 self._server = None
550 self._state = None
551
552 def _BuildParser(self):
553 root_parser = argparse.ArgumentParser(prog='ovl')
554 subparsers = root_parser.add_subparsers(help='sub-command')
555
556 root_parser.add_argument('-s', dest='selected_mid', action='store',
557 default=None,
558 help='select target to execute command on')
559 root_parser.add_argument('-S', dest='select_mid_before_action',
560 action='store_true', default=False,
561 help='select target before executing command')
562
563 for attr in self.SUBCOMMANDS:
564 parser = subparsers.add_parser(attr['command'], help=attr['help'])
565 parser.set_defaults(which=attr['command'])
566 for arg in attr['args']:
567 parser.add_argument(*arg[0], **arg[1])
568
569 return root_parser
570
571 def Main(self):
572 # We want to pass the rest of arguments after shell command directly to the
573 # function without parsing it.
574 try:
575 index = sys.argv.index('shell')
576 except ValueError:
577 args = self._parser.parse_args()
578 else:
579 args = self._parser.parse_args(sys.argv[1:index + 1])
580
581 command = args.which
582 self._selected_mid = args.selected_mid
583
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800584 if command == 'start-server':
585 self.StartServer()
586 return
587 elif command == 'kill-server':
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800588 self.KillServer()
589 return
590
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800591 self.CheckDaemon()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800592 if command == 'status':
593 self.Status()
594 return
595 elif command == 'connect':
596 self.Connect(args)
597 return
598
599 # The following command requires connection to the server
600 self.CheckConnection()
601
602 if args.select_mid_before_action:
603 self.SelectClient(store=False)
604
605 if command == 'select':
606 self.SelectClient(args)
607 elif command == 'ls':
608 self.ListClients()
609 elif command == 'shell':
610 command = sys.argv[sys.argv.index('shell') + 1:]
611 self.Shell(command)
612 elif command == 'push':
613 self.Push(args)
614 elif command == 'pull':
615 self.Pull(args)
616 elif command == 'forward':
617 self.Forward(args)
618
619 def _UrlOpen(self, url):
620 url = MakeRequestUrl(self._state, url)
621 request = urllib2.Request(url)
622 if self._state.username is not None and self._state.password is not None:
623 request.add_header(*BasicAuthHeader(self._state.username,
624 self._state.password))
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800625 return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800626
627 def _HTTPPostFile(self, url, filename, progress=None, user=None, passwd=None):
628 """Perform HTTP POST and upload file to Overlord.
629
630 To minimize the external dependencies, we construct the HTTP post request
631 by ourselves.
632 """
633 url = MakeRequestUrl(self._state, url)
634 size = os.stat(filename).st_size
635 boundary = '-----------%s' % _HTTP_BOUNDARY_MAGIC
636 CRLF = '\r\n'
637 parse = urlparse.urlparse(url)
638
639 part_headers = [
640 '--' + boundary,
641 'Content-Disposition: form-data; name="file"; '
642 'filename="%s"' % os.path.basename(filename),
643 'Content-Type: application/octet-stream',
644 '', ''
645 ]
646 part_header = CRLF.join(part_headers)
647 end_part = CRLF + '--' + boundary + '--' + CRLF
648
649 content_length = len(part_header) + size + len(end_part)
650 if parse.scheme == 'http':
651 h = httplib.HTTP(parse.netloc)
652 else:
653 h = httplib.HTTPS(parse.netloc)
654
655 post_path = url[url.index(parse.netloc) + len(parse.netloc):]
656 h.putrequest('POST', post_path)
657 h.putheader('Content-Length', content_length)
658 h.putheader('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
659
660 if user and passwd:
661 h.putheader(*BasicAuthHeader(user, passwd))
662 h.endheaders()
663 h.send(part_header)
664
665 count = 0
666 with open(filename, 'r') as f:
667 while True:
668 data = f.read(_BUFSIZ)
669 if not data:
670 break
671 count += len(data)
672 if progress:
673 progress(int(count * 100.0 / size), count)
674 h.send(data)
675
676 h.send(end_part)
677 progress(100)
678
679 if count != size:
680 logging.warning('file changed during upload, upload may be truncated.')
681
682 errcode, unused_x, unused_y = h.getreply()
683 return errcode == 200
684
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800685 def CheckDaemon(self):
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800686 self._server = OverlordClientDaemon.GetRPCServer()
687 if self._server is None:
688 print('* daemon not running, starting it now on port %d ... *' %
689 _OVERLORD_CLIENT_DAEMON_PORT)
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800690 self.StartServer()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800691
692 self._state = self._server.State()
693 sha1sum = GetVersionDigest()
694
695 if sha1sum != self._state.version_sha1sum:
696 print('ovl server is out of date. killing...')
697 KillGraceful(self._server.GetPid())
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800698 self.StartServer()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800699
700 def GetSSHControlFile(self, host):
701 return _SSH_CONTROL_SOCKET_PREFIX + host
702
703 def SSHTunnel(self, user, host, port):
704 """SSH forward the remote overlord server.
705
706 Overlord server may not have port 9000 open to the public network, in such
707 case we can SSH forward the port to localhost.
708 """
709
710 control_file = self.GetSSHControlFile(host)
711 try:
712 os.unlink(control_file)
713 except Exception:
714 pass
715
716 subprocess.Popen([
717 'ssh', '-Nf',
718 '-M', # Enable master mode
719 '-S', control_file,
720 '-L', '9000:localhost:9000',
721 '-p', str(port),
722 '%s%s' % (user + '@' if user else '', host)
723 ]).wait()
724
725 p = subprocess.Popen([
726 'ssh',
727 '-S', control_file,
728 '-O', 'check', host,
729 ], stderr=subprocess.PIPE)
730 unused_stdout, stderr = p.communicate()
731
732 s = re.search(r'pid=(\d+)', stderr)
733 if s:
734 return int(s.group(1))
735
736 raise RuntimeError('can not establish ssh connection')
737
738 def CheckConnection(self):
739 if self._state.host is None:
740 raise RuntimeError('not connected to any server, abort')
741
742 try:
743 self._server.Clients()
744 except Exception:
745 raise RuntimeError('remote server disconnected, abort')
746
747 if self._state.ssh_pid is not None:
748 ret = subprocess.Popen(['kill', '-0', str(self._state.ssh_pid)],
749 stdout=subprocess.PIPE,
750 stderr=subprocess.PIPE).wait()
751 if ret != 0:
752 raise RuntimeError('ssh tunnel disconnected, please re-connect')
753
754 def CheckClient(self):
755 if self._selected_mid is None:
756 if self._state.selected_mid is None:
757 raise RuntimeError('No client is selected')
758 self._selected_mid = self._state.selected_mid
759
760 if self._selected_mid not in self._server.Clients():
761 raise RuntimeError('client %s disappeared' % self._selected_mid)
762
763 def CheckOutput(self, command):
764 headers = []
765 if self._state.username is not None and self._state.password is not None:
766 headers.append(BasicAuthHeader(self._state.username,
767 self._state.password))
768
769 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
770 sio = StringIO.StringIO()
771 ws = ShellWebSocketClient(sio,
772 scheme + '%s:%d/api/agent/shell/%s?command=%s' %
773 (self._state.host, self._state.port,
774 self._selected_mid, urllib2.quote(command)),
775 headers=headers)
776 ws.connect()
777 ws.run()
778 return sio.getvalue()
779
780 @Command('status', 'show Overlord connection status')
781 def Status(self):
782 if self._state.host is None:
783 print('Not connected to any host.')
784 else:
785 if self._state.ssh_pid is not None:
786 print('Connected to %s with SSH tunneling.' % self._state.orig_host)
787 else:
788 print('Connected to %s:%d.' % (self._state.host, self._state.port))
789
790 if self._selected_mid is None:
791 self._selected_mid = self._state.selected_mid
792
793 if self._selected_mid is None:
794 print('No client is selected.')
795 else:
796 print('Client %s selected.' % self._selected_mid)
797
798 @Command('connect', 'connect to Overlord server', [
799 Arg('host', metavar='HOST', type=str, default='localhost',
800 help='Overlord hostname/IP'),
801 Arg('port', metavar='PORT', type=int,
802 default=_OVERLORD_HTTP_PORT, help='Overlord port'),
803 Arg('-f', '--forward', dest='ssh_forward', default=False,
804 action='store_true',
805 help='connect with SSH forwarding to the host'),
806 Arg('-p', '--ssh-port', dest='ssh_port', default=22,
807 type=int, help='SSH server port for SSH forwarding'),
808 Arg('-l', '--ssh-login', dest='ssh_login', default='',
809 type=str, help='SSH server login name for SSH forwarding'),
810 Arg('-u', '--user', dest='user', default=None,
811 type=str, help='Overlord HTTP auth username'),
812 Arg('-w', '--passwd', dest='passwd', default=None, type=str,
813 help='Overlord HTTP auth password')])
814 def Connect(self, args):
815 ssh_pid = None
816 host = args.host
817 orig_host = args.host
818
819 if args.ssh_forward:
820 # Kill previous SSH tunnel
821 self.KillSSHTunnel()
822
823 ssh_pid = self.SSHTunnel(args.ssh_login, args.host, args.ssh_port)
824 host = 'localhost'
825
826 status = self._server.Connect(host, args.port, ssh_pid, args.user,
827 args.passwd, orig_host)
828 if status is not True:
829 if isinstance(status, int):
830 if status == 401:
831 msg = '401 Unauthorized'
832 else:
833 msg = 'HTTP %d' % status
834 else:
835 msg = status
836 print('can not connect to %s: %s' % (host, msg))
837
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800838 @Command('start-server', 'start overlord CLI client server')
839 def StartServer(self):
840 self._server = OverlordClientDaemon.GetRPCServer()
841 if self._server is None:
842 OverlordClientDaemon().Start()
843 time.sleep(1)
844 self._server = OverlordClientDaemon.GetRPCServer()
845 if self._server is not None:
846 print('* daemon started successfully *')
847
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800848 @Command('kill-server', 'kill overlord CLI client server')
849 def KillServer(self):
850 self._server = OverlordClientDaemon.GetRPCServer()
851 if self._server is None:
852 return
853
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800854 self._state = self._server.State()
855
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800856 # Kill SSH Tunnel
857 self.KillSSHTunnel()
858
859 # Kill server daemon
860 KillGraceful(self._server.GetPid())
861
862 def KillSSHTunnel(self):
863 if self._state.ssh_pid is not None:
864 KillGraceful(self._state.ssh_pid)
865
866 @Command('ls', 'list all clients')
867 def ListClients(self):
868 for client in self._server.Clients():
869 print(client)
870
871 @Command('select', 'select default client', [
872 Arg('mid', metavar='mid', nargs='?', default=None)])
873 def SelectClient(self, args=None, store=True):
874 clients = self._server.Clients()
875
876 mid = args.mid if args is not None else None
877 if mid is None:
878 print('Select from the following clients:')
879 for i, client in enumerate(clients):
880 print(' %d. %s' % (i + 1, client))
881
882 print('\nSelection: ', end='')
883 try:
884 choice = int(raw_input()) - 1
885 mid = clients[choice]
886 except ValueError:
887 raise RuntimeError('select: invalid selection')
888 except IndexError:
889 raise RuntimeError('select: selection out of range')
890 else:
891 if mid not in clients:
892 raise RuntimeError('select: client %s does not exist' % mid)
893
894 self._selected_mid = mid
895 if store:
896 self._server.SelectClient(mid)
897 print('Client %s selected' % mid)
898
899 @Command('shell', 'open a shell or execute a shell command', [
900 Arg('command', metavar='CMD', nargs='?', help='command to execute')])
901 def Shell(self, command=None):
902 if command is None:
903 command = []
904 self.CheckClient()
905
906 headers = []
907 if self._state.username is not None and self._state.password is not None:
908 headers.append(BasicAuthHeader(self._state.username,
909 self._state.password))
910
911 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
912 if len(command) == 0:
913 ws = TerminalWebSocketClient(self._selected_mid,
914 scheme + '%s:%d/api/agent/tty/%s' %
915 (self._state.host, self._state.port,
916 self._selected_mid), headers=headers)
917 else:
918 cmd = ' '.join(command)
919 ws = ShellWebSocketClient(sys.stdout,
920 scheme + '%s:%d/api/agent/shell/%s?command=%s' %
921 (self._state.host, self._state.port,
922 self._selected_mid, urllib2.quote(cmd)),
923 headers=headers)
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +0800924 try:
925 ws.connect()
926 ws.run()
927 except socket.error as e:
928 if e.errno == 32: # Broken pipe
929 pass
930 else:
931 raise
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800932
933 @Command('push', 'push a file or directory to remote', [
934 Arg('src', metavar='SOURCE'),
935 Arg('dst', metavar='DESTINATION')])
936 def Push(self, args):
937 self.CheckClient()
938
939 if not os.path.exists(args.src):
940 raise RuntimeError('push: can not stat "%s": no such file or directory'
941 % args.src)
942
943 if not os.access(args.src, os.R_OK):
944 raise RuntimeError('push: can not open "%s" for reading' % args.src)
945
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800946 @AutoRetry('push', _RETRY_TIMES)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800947 def _push(src, dst):
948 src_base = os.path.basename(src)
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800949
950 # Local file is a link
951 if os.path.islink(src):
952 pbar = ProgressBar(src_base)
953 link_path = os.readlink(src)
954 self.CheckOutput('mkdir -p %(dirname)s; '
955 'if [ -d "%(dst)s" ]; then '
956 'ln -sf "%(link_path)s" "%(dst)s/%(link_name)s"; '
957 'else ln -sf "%(link_path)s" "%(dst)s"; fi' %
958 dict(dirname=os.path.dirname(dst),
959 link_path=link_path, dst=dst,
960 link_name=src_base))
961 pbar.End()
962 return
963
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800964 mode = '0%o' % (0x1FF & os.stat(src).st_mode)
965 url = ('%s:%d/api/agent/upload/%s?dest=%s&perm=%s' %
966 (self._state.host, self._state.port, self._selected_mid, dst,
967 mode))
968 try:
969 self._UrlOpen(url + '&filename=%s' % src_base)
970 except urllib2.HTTPError as e:
971 msg = json.loads(e.read()).get('error', None)
972 raise RuntimeError('push: %s' % msg)
973
974 pbar = ProgressBar(src_base)
975 self._HTTPPostFile(url, src, pbar.SetProgress,
976 self._state.username, self._state.password)
977 pbar.End()
978
979 if os.path.isdir(args.src):
980 dst_exists = ast.literal_eval(self.CheckOutput(
981 'stat %s >/dev/null 2>&1 && echo True || echo False' % args.dst))
982 for root, unused_x, files in os.walk(args.src):
983 # If destination directory does not exist, we should strip the first
984 # layer of directory. For example: src_dir contains a single file 'A'
985 #
986 # push src_dir dest_dir
987 #
988 # If dest_dir exists, the resulting directory structure should be:
989 # dest_dir/src_dir/A
990 # If dest_dir does not exist, the resulting directory structure should
991 # be:
992 # dest_dir/A
993 dst_root = root if dst_exists else root[len(args.src):].lstrip('/')
994 for name in files:
995 _push(os.path.join(root, name),
996 os.path.join(args.dst, dst_root, name))
997 else:
998 _push(args.src, args.dst)
999
@Command('pull', 'pull a file or directory from remote', [
    Arg('src', metavar='SOURCE'),
    Arg('dst', metavar='DESTINATION', default='.', nargs='?')])
def Pull(self, args):
  """Pull a file, symlink or directory tree from the selected client.

  The remote side is listed with 'find' (regular files and symlinks only);
  every listed entry is then fetched through the HTTP download endpoint, or
  recreated locally as a symlink.

  Args:
    args: parsed command arguments. args.src is the remote path and args.dst
        is the local destination (defaults to '.').

  Raises:
    RuntimeError: the download endpoint answered with an HTTP error.
  """
  self.CheckClient()

  @AutoRetry('pull', _RETRY_TIMES)
  def _pull(src, dst, ftype, perm=0o644, link=None):
    """Fetch one remote entry.

    Args:
      src: remote path of the entry.
      dst: local path to write to.
      ftype: type letter printed by find ('l' means symbolic link).
      perm: permission bits applied to the local file.
      link: symlink target, only used when ftype is 'l'.
    """
    try:
      os.makedirs(os.path.dirname(dst))
    except OSError:
      # Best effort: the directory usually exists already, or dst has no
      # directory component at all. A real problem surfaces below when dst
      # itself is created.
      pass

    src_base = os.path.basename(src)

    # Remote file is a link
    if ftype == 'l':
      pbar = ProgressBar(src_base)
      # lexists() is also true for a dangling symlink, which exists() would
      # miss and which would then make os.symlink() fail with EEXIST.
      if os.path.lexists(dst):
        os.remove(dst)
      os.symlink(link, dst)
      pbar.End()
      return

    url = ('%s:%d/api/agent/download/%s?filename=%s' %
           (self._state.host, self._state.port, self._selected_mid,
            urllib2.quote(src)))
    try:
      h = self._UrlOpen(url)
    except urllib2.HTTPError as e:
      msg = json.loads(e.read()).get('error', 'unkown error')
      raise RuntimeError('pull: %s' % msg)
    except KeyboardInterrupt:
      return

    pbar = ProgressBar(src_base)
    try:
      # Binary mode: the payload is written exactly as received.
      with open(dst, 'wb') as f:
        os.fchmod(f.fileno(), perm)
        # A missing Content-Length is treated as "size unknown" (0) instead
        # of crashing on int(None).
        total_size = int(h.headers.get('Content-Length') or 0)
        downloaded_size = 0

        while True:
          data = h.read(_BUFSIZ)
          if not data:
            break
          downloaded_size += len(data)
          if total_size:
            percent = float(downloaded_size) * 100 / total_size
          else:
            percent = 0.0
          pbar.SetProgress(percent, downloaded_size)
          f.write(data)
    finally:
      # NOTE(review): assumes _UrlOpen returns a urllib2-style response
      # object that provides close() -- confirm against _UrlOpen.
      h.close()
    pbar.End()

  # Use find to get a listing of all files under a root directory. Each
  # output line is '<mode>\t<path>\t<type>\t<link target>'. The leading
  # 'stat' makes a non-existent source fail before find runs.
  output = self.CheckOutput(
      'cd $HOME; '
      'stat "%(src)s" >/dev/null && '
      'find "%(src)s" \'(\' -type f -o -type l \')\' '
      '-printf \'%%m\t%%p\t%%y\t%%l\n\''
      % {'src': args.src})

  # We got error from the stat command
  if output.startswith('stat: '):
    sys.stderr.write(output)
    return

  entries = [line for line in output.strip('\n').split('\n') if line]
  if not entries:
    # find printed nothing (e.g. a directory without files or links), so
    # there is nothing to pull.
    return

  common_prefix = os.path.dirname(args.src)

  if len(entries) == 1:
    entry = entries[0]
    perm, src_path, ftype, link = entry.split('\t', -1)
    if os.path.isdir(args.dst):
      dst = os.path.join(args.dst, os.path.basename(src_path))
    else:
      dst = args.dst
    _pull(src_path, dst, ftype, int(perm, base=8), link)
  else:
    # When the destination does not exist yet, the source directory itself
    # becomes the destination, so its name is stripped from every path.
    if not os.path.exists(args.dst):
      common_prefix = args.src

    for entry in entries:
      perm, src_path, ftype, link = entry.split('\t', -1)
      rel_dst = src_path[len(common_prefix):].lstrip('/')
      _pull(src_path, os.path.join(args.dst, rel_dst), ftype,
            int(perm, base=8), link)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001085
@Command('forward', 'forward remote port to local port', [
    Arg('--list', dest='list_all', action='store_true', default=False,
        help='list all port forwarding sessions'),
    Arg('--remove', metavar='LOCAL_PORT', dest='remove', type=int,
        default=None,
        help='remove port forwarding for local port LOCAL_PORT'),
    Arg('--remove-all', dest='remove_all', action='store_true',
        default=False, help='remove all port forwarding'),
    Arg('remote', metavar='REMOTE_PORT', type=int, nargs='?'),
    Arg('local', metavar='LOCAL_PORT', type=int, nargs='?')])
def Forward(self, args):
  """List, remove or create port forwarding sessions.

  With --list, --remove or --remove-all the recorded sessions are printed or
  removed and the method returns. Otherwise a local TCP listener is opened
  and a forked child process serves it, handing every accepted connection to
  a ForwarderWebSocketClient pointed at the selected client's remote port.

  Args:
    args: parsed command arguments (list_all, remove, remove_all, remote,
        local).

  Raises:
    RuntimeError: no remote port was given when creating a forward.
  """
  if args.list_all:
    max_len = 10
    if len(self._state.forwards):
      # Never make the column narrower than its 'Client' header.
      max_len = max([len('Client')] +
                    [len(v[0]) for v in self._state.forwards.values()])

    print('%-*s   %-8s  %-8s' % (max_len, 'Client', 'Remote', 'Local'))
    for local in sorted(self._state.forwards.keys()):
      value = self._state.forwards[local]
      print('%-*s   %-8s  %-8s' % (max_len, value[0], value[1], local))
    return

  if args.remove_all:
    self._server.RemoveAllForward()
    return

  # Compare against None so that an explicit port value of 0 is not
  # mistaken for "option not given".
  if args.remove is not None:
    self._server.RemoveForward(args.remove)
    return

  self.CheckClient()

  if args.remote is None:
    raise RuntimeError('remote port not specified')

  # The local port defaults to the same number as the remote port.
  if args.local is None:
    args.local = args.remote
  remote = int(args.remote)
  local = int(args.local)

  def HandleConnection(conn):
    """Serve one accepted local connection over a websocket."""
    headers = []
    if self._state.username is not None and self._state.password is not None:
      headers.append(BasicAuthHeader(self._state.username,
                                     self._state.password))

    scheme = 'ws%s://' % ('s' if self._state.ssl else '')
    ws = ForwarderWebSocketClient(
        conn,
        scheme + '%s:%d/api/agent/forward/%s?port=%d' %
        (self._state.host, self._state.port, self._selected_mid, remote),
        headers=headers)
    try:
      ws.connect()
      ws.run()
    except Exception as e:
      print('error: %s' % e)
    finally:
      ws.close()

  server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  # NOTE(review): binding to 0.0.0.0 exposes the forwarded port on every
  # local interface, not only on loopback -- confirm this is intended.
  server.bind(('0.0.0.0', local))
  server.listen(5)

  pid = os.fork()
  if pid == 0:
    # Child: accept forever, one daemon thread per connection.
    while True:
      conn, unused_addr = server.accept()
      t = threading.Thread(target=HandleConnection, args=(conn,))
      t.daemon = True
      t.start()
  else:
    # Parent: the child owns its own copy of the listening socket, so
    # release this one before recording the forward with the child's pid.
    server.close()
    self._server.AddForward(self._selected_mid, remote, local, pid)
1160
1161
def main():
  """Configure logging and jsonrpclib, then run the Overlord CLI client.

  Ctrl-C and any other exception raised by the client are reported with a
  one-line message on stdout instead of a traceback.
  """
  logging.basicConfig(level=logging.INFO)

  # Add DaemonState to the set of classes known to jsonrpclib.
  Config.instance().classes.add(DaemonState)

  client = OverlordCLIClient()
  try:
    client.Main()
  except KeyboardInterrupt:
    print('Ctrl-C received, abort')
  except Exception as err:
    print('error: %s' % err)


# Only run the CLI when this file is executed as a script.
if __name__ == '__main__':
  main()