blob: 2174e1c96aaff74b9ccc9d9799bdfbec76026e98 [file] [log] [blame]
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001#!/usr/bin/python -u
2# -*- coding: utf-8 -*-
3#
4# Copyright 2015 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8from __future__ import print_function
9
10import argparse
11import ast
12import base64
13import fcntl
Wei-Ning Huangba768ab2016-02-07 14:38:06 +080014import getpass
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +080015import hashlib
16import httplib
17import json
18import jsonrpclib
19import logging
20import os
21import re
22import select
23import signal
24import socket
25import StringIO
26import struct
27import subprocess
28import sys
29import tempfile
30import termios
31import threading
32import time
33import tty
34import urllib2
35import urlparse
36
37from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
38from jsonrpclib.config import Config
39from ws4py.client import WebSocketBaseClient
40
# Python version >= 2.7.9 enables SSL check by default, bypass it: the default
# HTTPS context factory is replaced with the unverified one.
# NOTE(review): this disables certificate verification for every caller of the
# default HTTPS context in this process -- confirm that is intended.
try:
  import ssl
  # pylint: disable=W0212
  ssl._create_default_https_context = ssl._create_unverified_context
except Exception:
  # Best effort; presumably covers Python builds that lack these attributes.
  pass
48
49
# Escape character of the interactive terminal: Enter, this character and then
# '.' closes the session (see TerminalWebSocketClient).
_ESCAPE = '~'
# Chunk size in bytes used for socket reads and file uploads.
_BUFSIZ = 8192
_OVERLORD_PORT = 4455
# Default Overlord HTTP port.
_OVERLORD_HTTP_PORT = 9000
# Address of the JSON-RPC server run by the local client daemon.
_OVERLORD_CLIENT_DAEMON_PORT = 4488
_OVERLORD_CLIENT_DAEMON_RPC_ADDR = ('127.0.0.1', _OVERLORD_CLIENT_DAEMON_PORT)

# Timeout (seconds) passed to urllib2.urlopen.
_DEFAULT_HTTP_TIMEOUT = 30
# Seconds the daemon serves the cached client list before refetching it.
_LIST_CACHE_TIMEOUT = 2
# Width used when the terminal reports zero columns.
_DEFAULT_TERMINAL_WIDTH = 80
# Number of attempts given to AutoRetry by the push/pull helpers.
_RETRY_TIMES = 3

# echo -n overlord | md5sum
_HTTP_BOUNDARY_MAGIC = '9246f080c855a69012707ab53489b921'

# Terminal resize control: bytes that bracket a JSON control message.
_CONTROL_START = 128
_CONTROL_END = 129

# Stream control
_STDIN_CLOSED = '##STDIN_CLOSED##'

# Path prefix of the SSH control socket; the host name is appended to it.
_SSH_CONTROL_SOCKET_PREFIX = os.path.join(tempfile.gettempdir(),
                                          'ovl-ssh-control-')

# A string that will always be included in the response of
# GET http://OVERLORD_SERVER:_OVERLORD_HTTP_PORT
_OVERLORD_RESPONSE_KEYWORD = 'HTTP'
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +080078
79
def GetVersionDigest():
  """Return the sha1sum of the current executing script.

  The file is read as bytes so the digest does not depend on text-mode
  newline translation or on the interpreter's str type.

  Returns:
    The 40-character hexadecimal SHA-1 digest of the file named by __file__.
  """
  with open(__file__, 'rb') as f:
    return hashlib.sha1(f.read()).hexdigest()
84
85
def KillGraceful(pid, wait_secs=1):
  """Terminate a process politely first, then by force.

  SIGTERM is delivered, wait_secs seconds are allowed for a clean shutdown,
  and SIGKILL follows.  An OSError from either signal (for example because the
  process is already gone) ends the sequence silently.
  """
  polite, forced = signal.SIGTERM, signal.SIGKILL
  try:
    os.kill(pid, polite)
    time.sleep(wait_secs)
    os.kill(pid, forced)
  except OSError:
    return
95
96
def AutoRetry(action_name, retries):
  """Decorator for retry function call.

  Args:
    action_name: verb used in the final failure message, e.g. 'push'.
    retries: maximum number of attempts.

  Returns:
    A decorator.  The decorated function returns the wrapped function's result
    from the first attempt that does not raise, or None once every attempt
    has failed.  Failures are printed, never re-raised.
  """
  def Wrap(func):
    def Loop(*args, **kwargs):
      # Messages name the first positional argument; fall back to the function
      # name so a call without positional arguments cannot raise IndexError
      # from inside the error handler.
      target = args[0] if args else func.__name__
      for unused_i in range(retries):
        try:
          return func(*args, **kwargs)
        except Exception as e:
          print('error: %s: %s: retrying ...' % (target, e))
      print('error: failed to %s %s' % (action_name, target))
      return None
    return Loop
  return Wrap
112
113
def BasicAuthHeader(user, password):
  """Return HTTP basic auth header.

  Args:
    user: user name.
    password: password.

  Returns:
    A ('Authorization', 'Basic <base64 of user:password>') tuple.
  """
  credential = '%s:%s' % (user, password)
  # Text credentials (e.g. unicode values received over JSON-RPC) are encoded
  # explicitly so non-ASCII characters do not fail inside b64encode.
  if not isinstance(credential, bytes):
    credential = credential.encode('utf-8')
  token = base64.b64encode(credential)
  # Keep the header value a native str on every interpreter.
  if not isinstance(token, str):
    token = token.decode('ascii')
  return ('Authorization', 'Basic %s' % token)
118
119
def GetTerminalSize():
  """Retrieve terminal window size.

  Returns:
    A (lines, columns) tuple read from the terminal attached to stdin, or
    (0, 0) when stdin is not a terminal (e.g. redirected from a file or pipe).
  """
  ws = struct.pack('HHHH', 0, 0, 0, 0)
  try:
    ws = fcntl.ioctl(0, termios.TIOCGWINSZ, ws)
  except (IOError, OSError):
    # No terminal to query; report an unknown size instead of failing so
    # callers can fall back to a default width.
    return 0, 0
  lines, columns, unused_x, unused_y = struct.unpack('HHHH', ws)
  return lines, columns
