system-proxy: Accept CONNECT requests

This CL enables workers to process proxy CONNECT requests.
It extracts the URL from the incoming request and sets up a
connection to the remote proxy using curl for
authentication.

BUG=chromium:1042626
TEST=unittests

Change-Id: Iebbed5a8229f17aa0f13fb2c7413e084bf276051
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2106136
Reviewed-by: Pavol Marko <pmarko@chromium.org>
Tested-by: Andreea-Elena Costinas <acostinas@google.com>
Commit-Queue: Andreea-Elena Costinas <acostinas@google.com>
diff --git a/system-proxy/server_proxy.cc b/system-proxy/server_proxy.cc
index 9754e29..e4fc6de 100644
--- a/system-proxy/server_proxy.cc
+++ b/system-proxy/server_proxy.cc
@@ -9,31 +9,48 @@
 #include <utility>
 #include <vector>
 
+#include <arc/network/socket.h>
+#include <arc/network/socket_forwarder.h>
 #include <base/bind.h>
 #include <base/bind_helpers.h>
 #include <base/callback_helpers.h>
 #include <base/posix/eintr_wrapper.h>
 #include <base/files/file_util.h>
+#include <base/strings/string_util.h>
 #include <base/threading/thread.h>
 #include <base/threading/thread_task_runner_handle.h>
+#include <brillo/data_encoding.h>
+#include <brillo/http/http_transport.h>
 
 #include "bindings/worker_common.pb.h"
 #include "system-proxy/protobuf_util.h"
+#include "system-proxy/proxy_connect_job.h"
 
 namespace system_proxy {
 
 namespace {
-const int kMaxConn = 1000;
+
+constexpr int kMaxConn = 100;
+
+// Returns |text| URL-encoded. If URL-decoding |text| changes it, |text| is
+// assumed to be already encoded and is returned unmodified.
+std::string UrlEncode(const std::string& text) {
+  if (text == brillo::data_encoding::UrlDecode(text.c_str()))
+    return brillo::data_encoding::UrlEncode(text.c_str(), false);
+  return text;
+}
+
 }  // namespace
 
 ServerProxy::ServerProxy(base::OnceClosure quit_closure)
-    : quit_closure_(std::move(quit_closure)) {}
+    : quit_closure_(std::move(quit_closure)), weak_ptr_factory_(this) {}
+ServerProxy::~ServerProxy() = default;
 
 void ServerProxy::Init() {
   // Start listening for input.
   stdin_watcher_ = base::FileDescriptorWatcher::WatchReadable(
-      GetStdinPipe(),
-      base::Bind(&ServerProxy::HandleStdinReadable, base::Unretained(this)));
+      GetStdinPipe(), base::Bind(&ServerProxy::HandleStdinReadable,
+                                 weak_ptr_factory_.GetWeakPtr()));
 
   // Handle termination signals.
   signal_handler_.Init();
@@ -44,7 +61,12 @@
   }
 }
 
-ServerProxy::~ServerProxy() = default;
+void ServerProxy::ResolveProxy(const std::string& target_url,
+                               OnProxyResolvedCallback callback) {
+  // TODO(acostinas, crbug.com/1042626) Ask Chrome to resolve proxy for
+  // |target_url|. Until then, always reply with brillo::http::kDirectProxy.
+  std::move(callback).Run({brillo::http::kDirectProxy});
+}
 
 void ServerProxy::HandleStdinReadable() {
   WorkerConfigs config;
@@ -54,8 +76,9 @@
   }
 
   if (config.has_credentials()) {
-    username_ = config.credentials().username();
-    password_ = config.credentials().password();
+    const std::string username = UrlEncode(config.credentials().username());
+    const std::string password = UrlEncode(config.credentials().password());
+    credentials_ = base::JoinString({username.c_str(), password.c_str()}, ":");
   }
 
   if (config.has_listening_address()) {
@@ -100,18 +123,47 @@
   }
 
   fd_watcher_ = base::FileDescriptorWatcher::WatchReadable(
-      listening_fd_->fd(),
-      base::BindRepeating(&ServerProxy::OnConnectionRequest,
-                          base::Unretained(this)));
+      listening_fd_->fd(), base::BindRepeating(&ServerProxy::OnConnectionAccept,
+                                               weak_ptr_factory_.GetWeakPtr()));
 }
 
-void ServerProxy::OnConnectionRequest() {
+void ServerProxy::OnConnectionAccept() {
   struct sockaddr_storage client_src = {};
   socklen_t sockaddr_len = sizeof(client_src);
   if (auto client_conn =
           listening_fd_->Accept((struct sockaddr*)&client_src, &sockaddr_len)) {
-    // TODO(acostinas,chromium:1042626): Do curl authentication.
+    // The job is owned by |pending_connect_jobs_| until
+    // OnConnectionSetupFinished() removes it; if Start() fails it is dropped.
+    auto connect_job = std::make_unique<ProxyConnectJob>(
+        std::move(client_conn), credentials_,
+        base::BindOnce(&ServerProxy::ResolveProxy, base::Unretained(this)),
+        base::BindOnce(&ServerProxy::OnConnectionSetupFinished,
+                       base::Unretained(this)));
+    if (connect_job->Start())
+      pending_connect_jobs_[connect_job.get()] = std::move(connect_job);
   }
+
+  // Cleanup any defunct forwarders.
+  // TODO(acostinas, chromium:1064536) Monitor the client and server sockets
+  // and remove the corresponding SocketForwarder when a socket closes.
+  for (auto it = forwarders_.begin(); it != forwarders_.end();) {
+    // erase() already returns the iterator following the removed element, so
+    // only advance manually when nothing was erased.
+    if (!(*it)->IsRunning() && (*it)->HasBeenStarted())
+      it = forwarders_.erase(it);
+    else
+      ++it;
+  }
+}
+
+void ServerProxy::OnConnectionSetupFinished(
+    std::unique_ptr<arc_networkd::SocketForwarder> fwd,
+    ProxyConnectJob* connect_job) {
+  if (fwd) {
+    // The connection was set up successfully.
+    forwarders_.emplace_back(std::move(fwd));
+  }
+  pending_connect_jobs_.erase(connect_job);
 }
 
 }  // namespace system_proxy