blob: f7a9dc463aa98a6a237146a2a80759dab7b9c3f7 [file] [log] [blame]
Prashanth B4ec98672014-05-15 10:44:54 -07001#!/usr/bin/python
mbligh36768f02008-02-22 18:28:33 +00002"""
3Autotest scheduler
4"""
showard909c7a62008-07-15 21:52:38 +00005
Dan Shif6c65bd2014-08-29 16:15:07 -07006import datetime
7import gc
8import logging
9import optparse
10import os
11import signal
12import sys
13import time
showard402934a2009-12-21 22:20:47 +000014
Alex Miller05d7b4c2013-03-04 07:49:38 -080015import common
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
Dan Shiec1d47d2015-02-13 11:38:13 -080020from autotest_lib.client.common_lib import control_data
Prashanth B0e960282014-05-13 19:38:28 -070021from autotest_lib.client.common_lib import global_config
beeps5e2bb4a2013-10-28 11:26:45 -070022from autotest_lib.client.common_lib import utils
Gabe Black1e1c41b2015-02-04 23:55:15 -080023from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth B0e960282014-05-13 19:38:28 -070024from autotest_lib.frontend.afe import models, rpc_utils
Fang Dengc330bee2014-10-21 18:10:55 -070025from autotest_lib.scheduler import agent_task, drone_manager
beeps5e2bb4a2013-10-28 11:26:45 -070026from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
27from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
Prashanth B0e960282014-05-13 19:38:28 -070028from autotest_lib.scheduler import postjob_task
Prashanth Bf66d51b2014-05-06 12:42:25 -070029from autotest_lib.scheduler import query_managers
Prashanth B0e960282014-05-13 19:38:28 -070030from autotest_lib.scheduler import scheduler_lib
jamesrenc44ae992010-02-19 00:12:54 +000031from autotest_lib.scheduler import scheduler_models
Alex Miller05d7b4c2013-03-04 07:49:38 -080032from autotest_lib.scheduler import status_server, scheduler_config
Prashanth Bf66d51b2014-05-06 12:42:25 -070033from autotest_lib.scheduler import scheduler_lib
Aviv Keshet308e7362013-05-21 14:43:16 -070034from autotest_lib.server import autoserv_utils
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080035from autotest_lib.server import utils as server_utils
Dan Shicf2e8dd2015-05-07 17:18:48 -070036from autotest_lib.site_utils import metadata_reporter
Dan Shib9144a42014-12-01 16:09:32 -080037from autotest_lib.site_utils import server_manager_utils
Alex Miller05d7b4c2013-03-04 07:49:38 -080038
Dan Shicf2e8dd2015-05-07 17:18:48 -070039
showard549afad2009-08-20 23:33:36 +000040BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
41PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000042
mbligh36768f02008-02-22 18:28:33 +000043RESULTS_DIR = '.'
mbligh36768f02008-02-22 18:28:33 +000044AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
45
46if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000047 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000048AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
49AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
50
51if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000052 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000053
showard35162b02009-03-03 02:17:30 +000054# error message to leave in results dir when an autoserv process disappears
55# mysteriously
56_LOST_PROCESS_ERROR = """\
57Autoserv failed abnormally during execution for this job, probably due to a
58system error on the Autotest server. Full results may not be available. Sorry.
59"""
60
# Database connection manager and connection, populated by initialize().
_db_manager = None
_db = None
# Set True by handle_signal() to make the main loop exit after its tick.
_shutdown = False

# These 2 globals are replaced for testing
_autoserv_directory = autoserv_utils.autoserv_directory
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
# DroneManager singleton, populated by initialize_globals().
_drone_manager = None
# When True, the scheduler acquires and releases hosts for jobs inline with
# the tick (see BaseDispatcher.__init__).
_inline_host_acquisition = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
        default=True)

# NOTE(review): 'ssp' presumably means server-side packaging containers for
# autoserv — confirm against the AUTOSERV config section's consumers.
_enable_ssp_container = global_config.global_config.get_config_value(
        'AUTOSERV', 'enable_ssp_container', type=bool,
        default=True)
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled but no default is set."""
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise scheduler_lib.SchedulerError(
                'Drone sets are enabled, but no default is set')
jamesren76fcf192010-04-21 20:39:50 +000087
88
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler."""
    _verify_default_drone_set_exists()
