blob: 77e1b550897f4ba8db03d617608556e1d905b386 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
mbligh36768f02008-02-22 18:28:33 +00002"""
3Autotest scheduler
4"""
showard909c7a62008-07-15 21:52:38 +00005
Dan Shif6c65bd2014-08-29 16:15:07 -07006import datetime
7import gc
8import logging
9import optparse
10import os
11import signal
12import sys
13import time
showard402934a2009-12-21 22:20:47 +000014
Alex Miller05d7b4c2013-03-04 07:49:38 -080015import common
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
Dan Shiec1d47d2015-02-13 11:38:13 -080020from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070021from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070022from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080023from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070025from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
27from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000031from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080032from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070034from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080035from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070036from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080037from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080038
Dan Shicf2e8dd2015-05-07 17:18:48 -070039
showard549afad2009-08-20 23:33:36 +000040BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
41PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000042
mbligh36768f02008-02-22 18:28:33 +000043RESULTS_DIR = '.'
mbligh36768f02008-02-22 18:28:33 +000044AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
45
46if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000047 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000048AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
49AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
50
51if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000052 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000053
showard35162b02009-03-03 02:17:30 +000054# error message to leave in results dir when an autoserv process disappears
55# mysteriously
56_LOST_PROCESS_ERROR = """\
57Autoserv failed abnormally during execution for this job, probably due to a
58system error on the Autotest server. Full results may not be available. Sorry.
59"""
60
# Database connection manager and raw connection; both bound in initialize().
_db_manager = None
_db = None
# Flipped to True by handle_signal() to make the main loop exit.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# DroneManager singleton, bound in initialize_globals().
_drone_manager = None
# When True, the dispatcher acquires and releases hosts against jobs inline
# with each tick (see BaseDispatcher.__init__'s _host_scheduler choice).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

# NOTE(review): presumably 'ssp' = server-side packaging containers for
# autoserv -- confirm against the AUTOSERV config section documentation.
_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are enabled but no default exists."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise scheduler_lib.SchedulerError(
            'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000087
88
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Fails fast (raises SchedulerError) rather than letting the scheduler
    # start with an inconsistent drone-set configuration.
    _verify_default_drone_set_exists()
92
93
def main():
    """Entry point: run the scheduler and log any escaping exception.

    SystemExit propagates untouched; every other exception is logged
    before being re-raised.  The scheduler pid file is removed on the way
    out regardless of how the process terminates.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000105
106
def main_without_exception_handling():
    """Parse options, start support services, and run the dispatcher loop.

    Blocks until a shutdown signal flips _shutdown or the status server
    requests shutdown, then tears everything down (metadata reporter,
    queued emails, status server, drone manager, DB connection).
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        # Main loop: one dispatcher tick per iteration, padded with sleep so
        # each tick takes at least minimum_tick_sec.
        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000194
195
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: ask the main loop to exit after this tick.

    @param signum: number of the received signal (unused).
    @param frame: interrupted stack frame (unused).
    """
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000200
201
def initialize():
    """One-time scheduler start-up.

    Writes the pid file (exiting if another monitor_db already holds it),
    connects to the database, installs signal handlers, and initializes
    the drone manager with the configured drone and results hosts.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, otherwise from the
    # comma-separated 'drones' config value.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000246
247
def initialize_globals():
    """Bind the module-level _drone_manager to the shared DroneManager
    singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
251
252
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                          verbose=True):
    """
    Build the autoserv command line for a job or special task.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns The autoserv command line as a list of executable + parameters.
    """
    command = autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose)
    return command
showard87ba02a2009-04-20 19:37:32 +0000270
271
Simran Basia858a232012-08-21 11:04:37 -0700272class BaseDispatcher(object):
Alex Miller05d7b4c2013-03-04 07:49:38 -0800273
274
    def __init__(self):
        # Active Agent objects, one per in-flight task group.
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        # Maps host id -> set of Agents, and queue entry id -> set of Agents;
        # maintained by _register_agent_for_ids/_unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between garbage-collector stat reports, in seconds
        # (config value is in minutes).
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Verbose logging knobs used by _log_tick_msg / _log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
306
mbligh36768f02008-02-22 18:28:33 +0000307
    def initialize(self, recover_hosts=True):
        """Prepare the dispatcher for its first tick.

        @param recover_hosts: when True, also attempt host recovery after
                process recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000321
