crosperf: introducing new locking mechanism

In this patch, we modify the behavior of the crosperf locking system. We
no longer need to specify whether to use file locking or afe locking;
instead, the locking methods are all detected automatically.

We also introduced a new leasing mechanism for skylab duts.

Now, if a dut is:
 - in afe: use afe lock on it
 - in skylab: lease it via the skylab command
 - local: use file lock on it

When crosperf finishes or dies, we will also try to unlock the machines
that were locked by us.

There will be another CL to change the naming of afe_lock to general
locks and remove the unused afe local server.

BUG=chromium:984790
TEST=Tested crosperf with DUTs in Skylab quota pool, afe and locally.

Change-Id: I0a9bf4a16f54cfa0e8af077765fbe5a0a39e7c88
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/1793911
Reviewed-by: Caroline Tice <cmtice@chromium.org>
Tested-by: Zhizhou Yang <zhizhouy@google.com>
Auto-Submit: Zhizhou Yang <zhizhouy@google.com>
Legacy-Commit-Queue: Commit Bot <commit-bot@chromium.org>
Commit-Queue: ChromeOS CL Exonerator Bot <chromiumos-cl-exonerator@appspot.gserviceaccount.com>
diff --git a/afe_lock_machine.py b/afe_lock_machine.py
index f83e897..a273a79 100755
--- a/afe_lock_machine.py
+++ b/afe_lock_machine.py
@@ -1,6 +1,9 @@
 #!/usr/bin/env python2
-#
-# Copyright 2015 Google INc.  All Rights Reserved.
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
 """This module controls locking and unlocking of test machines."""
 
 from __future__ import print_function
@@ -11,6 +14,9 @@
 import sys
 import traceback
 
+import file_lock_machine
+
+from cros_utils import command_executer
 from cros_utils import logger
 from cros_utils import machines
 
@@ -75,13 +81,20 @@
   """
 
   LOCAL_SERVER = 'chrotomation2.svl.corp.google.com'
+  SKYLAB_PATH = '/usr/local/bin/skylab'
+  LEASE_MINS = 600
+  SKYLAB_CREDENTIAL = '/usr/local/google/home/mobiletc-prebuild/' \
+                      'chromeos-swarming-1adbe355c97c.json'
+  SWARMING = 'chromite/third_party/swarming.client/swarming.py'
+  SUCCESS = 0
 
   def __init__(self,
                remotes,
                force_option,
                chromeos_root,
                local_server,
-               use_local=True,
+               locks_dir='',
+               use_local=False,
                log=None):
     """Initializes an AFELockManager object.
 
@@ -95,6 +108,7 @@
       local_server: A string containing the name or ip address of the machine
         that is running an AFE server, which is to be used for managing
         machines that are not in the ChromeOS HW lab.
+      locks_dir: A directory used for file locking local devices.
       local: A Boolean indicating whether or not to use/allow a local AFE
         server to be used (see local_server argument).
       use_local: Use the local server instead of the official one.
@@ -105,6 +119,7 @@
     self.chromeos_root = chromeos_root
     self.user = getpass.getuser()
     self.logger = log or logger.GetLogger()
+    self.ce = command_executer.GetCommandExecuter(self.logger)
     autotest_path = os.path.join(chromeos_root,
                                  'src/third_party/autotest/files')
 
@@ -112,6 +127,8 @@
     sys.path.append(autotest_path)
     sys.path.append(os.path.join(autotest_path, 'server', 'cros'))
 
+    self.locks_dir = locks_dir
+
     # We have to wait to do these imports until the paths above have
     # been fixed.
     # pylint: disable=import-error
@@ -145,6 +162,9 @@
       self.machines = self.toolchain_lab_machines + self.GetAllNonlabMachines()
     self.force = force_option
 
+    self.local_machines = []
+    self.skylab_machines = []
+
   def AllLabMachines(self):
     """Check to see if all machines being used are HW Lab machines."""
     all_lab = True
@@ -229,6 +249,24 @@
       print('\nMachine (Board)\t\tStatus')
       print('---------------\t\t------\n')
 