126
127
def MakeRequestUrl(state, url):
  """Prefix url with 'https://' when state.ssl is set, else 'http://'."""
  scheme = 'https' if state.ssl else 'http'
  return '%s://%s' % (scheme, url)
130
131
class ProgressBar(object):
  """Single-line progress bar drawn on stdout.

  Every update rewrites the current terminal line with the (possibly
  truncated) name, the transferred size, the average speed, the elapsed time
  and, when the terminal is wide enough, a bar followed by a percentage.
  """
  SIZE_WIDTH = 11
  SPEED_WIDTH = 10
  DURATION_WIDTH = 6
  PERCENTAGE_WIDTH = 8

  # Unit suffixes for successive powers of 1024.  The last entry also covers
  # every larger value, so formatting never fails on huge inputs.
  _SIZE_UNITS = ('B', 'KiB', 'MiB', 'GiB', 'TiB')
  _SPEED_UNITS = ('B', 'K', 'M', 'G', 'T')

  def __init__(self, name):
    """Start timing and draw the bar at 0%.

    Args:
      name: label shown at the start of the line.
    """
    self._start_time = time.time()
    self._name = name
    self._size = 0
    self._width = 0
    self._name_width = 0
    self._name_max = 0
    self._stat_width = 0
    self._max = 0
    self.CalculateSize()
    self.SetProgress(0)

  @staticmethod
  def _Scale(value, units):
    """Divide value by 1024 until it is below 1024 or units run out.

    Returns:
      A (scaled value, unit suffix) tuple.
    """
    index = 0
    while value >= 1024 and index < len(units) - 1:
      value /= 1024.0
      index += 1
    return value, units[index]

  def CalculateSize(self):
    """Recompute the layout widths from the current terminal width."""
    self._width = GetTerminalSize()[1] or _DEFAULT_TERMINAL_WIDTH
    self._name_width = int(self._width * 0.3)
    self._name_max = self._name_width
    self._stat_width = self.SIZE_WIDTH + self.SPEED_WIDTH + self.DURATION_WIDTH
    self._max = (self._width - self._name_width - self._stat_width -
                 self.PERCENTAGE_WIDTH)

  def SizeToHuman(self, size_in_bytes):
    """Format a byte count as a SIZE_WIDTH wide string such as '1.5 MiB'."""
    value, unit = self._Scale(size_in_bytes, self._SIZE_UNITS)
    return ' %6.1f %3s' % (value, unit)

  def SpeedToHuman(self, speed_in_bs):
    """Format bytes per second as a SPEED_WIDTH wide string such as '1.5M/s'."""
    value, unit = self._Scale(speed_in_bs, self._SPEED_UNITS)
    return ' %6.1f%s/s' % (value, unit)

  def DurationToClock(self, duration):
    """Format a duration in seconds as ' MM:SS'."""
    return ' %02d:%02d' % (duration // 60, duration % 60)

  def SetProgress(self, percentage, size=None):
    """Redraw the progress line.

    Args:
      percentage: completion percentage, 0 to 100.
      size: number of bytes transferred so far; None keeps the last value.
    """
    current_width = GetTerminalSize()[1]
    if self._width != current_width:
      self.CalculateSize()

    if size is not None:
      self._size = size

    elapse_time = time.time() - self._start_time
    # The first update can happen within the clock's resolution of the start
    # time; report zero speed rather than dividing by zero.
    speed = self._size / float(elapse_time) if elapse_time > 0 else 0.0

    size_str = self.SizeToHuman(self._size)
    speed_str = self.SpeedToHuman(speed)
    elapse_str = self.DurationToClock(elapse_time)

    if len(self._name) <= self._name_max:
      name = self._name
    else:
      name = self._name[:self._name_max - 4] + ' ...'

    if self._max > 2:
      width = int(self._max * percentage / 100.0)
      bar = (' [' + '#' * width + ' ' * (self._max - width) + ']' +
             '%4d%%' % int(percentage))
    else:
      # Terminal too narrow for a bar; show the statistics only.
      bar = ''

    sys.stdout.write('%*s' % (-self._name_max, name) +
                     size_str + speed_str + elapse_str + bar + '\r')
    sys.stdout.flush()

  def End(self):
    """Draw the bar at 100% and move to the next line."""
    self.SetProgress(100.0)
    sys.stdout.write('\n')
    sys.stdout.flush()
220
221
class DaemonState(object):
  """Record of everything the client daemon remembers about Overlord."""
  def __init__(self):
    # Digest of the script that created this state.
    self.version_sha1sum = GetVersionDigest()

    # Server endpoint and connection flags.
    self.host, self.port = None, None
    self.ssl, self.ssh = False, False

    # SSH tunnel bookkeeping.
    self.orig_host, self.ssh_pid = None, None

    # HTTP basic auth credentials.
    self.username, self.password = None, None

    # Default client, port forwards and the cached client listing.
    self.selected_mid = None
    self.forwards = {}
    self.listing = []
    self.last_list = 0
238
239
class OverlordClientDaemon(object):
  """Overlord Client Daemon.

  Runs a local JSON-RPC server that keeps connection state (a DaemonState)
  between invocations of the command line client.
  """
  def __init__(self):
    self._state = DaemonState()
    self._server = None

  def Start(self):
    """Start the daemon."""
    self.StartRPCServer()

  def StartRPCServer(self):
    """Create the JSON-RPC server and serve it from a forked child process.

    The calling (parent) process returns right after the fork.
    """
    self._server = SimpleJSONRPCServer(_OVERLORD_CLIENT_DAEMON_RPC_ADDR,
                                       logRequests=False)
    exports = [
        (self.State, 'State'),
        (self.Ping, 'Ping'),
        (self.GetPid, 'GetPid'),
        (self.Connect, 'Connect'),
        (self.Clients, 'Clients'),
        (self.SelectClient, 'SelectClient'),
        (self.AddForward, 'AddForward'),
        (self.RemoveForward, 'RemoveForward'),
        (self.RemoveAllForward, 'RemoveAllForward'),
    ]
    for func, name in exports:
      self._server.register_function(func, name)

    pid = os.fork()
    if pid == 0:
      # Child process: serve requests until killed.  It must never return
      # into the caller's code path, which belongs to the parent process.
      try:
        self._server.serve_forever()
      except Exception:
        logging.exception('daemon RPC server terminated')
      finally:
        os._exit(1)  # pylint: disable=W0212

  @staticmethod
  def GetRPCServer():
    """Returns the Overlord client daemon RPC server.

    Returns:
      A jsonrpclib proxy for the daemon, or None if the daemon does not
      answer a Ping.
    """
    server = jsonrpclib.Server('http://%s:%d' %
                               _OVERLORD_CLIENT_DAEMON_RPC_ADDR)
    try:
      server.Ping()
    except Exception:
      return None
    return server

  def State(self):
    """Return the daemon's DaemonState."""
    return self._state

  def Ping(self):
    """Liveness probe; always returns True."""
    return True

  def GetPid(self):
    """Return the process id of the process handling the request."""
    return os.getpid()

  def _UrlOpen(self, url):
    """Wrapper for urllib2.urlopen.

    It selects correct HTTP scheme according to self._state.ssl and add HTTP
    basic auth headers.
    """
    url = MakeRequestUrl(self._state, url)
    request = urllib2.Request(url)
    if self._state.username is not None and self._state.password is not None:
      request.add_header(*BasicAuthHeader(self._state.username,
                                          self._state.password))
    return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)

  def _GetJSON(self, path):
    """GET path from the connected server and decode the JSON response."""
    url = '%s:%d%s' % (self._state.host, self._state.port, path)
    return json.loads(self._UrlOpen(url).read())

  def Connect(self, host, port=_OVERLORD_HTTP_PORT, ssh_pid=None,
              username=None, password=None, orig_host=None):
    """Record the connection parameters and probe the server.

    Returns:
      True on success; a (code, message, body) tuple when the server answers
      with an HTTP error; the exception text for any other failure.
    """
    self._state.username = username
    self._state.password = password
    self._state.host = host
    self._state.port = port
    self._state.ssl = False
    self._state.orig_host = orig_host
    self._state.ssh_pid = ssh_pid
    self._state.selected_mid = None

    try:
      h = self._UrlOpen('%s:%d' % (host, port))
      # Probably not an HTTP server, try HTTPS
      if _OVERLORD_RESPONSE_KEYWORD not in h.read():
        self._state.ssl = True
        self._UrlOpen('%s:%d' % (host, port))
    except urllib2.HTTPError as e:
      return (e.getcode(), str(e), e.read().strip())
    except Exception as e:
      return str(e)
    else:
      return True

  def Clients(self):
    """Return the sorted, de-duplicated machine ids known to the server.

    The listing is cached for _LIST_CACHE_TIMEOUT seconds.
    """
    if time.time() - self._state.last_list <= _LIST_CACHE_TIMEOUT:
      return self._state.listing

    mids = [client['mid'] for client in self._GetJSON('/api/agents/list')]
    self._state.listing = sorted(set(mids))
    self._state.last_list = time.time()
    return self._state.listing

  def SelectClient(self, mid):
    """Remember mid as the default client."""
    self._state.selected_mid = mid

  def AddForward(self, mid, remote, local, pid):
    """Record a port forward keyed by its local port."""
    self._state.forwards[local] = (mid, remote, pid)

  def RemoveForward(self, local_port):
    """Kill and forget the forward on local_port; unknown ports are ignored."""
    try:
      unused_mid, unused_remote, pid = self._state.forwards[local_port]
      KillGraceful(pid)
      del self._state.forwards[local_port]
    except (KeyError, OSError):
      pass

  def RemoveAllForward(self):
    """Kill every recorded forward process and clear the table."""
    for unused_mid, unused_remote, pid in self._state.forwards.values():
      try:
        KillGraceful(pid)
      except OSError:
        pass
    self._state.forwards = {}
