Adds simulated TCP message route for testing.

This TCP message route allows simulation of sending a fixed lengths message
over an existing route. This can be used to simulate reliable signaling in
tests as well as simulating the cross traffic impact of TCP connection.

It is based on the existing Fake TCP cross traffic implementation.

Bug: webrtc:9510
Change-Id: Ibfc2a9a5b95593b00db16de2c09ce929077cf5c5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159482
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29777}
diff --git a/test/network/BUILD.gn b/test/network/BUILD.gn
index c1ad2a5..19b8d94 100644
--- a/test/network/BUILD.gn
+++ b/test/network/BUILD.gn
@@ -120,6 +120,7 @@
     "../../call:simulated_network",
     "../../rtc_base:logging",
     "../../rtc_base:rtc_event",
+    "//test/time_controller:time_controller",
     "//third_party/abseil-cpp/absl/memory",
   ]
 }
diff --git a/test/network/cross_traffic.cc b/test/network/cross_traffic.cc
index 0d1937f..07ca171 100644
--- a/test/network/cross_traffic.cc
+++ b/test/network/cross_traffic.cc
@@ -16,6 +16,7 @@
 
 #include "absl/memory/memory.h"
 #include "absl/types/optional.h"
+#include "cross_traffic.h"
 #include "rtc_base/logging.h"
 #include "rtc_base/numerics/safe_minmax.h"
 
@@ -115,6 +116,127 @@
       32);
 }
 
+TcpMessageRoute::TcpMessageRoute(Clock* clock,
+                                 TaskQueueBase* task_queue,
+                                 EmulatedRoute* send_route,
+                                 EmulatedRoute* ret_route)
+    : clock_(clock),
+      task_queue_(task_queue),
+      request_route_(send_route,
+                     [this](TcpPacket packet, Timestamp) {
+                       OnRequest(std::move(packet));
+                     }),
+      response_route_(ret_route,
+                      [this](TcpPacket packet, Timestamp arrival_time) {
+                        OnResponse(std::move(packet), arrival_time);
+                      }) {}
+
+void TcpMessageRoute::SendMessage(size_t size,
+                                  std::function<void()> on_received) {
+  task_queue_->PostTask(
+      ToQueuedTask([this, size, handler = std::move(on_received)] {
+        // If we are currently sending a message we won't reset the connection,
+        // we'll act as if the messages are sent in the same TCP stream. This is
+        // intended to simulate recreation of a TCP session for each message
+        // in the typical case while avoiding the complexity overhead of
+        // maintaining multiple virtual TCP sessions in parallel.
+        if (pending_.empty() && in_flight_.empty()) {
+          cwnd_ = 10;
+          ssthresh_ = INFINITY;
+        }
+        size_t data_left = size;
+        size_t kMaxPacketSize = 1200;
+        Message message{std::move(handler)};
+        while (data_left > 0) {
+          size_t packet_size = std::min(data_left, kMaxPacketSize);
+          int fragment_id = next_fragment_id_++;
+          pending_.push_back(MessageFragment{fragment_id, packet_size});
+          message.pending_fragment_ids.insert(fragment_id);
+          data_left -= packet_size;
+        }
+        messages_.emplace_back(message);
+        SendPackets(clock_->CurrentTime());
+      }));
+}
+
+void TcpMessageRoute::OnRequest(TcpPacket packet_info) {
+  for (auto it = messages_.begin(); it != messages_.end(); ++it) {
+    if (it->pending_fragment_ids.count(packet_info.fragment.fragment_id) != 0) {
+      it->pending_fragment_ids.erase(packet_info.fragment.fragment_id);
+      if (it->pending_fragment_ids.empty()) {
+        it->handler();
+        messages_.erase(it);
+      }
+      break;
+    }
+  }
+  const size_t kAckPacketSize = 20;
+  response_route_.SendPacket(kAckPacketSize, packet_info);
+}
+
+void TcpMessageRoute::OnResponse(TcpPacket packet_info, Timestamp at_time) {
+  auto it = in_flight_.find(packet_info.sequence_number);
+  if (it != in_flight_.end()) {
+    last_rtt_ = at_time - packet_info.send_time;
+    in_flight_.erase(it);
+  }
+  auto lost_end = in_flight_.lower_bound(packet_info.sequence_number);
+  for (auto lost_it = in_flight_.begin(); lost_it != lost_end;
+       lost_it = in_flight_.erase(lost_it)) {
+    pending_.push_front(lost_it->second.fragment);
+  }
+
+  if (packet_info.sequence_number - last_acked_seq_num_ > 1) {
+    HandleLoss(at_time);
+  } else if (cwnd_ <= ssthresh_) {
+    cwnd_ += 1;
+  } else {
+    cwnd_ += 1.0f / cwnd_;
+  }
+  last_acked_seq_num_ =
+      std::max(packet_info.sequence_number, last_acked_seq_num_);
+  SendPackets(at_time);
+}
+
+void TcpMessageRoute::HandleLoss(Timestamp at_time) {
+  if (at_time - last_reduction_time_ < last_rtt_)
+    return;
+  last_reduction_time_ = at_time;
+  ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
+  cwnd_ = ssthresh_;
+}
+
+void TcpMessageRoute::SendPackets(Timestamp at_time) {
+  const TimeDelta kPacketTimeout = TimeDelta::seconds(1);
+  int cwnd = std::ceil(cwnd_);
+  int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
+  while (packets_to_send-- > 0 && !pending_.empty()) {
+    auto seq_num = next_sequence_number_++;
+    TcpPacket send;
+    send.sequence_number = seq_num;
+    send.send_time = at_time;
+    send.fragment = pending_.front();
+    pending_.pop_front();
+    request_route_.SendPacket(send.fragment.size, send);
+    in_flight_.insert({seq_num, send});
+    task_queue_->PostDelayedTask(ToQueuedTask([this, seq_num] {
+                                   HandlePacketTimeout(seq_num,
+                                                       clock_->CurrentTime());
+                                 }),
+                                 kPacketTimeout.ms());
+  }
+}
+
+void TcpMessageRoute::HandlePacketTimeout(int seq_num, Timestamp at_time) {
+  auto lost = in_flight_.find(seq_num);
+  if (lost != in_flight_.end()) {
+    pending_.push_front(lost->second.fragment);
+    in_flight_.erase(lost);
+    HandleLoss(at_time);
+    SendPackets(at_time);
+  }
+}
+
 FakeTcpCrossTraffic::FakeTcpCrossTraffic(Clock* clock,
                                          FakeTcpConfig config,
                                          EmulatedRoute* send_route,
diff --git a/test/network/cross_traffic.h b/test/network/cross_traffic.h
index 98e56d6..aba02e1 100644
--- a/test/network/cross_traffic.h
+++ b/test/network/cross_traffic.h
@@ -92,6 +92,68 @@
   bool sending_ RTC_GUARDED_BY(sequence_checker_) = false;
 };
 
