Revert "Take out listen support from AsyncPacketSocket"
This reverts commit b141c162ee2ef88a7498ba8cb8bc852287f93ad2.
Reason for revert: Breaking WebRTC rolls. See https://ci.chromium.org/ui/b/8832847811929676465 for an example failed build.
Original change's description:
> Take out listen support from AsyncPacketSocket
>
> Moved to new interface class AsyncListenSocket.
>
> Bug: webrtc:13065
> Change-Id: Ib96ce154ba19979360ecd8144981d947ff5b8b18
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/232607
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Niels Moller <nisse@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#35234}
# Not skipping CQ checks because original CL landed > 1 day ago.
Bug: webrtc:13065
Change-Id: Id5d5b35cb21704ca4e3006caf1636906df062609
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/235824
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35249}
diff --git a/rtc_base/async_tcp_socket.cc b/rtc_base/async_tcp_socket.cc
index 37a1052..76efb6d 100644
--- a/rtc_base/async_tcp_socket.cc
+++ b/rtc_base/async_tcp_socket.cc
@@ -62,11 +62,16 @@
}
AsyncTCPSocketBase::AsyncTCPSocketBase(Socket* socket,
+ bool listen,
size_t max_packet_size)
: socket_(socket),
+ listen_(listen),
max_insize_(max_packet_size),
max_outsize_(max_packet_size) {
- inbuf_.EnsureCapacity(kMinimumRecvSize);
+ if (!listen_) {
+ // Listening sockets don't send/receive data, so they don't need buffers.
+ inbuf_.EnsureCapacity(kMinimumRecvSize);
+ }
RTC_DCHECK(socket_.get() != nullptr);
socket_->SignalConnectEvent.connect(this,
@@ -74,6 +79,12 @@
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent);
socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent);
+
+ if (listen_) {
+ if (socket_->Listen(kListenBacklog) < 0) {
+ RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
+ }
+ }
}
AsyncTCPSocketBase::~AsyncTCPSocketBase() {}
@@ -95,7 +106,11 @@
case Socket::CS_CLOSED:
return STATE_CLOSED;
case Socket::CS_CONNECTING:
- return STATE_CONNECTING;
+ if (listen_) {
+ return STATE_BOUND;
+ } else {
+ return STATE_CONNECTING;
+ }
case Socket::CS_CONNECTED:
return STATE_CONNECTED;
default:
@@ -134,6 +149,7 @@
}
int AsyncTCPSocketBase::FlushOutBuffer() {
+ RTC_DCHECK(!listen_);
RTC_DCHECK_GT(outbuf_.size(), 0);
rtc::ArrayView<uint8_t> view = outbuf_;
int res;
@@ -173,6 +189,7 @@
void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) {
RTC_DCHECK(outbuf_.size() + cb <= max_outsize_);
+ RTC_DCHECK(!listen_);
outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb);
}
@@ -183,44 +200,62 @@
void AsyncTCPSocketBase::OnReadEvent(Socket* socket) {
RTC_DCHECK(socket_.get() == socket);
- size_t total_recv = 0;
- while (true) {
- size_t free_size = inbuf_.capacity() - inbuf_.size();
- if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) {
- inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2));
- free_size = inbuf_.capacity() - inbuf_.size();
+ if (listen_) {
+ rtc::SocketAddress address;
+ rtc::Socket* new_socket = socket->Accept(&address);
+ if (!new_socket) {
+ // TODO(stefan): Do something better like forwarding the error
+ // to the user.
+ RTC_LOG(LS_ERROR) << "TCP accept failed with error "
+ << socket_->GetError();
+ return;
}
- int len = socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
- if (len < 0) {
- // TODO(stefan): Do something better like forwarding the error to the
- // user.
- if (!socket_->IsBlocking()) {
- RTC_LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError();
- }
- break;
- }
+ HandleIncomingConnection(new_socket);
- total_recv += len;
- inbuf_.SetSize(inbuf_.size() + len);
- if (!len || static_cast<size_t>(len) < free_size) {
- break;
- }
- }
-
- if (!total_recv) {
- return;
- }
-
- size_t size = inbuf_.size();
- ProcessInput(inbuf_.data<char>(), &size);
-
- if (size > inbuf_.size()) {
- RTC_LOG(LS_ERROR) << "input buffer overflow";
- RTC_NOTREACHED();
- inbuf_.Clear();
+ // Prime a read event in case data is waiting.
+ new_socket->SignalReadEvent(new_socket);
} else {
- inbuf_.SetSize(size);
+ size_t total_recv = 0;
+ while (true) {
+ size_t free_size = inbuf_.capacity() - inbuf_.size();
+ if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) {
+ inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2));
+ free_size = inbuf_.capacity() - inbuf_.size();
+ }
+
+ int len =
+ socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr);
+ if (len < 0) {
+ // TODO(stefan): Do something better like forwarding the error to the
+ // user.
+ if (!socket_->IsBlocking()) {
+ RTC_LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError();
+ }
+ break;
+ }
+
+ total_recv += len;
+ inbuf_.SetSize(inbuf_.size() + len);
+ if (!len || static_cast<size_t>(len) < free_size) {
+ break;
+ }
+ }
+
+ if (!total_recv) {
+ return;
+ }
+
+ size_t size = inbuf_.size();
+ ProcessInput(inbuf_.data<char>(), &size);
+
+ if (size > inbuf_.size()) {
+ RTC_LOG(LS_ERROR) << "input buffer overflow";
+ RTC_NOTREACHED();
+ inbuf_.Clear();
+ } else {
+ inbuf_.SetSize(size);
+ }
}
}
@@ -248,11 +283,12 @@
const SocketAddress& bind_address,
const SocketAddress& remote_address) {
return new AsyncTCPSocket(
- AsyncTCPSocketBase::ConnectSocket(socket, bind_address, remote_address));
+ AsyncTCPSocketBase::ConnectSocket(socket, bind_address, remote_address),
+ false);
}
-AsyncTCPSocket::AsyncTCPSocket(Socket* socket)
- : AsyncTCPSocketBase(socket, kBufSize) {}
+AsyncTCPSocket::AsyncTCPSocket(Socket* socket, bool listen)
+ : AsyncTCPSocketBase(socket, listen, kBufSize) {}
int AsyncTCPSocket::Send(const void* pv,
size_t cb,
@@ -307,59 +343,8 @@
}
}
-AsyncTcpListenSocket::AsyncTcpListenSocket(std::unique_ptr<Socket> socket)
- : socket_(std::move(socket)) {
- RTC_DCHECK(socket_.get() != nullptr);
- socket_->SignalReadEvent.connect(this, &AsyncTcpListenSocket::OnReadEvent);
- if (socket_->Listen(kListenBacklog) < 0) {
- RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
- }
-}
-
-AsyncTcpListenSocket::State AsyncTcpListenSocket::GetState() const {
- switch (socket_->GetState()) {
- case Socket::CS_CLOSED:
- return State::kClosed;
- case Socket::CS_CONNECTING:
- return State::kBound;
- default:
- RTC_NOTREACHED();
- return State::kClosed;
- }
-}
-
-SocketAddress AsyncTcpListenSocket::GetLocalAddress() const {
- return socket_->GetLocalAddress();
-}
-
-int AsyncTcpListenSocket::GetOption(Socket::Option opt, int* value) {
- return socket_->GetOption(opt, value);
-}
-
-int AsyncTcpListenSocket::SetOption(Socket::Option opt, int value) {
- return socket_->SetOption(opt, value);
-}
-
-void AsyncTcpListenSocket::OnReadEvent(Socket* socket) {
- RTC_DCHECK(socket_.get() == socket);
-
- rtc::SocketAddress address;
- rtc::Socket* new_socket = socket->Accept(&address);
- if (!new_socket) {
- // TODO(stefan): Do something better like forwarding the error
- // to the user.
- RTC_LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError();
- return;
- }
-
- HandleIncomingConnection(new_socket);
-
- // Prime a read event in case data is waiting.
- new_socket->SignalReadEvent(new_socket);
-}
-
-void AsyncTcpListenSocket::HandleIncomingConnection(Socket* socket) {
- SignalNewConnection(this, new AsyncTCPSocket(socket));
+void AsyncTCPSocket::HandleIncomingConnection(Socket* socket) {
+ SignalNewConnection(this, new AsyncTCPSocket(socket, false));
}
} // namespace rtc