arc: Move platform2/arc/network/ to platform2/patchpanel

As the next step in the arc-networkd -> patchpanel rename, this patch
moves the code to its new location.

BUG=b:151879931
TEST=units,flashed image to atlas
TEST=tasts arc.PlayStore, crostini.LaunchTerminal.download

Change-Id: I1b5cf8d670e1631d46f6449b725395157bf88dde
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2115863
Tested-by: Garrick Evans <garrick@chromium.org>
Commit-Queue: Garrick Evans <garrick@chromium.org>
Reviewed-by: Hidehiko Abe <hidehiko@chromium.org>
Reviewed-by: Eric Caruso <ejcaruso@chromium.org>
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Reviewed-by: Hugo Benichi <hugobenichi@google.com>
diff --git a/patchpanel/message_dispatcher.cc b/patchpanel/message_dispatcher.cc
new file mode 100644
index 0000000..deba22d
--- /dev/null
+++ b/patchpanel/message_dispatcher.cc
@@ -0,0 +1,117 @@
+// Copyright 2016 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "patchpanel/message_dispatcher.h"
+
+#include <unistd.h>
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <base/bind.h>
+#include <base/files/scoped_file.h>
+#include <base/logging.h>
+#include <base/posix/unix_domain_socket.h>
+
+namespace patchpanel {
+
+// Takes ownership of |fd|. If |start| is true, begins watching the descriptor
+// for incoming messages immediately; otherwise nothing is read until Start()
+// is called.
+MessageDispatcher::MessageDispatcher(base::ScopedFD fd, bool start)
+    : fd_(std::move(fd)) {
+  if (start)
+    Start();
+}
+
+// Begins watching |fd_| for readability. OnFileCanReadWithoutBlocking() is
+// invoked each time the descriptor has data available.
+void MessageDispatcher::Start() {
+  // NOTE(review): base::Unretained presumes |watcher_| cannot outlive |this|;
+  // confirm against the member declaration in the header.
+  watcher_ = base::FileDescriptorWatcher::WatchReadable(
+      fd_.get(),
+      base::BindRepeating(&MessageDispatcher::OnFileCanReadWithoutBlocking,
+                          base::Unretained(this)));
+}
+
+// Sets the callback run when reading from |fd_| fails, replacing any handler
+// registered earlier.
+void MessageDispatcher::RegisterFailureHandler(
+    const base::Callback<void()>& handler) {
+  failure_handler_ = handler;
+}
+
+// Sets the callback run for each received message that carries a
+// GuestMessage, replacing any handler registered earlier.
+void MessageDispatcher::RegisterGuestMessageHandler(
+    const base::Callback<void(const GuestMessage&)>& handler) {
+  guest_handler_ = handler;
+}
+
+// Sets the callback run for each received message that carries a
+// DeviceMessage, replacing any handler registered earlier.
+void MessageDispatcher::RegisterDeviceMessageHandler(
+    const base::Callback<void(const DeviceMessage&)>& handler) {
+  device_handler_ = handler;
+}
+
+// Reads one message from |fd_|, parses it into |msg_| and forwards the guest
+// and/or device payload to the registered handlers. If the read fails, the
+// watcher is torn down and the failure handler (if any) is run.
+void MessageDispatcher::OnFileCanReadWithoutBlocking() {
+  char buffer[1024];
+  // Descriptors attached to the message are not used here; they are only
+  // collected so that they are released when |fds| goes out of scope.
+  std::vector<base::ScopedFD> fds{};
+  ssize_t len =
+      base::UnixDomainSocket::RecvMsg(fd_.get(), buffer, sizeof(buffer), &fds);
+
+  if (len <= 0) {
+    // errno is only meaningful when the read itself reported an error; for a
+    // zero-length read PLOG would append a stale, misleading errno string.
+    if (len < 0)
+      PLOG(ERROR) << "Read failed: exiting";
+    else
+      LOG(ERROR) << "Read failed: exiting";
+    // Drop the watcher so no further read notifications arrive for |fd_|.
+    watcher_.reset();
+    if (!failure_handler_.is_null())
+      failure_handler_.Run();
+    return;
+  }
+
+  msg_.Clear();
+  if (!msg_.ParseFromArray(buffer, len)) {
+    LOG(ERROR) << "Error parsing protobuf";
+    return;
+  }
+
+  if (msg_.has_guest_message() && !guest_handler_.is_null()) {
+    guest_handler_.Run(msg_.guest_message());
+  }
+
+  if (msg_.has_device_message() && !device_handler_.is_null()) {
+    device_handler_.Run(msg_.device_message());
+  }
+}
+
+// Serializes |proto| and writes it to |fd_| with a single write() call.
+// Failures are logged; nothing is reported to the caller.
+void MessageDispatcher::SendMessage(
+    const google::protobuf::MessageLite& proto) const {
+  std::string str;
+  if (!proto.SerializeToString(&str)) {
+    LOG(ERROR) << "error serializing protobuf";
+    // Do not send an empty or partially serialized payload to the peer.
+    return;
+  }
+  if (write(fd_.get(), str.data(), str.size()) !=
+      static_cast<ssize_t>(str.size())) {
+    LOG(ERROR) << "short write on protobuf";
+  }
+}
+
+}  // namespace patchpanel