Revert 5371 "Revert 5367 "Update talk to 59410372.""
> Revert 5367 "Update talk to 59410372."
>
> > Update talk to 59410372.
> >
> > R=jiayl@webrtc.org, wu@webrtc.org
> >
> > Review URL: https://webrtc-codereview.appspot.com/6929004
>
> TBR=mallinath@webrtc.org
>
> Review URL: https://webrtc-codereview.appspot.com/6999004
TBR=henrika@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/7109004
git-svn-id: http://webrtc.googlecode.com/svn/trunk@5381 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 6c9e0bc..048e89a 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -29,9 +29,9 @@
#include <string>
#include "talk/app/webrtc/mediastreamprovider.h"
+#include "talk/app/webrtc/sctputils.h"
#include "talk/base/logging.h"
#include "talk/base/refcount.h"
-#include "talk/media/sctp/sctputils.h"
namespace webrtc {
@@ -46,7 +46,7 @@
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
const std::string& label,
- const DataChannelInit* config) {
+ const InternalDataChannelInit& config) {
talk_base::scoped_refptr<DataChannel> channel(
new talk_base::RefCountedObject<DataChannel>(provider, dct, label));
if (!channel->Init(config)) {
@@ -62,39 +62,40 @@
: label_(label),
observer_(NULL),
state_(kConnecting),
- was_ever_writable_(false),
- connected_to_provider_(false),
data_channel_type_(dct),
provider_(provider),
+ waiting_for_open_ack_(false),
+ was_ever_writable_(false),
+ connected_to_provider_(false),
send_ssrc_set_(false),
- send_ssrc_(0),
receive_ssrc_set_(false),
+ send_ssrc_(0),
receive_ssrc_(0) {
}
-bool DataChannel::Init(const DataChannelInit* config) {
+bool DataChannel::Init(const InternalDataChannelInit& config) {
if (data_channel_type_ == cricket::DCT_RTP &&
- (config->reliable ||
- config->id != -1 ||
- config->maxRetransmits != -1 ||
- config->maxRetransmitTime != -1)) {
+ (config.reliable ||
+ config.id != -1 ||
+ config.maxRetransmits != -1 ||
+ config.maxRetransmitTime != -1)) {
LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
<< "invalid DataChannelInit.";
return false;
} else if (data_channel_type_ == cricket::DCT_SCTP) {
- if (config->id < -1 ||
- config->maxRetransmits < -1 ||
- config->maxRetransmitTime < -1) {
+ if (config.id < -1 ||
+ config.maxRetransmits < -1 ||
+ config.maxRetransmitTime < -1) {
LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
<< "invalid DataChannelInit.";
return false;
}
- if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
+ if (config.maxRetransmits != -1 && config.maxRetransmitTime != -1) {
LOG(LS_ERROR) <<
"maxRetransmits and maxRetransmitTime should not be both set.";
return false;
}
- config_ = *config;
+ config_ = config;
// Try to connect to the transport in case the transport channel already
// exists.
@@ -197,9 +198,44 @@
cricket::SendDataResult send_result;
bool retval = provider_->SendData(send_params, *buffer, &send_result);
- if (!retval && send_result == cricket::SDR_BLOCK) {
+ if (retval) {
+ LOG(LS_INFO) << "Sent OPEN message on channel " << config_.id;
+ // Send data as ordered before we receive any message from the remote peer
+ // to make sure the remote peer will not receive any data before it receives
+ // the OPEN message.
+ waiting_for_open_ack_ = true;
+ } else if (send_result == cricket::SDR_BLOCK) {
// Link is congested. Queue for later.
QueueControl(buffer.release());
+ } else {
+ LOG(LS_ERROR) << "Failed to send OPEN message with result "
+ << send_result << " on channel " << config_.id;
+ }
+ return retval;
+}
+
+bool DataChannel::SendOpenAckMessage(const talk_base::Buffer* raw_buffer) {
+ ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
+ was_ever_writable_ &&
+ config_.id >= 0);
+
+ talk_base::scoped_ptr<const talk_base::Buffer> buffer(raw_buffer);
+
+ cricket::SendDataParams send_params;
+ send_params.ssrc = config_.id;
+ send_params.ordered = config_.ordered;
+ send_params.type = cricket::DMT_CONTROL;
+
+ cricket::SendDataResult send_result;
+ bool retval = provider_->SendData(send_params, *buffer, &send_result);
+ if (retval) {
+ LOG(LS_INFO) << "Sent OPEN_ACK message on channel " << config_.id;
+ } else if (send_result == cricket::SDR_BLOCK) {
+ // Link is congested. Queue for later.
+ QueueControl(buffer.release());
+ } else {
+ LOG(LS_ERROR) << "Failed to send OPEN_ACK message with result "
+ << send_result << " on channel " << config_.id;
}
return retval;
}
@@ -254,6 +290,35 @@
return;
}
+ if (params.type == cricket::DMT_CONTROL) {
+ ASSERT(data_channel_type_ == cricket::DCT_SCTP);
+ if (!waiting_for_open_ack_) {
+ // Ignore it if we are not expecting an ACK message.
+ LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
+ << "sid = " << params.ssrc;
+ return;
+ }
+ if (ParseDataChannelOpenAckMessage(payload)) {
+ // We can send unordered as soon as we receive the ACK message.
+ waiting_for_open_ack_ = false;
+ LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
+ << params.ssrc;
+ } else {
+ LOG(LS_WARNING) << "DataChannel failed to parse OPEN_ACK message, sid = "
+ << params.ssrc;
+ }
+ return;
+ }
+
+ ASSERT(params.type == cricket::DMT_BINARY ||
+ params.type == cricket::DMT_TEXT);
+
+ LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = " << params.ssrc;
+ // We can send unordered as soon as we receive any DATA message since the
+ // remote side must have received the OPEN (and old clients do not send
+ // OPEN_ACK).
+ waiting_for_open_ack_ = false;
+
bool binary = (params.type == cricket::DMT_BINARY);
talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
if (was_ever_writable_ && observer_) {
@@ -279,14 +344,17 @@
if (!was_ever_writable_) {
was_ever_writable_ = true;
- if (data_channel_type_ == cricket::DCT_SCTP && !config_.negotiated) {
- talk_base::Buffer* payload = new talk_base::Buffer;
- if (!cricket::WriteDataChannelOpenMessage(label_, config_, payload)) {
- // TODO(jiayl): close the data channel on this error.
- LOG(LS_ERROR) << "Could not write data channel OPEN message";
- return;
+ if (data_channel_type_ == cricket::DCT_SCTP) {
+ if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
+ talk_base::Buffer* payload = new talk_base::Buffer;
+ WriteDataChannelOpenMessage(label_, config_, payload);
+ SendOpenMessage(payload);
+ } else if (config_.open_handshake_role ==
+ InternalDataChannelInit::kAcker) {
+ talk_base::Buffer* payload = new talk_base::Buffer;
+ WriteDataChannelOpenAckMessage(payload);
+ SendOpenAckMessage(payload);
}
- SendOpenMessage(payload);
}
UpdateState();
@@ -412,7 +480,12 @@
while (!queued_control_data_.empty()) {
const talk_base::Buffer* buf = queued_control_data_.front();
queued_control_data_.pop();
- SendOpenMessage(buf);
+ if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
+ SendOpenMessage(buf);
+ } else {
+ ASSERT(config_.open_handshake_role == InternalDataChannelInit::kAcker);
+ SendOpenAckMessage(buf);
+ }
}
}
@@ -430,6 +503,13 @@
if (data_channel_type_ == cricket::DCT_SCTP) {
send_params.ordered = config_.ordered;
+ // Send as ordered if it is waiting for the OPEN_ACK message.
+ if (waiting_for_open_ack_ && !config_.ordered) {
+ send_params.ordered = true;
+ LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
+ << "because the OPEN_ACK message has not been received.";
+ }
+
send_params.max_rtx_count = config_.maxRetransmits;
send_params.max_rtx_ms = config_.maxRetransmitTime;
send_params.ssrc = config_.id;