Handle Ctrl+C more gracefully when uploading files.
It still doesn't abort instantly (there's no mechanism to interrupt an
ongoing network call), but it's better than nothing. An alternative would be
to 'detach' worker threads without waiting for them to finish, but that
seems messier.
Also add timing information to logger output.
R=maruel@chromium.org
BUG=
Review URL: https://codereview.appspot.com/158980043
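
The core of the change is a signal handler that only flips an _aborted flag;
worker tasks poll the flag before each expensive step, so a network call
already in flight still runs to completion. A minimal standalone sketch of
that pattern (the Uploader class and do_network_call are hypothetical
stand-ins for illustration, not part of this CL):

    import signal
    import time

    def do_network_call(item):
      # Stand-in for a blocking network request that cannot be interrupted.
      time.sleep(1)

    class Aborted(Exception):
      """Raised by worker tasks once an abort has been requested."""

    class Uploader(object):
      def __init__(self):
        self._aborted = False
        self._prev_handlers = {}

      def abort(self):
        # Only flips a flag: cheap and safe to call from a signal handler.
        self._aborted = True

      def __enter__(self):
        # Install handlers, remembering the old ones so they can be restored.
        for s in (signal.SIGINT, signal.SIGTERM):
          self._prev_handlers[s] = signal.signal(
              s, lambda *_args: self.abort())
        return self

      def __exit__(self, _exc_type, _exc_value, _traceback):
        while self._prev_handlers:
          s, h = self._prev_handlers.popitem()
          signal.signal(s, h)
        return False

      def upload(self, item):
        # Poll the flag before the expensive step; a call already in flight
        # is not interrupted, which is why abort is not instantaneous.
        if self._aborted:
          raise Aborted()
        do_network_call(item)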
diff --git a/isolateserver.py b/isolateserver.py
index 04c312e..2f800ff 100755
--- a/isolateserver.py
+++ b/isolateserver.py
@@ -12,6 +12,7 @@
import os
import re
import shutil
+import signal
import sys
import tempfile
import threading
@@ -106,6 +107,11 @@
pass
+class Aborted(Error):
+ """Operation aborted."""
+ pass
+
+
def stream_read(stream, chunk_size):
"""Reads chunks from |stream| and yields them."""
while True:
@@ -322,7 +328,8 @@
Works only within a single namespace (and thus the hashing algorithm and
compression scheme are fixed).
- Spawns multiple internal threads. Thread safe, but not fork safe.
+ Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
+ the signal handler table to handle Ctrl+C.
"""
def __init__(self, storage_api):
@@ -332,6 +339,8 @@
self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
self._cpu_thread_pool = None
self._net_thread_pool = None
+ self._aborted = False
+ self._prev_sig_handlers = {}
@property
def hash_algo(self):
@@ -375,6 +384,7 @@
def close(self):
"""Waits for all pending tasks to finish."""
+ logging.info('Waiting for all threads to die...')
if self._cpu_thread_pool:
self._cpu_thread_pool.join()
self._cpu_thread_pool.close()
@@ -383,14 +393,31 @@
self._net_thread_pool.join()
self._net_thread_pool.close()
self._net_thread_pool = None
+ logging.info('Done.')
+
+ def abort(self):
+ """Cancels any pending or future operations."""
+ # This is not strictly thread-safe, but in the worst case the logging
+ # message will be printed twice. Not a big deal. Elsewhere it is assumed
+ # that unprotected reads and writes to _aborted are atomic (which is true
+ # in CPython) and thus no locking is used.
+ if not self._aborted:
+ logging.warning('Aborting... It can take a while.')
+ self._aborted = True
def __enter__(self):
"""Context manager interface."""
+ assert not self._prev_sig_handlers, self._prev_sig_handlers
+ for s in (signal.SIGINT, signal.SIGTERM):
+ self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
return self
def __exit__(self, _exc_type, _exc_value, _traceback):
"""Context manager interface."""
self.close()
+ while self._prev_sig_handlers:
+ s, h = self._prev_sig_handlers.popitem()
+ signal.signal(s, h)
return False
def upload_items(self, items):
@@ -406,11 +433,6 @@
"""
logging.info('upload_items(items=%d)', len(items))
- # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
- # used by swarming.py. There's no need to spawn multiple threads and try to
- # do stuff in parallel: there's nothing to parallelize. 'contains' check and
- # 'push' should be performed sequentially in the context of current thread.
-
# Ensure all digests are calculated.
for item in items:
item.prepare(self._hash_algo)
@@ -510,6 +532,8 @@
def push(content):
"""Pushes an Item and returns it to |channel|."""
+ if self._aborted:
+ raise Aborted()
item.prepare(self._hash_algo)
self._storage_api.push(item, push_state, content)
return item
@@ -525,6 +549,8 @@
# TODO(vadimsh): Implement streaming uploads. Before that's done, assemble
# the content right here. It will block until the whole file is zipped.
try:
+ if self._aborted:
+ raise Aborted()
stream = zip_compress(item.content(), item.compression_level)
data = ''.join(stream)
except Exception as exc:
@@ -607,11 +633,15 @@
for item in items:
item.prepare(self._hash_algo)
+ def contains(batch):
+ if self._aborted:
+ raise Aborted()
+ return self._storage_api.contains(batch)
+
# Enqueue all requests.
for batch in batch_items_for_check(items):
self.net_thread_pool.add_task_with_channel(
- channel, threading_utils.PRIORITY_HIGH,
- self._storage_api.contains, batch)
+ channel, threading_utils.PRIORITY_HIGH, contains, batch)
pending += 1
# Yield results as they come in.
@@ -1086,8 +1116,6 @@
push_state.finalized = True
def contains(self, items):
- logging.info('Checking existence of %d files...', len(items))
-
# Ensure all items were initialized with 'prepare' call. Storage does that.
assert all(i.digest is not None and i.size is not None for i in items)
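
For context, a sketch of how a call site would exercise the new behavior:
the context manager installs the SIGINT/SIGTERM handlers on entry and
restores the previous ones on exit. The surrounding setup is assumed, not
part of this CL:

    # Hypothetical call site: Ctrl+C during upload_items() now makes the
    # queued tasks raise Aborted instead of killing the process mid-upload.
    with Storage(storage_api) as storage:
      storage.upload_items(items)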