+  def AddMachineToLocal(self, machine):
+    """Adds a machine to local machine list.
+
+    Args:
+      machine: The machine to be added.
+    """
+    if machine not in self.local_machines:
+      self.local_machines.append(machine)
+
+  def AddMachineToSkylab(self, machine):
+    """Adds a machine to skylab machine list.
+
+    Args:
+      machine: The machine to be added.
+    """
+    if machine not in self.skylab_machines:
+      self.skylab_machines.append(machine)
+
   def RemoveLocalMachine(self, m):
     """Removes a machine from the local AFE server.
 
@@ -360,15 +398,12 @@
         or unlock the machine (False).
       machine: The machine to update.
 
-    Raises:
-      LockingError:  An error occurred while attempting to update the machine
-        state.
+    Returns:
+      True if requested action succeeded, else False.
     """
-    action = 'lock'
-    if not should_lock_machine:
-      action = 'unlock'
     kwargs = {'locked': should_lock_machine}
-    kwargs['lock_reason'] = 'toolchain user request (%s)' % self.user
+    if should_lock_machine:
+      kwargs['lock_reason'] = 'toolchain user request (%s)' % self.user
 
     cros_name = machine + '.cros'
     if cros_name in self.toolchain_lab_machines:
@@ -385,9 +420,50 @@
           'modify_hosts',
           host_filter_data={'hostname__in': [m]},
           update_data=kwargs)
-    except Exception as e:
-      traceback.print_exc()
-      raise LockingError('Unable to %s machine %s. %s' % (action, m, str(e)))
+    except Exception:
+      return False
+    return True
+
+  def UpdateLockInSkylab(self, should_lock_machine, machine):
+    """Ask skylab to lease/release a machine.
+
+    Args:
+      should_lock_machine: Boolean indicating whether to lock the machine (True)
+        or unlock the machine (False).
+      machine: The machine to update.
+
+    Returns:
+      True if requested action succeeded, else False.
+    """
+    try:
+      if should_lock_machine:
+        ret = self.LeaseSkylabMachine(machine)
+      else:
+        ret = self.ReleaseSkylabMachine(machine)
+    except Exception:
+      return False
+    return ret
+
+  def UpdateFileLock(self, should_lock_machine, machine):
+    """Use file lock for local machines,
+
+    Args:
+      should_lock_machine: Boolean indicating whether to lock the machine (True)
+        or unlock the machine (False).
+      machine: The machine to update.
+
+    Returns:
+      True if requested action succeeded, else False.
+    """
+    try:
+      if should_lock_machine:
+        ret = file_lock_machine.Machine(machine, self.locks_dir).Lock(
+            True, sys.argv[0])
+      else:
+        ret = file_lock_machine.Machine(machine, self.locks_dir).Unlock(True)
+    except Exception:
+      return False
+    return ret
 
   def UpdateMachines(self, lock_machines):
     """Sets the locked state of the machines to the requested value.
@@ -403,16 +479,29 @@
       A list of the machines whose state was successfully updated.
     """
     updated_machines = []
+    action = 'Locking' if lock_machines else 'Unlocking'
     for m in self.machines:
-      self.UpdateLockInAFE(lock_machines, m)
-      # Since we returned from self.UpdateLockInAFE we assume the request
-      # succeeded.
-      if lock_machines:
-        self.logger.LogOutput('Locked machine(s) %s.' % m)
+      # TODO(zhizhouy): Handling exceptions with more details when locking
+      # doesn't succeed.
+      machine_type = 'afe'
+      if m in self.skylab_machines:
+        ret = self.UpdateLockInSkylab(lock_machines, m)
+        machine_type = 'skylab'
+      elif m in self.local_machines:
+        ret = self.UpdateFileLock(lock_machines, m)
+        machine_type = 'local'
       else:
-        self.logger.LogOutput('Unlocked machine(s) %s.' % m)
-      updated_machines.append(m)
+        ret = self.UpdateLockInAFE(lock_machines, m)
 