+// Simulates a TCP connection, this roughly implements the Reno algorithm. In
+// difference from TCP this only support sending messages with a fixed length,
+// no streaming. This is useful to simulate signaling and cross traffic using
+// message based protocols such as HTTP. It differs from UDP messages in that
+// they are guranteed to be delivered eventually, even on lossy networks.
+class TcpMessageRoute {
+ public:
+  TcpMessageRoute(Clock* clock,
+                  TaskQueueBase* task_queue,
+                  EmulatedRoute* send_route,
+                  EmulatedRoute* ret_route);
+
+  // Sends a TCP message of the given |size| over the route, |on_received| is
+  // called when the message has been delivered. Note that the connection
+  // parameters are reset iff there's no currently pending message on the route.
+  void SendMessage(size_t size, std::function<void()> on_received);
+
+ private:
+  // Represents a message sent over the route. When all fragments has been
+  // delivered, the message is considered delivered and the handler is
+  // triggered. This only happen once.
+  struct Message {
+    std::function<void()> handler;
+    std::set<int> pending_fragment_ids;
+  };
+  // Represents a piece of a message that fit into a TCP packet.
+  struct MessageFragment {
+    int fragment_id;
+    size_t size;
+  };
+  // Represents a packet sent on the wire.
+  struct TcpPacket {
+    int sequence_number;
+    Timestamp send_time = Timestamp::MinusInfinity();
+    MessageFragment fragment;
+  };
+
+  void OnRequest(TcpPacket packet_info);
+  void OnResponse(TcpPacket packet_info, Timestamp at_time);
+  void HandleLoss(Timestamp at_time);
+  void SendPackets(Timestamp at_time);
+  void HandlePacketTimeout(int seq_num, Timestamp at_time);
+
+  Clock* const clock_;
+  TaskQueueBase* const task_queue_;
+  FakePacketRoute<TcpPacket> request_route_;
+  FakePacketRoute<TcpPacket> response_route_;
+
+  std::deque<MessageFragment> pending_;
+  std::map<int, TcpPacket> in_flight_;
+  std::list<Message> messages_;
+
+  double cwnd_;
+  double ssthresh_;
+
+  int last_acked_seq_num_ = 0;
+  int next_sequence_number_ = 0;
+  int next_fragment_id_ = 0;
+  Timestamp last_reduction_time_ = Timestamp::MinusInfinity();
+  TimeDelta last_rtt_ = TimeDelta::Zero();
+};
+
 struct FakeTcpConfig {
   DataSize packet_size = DataSize::bytes(1200);
   DataSize send_limit = DataSize::PlusInfinity();
diff --git a/test/network/cross_traffic_unittest.cc b/test/network/cross_traffic_unittest.cc
index 19d3bc3..cfa80cd 100644
--- a/test/network/cross_traffic_unittest.cc
+++ b/test/network/cross_traffic_unittest.cc
@@ -22,6 +22,8 @@
 #include "rtc_base/logging.h"
 #include "test/gmock.h"
 #include "test/gtest.h"
+#include "test/network/network_emulation_manager.h"
+#include "test/time_controller/simulated_time_controller.h"
 
 namespace webrtc {
 namespace test {
@@ -110,5 +112,40 @@
               kExpectedDataSent.bytes() * 0.1);
 }
 
+TEST(TcpMessageRouteTest, DeliveredOnLossyNetwork) {
+  GlobalSimulatedTimeController time(Timestamp::seconds(0));
+  NetworkEmulationManagerImpl net(&time);
+  BuiltInNetworkBehaviorConfig send;
+  // 800 kbps means that the 100 kB message would be delivered in ca 1 second
+  // under ideal conditions and no overhead.
+  send.link_capacity_kbps = 100 * 8;
+  send.loss_percent = 50;
+  send.queue_delay_ms = 100;
+  send.delay_standard_deviation_ms = 20;
+  send.allow_reordering = true;
+  auto ret = send;
+  ret.loss_percent = 10;
+
+  auto* tcp_route = net.CreateTcpRoute({net.CreateEmulatedNode(send)},
+                                       {net.CreateEmulatedNode(ret)});
+  int deliver_count = 0;
+  // 100 kB is more than what fits into a single packet.
+  constexpr size_t kMessageSize = 100000;
+
+  tcp_route->SendMessage(kMessageSize, [&] {
+    RTC_LOG(LS_INFO) << "Received at "
+                     << ToString(time.GetClock()->CurrentTime());
+    deliver_count++;
+  });
+
+  // If there was no loss, we would have delivered the message in ca 1 second,
+  // with 50% it should take much longer.
+  time.Sleep(TimeDelta::seconds(5));
+  ASSERT_EQ(deliver_count, 0);
+  // But given enough time the messsage will be delivered, but only once.
+  time.Sleep(TimeDelta::seconds(60));
+  EXPECT_EQ(deliver_count, 1);
+}
+
 }  // namespace test
 }  // namespace webrtc
