| #!/usr/bin/env python3 |
| # Copyright 2020 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Multicast service to spawn uftp server""" |
| |
| import argparse |
| import logging |
| import os |
| import time |
| |
| from cros.factory.utils import json_utils |
| from cros.factory.utils import process_utils |
| |
| |
# File name of the multicast config read from each project directory.
MCAST_CONFIG_NAME = 'multicast_config.json'
# File name of the Umpire config read from each project directory.
UMPIRE_CONFIG_NAME = 'active_umpire.json'
# Directory whose entries are scanned as projects by Main().
UMPIRE_DIR = '/var/db/factory/umpire'
# Executable used as the first element of every UFTP command line.
UFTP_PATH = '/usr/bin/uftp'
| |
| |
class UftpProcess:
  """Wrapper around one UFTP server process that transfers a single file.

  The process is started as soon as the wrapper is constructed. Afterwards
  it can be restarted with RespawnIfDied() or stopped with Kill().
  """
  CC_TYPE = 'tfmcc'  # TCP friendly multicast congestion control.
  LOG_LEVEL = '0'
  ROBUST_FACTOR = '50'  # The number of announcements sent before file transfer.
  TTL = '10'

  def __init__(self, file_path, multicast_addr, status_file_path, interface):
    """Store the transfer settings and start the UFTP process.

    Args:
      file_path: The file path to be transferred.
      multicast_addr: The multicast address for sending announcement
          messages, written as "<address>:<port>".
      status_file_path: The path for logging file transfer status.
      interface: The network interface name or IP to send the data from.
          An empty string means no interface is passed to uftp.
    """
    self.file_path = file_path
    self.multicast_addr = multicast_addr
    self.status_file_path = status_file_path
    self.interface = interface

    self.Spawn()

  def Spawn(self):
    """Assemble the uftp command line and start the process.

    The handle of the new process is stored in self.process; its stderr is
    piped so that RespawnIfDied() can read it later.
    """
    addr, port = self.multicast_addr.split(':')

    cmd = [UFTP_PATH]
    cmd.extend(['-M', addr])
    cmd.extend(['-t', self.TTL])
    # "-u" defines the UDP source port, while "-p" defines the UDP
    # destination port. Both use the port taken from the multicast address.
    cmd.extend(['-u', port, '-p', port])
    cmd.extend(['-x', self.LOG_LEVEL])
    cmd.extend(['-S', self.status_file_path])
    cmd.extend(['-C', self.CC_TYPE])
    cmd.extend(['-s', self.ROBUST_FACTOR])

    # The interface option is only passed when one was configured.
    if self.interface != '':
      cmd.extend(['-I', self.interface])

    # The file to transfer is the last argument.
    cmd.append(self.file_path)

    self.process = process_utils.Spawn(cmd, stderr=process_utils.PIPE)

  def RespawnIfDied(self):
    """Start a new UFTP process if the current one has exited.

    The stderr output of the exited process is logged as an error, unless
    it exited with the announce-timed-out return code.
    """
    ANNOUNCE_TIMED_OUT_RETCODE = 7

    # Nothing to do when there is no process or it is still running.
    if self.process is None or self.process.poll() is None:
      return

    # Skip announce timed out message.
    if self.process.returncode != ANNOUNCE_TIMED_OUT_RETCODE:
      logging.error(self.process.stderr.read())
    self.Spawn()

  def Kill(self):
    """Kill the UFTP process and wait for it to terminate."""
    proc = self.process
    proc.kill()
    proc.wait()
| |
| |
def IsUmpireEnabled(project):
  """Check whether Docker lists the Umpire container of a project.

  Args:
    project: The project name; the container looked for is
        "umpire_<project>".

  Returns:
    True if the container name appears in the output of
    `docker ps --all --format {{.Names}}`, False otherwise.
  """
  wanted_name = 'umpire_%s' % project

  # NOTE(review): "--all" presumably also lists containers that are not
  # running -- confirm this is intended for an "enabled" check.
  list_names_cmd = ['docker', 'ps', '--all', '--format', '{{.Names}}']
  output = process_utils.CheckOutput(list_names_cmd, encoding='utf-8')

  # One container name per output line.
  return wanted_name in output.splitlines()
