blob: 20adfcfa903f0b99735e24509666647a9fed28c9 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004 Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(POSIX)
29#include <sys/file.h>
30#endif // POSIX
31#include <sys/types.h>
32#include <sys/stat.h>
33#include <errno.h>
34#include <string>
35#include "talk/base/basictypes.h"
36#include "talk/base/common.h"
37#include "talk/base/logging.h"
38#include "talk/base/messagequeue.h"
39#include "talk/base/stream.h"
40#include "talk/base/stringencode.h"
41#include "talk/base/stringutils.h"
42#include "talk/base/thread.h"
43#include "talk/base/timeutils.h"
44
45#ifdef WIN32
46#include "talk/base/win32.h"
47#define fileno _fileno
48#endif
49
50namespace talk_base {
51
52///////////////////////////////////////////////////////////////////////////////
53// StreamInterface
54///////////////////////////////////////////////////////////////////////////////
55StreamInterface::~StreamInterface() {
56}
57
58StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
59 size_t* written, int* error) {
60 StreamResult result = SR_SUCCESS;
61 size_t total_written = 0, current_written;
62 while (total_written < data_len) {
63 result = Write(static_cast<const char*>(data) + total_written,
64 data_len - total_written, &current_written, error);
65 if (result != SR_SUCCESS)
66 break;
67 total_written += current_written;
68 }
69 if (written)
70 *written = total_written;
71 return result;
72}
73
74StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
75 size_t* read, int* error) {
76 StreamResult result = SR_SUCCESS;
77 size_t total_read = 0, current_read;
78 while (total_read < buffer_len) {
79 result = Read(static_cast<char*>(buffer) + total_read,
80 buffer_len - total_read, &current_read, error);
81 if (result != SR_SUCCESS)
82 break;
83 total_read += current_read;
84 }
85 if (read)
86 *read = total_read;
87 return result;
88}
89
90StreamResult StreamInterface::ReadLine(std::string* line) {
91 line->clear();
92 StreamResult result = SR_SUCCESS;
93 while (true) {
94 char ch;
95 result = Read(&ch, sizeof(ch), NULL, NULL);
96 if (result != SR_SUCCESS) {
97 break;
98 }
99 if (ch == '\n') {
100 break;
101 }
102 line->push_back(ch);
103 }
104 if (!line->empty()) { // give back the line we've collected so far with
105 result = SR_SUCCESS; // a success code. Otherwise return the last code
106 }
107 return result;
108}
109
110void StreamInterface::PostEvent(Thread* t, int events, int err) {
111 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
112}
113
114void StreamInterface::PostEvent(int events, int err) {
115 PostEvent(Thread::Current(), events, err);
116}
117
118StreamInterface::StreamInterface() {
119}
120
121void StreamInterface::OnMessage(Message* msg) {
122 if (MSG_POST_EVENT == msg->message_id) {
123 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
124 SignalEvent(this, pe->events, pe->error);
125 delete msg->pdata;
126 }
127}
128
129///////////////////////////////////////////////////////////////////////////////
130// StreamAdapterInterface
131///////////////////////////////////////////////////////////////////////////////
132
133StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
134 bool owned)
135 : stream_(stream), owned_(owned) {
136 if (NULL != stream_)
137 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
138}
139
140void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
141 if (NULL != stream_)
142 stream_->SignalEvent.disconnect(this);
143 if (owned_)
144 delete stream_;
145 stream_ = stream;
146 owned_ = owned;
147 if (NULL != stream_)
148 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
149}
150
151StreamInterface* StreamAdapterInterface::Detach() {
152 if (NULL != stream_)
153 stream_->SignalEvent.disconnect(this);
154 StreamInterface* stream = stream_;
155 stream_ = NULL;
156 return stream;
157}
158
159StreamAdapterInterface::~StreamAdapterInterface() {
160 if (owned_)
161 delete stream_;
162}
163
164///////////////////////////////////////////////////////////////////////////////
165// StreamTap
166///////////////////////////////////////////////////////////////////////////////
167
168StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
169 : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
170 tap_error_(0) {
171 AttachTap(tap);
172}
173
174void StreamTap::AttachTap(StreamInterface* tap) {
175 tap_.reset(tap);
176}
177
178StreamInterface* StreamTap::DetachTap() {
179 return tap_.release();
180}
181
182StreamResult StreamTap::GetTapResult(int* error) {
183 if (error) {
184 *error = tap_error_;
185 }
186 return tap_result_;
187}
188
189StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
190 size_t* read, int* error) {
191 size_t backup_read;
192 if (!read) {
193 read = &backup_read;
194 }
195 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
196 read, error);
197 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
198 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
199 }
200 return res;
201}
202
203StreamResult StreamTap::Write(const void* data, size_t data_len,
204 size_t* written, int* error) {
205 size_t backup_written;
206 if (!written) {
207 written = &backup_written;
208 }
209 StreamResult res = StreamAdapterInterface::Write(data, data_len,
210 written, error);
211 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
212 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
213 }
214 return res;
215}
216
217///////////////////////////////////////////////////////////////////////////////
218// StreamSegment
219///////////////////////////////////////////////////////////////////////////////
220
221StreamSegment::StreamSegment(StreamInterface* stream)
222 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
223 length_(SIZE_UNKNOWN) {
224 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
225 stream->GetPosition(&start_);
226}
227
228StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
229 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
230 length_(length) {
231 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
232 stream->GetPosition(&start_);
233}
234
235StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
236 size_t* read, int* error) {
237 if (SIZE_UNKNOWN != length_) {
238 if (pos_ >= length_)
239 return SR_EOS;
240 buffer_len = _min(buffer_len, length_ - pos_);
241 }
242 size_t backup_read;
243 if (!read) {
244 read = &backup_read;
245 }
246 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
247 read, error);
248 if (SR_SUCCESS == result) {
249 pos_ += *read;
250 }
251 return result;
252}
253
254bool StreamSegment::SetPosition(size_t position) {
255 if (SIZE_UNKNOWN == start_)
256 return false; // Not seekable
257 if ((SIZE_UNKNOWN != length_) && (position > length_))
258 return false; // Seek past end of segment
259 if (!StreamAdapterInterface::SetPosition(start_ + position))
260 return false;
261 pos_ = position;
262 return true;
263}
264
265bool StreamSegment::GetPosition(size_t* position) const {
266 if (SIZE_UNKNOWN == start_)
267 return false; // Not seekable
268 if (!StreamAdapterInterface::GetPosition(position))
269 return false;
270 if (position) {
271 ASSERT(*position >= start_);
272 *position -= start_;
273 }
274 return true;
275}
276
277bool StreamSegment::GetSize(size_t* size) const {
278 if (!StreamAdapterInterface::GetSize(size))
279 return false;
280 if (size) {
281 if (SIZE_UNKNOWN != start_) {
282 ASSERT(*size >= start_);
283 *size -= start_;
284 }
285 if (SIZE_UNKNOWN != length_) {
286 *size = _min(*size, length_);
287 }
288 }
289 return true;
290}
291
292bool StreamSegment::GetAvailable(size_t* size) const {
293 if (!StreamAdapterInterface::GetAvailable(size))
294 return false;
295 if (size && (SIZE_UNKNOWN != length_))
296 *size = _min(*size, length_ - pos_);
297 return true;
298}
299
300///////////////////////////////////////////////////////////////////////////////
301// NullStream
302///////////////////////////////////////////////////////////////////////////////
303
304NullStream::NullStream() {
305}
306
307NullStream::~NullStream() {
308}
309
310StreamState NullStream::GetState() const {
311 return SS_OPEN;
312}
313
314StreamResult NullStream::Read(void* buffer, size_t buffer_len,
315 size_t* read, int* error) {
316 if (error) *error = -1;
317 return SR_ERROR;
318}
319
320StreamResult NullStream::Write(const void* data, size_t data_len,
321 size_t* written, int* error) {
322 if (written) *written = data_len;
323 return SR_SUCCESS;
324}
325
326void NullStream::Close() {
327}
328
329///////////////////////////////////////////////////////////////////////////////
330// FileStream
331///////////////////////////////////////////////////////////////////////////////
332
333FileStream::FileStream() : file_(NULL) {
334}
335
336FileStream::~FileStream() {
337 FileStream::Close();
338}
339
340bool FileStream::Open(const std::string& filename, const char* mode,
341 int* error) {
342 Close();
343#ifdef WIN32
344 std::wstring wfilename;
345 if (Utf8ToWindowsFilename(filename, &wfilename)) {
346 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
347 } else {
348 if (error) {
349 *error = -1;
350 return false;
351 }
352 }
353#else
354 file_ = fopen(filename.c_str(), mode);
355#endif
356 if (!file_ && error) {
357 *error = errno;
358 }
359 return (file_ != NULL);
360}
361
362bool FileStream::OpenShare(const std::string& filename, const char* mode,
363 int shflag, int* error) {
364 Close();
365#ifdef WIN32
366 std::wstring wfilename;
367 if (Utf8ToWindowsFilename(filename, &wfilename)) {
368 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
369 if (!file_ && error) {
370 *error = errno;
371 return false;
372 }
373 return file_ != NULL;
374 } else {
375 if (error) {
376 *error = -1;
377 }
378 return false;
379 }
380#else
381 return Open(filename, mode, error);
382#endif
383}
384
385bool FileStream::DisableBuffering() {
386 if (!file_)
387 return false;
388 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
389}
390
391StreamState FileStream::GetState() const {
392 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
393}
394
395StreamResult FileStream::Read(void* buffer, size_t buffer_len,
396 size_t* read, int* error) {
397 if (!file_)
398 return SR_EOS;
399 size_t result = fread(buffer, 1, buffer_len, file_);
400 if ((result == 0) && (buffer_len > 0)) {
401 if (feof(file_))
402 return SR_EOS;
403 if (error)
404 *error = errno;
405 return SR_ERROR;
406 }
407 if (read)
408 *read = result;
409 return SR_SUCCESS;
410}
411
412StreamResult FileStream::Write(const void* data, size_t data_len,
413 size_t* written, int* error) {
414 if (!file_)
415 return SR_EOS;
416 size_t result = fwrite(data, 1, data_len, file_);
417 if ((result == 0) && (data_len > 0)) {
418 if (error)
419 *error = errno;
420 return SR_ERROR;
421 }
422 if (written)
423 *written = result;
424 return SR_SUCCESS;
425}
426
427void FileStream::Close() {
428 if (file_) {
429 DoClose();
430 file_ = NULL;
431 }
432}
433
434bool FileStream::SetPosition(size_t position) {
435 if (!file_)
436 return false;
437 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
438}
439
440bool FileStream::GetPosition(size_t* position) const {
441 ASSERT(NULL != position);
442 if (!file_)
443 return false;
444 long result = ftell(file_);
445 if (result < 0)
446 return false;
447 if (position)
448 *position = result;
449 return true;
450}
451
452bool FileStream::GetSize(size_t* size) const {
453 ASSERT(NULL != size);
454 if (!file_)
455 return false;
456 struct stat file_stats;
457 if (fstat(fileno(file_), &file_stats) != 0)
458 return false;
459 if (size)
460 *size = file_stats.st_size;
461 return true;
462}
463
464bool FileStream::GetAvailable(size_t* size) const {
465 ASSERT(NULL != size);
466 if (!GetSize(size))
467 return false;
468 long result = ftell(file_);
469 if (result < 0)
470 return false;
471 if (size)
472 *size -= result;
473 return true;
474}
475
476bool FileStream::ReserveSize(size_t size) {
477 // TODO: extend the file to the proper length
478 return true;
479}
480
481bool FileStream::GetSize(const std::string& filename, size_t* size) {
482 struct stat file_stats;
483 if (stat(filename.c_str(), &file_stats) != 0)
484 return false;
485 *size = file_stats.st_size;
486 return true;
487}
488
489bool FileStream::Flush() {
490 if (file_) {
491 return (0 == fflush(file_));
492 }
493 // try to flush empty file?
494 ASSERT(false);
495 return false;
496}
497
498#if defined(POSIX)
499
500bool FileStream::TryLock() {
501 if (file_ == NULL) {
502 // Stream not open.
503 ASSERT(false);
504 return false;
505 }
506
507 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
508}
509
510bool FileStream::Unlock() {
511 if (file_ == NULL) {
512 // Stream not open.
513 ASSERT(false);
514 return false;
515 }
516
517 return flock(fileno(file_), LOCK_UN) == 0;
518}
519
520#endif
521
522void FileStream::DoClose() {
523 fclose(file_);
524}
525
526AsyncWriteStream::~AsyncWriteStream() {
527 write_thread_->Clear(this, 0, NULL);
528 ClearBufferAndWrite();
529
530 CritScope cs(&crit_stream_);
531 stream_.reset();
532}
533
534// This is needed by some stream writers, such as RtpDumpWriter.
535bool AsyncWriteStream::GetPosition(size_t* position) const {
536 CritScope cs(&crit_stream_);
537 return stream_->GetPosition(position);
538}
539
540// This is needed by some stream writers, such as the plugin log writers.
541StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
542 size_t* read, int* error) {
543 CritScope cs(&crit_stream_);
544 return stream_->Read(buffer, buffer_len, read, error);
545}
546
547void AsyncWriteStream::Close() {
548 if (state_ == SS_CLOSED) {
549 return;
550 }
551
552 write_thread_->Clear(this, 0, NULL);
553 ClearBufferAndWrite();
554
555 CritScope cs(&crit_stream_);
556 stream_->Close();
557 state_ = SS_CLOSED;
558}
559
560StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
561 size_t* written, int* error) {
562 if (state_ == SS_CLOSED) {
563 return SR_ERROR;
564 }
565
566 size_t previous_buffer_length = 0;
567 {
568 CritScope cs(&crit_buffer_);
569 previous_buffer_length = buffer_.length();
570 buffer_.AppendData(data, data_len);
571 }
572
573 if (previous_buffer_length == 0) {
574 // If there's stuff already in the buffer, then we already called
575 // Post and the write_thread_ hasn't pulled it out yet, so we
576 // don't need to re-Post.
577 write_thread_->Post(this, 0, NULL);
578 }
579 // Return immediately, assuming that it works.
580 if (written) {
581 *written = data_len;
582 }
583 return SR_SUCCESS;
584}
585
586void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
587 ClearBufferAndWrite();
588}
589
590bool AsyncWriteStream::Flush() {
591 if (state_ == SS_CLOSED) {
592 return false;
593 }
594
595 ClearBufferAndWrite();
596
597 CritScope cs(&crit_stream_);
598 return stream_->Flush();
599}
600
601void AsyncWriteStream::ClearBufferAndWrite() {
602 Buffer to_write;
603 {
604 CritScope cs_buffer(&crit_buffer_);
605 buffer_.TransferTo(&to_write);
606 }
607
608 if (to_write.length() > 0) {
609 CritScope cs(&crit_stream_);
610 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
611 }
612}
613
614#ifdef POSIX
615
616// Have to identically rewrite the FileStream destructor or else it would call
617// the base class's Close() instead of the sub-class's.
618POpenStream::~POpenStream() {
619 POpenStream::Close();
620}
621
622bool POpenStream::Open(const std::string& subcommand,
623 const char* mode,
624 int* error) {
625 Close();
626 file_ = popen(subcommand.c_str(), mode);
627 if (file_ == NULL) {
628 if (error)
629 *error = errno;
630 return false;
631 }
632 return true;
633}
634
635bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
636 int shflag, int* error) {
637 return Open(subcommand, mode, error);
638}
639
640void POpenStream::DoClose() {
641 wait_status_ = pclose(file_);
642}
643
644#endif
645
646///////////////////////////////////////////////////////////////////////////////
647// MemoryStream
648///////////////////////////////////////////////////////////////////////////////
649
650MemoryStreamBase::MemoryStreamBase()
651 : buffer_(NULL), buffer_length_(0), data_length_(0),
652 seek_position_(0) {
653}
654
655StreamState MemoryStreamBase::GetState() const {
656 return SS_OPEN;
657}
658
659StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
660 size_t* bytes_read, int* error) {
661 if (seek_position_ >= data_length_) {
662 return SR_EOS;
663 }
664 size_t available = data_length_ - seek_position_;
665 if (bytes > available) {
666 // Read partial buffer
667 bytes = available;
668 }
669 memcpy(buffer, &buffer_[seek_position_], bytes);
670 seek_position_ += bytes;
671 if (bytes_read) {
672 *bytes_read = bytes;
673 }
674 return SR_SUCCESS;
675}
676
677StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
678 size_t* bytes_written, int* error) {
679 size_t available = buffer_length_ - seek_position_;
680 if (0 == available) {
681 // Increase buffer size to the larger of:
682 // a) new position rounded up to next 256 bytes
683 // b) double the previous length
684 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
685 buffer_length_ * 2);
686 StreamResult result = DoReserve(new_buffer_length, error);
687 if (SR_SUCCESS != result) {
688 return result;
689 }
690 ASSERT(buffer_length_ >= new_buffer_length);
691 available = buffer_length_ - seek_position_;
692 }
693
694 if (bytes > available) {
695 bytes = available;
696 }
697 memcpy(&buffer_[seek_position_], buffer, bytes);
698 seek_position_ += bytes;
699 if (data_length_ < seek_position_) {
700 data_length_ = seek_position_;
701 }
702 if (bytes_written) {
703 *bytes_written = bytes;
704 }
705 return SR_SUCCESS;
706}
707
708void MemoryStreamBase::Close() {
709 // nothing to do
710}
711
712bool MemoryStreamBase::SetPosition(size_t position) {
713 if (position > data_length_)
714 return false;
715 seek_position_ = position;
716 return true;
717}
718
719bool MemoryStreamBase::GetPosition(size_t* position) const {
720 if (position)
721 *position = seek_position_;
722 return true;
723}
724
725bool MemoryStreamBase::GetSize(size_t* size) const {
726 if (size)
727 *size = data_length_;
728 return true;
729}
730
731bool MemoryStreamBase::GetAvailable(size_t* size) const {
732 if (size)
733 *size = data_length_ - seek_position_;
734 return true;
735}
736
737bool MemoryStreamBase::ReserveSize(size_t size) {
738 return (SR_SUCCESS == DoReserve(size, NULL));
739}
740
741StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
742 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
743}
744
745///////////////////////////////////////////////////////////////////////////////
746
747MemoryStream::MemoryStream()
748 : buffer_alloc_(NULL) {
749}
750
751MemoryStream::MemoryStream(const char* data)
752 : buffer_alloc_(NULL) {
753 SetData(data, strlen(data));
754}
755
756MemoryStream::MemoryStream(const void* data, size_t length)
757 : buffer_alloc_(NULL) {
758 SetData(data, length);
759}
760
761MemoryStream::~MemoryStream() {
762 delete [] buffer_alloc_;
763}
764
765void MemoryStream::SetData(const void* data, size_t length) {
766 data_length_ = buffer_length_ = length;
767 delete [] buffer_alloc_;
768 buffer_alloc_ = new char[buffer_length_ + kAlignment];
769 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
770 memcpy(buffer_, data, data_length_);
771 seek_position_ = 0;
772}
773
774StreamResult MemoryStream::DoReserve(size_t size, int* error) {
775 if (buffer_length_ >= size)
776 return SR_SUCCESS;
777
778 if (char* new_buffer_alloc = new char[size + kAlignment]) {
779 char* new_buffer = reinterpret_cast<char*>(
780 ALIGNP(new_buffer_alloc, kAlignment));
781 memcpy(new_buffer, buffer_, data_length_);
782 delete [] buffer_alloc_;
783 buffer_alloc_ = new_buffer_alloc;
784 buffer_ = new_buffer;
785 buffer_length_ = size;
786 return SR_SUCCESS;
787 }
788
789 if (error) {
790 *error = ENOMEM;
791 }
792 return SR_ERROR;
793}
794
795///////////////////////////////////////////////////////////////////////////////
796
797ExternalMemoryStream::ExternalMemoryStream() {
798}
799
800ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
801 SetData(data, length);
802}
803
804ExternalMemoryStream::~ExternalMemoryStream() {
805}
806
807void ExternalMemoryStream::SetData(void* data, size_t length) {
808 data_length_ = buffer_length_ = length;
809 buffer_ = static_cast<char*>(data);
810 seek_position_ = 0;
811}
812
813///////////////////////////////////////////////////////////////////////////////
814// FifoBuffer
815///////////////////////////////////////////////////////////////////////////////
816
817FifoBuffer::FifoBuffer(size_t size)
818 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
819 data_length_(0), read_position_(0), owner_(Thread::Current()) {
820 // all events are done on the owner_ thread
821}
822
823FifoBuffer::FifoBuffer(size_t size, Thread* owner)
824 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
825 data_length_(0), read_position_(0), owner_(owner) {
826 // all events are done on the owner_ thread
827}
828
829FifoBuffer::~FifoBuffer() {
830}
831
832bool FifoBuffer::GetBuffered(size_t* size) const {
833 CritScope cs(&crit_);
834 *size = data_length_;
835 return true;
836}
837
838bool FifoBuffer::SetCapacity(size_t size) {
839 CritScope cs(&crit_);
840 if (data_length_ > size) {
841 return false;
842 }
843
844 if (size != buffer_length_) {
845 char* buffer = new char[size];
846 const size_t copy = data_length_;
847 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
848 memcpy(buffer, &buffer_[read_position_], tail_copy);
849 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
850 buffer_.reset(buffer);
851 read_position_ = 0;
852 buffer_length_ = size;
853 }
854 return true;
855}
856
857StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
858 size_t offset, size_t* bytes_read) {
859 CritScope cs(&crit_);
860 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
861}
862
863StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
864 size_t offset, size_t* bytes_written) {
865 CritScope cs(&crit_);
866 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
867}
868
869StreamState FifoBuffer::GetState() const {
870 return state_;
871}
872
873StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
874 size_t* bytes_read, int* error) {
875 CritScope cs(&crit_);
876 const bool was_writable = data_length_ < buffer_length_;
877 size_t copy = 0;
878 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
879
880 if (result == SR_SUCCESS) {
881 // If read was successful then adjust the read position and number of
882 // bytes buffered.
883 read_position_ = (read_position_ + copy) % buffer_length_;
884 data_length_ -= copy;
885 if (bytes_read) {
886 *bytes_read = copy;
887 }
888
889 // if we were full before, and now we're not, post an event
890 if (!was_writable && copy > 0) {
891 PostEvent(owner_, SE_WRITE, 0);
892 }
893 }
894 return result;
895}
896
897StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
898 size_t* bytes_written, int* error) {
899 CritScope cs(&crit_);
900
901 const bool was_readable = (data_length_ > 0);
902 size_t copy = 0;
903 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
904
905 if (result == SR_SUCCESS) {
906 // If write was successful then adjust the number of readable bytes.
907 data_length_ += copy;
908 if (bytes_written) {
909 *bytes_written = copy;
910 }
911
912 // if we didn't have any data to read before, and now we do, post an event
913 if (!was_readable && copy > 0) {
914 PostEvent(owner_, SE_READ, 0);
915 }
916 }
917 return result;
918}
919
920void FifoBuffer::Close() {
921 CritScope cs(&crit_);
922 state_ = SS_CLOSED;
923}
924
925const void* FifoBuffer::GetReadData(size_t* size) {
926 CritScope cs(&crit_);
927 *size = (read_position_ + data_length_ <= buffer_length_) ?
928 data_length_ : buffer_length_ - read_position_;
929 return &buffer_[read_position_];
930}
931
932void FifoBuffer::ConsumeReadData(size_t size) {
933 CritScope cs(&crit_);
934 ASSERT(size <= data_length_);
935 const bool was_writable = data_length_ < buffer_length_;
936 read_position_ = (read_position_ + size) % buffer_length_;
937 data_length_ -= size;
938 if (!was_writable && size > 0) {
939 PostEvent(owner_, SE_WRITE, 0);
940 }
941}
942
943void* FifoBuffer::GetWriteBuffer(size_t* size) {
944 CritScope cs(&crit_);
945 if (state_ == SS_CLOSED) {
946 return NULL;
947 }
948
949 // if empty, reset the write position to the beginning, so we can get
950 // the biggest possible block
951 if (data_length_ == 0) {
952 read_position_ = 0;
953 }
954
955 const size_t write_position = (read_position_ + data_length_)
956 % buffer_length_;
957 *size = (write_position > read_position_ || data_length_ == 0) ?
958 buffer_length_ - write_position : read_position_ - write_position;
959 return &buffer_[write_position];
960}
961
962void FifoBuffer::ConsumeWriteBuffer(size_t size) {
963 CritScope cs(&crit_);
964 ASSERT(size <= buffer_length_ - data_length_);
965 const bool was_readable = (data_length_ > 0);
966 data_length_ += size;
967 if (!was_readable && size > 0) {
968 PostEvent(owner_, SE_READ, 0);
969 }
970}
971
972bool FifoBuffer::GetWriteRemaining(size_t* size) const {
973 CritScope cs(&crit_);
974 *size = buffer_length_ - data_length_;
975 return true;
976}
977
978StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
979 size_t bytes,
980 size_t offset,
981 size_t* bytes_read) {
982 if (offset >= data_length_) {
983 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
984 }
985
986 const size_t available = data_length_ - offset;
987 const size_t read_position = (read_position_ + offset) % buffer_length_;
988 const size_t copy = _min(bytes, available);
989 const size_t tail_copy = _min(copy, buffer_length_ - read_position);
990 char* const p = static_cast<char*>(buffer);
991 memcpy(p, &buffer_[read_position], tail_copy);
992 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
993
994 if (bytes_read) {
995 *bytes_read = copy;
996 }
997 return SR_SUCCESS;
998}
999
1000StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1001 size_t bytes,
1002 size_t offset,
1003 size_t* bytes_written) {
1004 if (state_ == SS_CLOSED) {
1005 return SR_EOS;
1006 }
1007
1008 if (data_length_ + offset >= buffer_length_) {
1009 return SR_BLOCK;
1010 }
1011
1012 const size_t available = buffer_length_ - data_length_ - offset;
1013 const size_t write_position = (read_position_ + data_length_ + offset)
1014 % buffer_length_;
1015 const size_t copy = _min(bytes, available);
1016 const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1017 const char* const p = static_cast<const char*>(buffer);
1018 memcpy(&buffer_[write_position], p, tail_copy);
1019 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1020
1021 if (bytes_written) {
1022 *bytes_written = copy;
1023 }
1024 return SR_SUCCESS;
1025}
1026
1027
1028
1029///////////////////////////////////////////////////////////////////////////////
1030// LoggingAdapter
1031///////////////////////////////////////////////////////////////////////////////
1032
1033LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1034 const std::string& label, bool hex_mode)
1035 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1036 set_label(label);
1037}
1038
1039void LoggingAdapter::set_label(const std::string& label) {
1040 label_.assign("[");
1041 label_.append(label);
1042 label_.append("]");
1043}
1044
1045StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1046 size_t* read, int* error) {
1047 size_t local_read; if (!read) read = &local_read;
1048 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1049 error);
1050 if (result == SR_SUCCESS) {
1051 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1052 }
1053 return result;
1054}
1055
1056StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1057 size_t* written, int* error) {
1058 size_t local_written;
1059 if (!written) written = &local_written;
1060 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1061 error);
1062 if (result == SR_SUCCESS) {
1063 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1064 &lms_);
1065 }
1066 return result;
1067}
1068
1069void LoggingAdapter::Close() {
1070 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1071 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1072 LOG_V(level_) << label_ << " Closed locally";
1073 StreamAdapterInterface::Close();
1074}
1075
1076void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1077 if (events & SE_OPEN) {
1078 LOG_V(level_) << label_ << " Open";
1079 } else if (events & SE_CLOSE) {
1080 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1081 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1082 LOG_V(level_) << label_ << " Closed with error: " << err;
1083 }
1084 StreamAdapterInterface::OnEvent(stream, events, err);
1085}
1086
1087///////////////////////////////////////////////////////////////////////////////
1088// StringStream - Reads/Writes to an external std::string
1089///////////////////////////////////////////////////////////////////////////////
1090
1091StringStream::StringStream(std::string& str)
1092 : str_(str), read_pos_(0), read_only_(false) {
1093}
1094
1095StringStream::StringStream(const std::string& str)
1096 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1097}
1098
1099StreamState StringStream::GetState() const {
1100 return SS_OPEN;
1101}
1102
1103StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1104 size_t* read, int* error) {
1105 size_t available = _min(buffer_len, str_.size() - read_pos_);
1106 if (!available)
1107 return SR_EOS;
1108 memcpy(buffer, str_.data() + read_pos_, available);
1109 read_pos_ += available;
1110 if (read)
1111 *read = available;
1112 return SR_SUCCESS;
1113}
1114
1115StreamResult StringStream::Write(const void* data, size_t data_len,
1116 size_t* written, int* error) {
1117 if (read_only_) {
1118 if (error) {
1119 *error = -1;
1120 }
1121 return SR_ERROR;
1122 }
1123 str_.append(static_cast<const char*>(data),
1124 static_cast<const char*>(data) + data_len);
1125 if (written)
1126 *written = data_len;
1127 return SR_SUCCESS;
1128}
1129
1130void StringStream::Close() {
1131}
1132
1133bool StringStream::SetPosition(size_t position) {
1134 if (position > str_.size())
1135 return false;
1136 read_pos_ = position;
1137 return true;
1138}
1139
1140bool StringStream::GetPosition(size_t* position) const {
1141 if (position)
1142 *position = read_pos_;
1143 return true;
1144}
1145
1146bool StringStream::GetSize(size_t* size) const {
1147 if (size)
1148 *size = str_.size();
1149 return true;
1150}
1151
1152bool StringStream::GetAvailable(size_t* size) const {
1153 if (size)
1154 *size = str_.size() - read_pos_;
1155 return true;
1156}
1157
1158bool StringStream::ReserveSize(size_t size) {
1159 if (read_only_)
1160 return false;
1161 str_.reserve(size);
1162 return true;
1163}
1164
1165///////////////////////////////////////////////////////////////////////////////
1166// StreamReference
1167///////////////////////////////////////////////////////////////////////////////
1168
1169StreamReference::StreamReference(StreamInterface* stream)
1170 : StreamAdapterInterface(stream, false) {
1171 // owner set to false so the destructor does not free the stream.
1172 stream_ref_count_ = new StreamRefCount(stream);
1173}
1174
1175StreamInterface* StreamReference::NewReference() {
1176 stream_ref_count_->AddReference();
1177 return new StreamReference(stream_ref_count_, stream());
1178}
1179
1180StreamReference::~StreamReference() {
1181 stream_ref_count_->Release();
1182}
1183
1184StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1185 StreamInterface* stream)
1186 : StreamAdapterInterface(stream, false),
1187 stream_ref_count_(stream_ref_count) {
1188}
1189
1190///////////////////////////////////////////////////////////////////////////////
1191
1192StreamResult Flow(StreamInterface* source,
1193 char* buffer, size_t buffer_len,
1194 StreamInterface* sink,
1195 size_t* data_len /* = NULL */) {
1196 ASSERT(buffer_len > 0);
1197
1198 StreamResult result;
1199 size_t count, read_pos, write_pos;
1200 if (data_len) {
1201 read_pos = *data_len;
1202 } else {
1203 read_pos = 0;
1204 }
1205
1206 bool end_of_stream = false;
1207 do {
1208 // Read until buffer is full, end of stream, or error
1209 while (!end_of_stream && (read_pos < buffer_len)) {
1210 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1211 &count, NULL);
1212 if (result == SR_EOS) {
1213 end_of_stream = true;
1214 } else if (result != SR_SUCCESS) {
1215 if (data_len) {
1216 *data_len = read_pos;
1217 }
1218 return result;
1219 } else {
1220 read_pos += count;
1221 }
1222 }
1223
1224 // Write until buffer is empty, or error (including end of stream)
1225 write_pos = 0;
1226 while (write_pos < read_pos) {
1227 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1228 &count, NULL);
1229 if (result != SR_SUCCESS) {
1230 if (data_len) {
1231 *data_len = read_pos - write_pos;
1232 if (write_pos > 0) {
1233 memmove(buffer, buffer + write_pos, *data_len);
1234 }
1235 }
1236 return result;
1237 }
1238 write_pos += count;
1239 }
1240
1241 read_pos = 0;
1242 } while (!end_of_stream);
1243
1244 if (data_len) {
1245 *data_len = 0;
1246 }
1247 return SR_SUCCESS;
1248}
1249
1250///////////////////////////////////////////////////////////////////////////////
1251
1252} // namespace talk_base