blob: 43c7163141358125bdb6626a8c8a814a9b0ff332 [file] [log] [blame]
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module of config file readers."""
# pylint: disable=invalid-name
import collections
import ConfigParser
import functools
import json
import logging
import re

import build_utils
# Config readers.
Configs = collections.namedtuple('Configs', ['lab_config'])

# Section of the lab config file that holds the Android settings.
ANDROID_SETTINGS = 'ANDROID'

# Section of the lab config file that holds the CrOS settings.
CROS_SETTINGS = 'CROS'

# Section of the Rubik config file that holds the Rubik board settings.
RUBIK_SETTINGS = 'RUBIK'
def forgive_config_error(func):
    """A decorator making ConfigParser get*() functions return None on fail.

    Only ConfigParser.Error (and its subclasses) is swallowed; any other
    exception raised by the wrapped function propagates to the caller.

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper that forwards all arguments to func and returns its result,
        or None when func raises ConfigParser.Error.
    """
    # functools.wraps copies func's __name__ and __doc__ onto the wrapper so
    # decorated getters stay identifiable in tracebacks and help().
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ConfigParser.Error:
            return None
    return wrapper
class ConfigReader(ConfigParser.SafeConfigParser):
    """A SafeConfigParser whose typed getters return None on config errors.

    getstring(), getint(), getfloat() and getboolean() are wrapped with
    forgive_config_error, so a ConfigParser.Error raised while looking up a
    value yields None. Exceptions of any other type still propagate.

    Attributes:
        filename: the file name most recently passed to read(), or None.
    """

    def __init__(self, filename):
        """Initialize a config reader.

        Args:
            filename: string for the file name to read, or None to create a
                reader without loading any file.
        """
        ConfigParser.SafeConfigParser.__init__(self)
        self.read(filename)

    def read(self, filename):
        """Read the file indicated by the given filename.

        Args:
            filename: string for the file name to read. None skips reading.

        Returns:
            The value returned by SafeConfigParser.read() (the list of file
            names that were successfully parsed), or an empty list when
            filename is None.
        """
        self.filename = filename
        if filename is None:
            return []
        # Hand the base-class result back so callers can tell whether the
        # file was actually parsed.
        return ConfigParser.SafeConfigParser.read(self, filename)

    @forgive_config_error
    def getstring(self, section, option):
        """Get string value for the given section and option.

        Returns:
            The option value, or None on ConfigParser.Error.
        """
        return ConfigParser.SafeConfigParser.get(self, section, option)

    @forgive_config_error
    def getint(self, section, option):
        """Get int value for the given section and option.

        Returns:
            The option value as an int, or None on ConfigParser.Error.
        """
        return ConfigParser.SafeConfigParser.getint(self, section, option)

    @forgive_config_error
    def getfloat(self, section, option):
        """Get float value for the given section and option.

        Returns:
            The option value as a float, or None on ConfigParser.Error.
        """
        return ConfigParser.SafeConfigParser.getfloat(self, section, option)

    @forgive_config_error
    def getboolean(self, section, option):
        """Get boolean value for the given section and option.

        Returns:
            The option value as a bool, or None on ConfigParser.Error.
        """
        return ConfigParser.SafeConfigParser.getboolean(self, section, option)
class LabConfig(object):
    """Reader of the lab config file used by suite scheduler."""

    def __init__(self, config_reader):
        """Create a LabConfig on top of a config reader.

        Args:
            config_reader: a ConfigReader to read lab configs.
        """
        self._config_reader = config_reader

    def get_android_board_list(self):
        """Get the android board list.

        Returns:
            Always an empty set; the lab config file is not consulted.
        """
        return set()

    def get_cros_board_list(self):
        """Get the CrOS board list from lab config file.

        Boards come from the comma-separated 'board_list' option of the CrOS
        section. Each entry is stripped of surrounding whitespace, and a
        trailing '-<digits>' suffix is dropped, so 'xxxx-1' becomes 'xxxx'.

        Returns:
            A set of CrOS boards, e.g. set(['abox_edge', 'alex', ...]), with
            the boards of get_android_board_list() removed.

        Raises:
            ValueError: no CrOS board list is found.
        """
        raw_boards = self._config_reader.getstring(CROS_SETTINGS, 'board_list')
        if raw_boards is None:
            raise ValueError('Cannot find CrOS board_list in lab config file.')

        android_boards = self.get_android_board_list()
        cros_boards = set()
        for raw_board in raw_boards.split(','):
            # Reduce names like xxxx-1 to xxxx.
            base_name = re.match(r'(.*?)(?:-\d+)?$', raw_board.strip()).group(1)
            cros_boards.add(base_name)
        logging.info('Found CROS boards in lab_config.ini: %s', str(cros_boards))
        return cros_boards - android_boards

    def get_cros_model_map(self):
        """Gets a map of CrOS boards onto a list of their corresponding models.

        Returns:
            A dict of CrOS board names onto a list of models.
        """
        return self._get_model_map(CROS_SETTINGS)

    def get_android_model_map(self):
        """Gets a map of Android boards onto a list of their corresponding models.

        Returns:
            A dict of Android board names onto a list of models.
        """
        return self._get_model_map(ANDROID_SETTINGS)

    def _get_model_map(self, setting_name):
        """Build a board-to-models map from a section's 'model_list' option.

        Args:
            setting_name: name of the lab config section to read.

        Returns:
            A dict mapping each board name to the list of its model names, in
            the order they are listed. The dict is empty when there is no
            config reader or the 'model_list' option cannot be read.
        """
        model_map = {}
        if not self._config_reader:
            return model_map

        encoded_map = self._config_reader.getstring(setting_name, 'model_list')
        if encoded_map is None:
            return model_map

        # Entries look like boardName_modelName or
        # familyName_boardName_modelName; build_utils splits them.
        for full_model in encoded_map.split(','):
            board_name, model_name = build_utils.parse_full_model(full_model)
            model_map.setdefault(board_name, []).append(model_name)
        return model_map
class RubikConfig(object):
    """Class for reading the Rubik config file for suite scheduler."""

    def __init__(self, config_reader):
        """Initialize a RubikConfig to read rubik config.

        Args:
            config_reader: a ConfigReader to read rubik config.
        """
        self._config_reader = config_reader

    def get_stable_rubik_milestones(self):
        """Get stable rubik milestones from the Rubik config file.

        The 'stable_rubik_milestones' option holds comma-separated
        '<build target>=<milestone>' entries. Whitespace around entries,
        build targets and milestones is ignored, and empty entries (e.g. the
        one produced by a trailing comma) are skipped. Entries are processed
        in the order they are listed, so if a build target appears more than
        once the last entry wins.

        Returns:
            A dict mapping build target (e.g. eve) to a stable rubik
            milestone, as an int.

        Raises:
            ValueError: the option is not found, or an entry is not of the
                form '<build target>=<integer milestone>'.
        """
        data = self._config_reader.getstring(RUBIK_SETTINGS, 'stable_rubik_milestones')
        if data is None:
            raise ValueError('Cannot find stable_rubik_milestones in Rubik config file.')

        mstone_map = {}
        for entry in data.split(','):
            entry = entry.strip()
            if not entry:
                # Tolerate trailing commas and blank entries.
                continue
            fields = entry.split('=')
            if len(fields) < 2:
                # Report a missing '=' as the documented ValueError instead
                # of an IndexError from the lookup below.
                raise ValueError(
                    'Malformed stable_rubik_milestones entry: %r' % entry)
            # int() raises ValueError by itself for a non-numeric milestone.
            mstone_map[fields[0].strip()] = int(fields[1])
        logging.info('Found stable rubik milestones targets in rubik_config.ini: %s', str(mstone_map))
        return mstone_map
def config_dump(res):
    """Serialize a config dict as pretty-printed JSON.

    Args:
        res: A dict object to dump.

    Returns:
        A json string with sorted keys and 4-space indentation, terminated
        by a newline.
    """
    json_options = {
        'sort_keys': True,
        'indent': 4,
        'separators': (',', ': '),
    }
    serialized = json.dumps(res, **json_options)
    return serialized + '\n'
def _parse_model_list(models):
    r"""Split a comma-separated model string into a set of model names.

    Each entry is stripped of surrounding whitespace, and a trailing
    '-<digits>' suffix, if present, is removed.

    Args:
        models: A string, read from lab or migration config file.
            e.g. '\nreef,\nastronaut,\nrobo360'

    Returns:
        A set of string models, e.g. set(['reef', 'astronaut', 'robo360', ...])
    """
    name_pattern = re.compile(r'(.*?)(?:-\d+)?$')
    parsed = set()
    for raw_model in models.split(','):
        parsed.add(name_pattern.match(raw_model.strip()).group(1))
    return parsed
def _parse_pool_list(l):
    r"""Parse allowlist/denylist for a suite in migration config file.

    The text is split on newlines only. Each line is stripped of surrounding
    whitespace, blank lines are dropped, and duplicates are removed. Any
    other character on a line (a comma, for instance) stays part of the
    entry.

    Args:
        l: A string, read from migration config file entry allowlist or
            denylist, holding one <pool:model> entry per line,
            e.g. '\ncq:reef\nbvt:peppy'. May be None or empty.

    Returns:
        A list of the distinct allowlist/denylist <pool:model> strings, in no
        particular order, e.g. ['cq:reef', 'bvt:peppy', ...]. Empty when l
        is None or empty.
    """
    if not l:
        return []

    unique_entries = set()
    for line in l.split('\n'):
        entry = line.strip()
        if entry:
            unique_entries.add(entry)
    return list(unique_entries)
def _get_models_by_pool(model_pool_list, pool):
    """Collect the models that a model_pool_list associates with a pool.

    Args:
        model_pool_list: A list of model_pool strings, each parsed with
            _parse_pool_model_key, e.g.
            ['(cq, reef)', '(bvt, peppy)', '(bvt, link)', ...]
        pool: A string pool. e.g. 'bvt'

    Returns:
        A list of the distinct string models found for pool, in no particular
        order. e.g. ['peppy', 'link', ...]
    """
    parsed_keys = (_parse_pool_model_key(key) for key in model_pool_list)
    matching_models = set(
        model for found_pool, model in parsed_keys if found_pool == pool)
    return list(matching_models)
def suite_key(suite_name):
    """Return the key string for a suite: 'suite:' followed by its name."""
    key_template = 'suite:%s'
    return key_template % suite_name
def model_key(model_name):
    """Return the key string for a model: 'model:' followed by its name."""
    key_template = 'model:%s'
    return key_template % model_name
def pool_key(pool_name):
    """Return the key string for a pool: 'pool:' followed by its name."""
    key_template = 'pool:%s'
    return key_template % pool_name
def _pool_model_key(pool_name, model_name=None):
    """Format a pool, optionally paired with a model, as a key string.

    Args:
        pool_name: the pool to put in the key.
        model_name: the model to pair with the pool; None for a pool-only
            key.

    Returns:
        '(<pool>, <model>)' when model_name is given, otherwise '(<pool>)'.
    """
    if model_name is not None:
        return '(%s, %s)' % (pool_name, model_name)
    return '(%s)' % pool_name
def _parse_pool_model_key(model_pool):
    """Split a '(<pool>, <model>)' key string into its pool and model.

    The first and last characters are discarded, the remainder is split on
    commas, and the first two fields are returned with surrounding whitespace
    removed. Any further fields are ignored.

    Args:
        model_pool: a key string such as '(cq, reef)'.

    Returns:
        A (pool, model) tuple of strings.

    Raises:
        IndexError: the key contains no comma, e.g. a pool-only key '(cq)'.
    """
    fields = model_pool[1:-1].split(',')
    pool_name = fields[0].strip()
    model_name = fields[1].strip()
    return (pool_name, model_name)