361
362
class TerminalWebSocketClient(WebSocketBaseClient):
  """WebSocket client that bridges the local terminal to a remote tty.

  stdin is put into raw mode and forwarded character by character; binary
  messages from the server are written to stdout.  Typing Enter, _ESCAPE and
  '.' in sequence closes the connection.
  """
  def __init__(self, mid, *args, **kwargs):
    super(TerminalWebSocketClient, self).__init__(*args, **kwargs)
    self._mid = mid
    self._stdin_fd = sys.stdin.fileno()
    # Terminal attributes saved by the input thread before entering raw mode.
    self._old_termios = None

  def handshake_ok(self):
    pass

  def opened(self):
    """Start the background thread that feeds stdin to the server."""
    nonlocals = {'size': (80, 40)}

    def _ResizeWindow():
      size = GetTerminalSize()
      if size != nonlocals['size']:  # Size changed, notify the server
        control = {'command': 'resize', 'params': list(size)}
        payload = chr(_CONTROL_START) + json.dumps(control) + chr(_CONTROL_END)
        nonlocals['size'] = size
        try:
          self.send(payload, binary=True)
        except Exception:
          pass

    def _FeedInput():
      self._old_termios = termios.tcgetattr(self._stdin_fd)
      tty.setraw(self._stdin_fd)

      READY, ENTER_PRESSED, ESCAPE_PRESSED = range(3)

      try:
        state = READY
        while True:
          # Check if terminal is resized
          _ResizeWindow()

          ch = sys.stdin.read(1)

          # Scan for escape sequence
          if ch == chr(0x0d):
            # Enter always (re)starts the sequence, so pressing Enter more
            # than once before the escape character still works.
            state = ENTER_PRESSED
          elif state == ENTER_PRESSED and ch == _ESCAPE:
            state = ESCAPE_PRESSED
          elif state == ESCAPE_PRESSED and ch == '.':
            self.close()
            break
          else:
            state = READY

          self.send(ch)
      except (KeyboardInterrupt, RuntimeError):
        pass

    t = threading.Thread(target=_FeedInput)
    t.daemon = True
    t.start()

  def closed(self, code, reason=None):
    """Restore the terminal settings and report the disconnect."""
    # The connection can close before the input thread has saved the
    # terminal attributes; there is nothing to restore in that case.
    if self._old_termios is not None:
      termios.tcsetattr(self._stdin_fd, termios.TCSANOW, self._old_termios)
    print('Connection to %s closed.' % self._mid)

  def received_message(self, msg):
    """Write binary server output to stdout."""
    if msg.is_binary:
      sys.stdout.write(msg.data)
      sys.stdout.flush()
