/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#if defined(WEBRTC_POSIX)
#include <sys/file.h>
#endif  // WEBRTC_POSIX
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>

#include <algorithm>
#include <new>
#include <string>

#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/messagequeue.h"
#include "rtc_base/stream.h"
#include "rtc_base/stringencode.h"
#include "rtc_base/stringutils.h"
#include "rtc_base/thread.h"
#include "rtc_base/timeutils.h"

#if defined(WEBRTC_WIN)
#include <windows.h>
#define fileno _fileno
#endif
34
35namespace rtc {
36
37///////////////////////////////////////////////////////////////////////////////
38// StreamInterface
39///////////////////////////////////////////////////////////////////////////////
40StreamInterface::~StreamInterface() {
41}
42
43StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
44 size_t* written, int* error) {
45 StreamResult result = SR_SUCCESS;
46 size_t total_written = 0, current_written;
47 while (total_written < data_len) {
48 result = Write(static_cast<const char*>(data) + total_written,
49 data_len - total_written, &current_written, error);
50 if (result != SR_SUCCESS)
51 break;
52 total_written += current_written;
53 }
54 if (written)
55 *written = total_written;
56 return result;
57}
58
59StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
60 size_t* read, int* error) {
61 StreamResult result = SR_SUCCESS;
62 size_t total_read = 0, current_read;
63 while (total_read < buffer_len) {
64 result = Read(static_cast<char*>(buffer) + total_read,
65 buffer_len - total_read, &current_read, error);
66 if (result != SR_SUCCESS)
67 break;
68 total_read += current_read;
69 }
70 if (read)
71 *read = total_read;
72 return result;
73}
74
75StreamResult StreamInterface::ReadLine(std::string* line) {
76 line->clear();
77 StreamResult result = SR_SUCCESS;
78 while (true) {
79 char ch;
deadbeef37f5ecf2017-02-27 14:06:41 -080080 result = Read(&ch, sizeof(ch), nullptr, nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000081 if (result != SR_SUCCESS) {
82 break;
83 }
84 if (ch == '\n') {
85 break;
86 }
87 line->push_back(ch);
88 }
89 if (!line->empty()) { // give back the line we've collected so far with
90 result = SR_SUCCESS; // a success code. Otherwise return the last code
91 }
92 return result;
93}
94
95void StreamInterface::PostEvent(Thread* t, int events, int err) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070096 t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
97 new StreamEventData(events, err));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000098}
99
100void StreamInterface::PostEvent(int events, int err) {
101 PostEvent(Thread::Current(), events, err);
102}
103
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000104const void* StreamInterface::GetReadData(size_t* data_len) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800105 return nullptr;
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000106}
107
108void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800109 return nullptr;
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000110}
111
112bool StreamInterface::SetPosition(size_t position) {
113 return false;
114}
115
116bool StreamInterface::GetPosition(size_t* position) const {
117 return false;
118}
119
120bool StreamInterface::GetSize(size_t* size) const {
121 return false;
122}
123
124bool StreamInterface::GetAvailable(size_t* size) const {
125 return false;
126}
127
128bool StreamInterface::GetWriteRemaining(size_t* size) const {
129 return false;
130}
131
132bool StreamInterface::Flush() {
133 return false;
134}
135
136bool StreamInterface::ReserveSize(size_t size) {
137 return true;
138}
139
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000140StreamInterface::StreamInterface() {
141}
142
143void StreamInterface::OnMessage(Message* msg) {
144 if (MSG_POST_EVENT == msg->message_id) {
145 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
146 SignalEvent(this, pe->events, pe->error);
147 delete msg->pdata;
148 }
149}
150
151///////////////////////////////////////////////////////////////////////////////
152// StreamAdapterInterface
153///////////////////////////////////////////////////////////////////////////////
154
155StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
156 bool owned)
157 : stream_(stream), owned_(owned) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800158 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000159 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
160}
161
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000162StreamState StreamAdapterInterface::GetState() const {
163 return stream_->GetState();
164}
165StreamResult StreamAdapterInterface::Read(void* buffer,
166 size_t buffer_len,
167 size_t* read,
168 int* error) {
169 return stream_->Read(buffer, buffer_len, read, error);
170}
171StreamResult StreamAdapterInterface::Write(const void* data,
172 size_t data_len,
173 size_t* written,
174 int* error) {
175 return stream_->Write(data, data_len, written, error);
176}
177void StreamAdapterInterface::Close() {
178 stream_->Close();
179}
180
181bool StreamAdapterInterface::SetPosition(size_t position) {
182 return stream_->SetPosition(position);
183}
184
185bool StreamAdapterInterface::GetPosition(size_t* position) const {
186 return stream_->GetPosition(position);
187}
188
189bool StreamAdapterInterface::GetSize(size_t* size) const {
190 return stream_->GetSize(size);
191}
192
193bool StreamAdapterInterface::GetAvailable(size_t* size) const {
194 return stream_->GetAvailable(size);
195}
196
197bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
198 return stream_->GetWriteRemaining(size);
199}
200
201bool StreamAdapterInterface::ReserveSize(size_t size) {
202 return stream_->ReserveSize(size);
203}
204
205bool StreamAdapterInterface::Flush() {
206 return stream_->Flush();
207}
208
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000209void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800210 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000211 stream_->SignalEvent.disconnect(this);
212 if (owned_)
213 delete stream_;
214 stream_ = stream;
215 owned_ = owned;
deadbeef37f5ecf2017-02-27 14:06:41 -0800216 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000217 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
218}
219
220StreamInterface* StreamAdapterInterface::Detach() {
deadbeef37f5ecf2017-02-27 14:06:41 -0800221 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000222 stream_->SignalEvent.disconnect(this);
223 StreamInterface* stream = stream_;
deadbeef37f5ecf2017-02-27 14:06:41 -0800224 stream_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000225 return stream;
226}
227
228StreamAdapterInterface::~StreamAdapterInterface() {
229 if (owned_)
230 delete stream_;
231}
232
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000233void StreamAdapterInterface::OnEvent(StreamInterface* stream,
234 int events,
235 int err) {
236 SignalEvent(this, events, err);
237}
238
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000239///////////////////////////////////////////////////////////////////////////////
deadbeeff137e972017-03-23 15:45:49 -0700240// StreamTap
241///////////////////////////////////////////////////////////////////////////////
242
243StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
244 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
245 tap_error_(0) {
246 AttachTap(tap);
247}
248
249StreamTap::~StreamTap() = default;
250
251void StreamTap::AttachTap(StreamInterface* tap) {
252 tap_.reset(tap);
253}
254
255StreamInterface* StreamTap::DetachTap() {
256 return tap_.release();
257}
258
259StreamResult StreamTap::GetTapResult(int* error) {
260 if (error) {
261 *error = tap_error_;
262 }
263 return tap_result_;
264}
265
266StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
267 size_t* read, int* error) {
268 size_t backup_read;
269 if (!read) {
270 read = &backup_read;
271 }
272 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
273 read, error);
274 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
275 tap_result_ = tap_->WriteAll(buffer, *read, nullptr, &tap_error_);
276 }
277 return res;
278}
279
280StreamResult StreamTap::Write(const void* data, size_t data_len,
281 size_t* written, int* error) {
282 size_t backup_written;
283 if (!written) {
284 written = &backup_written;
285 }
286 StreamResult res = StreamAdapterInterface::Write(data, data_len,
287 written, error);
288 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
289 tap_result_ = tap_->WriteAll(data, *written, nullptr, &tap_error_);
290 }
291 return res;
292}
293
294///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000295// NullStream
296///////////////////////////////////////////////////////////////////////////////
297
298NullStream::NullStream() {
299}
300
301NullStream::~NullStream() {
302}
303
304StreamState NullStream::GetState() const {
305 return SS_OPEN;
306}
307
308StreamResult NullStream::Read(void* buffer, size_t buffer_len,
309 size_t* read, int* error) {
310 if (error) *error = -1;
311 return SR_ERROR;
312}
313
314StreamResult NullStream::Write(const void* data, size_t data_len,
315 size_t* written, int* error) {
316 if (written) *written = data_len;
317 return SR_SUCCESS;
318}
319
320void NullStream::Close() {
321}
322
323///////////////////////////////////////////////////////////////////////////////
324// FileStream
325///////////////////////////////////////////////////////////////////////////////
326
deadbeef37f5ecf2017-02-27 14:06:41 -0800327FileStream::FileStream() : file_(nullptr) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000328
329FileStream::~FileStream() {
330 FileStream::Close();
331}
332
333bool FileStream::Open(const std::string& filename, const char* mode,
334 int* error) {
335 Close();
336#if defined(WEBRTC_WIN)
337 std::wstring wfilename;
338 if (Utf8ToWindowsFilename(filename, &wfilename)) {
339 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
340 } else {
341 if (error) {
342 *error = -1;
343 return false;
344 }
345 }
346#else
347 file_ = fopen(filename.c_str(), mode);
348#endif
349 if (!file_ && error) {
350 *error = errno;
351 }
deadbeef37f5ecf2017-02-27 14:06:41 -0800352 return (file_ != nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000353}
354
355bool FileStream::OpenShare(const std::string& filename, const char* mode,
356 int shflag, int* error) {
357 Close();
358#if defined(WEBRTC_WIN)
359 std::wstring wfilename;
360 if (Utf8ToWindowsFilename(filename, &wfilename)) {
361 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
362 if (!file_ && error) {
363 *error = errno;
364 return false;
365 }
deadbeef37f5ecf2017-02-27 14:06:41 -0800366 return file_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000367 } else {
368 if (error) {
369 *error = -1;
370 }
371 return false;
372 }
373#else
374 return Open(filename, mode, error);
375#endif
376}
377
378bool FileStream::DisableBuffering() {
379 if (!file_)
380 return false;
deadbeef37f5ecf2017-02-27 14:06:41 -0800381 return (setvbuf(file_, nullptr, _IONBF, 0) == 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000382}
383
384StreamState FileStream::GetState() const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800385 return (file_ == nullptr) ? SS_CLOSED : SS_OPEN;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000386}
387
388StreamResult FileStream::Read(void* buffer, size_t buffer_len,
389 size_t* read, int* error) {
390 if (!file_)
391 return SR_EOS;
392 size_t result = fread(buffer, 1, buffer_len, file_);
393 if ((result == 0) && (buffer_len > 0)) {
394 if (feof(file_))
395 return SR_EOS;
396 if (error)
397 *error = errno;
398 return SR_ERROR;
399 }
400 if (read)
401 *read = result;
402 return SR_SUCCESS;
403}
404
405StreamResult FileStream::Write(const void* data, size_t data_len,
406 size_t* written, int* error) {
407 if (!file_)
408 return SR_EOS;
409 size_t result = fwrite(data, 1, data_len, file_);
410 if ((result == 0) && (data_len > 0)) {
411 if (error)
412 *error = errno;
413 return SR_ERROR;
414 }
415 if (written)
416 *written = result;
417 return SR_SUCCESS;
418}
419
420void FileStream::Close() {
421 if (file_) {
422 DoClose();
deadbeef37f5ecf2017-02-27 14:06:41 -0800423 file_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000424 }
425}
426
427bool FileStream::SetPosition(size_t position) {
428 if (!file_)
429 return false;
430 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
431}
432
433bool FileStream::GetPosition(size_t* position) const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800434 RTC_DCHECK(nullptr != position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000435 if (!file_)
436 return false;
437 long result = ftell(file_);
438 if (result < 0)
439 return false;
440 if (position)
441 *position = result;
442 return true;
443}
444
445bool FileStream::GetSize(size_t* size) const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800446 RTC_DCHECK(nullptr != size);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000447 if (!file_)
448 return false;
449 struct stat file_stats;
450 if (fstat(fileno(file_), &file_stats) != 0)
451 return false;
452 if (size)
453 *size = file_stats.st_size;
454 return true;
455}
456
457bool FileStream::GetAvailable(size_t* size) const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800458 RTC_DCHECK(nullptr != size);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000459 if (!GetSize(size))
460 return false;
461 long result = ftell(file_);
462 if (result < 0)
463 return false;
464 if (size)
465 *size -= result;
466 return true;
467}
468
469bool FileStream::ReserveSize(size_t size) {
470 // TODO: extend the file to the proper length
471 return true;
472}
473
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000474bool FileStream::Flush() {
475 if (file_) {
476 return (0 == fflush(file_));
477 }
478 // try to flush empty file?
nissec80e7412017-01-11 05:56:46 -0800479 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000480 return false;
481}
482
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000483void FileStream::DoClose() {
484 fclose(file_);
485}
486
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000487///////////////////////////////////////////////////////////////////////////////
488// MemoryStream
489///////////////////////////////////////////////////////////////////////////////
490
491MemoryStreamBase::MemoryStreamBase()
deadbeef37f5ecf2017-02-27 14:06:41 -0800492 : buffer_(nullptr), buffer_length_(0), data_length_(0), seek_position_(0) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000493
494StreamState MemoryStreamBase::GetState() const {
495 return SS_OPEN;
496}
497
498StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
499 size_t* bytes_read, int* error) {
500 if (seek_position_ >= data_length_) {
501 return SR_EOS;
502 }
503 size_t available = data_length_ - seek_position_;
504 if (bytes > available) {
505 // Read partial buffer
506 bytes = available;
507 }
508 memcpy(buffer, &buffer_[seek_position_], bytes);
509 seek_position_ += bytes;
510 if (bytes_read) {
511 *bytes_read = bytes;
512 }
513 return SR_SUCCESS;
514}
515
516StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
517 size_t* bytes_written, int* error) {
518 size_t available = buffer_length_ - seek_position_;
519 if (0 == available) {
520 // Increase buffer size to the larger of:
521 // a) new position rounded up to next 256 bytes
522 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000523 size_t new_buffer_length =
524 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000525 StreamResult result = DoReserve(new_buffer_length, error);
526 if (SR_SUCCESS != result) {
527 return result;
528 }
nisseede5da42017-01-12 05:15:36 -0800529 RTC_DCHECK(buffer_length_ >= new_buffer_length);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000530 available = buffer_length_ - seek_position_;
531 }
532
533 if (bytes > available) {
534 bytes = available;
535 }
536 memcpy(&buffer_[seek_position_], buffer, bytes);
537 seek_position_ += bytes;
538 if (data_length_ < seek_position_) {
539 data_length_ = seek_position_;
540 }
541 if (bytes_written) {
542 *bytes_written = bytes;
543 }
544 return SR_SUCCESS;
545}
546
547void MemoryStreamBase::Close() {
548 // nothing to do
549}
550
551bool MemoryStreamBase::SetPosition(size_t position) {
552 if (position > data_length_)
553 return false;
554 seek_position_ = position;
555 return true;
556}
557
558bool MemoryStreamBase::GetPosition(size_t* position) const {
559 if (position)
560 *position = seek_position_;
561 return true;
562}
563
564bool MemoryStreamBase::GetSize(size_t* size) const {
565 if (size)
566 *size = data_length_;
567 return true;
568}
569
570bool MemoryStreamBase::GetAvailable(size_t* size) const {
571 if (size)
572 *size = data_length_ - seek_position_;
573 return true;
574}
575
576bool MemoryStreamBase::ReserveSize(size_t size) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800577 return (SR_SUCCESS == DoReserve(size, nullptr));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000578}
579
580StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
581 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
582}
583
584///////////////////////////////////////////////////////////////////////////////
585
deadbeef37f5ecf2017-02-27 14:06:41 -0800586MemoryStream::MemoryStream() : buffer_alloc_(nullptr) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000587
deadbeef37f5ecf2017-02-27 14:06:41 -0800588MemoryStream::MemoryStream(const char* data) : buffer_alloc_(nullptr) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000589 SetData(data, strlen(data));
590}
591
592MemoryStream::MemoryStream(const void* data, size_t length)
deadbeef37f5ecf2017-02-27 14:06:41 -0800593 : buffer_alloc_(nullptr) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000594 SetData(data, length);
595}
596
597MemoryStream::~MemoryStream() {
598 delete [] buffer_alloc_;
599}
600
601void MemoryStream::SetData(const void* data, size_t length) {
602 data_length_ = buffer_length_ = length;
603 delete [] buffer_alloc_;
604 buffer_alloc_ = new char[buffer_length_ + kAlignment];
605 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
606 memcpy(buffer_, data, data_length_);
607 seek_position_ = 0;
608}
609
610StreamResult MemoryStream::DoReserve(size_t size, int* error) {
611 if (buffer_length_ >= size)
612 return SR_SUCCESS;
613
614 if (char* new_buffer_alloc = new char[size + kAlignment]) {
615 char* new_buffer = reinterpret_cast<char*>(
616 ALIGNP(new_buffer_alloc, kAlignment));
617 memcpy(new_buffer, buffer_, data_length_);
618 delete [] buffer_alloc_;
619 buffer_alloc_ = new_buffer_alloc;
620 buffer_ = new_buffer;
621 buffer_length_ = size;
622 return SR_SUCCESS;
623 }
624
625 if (error) {
626 *error = ENOMEM;
627 }
628 return SR_ERROR;
629}
630
631///////////////////////////////////////////////////////////////////////////////
deadbeeff137e972017-03-23 15:45:49 -0700632
633ExternalMemoryStream::ExternalMemoryStream() {
634}
635
636ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
637 SetData(data, length);
638}
639
640ExternalMemoryStream::~ExternalMemoryStream() {
641}
642
643void ExternalMemoryStream::SetData(void* data, size_t length) {
644 data_length_ = buffer_length_ = length;
645 buffer_ = static_cast<char*>(data);
646 seek_position_ = 0;
647}
648
649///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000650// FifoBuffer
651///////////////////////////////////////////////////////////////////////////////
652
653FifoBuffer::FifoBuffer(size_t size)
654 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
655 data_length_(0), read_position_(0), owner_(Thread::Current()) {
656 // all events are done on the owner_ thread
657}
658
659FifoBuffer::FifoBuffer(size_t size, Thread* owner)
660 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
661 data_length_(0), read_position_(0), owner_(owner) {
662 // all events are done on the owner_ thread
663}
664
665FifoBuffer::~FifoBuffer() {
666}
667
668bool FifoBuffer::GetBuffered(size_t* size) const {
669 CritScope cs(&crit_);
670 *size = data_length_;
671 return true;
672}
673
674bool FifoBuffer::SetCapacity(size_t size) {
675 CritScope cs(&crit_);
676 if (data_length_ > size) {
677 return false;
678 }
679
680 if (size != buffer_length_) {
681 char* buffer = new char[size];
682 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000683 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000684 memcpy(buffer, &buffer_[read_position_], tail_copy);
685 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
686 buffer_.reset(buffer);
687 read_position_ = 0;
688 buffer_length_ = size;
689 }
690 return true;
691}
692
693StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
694 size_t offset, size_t* bytes_read) {
695 CritScope cs(&crit_);
696 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
697}
698
699StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
700 size_t offset, size_t* bytes_written) {
701 CritScope cs(&crit_);
702 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
703}
704
705StreamState FifoBuffer::GetState() const {
jbauch097d5492016-02-09 02:30:34 -0800706 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000707 return state_;
708}
709
710StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
711 size_t* bytes_read, int* error) {
712 CritScope cs(&crit_);
713 const bool was_writable = data_length_ < buffer_length_;
714 size_t copy = 0;
715 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
716
717 if (result == SR_SUCCESS) {
718 // If read was successful then adjust the read position and number of
719 // bytes buffered.
720 read_position_ = (read_position_ + copy) % buffer_length_;
721 data_length_ -= copy;
722 if (bytes_read) {
723 *bytes_read = copy;
724 }
725
726 // if we were full before, and now we're not, post an event
727 if (!was_writable && copy > 0) {
728 PostEvent(owner_, SE_WRITE, 0);
729 }
730 }
731 return result;
732}
733
734StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
735 size_t* bytes_written, int* error) {
736 CritScope cs(&crit_);
737
738 const bool was_readable = (data_length_ > 0);
739 size_t copy = 0;
740 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
741
742 if (result == SR_SUCCESS) {
743 // If write was successful then adjust the number of readable bytes.
744 data_length_ += copy;
745 if (bytes_written) {
746 *bytes_written = copy;
747 }
748
749 // if we didn't have any data to read before, and now we do, post an event
750 if (!was_readable && copy > 0) {
751 PostEvent(owner_, SE_READ, 0);
752 }
753 }
754 return result;
755}
756
757void FifoBuffer::Close() {
758 CritScope cs(&crit_);
759 state_ = SS_CLOSED;
760}
761
762const void* FifoBuffer::GetReadData(size_t* size) {
763 CritScope cs(&crit_);
764 *size = (read_position_ + data_length_ <= buffer_length_) ?
765 data_length_ : buffer_length_ - read_position_;
766 return &buffer_[read_position_];
767}
768
769void FifoBuffer::ConsumeReadData(size_t size) {
770 CritScope cs(&crit_);
nisseede5da42017-01-12 05:15:36 -0800771 RTC_DCHECK(size <= data_length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000772 const bool was_writable = data_length_ < buffer_length_;
773 read_position_ = (read_position_ + size) % buffer_length_;
774 data_length_ -= size;
775 if (!was_writable && size > 0) {
776 PostEvent(owner_, SE_WRITE, 0);
777 }
778}
779
780void* FifoBuffer::GetWriteBuffer(size_t* size) {
781 CritScope cs(&crit_);
782 if (state_ == SS_CLOSED) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800783 return nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000784 }
785
786 // if empty, reset the write position to the beginning, so we can get
787 // the biggest possible block
788 if (data_length_ == 0) {
789 read_position_ = 0;
790 }
791
792 const size_t write_position = (read_position_ + data_length_)
793 % buffer_length_;
794 *size = (write_position > read_position_ || data_length_ == 0) ?
795 buffer_length_ - write_position : read_position_ - write_position;
796 return &buffer_[write_position];
797}
798
799void FifoBuffer::ConsumeWriteBuffer(size_t size) {
800 CritScope cs(&crit_);
nisseede5da42017-01-12 05:15:36 -0800801 RTC_DCHECK(size <= buffer_length_ - data_length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000802 const bool was_readable = (data_length_ > 0);
803 data_length_ += size;
804 if (!was_readable && size > 0) {
805 PostEvent(owner_, SE_READ, 0);
806 }
807}
808
809bool FifoBuffer::GetWriteRemaining(size_t* size) const {
810 CritScope cs(&crit_);
811 *size = buffer_length_ - data_length_;
812 return true;
813}
814
815StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
816 size_t bytes,
817 size_t offset,
818 size_t* bytes_read) {
819 if (offset >= data_length_) {
820 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
821 }
822
823 const size_t available = data_length_ - offset;
824 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000825 const size_t copy = std::min(bytes, available);
826 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000827 char* const p = static_cast<char*>(buffer);
828 memcpy(p, &buffer_[read_position], tail_copy);
829 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
830
831 if (bytes_read) {
832 *bytes_read = copy;
833 }
834 return SR_SUCCESS;
835}
836
837StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
838 size_t bytes,
839 size_t offset,
840 size_t* bytes_written) {
841 if (state_ == SS_CLOSED) {
842 return SR_EOS;
843 }
844
845 if (data_length_ + offset >= buffer_length_) {
846 return SR_BLOCK;
847 }
848
849 const size_t available = buffer_length_ - data_length_ - offset;
850 const size_t write_position = (read_position_ + data_length_ + offset)
851 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000852 const size_t copy = std::min(bytes, available);
853 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000854 const char* const p = static_cast<const char*>(buffer);
855 memcpy(&buffer_[write_position], p, tail_copy);
856 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
857
858 if (bytes_written) {
859 *bytes_written = copy;
860 }
861 return SR_SUCCESS;
862}
863
deadbeeff137e972017-03-23 15:45:49 -0700864
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000865///////////////////////////////////////////////////////////////////////////////
866// StringStream - Reads/Writes to an external std::string
867///////////////////////////////////////////////////////////////////////////////
868
Tommi00aac5a2015-05-25 11:25:59 +0200869StringStream::StringStream(std::string* str)
870 : str_(*str), read_pos_(0), read_only_(false) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000871}
872
873StringStream::StringStream(const std::string& str)
874 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
875}
876
877StreamState StringStream::GetState() const {
878 return SS_OPEN;
879}
880
881StreamResult StringStream::Read(void* buffer, size_t buffer_len,
882 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000883 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000884 if (!available)
885 return SR_EOS;
886 memcpy(buffer, str_.data() + read_pos_, available);
887 read_pos_ += available;
888 if (read)
889 *read = available;
890 return SR_SUCCESS;
891}
892
893StreamResult StringStream::Write(const void* data, size_t data_len,
894 size_t* written, int* error) {
895 if (read_only_) {
896 if (error) {
897 *error = -1;
898 }
899 return SR_ERROR;
900 }
901 str_.append(static_cast<const char*>(data),
902 static_cast<const char*>(data) + data_len);
903 if (written)
904 *written = data_len;
905 return SR_SUCCESS;
906}
907
908void StringStream::Close() {
909}
910
911bool StringStream::SetPosition(size_t position) {
912 if (position > str_.size())
913 return false;
914 read_pos_ = position;
915 return true;
916}
917
918bool StringStream::GetPosition(size_t* position) const {
919 if (position)
920 *position = read_pos_;
921 return true;
922}
923
924bool StringStream::GetSize(size_t* size) const {
925 if (size)
926 *size = str_.size();
927 return true;
928}
929
930bool StringStream::GetAvailable(size_t* size) const {
931 if (size)
932 *size = str_.size() - read_pos_;
933 return true;
934}
935
936bool StringStream::ReserveSize(size_t size) {
937 if (read_only_)
938 return false;
939 str_.reserve(size);
940 return true;
941}
942
943///////////////////////////////////////////////////////////////////////////////
deadbeeff137e972017-03-23 15:45:49 -0700944// StreamReference
945///////////////////////////////////////////////////////////////////////////////
946
947StreamReference::StreamReference(StreamInterface* stream)
948 : StreamAdapterInterface(stream, false) {
949 // owner set to false so the destructor does not free the stream.
950 stream_ref_count_ = new StreamRefCount(stream);
951}
952
953StreamInterface* StreamReference::NewReference() {
954 stream_ref_count_->AddReference();
955 return new StreamReference(stream_ref_count_, stream());
956}
957
958StreamReference::~StreamReference() {
959 stream_ref_count_->Release();
960}
961
962StreamReference::StreamReference(StreamRefCount* stream_ref_count,
963 StreamInterface* stream)
964 : StreamAdapterInterface(stream, false),
965 stream_ref_count_(stream_ref_count) {
966}
967
968///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000969
970StreamResult Flow(StreamInterface* source,
deadbeef37f5ecf2017-02-27 14:06:41 -0800971 char* buffer,
972 size_t buffer_len,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000973 StreamInterface* sink,
deadbeef37f5ecf2017-02-27 14:06:41 -0800974 size_t* data_len /* = nullptr */) {
nisseede5da42017-01-12 05:15:36 -0800975 RTC_DCHECK(buffer_len > 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000976
977 StreamResult result;
978 size_t count, read_pos, write_pos;
979 if (data_len) {
980 read_pos = *data_len;
981 } else {
982 read_pos = 0;
983 }
984
985 bool end_of_stream = false;
986 do {
987 // Read until buffer is full, end of stream, or error
988 while (!end_of_stream && (read_pos < buffer_len)) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800989 result = source->Read(buffer + read_pos, buffer_len - read_pos, &count,
990 nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000991 if (result == SR_EOS) {
992 end_of_stream = true;
993 } else if (result != SR_SUCCESS) {
994 if (data_len) {
995 *data_len = read_pos;
996 }
997 return result;
998 } else {
999 read_pos += count;
1000 }
1001 }
1002
1003 // Write until buffer is empty, or error (including end of stream)
1004 write_pos = 0;
1005 while (write_pos < read_pos) {
deadbeef37f5ecf2017-02-27 14:06:41 -08001006 result = sink->Write(buffer + write_pos, read_pos - write_pos, &count,
1007 nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001008 if (result != SR_SUCCESS) {
1009 if (data_len) {
1010 *data_len = read_pos - write_pos;
1011 if (write_pos > 0) {
1012 memmove(buffer, buffer + write_pos, *data_len);
1013 }
1014 }
1015 return result;
1016 }
1017 write_pos += count;
1018 }
1019
1020 read_pos = 0;
1021 } while (!end_of_stream);
1022
1023 if (data_len) {
1024 *data_len = 0;
1025 }
1026 return SR_SUCCESS;
1027}
1028
1029///////////////////////////////////////////////////////////////////////////////
1030
1031} // namespace rtc