# Copyright 2018 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.
4
5"""Define local cache policies."""
6
import errno
import io
import logging
import os
import sys

from utils import file_path
from utils import fs
from utils import lru
from utils import threading_utils
from utils import tools
18
19
# Sentinel for "size unknown": callers pass this when the expected byte size
# of an item is not available, generally used for .isolated files. Functions
# receiving it (e.g. is_valid_file) skip the size comparison.
UNKNOWN_FILE_SIZE = None
23
24
def file_write(path, content_generator):
  """Writes the chunks yielded by |content_generator| to the file at |path|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
41
42
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file exists and its size matches the expectation.
  When |size| is UNKNOWN_FILE_SIZE, only existence is checked.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if actual_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), actual_size, size)
  return False
63
64
class NoMoreSpace(Exception):
  """Not enough space to map the whole directory."""
68
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -040069
class CachePolicies(object):
  """Common caching policies for the multiple caches (isolated, named, cipd)."""

  def __init__(self, max_cache_size, min_free_space, max_items, max_age_secs):
    """
    Arguments:
    - max_cache_size: trim once the cache grows past this many bytes. A value
          of 0 makes the cache effectively a leak.
    - min_free_space: trim once free disk space drops below this many bytes.
          A value of 0 fills the disk unconditionally.
    - max_items: maximum number of items kept in the cache; 0 disables the
          limit.
    - max_age_secs: items older than this many seconds are automatically
          evicted. Having a lot of dead luggage slows everything down.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
    self.max_age_secs = max_age_secs

  def __str__(self):
    template = (
        'CachePolicies(max_cache_size=%s; max_items=%s; min_free_space=%s; '
        'max_age_secs=%s)')
    return template % (
        self.max_cache_size, self.max_items, self.min_free_space,
        self.max_age_secs)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -040096
97
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    message = 'Item with digest %r is not found in cache' % digest
    super(CacheMiss, self).__init__(message)
    # Digest of the missing item, kept for callers that want to retry/fetch.
    self.digest = digest
105
106
class ContentAddressedCache(object):
  """Content addressed cache that stores objects temporarily.

  It can be accessed concurrently from multiple threads, so implementations
  should protect their internal state with some lock.
  """
  # Directory backing the cache, if any; memory-backed caches leave it None.
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values: byte sizes of items added, evicted and read.
    self._added = []
    self._evicted = []
    self._used = []
    # Snapshot of the cache state right after it was loaded.
    self._initial_number_items = 0
    self._initial_size = 0

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    """Byte sizes of items written to the cache (returns a copy)."""
    return list(self._added)

  @property
  def evicted(self):
    """Byte sizes of items evicted from the cache (returns a copy)."""
    return list(self._evicted)

  @property
  def used(self):
    """Byte sizes of items read from the cache (returns a copy)."""
    return list(self._used)

  @property
  def initial_number_items(self):
    """Number of items present when the cache was loaded."""
    return self._initial_number_items

  @property
  def initial_size(self):
    """Total size in bytes when the cache was loaded."""
    return self._initial_size

  @property
  def number_items(self):
    """Returns the number of items in the cache."""
    raise NotImplementedError()

  @property
  def total_size(self):
    """Returns the total size of the cache in bytes."""
    raise NotImplementedError()

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If file exists on the file system it will have a .name attribute with an
    absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()
211
212
class MemoryContentAddressedCache(ContentAddressedCache):
  """ContentAddressedCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    super(MemoryContentAddressedCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Maps digest -> raw content string. Protected by self._lock.
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  @property
  def number_items(self):
    """Returns the number of items in the cache."""
    with self._lock:
      return len(self._contents)

  @property
  def total_size(self):
    """Returns the total size of the cache in bytes."""
    with self._lock:
      return sum(len(i) for i in self._contents.values())

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    # Nothing to do: in-memory content cannot get corrupted on disk.
    pass

  def touch(self, digest, size):
    """Returns True if the item is cached.

    |size| is not verified: in-memory content cannot get truncated the way an
    on-disk file can.
    """
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    """Removes item from cache if it's there."""
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # Bug fix: _evicted is a list, not a set, so the previous .add(v)
        # raised AttributeError. It also tracks byte sizes (like _added and
        # _used), not raw content, matching DiskContentAddressedCache.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    """Returns a readable file like object for the cached content.

    Raises CacheMiss if |digest| is not cached.
    """
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
      return io.BytesIO(d)

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryContentAddressedCache."""
    return 0
