Update libjingle to 50191337.
R=mallinath@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/1885005
git-svn-id: http://webrtc.googlecode.com/svn/trunk@4461 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 345cd5f..4972424 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -34,7 +34,8 @@
namespace webrtc {
-static size_t kMaxQueuedDataPackets = 100;
+static size_t kMaxQueuedReceivedDataPackets = 100;  // Limit checked in OnDataReceived().
+static size_t kMaxQueuedSendDataPackets = 100;  // Limit checked in QueueSendData().
talk_base::scoped_refptr<DataChannel> DataChannel::Create(
WebRtcSession* session,
@@ -95,12 +96,13 @@
}
DataChannel::~DataChannel() {
- ClearQueuedData();
+ ClearQueuedReceivedData();
+ ClearQueuedSendData();
}
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
observer_ = observer;
- DeliverQueuedData();
+ DeliverQueuedReceivedData();
}
void DataChannel::UnregisterObserver() {
@@ -117,7 +119,13 @@
}
uint64 DataChannel::buffered_amount() const {
- return 0;
+ uint64 buffered_amount = 0;  // Running total over the queued send buffers.
+ for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
+ it != queued_send_data_.end();
+ ++it) {
+ buffered_amount += (*it)->size();  // Add this queued buffer's size().
+ }
+ return buffered_amount;
}
void DataChannel::Close() {
@@ -133,20 +141,22 @@
if (state_ != kOpen) {
return false;
}
- cricket::SendDataParams send_params;
-
- send_params.ssrc = send_ssrc_;
- if (session_->data_channel_type() == cricket::DCT_SCTP) {
- send_params.ordered = config_.ordered;
- send_params.max_rtx_count = config_.maxRetransmits;
- send_params.max_rtx_ms = config_.maxRetransmitTime;
+ // If the queue is non-empty, we're waiting for SignalReadyToSend,
+ // so just add to the end of the queue and keep waiting.
+ if (!queued_send_data_.empty()) {
+ return QueueSendData(buffer);
}
- send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
cricket::SendDataResult send_result;
- // TODO(pthatcher): Use send_result.would_block for buffering.
- return session_->data_channel()->SendData(
- send_params, buffer.data, &send_result);
+ if (!InternalSendWithoutQueueing(buffer, &send_result)) {
+ if (send_result == cricket::SDR_BLOCK) {
+ return QueueSendData(buffer);
+ }
+ // Fail for other results.
+ // TODO(jiayl): We should close the data channel in this case.
+ return false;
+ }
+ return true;
}
void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
@@ -183,6 +193,43 @@
DoClose();
}
+void DataChannel::OnDataReceived(cricket::DataChannel* channel,
+ const cricket::ReceiveDataParams& params,
+ const talk_base::Buffer& payload) {
+ if (params.ssrc != receive_ssrc_) {  // Not addressed to this channel: ignore.
+ return;
+ }
+
+ bool binary = (params.type == cricket::DMT_BINARY);
+ talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
+ if (was_ever_writable_ && observer_) {  // Deliver directly when possible.
+ observer_->OnMessage(*buffer.get());
+ } else {
+ if (queued_received_data_.size() >= kMaxQueuedReceivedDataPackets) {  // Queue full.
+ // TODO(jiayl): We should close the data channel in this case.
+ LOG(LS_ERROR)
+ << "Queued received data exceeds the max number of packes.";
+ ClearQueuedReceivedData();  // Drop the backlog so the queue never exceeds the limit.
+ }
+ queued_received_data_.push(buffer.release());  // Queue takes ownership.
+ }
+}
+
+void DataChannel::OnChannelReady(bool writable) {
+ if (!writable) {  // Only a writable notification is acted on.
+ return;
+ }
+ // Update the readyState if the channel is writable for the first time;
+ // otherwise it means the channel was blocked for sending and now unblocked,
+ // so send the queued data now.
+ if (!was_ever_writable_) {
+ was_ever_writable_ = true;
+ UpdateState();
+ } else if (state_ == kOpen) {  // Queued data is flushed only in the kOpen state.
+ SendQueuedSendData();
+ }
+}
+
void DataChannel::DoClose() {
receive_ssrc_set_ = false;
send_ssrc_set_ = false;
@@ -201,7 +248,7 @@
SetState(kOpen);
// If we have received buffers before the channel got writable.
// Deliver them now.
- DeliverQueuedData();
+ DeliverQueuedReceivedData();
}
}
break;
@@ -249,47 +296,76 @@
data_session_ = NULL;
}
-void DataChannel::DeliverQueuedData() {
- if (was_ever_writable_ && observer_) {
- while (!queued_data_.empty()) {
- DataBuffer* buffer = queued_data_.front();
- observer_->OnMessage(*buffer);
- queued_data_.pop();
- delete buffer;
- }
+void DataChannel::DeliverQueuedReceivedData() {
+ if (!was_ever_writable_ || !observer_) {  // Keep the backlog until both hold.
+ return;
}
-}
-void DataChannel::ClearQueuedData() {
- while (!queued_data_.empty()) {
- DataBuffer* buffer = queued_data_.front();
- queued_data_.pop();
+ while (!queued_received_data_.empty()) {
+ DataBuffer* buffer = queued_received_data_.front();
+ observer_->OnMessage(*buffer);  // Handed to the observer front-first.
+ queued_received_data_.pop();  // Removed from the queue before deletion.
delete buffer;
}
}
-void DataChannel::OnDataReceived(cricket::DataChannel* channel,
- const cricket::ReceiveDataParams& params,
- const talk_base::Buffer& payload) {
- if (params.ssrc == receive_ssrc_) {
- bool binary = false;
- talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
- if (was_ever_writable_ && observer_) {
- observer_->OnMessage(*buffer.get());
- } else {
- if (queued_data_.size() > kMaxQueuedDataPackets) {
- ClearQueuedData();
- }
- queued_data_.push(buffer.release());
- }
+void DataChannel::ClearQueuedReceivedData() {
+ while (!queued_received_data_.empty()) {  // Discard every queued buffer.
+ DataBuffer* buffer = queued_received_data_.front();
+ queued_received_data_.pop();  // No observer callback is made here.
+ delete buffer;
}
}
-void DataChannel::OnChannelReady(bool writable) {
- if (!was_ever_writable_ && writable) {
- was_ever_writable_ = true;
- UpdateState();
+void DataChannel::SendQueuedSendData() {
+ if (!was_ever_writable_) {  // Nothing is sent before the channel was ever writable.
+ return;
}
+
+ while (!queued_send_data_.empty()) {
+ DataBuffer* buffer = queued_send_data_.front();  // Oldest queued buffer first.
+ cricket::SendDataResult send_result;
+ if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
+ LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
+ << send_result;
+ break;  // The unsent buffer stays at the front of the queue.
+ }
+ queued_send_data_.pop_front();  // Sent: remove it and free it.
+ delete buffer;
+ }
+}
+
+void DataChannel::ClearQueuedSendData() {
+ while (!queued_send_data_.empty()) {  // Free every buffer still waiting to be sent.
+ DataBuffer* buffer = queued_send_data_.front();
+ queued_send_data_.pop_front();  // queued_send_data_ is a std::deque.
+ delete buffer;  // Buffers were allocated with new in QueueSendData().
+ }
+}
+
+bool DataChannel::InternalSendWithoutQueueing(
+ const DataBuffer& buffer, cricket::SendDataResult* send_result) {
+ cricket::SendDataParams send_params;
+
+ send_params.ssrc = send_ssrc_;
+ if (session_->data_channel_type() == cricket::DCT_SCTP) {  // Set for SCTP only.
+ send_params.ordered = config_.ordered;
+ send_params.max_rtx_count = config_.maxRetransmits;
+ send_params.max_rtx_ms = config_.maxRetransmitTime;
+ }
+ send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
+
+ return session_->data_channel()->SendData(send_params, buffer.data,  // Result is forwarded as-is.
+ send_result);
+}
+
+bool DataChannel::QueueSendData(const DataBuffer& buffer) {
+ if (queued_send_data_.size() >= kMaxQueuedSendDataPackets) {  // Already at the limit.
+ LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
+ return false;  // Caller sees the send as failed; nothing is queued.
+ }
+ queued_send_data_.push_back(new DataBuffer(buffer));  // Queue owns a heap copy.
+ return true;
}
} // namespace webrtc