92
93
def main():
    """Top-level entry point.

    Runs the scheduler, logging (and re-raising) any exception that escapes
    except SystemExit, and always deletes the scheduler pid file on exit.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Normal exit path; don't log it as an error.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000105
106
def main_without_exception_handling():
    """Parse options, start auxiliary services, and run the dispatcher loop.

    Startup sequence: logging, option parsing, production/config checks,
    status server, metadata reporter, then the Dispatcher tick loop.  After
    the loop ends (shutdown flag or uncaught exception) the services are
    shut down and the database connection is closed.
    """
    scheduler_lib.setup_logging(
            os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    parser.add_option('--production',
                      help=('Indicate that scheduler is running in production '
                            'environment and it can use database that is not '
                            'hosted in localhost. If it is set to False, '
                            'scheduler will fail if database is not in '
                            'localhost.'),
                      action='store_true', default=False)
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_lib.check_production_settings(options)

    scheduler_enabled = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        logging.error("Scheduler not enabled, set enable_scheduler to true in "
                      "the global_config's SCHEDULER section to enable it. "
                      "Exiting.")
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to a no-op.
    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    # Start the thread to report metadata.
    metadata_reporter.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)
        minimum_tick_sec = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'minimum_tick_sec', type=float)

        while not _shutdown and not server._shutdown_scheduler:
            start = time.time()
            dispatcher.tick()
            curr_tick_sec = time.time() - start
            # Enforce a minimum tick duration; when the tick already took
            # long enough, still yield briefly to other threads.
            if (minimum_tick_sec > curr_tick_sec):
                time.sleep(minimum_tick_sec - curr_tick_sec)
            else:
                time.sleep(0.0001)
    except Exception:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    # Teardown in rough reverse order of startup.
    metadata_reporter.abort()
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db_manager.disconnect()
showard136e6dc2009-06-10 19:38:49 +0000194
195
def handle_signal(signum, frame):
    """SIGINT/SIGTERM handler: request a graceful stop of the main loop.

    @param signum: received signal number (unused).
    @param frame: current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def initialize():
    """One-time scheduler startup: pidfile, DB connection, signals, drones."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another scheduler instance owns the pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                scheduler_lib.DB_CONFIG_SECTION, 'database',
                'stresstest_autotest_web')

    # If server database is enabled, check if the server has role `scheduler`.
    # If the server does not have scheduler role, exception will be raised and
    # scheduler will not continue to run.
    if server_manager_utils.use_server_db():
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db_manager
    _db_manager = scheduler_lib.ConnectionManager()
    global _db
    _db = _db_manager.get_connection()
    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list comes from the server DB when enabled, otherwise from the
    # comma-separated 'drones' config value.
    if server_manager_utils.use_server_db():
        drone_list = server_manager_utils.get_drones()
    else:
        drones = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
        drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000246
247
def initialize_globals():
    """Bind the module-level _drone_manager to the DroneManager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
251
252
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner, -l name, --test-retry,
            and client -c or server -s parameters will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @returns the command, as a list of tokens.
    """
    return autoserv_utils.autoserv_run_job_command(
            _autoserv_directory, machines,
            results_directory=drone_manager.WORKING_DIRECTORY,
            extra_args=extra_args, job=job, queue_entry=queue_entry,
            verbose=verbose, in_lab=True)
showard87ba02a2009-04-20 19:37:32 +0000271
272
class BaseDispatcher(object):


    def __init__(self):
        # Active Agent objects plus indexes from host id / queue-entry id to
        # the agents referencing them (see add_agent_task / remove_agent).
        self._agents = []
        self._last_clean_time = time.time()
        user_cleanup_time = scheduler_config.config.clean_interval_minutes
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(
                _db, _drone_manager)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        # Rate-limiting state for _garbage_collection().
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
        # Debug-logging switches read by _log_tick_msg / _log_extra_msg.
        self._tick_debug = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'tick_debug', type=bool,
                default=False)
        self._extra_debugging = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
                default=False)

        # If _inline_host_acquisition is set the scheduler will acquire and
        # release hosts against jobs inline, with the tick. Otherwise the
        # scheduler will only focus on jobs that already have hosts, and
        # will not explicitly unlease a host when a job finishes using it.
        self._job_query_manager = query_managers.AFEJobQueryManager()
        self._host_scheduler = (host_scheduler.BaseHostScheduler()
                                if _inline_host_acquisition else
                                host_scheduler.DummyHostScheduler())
307
mbligh36768f02008-02-22 18:28:33 +0000308
    def initialize(self, recover_hosts=True):
        """Recover state left over from a previous scheduler run.

        @param recover_hosts: if True, also attempt host recovery after
                process recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()
        # Execute all actions queued in the cleanup tasks. Scheduler tick will
        # run a refresh task first. If there is any action in the queue, refresh
        # will raise an exception.
        _drone_manager.execute_actions()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000322
323
Simran Basi0ec94dd2012-08-28 09:50:10 -0700324 def _log_tick_msg(self, msg):
325 if self._tick_debug:
326 logging.debug(msg)
327
328
Simran Basidef92872012-09-20 13:34:34 -0700329 def _log_extra_msg(self, msg):
330 if self._extra_debugging:
331 logging.debug(msg)
332
333
    def tick(self):
        """
        This is an altered version of tick() where we keep track of when each
        major step begins so we can try to figure out where we are using most
        of the tick time.

        One pass of the scheduler main loop: refresh drones, schedule work,
        handle agents, then flush queued drone actions and emails.
        """
        timer = autotest_stats.Timer('scheduler.tick')
        self._log_tick_msg('Calling new tick, starting garbage collection().')
        self._garbage_collection()
        self._log_tick_msg('Calling _drone_manager.trigger_refresh().')
        _drone_manager.trigger_refresh()
        self._log_tick_msg('Calling _process_recurring_runs().')
        self._process_recurring_runs()
        self._log_tick_msg('Calling _schedule_delay_tasks().')
        self._schedule_delay_tasks()
        self._log_tick_msg('Calling _schedule_running_host_queue_entries().')
        self._schedule_running_host_queue_entries()
        self._log_tick_msg('Calling _schedule_special_tasks().')
        self._schedule_special_tasks()
        self._log_tick_msg('Calling _schedule_new_jobs().')
        self._schedule_new_jobs()
        self._log_tick_msg('Calling _drone_manager.sync_refresh().')
        _drone_manager.sync_refresh()
        # _run_cleanup must be called between drone_manager.sync_refresh, and
        # drone_manager.execute_actions, as sync_refresh will clear the calls
        # queued in drones. Therefore, any action that calls drone.queue_call
        # to add calls to the drone._calls, should be after drone refresh is
        # completed and before drone_manager.execute_actions at the end of the
        # tick.
        self._log_tick_msg('Calling _run_cleanup().')
        self._run_cleanup()
        self._log_tick_msg('Calling _find_aborting().')
        self._find_aborting()
        self._log_tick_msg('Calling _find_aborted_special_tasks().')
        self._find_aborted_special_tasks()
        self._log_tick_msg('Calling _handle_agents().')
        self._handle_agents()
        self._log_tick_msg('Calling _host_scheduler.tick().')
        self._host_scheduler.tick()
        self._log_tick_msg('Calling _drone_manager.execute_actions().')
        _drone_manager.execute_actions()
        self._log_tick_msg('Calling '
                           'email_manager.manager.send_queued_emails().')
        with timer.get_client('email_manager_send_queued_emails'):
            email_manager.manager.send_queued_emails()
        self._log_tick_msg('Calling django.db.reset_queries().')
        with timer.get_client('django_db_reset_queries'):
            django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000383
