[swarming] Add CIPD error handling to bot code
Bug: b/242499088
Change-Id: I5147d01d54a39d7df683ec74da18b5c623492f2b
Reviewed-on: https://chromium-review.googlesource.com/c/infra/luci/luci-py/+/3831818
Reviewed-by: Vadim Shtayura <vadimsh@chromium.org>
Reviewed-by: Michal Kiedys <mkiedys@google.com>
Commit-Queue: Justin Luong <justinluong@google.com>
NOKEYCHECK=True
GitOrigin-RevId: c0cba13923a71757b188698d7758826106515032
diff --git a/errors.py b/errors.py
new file mode 100644
index 0000000..fb48e86
--- /dev/null
+++ b/errors.py
@@ -0,0 +1,54 @@
+# Copyright 2022 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+"""Exceptions for handling CIPD and CAS errors"""
+
+
+class NonRecoverableException(Exception):
+  """Base class for errors we cannot recover from and should not retry."""
+
+  def __init__(self, status, msg):
+    super(NonRecoverableException, self).__init__(msg)
+    self.status = status
+
+  def to_dict(self):
+    """Returns the attributes as a dict; subclasses must override this."""
+    raise NotImplementedError()
+
+
+class NonRecoverableCasException(NonRecoverableException):
+  """Raised for a bad CAS input; the operation should not be retried."""
+
+  def __init__(self, status, digest, instance):
+    msg = "CAS error: {} with digest {} on instance {}".format(
+        status, digest, instance)
+    super(NonRecoverableCasException, self).__init__(status, msg)
+    self.digest = digest
+    self.instance = instance
+
+  def to_dict(self):
+    """Returns status, digest and instance as a plain dictionary."""
+    return dict(
+        status=self.status,
+        digest=self.digest,
+        instance=self.instance)
+
+
+class NonRecoverableCipdException(NonRecoverableException):
+  """Raised for a bad CIPD package; the operation should not be retried."""
+
+  def __init__(self, status, package_name, path, version):
+    msg = "CIPD error: {} with package {}, version {} on path {}".format(
+        status, package_name, version, path)
+    super(NonRecoverableCipdException, self).__init__(status, msg)
+    self.package_name = package_name
+    self.path = path
+    self.version = version
+
+  def to_dict(self):
+    """Returns status, package name, path and version as a dictionary."""
+    return dict(
+        status=self.status,
+        package_name=self.package_name,
+        path=self.path,
+        version=self.version)