blob: fe99f939517a39ce14d606480a25bd9b02558a04 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000017
18#include <algorithm>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019#include <string>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000020
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000021#include "webrtc/base/basictypes.h"
22#include "webrtc/base/common.h"
23#include "webrtc/base/logging.h"
24#include "webrtc/base/messagequeue.h"
25#include "webrtc/base/stream.h"
26#include "webrtc/base/stringencode.h"
27#include "webrtc/base/stringutils.h"
28#include "webrtc/base/thread.h"
29#include "webrtc/base/timeutils.h"
30
31#if defined(WEBRTC_WIN)
32#include "webrtc/base/win32.h"
33#define fileno _fileno
34#endif
35
36namespace rtc {
37
38///////////////////////////////////////////////////////////////////////////////
39// StreamInterface
40///////////////////////////////////////////////////////////////////////////////
41StreamInterface::~StreamInterface() {
42}
43
44StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45 size_t* written, int* error) {
46 StreamResult result = SR_SUCCESS;
47 size_t total_written = 0, current_written;
48 while (total_written < data_len) {
49 result = Write(static_cast<const char*>(data) + total_written,
50 data_len - total_written, &current_written, error);
51 if (result != SR_SUCCESS)
52 break;
53 total_written += current_written;
54 }
55 if (written)
56 *written = total_written;
57 return result;
58}
59
60StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61 size_t* read, int* error) {
62 StreamResult result = SR_SUCCESS;
63 size_t total_read = 0, current_read;
64 while (total_read < buffer_len) {
65 result = Read(static_cast<char*>(buffer) + total_read,
66 buffer_len - total_read, &current_read, error);
67 if (result != SR_SUCCESS)
68 break;
69 total_read += current_read;
70 }
71 if (read)
72 *read = total_read;
73 return result;
74}
75
76StreamResult StreamInterface::ReadLine(std::string* line) {
77 line->clear();
78 StreamResult result = SR_SUCCESS;
79 while (true) {
80 char ch;
81 result = Read(&ch, sizeof(ch), NULL, NULL);
82 if (result != SR_SUCCESS) {
83 break;
84 }
85 if (ch == '\n') {
86 break;
87 }
88 line->push_back(ch);
89 }
90 if (!line->empty()) { // give back the line we've collected so far with
91 result = SR_SUCCESS; // a success code. Otherwise return the last code
92 }
93 return result;
94}
95
96void StreamInterface::PostEvent(Thread* t, int events, int err) {
97 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
98}
99
100void StreamInterface::PostEvent(int events, int err) {
101 PostEvent(Thread::Current(), events, err);
102}
103
104StreamInterface::StreamInterface() {
105}
106
107void StreamInterface::OnMessage(Message* msg) {
108 if (MSG_POST_EVENT == msg->message_id) {
109 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
110 SignalEvent(this, pe->events, pe->error);
111 delete msg->pdata;
112 }
113}
114
115///////////////////////////////////////////////////////////////////////////////
116// StreamAdapterInterface
117///////////////////////////////////////////////////////////////////////////////
118
119StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
120 bool owned)
121 : stream_(stream), owned_(owned) {
122 if (NULL != stream_)
123 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
124}
125
126void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
127 if (NULL != stream_)
128 stream_->SignalEvent.disconnect(this);
129 if (owned_)
130 delete stream_;
131 stream_ = stream;
132 owned_ = owned;
133 if (NULL != stream_)
134 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
135}
136
137StreamInterface* StreamAdapterInterface::Detach() {
138 if (NULL != stream_)
139 stream_->SignalEvent.disconnect(this);
140 StreamInterface* stream = stream_;
141 stream_ = NULL;
142 return stream;
143}
144
145StreamAdapterInterface::~StreamAdapterInterface() {
146 if (owned_)
147 delete stream_;
148}
149
150///////////////////////////////////////////////////////////////////////////////
151// StreamTap
152///////////////////////////////////////////////////////////////////////////////
153
154StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
155 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
156 tap_error_(0) {
157 AttachTap(tap);
158}
159
160void StreamTap::AttachTap(StreamInterface* tap) {
161 tap_.reset(tap);
162}
163
164StreamInterface* StreamTap::DetachTap() {
165 return tap_.release();
166}
167
168StreamResult StreamTap::GetTapResult(int* error) {
169 if (error) {
170 *error = tap_error_;
171 }
172 return tap_result_;
173}
174
175StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
176 size_t* read, int* error) {
177 size_t backup_read;
178 if (!read) {
179 read = &backup_read;
180 }
181 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
182 read, error);
183 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
184 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
185 }
186 return res;
187}
188
189StreamResult StreamTap::Write(const void* data, size_t data_len,
190 size_t* written, int* error) {
191 size_t backup_written;
192 if (!written) {
193 written = &backup_written;
194 }
195 StreamResult res = StreamAdapterInterface::Write(data, data_len,
196 written, error);
197 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
198 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
199 }
200 return res;
201}
202
203///////////////////////////////////////////////////////////////////////////////
204// StreamSegment
205///////////////////////////////////////////////////////////////////////////////
206
207StreamSegment::StreamSegment(StreamInterface* stream)
208 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
209 length_(SIZE_UNKNOWN) {
210 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
211 stream->GetPosition(&start_);
212}
213
214StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
215 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
216 length_(length) {
217 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
218 stream->GetPosition(&start_);
219}
220
221StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
222 size_t* read, int* error) {
223 if (SIZE_UNKNOWN != length_) {
224 if (pos_ >= length_)
225 return SR_EOS;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000226 buffer_len = std::min(buffer_len, length_ - pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000227 }
228 size_t backup_read;
229 if (!read) {
230 read = &backup_read;
231 }
232 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
233 read, error);
234 if (SR_SUCCESS == result) {
235 pos_ += *read;
236 }
237 return result;
238}
239
240bool StreamSegment::SetPosition(size_t position) {
241 if (SIZE_UNKNOWN == start_)
242 return false; // Not seekable
243 if ((SIZE_UNKNOWN != length_) && (position > length_))
244 return false; // Seek past end of segment
245 if (!StreamAdapterInterface::SetPosition(start_ + position))
246 return false;
247 pos_ = position;
248 return true;
249}
250
251bool StreamSegment::GetPosition(size_t* position) const {
252 if (SIZE_UNKNOWN == start_)
253 return false; // Not seekable
254 if (!StreamAdapterInterface::GetPosition(position))
255 return false;
256 if (position) {
257 ASSERT(*position >= start_);
258 *position -= start_;
259 }
260 return true;
261}
262
263bool StreamSegment::GetSize(size_t* size) const {
264 if (!StreamAdapterInterface::GetSize(size))
265 return false;
266 if (size) {
267 if (SIZE_UNKNOWN != start_) {
268 ASSERT(*size >= start_);
269 *size -= start_;
270 }
271 if (SIZE_UNKNOWN != length_) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000272 *size = std::min(*size, length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000273 }
274 }
275 return true;
276}
277
278bool StreamSegment::GetAvailable(size_t* size) const {
279 if (!StreamAdapterInterface::GetAvailable(size))
280 return false;
281 if (size && (SIZE_UNKNOWN != length_))
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000282 *size = std::min(*size, length_ - pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000283 return true;
284}
285
286///////////////////////////////////////////////////////////////////////////////
287// NullStream
288///////////////////////////////////////////////////////////////////////////////
289
290NullStream::NullStream() {
291}
292
293NullStream::~NullStream() {
294}
295
296StreamState NullStream::GetState() const {
297 return SS_OPEN;
298}
299
300StreamResult NullStream::Read(void* buffer, size_t buffer_len,
301 size_t* read, int* error) {
302 if (error) *error = -1;
303 return SR_ERROR;
304}
305
306StreamResult NullStream::Write(const void* data, size_t data_len,
307 size_t* written, int* error) {
308 if (written) *written = data_len;
309 return SR_SUCCESS;
310}
311
312void NullStream::Close() {
313}
314
315///////////////////////////////////////////////////////////////////////////////
316// FileStream
317///////////////////////////////////////////////////////////////////////////////
318
319FileStream::FileStream() : file_(NULL) {
320}
321
322FileStream::~FileStream() {
323 FileStream::Close();
324}
325
326bool FileStream::Open(const std::string& filename, const char* mode,
327 int* error) {
328 Close();
329#if defined(WEBRTC_WIN)
330 std::wstring wfilename;
331 if (Utf8ToWindowsFilename(filename, &wfilename)) {
332 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
333 } else {
334 if (error) {
335 *error = -1;
336 return false;
337 }
338 }
339#else
340 file_ = fopen(filename.c_str(), mode);
341#endif
342 if (!file_ && error) {
343 *error = errno;
344 }
345 return (file_ != NULL);
346}
347
348bool FileStream::OpenShare(const std::string& filename, const char* mode,
349 int shflag, int* error) {
350 Close();
351#if defined(WEBRTC_WIN)
352 std::wstring wfilename;
353 if (Utf8ToWindowsFilename(filename, &wfilename)) {
354 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
355 if (!file_ && error) {
356 *error = errno;
357 return false;
358 }
359 return file_ != NULL;
360 } else {
361 if (error) {
362 *error = -1;
363 }
364 return false;
365 }
366#else
367 return Open(filename, mode, error);
368#endif
369}
370
371bool FileStream::DisableBuffering() {
372 if (!file_)
373 return false;
374 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
375}
376
377StreamState FileStream::GetState() const {
378 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
379}
380
381StreamResult FileStream::Read(void* buffer, size_t buffer_len,
382 size_t* read, int* error) {
383 if (!file_)
384 return SR_EOS;
385 size_t result = fread(buffer, 1, buffer_len, file_);
386 if ((result == 0) && (buffer_len > 0)) {
387 if (feof(file_))
388 return SR_EOS;
389 if (error)
390 *error = errno;
391 return SR_ERROR;
392 }
393 if (read)
394 *read = result;
395 return SR_SUCCESS;
396}
397
398StreamResult FileStream::Write(const void* data, size_t data_len,
399 size_t* written, int* error) {
400 if (!file_)
401 return SR_EOS;
402 size_t result = fwrite(data, 1, data_len, file_);
403 if ((result == 0) && (data_len > 0)) {
404 if (error)
405 *error = errno;
406 return SR_ERROR;
407 }
408 if (written)
409 *written = result;
410 return SR_SUCCESS;
411}
412
413void FileStream::Close() {
414 if (file_) {
415 DoClose();
416 file_ = NULL;
417 }
418}
419
420bool FileStream::SetPosition(size_t position) {
421 if (!file_)
422 return false;
423 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
424}
425
426bool FileStream::GetPosition(size_t* position) const {
427 ASSERT(NULL != position);
428 if (!file_)
429 return false;
430 long result = ftell(file_);
431 if (result < 0)
432 return false;
433 if (position)
434 *position = result;
435 return true;
436}
437
438bool FileStream::GetSize(size_t* size) const {
439 ASSERT(NULL != size);
440 if (!file_)
441 return false;
442 struct stat file_stats;
443 if (fstat(fileno(file_), &file_stats) != 0)
444 return false;
445 if (size)
446 *size = file_stats.st_size;
447 return true;
448}
449
450bool FileStream::GetAvailable(size_t* size) const {
451 ASSERT(NULL != size);
452 if (!GetSize(size))
453 return false;
454 long result = ftell(file_);
455 if (result < 0)
456 return false;
457 if (size)
458 *size -= result;
459 return true;
460}
461
462bool FileStream::ReserveSize(size_t size) {
463 // TODO: extend the file to the proper length
464 return true;
465}
466
467bool FileStream::GetSize(const std::string& filename, size_t* size) {
468 struct stat file_stats;
469 if (stat(filename.c_str(), &file_stats) != 0)
470 return false;
471 *size = file_stats.st_size;
472 return true;
473}
474
475bool FileStream::Flush() {
476 if (file_) {
477 return (0 == fflush(file_));
478 }
479 // try to flush empty file?
480 ASSERT(false);
481 return false;
482}
483
484#if defined(WEBRTC_POSIX) && !defined(__native_client__)
485
486bool FileStream::TryLock() {
487 if (file_ == NULL) {
488 // Stream not open.
489 ASSERT(false);
490 return false;
491 }
492
493 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
494}
495
496bool FileStream::Unlock() {
497 if (file_ == NULL) {
498 // Stream not open.
499 ASSERT(false);
500 return false;
501 }
502
503 return flock(fileno(file_), LOCK_UN) == 0;
504}
505
506#endif
507
508void FileStream::DoClose() {
509 fclose(file_);
510}
511
512CircularFileStream::CircularFileStream(size_t max_size)
513 : max_write_size_(max_size),
514 position_(0),
515 marked_position_(max_size / 2),
516 last_write_position_(0),
517 read_segment_(READ_LATEST),
518 read_segment_available_(0) {
519}
520
521bool CircularFileStream::Open(
522 const std::string& filename, const char* mode, int* error) {
523 if (!FileStream::Open(filename.c_str(), mode, error))
524 return false;
525
526 if (strchr(mode, "r") != NULL) { // Opened in read mode.
527 // Check if the buffer has been overwritten and determine how to read the
528 // log in time sequence.
529 size_t file_size;
530 GetSize(&file_size);
531 if (file_size == position_) {
532 // The buffer has not been overwritten yet. Read 0 .. file_size
533 read_segment_ = READ_LATEST;
534 read_segment_available_ = file_size;
535 } else {
536 // The buffer has been over written. There are three segments: The first
537 // one is 0 .. marked_position_, which is the marked earliest log. The
538 // second one is position_ .. file_size, which is the middle log. The
539 // last one is marked_position_ .. position_, which is the latest log.
540 read_segment_ = READ_MARKED;
541 read_segment_available_ = marked_position_;
542 last_write_position_ = position_;
543 }
544
545 // Read from the beginning.
546 position_ = 0;
547 SetPosition(position_);
548 }
549
550 return true;
551}
552
553StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
554 size_t* read, int* error) {
555 if (read_segment_available_ == 0) {
556 size_t file_size;
557 switch (read_segment_) {
558 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
559 read_segment_ = READ_MIDDLE;
560 position_ = last_write_position_;
561 SetPosition(position_);
562 GetSize(&file_size);
563 read_segment_available_ = file_size - position_;
564 break;
565
566 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
567 read_segment_ = READ_LATEST;
568 position_ = marked_position_;
569 SetPosition(position_);
570 read_segment_available_ = last_write_position_ - position_;
571 break;
572
573 default: // Finished READ_LATEST and return EOS.
574 return rtc::SR_EOS;
575 }
576 }
577
578 size_t local_read;
579 if (!read) read = &local_read;
580
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000581 size_t to_read = std::min(buffer_len, read_segment_available_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000582 rtc::StreamResult result
583 = rtc::FileStream::Read(buffer, to_read, read, error);
584 if (result == rtc::SR_SUCCESS) {
585 read_segment_available_ -= *read;
586 position_ += *read;
587 }
588 return result;
589}
590
591StreamResult CircularFileStream::Write(const void* data, size_t data_len,
592 size_t* written, int* error) {
593 if (position_ >= max_write_size_) {
594 ASSERT(position_ == max_write_size_);
595 position_ = marked_position_;
596 SetPosition(position_);
597 }
598
599 size_t local_written;
600 if (!written) written = &local_written;
601
602 size_t to_eof = max_write_size_ - position_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000603 size_t to_write = std::min(data_len, to_eof);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000604 rtc::StreamResult result
605 = rtc::FileStream::Write(data, to_write, written, error);
606 if (result == rtc::SR_SUCCESS) {
607 position_ += *written;
608 }
609 return result;
610}
611
612AsyncWriteStream::~AsyncWriteStream() {
613 write_thread_->Clear(this, 0, NULL);
614 ClearBufferAndWrite();
615
616 CritScope cs(&crit_stream_);
617 stream_.reset();
618}
619
620// This is needed by some stream writers, such as RtpDumpWriter.
621bool AsyncWriteStream::GetPosition(size_t* position) const {
622 CritScope cs(&crit_stream_);
623 return stream_->GetPosition(position);
624}
625
626// This is needed by some stream writers, such as the plugin log writers.
627StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
628 size_t* read, int* error) {
629 CritScope cs(&crit_stream_);
630 return stream_->Read(buffer, buffer_len, read, error);
631}
632
633void AsyncWriteStream::Close() {
634 if (state_ == SS_CLOSED) {
635 return;
636 }
637
638 write_thread_->Clear(this, 0, NULL);
639 ClearBufferAndWrite();
640
641 CritScope cs(&crit_stream_);
642 stream_->Close();
643 state_ = SS_CLOSED;
644}
645
646StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
647 size_t* written, int* error) {
648 if (state_ == SS_CLOSED) {
649 return SR_ERROR;
650 }
651
652 size_t previous_buffer_length = 0;
653 {
654 CritScope cs(&crit_buffer_);
655 previous_buffer_length = buffer_.length();
656 buffer_.AppendData(data, data_len);
657 }
658
659 if (previous_buffer_length == 0) {
660 // If there's stuff already in the buffer, then we already called
661 // Post and the write_thread_ hasn't pulled it out yet, so we
662 // don't need to re-Post.
663 write_thread_->Post(this, 0, NULL);
664 }
665 // Return immediately, assuming that it works.
666 if (written) {
667 *written = data_len;
668 }
669 return SR_SUCCESS;
670}
671
672void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
673 ClearBufferAndWrite();
674}
675
676bool AsyncWriteStream::Flush() {
677 if (state_ == SS_CLOSED) {
678 return false;
679 }
680
681 ClearBufferAndWrite();
682
683 CritScope cs(&crit_stream_);
684 return stream_->Flush();
685}
686
687void AsyncWriteStream::ClearBufferAndWrite() {
688 Buffer to_write;
689 {
690 CritScope cs_buffer(&crit_buffer_);
691 buffer_.TransferTo(&to_write);
692 }
693
694 if (to_write.length() > 0) {
695 CritScope cs(&crit_stream_);
696 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
697 }
698}
699
700#if defined(WEBRTC_POSIX) && !defined(__native_client__)
701
702// Have to identically rewrite the FileStream destructor or else it would call
703// the base class's Close() instead of the sub-class's.
704POpenStream::~POpenStream() {
705 POpenStream::Close();
706}
707
708bool POpenStream::Open(const std::string& subcommand,
709 const char* mode,
710 int* error) {
711 Close();
712 file_ = popen(subcommand.c_str(), mode);
713 if (file_ == NULL) {
714 if (error)
715 *error = errno;
716 return false;
717 }
718 return true;
719}
720
721bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
722 int shflag, int* error) {
723 return Open(subcommand, mode, error);
724}
725
726void POpenStream::DoClose() {
727 wait_status_ = pclose(file_);
728}
729
730#endif
731
732///////////////////////////////////////////////////////////////////////////////
733// MemoryStream
734///////////////////////////////////////////////////////////////////////////////
735
736MemoryStreamBase::MemoryStreamBase()
737 : buffer_(NULL), buffer_length_(0), data_length_(0),
738 seek_position_(0) {
739}
740
741StreamState MemoryStreamBase::GetState() const {
742 return SS_OPEN;
743}
744
745StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
746 size_t* bytes_read, int* error) {
747 if (seek_position_ >= data_length_) {
748 return SR_EOS;
749 }
750 size_t available = data_length_ - seek_position_;
751 if (bytes > available) {
752 // Read partial buffer
753 bytes = available;
754 }
755 memcpy(buffer, &buffer_[seek_position_], bytes);
756 seek_position_ += bytes;
757 if (bytes_read) {
758 *bytes_read = bytes;
759 }
760 return SR_SUCCESS;
761}
762
763StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
764 size_t* bytes_written, int* error) {
765 size_t available = buffer_length_ - seek_position_;
766 if (0 == available) {
767 // Increase buffer size to the larger of:
768 // a) new position rounded up to next 256 bytes
769 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000770 size_t new_buffer_length =
771 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000772 StreamResult result = DoReserve(new_buffer_length, error);
773 if (SR_SUCCESS != result) {
774 return result;
775 }
776 ASSERT(buffer_length_ >= new_buffer_length);
777 available = buffer_length_ - seek_position_;
778 }
779
780 if (bytes > available) {
781 bytes = available;
782 }
783 memcpy(&buffer_[seek_position_], buffer, bytes);
784 seek_position_ += bytes;
785 if (data_length_ < seek_position_) {
786 data_length_ = seek_position_;
787 }
788 if (bytes_written) {
789 *bytes_written = bytes;
790 }
791 return SR_SUCCESS;
792}
793
794void MemoryStreamBase::Close() {
795 // nothing to do
796}
797
798bool MemoryStreamBase::SetPosition(size_t position) {
799 if (position > data_length_)
800 return false;
801 seek_position_ = position;
802 return true;
803}
804
805bool MemoryStreamBase::GetPosition(size_t* position) const {
806 if (position)
807 *position = seek_position_;
808 return true;
809}
810
811bool MemoryStreamBase::GetSize(size_t* size) const {
812 if (size)
813 *size = data_length_;
814 return true;
815}
816
817bool MemoryStreamBase::GetAvailable(size_t* size) const {
818 if (size)
819 *size = data_length_ - seek_position_;
820 return true;
821}
822
823bool MemoryStreamBase::ReserveSize(size_t size) {
824 return (SR_SUCCESS == DoReserve(size, NULL));
825}
826
827StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
828 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
829}
830
831///////////////////////////////////////////////////////////////////////////////
832
833MemoryStream::MemoryStream()
834 : buffer_alloc_(NULL) {
835}
836
837MemoryStream::MemoryStream(const char* data)
838 : buffer_alloc_(NULL) {
839 SetData(data, strlen(data));
840}
841
842MemoryStream::MemoryStream(const void* data, size_t length)
843 : buffer_alloc_(NULL) {
844 SetData(data, length);
845}
846
847MemoryStream::~MemoryStream() {
848 delete [] buffer_alloc_;
849}
850
851void MemoryStream::SetData(const void* data, size_t length) {
852 data_length_ = buffer_length_ = length;
853 delete [] buffer_alloc_;
854 buffer_alloc_ = new char[buffer_length_ + kAlignment];
855 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
856 memcpy(buffer_, data, data_length_);
857 seek_position_ = 0;
858}
859
860StreamResult MemoryStream::DoReserve(size_t size, int* error) {
861 if (buffer_length_ >= size)
862 return SR_SUCCESS;
863
864 if (char* new_buffer_alloc = new char[size + kAlignment]) {
865 char* new_buffer = reinterpret_cast<char*>(
866 ALIGNP(new_buffer_alloc, kAlignment));
867 memcpy(new_buffer, buffer_, data_length_);
868 delete [] buffer_alloc_;
869 buffer_alloc_ = new_buffer_alloc;
870 buffer_ = new_buffer;
871 buffer_length_ = size;
872 return SR_SUCCESS;
873 }
874
875 if (error) {
876 *error = ENOMEM;
877 }
878 return SR_ERROR;
879}
880
881///////////////////////////////////////////////////////////////////////////////
882
883ExternalMemoryStream::ExternalMemoryStream() {
884}
885
886ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
887 SetData(data, length);
888}
889
890ExternalMemoryStream::~ExternalMemoryStream() {
891}
892
893void ExternalMemoryStream::SetData(void* data, size_t length) {
894 data_length_ = buffer_length_ = length;
895 buffer_ = static_cast<char*>(data);
896 seek_position_ = 0;
897}
898
899///////////////////////////////////////////////////////////////////////////////
900// FifoBuffer
901///////////////////////////////////////////////////////////////////////////////
902
903FifoBuffer::FifoBuffer(size_t size)
904 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
905 data_length_(0), read_position_(0), owner_(Thread::Current()) {
906 // all events are done on the owner_ thread
907}
908
909FifoBuffer::FifoBuffer(size_t size, Thread* owner)
910 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
911 data_length_(0), read_position_(0), owner_(owner) {
912 // all events are done on the owner_ thread
913}
914
915FifoBuffer::~FifoBuffer() {
916}
917
918bool FifoBuffer::GetBuffered(size_t* size) const {
919 CritScope cs(&crit_);
920 *size = data_length_;
921 return true;
922}
923
924bool FifoBuffer::SetCapacity(size_t size) {
925 CritScope cs(&crit_);
926 if (data_length_ > size) {
927 return false;
928 }
929
930 if (size != buffer_length_) {
931 char* buffer = new char[size];
932 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000933 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000934 memcpy(buffer, &buffer_[read_position_], tail_copy);
935 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
936 buffer_.reset(buffer);
937 read_position_ = 0;
938 buffer_length_ = size;
939 }
940 return true;
941}
942
943StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
944 size_t offset, size_t* bytes_read) {
945 CritScope cs(&crit_);
946 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
947}
948
949StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
950 size_t offset, size_t* bytes_written) {
951 CritScope cs(&crit_);
952 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
953}
954
955StreamState FifoBuffer::GetState() const {
956 return state_;
957}
958
959StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
960 size_t* bytes_read, int* error) {
961 CritScope cs(&crit_);
962 const bool was_writable = data_length_ < buffer_length_;
963 size_t copy = 0;
964 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
965
966 if (result == SR_SUCCESS) {
967 // If read was successful then adjust the read position and number of
968 // bytes buffered.
969 read_position_ = (read_position_ + copy) % buffer_length_;
970 data_length_ -= copy;
971 if (bytes_read) {
972 *bytes_read = copy;
973 }
974
975 // if we were full before, and now we're not, post an event
976 if (!was_writable && copy > 0) {
977 PostEvent(owner_, SE_WRITE, 0);
978 }
979 }
980 return result;
981}
982
983StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
984 size_t* bytes_written, int* error) {
985 CritScope cs(&crit_);
986
987 const bool was_readable = (data_length_ > 0);
988 size_t copy = 0;
989 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
990
991 if (result == SR_SUCCESS) {
992 // If write was successful then adjust the number of readable bytes.
993 data_length_ += copy;
994 if (bytes_written) {
995 *bytes_written = copy;
996 }
997
998 // if we didn't have any data to read before, and now we do, post an event
999 if (!was_readable && copy > 0) {
1000 PostEvent(owner_, SE_READ, 0);
1001 }
1002 }
1003 return result;
1004}
1005
1006void FifoBuffer::Close() {
1007 CritScope cs(&crit_);
1008 state_ = SS_CLOSED;
1009}
1010
1011const void* FifoBuffer::GetReadData(size_t* size) {
1012 CritScope cs(&crit_);
1013 *size = (read_position_ + data_length_ <= buffer_length_) ?
1014 data_length_ : buffer_length_ - read_position_;
1015 return &buffer_[read_position_];
1016}
1017
1018void FifoBuffer::ConsumeReadData(size_t size) {
1019 CritScope cs(&crit_);
1020 ASSERT(size <= data_length_);
1021 const bool was_writable = data_length_ < buffer_length_;
1022 read_position_ = (read_position_ + size) % buffer_length_;
1023 data_length_ -= size;
1024 if (!was_writable && size > 0) {
1025 PostEvent(owner_, SE_WRITE, 0);
1026 }
1027}
1028
1029void* FifoBuffer::GetWriteBuffer(size_t* size) {
1030 CritScope cs(&crit_);
1031 if (state_ == SS_CLOSED) {
1032 return NULL;
1033 }
1034
1035 // if empty, reset the write position to the beginning, so we can get
1036 // the biggest possible block
1037 if (data_length_ == 0) {
1038 read_position_ = 0;
1039 }
1040
1041 const size_t write_position = (read_position_ + data_length_)
1042 % buffer_length_;
1043 *size = (write_position > read_position_ || data_length_ == 0) ?
1044 buffer_length_ - write_position : read_position_ - write_position;
1045 return &buffer_[write_position];
1046}
1047
1048void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1049 CritScope cs(&crit_);
1050 ASSERT(size <= buffer_length_ - data_length_);
1051 const bool was_readable = (data_length_ > 0);
1052 data_length_ += size;
1053 if (!was_readable && size > 0) {
1054 PostEvent(owner_, SE_READ, 0);
1055 }
1056}
1057
1058bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1059 CritScope cs(&crit_);
1060 *size = buffer_length_ - data_length_;
1061 return true;
1062}
1063
1064StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1065 size_t bytes,
1066 size_t offset,
1067 size_t* bytes_read) {
1068 if (offset >= data_length_) {
1069 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1070 }
1071
1072 const size_t available = data_length_ - offset;
1073 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001074 const size_t copy = std::min(bytes, available);
1075 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001076 char* const p = static_cast<char*>(buffer);
1077 memcpy(p, &buffer_[read_position], tail_copy);
1078 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1079
1080 if (bytes_read) {
1081 *bytes_read = copy;
1082 }
1083 return SR_SUCCESS;
1084}
1085
1086StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1087 size_t bytes,
1088 size_t offset,
1089 size_t* bytes_written) {
1090 if (state_ == SS_CLOSED) {
1091 return SR_EOS;
1092 }
1093
1094 if (data_length_ + offset >= buffer_length_) {
1095 return SR_BLOCK;
1096 }
1097
1098 const size_t available = buffer_length_ - data_length_ - offset;
1099 const size_t write_position = (read_position_ + data_length_ + offset)
1100 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001101 const size_t copy = std::min(bytes, available);
1102 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001103 const char* const p = static_cast<const char*>(buffer);
1104 memcpy(&buffer_[write_position], p, tail_copy);
1105 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1106
1107 if (bytes_written) {
1108 *bytes_written = copy;
1109 }
1110 return SR_SUCCESS;
1111}
1112
1113
1114
1115///////////////////////////////////////////////////////////////////////////////
1116// LoggingAdapter
1117///////////////////////////////////////////////////////////////////////////////
1118
1119LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1120 const std::string& label, bool hex_mode)
1121 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1122 set_label(label);
1123}
1124
1125void LoggingAdapter::set_label(const std::string& label) {
1126 label_.assign("[");
1127 label_.append(label);
1128 label_.append("]");
1129}
1130
1131StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1132 size_t* read, int* error) {
1133 size_t local_read; if (!read) read = &local_read;
1134 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1135 error);
1136 if (result == SR_SUCCESS) {
1137 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1138 }
1139 return result;
1140}
1141
1142StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1143 size_t* written, int* error) {
1144 size_t local_written;
1145 if (!written) written = &local_written;
1146 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1147 error);
1148 if (result == SR_SUCCESS) {
1149 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1150 &lms_);
1151 }
1152 return result;
1153}
1154
1155void LoggingAdapter::Close() {
1156 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1157 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1158 LOG_V(level_) << label_ << " Closed locally";
1159 StreamAdapterInterface::Close();
1160}
1161
1162void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1163 if (events & SE_OPEN) {
1164 LOG_V(level_) << label_ << " Open";
1165 } else if (events & SE_CLOSE) {
1166 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1167 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1168 LOG_V(level_) << label_ << " Closed with error: " << err;
1169 }
1170 StreamAdapterInterface::OnEvent(stream, events, err);
1171}
1172
1173///////////////////////////////////////////////////////////////////////////////
1174// StringStream - Reads/Writes to an external std::string
1175///////////////////////////////////////////////////////////////////////////////
1176
1177StringStream::StringStream(std::string& str)
1178 : str_(str), read_pos_(0), read_only_(false) {
1179}
1180
1181StringStream::StringStream(const std::string& str)
1182 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1183}
1184
1185StreamState StringStream::GetState() const {
1186 return SS_OPEN;
1187}
1188
1189StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1190 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001191 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001192 if (!available)
1193 return SR_EOS;
1194 memcpy(buffer, str_.data() + read_pos_, available);
1195 read_pos_ += available;
1196 if (read)
1197 *read = available;
1198 return SR_SUCCESS;
1199}
1200
1201StreamResult StringStream::Write(const void* data, size_t data_len,
1202 size_t* written, int* error) {
1203 if (read_only_) {
1204 if (error) {
1205 *error = -1;
1206 }
1207 return SR_ERROR;
1208 }
1209 str_.append(static_cast<const char*>(data),
1210 static_cast<const char*>(data) + data_len);
1211 if (written)
1212 *written = data_len;
1213 return SR_SUCCESS;
1214}
1215
1216void StringStream::Close() {
1217}
1218
1219bool StringStream::SetPosition(size_t position) {
1220 if (position > str_.size())
1221 return false;
1222 read_pos_ = position;
1223 return true;
1224}
1225
1226bool StringStream::GetPosition(size_t* position) const {
1227 if (position)
1228 *position = read_pos_;
1229 return true;
1230}
1231
1232bool StringStream::GetSize(size_t* size) const {
1233 if (size)
1234 *size = str_.size();
1235 return true;
1236}
1237
1238bool StringStream::GetAvailable(size_t* size) const {
1239 if (size)
1240 *size = str_.size() - read_pos_;
1241 return true;
1242}
1243
1244bool StringStream::ReserveSize(size_t size) {
1245 if (read_only_)
1246 return false;
1247 str_.reserve(size);
1248 return true;
1249}
1250
1251///////////////////////////////////////////////////////////////////////////////
1252// StreamReference
1253///////////////////////////////////////////////////////////////////////////////
1254
1255StreamReference::StreamReference(StreamInterface* stream)
1256 : StreamAdapterInterface(stream, false) {
1257 // owner set to false so the destructor does not free the stream.
1258 stream_ref_count_ = new StreamRefCount(stream);
1259}
1260
1261StreamInterface* StreamReference::NewReference() {
1262 stream_ref_count_->AddReference();
1263 return new StreamReference(stream_ref_count_, stream());
1264}
1265
1266StreamReference::~StreamReference() {
1267 stream_ref_count_->Release();
1268}
1269
1270StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1271 StreamInterface* stream)
1272 : StreamAdapterInterface(stream, false),
1273 stream_ref_count_(stream_ref_count) {
1274}
1275
1276///////////////////////////////////////////////////////////////////////////////
1277
1278StreamResult Flow(StreamInterface* source,
1279 char* buffer, size_t buffer_len,
1280 StreamInterface* sink,
1281 size_t* data_len /* = NULL */) {
1282 ASSERT(buffer_len > 0);
1283
1284 StreamResult result;
1285 size_t count, read_pos, write_pos;
1286 if (data_len) {
1287 read_pos = *data_len;
1288 } else {
1289 read_pos = 0;
1290 }
1291
1292 bool end_of_stream = false;
1293 do {
1294 // Read until buffer is full, end of stream, or error
1295 while (!end_of_stream && (read_pos < buffer_len)) {
1296 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1297 &count, NULL);
1298 if (result == SR_EOS) {
1299 end_of_stream = true;
1300 } else if (result != SR_SUCCESS) {
1301 if (data_len) {
1302 *data_len = read_pos;
1303 }
1304 return result;
1305 } else {
1306 read_pos += count;
1307 }
1308 }
1309
1310 // Write until buffer is empty, or error (including end of stream)
1311 write_pos = 0;
1312 while (write_pos < read_pos) {
1313 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1314 &count, NULL);
1315 if (result != SR_SUCCESS) {
1316 if (data_len) {
1317 *data_len = read_pos - write_pos;
1318 if (write_pos > 0) {
1319 memmove(buffer, buffer + write_pos, *data_len);
1320 }
1321 }
1322 return result;
1323 }
1324 write_pos += count;
1325 }
1326
1327 read_pos = 0;
1328 } while (!end_of_stream);
1329
1330 if (data_len) {
1331 *data_len = 0;
1332 }
1333 return SR_SUCCESS;
1334}
1335
1336///////////////////////////////////////////////////////////////////////////////
1337
1338} // namespace rtc