showard97aed502008-11-04 02:01:24 +0000384
    def _run_cleanup(self):
        """Give both periodic cleanup objects a chance to run if they are due."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000388
mbligh36768f02008-02-22 18:28:33 +0000389
showardf13a9e22009-12-18 22:54:09 +0000390 def _garbage_collection(self):
391 threshold_time = time.time() - self._seconds_between_garbage_stats
392 if threshold_time < self._last_garbage_stats_time:
393 # Don't generate these reports very often.
394 return
395
396 self._last_garbage_stats_time = time.time()
397 # Force a full level 0 collection (because we can, it doesn't hurt
398 # at this interval).
399 gc.collect()
400 logging.info('Logging garbage collector stats on tick %d.',
401 self._tick_count)
402 gc_stats._log_garbage_collector_stats()
403
404
showard170873e2009-01-07 00:22:26 +0000405 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
406 for object_id in object_ids:
407 agent_dict.setdefault(object_id, set()).add(agent)
408
409
410 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
411 for object_id in object_ids:
412 assert object_id in agent_dict
413 agent_dict[object_id].remove(agent)
Dan Shi76af8022013-10-19 01:59:49 -0700414 # If an ID has no more active agent associated, there is no need to
415 # keep it in the dictionary. Otherwise, scheduler will keep an
416 # unnecessarily big dictionary until being restarted.
417 if not agent_dict[object_id]:
418 agent_dict.pop(object_id)
showard170873e2009-01-07 00:22:26 +0000419
420
    def add_agent_task(self, agent_task):
        """
        Creates and adds an agent to the dispatchers list.

        In creating the agent we also pass on all the queue_entry_ids and
        host_ids from the special agent task. For every agent we create, we
        add it to 1. a dict against the queue_entry_ids given to it 2. A dict
        against the host_ids given to it. So theoretically, a host can have any
        number of agents associated with it, and each of them can have any
        special agent task, though in practice we never see > 1 agent/task per
        host at any time.

        @param agent_task: A SpecialTask for the agent to manage.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        # Index the agent by every host and queue entry it touches so that
        # host_has_agent() and get_agents_for_entry() can find it quickly.
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000441
showard170873e2009-01-07 00:22:26 +0000442
443 def get_agents_for_entry(self, queue_entry):
444 """
445 Find agents corresponding to the specified queue_entry.
446 """
showardd3dc1992009-04-22 21:01:40 +0000447 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000448
449
450 def host_has_agent(self, host):
451 """
452 Determine if there is currently an Agent present using this host.
453 """
454 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000455
456
    def remove_agent(self, agent):
        """Drop agent from the active list and from both id indexes."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000463
464
showard8cc058f2009-09-08 16:26:33 +0000465 def _host_has_scheduled_special_task(self, host):
466 return bool(models.SpecialTask.objects.filter(host__id=host.id,
467 is_active=False,
468 is_complete=False))
469
470
    def _recover_processes(self):
        """Reconnect to autoserv processes left behind by a prior scheduler.

        Sequence: build recovery AgentTasks, register their pidfiles, refresh
        the drone manager, then recover tasks/pending entries and re-verify
        what is left.  NOTE(review): pidfile registration appears to need to
        precede the refresh so the refresh observes them — confirm against
        drone_manager.refresh().
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000483
showard170873e2009-01-07 00:22:26 +0000484
showardd1195652009-12-08 22:21:02 +0000485 def _create_recovery_agent_tasks(self):
486 return (self._get_queue_entry_agent_tasks()
487 + self._get_special_task_agent_tasks(is_active=True))
488
489
    def _get_queue_entry_agent_tasks(self):
        """
        Get agent tasks for all hqe in the specified states.

        Loosely this translates to taking a hqe in one of the specified states,
        say parsing, and getting an AgentTask for it, like the FinalReparseTask,
        through _get_agent_task_for_queue_entry. Each queue entry can only have
        one agent task at a time, but there might be multiple queue entries in
        the group.

        @return: A list of AgentTasks.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'running', len(queue_entries))

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            # A grouped (synchronous) task may cover several queue entries;
            # record them all so we don't build a second task for the group.
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000528
529
showardd1195652009-12-08 22:21:02 +0000530 def _get_special_task_agent_tasks(self, is_active=False):
531 special_tasks = models.SpecialTask.objects.filter(
532 is_active=is_active, is_complete=False)
533 return [self._get_agent_task_for_special_task(task)
534 for task in special_tasks]
535
536
    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry.

        @param queue_entry: a HostQueueEntry
        @return: an AgentTask to run the queue entry
        @raises SchedulerError: if the entry's status maps to no task type.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return postjob_task.GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return postjob_task.FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

        raise scheduler_lib.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000562
563
564 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000565 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
566 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000567 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000568 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000569 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000570 if using_host:
showardd1195652009-12-08 22:21:02 +0000571 self._assert_host_has_no_agent(task_entry)
572
573
    def _assert_host_has_no_agent(self, entry):
        """
        Raise SchedulerError if entry's host already has an active agent.

        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            # Pick an arbitrary agent from the host's bucket for the message.
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise scheduler_lib.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))
583
584
    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.

        A special task is created through schedule_special_tasks, but only if
        the host doesn't already have an agent. This happens through
        add_agent_task. All special agent tasks are given a host on creation,
        and a Null hqe. To create a SpecialAgentTask object, you need a
        models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
        object contains a hqe it's passed on to the special agent task, which
        creates a HostQueueEntry and saves it as its queue_entry.

        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError: if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (prejob_task.CleanupTask,
                                      prejob_task.VerifyTask,
                                      prejob_task.RepairTask,
                                      prejob_task.ResetTask,
                                      prejob_task.ProvisionTask)

        # Dispatch on the TASK_TYPE each AgentTask class declares.
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise scheduler_lib.SchedulerError(
                'No AgentTask class for task', str(special_task))
