blob: f58d3673e9168248d19b77fa4ca74b6a2c65707b [file] [log] [blame]
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000017
18#include <algorithm>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019#include <string>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000020
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000021#include "webrtc/base/basictypes.h"
nissec80e7412017-01-11 05:56:46 -080022#include "webrtc/base/checks.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000023#include "webrtc/base/logging.h"
24#include "webrtc/base/messagequeue.h"
25#include "webrtc/base/stream.h"
26#include "webrtc/base/stringencode.h"
27#include "webrtc/base/stringutils.h"
28#include "webrtc/base/thread.h"
29#include "webrtc/base/timeutils.h"
30
31#if defined(WEBRTC_WIN)
32#include "webrtc/base/win32.h"
33#define fileno _fileno
34#endif
35
36namespace rtc {
37
38///////////////////////////////////////////////////////////////////////////////
39// StreamInterface
40///////////////////////////////////////////////////////////////////////////////
41StreamInterface::~StreamInterface() {
42}
43
44StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45 size_t* written, int* error) {
46 StreamResult result = SR_SUCCESS;
47 size_t total_written = 0, current_written;
48 while (total_written < data_len) {
49 result = Write(static_cast<const char*>(data) + total_written,
50 data_len - total_written, &current_written, error);
51 if (result != SR_SUCCESS)
52 break;
53 total_written += current_written;
54 }
55 if (written)
56 *written = total_written;
57 return result;
58}
59
60StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61 size_t* read, int* error) {
62 StreamResult result = SR_SUCCESS;
63 size_t total_read = 0, current_read;
64 while (total_read < buffer_len) {
65 result = Read(static_cast<char*>(buffer) + total_read,
66 buffer_len - total_read, &current_read, error);
67 if (result != SR_SUCCESS)
68 break;
69 total_read += current_read;
70 }
71 if (read)
72 *read = total_read;
73 return result;
74}
75
76StreamResult StreamInterface::ReadLine(std::string* line) {
77 line->clear();
78 StreamResult result = SR_SUCCESS;
79 while (true) {
80 char ch;
81 result = Read(&ch, sizeof(ch), NULL, NULL);
82 if (result != SR_SUCCESS) {
83 break;
84 }
85 if (ch == '\n') {
86 break;
87 }
88 line->push_back(ch);
89 }
90 if (!line->empty()) { // give back the line we've collected so far with
91 result = SR_SUCCESS; // a success code. Otherwise return the last code
92 }
93 return result;
94}
95
96void StreamInterface::PostEvent(Thread* t, int events, int err) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070097 t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
98 new StreamEventData(events, err));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000099}
100
101void StreamInterface::PostEvent(int events, int err) {
102 PostEvent(Thread::Current(), events, err);
103}
104
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000105const void* StreamInterface::GetReadData(size_t* data_len) {
106 return NULL;
107}
108
109void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
110 return NULL;
111}
112
113bool StreamInterface::SetPosition(size_t position) {
114 return false;
115}
116
117bool StreamInterface::GetPosition(size_t* position) const {
118 return false;
119}
120
121bool StreamInterface::GetSize(size_t* size) const {
122 return false;
123}
124
125bool StreamInterface::GetAvailable(size_t* size) const {
126 return false;
127}
128
129bool StreamInterface::GetWriteRemaining(size_t* size) const {
130 return false;
131}
132
133bool StreamInterface::Flush() {
134 return false;
135}
136
137bool StreamInterface::ReserveSize(size_t size) {
138 return true;
139}
140
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000141StreamInterface::StreamInterface() {
142}
143
144void StreamInterface::OnMessage(Message* msg) {
145 if (MSG_POST_EVENT == msg->message_id) {
146 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
147 SignalEvent(this, pe->events, pe->error);
148 delete msg->pdata;
149 }
150}
151
152///////////////////////////////////////////////////////////////////////////////
153// StreamAdapterInterface
154///////////////////////////////////////////////////////////////////////////////
155
156StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
157 bool owned)
158 : stream_(stream), owned_(owned) {
159 if (NULL != stream_)
160 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
161}
162
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000163StreamState StreamAdapterInterface::GetState() const {
164 return stream_->GetState();
165}
166StreamResult StreamAdapterInterface::Read(void* buffer,
167 size_t buffer_len,
168 size_t* read,
169 int* error) {
170 return stream_->Read(buffer, buffer_len, read, error);
171}
172StreamResult StreamAdapterInterface::Write(const void* data,
173 size_t data_len,
174 size_t* written,
175 int* error) {
176 return stream_->Write(data, data_len, written, error);
177}
178void StreamAdapterInterface::Close() {
179 stream_->Close();
180}
181
182bool StreamAdapterInterface::SetPosition(size_t position) {
183 return stream_->SetPosition(position);
184}
185
186bool StreamAdapterInterface::GetPosition(size_t* position) const {
187 return stream_->GetPosition(position);
188}
189
190bool StreamAdapterInterface::GetSize(size_t* size) const {
191 return stream_->GetSize(size);
192}
193
194bool StreamAdapterInterface::GetAvailable(size_t* size) const {
195 return stream_->GetAvailable(size);
196}
197
198bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
199 return stream_->GetWriteRemaining(size);
200}
201
202bool StreamAdapterInterface::ReserveSize(size_t size) {
203 return stream_->ReserveSize(size);
204}
205
206bool StreamAdapterInterface::Flush() {
207 return stream_->Flush();
208}
209
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000210void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
211 if (NULL != stream_)
212 stream_->SignalEvent.disconnect(this);
213 if (owned_)
214 delete stream_;
215 stream_ = stream;
216 owned_ = owned;
217 if (NULL != stream_)
218 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
219}
220
221StreamInterface* StreamAdapterInterface::Detach() {
222 if (NULL != stream_)
223 stream_->SignalEvent.disconnect(this);
224 StreamInterface* stream = stream_;
225 stream_ = NULL;
226 return stream;
227}
228
229StreamAdapterInterface::~StreamAdapterInterface() {
230 if (owned_)
231 delete stream_;
232}
233
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000234void StreamAdapterInterface::OnEvent(StreamInterface* stream,
235 int events,
236 int err) {
237 SignalEvent(this, events, err);
238}
239
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000240///////////////////////////////////////////////////////////////////////////////
241// StreamTap
242///////////////////////////////////////////////////////////////////////////////
243
244StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
245 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
246 tap_error_(0) {
247 AttachTap(tap);
248}
249
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000250StreamTap::~StreamTap() = default;
251
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000252void StreamTap::AttachTap(StreamInterface* tap) {
253 tap_.reset(tap);
254}
255
256StreamInterface* StreamTap::DetachTap() {
257 return tap_.release();
258}
259
260StreamResult StreamTap::GetTapResult(int* error) {
261 if (error) {
262 *error = tap_error_;
263 }
264 return tap_result_;
265}
266
267StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
268 size_t* read, int* error) {
269 size_t backup_read;
270 if (!read) {
271 read = &backup_read;
272 }
273 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
274 read, error);
275 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
276 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
277 }
278 return res;
279}
280
281StreamResult StreamTap::Write(const void* data, size_t data_len,
282 size_t* written, int* error) {
283 size_t backup_written;
284 if (!written) {
285 written = &backup_written;
286 }
287 StreamResult res = StreamAdapterInterface::Write(data, data_len,
288 written, error);
289 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
290 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
291 }
292 return res;
293}
294
295///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000296// NullStream
297///////////////////////////////////////////////////////////////////////////////
298
299NullStream::NullStream() {
300}
301
302NullStream::~NullStream() {
303}
304
305StreamState NullStream::GetState() const {
306 return SS_OPEN;
307}
308
309StreamResult NullStream::Read(void* buffer, size_t buffer_len,
310 size_t* read, int* error) {
311 if (error) *error = -1;
312 return SR_ERROR;
313}
314
315StreamResult NullStream::Write(const void* data, size_t data_len,
316 size_t* written, int* error) {
317 if (written) *written = data_len;
318 return SR_SUCCESS;
319}
320
321void NullStream::Close() {
322}
323
324///////////////////////////////////////////////////////////////////////////////
325// FileStream
326///////////////////////////////////////////////////////////////////////////////
327
328FileStream::FileStream() : file_(NULL) {
329}
330
331FileStream::~FileStream() {
332 FileStream::Close();
333}
334
335bool FileStream::Open(const std::string& filename, const char* mode,
336 int* error) {
337 Close();
338#if defined(WEBRTC_WIN)
339 std::wstring wfilename;
340 if (Utf8ToWindowsFilename(filename, &wfilename)) {
341 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
342 } else {
343 if (error) {
344 *error = -1;
345 return false;
346 }
347 }
348#else
349 file_ = fopen(filename.c_str(), mode);
350#endif
351 if (!file_ && error) {
352 *error = errno;
353 }
354 return (file_ != NULL);
355}
356
357bool FileStream::OpenShare(const std::string& filename, const char* mode,
358 int shflag, int* error) {
359 Close();
360#if defined(WEBRTC_WIN)
361 std::wstring wfilename;
362 if (Utf8ToWindowsFilename(filename, &wfilename)) {
363 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
364 if (!file_ && error) {
365 *error = errno;
366 return false;
367 }
368 return file_ != NULL;
369 } else {
370 if (error) {
371 *error = -1;
372 }
373 return false;
374 }
375#else
376 return Open(filename, mode, error);
377#endif
378}
379
380bool FileStream::DisableBuffering() {
381 if (!file_)
382 return false;
383 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
384}
385
386StreamState FileStream::GetState() const {
387 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
388}
389
390StreamResult FileStream::Read(void* buffer, size_t buffer_len,
391 size_t* read, int* error) {
392 if (!file_)
393 return SR_EOS;
394 size_t result = fread(buffer, 1, buffer_len, file_);
395 if ((result == 0) && (buffer_len > 0)) {
396 if (feof(file_))
397 return SR_EOS;
398 if (error)
399 *error = errno;
400 return SR_ERROR;
401 }
402 if (read)
403 *read = result;
404 return SR_SUCCESS;
405}
406
407StreamResult FileStream::Write(const void* data, size_t data_len,
408 size_t* written, int* error) {
409 if (!file_)
410 return SR_EOS;
411 size_t result = fwrite(data, 1, data_len, file_);
412 if ((result == 0) && (data_len > 0)) {
413 if (error)
414 *error = errno;
415 return SR_ERROR;
416 }
417 if (written)
418 *written = result;
419 return SR_SUCCESS;
420}
421
422void FileStream::Close() {
423 if (file_) {
424 DoClose();
425 file_ = NULL;
426 }
427}
428
429bool FileStream::SetPosition(size_t position) {
430 if (!file_)
431 return false;
432 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
433}
434
435bool FileStream::GetPosition(size_t* position) const {
nisseede5da42017-01-12 05:15:36 -0800436 RTC_DCHECK(NULL != position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000437 if (!file_)
438 return false;
439 long result = ftell(file_);
440 if (result < 0)
441 return false;
442 if (position)
443 *position = result;
444 return true;
445}
446
447bool FileStream::GetSize(size_t* size) const {
nisseede5da42017-01-12 05:15:36 -0800448 RTC_DCHECK(NULL != size);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000449 if (!file_)
450 return false;
451 struct stat file_stats;
452 if (fstat(fileno(file_), &file_stats) != 0)
453 return false;
454 if (size)
455 *size = file_stats.st_size;
456 return true;
457}
458
459bool FileStream::GetAvailable(size_t* size) const {
nisseede5da42017-01-12 05:15:36 -0800460 RTC_DCHECK(NULL != size);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000461 if (!GetSize(size))
462 return false;
463 long result = ftell(file_);
464 if (result < 0)
465 return false;
466 if (size)
467 *size -= result;
468 return true;
469}
470
471bool FileStream::ReserveSize(size_t size) {
472 // TODO: extend the file to the proper length
473 return true;
474}
475
476bool FileStream::GetSize(const std::string& filename, size_t* size) {
477 struct stat file_stats;
478 if (stat(filename.c_str(), &file_stats) != 0)
479 return false;
480 *size = file_stats.st_size;
481 return true;
482}
483
484bool FileStream::Flush() {
485 if (file_) {
486 return (0 == fflush(file_));
487 }
488 // try to flush empty file?
nissec80e7412017-01-11 05:56:46 -0800489 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000490 return false;
491}
492
493#if defined(WEBRTC_POSIX) && !defined(__native_client__)
494
495bool FileStream::TryLock() {
496 if (file_ == NULL) {
497 // Stream not open.
nissec80e7412017-01-11 05:56:46 -0800498 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000499 return false;
500 }
501
502 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
503}
504
505bool FileStream::Unlock() {
506 if (file_ == NULL) {
507 // Stream not open.
nissec80e7412017-01-11 05:56:46 -0800508 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000509 return false;
510 }
511
512 return flock(fileno(file_), LOCK_UN) == 0;
513}
514
515#endif
516
517void FileStream::DoClose() {
518 fclose(file_);
519}
520
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000521///////////////////////////////////////////////////////////////////////////////
522// MemoryStream
523///////////////////////////////////////////////////////////////////////////////
524
525MemoryStreamBase::MemoryStreamBase()
526 : buffer_(NULL), buffer_length_(0), data_length_(0),
527 seek_position_(0) {
528}
529
530StreamState MemoryStreamBase::GetState() const {
531 return SS_OPEN;
532}
533
534StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
535 size_t* bytes_read, int* error) {
536 if (seek_position_ >= data_length_) {
537 return SR_EOS;
538 }
539 size_t available = data_length_ - seek_position_;
540 if (bytes > available) {
541 // Read partial buffer
542 bytes = available;
543 }
544 memcpy(buffer, &buffer_[seek_position_], bytes);
545 seek_position_ += bytes;
546 if (bytes_read) {
547 *bytes_read = bytes;
548 }
549 return SR_SUCCESS;
550}
551
552StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
553 size_t* bytes_written, int* error) {
554 size_t available = buffer_length_ - seek_position_;
555 if (0 == available) {
556 // Increase buffer size to the larger of:
557 // a) new position rounded up to next 256 bytes
558 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000559 size_t new_buffer_length =
560 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000561 StreamResult result = DoReserve(new_buffer_length, error);
562 if (SR_SUCCESS != result) {
563 return result;
564 }
nisseede5da42017-01-12 05:15:36 -0800565 RTC_DCHECK(buffer_length_ >= new_buffer_length);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000566 available = buffer_length_ - seek_position_;
567 }
568
569 if (bytes > available) {
570 bytes = available;
571 }
572 memcpy(&buffer_[seek_position_], buffer, bytes);
573 seek_position_ += bytes;
574 if (data_length_ < seek_position_) {
575 data_length_ = seek_position_;
576 }
577 if (bytes_written) {
578 *bytes_written = bytes;
579 }
580 return SR_SUCCESS;
581}
582
583void MemoryStreamBase::Close() {
584 // nothing to do
585}
586
587bool MemoryStreamBase::SetPosition(size_t position) {
588 if (position > data_length_)
589 return false;
590 seek_position_ = position;
591 return true;
592}
593
594bool MemoryStreamBase::GetPosition(size_t* position) const {
595 if (position)
596 *position = seek_position_;
597 return true;
598}
599
600bool MemoryStreamBase::GetSize(size_t* size) const {
601 if (size)
602 *size = data_length_;
603 return true;
604}
605
606bool MemoryStreamBase::GetAvailable(size_t* size) const {
607 if (size)
608 *size = data_length_ - seek_position_;
609 return true;
610}
611
612bool MemoryStreamBase::ReserveSize(size_t size) {
613 return (SR_SUCCESS == DoReserve(size, NULL));
614}
615
616StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
617 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
618}
619
620///////////////////////////////////////////////////////////////////////////////
621
622MemoryStream::MemoryStream()
623 : buffer_alloc_(NULL) {
624}
625
626MemoryStream::MemoryStream(const char* data)
627 : buffer_alloc_(NULL) {
628 SetData(data, strlen(data));
629}
630
631MemoryStream::MemoryStream(const void* data, size_t length)
632 : buffer_alloc_(NULL) {
633 SetData(data, length);
634}
635
636MemoryStream::~MemoryStream() {
637 delete [] buffer_alloc_;
638}
639
640void MemoryStream::SetData(const void* data, size_t length) {
641 data_length_ = buffer_length_ = length;
642 delete [] buffer_alloc_;
643 buffer_alloc_ = new char[buffer_length_ + kAlignment];
644 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
645 memcpy(buffer_, data, data_length_);
646 seek_position_ = 0;
647}
648
649StreamResult MemoryStream::DoReserve(size_t size, int* error) {
650 if (buffer_length_ >= size)
651 return SR_SUCCESS;
652
653 if (char* new_buffer_alloc = new char[size + kAlignment]) {
654 char* new_buffer = reinterpret_cast<char*>(
655 ALIGNP(new_buffer_alloc, kAlignment));
656 memcpy(new_buffer, buffer_, data_length_);
657 delete [] buffer_alloc_;
658 buffer_alloc_ = new_buffer_alloc;
659 buffer_ = new_buffer;
660 buffer_length_ = size;
661 return SR_SUCCESS;
662 }
663
664 if (error) {
665 *error = ENOMEM;
666 }
667 return SR_ERROR;
668}
669
670///////////////////////////////////////////////////////////////////////////////
671
672ExternalMemoryStream::ExternalMemoryStream() {
673}
674
675ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
676 SetData(data, length);
677}
678
679ExternalMemoryStream::~ExternalMemoryStream() {
680}
681
682void ExternalMemoryStream::SetData(void* data, size_t length) {
683 data_length_ = buffer_length_ = length;
684 buffer_ = static_cast<char*>(data);
685 seek_position_ = 0;
686}
687
688///////////////////////////////////////////////////////////////////////////////
689// FifoBuffer
690///////////////////////////////////////////////////////////////////////////////
691
692FifoBuffer::FifoBuffer(size_t size)
693 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
694 data_length_(0), read_position_(0), owner_(Thread::Current()) {
695 // all events are done on the owner_ thread
696}
697
698FifoBuffer::FifoBuffer(size_t size, Thread* owner)
699 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
700 data_length_(0), read_position_(0), owner_(owner) {
701 // all events are done on the owner_ thread
702}
703
704FifoBuffer::~FifoBuffer() {
705}
706
707bool FifoBuffer::GetBuffered(size_t* size) const {
708 CritScope cs(&crit_);
709 *size = data_length_;
710 return true;
711}
712
713bool FifoBuffer::SetCapacity(size_t size) {
714 CritScope cs(&crit_);
715 if (data_length_ > size) {
716 return false;
717 }
718
719 if (size != buffer_length_) {
720 char* buffer = new char[size];
721 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000722 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000723 memcpy(buffer, &buffer_[read_position_], tail_copy);
724 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
725 buffer_.reset(buffer);
726 read_position_ = 0;
727 buffer_length_ = size;
728 }
729 return true;
730}
731
732StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
733 size_t offset, size_t* bytes_read) {
734 CritScope cs(&crit_);
735 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
736}
737
738StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
739 size_t offset, size_t* bytes_written) {
740 CritScope cs(&crit_);
741 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
742}
743
744StreamState FifoBuffer::GetState() const {
jbauch097d5492016-02-09 02:30:34 -0800745 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000746 return state_;
747}
748
749StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
750 size_t* bytes_read, int* error) {
751 CritScope cs(&crit_);
752 const bool was_writable = data_length_ < buffer_length_;
753 size_t copy = 0;
754 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
755
756 if (result == SR_SUCCESS) {
757 // If read was successful then adjust the read position and number of
758 // bytes buffered.
759 read_position_ = (read_position_ + copy) % buffer_length_;
760 data_length_ -= copy;
761 if (bytes_read) {
762 *bytes_read = copy;
763 }
764
765 // if we were full before, and now we're not, post an event
766 if (!was_writable && copy > 0) {
767 PostEvent(owner_, SE_WRITE, 0);
768 }
769 }
770 return result;
771}
772
773StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
774 size_t* bytes_written, int* error) {
775 CritScope cs(&crit_);
776
777 const bool was_readable = (data_length_ > 0);
778 size_t copy = 0;
779 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
780
781 if (result == SR_SUCCESS) {
782 // If write was successful then adjust the number of readable bytes.
783 data_length_ += copy;
784 if (bytes_written) {
785 *bytes_written = copy;
786 }
787
788 // if we didn't have any data to read before, and now we do, post an event
789 if (!was_readable && copy > 0) {
790 PostEvent(owner_, SE_READ, 0);
791 }
792 }
793 return result;
794}
795
796void FifoBuffer::Close() {
797 CritScope cs(&crit_);
798 state_ = SS_CLOSED;
799}
800
801const void* FifoBuffer::GetReadData(size_t* size) {
802 CritScope cs(&crit_);
803 *size = (read_position_ + data_length_ <= buffer_length_) ?
804 data_length_ : buffer_length_ - read_position_;
805 return &buffer_[read_position_];
806}
807
808void FifoBuffer::ConsumeReadData(size_t size) {
809 CritScope cs(&crit_);
nisseede5da42017-01-12 05:15:36 -0800810 RTC_DCHECK(size <= data_length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000811 const bool was_writable = data_length_ < buffer_length_;
812 read_position_ = (read_position_ + size) % buffer_length_;
813 data_length_ -= size;
814 if (!was_writable && size > 0) {
815 PostEvent(owner_, SE_WRITE, 0);
816 }
817}
818
819void* FifoBuffer::GetWriteBuffer(size_t* size) {
820 CritScope cs(&crit_);
821 if (state_ == SS_CLOSED) {
822 return NULL;
823 }
824
825 // if empty, reset the write position to the beginning, so we can get
826 // the biggest possible block
827 if (data_length_ == 0) {
828 read_position_ = 0;
829 }
830
831 const size_t write_position = (read_position_ + data_length_)
832 % buffer_length_;
833 *size = (write_position > read_position_ || data_length_ == 0) ?
834 buffer_length_ - write_position : read_position_ - write_position;
835 return &buffer_[write_position];
836}
837
838void FifoBuffer::ConsumeWriteBuffer(size_t size) {
839 CritScope cs(&crit_);
nisseede5da42017-01-12 05:15:36 -0800840 RTC_DCHECK(size <= buffer_length_ - data_length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000841 const bool was_readable = (data_length_ > 0);
842 data_length_ += size;
843 if (!was_readable && size > 0) {
844 PostEvent(owner_, SE_READ, 0);
845 }
846}
847
848bool FifoBuffer::GetWriteRemaining(size_t* size) const {
849 CritScope cs(&crit_);
850 *size = buffer_length_ - data_length_;
851 return true;
852}
853
854StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
855 size_t bytes,
856 size_t offset,
857 size_t* bytes_read) {
858 if (offset >= data_length_) {
859 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
860 }
861
862 const size_t available = data_length_ - offset;
863 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000864 const size_t copy = std::min(bytes, available);
865 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000866 char* const p = static_cast<char*>(buffer);
867 memcpy(p, &buffer_[read_position], tail_copy);
868 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
869
870 if (bytes_read) {
871 *bytes_read = copy;
872 }
873 return SR_SUCCESS;
874}
875
876StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
877 size_t bytes,
878 size_t offset,
879 size_t* bytes_written) {
880 if (state_ == SS_CLOSED) {
881 return SR_EOS;
882 }
883
884 if (data_length_ + offset >= buffer_length_) {
885 return SR_BLOCK;
886 }
887
888 const size_t available = buffer_length_ - data_length_ - offset;
889 const size_t write_position = (read_position_ + data_length_ + offset)
890 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000891 const size_t copy = std::min(bytes, available);
892 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000893 const char* const p = static_cast<const char*>(buffer);
894 memcpy(&buffer_[write_position], p, tail_copy);
895 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
896
897 if (bytes_written) {
898 *bytes_written = copy;
899 }
900 return SR_SUCCESS;
901}
902
903
904
905///////////////////////////////////////////////////////////////////////////////
906// LoggingAdapter
907///////////////////////////////////////////////////////////////////////////////
908
909LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
910 const std::string& label, bool hex_mode)
911 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
912 set_label(label);
913}
914
915void LoggingAdapter::set_label(const std::string& label) {
916 label_.assign("[");
917 label_.append(label);
918 label_.append("]");
919}
920
921StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
922 size_t* read, int* error) {
923 size_t local_read; if (!read) read = &local_read;
924 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
925 error);
926 if (result == SR_SUCCESS) {
927 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
928 }
929 return result;
930}
931
932StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
933 size_t* written, int* error) {
934 size_t local_written;
935 if (!written) written = &local_written;
936 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
937 error);
938 if (result == SR_SUCCESS) {
939 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
940 &lms_);
941 }
942 return result;
943}
944
945void LoggingAdapter::Close() {
946 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
947 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
948 LOG_V(level_) << label_ << " Closed locally";
949 StreamAdapterInterface::Close();
950}
951
952void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
953 if (events & SE_OPEN) {
954 LOG_V(level_) << label_ << " Open";
955 } else if (events & SE_CLOSE) {
956 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
957 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
958 LOG_V(level_) << label_ << " Closed with error: " << err;
959 }
960 StreamAdapterInterface::OnEvent(stream, events, err);
961}
962
963///////////////////////////////////////////////////////////////////////////////
964// StringStream - Reads/Writes to an external std::string
965///////////////////////////////////////////////////////////////////////////////
966
Tommi00aac5a2015-05-25 11:25:59 +0200967StringStream::StringStream(std::string* str)
968 : str_(*str), read_pos_(0), read_only_(false) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000969}
970
971StringStream::StringStream(const std::string& str)
972 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
973}
974
975StreamState StringStream::GetState() const {
976 return SS_OPEN;
977}
978
979StreamResult StringStream::Read(void* buffer, size_t buffer_len,
980 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000981 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000982 if (!available)
983 return SR_EOS;
984 memcpy(buffer, str_.data() + read_pos_, available);
985 read_pos_ += available;
986 if (read)
987 *read = available;
988 return SR_SUCCESS;
989}
990
991StreamResult StringStream::Write(const void* data, size_t data_len,
992 size_t* written, int* error) {
993 if (read_only_) {
994 if (error) {
995 *error = -1;
996 }
997 return SR_ERROR;
998 }
999 str_.append(static_cast<const char*>(data),
1000 static_cast<const char*>(data) + data_len);
1001 if (written)
1002 *written = data_len;
1003 return SR_SUCCESS;
1004}
1005
1006void StringStream::Close() {
1007}
1008
1009bool StringStream::SetPosition(size_t position) {
1010 if (position > str_.size())
1011 return false;
1012 read_pos_ = position;
1013 return true;
1014}
1015
1016bool StringStream::GetPosition(size_t* position) const {
1017 if (position)
1018 *position = read_pos_;
1019 return true;
1020}
1021
1022bool StringStream::GetSize(size_t* size) const {
1023 if (size)
1024 *size = str_.size();
1025 return true;
1026}
1027
1028bool StringStream::GetAvailable(size_t* size) const {
1029 if (size)
1030 *size = str_.size() - read_pos_;
1031 return true;
1032}
1033
1034bool StringStream::ReserveSize(size_t size) {
1035 if (read_only_)
1036 return false;
1037 str_.reserve(size);
1038 return true;
1039}
1040
1041///////////////////////////////////////////////////////////////////////////////
1042// StreamReference
1043///////////////////////////////////////////////////////////////////////////////
1044
1045StreamReference::StreamReference(StreamInterface* stream)
1046 : StreamAdapterInterface(stream, false) {
1047 // owner set to false so the destructor does not free the stream.
1048 stream_ref_count_ = new StreamRefCount(stream);
1049}
1050
1051StreamInterface* StreamReference::NewReference() {
1052 stream_ref_count_->AddReference();
1053 return new StreamReference(stream_ref_count_, stream());
1054}
1055
1056StreamReference::~StreamReference() {
1057 stream_ref_count_->Release();
1058}
1059
1060StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1061 StreamInterface* stream)
1062 : StreamAdapterInterface(stream, false),
1063 stream_ref_count_(stream_ref_count) {
1064}
1065
1066///////////////////////////////////////////////////////////////////////////////
1067
1068StreamResult Flow(StreamInterface* source,
1069 char* buffer, size_t buffer_len,
1070 StreamInterface* sink,
1071 size_t* data_len /* = NULL */) {
nisseede5da42017-01-12 05:15:36 -08001072 RTC_DCHECK(buffer_len > 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001073
1074 StreamResult result;
1075 size_t count, read_pos, write_pos;
1076 if (data_len) {
1077 read_pos = *data_len;
1078 } else {
1079 read_pos = 0;
1080 }
1081
1082 bool end_of_stream = false;
1083 do {
1084 // Read until buffer is full, end of stream, or error
1085 while (!end_of_stream && (read_pos < buffer_len)) {
1086 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1087 &count, NULL);
1088 if (result == SR_EOS) {
1089 end_of_stream = true;
1090 } else if (result != SR_SUCCESS) {
1091 if (data_len) {
1092 *data_len = read_pos;
1093 }
1094 return result;
1095 } else {
1096 read_pos += count;
1097 }
1098 }
1099
1100 // Write until buffer is empty, or error (including end of stream)
1101 write_pos = 0;
1102 while (write_pos < read_pos) {
1103 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1104 &count, NULL);
1105 if (result != SR_SUCCESS) {
1106 if (data_len) {
1107 *data_len = read_pos - write_pos;
1108 if (write_pos > 0) {
1109 memmove(buffer, buffer + write_pos, *data_len);
1110 }
1111 }
1112 return result;
1113 }
1114 write_pos += count;
1115 }
1116
1117 read_pos = 0;
1118 } while (!end_of_stream);
1119
1120 if (data_len) {
1121 *data_len = 0;
1122 }
1123 return SR_SUCCESS;
1124}
1125
1126///////////////////////////////////////////////////////////////////////////////
1127
1128} // namespace rtc