blob: 5e9dc04a284e34a2b10e8640f2a2fde44bf9ac94 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/file.h>
13#endif // WEBRTC_POSIX
14#include <sys/types.h>
15#include <sys/stat.h>
16#include <errno.h>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000017
18#include <algorithm>
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019#include <string>
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000020
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000021#include "webrtc/base/basictypes.h"
22#include "webrtc/base/common.h"
23#include "webrtc/base/logging.h"
24#include "webrtc/base/messagequeue.h"
25#include "webrtc/base/stream.h"
26#include "webrtc/base/stringencode.h"
27#include "webrtc/base/stringutils.h"
28#include "webrtc/base/thread.h"
29#include "webrtc/base/timeutils.h"
30
31#if defined(WEBRTC_WIN)
32#include "webrtc/base/win32.h"
33#define fileno _fileno
34#endif
35
36namespace rtc {
37
38///////////////////////////////////////////////////////////////////////////////
39// StreamInterface
40///////////////////////////////////////////////////////////////////////////////
41StreamInterface::~StreamInterface() {
42}
43
44StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45 size_t* written, int* error) {
46 StreamResult result = SR_SUCCESS;
47 size_t total_written = 0, current_written;
48 while (total_written < data_len) {
49 result = Write(static_cast<const char*>(data) + total_written,
50 data_len - total_written, &current_written, error);
51 if (result != SR_SUCCESS)
52 break;
53 total_written += current_written;
54 }
55 if (written)
56 *written = total_written;
57 return result;
58}
59
60StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61 size_t* read, int* error) {
62 StreamResult result = SR_SUCCESS;
63 size_t total_read = 0, current_read;
64 while (total_read < buffer_len) {
65 result = Read(static_cast<char*>(buffer) + total_read,
66 buffer_len - total_read, &current_read, error);
67 if (result != SR_SUCCESS)
68 break;
69 total_read += current_read;
70 }
71 if (read)
72 *read = total_read;
73 return result;
74}
75
76StreamResult StreamInterface::ReadLine(std::string* line) {
77 line->clear();
78 StreamResult result = SR_SUCCESS;
79 while (true) {
80 char ch;
81 result = Read(&ch, sizeof(ch), NULL, NULL);
82 if (result != SR_SUCCESS) {
83 break;
84 }
85 if (ch == '\n') {
86 break;
87 }
88 line->push_back(ch);
89 }
90 if (!line->empty()) { // give back the line we've collected so far with
91 result = SR_SUCCESS; // a success code. Otherwise return the last code
92 }
93 return result;
94}
95
96void StreamInterface::PostEvent(Thread* t, int events, int err) {
97 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
98}
99
100void StreamInterface::PostEvent(int events, int err) {
101 PostEvent(Thread::Current(), events, err);
102}
103
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000104const void* StreamInterface::GetReadData(size_t* data_len) {
105 return NULL;
106}
107
108void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
109 return NULL;
110}
111
112bool StreamInterface::SetPosition(size_t position) {
113 return false;
114}
115
116bool StreamInterface::GetPosition(size_t* position) const {
117 return false;
118}
119
120bool StreamInterface::GetSize(size_t* size) const {
121 return false;
122}
123
124bool StreamInterface::GetAvailable(size_t* size) const {
125 return false;
126}
127
128bool StreamInterface::GetWriteRemaining(size_t* size) const {
129 return false;
130}
131
132bool StreamInterface::Flush() {
133 return false;
134}
135
136bool StreamInterface::ReserveSize(size_t size) {
137 return true;
138}
139
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000140StreamInterface::StreamInterface() {
141}
142
143void StreamInterface::OnMessage(Message* msg) {
144 if (MSG_POST_EVENT == msg->message_id) {
145 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
146 SignalEvent(this, pe->events, pe->error);
147 delete msg->pdata;
148 }
149}
150
151///////////////////////////////////////////////////////////////////////////////
152// StreamAdapterInterface
153///////////////////////////////////////////////////////////////////////////////
154
155StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
156 bool owned)
157 : stream_(stream), owned_(owned) {
158 if (NULL != stream_)
159 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
160}
161
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000162StreamState StreamAdapterInterface::GetState() const {
163 return stream_->GetState();
164}
165StreamResult StreamAdapterInterface::Read(void* buffer,
166 size_t buffer_len,
167 size_t* read,
168 int* error) {
169 return stream_->Read(buffer, buffer_len, read, error);
170}
171StreamResult StreamAdapterInterface::Write(const void* data,
172 size_t data_len,
173 size_t* written,
174 int* error) {
175 return stream_->Write(data, data_len, written, error);
176}
177void StreamAdapterInterface::Close() {
178 stream_->Close();
179}
180
181bool StreamAdapterInterface::SetPosition(size_t position) {
182 return stream_->SetPosition(position);
183}
184
185bool StreamAdapterInterface::GetPosition(size_t* position) const {
186 return stream_->GetPosition(position);
187}
188
189bool StreamAdapterInterface::GetSize(size_t* size) const {
190 return stream_->GetSize(size);
191}
192
193bool StreamAdapterInterface::GetAvailable(size_t* size) const {
194 return stream_->GetAvailable(size);
195}
196
197bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
198 return stream_->GetWriteRemaining(size);
199}
200
201bool StreamAdapterInterface::ReserveSize(size_t size) {
202 return stream_->ReserveSize(size);
203}
204
205bool StreamAdapterInterface::Flush() {
206 return stream_->Flush();
207}
208
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000209void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
210 if (NULL != stream_)
211 stream_->SignalEvent.disconnect(this);
212 if (owned_)
213 delete stream_;
214 stream_ = stream;
215 owned_ = owned;
216 if (NULL != stream_)
217 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
218}
219
220StreamInterface* StreamAdapterInterface::Detach() {
221 if (NULL != stream_)
222 stream_->SignalEvent.disconnect(this);
223 StreamInterface* stream = stream_;
224 stream_ = NULL;
225 return stream;
226}
227
228StreamAdapterInterface::~StreamAdapterInterface() {
229 if (owned_)
230 delete stream_;
231}
232
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000233void StreamAdapterInterface::OnEvent(StreamInterface* stream,
234 int events,
235 int err) {
236 SignalEvent(this, events, err);
237}
238
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000239///////////////////////////////////////////////////////////////////////////////
240// StreamTap
241///////////////////////////////////////////////////////////////////////////////
242
243StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
244 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
245 tap_error_(0) {
246 AttachTap(tap);
247}
248
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000249StreamTap::~StreamTap() = default;
250
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000251void StreamTap::AttachTap(StreamInterface* tap) {
252 tap_.reset(tap);
253}
254
255StreamInterface* StreamTap::DetachTap() {
256 return tap_.release();
257}
258
259StreamResult StreamTap::GetTapResult(int* error) {
260 if (error) {
261 *error = tap_error_;
262 }
263 return tap_result_;
264}
265
266StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
267 size_t* read, int* error) {
268 size_t backup_read;
269 if (!read) {
270 read = &backup_read;
271 }
272 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
273 read, error);
274 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
275 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
276 }
277 return res;
278}
279
280StreamResult StreamTap::Write(const void* data, size_t data_len,
281 size_t* written, int* error) {
282 size_t backup_written;
283 if (!written) {
284 written = &backup_written;
285 }
286 StreamResult res = StreamAdapterInterface::Write(data, data_len,
287 written, error);
288 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
289 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
290 }
291 return res;
292}
293
294///////////////////////////////////////////////////////////////////////////////
295// StreamSegment
296///////////////////////////////////////////////////////////////////////////////
297
298StreamSegment::StreamSegment(StreamInterface* stream)
299 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
300 length_(SIZE_UNKNOWN) {
301 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
302 stream->GetPosition(&start_);
303}
304
305StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
306 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
307 length_(length) {
308 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
309 stream->GetPosition(&start_);
310}
311
312StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
313 size_t* read, int* error) {
314 if (SIZE_UNKNOWN != length_) {
315 if (pos_ >= length_)
316 return SR_EOS;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000317 buffer_len = std::min(buffer_len, length_ - pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000318 }
319 size_t backup_read;
320 if (!read) {
321 read = &backup_read;
322 }
323 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
324 read, error);
325 if (SR_SUCCESS == result) {
326 pos_ += *read;
327 }
328 return result;
329}
330
331bool StreamSegment::SetPosition(size_t position) {
332 if (SIZE_UNKNOWN == start_)
333 return false; // Not seekable
334 if ((SIZE_UNKNOWN != length_) && (position > length_))
335 return false; // Seek past end of segment
336 if (!StreamAdapterInterface::SetPosition(start_ + position))
337 return false;
338 pos_ = position;
339 return true;
340}
341
342bool StreamSegment::GetPosition(size_t* position) const {
343 if (SIZE_UNKNOWN == start_)
344 return false; // Not seekable
345 if (!StreamAdapterInterface::GetPosition(position))
346 return false;
347 if (position) {
348 ASSERT(*position >= start_);
349 *position -= start_;
350 }
351 return true;
352}
353
354bool StreamSegment::GetSize(size_t* size) const {
355 if (!StreamAdapterInterface::GetSize(size))
356 return false;
357 if (size) {
358 if (SIZE_UNKNOWN != start_) {
359 ASSERT(*size >= start_);
360 *size -= start_;
361 }
362 if (SIZE_UNKNOWN != length_) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000363 *size = std::min(*size, length_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000364 }
365 }
366 return true;
367}
368
369bool StreamSegment::GetAvailable(size_t* size) const {
370 if (!StreamAdapterInterface::GetAvailable(size))
371 return false;
372 if (size && (SIZE_UNKNOWN != length_))
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000373 *size = std::min(*size, length_ - pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000374 return true;
375}
376
377///////////////////////////////////////////////////////////////////////////////
378// NullStream
379///////////////////////////////////////////////////////////////////////////////
380
381NullStream::NullStream() {
382}
383
384NullStream::~NullStream() {
385}
386
387StreamState NullStream::GetState() const {
388 return SS_OPEN;
389}
390
391StreamResult NullStream::Read(void* buffer, size_t buffer_len,
392 size_t* read, int* error) {
393 if (error) *error = -1;
394 return SR_ERROR;
395}
396
397StreamResult NullStream::Write(const void* data, size_t data_len,
398 size_t* written, int* error) {
399 if (written) *written = data_len;
400 return SR_SUCCESS;
401}
402
403void NullStream::Close() {
404}
405
406///////////////////////////////////////////////////////////////////////////////
407// FileStream
408///////////////////////////////////////////////////////////////////////////////
409
410FileStream::FileStream() : file_(NULL) {
411}
412
413FileStream::~FileStream() {
414 FileStream::Close();
415}
416
417bool FileStream::Open(const std::string& filename, const char* mode,
418 int* error) {
419 Close();
420#if defined(WEBRTC_WIN)
421 std::wstring wfilename;
422 if (Utf8ToWindowsFilename(filename, &wfilename)) {
423 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
424 } else {
425 if (error) {
426 *error = -1;
427 return false;
428 }
429 }
430#else
431 file_ = fopen(filename.c_str(), mode);
432#endif
433 if (!file_ && error) {
434 *error = errno;
435 }
436 return (file_ != NULL);
437}
438
439bool FileStream::OpenShare(const std::string& filename, const char* mode,
440 int shflag, int* error) {
441 Close();
442#if defined(WEBRTC_WIN)
443 std::wstring wfilename;
444 if (Utf8ToWindowsFilename(filename, &wfilename)) {
445 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
446 if (!file_ && error) {
447 *error = errno;
448 return false;
449 }
450 return file_ != NULL;
451 } else {
452 if (error) {
453 *error = -1;
454 }
455 return false;
456 }
457#else
458 return Open(filename, mode, error);
459#endif
460}
461
462bool FileStream::DisableBuffering() {
463 if (!file_)
464 return false;
465 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
466}
467
468StreamState FileStream::GetState() const {
469 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
470}
471
472StreamResult FileStream::Read(void* buffer, size_t buffer_len,
473 size_t* read, int* error) {
474 if (!file_)
475 return SR_EOS;
476 size_t result = fread(buffer, 1, buffer_len, file_);
477 if ((result == 0) && (buffer_len > 0)) {
478 if (feof(file_))
479 return SR_EOS;
480 if (error)
481 *error = errno;
482 return SR_ERROR;
483 }
484 if (read)
485 *read = result;
486 return SR_SUCCESS;
487}
488
489StreamResult FileStream::Write(const void* data, size_t data_len,
490 size_t* written, int* error) {
491 if (!file_)
492 return SR_EOS;
493 size_t result = fwrite(data, 1, data_len, file_);
494 if ((result == 0) && (data_len > 0)) {
495 if (error)
496 *error = errno;
497 return SR_ERROR;
498 }
499 if (written)
500 *written = result;
501 return SR_SUCCESS;
502}
503
504void FileStream::Close() {
505 if (file_) {
506 DoClose();
507 file_ = NULL;
508 }
509}
510
511bool FileStream::SetPosition(size_t position) {
512 if (!file_)
513 return false;
514 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
515}
516
517bool FileStream::GetPosition(size_t* position) const {
518 ASSERT(NULL != position);
519 if (!file_)
520 return false;
521 long result = ftell(file_);
522 if (result < 0)
523 return false;
524 if (position)
525 *position = result;
526 return true;
527}
528
529bool FileStream::GetSize(size_t* size) const {
530 ASSERT(NULL != size);
531 if (!file_)
532 return false;
533 struct stat file_stats;
534 if (fstat(fileno(file_), &file_stats) != 0)
535 return false;
536 if (size)
537 *size = file_stats.st_size;
538 return true;
539}
540
541bool FileStream::GetAvailable(size_t* size) const {
542 ASSERT(NULL != size);
543 if (!GetSize(size))
544 return false;
545 long result = ftell(file_);
546 if (result < 0)
547 return false;
548 if (size)
549 *size -= result;
550 return true;
551}
552
553bool FileStream::ReserveSize(size_t size) {
554 // TODO: extend the file to the proper length
555 return true;
556}
557
558bool FileStream::GetSize(const std::string& filename, size_t* size) {
559 struct stat file_stats;
560 if (stat(filename.c_str(), &file_stats) != 0)
561 return false;
562 *size = file_stats.st_size;
563 return true;
564}
565
566bool FileStream::Flush() {
567 if (file_) {
568 return (0 == fflush(file_));
569 }
570 // try to flush empty file?
571 ASSERT(false);
572 return false;
573}
574
575#if defined(WEBRTC_POSIX) && !defined(__native_client__)
576
577bool FileStream::TryLock() {
578 if (file_ == NULL) {
579 // Stream not open.
580 ASSERT(false);
581 return false;
582 }
583
584 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
585}
586
587bool FileStream::Unlock() {
588 if (file_ == NULL) {
589 // Stream not open.
590 ASSERT(false);
591 return false;
592 }
593
594 return flock(fileno(file_), LOCK_UN) == 0;
595}
596
597#endif
598
599void FileStream::DoClose() {
600 fclose(file_);
601}
602
603CircularFileStream::CircularFileStream(size_t max_size)
604 : max_write_size_(max_size),
605 position_(0),
606 marked_position_(max_size / 2),
607 last_write_position_(0),
608 read_segment_(READ_LATEST),
609 read_segment_available_(0) {
610}
611
612bool CircularFileStream::Open(
613 const std::string& filename, const char* mode, int* error) {
614 if (!FileStream::Open(filename.c_str(), mode, error))
615 return false;
616
617 if (strchr(mode, "r") != NULL) { // Opened in read mode.
618 // Check if the buffer has been overwritten and determine how to read the
619 // log in time sequence.
620 size_t file_size;
621 GetSize(&file_size);
622 if (file_size == position_) {
623 // The buffer has not been overwritten yet. Read 0 .. file_size
624 read_segment_ = READ_LATEST;
625 read_segment_available_ = file_size;
626 } else {
627 // The buffer has been over written. There are three segments: The first
628 // one is 0 .. marked_position_, which is the marked earliest log. The
629 // second one is position_ .. file_size, which is the middle log. The
630 // last one is marked_position_ .. position_, which is the latest log.
631 read_segment_ = READ_MARKED;
632 read_segment_available_ = marked_position_;
633 last_write_position_ = position_;
634 }
635
636 // Read from the beginning.
637 position_ = 0;
638 SetPosition(position_);
639 }
640
641 return true;
642}
643
644StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
645 size_t* read, int* error) {
646 if (read_segment_available_ == 0) {
647 size_t file_size;
648 switch (read_segment_) {
649 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
650 read_segment_ = READ_MIDDLE;
651 position_ = last_write_position_;
652 SetPosition(position_);
653 GetSize(&file_size);
654 read_segment_available_ = file_size - position_;
655 break;
656
657 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
658 read_segment_ = READ_LATEST;
659 position_ = marked_position_;
660 SetPosition(position_);
661 read_segment_available_ = last_write_position_ - position_;
662 break;
663
664 default: // Finished READ_LATEST and return EOS.
665 return rtc::SR_EOS;
666 }
667 }
668
669 size_t local_read;
670 if (!read) read = &local_read;
671
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000672 size_t to_read = std::min(buffer_len, read_segment_available_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000673 rtc::StreamResult result
674 = rtc::FileStream::Read(buffer, to_read, read, error);
675 if (result == rtc::SR_SUCCESS) {
676 read_segment_available_ -= *read;
677 position_ += *read;
678 }
679 return result;
680}
681
682StreamResult CircularFileStream::Write(const void* data, size_t data_len,
683 size_t* written, int* error) {
684 if (position_ >= max_write_size_) {
685 ASSERT(position_ == max_write_size_);
686 position_ = marked_position_;
687 SetPosition(position_);
688 }
689
690 size_t local_written;
691 if (!written) written = &local_written;
692
693 size_t to_eof = max_write_size_ - position_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000694 size_t to_write = std::min(data_len, to_eof);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000695 rtc::StreamResult result
696 = rtc::FileStream::Write(data, to_write, written, error);
697 if (result == rtc::SR_SUCCESS) {
698 position_ += *written;
699 }
700 return result;
701}
702
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000703AsyncWriteStream::AsyncWriteStream(StreamInterface* stream,
704 rtc::Thread* write_thread)
705 : stream_(stream),
706 write_thread_(write_thread),
707 state_(stream ? stream->GetState() : SS_CLOSED) {
708}
709
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000710AsyncWriteStream::~AsyncWriteStream() {
711 write_thread_->Clear(this, 0, NULL);
712 ClearBufferAndWrite();
713
714 CritScope cs(&crit_stream_);
715 stream_.reset();
716}
717
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000718StreamState AsyncWriteStream::GetState() const {
719 return state_;
720}
721
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000722// This is needed by some stream writers, such as RtpDumpWriter.
723bool AsyncWriteStream::GetPosition(size_t* position) const {
724 CritScope cs(&crit_stream_);
725 return stream_->GetPosition(position);
726}
727
728// This is needed by some stream writers, such as the plugin log writers.
729StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
730 size_t* read, int* error) {
731 CritScope cs(&crit_stream_);
732 return stream_->Read(buffer, buffer_len, read, error);
733}
734
735void AsyncWriteStream::Close() {
736 if (state_ == SS_CLOSED) {
737 return;
738 }
739
740 write_thread_->Clear(this, 0, NULL);
741 ClearBufferAndWrite();
742
743 CritScope cs(&crit_stream_);
744 stream_->Close();
745 state_ = SS_CLOSED;
746}
747
748StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
749 size_t* written, int* error) {
750 if (state_ == SS_CLOSED) {
751 return SR_ERROR;
752 }
753
754 size_t previous_buffer_length = 0;
755 {
756 CritScope cs(&crit_buffer_);
kwiberg@webrtc.orgeebcab52015-03-24 09:19:06 +0000757 previous_buffer_length = buffer_.size();
Karl Wiberg94784372015-04-20 14:03:07 +0200758 buffer_.AppendData(reinterpret_cast<const uint8_t*>(data), data_len);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000759 }
760
761 if (previous_buffer_length == 0) {
762 // If there's stuff already in the buffer, then we already called
763 // Post and the write_thread_ hasn't pulled it out yet, so we
764 // don't need to re-Post.
765 write_thread_->Post(this, 0, NULL);
766 }
767 // Return immediately, assuming that it works.
768 if (written) {
769 *written = data_len;
770 }
771 return SR_SUCCESS;
772}
773
774void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
775 ClearBufferAndWrite();
776}
777
778bool AsyncWriteStream::Flush() {
779 if (state_ == SS_CLOSED) {
780 return false;
781 }
782
783 ClearBufferAndWrite();
784
785 CritScope cs(&crit_stream_);
786 return stream_->Flush();
787}
788
789void AsyncWriteStream::ClearBufferAndWrite() {
790 Buffer to_write;
791 {
792 CritScope cs_buffer(&crit_buffer_);
Karl Wiberg94784372015-04-20 14:03:07 +0200793 to_write = buffer_.Pass();
794 buffer_.Clear();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000795 }
796
kwiberg@webrtc.orgeebcab52015-03-24 09:19:06 +0000797 if (to_write.size() > 0) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000798 CritScope cs(&crit_stream_);
kwiberg@webrtc.orgeebcab52015-03-24 09:19:06 +0000799 stream_->WriteAll(to_write.data(), to_write.size(), NULL, NULL);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000800 }
801}
802
803#if defined(WEBRTC_POSIX) && !defined(__native_client__)
804
805// Have to identically rewrite the FileStream destructor or else it would call
806// the base class's Close() instead of the sub-class's.
807POpenStream::~POpenStream() {
808 POpenStream::Close();
809}
810
811bool POpenStream::Open(const std::string& subcommand,
812 const char* mode,
813 int* error) {
814 Close();
815 file_ = popen(subcommand.c_str(), mode);
816 if (file_ == NULL) {
817 if (error)
818 *error = errno;
819 return false;
820 }
821 return true;
822}
823
824bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
825 int shflag, int* error) {
826 return Open(subcommand, mode, error);
827}
828
829void POpenStream::DoClose() {
830 wait_status_ = pclose(file_);
831}
832
833#endif
834
835///////////////////////////////////////////////////////////////////////////////
836// MemoryStream
837///////////////////////////////////////////////////////////////////////////////
838
839MemoryStreamBase::MemoryStreamBase()
840 : buffer_(NULL), buffer_length_(0), data_length_(0),
841 seek_position_(0) {
842}
843
844StreamState MemoryStreamBase::GetState() const {
845 return SS_OPEN;
846}
847
848StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
849 size_t* bytes_read, int* error) {
850 if (seek_position_ >= data_length_) {
851 return SR_EOS;
852 }
853 size_t available = data_length_ - seek_position_;
854 if (bytes > available) {
855 // Read partial buffer
856 bytes = available;
857 }
858 memcpy(buffer, &buffer_[seek_position_], bytes);
859 seek_position_ += bytes;
860 if (bytes_read) {
861 *bytes_read = bytes;
862 }
863 return SR_SUCCESS;
864}
865
866StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
867 size_t* bytes_written, int* error) {
868 size_t available = buffer_length_ - seek_position_;
869 if (0 == available) {
870 // Increase buffer size to the larger of:
871 // a) new position rounded up to next 256 bytes
872 // b) double the previous length
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000873 size_t new_buffer_length =
874 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000875 StreamResult result = DoReserve(new_buffer_length, error);
876 if (SR_SUCCESS != result) {
877 return result;
878 }
879 ASSERT(buffer_length_ >= new_buffer_length);
880 available = buffer_length_ - seek_position_;
881 }
882
883 if (bytes > available) {
884 bytes = available;
885 }
886 memcpy(&buffer_[seek_position_], buffer, bytes);
887 seek_position_ += bytes;
888 if (data_length_ < seek_position_) {
889 data_length_ = seek_position_;
890 }
891 if (bytes_written) {
892 *bytes_written = bytes;
893 }
894 return SR_SUCCESS;
895}
896
897void MemoryStreamBase::Close() {
898 // nothing to do
899}
900
901bool MemoryStreamBase::SetPosition(size_t position) {
902 if (position > data_length_)
903 return false;
904 seek_position_ = position;
905 return true;
906}
907
908bool MemoryStreamBase::GetPosition(size_t* position) const {
909 if (position)
910 *position = seek_position_;
911 return true;
912}
913
914bool MemoryStreamBase::GetSize(size_t* size) const {
915 if (size)
916 *size = data_length_;
917 return true;
918}
919
920bool MemoryStreamBase::GetAvailable(size_t* size) const {
921 if (size)
922 *size = data_length_ - seek_position_;
923 return true;
924}
925
926bool MemoryStreamBase::ReserveSize(size_t size) {
927 return (SR_SUCCESS == DoReserve(size, NULL));
928}
929
930StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
931 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
932}
933
934///////////////////////////////////////////////////////////////////////////////
935
936MemoryStream::MemoryStream()
937 : buffer_alloc_(NULL) {
938}
939
940MemoryStream::MemoryStream(const char* data)
941 : buffer_alloc_(NULL) {
942 SetData(data, strlen(data));
943}
944
945MemoryStream::MemoryStream(const void* data, size_t length)
946 : buffer_alloc_(NULL) {
947 SetData(data, length);
948}
949
950MemoryStream::~MemoryStream() {
951 delete [] buffer_alloc_;
952}
953
954void MemoryStream::SetData(const void* data, size_t length) {
955 data_length_ = buffer_length_ = length;
956 delete [] buffer_alloc_;
957 buffer_alloc_ = new char[buffer_length_ + kAlignment];
958 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
959 memcpy(buffer_, data, data_length_);
960 seek_position_ = 0;
961}
962
963StreamResult MemoryStream::DoReserve(size_t size, int* error) {
964 if (buffer_length_ >= size)
965 return SR_SUCCESS;
966
967 if (char* new_buffer_alloc = new char[size + kAlignment]) {
968 char* new_buffer = reinterpret_cast<char*>(
969 ALIGNP(new_buffer_alloc, kAlignment));
970 memcpy(new_buffer, buffer_, data_length_);
971 delete [] buffer_alloc_;
972 buffer_alloc_ = new_buffer_alloc;
973 buffer_ = new_buffer;
974 buffer_length_ = size;
975 return SR_SUCCESS;
976 }
977
978 if (error) {
979 *error = ENOMEM;
980 }
981 return SR_ERROR;
982}
983
984///////////////////////////////////////////////////////////////////////////////
985
986ExternalMemoryStream::ExternalMemoryStream() {
987}
988
989ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
990 SetData(data, length);
991}
992
993ExternalMemoryStream::~ExternalMemoryStream() {
994}
995
996void ExternalMemoryStream::SetData(void* data, size_t length) {
997 data_length_ = buffer_length_ = length;
998 buffer_ = static_cast<char*>(data);
999 seek_position_ = 0;
1000}
1001
1002///////////////////////////////////////////////////////////////////////////////
1003// FifoBuffer
1004///////////////////////////////////////////////////////////////////////////////
1005
1006FifoBuffer::FifoBuffer(size_t size)
1007 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
1008 data_length_(0), read_position_(0), owner_(Thread::Current()) {
1009 // all events are done on the owner_ thread
1010}
1011
1012FifoBuffer::FifoBuffer(size_t size, Thread* owner)
1013 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
1014 data_length_(0), read_position_(0), owner_(owner) {
1015 // all events are done on the owner_ thread
1016}
1017
1018FifoBuffer::~FifoBuffer() {
1019}
1020
1021bool FifoBuffer::GetBuffered(size_t* size) const {
1022 CritScope cs(&crit_);
1023 *size = data_length_;
1024 return true;
1025}
1026
1027bool FifoBuffer::SetCapacity(size_t size) {
1028 CritScope cs(&crit_);
1029 if (data_length_ > size) {
1030 return false;
1031 }
1032
1033 if (size != buffer_length_) {
1034 char* buffer = new char[size];
1035 const size_t copy = data_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001036 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001037 memcpy(buffer, &buffer_[read_position_], tail_copy);
1038 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
1039 buffer_.reset(buffer);
1040 read_position_ = 0;
1041 buffer_length_ = size;
1042 }
1043 return true;
1044}
1045
1046StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
1047 size_t offset, size_t* bytes_read) {
1048 CritScope cs(&crit_);
1049 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
1050}
1051
1052StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
1053 size_t offset, size_t* bytes_written) {
1054 CritScope cs(&crit_);
1055 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
1056}
1057
1058StreamState FifoBuffer::GetState() const {
1059 return state_;
1060}
1061
1062StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
1063 size_t* bytes_read, int* error) {
1064 CritScope cs(&crit_);
1065 const bool was_writable = data_length_ < buffer_length_;
1066 size_t copy = 0;
1067 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
1068
1069 if (result == SR_SUCCESS) {
1070 // If read was successful then adjust the read position and number of
1071 // bytes buffered.
1072 read_position_ = (read_position_ + copy) % buffer_length_;
1073 data_length_ -= copy;
1074 if (bytes_read) {
1075 *bytes_read = copy;
1076 }
1077
1078 // if we were full before, and now we're not, post an event
1079 if (!was_writable && copy > 0) {
1080 PostEvent(owner_, SE_WRITE, 0);
1081 }
1082 }
1083 return result;
1084}
1085
1086StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
1087 size_t* bytes_written, int* error) {
1088 CritScope cs(&crit_);
1089
1090 const bool was_readable = (data_length_ > 0);
1091 size_t copy = 0;
1092 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
1093
1094 if (result == SR_SUCCESS) {
1095 // If write was successful then adjust the number of readable bytes.
1096 data_length_ += copy;
1097 if (bytes_written) {
1098 *bytes_written = copy;
1099 }
1100
1101 // if we didn't have any data to read before, and now we do, post an event
1102 if (!was_readable && copy > 0) {
1103 PostEvent(owner_, SE_READ, 0);
1104 }
1105 }
1106 return result;
1107}
1108
1109void FifoBuffer::Close() {
1110 CritScope cs(&crit_);
1111 state_ = SS_CLOSED;
1112}
1113
1114const void* FifoBuffer::GetReadData(size_t* size) {
1115 CritScope cs(&crit_);
1116 *size = (read_position_ + data_length_ <= buffer_length_) ?
1117 data_length_ : buffer_length_ - read_position_;
1118 return &buffer_[read_position_];
1119}
1120
1121void FifoBuffer::ConsumeReadData(size_t size) {
1122 CritScope cs(&crit_);
1123 ASSERT(size <= data_length_);
1124 const bool was_writable = data_length_ < buffer_length_;
1125 read_position_ = (read_position_ + size) % buffer_length_;
1126 data_length_ -= size;
1127 if (!was_writable && size > 0) {
1128 PostEvent(owner_, SE_WRITE, 0);
1129 }
1130}
1131
1132void* FifoBuffer::GetWriteBuffer(size_t* size) {
1133 CritScope cs(&crit_);
1134 if (state_ == SS_CLOSED) {
1135 return NULL;
1136 }
1137
1138 // if empty, reset the write position to the beginning, so we can get
1139 // the biggest possible block
1140 if (data_length_ == 0) {
1141 read_position_ = 0;
1142 }
1143
1144 const size_t write_position = (read_position_ + data_length_)
1145 % buffer_length_;
1146 *size = (write_position > read_position_ || data_length_ == 0) ?
1147 buffer_length_ - write_position : read_position_ - write_position;
1148 return &buffer_[write_position];
1149}
1150
1151void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1152 CritScope cs(&crit_);
1153 ASSERT(size <= buffer_length_ - data_length_);
1154 const bool was_readable = (data_length_ > 0);
1155 data_length_ += size;
1156 if (!was_readable && size > 0) {
1157 PostEvent(owner_, SE_READ, 0);
1158 }
1159}
1160
1161bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1162 CritScope cs(&crit_);
1163 *size = buffer_length_ - data_length_;
1164 return true;
1165}
1166
1167StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1168 size_t bytes,
1169 size_t offset,
1170 size_t* bytes_read) {
1171 if (offset >= data_length_) {
1172 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1173 }
1174
1175 const size_t available = data_length_ - offset;
1176 const size_t read_position = (read_position_ + offset) % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001177 const size_t copy = std::min(bytes, available);
1178 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001179 char* const p = static_cast<char*>(buffer);
1180 memcpy(p, &buffer_[read_position], tail_copy);
1181 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1182
1183 if (bytes_read) {
1184 *bytes_read = copy;
1185 }
1186 return SR_SUCCESS;
1187}
1188
1189StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1190 size_t bytes,
1191 size_t offset,
1192 size_t* bytes_written) {
1193 if (state_ == SS_CLOSED) {
1194 return SR_EOS;
1195 }
1196
1197 if (data_length_ + offset >= buffer_length_) {
1198 return SR_BLOCK;
1199 }
1200
1201 const size_t available = buffer_length_ - data_length_ - offset;
1202 const size_t write_position = (read_position_ + data_length_ + offset)
1203 % buffer_length_;
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001204 const size_t copy = std::min(bytes, available);
1205 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001206 const char* const p = static_cast<const char*>(buffer);
1207 memcpy(&buffer_[write_position], p, tail_copy);
1208 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1209
1210 if (bytes_written) {
1211 *bytes_written = copy;
1212 }
1213 return SR_SUCCESS;
1214}
1215
1216
1217
1218///////////////////////////////////////////////////////////////////////////////
1219// LoggingAdapter
1220///////////////////////////////////////////////////////////////////////////////
1221
1222LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1223 const std::string& label, bool hex_mode)
1224 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1225 set_label(label);
1226}
1227
1228void LoggingAdapter::set_label(const std::string& label) {
1229 label_.assign("[");
1230 label_.append(label);
1231 label_.append("]");
1232}
1233
1234StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1235 size_t* read, int* error) {
1236 size_t local_read; if (!read) read = &local_read;
1237 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1238 error);
1239 if (result == SR_SUCCESS) {
1240 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1241 }
1242 return result;
1243}
1244
1245StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1246 size_t* written, int* error) {
1247 size_t local_written;
1248 if (!written) written = &local_written;
1249 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1250 error);
1251 if (result == SR_SUCCESS) {
1252 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1253 &lms_);
1254 }
1255 return result;
1256}
1257
1258void LoggingAdapter::Close() {
1259 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1260 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1261 LOG_V(level_) << label_ << " Closed locally";
1262 StreamAdapterInterface::Close();
1263}
1264
1265void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1266 if (events & SE_OPEN) {
1267 LOG_V(level_) << label_ << " Open";
1268 } else if (events & SE_CLOSE) {
1269 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1270 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1271 LOG_V(level_) << label_ << " Closed with error: " << err;
1272 }
1273 StreamAdapterInterface::OnEvent(stream, events, err);
1274}
1275
1276///////////////////////////////////////////////////////////////////////////////
1277// StringStream - Reads/Writes to an external std::string
1278///////////////////////////////////////////////////////////////////////////////
1279
1280StringStream::StringStream(std::string& str)
1281 : str_(str), read_pos_(0), read_only_(false) {
1282}
1283
1284StringStream::StringStream(const std::string& str)
1285 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1286}
1287
1288StreamState StringStream::GetState() const {
1289 return SS_OPEN;
1290}
1291
1292StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1293 size_t* read, int* error) {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +00001294 size_t available = std::min(buffer_len, str_.size() - read_pos_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001295 if (!available)
1296 return SR_EOS;
1297 memcpy(buffer, str_.data() + read_pos_, available);
1298 read_pos_ += available;
1299 if (read)
1300 *read = available;
1301 return SR_SUCCESS;
1302}
1303
1304StreamResult StringStream::Write(const void* data, size_t data_len,
1305 size_t* written, int* error) {
1306 if (read_only_) {
1307 if (error) {
1308 *error = -1;
1309 }
1310 return SR_ERROR;
1311 }
1312 str_.append(static_cast<const char*>(data),
1313 static_cast<const char*>(data) + data_len);
1314 if (written)
1315 *written = data_len;
1316 return SR_SUCCESS;
1317}
1318
1319void StringStream::Close() {
1320}
1321
1322bool StringStream::SetPosition(size_t position) {
1323 if (position > str_.size())
1324 return false;
1325 read_pos_ = position;
1326 return true;
1327}
1328
1329bool StringStream::GetPosition(size_t* position) const {
1330 if (position)
1331 *position = read_pos_;
1332 return true;
1333}
1334
1335bool StringStream::GetSize(size_t* size) const {
1336 if (size)
1337 *size = str_.size();
1338 return true;
1339}
1340
1341bool StringStream::GetAvailable(size_t* size) const {
1342 if (size)
1343 *size = str_.size() - read_pos_;
1344 return true;
1345}
1346
1347bool StringStream::ReserveSize(size_t size) {
1348 if (read_only_)
1349 return false;
1350 str_.reserve(size);
1351 return true;
1352}
1353
1354///////////////////////////////////////////////////////////////////////////////
1355// StreamReference
1356///////////////////////////////////////////////////////////////////////////////
1357
1358StreamReference::StreamReference(StreamInterface* stream)
1359 : StreamAdapterInterface(stream, false) {
1360 // owner set to false so the destructor does not free the stream.
1361 stream_ref_count_ = new StreamRefCount(stream);
1362}
1363
1364StreamInterface* StreamReference::NewReference() {
1365 stream_ref_count_->AddReference();
1366 return new StreamReference(stream_ref_count_, stream());
1367}
1368
1369StreamReference::~StreamReference() {
1370 stream_ref_count_->Release();
1371}
1372
1373StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1374 StreamInterface* stream)
1375 : StreamAdapterInterface(stream, false),
1376 stream_ref_count_(stream_ref_count) {
1377}
1378
1379///////////////////////////////////////////////////////////////////////////////
1380
1381StreamResult Flow(StreamInterface* source,
1382 char* buffer, size_t buffer_len,
1383 StreamInterface* sink,
1384 size_t* data_len /* = NULL */) {
1385 ASSERT(buffer_len > 0);
1386
1387 StreamResult result;
1388 size_t count, read_pos, write_pos;
1389 if (data_len) {
1390 read_pos = *data_len;
1391 } else {
1392 read_pos = 0;
1393 }
1394
1395 bool end_of_stream = false;
1396 do {
1397 // Read until buffer is full, end of stream, or error
1398 while (!end_of_stream && (read_pos < buffer_len)) {
1399 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1400 &count, NULL);
1401 if (result == SR_EOS) {
1402 end_of_stream = true;
1403 } else if (result != SR_SUCCESS) {
1404 if (data_len) {
1405 *data_len = read_pos;
1406 }
1407 return result;
1408 } else {
1409 read_pos += count;
1410 }
1411 }
1412
1413 // Write until buffer is empty, or error (including end of stream)
1414 write_pos = 0;
1415 while (write_pos < read_pos) {
1416 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1417 &count, NULL);
1418 if (result != SR_SUCCESS) {
1419 if (data_len) {
1420 *data_len = read_pos - write_pos;
1421 if (write_pos > 0) {
1422 memmove(buffer, buffer + write_pos, *data_len);
1423 }
1424 }
1425 return result;
1426 }
1427 write_pos += count;
1428 }
1429
1430 read_pos = 0;
1431 } while (!end_of_stream);
1432
1433 if (data_len) {
1434 *data_len = 0;
1435 }
1436 return SR_SUCCESS;
1437}
1438
1439///////////////////////////////////////////////////////////////////////////////
1440
1441} // namespace rtc