blob: 226225f4bc664ef864770e7c1c73a78d2ff8f5d7 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Multicast service to spawn uftp server"""
import argparse
import logging
import os
import time
from cros.factory.utils import json_utils
from cros.factory.utils import process_utils
# File name of the per-project multicast config, looked up under
# UMPIRE_DIR/<project>/.
MCAST_CONFIG_NAME = 'multicast_config.json'
# File name of the per-project active Umpire config, looked up under
# UMPIRE_DIR/<project>/.
UMPIRE_CONFIG_NAME = 'active_umpire.json'
# Directory whose entries are treated as Umpire project directories.
UMPIRE_DIR = '/var/db/factory/umpire'
# Path of the uftp executable spawned for every file to be served.
UFTP_PATH = '/usr/bin/uftp'
class UftpProcess:
  """Wrapper around one uftp server process serving a single file.

  The process is spawned on construction.  RespawnIfDied() replaces it once it
  has exited and Kill() terminates it.  The stderr pipe of a finished process
  is closed explicitly so that pipes do not pile up across respawns.
  """

  CC_TYPE = 'tfmcc'  # TCP friendly multicast congestion control.
  LOG_LEVEL = '0'
  ROBUST_FACTOR = '50'  # The number of announcements sent before file transfer.
  TTL = '10'

  def __init__(self, file_path, multicast_addr, status_file_path, interface):
    """Constructor of a UFTP process wrapper.

    Args:
      file_path: The file path to be transferred.
      multicast_addr: The multicast address for sending announcement messages,
          in "<address>:<port>" form.
      status_file_path: The path for logging file transfer status.
      interface: The network interface name or IP to send the data from.  An
          empty string means that no "-I" option is passed to uftp.
    """
    self.file_path = file_path
    self.multicast_addr = multicast_addr
    self.status_file_path = status_file_path
    self.interface = interface
    # Defined before Spawn() so the attribute always exists.
    self.process = None
    self.Spawn()

  def Spawn(self):
    """Starts a new uftp process and stores it in self.process.

    Raises:
      ValueError: If multicast_addr is not of the form "<address>:<port>".
    """
    addr, port = self.multicast_addr.split(':')
    # "-u" defines the UDP source port, while "-p" defines the UDP
    # destination port.
    cmd = [
        UFTP_PATH, '-M', addr, '-t', self.TTL, '-u', port, '-p', port, '-x',
        self.LOG_LEVEL, '-S', self.status_file_path, '-C', self.CC_TYPE, '-s',
        self.ROBUST_FACTOR
    ]
    if self.interface != '':
      cmd += ['-I', self.interface]
    cmd += [self.file_path]
    self.process = process_utils.Spawn(cmd, stderr=process_utils.PIPE)

  def RespawnIfDied(self):
    """Spawns a replacement process if the current one has exited.

    The stderr output of the exited process is logged as an error, except when
    it exited with the announce-timed-out return code.
    """
    ANNOUNCE_TIMED_OUT_RETCODE = 7
    if self.process is None or self.process.poll() is None:
      # Never spawned, or still running: nothing to do.
      return
    # Skip announce timed out message.
    if self.process.returncode != ANNOUNCE_TIMED_OUT_RETCODE:
      stderr = self.process.stderr
      # The pipe is already closed if Kill() was called on this process.
      if stderr is not None and not stderr.closed:
        logging.error(stderr.read())
    # Release the old pipe before self.process is replaced.
    self._CloseStderr()
    self.Spawn()

  def Kill(self):
    """Kills the process, waits for it to exit and closes its stderr pipe."""
    self.process.kill()
    self.process.wait()
    self._CloseStderr()

  def _CloseStderr(self):
    """Closes the stderr pipe of the current process, if there is one."""
    if self.process is not None and self.process.stderr is not None:
      self.process.stderr.close()
def IsUmpireEnabled(project):
  """Check whether a Docker container exists for the project's Umpire.

  Args:
    project: The project name; the container looked up is "umpire_<project>".

  Returns:
    True if "docker ps --all" lists a container with exactly that name.  Note
    that "--all" also lists containers that are not currently running.
  """
  expected_name = 'umpire_%s' % project
  docker_cmd = ['docker', 'ps', '--all', '--format', '{{.Names}}']
  listing = process_utils.CheckOutput(docker_cmd, encoding='utf-8')
  return expected_name in listing.splitlines()
def ScanUftpArgsFromConfig(mcast_config, resource_dir, log_dir):
  """Build the UftpProcess arguments for every file listed for multicast.

  Args:
    mcast_config: The multicast config file generated by multicast service.
                  The content of multicast config is similar to Umpire payload
                  config file, but has an additional "multicast" key for
                  multicast information.
    resource_dir: The Umpire resource directory.
    log_dir: The log directory for UFTP server.

  Returns:
    A list of (file_path, multicast_addr, status_file_path, interface) tuples,
    one per component part found under the "multicast" key.  The interface is
    the "server_ip" entry of the "multicast" section, or '' if it is absent.
  """
  multicast_section = mcast_config['multicast']
  interface = multicast_section.get('server_ip', '')

  result = []
  for component, part_addrs in multicast_section.items():
    # "server_ip" sits next to the components but is not one of them.
    if component == 'server_ip':
      continue
    for part in part_addrs:
      resource_name = mcast_config[component][part]
      result.append((
          os.path.join(resource_dir, resource_name),
          part_addrs[part],
          os.path.join(log_dir, 'uftp_%s.log' % resource_name),
          interface,
      ))
  return result
def _ScanProjects(log_dir):
  """Scan UMPIRE_DIR for the projects that should currently serve multicast.

  A project is included only if its active Umpire config and its multicast
  config can both be loaded, the multicast service is marked active in the
  Umpire config, and IsUmpireEnabled() is true for it.

  Args:
    log_dir: The log directory handed to ScanUftpArgsFromConfig().

  Returns:
    A (configs, uftp_args) tuple of dicts keyed by project name.  configs
    holds the loaded multicast config and uftp_args the list of UftpProcess
    constructor argument tuples for that project.
  """
  configs = {}
  uftp_args = {}
  for project in os.listdir(UMPIRE_DIR):
    project_path = os.path.join(UMPIRE_DIR, project)
    try:
      umpire_config = json_utils.LoadFile(
          os.path.join(project_path, UMPIRE_CONFIG_NAME))
      mcast_enabled = umpire_config['services']['multicast']['active']
      mcast_config = json_utils.LoadFile(
          os.path.join(project_path, MCAST_CONFIG_NAME))
    except Exception:
      # Best effort: an entry whose configs are missing, unreadable or lack
      # the multicast service section is treated as "no multicast".
      continue
    if not mcast_enabled or not IsUmpireEnabled(project):
      continue
    resource_dir = os.path.join(project_path, 'resources')
    configs[project] = mcast_config
    uftp_args[project] = ScanUftpArgsFromConfig(mcast_config, resource_dir,
                                                log_dir)
  return configs, uftp_args


def _KillAll(active_process):
  """Kill every UftpProcess in a {project: [UftpProcess, ...]} dict."""
  for project, procs in active_process.items():
    for proc in procs:
      try:
        proc.Kill()
      except Exception:
        # Keep going so one failure does not leave the other servers running.
        logging.exception('Failed to kill uftp process of project %s', project)


def Main():
  """Run the multicast service loop.

  Once per second the projects under UMPIRE_DIR are rescanned.  For a project
  whose multicast config differs from the one in use, its uftp processes are
  killed and new ones are spawned.  For an unchanged project, processes that
  have exited are respawned.  For a project that is no longer found by the
  scan, its processes are killed.

  When the loop is left through an exception (including KeyboardInterrupt),
  all uftp processes still tracked are killed before the exception propagates,
  so no orphaned servers are left behind.  A signal that terminates the
  interpreter without unwinding the stack does not trigger this cleanup.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('-l', '--log-dir', help='path to Umpire log directory',
                      required=True)
  args = parser.parse_args()

  active_process = {}
  active_config = {}
  try:
    while True:
      scanned_config, scanned_args = _ScanProjects(args.log_dir)

      for project, config in scanned_config.items():
        if config != active_config.get(project):
          # New or changed config: restart all servers of this project.
          active_config[project] = config
          for proc in active_process.get(project, []):
            proc.Kill()
          active_process[project] = []
          for uftp_args in scanned_args[project]:
            active_process[project].append(UftpProcess(*uftp_args))
        else:
          for proc in active_process[project]:
            proc.RespawnIfDied()

      # Projects that were active but are no longer found by the scan.
      for project in set(active_config) - set(scanned_config):
        active_config.pop(project)
        for proc in active_process[project]:
          proc.Kill()
        active_process.pop(project)

      time.sleep(1)
  finally:
    _KillAll(active_process)
# Start the service loop only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
  Main()