diff --git a/test/network/network_emulation_manager.cc b/test/network/network_emulation_manager.cc
index ead8fe5..b2ccddf 100644
--- a/test/network/network_emulation_manager.cc
+++ b/test/network/network_emulation_manager.cc
@@ -228,6 +228,18 @@
   return traffic_ptr;
 }
 
+TcpMessageRoute* NetworkEmulationManagerImpl::CreateTcpRoute(
+    std::vector<EmulatedNetworkNode*> send_link,
+    std::vector<EmulatedNetworkNode*> ret_link) {
+  auto tcp_route = std::make_unique<TcpMessageRoute>(
+      clock_, task_queue_.Get(), CreateRoute(send_link), CreateRoute(ret_link));
+  auto* route_ptr = tcp_route.get();
+  task_queue_.PostTask([this, tcp_route = std::move(tcp_route)]() mutable {
+    tcp_message_routes_.push_back(std::move(tcp_route));
+  });
+  return route_ptr;
+}
+
 void NetworkEmulationManagerImpl::StopCrossTraffic(
     FakeTcpCrossTraffic* traffic) {
   task_queue_.PostTask([=]() {
diff --git a/test/network/network_emulation_manager.h b/test/network/network_emulation_manager.h
index b4e06ce..8076e6c 100644
--- a/test/network/network_emulation_manager.h
+++ b/test/network/network_emulation_manager.h
@@ -76,6 +76,10 @@
       std::vector<EmulatedNetworkNode*> send_link,
       std::vector<EmulatedNetworkNode*> ret_link,
       FakeTcpConfig config);
+
+  TcpMessageRoute* CreateTcpRoute(std::vector<EmulatedNetworkNode*> send_link,
+                                  std::vector<EmulatedNetworkNode*> ret_link);
+
   void StopCrossTraffic(FakeTcpCrossTraffic* traffic);
 
   EmulatedNetworkManagerInterface* CreateEmulatedNetworkManagerInterface(
@@ -101,6 +105,7 @@
   std::vector<std::unique_ptr<RandomWalkCrossTraffic>> random_cross_traffics_;
   std::vector<std::unique_ptr<PulsedPeaksCrossTraffic>> pulsed_cross_traffics_;
   std::list<std::unique_ptr<FakeTcpCrossTraffic>> tcp_cross_traffics_;
+  std::list<std::unique_ptr<TcpMessageRoute>> tcp_message_routes_;
   std::vector<std::unique_ptr<EndpointsContainer>> endpoints_containers_;
   std::vector<std::unique_ptr<EmulatedNetworkManager>> network_managers_;