1/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11
12#if defined(WEBRTC_WIN)
13#include "webrtc/base/win32.h"
14#else // !WEBRTC_WIN
15#define SEC_E_CERT_EXPIRED (-2146893016)
16#endif // !WEBRTC_WIN
17
18#include "webrtc/base/common.h"
19#include "webrtc/base/httpbase.h"
20#include "webrtc/base/logging.h"
21#include "webrtc/base/socket.h"
22#include "webrtc/base/stringutils.h"
23#include "webrtc/base/thread.h"
24
25namespace rtc {
26
27//////////////////////////////////////////////////////////////////////
28// Helpers
29//////////////////////////////////////////////////////////////////////
30
31bool MatchHeader(const char* str, size_t len, HttpHeader header) {
32 const char* const header_str = ToString(header);
33 const size_t header_len = strlen(header_str);
34 return (len == header_len) && (_strnicmp(str, header_str, header_len) == 0);
35}
36
37enum {
38 MSG_READ
39};
40
41//////////////////////////////////////////////////////////////////////
42// HttpParser
43//////////////////////////////////////////////////////////////////////
44
45HttpParser::HttpParser() {
46 reset();
47}
48
49HttpParser::~HttpParser() {
50}
51
52void
53HttpParser::reset() {
54 state_ = ST_LEADER;
55 chunked_ = false;
56 data_size_ = SIZE_UNKNOWN;
57}
58
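// Summary of the incremental-parsing contract (inferred from the
// implementation below and its callers): Process() consumes as much of
// |buffer| as it can and advances |*processed| past the bytes it handled.
// It returns PR_CONTINUE when it needs more input, PR_BLOCK when a subclass
// callback cannot accept more data yet, and PR_COMPLETE when the document is
// finished or |*error| has been set.  Callers (see DoReceiveLoop) keep any
// unconsumed bytes and append new network data before calling again.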
59HttpParser::ProcessResult
60HttpParser::Process(const char* buffer, size_t len, size_t* processed,
61 HttpError* error) {
62 *processed = 0;
63 *error = HE_NONE;
64
65 if (state_ >= ST_COMPLETE) {
66 ASSERT(false);
67 return PR_COMPLETE;
68 }
69
70 while (true) {
71 if (state_ < ST_DATA) {
72 size_t pos = *processed;
73 while ((pos < len) && (buffer[pos] != '\n')) {
74 pos += 1;
75 }
76 if (pos >= len) {
77 break; // don't have a full header
78 }
79 const char* line = buffer + *processed;
80 size_t len = (pos - *processed);
81 *processed = pos + 1;
82 while ((len > 0) && isspace(static_cast<unsigned char>(line[len-1]))) {
83 len -= 1;
84 }
85 ProcessResult result = ProcessLine(line, len, error);
86 LOG(LS_VERBOSE) << "Processed line, result=" << result;
87
88 if (PR_CONTINUE != result) {
89 return result;
90 }
91 } else if (data_size_ == 0) {
92 if (chunked_) {
93 state_ = ST_CHUNKTERM;
94 } else {
95 return PR_COMPLETE;
96 }
97 } else {
98 size_t available = len - *processed;
99 if (available == 0) {
100 break; // no more data
101 }
102 if ((data_size_ != SIZE_UNKNOWN) && (available > data_size_)) {
103 available = data_size_;
104 }
105 size_t read = 0;
106 ProcessResult result = ProcessData(buffer + *processed, available, read,
107 error);
108 LOG(LS_VERBOSE) << "Processed data, result: " << result << " read: "
109 << read << " err: " << *error;
110
111 if (PR_CONTINUE != result) {
112 return result;
113 }
114 *processed += read;
115 if (data_size_ != SIZE_UNKNOWN) {
116 data_size_ -= read;
117 }
118 }
119 }
120
121 return PR_CONTINUE;
122}
123
124HttpParser::ProcessResult
125HttpParser::ProcessLine(const char* line, size_t len, HttpError* error) {
126 LOG_F(LS_VERBOSE) << " state: " << state_ << " line: "
127 << std::string(line, len) << " len: " << len << " err: "
128 << *error;
129
130 switch (state_) {
131 case ST_LEADER:
132 state_ = ST_HEADERS;
133 return ProcessLeader(line, len, error);
134
135 case ST_HEADERS:
136 if (len > 0) {
137 const char* value = strchrn(line, len, ':');
138 if (!value) {
139 *error = HE_PROTOCOL;
140 return PR_COMPLETE;
141 }
142 size_t nlen = (value - line);
143 const char* eol = line + len;
144 do {
145 value += 1;
146 } while ((value < eol) && isspace(static_cast<unsigned char>(*value)));
147 size_t vlen = eol - value;
148 if (MatchHeader(line, nlen, HH_CONTENT_LENGTH)) {
149 // sscanf isn't safe with strings that aren't null-terminated, and there
150 // is no guarantee that |value| is.
151 // Create a local copy that is null-terminated.
152 std::string value_str(value, vlen);
153 unsigned int temp_size;
154 if (sscanf(value_str.c_str(), "%u", &temp_size) != 1) {
155 *error = HE_PROTOCOL;
156 return PR_COMPLETE;
157 }
158 data_size_ = static_cast<size_t>(temp_size);
159 } else if (MatchHeader(line, nlen, HH_TRANSFER_ENCODING)) {
160 if ((vlen == 7) && (_strnicmp(value, "chunked", 7) == 0)) {
161 chunked_ = true;
162 } else if ((vlen == 8) && (_strnicmp(value, "identity", 8) == 0)) {
163 chunked_ = false;
164 } else {
165 *error = HE_PROTOCOL;
166 return PR_COMPLETE;
167 }
168 }
169 return ProcessHeader(line, nlen, value, vlen, error);
170 } else {
171 state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
172 return ProcessHeaderComplete(chunked_, data_size_, error);
173 }
174 break;
175
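// Chunked framing reminder (illustrative): each chunk arrives as a hex size
// line followed by the data and a CRLF, e.g. "5\r\nhello\r\n", and the body
// ends with a zero-size chunk such as "0\r\n\r\n".  ST_CHUNKSIZE parses the
// size line, ST_CHUNKTERM expects the blank line after each chunk's data,
// and ST_TRAILERS consumes any trailing headers after the final chunk.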
176 case ST_CHUNKSIZE:
177 if (len > 0) {
178 char* ptr = NULL;
179 data_size_ = strtoul(line, &ptr, 16);
180 if (ptr != line + len) {
181 *error = HE_PROTOCOL;
182 return PR_COMPLETE;
183 }
184 state_ = (data_size_ == 0) ? ST_TRAILERS : ST_DATA;
185 } else {
186 *error = HE_PROTOCOL;
187 return PR_COMPLETE;
188 }
189 break;
190
191 case ST_CHUNKTERM:
192 if (len > 0) {
193 *error = HE_PROTOCOL;
194 return PR_COMPLETE;
195 } else {
196 state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
197 }
198 break;
199
200 case ST_TRAILERS:
201 if (len == 0) {
202 return PR_COMPLETE;
203 }
204 // *error = onHttpRecvTrailer();
205 break;
206
207 default:
208 ASSERT(false);
209 break;
210 }
211
212 return PR_CONTINUE;
213}
214
215bool
216HttpParser::is_valid_end_of_input() const {
217 return (state_ == ST_DATA) && (data_size_ == SIZE_UNKNOWN);
218}
219
220void
221HttpParser::complete(HttpError error) {
222 if (state_ < ST_COMPLETE) {
223 state_ = ST_COMPLETE;
224 OnComplete(error);
225 }
226}
227
228//////////////////////////////////////////////////////////////////////
229// HttpBase::DocumentStream
230//////////////////////////////////////////////////////////////////////
231
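// A fixed-capacity wrapper around caller-provided memory: DoReserve() below
// reports SR_BLOCK instead of growing once the external buffer is full, so
// writers stall rather than reallocate.  (Descriptive note; behavior inferred
// from the override below.)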
232class BlockingMemoryStream : public ExternalMemoryStream {
233public:
234 BlockingMemoryStream(char* buffer, size_t size)
235 : ExternalMemoryStream(buffer, size) { }
236
237 virtual StreamResult DoReserve(size_t size, int* error) {
238 return (buffer_length_ >= size) ? SR_SUCCESS : SR_BLOCK;
239 }
240};
241
242class HttpBase::DocumentStream : public StreamInterface {
243public:
244 DocumentStream(HttpBase* base) : base_(base), error_(HE_DEFAULT) { }
245
246 virtual StreamState GetState() const {
247 if (NULL == base_)
248 return SS_CLOSED;
249 if (HM_RECV == base_->mode_)
250 return SS_OPEN;
251 return SS_OPENING;
252 }
253
254 virtual StreamResult Read(void* buffer, size_t buffer_len,
255 size_t* read, int* error) {
256 if (!base_) {
257 if (error) *error = error_;
258 return (HE_NONE == error_) ? SR_EOS : SR_ERROR;
259 }
260
261 if (HM_RECV != base_->mode_) {
262 return SR_BLOCK;
263 }
264
265 // DoReceiveLoop writes http document data to the StreamInterface* document
266 // member of HttpData. In this case, we want this data to be written
267 // directly to our buffer. To accomplish this, we wrap our buffer with a
268 // StreamInterface, and replace the existing document with our wrapper.
269 // When the method returns, we restore the old document. Ideally, we would
270 // pass our StreamInterface* to DoReceiveLoop, but due to the callbacks
271 // of HttpParser, we would still need to store the pointer temporarily.
272 scoped_ptr<StreamInterface>
273 stream(new BlockingMemoryStream(reinterpret_cast<char*>(buffer),
274 buffer_len));
275
276 // Replace the existing document with our wrapped buffer.
277 base_->data_->document.swap(stream);
278
279 // Pump the I/O loop. DoReceiveLoop is guaranteed not to attempt to
280 // complete the I/O process, which means that our wrapper is not in danger
281 // of being deleted. To ensure this, DoReceiveLoop returns true when it
282 // wants complete to be called. We make sure to uninstall our wrapper
283 // before calling complete().
284 HttpError http_error;
285 bool complete = base_->DoReceiveLoop(&http_error);
286
287 // Reinstall the original output document.
288 base_->data_->document.swap(stream);
289
290 // If we reach the end of the receive stream, we disconnect our stream
291 // adapter from the HttpBase, and further calls to read will either return
292 // EOS or ERROR, appropriately. Finally, we call complete().
293 StreamResult result = SR_BLOCK;
294 if (complete) {
295 HttpBase* base = Disconnect(http_error);
296 if (error) *error = error_;
297 result = (HE_NONE == error_) ? SR_EOS : SR_ERROR;
298 base->complete(http_error);
299 }
300
301 // Even if we are complete, if some data was read we must return SUCCESS.
302 // Future Reads will return EOS or ERROR based on the error_ variable.
303 size_t position;
304 stream->GetPosition(&position);
305 if (position > 0) {
306 if (read) *read = position;
307 result = SR_SUCCESS;
308 }
309 return result;
310 }
311
312 virtual StreamResult Write(const void* data, size_t data_len,
313 size_t* written, int* error) {
314 if (error) *error = -1;
315 return SR_ERROR;
316 }
317
318 virtual void Close() {
319 if (base_) {
320 HttpBase* base = Disconnect(HE_NONE);
321 if (HM_RECV == base->mode_ && base->http_stream_) {
322 // Read I/O could have been stalled on the user of this DocumentStream,
323 // so restart the I/O process now that we've removed ourselves.
324 base->http_stream_->PostEvent(SE_READ, 0);
325 }
326 }
327 }
328
329 virtual bool GetAvailable(size_t* size) const {
330 if (!base_ || HM_RECV != base_->mode_)
331 return false;
332 size_t data_size = base_->GetDataRemaining();
333 if (SIZE_UNKNOWN == data_size)
334 return false;
335 if (size)
336 *size = data_size;
337 return true;
338 }
339
340 HttpBase* Disconnect(HttpError error) {
341 ASSERT(NULL != base_);
342 ASSERT(NULL != base_->doc_stream_);
343 HttpBase* base = base_;
344 base_->doc_stream_ = NULL;
345 base_ = NULL;
346 error_ = error;
347 return base;
348 }
349
350private:
351 HttpBase* base_;
352 HttpError error_;
353};
354
355//////////////////////////////////////////////////////////////////////
356// HttpBase
357//////////////////////////////////////////////////////////////////////
358
359HttpBase::HttpBase() : mode_(HM_NONE), data_(NULL), notify_(NULL),
360 http_stream_(NULL), doc_stream_(NULL) {
361}
362
363HttpBase::~HttpBase() {
364 ASSERT(HM_NONE == mode_);
365}
366
367bool
368HttpBase::isConnected() const {
369 return (http_stream_ != NULL) && (http_stream_->GetState() == SS_OPEN);
370}
371
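// Lifecycle note (inferred from the assertions below): attach() and detach()
// may only be called while no transaction is in progress, and send()/recv()
// each start a transaction that ends in do_complete(), which reports the
// result through notify_->onHttpComplete() when a notifier is installed.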
372bool
373HttpBase::attach(StreamInterface* stream) {
374 if ((mode_ != HM_NONE) || (http_stream_ != NULL) || (stream == NULL)) {
375 ASSERT(false);
376 return false;
377 }
378 http_stream_ = stream;
379 http_stream_->SignalEvent.connect(this, &HttpBase::OnHttpStreamEvent);
380 mode_ = (http_stream_->GetState() == SS_OPENING) ? HM_CONNECT : HM_NONE;
381 return true;
382}
383
384StreamInterface*
385HttpBase::detach() {
386 ASSERT(HM_NONE == mode_);
387 if (mode_ != HM_NONE) {
388 return NULL;
389 }
390 StreamInterface* stream = http_stream_;
391 http_stream_ = NULL;
392 if (stream) {
393 stream->SignalEvent.disconnect(this);
394 }
395 return stream;
396}
397
398void
399HttpBase::send(HttpData* data) {
400 ASSERT(HM_NONE == mode_);
401 if (mode_ != HM_NONE) {
402 return;
403 } else if (!isConnected()) {
404 OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
405 return;
406 }
407
408 mode_ = HM_SEND;
409 data_ = data;
410 len_ = 0;
411 ignore_data_ = chunk_data_ = false;
412
413 if (data_->document) {
414 data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
415 }
416
417 std::string encoding;
418 if (data_->hasHeader(HH_TRANSFER_ENCODING, &encoding)
419 && (encoding == "chunked")) {
420 chunk_data_ = true;
421 }
422
423 len_ = data_->formatLeader(buffer_, sizeof(buffer_));
424 len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
425
426 header_ = data_->begin();
427 if (header_ == data_->end()) {
428 // We must call this at least once, in the case where there are no headers.
429 queue_headers();
430 }
431
432 flush_data();
433}
434
435void
436HttpBase::recv(HttpData* data) {
437 ASSERT(HM_NONE == mode_);
438 if (mode_ != HM_NONE) {
439 return;
440 } else if (!isConnected()) {
441 OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
442 return;
443 }
444
445 mode_ = HM_RECV;
446 data_ = data;
447 len_ = 0;
448 ignore_data_ = chunk_data_ = false;
449
450 reset();
451 if (doc_stream_) {
452 doc_stream_->SignalEvent(doc_stream_, SE_OPEN | SE_READ, 0);
453 } else {
454 read_and_process_data();
455 }
456}
457
458void
459HttpBase::abort(HttpError err) {
460 if (mode_ != HM_NONE) {
461 if (http_stream_ != NULL) {
462 http_stream_->Close();
463 }
464 do_complete(err);
465 }
466}
467
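// Pull-mode receive: at most one DocumentStream may be outstanding (NULL is
// returned if one already exists), and it is typically obtained before
// recv(), or from the header-complete callback, so that body data is pulled
// rather than pushed.  A sketch of the intended call order, with hypothetical
// caller-side names:
//
//   StreamInterface* doc = base->GetDocumentStream();  // pull adapter
//   base->recv(&response_data);                        // starts the receive
//   // ... later, read the body via doc->Read(...), then doc->Close() ...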
468StreamInterface* HttpBase::GetDocumentStream() {
469 if (doc_stream_)
470 return NULL;
471 doc_stream_ = new DocumentStream(this);
472 return doc_stream_;
473}
474
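// Maps a raw stream error to an HttpError: 0 means the peer closed cleanly
// (HE_NONE only if we were receiving and the body may legally end at EOF),
// SOCKET_EACCES becomes HE_AUTH, SEC_E_CERT_EXPIRED becomes
// HE_CERTIFICATE_EXPIRED, and anything else is a connect or socket error
// depending on the current mode.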
475HttpError HttpBase::HandleStreamClose(int error) {
476 if (http_stream_ != NULL) {
477 http_stream_->Close();
478 }
479 if (error == 0) {
480 if ((mode_ == HM_RECV) && is_valid_end_of_input()) {
481 return HE_NONE;
482 } else {
483 return HE_DISCONNECTED;
484 }
485 } else if (error == SOCKET_EACCES) {
486 return HE_AUTH;
487 } else if (error == SEC_E_CERT_EXPIRED) {
488 return HE_CERTIFICATE_EXPIRED;
489 }
490 LOG_F(LS_ERROR) << "(" << error << ")";
491 return (HM_CONNECT == mode_) ? HE_CONNECT_FAILED : HE_SOCKET_ERROR;
492}
493
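// Pumps data from http_stream_ through the parser.  Returns true when the
// transaction has finished (with |*error| set) and the caller should invoke
// complete(); returns false when we are blocked waiting for more network
// data or for the document sink to accept writes.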
494bool HttpBase::DoReceiveLoop(HttpError* error) {
495 ASSERT(HM_RECV == mode_);
496 ASSERT(NULL != error);
497
498 // Due to the latency between receiving read notifications from
499 // pseudotcpchannel, we rely on repeated calls to read in order to achieve
500 // ideal throughput. The number of reads is limited to prevent starving
501 // the caller.
502
503 size_t loop_count = 0;
504 const size_t kMaxReadCount = 20;
505 bool process_requires_more_data = false;
506 do {
507 // The most frequent use of this function is in response to new data available
508 // on http_stream_. Therefore, we optimize by attempting to read from the
509 // network first (as opposed to processing existing data first).
510
511 if (len_ < sizeof(buffer_)) {
512 // Attempt to buffer more data.
513 size_t read;
514 int read_error;
515 StreamResult read_result = http_stream_->Read(buffer_ + len_,
516 sizeof(buffer_) - len_,
517 &read, &read_error);
518 switch (read_result) {
519 case SR_SUCCESS:
520 ASSERT(len_ + read <= sizeof(buffer_));
521 len_ += read;
522 break;
523 case SR_BLOCK:
524 if (process_requires_more_data) {
525 // We can't make progress until more data is available.
526 return false;
527 }
528 // Attempt to process the data already in our buffer.
529 break;
530 case SR_EOS:
531 // Clean close, with no error.
532 read_error = 0;
533 FALLTHROUGH(); // Fall through to HandleStreamClose.
534 case SR_ERROR:
535 *error = HandleStreamClose(read_error);
536 return true;
537 }
538 } else if (process_requires_more_data) {
539 // We have too much unprocessed data in our buffer. This should only
540 // occur when a single HTTP header is longer than the buffer size (32K).
541 // Anything longer than that is almost certainly an error.
542 *error = HE_OVERFLOW;
543 return true;
544 }
545
546 // Process data in our buffer. Process is not guaranteed to process all
547 // the buffered data. In particular, it will wait until a complete
548 // protocol element (such as http header, or chunk size) is available,
549 // before processing it in its entirety. Also, it is valid and sometimes
550 // necessary to call Process with an empty buffer, since the state machine
551 // may have interrupted state transitions to complete.
552 size_t processed;
553 ProcessResult process_result = Process(buffer_, len_, &processed,
554 error);
555 ASSERT(processed <= len_);
556 len_ -= processed;
557 memmove(buffer_, buffer_ + processed, len_);
558 switch (process_result) {
559 case PR_CONTINUE:
560 // We need more data to make progress.
561 process_requires_more_data = true;
562 break;
563 case PR_BLOCK:
564 // We're stalled on writing the processed data.
565 return false;
566 case PR_COMPLETE:
567 // *error already contains the correct code.
568 return true;
569 }
570 } while (++loop_count <= kMaxReadCount);
571
572 LOG_F(LS_WARNING) << "danger of starvation";
573 return false;
574}
575
576void
577HttpBase::read_and_process_data() {
578 HttpError error;
579 if (DoReceiveLoop(&error)) {
580 complete(error);
581 }
582}
583
584void
585HttpBase::flush_data() {
586 ASSERT(HM_SEND == mode_);
587
588 // When send_required is true, no more buffering can occur without a network
589 // write.
590 bool send_required = (len_ >= sizeof(buffer_));
591
592 while (true) {
593 ASSERT(len_ <= sizeof(buffer_));
594
595 // HTTP is inherently sensitive to round trip latency, since a frequent use
596 // case is for small requests and responses to be sent back and forth, and
597 // the lack of pipelining forces a single request to take a minimum of the
598 // round trip time. As a result, it is to our benefit to pack as much data
599 // into each packet as possible. Thus, we defer network writes until we've
600 // buffered as much data as possible.
601
602 if (!send_required && (header_ != data_->end())) {
603 // First, attempt to queue more header data.
604 send_required = queue_headers();
605 }
606
607 if (!send_required && data_->document) {
608 // Next, attempt to queue document data.
609
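// Chunked-output buffer layout (illustration of the arithmetic below):
//   [len_ bytes already queued][kChunkDigits hex digits]\r\n[chunk data]\r\n
// so kChunkDigits + 2 bytes are reserved before the document data and two
// more after it; without chunking the data is appended directly at len_.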
610 const size_t kChunkDigits = 8;
611 size_t offset, reserve;
612 if (chunk_data_) {
613 // Reserve characters at the start for the kChunkDigits-digit hex size and \r\n
614 offset = len_ + kChunkDigits + 2;
615 // ... and 2 characters at the end for \r\n
616 reserve = offset + 2;
617 } else {
618 offset = len_;
619 reserve = offset;
620 }
621
622 if (reserve >= sizeof(buffer_)) {
623 send_required = true;
624 } else {
625 size_t read;
626 int error;
627 StreamResult result = data_->document->Read(buffer_ + offset,
628 sizeof(buffer_) - reserve,
629 &read, &error);
630 if (result == SR_SUCCESS) {
631 ASSERT(reserve + read <= sizeof(buffer_));
632 if (chunk_data_) {
633 // Prepend the chunk length in hex.
634 // Note: sprintfn appends a null terminator, which is why we can't
635 // combine it with the line terminator.
636 sprintfn(buffer_ + len_, kChunkDigits + 1, "%.*x",
637 kChunkDigits, read);
638 // Add line terminator to the chunk length.
639 memcpy(buffer_ + len_ + kChunkDigits, "\r\n", 2);
640 // Add line terminator to the end of the chunk.
641 memcpy(buffer_ + offset + read, "\r\n", 2);
642 }
643 len_ = reserve + read;
644 } else if (result == SR_BLOCK) {
645 // Nothing to do but flush data to the network.
646 send_required = true;
647 } else if (result == SR_EOS) {
648 if (chunk_data_) {
649 // Append the empty chunk and empty trailers, then turn off
650 // chunking.
651 ASSERT(len_ + 5 <= sizeof(buffer_));
652 memcpy(buffer_ + len_, "0\r\n\r\n", 5);
653 len_ += 5;
654 chunk_data_ = false;
655 } else if (0 == len_) {
656 // No more data to read, and no more data to write.
657 do_complete();
658 return;
659 }
660 // Although we are done reading data, there is still data which needs
661 // to be flushed to the network.
662 send_required = true;
663 } else {
664 LOG_F(LS_ERROR) << "Read error: " << error;
665 do_complete(HE_STREAM);
666 return;
667 }
668 }
669 }
670
671 if (0 == len_) {
672 // No data currently available to send.
673 if (!data_->document) {
674 // If there is no source document, that means we're done.
675 do_complete();
676 }
677 return;
678 }
679
680 size_t written;
681 int error;
682 StreamResult result = http_stream_->Write(buffer_, len_, &written, &error);
683 if (result == SR_SUCCESS) {
684 ASSERT(written <= len_);
685 len_ -= written;
686 memmove(buffer_, buffer_ + written, len_);
687 send_required = false;
688 } else if (result == SR_BLOCK) {
689 if (send_required) {
690 // Nothing more we can do until network is writeable.
691 return;
692 }
693 } else {
694 ASSERT(result == SR_ERROR);
695 LOG_F(LS_ERROR) << "error";
696 OnHttpStreamEvent(http_stream_, SE_CLOSE, error);
697 return;
698 }
699 }
700
701 ASSERT(false);
702}
703
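// Serializes pending headers into buffer_.  Returns true when buffer_ is too
// full to hold the next header and must be flushed to the network first;
// returns false once every header plus the terminating blank line has been
// queued.  (Summary inferred from the callers in send() and flush_data().)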
704bool
705HttpBase::queue_headers() {
706 ASSERT(HM_SEND == mode_);
707 while (header_ != data_->end()) {
708 size_t len = sprintfn(buffer_ + len_, sizeof(buffer_) - len_,
709 "%.*s: %.*s\r\n",
710 header_->first.size(), header_->first.data(),
711 header_->second.size(), header_->second.data());
712 if (len_ + len < sizeof(buffer_) - 3) {
713 len_ += len;
714 ++header_;
715 } else if (len_ == 0) {
716 LOG(WARNING) << "discarding header that is too long: " << header_->first;
717 ++header_;
718 } else {
719 // Not enough room for the next header, write to network first.
720 return true;
721 }
722 }
723 // End of headers
724 len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
725 return false;
726}
727
728void
729HttpBase::do_complete(HttpError err) {
730 ASSERT(mode_ != HM_NONE);
731 HttpMode mode = mode_;
732 mode_ = HM_NONE;
733 if (data_ && data_->document) {
734 data_->document->SignalEvent.disconnect(this);
735 }
736 data_ = NULL;
737 if ((HM_RECV == mode) && doc_stream_) {
738 ASSERT(HE_NONE != err); // We should have Disconnected doc_stream_ already.
739 DocumentStream* ds = doc_stream_;
740 ds->Disconnect(err);
741 ds->SignalEvent(ds, SE_CLOSE, err);
742 }
743 if (notify_) {
744 notify_->onHttpComplete(mode, err);
745 }
746}
747
748//
749// Stream Signals
750//
751
752void
753HttpBase::OnHttpStreamEvent(StreamInterface* stream, int events, int error) {
754 ASSERT(stream == http_stream_);
755 if ((events & SE_OPEN) && (mode_ == HM_CONNECT)) {
756 do_complete();
757 return;
758 }
759
760 if ((events & SE_WRITE) && (mode_ == HM_SEND)) {
761 flush_data();
762 return;
763 }
764
765 if ((events & SE_READ) && (mode_ == HM_RECV)) {
766 if (doc_stream_) {
767 doc_stream_->SignalEvent(doc_stream_, SE_READ, 0);
768 } else {
769 read_and_process_data();
770 }
771 return;
772 }
773
774 if ((events & SE_CLOSE) == 0)
775 return;
776
777 HttpError http_error = HandleStreamClose(error);
778 if (mode_ == HM_RECV) {
779 complete(http_error);
780 } else if (mode_ != HM_NONE) {
781 do_complete(http_error);
782 } else if (notify_) {
783 notify_->onHttpClosed(http_error);
784 }
785}
786
787void
788HttpBase::OnDocumentEvent(StreamInterface* stream, int events, int error) {
789 ASSERT(stream == data_->document.get());
790 if ((events & SE_WRITE) && (mode_ == HM_RECV)) {
791 read_and_process_data();
792 return;
793 }
794
795 if ((events & SE_READ) && (mode_ == HM_SEND)) {
796 flush_data();
797 return;
798 }
799
800 if (events & SE_CLOSE) {
801 LOG_F(LS_ERROR) << "Read error: " << error;
802 do_complete(HE_STREAM);
803 return;
804 }
805}
806
807//
808// HttpParser Implementation
809//
810
811HttpParser::ProcessResult
812HttpBase::ProcessLeader(const char* line, size_t len, HttpError* error) {
813 *error = data_->parseLeader(line, len);
814 return (HE_NONE == *error) ? PR_CONTINUE : PR_COMPLETE;
815}
816
817HttpParser::ProcessResult
818HttpBase::ProcessHeader(const char* name, size_t nlen, const char* value,
819 size_t vlen, HttpError* error) {
820 std::string sname(name, nlen), svalue(value, vlen);
821 data_->addHeader(sname, svalue);
822 return PR_CONTINUE;
823}
824
825HttpParser::ProcessResult
826HttpBase::ProcessHeaderComplete(bool chunked, size_t& data_size,
827 HttpError* error) {
828 StreamInterface* old_docstream = doc_stream_;
829 if (notify_) {
830 *error = notify_->onHttpHeaderComplete(chunked, data_size);
831 // The request must not be aborted as a result of this callback.
832 ASSERT(NULL != data_);
833 }
834 if ((HE_NONE == *error) && data_->document) {
835 data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
836 }
837 if (HE_NONE != *error) {
838 return PR_COMPLETE;
839 }
840 if (old_docstream != doc_stream_) {
841 // Break out of Process loop, since our I/O model just changed.
842 return PR_BLOCK;
843 }
844 return PR_CONTINUE;
845}
846
847HttpParser::ProcessResult
848HttpBase::ProcessData(const char* data, size_t len, size_t& read,
849 HttpError* error) {
850 if (ignore_data_ || !data_->document) {
851 read = len;
852 return PR_CONTINUE;
853 }
854 int write_error = 0;
855 switch (data_->document->Write(data, len, &read, &write_error)) {
856 case SR_SUCCESS:
857 return PR_CONTINUE;
858 case SR_BLOCK:
859 return PR_BLOCK;
860 case SR_EOS:
861 LOG_F(LS_ERROR) << "Unexpected EOS";
862 *error = HE_STREAM;
863 return PR_COMPLETE;
864 case SR_ERROR:
865 default:
866 LOG_F(LS_ERROR) << "Write error: " << write_error;
867 *error = HE_STREAM;
868 return PR_COMPLETE;
869 }
870}
871
872void
873HttpBase::OnComplete(HttpError err) {
874 LOG_F(LS_VERBOSE);
875 do_complete(err);
876}
877
878} // namespace rtc