+      if ret:
+        self.logger.LogOutput(
+            '%s %s machine succeeded: %s.' % (action, machine_type, m))
+        updated_machines.append(m)
+      else:
+        self.logger.LogOutput(
+            '%s %s machine failed: %s.' % (action, machine_type, m))
+
+    self.machines = updated_machines
     return updated_machines
 
   def _InternalRemoveMachine(self, machine):
@@ -454,7 +543,8 @@
                                  '(%s).' % k)
           self._InternalRemoveMachine(k)
 
-        if state['locked'] and state['locked_by'] != self.user:
+        if state['locked'] and 'locked_by' in state and \
+          state['locked_by'] != self.user:
           raise DontOwnLock('Attempt to unlock machine (%s) locked by someone '
                             'else (%s).' % (k, state['locked_by']))
       elif cmd == 'lock':
@@ -507,6 +597,14 @@
 
     machine_list = {}
     for m in self.machines:
+      # For local or skylab machines, we simply set {'locked': status} for them
+      # TODO(zhizhouy): This is a quick fix since skylab cannot return host info
+      # as afe does. We need to get more info such as locked_by when skylab
+      # supports that.
+      if m in self.local_machines or m in self.skylab_machines:
+        machine_list[m] = {'locked': 0 if cmd == 'lock' else 1}
+        continue
+
       host_info = None
       cros_name = m + '.cros'
       if (m in self.toolchain_lab_machines or
@@ -538,6 +636,71 @@
         machine_list[m] = {}
     return machine_list
 
+  def CheckMachineInSkylab(self, machine):
+    """Run command to check if machine is in Skylab or not.
+
+    Returns:
+      True if machine in skylab, else False
+    """
+    credential = ''
+    if os.path.exists(self.SKYLAB_CREDENTIAL):
+      credential = '--auth-service-account-json %s' % self.SKYLAB_CREDENTIAL
+    swarming = os.path.join(self.chromeos_root, self.SWARMING)
+    cmd = (('%s query --swarming https://chromeos-swarming.appspot.com ' \
+            "%s 'bots/list?is_dead=FALSE&dimensions=dut_name:%s'") % \
+           (swarming,
+            credential,
+            machine.rstrip('.cros')))
+    ret_tup = self.ce.RunCommandWOutput(cmd)
+    # The command will return a json output as stdout. If machine not in skylab
+    # stdout will look like this:
+    #  {
+    #    "death_timeout": "600",
+    #    "now": "TIMESTAMP"
+    #  }
+    # Otherwise there will be a tuple starting with 'items', we simply detect
+    # this keyword for result.
+    if 'items' not in ret_tup[1]:
+      return False
+    else:
+      return True
+
+  def LeaseSkylabMachine(self, machine):
+    """Run command to lease dut from skylab.
+
+    Returns:
+      True if succeeded, False if failed.
+    """
+    credential = ''
+    if os.path.exists(self.SKYLAB_CREDENTIAL):
+      credential = '-service-account-json %s' % self.SKYLAB_CREDENTIAL
+    cmd = (('%s lease-dut -minutes %s %s %s') % \
+           (self.SKYLAB_PATH,
+            self.LEASE_MINS,
+            credential,
+            machine.rstrip('.cros')))
+    # Wait 120 seconds for server to start the lease task, if not started,
+    # we will treat it as unavailable.
+    check_interval_time = 120
+    retval = self.ce.RunCommand(cmd, command_timeout=check_interval_time)
+    return retval == self.SUCCESS
+
+  def ReleaseSkylabMachine(self, machine):
+    """Run command to release dut from skylab.
+
+    Returns:
+      True if succeeded, False if failed.
+    """
+    credential = ''
+    if os.path.exists(self.SKYLAB_CREDENTIAL):
+      credential = '-service-account-json %s' % self.SKYLAB_CREDENTIAL
+    cmd = (('%s release-dut %s %s') % \
+           (self.SKYLAB_PATH,
+            credential,
+            machine.rstrip('.cros')))
+    retval = self.ce.RunCommand(cmd)
+    return retval == self.SUCCESS
+
 
 def Main(argv):
   """Parse the options, initialize lock manager and dispatch proper method.