/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000017
18#include <algorithm>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019#include <string>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000020
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020021#include "rtc_base/checks.h"
22#include "rtc_base/logging.h"
23#include "rtc_base/messagequeue.h"
24#include "rtc_base/stream.h"
25#include "rtc_base/stringencode.h"
26#include "rtc_base/stringutils.h"
27#include "rtc_base/thread.h"
28#include "rtc_base/timeutils.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000029
30#if defined(WEBRTC_WIN)
Patrik Höglunda8005cf2017-12-13 16:05:42 +010031#include <windows.h>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000032#define fileno _fileno
33#endif
34
35namespace rtc {
36
37///////////////////////////////////////////////////////////////////////////////
38// StreamInterface
39///////////////////////////////////////////////////////////////////////////////
40StreamInterface::~StreamInterface() {
41}
42
43StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
44 size_t* written, int* error) {
45 StreamResult result = SR_SUCCESS;
46 size_t total_written = 0, current_written;
47 while (total_written < data_len) {
48 result = Write(static_cast<const char*>(data) + total_written,
49 data_len - total_written, &current_written, error);
50 if (result != SR_SUCCESS)
51 break;
52 total_written += current_written;
53 }
54 if (written)
55 *written = total_written;
56 return result;
57}
58
59StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
60 size_t* read, int* error) {
61 StreamResult result = SR_SUCCESS;
62 size_t total_read = 0, current_read;
63 while (total_read < buffer_len) {
64 result = Read(static_cast<char*>(buffer) + total_read,
65 buffer_len - total_read, &current_read, error);
66 if (result != SR_SUCCESS)
67 break;
68 total_read += current_read;
69 }
70 if (read)
71 *read = total_read;
72 return result;
73}
74
75StreamResult StreamInterface::ReadLine(std::string* line) {
76 line->clear();
77 StreamResult result = SR_SUCCESS;
78 while (true) {
79 char ch;
deadbeef37f5ecf2017-02-27 14:06:41 -080080 result = Read(&ch, sizeof(ch), nullptr, nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000081 if (result != SR_SUCCESS) {
82 break;
83 }
84 if (ch == '\n') {
85 break;
86 }
87 line->push_back(ch);
88 }
89 if (!line->empty()) { // give back the line we've collected so far with
90 result = SR_SUCCESS; // a success code. Otherwise return the last code
91 }
92 return result;
93}
94
95void StreamInterface::PostEvent(Thread* t, int events, int err) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070096 t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
97 new StreamEventData(events, err));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000098}
99
100void StreamInterface::PostEvent(int events, int err) {
101 PostEvent(Thread::Current(), events, err);
102}
103
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000104const void* StreamInterface::GetReadData(size_t* data_len) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800105 return nullptr;
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000106}
107
108void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800109 return nullptr;
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000110}
111
112bool StreamInterface::SetPosition(size_t position) {
113 return false;
114}
115
116bool StreamInterface::GetPosition(size_t* position) const {
117 return false;
118}
119
120bool StreamInterface::GetSize(size_t* size) const {
121 return false;
122}
123
124bool StreamInterface::GetAvailable(size_t* size) const {
125 return false;
126}
127
128bool StreamInterface::GetWriteRemaining(size_t* size) const {
129 return false;
130}
131
132bool StreamInterface::Flush() {
133 return false;
134}
135
136bool StreamInterface::ReserveSize(size_t size) {
137 return true;
138}
139
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000140StreamInterface::StreamInterface() {
141}
142
143void StreamInterface::OnMessage(Message* msg) {
144 if (MSG_POST_EVENT == msg->message_id) {
145 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
146 SignalEvent(this, pe->events, pe->error);
147 delete msg->pdata;
148 }
149}
150
151///////////////////////////////////////////////////////////////////////////////
152// StreamAdapterInterface
153///////////////////////////////////////////////////////////////////////////////
154
155StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
156 bool owned)
157 : stream_(stream), owned_(owned) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800158 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000159 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
160}
161
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000162StreamState StreamAdapterInterface::GetState() const {
163 return stream_->GetState();
164}
165StreamResult StreamAdapterInterface::Read(void* buffer,
166 size_t buffer_len,
167 size_t* read,
168 int* error) {
169 return stream_->Read(buffer, buffer_len, read, error);
170}
171StreamResult StreamAdapterInterface::Write(const void* data,
172 size_t data_len,
173 size_t* written,
174 int* error) {
175 return stream_->Write(data, data_len, written, error);
176}
177void StreamAdapterInterface::Close() {
178 stream_->Close();
179}
180
181bool StreamAdapterInterface::SetPosition(size_t position) {
182 return stream_->SetPosition(position);
183}
184
185bool StreamAdapterInterface::GetPosition(size_t* position) const {
186 return stream_->GetPosition(position);
187}
188
189bool StreamAdapterInterface::GetSize(size_t* size) const {
190 return stream_->GetSize(size);
191}
192
193bool StreamAdapterInterface::GetAvailable(size_t* size) const {
194 return stream_->GetAvailable(size);
195}
196
197bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
198 return stream_->GetWriteRemaining(size);
199}
200
201bool StreamAdapterInterface::ReserveSize(size_t size) {
202 return stream_->ReserveSize(size);
203}
204
205bool StreamAdapterInterface::Flush() {
206 return stream_->Flush();
207}
208
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000209void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800210 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000211 stream_->SignalEvent.disconnect(this);
212 if (owned_)
213 delete stream_;
214 stream_ = stream;
215 owned_ = owned;
deadbeef37f5ecf2017-02-27 14:06:41 -0800216 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000217 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
218}
219
220StreamInterface* StreamAdapterInterface::Detach() {
deadbeef37f5ecf2017-02-27 14:06:41 -0800221 if (nullptr != stream_)
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000222 stream_->SignalEvent.disconnect(this);
223 StreamInterface* stream = stream_;
deadbeef37f5ecf2017-02-27 14:06:41 -0800224 stream_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000225 return stream;
226}
227
228StreamAdapterInterface::~StreamAdapterInterface() {
229 if (owned_)
230 delete stream_;
231}
232
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000233void StreamAdapterInterface::OnEvent(StreamInterface* stream,
234 int events,
235 int err) {
236 SignalEvent(this, events, err);
237}
238
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000239///////////////////////////////////////////////////////////////////////////////
deadbeeff137e972017-03-23 15:45:49 -0700240// StreamTap
241///////////////////////////////////////////////////////////////////////////////
242
243StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
244 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
245 tap_error_(0) {
246 AttachTap(tap);
247}
248
249StreamTap::~StreamTap() = default;
250
251void StreamTap::AttachTap(StreamInterface* tap) {
252 tap_.reset(tap);
253}
254
255StreamInterface* StreamTap::DetachTap() {
256 return tap_.release();
257}
258
259StreamResult StreamTap::GetTapResult(int* error) {
260 if (error) {
261 *error = tap_error_;
262 }
263 return tap_result_;
264}
265
266StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
267 size_t* read, int* error) {
268 size_t backup_read;
269 if (!read) {
270 read = &backup_read;
271 }
272 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
273 read, error);
274 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
275 tap_result_ = tap_->WriteAll(buffer, *read, nullptr, &tap_error_);
276 }
277 return res;
278}
279
280StreamResult StreamTap::Write(const void* data, size_t data_len,
281 size_t* written, int* error) {
282 size_t backup_written;
283 if (!written) {
284 written = &backup_written;
285 }
286 StreamResult res = StreamAdapterInterface::Write(data, data_len,
287 written, error);
288 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
289 tap_result_ = tap_->WriteAll(data, *written, nullptr, &tap_error_);
290 }
291 return res;
292}
293
294///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000295// NullStream
296///////////////////////////////////////////////////////////////////////////////
297
298NullStream::NullStream() {
299}
300
301NullStream::~NullStream() {
302}
303
304StreamState NullStream::GetState() const {
305 return SS_OPEN;
306}
307
308StreamResult NullStream::Read(void* buffer, size_t buffer_len,
309 size_t* read, int* error) {
310 if (error) *error = -1;
311 return SR_ERROR;
312}
313
314StreamResult NullStream::Write(const void* data, size_t data_len,
315 size_t* written, int* error) {
316 if (written) *written = data_len;
317 return SR_SUCCESS;
318}
319
320void NullStream::Close() {
321}
322
323///////////////////////////////////////////////////////////////////////////////
324// FileStream
325///////////////////////////////////////////////////////////////////////////////
326
deadbeef37f5ecf2017-02-27 14:06:41 -0800327FileStream::FileStream() : file_(nullptr) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000328
329FileStream::~FileStream() {
330 FileStream::Close();
331}
332
333bool FileStream::Open(const std::string& filename, const char* mode,
334 int* error) {
335 Close();
336#if defined(WEBRTC_WIN)
337 std::wstring wfilename;
338 if (Utf8ToWindowsFilename(filename, &wfilename)) {
339 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
340 } else {
341 if (error) {
342 *error = -1;
343 return false;
344 }
345 }
346#else
347 file_ = fopen(filename.c_str(), mode);
348#endif
349 if (!file_ && error) {
350 *error = errno;
351 }
deadbeef37f5ecf2017-02-27 14:06:41 -0800352 return (file_ != nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000353}
354
355bool FileStream::OpenShare(const std::string& filename, const char* mode,
356 int shflag, int* error) {
357 Close();
358#if defined(WEBRTC_WIN)
359 std::wstring wfilename;
360 if (Utf8ToWindowsFilename(filename, &wfilename)) {
361 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
362 if (!file_ && error) {
363 *error = errno;
364 return false;
365 }
deadbeef37f5ecf2017-02-27 14:06:41 -0800366 return file_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000367 } else {
368 if (error) {
369 *error = -1;
370 }
371 return false;
372 }
373#else
374 return Open(filename, mode, error);
375#endif
376}
377
378bool FileStream::DisableBuffering() {
379 if (!file_)
380 return false;
deadbeef37f5ecf2017-02-27 14:06:41 -0800381 return (setvbuf(file_, nullptr, _IONBF, 0) == 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000382}
383
384StreamState FileStream::GetState() const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800385 return (file_ == nullptr) ? SS_CLOSED : SS_OPEN;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000386}
387
388StreamResult FileStream::Read(void* buffer, size_t buffer_len,
389 size_t* read, int* error) {
390 if (!file_)
391 return SR_EOS;
392 size_t result = fread(buffer, 1, buffer_len, file_);
393 if ((result == 0) && (buffer_len > 0)) {
394 if (feof(file_))
395 return SR_EOS;
396 if (error)
397 *error = errno;
398 return SR_ERROR;
399 }
400 if (read)
401 *read = result;
402 return SR_SUCCESS;
403}
404
405StreamResult FileStream::Write(const void* data, size_t data_len,
406 size_t* written, int* error) {
407 if (!file_)
408 return SR_EOS;
409 size_t result = fwrite(data, 1, data_len, file_);
410 if ((result == 0) && (data_len > 0)) {
411 if (error)
412 *error = errno;
413 return SR_ERROR;
414 }
415 if (written)
416 *written = result;
417 return SR_SUCCESS;
418}
419
420void FileStream::Close() {
421 if (file_) {
422 DoClose();
deadbeef37f5ecf2017-02-27 14:06:41 -0800423 file_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000424 }
425}
426
427bool FileStream::SetPosition(size_t position) {
428 if (!file_)
429 return false;
430 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
431}
432
433bool FileStream::GetPosition(size_t* position) const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800434 RTC_DCHECK(nullptr != position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000435 if (!file_)
436 return false;
437 long result = ftell(file_);
438 if (result < 0)
439 return false;
440 if (position)
441 *position = result;
442 return true;
443}
444
445bool FileStream::GetSize(size_t* size) const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800446 RTC_DCHECK(nullptr != size);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000447 if (!file_)
448 return false;
449 struct stat file_stats;
450 if (fstat(fileno(file_), &file_stats) != 0)
451 return false;
452 if (size)
453 *size = file_stats.st_size;
454 return true;
455}
456
457bool FileStream::GetAvailable(size_t* size) const {
deadbeef37f5ecf2017-02-27 14:06:41 -0800458 RTC_DCHECK(nullptr != size);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000459 if (!GetSize(size))
460 return false;
461 long result = ftell(file_);
462 if (result < 0)
463 return false;
464 if (size)
465 *size -= result;
466 return true;
467}
468
469bool FileStream::ReserveSize(size_t size) {
470 // TODO: extend the file to the proper length
471 return true;
472}
473
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000474bool FileStream::Flush() {
475 if (file_) {
476 return (0 == fflush(file_));
477 }
478 // try to flush empty file?
nissec80e7412017-01-11 05:56:46 -0800479 RTC_NOTREACHED();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000480 return false;
481}
482
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000483void FileStream::DoClose() {
484 fclose(file_);
485}
486
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000487///////////////////////////////////////////////////////////////////////////////
488// MemoryStream
489///////////////////////////////////////////////////////////////////////////////
490
491MemoryStreamBase::MemoryStreamBase()
deadbeef37f5ecf2017-02-27 14:06:41 -0800492 : buffer_(nullptr), buffer_length_(0), data_length_(0), seek_position_(0) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000493
494StreamState MemoryStreamBase::GetState() const {
495 return SS_OPEN;
496}
497
498StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
499 size_t* bytes_read, int* error) {
500 if (seek_position_ >= data_length_) {
501 return SR_EOS;
502 }
503 size_t available = data_length_ - seek_position_;
504 if (bytes > available) {
505 // Read partial buffer
506 bytes = available;
507 }
508 memcpy(buffer, &buffer_[seek_position_], bytes);
509 seek_position_ += bytes;
510 if (bytes_read) {
511 *bytes_read = bytes;
512 }
513 return SR_SUCCESS;
514}
515
516StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
517 size_t* bytes_written, int* error) {
518 size_t available = buffer_length_ - seek_position_;
519 if (0 == available) {
520 // Increase buffer size to the larger of:
521 // a) new position rounded up to next 256 bytes
522 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000523 size_t new_buffer_length =
524 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000525 StreamResult result = DoReserve(new_buffer_length, error);
526 if (SR_SUCCESS != result) {
527 return result;
528 }
nisseede5da42017-01-12 05:15:36 -0800529 RTC_DCHECK(buffer_length_ >= new_buffer_length);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000530 available = buffer_length_ - seek_position_;
531 }
532
533 if (bytes > available) {
534 bytes = available;
535 }
536 memcpy(&buffer_[seek_position_], buffer, bytes);
537 seek_position_ += bytes;
538 if (data_length_ < seek_position_) {
539 data_length_ = seek_position_;
540 }
541 if (bytes_written) {
542 *bytes_written = bytes;
543 }
544 return SR_SUCCESS;
545}
546
547void MemoryStreamBase::Close() {
548 // nothing to do
549}
550
551bool MemoryStreamBase::SetPosition(size_t position) {
552 if (position > data_length_)
553 return false;
554 seek_position_ = position;
555 return true;
556}
557
558bool MemoryStreamBase::GetPosition(size_t* position) const {
559 if (position)
560 *position = seek_position_;
561 return true;
562}
563
564bool MemoryStreamBase::GetSize(size_t* size) const {
565 if (size)
566 *size = data_length_;
567 return true;
568}
569
570bool MemoryStreamBase::GetAvailable(size_t* size) const {
571 if (size)
572 *size = data_length_ - seek_position_;
573 return true;
574}
575
576bool MemoryStreamBase::ReserveSize(size_t size) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800577 return (SR_SUCCESS == DoReserve(size, nullptr));
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000578}
579
580StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
581 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
582}
583
584///////////////////////////////////////////////////////////////////////////////
585
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000586
Niels Möller7502a9e2018-05-21 16:11:34 +0200587MemoryStream::MemoryStream() {}
588
589MemoryStream::MemoryStream(const char* data) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000590 SetData(data, strlen(data));
591}
592
Niels Möller7502a9e2018-05-21 16:11:34 +0200593MemoryStream::MemoryStream(const void* data, size_t length) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000594 SetData(data, length);
595}
596
597MemoryStream::~MemoryStream() {
Niels Möller7502a9e2018-05-21 16:11:34 +0200598 delete [] buffer_;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000599}
600
601void MemoryStream::SetData(const void* data, size_t length) {
602 data_length_ = buffer_length_ = length;
Niels Möller7502a9e2018-05-21 16:11:34 +0200603 delete [] buffer_;
604 buffer_ = new char[buffer_length_];
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000605 memcpy(buffer_, data, data_length_);
606 seek_position_ = 0;
607}
608
609StreamResult MemoryStream::DoReserve(size_t size, int* error) {
610 if (buffer_length_ >= size)
611 return SR_SUCCESS;
612
Niels Möller7502a9e2018-05-21 16:11:34 +0200613 if (char* new_buffer = new char[size]) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000614 memcpy(new_buffer, buffer_, data_length_);
Niels Möller7502a9e2018-05-21 16:11:34 +0200615 delete [] buffer_;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000616 buffer_ = new_buffer;
617 buffer_length_ = size;
618 return SR_SUCCESS;
619 }
620
621 if (error) {
622 *error = ENOMEM;
623 }
624 return SR_ERROR;
625}
626
627///////////////////////////////////////////////////////////////////////////////
deadbeeff137e972017-03-23 15:45:49 -0700628
629ExternalMemoryStream::ExternalMemoryStream() {
630}
631
632ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
633 SetData(data, length);
634}
635
636ExternalMemoryStream::~ExternalMemoryStream() {
637}
638
639void ExternalMemoryStream::SetData(void* data, size_t length) {
640 data_length_ = buffer_length_ = length;
641 buffer_ = static_cast<char*>(data);
642 seek_position_ = 0;
643}
644
645///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000646// FifoBuffer
647///////////////////////////////////////////////////////////////////////////////
648
649FifoBuffer::FifoBuffer(size_t size)
650 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
651 data_length_(0), read_position_(0), owner_(Thread::Current()) {
652 // all events are done on the owner_ thread
653}
654
655FifoBuffer::FifoBuffer(size_t size, Thread* owner)
656 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
657 data_length_(0), read_position_(0), owner_(owner) {
658 // all events are done on the owner_ thread
659}
660
661FifoBuffer::~FifoBuffer() {
662}
663
664bool FifoBuffer::GetBuffered(size_t* size) const {
665 CritScope cs(&crit_);
666 *size = data_length_;
667 return true;
668}
669
670bool FifoBuffer::SetCapacity(size_t size) {
671 CritScope cs(&crit_);
672 if (data_length_ > size) {
673 return false;
674 }
675
676 if (size != buffer_length_) {
677 char* buffer = new char[size];
678 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000679 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000680 memcpy(buffer, &buffer_[read_position_], tail_copy);
681 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
682 buffer_.reset(buffer);
683 read_position_ = 0;
684 buffer_length_ = size;
685 }
686 return true;
687}
688
689StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
690 size_t offset, size_t* bytes_read) {
691 CritScope cs(&crit_);
692 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
693}
694
695StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
696 size_t offset, size_t* bytes_written) {
697 CritScope cs(&crit_);
698 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
699}
700
701StreamState FifoBuffer::GetState() const {
jbauch097d5492016-02-09 02:30:34 -0800702 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000703 return state_;
704}
705
706StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
707 size_t* bytes_read, int* error) {
708 CritScope cs(&crit_);
709 const bool was_writable = data_length_ < buffer_length_;
710 size_t copy = 0;
711 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
712
713 if (result == SR_SUCCESS) {
714 // If read was successful then adjust the read position and number of
715 // bytes buffered.
716 read_position_ = (read_position_ + copy) % buffer_length_;
717 data_length_ -= copy;
718 if (bytes_read) {
719 *bytes_read = copy;
720 }
721
722 // if we were full before, and now we're not, post an event
723 if (!was_writable && copy > 0) {
724 PostEvent(owner_, SE_WRITE, 0);
725 }
726 }
727 return result;
728}
729
730StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
731 size_t* bytes_written, int* error) {
732 CritScope cs(&crit_);
733
734 const bool was_readable = (data_length_ > 0);
735 size_t copy = 0;
736 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
737
738 if (result == SR_SUCCESS) {
739 // If write was successful then adjust the number of readable bytes.
740 data_length_ += copy;
741 if (bytes_written) {
742 *bytes_written = copy;
743 }
744
745 // if we didn't have any data to read before, and now we do, post an event
746 if (!was_readable && copy > 0) {
747 PostEvent(owner_, SE_READ, 0);
748 }
749 }
750 return result;
751}
752
753void FifoBuffer::Close() {
754 CritScope cs(&crit_);
755 state_ = SS_CLOSED;
756}
757
758const void* FifoBuffer::GetReadData(size_t* size) {
759 CritScope cs(&crit_);
760 *size = (read_position_ + data_length_ <= buffer_length_) ?
761 data_length_ : buffer_length_ - read_position_;
762 return &buffer_[read_position_];
763}
764
765void FifoBuffer::ConsumeReadData(size_t size) {
766 CritScope cs(&crit_);
nisseede5da42017-01-12 05:15:36 -0800767 RTC_DCHECK(size <= data_length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000768 const bool was_writable = data_length_ < buffer_length_;
769 read_position_ = (read_position_ + size) % buffer_length_;
770 data_length_ -= size;
771 if (!was_writable && size > 0) {
772 PostEvent(owner_, SE_WRITE, 0);
773 }
774}
775
776void* FifoBuffer::GetWriteBuffer(size_t* size) {
777 CritScope cs(&crit_);
778 if (state_ == SS_CLOSED) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800779 return nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000780 }
781
782 // if empty, reset the write position to the beginning, so we can get
783 // the biggest possible block
784 if (data_length_ == 0) {
785 read_position_ = 0;
786 }
787
788 const size_t write_position = (read_position_ + data_length_)
789 % buffer_length_;
790 *size = (write_position > read_position_ || data_length_ == 0) ?
791 buffer_length_ - write_position : read_position_ - write_position;
792 return &buffer_[write_position];
793}
794
795void FifoBuffer::ConsumeWriteBuffer(size_t size) {
796 CritScope cs(&crit_);
nisseede5da42017-01-12 05:15:36 -0800797 RTC_DCHECK(size <= buffer_length_ - data_length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000798 const bool was_readable = (data_length_ > 0);
799 data_length_ += size;
800 if (!was_readable && size > 0) {
801 PostEvent(owner_, SE_READ, 0);
802 }
803}
804
805bool FifoBuffer::GetWriteRemaining(size_t* size) const {
806 CritScope cs(&crit_);
807 *size = buffer_length_ - data_length_;
808 return true;
809}
810
811StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
812 size_t bytes,
813 size_t offset,
814 size_t* bytes_read) {
815 if (offset >= data_length_) {
816 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
817 }
818
819 const size_t available = data_length_ - offset;
820 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000821 const size_t copy = std::min(bytes, available);
822 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000823 char* const p = static_cast<char*>(buffer);
824 memcpy(p, &buffer_[read_position], tail_copy);
825 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
826
827 if (bytes_read) {
828 *bytes_read = copy;
829 }
830 return SR_SUCCESS;
831}
832
833StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
834 size_t bytes,
835 size_t offset,
836 size_t* bytes_written) {
837 if (state_ == SS_CLOSED) {
838 return SR_EOS;
839 }
840
841 if (data_length_ + offset >= buffer_length_) {
842 return SR_BLOCK;
843 }
844
845 const size_t available = buffer_length_ - data_length_ - offset;
846 const size_t write_position = (read_position_ + data_length_ + offset)
847 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000848 const size_t copy = std::min(bytes, available);
849 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000850 const char* const p = static_cast<const char*>(buffer);
851 memcpy(&buffer_[write_position], p, tail_copy);
852 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
853
854 if (bytes_written) {
855 *bytes_written = copy;
856 }
857 return SR_SUCCESS;
858}
859
deadbeeff137e972017-03-23 15:45:49 -0700860
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000861///////////////////////////////////////////////////////////////////////////////
862// StringStream - Reads/Writes to an external std::string
863///////////////////////////////////////////////////////////////////////////////
864
Tommi00aac5a2015-05-25 11:25:59 +0200865StringStream::StringStream(std::string* str)
866 : str_(*str), read_pos_(0), read_only_(false) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000867}
868
869StringStream::StringStream(const std::string& str)
870 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
871}
872
873StreamState StringStream::GetState() const {
874 return SS_OPEN;
875}
876
877StreamResult StringStream::Read(void* buffer, size_t buffer_len,
878 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000879 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000880 if (!available)
881 return SR_EOS;
882 memcpy(buffer, str_.data() + read_pos_, available);
883 read_pos_ += available;
884 if (read)
885 *read = available;
886 return SR_SUCCESS;
887}
888
889StreamResult StringStream::Write(const void* data, size_t data_len,
890 size_t* written, int* error) {
891 if (read_only_) {
892 if (error) {
893 *error = -1;
894 }
895 return SR_ERROR;
896 }
897 str_.append(static_cast<const char*>(data),
898 static_cast<const char*>(data) + data_len);
899 if (written)
900 *written = data_len;
901 return SR_SUCCESS;
902}
903
904void StringStream::Close() {
905}
906
907bool StringStream::SetPosition(size_t position) {
908 if (position > str_.size())
909 return false;
910 read_pos_ = position;
911 return true;
912}
913
914bool StringStream::GetPosition(size_t* position) const {
915 if (position)
916 *position = read_pos_;
917 return true;
918}
919
920bool StringStream::GetSize(size_t* size) const {
921 if (size)
922 *size = str_.size();
923 return true;
924}
925
926bool StringStream::GetAvailable(size_t* size) const {
927 if (size)
928 *size = str_.size() - read_pos_;
929 return true;
930}
931
932bool StringStream::ReserveSize(size_t size) {
933 if (read_only_)
934 return false;
935 str_.reserve(size);
936 return true;
937}
938
939///////////////////////////////////////////////////////////////////////////////
deadbeeff137e972017-03-23 15:45:49 -0700940// StreamReference
941///////////////////////////////////////////////////////////////////////////////
942
943StreamReference::StreamReference(StreamInterface* stream)
944 : StreamAdapterInterface(stream, false) {
945 // owner set to false so the destructor does not free the stream.
946 stream_ref_count_ = new StreamRefCount(stream);
947}
948
949StreamInterface* StreamReference::NewReference() {
950 stream_ref_count_->AddReference();
951 return new StreamReference(stream_ref_count_, stream());
952}
953
954StreamReference::~StreamReference() {
955 stream_ref_count_->Release();
956}
957
958StreamReference::StreamReference(StreamRefCount* stream_ref_count,
959 StreamInterface* stream)
960 : StreamAdapterInterface(stream, false),
961 stream_ref_count_(stream_ref_count) {
962}
963
964///////////////////////////////////////////////////////////////////////////////
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000965
966StreamResult Flow(StreamInterface* source,
deadbeef37f5ecf2017-02-27 14:06:41 -0800967 char* buffer,
968 size_t buffer_len,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000969 StreamInterface* sink,
deadbeef37f5ecf2017-02-27 14:06:41 -0800970 size_t* data_len /* = nullptr */) {
nisseede5da42017-01-12 05:15:36 -0800971 RTC_DCHECK(buffer_len > 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000972
973 StreamResult result;
974 size_t count, read_pos, write_pos;
975 if (data_len) {
976 read_pos = *data_len;
977 } else {
978 read_pos = 0;
979 }
980
981 bool end_of_stream = false;
982 do {
983 // Read until buffer is full, end of stream, or error
984 while (!end_of_stream && (read_pos < buffer_len)) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800985 result = source->Read(buffer + read_pos, buffer_len - read_pos, &count,
986 nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000987 if (result == SR_EOS) {
988 end_of_stream = true;
989 } else if (result != SR_SUCCESS) {
990 if (data_len) {
991 *data_len = read_pos;
992 }
993 return result;
994 } else {
995 read_pos += count;
996 }
997 }
998
999 // Write until buffer is empty, or error (including end of stream)
1000 write_pos = 0;
1001 while (write_pos < read_pos) {
deadbeef37f5ecf2017-02-27 14:06:41 -08001002 result = sink->Write(buffer + write_pos, read_pos - write_pos, &count,
1003 nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001004 if (result != SR_SUCCESS) {
1005 if (data_len) {
1006 *data_len = read_pos - write_pos;
1007 if (write_pos > 0) {
1008 memmove(buffer, buffer + write_pos, *data_len);
1009 }
1010 }
1011 return result;
1012 }
1013 write_pos += count;
1014 }
1015
1016 read_pos = 0;
1017 } while (!end_of_stream);
1018
1019 if (data_len) {
1020 *data_len = 0;
1021 }
1022 return SR_SUCCESS;
1023}
1024
1025///////////////////////////////////////////////////////////////////////////////
1026
1027} // namespace rtc