Reland "[PeerConnection] Use an OperationsChain in PeerConnection for async ops."
This is a reland of 1dddaa1a84330091ca083c950ef2e24a85a48fc8
The regression that caused the original CL to be reverted was the fact that
invoking SetLocalDescription() inside of the CreateOffer() callback was no
longer executing synchronously and immediately.
In this CL, the original CL is patched so that the CreateOffer() operation
is marked as completed just before invoking the CreateOffer() callback
(versus doing it just afterwards). This ensures that the OperationsChain is
popped before the callback runs. The same applies for CreateAnswer().
See diff between Patch Set 1 (Original CL) and the latest Patch Set.
Original change's description:
> [PeerConnection] Use an OperationsChain in PeerConnection for async ops.
>
> For background, motivation, requirements and implementation notes, see
> https://docs.google.com/document/d/1XLwNN2kUIGGTwz9LQ0NwJNkcybi9oKnynUEZB1jGA14/edit?usp=sharing
>
> Using the OperationsChain will unblock future CLs from chaining multiple
> operations together such as implementing parameterless
> setLocalDescription().
>
> In this CL, the OperationsChain is used in existing signaling operations
> with little intended side-effects. An operation that is chained onto an
> empty OperationsChain will for instance execute immediately, and
> SetLocalDescription() and SetRemoteDescription() are implemented as
> "synchronous operations".
>
> The lifetime of the PeerConnection is not indended to change as a result
> of this CL: All chained operations use a WeakPtr to the PC to ensure
> use-after-free does not happen.
>
> There is one notable change though: CreateOffer() and CreateAnswer() will
> asynchronously delay other signaling methods from executing until they
> have completed.
>
> Drive-by fix: This CL also ensures that early failing
> CreateOffer/CreateAnswer operation's observers are invoked if the
> PeerConnection is destroyed while a PostCreateSessionDescriptionFailure
> is pending.
>
> Bug: webrtc:11019
> Change-Id: I521333e41d20d9bbfb1e721609f2c9db2a5f93a9
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/157305
> Reviewed-by: Steve Anton <steveanton@webrtc.org>
> Commit-Queue: Henrik Boström <hbos@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#29605}
TBR=steveanton@webrtc.org
Bug: webrtc:11019
Change-Id: I57b4496e63378c91c24679ee496e21f5cb6a0e59
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158524
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29630}
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 08fbe41..7dc2e35 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -647,6 +647,51 @@
: nullptr;
}
+// Wraps a CreateSessionDescriptionObserver and an OperationsChain operation
+// complete callback. When the observer is invoked, the wrapped observer is
+// invoked followed by invoking the completion callback.
+class CreateSessionDescriptionObserverOperationWrapper
+ : public CreateSessionDescriptionObserver {
+ public:
+ CreateSessionDescriptionObserverOperationWrapper(
+ rtc::scoped_refptr<CreateSessionDescriptionObserver> observer,
+ std::function<void()> operation_complete_callback)
+ : observer_(std::move(observer)),
+ operation_complete_callback_(std::move(operation_complete_callback)) {
+ RTC_DCHECK(observer_);
+ }
+ ~CreateSessionDescriptionObserverOperationWrapper() override {
+ RTC_DCHECK(was_called_);
+ }
+
+ void OnSuccess(SessionDescriptionInterface* desc) override {
+ RTC_DCHECK(!was_called_);
+#ifdef RTC_DCHECK_IS_ON
+ was_called_ = true;
+#endif // RTC_DCHECK_IS_ON
+ // Completing the operation before invoking the observer allows the observer
+ // to execute SetLocalDescription() without delay.
+ operation_complete_callback_();
+ observer_->OnSuccess(desc);
+ }
+
+ void OnFailure(RTCError error) override {
+ RTC_DCHECK(!was_called_);
+#ifdef RTC_DCHECK_IS_ON
+ was_called_ = true;
+#endif // RTC_DCHECK_IS_ON
+ operation_complete_callback_();
+ observer_->OnFailure(std::move(error));
+ }
+
+ private:
+#ifdef RTC_DCHECK_IS_ON
+ bool was_called_ = false;
+#endif // RTC_DCHECK_IS_ON
+ rtc::scoped_refptr<CreateSessionDescriptionObserver> observer_;
+ std::function<void()> operation_complete_callback_;
+};
+
} // namespace
class PeerConnection::LocalIceCredentialsToReplace {
@@ -894,6 +939,7 @@
: factory_(factory),
event_log_(std::move(event_log)),
event_log_ptr_(event_log_.get()),
+ operations_chain_(rtc::OperationsChain::Create()),
datagram_transport_config_(
field_trial::FindFullName(kDatagramTransportFieldTrial)),
datagram_transport_data_channel_config_(
@@ -904,12 +950,15 @@
call_(std::move(call)),
call_ptr_(call_.get()),
data_channel_transport_(nullptr),
- local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()) {}
+ local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()),
+ weak_ptr_factory_(this) {}
PeerConnection::~PeerConnection() {
TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection");
RTC_DCHECK_RUN_ON(signaling_thread());
+ weak_ptr_factory_.InvalidateWeakPtrs();
+
// Need to stop transceivers before destroying the stats collector because
// AudioRtpSender has a reference to the StatsCollector it will update when
// stopping.
@@ -946,6 +995,23 @@
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
});
+
+ // Process all pending notifications in the message queue. If we don't do
+ // this, requests will linger and not know they succeeded or failed.
+ rtc::MessageList list;
+ signaling_thread()->Clear(this, rtc::MQID_ANY, &list);
+ for (auto& msg : list) {
+ if (msg.message_id == MSG_CREATE_SESSIONDESCRIPTION_FAILED) {
+ // Processing CreateOffer() and CreateAnswer() messages ensures their
+ // observers are invoked even if the PeerConnection is destroyed early.
+ OnMessage(&msg);
+ } else {
+ // TODO(hbos): Consider processing all pending messages. This would mean
+ // that SetLocalDescription() and SetRemoteDescription() observers are
+ // informed of successes and failures; this is currently NOT the case.
+ delete msg.pdata;
+ }
+ }
}
void PeerConnection::DestroyAllChannels() {
@@ -2052,7 +2118,37 @@
void PeerConnection::CreateOffer(CreateSessionDescriptionObserver* observer,
const RTCOfferAnswerOptions& options) {
RTC_DCHECK_RUN_ON(signaling_thread());
- TRACE_EVENT0("webrtc", "PeerConnection::CreateOffer");
+ // Chain this operation. If asynchronous operations are pending on the chain,
+ // this operation will be queued to be invoked, otherwise the contents of the
+ // lambda will execute immediately.
+ operations_chain_->ChainOperation(
+ [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
+ observer_refptr =
+ rtc::scoped_refptr<CreateSessionDescriptionObserver>(observer),
+ options](std::function<void()> operations_chain_callback) {
+ // Abort early if |this_weak_ptr| is no longer valid.
+ if (!this_weak_ptr) {
+ observer_refptr->OnFailure(
+ RTCError(RTCErrorType::INTERNAL_ERROR,
+ "CreateOffer failed because the session was shut down"));
+ operations_chain_callback();
+ return;
+ }
+ // The operation completes asynchronously when the wrapper is invoked.
+ rtc::scoped_refptr<CreateSessionDescriptionObserverOperationWrapper>
+ observer_wrapper(new rtc::RefCountedObject<
+ CreateSessionDescriptionObserverOperationWrapper>(
+ std::move(observer_refptr),
+ std::move(operations_chain_callback)));
+ this_weak_ptr->DoCreateOffer(options, observer_wrapper);
+ });
+}
+
+void PeerConnection::DoCreateOffer(
+ const RTCOfferAnswerOptions& options,
+ rtc::scoped_refptr<CreateSessionDescriptionObserver> observer) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ TRACE_EVENT0("webrtc", "PeerConnection::DoCreateOffer");
if (!observer) {
RTC_LOG(LS_ERROR) << "CreateOffer - observer is NULL.";
@@ -2178,7 +2274,37 @@
void PeerConnection::CreateAnswer(CreateSessionDescriptionObserver* observer,
const RTCOfferAnswerOptions& options) {
RTC_DCHECK_RUN_ON(signaling_thread());
- TRACE_EVENT0("webrtc", "PeerConnection::CreateAnswer");
+ // Chain this operation. If asynchronous operations are pending on the chain,
+ // this operation will be queued to be invoked, otherwise the contents of the
+ // lambda will execute immediately.
+ operations_chain_->ChainOperation(
+ [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
+ observer_refptr =
+ rtc::scoped_refptr<CreateSessionDescriptionObserver>(observer),
+ options](std::function<void()> operations_chain_callback) {
+ // Abort early if |this_weak_ptr| is no longer valid.
+ if (!this_weak_ptr) {
+ observer_refptr->OnFailure(RTCError(
+ RTCErrorType::INTERNAL_ERROR,
+ "CreateAnswer failed because the session was shut down"));
+ operations_chain_callback();
+ return;
+ }
+ // The operation completes asynchronously when the wrapper is invoked.
+ rtc::scoped_refptr<CreateSessionDescriptionObserverOperationWrapper>
+ observer_wrapper(new rtc::RefCountedObject<
+ CreateSessionDescriptionObserverOperationWrapper>(
+ std::move(observer_refptr),
+ std::move(operations_chain_callback)));
+ this_weak_ptr->DoCreateAnswer(options, observer_wrapper);
+ });
+}
+
+void PeerConnection::DoCreateAnswer(
+ const RTCOfferAnswerOptions& options,
+ rtc::scoped_refptr<CreateSessionDescriptionObserver> observer) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ TRACE_EVENT0("webrtc", "PeerConnection::DoCreateAnswer");
if (!observer) {
RTC_LOG(LS_ERROR) << "CreateAnswer - observer is NULL.";
return;
@@ -2232,13 +2358,44 @@
SetSessionDescriptionObserver* observer,
SessionDescriptionInterface* desc_ptr) {
RTC_DCHECK_RUN_ON(signaling_thread());
- TRACE_EVENT0("webrtc", "PeerConnection::SetLocalDescription");
+ // Chain this operation. If asynchronous operations are pending on the chain,
+ // this operation will be queued to be invoked, otherwise the contents of the
+ // lambda will execute immediately.
+ operations_chain_->ChainOperation(
+ [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
+ observer_refptr =
+ rtc::scoped_refptr<SetSessionDescriptionObserver>(observer),
+ desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)](
+ std::function<void()> operations_chain_callback) mutable {
+ // Abort early if |this_weak_ptr| is no longer valid.
+ if (!this_weak_ptr) {
+ // For consistency with DoSetLocalDescription(), we DO NOT inform the
+ // |observer_refptr| that the operation failed in this case.
+ // TODO(hbos): If/when we process SLD messages in ~PeerConnection,
+ // the consistent thing would be to inform the observer here.
+ operations_chain_callback();
+ return;
+ }
+ this_weak_ptr->DoSetLocalDescription(std::move(desc),
+ std::move(observer_refptr));
+ // DoSetLocalDescription() is currently implemented as a synchronous
+ // operation but where the |observer|'s callbacks are invoked
+ // asynchronously in a post to OnMessage().
+ // For backwards-compatability reasons, we declare the operation as
+ // completed here (rather than in OnMessage()). This ensures that:
+ // - This operation is not keeping the PeerConnection alive past this
+ // point.
+ // - Subsequent offer/answer operations can start immediately (without
+ // waiting for OnMessage()).
+ operations_chain_callback();
+ });
+}
- // The SetLocalDescription contract is that we take ownership of the session
- // description regardless of the outcome, so wrap it in a unique_ptr right
- // away. Ideally, SetLocalDescription's signature will be changed to take the
- // description as a unique_ptr argument to formalize this agreement.
- std::unique_ptr<SessionDescriptionInterface> desc(desc_ptr);
+void PeerConnection::DoSetLocalDescription(
+ std::unique_ptr<SessionDescriptionInterface> desc,
+ rtc::scoped_refptr<SetSessionDescriptionObserver> observer) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ TRACE_EVENT0("webrtc", "PeerConnection::DoSetLocalDescription");
if (!observer) {
RTC_LOG(LS_ERROR) << "SetLocalDescription - observer is NULL.";
@@ -2619,18 +2776,83 @@
void PeerConnection::SetRemoteDescription(
SetSessionDescriptionObserver* observer,
- SessionDescriptionInterface* desc) {
- SetRemoteDescription(
- std::unique_ptr<SessionDescriptionInterface>(desc),
- rtc::scoped_refptr<SetRemoteDescriptionObserverInterface>(
- new SetRemoteDescriptionObserverAdapter(this, observer)));
+ SessionDescriptionInterface* desc_ptr) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // Chain this operation. If asynchronous operations are pending on the chain,
+ // this operation will be queued to be invoked, otherwise the contents of the
+ // lambda will execute immediately.
+ operations_chain_->ChainOperation(
+ [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(),
+ observer_refptr =
+ rtc::scoped_refptr<SetSessionDescriptionObserver>(observer),
+ desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)](
+ std::function<void()> operations_chain_callback) mutable {
+ // Abort early if |this_weak_ptr| is no longer valid.
+ if (!this_weak_ptr) {
+ // For consistency with SetRemoteDescriptionObserverAdapter, we DO NOT
+ // inform the |observer_refptr| that the operation failed in this
+ // case.
+ // TODO(hbos): If/when we process SRD messages in ~PeerConnection,
+ // the consistent thing would be to inform the observer here.
+ operations_chain_callback();
+ return;
+ }
+ this_weak_ptr->DoSetRemoteDescription(
+ std::move(desc),
+ rtc::scoped_refptr<SetRemoteDescriptionObserverInterface>(
+ new SetRemoteDescriptionObserverAdapter(
+ this_weak_ptr.get(), std::move(observer_refptr))));
+ // DoSetRemoteDescription() is currently implemented as a synchronous
+ // operation but where SetRemoteDescriptionObserverAdapter ensures that
+ // the |observer|'s callbacks are invoked asynchronously in a post to
+ // OnMessage().
+ // For backwards-compatability reasons, we declare the operation as
+ // completed here (rather than in OnMessage()). This ensures that:
+ // - This operation is not keeping the PeerConnection alive past this
+ // point.
+ // - Subsequent offer/answer operations can start immediately (without
+ // waiting for OnMessage()).
+ operations_chain_callback();
+ });
}
void PeerConnection::SetRemoteDescription(
std::unique_ptr<SessionDescriptionInterface> desc,
rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer) {
RTC_DCHECK_RUN_ON(signaling_thread());
- TRACE_EVENT0("webrtc", "PeerConnection::SetRemoteDescription");
+ // Chain this operation. If asynchronous operations are pending on the chain,
+ // this operation will be queued to be invoked, otherwise the contents of the
+ // lambda will execute immediately.
+ operations_chain_->ChainOperation(
+ [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), observer,
+ desc = std::move(desc)](
+ std::function<void()> operations_chain_callback) mutable {
+ // Abort early if |this_weak_ptr| is no longer valid.
+ if (!this_weak_ptr) {
+ // For consistency with DoSetRemoteDescription(), we DO inform the
+ // |observer| that the operation failed in this case.
+ observer->OnSetRemoteDescriptionComplete(RTCError(
+ RTCErrorType::INVALID_STATE,
+ "Failed to set remote offer sdp: failed because the session was "
+ "shut down"));
+ operations_chain_callback();
+ return;
+ }
+ this_weak_ptr->DoSetRemoteDescription(std::move(desc),
+ std::move(observer));
+ // DoSetRemoteDescription() is currently implemented as a synchronous
+ // operation. The |observer| will already have been informed that it
+ // completed, and we can mark this operation as complete without any
+ // loose ends.
+ operations_chain_callback();
+ });
+}
+
+void PeerConnection::DoSetRemoteDescription(
+ std::unique_ptr<SessionDescriptionInterface> desc,
+ rtc::scoped_refptr<SetRemoteDescriptionObserverInterface> observer) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ TRACE_EVENT0("webrtc", "PeerConnection::DoSetRemoteDescription");
if (!observer) {
RTC_LOG(LS_ERROR) << "SetRemoteDescription - observer is NULL.";