system-proxy: Accept CONNECT requests

This CL enables workers to process proxy CONNECT requests.
It extracts the url from the incoming request and sets up a
connection to the remote proxy using curl for
authentication.

BUG=chromium:1042626
TEST=unittests

Change-Id: Iebbed5a8229f17aa0f13fb2c7413e084bf276051
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2106136
Reviewed-by: Pavol Marko <pmarko@chromium.org>
Tested-by: Andreea-Elena Costinas <acostinas@google.com>
Commit-Queue: Andreea-Elena Costinas <acostinas@google.com>
diff --git a/system-proxy/BUILD.gn b/system-proxy/BUILD.gn
index 64b1295..92d035a 100644
--- a/system-proxy/BUILD.gn
+++ b/system-proxy/BUILD.gn
@@ -73,7 +73,9 @@
 static_library("libsystemproxyworker") {
   configs += [ ":target_defaults" ]
   sources = [
+    "curl_socket.cc",
     "protobuf_util.cc",
+    "proxy_connect_job.cc",
     "server_proxy.cc",
   ]
   deps = [
@@ -117,7 +119,9 @@
       ":system-proxy_test_config",
       ":target_defaults",
     ]
+    all_dependent_pkg_deps = [ "libcurl" ]
     sources = [
+      "proxy_connect_job_test.cc",
       "server_proxy_test.cc",
       "system_proxy_adaptor_test.cc",
     ]
diff --git a/system-proxy/curl_socket.cc b/system-proxy/curl_socket.cc
new file mode 100644
index 0000000..8182c02
--- /dev/null
+++ b/system-proxy/curl_socket.cc
@@ -0,0 +1,18 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "system-proxy/curl_socket.h"
+
+#include <memory>
+#include <utility>
+
+namespace system_proxy {
+
+CurlSocket::CurlSocket(base::ScopedFD fd, ScopedCurlEasyhandle curl_easyhandle)
+    : arc_networkd::Socket(std::move(fd)),
+      curl_easyhandle_(std::move(curl_easyhandle)) {}
+
+CurlSocket::~CurlSocket() = default;
+
+}  // namespace system_proxy
diff --git a/system-proxy/curl_socket.h b/system-proxy/curl_socket.h
new file mode 100644
index 0000000..99a22a5
--- /dev/null
+++ b/system-proxy/curl_socket.h
@@ -0,0 +1,38 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#ifndef SYSTEM_PROXY_CURL_SOCKET_H_
+#define SYSTEM_PROXY_CURL_SOCKET_H_
+
+#include <memory>
+
+#include <curl/curl.h>
+#include <curl/easy.h>
+
+#include <arc/network/socket.h>
+#include <base/files/scoped_file.h>
+
+namespace system_proxy {
+
+// Frees the resources allocated by curl_easy_init.
+struct FreeCurlEasyhandle {
+  void operator()(CURL* ptr) const { curl_easy_cleanup(ptr); }
+};
+
+typedef std::unique_ptr<CURL, FreeCurlEasyhandle> ScopedCurlEasyhandle;
+
+// CurlSocket wraps a socket opened by curl in an arc_networkd::Socket object
+// with an owned CURL handle.
+class CurlSocket : public arc_networkd::Socket {
+ public:
+  CurlSocket(base::ScopedFD fd, ScopedCurlEasyhandle curl_easyhandle);
+  CurlSocket(const CurlSocket&) = delete;
+  CurlSocket& operator=(const CurlSocket&) = delete;
+  ~CurlSocket() override;
+
+ private:
+  ScopedCurlEasyhandle curl_easyhandle_;
+};
+}  // namespace system_proxy
+
+#endif  // SYSTEM_PROXY_CURL_SOCKET_H_
diff --git a/system-proxy/proxy_connect_job.cc b/system-proxy/proxy_connect_job.cc
new file mode 100644
index 0000000..0dad121
--- /dev/null
+++ b/system-proxy/proxy_connect_job.cc
@@ -0,0 +1,272 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "system-proxy/proxy_connect_job.h"
+
+#include <algorithm>
+#include <utility>
+#include <vector>
+
+#include <curl/curl.h>
+#include <curl/easy.h>
+
+#include <arc/network/net_util.h>
+#include <arc/network/socket.h>
+#include <arc/network/socket_forwarder.h>
+#include <base/base64.h>
+#include <base/bind.h>
+#include <base/bind_helpers.h>
+#include <base/callback_helpers.h>
+#include <base/files/file_util.h>
+#include <base/strings/stringprintf.h>
+#include <base/strings/string_split.h>
+#include <base/strings/string_util.h>
+#include <base/time/time.h>
+#include <brillo/http/http_transport.h>
+
+#include "system-proxy/curl_socket.h"
+
+// The libarcnetwork-util library overloads << for socket data structures.
+// By C++'s argument-dependent lookup rules, operators defined in a
+// different namespace are not visible. We need the using directive to make
+// the overload available in this namespace.
+using arc_networkd::operator<<;
+
+namespace {
+// There's no RFC recommendation for the max size of http request headers but
+// popular http server implementations (Apache, IIS, Tomcat) set the lower limit
+// to 8000.
+constexpr int kMaxHttpRequestHeadersSize = 8000;
+constexpr char kConnectMethod[] = "CONNECT";
+constexpr char kHttpScheme[] = "http://";
+constexpr base::TimeDelta kCurlConnectTimeout = base::TimeDelta::FromMinutes(2);
+constexpr size_t kMaxBadRequestPrintSize = 120;
+
+// HTTP error codes and messages with origin information for debugging (RFC
+// 7231, section 6.1).
+const std::string_view kHttpBadRequest =
+    "HTTP/1.1 400 Bad Request - Origin: local proxy\r\n\r\n";
+const std::string_view kHttpInternalServerError =
+    "HTTP/1.1 500 Internal Server Error - Origin: local proxy\r\n\r\n";
+const std::string_view kHttpBadGateway =
+    "HTTP/1.1 502 Bad Gateway - Origin: local proxy\r\n\r\n";
+
+static size_t WriteCallback(char* contents,
+                            size_t size,
+                            size_t nmemb,
+                            void* userp) {
+  // Appends the |size| * |nmemb| bytes handed over by libcurl to |userp|.
+  auto* buffer = static_cast<std::vector<char>*>(userp);
+  buffer->insert(buffer->end(), contents, contents + size * nmemb);
+  return size * nmemb;
+}
+
+// Parses the first line of the http CONNECT request and extracts the target
+// url. The destination URI is specified in the request line as the host name
+// and destination port number separated by a colon (RFC2817, section 5.2):
+//      CONNECT server.example.com:80 HTTP/1.1
+// If the first line in |raw_request| (the Request-Line) is a correctly formed
+// CONNECT request, it will return the destination URI as scheme://host:port,
+// otherwise it will return an empty string.
+std::string GetUrlFromHttpHeader(const std::vector<char>& raw_request) {
+  base::StringPiece request(raw_request.data(), raw_request.size());
+  // Request-Line ends with CRLF (RFC2616, section 5.1).
+  size_t i = request.find_first_of("\r\n");
+  if (i == base::StringPiece::npos)
+    return std::string();
+  // Elements are delimited by non-breaking space (SP).
+  auto pieces =
+      base::SplitString(request.substr(0, i), " ", base::TRIM_WHITESPACE,
+                        base::SPLIT_WANT_NONEMPTY);
+  // Request-Line has the format: Method SP Request-URI SP HTTP-Version CRLF.
+  if (pieces.size() < 3)
+    return std::string();
+  if (pieces[0] != kConnectMethod)
+    return std::string();
+
+  return base::JoinString({kHttpScheme, pieces[1]}, "");
+}
+}  // namespace
+
+namespace system_proxy {
+
+ProxyConnectJob::ProxyConnectJob(
+    std::unique_ptr<arc_networkd::Socket> socket,
+    const std::string& credentials,
+    ResolveProxyCallback resolve_proxy_callback,
+    OnConnectionSetupFinishedCallback setup_finished_callback)
+    : credentials_(credentials),
+      resolve_proxy_callback_(std::move(resolve_proxy_callback)),
+      setup_finished_callback_(std::move(setup_finished_callback)) {
+  client_socket_ = std::move(socket);
+}
+
+ProxyConnectJob::~ProxyConnectJob() = default;
+
+bool ProxyConnectJob::Start() {
+  // Make the socket non-blocking.
+  if (!base::SetNonBlocking(client_socket_->fd())) {
+    PLOG(ERROR) << *this << " Failed to mark the socket as non-blocking.";
+    client_socket_->SendTo(kHttpInternalServerError.data(),
+                           kHttpInternalServerError.size());
+    return false;
+  }
+  read_watcher_ = base::FileDescriptorWatcher::WatchReadable(
+      client_socket_->fd(),
+      base::Bind(&ProxyConnectJob::OnClientReadReady, base::Unretained(this)));
+  return true;
+}
+
+void ProxyConnectJob::OnClientReadReady() {
+  // Stop watching.
+  read_watcher_.reset();
+  // The first message should be a HTTP CONNECT request.
+  std::vector<char> connect_request;
+  if (!TryReadHttpHeader(&connect_request)) {
+    std::string encoded;
+    base::Base64Encode(
+        base::StringPiece(connect_request.data(), connect_request.size()),
+        &encoded);
+    LOG(ERROR) << *this
+               << " Failure to read proxy CONNECT request. Base 64 encoded "
+                  "request message from client: "
+               << encoded;
+    OnError(kHttpBadRequest);
+    return;
+  }
+
+  target_url_ = GetUrlFromHttpHeader(connect_request);
+  if (target_url_.empty()) {
+    LOG(ERROR)
+        << *this
+        << " Failed to extract target url from the HTTP CONNECT request.";
+    OnError(kHttpBadRequest);
+    return;
+  }
+
+  std::move(resolve_proxy_callback_)
+      .Run(target_url_, base::Bind(&ProxyConnectJob::OnProxyResolution,
+                                   base::Unretained(this)));
+}
+
+bool ProxyConnectJob::TryReadHttpHeader(std::vector<char>* raw_request) {
+  // Used to identify the end of a HTTP header which should be an empty line.
+  // Note: all HTTP header lines end with CRLF. HTTP connect requests don't have
+  // a body so end of header is end of request.
+  std::string crlf_crlf = "\r\n\r\n";
+  size_t read_byte_count = 0;
+  raw_request->resize(kMaxHttpRequestHeadersSize);
+
+  // Read byte-by-byte and stop when reading an empty line (only CRLF) or when
+  // exceeding the max buffer size.
+  // TODO(acostinas, chromium:1064536) This may have some measurable performance
+  // impact. We should read larger blocks of data, consume the HTTP headers,
+  // cache the tunneled payload that may have already been included (e.g. TLS
+  // ClientHello) and send it to server after the connection is established.
+  while (read_byte_count < kMaxHttpRequestHeadersSize) {
+    if (client_socket_->RecvFrom(raw_request->data() + read_byte_count, 1) <=
+        0) {
+      break;
+    }
+    ++read_byte_count;
+    // Check if we have an empty line.
+    if (read_byte_count >= crlf_crlf.size() &&
+        std::memcmp(crlf_crlf.data(),
+                    raw_request->data() + read_byte_count - crlf_crlf.size(),
+                    crlf_crlf.size()) == 0) {
+      raw_request->resize(read_byte_count);
+      return true;
+    }
+  }
+  // Read failure or oversized header: keep only a short prefix for logging.
+  raw_request->resize(std::min(read_byte_count, kMaxBadRequestPrintSize));
+  return false;
+}
+
+void ProxyConnectJob::OnProxyResolution(
+    const std::list<std::string>& proxy_servers) {
+  proxy_servers_ = proxy_servers;
+  DoCurlServerConnection(proxy_servers.front());
+}
+
+void ProxyConnectJob::DoCurlServerConnection(const std::string& proxy_url) {
+  CURL* easyhandle = curl_easy_init();
+  CURLcode res;
+  int newSocket = -1;
+  std::vector<char> server_connect_reply;
+
+  if (!easyhandle) {
+    // Unfortunately it's not possible to get the failure reason.
+    LOG(ERROR) << *this << " Failure to create curl handle.";
+    OnError(kHttpInternalServerError);
+    return;
+  }
+  curl_easy_setopt(easyhandle, CURLOPT_URL, target_url_.c_str());
+
+  if (proxy_url != brillo::http::kDirectProxy) {
+    curl_easy_setopt(easyhandle, CURLOPT_PROXY, proxy_url.c_str());
+    curl_easy_setopt(easyhandle, CURLOPT_HTTPPROXYTUNNEL, 1L);
+    curl_easy_setopt(easyhandle, CURLOPT_CONNECT_ONLY, 1L);
+    // Allow libcurl to pick authentication method. Curl will use the most
+    // secure one the remote site claims to support.
+    curl_easy_setopt(easyhandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
+    curl_easy_setopt(easyhandle, CURLOPT_PROXYUSERPWD, credentials_.c_str());
+  }
+  curl_easy_setopt(easyhandle, CURLOPT_CONNECTTIMEOUT_MS,
+                   static_cast<long>(kCurlConnectTimeout.InMilliseconds()));
+  // WriteCallback casts the user pointer back to std::vector<char>*.
+  curl_easy_setopt(easyhandle, CURLOPT_HEADERFUNCTION, WriteCallback);
+  curl_easy_setopt(easyhandle, CURLOPT_HEADERDATA, &server_connect_reply);
+
+  res = curl_easy_perform(easyhandle);
+
+  if (res != CURLE_OK) {
+    LOG(ERROR) << *this << " curl_easy_perform() failed with error: "
+               << curl_easy_strerror(res);
+    curl_easy_cleanup(easyhandle);
+    OnError(kHttpInternalServerError);
+    return;
+  }
+  // Extract the socket from the curl handle.
+  res = curl_easy_getinfo(easyhandle, CURLINFO_ACTIVESOCKET, &newSocket);
+  if (res != CURLE_OK) {
+    LOG(ERROR) << *this << " Failed to get socket from curl with error: "
+               << curl_easy_strerror(res);
+    curl_easy_cleanup(easyhandle);
+    OnError(kHttpBadGateway);
+    return;
+  }
+
+  ScopedCurlEasyhandle scoped_handle(easyhandle, FreeCurlEasyhandle());
+  auto server_conn = std::make_unique<CurlSocket>(base::ScopedFD(newSocket),
+                                                  std::move(scoped_handle));
+
+  // Send the server reply to the client. If the connection is successful, the
+  // reply should be "HTTP/1.1 200 Connection Established".
+  client_socket_->SendTo(server_connect_reply.data(),
+                         server_connect_reply.size());
+
+  auto fwd = std::make_unique<arc_networkd::SocketForwarder>(
+      base::StringPrintf("%d-%d", client_socket_->fd(), server_conn->fd()),
+      std::move(client_socket_), std::move(server_conn));
+  // Start forwarding data between sockets.
+  fwd->Start();
+  std::move(setup_finished_callback_).Run(std::move(fwd), this);
+}
+
+void ProxyConnectJob::OnError(const std::string_view& http_error_message) {
+  client_socket_->SendTo(http_error_message.data(), http_error_message.size());
+  std::move(setup_finished_callback_).Run(nullptr, this);
+}
+
+std::ostream& operator<<(std::ostream& stream, const ProxyConnectJob& job) {
+  stream << "{fd: " << job.client_socket_->fd();
+  if (!job.target_url_.empty()) {
+    stream << ", url: " << job.target_url_;
+  }
+  stream << "}";
+  return stream;
+}
+
+}  // namespace system_proxy
diff --git a/system-proxy/proxy_connect_job.h b/system-proxy/proxy_connect_job.h
new file mode 100644
index 0000000..7fc3fd3
--- /dev/null
+++ b/system-proxy/proxy_connect_job.h
@@ -0,0 +1,90 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+#ifndef SYSTEM_PROXY_PROXY_CONNECT_JOB_H_
+#define SYSTEM_PROXY_PROXY_CONNECT_JOB_H_
+
+#include <list>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include <base/callback_forward.h>
+#include <base/files/file_descriptor_watcher_posix.h>
+#include <gtest/gtest_prod.h>  // for FRIEND_TEST
+
+namespace arc_networkd {
+class SocketForwarder;
+class Socket;
+}  // namespace arc_networkd
+
+namespace system_proxy {
+// ProxyConnectJob asynchronously sets up a connection to a remote target on
+// behalf of a client. Internally, it performs the following steps:
+// - waits for the client to send a HTTP connect request;
+// - extracts the target url from the connect request;
+// - requests proxy resolution for the target url and waits for the result;
+// - performs the proxy authentication and connection setup to the remote
+// target.
+class ProxyConnectJob {
+ public:
+  using OnConnectionSetupFinishedCallback = base::OnceCallback<void(
+      std::unique_ptr<arc_networkd::SocketForwarder>, ProxyConnectJob*)>;
+
+  // Will be invoked by ProxyConnectJob to resolve the proxy for |target_url_|.
+  // The passed |callback| is expected to be called with the list of proxy
+  // servers, which will always contain at least one entry, the default proxy.
+  using ResolveProxyCallback = base::OnceCallback<void(
+      const std::string& url,
+      base::OnceCallback<void(const std::list<std::string>&)> callback)>;
+
+  ProxyConnectJob(std::unique_ptr<arc_networkd::Socket> socket,
+                  const std::string& credentials,
+                  ResolveProxyCallback resolve_proxy_callback,
+                  OnConnectionSetupFinishedCallback setup_finished_callback);
+  ProxyConnectJob(const ProxyConnectJob&) = delete;
+  ProxyConnectJob& operator=(const ProxyConnectJob&) = delete;
+  virtual ~ProxyConnectJob();
+
+  // Marks |client_socket_| as non-blocking and adds a watcher that calls
+  // |OnClientReadReady| when the socket is read ready.
+  virtual bool Start();
+  void OnProxyResolution(const std::list<std::string>& proxy_servers);
+
+  friend std::ostream& operator<<(std::ostream& stream,
+                                  const ProxyConnectJob& job);
+
+ private:
+  friend class ServerProxyTest;
+  friend class ProxyConnectJobTest;
+  FRIEND_TEST(ServerProxyTest, HandlePendingJobs);
+  FRIEND_TEST(ProxyConnectJobTest, SuccessfulConnection);
+  FRIEND_TEST(ProxyConnectJobTest, BadHttpRequestWrongMethod);
+  FRIEND_TEST(ProxyConnectJobTest, BadHttpRequestNoEmptyLine);
+
+  // Reads data from the socket into |raw_request| until the first empty line,
+  // which would mark the end of the HTTP request header.
+  // This method does not validate the headers.
+  bool TryReadHttpHeader(std::vector<char>* raw_request);
+
+  // Called when the client socket is ready for reading.
+  void OnClientReadReady();
+
+  // Called from |OnProxyResolution|, after the proxy for |target_url_| is
+  // resolved.
+  void DoCurlServerConnection(const std::string& proxy_url);
+
+  void OnError(const std::string_view& http_error_message);
+
+  std::string target_url_;
+  const std::string credentials_;
+  std::list<std::string> proxy_servers_;
+  ResolveProxyCallback resolve_proxy_callback_;
+  OnConnectionSetupFinishedCallback setup_finished_callback_;
+  std::unique_ptr<arc_networkd::Socket> client_socket_;
+  std::unique_ptr<base::FileDescriptorWatcher::Controller> read_watcher_;
+};
+}  // namespace system_proxy
+
+#endif  // SYSTEM_PROXY_PROXY_CONNECT_JOB_H_
diff --git a/system-proxy/proxy_connect_job_test.cc b/system-proxy/proxy_connect_job_test.cc
new file mode 100644
index 0000000..e8286f9
--- /dev/null
+++ b/system-proxy/proxy_connect_job_test.cc
@@ -0,0 +1,123 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "system-proxy/proxy_connect_job.h"
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <utility>
+
+#include <arc/network/socket.h>
+#include <arc/network/socket_forwarder.h>
+#include <base/bind.h>
+#include <base/bind_helpers.h>
+#include <base/callback_helpers.h>
+#include <base/files/file_util.h>
+#include <base/files/scoped_file.h>
+#include <brillo/message_loops/base_message_loop.h>
+
+#include "bindings/worker_common.pb.h"
+#include "system-proxy/protobuf_util.h"
+
+namespace {
+constexpr char kProxyServerUrl[] = "172.0.0.1:8888";
+}  // namespace
+
+namespace system_proxy {
+
+using ::testing::_;
+using ::testing::Return;
+
+class ProxyConnectJobTest : public ::testing::Test {
+ public:
+  ProxyConnectJobTest() = default;
+  ProxyConnectJobTest(const ProxyConnectJobTest&) = delete;
+  ProxyConnectJobTest& operator=(const ProxyConnectJobTest&) = delete;
+  ~ProxyConnectJobTest() = default;
+
+  void SetUp() override {
+    int fds[2];
+    ASSERT_NE(-1,
+              socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
+                         0 /* protocol */, fds));
+    cros_client_socket_ =
+        std::make_unique<arc_networkd::Socket>(base::ScopedFD(fds[1]));
+
+    connect_job_ = std::make_unique<ProxyConnectJob>(
+        std::make_unique<arc_networkd::Socket>(base::ScopedFD(fds[0])), "",
+        base::BindOnce(&ProxyConnectJobTest::ResolveProxy,
+                       base::Unretained(this)),
+        base::BindOnce(&ProxyConnectJobTest::OnConnectionSetupFinished,
+                       base::Unretained(this)));
+    connect_job_->Start();
+  }
+
+ protected:
+  void ResolveProxy(
+      const std::string& target_url,
+      base::OnceCallback<void(const std::list<std::string>&)> callback) {
+    std::move(callback).Run({kProxyServerUrl});
+  }
+
+  void OnConnectionSetupFinished(
+      std::unique_ptr<arc_networkd::SocketForwarder> fwd,
+      ProxyConnectJob* connect_job) {
+    ASSERT_EQ(connect_job, connect_job_.get());
+  }
+
+  std::unique_ptr<ProxyConnectJob> connect_job_;
+  base::MessageLoopForIO loop_;
+  brillo::BaseMessageLoop brillo_loop_{&loop_};
+  std::unique_ptr<arc_networkd::Socket> cros_client_socket_;
+};
+
+TEST_F(ProxyConnectJobTest, SuccessfulConnection) {
+  char validConnRequest[] =
+      "CONNECT www.example.server.com:443 HTTP/1.1\r\n\r\n";
+  cros_client_socket_->SendTo(validConnRequest, std::strlen(validConnRequest));
+  brillo_loop_.RunOnce(false);
+
+  EXPECT_EQ("http://www.example.server.com:443", connect_job_->target_url_);
+  EXPECT_EQ(1, connect_job_->proxy_servers_.size());
+  EXPECT_EQ(kProxyServerUrl, connect_job_->proxy_servers_.front());
+}
+
+TEST_F(ProxyConnectJobTest, BadHttpRequestWrongMethod) {
+  char badConnRequest[] = "GET www.example.server.com:443 HTTP/1.1\r\n\r\n";
+  cros_client_socket_->SendTo(badConnRequest, std::strlen(badConnRequest));
+  brillo_loop_.RunOnce(false);
+
+  EXPECT_EQ("", connect_job_->target_url_);
+  EXPECT_EQ(0, connect_job_->proxy_servers_.size());
+  const std::string expected_http_response =
+      "HTTP/1.1 400 Bad Request - Origin: local proxy\r\n\r\n";
+  std::vector<char> buf(expected_http_response.size());
+  ASSERT_TRUE(
+      base::ReadFromFD(cros_client_socket_->fd(), buf.data(), buf.size()));
+  std::string actual_response(buf.data(), buf.size());
+  EXPECT_EQ(expected_http_response, actual_response);
+}
+
+TEST_F(ProxyConnectJobTest, BadHttpRequestNoEmptyLine) {
+  // No empty line after http message.
+  char badConnRequest[] = "CONNECT www.example.server.com:443 HTTP/1.1\r\n";
+  cros_client_socket_->SendTo(badConnRequest, std::strlen(badConnRequest));
+  brillo_loop_.RunOnce(false);
+
+  EXPECT_EQ("", connect_job_->target_url_);
+  EXPECT_EQ(0, connect_job_->proxy_servers_.size());
+  const std::string expected_http_response =
+      "HTTP/1.1 400 Bad Request - Origin: local proxy\r\n\r\n";
+  std::vector<char> buf(expected_http_response.size());
+  ASSERT_TRUE(
+      base::ReadFromFD(cros_client_socket_->fd(), buf.data(), buf.size()));
+  std::string actual_response(buf.data(), buf.size());
+  EXPECT_EQ(expected_http_response, actual_response);
+}
+
+}  // namespace system_proxy
diff --git a/system-proxy/server_proxy.cc b/system-proxy/server_proxy.cc
index 9754e29..e4fc6de 100644
--- a/system-proxy/server_proxy.cc
+++ b/system-proxy/server_proxy.cc
@@ -9,31 +9,48 @@
 #include <utility>
 #include <vector>
 
+#include <arc/network/socket.h>
+#include <arc/network/socket_forwarder.h>
 #include <base/bind.h>
 #include <base/bind_helpers.h>
 #include <base/callback_helpers.h>
 #include <base/posix/eintr_wrapper.h>
 #include <base/files/file_util.h>
+#include <base/strings/string_util.h>
 #include <base/threading/thread.h>
 #include <base/threading/thread_task_runner_handle.h>
+#include <brillo/data_encoding.h>
+#include <brillo/http/http_transport.h>
 
 #include "bindings/worker_common.pb.h"
 #include "system-proxy/protobuf_util.h"
+#include "system-proxy/proxy_connect_job.h"
 
 namespace system_proxy {
 
 namespace {
-const int kMaxConn = 1000;
+
+constexpr int kMaxConn = 100;
+
+// Returns the URL encoded value of |text|. It also verifies if the string was
+// already encoded and, if so, returns it unmodified.
+std::string UrlEncode(const std::string& text) {
+  if (text == brillo::data_encoding::UrlDecode(text.c_str()))
+    return brillo::data_encoding::UrlEncode(text.c_str(), false);
+  return text;
+}
+
 }  // namespace
 
 ServerProxy::ServerProxy(base::OnceClosure quit_closure)
-    : quit_closure_(std::move(quit_closure)) {}
+    : quit_closure_(std::move(quit_closure)), weak_ptr_factory_(this) {}
+ServerProxy::~ServerProxy() = default;
 
 void ServerProxy::Init() {
   // Start listening for input.
   stdin_watcher_ = base::FileDescriptorWatcher::WatchReadable(
-      GetStdinPipe(),
-      base::Bind(&ServerProxy::HandleStdinReadable, base::Unretained(this)));
+      GetStdinPipe(), base::Bind(&ServerProxy::HandleStdinReadable,
+                                 weak_ptr_factory_.GetWeakPtr()));
 
   // Handle termination signals.
   signal_handler_.Init();
@@ -44,7 +61,12 @@
   }
 }
 
