Moving lro lib into cros-platform

This is a straight move of the existing code, with no modifications.
The next CL will update it to work in cros-platform (so that diff is
clearer and decoupled from the git repo move itself).

BUG=None
TEST=None

Change-Id: I50999b152f098eabaacd04a17c072db31293391e
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/dev-util/+/2875880
Tested-by: C Shapiro <shapiroc@chromium.org>
Auto-Submit: C Shapiro <shapiroc@chromium.org>
Reviewed-by: Seewai Fu <seewaifu@google.com>
Reviewed-by: Jae Hoon Kim <kimjae@chromium.org>
Commit-Queue: C Shapiro <shapiroc@chromium.org>
diff --git a/lib/src/chromiumos/lro/manager.go b/lib/src/chromiumos/lro/manager.go
new file mode 100644
index 0000000..6c4a865
--- /dev/null
+++ b/lib/src/chromiumos/lro/manager.go
@@ -0,0 +1,228 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Package lro provides a universal implementation of longrunning.OperationsServer,
+// and helper functions for dealing with long-running operations.
+package lro
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/google/uuid"
+	"go.chromium.org/chromiumos/config/go/api/test/tls/dependencies/longrunning"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// operation pairs a managed Operation with the bookkeeping Manager keeps for it.
+type operation struct {
+	op         *longrunning.Operation // the managed Operation proto
+	finishTime time.Time              // set by SetResult/SetError; zero while pending
+	done       chan struct{}          // closed by SetResult/SetError, or by delete if still pending
+}
+
+// Manager tracks long-running operations and serves the requests that query them.
+// Manager implements longrunning.OperationsServer.
+// Manager is safe to use concurrently.
+// Finished operations are expired 30 days after they finish.
+type Manager struct {
+	mu sync.Mutex // guards operations and the entries stored in it
+	// Provide stubs for unimplemented methods.
+	longrunning.UnimplementedOperationsServer
+	// operations maps an operation name ("operations/<uuid>") to its entry.
+	operations map[string]*operation
+	// expiryStopper is closed by Close to terminate the expiration goroutine.
+	expiryStopper chan struct{}
+}
+
+// New returns a new Manager which must be closed after use.
+func New() *Manager {
+	m := &Manager{operations: map[string]*operation{}, expiryStopper: make(chan struct{})}
+	go func() {
+		// The ticker is stopped on exit; a pending time.After timer could linger for up to an hour.
+		ticker := time.NewTicker(time.Hour)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-m.expiryStopper:
+				return
+			case <-ticker.C:
+				m.deleteExpiredOperations()
+			}
+		}
+	}()
+	return m
+}
+
+// Close stops the expiration goroutine; call it once, as a second call panics.
+func (m *Manager) Close() {
+	close(m.expiryStopper)
+}
+
+// NewOperation registers a new longrunning.Operation with the Manager and
+// returns a copy of it for the caller to return from the gRPC method. Use the
+// Name field of the copy to refer to the operation in SetResult or SetError.
+func (m *Manager) NewOperation() *longrunning.Operation {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	name := "operations/" + uuid.New().String()
+	if _, ok := m.operations[name]; ok {
+		panic("Generated a duplicate UUID, likely due to RNG issue.")
+	}
+	op := &longrunning.Operation{Name: name}
+	m.operations[name] = &operation{
+		op:   op,
+		done: make(chan struct{}),
+	}
+	// Hand out a clone: gRPC serializes the returned message without holding
+	// mu, which would race with a concurrent SetResult or SetError on op.
+	return proto.Clone(op).(*longrunning.Operation)
+}
+
+func (m *Manager) delete(name string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	switch entry, found := m.operations[name]; {
+	case !found:
+		return fmt.Errorf("lro delete: unknown name %s", name)
+	case !entry.op.Done: // Still pending: release anything blocked on done.
+		close(entry.done)
+	}
+	delete(m.operations, name)
+	return nil
+}
+
+// deleteExpiredOperations removes operations that finished more than 30 days ago.
+func (m *Manager) deleteExpiredOperations() {
+	const retention = 30 * 24 * time.Hour
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	for name, entry := range m.operations {
+		if !entry.op.Done {
+			// Still pending; only finished operations can expire.
+			continue
+		}
+		if entry.finishTime.IsZero() {
+			// SetResult and SetError record finishTime together with Done.
+			panic(fmt.Sprintf("Missing finishTime for %s", name))
+		}
+		if deadline := entry.finishTime.Add(retention); time.Now().After(deadline) {
+			log.Printf("lro deleteExpiredOperations: deleting expired %s", name)
+			delete(m.operations, name)
+		}
+	}
+}
+
+// SetResult marks the named operation as done, with resp as its response.
+// The Manager takes ownership of resp: once this method has been called the
+// caller must neither read nor mutate it, so that concurrent access stays safe.
+func (m *Manager) SetResult(name string, resp proto.Message) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	entry, found := m.operations[name]
+	if !found {
+		return fmt.Errorf("lro SetResult: unknown name %s", name)
+	}
+	if entry.op.Done {
+		return fmt.Errorf("lro SetResult: name %s is already done", name)
+	}
+	packed, err := ptypes.MarshalAny(resp)
+	if err != nil {
+		return err
+	}
+	entry.op.Result = &longrunning.Operation_Response{Response: packed}
+	// Record completion, then wake everything blocked on the done channel.
+	entry.finishTime = time.Now()
+	entry.op.Done = true
+	close(entry.done)
+	return nil
+}
+
+// SetError marks the named operation as done, with opErr as its error.
+// The Manager takes ownership of opErr: once this method has been called the
+// caller must neither read nor mutate it, so that concurrent access stays safe.
+func (m *Manager) SetError(name string, opErr *status.Status) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	entry, found := m.operations[name]
+	if !found {
+		return fmt.Errorf("lro SetError: unknown name %s", name)
+	}
+	if entry.op.Done {
+		return fmt.Errorf("lro SetError: name %s is already done", name)
+	}
+	pb := opErr.Proto()
+	failure := &longrunning.Status{
+		Code:    pb.GetCode(),
+		Message: pb.GetMessage(),
+		Details: pb.GetDetails(),
+	}
+	entry.op.Result = &longrunning.Operation_Error{Error: failure}
+	entry.finishTime = time.Now()
+	entry.op.Done = true
+	close(entry.done)
+	return nil
+}
+
+// getOperationClone returns a copy of the named operation, or a NotFound status.
+func (m *Manager) getOperationClone(name string) (*longrunning.Operation, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if entry, found := m.operations[name]; found {
+		return proto.Clone(entry.op).(*longrunning.Operation), nil
+	}
+	return nil, status.Errorf(codes.NotFound, "name %s does not exist", name)
+}
+
+// GetOperation returns a copy of the named longrunning.Operation if managed.
+func (m *Manager) GetOperation(ctx context.Context, req *longrunning.GetOperationRequest) (*longrunning.Operation, error) {
+	return m.getOperationClone(req.GetName()) // GetName is safe on a nil request
+}
+
+// DeleteOperation deletes the longrunning.Operation if managed.
+func (m *Manager) DeleteOperation(ctx context.Context, req *longrunning.DeleteOperationRequest) (*empty.Empty, error) {
+	name := req.GetName() // GetName is safe on a nil request
+	if err := m.delete(name); err != nil {
+		return nil, status.Errorf(codes.NotFound, "failed to delete name %s, %s", name, err)
+	}
+	return &empty.Empty{}, nil
+}
+
+// getOperationChannel looks up the done channel of the named operation.
+func (m *Manager) getOperationChannel(name string) (chan struct{}, bool) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if entry, found := m.operations[name]; found {
+		return entry.done, true
+	}
+	return nil, false
+}
+
+// WaitOperation blocks until the operation is done or deleted, the request
+// timeout elapses, or ctx ends; it then returns the operation's current state.
+func (m *Manager) WaitOperation(ctx context.Context, req *longrunning.WaitOperationRequest) (*longrunning.Operation, error) {
+	name := req.GetName()
+	ch, ok := m.getOperationChannel(name)
+	if !ok {
+		return nil, status.Errorf(codes.NotFound, "name %s does not exist", name)
+	}
+	// Use the full duration so sub-second timeouts (Seconds == 0) are honored.
+	if timeout := req.GetTimeout().AsDuration(); timeout > 0 {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, timeout)
+		defer cancel()
+	}
+	// Wait until the operation is done or deleted, or the context ends.
+	select {
+	case <-ch:
+	case <-ctx.Done():
+	}
+	return m.getOperationClone(name)
+}
diff --git a/lib/src/chromiumos/lro/manager_test.go b/lib/src/chromiumos/lro/manager_test.go
new file mode 100644
index 0000000..531e505
--- /dev/null
+++ b/lib/src/chromiumos/lro/manager_test.go
@@ -0,0 +1,53 @@
+package lro_test
+
+import (
+	"context"
+	"net"
+
+	"infra/libs/lro"
+
+	"go.chromium.org/chromiumos/config/go/api/test/tls"
+	"go.chromium.org/chromiumos/config/go/api/test/tls/dependencies/longrunning"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+type exampleServer struct {
+	tls.UnimplementedCommonServer // presumably stubs for the Common RPCs not overridden here
+	*lro.Manager                  // registered in Serve as the Operations server
+}
+
+func (s *exampleServer) Serve(l net.Listener) error {
+	s.Manager = lro.New()
+	defer s.Manager.Close() // the Manager must be closed after use
+	server := grpc.NewServer()
+	tls.RegisterCommonServer(server, s)
+	longrunning.RegisterOperationsServer(server, s.Manager) // the Manager answers the Operations RPCs
+	return server.Serve(l)
+}
+
+func (s *exampleServer) ProvisionDut(ctx context.Context, req *tls.ProvisionDutRequest) (*longrunning.Operation, error) {
+	op := s.Manager.NewOperation()
+	go s.provision(ctx, req, op.Name) // NOTE(review): ctx is the RPC's context; confirm it outlives this handler
+	return op, nil
+}
+
+func (s *exampleServer) provision(ctx context.Context, req *tls.ProvisionDutRequest, op string) {
+	if req.GetName() == "some host" {
+		s.Manager.SetResult(op, &tls.ProvisionDutResponse{})
+		return
+	}
+	s.Manager.SetError(op, status.Newf(codes.NotFound, "Unknown DUT %s", req.GetName())) // any other host
+}
+
+// Example serves an exampleServer on a TCP listener bound to ":0".
+func Example() {
+	lis, err := net.Listen("tcp", ":0")
+	if err != nil {
+		panic(err)
+	}
+	if err := new(exampleServer).Serve(lis); err != nil {
+		panic(err)
+	}
+}
diff --git a/lib/src/chromiumos/lro/wait.go b/lib/src/chromiumos/lro/wait.go
new file mode 100644
index 0000000..d68f85c
--- /dev/null
+++ b/lib/src/chromiumos/lro/wait.go
@@ -0,0 +1,62 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package lro
+
+import (
+	"context"
+	"math/rand"
+	"time"
+
+	"go.chromium.org/chromiumos/config/go/api/test/tls/dependencies/longrunning"
+	"go.chromium.org/luci/common/clock"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+)
+
+// Wait waits until the long-running operation specified by the provided
+// operation name is done. If the operation is already done,
+// it returns immediately.
+// Unlike OperationsClient's WaitOperation(), it returns only when the operation
+// completes or on an error that is not retryable or has exhausted its retries.
+func Wait(ctx context.Context, client longrunning.OperationsClient, name string, opts ...grpc.CallOption) (*longrunning.Operation, error) {
+	// Exponential backoff is used for retryable gRPC errors. In future, we
+	// may want to make these parameters configurable.
+	const initialBackoffMillis = 1000
+	const maxAttempts = 4
+	attempt := 0
+
+	// WaitOperation() can return before the provided timeout even though the
+	// underlying operation is in progress. It may also fail for retryable
+	// reasons. Thus, we must loop until timeout ourselves.
+	for {
+		// WaitOperation respects timeout in the RPC Context as well as through
+		// an explicit field in WaitOperationRequest. We depend on Context
+		// cancellation for timeouts (like everywhere else in this codebase).
+		// On timeout, WaitOperation() will return an appropriate error
+		// response.
+		op, err := client.WaitOperation(ctx, &longrunning.WaitOperationRequest{Name: name}, opts...)
+		switch status.Code(err) {
+		case codes.OK:
+			attempt = 0
+		case codes.Unavailable, codes.ResourceExhausted:
+			// Retryable error; retry with exponential backoff.
+			if attempt >= maxAttempts {
+				return op, err
+			}
+			delay := rand.Int63n(initialBackoffMillis * (1 << attempt))
+			clock.Sleep(ctx, time.Duration(delay)*time.Millisecond)
+			attempt++
+			// A failed call yields no usable op, so skip the Done check below.
+			continue
+		default:
+			// Non-retryable error
+			return op, err
+		}
+		if op.Done {
+			return op, nil
+		}
+	}
+}