blob: 744cf083b94e10a1de2ca06c58009a76c91fcbb4 [file] [log] [blame]
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
#if defined(WEBRTC_POSIX)
#include <sys/file.h>
#endif  // WEBRTC_POSIX
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>

#include <algorithm>
#include <new>
#include <string>

#include "webrtc/base/basictypes.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/common.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/messagequeue.h"
#include "webrtc/base/stream.h"
#include "webrtc/base/stringencode.h"
#include "webrtc/base/stringutils.h"
#include "webrtc/base/thread.h"
#include "webrtc/base/timeutils.h"

#if defined(WEBRTC_WIN)
#include "webrtc/base/win32.h"
#define fileno _fileno
#endif
36
37namespace rtc {
38
39///////////////////////////////////////////////////////////////////////////////
40// StreamInterface
41///////////////////////////////////////////////////////////////////////////////
42StreamInterface::~StreamInterface() {
43}
44
45StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
46 size_t* written, int* error) {
47 StreamResult result = SR_SUCCESS;
48 size_t total_written = 0, current_written;
49 while (total_written < data_len) {
50 result = Write(static_cast<const char*>(data) + total_written,
51 data_len - total_written, &current_written, error);
52 if (result != SR_SUCCESS)
53 break;
54 total_written += current_written;
55 }
56 if (written)
57 *written = total_written;
58 return result;
59}
60
61StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
62 size_t* read, int* error) {
63 StreamResult result = SR_SUCCESS;
64 size_t total_read = 0, current_read;
65 while (total_read < buffer_len) {
66 result = Read(static_cast<char*>(buffer) + total_read,
67 buffer_len - total_read, &current_read, error);
68 if (result != SR_SUCCESS)
69 break;
70 total_read += current_read;
71 }
72 if (read)
73 *read = total_read;
74 return result;
75}
76
77StreamResult StreamInterface::ReadLine(std::string* line) {
78 line->clear();
79 StreamResult result = SR_SUCCESS;
80 while (true) {
81 char ch;
82 result = Read(&ch, sizeof(ch), NULL, NULL);
83 if (result != SR_SUCCESS) {
84 break;
85 }
86 if (ch == '\n') {
87 break;
88 }
89 line->push_back(ch);
90 }
91 if (!line->empty()) { // give back the line we've collected so far with
92 result = SR_SUCCESS; // a success code. Otherwise return the last code
93 }
94 return result;
95}
96
97void StreamInterface::PostEvent(Thread* t, int events, int err) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070098 t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
99 new StreamEventData(events, err));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000100}
101
102void StreamInterface::PostEvent(int events, int err) {
103 PostEvent(Thread::Current(), events, err);
104}
105
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000106const void* StreamInterface::GetReadData(size_t* data_len) {
107 return NULL;
108}
109
110void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
111 return NULL;
112}
113
114bool StreamInterface::SetPosition(size_t position) {
115 return false;
116}
117
118bool StreamInterface::GetPosition(size_t* position) const {
119 return false;
120}
121
122bool StreamInterface::GetSize(size_t* size) const {
123 return false;
124}
125
126bool StreamInterface::GetAvailable(size_t* size) const {
127 return false;
128}
129
130bool StreamInterface::GetWriteRemaining(size_t* size) const {
131 return false;
132}
133
134bool StreamInterface::Flush() {
135 return false;
136}
137
138bool StreamInterface::ReserveSize(size_t size) {
139 return true;
140}
141
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000142StreamInterface::StreamInterface() {
143}
144
145void StreamInterface::OnMessage(Message* msg) {
146 if (MSG_POST_EVENT == msg->message_id) {
147 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
148 SignalEvent(this, pe->events, pe->error);
149 delete msg->pdata;
150 }
151}
152
153///////////////////////////////////////////////////////////////////////////////
154// StreamAdapterInterface
155///////////////////////////////////////////////////////////////////////////////
156
157StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
158 bool owned)
159 : stream_(stream), owned_(owned) {
160 if (NULL != stream_)
161 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
162}
163
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000164StreamState StreamAdapterInterface::GetState() const {
165 return stream_->GetState();
166}
167StreamResult StreamAdapterInterface::Read(void* buffer,
168 size_t buffer_len,
169 size_t* read,
170 int* error) {
171 return stream_->Read(buffer, buffer_len, read, error);
172}
173StreamResult StreamAdapterInterface::Write(const void* data,
174 size_t data_len,
175 size_t* written,
176 int* error) {
177 return stream_->Write(data, data_len, written, error);
178}
179void StreamAdapterInterface::Close() {
180 stream_->Close();
181}
182
183bool StreamAdapterInterface::SetPosition(size_t position) {
184 return stream_->SetPosition(position);
185}
186
187bool StreamAdapterInterface::GetPosition(size_t* position) const {
188 return stream_->GetPosition(position);
189}
190
191bool StreamAdapterInterface::GetSize(size_t* size) const {
192 return stream_->GetSize(size);
193}
194
195bool StreamAdapterInterface::GetAvailable(size_t* size) const {
196 return stream_->GetAvailable(size);
197}
198
199bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
200 return stream_->GetWriteRemaining(size);
201}
202
203bool StreamAdapterInterface::ReserveSize(size_t size) {
204 return stream_->ReserveSize(size);
205}
206
207bool StreamAdapterInterface::Flush() {
208 return stream_->Flush();
209}
210
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000211void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
212 if (NULL != stream_)
213 stream_->SignalEvent.disconnect(this);
214 if (owned_)
215 delete stream_;
216 stream_ = stream;
217 owned_ = owned;
218 if (NULL != stream_)
219 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
220}
221
222StreamInterface* StreamAdapterInterface::Detach() {
223 if (NULL != stream_)
224 stream_->SignalEvent.disconnect(this);
225 StreamInterface* stream = stream_;
226 stream_ = NULL;
227 return stream;
228}
229
230StreamAdapterInterface::~StreamAdapterInterface() {
231 if (owned_)
232 delete stream_;
233}
234
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000235void StreamAdapterInterface::OnEvent(StreamInterface* stream,
236 int events,
237 int err) {
238 SignalEvent(this, events, err);
239}
240
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000241///////////////////////////////////////////////////////////////////////////////
242// StreamTap
243///////////////////////////////////////////////////////////////////////////////
244
245StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
246 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
247 tap_error_(0) {
248 AttachTap(tap);
249}
250
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000251StreamTap::~StreamTap() = default;
252
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000253void StreamTap::AttachTap(StreamInterface* tap) {
254 tap_.reset(tap);
255}
256
257StreamInterface* StreamTap::DetachTap() {
258 return tap_.release();
259}
260
261StreamResult StreamTap::GetTapResult(int* error) {
262 if (error) {
263 *error = tap_error_;
264 }
265 return tap_result_;
266}
267
268StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
269 size_t* read, int* error) {
270 size_t backup_read;
271 if (!read) {
272 read = &backup_read;
273 }
274 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
275 read, error);
276 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
277 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
278 }
279 return res;
280}
281
282StreamResult StreamTap::Write(const void* data, size_t data_len,
283 size_t* written, int* error) {
284 size_t backup_written;
285 if (!written) {
286 written = &backup_written;
287 }
288 StreamResult res = StreamAdapterInterface::Write(data, data_len,
289 written, error);
290 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
291 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
292 }
293 return res;
294}
295
296///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000297// NullStream
298///////////////////////////////////////////////////////////////////////////////
299
300NullStream::NullStream() {
301}
302
303NullStream::~NullStream() {
304}
305
306StreamState NullStream::GetState() const {
307 return SS_OPEN;
308}
309
310StreamResult NullStream::Read(void* buffer, size_t buffer_len,
311 size_t* read, int* error) {
312 if (error) *error = -1;
313 return SR_ERROR;
314}
315
316StreamResult NullStream::Write(const void* data, size_t data_len,
317 size_t* written, int* error) {
318 if (written) *written = data_len;
319 return SR_SUCCESS;
320}
321
322void NullStream::Close() {
323}
324
325///////////////////////////////////////////////////////////////////////////////
326// FileStream
327///////////////////////////////////////////////////////////////////////////////
328
329FileStream::FileStream() : file_(NULL) {
330}
331
332FileStream::~FileStream() {
333 FileStream::Close();
334}
335
336bool FileStream::Open(const std::string& filename, const char* mode,
337 int* error) {
338 Close();
339#if defined(WEBRTC_WIN)
340 std::wstring wfilename;
341 if (Utf8ToWindowsFilename(filename, &wfilename)) {
342 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
343 } else {
344 if (error) {
345 *error = -1;
346 return false;
347 }
348 }
349#else
350 file_ = fopen(filename.c_str(), mode);
351#endif
352 if (!file_ && error) {
353 *error = errno;
354 }
355 return (file_ != NULL);
356}
357
358bool FileStream::OpenShare(const std::string& filename, const char* mode,
359 int shflag, int* error) {
360 Close();
361#if defined(WEBRTC_WIN)
362 std::wstring wfilename;
363 if (Utf8ToWindowsFilename(filename, &wfilename)) {
364 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
365 if (!file_ && error) {
366 *error = errno;
367 return false;
368 }
369 return file_ != NULL;
370 } else {
371 if (error) {
372 *error = -1;
373 }
374 return false;
375 }
376#else
377 return Open(filename, mode, error);
378#endif
379}
380
381bool FileStream::DisableBuffering() {
382 if (!file_)
383 return false;
384 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
385}
386
387StreamState FileStream::GetState() const {
388 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
389}
390
391StreamResult FileStream::Read(void* buffer, size_t buffer_len,
392 size_t* read, int* error) {
393 if (!file_)
394 return SR_EOS;
395 size_t result = fread(buffer, 1, buffer_len, file_);
396 if ((result == 0) && (buffer_len > 0)) {
397 if (feof(file_))
398 return SR_EOS;
399 if (error)
400 *error = errno;
401 return SR_ERROR;
402 }
403 if (read)
404 *read = result;
405 return SR_SUCCESS;
406}
407
408StreamResult FileStream::Write(const void* data, size_t data_len,
409 size_t* written, int* error) {
410 if (!file_)
411 return SR_EOS;
412 size_t result = fwrite(data, 1, data_len, file_);
413 if ((result == 0) && (data_len > 0)) {
414 if (error)
415 *error = errno;
416 return SR_ERROR;
417 }
418 if (written)
419 *written = result;
420 return SR_SUCCESS;
421}
422
423void FileStream::Close() {
424 if (file_) {
425 DoClose();
426 file_ = NULL;
427 }
428}
429
430bool FileStream::SetPosition(size_t position) {
431 if (!file_)
432 return false;
433 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
434}
435
436bool FileStream::GetPosition(size_t* position) const {
437 ASSERT(NULL != position);
438 if (!file_)
439 return false;
440 long result = ftell(file_);
441 if (result < 0)
442 return false;
443 if (position)
444 *position = result;
445 return true;
446}
447
448bool FileStream::GetSize(size_t* size) const {
449 ASSERT(NULL != size);
450 if (!file_)
451 return false;
452 struct stat file_stats;
453 if (fstat(fileno(file_), &file_stats) != 0)
454 return false;
455 if (size)
456 *size = file_stats.st_size;
457 return true;
458}
459
460bool FileStream::GetAvailable(size_t* size) const {
461 ASSERT(NULL != size);
462 if (!GetSize(size))
463 return false;
464 long result = ftell(file_);
465 if (result < 0)
466 return false;
467 if (size)
468 *size -= result;
469 return true;
470}
471
472bool FileStream::ReserveSize(size_t size) {
473 // TODO: extend the file to the proper length
474 return true;
475}
476
477bool FileStream::GetSize(const std::string& filename, size_t* size) {
478 struct stat file_stats;
479 if (stat(filename.c_str(), &file_stats) != 0)
480 return false;
481 *size = file_stats.st_size;
482 return true;
483}
484
485bool FileStream::Flush() {
486 if (file_) {
487 return (0 == fflush(file_));
488 }
489 // try to flush empty file?
nissec80e7412017-01-11 05:56:46 -0800490 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000491 return false;
492}
493
494#if defined(WEBRTC_POSIX) && !defined(__native_client__)
495
496bool FileStream::TryLock() {
497 if (file_ == NULL) {
498 // Stream not open.
nissec80e7412017-01-11 05:56:46 -0800499 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000500 return false;
501 }
502
503 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
504}
505
506bool FileStream::Unlock() {
507 if (file_ == NULL) {
508 // Stream not open.
nissec80e7412017-01-11 05:56:46 -0800509 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000510 return false;
511 }
512
513 return flock(fileno(file_), LOCK_UN) == 0;
514}
515
516#endif
517
518void FileStream::DoClose() {
519 fclose(file_);
520}
521
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000522///////////////////////////////////////////////////////////////////////////////
523// MemoryStream
524///////////////////////////////////////////////////////////////////////////////
525
526MemoryStreamBase::MemoryStreamBase()
527 : buffer_(NULL), buffer_length_(0), data_length_(0),
528 seek_position_(0) {
529}
530
531StreamState MemoryStreamBase::GetState() const {
532 return SS_OPEN;
533}
534
535StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
536 size_t* bytes_read, int* error) {
537 if (seek_position_ >= data_length_) {
538 return SR_EOS;
539 }
540 size_t available = data_length_ - seek_position_;
541 if (bytes > available) {
542 // Read partial buffer
543 bytes = available;
544 }
545 memcpy(buffer, &buffer_[seek_position_], bytes);
546 seek_position_ += bytes;
547 if (bytes_read) {
548 *bytes_read = bytes;
549 }
550 return SR_SUCCESS;
551}
552
553StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
554 size_t* bytes_written, int* error) {
555 size_t available = buffer_length_ - seek_position_;
556 if (0 == available) {
557 // Increase buffer size to the larger of:
558 // a) new position rounded up to next 256 bytes
559 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000560 size_t new_buffer_length =
561 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000562 StreamResult result = DoReserve(new_buffer_length, error);
563 if (SR_SUCCESS != result) {
564 return result;
565 }
566 ASSERT(buffer_length_ >= new_buffer_length);
567 available = buffer_length_ - seek_position_;
568 }
569
570 if (bytes > available) {
571 bytes = available;
572 }
573 memcpy(&buffer_[seek_position_], buffer, bytes);
574 seek_position_ += bytes;
575 if (data_length_ < seek_position_) {
576 data_length_ = seek_position_;
577 }
578 if (bytes_written) {
579 *bytes_written = bytes;
580 }
581 return SR_SUCCESS;
582}
583
584void MemoryStreamBase::Close() {
585 // nothing to do
586}
587
588bool MemoryStreamBase::SetPosition(size_t position) {
589 if (position > data_length_)
590 return false;
591 seek_position_ = position;
592 return true;
593}
594
595bool MemoryStreamBase::GetPosition(size_t* position) const {
596 if (position)
597 *position = seek_position_;
598 return true;
599}
600
601bool MemoryStreamBase::GetSize(size_t* size) const {
602 if (size)
603 *size = data_length_;
604 return true;
605}
606
607bool MemoryStreamBase::GetAvailable(size_t* size) const {
608 if (size)
609 *size = data_length_ - seek_position_;
610 return true;
611}
612
613bool MemoryStreamBase::ReserveSize(size_t size) {
614 return (SR_SUCCESS == DoReserve(size, NULL));
615}
616
617StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
618 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
619}
620
621///////////////////////////////////////////////////////////////////////////////
622
623MemoryStream::MemoryStream()
624 : buffer_alloc_(NULL) {
625}
626
627MemoryStream::MemoryStream(const char* data)
628 : buffer_alloc_(NULL) {
629 SetData(data, strlen(data));
630}
631
632MemoryStream::MemoryStream(const void* data, size_t length)
633 : buffer_alloc_(NULL) {
634 SetData(data, length);
635}
636
637MemoryStream::~MemoryStream() {
638 delete [] buffer_alloc_;
639}
640
641void MemoryStream::SetData(const void* data, size_t length) {
642 data_length_ = buffer_length_ = length;
643 delete [] buffer_alloc_;
644 buffer_alloc_ = new char[buffer_length_ + kAlignment];
645 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
646 memcpy(buffer_, data, data_length_);
647 seek_position_ = 0;
648}
649
650StreamResult MemoryStream::DoReserve(size_t size, int* error) {
651 if (buffer_length_ >= size)
652 return SR_SUCCESS;
653
654 if (char* new_buffer_alloc = new char[size + kAlignment]) {
655 char* new_buffer = reinterpret_cast<char*>(
656 ALIGNP(new_buffer_alloc, kAlignment));
657 memcpy(new_buffer, buffer_, data_length_);
658 delete [] buffer_alloc_;
659 buffer_alloc_ = new_buffer_alloc;
660 buffer_ = new_buffer;
661 buffer_length_ = size;
662 return SR_SUCCESS;
663 }
664
665 if (error) {
666 *error = ENOMEM;
667 }
668 return SR_ERROR;
669}
670
671///////////////////////////////////////////////////////////////////////////////
672
673ExternalMemoryStream::ExternalMemoryStream() {
674}
675
676ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
677 SetData(data, length);
678}
679
680ExternalMemoryStream::~ExternalMemoryStream() {
681}
682
683void ExternalMemoryStream::SetData(void* data, size_t length) {
684 data_length_ = buffer_length_ = length;
685 buffer_ = static_cast<char*>(data);
686 seek_position_ = 0;
687}
688
689///////////////////////////////////////////////////////////////////////////////
690// FifoBuffer
691///////////////////////////////////////////////////////////////////////////////
692
693FifoBuffer::FifoBuffer(size_t size)
694 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
695 data_length_(0), read_position_(0), owner_(Thread::Current()) {
696 // all events are done on the owner_ thread
697}
698
699FifoBuffer::FifoBuffer(size_t size, Thread* owner)
700 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
701 data_length_(0), read_position_(0), owner_(owner) {
702 // all events are done on the owner_ thread
703}
704
705FifoBuffer::~FifoBuffer() {
706}
707
708bool FifoBuffer::GetBuffered(size_t* size) const {
709 CritScope cs(&crit_);
710 *size = data_length_;
711 return true;
712}
713
714bool FifoBuffer::SetCapacity(size_t size) {
715 CritScope cs(&crit_);
716 if (data_length_ > size) {
717 return false;
718 }
719
720 if (size != buffer_length_) {
721 char* buffer = new char[size];
722 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000723 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000724 memcpy(buffer, &buffer_[read_position_], tail_copy);
725 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
726 buffer_.reset(buffer);
727 read_position_ = 0;
728 buffer_length_ = size;
729 }
730 return true;
731}
732
733StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
734 size_t offset, size_t* bytes_read) {
735 CritScope cs(&crit_);
736 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
737}
738
739StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
740 size_t offset, size_t* bytes_written) {
741 CritScope cs(&crit_);
742 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
743}
744
745StreamState FifoBuffer::GetState() const {
jbauch097d5492016-02-09 02:30:34 -0800746 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000747 return state_;
748}
749
750StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
751 size_t* bytes_read, int* error) {
752 CritScope cs(&crit_);
753 const bool was_writable = data_length_ < buffer_length_;
754 size_t copy = 0;
755 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
756
757 if (result == SR_SUCCESS) {
758 // If read was successful then adjust the read position and number of
759 // bytes buffered.
760 read_position_ = (read_position_ + copy) % buffer_length_;
761 data_length_ -= copy;
762 if (bytes_read) {
763 *bytes_read = copy;
764 }
765
766 // if we were full before, and now we're not, post an event
767 if (!was_writable && copy > 0) {
768 PostEvent(owner_, SE_WRITE, 0);
769 }
770 }
771 return result;
772}
773
774StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
775 size_t* bytes_written, int* error) {
776 CritScope cs(&crit_);
777
778 const bool was_readable = (data_length_ > 0);
779 size_t copy = 0;
780 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
781
782 if (result == SR_SUCCESS) {
783 // If write was successful then adjust the number of readable bytes.
784 data_length_ += copy;
785 if (bytes_written) {
786 *bytes_written = copy;
787 }
788
789 // if we didn't have any data to read before, and now we do, post an event
790 if (!was_readable && copy > 0) {
791 PostEvent(owner_, SE_READ, 0);
792 }
793 }
794 return result;
795}
796
797void FifoBuffer::Close() {
798 CritScope cs(&crit_);
799 state_ = SS_CLOSED;
800}
801
802const void* FifoBuffer::GetReadData(size_t* size) {
803 CritScope cs(&crit_);
804 *size = (read_position_ + data_length_ <= buffer_length_) ?
805 data_length_ : buffer_length_ - read_position_;
806 return &buffer_[read_position_];
807}
808
809void FifoBuffer::ConsumeReadData(size_t size) {
810 CritScope cs(&crit_);
811 ASSERT(size <= data_length_);
812 const bool was_writable = data_length_ < buffer_length_;
813 read_position_ = (read_position_ + size) % buffer_length_;
814 data_length_ -= size;
815 if (!was_writable && size > 0) {
816 PostEvent(owner_, SE_WRITE, 0);
817 }
818}
819
820void* FifoBuffer::GetWriteBuffer(size_t* size) {
821 CritScope cs(&crit_);
822 if (state_ == SS_CLOSED) {
823 return NULL;
824 }
825
826 // if empty, reset the write position to the beginning, so we can get
827 // the biggest possible block
828 if (data_length_ == 0) {
829 read_position_ = 0;
830 }
831
832 const size_t write_position = (read_position_ + data_length_)
833 % buffer_length_;
834 *size = (write_position > read_position_ || data_length_ == 0) ?
835 buffer_length_ - write_position : read_position_ - write_position;
836 return &buffer_[write_position];
837}
838
839void FifoBuffer::ConsumeWriteBuffer(size_t size) {
840 CritScope cs(&crit_);
841 ASSERT(size <= buffer_length_ - data_length_);
842 const bool was_readable = (data_length_ > 0);
843 data_length_ += size;
844 if (!was_readable && size > 0) {
845 PostEvent(owner_, SE_READ, 0);
846 }
847}
848
849bool FifoBuffer::GetWriteRemaining(size_t* size) const {
850 CritScope cs(&crit_);
851 *size = buffer_length_ - data_length_;
852 return true;
853}
854
855StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
856 size_t bytes,
857 size_t offset,
858 size_t* bytes_read) {
859 if (offset >= data_length_) {
860 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
861 }
862
863 const size_t available = data_length_ - offset;
864 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000865 const size_t copy = std::min(bytes, available);
866 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000867 char* const p = static_cast<char*>(buffer);
868 memcpy(p, &buffer_[read_position], tail_copy);
869 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
870
871 if (bytes_read) {
872 *bytes_read = copy;
873 }
874 return SR_SUCCESS;
875}
876
877StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
878 size_t bytes,
879 size_t offset,
880 size_t* bytes_written) {
881 if (state_ == SS_CLOSED) {
882 return SR_EOS;
883 }
884
885 if (data_length_ + offset >= buffer_length_) {
886 return SR_BLOCK;
887 }
888
889 const size_t available = buffer_length_ - data_length_ - offset;
890 const size_t write_position = (read_position_ + data_length_ + offset)
891 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000892 const size_t copy = std::min(bytes, available);
893 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000894 const char* const p = static_cast<const char*>(buffer);
895 memcpy(&buffer_[write_position], p, tail_copy);
896 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
897
898 if (bytes_written) {
899 *bytes_written = copy;
900 }
901 return SR_SUCCESS;
902}
903
904
905
906///////////////////////////////////////////////////////////////////////////////
907// LoggingAdapter
908///////////////////////////////////////////////////////////////////////////////
909
910LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
911 const std::string& label, bool hex_mode)
912 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
913 set_label(label);
914}
915
916void LoggingAdapter::set_label(const std::string& label) {
917 label_.assign("[");
918 label_.append(label);
919 label_.append("]");
920}
921
922StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
923 size_t* read, int* error) {
924 size_t local_read; if (!read) read = &local_read;
925 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
926 error);
927 if (result == SR_SUCCESS) {
928 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
929 }
930 return result;
931}
932
933StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
934 size_t* written, int* error) {
935 size_t local_written;
936 if (!written) written = &local_written;
937 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
938 error);
939 if (result == SR_SUCCESS) {
940 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
941 &lms_);
942 }
943 return result;
944}
945
946void LoggingAdapter::Close() {
947 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
948 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
949 LOG_V(level_) << label_ << " Closed locally";
950 StreamAdapterInterface::Close();
951}
952
953void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
954 if (events & SE_OPEN) {
955 LOG_V(level_) << label_ << " Open";
956 } else if (events & SE_CLOSE) {
957 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
958 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
959 LOG_V(level_) << label_ << " Closed with error: " << err;
960 }
961 StreamAdapterInterface::OnEvent(stream, events, err);
962}
963
964///////////////////////////////////////////////////////////////////////////////
965// StringStream - Reads/Writes to an external std::string
966///////////////////////////////////////////////////////////////////////////////
967
Tommi00aac5a2015-05-25 11:25:59 +0200968StringStream::StringStream(std::string* str)
969 : str_(*str), read_pos_(0), read_only_(false) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000970}
971
972StringStream::StringStream(const std::string& str)
973 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
974}
975
976StreamState StringStream::GetState() const {
977 return SS_OPEN;
978}
979
980StreamResult StringStream::Read(void* buffer, size_t buffer_len,
981 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000982 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000983 if (!available)
984 return SR_EOS;
985 memcpy(buffer, str_.data() + read_pos_, available);
986 read_pos_ += available;
987 if (read)
988 *read = available;
989 return SR_SUCCESS;
990}
991
992StreamResult StringStream::Write(const void* data, size_t data_len,
993 size_t* written, int* error) {
994 if (read_only_) {
995 if (error) {
996 *error = -1;
997 }
998 return SR_ERROR;
999 }
1000 str_.append(static_cast<const char*>(data),
1001 static_cast<const char*>(data) + data_len);
1002 if (written)
1003 *written = data_len;
1004 return SR_SUCCESS;
1005}
1006
1007void StringStream::Close() {
1008}
1009
1010bool StringStream::SetPosition(size_t position) {
1011 if (position > str_.size())
1012 return false;
1013 read_pos_ = position;
1014 return true;
1015}
1016
1017bool StringStream::GetPosition(size_t* position) const {
1018 if (position)
1019 *position = read_pos_;
1020 return true;
1021}
1022
1023bool StringStream::GetSize(size_t* size) const {
1024 if (size)
1025 *size = str_.size();
1026 return true;
1027}
1028
1029bool StringStream::GetAvailable(size_t* size) const {
1030 if (size)
1031 *size = str_.size() - read_pos_;
1032 return true;
1033}
1034
1035bool StringStream::ReserveSize(size_t size) {
1036 if (read_only_)
1037 return false;
1038 str_.reserve(size);
1039 return true;
1040}
1041
1042///////////////////////////////////////////////////////////////////////////////
1043// StreamReference
1044///////////////////////////////////////////////////////////////////////////////
1045
1046StreamReference::StreamReference(StreamInterface* stream)
1047 : StreamAdapterInterface(stream, false) {
1048 // owner set to false so the destructor does not free the stream.
1049 stream_ref_count_ = new StreamRefCount(stream);
1050}
1051
1052StreamInterface* StreamReference::NewReference() {
1053 stream_ref_count_->AddReference();
1054 return new StreamReference(stream_ref_count_, stream());
1055}
1056
1057StreamReference::~StreamReference() {
1058 stream_ref_count_->Release();
1059}
1060
1061StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1062 StreamInterface* stream)
1063 : StreamAdapterInterface(stream, false),
1064 stream_ref_count_(stream_ref_count) {
1065}
1066
1067///////////////////////////////////////////////////////////////////////////////
1068
1069StreamResult Flow(StreamInterface* source,
1070 char* buffer, size_t buffer_len,
1071 StreamInterface* sink,
1072 size_t* data_len /* = NULL */) {
1073 ASSERT(buffer_len > 0);
1074
1075 StreamResult result;
1076 size_t count, read_pos, write_pos;
1077 if (data_len) {
1078 read_pos = *data_len;
1079 } else {
1080 read_pos = 0;
1081 }
1082
1083 bool end_of_stream = false;
1084 do {
1085 // Read until buffer is full, end of stream, or error
1086 while (!end_of_stream && (read_pos < buffer_len)) {
1087 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1088 &count, NULL);
1089 if (result == SR_EOS) {
1090 end_of_stream = true;
1091 } else if (result != SR_SUCCESS) {
1092 if (data_len) {
1093 *data_len = read_pos;
1094 }
1095 return result;
1096 } else {
1097 read_pos += count;
1098 }
1099 }
1100
1101 // Write until buffer is empty, or error (including end of stream)
1102 write_pos = 0;
1103 while (write_pos < read_pos) {
1104 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1105 &count, NULL);
1106 if (result != SR_SUCCESS) {
1107 if (data_len) {
1108 *data_len = read_pos - write_pos;
1109 if (write_pos > 0) {
1110 memmove(buffer, buffer + write_pos, *data_len);
1111 }
1112 }
1113 return result;
1114 }
1115 write_pos += count;
1116 }
1117
1118 read_pos = 0;
1119 } while (!end_of_stream);
1120
1121 if (data_len) {
1122 *data_len = 0;
1123 }
1124 return SR_SUCCESS;
1125}
1126
1127///////////////////////////////////////////////////////////////////////////////
1128
1129} // namespace rtc