-ServerProxy::~ServerProxy() = default;
+void ServerProxy::ResolveProxy(const std::string& target_url,
+                               OnProxyResolvedCallback callback) {
+  // TODO(acostinas, crbug.com/1042626) Ask Chrome to resolve proxy for
+  // |target_url|.
+  std::move(callback).Run({brillo::http::kDirectProxy});
+}
 
 void ServerProxy::HandleStdinReadable() {
   WorkerConfigs config;
@@ -54,8 +76,9 @@
   }
 
   if (config.has_credentials()) {
-    username_ = config.credentials().username();
-    password_ = config.credentials().password();
+    const std::string username = UrlEncode(config.credentials().username());
+    const std::string password = UrlEncode(config.credentials().password());
+    credentials_ = base::JoinString({username.c_str(), password.c_str()}, ":");
   }
 
   if (config.has_listening_address()) {
@@ -100,18 +123,41 @@
   }
 
   fd_watcher_ = base::FileDescriptorWatcher::WatchReadable(
-      listening_fd_->fd(),
-      base::BindRepeating(&ServerProxy::OnConnectionRequest,
-                          base::Unretained(this)));
+      listening_fd_->fd(), base::BindRepeating(&ServerProxy::OnConnectionAccept,
+                                               weak_ptr_factory_.GetWeakPtr()));
 }
 
