dut: Add DUT-aware 'SystemStatus' component to replace state.SystemStatus
The 'cros.factory.system.state' module provides two objects: SystemInfo and
SystemStatus and we should migrate them into new DUT API. This change migrates
SystemStatus into cros.factory.test.dut.status. SystemInfo will be moved on its
own change set.
Previously the SystemStatus was made as an object with collected static values.
This has some disadvantages including:
- We can't just read one value. Always need to refresh all.
- Hard to make "live status".
- The retrieval is implicitly done when the object is created.
- Some tricks were made by setting class variable for overriding values.
So in new implementation we changed the SystemStatus into a DUTComponent that
all information can be retrieved individually as property. So that:
- Use self.dut.status.XXX when you need single status. It's probed immediately.
- Use self.dut.status.Snapshot() gives you an object with all known properties
with values fixed, just like before. To get a dictionary like info.GetAll(),
use status.Snapshot().__dict__.
- Use self.dut.status.Overrides(name, value) to override a value.
BUG=chromium:557573
TEST=make test
Change-Id: I694d57f1035f8ba1d88523353067108aaac88631
Reviewed-on: https://chromium-review.googlesource.com/318395
Commit-Ready: Hung-Te Lin <hungte@chromium.org>
Tested-by: Hung-Te Lin <hungte@chromium.org>
Reviewed-by: Hung-Te Lin <hungte@chromium.org>
diff --git a/py/goofy/goofy.py b/py/goofy/goofy.py
index 6d588d0..ac3dc26 100755
--- a/py/goofy/goofy.py
+++ b/py/goofy/goofy.py
@@ -1551,7 +1551,7 @@
self.test_list.options.min_charge_pct is not None):
self.charge_manager = ChargeManager(self.test_list.options.min_charge_pct,
self.test_list.options.max_charge_pct)
- system.state.SystemStatus.charge_manager = self.charge_manager
+ self.dut.status.Overrides('charge_manager', self.charge_manager)
else:
# Goofy should set charger state to charge if charge_manager is disabled.
self.charge()
diff --git a/py/system/state.py b/py/system/state.py
index cbc3c05..3ef2283 100755
--- a/py/system/state.py
+++ b/py/system/state.py
@@ -6,21 +6,17 @@
from __future__ import print_function
-import collections
import glob
import logging
-import netifaces
import os
import re
import subprocess
import factory_common # pylint: disable=W0611
from cros.factory import hwid
-from cros.factory.system import GetBoard
from cros.factory.system import partitions
from cros.factory import test
from cros.factory.test import dut
-from cros.factory.test.dut import power
from cros.factory.test import factory
from cros.factory.utils.process_utils import Spawn
from cros.factory.utils.sys_utils import MountDeviceAndReadFile
@@ -228,174 +224,6 @@
self.update_md5sum = SystemInfo.update_md5sum
-# TODO(hungte) Move these functions to network module.
-
-def GetIPv4Interfaces():
- """Returns a list of IPv4 interfaces."""
- interfaces = sorted(netifaces.interfaces())
- return [x for x in interfaces if not x.startswith('lo')]
-
-
-def GetIPv4InterfaceAddresses(interface):
- """Returns a list of ips of an interface"""
- try:
- addresses = netifaces.ifaddresses(interface).get(netifaces.AF_INET, [])
- except ValueError:
- pass
- ips = [x.get('addr') for x in addresses
- if 'addr' in x] or ['none']
- return ips
-
-
-def IsInterfaceConnected(prefix):
- """Returns whether any interface starting with prefix is connected"""
- ips = []
- for interface in GetIPv4Interfaces():
- if interface.startswith(prefix):
- ips += [x for x in GetIPv4InterfaceAddresses(interface) if x != 'none']
-
- return ips != []
-
-
-def GetIPv4Addresses():
- """Returns a string describing interfaces' IPv4 addresses.
-
- The returned string is of the format
-
- eth0=192.168.1.10, wlan0=192.168.16.14
- """
- ret = []
- interfaces = GetIPv4Interfaces()
- for interface in interfaces:
- ips = GetIPv4InterfaceAddresses(interface)
- ret.append('%s=%s' % (interface, '+'.join(ips)))
-
- return ', '.join(ret)
-
-
-_SysfsAttribute = collections.namedtuple('SysfsAttribute',
- ['name', 'type', 'optional'])
-_SysfsBatteryAttributes = [
- _SysfsAttribute('charge_full', int, False),
- _SysfsAttribute('charge_full_design', int, False),
- _SysfsAttribute('charge_now', int, False),
- _SysfsAttribute('current_now', int, False),
- _SysfsAttribute('present', bool, False),
- _SysfsAttribute('status', str, False),
- _SysfsAttribute('voltage_now', int, False),
- _SysfsAttribute('voltage_min_design', int, True),
- _SysfsAttribute('energy_full', int, True),
- _SysfsAttribute('energy_full_design', int, True),
- _SysfsAttribute('energy_now', int, True),
-]
-
-
-class SystemStatus(object):
- """Information about the current system status.
-
- This is information that changes frequently, e.g., load average
- or battery information.
-
- We log a bunch of system status here.
- """
- # Class variable: a charge_manager instance for checking force
- # charge status.
- charge_manager = None
-
- def __init__(self, dut_instance=None):
- def _CalculateBatteryFractionFull(battery):
- for t in ['charge', 'energy']:
- now = battery['%s_now' % t]
- full = battery['%s_full' % t]
- if (now is not None and full is not None and full > 0 and now >= 0):
- return float(now) / full
- return None
-
- self.dut = dut.Create() if dut_instance is None else dut_instance
- self.battery = {}
- self.battery_sysfs_path = None
- path_list = glob.glob('/sys/class/power_supply/*/type')
- for p in path_list:
- try:
- if open(p).read().strip() == 'Battery':
- self.battery_sysfs_path = os.path.dirname(p)
- break
- except:
- logging.warning('sysfs path %s is unavailable', p)
-
- for k, item_type, optional in _SysfsBatteryAttributes:
- self.battery[k] = None
- try:
- if self.battery_sysfs_path:
- self.battery[k] = item_type(
- open(os.path.join(self.battery_sysfs_path, k)).read().strip())
- except:
- log_func = logging.error
- if optional:
- log_func = logging.debug
- log_func('sysfs path %s is unavailable',
- os.path.join(self.battery_sysfs_path, k))
-
- self.battery['fraction_full'] = _CalculateBatteryFractionFull(self.battery)
-
- self.battery['force'] = False
- if self.charge_manager:
- force_status = {
- power.Power.ChargeState.DISCHARGE: 'Discharging',
- power.Power.ChargeState.CHARGE: 'Charging',
- power.Power.ChargeState.IDLE: 'Idle'}.get(
- self.charge_manager.state)
- if force_status:
- self.battery['status'] = force_status
- self.battery['force'] = True
-
- # Get fan speed
- try:
- self.fan_rpm = self.dut.thermal.GetFanRPM()
- except:
- self.fan_rpm = None
-
- # Get temperatures from sensors
- try:
- self.temperatures = self.dut.thermal.GetTemperatures()
- except:
- self.temperatures = []
-
- try:
- self.main_temperature_index = self.dut.thermal.GetMainTemperatureIndex()
- except:
- self.main_temperature_index = None
-
- try:
- self.load_avg = map(
- float, open('/proc/loadavg').read().split()[0:3])
- except:
- self.load_avg = None
-
- try:
- self.cpu = map(int, open('/proc/stat').readline().split()[1:])
- except:
- self.cpu = None
-
- try:
- self.ips = GetIPv4Addresses()
- except:
- self.ips = None
-
- try:
- self.eth_on = IsInterfaceConnected('eth')
- except:
- self.eth_on = None
-
- try:
- self.wlan_on = (IsInterfaceConnected('mlan') or
- IsInterfaceConnected('wlan'))
- except:
- self.wlan_on = None
-
-
if __name__ == '__main__':
import yaml
- print(yaml.dump(dict(system_info=SystemInfo(None, None).__dict__,
- system_status=SystemStatus().__dict__),
- default_flow_style=False))
+ print(yaml.dump(SystemInfo(None, None).__dict__, default_flow_style=False))
diff --git a/py/system/state_unittest.py b/py/system/state_unittest.py
index e31333b..19ea7a2 100755
--- a/py/system/state_unittest.py
+++ b/py/system/state_unittest.py
@@ -15,61 +15,11 @@
import factory_common # pylint: disable=W0611
from cros.factory.system import partitions
from cros.factory.system import state
-from cros.factory.test import dut as dut_module
-from cros.factory.test.dut import thermal
-from cros.factory.test.dut.board import DUTProperty
MOCK_RELEASE_IMAGE_LSB_RELEASE = ('GOOGLE_RELEASE=5264.0.0\n'
'CHROMEOS_RELEASE_TRACK=canary-channel\n')
-class SystemStatusTest(unittest.TestCase):
- """Unittest for SystemStatus."""
-
- def setUp(self):
- self.mox = mox.Mox()
-
- def tearDown(self):
- self.mox.UnsetStubs()
-
- def runTest(self):
-
- # Set up mocks for Board interface
- dut = dut_module.Create()
- DUTProperty.Override(dut, 'thermal', self.mox.CreateMock(thermal.Thermal))
- self.mox.StubOutWithMock(state, 'GetBoard')
- # Set up mocks for netifaces.
- netifaces = state.netifaces = self.mox.CreateMockAnything()
- netifaces.AF_INET = 2
- netifaces.AF_INET6 = 10
-
- dut.thermal.GetFanRPM().AndReturn([2000])
- dut.thermal.GetTemperatures().AndReturn([1, 2, 3, 4, 5])
- dut.thermal.GetMainTemperatureIndex().AndReturn(2)
- netifaces.interfaces().AndReturn(['lo', 'eth0', 'wlan0'])
- netifaces.ifaddresses('eth0').AndReturn(
- {netifaces.AF_INET6: [{'addr': 'aa:aa:aa:aa:aa:aa'}],
- netifaces.AF_INET: [{'broadcast': '192.168.1.255',
- 'addr': '192.168.1.100'}]})
- netifaces.ifaddresses('wlan0').AndReturn(
- {netifaces.AF_INET: [{'addr': '192.168.16.100'},
- {'addr': '192.168.16.101'}]})
- self.mox.ReplayAll()
-
- # Don't care about the values; just make sure there's something
- # there.
- status = state.SystemStatus(dut)
- # Don't check battery, since this system might not even have one.
- self.assertTrue(isinstance(status.battery, dict))
- self.assertEquals(3, len(status.load_avg))
- self.assertEquals(10, len(status.cpu))
- self.assertEquals(
- 'eth0=192.168.1.100, wlan0=192.168.16.100+192.168.16.101',
- status.ips)
-
- self.mox.VerifyAll()
-
-
class SystemInfoTest(unittest.TestCase):
"""Unittest for SystemInfo."""
diff --git a/py/test/dut/board.py b/py/test/dut/board.py
index 91f7422..e69cb4a 100644
--- a/py/test/dut/board.py
+++ b/py/test/dut/board.py
@@ -23,6 +23,7 @@
from cros.factory.test.dut import hooks
from cros.factory.test.dut import led
from cros.factory.test.dut import power
+from cros.factory.test.dut import status
from cros.factory.test.dut import temp
from cros.factory.test.dut import thermal
from cros.factory.test.dut import vpd
@@ -127,6 +128,11 @@
def vpd(self):
return vpd.VitalProductData(self)
+ @DUTProperty
+ def status(self):
+ """Returns live system status (dynamic data like CPU loading)."""
+ return status.SystemStatus(self)
+
# Helper functions to access DUT via link.
def IsReady(self):
diff --git a/py/test/dut/status.py b/py/test/dut/status.py
new file mode 100755
index 0000000..d08adee
--- /dev/null
+++ b/py/test/dut/status.py
@@ -0,0 +1,236 @@
+#!/usr/bin/env python
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Module to retrieve system information and status."""
+
+from __future__ import print_function
+
+import collections
+import copy
+import logging
+
+import netifaces
+
+import factory_common # pylint: disable=W0611
+from cros.factory.test.dut import component
+
+
+# Static list of known properties in SystemStatus.
+_PROP_LIST = []
+
+
+def StatusProperty(f):
+ """Decoration function for SystemStatus properties."""
+ global _PROP_LIST
+ name = f.__name__
+ if not name.startswith('_'):
+ _PROP_LIST.append(name)
+ @property
+ def prop(self):
+ if name in self._overrides:
+ return self._overrides[name]
+ try:
+ return f(self)
+ except:
+ return None
+ return prop
+
+
+# TODO(hungte) These functions currently only reads local network information,
+# and should be changed to support remote DUT (for example by using 'ip'
+# command). We may also move them to class internal or shared network utility
+# modules.
+def GetIPv4Interfaces():
+ """Returns a list of IPv4 interfaces."""
+ interfaces = sorted(netifaces.interfaces())
+ return [x for x in interfaces if not x.startswith('lo')]
+
+
+def GetIPv4InterfaceAddresses(interface):
+ """Returns a list of ips of an interface"""
+ try:
+ addresses = netifaces.ifaddresses(interface).get(netifaces.AF_INET, [])
+ except ValueError:
+ pass
+ ips = [x.get('addr') for x in addresses
+ if 'addr' in x] or ['none']
+ return ips
+
+
+def IsInterfaceConnected(prefix):
+ """Returns whether any interface starting with prefix is connected"""
+ ips = []
+ for interface in GetIPv4Interfaces():
+ if interface.startswith(prefix):
+ ips += [x for x in GetIPv4InterfaceAddresses(interface) if x != 'none']
+
+ return ips != []
+
+
+def GetIPv4Addresses():
+ """Returns a string describing interfaces' IPv4 addresses.
+
+ The returned string is of the format
+
+ eth0=192.168.1.10, wlan0=192.168.16.14
+ """
+ ret = []
+ interfaces = GetIPv4Interfaces()
+ for interface in interfaces:
+ ips = GetIPv4InterfaceAddresses(interface)
+ ret.append('%s=%s' % (interface, '+'.join(ips)))
+
+ return ', '.join(ret)
+
+
+_SysfsAttribute = collections.namedtuple('SysfsAttribute',
+ ['name', 'type', 'optional'])
+_SysfsBatteryAttributes = [
+ _SysfsAttribute('charge_full', int, False),
+ _SysfsAttribute('charge_full_design', int, False),
+ _SysfsAttribute('charge_now', int, False),
+ _SysfsAttribute('current_now', int, False),
+ _SysfsAttribute('present', bool, False),
+ _SysfsAttribute('status', str, False),
+ _SysfsAttribute('voltage_now', int, False),
+ _SysfsAttribute('voltage_min_design', int, True),
+ _SysfsAttribute('energy_full', int, True),
+ _SysfsAttribute('energy_full_design', int, True),
+ _SysfsAttribute('energy_now', int, True),
+]
+
+
+class SystemStatusSnapshot(object):
+ """A snapshot object allows accessing pre-fetched data."""
+ def __init__(self, status):
+ self.__dict__.update(copy.deepcopy(
+ dict((name, getattr(status, name)) for name in _PROP_LIST)))
+
+
+class SystemStatus(component.DUTComponent):
+ """Information about the current system status.
+
+ This is information that changes frequently, e.g., load average
+ or battery information.
+ """
+
+ def __init__(self, dut=None):
+ super(SystemStatus, self).__init__(dut)
+ self._overrides = {}
+
+ def Snapshot(self):
+ """Returns a SystemStatusSnapshot object with all properties."""
+ return SystemStatusSnapshot(self)
+
+ def Overrides(self, name, value):
+ """Overrides a status property to given value.
+
+ This is useful for setting shared data like charge_manager.
+
+ Args:
+ name: A string for the property to override.
+ value: The value to return in future for given property.
+ """
+ self._overrides[name] = value
+
+ def _CalculateBatteryFractionFull(self, battery):
+ for t in ['charge', 'energy']:
+ now = battery['%s_now' % t]
+ full = battery['%s_full' % t]
+ if (now is not None and full is not None and full > 0 and now >= 0):
+ return float(now) / full
+ return None
+
+ @StatusProperty
+ def charge_manager(self):
+ """The charge_manager instance for checking force charge status.
+
+ This can be set by using Overrides('charge_manager', instance).
+ """
+ return None
+
+ @StatusProperty
+ def battery_sysfs_path(self):
+ path_list = self._dut.Glob('/sys/class/power_supply/*/type')
+ for p in path_list:
+ try:
+ if self._dut.ReadFile(p).strip() == 'Battery':
+ return self._dut.path.dirname(p)
+ except:
+ logging.warning('sysfs path %s is unavailable', p)
+ return None
+
+ @StatusProperty
+ def battery(self):
+ result = {}
+ sysfs_path = self.battery_sysfs_path
+ for k, item_type, optional in _SysfsBatteryAttributes:
+ result[k] = None
+ try:
+ if sysfs_path:
+ result[k] = item_type(
+ self._dut.ReadFile(self._dut.path.join(sysfs_path, k)).strip())
+ except:
+ log_func = logging.error
+ if optional:
+ log_func = logging.debug
+ log_func('sysfs path %s is unavailable',
+ self._dut.path.join(sysfs_path, k))
+
+ result['fraction_full'] = self._CalculateBatteryFractionFull(result)
+ result['force'] = False
+ if self.charge_manager:
+ force_status = {
+ self._dut.power.ChargeState.DISCHARGE: 'Discharging',
+ self._dut.power.ChargeState.CHARGE: 'Charging',
+ self._dut.power.ChargeState.IDLE: 'Idle'}.get(
+ self.charge_manager.state)
+ if force_status:
+ result['status'] = force_status
+ result['force'] = True
+ return result
+
+ @StatusProperty
+ def fan_rpm(self):
+ """Gets fan speed."""
+ return self._dut.thermal.GetFanRPM()
+
+ @StatusProperty
+ def temperatures(self):
+ """Gets temperatures from sensors."""
+ return self._dut.thermal.GetTemperatures()
+
+ @StatusProperty
+ def main_temperature_index(self):
+ return self._dut.thermal.GetMainTemperatureIndex()
+
+ @StatusProperty
+ def load_avg(self):
+ return map(float, self._dut.ReadFile('/proc/loadavg').split()[0:3])
+
+ @StatusProperty
+ def cpu(self):
+ return map(int,
+ self._dut.ReadFile('/proc/stat').splitlines()[0].split()[1:])
+
+ @StatusProperty
+ def ips(self):
+ return GetIPv4Addresses()
+
+ @StatusProperty
+ def eth_on(self):
+ return IsInterfaceConnected('eth')
+
+ @StatusProperty
+ def wlan_on(self):
+ return IsInterfaceConnected('mlan') or IsInterfaceConnected('wlan')
+
+
+if __name__ == '__main__':
+ import yaml
+ from cros.factory.test import dut
+ logging.basicConfig()
+ status = SystemStatus(dut.Create())
+ print(yaml.dump(status.Snapshot().__dict__, default_flow_style=False))
diff --git a/py/test/dut/status_unittest.py b/py/test/dut/status_unittest.py
new file mode 100755
index 0000000..5e77877
--- /dev/null
+++ b/py/test/dut/status_unittest.py
@@ -0,0 +1,70 @@
+#!/usr/bin/python -u
+#
+# Copyright 2015 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+
+"""Unittest for SystemStatus."""
+
+
+import logging
+import mox
+import unittest
+
+import factory_common # pylint: disable=W0611
+from cros.factory.test.dut import board as board_module
+from cros.factory.test.dut import thermal
+from cros.factory.test.dut import status as status_module
+
+
+class SystemStatusTest(unittest.TestCase):
+ """Unittest for SystemStatus."""
+
+ def setUp(self):
+ self.mox = mox.Mox()
+
+ def tearDown(self):
+ self.mox.UnsetStubs()
+
+ def runTest(self):
+ # Set up mocks for netifaces.
+ netifaces = status_module.netifaces = self.mox.CreateMockAnything()
+ netifaces.AF_INET = 2
+ netifaces.AF_INET6 = 10
+
+ # Set up mocks for Board interface
+ board = self.mox.CreateMock(board_module.DUTBoard)
+ board.thermal = self.mox.CreateMock(thermal.Thermal)
+ board.thermal.GetFanRPM().AndReturn([2000])
+ board.thermal.GetTemperatures().AndReturn([1, 2, 3, 4, 5])
+ board.thermal.GetMainTemperatureIndex().AndReturn(2)
+ netifaces.interfaces().AndReturn(['lo', 'eth0', 'wlan0'])
+ netifaces.ifaddresses('eth0').AndReturn(
+ {netifaces.AF_INET6: [{'addr': 'aa:aa:aa:aa:aa:aa'}],
+ netifaces.AF_INET: [{'broadcast': '192.168.1.255',
+ 'addr': '192.168.1.100'}]})
+ netifaces.ifaddresses('wlan0').AndReturn(
+ {netifaces.AF_INET: [{'addr': '192.168.16.100'},
+ {'addr': '192.168.16.101'}]})
+ self.mox.ReplayAll()
+
+ status = status_module.SystemStatus(board).Snapshot()
+
+ # Don't check battery, since this system might not even have one.
+ self.assertTrue(isinstance(status.battery, dict))
+ self.assertEquals([2000], status.fan_rpm)
+ self.assertEquals(2, status.main_temperature_index)
+ self.assertEquals([1, 2, 3, 4, 5], status.temperatures)
+ self.assertEquals(
+ 'eth0=192.168.1.100, wlan0=192.168.16.100+192.168.16.101',
+ status.ips)
+
+ self.mox.VerifyAll()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(
+ format='%(asctime)s:%(filename)s:%(lineno)d:%(levelname)s:%(message)s',
+ level=logging.DEBUG)
+ unittest.main()
diff --git a/py/test/pytests/battery_cycle.py b/py/test/pytests/battery_cycle.py
index c2586df..f08937a 100644
--- a/py/test/pytests/battery_cycle.py
+++ b/py/test/pytests/battery_cycle.py
@@ -23,7 +23,6 @@
import factory_common # pylint: disable=W0611
from cros.factory.test.event_log import Log
-from cros.factory.system.state import SystemStatus
from cros.factory.test import test_ui
from cros.factory.test import ui_templates
from cros.factory.test import utils
@@ -95,7 +94,7 @@
def setUp(self):
self.ui = test_ui.UI()
- self.status = SystemStatus()
+ self.status = self.dut.status.Snapshot()
self.template = ui_templates.OneSection(self.ui)
self.template.SetState(HTML)
self.ui.AppendCSS(CSS)
@@ -210,7 +209,7 @@
phase_start_time = time.time()
last_log_time = None
while True:
- self.status = SystemStatus()
+ self.status = self.dut.status.Snapshot()
now = time.time()
if (last_log_time is None) or (
now - last_log_time >= self.args.log_interval_secs):
diff --git a/py/test/pytests/check_image_version.py b/py/test/pytests/check_image_version.py
index e8c660a..bb7eda8 100644
--- a/py/test/pytests/check_image_version.py
+++ b/py/test/pytests/check_image_version.py
@@ -16,7 +16,7 @@
import factory_common # pylint: disable=W0611
from cros.factory.test.event_log import Log
-from cros.factory.system.state import SystemInfo, SystemStatus
+from cros.factory.system.state import SystemInfo
from cros.factory.test import factory
from cros.factory.test import shopfloor
from cros.factory.test import test_ui
@@ -81,7 +81,7 @@
self._test = test
def CheckNetwork(self):
- while not SystemStatus().eth_on:
+ while not self.dut.status.eth_on:
time.sleep(0.5)
self._test.template.SetState(_MSG_NETWORK)
diff --git a/py/test/pytests/countdown.py b/py/test/pytests/countdown.py
index 1b9043d..de6c616 100644
--- a/py/test/pytests/countdown.py
+++ b/py/test/pytests/countdown.py
@@ -15,7 +15,6 @@
import factory_common # pylint: disable=W0611
from cros.factory.test.event_log import Log
-from cros.factory.system.state import SystemStatus
from cros.factory.test import factory, test_ui
from cros.factory.test.args import Arg
from cros.factory.utils import time_utils
@@ -185,7 +184,7 @@
self._remaining_secs = self.args.duration_secs
self._next_log_time = 0
self._next_ui_update_time = 0
- last_status = SystemStatus()
+ last_status = self.dut.status.Snapshot()
try:
self.UpdateLegend(self.dut.thermal.GetTemperatureSensorNames())
@@ -199,7 +198,7 @@
current_time = time.time()
if (current_time >= self._next_log_time or
current_time >= self._next_ui_update_time):
- sys_status = SystemStatus()
+ sys_status = self.dut.status.Snapshot()
if current_time >= self._next_log_time:
Log('system_status', elapsed_secs=self._elapsed_secs,
diff --git a/py/test/pytests/thermal_load.py b/py/test/pytests/thermal_load.py
index db2d7ed..4539670 100644
--- a/py/test/pytests/thermal_load.py
+++ b/py/test/pytests/thermal_load.py
@@ -20,7 +20,6 @@
import unittest
import factory_common # pylint: disable=W0611
-from cros.factory.system.state import SystemStatus
from cros.factory.test.event_log import Log
from cros.factory.test.args import Arg
from cros.factory.test.utils import LoadManager
@@ -51,8 +50,9 @@
def GetTemperatures(self):
"""Gets the temperature reading from specified sensor."""
temperatures = []
+ system_temperatures = self.dut.status.temperatures
for sensor_index in self.args.sensor_index:
- temperatures.append(SystemStatus().temperatures[sensor_index])
+ temperatures.append(system_temperatures[sensor_index])
return temperatures
def CheckTemperatures(self, temperatures, elapsed):
diff --git a/py/test/pytests/thermal_slope.py b/py/test/pytests/thermal_slope.py
index c4e3ce1..095295b 100644
--- a/py/test/pytests/thermal_slope.py
+++ b/py/test/pytests/thermal_slope.py
@@ -37,7 +37,6 @@
import unittest
import factory_common # pylint: disable=W0611
-from cros.factory.system.state import SystemStatus
from cros.factory.system.msr import MSRSnapshot
from cros.factory.test import factory
from cros.factory.test.args import Arg
@@ -109,7 +108,7 @@
is False).
'''
self.msr = MSRSnapshot(self.msr)
- self.system_status = SystemStatus()
+ self.system_status = self.dut.status
elapsed_time = time.time() - self.stage_start_time
self.log.info(
u'%s (%.1f s): fan_rpm=%d, temp=%d°C, pkg_power_w=%.3f W' % (
diff --git a/py/test/state.py b/py/test/state.py
index 391145a..3487842 100644
--- a/py/test/state.py
+++ b/py/test/state.py
@@ -30,7 +30,7 @@
from jsonrpclib import jsonclass
from jsonrpclib import jsonrpc
from jsonrpclib import SimpleJSONRPCServer
-from cros.factory import system
+from cros.factory.test import dut
from cros.factory.test import factory
from cros.factory.test.factory import TestState
from cros.factory.test import unicode_to_string
@@ -166,6 +166,9 @@
self._generated_data_expiration = Queue.PriorityQueue()
self._resolver = PathResolver()
+ # TODO(hungte) Support remote dynamic DUT.
+ self._dut = dut.Create()
+
if TestState not in jsonclass.supported_types:
jsonclass.supported_types.append(TestState)
@@ -466,9 +469,9 @@
'''Returns system status information.
This may include system load, battery status, etc. See
- system.state.SystemStatus().
+ cros.factory.test.dut.status.SystemStatus.
'''
- return system.state.SystemStatus().__dict__
+ return self._dut.status.Snapshot().__dict__
def get_instance(address=DEFAULT_FACTORY_STATE_ADDRESS,
diff --git a/py/tools/cpu_usage_monitor.py b/py/tools/cpu_usage_monitor.py
index 1d7fa1f..59ee8dc 100755
--- a/py/tools/cpu_usage_monitor.py
+++ b/py/tools/cpu_usage_monitor.py
@@ -8,7 +8,6 @@
import time
import factory_common # pylint: disable=W0611
-from cros.factory import system
from cros.factory.test import factory
from cros.factory.utils import process_utils
@@ -23,13 +22,13 @@
# Only reports processes with CPU usage exceeding this threshold.
CPU_THRESHOLD = 10
- def __init__(self, period_secs):
+ def __init__(self, period_secs, dut):
self._period_secs = period_secs
+ self.dut = dut
factory.init_logging()
def _GetLoadString(self):
- return ', '.join('%.1f' % load for load in
- system.state.SystemStatus().load_avg)
+ return ', '.join('%.1f' % load for load in self.dut.status.load_avg)
def Check(self):
"""Checks the current CPU usage status.
@@ -41,6 +40,8 @@
try:
msg.append('Load average: %s' % self._GetLoadString())
+ # TODO(hungte) Change process_utils.CheckOutput to dut.CheckOutput so we
+ # can support getting remote system load.
# Get column legend from 'top' and throw away summary header and legend
top_output = process_utils.CheckOutput(
['top', '-b', '-c', '-n', '1']).split('\n')
@@ -75,7 +76,10 @@
type=int, required=False, default=120)
args = parser.parse_args()
- monitor = CPUUsageMonitor(args.period_secs)
+ # TODO(hungte) This currently only reads from local system. We need to expore
+ # DUT options and allow reading from remote in future.
+ from cros.factory.test import dut
+ monitor = CPUUsageMonitor(args.period_secs, dut.Create())
monitor.CheckForever()
if __name__ == '__main__':
diff --git a/py/tools/cpu_usage_monitor_unittest.py b/py/tools/cpu_usage_monitor_unittest.py
index f62495c..2bb64d7 100755
--- a/py/tools/cpu_usage_monitor_unittest.py
+++ b/py/tools/cpu_usage_monitor_unittest.py
@@ -9,6 +9,7 @@
import unittest
import factory_common # pylint: disable=W0611
+from cros.factory.test import dut
from cros.factory.tools.cpu_usage_monitor import CPUUsageMonitor
from cros.factory.utils import process_utils
@@ -37,7 +38,7 @@
class TestCpuUsageMonitor(unittest.TestCase):
def setUp(self):
- self.monitor = CPUUsageMonitor(120)
+ self.monitor = CPUUsageMonitor(120, dut.Create())
self.monitor.COMMAND_LENGTH = 10
self.mox = mox.Mox()
self.mox.StubOutWithMock(logging, 'info')