blob: dc6694694da96ce11ef1759e5f9b40d9e7df8273 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/examples/peerconnection/client/peer_connection_client.h"
29
30#include "talk/examples/peerconnection/client/defaults.h"
31#include "talk/base/common.h"
32#include "talk/base/nethelpers.h"
33#include "talk/base/logging.h"
34#include "talk/base/stringutils.h"
35
36#ifdef WIN32
37#include "talk/base/win32socketserver.h"
38#endif
39
40using talk_base::sprintfn;
41
42namespace {
43
44// This is our magical hangup signal.
45const char kByeMessage[] = "BYE";
46// Delay between server connection retries, in milliseconds
47const int kReconnectDelay = 2000;
48
49talk_base::AsyncSocket* CreateClientSocket(int family) {
50#ifdef WIN32
51 talk_base::Win32Socket* sock = new talk_base::Win32Socket();
52 sock->CreateT(family, SOCK_STREAM);
53 return sock;
54#elif defined(POSIX)
55 talk_base::Thread* thread = talk_base::Thread::Current();
56 ASSERT(thread != NULL);
57 return thread->socketserver()->CreateAsyncSocket(family, SOCK_STREAM);
58#else
59#error Platform not supported.
60#endif
61}
62
63}
64
65PeerConnectionClient::PeerConnectionClient()
66 : callback_(NULL),
67 resolver_(NULL),
68 state_(NOT_CONNECTED),
69 my_id_(-1) {
70}
71
72PeerConnectionClient::~PeerConnectionClient() {
73}
74
75void PeerConnectionClient::InitSocketSignals() {
76 ASSERT(control_socket_.get() != NULL);
77 ASSERT(hanging_get_.get() != NULL);
78 control_socket_->SignalCloseEvent.connect(this,
79 &PeerConnectionClient::OnClose);
80 hanging_get_->SignalCloseEvent.connect(this,
81 &PeerConnectionClient::OnClose);
82 control_socket_->SignalConnectEvent.connect(this,
83 &PeerConnectionClient::OnConnect);
84 hanging_get_->SignalConnectEvent.connect(this,
85 &PeerConnectionClient::OnHangingGetConnect);
86 control_socket_->SignalReadEvent.connect(this,
87 &PeerConnectionClient::OnRead);
88 hanging_get_->SignalReadEvent.connect(this,
89 &PeerConnectionClient::OnHangingGetRead);
90}
91
92int PeerConnectionClient::id() const {
93 return my_id_;
94}
95
96bool PeerConnectionClient::is_connected() const {
97 return my_id_ != -1;
98}
99
100const Peers& PeerConnectionClient::peers() const {
101 return peers_;
102}
103
104void PeerConnectionClient::RegisterObserver(
105 PeerConnectionClientObserver* callback) {
106 ASSERT(!callback_);
107 callback_ = callback;
108}
109
110void PeerConnectionClient::Connect(const std::string& server, int port,
111 const std::string& client_name) {
112 ASSERT(!server.empty());
113 ASSERT(!client_name.empty());
114
115 if (state_ != NOT_CONNECTED) {
116 LOG(WARNING)
117 << "The client must not be connected before you can call Connect()";
118 callback_->OnServerConnectionFailure();
119 return;
120 }
121
122 if (server.empty() || client_name.empty()) {
123 callback_->OnServerConnectionFailure();
124 return;
125 }
126
127 if (port <= 0)
128 port = kDefaultServerPort;
129
130 server_address_.SetIP(server);
131 server_address_.SetPort(port);
132 client_name_ = client_name;
133
134 if (server_address_.IsUnresolved()) {
135 state_ = RESOLVING;
136 resolver_ = new talk_base::AsyncResolver();
137 resolver_->SignalWorkDone.connect(this,
138 &PeerConnectionClient::OnResolveResult);
139 resolver_->set_address(server_address_);
140 resolver_->Start();
141 } else {
142 DoConnect();
143 }
144}
145
146void PeerConnectionClient::OnResolveResult(talk_base::SignalThread *t) {
147 if (resolver_->error() != 0) {
148 callback_->OnServerConnectionFailure();
149 resolver_->Destroy(false);
150 resolver_ = NULL;
151 state_ = NOT_CONNECTED;
152 } else {
153 server_address_ = resolver_->address();
154 DoConnect();
155 }
156}
157
158void PeerConnectionClient::DoConnect() {
159 control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
160 hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
161 InitSocketSignals();
162 char buffer[1024];
163 sprintfn(buffer, sizeof(buffer),
164 "GET /sign_in?%s HTTP/1.0\r\n\r\n", client_name_.c_str());
165 onconnect_data_ = buffer;
166
167 bool ret = ConnectControlSocket();
168 if (ret)
169 state_ = SIGNING_IN;
170 if (!ret) {
171 callback_->OnServerConnectionFailure();
172 }
173}
174
175bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
176 if (state_ != CONNECTED)
177 return false;
178
179 ASSERT(is_connected());
180 ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
181 if (!is_connected() || peer_id == -1)
182 return false;
183
184 char headers[1024];
185 sprintfn(headers, sizeof(headers),
186 "POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
187 "Content-Length: %i\r\n"
188 "Content-Type: text/plain\r\n"
189 "\r\n",
190 my_id_, peer_id, message.length());
191 onconnect_data_ = headers;
192 onconnect_data_ += message;
193 return ConnectControlSocket();
194}
195
196bool PeerConnectionClient::SendHangUp(int peer_id) {
197 return SendToPeer(peer_id, kByeMessage);
198}
199
200bool PeerConnectionClient::IsSendingMessage() {
201 return state_ == CONNECTED &&
202 control_socket_->GetState() != talk_base::Socket::CS_CLOSED;
203}
204
205bool PeerConnectionClient::SignOut() {
206 if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
207 return true;
208
209 if (hanging_get_->GetState() != talk_base::Socket::CS_CLOSED)
210 hanging_get_->Close();
211
212 if (control_socket_->GetState() == talk_base::Socket::CS_CLOSED) {
213 state_ = SIGNING_OUT;
214
215 if (my_id_ != -1) {
216 char buffer[1024];
217 sprintfn(buffer, sizeof(buffer),
218 "GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
219 onconnect_data_ = buffer;
220 return ConnectControlSocket();
221 } else {
222 // Can occur if the app is closed before we finish connecting.
223 return true;
224 }
225 } else {
226 state_ = SIGNING_OUT_WAITING;
227 }
228
229 return true;
230}
231
232void PeerConnectionClient::Close() {
233 control_socket_->Close();
234 hanging_get_->Close();
235 onconnect_data_.clear();
236 peers_.clear();
237 if (resolver_ != NULL) {
238 resolver_->Destroy(false);
239 resolver_ = NULL;
240 }
241 my_id_ = -1;
242 state_ = NOT_CONNECTED;
243}
244
245bool PeerConnectionClient::ConnectControlSocket() {
246 ASSERT(control_socket_->GetState() == talk_base::Socket::CS_CLOSED);
247 int err = control_socket_->Connect(server_address_);
248 if (err == SOCKET_ERROR) {
249 Close();
250 return false;
251 }
252 return true;
253}
254
255void PeerConnectionClient::OnConnect(talk_base::AsyncSocket* socket) {
256 ASSERT(!onconnect_data_.empty());
257 size_t sent = socket->Send(onconnect_data_.c_str(), onconnect_data_.length());
258 ASSERT(sent == onconnect_data_.length());
259 UNUSED(sent);
260 onconnect_data_.clear();
261}
262
263void PeerConnectionClient::OnHangingGetConnect(talk_base::AsyncSocket* socket) {
264 char buffer[1024];
265 sprintfn(buffer, sizeof(buffer),
266 "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
267 int len = strlen(buffer);
268 int sent = socket->Send(buffer, len);
269 ASSERT(sent == len);
270 UNUSED2(sent, len);
271}
272
273void PeerConnectionClient::OnMessageFromPeer(int peer_id,
274 const std::string& message) {
275 if (message.length() == (sizeof(kByeMessage) - 1) &&
276 message.compare(kByeMessage) == 0) {
277 callback_->OnPeerDisconnected(peer_id);
278 } else {
279 callback_->OnMessageFromPeer(peer_id, message);
280 }
281}
282
283bool PeerConnectionClient::GetHeaderValue(const std::string& data,
284 size_t eoh,
285 const char* header_pattern,
286 size_t* value) {
287 ASSERT(value != NULL);
288 size_t found = data.find(header_pattern);
289 if (found != std::string::npos && found < eoh) {
290 *value = atoi(&data[found + strlen(header_pattern)]);
291 return true;
292 }
293 return false;
294}
295
296bool PeerConnectionClient::GetHeaderValue(const std::string& data, size_t eoh,
297 const char* header_pattern,
298 std::string* value) {
299 ASSERT(value != NULL);
300 size_t found = data.find(header_pattern);
301 if (found != std::string::npos && found < eoh) {
302 size_t begin = found + strlen(header_pattern);
303 size_t end = data.find("\r\n", begin);
304 if (end == std::string::npos)
305 end = eoh;
306 value->assign(data.substr(begin, end - begin));
307 return true;
308 }
309 return false;
310}
311
312bool PeerConnectionClient::ReadIntoBuffer(talk_base::AsyncSocket* socket,
313 std::string* data,
314 size_t* content_length) {
315 char buffer[0xffff];
316 do {
317 int bytes = socket->Recv(buffer, sizeof(buffer));
318 if (bytes <= 0)
319 break;
320 data->append(buffer, bytes);
321 } while (true);
322
323 bool ret = false;
324 size_t i = data->find("\r\n\r\n");
325 if (i != std::string::npos) {
326 LOG(INFO) << "Headers received";
327 if (GetHeaderValue(*data, i, "\r\nContent-Length: ", content_length)) {
328 size_t total_response_size = (i + 4) + *content_length;
329 if (data->length() >= total_response_size) {
330 ret = true;
331 std::string should_close;
332 const char kConnection[] = "\r\nConnection: ";
333 if (GetHeaderValue(*data, i, kConnection, &should_close) &&
334 should_close.compare("close") == 0) {
335 socket->Close();
336 // Since we closed the socket, there was no notification delivered
337 // to us. Compensate by letting ourselves know.
338 OnClose(socket, 0);
339 }
340 } else {
341 // We haven't received everything. Just continue to accept data.
342 }
343 } else {
344 LOG(LS_ERROR) << "No content length field specified by the server.";
345 }
346 }
347 return ret;
348}
349
350void PeerConnectionClient::OnRead(talk_base::AsyncSocket* socket) {
351 size_t content_length = 0;
352 if (ReadIntoBuffer(socket, &control_data_, &content_length)) {
353 size_t peer_id = 0, eoh = 0;
354 bool ok = ParseServerResponse(control_data_, content_length, &peer_id,
355 &eoh);
356 if (ok) {
357 if (my_id_ == -1) {
358 // First response. Let's store our server assigned ID.
359 ASSERT(state_ == SIGNING_IN);
360 my_id_ = peer_id;
361 ASSERT(my_id_ != -1);
362
363 // The body of the response will be a list of already connected peers.
364 if (content_length) {
365 size_t pos = eoh + 4;
366 while (pos < control_data_.size()) {
367 size_t eol = control_data_.find('\n', pos);
368 if (eol == std::string::npos)
369 break;
370 int id = 0;
371 std::string name;
372 bool connected;
373 if (ParseEntry(control_data_.substr(pos, eol - pos), &name, &id,
374 &connected) && id != my_id_) {
375 peers_[id] = name;
376 callback_->OnPeerConnected(id, name);
377 }
378 pos = eol + 1;
379 }
380 }
381 ASSERT(is_connected());
382 callback_->OnSignedIn();
383 } else if (state_ == SIGNING_OUT) {
384 Close();
385 callback_->OnDisconnected();
386 } else if (state_ == SIGNING_OUT_WAITING) {
387 SignOut();
388 }
389 }
390
391 control_data_.clear();
392
393 if (state_ == SIGNING_IN) {
394 ASSERT(hanging_get_->GetState() == talk_base::Socket::CS_CLOSED);
395 state_ = CONNECTED;
396 hanging_get_->Connect(server_address_);
397 }
398 }
399}
400
401void PeerConnectionClient::OnHangingGetRead(talk_base::AsyncSocket* socket) {
402 LOG(INFO) << __FUNCTION__;
403 size_t content_length = 0;
404 if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
405 size_t peer_id = 0, eoh = 0;
406 bool ok = ParseServerResponse(notification_data_, content_length,
407 &peer_id, &eoh);
408
409 if (ok) {
410 // Store the position where the body begins.
411 size_t pos = eoh + 4;
412
413 if (my_id_ == static_cast<int>(peer_id)) {
414 // A notification about a new member or a member that just
415 // disconnected.
416 int id = 0;
417 std::string name;
418 bool connected = false;
419 if (ParseEntry(notification_data_.substr(pos), &name, &id,
420 &connected)) {
421 if (connected) {
422 peers_[id] = name;
423 callback_->OnPeerConnected(id, name);
424 } else {
425 peers_.erase(id);
426 callback_->OnPeerDisconnected(id);
427 }
428 }
429 } else {
430 OnMessageFromPeer(peer_id, notification_data_.substr(pos));
431 }
432 }
433
434 notification_data_.clear();
435 }
436
437 if (hanging_get_->GetState() == talk_base::Socket::CS_CLOSED &&
438 state_ == CONNECTED) {
439 hanging_get_->Connect(server_address_);
440 }
441}
442
443bool PeerConnectionClient::ParseEntry(const std::string& entry,
444 std::string* name,
445 int* id,
446 bool* connected) {
447 ASSERT(name != NULL);
448 ASSERT(id != NULL);
449 ASSERT(connected != NULL);
450 ASSERT(!entry.empty());
451
452 *connected = false;
453 size_t separator = entry.find(',');
454 if (separator != std::string::npos) {
455 *id = atoi(&entry[separator + 1]);
456 name->assign(entry.substr(0, separator));
457 separator = entry.find(',', separator + 1);
458 if (separator != std::string::npos) {
459 *connected = atoi(&entry[separator + 1]) ? true : false;
460 }
461 }
462 return !name->empty();
463}
464
465int PeerConnectionClient::GetResponseStatus(const std::string& response) {
466 int status = -1;
467 size_t pos = response.find(' ');
468 if (pos != std::string::npos)
469 status = atoi(&response[pos + 1]);
470 return status;
471}
472
473bool PeerConnectionClient::ParseServerResponse(const std::string& response,
474 size_t content_length,
475 size_t* peer_id,
476 size_t* eoh) {
477 int status = GetResponseStatus(response.c_str());
478 if (status != 200) {
479 LOG(LS_ERROR) << "Received error from server";
480 Close();
481 callback_->OnDisconnected();
482 return false;
483 }
484
485 *eoh = response.find("\r\n\r\n");
486 ASSERT(*eoh != std::string::npos);
487 if (*eoh == std::string::npos)
488 return false;
489
490 *peer_id = -1;
491
492 // See comment in peer_channel.cc for why we use the Pragma header and
493 // not e.g. "X-Peer-Id".
494 GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
495
496 return true;
497}
498
499void PeerConnectionClient::OnClose(talk_base::AsyncSocket* socket, int err) {
500 LOG(INFO) << __FUNCTION__;
501
502 socket->Close();
503
504#ifdef WIN32
505 if (err != WSAECONNREFUSED) {
506#else
507 if (err != ECONNREFUSED) {
508#endif
509 if (socket == hanging_get_.get()) {
510 if (state_ == CONNECTED) {
511 hanging_get_->Close();
512 hanging_get_->Connect(server_address_);
513 }
514 } else {
515 callback_->OnMessageSent(err);
516 }
517 } else {
518 if (socket == control_socket_.get()) {
519 LOG(WARNING) << "Connection refused; retrying in 2 seconds";
520 talk_base::Thread::Current()->PostDelayed(kReconnectDelay, this, 0);
521 } else {
522 Close();
523 callback_->OnDisconnected();
524 }
525 }
526}
527
528void PeerConnectionClient::OnMessage(talk_base::Message* msg) {
529 // ignore msg; there is currently only one supported message ("retry")
530 DoConnect();
531}