/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10#include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
11
12#include <grpc/support/log.h>
13#include <grpcpp/grpcpp.h>
14
15#include <string>
16#include <utility>
17
18#include "api/jsep.h"
19#include "api/jsep_ice_candidate.h"
20#include "rtc_base/thread.h"
21#include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h"
22
23namespace webrtc {
24namespace {
25
26using GrpcSignaling::IceCandidate;
27using GrpcSignaling::PeerConnectionSignaling;
28using GrpcSignaling::SessionDescription;
29using GrpcSignaling::SignalingMessage;
30
31template <class T>
32class SessionData : public webrtc::SignalingInterface {
33 public:
34 SessionData() {}
35 explicit SessionData(T* stream) : stream_(stream) {}
36 void SetStream(T* stream) { stream_ = stream; }
37
38 void SendIceCandidate(const IceCandidateInterface* candidate) override {
39 RTC_LOG(LS_INFO) << "SendIceCandidate";
40 std::string serialized_candidate;
41 if (!candidate->ToString(&serialized_candidate)) {
42 RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate";
43 return;
44 }
45
46 SignalingMessage message;
47 IceCandidate* proto_candidate = message.mutable_candidate();
48 proto_candidate->set_description(serialized_candidate);
49 proto_candidate->set_mid(candidate->sdp_mid());
50 proto_candidate->set_mline_index(candidate->sdp_mline_index());
51
52 stream_->Write(message);
53 }
54
55 void SendDescription(const SessionDescriptionInterface* sdp) override {
56 RTC_LOG(LS_INFO) << "SendDescription";
57
58 std::string serialized_sdp;
59 sdp->ToString(&serialized_sdp);
60
61 SignalingMessage message;
62 if (sdp->GetType() == SdpType::kOffer)
63 message.mutable_description()->set_type(SessionDescription::OFFER);
64 else if (sdp->GetType() == SdpType::kAnswer)
65 message.mutable_description()->set_type(SessionDescription::ANSWER);
66 message.mutable_description()->set_content(serialized_sdp);
67
68 stream_->Write(message);
69 }
70
71 void OnRemoteDescription(
72 std::function<void(std::unique_ptr<SessionDescriptionInterface> sdp)>
73 callback) override {
74 RTC_LOG(LS_INFO) << "OnRemoteDescription";
75 remote_description_callback_ = callback;
76 }
77
78 void OnIceCandidate(
79 std::function<void(std::unique_ptr<IceCandidateInterface> candidate)>
80 callback) override {
81 RTC_LOG(LS_INFO) << "OnIceCandidate";
82 ice_candidate_callback_ = callback;
83 }
84
85 T* stream_;
86
87 std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)>
88 ice_candidate_callback_;
89 std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)>
90 remote_description_callback_;
91};
92
93using ServerSessionData =
94 SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>;
95using ClientSessionData =
96 SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>;
97
98template <class MessageType, class StreamReader, class SessionData>
99void ProcessMessages(StreamReader* stream, SessionData* session) {
100 MessageType message;
101
102 while (stream->Read(&message)) {
103 switch (message.Content_case()) {
104 case SignalingMessage::ContentCase::kCandidate: {
105 webrtc::SdpParseError error;
106 auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>(
107 message.candidate().mid(), message.candidate().mline_index());
108 if (!jsep_candidate->Initialize(message.candidate().description(),
109 &error)) {
110 RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '"
111 << message.candidate().description() << "'";
112 RTC_LOG(LS_ERROR)
113 << "Error at line " << error.line << ":" << error.description;
114 continue;
115 }
116
117 session->ice_candidate_callback_(std::move(jsep_candidate));
118 break;
119 }
120 case SignalingMessage::ContentCase::kDescription: {
121 auto& description = message.description();
122 auto content = description.content();
123
124 auto sdp = webrtc::CreateSessionDescription(
125 description.type() == SessionDescription::OFFER
126 ? webrtc::SdpType::kOffer
127 : webrtc::SdpType::kAnswer,
128 description.content());
129 session->remote_description_callback_(std::move(sdp));
130 break;
131 }
132 default:
133 RTC_DCHECK_NOTREACHED();
134 }
135 }
136}
137
138class GrpcNegotiationServer : public GrpcSignalingServerInterface,
139 public PeerConnectionSignaling::Service {
140 public:
141 GrpcNegotiationServer(
142 std::function<void(webrtc::SignalingInterface*)> callback,
143 int port,
144 bool oneshot)
145 : connect_callback_(std::move(callback)),
146 requested_port_(port),
147 oneshot_(oneshot) {}
148 ~GrpcNegotiationServer() override {
149 Stop();
150 if (server_stop_thread_)
151 server_stop_thread_->Stop();
152 }
153
154 void Start() override {
155 std::string server_address = "[::]";
156
157 grpc::ServerBuilder builder;
158 builder.AddListeningPort(
159 server_address + ":" + std::to_string(requested_port_),
160 grpc::InsecureServerCredentials(), &selected_port_);
161 builder.RegisterService(this);
162 server_ = builder.BuildAndStart();
163 }
164
165 void Wait() override { server_->Wait(); }
166
167 void Stop() override { server_->Shutdown(); }
168
169 int SelectedPort() override { return selected_port_; }
170
171 grpc::Status Connect(
172 grpc::ServerContext* context,
173 grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream)
174 override {
175 if (oneshot_) {
176 // Request the termination of the server early so we don't serve another
177 // client in parallel.
178 server_stop_thread_ = rtc::Thread::Create();
179 server_stop_thread_->Start();
180 server_stop_thread_->PostTask([this] { Stop(); });
181 }
182
183 ServerSessionData session(stream);
184
185 auto reading_thread = rtc::Thread::Create();
186 reading_thread->Start();
187 reading_thread->PostTask([&session, &stream] {
188 ProcessMessages<SignalingMessage>(stream, &session);
189 });
190
191 connect_callback_(&session);
192
193 reading_thread->Stop();
194
195 return grpc::Status::OK;
196 }
197
198 private:
199 std::function<void(webrtc::SignalingInterface*)> connect_callback_;
200 int requested_port_;
201 int selected_port_;
202 bool oneshot_;
203
204 std::unique_ptr<grpc::Server> server_;
205 std::unique_ptr<rtc::Thread> server_stop_thread_;
206};
207
208class GrpcNegotiationClient : public GrpcSignalingClientInterface {
209 public:
210 explicit GrpcNegotiationClient(const std::string& server) {
211 channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
212 stub_ = PeerConnectionSignaling::NewStub(channel_);
213 }
214
215 ~GrpcNegotiationClient() override {
216 context_.TryCancel();
217 if (reading_thread_)
218 reading_thread_->Stop();
219 }
220
221 bool Start() override {
222 if (!channel_->WaitForConnected(
223 absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) {
224 return false;
225 }
226
227 stream_ = stub_->Connect(&context_);
228 session_.SetStream(stream_.get());
229
230 reading_thread_ = rtc::Thread::Create();
231 reading_thread_->Start();
232 reading_thread_->PostTask([this] {
233 ProcessMessages<SignalingMessage>(stream_.get(), &session_);
234 });
235
236 return true;
237 }
238
239 webrtc::SignalingInterface* signaling_client() override { return &session_; }
240
241 private:
242 std::shared_ptr<grpc::Channel> channel_;
243 std::unique_ptr<PeerConnectionSignaling::Stub> stub_;
244 std::unique_ptr<rtc::Thread> reading_thread_;
245 grpc::ClientContext context_;
246 std::unique_ptr<
247 ::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>
248 stream_;
249 ClientSessionData session_;
250};
251} // namespace
252
253std::unique_ptr<GrpcSignalingServerInterface>
254GrpcSignalingServerInterface::Create(
255 std::function<void(webrtc::SignalingInterface*)> callback,
256 int port,
257 bool oneshot) {
258 return std::make_unique<GrpcNegotiationServer>(std::move(callback), port,
259 oneshot);
260}
261
262std::unique_ptr<GrpcSignalingClientInterface>
263GrpcSignalingClientInterface::Create(const std::string& server) {
264 return std::make_unique<GrpcNegotiationClient>(server);
265}
266
267} // namespace webrtc