Move FifoBuffer to its own file and build target
Used only by test code and by pseudo_tcp.
Bug: webrtc:6424
Change-Id: I28903e74f7b69cbdd8c368f4444c8a233eb76868
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/128868
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27341}
diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn
index 83b0799..ce39958 100644
--- a/p2p/BUILD.gn
+++ b/p2p/BUILD.gn
@@ -92,8 +92,11 @@
"../logging:rtc_event_log_api",
"../rtc_base",
"../rtc_base:checks",
+
+ # Needed by pseudo_tcp, which should move to a separate target.
"../rtc_base:safe_minmax",
"../rtc_base:weak_ptr",
+ "../rtc_base/memory:fifo_buffer",
"../rtc_base/network:sent_packet",
"../rtc_base/system:rtc_export",
"../rtc_base/third_party/base64",
diff --git a/p2p/base/pseudo_tcp.h b/p2p/base/pseudo_tcp.h
index 4849c2a..375be3b 100644
--- a/p2p/base/pseudo_tcp.h
+++ b/p2p/base/pseudo_tcp.h
@@ -15,7 +15,7 @@
#include <stdint.h>
#include <list>
-#include "rtc_base/stream.h"
+#include "rtc_base/memory/fifo_buffer.h"
#include "rtc_base/system/rtc_export.h"
namespace cricket {
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 46c7138..e229042 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1106,6 +1106,7 @@
":checks",
":rtc_base",
"../api/units:time_delta",
+ "memory:fifo_buffer",
"third_party/sigslot",
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/memory",
@@ -1383,7 +1384,6 @@
"rtc_certificate_unittest.cc",
"signal_thread_unittest.cc",
"sigslot_tester_unittest.cc",
- "stream_unittest.cc",
"test_client_unittest.cc",
"thread_unittest.cc",
"unique_id_generator_unittest.cc",
@@ -1416,6 +1416,7 @@
"../test:field_trial",
"../test:fileutils",
"../test:test_support",
+ "memory:fifo_buffer",
"synchronization:synchronization_unittests",
"third_party/sigslot",
"//third_party/abseil-cpp/absl/algorithm:container",
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index 05fd7ab..d151557 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -30,15 +30,36 @@
deps = []
}
+rtc_source_set("fifo_buffer") {
+ visibility = [
+ "../../p2p:rtc_p2p",
+ "..:rtc_base_tests_utils",
+ "..:rtc_base_unittests",
+ ":unittests",
+ ]
+ sources = [
+ "fifo_buffer.cc",
+ "fifo_buffer.h",
+ ]
+ deps = [
+ "..:rtc_base",
+ ]
+ if (is_nacl) {
+ deps += [ "//native_client_sdk/src/libraries/nacl_io" ]
+ }
+}
+
rtc_source_set("unittests") {
testonly = true
sources = [
"aligned_array_unittest.cc",
"aligned_malloc_unittest.cc",
+ "fifo_buffer_unittest.cc",
]
deps = [
":aligned_array",
":aligned_malloc",
+ ":fifo_buffer",
"../../test:test_support",
]
}
diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc
new file mode 100644
index 0000000..44fb032
--- /dev/null
+++ b/rtc_base/memory/fifo_buffer.cc
@@ -0,0 +1,247 @@
+/*
+ * Copyright 2019 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/memory/fifo_buffer.h"
+
+#include <algorithm>
+
+#include "rtc_base/thread.h"
+
+namespace rtc {
+
+FifoBuffer::FifoBuffer(size_t size)
+ : state_(SS_OPEN),
+ buffer_(new char[size]),
+ buffer_length_(size),
+ data_length_(0),
+ read_position_(0),
+ owner_(Thread::Current()) {
+ // all events are done on the owner_ thread
+}
+
+FifoBuffer::FifoBuffer(size_t size, Thread* owner)
+ : state_(SS_OPEN),
+ buffer_(new char[size]),
+ buffer_length_(size),
+ data_length_(0),
+ read_position_(0),
+ owner_(owner) {
+ // all events are done on the owner_ thread
+}
+
+FifoBuffer::~FifoBuffer() {}
+
+bool FifoBuffer::GetBuffered(size_t* size) const {
+ CritScope cs(&crit_);
+ *size = data_length_;
+ return true;
+}
+
+bool FifoBuffer::SetCapacity(size_t size) {
+ CritScope cs(&crit_);
+ if (data_length_ > size) {
+ return false;
+ }
+
+ if (size != buffer_length_) {
+ char* buffer = new char[size];
+ const size_t copy = data_length_;
+ const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
+ memcpy(buffer, &buffer_[read_position_], tail_copy);
+ memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
+ buffer_.reset(buffer);
+ read_position_ = 0;
+ buffer_length_ = size;
+ }
+ return true;
+}
+
+StreamResult FifoBuffer::ReadOffset(void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_read) {
+ CritScope cs(&crit_);
+ return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
+}
+
+StreamResult FifoBuffer::WriteOffset(const void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_written) {
+ CritScope cs(&crit_);
+ return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
+}
+
+StreamState FifoBuffer::GetState() const {
+ CritScope cs(&crit_);
+ return state_;
+}
+
+StreamResult FifoBuffer::Read(void* buffer,
+ size_t bytes,
+ size_t* bytes_read,
+ int* error) {
+ CritScope cs(&crit_);
+ const bool was_writable = data_length_ < buffer_length_;
+ size_t copy = 0;
+ StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
+
+ if (result == SR_SUCCESS) {
+ // If read was successful then adjust the read position and number of
+ // bytes buffered.
+ read_position_ = (read_position_ + copy) % buffer_length_;
+ data_length_ -= copy;
+ if (bytes_read) {
+ *bytes_read = copy;
+ }
+
+ // if we were full before, and now we're not, post an event
+ if (!was_writable && copy > 0) {
+ PostEvent(owner_, SE_WRITE, 0);
+ }
+ }
+ return result;
+}
+
+StreamResult FifoBuffer::Write(const void* buffer,
+ size_t bytes,
+ size_t* bytes_written,
+ int* error) {
+ CritScope cs(&crit_);
+
+ const bool was_readable = (data_length_ > 0);
+ size_t copy = 0;
+ StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
+
+ if (result == SR_SUCCESS) {
+ // If write was successful then adjust the number of readable bytes.
+ data_length_ += copy;
+ if (bytes_written) {
+ *bytes_written = copy;
+ }
+
+ // if we didn't have any data to read before, and now we do, post an event
+ if (!was_readable && copy > 0) {
+ PostEvent(owner_, SE_READ, 0);
+ }
+ }
+ return result;
+}
+
+void FifoBuffer::Close() {
+ CritScope cs(&crit_);
+ state_ = SS_CLOSED;
+}
+
+const void* FifoBuffer::GetReadData(size_t* size) {
+ CritScope cs(&crit_);
+ *size = (read_position_ + data_length_ <= buffer_length_)
+ ? data_length_
+ : buffer_length_ - read_position_;
+ return &buffer_[read_position_];
+}
+
+void FifoBuffer::ConsumeReadData(size_t size) {
+ CritScope cs(&crit_);
+ RTC_DCHECK(size <= data_length_);
+ const bool was_writable = data_length_ < buffer_length_;
+ read_position_ = (read_position_ + size) % buffer_length_;
+ data_length_ -= size;
+ if (!was_writable && size > 0) {
+ PostEvent(owner_, SE_WRITE, 0);
+ }
+}
+
+void* FifoBuffer::GetWriteBuffer(size_t* size) {
+ CritScope cs(&crit_);
+ if (state_ == SS_CLOSED) {
+ return nullptr;
+ }
+
+ // if empty, reset the write position to the beginning, so we can get
+ // the biggest possible block
+ if (data_length_ == 0) {
+ read_position_ = 0;
+ }
+
+ const size_t write_position =
+ (read_position_ + data_length_) % buffer_length_;
+ *size = (write_position > read_position_ || data_length_ == 0)
+ ? buffer_length_ - write_position
+ : read_position_ - write_position;
+ return &buffer_[write_position];
+}
+
+void FifoBuffer::ConsumeWriteBuffer(size_t size) {
+ CritScope cs(&crit_);
+ RTC_DCHECK(size <= buffer_length_ - data_length_);
+ const bool was_readable = (data_length_ > 0);
+ data_length_ += size;
+ if (!was_readable && size > 0) {
+ PostEvent(owner_, SE_READ, 0);
+ }
+}
+
+bool FifoBuffer::GetWriteRemaining(size_t* size) const {
+ CritScope cs(&crit_);
+ *size = buffer_length_ - data_length_;
+ return true;
+}
+
+StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_read) {
+ if (offset >= data_length_) {
+ return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
+ }
+
+ const size_t available = data_length_ - offset;
+ const size_t read_position = (read_position_ + offset) % buffer_length_;
+ const size_t copy = std::min(bytes, available);
+ const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
+ char* const p = static_cast<char*>(buffer);
+ memcpy(p, &buffer_[read_position], tail_copy);
+ memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
+
+ if (bytes_read) {
+ *bytes_read = copy;
+ }
+ return SR_SUCCESS;
+}
+
+StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_written) {
+ if (state_ == SS_CLOSED) {
+ return SR_EOS;
+ }
+
+ if (data_length_ + offset >= buffer_length_) {
+ return SR_BLOCK;
+ }
+
+ const size_t available = buffer_length_ - data_length_ - offset;
+ const size_t write_position =
+ (read_position_ + data_length_ + offset) % buffer_length_;
+ const size_t copy = std::min(bytes, available);
+ const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
+ const char* const p = static_cast<const char*>(buffer);
+ memcpy(&buffer_[write_position], p, tail_copy);
+ memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
+
+ if (bytes_written) {
+ *bytes_written = copy;
+ }
+ return SR_SUCCESS;
+}
+
+} // namespace rtc
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
new file mode 100644
index 0000000..f859815
--- /dev/null
+++ b/rtc_base/memory/fifo_buffer.h
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2019 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_MEMORY_FIFO_BUFFER_H_
+#define RTC_BASE_MEMORY_FIFO_BUFFER_H_
+
+#include <memory>
+
+#include "rtc_base/stream.h"
+
+namespace rtc {
+
+// FifoBuffer allows for efficient, thread-safe buffering of data between
+// writer and reader.
+class FifoBuffer final : public StreamInterface {
+ public:
+ // Creates a FIFO buffer with the specified capacity.
+ explicit FifoBuffer(size_t length);
+ // Creates a FIFO buffer with the specified capacity and owner
+ FifoBuffer(size_t length, Thread* owner);
+ ~FifoBuffer() override;
+ // Gets the amount of data currently readable from the buffer.
+ bool GetBuffered(size_t* data_len) const;
+ // Resizes the buffer to the specified capacity. Fails if data_length_ > size
+ bool SetCapacity(size_t length);
+
+ // Read into |buffer| with an offset from the current read position, offset
+ // is specified in number of bytes.
+ // This method doesn't adjust read position nor the number of available
+ // bytes, user has to call ConsumeReadData() to do this.
+ StreamResult ReadOffset(void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_read);
+
+ // Write |buffer| with an offset from the current write position, offset is
+ // specified in number of bytes.
+ // This method doesn't adjust the number of buffered bytes, user has to call
+ // ConsumeWriteBuffer() to do this.
+ StreamResult WriteOffset(const void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_written);
+
+ // StreamInterface methods
+ StreamState GetState() const override;
+ StreamResult Read(void* buffer,
+ size_t bytes,
+ size_t* bytes_read,
+ int* error) override;
+ StreamResult Write(const void* buffer,
+ size_t bytes,
+ size_t* bytes_written,
+ int* error) override;
+ void Close() override;
+
+ // Seek to a byte offset from the beginning of the stream. Returns false if
+ // the stream does not support seeking, or cannot seek to the specified
+ // position.
+ bool SetPosition(size_t position);
+
+ // Get the byte offset of the current position from the start of the stream.
+ // Returns false if the position is not known.
+ bool GetPosition(size_t* position) const;
+
+ // Seek to the start of the stream.
+ bool Rewind() { return SetPosition(0); }
+
+ // GetReadData returns a pointer to a buffer which is owned by the stream.
+ // The buffer contains data_len bytes. null is returned if no data is
+ // available, or if the method fails. If the caller processes the data, it
+ // must call ConsumeReadData with the number of processed bytes. GetReadData
+ // does not require a matching call to ConsumeReadData if the data is not
+ // processed. Read and ConsumeReadData invalidate the buffer returned by
+ // GetReadData.
+ const void* GetReadData(size_t* data_len);
+ void ConsumeReadData(size_t used);
+ // GetWriteBuffer returns a pointer to a buffer which is owned by the stream.
+ // The buffer has a capacity of buf_len bytes. null is returned if there is
+ // no buffer available, or if the method fails. The call may write data to
+ // the buffer, and then call ConsumeWriteBuffer with the number of bytes
+ // written. GetWriteBuffer does not require a matching call to
+ // ConsumeWriteData if no data is written. Write and
+ // ConsumeWriteData invalidate the buffer returned by GetWriteBuffer.
+ void* GetWriteBuffer(size_t* buf_len);
+ void ConsumeWriteBuffer(size_t used);
+
+ // Return the number of Write()-able bytes remaining before end-of-stream.
+ // Returns false if not known.
+ bool GetWriteRemaining(size_t* size) const;
+
+ private:
+ // Helper method that implements ReadOffset. Caller must acquire a lock
+ // when calling this method.
+ StreamResult ReadOffsetLocked(void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_read)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+
+ // Helper method that implements WriteOffset. Caller must acquire a lock
+ // when calling this method.
+ StreamResult WriteOffsetLocked(const void* buffer,
+ size_t bytes,
+ size_t offset,
+ size_t* bytes_written)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+
+ // keeps the opened/closed state of the stream
+ StreamState state_ RTC_GUARDED_BY(crit_);
+ // the allocated buffer
+ std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(crit_);
+ // size of the allocated buffer
+ size_t buffer_length_ RTC_GUARDED_BY(crit_);
+ // amount of readable data in the buffer
+ size_t data_length_ RTC_GUARDED_BY(crit_);
+ // offset to the readable data
+ size_t read_position_ RTC_GUARDED_BY(crit_);
+ // stream callbacks are dispatched on this thread
+ Thread* owner_;
+ // object lock
+ CriticalSection crit_;
+ RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
+};
+
+} // namespace rtc
+
+#endif // RTC_BASE_MEMORY_FIFO_BUFFER_H_
diff --git a/rtc_base/stream_unittest.cc b/rtc_base/memory/fifo_buffer_unittest.cc
similarity index 88%
rename from rtc_base/stream_unittest.cc
rename to rtc_base/memory/fifo_buffer_unittest.cc
index bd6e84f..c2926b3 100644
--- a/rtc_base/stream_unittest.cc
+++ b/rtc_base/memory/fifo_buffer_unittest.cc
@@ -8,65 +8,13 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "rtc_base/stream.h"
-
#include <string.h>
+#include "rtc_base/memory/fifo_buffer.h"
#include "test/gtest.h"
namespace rtc {
-///////////////////////////////////////////////////////////////////////////////
-// TestStream
-///////////////////////////////////////////////////////////////////////////////
-
-class TestStream : public StreamInterface {
- public:
- TestStream() : pos_(0) {}
-
- StreamState GetState() const override { return SS_OPEN; }
-
- StreamResult Read(void* buffer,
- size_t buffer_len,
- size_t* read,
- int* error) override {
- unsigned char* uc_buffer = static_cast<unsigned char*>(buffer);
- for (size_t i = 0; i < buffer_len; ++i) {
- uc_buffer[i] = static_cast<unsigned char>(pos_++);
- }
- if (read)
- *read = buffer_len;
- return SR_SUCCESS;
- }
-
- StreamResult Write(const void* data,
- size_t data_len,
- size_t* written,
- int* error) override {
- if (error)
- *error = -1;
- return SR_ERROR;
- }
-
- void Close() override {}
-
- private:
- size_t pos_;
-};
-
-bool VerifyTestBuffer(unsigned char* buffer, size_t len, unsigned char value) {
- bool passed = true;
- for (size_t i = 0; i < len; ++i) {
- if (buffer[i] != value++) {
- passed = false;
- break;
- }
- }
- // Ensure that we don't pass again without re-writing
- memset(buffer, 0, len);
- return passed;
-}
-
TEST(FifoBufferTest, TestAll) {
const size_t kSize = 16;
const char in[kSize * 2 + 1] = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
diff --git a/rtc_base/proxy_server.h b/rtc_base/proxy_server.h
index c7e6078..139cc91 100644
--- a/rtc_base/proxy_server.h
+++ b/rtc_base/proxy_server.h
@@ -17,9 +17,9 @@
#include "absl/memory/memory.h"
#include "rtc_base/async_socket.h"
#include "rtc_base/constructor_magic.h"
+#include "rtc_base/memory/fifo_buffer.h"
#include "rtc_base/server_socket_adapters.h"
#include "rtc_base/socket_address.h"
-#include "rtc_base/stream.h"
namespace rtc {
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index abf9880..585d080 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -17,6 +17,7 @@
#include "rtc_base/checks.h"
#include "rtc_base/gunit.h"
#include "rtc_base/helpers.h"
+#include "rtc_base/memory/fifo_buffer.h"
#include "rtc_base/memory_stream.h"
#include "rtc_base/message_digest.h"
#include "rtc_base/ssl_adapter.h"
diff --git a/rtc_base/stream.cc b/rtc_base/stream.cc
index 6b510dd..4ad005e 100644
--- a/rtc_base/stream.cc
+++ b/rtc_base/stream.cc
@@ -267,236 +267,4 @@
fclose(file_);
}
-///////////////////////////////////////////////////////////////////////////////
-// FifoBuffer
-///////////////////////////////////////////////////////////////////////////////
-
-FifoBuffer::FifoBuffer(size_t size)
- : state_(SS_OPEN),
- buffer_(new char[size]),
- buffer_length_(size),
- data_length_(0),
- read_position_(0),
- owner_(Thread::Current()) {
- // all events are done on the owner_ thread
-}
-
-FifoBuffer::FifoBuffer(size_t size, Thread* owner)
- : state_(SS_OPEN),
- buffer_(new char[size]),
- buffer_length_(size),
- data_length_(0),
- read_position_(0),
- owner_(owner) {
- // all events are done on the owner_ thread
-}
-
-FifoBuffer::~FifoBuffer() {}
-
-bool FifoBuffer::GetBuffered(size_t* size) const {
- CritScope cs(&crit_);
- *size = data_length_;
- return true;
-}
-
-bool FifoBuffer::SetCapacity(size_t size) {
- CritScope cs(&crit_);
- if (data_length_ > size) {
- return false;
- }
-
- if (size != buffer_length_) {
- char* buffer = new char[size];
- const size_t copy = data_length_;
- const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
- memcpy(buffer, &buffer_[read_position_], tail_copy);
- memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
- buffer_.reset(buffer);
- read_position_ = 0;
- buffer_length_ = size;
- }
- return true;
-}
-
-StreamResult FifoBuffer::ReadOffset(void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_read) {
- CritScope cs(&crit_);
- return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
-}
-
-StreamResult FifoBuffer::WriteOffset(const void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_written) {
- CritScope cs(&crit_);
- return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
-}
-
-StreamState FifoBuffer::GetState() const {
- CritScope cs(&crit_);
- return state_;
-}
-
-StreamResult FifoBuffer::Read(void* buffer,
- size_t bytes,
- size_t* bytes_read,
- int* error) {
- CritScope cs(&crit_);
- const bool was_writable = data_length_ < buffer_length_;
- size_t copy = 0;
- StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
-
- if (result == SR_SUCCESS) {
- // If read was successful then adjust the read position and number of
- // bytes buffered.
- read_position_ = (read_position_ + copy) % buffer_length_;
- data_length_ -= copy;
- if (bytes_read) {
- *bytes_read = copy;
- }
-
- // if we were full before, and now we're not, post an event
- if (!was_writable && copy > 0) {
- PostEvent(owner_, SE_WRITE, 0);
- }
- }
- return result;
-}
-
-StreamResult FifoBuffer::Write(const void* buffer,
- size_t bytes,
- size_t* bytes_written,
- int* error) {
- CritScope cs(&crit_);
-
- const bool was_readable = (data_length_ > 0);
- size_t copy = 0;
- StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
-
- if (result == SR_SUCCESS) {
- // If write was successful then adjust the number of readable bytes.
- data_length_ += copy;
- if (bytes_written) {
- *bytes_written = copy;
- }
-
- // if we didn't have any data to read before, and now we do, post an event
- if (!was_readable && copy > 0) {
- PostEvent(owner_, SE_READ, 0);
- }
- }
- return result;
-}
-
-void FifoBuffer::Close() {
- CritScope cs(&crit_);
- state_ = SS_CLOSED;
-}
-
-const void* FifoBuffer::GetReadData(size_t* size) {
- CritScope cs(&crit_);
- *size = (read_position_ + data_length_ <= buffer_length_)
- ? data_length_
- : buffer_length_ - read_position_;
- return &buffer_[read_position_];
-}
-
-void FifoBuffer::ConsumeReadData(size_t size) {
- CritScope cs(&crit_);
- RTC_DCHECK(size <= data_length_);
- const bool was_writable = data_length_ < buffer_length_;
- read_position_ = (read_position_ + size) % buffer_length_;
- data_length_ -= size;
- if (!was_writable && size > 0) {
- PostEvent(owner_, SE_WRITE, 0);
- }
-}
-
-void* FifoBuffer::GetWriteBuffer(size_t* size) {
- CritScope cs(&crit_);
- if (state_ == SS_CLOSED) {
- return nullptr;
- }
-
- // if empty, reset the write position to the beginning, so we can get
- // the biggest possible block
- if (data_length_ == 0) {
- read_position_ = 0;
- }
-
- const size_t write_position =
- (read_position_ + data_length_) % buffer_length_;
- *size = (write_position > read_position_ || data_length_ == 0)
- ? buffer_length_ - write_position
- : read_position_ - write_position;
- return &buffer_[write_position];
-}
-
-void FifoBuffer::ConsumeWriteBuffer(size_t size) {
- CritScope cs(&crit_);
- RTC_DCHECK(size <= buffer_length_ - data_length_);
- const bool was_readable = (data_length_ > 0);
- data_length_ += size;
- if (!was_readable && size > 0) {
- PostEvent(owner_, SE_READ, 0);
- }
-}
-
-bool FifoBuffer::GetWriteRemaining(size_t* size) const {
- CritScope cs(&crit_);
- *size = buffer_length_ - data_length_;
- return true;
-}
-
-StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_read) {
- if (offset >= data_length_) {
- return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
- }
-
- const size_t available = data_length_ - offset;
- const size_t read_position = (read_position_ + offset) % buffer_length_;
- const size_t copy = std::min(bytes, available);
- const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
- char* const p = static_cast<char*>(buffer);
- memcpy(p, &buffer_[read_position], tail_copy);
- memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
-
- if (bytes_read) {
- *bytes_read = copy;
- }
- return SR_SUCCESS;
-}
-
-StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_written) {
- if (state_ == SS_CLOSED) {
- return SR_EOS;
- }
-
- if (data_length_ + offset >= buffer_length_) {
- return SR_BLOCK;
- }
-
- const size_t available = buffer_length_ - data_length_ - offset;
- const size_t write_position =
- (read_position_ + data_length_ + offset) % buffer_length_;
- const size_t copy = std::min(bytes, available);
- const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
- const char* const p = static_cast<const char*>(buffer);
- memcpy(&buffer_[write_position], p, tail_copy);
- memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
-
- if (bytes_written) {
- *bytes_written = copy;
- }
- return SR_SUCCESS;
-}
-
} // namespace rtc
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index 0609857..b535f16 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -225,121 +225,6 @@
RTC_DISALLOW_COPY_AND_ASSIGN(FileStream);
};
-// FifoBuffer allows for efficient, thread-safe buffering of data between
-// writer and reader. As the data can wrap around the end of the buffer,
-// MemoryStreamBase can't help us here.
-
-class FifoBuffer final : public StreamInterface {
- public:
- // Creates a FIFO buffer with the specified capacity.
- explicit FifoBuffer(size_t length);
- // Creates a FIFO buffer with the specified capacity and owner
- FifoBuffer(size_t length, Thread* owner);
- ~FifoBuffer() override;
- // Gets the amount of data currently readable from the buffer.
- bool GetBuffered(size_t* data_len) const;
- // Resizes the buffer to the specified capacity. Fails if data_length_ > size
- bool SetCapacity(size_t length);
-
- // Read into |buffer| with an offset from the current read position, offset
- // is specified in number of bytes.
- // This method doesn't adjust read position nor the number of available
- // bytes, user has to call ConsumeReadData() to do this.
- StreamResult ReadOffset(void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_read);
-
- // Write |buffer| with an offset from the current write position, offset is
- // specified in number of bytes.
- // This method doesn't adjust the number of buffered bytes, user has to call
- // ConsumeWriteBuffer() to do this.
- StreamResult WriteOffset(const void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_written);
-
- // StreamInterface methods
- StreamState GetState() const override;
- StreamResult Read(void* buffer,
- size_t bytes,
- size_t* bytes_read,
- int* error) override;
- StreamResult Write(const void* buffer,
- size_t bytes,
- size_t* bytes_written,
- int* error) override;
- void Close() override;
-
- // Seek to a byte offset from the beginning of the stream. Returns false if
- // the stream does not support seeking, or cannot seek to the specified
- // position.
- bool SetPosition(size_t position);
-
- // Get the byte offset of the current position from the start of the stream.
- // Returns false if the position is not known.
- bool GetPosition(size_t* position) const;
-
- // Seek to the start of the stream.
- bool Rewind() { return SetPosition(0); }
-
- // GetReadData returns a pointer to a buffer which is owned by the stream.
- // The buffer contains data_len bytes. null is returned if no data is
- // available, or if the method fails. If the caller processes the data, it
- // must call ConsumeReadData with the number of processed bytes. GetReadData
- // does not require a matching call to ConsumeReadData if the data is not
- // processed. Read and ConsumeReadData invalidate the buffer returned by
- // GetReadData.
- const void* GetReadData(size_t* data_len);
- void ConsumeReadData(size_t used);
- // GetWriteBuffer returns a pointer to a buffer which is owned by the stream.
- // The buffer has a capacity of buf_len bytes. null is returned if there is
- // no buffer available, or if the method fails. The call may write data to
- // the buffer, and then call ConsumeWriteBuffer with the number of bytes
- // written. GetWriteBuffer does not require a matching call to
- // ConsumeWriteData if no data is written. Write and
- // ConsumeWriteData invalidate the buffer returned by GetWriteBuffer.
- void* GetWriteBuffer(size_t* buf_len);
- void ConsumeWriteBuffer(size_t used);
-
- // Return the number of Write()-able bytes remaining before end-of-stream.
- // Returns false if not known.
- bool GetWriteRemaining(size_t* size) const;
-
- private:
- // Helper method that implements ReadOffset. Caller must acquire a lock
- // when calling this method.
- StreamResult ReadOffsetLocked(void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_read)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
-
- // Helper method that implements WriteOffset. Caller must acquire a lock
- // when calling this method.
- StreamResult WriteOffsetLocked(const void* buffer,
- size_t bytes,
- size_t offset,
- size_t* bytes_written)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
-
- // keeps the opened/closed state of the stream
- StreamState state_ RTC_GUARDED_BY(crit_);
- // the allocated buffer
- std::unique_ptr<char[]> buffer_ RTC_GUARDED_BY(crit_);
- // size of the allocated buffer
- size_t buffer_length_ RTC_GUARDED_BY(crit_);
- // amount of readable data in the buffer
- size_t data_length_ RTC_GUARDED_BY(crit_);
- // offset to the readable data
- size_t read_position_ RTC_GUARDED_BY(crit_);
- // stream callbacks are dispatched on this thread
- Thread* owner_;
- // object lock
- CriticalSection crit_;
- RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
-};
-
} // namespace rtc
#endif // RTC_BASE_STREAM_H_