overlord: implement port forwarding

Implement the port forwarding functionality by forwarding the port data
through WebSocket. The WebSocket HTTP path is:
/api/agent/forward/{mid}/?port=PORT

BUG=chromium:517520
TEST=manually

Change-Id: Id21422ff705dbae710e155e936b0873e4e2f2b3e
Reviewed-on: https://chromium-review.googlesource.com/301430
Commit-Ready: Wei-Ning Huang <wnhuang@chromium.org>
Tested-by: Wei-Ning Huang <wnhuang@chromium.org>
Reviewed-by: Hsu Wei-Cheng <mojahsu@chromium.org>
diff --git a/py/tools/ghost.py b/py/tools/ghost.py
index 30f7bd2..8c68ce2 100755
--- a/py/tools/ghost.py
+++ b/py/tools/ghost.py
@@ -298,7 +298,7 @@
   Ghost provide terminal/shell/logcat functionality and manages the client
   side connectivity.
   """
-  NONE, AGENT, TERMINAL, SHELL, LOGCAT, FILE = range(6)
+  NONE, AGENT, TERMINAL, SHELL, LOGCAT, FILE, FORWARD = range(7)
 
   MODE_NAME = {
       NONE: 'NONE',
@@ -306,14 +306,15 @@
       TERMINAL: 'Terminal',
       SHELL: 'Shell',
       LOGCAT: 'Logcat',
-      FILE: 'File'
+      FILE: 'File',
+      FORWARD: 'Forward'
       }
 
   RANDOM_MID = '##random_mid##'
 
   def __init__(self, overlord_addrs, mode=AGENT, mid=None, sid=None,
                prop_file=None, terminal_sid=None, tty_device=None,
-               command=None, file_op=None):
+               command=None, file_op=None, port=None):
     """Constructor.
 
     Args:
@@ -332,8 +333,10 @@
       file_op: a tuple (action, filepath, pid). action is either 'download' or
         'upload'. pid is the pid of the target shell, used to determine where
         the current working is and thus where to upload to.
+      port: port number to forward.
     """
-    assert mode in [Ghost.AGENT, Ghost.TERMINAL, Ghost.SHELL, Ghost.FILE]
+    assert mode in [Ghost.AGENT, Ghost.TERMINAL, Ghost.SHELL, Ghost.FILE,
+                    Ghost.FORWARD]
     if mode == Ghost.SHELL:
       assert command is not None
     if mode == Ghost.FILE:
@@ -365,6 +368,7 @@
     self._shell_command = command
     self._file_op = file_op
     self._download_queue = Queue.Queue()
+    self._port = port
 
     # SSH Forwarding related
     self._forward_ssh = False
@@ -463,7 +467,7 @@
         pass
 
   def SpawnGhost(self, mode, sid=None, terminal_sid=None, tty_device=None,
-                 command=None, file_op=None):
+                 command=None, file_op=None, port=None):
     """Spawn a child ghost with specific mode.
 
     Returns:
@@ -477,7 +481,7 @@
       self.CloseSockets()
       g = Ghost([self._connected_addr], mode, Ghost.RANDOM_MID, sid,
                 terminal_sid=terminal_sid, tty_device=tty_device,
-                command=command, file_op=file_op)
+                command=command, file_op=file_op, port=port)
       g.Start()
       sys.exit(0)
     else:
@@ -820,6 +824,45 @@
     logging.info('StartUploadServer: terminated')
     sys.exit(0)
 
+  def SpawnPortForwardServer(self, _):
+    """Spawn a port forwarding server and forward I/O to the TCP socket."""
+    logging.info('SpawnPortForwardServer: started')
+
+    src_sock = None
+    try:
+      src_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+      src_sock.connect(('localhost', self._port))
+      src_sock.setblocking(False)
+
+      # Pass the leftovers of the previous buffer
+      if self._buf:
+        src_sock.send(self._buf)
+        self._buf = ''
+
+      while True:
+        rd, _, _ = select.select([self._sock, src_sock], [], [])
+
+        if self._sock in rd:
+          data = self._sock.recv(_BUFSIZE)
+          if not data:
+            break
+          src_sock.send(data)
+
+        if src_sock in rd:
+          data = src_sock.recv(_BUFSIZE)
+          if not data:
+            break
+          self._sock.send(data)
+    except Exception as e:
+      logging.error('SpawnPortForwardServer: %s', e)
+    finally:
+      if src_sock:
+        src_sock.close()
+      self._sock.close()
+
+    logging.info('SpawnPortForwardServer: terminated')
+    sys.exit(0)
+
   def Ping(self):
     def timeout_handler(x):
       if x is None:
@@ -828,7 +871,6 @@
     self._last_ping = self.Timestamp()
     self.SendRequest('ping', {}, timeout_handler, 5)
 
-
   def HanldeFileDownloadRequest(self, msg):
     params = msg['params']
     try:
@@ -863,6 +905,9 @@
       self.SpawnGhost(self.FILE, params['sid'],
                       file_op=('upload', params['filename'], pid))
       self.SendResponse(msg, SUCCESS)
+    elif command == 'forward':
+      self.SpawnGhost(self.FORWARD, params['sid'], port=params['port'])
+      self.SendResponse(msg, SUCCESS)
 
   def HandleResponse(self, response):
     rid = str(response['rid'])
@@ -875,15 +920,21 @@
       print(response, self._requests.keys())
       logging.warning('Received unsolicited response, ignored')
 
-  def ParseMessage(self):
-    msgs_json = self._buf.split(_SEPARATOR)
-    self._buf = msgs_json.pop()
+  def ParseMessage(self, single=True):
+    if single:
+      index = self._buf.index(_SEPARATOR)
+      msgs_json = [self._buf[:index]]
+      self._buf = self._buf[index + 2:]
+    else:
+      msgs_json = self._buf.split(_SEPARATOR)
+      self._buf = msgs_json.pop()
 
     for msg_json in msgs_json:
       try:
         msg = json.loads(msg_json)
       except ValueError:
         # Ignore mal-formed message.
+        logging.error('mal-formed JSON request, ignored')
         continue
 
       if 'name' in msg:
@@ -891,7 +942,7 @@
       elif 'response' in msg:
         self.HandleResponse(msg)
       else:  # Ingnore mal-formed message.
-        pass
+        logging.error('mal-formed JSON request, ignored')
 
   def ScanForTimeoutRequests(self):
     """Scans for pending requests which have timed out.
@@ -921,7 +972,7 @@
 
         if self._sock in rds:
           self._buf += self._sock.recv(_BUFSIZE)
-          self.ParseMessage()
+          self.ParseMessage(self._register_status != SUCCESS)
 
         if (self._mode == self.AGENT and
             self.Timestamp() - self._last_ping > _PING_INTERVAL):
@@ -982,6 +1033,7 @@
             Ghost.TERMINAL: self.SpawnTTYServer,
             Ghost.SHELL: self.SpawnShellServer,
             Ghost.FILE: self.InitiateFileOperation,
+            Ghost.FORWARD: self.SpawnPortForwardServer,
             }[self._mode]
 
         # Machine ID may change if MAC address is used (USB-ethernet dongle