patchpanel: Change SocketForwarder exit condition
This CL changes the exit condition of the SocketForwarder: instead of
quitting as soon as one of the sockets receives an EPOLLRDHUP event, it
now waits until both sockets have received EOF.
EPOLLRDHUP means that the peer will no longer write to the socket, but
it can still receive data on that socket. In practice, this half-closed
state is encountered when downloading PlayStore apps through a SocketForwarder.
BUG=chromium:1125177
TEST=downloading PlayStore apps on DUT works
Change-Id: I387cdf413e4a0ececd4453d0783424388de145d1
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform2/+/2398698
Reviewed-by: Garrick Evans <garrick@chromium.org>
Reviewed-by: Hugo Benichi <hugobenichi@google.com>
Tested-by: Andreea-Elena Costinas <acostinas@google.com>
Commit-Queue: Hugo Benichi <hugobenichi@google.com>
diff --git a/patchpanel/socket_forwarder.cc b/patchpanel/socket_forwarder.cc
index c3404ae..3c149b7 100644
--- a/patchpanel/socket_forwarder.cc
+++ b/patchpanel/socket_forwarder.cc
@@ -23,6 +23,25 @@
constexpr int kWaitTimeoutMs = 1000;
// Maximum number of epoll events to process per wait.
constexpr int kMaxEvents = 4;
+
+// Logs |event| as "{ fd: N, events: 0xH}"; resets |stream| to decimal after.
+std::ostream& operator<<(std::ostream& stream, const epoll_event& event) {
+  stream << "{ fd: " << event.data.fd << ", events: 0x" << std::hex
+         << event.events << std::dec << "}";
+  return stream;
+}
+
+// Sets |socket|'s epoll mask on |cfd| to |events|; false if epoll_ctl fails.
+bool SetPollEvents(Socket* socket, int cfd, uint32_t events) {
+  struct epoll_event request;
+  request.data.fd = socket->fd();
+  request.events = events;
+  if (epoll_ctl(cfd, EPOLL_CTL_MOD, request.data.fd, &request) != -1)
+    return true;
+  PLOG(ERROR) << "epoll_ctl(" << request << ") failed";
+  return false;
+}
+
} // namespace
SocketForwarder::SocketForwarder(const std::string& name,
@@ -33,6 +52,7 @@
sock1_(std::move(sock1)),
len0_(0),
len1_(0),
+ eof_(-1),
poll_(false),
done_(false) {
DCHECK(sock0_);
@@ -49,6 +69,10 @@
return !done_;
}
+void SocketForwarder::SetStopQuitClosureForTesting(base::OnceClosure closure) {
+ stop_quit_closure_for_testing_ = std::move(closure);  // Run on forwarder exit.
+}
+
void SocketForwarder::Run() {
LOG(INFO) << "Starting forwarder: " << *sock0_ << " <-> " << *sock1_;
@@ -58,6 +82,8 @@
fcntl(sock1_->fd(), F_SETFL,
fcntl(sock1_->fd(), F_GETFL, 0) | O_NONBLOCK) < 0) {
PLOG(ERROR) << "fcntl failed";
+ if (stop_quit_closure_for_testing_)
+ std::move(stop_quit_closure_for_testing_).Run();
return;
}
@@ -67,6 +93,8 @@
done_ = true;
sock1_.reset();
sock0_.reset();
+ if (stop_quit_closure_for_testing_)
+ std::move(stop_quit_closure_for_testing_).Run();
}
void SocketForwarder::Poll() {
@@ -76,7 +104,7 @@
return;
}
struct epoll_event ev;
- ev.events = EPOLLIN | EPOLLRDHUP;
+ ev.events = EPOLLIN;
ev.data.fd = sock0_->fd();
if (epoll_ctl(cfd.get(), EPOLL_CTL_ADD, sock0_->fd(), &ev) == -1) {
PLOG(ERROR) << "epoll_ctl failed";
@@ -117,10 +145,6 @@
<< *sock1_;
return false;
}
- if (events & (EPOLLHUP | EPOLLRDHUP)) {
- LOG(INFO) << "Peer closed connection: " << *sock0_ << " <-> " << *sock1_;
- return false;
- }
if (events & EPOLLOUT) {
Socket* dst;
@@ -149,44 +173,39 @@
memmove(&buf[0], &buf[bytes], *len - bytes);
*len -= bytes;
- if (*len == 0) {
- struct epoll_event ev;
- ev.events = EPOLLIN | EPOLLRDHUP;
- ev.data.fd = dst->fd();
- if (epoll_ctl(cfd, EPOLL_CTL_MOD, dst->fd(), &ev) == -1) {
- PLOG(ERROR) << "epoll_ctl failed";
- return false;
- }
- }
+ // If all the buffered data was written to the socket and the peer socket is
+ // still open for writing, listen for read events on the socket.
+ if (*len == 0 && eof_ != dst->fd() && !SetPollEvents(dst, cfd, EPOLLIN))
+ return false;
}
+ Socket *src, *dst;
+ char* buf;
+ ssize_t* len;
+ if (sock0_->fd() == efd) {
+ src = sock0_.get();
+ dst = sock1_.get();
+ buf = buf0_;
+ len = &len0_;
+ } else {
+ src = sock1_.get();
+ dst = sock0_.get();
+ buf = buf1_;
+ len = &len1_;
+ }
+
+ // Skip the read if this buffer is still pending write: requires that
+ // epoll_wait is in level-triggered mode.
+ if (*len > 0)
+ return true;
+
if (events & EPOLLIN) {
- Socket *src, *dst;
- char* buf;
- ssize_t* len;
- if (sock0_->fd() == efd) {
- src = sock0_.get();
- dst = sock1_.get();
- buf = buf0_;
- len = &len0_;
- } else {
- src = sock1_.get();
- dst = sock0_.get();
- buf = buf1_;
- len = &len1_;
- }
-
- // Skip the read if this buffer is still pending write: requires that
- // epoll_wait is in level-triggered mode.
- if (*len > 0)
- return true;
-
*len = src->RecvFrom(buf, kBufSize);
if (*len < 0)
return false;
if (*len == 0)
- return true;
+ return HandleConnectionClosed(src, dst, cfd);
ssize_t bytes = dst->SendTo(buf, *len);
if (bytes < 0)
@@ -199,18 +218,41 @@
*len -= bytes;
}
- if (*len > 0) {
- struct epoll_event ev;
- ev.events = EPOLLOUT | EPOLLRDHUP;
- ev.data.fd = dst->fd();
- if (epoll_ctl(cfd, EPOLL_CTL_MOD, dst->fd(), &ev) == -1) {
- PLOG(ERROR) << "epoll_ctl failed";
- return false;
- }
- }
+ if (*len > 0 && !SetPollEvents(dst, cfd, EPOLLOUT))
+ return false;
}
+ if (events & EPOLLHUP) {
+ LOG(INFO) << "Peer closed connection: " << *sock0_ << " <-> " << *sock1_;
+ return false;
+ }
return true;
}
+// Handles EOF read from |src|: half-closes |dst| for writing and remembers
+// |src| in |eof_|. Returns false once both peers have closed, or on error.
+bool SocketForwarder::HandleConnectionClosed(Socket* src, Socket* dst,
+                                             int cfd) {
+  LOG(INFO) << "Peer closed connection: " << *src;
+  if (eof_ == dst->fd()) {
+    // The other peer already sent EOF as well, so stop the forwarder.
+    LOG(INFO) << "Closed connection: " << *sock0_ << " <-> " << *sock1_;
+    return false;
+  }
+  // Stop listening for read ready events from |src|.
+  if (!SetPollEvents(src, cfd, 0))
+    return false;
+
+  // Propagate the shut down for writing to the other peer. This is safe
+  // to do since reading the EOF on |src| only happens if the buffer
+  // associated with the |src| socket is empty, so there's no outstanding
+  // data to be written to |dst|.
+  if (shutdown(dst->fd(), SHUT_WR) == -1) {
+    PLOG(ERROR) << "Shutting down " << *dst << " for writing failed";
+    return false;
+  }
+
+  eof_ = src->fd();
+  return true;
+}
} // namespace patchpanel