-void ServerProxy::OnConnectionRequest() {
+void ServerProxy::OnConnectionAccept() {
   struct sockaddr_storage client_src = {};
   socklen_t sockaddr_len = sizeof(client_src);
   if (auto client_conn =
           listening_fd_->Accept((struct sockaddr*)&client_src, &sockaddr_len)) {
-    // TODO(acostinas,chromium:1042626): Do curl authentication.
+    auto connect_job = std::make_unique<ProxyConnectJob>(
+        std::move(client_conn), credentials_,
+        base::BindOnce(&ServerProxy::ResolveProxy, base::Unretained(this)),
+        base::BindOnce(&ServerProxy::OnConnectionSetupFinished,
+                       base::Unretained(this)));
+    if (connect_job->Start())
+      pending_connect_jobs_[connect_job.get()] = std::move(connect_job);
   }
+
+  // Drop forwarders that were started and have since stopped running.
+  // TODO(acostinas, chromium:1064536) Monitor the client and server sockets
+  // and remove the corresponding SocketForwarder when a socket closes.
+  forwarders_.remove_if(
+      [](const std::unique_ptr<arc_networkd::SocketForwarder>& fwd) {
+        return !fwd->IsRunning() && fwd->HasBeenStarted();
+      });
+}
+
+void ServerProxy::OnConnectionSetupFinished(
+    std::unique_ptr<arc_networkd::SocketForwarder> fwd,
+    ProxyConnectJob* connect_job) {
+  if (fwd) {
+    // The connection was set up successfully.
+    forwarders_.emplace_back(std::move(fwd));
+  }
+  pending_connect_jobs_.erase(connect_job);
 }
 
 }  // namespace system_proxy