showardd1195652009-12-08 22:21:02 +0000615
616
617 def _register_pidfiles(self, agent_tasks):
618 for agent_task in agent_tasks:
619 agent_task.register_necessary_pidfiles()
620
621
622 def _recover_tasks(self, agent_tasks):
623 orphans = _drone_manager.get_orphaned_autoserv_processes()
624
625 for agent_task in agent_tasks:
626 agent_task.recover()
627 if agent_task.monitor and agent_task.monitor.has_process():
628 orphans.discard(agent_task.monitor.get_process())
629 self.add_agent_task(agent_task)
630
631 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000632
633
showard8cc058f2009-09-08 16:26:33 +0000634 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000635 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
636 % status):
showard0db3d432009-10-12 20:29:15 +0000637 if entry.status == status and not self.get_agents_for_entry(entry):
638 # The status can change during iteration, e.g., if job.run()
639 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000640 yield entry
641
642
showard6878e8b2009-07-20 22:37:45 +0000643 def _check_for_remaining_orphan_processes(self, orphans):
644 if not orphans:
645 return
646 subject = 'Unrecovered orphan autoserv processes remain'
647 message = '\n'.join(str(process) for process in orphans)
648 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000649
650 die_on_orphans = global_config.global_config.get_config_value(
651 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
652
653 if die_on_orphans:
654 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000655
showard170873e2009-01-07 00:22:26 +0000656
showard8cc058f2009-09-08 16:26:33 +0000657 def _recover_pending_entries(self):
658 for entry in self._get_unassigned_entries(
659 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000660 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000661 entry.on_pending()
662
663
showardb8900452009-10-12 20:31:01 +0000664 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000665 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000666 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
667 unrecovered_hqes = []
668 for queue_entry in queue_entries:
669 special_tasks = models.SpecialTask.objects.filter(
670 task__in=(models.SpecialTask.Task.CLEANUP,
671 models.SpecialTask.Task.VERIFY),
672 queue_entry__id=queue_entry.id,
673 is_complete=False)
674 if special_tasks.count() == 0:
675 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000676
showardb8900452009-10-12 20:31:01 +0000677 if unrecovered_hqes:
678 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
Prashanth B0e960282014-05-13 19:38:28 -0700679 raise scheduler_lib.SchedulerError(
showard37757f32009-10-19 18:34:24 +0000680 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000681 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000682
683
showard65db3932009-10-28 19:54:35 +0000684 def _schedule_special_tasks(self):
685 """
686 Execute queued SpecialTasks that are ready to run on idle hosts.
beeps8bb1f7d2013-08-05 01:30:09 -0700687
688 Special tasks include PreJobTasks like verify, reset and cleanup.
689 They are created through _schedule_new_jobs and associated with a hqe
690 This method translates SpecialTasks to the appropriate AgentTask and
691 adds them to the dispatchers agents list, so _handle_agents can execute
692 them.
showard65db3932009-10-28 19:54:35 +0000693 """
Prashanth B4ec98672014-05-15 10:44:54 -0700694 # When the host scheduler is responsible for acquisition we only want
695 # to run tasks with leased hosts. All hqe tasks will already have
696 # leased hosts, and we don't want to run frontend tasks till the host
697 # scheduler has vetted the assignment. Note that this doesn't include
698 # frontend tasks with hosts leased by other active hqes.
699 for task in self._job_query_manager.get_prioritized_special_tasks(
700 only_tasks_with_leased_hosts=not _inline_host_acquisition):
showard8cc058f2009-09-08 16:26:33 +0000701 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000702 continue
showardd1195652009-12-08 22:21:02 +0000703 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000704
705
showard170873e2009-01-07 00:22:26 +0000706 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000707 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000708 # should never happen
showarded2afea2009-07-07 20:54:07 +0000709 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000710 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000711 self._reverify_hosts_where(
Alex Millerdfff2fd2013-05-28 13:05:06 -0700712 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
showarded2afea2009-07-07 20:54:07 +0000713 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000714
715
jadmanski0afbb632008-06-06 21:10:57 +0000716 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000717 print_message='Reverifying host %s'):
Dale Curtis456d3c12011-07-19 11:42:51 -0700718 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000719 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000720 if self.host_has_agent(host):
721 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000722 continue
showard8cc058f2009-09-08 16:26:33 +0000723 if self._host_has_scheduled_special_task(host):
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700724 # host will have a special task scheduled on the next tick
showard8cc058f2009-09-08 16:26:33 +0000725 continue
showard170873e2009-01-07 00:22:26 +0000726 if print_message:
showardb18134f2009-03-20 20:52:18 +0000727 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000728 models.SpecialTask.objects.create(
729 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000730 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000731
732
jadmanski0afbb632008-06-06 21:10:57 +0000733 def _recover_hosts(self):
734 # recover "Repair Failed" hosts
735 message = 'Reverifying dead host %s'
736 self._reverify_hosts_where("status = 'Repair Failed'",
737 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000738
739
showard89f84db2009-03-12 20:39:13 +0000740 def _refresh_pending_queue_entries(self):
741 """
742 Lookup the pending HostQueueEntries and call our HostScheduler
743 refresh() method given that list. Return the list.
744
745 @returns A list of pending HostQueueEntries sorted in priority order.
746 """
Prashanth Bf66d51b2014-05-06 12:42:25 -0700747 queue_entries = self._job_query_manager.get_pending_queue_entries(
748 only_hostless=not _inline_host_acquisition)
showard63a34772008-08-18 19:32:50 +0000749 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000750 return []
showard89f84db2009-03-12 20:39:13 +0000751 return queue_entries
752
753
    def _schedule_hostless_job(self, queue_entry):
        """Schedule a hostless (suite) job.

        Wraps the entry in a HostlessQueueTask agent, sets its
        execution_subdir, and then moves it to Starting. The order of the
        last two steps is deliberate; see the comment below.

        @param queue_entry: The queue_entry representing the hostless job.
        """
        self.add_agent_task(HostlessQueueTask(queue_entry))

        # Need to set execution_subdir before setting the status:
        # After a restart of the scheduler, agents will be restored for HQEs in
        # Starting, Running, Gathering, Parsing or Archiving. To do this, the
        # execution_subdir is needed. Therefore it must be set before entering
        # one of these states.
        # Otherwise, if the scheduler was interrupted between setting the status
        # and the execution_subdir, upon it's restart restoring agents would
        # fail.
        # Is there a way to get a status in one of these states without going
        # through this code? Following cases are possible:
        # - If it's aborted before being started:
        #   active bit will be 0, so there's nothing to parse, it will just be
        #   set to completed by _find_aborting. Critical statuses are skipped.
        # - If it's aborted or it fails after being started:
        #   It was started, so this code was executed.
        queue_entry.update_field('execution_subdir', 'hostless')
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +0000778
779
beepscc9fc702013-12-02 12:45:38 -0800780 def _schedule_host_job(self, host, queue_entry):
781 """Schedules a job on the given host.
782
783 1. Assign the host to the hqe, if it isn't already assigned.
784 2. Create a SpecialAgentTask for the hqe.
785 3. Activate the hqe.
786
787 @param queue_entry: The job to schedule.
788 @param host: The host to schedule the job on.
789 """
790 if self.host_has_agent(host):
791 host_agent_task = list(self._host_agents.get(host.id))[0].task
792 subject = 'Host with agents assigned to an HQE'
793 message = ('HQE: %s assigned host %s, but the host has '
794 'agent: %s for queue_entry %s. The HQE '
beepsf7d35162014-02-13 16:42:13 -0800795 'will have to try and acquire a host next tick ' %
beepscc9fc702013-12-02 12:45:38 -0800796 (queue_entry, host.hostname, host_agent_task,
797 host_agent_task.queue_entry))
798 email_manager.manager.enqueue_notify_email(subject, message)
beepscc9fc702013-12-02 12:45:38 -0800799 else:
Prashanth Bf66d51b2014-05-06 12:42:25 -0700800 self._host_scheduler.schedule_host_job(host, queue_entry)
beepscc9fc702013-12-02 12:45:38 -0800801
802
showard89f84db2009-03-12 20:39:13 +0000803 def _schedule_new_jobs(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700804 """
805 Find any new HQEs and call schedule_pre_job_tasks for it.
806
807 This involves setting the status of the HQE and creating a row in the
808 db corresponding the the special task, through
809 scheduler_models._queue_special_task. The new db row is then added as
810 an agent to the dispatcher through _schedule_special_tasks and
811 scheduled for execution on the drone through _handle_agents.
812 """
showard89f84db2009-03-12 20:39:13 +0000813 queue_entries = self._refresh_pending_queue_entries()
showard89f84db2009-03-12 20:39:13 +0000814
beepscc9fc702013-12-02 12:45:38 -0800815 key = 'scheduler.jobs_per_tick'
beepsb255fc52013-10-13 23:28:54 -0700816 new_hostless_jobs = 0
beepsb255fc52013-10-13 23:28:54 -0700817 new_jobs_with_hosts = 0
818 new_jobs_need_hosts = 0
beepscc9fc702013-12-02 12:45:38 -0800819 host_jobs = []
Simran Basi3f6717d2012-09-13 15:21:22 -0700820 logging.debug('Processing %d queue_entries', len(queue_entries))
jamesren883492a2010-02-12 00:45:18 +0000821
beepscc9fc702013-12-02 12:45:38 -0800822 for queue_entry in queue_entries:
jamesren883492a2010-02-12 00:45:18 +0000823 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +0000824 self._schedule_hostless_job(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700825 new_hostless_jobs = new_hostless_jobs + 1
showarde55955f2009-10-07 20:48:58 +0000826 else:
beepscc9fc702013-12-02 12:45:38 -0800827 host_jobs.append(queue_entry)
beepsb255fc52013-10-13 23:28:54 -0700828 new_jobs_need_hosts = new_jobs_need_hosts + 1
beepsb255fc52013-10-13 23:28:54 -0700829
Gabe Black1e1c41b2015-02-04 23:55:15 -0800830 autotest_stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
beepscc9fc702013-12-02 12:45:38 -0800831 if not host_jobs:
832 return
Prashanth Bf66d51b2014-05-06 12:42:25 -0700833 if not _inline_host_acquisition:
834 message = ('Found %s jobs that need hosts though '
835 '_inline_host_acquisition=%s. Will acquire hosts.' %
836 ([str(job) for job in host_jobs],
837 _inline_host_acquisition))
838 email_manager.manager.enqueue_notify_email(
839 'Processing unexpected host acquisition requests', message)
840 jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
841 for host_assignment in jobs_with_hosts:
842 self._schedule_host_job(host_assignment.host, host_assignment.job)
843 new_jobs_with_hosts = new_jobs_with_hosts + 1
beepscc9fc702013-12-02 12:45:38 -0800844
Gabe Black1e1c41b2015-02-04 23:55:15 -0800845 autotest_stats.Gauge(key).send('new_jobs_with_hosts',
846 new_jobs_with_hosts)
847 autotest_stats.Gauge(key).send('new_jobs_without_hosts',
848 new_jobs_need_hosts -
849 new_jobs_with_hosts)
showardb95b1bd2008-08-15 18:11:04 +0000850
851
showard8cc058f2009-09-08 16:26:33 +0000852 def _schedule_running_host_queue_entries(self):
beeps8bb1f7d2013-08-05 01:30:09 -0700853 """
854 Adds agents to the dispatcher.
855
856 Any AgentTask, like the QueueTask, is wrapped in an Agent. The
857 QueueTask for example, will have a job with a control file, and
858 the agent will have methods that poll, abort and check if the queue
859 task is finished. The dispatcher runs the agent_task, as well as
860 other agents in it's _agents member, through _handle_agents, by
861 calling the Agents tick().
862
863 This method creates an agent for each HQE in one of (starting, running,
864 gathering, parsing, archiving) states, and adds it to the dispatcher so
865 it is handled by _handle_agents.
866 """
showardd1195652009-12-08 22:21:02 +0000867 for agent_task in self._get_queue_entry_agent_tasks():
868 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +0000869
870
871 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +0000872 for entry in scheduler_models.HostQueueEntry.fetch(
873 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +0000874 task = entry.job.schedule_delayed_callback_task(entry)
875 if task:
showardd1195652009-12-08 22:21:02 +0000876 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +0000877
878
    def _find_aborting(self):
        """
        Looks through the afe_host_queue_entries for an aborted entry.

        The aborted bit is set on an HQE in many ways, the most common
        being when a user requests an abort through the frontend, which
        results in an rpc from the afe to abort_host_queue_entries.

        Entries belonging to jobs running on a shard are skipped on the
        main scheduler; the shard aborts them and syncs status back.
        Affected jobs are collected and stop_if_necessary() is called once
        per job at the end.
        """
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted=1 and complete=0'):

            # If the job is running on a shard, let the shard handle aborting
            # it and sync back the right status.
            if entry.job.shard_id is not None and not server_utils.is_shard():
                logging.info('Waiting for shard %s to abort hqe %s',
                             entry.job.shard_id, entry)
                continue

            logging.info('Aborting %s', entry)

            # The task would have started off with both is_complete and
            # is_active = False. Aborted tasks are neither active nor complete.
            # For all currently active tasks this will happen through the agent,
            # but we need to manually update the special tasks that haven't
            # started yet, because they don't have agents.
            models.SpecialTask.objects.filter(is_active=False,
                    queue_entry_id=entry.id).update(is_complete=True)

            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        logging.debug('Aborting %d jobs this tick.', len(jobs_to_stop))
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +0000915
916
    def _find_aborted_special_tasks(self):
        """
        Find SpecialTasks that have been marked for abortion.

        Poll the database looking for SpecialTasks that are active
        and have been marked for abortion, then abort them. Note the
        ordering below: epilog() is called before abort().
        """

        # The completed and active bits are very important when it comes
        # to scheduler correctness. The active bit is set through the prolog
        # of a special task, and reset through the cleanup method of the
        # SpecialAgentTask. The cleanup is called both through the abort and
        # epilog. The complete bit is set in several places, and in general
        # a hanging job will have is_active=1 is_complete=0, while a special
        # task which completed will have is_active=0 is_complete=1. To check
        # aborts we directly check active because the complete bit is set in
        # several places, including the epilog of agent tasks.
        aborted_tasks = models.SpecialTask.objects.filter(is_active=True,
                                                          is_aborted=True)
        for task in aborted_tasks:
            # There are 2 ways to get the agent associated with a task,
            # through the host and through the hqe. A special task
            # always needs a host, but doesn't always need a hqe.
            for agent in self._host_agents.get(task.host.id, []):
                if isinstance(agent.task, agent_task.SpecialAgentTask):

                    # The epilog performs critical actions such as
                    # queueing the next SpecialTask, requeuing the
                    # hqe etc, however it doesn't actually kill the
                    # monitor process and set the 'done' bit. Epilogs
                    # assume that the job failed, and that the monitor
                    # process has already written an exit code. The
                    # done bit is a necessary condition for
                    # _handle_agents to schedule any more special
                    # tasks against the host, and it must be set
                    # in addition to is_active, is_complete and success.
                    agent.task.epilog()
                    agent.task.abort()
beeps8bb1f7d2013-08-05 01:30:09 -0700955
956
Paul Hobbsb92af21b2015-04-09 15:12:41 -0700957 def _can_start_agent(self, agent, have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000958 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +0000959 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +0000960 return True
961 # don't allow any nonzero-process agents to run after we've reached a
962 # limit (this avoids starvation of many-process agents)
963 if have_reached_limit:
964 return False
965 # total process throttling
showard9bb960b2009-11-19 01:02:11 +0000966 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +0000967 agent.task.owner_username,
968 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +0000969 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +0000970 return False
showard4c5374f2008-09-04 17:02:56 +0000971 return True
972
973
    def _handle_agents(self):
        """
        Handles agents of the dispatcher.

        Appropriate Agents are added to the dispatcher through
        _schedule_running_host_queue_entries. These agents each
        have a task. This method runs the agents task through
        agent.tick() leading to:
            agent.start
                prolog -> AgentTasks prolog
                For each queue entry:
                    sets host status/status to Running
                    set started_on in afe_host_queue_entries
                run -> AgentTasks run
                    Creates PidfileRunMonitor
                    Queues the autoserv command line for this AgentTask
                    via the drone manager. These commands are executed
                    through the drone managers execute actions.
                poll -> AgentTasks/BaseAgentTask poll
                    checks the monitors exit_code.
                    Executes epilog if task is finished.
                    Executes AgentTasks _finish_task
                    finish_task is usually responsible for setting the status
                    of the HQE/host, and updating its active and complete fields.

            agent.is_done
                Removes the agent from the dispatcher's _agents queue.
                Is_done checks the finished bit on the agent, that is
                set based on the Agent's task. During the agent's poll
                we check to see if the monitor process has exited in
                its finish method, and set the success member of the
                task based on this exit code.
        """
        num_started_this_tick = 0
        num_finished_this_tick = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        logging.debug('Handling %d Agents', len(self._agents))
        for agent in list(self._agents):
            self._log_extra_msg('Processing Agent with Host Ids: %s and '
                                'queue_entry ids:%s' % (agent.host_ids,
                                agent.queue_entry_ids))
            if not agent.started:
                # Once the cap is hit, every later nonzero-process agent is
                # skipped this tick (see _can_start_agent).
                if not self._can_start_agent(agent, have_reached_limit):
                    have_reached_limit = True
                    logging.debug('Reached Limit of allowed running Agents.')
                    continue
                num_started_this_tick += agent.task.num_processes
                self._log_extra_msg('Starting Agent')
            agent.tick()
            self._log_extra_msg('Agent tick completed.')
            if agent.is_done():
                num_finished_this_tick += agent.task.num_processes
                self._log_extra_msg("Agent finished")
                self.remove_agent(agent)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_started', num_started_this_tick)
        autotest_stats.Gauge('scheduler.jobs_per_tick').send(
                'agents_finished', num_finished_this_tick)
        logging.info('%d running processes. %d added this tick.',
                     _drone_manager.total_running_processes(),
                     num_started_this_tick)
mbligh36768f02008-02-22 18:28:33 +00001036
1037
showard29f7cd22009-04-29 21:16:24 +00001038 def _process_recurring_runs(self):
1039 recurring_runs = models.RecurringRun.objects.filter(
1040 start_date__lte=datetime.datetime.now())
1041 for rrun in recurring_runs:
1042 # Create job from template
1043 job = rrun.job
1044 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001045 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001046
1047 host_objects = info['hosts']
1048 one_time_hosts = info['one_time_hosts']
1049 metahost_objects = info['meta_hosts']
1050 dependencies = info['dependencies']
1051 atomic_group = info['atomic_group']
1052
1053 for host in one_time_hosts or []:
1054 this_host = models.Host.create_one_time_host(host.hostname)
1055 host_objects.append(this_host)
1056
1057 try:
1058 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001059 options=options,
showard29f7cd22009-04-29 21:16:24 +00001060 host_objects=host_objects,
1061 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001062 atomic_group=atomic_group)
1063
1064 except Exception, ex:
1065 logging.exception(ex)
1066 #TODO send email
1067
1068 if rrun.loop_count == 1:
1069 rrun.delete()
1070 else:
1071 if rrun.loop_count != 0: # if not infinite loop
1072 # calculate new start_date
1073 difference = datetime.timedelta(seconds=rrun.loop_period)
1074 rrun.start_date = rrun.start_date + difference
1075 rrun.loop_count -= 1
1076 rrun.save()
1077
1078
# Allow deployments to override dispatcher behavior via an optional
# site_monitor_db module; falls back to BaseDispatcher when absent.
SiteDispatcher = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    """Concrete dispatcher class used by the scheduler."""
    pass
1085
1086
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task. An agent
    wraps around an AgentTask mainly to associate the AgentTask with the
    queue_entry and host ids.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state. If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """

    def __init__(self, task):
        """
        @param task: An instance of an AgentTask.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the wrapped task once; record start/finish transitions."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """@returns True once the wrapped task has finished (or aborted)."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task."""
        if not self.task:
            return
        self.task.abort()
        # tasks can choose to ignore aborts
        if self.task.aborted:
            self.finished = True
showardd3dc1992009-04-22 21:01:40 +00001143
beeps5e2bb4a2013-10-28 11:26:45 -07001144class AbstractQueueTask(agent_task.AgentTask, agent_task.TaskWithJobKeyvals):
showarda9545c02009-12-18 22:44:26 +00001145 """
1146 Common functionality for QueueTask and HostlessQueueTask
1147 """
1148 def __init__(self, queue_entries):
1149 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001150 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001151 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001152
1153
    def _keyval_path(self):
        """@returns path of the keyval file inside the working directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001156
1157
jamesrenc44ae992010-02-19 00:12:54 +00001158 def _write_control_file(self, execution_path):
1159 control_path = _drone_manager.attach_file_to_execution(
1160 execution_path, self.job.control_file)
1161 return control_path
1162
1163
Aviv Keshet308e7362013-05-21 14:43:16 -07001164 # TODO: Refactor into autoserv_utils. crbug.com/243090
showardd1195652009-12-08 22:21:02 +00001165 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001166 execution_path = self.queue_entries[0].execution_path()
1167 control_path = self._write_control_file(execution_path)
1168 hostnames = ','.join(entry.host.hostname
1169 for entry in self.queue_entries
1170 if not entry.is_hostless())
1171
1172 execution_tag = self.queue_entries[0].execution_tag()
1173 params = _autoserv_command_line(
1174 hostnames,
Alex Miller9f01d5d2013-08-08 02:26:01 -07001175 ['-P', execution_tag, '-n',
jamesrenc44ae992010-02-19 00:12:54 +00001176 _drone_manager.absolute_path(control_path)],
1177 job=self.job, verbose=False)
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001178 if self.job.is_image_update_job():
1179 params += ['--image', self.job.update_image_path]
1180
jamesrenc44ae992010-02-19 00:12:54 +00001181 return params
showardd1195652009-12-08 22:21:02 +00001182
1183
    @property
    def num_processes(self):
        # Process weight for dispatcher throttling: one per queue entry.
        return len(self.queue_entries)
1187
1188
    @property
    def owner_username(self):
        # Owner of the underlying job; consulted for per-user process limits.
        return self.job.owner
1192
1193
    def _working_directory(self):
        """@returns the execution path shared by all queue entries."""
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001196
1197
jadmanski0afbb632008-06-06 21:10:57 +00001198 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001199 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001200 keyval_dict = self.job.keyval_dict()
1201 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001202 group_name = self.queue_entries[0].get_group_name()
1203 if group_name:
1204 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001205 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001206 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001207 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001208 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001209
1210
showard35162b02009-03-03 02:17:30 +00001211 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001212 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001213 _drone_manager.write_lines_to_file(error_file_path,
1214 [_LOST_PROCESS_ERROR])
1215
1216
showardd3dc1992009-04-22 21:01:40 +00001217 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001218 if not self.monitor:
1219 return
1220
showardd9205182009-04-27 20:09:55 +00001221 self._write_job_finished()
1222
showard35162b02009-03-03 02:17:30 +00001223 if self.monitor.lost_process:
1224 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001225
jadmanskif7fa2cc2008-10-01 14:13:23 +00001226
showardcbd74612008-11-19 21:42:02 +00001227 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001228 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001229 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001230 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001231 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001232
1233
jadmanskif7fa2cc2008-10-01 14:13:23 +00001234 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001235 if not self.monitor or not self.monitor.has_process():
1236 return
1237
jadmanskif7fa2cc2008-10-01 14:13:23 +00001238 # build up sets of all the aborted_by and aborted_on values
1239 aborted_by, aborted_on = set(), set()
1240 for queue_entry in self.queue_entries:
1241 if queue_entry.aborted_by:
1242 aborted_by.add(queue_entry.aborted_by)
1243 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1244 aborted_on.add(t)
1245
1246 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001247 # TODO(showard): this conditional is now obsolete, we just need to leave
1248 # it in temporarily for backwards compatibility over upgrades. delete
1249 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001250 assert len(aborted_by) <= 1
1251 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001252 aborted_by_value = aborted_by.pop()
1253 aborted_on_value = max(aborted_on)
1254 else:
1255 aborted_by_value = 'autotest_system'
1256 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001257
showarda0382352009-02-11 23:36:43 +00001258 self._write_keyval_after_job("aborted_by", aborted_by_value)
1259 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001260
showardcbd74612008-11-19 21:42:02 +00001261 aborted_on_string = str(datetime.datetime.fromtimestamp(
1262 aborted_on_value))
1263 self._write_status_comment('Job aborted by %s on %s' %
1264 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001265
1266
jadmanski0afbb632008-06-06 21:10:57 +00001267 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001268 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001269 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001270 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001271
1272
jadmanski0afbb632008-06-06 21:10:57 +00001273 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001274 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001275 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001276
1277
class QueueTask(AbstractQueueTask):
    """Runs a job's autoserv process against one or more real hosts.

    Extends AbstractQueueTask with host bookkeeping (status transitions,
    per-host keyvals, dirty flag) and with extra autoserv flags for
    server-side packaging and job lineage.
    """

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check entry/host states, then mark hosts RUNNING and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.host.set_status(models.Host.Status.RUNNING)
            # Mark the host dirty so it is cleaned up after the job.
            queue_entry.host.update_field('dirty', 1)


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING while logs gather."""
        super(QueueTask, self)._finish_task()

        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
            queue_entry.host.set_status(models.Host.Status.RUNNING)


    def _command_line(self):
        """Build the autoserv command line, appending SSP and lineage flags.

        @returns A list of command-line arguments for autoserv.
        """
        invocation = super(QueueTask, self)._command_line()
        # Check if server-side packaging is needed.  Only server-type control
        # files can run in an SSP container; require_ssp is tri-state, so both
        # None and True enable SSP when the other conditions hold.  (Use
        # `is not False` rather than `!= False` per PEP 8 singleton-comparison
        # guidance.)
        if (_enable_ssp_container and
            self.job.control_type == control_data.CONTROL_TYPE.SERVER and
            self.job.require_ssp is not False):
            invocation += ['--require-ssp']
            keyval_dict = self.job.keyval_dict()
            test_source_build = keyval_dict.get('test_source_build', None)
            if test_source_build:
                invocation += ['--test_source_build', test_source_build]
        if self.job.parent_job_id:
            invocation += ['--parent_job_id', str(self.job.parent_job_id)]
        return invocation + ['--verify_job_repo_url']
Alex Miller9f01d5d2013-08-08 02:26:01 -07001322
1323
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a single hostless (server-only) queue entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()

        # Jobs enter the database in Starting status.  Each scheduler tick
        # picks up Starting jobs and, unless a limit such as
        # max_hostless_jobs_per_drone is hit, moves them to Running and
        # launches an autoserv process on a drone.  Jobs over the limit stay
        # in Starting.
        # An entry still in Starting therefore never had a process, so there
        # are no logs to parse or collect.  Skipping the transition avoids an
        # exception in the scheduler, since such an entry has no
        # execution_subdir value yet.
        hqe = self.queue_entries[0]
        if hqe.status != models.HostQueueEntry.Status.STARTING:
            hqe.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001351
1352
if __name__ == '__main__':
    # Entry point when run as a script; main() is defined earlier in this
    # file and drives the scheduler loop.
    main()