blob: 4a85c9f041fcc3e627c960eac78fe4015a6819b6 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000017
18#include <algorithm>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019#include <string>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000020
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000021#include "webrtc/base/basictypes.h"
22#include "webrtc/base/common.h"
23#include "webrtc/base/logging.h"
24#include "webrtc/base/messagequeue.h"
25#include "webrtc/base/stream.h"
26#include "webrtc/base/stringencode.h"
27#include "webrtc/base/stringutils.h"
28#include "webrtc/base/thread.h"
29#include "webrtc/base/timeutils.h"
30
31#if defined(WEBRTC_WIN)
32#include "webrtc/base/win32.h"
33#define fileno _fileno
34#endif
35
36namespace rtc {
37
38///////////////////////////////////////////////////////////////////////////////
39// StreamInterface
40///////////////////////////////////////////////////////////////////////////////
41StreamInterface::~StreamInterface() {
42}
43
44StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45 size_t* written, int* error) {
46 StreamResult result = SR_SUCCESS;
47 size_t total_written = 0, current_written;
48 while (total_written < data_len) {
49 result = Write(static_cast<const char*>(data) + total_written,
50 data_len - total_written, &current_written, error);
51 if (result != SR_SUCCESS)
52 break;
53 total_written += current_written;
54 }
55 if (written)
56 *written = total_written;
57 return result;
58}
59
60StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61 size_t* read, int* error) {
62 StreamResult result = SR_SUCCESS;
63 size_t total_read = 0, current_read;
64 while (total_read < buffer_len) {
65 result = Read(static_cast<char*>(buffer) + total_read,
66 buffer_len - total_read, &current_read, error);
67 if (result != SR_SUCCESS)
68 break;
69 total_read += current_read;
70 }
71 if (read)
72 *read = total_read;
73 return result;
74}
75
76StreamResult StreamInterface::ReadLine(std::string* line) {
77 line->clear();
78 StreamResult result = SR_SUCCESS;
79 while (true) {
80 char ch;
81 result = Read(&ch, sizeof(ch), NULL, NULL);
82 if (result != SR_SUCCESS) {
83 break;
84 }
85 if (ch == '\n') {
86 break;
87 }
88 line->push_back(ch);
89 }
90 if (!line->empty()) { // give back the line we've collected so far with
91 result = SR_SUCCESS; // a success code. Otherwise return the last code
92 }
93 return result;
94}
95
96void StreamInterface::PostEvent(Thread* t, int events, int err) {
97 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
98}
99
100void StreamInterface::PostEvent(int events, int err) {
101 PostEvent(Thread::Current(), events, err);
102}
103
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000104const void* StreamInterface::GetReadData(size_t* data_len) {
105 return NULL;
106}
107
108void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
109 return NULL;
110}
111
112bool StreamInterface::SetPosition(size_t position) {
113 return false;
114}
115
116bool StreamInterface::GetPosition(size_t* position) const {
117 return false;
118}
119
120bool StreamInterface::GetSize(size_t* size) const {
121 return false;
122}
123
124bool StreamInterface::GetAvailable(size_t* size) const {
125 return false;
126}
127
128bool StreamInterface::GetWriteRemaining(size_t* size) const {
129 return false;
130}
131
132bool StreamInterface::Flush() {
133 return false;
134}
135
136bool StreamInterface::ReserveSize(size_t size) {
137 return true;
138}
139
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000140StreamInterface::StreamInterface() {
141}
142
143void StreamInterface::OnMessage(Message* msg) {
144 if (MSG_POST_EVENT == msg->message_id) {
145 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
146 SignalEvent(this, pe->events, pe->error);
147 delete msg->pdata;
148 }
149}
150
151///////////////////////////////////////////////////////////////////////////////
152// StreamAdapterInterface
153///////////////////////////////////////////////////////////////////////////////
154
155StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
156 bool owned)
157 : stream_(stream), owned_(owned) {
158 if (NULL != stream_)
159 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
160}
161
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000162StreamState StreamAdapterInterface::GetState() const {
163 return stream_->GetState();
164}
165StreamResult StreamAdapterInterface::Read(void* buffer,
166 size_t buffer_len,
167 size_t* read,
168 int* error) {
169 return stream_->Read(buffer, buffer_len, read, error);
170}
171StreamResult StreamAdapterInterface::Write(const void* data,
172 size_t data_len,
173 size_t* written,
174 int* error) {
175 return stream_->Write(data, data_len, written, error);
176}
177void StreamAdapterInterface::Close() {
178 stream_->Close();
179}
180
181bool StreamAdapterInterface::SetPosition(size_t position) {
182 return stream_->SetPosition(position);
183}
184
185bool StreamAdapterInterface::GetPosition(size_t* position) const {
186 return stream_->GetPosition(position);
187}
188
189bool StreamAdapterInterface::GetSize(size_t* size) const {
190 return stream_->GetSize(size);
191}
192
193bool StreamAdapterInterface::GetAvailable(size_t* size) const {
194 return stream_->GetAvailable(size);
195}
196
197bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
198 return stream_->GetWriteRemaining(size);
199}
200
201bool StreamAdapterInterface::ReserveSize(size_t size) {
202 return stream_->ReserveSize(size);
203}
204
205bool StreamAdapterInterface::Flush() {
206 return stream_->Flush();
207}
208
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000209void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
210 if (NULL != stream_)
211 stream_->SignalEvent.disconnect(this);
212 if (owned_)
213 delete stream_;
214 stream_ = stream;
215 owned_ = owned;
216 if (NULL != stream_)
217 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
218}
219
220StreamInterface* StreamAdapterInterface::Detach() {
221 if (NULL != stream_)
222 stream_->SignalEvent.disconnect(this);
223 StreamInterface* stream = stream_;
224 stream_ = NULL;
225 return stream;
226}
227
228StreamAdapterInterface::~StreamAdapterInterface() {
229 if (owned_)
230 delete stream_;
231}
232
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000233void StreamAdapterInterface::OnEvent(StreamInterface* stream,
234 int events,
235 int err) {
236 SignalEvent(this, events, err);
237}
238
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000239///////////////////////////////////////////////////////////////////////////////
240// StreamTap
241///////////////////////////////////////////////////////////////////////////////
242
243StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
244 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
245 tap_error_(0) {
246 AttachTap(tap);
247}
248
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000249StreamTap::~StreamTap() = default;
250
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000251void StreamTap::AttachTap(StreamInterface* tap) {
252 tap_.reset(tap);
253}
254
255StreamInterface* StreamTap::DetachTap() {
256 return tap_.release();
257}
258
259StreamResult StreamTap::GetTapResult(int* error) {
260 if (error) {
261 *error = tap_error_;
262 }
263 return tap_result_;
264}
265
266StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
267 size_t* read, int* error) {
268 size_t backup_read;
269 if (!read) {
270 read = &backup_read;
271 }
272 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
273 read, error);
274 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
275 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
276 }
277 return res;
278}
279
280StreamResult StreamTap::Write(const void* data, size_t data_len,
281 size_t* written, int* error) {
282 size_t backup_written;
283 if (!written) {
284 written = &backup_written;
285 }
286 StreamResult res = StreamAdapterInterface::Write(data, data_len,
287 written, error);
288 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
289 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
290 }
291 return res;
292}
293
294///////////////////////////////////////////////////////////////////////////////
295// StreamSegment
296///////////////////////////////////////////////////////////////////////////////
297
298StreamSegment::StreamSegment(StreamInterface* stream)
299 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
300 length_(SIZE_UNKNOWN) {
301 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
302 stream->GetPosition(&start_);
303}
304
305StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
306 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
307 length_(length) {
308 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
309 stream->GetPosition(&start_);
310}
311
312StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
313 size_t* read, int* error) {
314 if (SIZE_UNKNOWN != length_) {
315 if (pos_ >= length_)
316 return SR_EOS;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000317 buffer_len = std::min(buffer_len, length_ - pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000318 }
319 size_t backup_read;
320 if (!read) {
321 read = &backup_read;
322 }
323 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
324 read, error);
325 if (SR_SUCCESS == result) {
326 pos_ += *read;
327 }
328 return result;
329}
330
331bool StreamSegment::SetPosition(size_t position) {
332 if (SIZE_UNKNOWN == start_)
333 return false; // Not seekable
334 if ((SIZE_UNKNOWN != length_) && (position > length_))
335 return false; // Seek past end of segment
336 if (!StreamAdapterInterface::SetPosition(start_ + position))
337 return false;
338 pos_ = position;
339 return true;
340}
341
342bool StreamSegment::GetPosition(size_t* position) const {
343 if (SIZE_UNKNOWN == start_)
344 return false; // Not seekable
345 if (!StreamAdapterInterface::GetPosition(position))
346 return false;
347 if (position) {
348 ASSERT(*position >= start_);
349 *position -= start_;
350 }
351 return true;
352}
353
354bool StreamSegment::GetSize(size_t* size) const {
355 if (!StreamAdapterInterface::GetSize(size))
356 return false;
357 if (size) {
358 if (SIZE_UNKNOWN != start_) {
359 ASSERT(*size >= start_);
360 *size -= start_;
361 }
362 if (SIZE_UNKNOWN != length_) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000363 *size = std::min(*size, length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000364 }
365 }
366 return true;
367}
368
369bool StreamSegment::GetAvailable(size_t* size) const {
370 if (!StreamAdapterInterface::GetAvailable(size))
371 return false;
372 if (size && (SIZE_UNKNOWN != length_))
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000373 *size = std::min(*size, length_ - pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000374 return true;
375}
376
377///////////////////////////////////////////////////////////////////////////////
378// NullStream
379///////////////////////////////////////////////////////////////////////////////
380
381NullStream::NullStream() {
382}
383
384NullStream::~NullStream() {
385}
386
387StreamState NullStream::GetState() const {
388 return SS_OPEN;
389}
390
391StreamResult NullStream::Read(void* buffer, size_t buffer_len,
392 size_t* read, int* error) {
393 if (error) *error = -1;
394 return SR_ERROR;
395}
396
397StreamResult NullStream::Write(const void* data, size_t data_len,
398 size_t* written, int* error) {
399 if (written) *written = data_len;
400 return SR_SUCCESS;
401}
402
403void NullStream::Close() {
404}
405
406///////////////////////////////////////////////////////////////////////////////
407// FileStream
408///////////////////////////////////////////////////////////////////////////////
409
410FileStream::FileStream() : file_(NULL) {
411}
412
413FileStream::~FileStream() {
414 FileStream::Close();
415}
416
417bool FileStream::Open(const std::string& filename, const char* mode,
418 int* error) {
419 Close();
420#if defined(WEBRTC_WIN)
421 std::wstring wfilename;
422 if (Utf8ToWindowsFilename(filename, &wfilename)) {
423 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
424 } else {
425 if (error) {
426 *error = -1;
427 return false;
428 }
429 }
430#else
431 file_ = fopen(filename.c_str(), mode);
432#endif
433 if (!file_ && error) {
434 *error = errno;
435 }
436 return (file_ != NULL);
437}
438
439bool FileStream::OpenShare(const std::string& filename, const char* mode,
440 int shflag, int* error) {
441 Close();
442#if defined(WEBRTC_WIN)
443 std::wstring wfilename;
444 if (Utf8ToWindowsFilename(filename, &wfilename)) {
445 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
446 if (!file_ && error) {
447 *error = errno;
448 return false;
449 }
450 return file_ != NULL;
451 } else {
452 if (error) {
453 *error = -1;
454 }
455 return false;
456 }
457#else
458 return Open(filename, mode, error);
459#endif
460}
461
462bool FileStream::DisableBuffering() {
463 if (!file_)
464 return false;
465 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
466}
467
468StreamState FileStream::GetState() const {
469 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
470}
471
472StreamResult FileStream::Read(void* buffer, size_t buffer_len,
473 size_t* read, int* error) {
474 if (!file_)
475 return SR_EOS;
476 size_t result = fread(buffer, 1, buffer_len, file_);
477 if ((result == 0) && (buffer_len > 0)) {
478 if (feof(file_))
479 return SR_EOS;
480 if (error)
481 *error = errno;
482 return SR_ERROR;
483 }
484 if (read)
485 *read = result;
486 return SR_SUCCESS;
487}
488
489StreamResult FileStream::Write(const void* data, size_t data_len,
490 size_t* written, int* error) {
491 if (!file_)
492 return SR_EOS;
493 size_t result = fwrite(data, 1, data_len, file_);
494 if ((result == 0) && (data_len > 0)) {
495 if (error)
496 *error = errno;
497 return SR_ERROR;
498 }
499 if (written)
500 *written = result;
501 return SR_SUCCESS;
502}
503
504void FileStream::Close() {
505 if (file_) {
506 DoClose();
507 file_ = NULL;
508 }
509}
510
511bool FileStream::SetPosition(size_t position) {
512 if (!file_)
513 return false;
514 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
515}
516
517bool FileStream::GetPosition(size_t* position) const {
518 ASSERT(NULL != position);
519 if (!file_)
520 return false;
521 long result = ftell(file_);
522 if (result < 0)
523 return false;
524 if (position)
525 *position = result;
526 return true;
527}
528
529bool FileStream::GetSize(size_t* size) const {
530 ASSERT(NULL != size);
531 if (!file_)
532 return false;
533 struct stat file_stats;
534 if (fstat(fileno(file_), &file_stats) != 0)
535 return false;
536 if (size)
537 *size = file_stats.st_size;
538 return true;
539}
540
541bool FileStream::GetAvailable(size_t* size) const {
542 ASSERT(NULL != size);
543 if (!GetSize(size))
544 return false;
545 long result = ftell(file_);
546 if (result < 0)
547 return false;
548 if (size)
549 *size -= result;
550 return true;
551}
552
553bool FileStream::ReserveSize(size_t size) {
554 // TODO: extend the file to the proper length
555 return true;
556}
557
558bool FileStream::GetSize(const std::string& filename, size_t* size) {
559 struct stat file_stats;
560 if (stat(filename.c_str(), &file_stats) != 0)
561 return false;
562 *size = file_stats.st_size;
563 return true;
564}
565
566bool FileStream::Flush() {
567 if (file_) {
568 return (0 == fflush(file_));
569 }
570 // try to flush empty file?
571 ASSERT(false);
572 return false;
573}
574
575#if defined(WEBRTC_POSIX) && !defined(__native_client__)
576
577bool FileStream::TryLock() {
578 if (file_ == NULL) {
579 // Stream not open.
580 ASSERT(false);
581 return false;
582 }
583
584 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
585}
586
587bool FileStream::Unlock() {
588 if (file_ == NULL) {
589 // Stream not open.
590 ASSERT(false);
591 return false;
592 }
593
594 return flock(fileno(file_), LOCK_UN) == 0;
595}
596
597#endif
598
599void FileStream::DoClose() {
600 fclose(file_);
601}
602
603CircularFileStream::CircularFileStream(size_t max_size)
604 : max_write_size_(max_size),
605 position_(0),
606 marked_position_(max_size / 2),
607 last_write_position_(0),
608 read_segment_(READ_LATEST),
609 read_segment_available_(0) {
610}
611
612bool CircularFileStream::Open(
613 const std::string& filename, const char* mode, int* error) {
614 if (!FileStream::Open(filename.c_str(), mode, error))
615 return false;
616
617 if (strchr(mode, "r") != NULL) { // Opened in read mode.
618 // Check if the buffer has been overwritten and determine how to read the
619 // log in time sequence.
620 size_t file_size;
621 GetSize(&file_size);
622 if (file_size == position_) {
623 // The buffer has not been overwritten yet. Read 0 .. file_size
624 read_segment_ = READ_LATEST;
625 read_segment_available_ = file_size;
626 } else {
627 // The buffer has been over written. There are three segments: The first
628 // one is 0 .. marked_position_, which is the marked earliest log. The
629 // second one is position_ .. file_size, which is the middle log. The
630 // last one is marked_position_ .. position_, which is the latest log.
631 read_segment_ = READ_MARKED;
632 read_segment_available_ = marked_position_;
633 last_write_position_ = position_;
634 }
635
636 // Read from the beginning.
637 position_ = 0;
638 SetPosition(position_);
639 }
640
641 return true;
642}
643
644StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
645 size_t* read, int* error) {
646 if (read_segment_available_ == 0) {
647 size_t file_size;
648 switch (read_segment_) {
649 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
650 read_segment_ = READ_MIDDLE;
651 position_ = last_write_position_;
652 SetPosition(position_);
653 GetSize(&file_size);
654 read_segment_available_ = file_size - position_;
655 break;
656
657 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
658 read_segment_ = READ_LATEST;
659 position_ = marked_position_;
660 SetPosition(position_);
661 read_segment_available_ = last_write_position_ - position_;
662 break;
663
664 default: // Finished READ_LATEST and return EOS.
665 return rtc::SR_EOS;
666 }
667 }
668
669 size_t local_read;
670 if (!read) read = &local_read;
671
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000672 size_t to_read = std::min(buffer_len, read_segment_available_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000673 rtc::StreamResult result
674 = rtc::FileStream::Read(buffer, to_read, read, error);
675 if (result == rtc::SR_SUCCESS) {
676 read_segment_available_ -= *read;
677 position_ += *read;
678 }
679 return result;
680}
681
682StreamResult CircularFileStream::Write(const void* data, size_t data_len,
683 size_t* written, int* error) {
684 if (position_ >= max_write_size_) {
685 ASSERT(position_ == max_write_size_);
686 position_ = marked_position_;
687 SetPosition(position_);
688 }
689
690 size_t local_written;
691 if (!written) written = &local_written;
692
693 size_t to_eof = max_write_size_ - position_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000694 size_t to_write = std::min(data_len, to_eof);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000695 rtc::StreamResult result
696 = rtc::FileStream::Write(data, to_write, written, error);
697 if (result == rtc::SR_SUCCESS) {
698 position_ += *written;
699 }
700 return result;
701}
702
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000703AsyncWriteStream::AsyncWriteStream(StreamInterface* stream,
704 rtc::Thread* write_thread)
705 : stream_(stream),
706 write_thread_(write_thread),
707 state_(stream ? stream->GetState() : SS_CLOSED) {
708}
709
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000710AsyncWriteStream::~AsyncWriteStream() {
711 write_thread_->Clear(this, 0, NULL);
712 ClearBufferAndWrite();
713
714 CritScope cs(&crit_stream_);
715 stream_.reset();
716}
717
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000718StreamState AsyncWriteStream::GetState() const {
719 return state_;
720}
721
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000722// This is needed by some stream writers, such as RtpDumpWriter.
723bool AsyncWriteStream::GetPosition(size_t* position) const {
724 CritScope cs(&crit_stream_);
725 return stream_->GetPosition(position);
726}
727
728// This is needed by some stream writers, such as the plugin log writers.
729StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
730 size_t* read, int* error) {
731 CritScope cs(&crit_stream_);
732 return stream_->Read(buffer, buffer_len, read, error);
733}
734
735void AsyncWriteStream::Close() {
736 if (state_ == SS_CLOSED) {
737 return;
738 }
739
740 write_thread_->Clear(this, 0, NULL);
741 ClearBufferAndWrite();
742
743 CritScope cs(&crit_stream_);
744 stream_->Close();
745 state_ = SS_CLOSED;
746}
747
748StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
749 size_t* written, int* error) {
750 if (state_ == SS_CLOSED) {
751 return SR_ERROR;
752 }
753
754 size_t previous_buffer_length = 0;
755 {
756 CritScope cs(&crit_buffer_);
757 previous_buffer_length = buffer_.length();
758 buffer_.AppendData(data, data_len);
759 }
760
761 if (previous_buffer_length == 0) {
762 // If there's stuff already in the buffer, then we already called
763 // Post and the write_thread_ hasn't pulled it out yet, so we
764 // don't need to re-Post.
765 write_thread_->Post(this, 0, NULL);
766 }
767 // Return immediately, assuming that it works.
768 if (written) {
769 *written = data_len;
770 }
771 return SR_SUCCESS;
772}
773
774void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
775 ClearBufferAndWrite();
776}
777
778bool AsyncWriteStream::Flush() {
779 if (state_ == SS_CLOSED) {
780 return false;
781 }
782
783 ClearBufferAndWrite();
784
785 CritScope cs(&crit_stream_);
786 return stream_->Flush();
787}
788
789void AsyncWriteStream::ClearBufferAndWrite() {
790 Buffer to_write;
791 {
792 CritScope cs_buffer(&crit_buffer_);
793 buffer_.TransferTo(&to_write);
794 }
795
796 if (to_write.length() > 0) {
797 CritScope cs(&crit_stream_);
798 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
799 }
800}
801
802#if defined(WEBRTC_POSIX) && !defined(__native_client__)
803
804// Have to identically rewrite the FileStream destructor or else it would call
805// the base class's Close() instead of the sub-class's.
806POpenStream::~POpenStream() {
807 POpenStream::Close();
808}
809
810bool POpenStream::Open(const std::string& subcommand,
811 const char* mode,
812 int* error) {
813 Close();
814 file_ = popen(subcommand.c_str(), mode);
815 if (file_ == NULL) {
816 if (error)
817 *error = errno;
818 return false;
819 }
820 return true;
821}
822
823bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
824 int shflag, int* error) {
825 return Open(subcommand, mode, error);
826}
827
828void POpenStream::DoClose() {
829 wait_status_ = pclose(file_);
830}
831
832#endif
833
834///////////////////////////////////////////////////////////////////////////////
835// MemoryStream
836///////////////////////////////////////////////////////////////////////////////
837
838MemoryStreamBase::MemoryStreamBase()
839 : buffer_(NULL), buffer_length_(0), data_length_(0),
840 seek_position_(0) {
841}
842
843StreamState MemoryStreamBase::GetState() const {
844 return SS_OPEN;
845}
846
847StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
848 size_t* bytes_read, int* error) {
849 if (seek_position_ >= data_length_) {
850 return SR_EOS;
851 }
852 size_t available = data_length_ - seek_position_;
853 if (bytes > available) {
854 // Read partial buffer
855 bytes = available;
856 }
857 memcpy(buffer, &buffer_[seek_position_], bytes);
858 seek_position_ += bytes;
859 if (bytes_read) {
860 *bytes_read = bytes;
861 }
862 return SR_SUCCESS;
863}
864
865StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
866 size_t* bytes_written, int* error) {
867 size_t available = buffer_length_ - seek_position_;
868 if (0 == available) {
869 // Increase buffer size to the larger of:
870 // a) new position rounded up to next 256 bytes
871 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000872 size_t new_buffer_length =
873 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000874 StreamResult result = DoReserve(new_buffer_length, error);
875 if (SR_SUCCESS != result) {
876 return result;
877 }
878 ASSERT(buffer_length_ >= new_buffer_length);
879 available = buffer_length_ - seek_position_;
880 }
881
882 if (bytes > available) {
883 bytes = available;
884 }
885 memcpy(&buffer_[seek_position_], buffer, bytes);
886 seek_position_ += bytes;
887 if (data_length_ < seek_position_) {
888 data_length_ = seek_position_;
889 }
890 if (bytes_written) {
891 *bytes_written = bytes;
892 }
893 return SR_SUCCESS;
894}
895
896void MemoryStreamBase::Close() {
897 // nothing to do
898}
899
900bool MemoryStreamBase::SetPosition(size_t position) {
901 if (position > data_length_)
902 return false;
903 seek_position_ = position;
904 return true;
905}
906
907bool MemoryStreamBase::GetPosition(size_t* position) const {
908 if (position)
909 *position = seek_position_;
910 return true;
911}
912
913bool MemoryStreamBase::GetSize(size_t* size) const {
914 if (size)
915 *size = data_length_;
916 return true;
917}
918
919bool MemoryStreamBase::GetAvailable(size_t* size) const {
920 if (size)
921 *size = data_length_ - seek_position_;
922 return true;
923}
924
925bool MemoryStreamBase::ReserveSize(size_t size) {
926 return (SR_SUCCESS == DoReserve(size, NULL));
927}
928
929StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
930 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
931}
932
933///////////////////////////////////////////////////////////////////////////////
934
935MemoryStream::MemoryStream()
936 : buffer_alloc_(NULL) {
937}
938
939MemoryStream::MemoryStream(const char* data)
940 : buffer_alloc_(NULL) {
941 SetData(data, strlen(data));
942}
943
944MemoryStream::MemoryStream(const void* data, size_t length)
945 : buffer_alloc_(NULL) {
946 SetData(data, length);
947}
948
949MemoryStream::~MemoryStream() {
950 delete [] buffer_alloc_;
951}
952
953void MemoryStream::SetData(const void* data, size_t length) {
954 data_length_ = buffer_length_ = length;
955 delete [] buffer_alloc_;
956 buffer_alloc_ = new char[buffer_length_ + kAlignment];
957 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
958 memcpy(buffer_, data, data_length_);
959 seek_position_ = 0;
960}
961
962StreamResult MemoryStream::DoReserve(size_t size, int* error) {
963 if (buffer_length_ >= size)
964 return SR_SUCCESS;
965
966 if (char* new_buffer_alloc = new char[size + kAlignment]) {
967 char* new_buffer = reinterpret_cast<char*>(
968 ALIGNP(new_buffer_alloc, kAlignment));
969 memcpy(new_buffer, buffer_, data_length_);
970 delete [] buffer_alloc_;
971 buffer_alloc_ = new_buffer_alloc;
972 buffer_ = new_buffer;
973 buffer_length_ = size;
974 return SR_SUCCESS;
975 }
976
977 if (error) {
978 *error = ENOMEM;
979 }
980 return SR_ERROR;
981}
982
983///////////////////////////////////////////////////////////////////////////////
984
985ExternalMemoryStream::ExternalMemoryStream() {
986}
987
988ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
989 SetData(data, length);
990}
991
992ExternalMemoryStream::~ExternalMemoryStream() {
993}
994
995void ExternalMemoryStream::SetData(void* data, size_t length) {
996 data_length_ = buffer_length_ = length;
997 buffer_ = static_cast<char*>(data);
998 seek_position_ = 0;
999}
1000
1001///////////////////////////////////////////////////////////////////////////////
1002// FifoBuffer
1003///////////////////////////////////////////////////////////////////////////////
1004
1005FifoBuffer::FifoBuffer(size_t size)
1006 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
1007 data_length_(0), read_position_(0), owner_(Thread::Current()) {
1008 // all events are done on the owner_ thread
1009}
1010
1011FifoBuffer::FifoBuffer(size_t size, Thread* owner)
1012 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
1013 data_length_(0), read_position_(0), owner_(owner) {
1014 // all events are done on the owner_ thread
1015}
1016
1017FifoBuffer::~FifoBuffer() {
1018}
1019
1020bool FifoBuffer::GetBuffered(size_t* size) const {
1021 CritScope cs(&crit_);
1022 *size = data_length_;
1023 return true;
1024}
1025
1026bool FifoBuffer::SetCapacity(size_t size) {
1027 CritScope cs(&crit_);
1028 if (data_length_ > size) {
1029 return false;
1030 }
1031
1032 if (size != buffer_length_) {
1033 char* buffer = new char[size];
1034 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001035 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001036 memcpy(buffer, &buffer_[read_position_], tail_copy);
1037 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
1038 buffer_.reset(buffer);
1039 read_position_ = 0;
1040 buffer_length_ = size;
1041 }
1042 return true;
1043}
1044
1045StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
1046 size_t offset, size_t* bytes_read) {
1047 CritScope cs(&crit_);
1048 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
1049}
1050
1051StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
1052 size_t offset, size_t* bytes_written) {
1053 CritScope cs(&crit_);
1054 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
1055}
1056
1057StreamState FifoBuffer::GetState() const {
1058 return state_;
1059}
1060
1061StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
1062 size_t* bytes_read, int* error) {
1063 CritScope cs(&crit_);
1064 const bool was_writable = data_length_ < buffer_length_;
1065 size_t copy = 0;
1066 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
1067
1068 if (result == SR_SUCCESS) {
1069 // If read was successful then adjust the read position and number of
1070 // bytes buffered.
1071 read_position_ = (read_position_ + copy) % buffer_length_;
1072 data_length_ -= copy;
1073 if (bytes_read) {
1074 *bytes_read = copy;
1075 }
1076
1077 // if we were full before, and now we're not, post an event
1078 if (!was_writable && copy > 0) {
1079 PostEvent(owner_, SE_WRITE, 0);
1080 }
1081 }
1082 return result;
1083}
1084
1085StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
1086 size_t* bytes_written, int* error) {
1087 CritScope cs(&crit_);
1088
1089 const bool was_readable = (data_length_ > 0);
1090 size_t copy = 0;
1091 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
1092
1093 if (result == SR_SUCCESS) {
1094 // If write was successful then adjust the number of readable bytes.
1095 data_length_ += copy;
1096 if (bytes_written) {
1097 *bytes_written = copy;
1098 }
1099
1100 // if we didn't have any data to read before, and now we do, post an event
1101 if (!was_readable && copy > 0) {
1102 PostEvent(owner_, SE_READ, 0);
1103 }
1104 }
1105 return result;
1106}
1107
1108void FifoBuffer::Close() {
1109 CritScope cs(&crit_);
1110 state_ = SS_CLOSED;
1111}
1112
1113const void* FifoBuffer::GetReadData(size_t* size) {
1114 CritScope cs(&crit_);
1115 *size = (read_position_ + data_length_ <= buffer_length_) ?
1116 data_length_ : buffer_length_ - read_position_;
1117 return &buffer_[read_position_];
1118}
1119
1120void FifoBuffer::ConsumeReadData(size_t size) {
1121 CritScope cs(&crit_);
1122 ASSERT(size <= data_length_);
1123 const bool was_writable = data_length_ < buffer_length_;
1124 read_position_ = (read_position_ + size) % buffer_length_;
1125 data_length_ -= size;
1126 if (!was_writable && size > 0) {
1127 PostEvent(owner_, SE_WRITE, 0);
1128 }
1129}
1130
1131void* FifoBuffer::GetWriteBuffer(size_t* size) {
1132 CritScope cs(&crit_);
1133 if (state_ == SS_CLOSED) {
1134 return NULL;
1135 }
1136
1137 // if empty, reset the write position to the beginning, so we can get
1138 // the biggest possible block
1139 if (data_length_ == 0) {
1140 read_position_ = 0;
1141 }
1142
1143 const size_t write_position = (read_position_ + data_length_)
1144 % buffer_length_;
1145 *size = (write_position > read_position_ || data_length_ == 0) ?
1146 buffer_length_ - write_position : read_position_ - write_position;
1147 return &buffer_[write_position];
1148}
1149
1150void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1151 CritScope cs(&crit_);
1152 ASSERT(size <= buffer_length_ - data_length_);
1153 const bool was_readable = (data_length_ > 0);
1154 data_length_ += size;
1155 if (!was_readable && size > 0) {
1156 PostEvent(owner_, SE_READ, 0);
1157 }
1158}
1159
1160bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1161 CritScope cs(&crit_);
1162 *size = buffer_length_ - data_length_;
1163 return true;
1164}
1165
1166StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1167 size_t bytes,
1168 size_t offset,
1169 size_t* bytes_read) {
1170 if (offset >= data_length_) {
1171 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1172 }
1173
1174 const size_t available = data_length_ - offset;
1175 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001176 const size_t copy = std::min(bytes, available);
1177 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001178 char* const p = static_cast<char*>(buffer);
1179 memcpy(p, &buffer_[read_position], tail_copy);
1180 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1181
1182 if (bytes_read) {
1183 *bytes_read = copy;
1184 }
1185 return SR_SUCCESS;
1186}
1187
1188StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1189 size_t bytes,
1190 size_t offset,
1191 size_t* bytes_written) {
1192 if (state_ == SS_CLOSED) {
1193 return SR_EOS;
1194 }
1195
1196 if (data_length_ + offset >= buffer_length_) {
1197 return SR_BLOCK;
1198 }
1199
1200 const size_t available = buffer_length_ - data_length_ - offset;
1201 const size_t write_position = (read_position_ + data_length_ + offset)
1202 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001203 const size_t copy = std::min(bytes, available);
1204 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001205 const char* const p = static_cast<const char*>(buffer);
1206 memcpy(&buffer_[write_position], p, tail_copy);
1207 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1208
1209 if (bytes_written) {
1210 *bytes_written = copy;
1211 }
1212 return SR_SUCCESS;
1213}
1214
1215
1216
1217///////////////////////////////////////////////////////////////////////////////
1218// LoggingAdapter
1219///////////////////////////////////////////////////////////////////////////////
1220
1221LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1222 const std::string& label, bool hex_mode)
1223 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1224 set_label(label);
1225}
1226
1227void LoggingAdapter::set_label(const std::string& label) {
1228 label_.assign("[");
1229 label_.append(label);
1230 label_.append("]");
1231}
1232
1233StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1234 size_t* read, int* error) {
1235 size_t local_read; if (!read) read = &local_read;
1236 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1237 error);
1238 if (result == SR_SUCCESS) {
1239 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1240 }
1241 return result;
1242}
1243
1244StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1245 size_t* written, int* error) {
1246 size_t local_written;
1247 if (!written) written = &local_written;
1248 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1249 error);
1250 if (result == SR_SUCCESS) {
1251 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1252 &lms_);
1253 }
1254 return result;
1255}
1256
1257void LoggingAdapter::Close() {
1258 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1259 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1260 LOG_V(level_) << label_ << " Closed locally";
1261 StreamAdapterInterface::Close();
1262}
1263
1264void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1265 if (events & SE_OPEN) {
1266 LOG_V(level_) << label_ << " Open";
1267 } else if (events & SE_CLOSE) {
1268 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1269 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1270 LOG_V(level_) << label_ << " Closed with error: " << err;
1271 }
1272 StreamAdapterInterface::OnEvent(stream, events, err);
1273}
1274
1275///////////////////////////////////////////////////////////////////////////////
1276// StringStream - Reads/Writes to an external std::string
1277///////////////////////////////////////////////////////////////////////////////
1278
1279StringStream::StringStream(std::string& str)
1280 : str_(str), read_pos_(0), read_only_(false) {
1281}
1282
1283StringStream::StringStream(const std::string& str)
1284 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1285}
1286
1287StreamState StringStream::GetState() const {
1288 return SS_OPEN;
1289}
1290
1291StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1292 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001293 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001294 if (!available)
1295 return SR_EOS;
1296 memcpy(buffer, str_.data() + read_pos_, available);
1297 read_pos_ += available;
1298 if (read)
1299 *read = available;
1300 return SR_SUCCESS;
1301}
1302
1303StreamResult StringStream::Write(const void* data, size_t data_len,
1304 size_t* written, int* error) {
1305 if (read_only_) {
1306 if (error) {
1307 *error = -1;
1308 }
1309 return SR_ERROR;
1310 }
1311 str_.append(static_cast<const char*>(data),
1312 static_cast<const char*>(data) + data_len);
1313 if (written)
1314 *written = data_len;
1315 return SR_SUCCESS;
1316}
1317
1318void StringStream::Close() {
1319}
1320
1321bool StringStream::SetPosition(size_t position) {
1322 if (position > str_.size())
1323 return false;
1324 read_pos_ = position;
1325 return true;
1326}
1327
1328bool StringStream::GetPosition(size_t* position) const {
1329 if (position)
1330 *position = read_pos_;
1331 return true;
1332}
1333
1334bool StringStream::GetSize(size_t* size) const {
1335 if (size)
1336 *size = str_.size();
1337 return true;
1338}
1339
1340bool StringStream::GetAvailable(size_t* size) const {
1341 if (size)
1342 *size = str_.size() - read_pos_;
1343 return true;
1344}
1345
1346bool StringStream::ReserveSize(size_t size) {
1347 if (read_only_)
1348 return false;
1349 str_.reserve(size);
1350 return true;
1351}
1352
1353///////////////////////////////////////////////////////////////////////////////
1354// StreamReference
1355///////////////////////////////////////////////////////////////////////////////
1356
1357StreamReference::StreamReference(StreamInterface* stream)
1358 : StreamAdapterInterface(stream, false) {
1359 // owner set to false so the destructor does not free the stream.
1360 stream_ref_count_ = new StreamRefCount(stream);
1361}
1362
1363StreamInterface* StreamReference::NewReference() {
1364 stream_ref_count_->AddReference();
1365 return new StreamReference(stream_ref_count_, stream());
1366}
1367
1368StreamReference::~StreamReference() {
1369 stream_ref_count_->Release();
1370}
1371
1372StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1373 StreamInterface* stream)
1374 : StreamAdapterInterface(stream, false),
1375 stream_ref_count_(stream_ref_count) {
1376}
1377
1378///////////////////////////////////////////////////////////////////////////////
1379
1380StreamResult Flow(StreamInterface* source,
1381 char* buffer, size_t buffer_len,
1382 StreamInterface* sink,
1383 size_t* data_len /* = NULL */) {
1384 ASSERT(buffer_len > 0);
1385
1386 StreamResult result;
1387 size_t count, read_pos, write_pos;
1388 if (data_len) {
1389 read_pos = *data_len;
1390 } else {
1391 read_pos = 0;
1392 }
1393
1394 bool end_of_stream = false;
1395 do {
1396 // Read until buffer is full, end of stream, or error
1397 while (!end_of_stream && (read_pos < buffer_len)) {
1398 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1399 &count, NULL);
1400 if (result == SR_EOS) {
1401 end_of_stream = true;
1402 } else if (result != SR_SUCCESS) {
1403 if (data_len) {
1404 *data_len = read_pos;
1405 }
1406 return result;
1407 } else {
1408 read_pos += count;
1409 }
1410 }
1411
1412 // Write until buffer is empty, or error (including end of stream)
1413 write_pos = 0;
1414 while (write_pos < read_pos) {
1415 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1416 &count, NULL);
1417 if (result != SR_SUCCESS) {
1418 if (data_len) {
1419 *data_len = read_pos - write_pos;
1420 if (write_pos > 0) {
1421 memmove(buffer, buffer + write_pos, *data_len);
1422 }
1423 }
1424 return result;
1425 }
1426 write_pos += count;
1427 }
1428
1429 read_pos = 0;
1430 } while (!end_of_stream);
1431
1432 if (data_len) {
1433 *data_len = 0;
1434 }
1435 return SR_SUCCESS;
1436}
1437
1438///////////////////////////////////////////////////////////////////////////////
1439
1440} // namespace rtc