overlord: implement file upload function

Implement file upload functionality in overlord.  The user can drag and
drop files into the terminal window to upload. The file will be upload
into the current working directory of the active shell.

BUG=chromium:465674
TEST=manually drag and drop file into the terminal

Change-Id: I9c1c2538db07ccb5fc36c20c98bf2a1650cc99b3
Reviewed-on: https://chromium-review.googlesource.com/290485
Commit-Ready: Wei-Ning Huang <wnhuang@chromium.org>
Tested-by: Wei-Ning Huang <wnhuang@chromium.org>
Reviewed-by: Hsu Wei-Cheng <mojahsu@chromium.org>
diff --git a/py/tools/ghost.py b/py/tools/ghost.py
index ad4b5be..17faff0 100755
--- a/py/tools/ghost.py
+++ b/py/tools/ghost.py
@@ -93,8 +93,9 @@
         be set to the corresponding session id assigned by overlord.
       bid: browser ID. Identifies the browser which started the session.
       command: the command to execute when we are in SHELL mode.
-      file_op: a tuple (action, filepath). action is either 'download' or
-        'upload'.
+      file_op: a tuple (action, filepath, pid). action is either 'download' or
+        'upload'. pid is the pid of the target shell, used to determine where
+        the current working is and thus where to upload to.
     """
     assert mode in [Ghost.AGENT, Ghost.TERMINAL, Ghost.SHELL, Ghost.FILE]
     if mode == Ghost.SHELL:
@@ -119,7 +120,8 @@
     self._last_ping = 0
     self._queue = Queue.Queue()
     self._download_queue = Queue.Queue()
-    self._session_map = {}  # Stores the mapping between ttyname and browser_id
+    self._ttyname_to_bid = {}
+    self._terminal_sid_to_pid = {}
 
   def SetIgnoreChild(self, status):
     # Only ignore child for Agent since only it could spawn child Ghost.
@@ -329,7 +331,8 @@
 
     rid = str(uuid.uuid4())
     msg = {'rid': rid, 'timeout': timeout, 'name': name, 'params': args}
-    self._requests[rid] = [self.Timestamp(), timeout, handler]
+    if timeout >= 0:
+      self._requests[rid] = [self.Timestamp(), timeout, handler]
     self.SendMessage(msg)
 
   def SendResponse(self, omsg, status, params=None):
@@ -360,6 +363,7 @@
       try:
         server = GhostRPCServer()
         server.RegisterTTY(self._browser_id, ttyname)
+        server.RegisterSession(self._session_id, os.getpid())
       except Exception:
         # If ghost is launched without RPC server, the call will fail but we
         # can ignore it.
@@ -464,6 +468,11 @@
                        {'bid': self._browser_id,
                         'filename': os.path.basename(self._file_op[1]),
                         'size': size})
+    elif self._file_op[0] == 'upload':
+      self.SendRequest('clear_to_upload', {}, timeout=-1)
+      self.StartUploadServer()
+    else:
+      logging.error('InitiateFileOperation: unknown file operation, ignored')
 
   def StartDownloadServer(self):
     logging.info('StartDownloadServer: started')
@@ -483,6 +492,35 @@
     logging.info('StartDownloadServer: terminated')
     sys.exit(0)
 
+  def StartUploadServer(self):
+    logging.info('StartUploadServer: started')
+
+    try:
+      target_dir = os.getenv('HOME', '/tmp')
+
+      # Get the client's working dir, which is our target upload dir
+      if self._file_op[2]:
+        target_dir = os.readlink('/proc/%d/cwd' % self._file_op[2])
+
+      self._sock.setblocking(False)
+      with open(os.path.join(target_dir, self._file_op[1]), 'wb') as f:
+        while True:
+          rd, _, _ = select.select([self._sock], [], [])
+          if self._sock in rd:
+            buf = self._sock.recv(_BLOCK_SIZE)
+            if len(buf) == 0:
+              break
+            f.write(buf)
+    except socket.error as e:
+      logging.error('StartUploadServer: socket error: %s', e)
+    except Exception as e:
+      logging.error('StartUploadServer: %s', e)
+    finally:
+      self._sock.close()
+
+    logging.info('StartUploadServer: terminated')
+    sys.exit(0)
+
   def Ping(self):
     def timeout_handler(x):
       if x is None:
@@ -492,22 +530,28 @@
     self.SendRequest('ping', {}, timeout_handler, 5)
 
   def HandleRequest(self, msg):
-    if msg['name'] == 'upgrade':
+    command = msg['name']
+    params = msg['params']
+
+    if command == 'upgrade':
       self.Upgrade()
-    elif msg['name'] == 'terminal':
-      self.SpawnGhost(self.TERMINAL, msg['params']['sid'],
-                      bid=msg['params']['bid'])
+    elif command == 'terminal':
+      self.SpawnGhost(self.TERMINAL, params['sid'], bid=params['bid'])
       self.SendResponse(msg, RESPONSE_SUCCESS)
-    elif msg['name'] == 'shell':
-      self.SpawnGhost(self.SHELL, msg['params']['sid'],
-                      command=msg['params']['command'])
+    elif command == 'shell':
+      self.SpawnGhost(self.SHELL, params['sid'], command=params['command'])
       self.SendResponse(msg, RESPONSE_SUCCESS)
-    elif msg['name'] == 'file_download':
-      self.SpawnGhost(self.FILE, msg['params']['sid'],
-                      file_op=('download', msg['params']['filename']))
+    elif command == 'file_download':
+      self.SpawnGhost(self.FILE, params['sid'],
+                      file_op=('download', params['filename'], None))
       self.SendResponse(msg, RESPONSE_SUCCESS)
-    elif msg['name'] == 'clear_to_download':
+    elif command == 'clear_to_download':
       self.StartDownloadServer()
+    elif command == 'file_upload':
+      pid = self._terminal_sid_to_pid.get(params['terminal_sid'], None)
+      self.SpawnGhost(self.FILE, params['sid'],
+                      file_op=('upload', params['filename'], pid))
+      self.SendResponse(msg, RESPONSE_SUCCESS)
 
   def HandleResponse(self, response):
     rid = str(response['rid'])
@@ -550,7 +594,7 @@
 
   def InitiateDownload(self):
     ttyname, filename = self._download_queue.get()
-    bid = self._session_map[ttyname]
+    bid = self._ttyname_to_bid[ttyname]
     self.SpawnGhost(self.FILE, bid=bid, file_op=('download', filename))
 
   def Listen(self):
@@ -635,7 +679,10 @@
     self._download_queue.put((ttyname, filename))
 
   def RegisterTTY(self, browser_id, ttyname):
-    self._session_map[ttyname] = browser_id
+    self._ttyname_to_bid[ttyname] = browser_id
+
+  def RegisterSession(self, session_id, process_id):
+    self._terminal_sid_to_pid[session_id] = process_id
 
   def StartLanDiscovery(self):
     """Start to listen to LAN discovery packet at
@@ -687,6 +734,7 @@
                                      logRequests=False)
     rpc_server.register_function(self.Reconnect, 'Reconnect')
     rpc_server.register_function(self.RegisterTTY, 'RegisterTTY')
+    rpc_server.register_function(self.RegisterSession, 'RegisterSession')
     rpc_server.register_function(self.AddToDownloadQueue, 'AddToDownloadQueue')
     t = threading.Thread(target=rpc_server.serve_forever)
     t.daemon = True