322
Simran Basi0ec94dd2012-08-28 09:50:10 -0700323 def _log_tick_msg(self, msg):
324 if self._tick_debug:
325 logging.debug(msg)
326
327
Simran Basidef92872012-09-20 13:34:34 -0700328 def _log_extra_msg(self, msg):
329 if self._extra_debugging:
330 logging.debug(msg)
331
332
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        # Clear Django's per-request query log so it doesn't grow without
        # bound in this long-running process.
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000382
showard97aed502008-11-04 02:01:24 +0000383
mblighf3294cc2009-04-08 21:17:38 +0000384 def _run_cleanup(self):
385 self._periodic_cleanup.run_cleanup_maybe()
386 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000387
mbligh36768f02008-02-22 18:28:33 +0000388
showardf13a9e22009-12-18 22:54:09 +0000389 def _garbage_collection(self):
390 threshold_time = time.time() - self._seconds_between_garbage_stats
391 if threshold_time < self._last_garbage_stats_time:
392 # Don't generate these reports very often.
393 return
394
395 self._last_garbage_stats_time = time.time()
396 # Force a full level 0 collection (because we can, it doesn't hurt
397 # at this interval).
398 gc.collect()
399 logging.info('Logging garbage collector stats on tick %d.',
400 self._tick_count)
401 gc_stats._log_garbage_collector_stats()
402
403
showard170873e2009-01-07 00:22:26 +0000404 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
405 for object_id in object_ids:
406 agent_dict.setdefault(object_id, set()).add(agent)
407
408
409 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
410 for object_id in object_ids:
411 assert object_id in agent_dict
412 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700413 # If an ID has no more active agent associated, there is no need to
414 # keep it in the dictionary. Otherwise, scheduler will keep an
415 # unnecessarily big dictionary until being restarted.
416 if not agent_dict[object_id]:
417 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000418
419
showardd1195652009-12-08 22:21:02 +0000420 def add_agent_task(self, agent_task):
beeps8bb1f7d2013-08-05 01:30:09 -0700421 """
422 Creates and adds an agent to the dispatchers list.
423
424 In creating the agent we also pass on all the queue_entry_ids and
425 host_ids from the special agent task. For every agent we create, we
426 add it to 1. a dict against the queue_entry_ids given to it 2. A dict
427 against the host_ids given to it. So theoritically, a host can have any
428 number of agents associated with it, and each of them can have any
429 special agent task, though in practice we never see > 1 agent/task per
430 host at any time.
431
432 @param agent_task: A SpecialTask for the agent to manage.
433 """
showardd1195652009-12-08 22:21:02 +0000434 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000435 self._agents.append(agent)
436 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000437 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
438 self._register_agent_for_ids(self._queue_entry_agents,
439 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000440
showard170873e2009-01-07 00:22:26 +0000441
442 def get_agents_for_entry(self, queue_entry):
443 """
444 Find agents corresponding to the specified queue_entry.
445 """
showardd3dc1992009-04-22 21:01:40 +0000446 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000447
448
449 def host_has_agent(self, host):
450 """
451 Determine if there is currently an Agent present using this host.
452 """
453 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000454
455
jadmanski0afbb632008-06-06 21:10:57 +0000456 def remove_agent(self, agent):
457 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000458 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
459 agent)
460 self._unregister_agent_for_ids(self._queue_entry_agents,
461 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000462
463
showard8cc058f2009-09-08 16:26:33 +0000464 def _host_has_scheduled_special_task(self, host):
465 return bool(models.SpecialTask.objects.filter(host__id=host.id,
466 is_active=False,
467 is_complete=False))
468
469
    def _recover_processes(self):
        """Re-attach to autoserv processes that survived a scheduler restart.

        The call sequence is order-dependent: pidfiles are registered
        before the drone refresh, tasks are recovered from the refreshed
        state, and drones are reinitialized only after orphan handling.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000482
showard170873e2009-01-07 00:22:26 +0000483
showardd1195652009-12-08 22:21:02 +0000484 def _create_recovery_agent_tasks(self):
485 return (self._get_queue_entry_agent_tasks()
486 + self._get_special_task_agent_tasks(is_active=True))
487
488
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            # One AgentTask covers the entry's whole synchronous group, so
            # mark every entry in the group as consumed.
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000527
528
showardd1195652009-12-08 22:21:02 +0000529 def _get_special_task_agent_tasks(self, is_active=False):
530 special_tasks = models.SpecialTask.objects.filter(
531 is_active=is_active, is_complete=False)
532 return [self._get_agent_task_for_special_task(task)
533 for task in special_tasks]
534
535
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status is not one an
                AgentTask handles directly.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000561
562
563 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000564 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
565 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000566 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000567 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000568 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000569 if using_host:
showardd1195652009-12-08 22:21:02 +0000570 self._assert_host_has_no_agent(task_entry)
571
572
573 def _assert_host_has_no_agent(self, entry):
574 """
575 @param entry: a HostQueueEntry or a SpecialTask
576 """
577 if self.host_has_agent(entry.host):
578 agent = tuple(self._host_agents.get(entry.host.id))[0]
Prashanth B0e960282014-05-13 19:38:28 -0700579 raise scheduler_lib.SchedulerError(
showardd1195652009-12-08 22:21:02 +0000580 'While scheduling %s, host %s already has a host agent %s'
581 % (entry, entry.host, agent.task))
582
583
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as it's queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches
                special_task.task.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the TASK_TYPE class attribute each SpecialAgentTask
        # subclass declares.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000614
615
616 def _register_pidfiles(self, agent_tasks):
617 for agent_task in agent_tasks:
618 agent_task.register_necessary_pidfiles()
619
620
621 def _recover_tasks(self, agent_tasks):
622 orphans = _drone_manager.get_orphaned_autoserv_processes()
623
624 for agent_task in agent_tasks:
625 agent_task.recover()
626 if agent_task.monitor and agent_task.monitor.has_process():
627 orphans.discard(agent_task.monitor.get_process())
628 self.add_agent_task(agent_task)
629
630 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000631
632
showard8cc058f2009-09-08 16:26:33 +0000633 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000634 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
635 % status):
showard0db3d432009-10-12 20:29:15 +0000636 if entry.status == status and not self.get_agents_for_entry(entry):
637 # The status can change during iteration, e.g., if job.run()
638 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000639 yield entry
640
641
showard6878e8b2009-07-20 22:37:45 +0000642 def _check_for_remaining_orphan_processes(self, orphans):
643 if not orphans:
644 return
645 subject = 'Unrecovered orphan autoserv processes remain'
646 message = '\n'.join(str(process) for process in orphans)
647 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000648
649 die_on_orphans = global_config.global_config.get_config_value(
650 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
651
652 if die_on_orphans:
653 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000654
showard170873e2009-01-07 00:22:26 +0000655
showard8cc058f2009-09-08 16:26:33 +0000656 def _recover_pending_entries(self):
657 for entry in self._get_unassigned_entries(
658 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000659 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000660 entry.on_pending()
661
662
showardb8900452009-10-12 20:31:01 +0000663 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000664 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000665 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
666 unrecovered_hqes = []
667 for queue_entry in queue_entries:
668 special_tasks = models.SpecialTask.objects.filter(
669 task__in=(models.SpecialTask.Task.CLEANUP,
670 models.SpecialTask.Task.VERIFY),
671 queue_entry__id=queue_entry.id,
672 is_complete=False)
673 if special_tasks.count() == 0:
674 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000675
showardb8900452009-10-12 20:31:01 +0000676 if unrecovered_hqes:
677 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700678 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000679 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000680 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000681
682
showard65db3932009-10-28 19:54:35 +0000683 def _schedule_special_tasks(self):
684 """
685 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700686
687 Special tasks include PreJobTasks like verify, reset and cleanup.
688 They are created through _schedule_new_jobs and associated with a hqe
689 This method translates SpecialTasks to the appropriate AgentTask and
690 adds them to the dispatchers agents list, so _handle_agents can execute
691 them.
showard65db3932009-10-28 19:54:35 +0000692 """
Prashanth B4ec98672014-05-15 10:44:54 -0700693 # When the host scheduler is responsible for acquisition we only want
694 # to run tasks with leased hosts. All hqe tasks will already have
695 # leased hosts, and we don't want to run frontend tasks till the host
696 # scheduler has vetted the assignment. Note that this doesn't include
697 # frontend tasks with hosts leased by other active hqes.
698 for task in self._job_query_manager.get_prioritized_special_tasks(
699 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000700 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000701 continue
showardd1195652009-12-08 22:21:02 +0000702 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000703
704
showard170873e2009-01-07 00:22:26 +0000705 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000706 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000707 # should never happen
showarded2afea2009-07-07 20:54:07 +0000708 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000709 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000710 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700711 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000712 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000713
714
jadmanski0afbb632008-06-06 21:10:57 +0000715 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000716 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700717 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000718 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000719 if self.host_has_agent(host):
720 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000721 continue
showard8cc058f2009-09-08 16:26:33 +0000722 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700723 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000724 continue
showard170873e2009-01-07 00:22:26 +0000725 if print_message:
showardb18134f2009-03-20 20:52:18 +0000726 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000727 models.SpecialTask.objects.create(
728 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000729 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000730
731
jadmanski0afbb632008-06-06 21:10:57 +0000732 def _recover_hosts(self):
733 # recover "Repair Failed" hosts
734 message = 'Reverifying dead host %s'
735 self._reverify_hosts_where("status = 'Repair Failed'",
736 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000737
738
showard89f84db2009-03-12 20:39:13 +0000739 def _refresh_pending_queue_entries(self):
740 """
741 Lookup the pending HostQueueEntries and call our HostScheduler
742 refresh() method given that list. Return the list.
743
744 @returns A list of pending HostQueueEntries sorted in priority order.
745 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700746 queue_entries = self._job_query_manager.get_pending_queue_entries(
747 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000748 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000749 return []
showard89f84db2009-03-12 20:39:13 +0000750 return queue_entries
751
752
showarda9545c02009-12-18 22:44:26 +0000753 def _schedule_hostless_job(self, queue_entry):
beepscc9fc702013-12-02 12:45:38 -0800754 """Schedule a hostless (suite) job.
755
756 @param queue_entry: The queue_entry representing the hostless job.
757 """
showarda9545c02009-12-18 22:44:26 +0000758 self.add_agent_task(HostlessQueueTask(queue_entry))
Jakob Juelichd615a1e2014-09-04 11:48:36 -0700759
760 # Need to set execution_subdir before setting the status:
761 # After a restart of the scheduler, agents will be restored for HQEs in
762 # Starting, Running, Gathering, Parsing or Archiving. To do this, the
763 # execution_subdir is needed. Therefore it must be set before entering
764 # one of these states.
765 # Otherwise, if the scheduler was interrupted between setting the status
766 # and the execution_subdir, upon it's restart restoring agents would
767 # fail.
768 # Is there a way to get a status in one of these states without going
769 # through this code? Following cases are possible:
770 # - If it's aborted before being started:
771 # active bit will be 0, so there's nothing to parse, it will just be
772 # set to completed by _find_aborting. Critical statuses are skipped.
773 # - If it's aborted or it fails after being started:
774 # It was started, so this code was executed.
775 queue_entry.update_field('execution_subdir', 'hostless')
jamesren47bd7372010-03-13 00:58:17 +0000776 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000777
778
beepscc9fc702013-12-02 12:45:38 -0800779 def _schedule_host_job(self, host, queue_entry):
780 """Schedules a job on the given host.
781
782 1. Assign the host to the hqe, if it isn't already assigned.
783 2. Create a SpecialAgentTask for the hqe.
784 3. Activate the hqe.
785
786 @param queue_entry: The job to schedule.
787 @param host: The host to schedule the job on.
788 """
789 if self.host_has_agent(host):
790 host_agent_task = list(self._host_agents.get(host.id))[0].task
791 subject = 'Host with agents assigned to an HQE'
792 message = ('HQE: %s assigned host %s, but the host has '
793 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800794 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800795 (queue_entry, host.hostname, host_agent_task,
796 host_agent_task.queue_entry))
797 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800798 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700799 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800800
801
showard89f84db2009-03-12 20:39:13 +0000802 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700803 """
804 Find any new HQEs and call schedule_pre_job_tasks for it.
805
806 This involves setting the status of the HQE and creating a row in the
807 db corresponding the the special task, through
808 scheduler_models._queue_special_task. The new db row is then added as
809 an agent to the dispatcher through _schedule_special_tasks and
810 scheduled for execution on the drone through _handle_agents.
811 """
showard89f84db2009-03-12 20:39:13 +0000812 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000813
beepscc9fc702013-12-02 12:45:38 -0800814 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700815 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700816 new_jobs_with_hosts = 0
817 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800818 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700819 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000820
beepscc9fc702013-12-02 12:45:38 -0800821 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000822 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000823 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700824 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000825 else:
beepscc9fc702013-12-02 12:45:38 -0800826 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700827 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700828
Gabe Black1e1c41b2015-02-04 23:55:15 -0800829 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800830 if not host_jobs:
831 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700832 if not _inline_host_acquisition:
833 message = ('Found %s jobs that need hosts though '
834 '_inline_host_acquisition=%s. Will acquire hosts.' %
835 ([str(job) for job in host_jobs],
836 _inline_host_acquisition))
837 email_manager.manager.enqueue_notify_email(
838 'Processing unexpected host acquisition requests', message)
839 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
840 for host_assignment in jobs_with_hosts:
841 self._schedule_host_job(host_assignment.host, host_assignment.job)
842 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800843
Gabe Black1e1c41b2015-02-04 23:55:15 -0800844 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
845 new_jobs_with_hosts)
846 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
847 new_jobs_need_hosts -
848 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000849
850
showard8cc058f2009-09-08 16:26:33 +0000851 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700852 """
853 Adds agents to the dispatcher.
854
855 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
856 QueueTask for example, will have a job with a control file, and
857 the agent will have methods that poll, abort and check if the queue
858 task is finished. The dispatcher runs the agent_task, as well as
859 other agents in it's _agents member, through _handle_agents, by
860 calling the Agents tick().
861
862 This method creates an agent for each HQE in one of (starting, running,
863 gathering, parsing, archiving) states, and adds it to the dispatcher so
864 it is handled by _handle_agents.
865 """
showardd1195652009-12-08 22:21:02 +0000866 for agent_task in self._get_queue_entry_agent_tasks():
867 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000868
869
870 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000871 for entry in scheduler_models.HostQueueEntry.fetch(
872 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000873 task = entry.job.schedule_delayed_callback_task(entry)
874 if task:
showardd1195652009-12-08 22:21:02 +0000875 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000876
877
jadmanski0afbb632008-06-06 21:10:57 +0000878 def _find_aborting(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700879 """
880 Looks through the afe_host_queue_entries for an aborted entry.
881
882 The aborted bit is set on an HQE in many ways, the most common
883 being when a user requests an abort through the frontend, which
884 results in an rpc from the afe to abort_host_queue_entries.
885 """
jamesrene7c65cb2010-06-08 20:38:10 +0000886 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +0000887 for entry in scheduler_models.HostQueueEntry.fetch(
Alex Miller9fe39662013-08-09 11:53:09 -0700888 where='aborted=1 and complete=0'):
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800889
890 # If the job is running on a shard, let the shard handle aborting
891 # it and sync back the right status.
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -0800892 if entry.job.shard_id is not None and not server_utils.is_shard():
Prashanth Balasubramanian047e1c52014-12-22 19:25:23 -0800893 logging.info('Waiting for shard %s to abort hqe %s',
894 entry.job.shard_id, entry)
895 continue
896
showardf4a2e502009-07-28 20:06:39 +0000897 logging.info('Aborting %s', entry)
beepse50d8752013-11-20 18:23:02 -0800898
899 # The task would have started off with both is_complete and
900 # is_active = False. Aborted tasks are neither active nor complete.
901 # For all currently active tasks this will happen through the agent,
902 # but we need to manually update the special tasks that haven't
903 # started yet, because they don't have agents.
904 models.SpecialTask.objects.filter(is_active=False,
905 queue_entry_id=entry.id).update(is_complete=True)
906
showardd3dc1992009-04-22 21:01:40 +0000907 for agent in self.get_agents_for_entry(entry):
908 agent.abort()
909 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +0000910 jobs_to_stop.add(entry.job)
Simran Basi3f6717d2012-09-13 15:21:22 -0700911 logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
jamesrene7c65cb2010-06-08 20:38:10 +0000912 for job in jobs_to_stop:
913 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000914
915
beeps8bb1f7d2013-08-05 01:30:09 -0700916 def _find_aborted_special_tasks(self):
917 """
918 Find SpecialTasks that have been marked for abortion.
919
920 Poll the database looking for SpecialTasks that are active
921 and have been marked for abortion, then abort them.
922 """
923
924 # The completed and active bits are very important when it comes
925 # to scheduler correctness. The active bit is set through the prolog
926 # of a special task, and reset through the cleanup method of the
927 # SpecialAgentTask. The cleanup is called both through the abort and
928 # epilog. The complete bit is set in several places, and in general
929 # a hanging job will have is_active=1 is_complete=0, while a special
930 # task which completed will have is_active=0 is_complete=1. To check
931 # aborts we directly check active because the complete bit is set in
932 # several places, including the epilog of agent tasks.
933 aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
934 is_aborted=True)
935 for task in aborted_tasks:
936 # There are 2 ways to get the agent associated with a task,
937 # through the host and through the hqe. A special task
938 # always needs a host, but doesn't always need a hqe.
939 for agent in self._host_agents.get(task.host.id, []):
beeps5e2bb4a2013-10-28 11:26:45 -0700940 if isinstance(agent.task, agent_task.SpecialAgentTask):
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000941
beeps8bb1f7d2013-08-05 01:30:09 -0700942 # The epilog preforms critical actions such as
943 # queueing the next SpecialTask, requeuing the
944 # hqe etc, however it doesn't actually kill the
945 # monitor process and set the 'done' bit. Epilogs
946 # assume that the job failed, and that the monitor
947 # process has already written an exit code. The
948 # done bit is a necessary condition for
949 # _handle_agents to schedule any more special
950 # tasks against the host, and it must be set
951 # in addition to is_active, is_complete and success.
952 agent.task.epilog()
Prashanth Bf47a6bb2014-08-29 18:07:06 +0000953 agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700954
955
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700956 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000957 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000958 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000959 return True
960 # don't allow any nonzero-process agents to run after we've reached a
961 # limit (this avoids starvation of many-process agents)
962 if have_reached_limit:
963 return False
964 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000965 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000966 agent.task.owner_username,
967 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000968 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000969 return False
showard4c5374f2008-09-04 17:02:56 +0000970 return True
971
972
jadmanski0afbb632008-06-06 21:10:57 +0000973 def _handle_agents(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700974 """
975 Handles agents of the dispatcher.
976
977 Appropriate Agents are added to the dispatcher through
978 _schedule_running_host_queue_entries. These agents each
979 have a task. This method runs the agents task through
980 agent.tick() leading to:
981 agent.start
982 prolog -> AgentTasks prolog
983 For each queue entry:
984 sets host status/status to Running
985 set started_on in afe_host_queue_entries
986 run -> AgentTasks run
987 Creates PidfileRunMonitor
988 Queues the autoserv command line for this AgentTask
989 via the drone manager. These commands are executed
990 through the drone managers execute actions.
991 poll -> AgentTasks/BaseAgentTask poll
992 checks the monitors exit_code.
993 Executes epilog if task is finished.
994 Executes AgentTasks _finish_task
995 finish_task is usually responsible for setting the status
996 of the HQE/host, and updating it's active and complete fileds.
997
998 agent.is_done
999 Removed the agent from the dispatchers _agents queue.
1000 Is_done checks the finished bit on the agent, that is
1001 set based on the Agents task. During the agents poll
1002 we check to see if the monitor process has exited in
1003 it's finish method, and set the success member of the
1004 task based on this exit code.
1005 """
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001006 num_started_this_tick = 0
1007 num_finished_this_tick = 0
showard4c5374f2008-09-04 17:02:56 +00001008 have_reached_limit = False
1009 # iterate over copy, so we can remove agents during iteration
Simran Basi3f6717d2012-09-13 15:21:22 -07001010 logging.debug('Handling %d Agents', len(self._agents))
showard4c5374f2008-09-04 17:02:56 +00001011 for agent in list(self._agents):
Simran Basidef92872012-09-20 13:34:34 -07001012 self._log_extra_msg('Processing Agent with Host Ids: %s and '
1013 'queue_entry ids:%s' % (agent.host_ids,
1014 agent.queue_entry_ids))
showard8cc058f2009-09-08 16:26:33 +00001015 if not agent.started:
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001016 if not self._can_start_agent(agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001017 have_reached_limit = True
Simran Basi3f6717d2012-09-13 15:21:22 -07001018 logging.debug('Reached Limit of allowed running Agents.')
showard4c5374f2008-09-04 17:02:56 +00001019 continue
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001020 num_started_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001021 self._log_extra_msg('Starting Agent')
showard4c5374f2008-09-04 17:02:56 +00001022 agent.tick()
Simran Basidef92872012-09-20 13:34:34 -07001023 self._log_extra_msg('Agent tick completed.')
showard8cc058f2009-09-08 16:26:33 +00001024 if agent.is_done():
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001025 num_finished_this_tick += agent.task.num_processes
Simran Basidef92872012-09-20 13:34:34 -07001026 self._log_extra_msg("Agent finished")
showard8cc058f2009-09-08 16:26:33 +00001027 self.remove_agent(agent)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001028 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001029 'agents_started', num_started_this_tick)
Gabe Black1e1c41b2015-02-04 23:55:15 -08001030 autotest_stats.Gauge('scheduler.jobs_per_tick').send(
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001031 'agents_finished', num_finished_this_tick)
1032 logging.info('%d running processes. %d added this tick.',
Simran Basi3f6717d2012-09-13 15:21:22 -07001033 _drone_manager.total_running_processes(),
Paul Hobbsb92af21b2015-04-09 15:12:41 -07001034 num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001035
1036
showard29f7cd22009-04-29 21:16:24 +00001037 def _process_recurring_runs(self):
1038 recurring_runs = models.RecurringRun.objects.filter(
1039 start_date__lte=datetime.datetime.now())
1040 for rrun in recurring_runs:
1041 # Create job from template
1042 job = rrun.job
1043 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001044 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001045
1046 host_objects = info['hosts']
1047 one_time_hosts = info['one_time_hosts']
1048 metahost_objects = info['meta_hosts']
1049 dependencies = info['dependencies']
1050 atomic_group = info['atomic_group']
1051
1052 for host in one_time_hosts or []:
1053 this_host = models.Host.create_one_time_host(host.hostname)
1054 host_objects.append(this_host)
1055
1056 try:
1057 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001058 options=options,
showard29f7cd22009-04-29 21:16:24 +00001059 host_objects=host_objects,
1060 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001061 atomic_group=atomic_group)
1062
1063 except Exception, ex:
1064 logging.exception(ex)
1065 #TODO send email
1066
1067 if rrun.loop_count == 1:
1068 rrun.delete()
1069 else:
1070 if rrun.loop_count != 0: # if not infinite loop
1071 # calculate new start_date
1072 difference = datetime.timedelta(seconds=rrun.loop_period)
1073 rrun.start_date = rrun.start_date + difference
1074 rrun.loop_count -= 1
1075 rrun.save()
1076
1077
Simran Basia858a232012-08-21 11:04:37 -07001078SiteDispatcher = utils.import_site_class(
1079 __file__, 'autotest_lib.scheduler.site_monitor_db',
1080 'SiteDispatcher', BaseDispatcher)
1081
1082class Dispatcher(SiteDispatcher):
1083 pass
1084
1085
mbligh36768f02008-02-22 18:28:33 +00001086class Agent(object):
showard77182562009-06-10 00:16:05 +00001087 """
Alex Miller47715eb2013-07-24 03:34:01 -07001088 An agent for use by the Dispatcher class to perform a task. An agent wraps
1089 around an AgentTask mainly to associate the AgentTask with the queue_entry
1090 and host ids.
showard77182562009-06-10 00:16:05 +00001091
1092 The following methods are required on all task objects:
1093 poll() - Called periodically to let the task check its status and
1094 update its internal state. If the task succeeded.
1095 is_done() - Returns True if the task is finished.
1096 abort() - Called when an abort has been requested. The task must
1097 set its aborted attribute to True if it actually aborted.
1098
1099 The following attributes are required on all task objects:
1100 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001101 success - bool, True if this task succeeded.
1102 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1103 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001104 """
1105
1106
showard418785b2009-11-23 20:19:59 +00001107 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001108 """
Alex Miller47715eb2013-07-24 03:34:01 -07001109 @param task: An instance of an AgentTask.
showard77182562009-06-10 00:16:05 +00001110 """
showard8cc058f2009-09-08 16:26:33 +00001111 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001112
showard77182562009-06-10 00:16:05 +00001113 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001114 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001115
showard8cc058f2009-09-08 16:26:33 +00001116 self.queue_entry_ids = task.queue_entry_ids
1117 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001118
showard8cc058f2009-09-08 16:26:33 +00001119 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001120 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001121
1122
jadmanski0afbb632008-06-06 21:10:57 +00001123 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001124 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001125 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001126 self.task.poll()
1127 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001128 self.finished = True
showardec113162008-05-08 00:52:49 +00001129
1130
jadmanski0afbb632008-06-06 21:10:57 +00001131 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001132 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001133
1134
showardd3dc1992009-04-22 21:01:40 +00001135 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001136 if self.task:
1137 self.task.abort()
1138 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001139 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001140 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001141
showardd3dc1992009-04-22 21:01:40 +00001142
beeps5e2bb4a2013-10-28 11:26:45 -07001143class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001144 """
1145 Common functionality for QueueTask and HostlessQueueTask
1146 """
1147 def __init__(self, queue_entries):
1148 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001149 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001150 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001151
1152
showard73ec0442009-02-07 02:05:20 +00001153 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001154 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001155
1156
jamesrenc44ae992010-02-19 00:12:54 +00001157 def _write_control_file(self, execution_path):
1158 control_path = _drone_manager.attach_file_to_execution(
1159 execution_path, self.job.control_file)
1160 return control_path
1161
1162
Aviv Keshet308e7362013-05-21 14:43:16 -07001163 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001164 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001165 execution_path = self.queue_entries[0].execution_path()
1166 control_path = self._write_control_file(execution_path)
1167 hostnames = ','.join(entry.host.hostname
1168 for entry in self.queue_entries
1169 if not entry.is_hostless())
1170
1171 execution_tag = self.queue_entries[0].execution_tag()
1172 params = _autoserv_command_line(
1173 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001174 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001175 _drone_manager.absolute_path(control_path)],
1176 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001177 if self.job.is_image_update_job():
1178 params += ['--image', self.job.update_image_path]
1179
jamesrenc44ae992010-02-19 00:12:54 +00001180 return params
showardd1195652009-12-08 22:21:02 +00001181
1182
    @property
    def num_processes(self):
        # One autoserv process is accounted per queue entry.
        return len(self.queue_entries)
1186
1187
    @property
    def owner_username(self):
        # Job owner's username; used for per-owner process throttling.
        return self.job.owner
1191
1192
    def _working_directory(self):
        # The consistent execution path across this task's queue entries.
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001195
1196
def prolog(self):
    """Write the pre-job keyvals and mark every queue entry as running."""
    queued_key, queued_time = self._job_queued_keyval(self.job)
    keyvals = self.job.keyval_dict()
    keyvals[queued_key] = queued_time
    # Record the host group name, when the entries were scheduled as a group.
    group_name = self.queue_entries[0].get_group_name()
    if group_name:
        keyvals['host_group_name'] = group_name
    self._write_keyvals_before_job(keyvals)
    for entry in self.queue_entries:
        entry.set_status(models.HostQueueEntry.Status.RUNNING)
        entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001208
1209
def _write_lost_process_error_file(self):
    """Drop a 'job_failure' marker in the results dir for a lost process."""
    failure_path = os.path.join(self._working_directory(), 'job_failure')
    _drone_manager.write_lines_to_file(failure_path, [_LOST_PROCESS_ERROR])
1214
1215
def _finish_task(self):
    """Record job-finished keyvals; note a lost process if one occurred."""
    # No monitor means no process was ever started -- nothing to record.
    if not self.monitor:
        return

    self._write_job_finished()
    if self.monitor.lost_process:
        self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001224
jadmanskif7fa2cc2008-10-01 14:13:23 +00001225
def _write_status_comment(self, comment):
    """Append an INFO line carrying *comment* to the job's status.log."""
    status_log = os.path.join(self._working_directory(), 'status.log')
    _drone_manager.write_lines_to_file(
            status_log, ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001231
1232
def _log_abort(self):
    """Record who aborted the job and when, as keyvals and a status comment."""
    if not self.monitor or not self.monitor.has_process():
        return

    # Gather the distinct aborted_by / aborted_on values across all entries.
    aborted_by = set()
    aborted_on = set()
    for entry in self.queue_entries:
        if not entry.aborted_by:
            continue
        aborted_by.add(entry.aborted_by)
        aborted_on.add(int(time.mktime(entry.aborted_on.timetuple())))

    # extract some actual, unique aborted by value and write it out
    # TODO(showard): this conditional is now obsolete, we just need to leave
    # it in temporarily for backwards compatibility over upgrades. delete
    # soon.
    assert len(aborted_by) <= 1
    if aborted_by:
        by_value = aborted_by.pop()
        on_value = max(aborted_on)
    else:
        by_value = 'autotest_system'
        on_value = int(time.time())

    self._write_keyval_after_job("aborted_by", by_value)
    self._write_keyval_after_job("aborted_on", on_value)

    when = str(datetime.datetime.fromtimestamp(on_value))
    self._write_status_comment('Job aborted by %s on %s' % (by_value, when))
jadmanskic2ac77f2008-05-16 21:44:04 +00001264
1265
def abort(self):
    """Abort the task, then record abort metadata and finish bookkeeping."""
    super(AbstractQueueTask, self).abort()
    self._log_abort()
    self._finish_task()
showard21baa452008-10-21 00:08:39 +00001270
1271
def epilog(self):
    """Run the standard epilog, then finalize this task's bookkeeping."""
    super(AbstractQueueTask, self).epilog()
    self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001275
1276
class QueueTask(AbstractQueueTask):
    """Queue task for jobs that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Verify entry/host states, then start the job on each host."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to Gathering while keeping their hosts Running."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Extend the base autoserv command with job-specific flags."""
        invocation = super(QueueTask, self)._command_line()
        # Request server-side packaging for server-type jobs unless the job
        # explicitly opted out (require_ssp of None means "use the default").
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp != False):
            invocation += ['--require-ssp']
            test_source_build = self.job.keyval_dict().get(
                    'test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001321
1322
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a job with a single hostless queue entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # A job enters the database in status Starting.  Each scheduler tick
        # picks up Starting jobs and, unless a limit such as
        # max_hostless_jobs_per_drone is hit, moves them to Running and
        # launches an autoserv process on a drone.  An entry still in
        # Starting therefore never ran a process: there are no logs to
        # collect, and its execution_subdir is unset, so transitioning it to
        # Parsing would make the scheduler raise.  Skip Parsing in that case.
        hqe = self.queue_entries[0]
        if hqe.status != models.HostQueueEntry.Status.STARTING:
            hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001350
1351
# Script entry point: run the scheduler's main() (defined earlier in this
# module) only when executed directly, not on import.
if __name__ == '__main__':
    main()