Update libjingle to 50191337.

R=mallinath@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/1885005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@4461 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 345cd5f..4972424 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -34,7 +34,8 @@
 
 namespace webrtc {
 
-static size_t kMaxQueuedDataPackets = 100;
+static size_t kMaxQueuedReceivedDataPackets = 100;
+static size_t kMaxQueuedSendDataPackets = 100;
 
 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
     WebRtcSession* session,
@@ -95,12 +96,13 @@
 }
 
 DataChannel::~DataChannel() {
-  ClearQueuedData();
+  ClearQueuedReceivedData();
+  ClearQueuedSendData();
 }
 
 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
   observer_ = observer;
-  DeliverQueuedData();
+  DeliverQueuedReceivedData();
 }
 
 void DataChannel::UnregisterObserver() {
@@ -117,7 +119,13 @@
 }
 
 uint64 DataChannel::buffered_amount() const {
-  return 0;
+  uint64 buffered_amount = 0;
+  for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
+      it != queued_send_data_.end();
+      ++it) {
+    buffered_amount += (*it)->size();
+  }
+  return buffered_amount;
 }
 
 void DataChannel::Close() {
@@ -133,20 +141,22 @@
   if (state_ != kOpen) {
     return false;
   }
-  cricket::SendDataParams send_params;
-
-  send_params.ssrc = send_ssrc_;
-  if (session_->data_channel_type() == cricket::DCT_SCTP) {
-    send_params.ordered = config_.ordered;
-    send_params.max_rtx_count = config_.maxRetransmits;
-    send_params.max_rtx_ms = config_.maxRetransmitTime;
+  // If the queue is non-empty, we're waiting for SignalReadyToSend,
+  // so just add to the end of the queue and keep waiting.
+  if (!queued_send_data_.empty()) {
+    return QueueSendData(buffer);
   }
-  send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
 
   cricket::SendDataResult send_result;
-  // TODO(pthatcher): Use send_result.would_block for buffering.
-  return session_->data_channel()->SendData(
-      send_params, buffer.data, &send_result);
+  if (!InternalSendWithoutQueueing(buffer, &send_result)) {
+    if (send_result == cricket::SDR_BLOCK) {
+      return QueueSendData(buffer);
+    }
+    // Fail for other results.
+    // TODO(jiayl): We should close the data channel in this case.
+    return false;
+  }
+  return true;
 }
 
 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
@@ -183,6 +193,43 @@
   DoClose();
 }
 
+void DataChannel::OnDataReceived(cricket::DataChannel* channel,
+                                 const cricket::ReceiveDataParams& params,
+                                 const talk_base::Buffer& payload) {
+  if (params.ssrc != receive_ssrc_) {
+    return;
+  }
+
+  bool binary = (params.type == cricket::DMT_BINARY);
+  talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
+  if (was_ever_writable_ && observer_) {
+    observer_->OnMessage(*buffer.get());
+  } else {
+    if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
+      // TODO(jiayl): We should close the data channel in this case.
+      LOG(LS_ERROR)
+          << "Queued received data exceeds the max number of packes.";
+      ClearQueuedReceivedData();
+    }
+    queued_received_data_.push(buffer.release());
+  }
+}
+
+void DataChannel::OnChannelReady(bool writable) {
+  if (!writable) {
+    return;
+  }
+  // Update the readyState if the channel is writable for the first time;
+  // otherwise it means the channel was blocked for sending and now unblocked,
+  // so send the queued data now.
+  if (!was_ever_writable_) {
+    was_ever_writable_ = true;
+    UpdateState();
+  } else if (state_ == kOpen) {
+    SendQueuedSendData();
+  }
+}
+
 void DataChannel::DoClose() {
   receive_ssrc_set_ = false;
   send_ssrc_set_ = false;
@@ -201,7 +248,7 @@
           SetState(kOpen);
           // If we have received buffers before the channel got writable.
           // Deliver them now.
-          DeliverQueuedData();
+          DeliverQueuedReceivedData();
         }
       }
       break;
@@ -249,47 +296,76 @@
   data_session_ = NULL;
 }
 
-void DataChannel::DeliverQueuedData() {
-  if (was_ever_writable_ && observer_) {
-    while (!queued_data_.empty()) {
-      DataBuffer* buffer = queued_data_.front();
-      observer_->OnMessage(*buffer);
-      queued_data_.pop();
-      delete buffer;
-    }
+void DataChannel::DeliverQueuedReceivedData() {
+  if (!was_ever_writable_ || !observer_) {
+    return;
   }
-}
 
-void DataChannel::ClearQueuedData() {
-  while (!queued_data_.empty()) {
-    DataBuffer* buffer = queued_data_.front();
-    queued_data_.pop();
+  while (!queued_received_data_.empty()) {
+    DataBuffer* buffer = queued_received_data_.front();
+    observer_->OnMessage(*buffer);
+    queued_received_data_.pop();
     delete buffer;
   }
 }
 
-void DataChannel::OnDataReceived(cricket::DataChannel* channel,
-                                 const cricket::ReceiveDataParams& params,
-                                 const talk_base::Buffer& payload) {
-  if (params.ssrc == receive_ssrc_) {
-    bool binary = false;
-    talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
-    if (was_ever_writable_ && observer_) {
-      observer_->OnMessage(*buffer.get());
-    } else {
-      if (queued_data_.size() > kMaxQueuedDataPackets) {
-        ClearQueuedData();
-      }
-      queued_data_.push(buffer.release());
-    }
+void DataChannel::ClearQueuedReceivedData() {
+  while (!queued_received_data_.empty()) {
+    DataBuffer* buffer = queued_received_data_.front();
+    queued_received_data_.pop();
+    delete buffer;
   }
 }
 
-void DataChannel::OnChannelReady(bool writable) {
-  if (!was_ever_writable_ && writable) {
-    was_ever_writable_ = true;
-    UpdateState();
+void DataChannel::SendQueuedSendData() {
+  if (!was_ever_writable_) {
+    return;
   }
+
+  while (!queued_send_data_.empty()) {
+    DataBuffer* buffer = queued_send_data_.front();
+    cricket::SendDataResult send_result;
+    if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
+      LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
+                      << send_result;
+      break;
+    }
+    queued_send_data_.pop_front();
+    delete buffer;
+  }
+}
+
+void DataChannel::ClearQueuedSendData() {
+  // Drain |queued_send_data_| (a std::deque), freeing each unsent buffer.
+  while (!queued_send_data_.empty()) {
+    delete queued_send_data_.front();
+    queued_send_data_.pop_front();
+  }
+}
+
+bool DataChannel::InternalSendWithoutQueueing(
+    const DataBuffer& buffer, cricket::SendDataResult* send_result) {
+  cricket::SendDataParams send_params;
+
+  send_params.ssrc = send_ssrc_;
+  if (session_->data_channel_type() == cricket::DCT_SCTP) {
+    send_params.ordered = config_.ordered;
+    send_params.max_rtx_count = config_.maxRetransmits;
+    send_params.max_rtx_ms = config_.maxRetransmitTime;
+  }
+  send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
+
+  return session_->data_channel()->SendData(send_params, buffer.data,
+                                            send_result);
+}
+
+bool DataChannel::QueueSendData(const DataBuffer& buffer) {
+  if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
+    LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
+    return false;
+  }
+  queued_send_data_.push_back(new DataBuffer(buffer));
+  return true;
 }
 
 }  // namespace webrtc