Moving lro lib into cros-platform
Straight move to start.
The next CL will update to make it work in cros-platform (so the diff is
clearer and decoupled from the git repo move itself).
BUG=None
TEST=None
Change-Id: I50999b152f098eabaacd04a17c072db31293391e
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/dev-util/+/2875880
Tested-by: C Shapiro <shapiroc@chromium.org>
Auto-Submit: C Shapiro <shapiroc@chromium.org>
Reviewed-by: Seewai Fu <seewaifu@google.com>
Reviewed-by: Jae Hoon Kim <kimjae@chromium.org>
Commit-Queue: C Shapiro <shapiroc@chromium.org>
diff --git a/lib/src/chromiumos/lro/manager.go b/lib/src/chromiumos/lro/manager.go
new file mode 100644
index 0000000..6c4a865
--- /dev/null
+++ b/lib/src/chromiumos/lro/manager.go
@@ -0,0 +1,228 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Package lro provides a universal implementation of longrunning.OperationsServer,
+// and helper functions for dealing with long-running operations.
+package lro
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/google/uuid"
+ "go.chromium.org/chromiumos/config/go/api/test/tls/dependencies/longrunning"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// operation pairs a longrunning.Operation with the bookkeeping Manager keeps for it.
+type operation struct {
+ op *longrunning.Operation // the operation proto tracked under this entry
+ finishTime time.Time // set when the operation is marked done; used for expiry
+ done chan struct{} // closed when the operation is marked done or deleted
+}
+
+// Manager keeps track of long-running operations and serves operation-related requests.
+// Manager implements longrunning.OperationsServer.
+// Manager is safe to use concurrently; its methods lock mu around access to operations.
+// Finished operations are expired 30 days after they finish (checked hourly).
+type Manager struct {
+ mu sync.Mutex // guards operations and the entries stored in it
+ // Provides stubs for OperationsServer methods that Manager does not implement.
+ longrunning.UnimplementedOperationsServer
+ // operations maps an operation name ("operations/<uuid>") to its entry.
+ operations map[string]*operation
+ // expiryStopper is closed by Close to tell the expiration goroutine to terminate.
+ expiryStopper chan struct{}
+}
+
+// New returns a Manager whose hourly expiry goroutine runs until Close is called.
+func New() *Manager {
+	mgr := &Manager{
+		expiryStopper: make(chan struct{}),
+		operations:    make(map[string]*operation),
+	}
+	go func() {
+		for {
+			select {
+			case <-time.After(time.Hour):
+				mgr.deleteExpiredOperations()
+			case <-mgr.expiryStopper:
+				return
+			}
+		}
+	}()
+	return mgr
+}
+
+// Close stops the background expiration goroutine; it must be called at most once.
+func (m *Manager) Close() {
+ close(m.expiryStopper) // a second call would panic: the channel is already closed
+}
+
+// NewOperation registers a new, not-yet-done longrunning.Operation with the
+// Manager and returns it. The caller should return it directly from the gRPC
+// method without modifying it or inspecting it, except to read the Name field.
+//
+// The returned message is a clone of the Manager's internal copy: gRPC
+// serializes the response after the handler returns, so handing out the
+// internal pointer would race with a concurrent SetResult or SetError.
+func (m *Manager) NewOperation() *longrunning.Operation {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	name := "operations/" + uuid.New().String()
+	if _, ok := m.operations[name]; ok {
+		panic("Generated a duplicate UUID, likely due to RNG issue.")
+	}
+	op := &longrunning.Operation{Name: name}
+	m.operations[name] = &operation{op: op, done: make(chan struct{})}
+	return proto.Clone(op).(*longrunning.Operation)
+}
+
+// delete removes the named operation, closing its done channel if still pending.
+func (m *Manager) delete(name string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if entry, found := m.operations[name]; !found {
+		return fmt.Errorf("lro delete: unknown name %s", name)
+	} else if !entry.op.Done {
+		close(entry.done)
+	}
+	delete(m.operations, name)
+	return nil
+}
+
+func (m *Manager) deleteExpiredOperations() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ for name, operation := range m.operations {
+ // Don't do anything for an Operation which isn't done.
+ if !operation.op.Done {
+ continue
+ }
+ // If finish time is nil, panic as it should have been set when done.
+ if operation.finishTime.IsZero() {
+ panic(fmt.Sprintf("Missing finishTime for %s", name))
+ }
+ // Remove the Operation after 30 days of being done.
+ expire := operation.finishTime.Add(30 * 24 * time.Hour)
+ if time.Now().After(expire) {
+ log.Printf("lro deleteExpiredOperations: deleting expired %s", name)
+ delete(m.operations, name)
+ }
+ }
+}
+
+// SetResult marks the named operation as done, storing resp as its response.
+// After calling this method, the caller must not mutate or read the passed-in
+// argument, as the manager must ensure safe concurrent access.
+func (m *Manager) SetResult(name string, resp proto.Message) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	entry, found := m.operations[name]
+	if !found {
+		return fmt.Errorf("lro SetResult: unknown name %s", name)
+	}
+	if entry.op.Done {
+		return fmt.Errorf("lro SetResult: name %s is already done", name)
+	}
+	packed, err := ptypes.MarshalAny(resp)
+	if err != nil {
+		return err
+	}
+	entry.op.Result = &longrunning.Operation_Response{Response: packed}
+	entry.finishTime = time.Now()
+	entry.op.Done = true
+	// Closing done releases every WaitOperation call blocked on this operation.
+	close(entry.done)
+	return nil
+}
+
+// SetError marks the named operation as done, storing opErr as its error.
+// After calling this method, the caller must not mutate or read the passed-in
+// argument, as the manager must ensure safe concurrent access.
+func (m *Manager) SetError(name string, opErr *status.Status) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	entry, found := m.operations[name]
+	if !found {
+		return fmt.Errorf("lro SetError: unknown name %s", name)
+	}
+	if entry.op.Done {
+		return fmt.Errorf("lro SetError: name %s is already done", name)
+	}
+	pb := opErr.Proto()
+	entry.op.Result = &longrunning.Operation_Error{Error: &longrunning.Status{
+		Code:    pb.GetCode(),
+		Message: pb.GetMessage(),
+		Details: pb.GetDetails(),
+	}}
+	entry.finishTime = time.Now()
+	entry.op.Done = true
+	// Closing done releases every WaitOperation call blocked on this operation.
+	close(entry.done)
+	return nil
+}
+
+// getOperationClone returns a copy of the named operation, or a NotFound status error.
+func (m *Manager) getOperationClone(name string) (*longrunning.Operation, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if entry, found := m.operations[name]; found {
+		return proto.Clone(entry.op).(*longrunning.Operation), nil
+	}
+	return nil, status.Errorf(codes.NotFound, "name %s does not exist", name)
+}
+
+// GetOperation returns a copy of the named operation, or a NotFound error if it is not managed.
+func (m *Manager) GetOperation(ctx context.Context, req *longrunning.GetOperationRequest) (*longrunning.Operation, error) {
+ return m.getOperationClone(req.Name)
+}
+
+// DeleteOperation removes the named operation; an unknown name yields NotFound.
+func (m *Manager) DeleteOperation(ctx context.Context, req *longrunning.DeleteOperationRequest) (*empty.Empty, error) {
+	err := m.delete(req.Name)
+	if err != nil {
+		return nil, status.Errorf(codes.NotFound, "failed to delete name %s, %s", req.Name, err)
+	}
+	return new(empty.Empty), nil
+}
+
+// getOperationChannel looks up the done channel of the named operation, if it exists.
+func (m *Manager) getOperationChannel(name string) (chan struct{}, bool) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if entry, found := m.operations[name]; found {
+		return entry.done, true
+	}
+	return nil, false
+}
+
+// WaitOperation blocks until the operation is done, the timeout elapses, or ctx ends, then returns its current state.
+func (m *Manager) WaitOperation(ctx context.Context, req *longrunning.WaitOperationRequest) (*longrunning.Operation, error) {
+	name := req.Name
+	ch, ok := m.getOperationChannel(name)
+	if !ok {
+		return nil, status.Error(codes.NotFound, fmt.Sprintf("name %s does not exist", name))
+	}
+
+	// Check the whole duration rather than only Seconds so sub-second timeouts are honored.
+	if req.Timeout != nil && req.Timeout.AsDuration() > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, req.Timeout.AsDuration())
+		defer cancel()
+	}
+
+	select {
+	case <-ch: // the operation was marked done or deleted
+	case <-ctx.Done(): // timeout or caller cancellation; the operation may still be running
+	}
+	return m.getOperationClone(name)
+}
diff --git a/lib/src/chromiumos/lro/manager_test.go b/lib/src/chromiumos/lro/manager_test.go
new file mode 100644
index 0000000..531e505
--- /dev/null
+++ b/lib/src/chromiumos/lro/manager_test.go
@@ -0,0 +1,53 @@
+package lro_test
+
+import (
+ "context"
+ "net"
+
+ "infra/libs/lro"
+
+ "go.chromium.org/chromiumos/config/go/api/test/tls"
+ "go.chromium.org/chromiumos/config/go/api/test/tls/dependencies/longrunning"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type exampleServer struct {
+ tls.UnimplementedCommonServer // stubs for the CommonServer RPCs this example does not implement
+ *lro.Manager // created in Serve; tracks the operations started by ProvisionDut
+}
+
+func (s *exampleServer) Serve(l net.Listener) error {
+	s.Manager = lro.New()
+	defer s.Manager.Close() // the Manager must be closed after use
+	grpcServer := grpc.NewServer()
+	longrunning.RegisterOperationsServer(grpcServer, s.Manager)
+	tls.RegisterCommonServer(grpcServer, s)
+	return grpcServer.Serve(l)
+}
+
+func (s *exampleServer) ProvisionDut(ctx context.Context, req *tls.ProvisionDutRequest) (*longrunning.Operation, error) {
+ op := s.Manager.NewOperation() // register the operation before starting the work
+ go s.provision(ctx, req, op.Name) // NOTE(review): ctx is the request context, presumably cancelled once this handler returns — confirm before real work relies on it
+ return op, nil // hand the pending operation straight back to the client
+}
+
+func (s *exampleServer) provision(ctx context.Context, req *tls.ProvisionDutRequest, op string) {
+	if req.GetName() == "some host" {
+		s.Manager.SetResult(op, &tls.ProvisionDutResponse{})
+		return
+	}
+	s.Manager.SetError(op, status.Newf(codes.NotFound, "Unknown DUT %s", req.GetName()))
+}
+
+func Example() {
+ l, err := net.Listen("tcp", ":0")
+ if err != nil {
+ panic(err)
+ }
+ s := exampleServer{}
+ if err := s.Serve(l); err != nil {
+ panic(err)
+ }
+}
diff --git a/lib/src/chromiumos/lro/wait.go b/lib/src/chromiumos/lro/wait.go
new file mode 100644
index 0000000..d68f85c
--- /dev/null
+++ b/lib/src/chromiumos/lro/wait.go
@@ -0,0 +1,62 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package lro
+
+import (
+ "context"
+ "math/rand"
+ "time"
+
+ "go.chromium.org/chromiumos/config/go/api/test/tls/dependencies/longrunning"
+ "go.chromium.org/luci/common/clock"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+)
+
+// Wait waits until the long-running operation specified by the provided
+// operation name is done. If it is already done, Wait returns immediately.
+// Unlike OperationsClient's WaitOperation(), it only returns on context
+// timeout, completion of the operation, or an RPC error that is not retried.
+func Wait(ctx context.Context, client longrunning.OperationsClient, name string, opts ...grpc.CallOption) (*longrunning.Operation, error) {
+	// Exponential backoff is used for retryable gRPC errors. In future, we
+	// may want to make these parameters configurable.
+	const initialBackoffMillis = 1000
+	const maxAttempts = 4
+	attempt := 0
+
+	// WaitOperation() can return before the provided timeout even though the
+	// underlying operation is in progress. It may also fail for retryable
+	// reasons. Thus, we must loop until timeout ourselves.
+	for {
+		// WaitOperation respects timeout in the RPC Context as well as through
+		// an explicit field in WaitOperationRequest. We depend on Context
+		// cancellation for timeouts; on timeout, WaitOperation() will return
+		// an appropriate error response.
+		op, err := client.WaitOperation(ctx, &longrunning.WaitOperationRequest{
+			Name: name,
+		}, opts...)
+		switch status.Code(err) {
+		case codes.OK:
+			attempt = 0
+		case codes.Unavailable, codes.ResourceExhausted:
+			// Retryable error; retry with exponential backoff.
+			if attempt >= maxAttempts {
+				return op, err
+			}
+			delay := rand.Int63n(initialBackoffMillis * (1 << attempt))
+			clock.Sleep(ctx, time.Duration(delay)*time.Millisecond)
+			attempt++
+			// op is nil after a failed RPC, so skip the Done check below.
+			continue
+		default:
+			// Non-retryable error
+			return op, err
+		}
+		if op.Done {
+			return op, nil
+		}
+	}
+}