276
277
class DiskContentAddressedCache(ContentAddressedCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as json file.
  """
  # Name of the file, inside cache_dir, where the LRU state is persisted.
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: CachePolicies instance, cache retention policies.
      hash_algo: hashing algorithm used to name items.
      trim: if True, enforce |policies| right away. It can be done later by
          calling trim() explicitly.
      time_fn: optional clock override forwarded to the LRU dict; mainly
          useful in tests.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskContentAddressedCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent that _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim, time_fn)

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface: trims the cache and logs usage stats."""
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  @property
  def number_items(self):
    # Number of items currently tracked by the LRU state.
    with self._lock:
      return len(self._lru)

  @property
  def total_size(self):
    # Sum of the tracked sizes; does not re-stat the files.
    with self._lock:
      return sum(self._lru.itervalues())

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there is no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At that point, the cache was already loaded, trimmed to respect cache
    policies.
    """
    with self._lock:
      fs.chmod(self.cache_dir, 0700)
      # Ensure that all files listed in the state still exist and add new ones.
      previous = self._lru.keys_set()
      # It'd be faster if there were a readdir() function.
      for filename in fs.listdir(self.cache_dir):
        if filename == self.STATE_FILE:
          fs.chmod(os.path.join(self.cache_dir, filename), 0600)
          continue
        if filename in previous:
          fs.chmod(os.path.join(self.cache_dir, filename), 0400)
          previous.remove(filename)
          continue

        # An untracked file. Delete it.
        logging.warning('Removing unknown file %s from cache', filename)
        p = self._path(filename)
        if fs.isdir(p):
          try:
            file_path.rmtree(p)
          except OSError:
            # Best-effort removal; leftover directories are retried next run.
            pass
        else:
          file_path.try_remove(p)
        continue

      if previous:
        # Filter out entries that were not found.
        logging.warning('Removed %d lost files', len(previous))
        for filename in previous:
          self._lru.pop(filename)
        self._save()

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last cleanup()
    # call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid and bumps its LRU position.

    Returns False if the file is missing or invalid. Doesn't kick it from LRU
    though (call 'evict' explicitly).

    Note that it doesn't compute the hash so it could still be corrupted if the
    file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      # Mark this item (and everything more recent) as referenced so it will
      # not be evicted by _add() during this run.
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    """Removes the item and its backing file from the cache."""
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def getfileobj(self, digest):
    """Returns a readable file object for a cached item.

    Raises CacheMiss if the file cannot be opened.
    """
    try:
      f = fs.open(self._path(digest), 'rb')
      with self._lock:
        # NOTE(review): a digest present on disk but absent from _lru would
        # raise KeyError here, not CacheMiss — confirm that is intended.
        self._used.append(self._lru[digest])
      return f
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have write
    # access bit removed which would cause the file_write() call to fail to open
    # in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places were an exception can occur:
      #   1) Inside |content| generator in case of network or unzipping errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache.  This has a few side-effects since
    # the file node is modified, so every directory entries to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def get_oldest(self):
    """Returns digest of the LRU item or None."""
    try:
      return self._lru.get_oldest()[0]
    except KeyError:
      # Empty cache.
      return None

  def get_timestamp(self, digest):
    """Returns timestamp of last use of an item.

    Raises KeyError if item is not found.
    """
    return self._lru.get_timestamp(digest)

  def trim(self):
    """Forces retention policies.

    Returns:
      Number of items evicted due to low free disk space (see _trim()).
    """
    with self._lock:
      return self._trim()

  def _load(self, trim, time_fn):
    """Loads state of the cache from json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not fs.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)
    if time_fn:
      self._lru.time_fn = time_fn
    if trim:
      self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, make sure enough free space exists.

    Enforces, in order: max_age_secs, max_cache_size, max_items and
    min_free_space, then persists the LRU state.

    Returns:
      Number of items evicted because of low free disk space only.
    """
    self._lock.assert_locked()

    # Trim old items.
    if self.policies.max_age_secs:
      cutoff = self._lru.time_fn() - self.policies.max_age_secs
      while self._lru:
        oldest = self._lru.get_oldest()
        # oldest is (digest, (size, timestamp)); stop at the first recent item.
        if oldest[1][1] >= cutoff:
          break
        self._remove_lru_file(True)

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
          ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()
    return trimmed_due_to_space

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size.

    Raises NoMoreSpace when the oldest item is protected (and allow_protected
    is False) or when the cache is empty.
    """
    self._lock.assert_locked()
    try:
      digest, (size, _) = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        total_size = sum(self._lru.itervalues())+size
        msg = (
            'Not enough space to fetch the whole isolated tree.\n'
            '  %s\n  cache=%dbytes, %d items; %sb free_space') % (
              self.policies, total_size, len(self._lru)+1, self._free_disk)
        raise NoMoreSpace(msg)
    except KeyError:
      # That means an internal error.
      raise NoMoreSpace('Nothing to remove, can\'t happend')
    digest, (size, _) = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cache values. self._trim() will be called later to enforce
    # real trimming but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space when
    # the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      # NOTE(review): _remove_lru_file() never returns -1 — it raises
      # NoMoreSpace instead — so this break looks unreachable; confirm intent.
      if self._remove_lru_file(False) == -1:
        break

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        try:
          size = fs.stat(self._path(digest)).st_size
        except OSError:
          # File already gone; count it as zero bytes reclaimed.
          size = 0
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      if e.errno != errno.ENOENT:
        logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))