429
430
class ShellWebSocketClient(WebSocketBaseClient):
  """WebSocket client that pipes stdin to a remote command.

  Binary messages received from the server are written to the output file
  object given at construction.
  """
  def __init__(self, output, *args, **kwargs):
    """Constructor.

    Args:
      output: output file object.
    """
    self.output = output
    super(ShellWebSocketClient, self).__init__(*args, **kwargs)

  def handshake_ok(self):
    pass

  def opened(self):
    """Start the background thread that forwards stdin to the server."""
    def _FeedInput():
      try:
        # Forward one character at a time until read() reports end of input.
        for piece in iter(lambda: sys.stdin.read(1), ''):
          self.send(piece, binary=True)
        self.send(_STDIN_CLOSED * 2)
      except (KeyboardInterrupt, RuntimeError):
        pass

    feeder = threading.Thread(target=_FeedInput)
    feeder.daemon = True
    feeder.start()

  def closed(self, code, reason=None):
    pass

  def received_message(self, msg):
    """Copy binary server output to the output file object."""
    if not msg.is_binary:
      return
    self.output.write(msg.data)
    self.output.flush()
468
469
class ForwarderWebSocketClient(WebSocketBaseClient):
  """WebSocket client that relays data between a local socket and the server.

  Args:
    sock: connected local socket whose traffic is forwarded.
  """
  def __init__(self, sock, *args, **kwargs):
    super(ForwarderWebSocketClient, self).__init__(*args, **kwargs)
    self._sock = sock
    self._stop = threading.Event()

  def handshake_ok(self):
    pass

  def opened(self):
    """Start the background thread that forwards socket data to the server."""
    def _FeedInput():
      try:
        # The socket is left in blocking mode: select() already guarantees
        # that recv() below returns at once, and received_message() needs
        # blocking semantics so sendall() can deliver every byte.
        while True:
          rd, unused_w, unused_x = select.select([self._sock], [], [], 0.5)
          if self._stop.is_set():
            break
          if self._sock in rd:
            data = self._sock.recv(_BUFSIZ)
            if len(data) == 0:
              self.close()
              break
            self.send(data, binary=True)
      except Exception:
        pass
      finally:
        self._sock.close()

    t = threading.Thread(target=_FeedInput)
    t.daemon = True
    t.start()

  def closed(self, code, reason=None):
    """Stop the forwarding thread and exit."""
    self._stop.set()
    sys.exit(0)

  def received_message(self, msg):
    """Write binary server data to the local socket."""
    if msg.is_binary:
      # send() may write only part of the buffer; sendall() loops until all
      # of it is written, so forwarded data is never silently truncated.
      self._sock.sendall(msg.data)
509
510
def Arg(*args, **kwargs):
  """Bundle add_argument() parameters as an (args, kwargs) pair."""
  spec = (args, kwargs)
  return spec
513
514
def Command(command, help_msg=None, args=None):
  """Decorator that tags a method with its argparse sub-command description.

  The returned wrapper forwards every call to the decorated function and
  carries an '__arg_attr' attribute holding the command name, the help text
  and the list of argument specs.
  """
  arg_specs = [] if args is None else args

  def WrapFunc(func):
    def Wrapped(*call_args, **call_kwargs):
      return func(*call_args, **call_kwargs)
    setattr(Wrapped, '__arg_attr',
            {'command': command, 'help': help_msg, 'args': arg_specs})
    return Wrapped
  return WrapFunc
526
527
def ParseMethodSubCommands(cls):
  """Decorator for a class using the @Command decorator.

  This decorator retrieve command info from each method and append it in to the
  SUBCOMMANDS class variable, which is later used to construct parser.

  Args:
    cls: class whose own attributes are scanned; it must define SUBCOMMANDS.

  Returns:
    cls itself.
  """
  # Only the attribute values matter, and values() (unlike iteritems()) is
  # available on every Python version.
  for method in vars(cls).values():
    arg_attr = getattr(method, '__arg_attr', None)
    if arg_attr is not None:
      cls.SUBCOMMANDS.append(arg_attr)
  return cls
