Dechunk incoming requests as needed and pipeline them to the server side.

The server side always chunks the request if the original request was chunked. No next hop version checks are performed.

* Client-side:

Removed clientIsRequestBodyValid() as unused. It was called with a content-length>0 precondition that made the function always return true.

Removed old dechunking hack that was trying to buffer the entire request body, pretending that we are still reading the headers. Adjusted related code. More work may be needed to identify client-side code that assumes the request size is always known.

Removed ConnStateData::bodySizeLeft() because we do not always know how much body is left to read -- chunked requests do not have known sizes until we read the last-chunk. Moreover, it was possibly used incorrectly because sometimes we want to know whether we want to comm_read more body bytes and sometimes we want to know whether we want to "produce" more body bytes (i.e., copy already read bytes into the BodyPipe buffer, which can get full).

Added ConnStateData::mayNeedToReadMoreBody() to replace conn->bodySizeLeft() with something more usable and precise.

XXX: If there is a chunk parsing error, the new code just resets the connection. I tried to find a way to send an error page to the client, but failed to do so. It is easy to do when the headers and the body prefix are parsed, but if the error is sent later, the server side may start sending us its response, and the two responses would clash, causing assertions. I do not know how to fully avoid that. Search for WE_KNOW_HOW_TO_SEND_ERRORS.

Tried to break deep recursion/iteration around clientParseRequest. When the chunked parser fails during the message prefix parsing, the rest of the code may decide that the connection is no longer used (and that there is no pending transaction, even though the currentobject member is not NULL!) and start parsing the second request.
If that second parse fails (for example), there will be two concurrent errors to be sent to the client and the client-side code cannot handle that. However, due to the XXX above, we never send an error when the chunking parser fails, making most of the related code polishing useless, at least for now.

Removed my wrong XXX related to closing after initiateClose.

Removed my(?) XXX related to endless chunked requests. There is nothing special about them, I guess, as a non-chunked request can be virtually endless as well if it has a huge Content-Length value.

Use commIsHalfClosed() instead of fd_table[fd].flags.socket_eof for consistency with other client-side code and to improve readability. I think these should return the same value in our context but I am not sure.

Correctly handle identity encoding. TODO: HTTPbis dropped it. Should we?

* Server-side:

Separated "received whole request body" state from "sent whole request body". When we chunk requests, we need to add last-chunk. Thus, we may have received (and written) the whole raw body but still need to write last-chunk. This is not trivial because we should not write last-chunk if the body producer aborted. XXX: check all pipe->exhausted() callers to make sure all code has been adjusted.

Added getMoreRequestBody() virtual method that Server uses to get encoded body bytes from its kids. FTP does not encode and uses default implementation.

Fixed HTTP/FTP doneSendingRequestBody() to call its parent. I am not sure it helps with correctly processing transactions, but the parent method was designed to be called, and calling it makes the transaction state clearer.

Moved "broken POSTS" handling code into its own method and polished it (HttpStateData::finishingBrokenPost). We now skip the "broken POSTS" fix if the request is chunked.

Resolved old XXX: HttpStateData::handleRequestBodyProducerAborted() was indeed doing nothing useful despite all the pretense. Now it aborts the transaction.

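The last-chunk rule from the server-side notes can be illustrated with a minimal, standalone sketch. This is not the Squid code: encodeChunk(), SketchSender, and nextWrite() are hypothetical names used only for illustration, and the sketch assumes a body without chunk extensions or trailers.

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical helper (not a Squid function): wrap raw body bytes in a
// single HTTP/1.1 chunk: hex size, CRLF, data, CRLF.
std::string encodeChunk(const std::string &raw)
{
    char sizeLine[32];
    std::snprintf(sizeLine, sizeof(sizeLine), "%zx\r\n", raw.size());
    return std::string(sizeLine) + raw + "\r\n";
}

// Hypothetical stand-in for the sender state: "received whole body" is
// tracked separately from "sent whole body" (here, sentLastChunk).
struct SketchSender {
    bool receivedWholeBody = false; // producer reported a normal end
    bool producerAborted = false;   // producer gave up mid-body
    bool sentLastChunk = false;     // terminator already written

    // Returns the encoded bytes to write next; an empty string means
    // "nothing to write right now".
    std::string nextWrite(const std::string &raw)
    {
        if (producerAborted)
            return std::string(); // never terminate an aborted body

        if (!raw.empty())
            return encodeChunk(raw); // regular data chunk

        if (receivedWholeBody && !sentLastChunk) {
            sentLastChunk = true;
            return "0\r\n\r\n"; // last-chunk followed by the final CRLF
        }

        return std::string(); // wait for more body bytes or for the end
    }
};
```

The empty-input check matters in this sketch: encoding zero bytes as a chunk would produce the terminator by accident, so the last-chunk is written only after the producer has reported a normal end of the body.
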
fwd->handleUnregisteredServerEnd() was called without calling fwd->unregister() first. It led to assertions because FwdState was being aborted (on fd closure) without any error being set. Now fixed because fwd->unregister() removes FwdState's close handler and FwdState destructor does not require an error to be set.

=== modified file 'src/BodyPipe.h'
--- src/BodyPipe.h	2010-08-23 23:15:26 +0000
+++ src/BodyPipe.h	2010-09-03 05:02:25 +0000
@@ -82,40 +82,41 @@
 class BodyPipe: public RefCountable
 {
 public:
     typedef RefCount<BodyPipe> Pointer;
     typedef BodyProducer Producer;
     typedef BodyConsumer Consumer;
     typedef BodyPipeCheckout Checkout;
     enum { MaxCapacity = SQUID_TCP_SO_RCVBUF };
     friend class BodyPipeCheckout;
 public:
     BodyPipe(Producer *aProducer);
     ~BodyPipe(); // asserts that producer and consumer are cleared
     void setBodySize(uint64_t aSize); // set body size
     bool bodySizeKnown() const { return theBodySize >= 0; }
     uint64_t bodySize() const;
     uint64_t consumedSize() const { return theGetSize; }
+    uint64_t producedSize() const { return thePutSize; }
     bool productionEnded() const { return !theProducer; }
     // called by producers
     void clearProducer(bool atEof); // aborts or sends eof
     size_t putMoreData(const char *buf, size_t size);
     bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); }
     bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; }
     uint64_t unproducedSize() const; // size of still unproduced data
     bool stillProducing(const Producer::Pointer &producer) const { return theProducer == producer; }
     void expectProductionEndAfter(uint64_t extraSize); ///< sets or checks body size
     // called by consumers
     bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer);
     void clearConsumer(); // aborts if still piping
     size_t getMoreData(MemBuf &buf);
     void consume(size_t size);
     bool expectMoreAfter(uint64_t offset) const;
     bool exhausted() const; // saw eof/abort and all data consumed
     bool stillConsuming(const Consumer::Pointer &consumer) const {
return theConsumer == consumer; } === modified file 'src/Server.cc' --- src/Server.cc 2010-08-24 00:12:54 +0000 +++ src/Server.cc 2010-09-05 04:49:04 +0000 @@ -34,46 +34,48 @@ #include "squid.h" #include "base/TextException.h" #include "Server.h" #include "Store.h" #include "fde.h" /* for fd_table[fd].closing */ #include "HttpRequest.h" #include "HttpReply.h" #include "errorpage.h" #include "SquidTime.h" #if USE_ADAPTATION #include "adaptation/AccessCheck.h" #include "adaptation/Iterator.h" #endif // implemented in client_side_reply.cc until sides have a common parent extern void purgeEntriesByUrl(HttpRequest * req, const char *url); -ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),requestSender(NULL) +ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"), + requestSender(NULL), #if USE_ADAPTATION - , adaptedHeadSource(NULL) - , adaptationAccessCheckPending(false) - , startedAdaptation(false) + adaptedHeadSource(NULL), + adaptationAccessCheckPending(false), + startedAdaptation(false), #endif + receivedWholeRequestBody(false) { fwd = theFwdState; entry = fwd->entry; entry->lock(); request = HTTPMSGLOCK(fwd->request); } ServerStateData::~ServerStateData() { // paranoid: check that swanSong has been called assert(!requestBodySource); #if USE_ADAPTATION assert(!virginBodyDestination); assert(!adaptedBodySource); #endif entry->unlock(); @@ -303,40 +305,41 @@ } #endif handleRequestBodyProducerAborted(); } // more origin request body data is available void ServerStateData::handleMoreRequestBodyAvailable() { if (!requestSender) sendMoreRequestBody(); else debugs(9,3, HERE << "waiting for request body write to complete"); } // there will be no more handleMoreRequestBodyAvailable calls void ServerStateData::handleRequestBodyProductionEnded() { + receivedWholeRequestBody = true; if (!requestSender) doneSendingRequestBody(); else debugs(9,3, HERE << "waiting for request body write to complete"); } // called 
when we are done sending request body; kids extend this void ServerStateData::doneSendingRequestBody() { debugs(9,3, HERE << "done sending request body"); assert(requestBodySource != NULL); stopConsumingFrom(requestBodySource); // kids extend this } // called when body producers aborts; kids extend this void ServerStateData::handleRequestBodyProducerAborted() @@ -371,78 +374,91 @@ if (!requestBodySource) { debugs(9,3, HERE << "detected while-we-were-sending abort"); return; // do nothing; } if (io.flag) { debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(errno)); ErrorState *err; err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request); err->xerrno = errno; fwd->fail(err); abortTransaction("I/O error while sending request body"); return; } if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("store entry aborted while sending request body"); return; } - if (requestBodySource->exhausted()) + if (!requestBodySource->exhausted()) + sendMoreRequestBody(); + else + if (receivedWholeRequestBody) doneSendingRequestBody(); else - sendMoreRequestBody(); + debugs(9,3, HERE << "waiting for body production end or abort"); } bool ServerStateData::canSend(int fd) const { return fd >= 0 && !fd_table[fd].closing(); } void ServerStateData::sendMoreRequestBody() { assert(requestBodySource != NULL); assert(!requestSender); const int fd = dataDescriptor(); if (!canSend(fd)) { debugs(9,3, HERE << "cannot send request body to closing FD " << fd); return; // wait for the kid's close handler; TODO: assert(closer); } MemBuf buf; - if (requestBodySource->getMoreData(buf)) { + + if (getMoreRequestBody(buf) && buf.contentSize() > 0) { debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes"); typedef CommCbMemFunT Dialer; requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody); comm_write_mbuf(fd, &buf, requestSender); } else { debugs(9,3, HERE << "will wait for more request body bytes or eof"); 
requestSender = NULL; } } +/// either fill buf with available [encoded] request body bytes or return false +bool +ServerStateData::getMoreRequestBody(MemBuf &buf) +{ + // default implementation does not encode request body content + Must(requestBodySource != NULL); + return requestBodySource->getMoreData(buf); +} + // Compares hosts in urls, returns false if different, no sheme, or no host. static bool sameUrlHosts(const char *url1, const char *url2) { // XXX: Want urlHostname() here, but it uses static storage and copying const char *host1 = strchr(url1, ':'); const char *host2 = strchr(url2, ':'); if (host1 && host2) { // skip scheme slashes do { ++host1; ++host2; } while (*host1 == '/' && *host2 == '/'); if (!*host1) return false; // no host // increment while the same until we reach the end of the URL/host while (*host1 && *host1 != '/' && *host1 == *host2) { === modified file 'src/Server.h' --- src/Server.h 2010-08-23 23:15:26 +0000 +++ src/Server.h 2010-09-04 23:41:57 +0000 @@ -80,40 +80,41 @@ virtual void maybeReadVirginBody() = 0; /// abnormal transaction termination; reason is for debugging only virtual void abortTransaction(const char *reason) = 0; /// a hack to reach HttpStateData::orignal_request virtual HttpRequest *originalRequest(); #if USE_ADAPTATION void adaptationAclCheckDone(Adaptation::ServiceGroupPointer group); static void adaptationAclCheckDoneWrapper(Adaptation::ServiceGroupPointer group, void *data); // ICAPInitiator: start an ICAP transaction and receive adapted headers. virtual void noteAdaptationAnswer(HttpMsg *message); virtual void noteAdaptationQueryAbort(bool final); // BodyProducer: provide virgin response body to ICAP. 
virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer ); virtual void noteBodyConsumerAborted(BodyPipe::Pointer ); #endif + virtual bool getMoreRequestBody(MemBuf &buf); virtual void processReplyBody() = 0; //AsyncJob virtual methods virtual void swanSong(); virtual bool doneAll() const { return #if USE_ADAPTATION Adaptation::Initiator::doneAll() && BodyProducer::doneAll() && #endif BodyConsumer::doneAll() && false; } public: // should be protected void serverComplete(); /**< call when no server communication is expected */ private: void serverComplete2(); /**< Continuation of serverComplete */ bool completed; /**< serverComplete() has been called */ @@ -177,31 +178,32 @@ int64_t currentOffset; /**< Our current offset in the StoreEntry */ MemBuf *responseBodyBuffer; /**< Data temporarily buffered for ICAP */ public: // should not be StoreEntry *entry; FwdState::Pointer fwd; HttpRequest *request; protected: BodyPipe::Pointer requestBodySource; /**< to consume request body */ AsyncCall::Pointer requestSender; /**< set if we are expecting comm_write to call us back */ #if USE_ADAPTATION BodyPipe::Pointer virginBodyDestination; /**< to provide virgin response body */ CbcPointer adaptedHeadSource; /**< to get adapted response headers */ BodyPipe::Pointer adaptedBodySource; /**< to consume adated response body */ bool adaptationAccessCheckPending; bool startedAdaptation; #endif + bool receivedWholeRequestBody; ///< handleRequestBodyProductionEnded called private: void quitIfAllDone(); /**< successful termination */ void sendBodyIsTooLargeError(); void maybePurgeOthers(); HttpReply *theVirginReply; /**< reply received from the origin server */ HttpReply *theFinalReply; /**< adapted reply from ICAP or virgin reply */ }; #endif /* SQUID_SERVER_H */ === modified file 'src/client_side.cc' --- src/client_side.cc 2010-09-01 00:00:41 +0000 +++ src/client_side.cc 2010-09-05 08:12:50 +0000 @@ -164,41 +164,40 @@ cbdataFree (address); } /* Local functions */ /* 
ClientSocketContext */ static ClientSocketContext *ClientSocketContextNew(ClientHttpRequest *); /* other */ static IOCB clientWriteComplete; static IOCB clientWriteBodyComplete; static bool clientParseRequest(ConnStateData * conn, bool &do_next_read); static PF clientLifetimeTimeout; static ClientSocketContext *parseHttpRequestAbort(ConnStateData * conn, const char *uri); static ClientSocketContext *parseHttpRequest(ConnStateData *, HttpParser *, HttpRequestMethod *, HttpVersion *); #if USE_IDENT static IDCB clientIdentDone; #endif static CSCB clientSocketRecipient; static CSD clientSocketDetach; static void clientSetKeepaliveFlag(ClientHttpRequest *); static int clientIsContentLengthValid(HttpRequest * r); -static int clientIsRequestBodyValid(int64_t bodyLength); static int clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength); static void clientUpdateStatHistCounters(log_type logType, int svc_time); static void clientUpdateStatCounters(log_type logType); static void clientUpdateHierCounters(HierarchyLogEntry *); static bool clientPingHasFinished(ping_data const *aPing); void prepareLogWithRequestDetails(HttpRequest *, AccessLogEntry *); #ifndef PURIFY static int connIsUsable(ConnStateData * conn); #endif static int responseFinishedOrFailed(HttpReply * rep, StoreIOBuffer const &receivedData); static void ClientSocketContextPushDeferredIfNeeded(ClientSocketContext::Pointer deferredRequest, ConnStateData * conn); static void clientUpdateSocketStats(log_type logType, size_t size); char *skipLeadingSpace(char *aString); static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount); static int connKeepReadingIncompleteRequest(ConnStateData * conn); static void connCancelIncompleteRequests(ConnStateData * conn); static ConnStateData *connStateCreate(const Ip::Address &peer, const Ip::Address &me, int fd, http_port_list *port); @@ -754,70 +753,62 @@ case METHOD_POST: /* PUT/POST requires a request entity */ return (r->content_length >= 0); case METHOD_GET: 
case METHOD_HEAD: /* We do not want to see a request entity on GET/HEAD requests */ return (r->content_length <= 0 || Config.onoff.request_entities); default: /* For other types of requests we don't care */ return 1; } /* NOT REACHED */ } int -clientIsRequestBodyValid(int64_t bodyLength) -{ - if (bodyLength >= 0) - return 1; - - return 0; -} - -int clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength) { if (Config.maxRequestBodySize && bodyLength > Config.maxRequestBodySize) return 1; /* too large */ return 0; } #ifndef PURIFY int connIsUsable(ConnStateData * conn) { if (conn == NULL || !cbdataReferenceValid(conn) || conn->fd == -1) return 0; return 1; } #endif +// careful: the "current" context may be gone if we wrote an early response ClientSocketContext::Pointer ConnStateData::getCurrentContext() const { assert(this); return currentobject; } void ClientSocketContext::deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData) { debugs(33, 2, "clientSocketRecipient: Deferring request " << http->uri); assert(flags.deferred == 0); flags.deferred = 1; deferredparams.node = node; deferredparams.rep = rep; deferredparams.queuedBuffer = receivedData; return; } int @@ -1703,41 +1694,41 @@ } void ClientSocketContext::doClose() { comm_close(fd()); } /** Called to initiate (and possibly complete) closing of the context. 
* The underlying socket may be already closed */ void ClientSocketContext::initiateClose(const char *reason) { debugs(33, 5, HERE << "initiateClose: closing for " << reason); if (http != NULL) { ConnStateData * conn = http->getConn(); if (conn != NULL) { - if (const int64_t expecting = conn->bodySizeLeft()) { + if (const int64_t expecting = conn->mayNeedToReadMoreBody()) { debugs(33, 5, HERE << "ClientSocketContext::initiateClose: " << "closing, but first " << conn << " needs to read " << expecting << " request body bytes with " << conn->in.notYetUsed << " notYetUsed"); if (conn->closing()) { debugs(33, 2, HERE << "avoiding double-closing " << conn); return; } /* * XXX We assume the reply fits in the TCP transmit * window. If not the connection may stall while sending * the reply (before reaching here) if the client does not * try to read the response while sending the request body. * As of yet we have not received any complaints indicating * this may be an issue. */ conn->startClosing(reason); @@ -1963,52 +1954,40 @@ if ((host = mime_get_header(req_hdr, "Host")) != NULL) { int url_sz = strlen(url) + 32 + Config.appendDomainLen + strlen(host); http->uri = (char *)xcalloc(url_sz, 1); snprintf(http->uri, url_sz, "%s://%s%s", conn->port->protocol, host, url); debugs(33, 5, "TRANSPARENT HOST REWRITE: '" << http->uri <<"'"); } else { /* Put the local socket IP address as the hostname. 
*/ int url_sz = strlen(url) + 32 + Config.appendDomainLen; http->uri = (char *)xcalloc(url_sz, 1); snprintf(http->uri, url_sz, "%s://%s:%d%s", http->getConn()->port->protocol, http->getConn()->me.NtoA(ntoabuf,MAX_IPSTRLEN), http->getConn()->me.GetPort(), url); debugs(33, 5, "TRANSPARENT REWRITE: '" << http->uri << "'"); } } -// Temporary hack helper: determine whether the request is chunked, expensive -static bool -isChunkedRequest(const HttpParser *hp) -{ - HttpRequest request; - if (!request.parseHeader(HttpParserHdrBuf(hp), HttpParserHdrSz(hp))) - return false; - - return request.header.chunked(); -} - - /** * parseHttpRequest() * * Returns * NULL on incomplete requests * a ClientSocketContext structure on success or failure. * Sets result->flags.parsed_ok to 0 if failed to parse the request. * Sets result->flags.parsed_ok to 1 if we have a good request. */ static ClientSocketContext * parseHttpRequest(ConnStateData *conn, HttpParser *hp, HttpRequestMethod * method_p, HttpVersion *http_ver) { char *req_hdr = NULL; char *end; size_t req_sz; ClientHttpRequest *http; ClientSocketContext *result; StoreIOBuffer tempBuffer; int r; @@ -2090,69 +2069,40 @@ return parseHttpRequestAbort(conn, "error:unsupported-request-method"); } /* * Process headers after request line * TODO: Use httpRequestParse here. */ /* XXX this code should be modified to take a const char * later! */ req_hdr = (char *) hp->buf + hp->req_end + 1; debugs(33, 3, "parseHttpRequest: req_hdr = {" << req_hdr << "}"); end = (char *) hp->buf + hp->hdr_end; debugs(33, 3, "parseHttpRequest: end = {" << end << "}"); debugs(33, 3, "parseHttpRequest: prefix_sz = " << (int) HttpParserRequestLen(hp) << ", req_line_sz = " << HttpParserReqSz(hp)); - // Temporary hack: We might receive a chunked body from a broken HTTP/1.1 - // client that sends chunked requests to HTTP/1.0 Squid. If the request - // might have a chunked body, parse the headers early to look for the - // "Transfer-Encoding: chunked" header. 
If we find it, wait until the - // entire body is available so that we can set the content length and - // forward the request without chunks. The primary reason for this is - // to avoid forwarding a chunked request because the server side lacks - // logic to determine when it is valid to do so. - // FUTURE_CODE_TO_SUPPORT_CHUNKED_REQUESTS below will replace this hack. - if (hp->v_min == 1 && hp->v_maj == 1 && // broken client, may send chunks - Config.maxChunkedRequestBodySize > 0 && // configured to dechunk - (*method_p == METHOD_PUT || *method_p == METHOD_POST)) { - - // check only once per request because isChunkedRequest is expensive - if (conn->in.dechunkingState == ConnStateData::chunkUnknown) { - if (isChunkedRequest(hp)) - conn->startDechunkingRequest(hp); - else - conn->in.dechunkingState = ConnStateData::chunkNone; - } - - if (conn->in.dechunkingState == ConnStateData::chunkParsing) { - if (conn->parseRequestChunks(hp)) // parses newly read chunks - return NULL; // wait for more data - debugs(33, 5, HERE << "Got complete chunked request or err."); - assert(conn->in.dechunkingState != ConnStateData::chunkParsing); - } - } - /* Ok, all headers are received */ http = new ClientHttpRequest(conn); http->req_sz = HttpParserRequestLen(hp); result = ClientSocketContextNew(http); tempBuffer.data = result->reqbuf; tempBuffer.length = HTTP_REQBUF_SZ; ClientStreamData newServer = new clientReplyContext(http); ClientStreamData newClient = result; clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach, clientReplyStatus, newServer, clientSocketRecipient, clientSocketDetach, newClient, tempBuffer); debugs(33, 5, "parseHttpRequest: Request Header is\n" <<(hp->buf) + hp->hdr_start); /* set url */ /* * XXX this should eventually not use a malloc'ed buffer; the transformation code * below needs to be modified to not expect a mutable nul-terminated string. 
@@ -2295,113 +2245,96 @@ return 0; } void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount) { assert(byteCount > 0 && byteCount <= conn->in.notYetUsed); conn->in.notYetUsed -= byteCount; debugs(33, 5, HERE << "conn->in.notYetUsed = " << conn->in.notYetUsed); /* * If there is still data that will be used, * move it to the beginning. */ if (conn->in.notYetUsed > 0) xmemmove(conn->in.buf, conn->in.buf + byteCount, conn->in.notYetUsed); } +// here, an "incomplete request" means we have not found the end of headers yet int connKeepReadingIncompleteRequest(ConnStateData * conn) { - // when we read chunked requests, the entire body is buffered - // XXX: this check ignores header size and its limits. - if (conn->in.dechunkingState == ConnStateData::chunkParsing) - return ((int64_t)conn->in.notYetUsed) < Config.maxChunkedRequestBodySize; - return conn->in.notYetUsed >= Config.maxRequestHeaderSize ? 0 : 1; } void connCancelIncompleteRequests(ConnStateData * conn) { ClientSocketContext *context = parseHttpRequestAbort(conn, "error:request-too-large"); clientStreamNode *node = context->getClientReplyContext(); assert(!connKeepReadingIncompleteRequest(conn)); - if (conn->in.dechunkingState == ConnStateData::chunkParsing) { - debugs(33, 1, "Chunked request is too large (" << conn->in.notYetUsed << " bytes)"); - debugs(33, 1, "Config 'chunked_request_body_max_size'= " << Config.maxChunkedRequestBodySize << " bytes."); - } else { - debugs(33, 1, "Request header is too large (" << conn->in.notYetUsed << " bytes)"); - debugs(33, 1, "Config 'request_header_max_size'= " << Config.maxRequestHeaderSize << " bytes."); - } + debugs(33, 1, "Request header is too large (" << conn->in.notYetUsed << " bytes)"); + debugs(33, 1, "Config 'request_header_max_size'= " << Config.maxRequestHeaderSize << " bytes."); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); repContext->setReplyToError(ERR_TOO_BIG, - HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, 
NULL, + HTTP_BAD_REQUEST, METHOD_NONE, NULL, conn->peer, NULL, NULL, NULL); context->registerWithConn(); context->pullData(); } void ConnStateData::clientMaybeReadData(int do_next_read) { if (do_next_read) { flags.readMoreRequests = true; readSomeData(); } } void ConnStateData::clientAfterReadingRequests(int do_next_read) { - /* - * If (1) we are reading a message body, (2) and the connection - * is half-closed, and (3) we didn't get the entire HTTP request - * yet, then close this connection. - */ - - if (fd_table[fd].flags.socket_eof) { - if ((int64_t)in.notYetUsed < bodySizeLeft()) { - /* Partial request received. Abort client connection! */ - debugs(33, 3, "clientAfterReadingRequests: FD " << fd << " aborted, partial request"); - comm_close(fd); - return; - } + // Were we expecting to read more request body from half-closed connection? + if (mayNeedToReadMoreBody() && commIsHalfClosed(fd)) { + debugs(33, 3, HERE << "truncated body: closing half-closed FD " << fd); + comm_close(fd); + return; } clientMaybeReadData (do_next_read); } static void clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *context, const HttpRequestMethod& method, HttpVersion http_ver) { ClientHttpRequest *http = context->http; HttpRequest *request = NULL; bool notedUseOfBuffer = false; - bool tePresent = false; - bool deChunked = false; + bool chunked = false; bool mustReplyToOptions = false; bool unsupportedTe = false; + bool expectBody = false; /* We have an initial client stream in place should it be needed */ /* setup our private context */ context->registerWithConn(); if (context->flags.parsed_ok == 0) { clientStreamNode *node = context->getClientReplyContext(); debugs(33, 1, "clientProcessRequest: Invalid Request"); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); switch (hp->request_parse_status) { case HTTP_HEADER_TOO_LARGE: repContext->setReplyToError(ERR_TOO_BIG, HTTP_BAD_REQUEST, method, http->uri, conn->peer, 
NULL, conn->in.buf, NULL); break; case HTTP_METHOD_NOT_ALLOWED: repContext->setReplyToError(ERR_UNSUP_REQ, HTTP_METHOD_NOT_ALLOWED, method, http->uri, conn->peer, NULL, conn->in.buf, NULL); break; default: repContext->setReplyToError(ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, http->uri, conn->peer, NULL, conn->in.buf, NULL); } @@ -2482,70 +2415,67 @@ } if (http->flags.internal) { request->protocol = PROTO_HTTP; request->login[0] = '\0'; } request->flags.internal = http->flags.internal; setLogUri (http, urlCanonicalClean(request)); request->client_addr = conn->peer; #if USE_SQUID_EUI request->client_eui48 = conn->peer_eui48; request->client_eui64 = conn->peer_eui64; #endif #if FOLLOW_X_FORWARDED_FOR request->indirect_client_addr = conn->peer; #endif /* FOLLOW_X_FORWARDED_FOR */ request->my_addr = conn->me; request->http_ver = http_ver; - tePresent = request->header.has(HDR_TRANSFER_ENCODING); - deChunked = conn->in.dechunkingState == ConnStateData::chunkReady; - if (deChunked) { - assert(tePresent); - request->setContentLength(conn->in.dechunked.contentSize()); - request->header.delById(HDR_TRANSFER_ENCODING); - conn->finishDechunkingRequest(hp); - } else - conn->cleanDechunkingRequest(); - + if (request->header.chunked()) { + chunked = true; + } else if (request->header.has(HDR_TRANSFER_ENCODING)) { + const String te = request->header.getList(HDR_TRANSFER_ENCODING); + // HTTP/1.1 requires chunking to be the last encoding if there is one + unsupportedTe = te.size() && te != "identity"; + } // else implied identity coding + if (method == METHOD_TRACE || method == METHOD_OPTIONS) request->max_forwards = request->header.getInt64(HDR_MAX_FORWARDS); mustReplyToOptions = (method == METHOD_OPTIONS) && (request->max_forwards == 0); - unsupportedTe = tePresent && !deChunked; if (!urlCheckRequest(request) || mustReplyToOptions || unsupportedTe) { clientStreamNode *node = context->getClientReplyContext(); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); 
assert (repContext); repContext->setReplyToError(ERR_UNSUP_REQ, HTTP_NOT_IMPLEMENTED, request->method, NULL, conn->peer, request, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; goto finish; } - if (!clientIsContentLengthValid(request)) { + if (!chunked && !clientIsContentLengthValid(request)) { clientStreamNode *node = context->getClientReplyContext(); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); repContext->setReplyToError(ERR_INVALID_REQ, HTTP_LENGTH_REQUIRED, request->method, NULL, conn->peer, request, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; goto finish; } if (request->header.has(HDR_EXPECT)) { int ignore = 0; #if USE_HTTP_VIOLATIONS if (Config.onoff.ignore_expect_100) { String expect = request->header.getList(HDR_EXPECT); if (expect.caseCmp("100-continue") == 0) ignore = 1; expect.clean(); @@ -2553,64 +2483,68 @@ #endif if (!ignore) { clientStreamNode *node = context->getClientReplyContext(); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); repContext->setReplyToError(ERR_INVALID_REQ, HTTP_EXPECTATION_FAILED, request->method, http->uri, conn->peer, request, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; } } http->request = HTTPMSGLOCK(request); clientSetKeepaliveFlag(http); /* If this is a CONNECT, don't schedule a read - ssl.c will handle it */ if (http->request->method == METHOD_CONNECT) context->mayUseConnection(true); /* Do we expect a request-body? */ - if (!context->mayUseConnection() && request->content_length > 0) { - request->body_pipe = conn->expectRequestBody(request->content_length); + expectBody = chunked || request->content_length > 0; + if (!context->mayUseConnection() && expectBody) { + request->body_pipe = conn->expectRequestBody( + chunked ? 
-1 : request->content_length); // consume header early so that body pipe gets just the body connNoteUseOfBuffer(conn, http->req_sz); notedUseOfBuffer = true; - conn->handleRequestBodyData(); // may comm_close and stop producing - /* Is it too large? */ - - if (!clientIsRequestBodyValid(request->content_length) || - clientIsRequestBodyTooLargeForPolicy(request->content_length)) { + if (!chunked && // if chunked, we will check as we accumulate + clientIsRequestBodyTooLargeForPolicy(request->content_length)) { clientStreamNode *node = context->getClientReplyContext(); clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); assert (repContext); repContext->setReplyToError(ERR_TOO_BIG, HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL, conn->peer, http->request, NULL, NULL); assert(context->http->out.offset == 0); context->pullData(); goto finish; } + // We may stop producing, comm_close, and/or call setReplyToError() + // below, so quit on errors to avoid http->doCallouts() + if (!conn->handleRequestBodyData()) + goto finish; + if (!request->body_pipe->productionEnded()) conn->readSomeData(); context->mayUseConnection(!request->body_pipe->productionEnded()); } http->calloutContext = new ClientRequestContext(http); http->doCallouts(); finish: if (!notedUseOfBuffer) connNoteUseOfBuffer(conn, http->req_sz); /* * DPW 2007-05-18 * Moved the TCP_RESET feature from clientReplyContext::sendMoreData * to here because calling comm_reset_close() causes http to * be freed and the above connNoteUseOfBuffer() would hit an * assertion, not to mention that we were accessing freed memory. @@ -2631,75 +2565,61 @@ --conn->in.notYetUsed; } } static int connOkToAddRequest(ConnStateData * conn) { int result = conn->getConcurrentRequestCount() < (Config.onoff.pipeline_prefetch ? 
2 : 1); if (!result) { debugs(33, 3, "connOkToAddRequest: FD " << conn->fd << " max concurrent requests reached"); debugs(33, 5, "connOkToAddRequest: FD " << conn->fd << " defering new request until one is done"); } return result; } /** - * bodySizeLeft - * - * Report on the number of bytes of body content that we - * know are yet to be read on this connection. - */ -int64_t -ConnStateData::bodySizeLeft() -{ - // XXX: this logic will not work for chunked requests with unknown sizes - - if (bodyPipe != NULL) - return bodyPipe->unproducedSize(); - - return 0; -} - -/** * Attempt to parse one or more requests from the input buffer. * If a request is successfully parsed, even if the next request * is only partially parsed, it will return TRUE. * do_next_read is updated to indicate whether a read should be * scheduled. */ static bool clientParseRequest(ConnStateData * conn, bool &do_next_read) { HttpRequestMethod method; ClientSocketContext *context; bool parsed_req = false; HttpVersion http_ver; HttpParser hp; debugs(33, 5, "clientParseRequest: FD " << conn->fd << ": attempting to parse"); - while (conn->in.notYetUsed > 0 && conn->bodySizeLeft() == 0) { + // Loop while we have read bytes that are not needed for producing the body + // On errors, bodyPipe may become nil, but readMoreRequests will be cleared + while (conn->in.notYetUsed > 0 && !conn->bodyPipe && + conn->flags.readMoreRequests) { connStripBufferWhitespace (conn); /* Don't try to parse if the buffer is empty */ if (conn->in.notYetUsed == 0) break; /* Limit the number of concurrent requests to 2 */ if (!connOkToAddRequest(conn)) { break; } /* Should not be needed anymore */ /* Terminate the string */ conn->in.buf[conn->in.notYetUsed] = '\0'; /* Begin the parsing */ HttpParserInit(&hp, conn->in.buf, conn->in.notYetUsed); @@ -2717,93 +2637,84 @@ connCancelIncompleteRequests(conn); break; } /* status -1 or 1 */ if (context) { debugs(33, 5, "clientParseRequest: FD " << conn->fd << ": parsed a request"); 
commSetTimeout(conn->fd, Config.Timeout.lifetime, clientLifetimeTimeout, context->http); clientProcessRequest(conn, &hp, context, method, http_ver); parsed_req = true; if (context->mayUseConnection()) { debugs(33, 3, "clientParseRequest: Not reading, as this request may need the connection"); do_next_read = 0; break; } - - if (!conn->flags.readMoreRequests) { - conn->flags.readMoreRequests = true; - break; - } - - continue; /* while offset > 0 && conn->bodySizeLeft() == 0 */ } - } /* while offset > 0 && conn->bodySizeLeft() == 0 */ + } /* XXX where to 'finish' the parsing pass? */ return parsed_req; } void ConnStateData::clientReadRequest(const CommIoCbParams &io) { debugs(33,5,HERE << "clientReadRequest FD " << io.fd << " size " << io.size); Must(reading()); reader = NULL; bool do_next_read = 1; /* the default _is_ to read data! - adrian */ assert (io.fd == fd); /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ if (io.flag == COMM_ERR_CLOSING) { debugs(33,5, HERE << " FD " << fd << " closing Bailout."); return; } /* * Don't reset the timeout value here. The timeout value will be * set to Config.Timeout.request by httpAccept() and * clientWriteComplete(), and should apply to the request as a * whole, not individual read() calls. 
Plus, it breaks our * lame half-close detection */ if (connReadWasError(io.flag, io.size, io.xerrno)) { notifyAllContexts(io.xerrno); comm_close(fd); return; } if (io.flag == COMM_OK) { if (io.size > 0) { kb_incr(&statCounter.client_http.kbytes_in, io.size); - handleReadData(io.buf, io.size); - - /* The above may close the connection under our feets */ - if (!isOpen()) + // may comm_close or setReplyToError + if (!handleReadData(io.buf, io.size)) return; } else if (io.size == 0) { debugs(33, 5, "clientReadRequest: FD " << fd << " closed?"); if (connFinishedWithConn(io.size)) { comm_close(fd); return; } /* It might be half-closed, we can't tell */ fd_table[fd].flags.socket_eof = 1; commMarkHalfClosed(fd); do_next_read = 0; fd_note(fd, "half-closed"); /* There is one more close check at the end, to detect aborted @@ -2827,150 +2738,185 @@ * The above check with connFinishedWithConn() only * succeeds _if_ the buffer is empty which it won't * be if we have an incomplete request. * XXX: This duplicates ClientSocketContext::keepaliveNextRequest */ if (getConcurrentRequestCount() == 0 && commIsHalfClosed(fd)) { debugs(33, 5, "clientReadRequest: FD " << fd << ": half-closed connection, no completed request parsed, connection closing."); comm_close(fd); return; } } if (!isOpen()) return; clientAfterReadingRequests(do_next_read); } /** * called when new request data has been read from the socket + * + * \retval false called comm_close or setReplyToError (the caller should bail) + * \retval true we did not call comm_close or setReplyToError */ -void +bool ConnStateData::handleReadData(char *buf, size_t size) { char *current_buf = in.addressToReadInto(); if (buf != current_buf) xmemmove(current_buf, buf, size); in.notYetUsed += size; in.buf[in.notYetUsed] = '\0'; /* Terminate the string */ // if we are reading a body, stuff data into the body pipe if (bodyPipe != NULL) - handleRequestBodyData(); + return handleRequestBodyData(); + return true; } /** * called when new request 
body data has been buffered in in.buf * may close the connection if we were closing and piped everything out + * + * \retval false called comm_close or setReplyToError (the caller should bail) + * \retval true we did not call comm_close or setReplyToError */ -void +bool ConnStateData::handleRequestBodyData() { assert(bodyPipe != NULL); size_t putSize = 0; -#if FUTURE_CODE_TO_SUPPORT_CHUNKED_REQUESTS - // The code below works, in principle, but we cannot do dechunking - // on-the-fly because that would mean sending chunked requests to - // the next hop. Squid lacks logic to determine which servers can - // receive chunk requests. Squid v3.0 code cannot even handle chunked - // responses which we may encourage by sending chunked requests. - // The error generation code probably needs more work. - if (in.bodyParser) { // chunked body - debugs(33,5, HERE << "handling chunked request body for FD " << fd); - bool malformedChunks = false; - - MemBuf raw; // ChunkedCodingParser only works with MemBufs - raw.init(in.notYetUsed, in.notYetUsed); - raw.append(in.buf, in.notYetUsed); - try { // the parser will throw on errors - const mb_size_t wasContentSize = raw.contentSize(); - BodyPipeCheckout bpc(*bodyPipe); - const bool parsed = in.bodyParser->parse(&raw, &bpc.buf); - bpc.checkIn(); - putSize = wasContentSize - raw.contentSize(); - - if (parsed) { - stopProducingFor(bodyPipe, true); // this makes bodySize known - } else { - // parser needy state must imply body pipe needy state - if (in.bodyParser->needsMoreData() && - !bodyPipe->mayNeedMoreData()) - malformedChunks = true; - // XXX: if bodyParser->needsMoreSpace, how can we guarantee it? - } - } catch (...) 
{ // XXX: be more specific - malformedChunks = true; - } - - if (malformedChunks) { - if (bodyPipe != NULL) - stopProducingFor(bodyPipe, false); - - ClientSocketContext::Pointer context = getCurrentContext(); - if (!context->http->out.offset) { - clientStreamNode *node = context->getClientReplyContext(); - clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw()); - assert (repContext); - repContext->setReplyToError(ERR_INVALID_REQ, HTTP_BAD_REQUEST, - METHOD_NONE, NULL, &peer.sin_addr, - NULL, NULL, NULL); - context->pullData(); - } - flags.readMoreRequests = false; - return; // XXX: is that sufficient to generate an error? + if (in.bodyParser) { // chunked encoding + if (const err_type error = handleChunkedRequestBody(putSize)) { + abortChunkedRequestBody(error); + return false; } - } else // identity encoding -#endif - { + } else { // identity encoding debugs(33,5, HERE << "handling plain request body for FD " << fd); putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed); if (!bodyPipe->mayNeedMoreData()) { // BodyPipe will clear us automagically when we produced everything bodyPipe = NULL; } } if (putSize > 0) connNoteUseOfBuffer(this, putSize); if (!bodyPipe) { debugs(33,5, HERE << "produced entire request body for FD " << fd); if (closing()) { /* we've finished reading like good clients, * now do the close that initiateClose initiated. - * - * XXX: do we have to close? why not check keepalive et. - * - * XXX: To support chunked requests safely, we need to handle - * the case of an endless request. This if-statement does not, - * because mayNeedMoreData is true if request size is not known.
*/ comm_close(fd); + return false; } } + + return true; +} + +/// parses available chunked encoded body bytes, checks size, returns errors +err_type +ConnStateData::handleChunkedRequestBody(size_t &putSize) +{ + debugs(33,7, HERE << "chunked from FD " << fd << ": " << in.notYetUsed); + + try { // the parser will throw on errors + + if (!in.notYetUsed) // nothing to do (MemBuf::init requires this check) + return ERR_NONE; + + MemBuf raw; // ChunkedCodingParser only works with MemBufs + raw.init(in.notYetUsed, in.notYetUsed); + raw.append(in.buf, in.notYetUsed); + + const mb_size_t wasContentSize = raw.contentSize(); + BodyPipeCheckout bpc(*bodyPipe); + const bool parsed = in.bodyParser->parse(&raw, &bpc.buf); + bpc.checkIn(); + putSize = wasContentSize - raw.contentSize(); + + // dechunk then check: the size limit applies to _dechunked_ content + if (clientIsRequestBodyTooLargeForPolicy(bodyPipe->producedSize())) + return ERR_TOO_BIG; + + if (parsed) { + finishDechunkingRequest(true); + Must(!bodyPipe); + return ERR_NONE; // nil bodyPipe implies body end for the caller + } + + // if chunk parser needs data, then the body pipe must need it too + Must(!in.bodyParser->needsMoreData() || bodyPipe->mayNeedMoreData()); + + // if parser needs more space and we can consume nothing, we will stall + Must(!in.bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent()); + } catch (...) { // TODO: be more specific + debugs(33, 3, HERE << "malformed chunks" << bodyPipe->status()); + return ERR_INVALID_REQ; + } + + debugs(33, 7, HERE << "need more chunked data" << *bodyPipe->status()); + return ERR_NONE; +} + +/// quit on errors related to chunked request body handling +void +ConnStateData::abortChunkedRequestBody(const err_type error) +{ + finishDechunkingRequest(false); + + // XXX: The code below works if we fail during initial request parsing, + // but if we fail when the server-side works already, the server may send + // us its response too, causing various assertions. 
How to prevent that? +#if WE_KNOW_HOW_TO_SEND_ERRORS + ClientSocketContext::Pointer context = getCurrentContext(); + if (context != NULL && !context->http->out.offset) { // output nothing yet + clientStreamNode *node = context->getClientReplyContext(); + clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw()); + assert(repContext); + const http_status scode = (error == ERR_TOO_BIG) ? + HTTP_REQUEST_ENTITY_TOO_LARGE : HTTP_BAD_REQUEST; + repContext->setReplyToError(error, scode, + repContext->http->request->method, + repContext->http->uri, + peer, + repContext->http->request, + in.buf, NULL); + context->pullData(); + } else { + // close or otherwise we may get stuck as nobody will notice the error? + comm_reset_close(fd); + } +#else + comm_reset_close(fd); +#endif + flags.readMoreRequests = false; } void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer ) { handleRequestBodyData(); } void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer ) { if (!closing()) startClosing("body consumer aborted"); } /** general lifetime handler for HTTP requests */ void ConnStateData::requestTimeout(const CommTimeoutCbParams &io) { #if THIS_CONFUSES_PERSISTENT_CONNECTION_AWARE_BROWSERS_AND_USERS @@ -3699,196 +3645,141 @@ bool ConnStateData::reading() const { return reader != NULL; } void ConnStateData::stopReading() { if (reading()) { comm_read_cancel(fd, reader); reader = NULL; } } BodyPipe::Pointer ConnStateData::expectRequestBody(int64_t size) { bodyPipe = new BodyPipe(this); - bodyPipe->setBodySize(size); + if (size >= 0) + bodyPipe->setBodySize(size); + else + startDechunkingRequest(); return bodyPipe; } +int64_t +ConnStateData::mayNeedToReadMoreBody() const +{ + if (!bodyPipe) + return 0; // request without a body or read/produced all body bytes + + if (!bodyPipe->bodySizeKnown()) + return -1; // probably need to read more, but we cannot be sure + + const int64_t needToProduce = bodyPipe->unproducedSize(); + const int64_t haveAvailable =
static_cast<int64_t>(in.notYetUsed); + + if (needToProduce <= haveAvailable) + return 0; // we have read what we need (but are waiting for pipe space) + + return needToProduce - haveAvailable; +} + bool ConnStateData::closing() const { return closing_; } /** * Called by ClientSocketContext to give the connection a chance to read * the entire body before closing the socket. */ void ConnStateData::startClosing(const char *reason) { debugs(33, 5, HERE << "startClosing " << this << " for " << reason); assert(!closing()); closing_ = true; assert(bodyPipe != NULL); - assert(bodySizeLeft() > 0); // We do not have to abort the body pipeline because we are going to // read the entire body anyway. // Perhaps an ICAP server wants to log the complete request. // If a consumer abort have caused this closing, we may get stuck // as nobody is consuming our data. Allow auto-consumption. bodyPipe->enableAutoConsumption(); } -// initialize dechunking state +/// initialize dechunking state void -ConnStateData::startDechunkingRequest(HttpParser *hp) +ConnStateData::startDechunkingRequest() { - debugs(33, 5, HERE << "start dechunking at " << HttpParserRequestLen(hp)); - assert(in.dechunkingState == chunkUnknown); + Must(bodyPipe != NULL); + debugs(33, 5, HERE << "start dechunking" << bodyPipe->status()); assert(!in.bodyParser); in.bodyParser = new ChunkedCodingParser; - in.chunkedSeen = HttpParserRequestLen(hp); // skip headers when dechunking - in.chunked.init(); // TODO: should we have a smaller-than-default limit?
- in.dechunked.init(); - in.dechunkingState = chunkParsing; } -// put parsed content into input buffer and clean up +/// put parsed content into input buffer and clean up void -ConnStateData::finishDechunkingRequest(HttpParser *hp) +ConnStateData::finishDechunkingRequest(bool withSuccess) { - debugs(33, 5, HERE << "finish dechunking; content: " << in.dechunked.contentSize()); - - assert(in.dechunkingState == chunkReady); - - const mb_size_t headerSize = HttpParserRequestLen(hp); - - // dechunking cannot make data bigger - assert(headerSize + in.dechunked.contentSize() + in.chunked.contentSize() - <= static_cast<mb_size_t>(in.notYetUsed)); - assert(in.notYetUsed <= in.allocatedSize); - - // copy dechunked content - char *end = in.buf + headerSize; - xmemmove(end, in.dechunked.content(), in.dechunked.contentSize()); - end += in.dechunked.contentSize(); - - // copy post-chunks leftovers, if any, caused by request pipelining? - if (in.chunked.contentSize()) { - xmemmove(end, in.chunked.content(), in.chunked.contentSize()); - end += in.chunked.contentSize(); - } - - in.notYetUsed = end - in.buf; + debugs(33, 5, HERE << "finish dechunking: " << withSuccess); - cleanDechunkingRequest(); -} - -/// cleanup dechunking state, get ready for the next request -void -ConnStateData::cleanDechunkingRequest() -{ - if (in.dechunkingState > chunkNone) { - delete in.bodyParser; - in.bodyParser = NULL; - in.chunked.clean(); - in.dechunked.clean(); - } - in.dechunkingState = chunkUnknown; -} - -// parse newly read request chunks and buffer them for finishDechunkingRequest -// returns true iff needs more data -bool -ConnStateData::parseRequestChunks(HttpParser *) -{ - debugs(33,5, HERE << "parsing chunked request body at " << - in.chunkedSeen << " < " << in.notYetUsed); - assert(in.bodyParser); - assert(in.dechunkingState == chunkParsing); - - assert(in.chunkedSeen <= in.notYetUsed); - const mb_size_t fresh = in.notYetUsed - in.chunkedSeen; - - // be safe: count some chunked coding metadata towards
the total body size - if (fresh + in.dechunked.contentSize() > Config.maxChunkedRequestBodySize) { - debugs(33,3, HERE << "chunked body (" << fresh << " + " << - in.dechunked.contentSize() << " may exceed " << - "chunked_request_body_max_size=" << - Config.maxChunkedRequestBodySize); - in.dechunkingState = chunkError; - return false; - } - - if (fresh > in.chunked.potentialSpaceSize()) { - // should not happen if Config.maxChunkedRequestBodySize is reasonable - debugs(33,1, HERE << "request_body_max_size exceeds chunked buffer " << - "size: " << fresh << " + " << in.chunked.contentSize() << " > " << - in.chunked.potentialSpaceSize() << " with " << - "chunked_request_body_max_size=" << - Config.maxChunkedRequestBodySize); - in.dechunkingState = chunkError; - return false; + if (bodyPipe != NULL) { + debugs(33, 7, HERE << "dechunked tail: " << bodyPipe->status()); + BodyPipe::Pointer myPipe = bodyPipe; + stopProducingFor(bodyPipe, withSuccess); // sets bodyPipe->bodySize() + Must(!bodyPipe); // we rely on it being nil after we are done with body + if (withSuccess) { + Must(myPipe->bodySizeKnown()); + ClientSocketContext::Pointer context = getCurrentContext(); + if (context != NULL && context->http && context->http->request) + context->http->request->setContentLength(myPipe->bodySize()); + } } - in.chunked.append(in.buf + in.chunkedSeen, fresh); - in.chunkedSeen += fresh; - try { // the parser will throw on errors - if (in.bodyParser->parse(&in.chunked, &in.dechunked)) - in.dechunkingState = chunkReady; // successfully parsed all chunks - else - return true; // need more, keep the same state - } catch (...) 
{ - debugs(33,3, HERE << "chunk parsing error"); - in.dechunkingState = chunkError; - } - return false; // error, unsupported, or done + delete in.bodyParser; + in.bodyParser = NULL; } char * ConnStateData::In::addressToReadInto() const { return buf + notYetUsed; } ConnStateData::In::In() : bodyParser(NULL), - buf (NULL), notYetUsed (0), allocatedSize (0), - dechunkingState(ConnStateData::chunkUnknown) + buf (NULL), notYetUsed (0), allocatedSize (0) {} ConnStateData::In::~In() { if (allocatedSize) memFreeBuf(allocatedSize, buf); - if (bodyParser) - delete bodyParser; // TODO: pool + delete bodyParser; // TODO: pool } /* This is a comm call normally scheduled by comm_close() */ void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) { pinning.fd = -1; if (pinning.peer) { cbdataReferenceDone(pinning.peer); } safe_free(pinning.host); /* NOTE: pinning.pinned should be kept. This combined with fd == -1 at the end of a request indicates that the host * connection has gone away */ } void ConnStateData::pinConnection(int pinning_fd, HttpRequest *request, struct peer *aPeer, bool auth) { fde *f; char desc[FD_DESC_SZ]; === modified file 'src/client_side.h' --- src/client_side.h 2010-08-24 00:02:15 +0000 +++ src/client_side.h 2010-09-05 07:13:25 +0000 @@ -133,59 +133,58 @@ public: ConnStateData(); ~ConnStateData(); void readSomeData(); int getAvailableBufferLength() const; bool areAllContextsForThisConnection() const; void freeAllContexts(); void notifyAllContexts(const int xerrno); ///< tell everybody about the err void readNextRequest(); void makeSpaceAvailable(); ClientSocketContext::Pointer getCurrentContext() const; void addContextToQueue(ClientSocketContext * context); int getConcurrentRequestCount() const; bool isOpen() const; int fd; - /// chunk buffering and parsing algorithm state - typedef enum { chunkUnknown, chunkNone, chunkParsing, chunkReady, chunkError } DechunkingState; - struct In { In(); ~In(); char *addressToReadInto() const; 
ChunkedCodingParser *bodyParser; ///< parses chunked request body - MemBuf chunked; ///< contains unparsed raw (chunked) body data - MemBuf dechunked; ///< accumulates parsed (dechunked) content char *buf; size_t notYetUsed; size_t allocatedSize; - size_t chunkedSeen; ///< size of processed or ignored raw read data - DechunkingState dechunkingState; ///< request dechunking state } in; - int64_t bodySizeLeft(); + /** number of body bytes we need to comm_read for the "current" request + * + * \retval 0 We do not need to read any [more] body bytes + * \retval negative May need more but do not know how many; could be zero! + * \retval positive Need to read exactly that many more body bytes + */ + int64_t mayNeedToReadMoreBody() const; /** * note this is ONLY connection based because NTLM and Negotiate is against HTTP spec. * the user details for connection based authentication */ AuthUserRequest::Pointer auth_user_request; /** * used by the owner of the connection, opaque otherwise * TODO: generalise the connection owner concept. 
*/ ClientSocketContext::Pointer currentobject; Ip::Address peer; Ip::Address me; Ip::Address log_addr; char rfc931[USER_IDENT_SZ]; int nrequests; @@ -206,42 +205,42 @@ bool pinned; /* this connection was pinned */ bool auth; /* pinned for www authentication */ struct peer *peer; /* peer the connection goes via */ AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/ } pinning; http_port_list *port; bool transparent() const; void transparent(bool const); bool reading() const; void stopReading(); ///< cancels comm_read if it is scheduled bool closing() const; void startClosing(const char *reason); BodyPipe::Pointer expectRequestBody(int64_t size); virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer); virtual void noteBodyConsumerAborted(BodyPipe::Pointer); - void handleReadData(char *buf, size_t size); - void handleRequestBodyData(); + bool handleReadData(char *buf, size_t size); + bool handleRequestBodyData(); /** * Correlate the current ConnStateData object with the pinning_fd socket descriptor. */ void pinConnection(int fd, HttpRequest *request, struct peer *peer, bool auth); /** * Decorrelate the ConnStateData object from its pinned peer */ void unpinConnection(); /** * Checks if there is pinning info if it is valid. It can close the server side connection * if pinned info is not valid. \param request if it is not NULL also checks if the pinning info refers to the request client side HttpRequest \param peer if it is not NULL also check if the peer is the pinning peer \return The fd of the server side connection or -1 if fails. 
*/ int validatePinnedConnection(HttpRequest *request, const struct peer *peer); /** * returts the pinned peer if exists, NULL otherwise */ @@ -250,44 +249,45 @@ // pining related comm callbacks void clientPinnedConnectionClosed(const CommCloseCbParams &io); // comm callbacks void clientReadRequest(const CommIoCbParams &io); void connStateClosed(const CommCloseCbParams &io); void requestTimeout(const CommTimeoutCbParams &params); // AsyncJob API virtual bool doneAll() const { return BodyProducer::doneAll() && false;} virtual void swanSong(); #if USE_SSL bool switchToHttps(); bool switchedToHttps() const { return switchedToHttps_; } #else bool switchedToHttps() const { return false; } #endif - void startDechunkingRequest(HttpParser *hp); - bool parseRequestChunks(HttpParser *hp); - void finishDechunkingRequest(HttpParser *hp); - void cleanDechunkingRequest(); +protected: + void startDechunkingRequest(); + void finishDechunkingRequest(bool withSuccess); + void abortChunkedRequestBody(const err_type error); + err_type handleChunkedRequestBody(size_t &putSize); private: int connReadWasError(comm_err_t flag, int size, int xerrno); int connFinishedWithConn(int size); void clientMaybeReadData(int do_next_read); void clientAfterReadingRequests(int do_next_read); private: CBDATA_CLASS2(ConnStateData); bool transparent_; bool closing_; bool switchedToHttps_; AsyncCall::Pointer reader; ///< set when we are reading BodyPipe::Pointer bodyPipe; // set when we are reading request body }; /* convenience class while splitting up body handling */ /* temporary existence only - on stack use expected */ === modified file 'src/ftp.cc' --- src/ftp.cc 2010-08-24 00:12:54 +0000 +++ src/ftp.cc 2010-09-04 23:39:15 +0000 @@ -3806,40 +3806,41 @@ writeReplyBody(buf, strlen(buf)); } /** * Call this when there is data from the origin server * which should be sent to either StoreEntry, or to ICAP...
*/ void FtpStateData::writeReplyBody(const char *dataToWrite, size_t dataLength) { debugs(9, 5, HERE << "writing " << dataLength << " bytes to the reply"); addVirginReplyBody(dataToWrite, dataLength); } /** * called after we wrote the last byte of the request body */ void FtpStateData::doneSendingRequestBody() { + ServerStateData::doneSendingRequestBody(); debugs(9,3, HERE); dataComplete(); /* NP: RFC 959 3.3. DATA CONNECTION MANAGEMENT * if transfer type is 'stream' call dataComplete() * otherwise leave open. (reschedule control channel read?) */ } /** * A hack to ensure we do not double-complete on the forward entry. * \todo FtpStateData logic should probably be rewritten to avoid * double-completion or FwdState should be rewritten to allow it. */ void FtpStateData::completeForwarding() { if (fwd == NULL || flags.completed_forwarding) { debugs(9, 3, HERE << "completeForwarding avoids " << "double-complete on FD " << ctrl.fd << ", Data FD " << data.fd << === modified file 'src/http.cc' --- src/http.cc 2010-08-24 04:18:51 +0000 +++ src/http.cc 2010-09-05 08:15:22 +0000 @@ -1390,70 +1390,75 @@ /* * why <2? Because delayAwareRead() won't actually read if * you ask it to read 1 byte. The delayed read request * just gets re-queued until the client side drains, then * the I/O thread hangs. Better to not register any read * handler until we get a notification from someone that * its okay to read again. */ if (read_size < 2) return; if (flags.do_next_read) { flags.do_next_read = 0; typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer; entry->delayAwareRead(fd, readBuf->space(read_size), read_size, JobCallback(11, 5, Dialer, this, HttpStateData::readReply)); } } -/* - * This will be called when request write is complete.
- */ +/// called after writing the very last request byte (body, last-chunk, etc) void -HttpStateData::sendComplete(const CommIoCbParams &io) +HttpStateData::wroteLast(const CommIoCbParams &io) { - debugs(11, 5, "httpSendComplete: FD " << fd << ": size " << io.size << ": errflag " << io.flag << "."); + debugs(11, 5, HERE << "FD " << fd << ": size " << io.size << ": errflag " << io.flag << "."); #if URL_CHECKSUM_DEBUG entry->mem_obj->checkUrlChecksum(); #endif if (io.size > 0) { fd_bytes(fd, io.size, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, io.size); kb_incr(&statCounter.server.http.kbytes_out, io.size); } if (io.flag == COMM_ERR_CLOSING) return; if (io.flag) { ErrorState *err; err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request); err->xerrno = io.xerrno; fwd->fail(err); comm_close(fd); return; } + sendComplete(); +} + +/// successfully wrote the entire request (including body, last-chunk, etc.) +void +HttpStateData::sendComplete() +{ /* * Set the read timeout here because it hasn't been set yet. * We only set the read timeout after the request has been * fully written to the server-side. If we start the timeout * after connection establishment, then we are likely to hit * the timeout for POST/PUT requests that have very large * request bodies. */ typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(11, 5, TimeoutDialer, this, HttpStateData::httpTimeout); commSetTimeout(fd, Config.Timeout.read, timeoutCall); flags.request_sent = 1; orig_request->hier.peer_http_request_sent = current_time; } // Close the HTTP server connection. Used by serverComplete().
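Reviewer note on the server-side hunks in this patch: the framing that HttpStateData::getMoreRequestBody() emits (hex chunk-size, CRLF, raw data, CRLF, plus a final 5-byte last-chunk only once the whole body has been received) can be sketched in isolation. This is a minimal standalone illustration, not Squid code. The function name `encodeChunk` and its `receivedWholeBody` parameter are hypothetical, and `std::string` stands in for MemBuf.

```cpp
#include <cstdio>
#include <string>

// Hypothetical helper (not part of Squid): frames one piece of raw body data
// as an HTTP/1.1 chunk. std::string stands in for Squid's MemBuf.
std::string encodeChunk(const std::string &raw, bool receivedWholeBody)
{
    std::string out;

    // Never emit a zero-size data chunk: on the wire it would be
    // indistinguishable from last-chunk and end the body prematurely.
    if (!raw.empty()) {
        char sizeLine[32];
        std::snprintf(sizeLine, sizeof(sizeLine), "%zX\r\n", raw.size());
        out += sizeLine; // hex-chunk-size CRLF
        out += raw;      // raw-data
        out += "\r\n";   // CRLF
    }

    // Append last-chunk only after the entire body has been received;
    // an aborted producer must not result in a "complete" request.
    if (receivedWholeBody)
        out += "0\r\n\r\n";

    return out;
}
```

Unlike the patch, which checks `Must(rawDataSize > 0)` after formatting the chunk, this sketch tests for empty input before writing anything. That ordering is a choice made for the illustration, not a claim about how the patch ought to be structured.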
@@ -1723,40 +1728,48 @@ /* Enforce sibling relations */ if (flags.only_if_cached) EBIT_SET(cc->mask, CC_ONLY_IF_CACHED); hdr_out->putCc(cc); httpHdrCcDestroy(cc); } /* maybe append Connection: keep-alive */ if (flags.keepalive) { hdr_out->putStr(HDR_CONNECTION, "keep-alive"); } /* append Front-End-Https */ if (flags.front_end_https) { if (flags.front_end_https == 1 || request->protocol == PROTO_HTTPS) hdr_out->putStr(HDR_FRONT_END_HTTPS, "On"); } + if (orig_request->header.chunked() && orig_request->content_length <= 0) { + /* Preserve original chunked encoding unless we learned the length. + * Do not just copy the original value so that if the client-side + * starts decode other encodings, this code may remain valid. + */ + hdr_out->putStr(HDR_TRANSFER_ENCODING, "chunked"); + } + /* Now mangle the headers. */ if (Config2.onoff.mangle_request_headers) httpHdrMangleList(hdr_out, request, ROR_REQUEST); strConnection.clean(); } /** * Decides whether a particular header may be cloned from the received Clients request * to our outgoing fetch request. */ void copyOneHeaderFromClientsideRequestToUpstreamRequest(const HttpHeaderEntry *e, const String strConnection, HttpRequest * request, const HttpRequest * orig_request, HttpHeader * hdr_out, const int we_do_ranges, const http_state_flags flags) { debugs(11, 5, "httpBuildRequestHeader: " << e->name << ": " << e->value ); switch (e->id) { /** \par RFC 2616 sect 13.5.1 - Hop-by-Hop headers which Squid should not pass on. 
*/ @@ -1987,41 +2000,41 @@ return false; } typedef CommCbMemFunT<HttpStateData, CommTimeoutCbParams> TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(11, 5, TimeoutDialer, this, HttpStateData::httpTimeout); commSetTimeout(fd, Config.Timeout.lifetime, timeoutCall); flags.do_next_read = 1; maybeReadVirginBody(); if (orig_request->body_pipe != NULL) { if (!startRequestBodyFlow()) // register to receive body data return false; typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer; requestSender = JobCallback(11,5, Dialer, this, HttpStateData::sentRequestBody); } else { assert(!requestBodySource); typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer; requestSender = JobCallback(11,5, - Dialer, this, HttpStateData::sendComplete); + Dialer, this, HttpStateData::wroteLast); } if (_peer != NULL) { if (_peer->options.originserver) { flags.proxying = 0; flags.originpeer = 1; } else { flags.proxying = 1; flags.originpeer = 0; } } else { flags.proxying = 0; flags.originpeer = 0; } /* * Is keep-alive okay for all request methods? */ if (orig_request->flags.must_keepalive) flags.keepalive = 1; @@ -2035,157 +2048,217 @@ (double) _peer->stats.n_keepalives_sent > 0.50) flags.keepalive = 1; if (_peer) { if (neighborType(_peer, request) == PEER_SIBLING && !_peer->options.allow_miss) flags.only_if_cached = 1; flags.front_end_https = _peer->front_end_https; } mb.init(); request->peer_host=_peer?_peer->host:NULL; buildRequestPrefix(request, orig_request, entry, &mb, flags); debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf); comm_write_mbuf(fd, &mb, requestSender); return true; } +bool +HttpStateData::getMoreRequestBody(MemBuf &buf) +{ + // parent's implementation can handle the no-encoding case + if (!request->header.chunked()) + return ServerStateData::getMoreRequestBody(buf); + + MemBuf raw; + + Must(requestBodySource != NULL); + if (!requestBodySource->getMoreData(raw)) + return false; // no request body bytes to chunk yet + + // optimization: pre-allocate buffer size that should be enough + const mb_size_t rawDataSize = raw.contentSize(); + //
we may need to send: hex-chunk-size CRLF raw-data CRLF last-chunk + buf.init(16 + 2 + rawDataSize + 2 + 5, raw.max_capacity); + + buf.Printf("%"PRIX64"\r\n", rawDataSize); + buf.append(raw.content(), rawDataSize); + buf.Printf("\r\n"); + + Must(rawDataSize > 0); // we did not accidently created last-chunk above + + // Do not send last-chunk unless we successfully received everything + if (receivedWholeRequestBody) { + Must(!flags.sentLastChunk); + flags.sentLastChunk = true; + buf.append("0\r\n\r\n", 5); + } + + return true; +} + void httpStart(FwdState *fwd) { debugs(11, 3, "httpStart: \"" << RequestMethodStr(fwd->request->method) << " " << fwd->entry->url() << "\"" ); HttpStateData *httpState = new HttpStateData(fwd); if (!httpState->sendRequest()) { debugs(11, 3, "httpStart: aborted"); delete httpState; return; } statCounter.server.all.requests++; statCounter.server.http.requests++; /* * We used to set the read timeout here, but not any more. * Now its set in httpSendComplete() after the full request, * including request body, has been written to the server. 
*/ } -void -HttpStateData::doneSendingRequestBody() +/// if broken posts are enabled for the request, try to fix and return true +bool +HttpStateData::finishingBrokenPost() { - debugs(11,5, HERE << "doneSendingRequestBody: FD " << fd); - #if USE_HTTP_VIOLATIONS - if (Config.accessList.brokenPosts) { - ACLFilledChecklist ch(Config.accessList.brokenPosts, request, NULL); - if (!ch.fastCheck()) { - debugs(11, 5, "doneSendingRequestBody: didn't match brokenPosts"); - CommIoCbParams io(NULL); - io.fd=fd; - io.flag=COMM_OK; - sendComplete(io); - } else { - debugs(11, 2, "doneSendingRequestBody: matched brokenPosts"); + if (!Config.accessList.brokenPosts) { + debugs(11, 5, HERE << "No brokenPosts list"); + return false; + } - if (!canSend(fd)) { - debugs(11,2, HERE << "cannot send CRLF to closing FD " << fd); - assert(closeHandler != NULL); - return; - } + ACLFilledChecklist ch(Config.accessList.brokenPosts, request, NULL); + if (!ch.fastCheck()) { + debugs(11, 5, HERE << "didn't match brokenPosts"); + return false; + } - typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer; - AsyncCall::Pointer call = JobCallback(11,5, - Dialer, this, HttpStateData::sendComplete); - comm_write(fd, "\r\n", 2, call); - } - return; + if (!canSend(fd)) { + debugs(11,2, HERE << "ignoring broken POST for closing FD " << fd); + assert(closeHandler != NULL); + return true; // prevent caller from proceeding as if nothing happened } - debugs(11, 5, "doneSendingRequestBody: No brokenPosts list"); + + debugs(11, 2, "finishingBrokenPost: fixing broken POST"); + typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer; + requestSender = JobCallback(11,5, + Dialer, this, HttpStateData::wroteLast); + comm_write(fd, "\r\n", 2, requestSender); + return true; +#else + return false; #endif /* USE_HTTP_VIOLATIONS */ +} + +/// if needed, write last-chunk to end the request body and return true +bool +HttpStateData::finishingChunkedRequest() +{ + if (flags.sentLastChunk) { + debugs(11, 5, HERE << "already sent last-chunk"); + return false; + } - CommIoCbParams
io(NULL); - io.fd=fd; - io.flag=COMM_OK; - sendComplete(io); + Must(receivedWholeRequestBody); // or we should not be sending last-chunk + flags.sentLastChunk = true; + + typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer; + requestSender = JobCallback(11,5, + Dialer, this, HttpStateData::wroteLast); + comm_write(fd, "0\r\n\r\n", 5, requestSender); + return true; +} + +void +HttpStateData::doneSendingRequestBody() +{ + ServerStateData::doneSendingRequestBody(); + debugs(11,5, HERE << "doneSendingRequestBody: FD " << fd); + + // do we need to write something after the last body byte? + const bool chunked = request->header.chunked(); + if (chunked && finishingChunkedRequest()) + return; + if (!chunked && finishingBrokenPost()) + return; + + sendComplete(); } // more origin request body data is available void HttpStateData::handleMoreRequestBodyAvailable() { if (eof || fd < 0) { // XXX: we should check this condition in other callbacks then! // TODO: Check whether this can actually happen: We should unsubscribe // as a body consumer when the above condition(s) are detected. debugs(11, 1, HERE << "Transaction aborted while reading HTTP body"); return; } assert(requestBodySource != NULL); if (requestBodySource->buf().hasContent()) { // XXX: why does not this trigger a debug message on every request? if (flags.headers_parsed && !flags.abuse_detected) { flags.abuse_detected = 1; debugs(11, 1, "http handleMoreRequestBodyAvailable: Likely proxy abuse detected '" << orig_request->client_addr << "' -> '" << entry->url() << "'" ); if (virginReply()->sline.status == HTTP_INVALID_HEADER) { comm_close(fd); return; } } } HttpStateData::handleMoreRequestBodyAvailable(); } // premature end of the request body void HttpStateData::handleRequestBodyProducerAborted() { ServerStateData::handleRequestBodyProducerAborted(); - // XXX: SendComplete(COMM_ERR_CLOSING) does little. Is it enough? 
- CommIoCbParams io(NULL); - io.fd=fd; - io.flag=COMM_ERR_CLOSING; - sendComplete(io); + abortTransaction("request body producer aborted"); } // called when we wrote request headers(!) or a part of the body void HttpStateData::sentRequestBody(const CommIoCbParams &io) { if (io.size > 0) kb_incr(&statCounter.server.http.kbytes_out, io.size); ServerStateData::sentRequestBody(io); } // Quickly abort the transaction // TODO: destruction should be sufficient as the destructor should cleanup, // including canceling close handlers void HttpStateData::abortTransaction(const char *reason) { debugs(11,5, HERE << "aborting transaction for " << reason << "; FD " << fd << ", this " << this); if (fd >= 0) { + fwd->unregister(fd); comm_close(fd); return; } fwd->handleUnregisteredServerEnd(); deleteThis("HttpStateData::abortTransaction"); } HttpRequest * HttpStateData::originalRequest() { return orig_request; } === modified file 'src/http.h' --- src/http.h 2010-07-28 18:04:45 +0000 +++ src/http.h 2010-09-04 23:49:05 +0000 @@ -80,51 +80,55 @@ protected: virtual HttpRequest *originalRequest(); private: AsyncCall::Pointer closeHandler; enum ConnectionStatus { INCOMPLETE_MSG, COMPLETE_PERSISTENT_MSG, COMPLETE_NONPERSISTENT_MSG }; ConnectionStatus statusIfComplete() const; ConnectionStatus persistentConnStatus() const; void keepaliveAccounting(HttpReply *); void checkDateSkew(HttpReply *); bool continueAfterParsingHeader(); void truncateVirginBody(); virtual void haveParsedReplyHeaders(); + virtual bool getMoreRequestBody(MemBuf &buf); virtual void closeServer(); // end communication with the server virtual bool doneWithServer() const; // did we end communication? 
virtual void abortTransaction(const char *reason); // abnormal termination // consuming request body virtual void handleMoreRequestBodyAvailable(); virtual void handleRequestBodyProducerAborted(); void writeReplyBody(); bool decodeAndWriteReplyBody(); + bool finishingBrokenPost(); + bool finishingChunkedRequest(); void doneSendingRequestBody(); void requestBodyHandler(MemBuf &); virtual void sentRequestBody(const CommIoCbParams &io); - void sendComplete(const CommIoCbParams &io); + void wroteLast(const CommIoCbParams &io); + void sendComplete(); void httpStateConnClosed(const CommCloseCbParams &params); void httpTimeout(const CommTimeoutCbParams &params); mb_size_t buildRequestPrefix(HttpRequest * request, HttpRequest * orig_request, StoreEntry * entry, MemBuf * mb, http_state_flags flags); static bool decideIfWeDoRanges (HttpRequest * orig_request); bool peerSupportsConnectionPinning() const; ChunkedCodingParser *httpChunkDecoder; private: CBDATA_CLASS2(HttpStateData); }; #endif /* SQUID_HTTP_H */ === modified file 'src/structs.h' --- src/structs.h 2010-08-24 00:02:15 +0000 +++ src/structs.h 2010-09-04 23:44:26 +0000 @@ -752,40 +752,41 @@ http_hdr_type id; String name; field_type type; HttpHeaderFieldStat stat; }; struct _http_state_flags { unsigned int proxying:1; unsigned int keepalive:1; unsigned int only_if_cached:1; unsigned int headers_parsed:1; unsigned int front_end_https:2; unsigned int originpeer:1; unsigned int keepalive_broken:1; unsigned int abuse_detected:1; unsigned int request_sent:1; unsigned int do_next_read:1; unsigned int consume_body_data:1; unsigned int chunked:1; + unsigned int sentLastChunk:1; ///< do not try to write last-chunk again }; struct _ipcache_addrs { Ip::Address *in_addrs; unsigned char *bad_mask; unsigned char count; unsigned char cur; unsigned char badcount; }; struct _domain_ping { char *domain; int do_ping; /* boolean */ domain_ping *next; }; struct _domain_type { char *domain; peer_t type; domain_type *next;