blob: cbc3c059d7c807117ecd87546604196fff57eb92 [file] [log] [blame]
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Module to retrieve system information and status."""
from __future__ import print_function
import collections
import glob
import logging
import netifaces
import os
import re
import subprocess
import factory_common # pylint: disable=W0611
from cros.factory import hwid
from cros.factory.system import GetBoard
from cros.factory.system import partitions
from cros.factory import test
from cros.factory.test import dut
from cros.factory.test.dut import power
from cros.factory.test import factory
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils.sys_utils import MountDeviceAndReadFile
class SystemInfo(object):
  """Static information about the system.

  This is mostly static information that changes rarely if ever
  (e.g., version numbers, serial numbers, etc.).

  All fields are populated in __init__ on a best-effort basis: when the
  source of a field cannot be read, the field is left as None instead of
  raising.  The unguarded steps are the creation of the default DUT object
  and the lookup of factory_md5sum.
  """

  # If not None, an update that is available from the update server.
  update_md5sum = None

  # The cached release image version and channel.  They are shared by all
  # instances so the release partition is only read until a value is found.
  release_image_version = None
  release_image_channel = None

  allowed_release_channels = ['dev', 'beta', 'stable']

  def __init__(self, dut_instance=None):
    """Collects the system information.

    Args:
      dut_instance: The DUT object used to query the EC and PD versions.
          If None, one is created with dut.Create().
    """
    # NOTE: each probe below catches Exception (rather than using a bare
    # "except:") so that a failing probe is still ignored, but
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    self.dut = dut.Create() if dut_instance is None else dut_instance

    self.mlb_serial_number = None
    try:
      self.mlb_serial_number = test.shopfloor.GetDeviceData()[
          'mlb_serial_number']
    except Exception:
      pass

    self.serial_number = None
    try:
      self.serial_number = test.shopfloor.get_serial_number()
      if self.serial_number is not None:
        self.serial_number = str(self.serial_number)
    except Exception:
      pass

    self.stage = None
    try:
      self.stage = test.shopfloor.GetDeviceData()['stage']
    except Exception:
      pass

    self.factory_image_version = None
    try:
      with open('/etc/lsb-release') as f:
        lsb_release = f.read()
      match = re.search(r'^GOOGLE_RELEASE=(.+)$', lsb_release,
                        re.MULTILINE)
      if match:
        self.factory_image_version = match.group(1)
    except Exception:
      pass

    self.toolkit_version = None
    try:
      with open('/usr/local/factory/TOOLKIT_VERSION') as f:
        self.toolkit_version = f.read().strip()
    except Exception:
      pass

    def _GetReleaseLSBValue(lsb_key):
      """Gets the value from the lsb-release file on release image.

      The file content is read lazily on the first call and kept in the
      lsb_content attribute of this function; while that content is empty
      (e.g. the read failed) every call tries to read it again.

      Args:
        lsb_key: The key to look up, e.g. 'GOOGLE_RELEASE'.

      Returns:
        The value of the key, or None if the key can not be found.
      """
      if not _GetReleaseLSBValue.lsb_content:
        try:
          release_rootfs = partitions.RELEASE_ROOTFS.path
          _GetReleaseLSBValue.lsb_content = (
              MountDeviceAndReadFile(release_rootfs, '/etc/lsb-release'))
        except Exception:
          pass
      match = re.search(r'^%s=(.+)$' % lsb_key,
                        _GetReleaseLSBValue.lsb_content,
                        re.MULTILINE)
      if match:
        return match.group(1)
      return None

    # The cached content of lsb-release file.
    _GetReleaseLSBValue.lsb_content = ""

    self.release_image_version = None
    if SystemInfo.release_image_version:
      self.release_image_version = SystemInfo.release_image_version
      logging.debug('Obtained release image version from SystemInfo: %r',
                    self.release_image_version)
    else:
      logging.debug('Release image version does not exist in SystemInfo. '
                    'Try to get it from lsb-release from release partition.')
      self.release_image_version = _GetReleaseLSBValue('GOOGLE_RELEASE')
      if self.release_image_version:
        logging.debug('Release image version: %s', self.release_image_version)
        logging.debug('Cache release image version to SystemInfo.')
        SystemInfo.release_image_version = self.release_image_version
      else:
        logging.debug('Can not read release image version from lsb-release.')

    self.release_image_channel = None
    if SystemInfo.release_image_channel:
      self.release_image_channel = SystemInfo.release_image_channel
      logging.debug('Obtained release image channel from SystemInfo: %r',
                    self.release_image_channel)
    else:
      logging.debug('Release image channel does not exist in SystemInfo. '
                    'Try to get it from lsb-release from release partition.')
      self.release_image_channel = _GetReleaseLSBValue(
          'CHROMEOS_RELEASE_TRACK')
      if self.release_image_channel:
        logging.debug('Release image channel: %s', self.release_image_channel)
        logging.debug('Cache release image channel to SystemInfo.')
        SystemInfo.release_image_channel = self.release_image_channel
      else:
        logging.debug('Can not read release image channel from lsb-release.')

    self.wlan0_mac = None
    try:
      # There is no break: if both interfaces exist, the address of the
      # later one (wlan0) is the one that is kept.
      for wlan_interface in ['mlan0', 'wlan0']:
        address_path = os.path.join('/sys/class/net/',
                                    wlan_interface, 'address')
        if os.path.exists(address_path):
          with open(address_path) as f:
            self.wlan0_mac = f.read().strip()
    except Exception:
      pass

    # Maps interface name (e.g. 'eth0') to its MAC address; None on failure.
    self.eth_macs = dict()
    try:
      eth_paths = glob.glob('/sys/class/net/eth*')
      for eth_path in eth_paths:
        address_path = os.path.join(eth_path, 'address')
        if os.path.exists(address_path):
          with open(address_path) as f:
            self.eth_macs[os.path.basename(eth_path)] = f.read().strip()
    except Exception:
      self.eth_macs = None

    self.kernel_version = None
    try:
      uname = subprocess.Popen(['uname', '-r'], stdout=subprocess.PIPE)
      stdout, _ = uname.communicate()
      self.kernel_version = stdout.strip()
    except Exception:
      pass

    self.architecture = None
    try:
      self.architecture = Spawn(['uname', '-m'],
                                check_output=True).stdout_data.strip()
    except Exception:
      pass

    self.ec_version = None
    try:
      self.ec_version = self.dut.ec.GetECVersion()
    except Exception:
      pass

    self.pd_version = None
    try:
      self.pd_version = self.dut.ec.GetPDVersion()
    except Exception:
      pass

    self.firmware_version = None
    try:
      crossystem = subprocess.Popen(['crossystem', 'fwid'],
                                    stdout=subprocess.PIPE)
      stdout, _ = crossystem.communicate()
      # Empty output is reported as None.
      self.firmware_version = stdout.strip() or None
    except Exception:
      pass

    self.mainfw_type = None
    try:
      crossystem = subprocess.Popen(['crossystem', 'mainfw_type'],
                                    stdout=subprocess.PIPE)
      stdout, _ = crossystem.communicate()
      # Empty output is reported as None.
      self.mainfw_type = stdout.strip() or None
    except Exception:
      pass

    self.root_device = None
    try:
      rootdev = Spawn(['rootdev', '-s'],
                      stdout=subprocess.PIPE, ignore_stderr=True)
      stdout, _ = rootdev.communicate()
      self.root_device = stdout.strip()
    except Exception:
      pass

    self.factory_md5sum = factory.get_current_md5sum()

    # Uses checksum of hwid file as hwid database version.
    self.hwid_database_version = None
    try:
      hwid_file_path = os.path.join(hwid.common.DEFAULT_HWID_DATA_PATH,
                                    hwid.common.ProbeBoard().upper())
      if os.path.exists(hwid_file_path):
        self.hwid_database_version = hwid.hwid_utils.ComputeDatabaseChecksum(
            hwid_file_path)
    except Exception:
      pass

    # update_md5sum is currently in SystemInfo's __dict__ but not this
    # object's. Copy it from SystemInfo into this object's __dict__.
    self.update_md5sum = SystemInfo.update_md5sum
# TODO(hungte) Move these functions to network module.
def GetIPv4Interfaces():
  """Lists the network interfaces, leaving out the loopback ones.

  Returns:
    The interface names reported by netifaces in sorted order, without the
    names that start with 'lo'.
  """
  names = []
  for name in sorted(netifaces.interfaces()):
    if name.startswith('lo'):
      continue
    names.append(name)
  return names
def GetIPv4InterfaceAddresses(interface):
  """Returns a list of IPv4 addresses of an interface.

  Args:
    interface: Name of the network interface to query.

  Returns:
    A list with the 'addr' value of every AF_INET entry of the interface
    that has one.  If there is no such entry, or netifaces raises
    ValueError for the interface, the list is ['none'].
  """
  try:
    addresses = netifaces.ifaddresses(interface).get(netifaces.AF_INET, [])
  except ValueError:
    # Treat an interface that netifaces rejects as having no address.
    # Leaving `addresses` unassigned here would make the code below fail
    # with UnboundLocalError instead of returning ['none'].
    addresses = []
  ips = [x.get('addr') for x in addresses
         if 'addr' in x] or ['none']
  return ips
def IsInterfaceConnected(prefix):
  """Tells whether an interface whose name starts with prefix has an IP.

  Args:
    prefix: The leading part of the interface names to check, e.g. 'eth'.

  Returns:
    True if at least one matching interface reports an address other than
    the 'none' placeholder; False otherwise.
  """
  matching = [name for name in GetIPv4Interfaces()
              if name.startswith(prefix)]
  found = []
  # Every matching interface is queried; there is no early exit.
  for name in matching:
    for address in GetIPv4InterfaceAddresses(name):
      if address != 'none':
        found.append(address)
  return len(found) > 0
def GetIPv4Addresses():
  """Describes the IPv4 addresses of all interfaces in one string.

  Each interface is rendered as name=addresses, where multiple addresses
  of one interface are joined with '+'.  The entries are joined with ', ',
  for example:

    eth0=192.168.1.10, wlan0=192.168.16.14

  Returns:
    The description string; empty if there is no interface to report.
  """
  descriptions = [
      '%s=%s' % (name, '+'.join(GetIPv4InterfaceAddresses(name)))
      for name in GetIPv4Interfaces()]
  return ', '.join(descriptions)
# Describes one battery attribute file in sysfs:
#   name: File name of the attribute.
#   type: Callable that converts the stripped file content to a value.
#   optional: Whether the attribute is allowed to be missing.
_SysfsAttribute = collections.namedtuple('SysfsAttribute',
                                         ['name', 'type', 'optional'])


def _ParseSysfsBool(value):
  """Converts the text of an integer flag ('0' or '1') into a bool.

  bool() must not be applied to the text directly: every non-empty string,
  including '0', is truthy, so the flag would always come out as True.

  Args:
    value: The text to convert.

  Returns:
    False if the text is the integer zero, True for any other integer.

  Raises:
    ValueError: If the text is not an integer.
  """
  return bool(int(value))


_SysfsBatteryAttributes = [
    _SysfsAttribute('charge_full', int, False),
    _SysfsAttribute('charge_full_design', int, False),
    _SysfsAttribute('charge_now', int, False),
    _SysfsAttribute('current_now', int, False),
    _SysfsAttribute('present', _ParseSysfsBool, False),
    _SysfsAttribute('status', str, False),
    _SysfsAttribute('voltage_now', int, False),
    _SysfsAttribute('voltage_min_design', int, True),
    _SysfsAttribute('energy_full', int, True),
    _SysfsAttribute('energy_full_design', int, True),
    _SysfsAttribute('energy_now', int, True),
]
class SystemStatus(object):
  """Information about the current system status.

  This is information that changes frequently, e.g., load average
  or battery information.

  We log a bunch of system status here.  Every value is collected on a
  best-effort basis in __init__: a value that cannot be obtained is stored
  as None (an empty list for temperatures) instead of raising.
  """

  # Class variable: a charge_manager instance for checking force
  # charge status.
  charge_manager = None

  def __init__(self, dut_instance=None):
    """Takes a snapshot of the system status.

    Args:
      dut_instance: The DUT object used to query fan speed and
          temperatures.  If None, one is created with dut.Create().
    """
    def _CalculateBatteryFractionFull(battery):
      """Returns now/full from the first usable pair of readings.

      The charge_* readings are tried before the energy_* ones.  A pair is
      usable if both values are known, full is positive and now is not
      negative.  Returns None if no pair is usable.
      """
      for t in ['charge', 'energy']:
        now = battery['%s_now' % t]
        full = battery['%s_full' % t]
        if (now is not None and full is not None and full > 0 and now >= 0):
          return float(now) / full
      return None

    # NOTE: the probes below catch Exception (rather than using a bare
    # "except:") so that a failing probe is still ignored, but
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    self.dut = dut.Create() if dut_instance is None else dut_instance

    self.battery = {}
    self.battery_sysfs_path = None

    # Use the first power supply whose type file says 'Battery'.
    for p in glob.glob('/sys/class/power_supply/*/type'):
      try:
        with open(p) as f:
          supply_type = f.read().strip()
      except Exception:
        logging.warning('sysfs path %s is unavailable', p)
        continue
      if supply_type == 'Battery':
        self.battery_sysfs_path = os.path.dirname(p)
        break

    for k, item_type, optional in _SysfsBatteryAttributes:
      self.battery[k] = None
      if not self.battery_sysfs_path:
        continue
      attribute_path = os.path.join(self.battery_sysfs_path, k)
      try:
        with open(attribute_path) as f:
          self.battery[k] = item_type(f.read().strip())
      except Exception:
        # A missing optional attribute is only worth a debug message.
        log_func = logging.debug if optional else logging.error
        log_func('sysfs path %s is unavailable', attribute_path)

    self.battery['fraction_full'] = _CalculateBatteryFractionFull(self.battery)

    self.battery['force'] = False
    if self.charge_manager:
      # When the charge manager is in one of these states, its state
      # replaces the status read from sysfs.
      force_status = {
          power.Power.ChargeState.DISCHARGE: 'Discharging',
          power.Power.ChargeState.CHARGE: 'Charging',
          power.Power.ChargeState.IDLE: 'Idle'}.get(
              self.charge_manager.state)
      if force_status:
        self.battery['status'] = force_status
        self.battery['force'] = True

    # Get fan speed
    try:
      self.fan_rpm = self.dut.thermal.GetFanRPM()
    except Exception:
      self.fan_rpm = None

    # Get temperatures from sensors
    try:
      self.temperatures = self.dut.thermal.GetTemperatures()
    except Exception:
      self.temperatures = []

    try:
      self.main_temperature_index = self.dut.thermal.GetMainTemperatureIndex()
    except Exception:
      self.main_temperature_index = None

    # List comprehensions (instead of map) make sure the conversion runs
    # inside the try block and the result is a list on any Python version.
    try:
      with open('/proc/loadavg') as f:
        self.load_avg = [float(x) for x in f.read().split()[0:3]]
    except Exception:
      self.load_avg = None

    try:
      with open('/proc/stat') as f:
        # The first field of the first line is a label, not a number.
        self.cpu = [int(x) for x in f.readline().split()[1:]]
    except Exception:
      self.cpu = None

    try:
      self.ips = GetIPv4Addresses()
    except Exception:
      self.ips = None

    try:
      self.eth_on = IsInterfaceConnected('eth')
    except Exception:
      self.eth_on = None

    try:
      self.wlan_on = (IsInterfaceConnected('mlan') or
                      IsInterfaceConnected('wlan'))
    except Exception:
      self.wlan_on = None
if __name__ == '__main__':
  # Dumps the collected system information and status as YAML.
  import yaml
  # SystemInfo.__init__ accepts a single optional argument (dut_instance),
  # so it must not be called with two positional arguments.
  print(yaml.dump(dict(system_info=SystemInfo().__dict__,
                       system_status=SystemStatus().__dict__),
                  default_flow_style=False))