538
539
540@ParseMethodSubCommands
541class OverlordCLIClient(object):
542 """Overlord command line interface client."""
543
544 SUBCOMMANDS = []
545
546 def __init__(self):
547 self._parser = self._BuildParser()
548 self._selected_mid = None
549 self._server = None
550 self._state = None
551
552 def _BuildParser(self):
553 root_parser = argparse.ArgumentParser(prog='ovl')
554 subparsers = root_parser.add_subparsers(help='sub-command')
555
556 root_parser.add_argument('-s', dest='selected_mid', action='store',
557 default=None,
558 help='select target to execute command on')
559 root_parser.add_argument('-S', dest='select_mid_before_action',
560 action='store_true', default=False,
561 help='select target before executing command')
562
563 for attr in self.SUBCOMMANDS:
564 parser = subparsers.add_parser(attr['command'], help=attr['help'])
565 parser.set_defaults(which=attr['command'])
566 for arg in attr['args']:
567 parser.add_argument(*arg[0], **arg[1])
568
569 return root_parser
570
571 def Main(self):
572 # We want to pass the rest of arguments after shell command directly to the
573 # function without parsing it.
574 try:
575 index = sys.argv.index('shell')
576 except ValueError:
577 args = self._parser.parse_args()
578 else:
579 args = self._parser.parse_args(sys.argv[1:index + 1])
580
581 command = args.which
582 self._selected_mid = args.selected_mid
583
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800584 if command == 'start-server':
585 self.StartServer()
586 return
587 elif command == 'kill-server':
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800588 self.KillServer()
589 return
590
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800591 self.CheckDaemon()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800592 if command == 'status':
593 self.Status()
594 return
595 elif command == 'connect':
596 self.Connect(args)
597 return
598
599 # The following command requires connection to the server
600 self.CheckConnection()
601
602 if args.select_mid_before_action:
603 self.SelectClient(store=False)
604
605 if command == 'select':
606 self.SelectClient(args)
607 elif command == 'ls':
608 self.ListClients()
609 elif command == 'shell':
610 command = sys.argv[sys.argv.index('shell') + 1:]
611 self.Shell(command)
612 elif command == 'push':
613 self.Push(args)
614 elif command == 'pull':
615 self.Pull(args)
616 elif command == 'forward':
617 self.Forward(args)
618
619 def _UrlOpen(self, url):
620 url = MakeRequestUrl(self._state, url)
621 request = urllib2.Request(url)
622 if self._state.username is not None and self._state.password is not None:
623 request.add_header(*BasicAuthHeader(self._state.username,
624 self._state.password))
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800625 return urllib2.urlopen(request, timeout=_DEFAULT_HTTP_TIMEOUT)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800626
627 def _HTTPPostFile(self, url, filename, progress=None, user=None, passwd=None):
628 """Perform HTTP POST and upload file to Overlord.
629
630 To minimize the external dependencies, we construct the HTTP post request
631 by ourselves.
632 """
633 url = MakeRequestUrl(self._state, url)
634 size = os.stat(filename).st_size
635 boundary = '-----------%s' % _HTTP_BOUNDARY_MAGIC
636 CRLF = '\r\n'
637 parse = urlparse.urlparse(url)
638
639 part_headers = [
640 '--' + boundary,
641 'Content-Disposition: form-data; name="file"; '
642 'filename="%s"' % os.path.basename(filename),
643 'Content-Type: application/octet-stream',
644 '', ''
645 ]
646 part_header = CRLF.join(part_headers)
647 end_part = CRLF + '--' + boundary + '--' + CRLF
648
649 content_length = len(part_header) + size + len(end_part)
650 if parse.scheme == 'http':
651 h = httplib.HTTP(parse.netloc)
652 else:
653 h = httplib.HTTPS(parse.netloc)
654
655 post_path = url[url.index(parse.netloc) + len(parse.netloc):]
656 h.putrequest('POST', post_path)
657 h.putheader('Content-Length', content_length)
658 h.putheader('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
659
660 if user and passwd:
661 h.putheader(*BasicAuthHeader(user, passwd))
662 h.endheaders()
663 h.send(part_header)
664
665 count = 0
666 with open(filename, 'r') as f:
667 while True:
668 data = f.read(_BUFSIZ)
669 if not data:
670 break
671 count += len(data)
672 if progress:
673 progress(int(count * 100.0 / size), count)
674 h.send(data)
675
676 h.send(end_part)
677 progress(100)
678
679 if count != size:
680 logging.warning('file changed during upload, upload may be truncated.')
681
682 errcode, unused_x, unused_y = h.getreply()
683 return errcode == 200
684
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800685 def CheckDaemon(self):
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800686 self._server = OverlordClientDaemon.GetRPCServer()
687 if self._server is None:
688 print('* daemon not running, starting it now on port %d ... *' %
689 _OVERLORD_CLIENT_DAEMON_PORT)
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800690 self.StartServer()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800691
692 self._state = self._server.State()
693 sha1sum = GetVersionDigest()
694
695 if sha1sum != self._state.version_sha1sum:
696 print('ovl server is out of date. killing...')
697 KillGraceful(self._server.GetPid())
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800698 self.StartServer()
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800699
700 def GetSSHControlFile(self, host):
701 return _SSH_CONTROL_SOCKET_PREFIX + host
702
703 def SSHTunnel(self, user, host, port):
704 """SSH forward the remote overlord server.
705
706 Overlord server may not have port 9000 open to the public network, in such
707 case we can SSH forward the port to localhost.
708 """
709
710 control_file = self.GetSSHControlFile(host)
711 try:
712 os.unlink(control_file)
713 except Exception:
714 pass
715
716 subprocess.Popen([
717 'ssh', '-Nf',
718 '-M', # Enable master mode
719 '-S', control_file,
720 '-L', '9000:localhost:9000',
721 '-p', str(port),
722 '%s%s' % (user + '@' if user else '', host)
723 ]).wait()
724
725 p = subprocess.Popen([
726 'ssh',
727 '-S', control_file,
728 '-O', 'check', host,
729 ], stderr=subprocess.PIPE)
730 unused_stdout, stderr = p.communicate()
731
732 s = re.search(r'pid=(\d+)', stderr)
733 if s:
734 return int(s.group(1))
735
736 raise RuntimeError('can not establish ssh connection')
737
738 def CheckConnection(self):
739 if self._state.host is None:
740 raise RuntimeError('not connected to any server, abort')
741
742 try:
743 self._server.Clients()
744 except Exception:
745 raise RuntimeError('remote server disconnected, abort')
746
747 if self._state.ssh_pid is not None:
748 ret = subprocess.Popen(['kill', '-0', str(self._state.ssh_pid)],
749 stdout=subprocess.PIPE,
750 stderr=subprocess.PIPE).wait()
751 if ret != 0:
752 raise RuntimeError('ssh tunnel disconnected, please re-connect')
753
754 def CheckClient(self):
755 if self._selected_mid is None:
756 if self._state.selected_mid is None:
757 raise RuntimeError('No client is selected')
758 self._selected_mid = self._state.selected_mid
759
760 if self._selected_mid not in self._server.Clients():
761 raise RuntimeError('client %s disappeared' % self._selected_mid)
762
763 def CheckOutput(self, command):
764 headers = []
765 if self._state.username is not None and self._state.password is not None:
766 headers.append(BasicAuthHeader(self._state.username,
767 self._state.password))
768
769 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
770 sio = StringIO.StringIO()
771 ws = ShellWebSocketClient(sio,
772 scheme + '%s:%d/api/agent/shell/%s?command=%s' %
773 (self._state.host, self._state.port,
774 self._selected_mid, urllib2.quote(command)),
775 headers=headers)
776 ws.connect()
777 ws.run()
778 return sio.getvalue()
779
780 @Command('status', 'show Overlord connection status')
781 def Status(self):
782 if self._state.host is None:
783 print('Not connected to any host.')
784 else:
785 if self._state.ssh_pid is not None:
786 print('Connected to %s with SSH tunneling.' % self._state.orig_host)
787 else:
788 print('Connected to %s:%d.' % (self._state.host, self._state.port))
789
790 if self._selected_mid is None:
791 self._selected_mid = self._state.selected_mid
792
793 if self._selected_mid is None:
794 print('No client is selected.')
795 else:
796 print('Client %s selected.' % self._selected_mid)
797
  @Command('connect', 'connect to Overlord server', [
      Arg('host', metavar='HOST', type=str, default='localhost',
          help='Overlord hostname/IP'),
      Arg('port', metavar='PORT', type=int,
          default=_OVERLORD_HTTP_PORT, help='Overlord port'),
      Arg('-f', '--forward', dest='ssh_forward', default=False,
          action='store_true',
          help='connect with SSH forwarding to the host'),
      Arg('-p', '--ssh-port', dest='ssh_port', default=22,
          type=int, help='SSH server port for SSH forwarding'),
      Arg('-l', '--ssh-login', dest='ssh_login', default='',
          type=str, help='SSH server login name for SSH forwarding'),
      Arg('-u', '--user', dest='user', default=None,
          type=str, help='Overlord HTTP auth username'),
      Arg('-w', '--passwd', dest='passwd', default=None, type=str,
          help='Overlord HTTP auth password')])
  def Connect(self, args):
    """Connect the client daemon to an Overlord server.

    With --forward an SSH tunnel to args.host is created first and the daemon
    connects to 'localhost' instead.  Up to three attempts are made; after a
    401 reply the credentials not given on the command line are prompted for
    before the next attempt.

    Args:
      args: parsed arguments of the 'connect' sub-command.
    """
    ssh_pid = None
    host = args.host
    orig_host = args.host

    if args.ssh_forward:
      # Kill previous SSH tunnel
      self.KillSSHTunnel()

      ssh_pid = self.SSHTunnel(args.ssh_login, args.host, args.ssh_port)
      host = 'localhost'

    username_provided = args.user is not None
    password_provided = args.passwd is not None
    # Set once the server has rejected an attempt with 401.
    prompt = False

    for unused_i in range(3):
      try:
        if prompt:
          if not username_provided:
            args.user = raw_input('Username: ')
          if not password_provided:
            args.passwd = getpass.getpass('Password: ')

        ret = self._server.Connect(host, args.port, ssh_pid, args.user,
                                   args.passwd, orig_host)
        # HTTPError
        if isinstance(ret, list):
          code, except_str, body = ret
          if code == 401:
            print('connect: %s' % body)
            prompt = True
            # Retry only if there is something left to ask the user for;
            # credentials fixed on the command line cannot change.
            if not username_provided or not password_provided:
              continue
            else:
              break
          else:
            logging.error('%s; %s', except_str, body)

        if ret is not True:
          print('can not connect to %s: %s' % (host, ret))
      except Exception as e:
        # Logged and retried on the next loop iteration.
        logging.exception(e)
      else:
        # The daemon answered (successfully or not); stop retrying.
        break
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800859
Wei-Ning Huang5564eea2016-01-19 14:36:45 +0800860 @Command('start-server', 'start overlord CLI client server')
861 def StartServer(self):
862 self._server = OverlordClientDaemon.GetRPCServer()
863 if self._server is None:
864 OverlordClientDaemon().Start()
865 time.sleep(1)
866 self._server = OverlordClientDaemon.GetRPCServer()
867 if self._server is not None:
868 print('* daemon started successfully *')
869
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800870 @Command('kill-server', 'kill overlord CLI client server')
871 def KillServer(self):
872 self._server = OverlordClientDaemon.GetRPCServer()
873 if self._server is None:
874 return
875
Wei-Ning Huangee7ca8d2015-12-12 05:48:02 +0800876 self._state = self._server.State()
877
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800878 # Kill SSH Tunnel
879 self.KillSSHTunnel()
880
881 # Kill server daemon
882 KillGraceful(self._server.GetPid())
883
884 def KillSSHTunnel(self):
885 if self._state.ssh_pid is not None:
886 KillGraceful(self._state.ssh_pid)
887
888 @Command('ls', 'list all clients')
889 def ListClients(self):
890 for client in self._server.Clients():
891 print(client)
892
893 @Command('select', 'select default client', [
894 Arg('mid', metavar='mid', nargs='?', default=None)])
895 def SelectClient(self, args=None, store=True):
896 clients = self._server.Clients()
897
898 mid = args.mid if args is not None else None
899 if mid is None:
900 print('Select from the following clients:')
901 for i, client in enumerate(clients):
902 print(' %d. %s' % (i + 1, client))
903
904 print('\nSelection: ', end='')
905 try:
906 choice = int(raw_input()) - 1
907 mid = clients[choice]
908 except ValueError:
909 raise RuntimeError('select: invalid selection')
910 except IndexError:
911 raise RuntimeError('select: selection out of range')
912 else:
913 if mid not in clients:
914 raise RuntimeError('select: client %s does not exist' % mid)
915
916 self._selected_mid = mid
917 if store:
918 self._server.SelectClient(mid)
919 print('Client %s selected' % mid)
920
921 @Command('shell', 'open a shell or execute a shell command', [
922 Arg('command', metavar='CMD', nargs='?', help='command to execute')])
923 def Shell(self, command=None):
924 if command is None:
925 command = []
926 self.CheckClient()
927
928 headers = []
929 if self._state.username is not None and self._state.password is not None:
930 headers.append(BasicAuthHeader(self._state.username,
931 self._state.password))
932
933 scheme = 'ws%s://' % ('s' if self._state.ssl else '')
934 if len(command) == 0:
935 ws = TerminalWebSocketClient(self._selected_mid,
936 scheme + '%s:%d/api/agent/tty/%s' %
937 (self._state.host, self._state.port,
938 self._selected_mid), headers=headers)
939 else:
940 cmd = ' '.join(command)
941 ws = ShellWebSocketClient(sys.stdout,
942 scheme + '%s:%d/api/agent/shell/%s?command=%s' %
943 (self._state.host, self._state.port,
944 self._selected_mid, urllib2.quote(cmd)),
945 headers=headers)
Wei-Ning Huang9083b7c2016-01-26 16:44:11 +0800946 try:
947 ws.connect()
948 ws.run()
949 except socket.error as e:
950 if e.errno == 32: # Broken pipe
951 pass
952 else:
953 raise
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +0800954
@Command('push', 'push a file or directory to remote', [
    Arg('srcs', nargs='+', metavar='SOURCE'),
    Arg('dst', metavar='DESTINATION')])
def Push(self, args):
  """Upload local files, symlinks or directory trees to the selected client.

  Args:
    args: parsed command line arguments. args.srcs is the list of local
        source paths and args.dst is the destination path on the remote side.

  Raises:
    RuntimeError: if a source does not exist or is not readable, if several
        sources are given and the destination is not an existing directory,
        or if the server answers the upload request with an HTTP error.
  """
  self.CheckClient()

  @AutoRetry('push', _RETRY_TIMES)
  def _push(src, dst):
    """Push the single local file or symlink `src` to remote path `dst`."""
    src_base = os.path.basename(src)

    # Local file is a link: recreate the link on the remote side through a
    # shell command instead of uploading the file it points to.
    if os.path.islink(src):
      pbar = ProgressBar(src_base)
      link_path = os.readlink(src)
      self.CheckOutput('mkdir -p %(dirname)s; '
                       'if [ -d "%(dst)s" ]; then '
                       'ln -sf "%(link_path)s" "%(dst)s/%(link_name)s"; '
                       'else ln -sf "%(link_path)s" "%(dst)s"; fi' %
                       dict(dirname=os.path.dirname(dst),
                            link_path=link_path, dst=dst,
                            link_name=src_base))
      pbar.End()
      return

    # Only the permission bits (lowest 9 bits) are sent, as an octal string.
    mode = '0%o' % (0x1FF & os.stat(src).st_mode)
    # The destination and the file name are percent-encoded so that spaces,
    # '&', '#', ... in a path cannot corrupt the query string. This matches
    # what Pull does for its 'filename' parameter.
    url = ('%s:%d/api/agent/upload/%s?dest=%s&perm=%s' %
           (self._state.host, self._state.port, self._selected_mid,
            urllib2.quote(dst), mode))
    try:
      # NOTE(review): this request carries no file data; it looks like a
      # pre-flight check that lets the server reject the upload early.
      self._UrlOpen(url + '&filename=%s' % urllib2.quote(src_base))
    except urllib2.HTTPError as e:
      msg = json.loads(e.read()).get('error', None)
      raise RuntimeError('push: %s' % msg)

    pbar = ProgressBar(src_base)
    self._HTTPPostFile(url, src, pbar.SetProgress,
                       self._state.username, self._state.password)
    pbar.End()

  def _push_single_target(src, dst):
    """Push `src` (a file, a link or a whole directory tree) to `dst`."""
    if not os.path.isdir(src):
      _push(src, dst)
      return

    dst_exists = ast.literal_eval(self.CheckOutput(
        'stat %s >/dev/null 2>&1 && echo True || echo False' % dst))
    # Name of the pushed directory itself, without the path leading to it.
    src_name = os.path.basename(os.path.normpath(src))

    for root, unused_dirs, files in os.walk(src):
      # If destination directory does not exist, we should strip the first
      # layer of directory. For example: src_dir contains a single file 'A'
      #
      #   push src_dir dest_dir
      #
      # If dest_dir exists, the resulting directory structure should be:
      #   dest_dir/src_dir/A
      # If dest_dir does not exist, the resulting directory structure should
      # be:
      #   dest_dir/A
      #
      # The remote path is built from the path relative to src, never from
      # the source path as typed: an absolute source would otherwise make
      # os.path.join() drop dst, and a nested relative source would recreate
      # its leading directories on the remote side.
      rel_root = os.path.relpath(root, src)
      if rel_root == os.curdir:
        rel_root = ''
      if dst_exists:
        dst_root = os.path.join(src_name, rel_root)
      else:
        dst_root = rel_root
      for name in files:
        _push(os.path.join(root, name),
              os.path.join(dst, dst_root, name))

  # Several sources can only be pushed into an already existing directory.
  if len(args.srcs) > 1:
    dst_type = self.CheckOutput('stat \'%s\' --printf \'%%F\' '
                                '2>/dev/null' % args.dst).strip()
    if not dst_type:
      raise RuntimeError('push: %s: No such file or directory' % args.dst)
    if dst_type != 'directory':
      raise RuntimeError('push: %s: Not a directory' % args.dst)

  for src in args.srcs:
    if not os.path.exists(src):
      raise RuntimeError('push: can not stat "%s": no such file or directory'
                         % src)
    if not os.access(src, os.R_OK):
      raise RuntimeError('push: can not open "%s" for reading' % src)

    _push_single_target(src, args.dst)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001032
@Command('pull', 'pull a file or directory from remote', [
    Arg('src', metavar='SOURCE'),
    Arg('dst', metavar='DESTINATION', default='.', nargs='?')])
def Pull(self, args):
  """Download a remote file, symlink or directory tree from the client.

  Args:
    args: parsed command line arguments. args.src is the path on the remote
        side (the listing command runs after `cd $HOME`) and args.dst is the
        local destination, '.' by default.

  Raises:
    RuntimeError: if the server answers a download request with an HTTP
        error.
  """
  self.CheckClient()

  @AutoRetry('pull', _RETRY_TIMES)
  def _pull(src, dst, ftype, perm=0o644, link=None):
    """Fetch remote entry `src` into local path `dst`.

    `ftype` is the find(1) '%y' type letter ('l' for a symbolic link), `perm`
    the permission bits applied to the local file and `link` the link target
    when the entry is a symbolic link.
    """
    try:
      os.makedirs(os.path.dirname(dst))
    except OSError:
      # Best effort: the directory usually exists already, or dst has no
      # directory part. A real failure surfaces when dst is created below.
      pass

    src_base = os.path.basename(src)

    # Remote file is a link
    if ftype == 'l':
      pbar = ProgressBar(src_base)
      # lexists() also detects a dangling symlink, which exists() reports as
      # missing and which would make os.symlink() fail with EEXIST.
      if os.path.lexists(dst):
        os.remove(dst)
      os.symlink(link, dst)
      pbar.End()
      return

    url = ('%s:%d/api/agent/download/%s?filename=%s' %
           (self._state.host, self._state.port, self._selected_mid,
            urllib2.quote(src)))
    try:
      h = self._UrlOpen(url)
    except urllib2.HTTPError as e:
      msg = json.loads(e.read()).get('error', 'unknown error')
      raise RuntimeError('pull: %s' % msg)
    except KeyboardInterrupt:
      return

    pbar = ProgressBar(src_base)
    # Binary mode: the payload is written exactly as it was received.
    with open(dst, 'wb') as f:
      os.fchmod(f.fileno(), perm)
      # A response without Content-Length is treated as "size unknown" (0)
      # instead of failing on int(None).
      total_size = int(h.headers.get('Content-Length') or 0)
      downloaded_size = 0

      while True:
        data = h.read(_BUFSIZ)
        if not data:
          break
        downloaded_size += len(data)
        if total_size:
          percent = float(downloaded_size) * 100 / total_size
        else:
          percent = 0
        pbar.SetProgress(percent, downloaded_size)
        f.write(data)
    pbar.End()

  # Use find to get a listing of all files under a root directory. The
  # '-printf' directive emits, for every regular file or symlink, one line
  # holding its mode, path, type letter and link target separated by tabs.
  output = self.CheckOutput(
      'cd $HOME; '
      'stat "%(src)s" >/dev/null && '
      'find "%(src)s" \'(\' -type f -o -type l \')\' '
      '-printf \'%%m\t%%p\t%%y\t%%l\n\''
      % {'src': args.src})

  # We got error from the stat command
  if output.startswith('stat: '):
    sys.stderr.write(output)
    return

  # Drop blank lines: an empty listing (for example an empty directory)
  # would otherwise produce a single '' entry that cannot be unpacked.
  entries = [line for line in output.strip('\n').split('\n') if line]
  if not entries:
    return

  common_prefix = os.path.dirname(args.src)

  if len(entries) == 1:
    perm, src_path, ftype, link = entries[0].split('\t')
    if os.path.isdir(args.dst):
      dst = os.path.join(args.dst, os.path.basename(src_path))
    else:
      dst = args.dst
    _pull(src_path, dst, ftype, int(perm, base=8), link)
    return

  # Several entries: when the local destination does not exist yet, the
  # remote top-level directory name is stripped so that its content lands
  # directly in args.dst.
  if not os.path.exists(args.dst):
    common_prefix = args.src

  for entry in entries:
    perm, src_path, ftype, link = entry.split('\t')
    rel_dst = src_path[len(common_prefix):].lstrip('/')
    _pull(src_path, os.path.join(args.dst, rel_dst), ftype,
          int(perm, base=8), link)
Wei-Ning Huang91aaeed2015-09-24 14:51:56 +08001118
@Command('forward', 'forward remote port to local port', [
    Arg('--list', dest='list_all', action='store_true', default=False,
        help='list all port forwarding sessions'),
    Arg('--remove', metavar='LOCAL_PORT', dest='remove', type=int,
        default=None,
        help='remove port forwarding for local port LOCAL_PORT'),
    Arg('--remove-all', dest='remove_all', action='store_true',
        default=False, help='remove all port forwarding'),
    Arg('remote', metavar='REMOTE_PORT', type=int, nargs='?'),
    Arg('local', metavar='LOCAL_PORT', type=int, nargs='?')])
def Forward(self, args):
  """List, remove or create port forwarding sessions.

  With --list the known sessions are printed; with --remove-all or --remove
  the request is handed to self._server. Otherwise a listening socket is
  bound on the local port and a child process is forked that relays every
  accepted connection through a websocket to the remote port of the selected
  client. The parent registers the child with self._server.AddForward().

  Args:
    args: parsed command line arguments (list_all, remove_all, remove,
        remote, local). args.local defaults to args.remote.

  Raises:
    RuntimeError: if a forwarding is requested without a remote port.
  """
  if args.list_all:
    max_len = 10
    if self._state.forwards:
      max_len = max(len(v[0]) for v in self._state.forwards.values())

    print('%-*s   %-8s  %-8s' % (max_len, 'Client', 'Remote', 'Local'))
    for local in sorted(self._state.forwards.keys()):
      value = self._state.forwards[local]
      print('%-*s   %-8s  %-8s' % (max_len, value[0], value[1], local))
    return

  if args.remove_all:
    self._server.RemoveAllForward()
    return

  # Compare against None: 0 is falsy but is still an explicit --remove value.
  if args.remove is not None:
    self._server.RemoveForward(args.remove)
    return

  self.CheckClient()

  if args.remote is None:
    raise RuntimeError('remote port not specified')

  if args.local is None:
    args.local = args.remote
  remote = int(args.remote)
  local = int(args.local)

  def HandleConnection(conn):
    """Relay one accepted local connection over a forwarding websocket."""
    headers = []
    if self._state.username is not None and self._state.password is not None:
      headers.append(BasicAuthHeader(self._state.username,
                                     self._state.password))

    scheme = 'ws%s://' % ('s' if self._state.ssl else '')
    ws = ForwarderWebSocketClient(
        conn,
        scheme + '%s:%d/api/agent/forward/%s?port=%d' %
        (self._state.host, self._state.port, self._selected_mid, remote),
        headers=headers)
    try:
      ws.connect()
      ws.run()
    except Exception as e:
      print('error: %s' % e)
    finally:
      ws.close()

  server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  server.bind(('0.0.0.0', local))
  server.listen(5)

  pid = os.fork()
  if pid == 0:
    # Child process: serve connections forever, one daemon thread each.
    while True:
      conn, unused_addr = server.accept()
      t = threading.Thread(target=HandleConnection, args=(conn,))
      t.daemon = True
      t.start()
  else:
    # Parent process: the child owns its own copy of the listening socket,
    # so the parent's descriptor is released before registering the session.
    server.close()
    self._server.AddForward(self._selected_mid, remote, local, pid)
1193
1194
def main():
  """Configure logging and jsonrpclib, then run the command line client.

  A keyboard interrupt or any other exception raised by the client is
  reported with a one-line message on standard output instead of a traceback.
  """
  # Root logger: INFO level, timestamped messages on a stream handler.
  log_handler = logging.StreamHandler()
  log_handler.setFormatter(
      logging.Formatter('%(asctime)s %(message)s', '%Y/%m/%d %H:%M:%S'))
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  root_logger.addHandler(log_handler)

  # Add DaemonState to the classes known to the jsonrpclib configuration.
  Config.instance().classes.add(DaemonState)

  client = OverlordCLIClient()
  try:
    client.Main()
  except KeyboardInterrupt:
    print('Ctrl-C received, abort')
  except Exception as err:
    print('error: %s' % err)


# Run the client only when this file is executed as a script.
if __name__ == '__main__':
  main()