diff --git a/system-proxy/server_proxy.h b/system-proxy/server_proxy.h
index 0704b14..5096636 100644
--- a/system-proxy/server_proxy.h
+++ b/system-proxy/server_proxy.h
@@ -4,17 +4,31 @@
 #ifndef SYSTEM_PROXY_SERVER_PROXY_H_
 #define SYSTEM_PROXY_SERVER_PROXY_H_
 
+#include <list>
+#include <map>
 #include <memory>
 #include <string>
+#include <vector>
 
-#include <arc/network/socket.h>
 #include <base/callback_forward.h>
 #include <base/files/file_descriptor_watcher_posix.h>
+#include <base/files/scoped_file.h>
+#include <base/memory/weak_ptr.h>
 #include <brillo/asynchronous_signal_handler.h>
 #include <gtest/gtest_prod.h>  // for FRIEND_TEST
 
+namespace arc_networkd {
+class Socket;
+class SocketForwarder;
+}  // namespace arc_networkd
+
 namespace system_proxy {
 
+using OnProxyResolvedCallback =
+    base::OnceCallback<void(const std::list<std::string>&)>;
+
+class ProxyConnectJob;
+
 // ServerProxy listens for connections from the host (system services, ARC++
 // apps) and sets-up connections to the remote server.
 // Note: System-proxy only supports proxying over IPv4 networks.
@@ -27,6 +41,15 @@
 
   void Init();
 
+  // Creates a proxy resolution request that is forwarded to the parent process
+  // through the standard output. When the request is resolved, the parent
+  // process will send the result through the standard input.
+  // |callback| will be called when the proxy is resolved, with the list of
+  // proxy servers as parameter, or in case of failure, with a list containing
+  // only the direct proxy.
+  void ResolveProxy(const std::string& target_url,
+                    OnProxyResolvedCallback callback);
+
  protected:
   virtual int GetStdinPipe();
 
@@ -34,27 +57,48 @@
   friend class ServerProxyTest;
   FRIEND_TEST(ServerProxyTest, FetchCredentials);
   FRIEND_TEST(ServerProxyTest, FetchListeningAddress);
+  FRIEND_TEST(ServerProxyTest, HandleConnectRequest);
+  FRIEND_TEST(ServerProxyTest, HandlePendingJobs);
 
   void HandleStdinReadable();
   bool HandleSignal(const struct signalfd_siginfo& siginfo);
 
   void CreateListeningSocket();
-  void OnConnectionRequest();
+  void OnConnectionAccept();
+
+  // Called by |ProxyConnectJob| after setting up the connection with the remote
+  // server via the remote proxy server. If the connection is successful, |fwd|
+  // corresponds to the tunnel between the client and the server that has
+  // started to forward data. In case of failure, |fwd| is empty.
+  void OnConnectionSetupFinished(
+      std::unique_ptr<arc_networkd::SocketForwarder> fwd,
+      ProxyConnectJob* connect_job);
 
   // The proxy listening address in network-byte order.
   uint32_t listening_addr_ = 0;
   int listening_port_;
 
-  std::string username_;
-  std::string password_;
-
+  // The user name and password to use for proxy authentication in the format
+  // compatible with libcurl's CURLOPT_PROXYUSERPWD: both user name and
+  // password URL encoded and separated by colon.
+  std::string credentials_;
   std::unique_ptr<arc_networkd::Socket> listening_fd_;
 
+  // List of SocketForwarders that correspond to the TCP tunnel between the
+  // local client and the remote proxy, forwarding data between the TCP
+  // connection initiated by the local client to the local proxy and the TCP
+  // connection initiated by the local proxy to the remote proxy.
+  std::list<std::unique_ptr<arc_networkd::SocketForwarder>> forwarders_;
+
+  std::map<ProxyConnectJob*, std::unique_ptr<ProxyConnectJob>>
+      pending_connect_jobs_;
+
   base::OnceClosure quit_closure_;
   std::unique_ptr<base::FileDescriptorWatcher::Controller> stdin_watcher_;
   std::unique_ptr<base::FileDescriptorWatcher::Controller> fd_watcher_;
-
   brillo::AsynchronousSignalHandler signal_handler_;
+
+  base::WeakPtrFactory<ServerProxy> weak_ptr_factory_;
 };
 }  // namespace system_proxy
 