| |
| |
def ScanUftpArgsFromConfig(mcast_config, resource_dir, log_dir):
  """Derive the UftpProcess arguments described by a multicast config.

  Args:
    mcast_config: The multicast config file generated by multicast service.
        The content of multicast config is similar to Umpire payload
        config file, but has an additional "multicast" key for
        multicast information.
    resource_dir: The Umpire resource directory.
    log_dir: The log directory for UFTP server.

  Returns:
    A list of (file_path, multicast_addr, status_file_path, interface)
    tuples, one for every part of every component found under the
    "multicast" key. The interface is the "server_ip" entry of that key, or
    an empty string when it is absent.
  """
  multicast_info = mcast_config['multicast']
  interface = mcast_config['multicast'].get('server_ip', '')

  uftp_args = []
  for component in multicast_info:
    # "server_ip" sits next to the components but is not one of them.
    if component == 'server_ip':
      continue

    addresses = multicast_info[component]
    for part in addresses:
      # The file name of the same component/part is stored at the top level
      # of the config.
      file_name = mcast_config[component][part]
      uftp_args.append((
          os.path.join(resource_dir, file_name),
          addresses[part],
          os.path.join(log_dir, 'uftp_%s.log' % file_name),
          interface,
      ))

  return uftp_args
| |
| |
def _ScanMulticastProjects(log_dir):
  """Scan UMPIRE_DIR for projects that currently have multicast enabled.

  Args:
    log_dir: The log directory passed on to ScanUftpArgsFromConfig.

  Returns:
    A (scanned_config, scanned_args) tuple of dicts keyed by project name.
    scanned_config maps to the loaded multicast config, scanned_args maps to
    the list of UftpProcess argument tuples derived from that config.
  """
  scanned_config = {}
  scanned_args = {}

  for project in os.listdir(UMPIRE_DIR):
    project_path = os.path.join(UMPIRE_DIR, project)
    try:
      umpire_config = json_utils.LoadFile(
          os.path.join(project_path, UMPIRE_CONFIG_NAME))
      mcast_enabled = umpire_config['services']['multicast']['active']
      mcast_config = json_utils.LoadFile(
          os.path.join(project_path, MCAST_CONFIG_NAME))
    except Exception:
      # Deliberately best-effort: an entry whose configs cannot be loaded,
      # or that has no multicast service setting, is simply skipped.
      continue

    if not mcast_enabled or not IsUmpireEnabled(project):
      continue

    resource_dir = os.path.join(project_path, 'resources')

    scanned_config[project] = mcast_config
    scanned_args[project] = ScanUftpArgsFromConfig(mcast_config, resource_dir,
                                                   log_dir)

  return scanned_config, scanned_args


def Main():
  """Run the multicast service loop.

  Once per second every project under UMPIRE_DIR is scanned. For a project
  whose multicast config differs from the one currently served, the running
  UFTP processes are killed and new ones are spawned; otherwise processes
  that have exited are respawned. Processes of projects that are no longer
  found by the scan are killed.

  The loop never returns normally. When it is left through an exception
  (including KeyboardInterrupt), all tracked UFTP processes are killed before
  the exception propagates, so that they are not left running unsupervised.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('-l', '--log-dir', help='path to Umpire log directory',
                      required=True)

  args = parser.parse_args()

  # Both dicts are keyed by project name: the UftpProcess objects currently
  # tracked, and the multicast config they were spawned from.
  active_process = {}
  active_config = {}

  try:
    while True:
      scanned_config, scanned_args = _ScanMulticastProjects(args.log_dir)

      for project, config in scanned_config.items():
        if config != active_config.get(project):
          # New or changed config: replace all processes of the project.
          active_config[project] = config
          for proc in active_process.get(project, []):
            proc.Kill()

          # Append one by one so that every process spawned so far stays
          # tracked even if a later spawn raises.
          active_process[project] = []
          for uftp_args in scanned_args[project]:
            active_process[project].append(UftpProcess(*uftp_args))
        else:
          for proc in active_process[project]:
            proc.RespawnIfDied()

      # Projects that were served before but are missing from this scan.
      for project in set(active_config) - set(scanned_config):
        active_config.pop(project)
        for proc in active_process[project]:
          proc.Kill()
        active_process.pop(project)

      time.sleep(1)
  finally:
    # Do not leave UFTP servers behind when the service loop dies.
    for procs in active_process.values():
      for proc in procs:
        try:
          proc.Kill()
        except Exception:
          logging.exception('Failed to kill UFTP process for %s',
                            proc.file_path)
| |
| |
# Start the multicast service loop only when this file is run as a script.
if __name__ == '__main__':
  Main()