diff --git a/system-proxy/server_proxy_test.cc b/system-proxy/server_proxy_test.cc
index efbdbc0..4f079cb 100644
--- a/system-proxy/server_proxy_test.cc
+++ b/system-proxy/server_proxy_test.cc
@@ -5,31 +5,40 @@
 #include "system-proxy/server_proxy.h"
 
 #include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/types.h>
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <utility>
 
+#include <arc/network/socket.h>
+#include <arc/network/socket_forwarder.h>
 #include <base/bind.h>
 #include <base/bind_helpers.h>
 #include <base/callback_helpers.h>
 #include <base/files/file_util.h>
 #include <base/files/scoped_file.h>
 #include <base/message_loop/message_loop.h>
+#include <base/strings/string_util.h>
 #include <brillo/dbus/async_event_sequencer.h>
-#include <brillo/dbus/dbus_object.h>
 #include <brillo/message_loops/base_message_loop.h>
 
 #include "bindings/worker_common.pb.h"
 #include "system-proxy/protobuf_util.h"
+#include "system-proxy/proxy_connect_job.h"
 
 namespace system_proxy {
 namespace {
-constexpr char kUser[] = "proxy_user";
-constexpr char kPassword[] = "proxy_password";
+constexpr char kUsername[] = "proxy:user";
+constexpr char kUsernameEncoded[] = "proxy%3Auser";
+constexpr char kPassword[] = "proxy password";
+constexpr char kPasswordEncoded[] = "proxy%20password";
 constexpr int kTestPort = 3128;
+
 }  // namespace
 
+using ::testing::_;
 using ::testing::Return;
 
 class MockServerProxy : public ServerProxy {
@@ -43,6 +52,23 @@
   MOCK_METHOD(int, GetStdinPipe, (), (override));
 };
 
+class MockProxyConnectJob : public ProxyConnectJob {  // gMock test double.
+ public:
+  MockProxyConnectJob(std::unique_ptr<arc_networkd::Socket> socket,
+                      const std::string& credentials,
+                      ResolveProxyCallback resolve_proxy_callback,
+                      OnConnectionSetupFinishedCallback setup_finished_callback)
+      : ProxyConnectJob(std::move(socket),  // All arguments go to the base.
+                        credentials,
+                        std::move(resolve_proxy_callback),
+                        std::move(setup_finished_callback)) {}
+  MockProxyConnectJob(const MockProxyConnectJob&) = delete;  // Not copyable.
+  MockProxyConnectJob& operator=(const MockProxyConnectJob&) = delete;
+  ~MockProxyConnectJob() override = default;
+
+  MOCK_METHOD(bool, Start, (), (override));  // Only |Start| is mocked.
+};
+
 class ServerProxyTest : public ::testing::Test {
  public:
   ServerProxyTest() {
@@ -74,7 +100,7 @@
 
 TEST_F(ServerProxyTest, FetchCredentials) {
   Credentials credentials;
-  credentials.set_username(kUser);
+  credentials.set_username(kUsername);
   credentials.set_password(kPassword);
   WorkerConfigs configs;
   *configs.mutable_credentials() = credentials;
@@ -84,8 +110,9 @@
 
   brillo_loop_.RunOnce(false);
 
-  EXPECT_EQ(server_proxy_->username_, kUser);
-  EXPECT_EQ(server_proxy_->password_, kPassword);
+  std::string expected_credentials =
+      base::JoinString({kUsernameEncoded, kPasswordEncoded}, ":");
+  EXPECT_EQ(server_proxy_->credentials_, expected_credentials);
 }
 
 TEST_F(ServerProxyTest, FetchListeningAddress) {
@@ -104,4 +131,76 @@
   EXPECT_EQ(server_proxy_->listening_port_, kTestPort);
 }
 
+// Tests that a client connecting to the listening socket adds a pending job.
+TEST_F(ServerProxyTest, HandleConnectRequest) {
+  server_proxy_->listening_addr_ = htonl(INADDR_LOOPBACK);
+  server_proxy_->listening_port_ = kTestPort;
+  // Redirect the worker stdin and stdout pipes.
+  RedirectStdPipes();
+  server_proxy_->CreateListeningSocket();
+  ASSERT_NE(-1, server_proxy_->listening_fd_->fd());
+  brillo_loop_.RunOnce(false);
+
+  // Zero-initialize so that |sin_zero| is not left indeterminate.
+  struct sockaddr_in ipv4addr = {};
+  ipv4addr.sin_family = AF_INET;
+  ipv4addr.sin_port = htons(kTestPort);
+  ipv4addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+
+  auto client_socket =
+      std::make_unique<arc_networkd::Socket>(AF_INET, SOCK_STREAM);
+  EXPECT_TRUE(client_socket->Connect(
+      reinterpret_cast<const struct sockaddr*>(&ipv4addr), sizeof(ipv4addr)));
+  brillo_loop_.RunOnce(false);
+  EXPECT_EQ(1u, server_proxy_->pending_connect_jobs_.size());
+}
+
+// Tests that |OnConnectionSetupFinished| removes the finished job from the
+// pending list on error and on success, and keeps a forwarder only on success.
+TEST_F(ServerProxyTest, HandlePendingJobs) {
+  const size_t connection_count = 100;
+  const size_t success_count = 51;
+  const size_t failure_count = connection_count - success_count;
+  // Create |connection_count| connections.
+  for (size_t i = 0; i < connection_count; ++i) {
+    auto client_socket =
+        std::make_unique<arc_networkd::Socket>(AF_INET, SOCK_STREAM);
+    auto mock_connect_job = std::make_unique<MockProxyConnectJob>(
+        std::move(client_socket), "" /* credentials */,
+        base::BindOnce([](const std::string& target_url,
+                          OnProxyResolvedCallback callback) {}),
+        base::BindOnce(&ServerProxy::OnConnectionSetupFinished,
+                       base::Unretained(server_proxy_.get())));
+    server_proxy_->pending_connect_jobs_[mock_connect_job.get()] =
+        std::move(mock_connect_job);
+  }
+  // Resolve |failure_count| pending connections with error.
+  for (size_t i = 0; i < failure_count; ++i) {
+    auto job_iter = server_proxy_->pending_connect_jobs_.begin();
+    std::move(job_iter->second->setup_finished_callback_)
+        .Run(nullptr, job_iter->first);  // No forwarder signals an error.
+  }
+  // Expect failed requests have been cleared from the pending list and no
+  // forwarder.
+  EXPECT_EQ(success_count, server_proxy_->pending_connect_jobs_.size());
+  EXPECT_EQ(0u, server_proxy_->forwarders_.size());
+
+  // Resolve |success_count| successful connections.
+  for (size_t i = 0; i < success_count; ++i) {
+    auto fwd = std::make_unique<arc_networkd::SocketForwarder>(
+        "" /* thread name */,
+        std::make_unique<arc_networkd::Socket>(AF_INET, SOCK_STREAM),
+        std::make_unique<arc_networkd::Socket>(AF_INET, SOCK_STREAM));
+    fwd->Start();
+    auto job_iter = server_proxy_->pending_connect_jobs_.begin();
+    std::move(job_iter->second->setup_finished_callback_)
+        .Run(std::move(fwd), job_iter->first);
+  }
+
+  // Expect the successful requests to have been cleared and |success_count|
+  // active forwarders.
+  EXPECT_EQ(0u, server_proxy_->pending_connect_jobs_.size());
+  EXPECT_EQ(success_count, server_proxy_->forwarders_.size());
+}
+
 }  // namespace system_proxy