=== modified file 'src/CommCalls.cc' --- src/CommCalls.cc 2009-07-12 22:56:47 +0000 +++ src/CommCalls.cc 2011-01-09 01:36:41 +0000 @@ -113,40 +113,46 @@ { } /* CommTimeoutCbParams */ CommTimeoutCbParams::CommTimeoutCbParams(void *aData): CommCommonCbParams(aData) { } /* CommAcceptCbPtrFun */ CommAcceptCbPtrFun::CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams): CommDialerParamsT(aParams), handler(aHandler) { } +CommAcceptCbPtrFun::CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o): + CommDialerParamsT(o.params), + handler(o.handler) +{ +} + void CommAcceptCbPtrFun::dial() { handler(params.fd, params.nfd, ¶ms.details, params.flag, params.xerrno, params.data); } void CommAcceptCbPtrFun::print(std::ostream &os) const { os << '('; params.print(os); os << ')'; } /* CommConnectCbPtrFun */ CommConnectCbPtrFun::CommConnectCbPtrFun(CNCB *aHandler, const CommConnectCbParams &aParams): CommDialerParamsT(aParams), === modified file 'src/CommCalls.h' --- src/CommCalls.h 2010-08-24 00:12:54 +0000 +++ src/CommCalls.h 2011-01-09 04:56:21 +0000 @@ -159,42 +159,45 @@ virtual void print(std::ostream &os) const { os << '('; this->params.print(os); os << ')'; } public: Method method; protected: virtual void doDial() { ((&(*this->job))->*method)(this->params); } }; // accept (IOACB) dialer class CommAcceptCbPtrFun: public CallDialer, public CommDialerParamsT { public: typedef CommAcceptCbParams Params; + typedef RefCount Pointer; CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams); + CommAcceptCbPtrFun(const CommAcceptCbPtrFun &o); + void dial(); virtual void print(std::ostream &os) const; public: IOACB *handler; }; // connect (CNCB) dialer class CommConnectCbPtrFun: public CallDialer, public CommDialerParamsT { public: typedef CommConnectCbParams Params; CommConnectCbPtrFun(CNCB *aHandler, const Params &aParams); void dial(); virtual void print(std::ostream &os) const; @@ -242,45 +245,51 @@ public: typedef CommTimeoutCbParams Params; CommTimeoutCbPtrFun(PF *aHandler, const Params &aParams); void dial(); virtual void print(std::ostream &os) const; public: PF *handler; }; // AsyncCall to comm handlers implemented as global functions. // The dialer is one of the Comm*CbPtrFunT above // TODO: Get rid of this class by moving canFire() to canDial() method // of dialers. template class CommCbFunPtrCallT: public AsyncCall { public: + typedef RefCount > Pointer; typedef typename Dialer::Params Params; inline CommCbFunPtrCallT(int debugSection, int debugLevel, const char *callName, const Dialer &aDialer); + inline CommCbFunPtrCallT(const Pointer &p) : + AsyncCall(p->debugSection, p->debugLevel, p->name), + dialer(p->dialer) + {} + virtual CallDialer* getDialer() { return &dialer; } public: Dialer dialer; protected: inline virtual bool canFire(); inline virtual void fire(); }; // Conveninece wrapper: It is often easier to call a templated function than // to create a templated class. template inline CommCbFunPtrCallT *commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer) { return new CommCbFunPtrCallT(debugSection, debugLevel, callName, dialer); } === modified file 'src/ProtoPort.cc' --- src/ProtoPort.cc 2010-11-18 08:01:53 +0000 +++ src/ProtoPort.cc 2011-01-09 00:58:29 +0000 @@ -1,42 +1,47 @@ /* * $Id$ */ #include "squid.h" +#include "comm.h" #include "ProtoPort.h" #if HAVE_LIMITS #include #endif -http_port_list::http_port_list(const char *aProtocol) +http_port_list::http_port_list(const char *aProtocol) : + listenFd(-1) #if USE_SSL - : - http(*this), dynamicCertMemCacheSize(std::numeric_limits::max()) + , http(*this) + , dynamicCertMemCacheSize(std::numeric_limits::max()) #endif { protocol = xstrdup(aProtocol); } http_port_list::~http_port_list() { - delete listener; + if (listenFd >= 0) { + comm_close(listenFd); + listenFd = -1; + } safe_free(name); safe_free(defaultsite); safe_free(protocol); #if USE_SSL safe_free(cert); safe_free(key); safe_free(options); safe_free(cipher); safe_free(cafile); safe_free(capath); safe_free(dhfile); safe_free(sslflags); safe_free(sslContextSessionId); #endif } #if USE_SSL === modified file 'src/ProtoPort.h' --- src/ProtoPort.h 2010-11-18 08:01:53 +0000 +++ src/ProtoPort.h 2011-01-09 00:55:34 +0000 @@ -1,70 +1,68 @@ /* * $Id$ */ #ifndef SQUID_PROTO_PORT_H #define SQUID_PROTO_PORT_H -//#include "typedefs.h" #include "cbdata.h" -#include "comm/ListenStateData.h" #if USE_SSL #include "ssl/gadgets.h" #endif struct http_port_list { http_port_list(const char *aProtocol); ~http_port_list(); http_port_list *next; Ip::Address s; char *protocol; /* protocol name */ char *name; /* visible name */ char *defaultsite; /* default web site */ unsigned int intercepted:1; /**< intercepting proxy port */ unsigned int spoof_client_ip:1; /**< spoof client ip if possible */ unsigned int accel:1; /**< HTTP accelerator */ unsigned int allow_direct:1; /**< Allow direct forwarding in accelerator mode */ unsigned int vhost:1; /**< uses host header */ unsigned int sslBump:1; /**< intercepts CONNECT requests */ unsigned int ignore_cc:1; /**< Ignore request Cache-Control directives */ int vport; /* virtual port support, -1 for dynamic, >0 static*/ bool connection_auth_disabled; /* Don't support connection oriented auth */ int disable_pmtu_discovery; struct { unsigned int enabled; unsigned int idle; unsigned int interval; unsigned int timeout; } tcp_keepalive; /** - * The FD listening socket handler. - * If not NULL we are actively listening for client requests. - * delete to close the socket. + * The FD listening socket. + * If >= 0 we are actively listening for client requests. + * use comm_close(listenFd) to stop. */ - Comm::ListenStateData *listener; + int listenFd; #if USE_SSL // XXX: temporary hack to ease move of SSL options to http_port http_port_list &http; char *cert; char *key; int version; char *cipher; char *options; char *clientca; char *cafile; char *capath; char *crlfile; char *dhfile; char *sslflags; char *sslContextSessionId; ///< "session id context" for staticSslContext bool generateHostCertificates; ///< dynamically make host cert for sslBump size_t dynamicCertMemCacheSize; ///< max size of generated certificates memory cache === modified file 'src/base/AsyncCall.h' --- src/base/AsyncCall.h 2010-12-02 23:33:27 +0000 +++ src/base/AsyncCall.h 2011-01-09 04:58:01 +0000 @@ -28,40 +28,42 @@ * You do not have to use the macros below to make or receive asynchronous * method calls, but they give you a uniform interface and handy call * debugging. */ class CallDialer; class AsyncCallQueue; /** \todo add unique call IDs \todo CBDATA_CLASS2 kids \ingroup AsyncCallsAPI */ class AsyncCall: public RefCountable { public: typedef RefCount Pointer; friend class AsyncCallQueue; AsyncCall(int aDebugSection, int aDebugLevel, const char *aName); + AsyncCall(); + AsyncCall(const AsyncCall &); virtual ~AsyncCall(); void make(); // fire if we can; handles general call debugging // can be called from canFire() for debugging; always returns false bool cancel(const char *reason); bool canceled() { return isCanceled != NULL; } virtual CallDialer *getDialer() = 0; void print(std::ostream &os); /// remove us from the queue; we are head unless we are queued after prev void dequeue(AsyncCall::Pointer &head, AsyncCall::Pointer &prev); void setNext(AsyncCall::Pointer aNext) { theNext = aNext; } @@ -105,40 +107,44 @@ // TODO: Add these for clarity when CommCbFunPtrCallT is gone //virtual bool canDial(AsyncCall &call) = 0; //virtual void dial(AsyncCall &call) = 0; virtual void print(std::ostream &os) const = 0; }; /** \ingroup AsyncCallAPI * This template implements an AsyncCall using a specified Dialer class */ template class AsyncCallT: public AsyncCall { public: AsyncCallT(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName), dialer(aDialer) {} + AsyncCallT(const RefCount > &o): + AsyncCall(o->debugSection, o->debugLevel, o->name), + dialer(o->dialer) {} + CallDialer *getDialer() { return &dialer; } protected: virtual bool canFire() { return AsyncCall::canFire() && dialer.canDial(*this); } virtual void fire() { dialer.dial(*this); } Dialer dialer; }; template inline AsyncCall * asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer) { return new AsyncCallT(aDebugSection, aDebugLevel, aName, aDialer); } === modified file 'src/client_side.cc' --- src/client_side.cc 2011-01-10 09:43:43 +0000 +++ src/client_side.cc 2011-01-10 10:08:21 +0000 @@ -79,106 +79,104 @@ * ClientKeepAliveNextRequest will then detect the presence of data in * the next ClientHttpRequest, and will send it, restablishing the * data flow. */ #include "squid.h" #include "acl/FilledChecklist.h" #include "auth/UserRequest.h" #include "base/TextException.h" #include "ChunkedCodingParser.h" #include "client_side.h" #include "client_side_reply.h" #include "client_side_request.h" #if USE_DELAY_POOLS #include "ClientInfo.h" #endif #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" -#include "comm/Write.h" -#include "comm/ListenStateData.h" +#include "CommCalls.h" #include "comm/Loops.h" +#include "comm/Write.h" +#include "comm/TcpAcceptor.h" #include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" #include "HttpHdrContRange.h" #include "HttpReply.h" #include "HttpRequest.h" #include "ident/Config.h" #include "ident/Ident.h" #include "ip/Intercept.h" #include "ipc/StartListening.h" #include "MemBuf.h" #include "MemObject.h" #include "ProtoPort.h" #include "rfc1738.h" #include "SquidTime.h" #if USE_SSL #include "ssl/context_storage.h" #include "ssl/helper.h" #include "ssl/support.h" #include "ssl/gadgets.h" #endif #if USE_SSL_CRTD #include "ssl/crtd_message.h" #include "ssl/certificate_db.h" #endif #include "Store.h" #if HAVE_LIMITS #include #endif #if LINGERING_CLOSE #define comm_close comm_lingering_close #endif -/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call +/// dials clientListenerConnectionOpened call class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb { public: - typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg); - ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg): - handler(aHandler), portCfg(aPortCfg) {} + typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg, bool uses_ssl); + ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag): + handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {} virtual void print(std::ostream &os) const { startPrint(os) << ", port=" << (void*)portCfg << ')'; } virtual bool canDial(AsyncCall &) const { return true; } - virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); } + virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg, uses_ssl); } public: Handler handler; private: http_port_list *portCfg; ///< from Config.Sockaddr.http + bool uses_ssl; }; - -static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s); -#if USE_SSL -static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s); -#endif +static void clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl); /* our socket-related context */ CBDATA_CLASS_INIT(ClientSocketContext); void * ClientSocketContext::operator new (size_t byteCount) { /* derived classes with different sizes must implement their own new */ assert (byteCount == sizeof (ClientSocketContext)); CBDATA_INIT_TYPE(ClientSocketContext); return cbdataAlloc(ClientSocketContext); } void ClientSocketContext::operator delete (void *address) { cbdataFree (address); } @@ -3104,48 +3102,49 @@ #else static int reported = 0; if (!reported) { debugs(33, 1, "Notice: httpd_accel_no_pmtu_disc not supported on your platform"); reported = 1; } #endif } result->flags.readMoreRequests = true; return result; } /** Handle a new connection on HTTP socket. */ void -httpAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { http_port_list *s = (http_port_list *)data; ConnStateData *connState = NULL; if (flag != COMM_OK) { - debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + // This should not occur with TcpAcceptor. + // However its possible the call was still queued when the client disconnected + debugs(33, 1, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } debugs(33, 4, "httpAccept: FD " << newfd << ": accepted"); fd_note(newfd, "client http connect"); connState = connStateCreate(&details->peer, &details->me, newfd, s); typedef CommCbMemFunT Dialer; AsyncCall::Pointer call = JobCallback(33, 5, Dialer, connState, ConnStateData::connStateClosed); comm_add_close_handler(newfd, call); if (Config.onoff.log_fqdn) fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, TimeoutDialer, connState, ConnStateData::requestTimeout); commSetTimeout(newfd, Config.Timeout.read, timeoutCall); @@ -3350,49 +3349,50 @@ debugs(83, 3, "clientNegotiateSSL: FD " << fd << " client certificate: subject: " << X509_NAME_oneline(X509_get_subject_name(client_cert), 0, 0)); debugs(83, 3, "clientNegotiateSSL: FD " << fd << " client certificate: issuer: " << X509_NAME_oneline(X509_get_issuer_name(client_cert), 0, 0)); X509_free(client_cert); } else { debugs(83, 5, "clientNegotiateSSL: FD " << fd << " has no certificate."); } conn->readSomeData(); } /** handle a new HTTPS connection */ static void -httpsAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; SSL_CTX *sslContext = s->staticSslContext.get(); if (flag != COMM_OK) { + // This should not occur with TcpAcceptor. + // However its possible the call was still queued when the client disconnected errno = xerrno; - debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); + debugs(33, 1, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); return; } SSL *ssl = NULL; if (!(ssl = httpsCreate(newfd, details, sslContext))) return; debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation."); fd_note(newfd, "client https connect"); ConnStateData *connState = connStateCreate(details->peer, details->me, newfd, &s->http); typedef CommCbMemFunT Dialer; AsyncCall::Pointer call = JobCallback(33, 5, Dialer, connState, ConnStateData::connStateClosed); comm_add_close_handler(newfd, call); if (Config.onoff.log_fqdn) fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; @@ -3605,161 +3605,148 @@ !s->staticSslContext && !s->generateHostCertificates) { debugs(1, 1, "Will not bump SSL at http_port " << s->http.s << " due to SSL initialization failure."); s->sslBump = 0; } if (s->sslBump) { ++bumpCount; // Create ssl_ctx cache for this port. Ssl::TheGlobalContextStorage.addLocalStorage(s->s, s->dynamicCertMemCacheSize == std::numeric_limits::max() ? 4194304 : s->dynamicCertMemCacheSize); } #endif #if USE_SSL_CRTD Ssl::Helper::GetInstance(); #endif //USE_SSL_CRTD /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */ const int openFlags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - AsyncCall::Pointer callback = asyncCall(33,2, - "clientHttpConnectionOpened", - ListeningStartedDialer(&clientHttpConnectionOpened, s)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, - Ipc::fdnHttpSocket, callback); + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); + + AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, s, false)); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall, sub); - HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened + HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened } #if USE_SSL if (bumpCount && !Config.accessList.ssl_bump) debugs(33, 1, "WARNING: http_port(s) with SslBump found, but no " << std::endl << "\tssl_bump ACL configured. No requests will be " << "bumped."); #endif } /// process clientHttpConnectionsOpen result static void -clientHttpConnectionOpened(int fd, int, http_port_list *s) +clientListenerConnectionOpened(int fd, int errNo, http_port_list *s, bool uses_ssl) { - if (!OpenedHttpSocket(fd, "Cannot open HTTP Port")) + s->listenFd = fd; + if (!OpenedHttpSocket(s->listenFd, (uses_ssl?"Cannot open HTTPS Port":"Cannot open HTTP Port"))) return; Must(s); + Must(s->listenFd >= 0); - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)", - CommAcceptCbPtrFun(httpAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); - - debugs(1, 1, "Accepting " << + debugs(1, 1, "Accepting" << (s->intercepted ? " intercepted" : "") << (s->spoof_client_ip ? " spoofing" : "") << (s->sslBump ? " bumpy" : "") << (s->accel ? " accelerated" : "") - << " HTTP connections at " << s->s - << ", FD " << fd << "." ); + << " HTTP" << (uses_ssl?"S":"") << " connections at " + << " FD " << s->listenFd << " on " << s->s); - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for + Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for } #if USE_SSL static void clientHttpsConnectionsOpen(void) { https_port_list *s; for (s = Config.Sockaddr.https; s; s = (https_port_list *)s->http.next) { if (MAXHTTPPORTS == NHttpSockets) { debugs(1, 1, "Ignoring 'https_port' lines exceeding the limit."); debugs(1, 1, "The limit is " << MAXHTTPPORTS << " HTTPS ports."); continue; } if (!s->staticSslContext) { debugs(1, 1, "Ignoring https_port " << s->http.s << " due to SSL initialization failure."); continue; } - AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened", - ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http)); - - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING, - Ipc::fdnHttpsSocket, call); - - HttpSockets[NHttpSockets++] = -1; - } -} - -/// process clientHttpsConnectionsOpen result -static void -clientHttpsConnectionOpened(int fd, int, http_port_list *s) -{ - if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port")) - return; - - Must(s); + const int openFlags = COMM_NONBLOCKING | + (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)", - CommAcceptCbPtrFun(httpsAccept, s)); + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); - s->listener = new Comm::ListenStateData(fd, call, true); + AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true)); - debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << "."); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall, sub); - Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for + HttpSockets[NHttpSockets++] = -1; + } } - #endif void clientOpenListenSockets(void) { clientHttpConnectionsOpen(); #if USE_SSL clientHttpsConnectionsOpen(); #endif if (NHttpSockets < 1) fatal("No HTTP or HTTPS ports configured"); } void clientHttpConnectionsClose(void) { for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #if USE_SSL for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) { - if (s->listener) { - debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection"); - delete s->listener; - s->listener = NULL; + if (s->listenFd >= 0) { + debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection"); + comm_close(s->listenFd); + s->listenFd = -1; } } #endif // TODO see if we can drop HttpSockets array entirely */ for (int i = 0; i < NHttpSockets; i++) { HttpSockets[i] = -1; } NHttpSockets = 0; } int varyEvaluateMatch(StoreEntry * entry, HttpRequest * request) { const char *vary = request->vary_headers; int has_vary = entry->getReply()->header.has(HDR_VARY); #if X_ACCELERATOR_VARY has_vary |= === modified file 'src/comm.cc' --- src/comm.cc 2011-01-10 09:43:43 +0000 +++ src/comm.cc 2011-01-10 12:31:49 +0000 @@ -23,43 +23,43 @@ * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * * * Copyright (c) 2003, Robert Collins */ #include "squid.h" #include "StoreIOBuffer.h" #include "comm.h" #include "event.h" #include "fde.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" #include "comm/IoCallback.h" -#include "comm/Write.h" -#include "comm/ListenStateData.h" #include "comm/Loops.h" +#include "comm/Write.h" +#include "comm/TcpAcceptor.h" #include "CommIO.h" #include "CommRead.h" #include "ConnectionDetail.h" #include "MemBuf.h" #include "pconn.h" #include "SquidTime.h" #include "CommCalls.h" #include "DescriptorSet.h" #include "icmp/net_db.h" #include "ip/Address.h" #include "ip/Intercept.h" #include "ip/QosConfig.h" #include "ip/tools.h" #include "ClientInfo.h" #if USE_SSL #include "ssl/support.h" #endif #include "cbdata.h" #if defined(_SQUID_CYGWIN_) @@ -127,41 +127,41 @@ static void commSetReuseAddr(int); static void commSetNoLinger(int); #ifdef TCP_NODELAY static void commSetTcpNoDelay(int); #endif static void commSetTcpRcvbuf(int, int); static PF commConnectFree; static IPH commConnectDnsHandle; typedef enum { COMM_CB_READ = 1, COMM_CB_DERIVED } comm_callback_t; static MemAllocator *conn_close_pool = NULL; fd_debug_t *fdd_table = NULL; bool isOpen(const int fd) { - return fd_table[fd].flags.open != 0; + return fd >= 0 && fd_table[fd].flags.open != 0; } /** * Attempt a read * * If the read attempt succeeds or fails, call the callback. * Else, wait for another IO notification. */ void commHandleRead(int fd, void *data) { Comm::IoCallback *ccb = (Comm::IoCallback *) data; assert(data == COMMIO_FD_READCB(fd)); assert(ccb->active()); /* Attempt a read */ statCounter.syscalls.sock.reads++; errno = 0; int retval; retval = FD_READ_METHOD(fd, ccb->buf, ccb->size); === modified file 'src/comm/AcceptLimiter.cc' --- src/comm/AcceptLimiter.cc 2009-12-31 02:35:01 +0000 +++ src/comm/AcceptLimiter.cc 2011-01-10 00:52:51 +0000 @@ -1,32 +1,51 @@ #include "config.h" #include "comm/AcceptLimiter.h" -#include "comm/ListenStateData.h" +#include "comm/TcpAcceptor.h" #include "fde.h" Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; Comm::AcceptLimiter &Comm::AcceptLimiter::Instance() { return Instance_; } void -Comm::AcceptLimiter::defer(Comm::ListenStateData *afd) +Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd) { afd->isLimited++; debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); deferred.push_back(afd); } void +Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd) +{ + for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) { + if (deferred[i] == afd) { + deferred[i]->isLimited--; + deferred[i] = NULL; // fast. kick() will skip empty entries later. + debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + } + } +} + +void Comm::AcceptLimiter::kick() { + // TODO: this could be optimized further with an iterator to search + // looking for first non-NULL, followed by dumping the first N + // with only one shift()/pop_front operation + debugs(5, 5, HERE << " size=" << deferred.size()); - if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { - debugs(5, 5, HERE << " doing one."); + while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */ - ListenStateData *temp = deferred.shift(); - temp->isLimited--; - temp->acceptNext(); + TcpAcceptor *temp = deferred.shift(); + if (temp != NULL) { + debugs(5, 5, HERE << " doing one."); + temp->isLimited--; + temp->acceptNext(); + break; + } } } === modified file 'src/comm/AcceptLimiter.h' --- src/comm/AcceptLimiter.h 2010-01-13 01:13:17 +0000 +++ src/comm/AcceptLimiter.h 2011-01-08 14:09:20 +0000 @@ -1,42 +1,45 @@ #ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H #define _SQUID_SRC_COMM_ACCEPT_LIMITER_H #include "Array.h" namespace Comm { -class ListenStateData; +class TcpAcceptor; /** * FIFO Queue holding listener socket handlers which have been activated * ready to dupe their FD and accept() a new client connection. * But when doing so there were not enough FD available to handle the * new connection. These handlers are awaiting some FD to become free. * * defer - used only by Comm layer ListenStateData adding themselves when FD are limited. * kick - used by Comm layer when FD are closed. */ class AcceptLimiter { public: /** retrieve the global instance of the queue. */ static AcceptLimiter &Instance(); /** delay accepting a new client connection. */ - void defer(Comm::ListenStateData *afd); + void defer(Comm::TcpAcceptor *afd); + + /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */ + void removeDead(const Comm::TcpAcceptor *afd); /** try to accept and begin processing any delayed client connections. */ void kick(); private: static AcceptLimiter Instance_; /** FIFO queue */ - Vector deferred; + Vector deferred; }; }; // namepace Comm #endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */ === modified file 'src/comm/Makefile.am' --- src/comm/Makefile.am 2011-01-10 09:43:43 +0000 +++ src/comm/Makefile.am 2011-01-10 12:32:06 +0000 @@ -1,25 +1,25 @@ include $(top_srcdir)/src/Common.am include $(top_srcdir)/src/TestHeaders.am noinst_LTLIBRARIES = libcomm.la ## Library holding comm socket handlers libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ListenStateData.cc \ - ListenStateData.h \ Loops.h \ ModDevPoll.cc \ ModEpoll.cc \ ModKqueue.cc \ ModPoll.cc \ ModSelect.cc \ ModSelectWin32.cc \ + TcpAcceptor.cc \ + TcpAcceptor.h \ \ IoCallback.cc \ IoCallback.h \ Write.cc \ Write.h \ \ comm_internal.h === renamed file 'src/comm/ListenStateData.cc' => 'src/comm/TcpAcceptor.cc' --- src/comm/ListenStateData.cc 2011-01-10 09:43:43 +0000 +++ src/comm/TcpAcceptor.cc 2011-01-10 12:33:59 +0000 @@ -16,265 +16,351 @@ * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * * * Copyright (c) 2003, Robert Collins */ #include "squid.h" +#include "base/TextException.h" #include "CommCalls.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" -#include "comm/ListenStateData.h" #include "comm/Loops.h" +#include "comm/TcpAcceptor.h" #include "ConnectionDetail.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" +namespace Comm { + CBDATA_CLASS_INIT(TcpAcceptor); +}; + +Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub) : + AsyncJob("Comm::TcpAcceptor"), + errcode(0), + isLimited(0), + theCallSub(aSub), + fd(listenFd), + local_addr(laddr), + newFd_(-1) +{ + /* open the conn if its not already open */ + if (fd < 0) { + fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, local_addr, flags, note); + errcode = errno; + + if (fd < 0) { + debugs(5, DBG_CRITICAL, HERE << "comm_open failed: FD " << fd << ", " << local_addr << " error: " << errcode); + return; + } + debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd << ", " << local_addr); + } +} + +void +Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) +{ + debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << aSub); + unsubscribe("subscription change"); + theCallSub = aSub; +} + +void +Comm::TcpAcceptor::unsubscribe(const char *reason) +{ + debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription " << theCallSub << " removed: " << reason); + theCallSub = NULL; +} + +void +Comm::TcpAcceptor::start() +{ + debugs(5, 5, HERE << "FD " << fd << ", " << local_addr << " AsyncCall Subscription: " << theCallSub); + + Must(isOpen(fd)); + + setListen(); + + // if no error so far start accepting connections. + if (errcode == 0) + SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); +} + +bool +Comm::TcpAcceptor::doneAll() const +{ + // stop when FD is closed + if (!isOpen(fd)) { + return AsyncJob::doneAll(); + } + + // stop when handlers are gone + if (theCallSub == NULL) { + return AsyncJob::doneAll(); + } + + // open FD with handlers...keep accepting. + return false; +} + +void +Comm::TcpAcceptor::swanSong() +{ + debugs(5,5, HERE); + unsubscribe("swanSong"); + fd = -1; + AcceptLimiter::Instance().removeDead(this); + AsyncJob::swanSong(); +} + /** * New-style listen and accept routines * * setListen simply registers our interest in an FD for listening. * The constructor takes a callback to call when an FD has been * accept()ed some time later. */ void -Comm::ListenStateData::setListen() +Comm::TcpAcceptor::setListen() { errcode = 0; // reset local errno copy. if (listen(fd, Squid_MaxFD >> 2) < 0) { - debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + debugs(50, DBG_CRITICAL, "ERROR: listen(FD " << fd << ", " << local_addr << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); errcode = errno; return; } if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { #ifdef SO_ACCEPTFILTER struct accept_filter_arg afa; bzero(&afa, sizeof(afa)); debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) - debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); #elif defined(TCP_DEFER_ACCEPT) int seconds = 30; if (strncmp(Config.accept_filter, "data=", 5) == 0) seconds = atoi(Config.accept_filter + 5); if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) - debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); + debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); #else - debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); + debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS"); #endif } } -Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : - fd(aFd), - theCallback(call), - mayAcceptMore(accept_many) -{ - assert(aFd >= 0); - debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call); - assert(isOpen(aFd)); - setListen(); - SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); -} - -Comm::ListenStateData::~ListenStateData() -{ - comm_close(fd); - fd = -1; -} - /** * This private callback is called whenever a filedescriptor is ready * to dupe itself and fob off an accept()ed connection * * It will either do that accept operation. Or if there are not enough FD * available to do the clone safely will push the listening FD into a list * of deferred operations. The list gets kicked and the dupe/accept() actually * done later when enough sockets become available. */ void -Comm::ListenStateData::doAccept(int fd, void *data) +Comm::TcpAcceptor::doAccept(int fd, void *data) { - debugs(5, 2, HERE << "New connection on FD " << fd); + try { + debugs(5, 2, HERE << "New connection on FD " << fd); - assert(isOpen(fd)); - ListenStateData *afd = static_cast(data); + Must(isOpen(fd)); + TcpAcceptor *afd = static_cast(data); - if (!okToAccept()) { - AcceptLimiter::Instance().defer(afd); - } else { - afd->acceptNext(); + if (!okToAccept()) { + AcceptLimiter::Instance().defer(afd); + } else { + afd->acceptNext(); + } + SetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0); + + } catch(const TextException &e) { + fatalf("FATAL: error while accepting new client connection: %s\n", e.message); + } catch(...) { + fatal("FATAL: error while accepting new client connection: [unkown]\n"); } - SetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0); } bool -Comm::ListenStateData::okToAccept() +Comm::TcpAcceptor::okToAccept() { static time_t last_warn = 0; if (fdNFree() >= RESERVED_FD) return true; if (last_warn + 15 < squid_curtime) { debugs(5, DBG_CRITICAL, "WARNING! Your cache is running out of filedescriptors"); last_warn = squid_curtime; } return false; } void -Comm::ListenStateData::acceptOne() +Comm::TcpAcceptor::acceptOne() { /* * We don't worry about running low on FDs here. Instead, * doAccept() will use AcceptLimiter if we reach the limit * there. */ /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = oldAccept(connDetails); + ConnectionDetail newConnDetails; + comm_err_t status = oldAccept(newConnDetails); /* Check for errors */ - if (newfd < 0) { + if (!isOpen(newFd_)) { - if (newfd == COMM_NOMESSAGE) { + if (status == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback); + debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub); SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback); - notify(-1, COMM_ERROR, connDetails); - mayAcceptMore = false; + debugs(5, 5, HERE << "non-recoverable error: FD " << fd << ", " << local_addr << " handler Subscription: " << theCallSub); + notify(status, newConnDetails); + mustStop("Listener socket closed"); return; } - debugs(5, 5, HERE << "accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << - " handler: " << theCallback); - notify(newfd, COMM_OK, connDetails); + debugs(5, 5, HERE << "Listener: FD " << fd << + " accepted new connection from " << newConnDetails.peer << + " handler Subscription: " << theCallSub); + notify(status, newConnDetails); } void -Comm::ListenStateData::acceptNext() +Comm::TcpAcceptor::acceptNext() { - assert(isOpen(fd)); + Must(isOpen(fd)); debugs(5, 2, HERE << "connection on FD " << fd); acceptOne(); } +// XXX: obsolete comment? +// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback(). void -Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails) +Comm::TcpAcceptor::notify(comm_err_t flag, const ConnectionDetail &connDetails) { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... if (flag == COMM_ERR_CLOSING) { return; } - if (theCallback != NULL) { - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(theCallback); + if (theCallSub != NULL) { + AsyncCall::Pointer call = theCallSub->callback(); + CommAcceptCbParams ¶ms = GetCommParams(call); params.fd = fd; - params.nfd = newfd; + params.nfd = newFd_; params.details = connDetails; params.flag = flag; params.xerrno = errcode; - ScheduleCallHere(theCallback); - if (!mayAcceptMore) - theCallback = NULL; + ScheduleCallHere(call); } + + // drop the temporary recent accepted socket FD details. + // this prevents information crossover on calls. + newFd_ = -1; } /** * accept() and process - * Wait for an incoming connection on FD. + * Wait for an incoming connection on our listener socket. + * + * \retval COMM_OK success. details parameter filled. + * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in. + * \retval COMM_ERROR an outright failure occured. + * Or if this client has too many connections already. */ -int -Comm::ListenStateData::oldAccept(ConnectionDetail &details) +comm_err_t +Comm::TcpAcceptor::oldAccept(ConnectionDetail &details) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; int sock; struct addrinfo *gai = NULL; details.me.InitAddrInfo(gai); errcode = 0; // reset local errno copy. if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) { errcode = errno; // store last accept errno locally. details.me.FreeAddrInfo(gai); PROF_stop(comm_accept); if (ignoreErrno(errno)) { debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror()); return COMM_NOMESSAGE; } else if (ENFILE == errno || EMFILE == errno) { debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror()); return COMM_ERROR; } else { debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror()); return COMM_ERROR; } } + Must(sock >= 0); + newFd_ = sock; details.peer = *gai; if ( Config.client_ip_max_connections >= 0) { if (clientdbEstablished(details.peer, 0) > Config.client_ip_max_connections) { debugs(50, DBG_IMPORTANT, "WARNING: " << details.peer << " attempting more than " << Config.client_ip_max_connections << " connections."); details.me.FreeAddrInfo(gai); return COMM_ERROR; } } + // lookup the local-end details of this new connection details.me.InitAddrInfo(gai); - details.me.SetEmpty(); getsockname(sock, gai->ai_addr, &gai->ai_addrlen); details.me = *gai; - - commSetCloseOnExec(sock); + details.me.FreeAddrInfo(gai); /* fdstat update */ + // XXX : these are not all HTTP requests. use a note about type and ip:port details-> + // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port" fd_open(sock, FD_SOCKET, "HTTP Request"); fdd_table[sock].close_file = NULL; fdd_table[sock].close_line = 0; fde *F = &fd_table[sock]; details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); + F->local_addr = details.me; F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET; - details.me.FreeAddrInfo(gai); + // set socket flags + commSetCloseOnExec(sock); commSetNonBlocking(sock); /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ F->flags.transparent = fd_table[fd].flags.transparent; PROF_stop(comm_accept); - return sock; + return COMM_OK; } === renamed file 'src/comm/ListenStateData.h' => 'src/comm/TcpAcceptor.h' --- src/comm/ListenStateData.h 2010-11-27 01:58:38 +0000 +++ src/comm/TcpAcceptor.h 2011-01-09 00:24:09 +0000 @@ -1,54 +1,102 @@ -#ifndef SQUID_LISTENERSTATEDATA_H -#define SQUID_LISTENERSTATEDATA_H +#ifndef SQUID_COMM_TCPACCEPTOR_H +#define SQUID_COMM_TCPACCEPTOR_H #include "base/AsyncCall.h" -#include "comm.h" +#include "base/Subscription.h" +#include "CommCalls.h" +#include "comm_err_t.h" +#include "comm/TcpAcceptor.h" +#include "ip/Address.h" + #if HAVE_MAP #include #endif -class ConnectionDetail; - namespace Comm { -class ListenStateData +class AcceptLimiter; + +/** + * Listens on an FD for new incoming connections and + * emits an active FD descriptor for the new client. + * + * Handles all event limiting required to quash inbound connection + * floods within the global FD limits of available Squid_MaxFD and + * client_ip_max_connections. + * + * Fills the emitted connection with all connection details able to + * be looked up. Currently these are the local/remote IP:port details + * and the listening socket transparent-mode flag. + */ +class TcpAcceptor : public AsyncJob { +private: + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); public: - ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); - ListenStateData(const ListenStateData &r); // not implemented. - ~ListenStateData(); + TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, + const char *note, const Subscription::Pointer &aSub); - void subscribe(AsyncCall::Pointer &call); + TcpAcceptor(const TcpAcceptor &r); // not implemented. + + /** Subscribe a handler to receive calls back about new connections. + * Replaces any existing subscribed handler. + */ + void subscribe(const Subscription::Pointer &aSub); + + /** Remove the currently waiting callback subscription. + * Pending calls will remain scheduled. + */ + void unsubscribe(const char *reason); + + /** Try and accept another connection (synchronous). + * If one is pending already the subscribed callback handler will be scheduled + * to handle it before this method returns. + */ void acceptNext(); - void notify(int newfd, comm_err_t flag, const ConnectionDetail &details); - int fd; + /// Call the subscribed callback handler with details about a new connection. + void notify(comm_err_t flags, const ConnectionDetail &newConnDetails); /// errno code of the last accept() or listen() action if one occurred. int errcode; - /// whether this socket is delayed and on the AcceptLimiter queue. - int32_t isLimited; +private: + friend class AcceptLimiter; + int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. + Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. + +public: + /// conn being listened on for new connections + /// Reserved for read-only use. + // NP: public only until we can hide it behind connection handles + int fd; + +private: + /// IP Address and port being listened on + Ip::Address local_addr; + + /// temporary holder for newely accepted client FD + int newFd_; private: - /// Method to test if there are enough file escriptors to open a new client connection + /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); /// Method callback for whenever an FD is ready to accept a client connection. static void doAccept(int fd, void *data); void acceptOne(); - int oldAccept(ConnectionDetail &details); - - AsyncCall::Pointer theCallback; - bool mayAcceptMore; - + comm_err_t oldAccept(ConnectionDetail &newConnDetails); void setListen(); + + CBDATA_CLASS2(TcpAcceptor); }; } // namespace Comm -#endif /* SQUID_LISTENERSTATEDATA_H */ +#endif /* SQUID_COMM_TCPACCEPTOR_H */ === modified file 'src/ftp.cc' --- src/ftp.cc 2011-01-14 14:10:21 +0000 +++ src/ftp.cc 2011-01-15 02:33:05 +0000 @@ -17,42 +17,43 @@ * sources; see the CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "comm.h" +#include "CommCalls.h" +#include "comm/TcpAcceptor.h" #include "comm/Write.h" -#include "comm/ListenStateData.h" #include "compat/strtoll.h" #include "ConnectionDetail.h" #include "errorpage.h" #include "fde.h" #include "forward.h" #include "html_quote.h" #include "HttpHdrContRange.h" #include "HttpHeaderRange.h" #include "HttpHeader.h" #include "HttpRequest.h" #include "HttpReply.h" #include "ip/tools.h" #include "MemBuf.h" #include "rfc1738.h" #include "Server.h" #include "SquidString.h" #include "SquidTime.h" #include "Store.h" #include "URLScheme.h" #include "wordlist.h" @@ -136,47 +137,45 @@ /// \ingroup ServerProtocolFTPInternal typedef void (FTPSM) (FtpStateData *); /// common code for FTP control and data channels // does not own the channel descriptor, which is managed by FtpStateData class FtpChannel { public: FtpChannel(): fd(-1) {} /// called after the socket is opened, sets up close handler void opened(int aFd, const AsyncCall::Pointer &aCloser); /** Handles all operations needed to properly close the active channel FD. * clearing the close handler, clearing the listen socket properly, and calling comm_close */ void close(); void clear(); /// just resets fd and close handler. does not close active connections. - int fd; /// channel descriptor; \todo: remove because the closer has it + int fd; /// channel descriptor - /** Current listening socket handler. delete on shutdown or abort. - * FTP stores a copy of the FD in the field fd above. - * Use close() to properly close the channel. - */ - Comm::ListenStateData *listener; + Ip::Address local; ///< The local IP address:port this channel is using + + int flags; ///< socket flags used when opening. private: AsyncCall::Pointer closer; /// Comm close handler callback }; /// \ingroup ServerProtocolFTPInternal class FtpStateData : public ServerStateData { public: void *operator new (size_t); void operator delete (void *); void *toCbdata() { return this; } FtpStateData(FwdState *); ~FtpStateData(); char user[MAX_URL]; char password[MAX_URL]; int password_url; char *reply_hdr; @@ -228,40 +227,46 @@ CBDATA_CLASS(FtpStateData); public: // these should all be private void start(); void loginParser(const char *, int escaped); int restartable(); void appendSuccessHeader(); void hackShortcut(FTPSM * nextState); void failed(err_type, int xerrno); void failedErrorMessage(err_type, int xerrno); void unhack(); void scheduleReadControlReply(int); void handleControlReply(); void readStor(); void parseListing(); MemBuf *htmlifyListEntry(const char *line); void completedListing(void); void dataComplete(); void dataRead(const CommIoCbParams &io); + + /// ignore timeout on CTRL channel. set read timeout on DATA channel. + void switchTimeoutToDataChannel(); + /// create a data channel acceptor and start listening. + void listenForDataChannel(const int fd, const char *note); + int checkAuth(const HttpHeader * req_hdr); void checkUrlpath(); void buildTitleUrl(); void writeReplyBody(const char *, size_t len); void printfReplyBody(const char *fmt, ...); virtual int dataDescriptor() const; virtual void maybeReadVirginBody(); virtual void closeServer(); virtual void completeForwarding(); virtual void abortTransaction(const char *reason); void processHeadResponse(); void processReplyBody(); void writeCommand(const char *buf); void setCurrentOffset(int64_t offset) { currentOffset = offset; } int64_t getCurrentOffset() const { return currentOffset; } static CNCB ftpPasvCallback; static PF ftpDataWrite; void ftpTimeout(const CommTimeoutCbParams &io); void ctrlClosed(const CommCloseCbParams &io); @@ -426,52 +431,53 @@ ftpReadEPSV, /* SENT_EPSV_ALL */ ftpReadEPSV, /* SENT_EPSV_1 */ ftpReadEPSV, /* SENT_EPSV_2 */ ftpReadPasv, /* SENT_PASV */ ftpReadCwd, /* SENT_CWD */ ftpReadList, /* SENT_LIST */ ftpReadList, /* SENT_NLST */ ftpReadRest, /* SENT_REST */ ftpReadRetr, /* SENT_RETR */ ftpReadStor, /* SENT_STOR */ ftpReadQuit, /* SENT_QUIT */ ftpReadTransferDone, /* READING_DATA (RETR,LIST,NLST) */ ftpWriteTransferDone, /* WRITING_DATA (STOR) */ ftpReadMkdir /* SENT_MKDIR */ }; /// handler called by Comm when FTP control channel is closed unexpectedly void FtpStateData::ctrlClosed(const CommCloseCbParams &io) { + debugs(9, 4, HERE); ctrl.clear(); deleteThis("FtpStateData::ctrlClosed"); } /// handler called by Comm when FTP data channel is closed unexpectedly void FtpStateData::dataClosed(const CommCloseCbParams &io) { - if (data.listener) { - delete data.listener; - data.listener = NULL; - data.fd = -1; + debugs(9, 4, HERE); + if (data.fd >= 0) { + comm_close(data.fd); + // NP clear() does the: data.fd = -1; } data.clear(); failed(ERR_FTP_FAILURE, 0); /* failed closes ctrl.fd and frees ftpState */ /* NP: failure recovery may be possible when its only a data.fd failure. * is the ctrl.fd is still fine, we can send ABOR down it and retry. * Just need to watch out for wider Squid states like shutting down or reconfigure. */ } FtpStateData::FtpStateData(FwdState *theFwdState) : AsyncJob("FtpStateData"), ServerStateData(theFwdState) { const char *url = entry->url(); debugs(9, 3, HERE << "'" << url << "'" ); statCounter.server.all.requests++; statCounter.server.ftp.requests++; theSize = -1; mdtm = -1; @@ -589,40 +595,67 @@ rfc1738_unescape(password); password_url = 1; } debugs(9, 9, HERE << ": found password='" << password << "'(" << len <<") unescaped."); } } else if (login[0]) { /* no password, just username */ if (total_len > MAX_URL) total_len = MAX_URL -1; xstrncpy(user, login, total_len +1); debugs(9, 9, HERE << ": found user='" << user << "'(" << total_len <<"), escaped=" << escaped); if (escaped) rfc1738_unescape(user); debugs(9, 9, HERE << ": found user='" << user << "'(" << total_len <<") unescaped."); } debugs(9, 9, HERE << ": OUT: login='" << login << "', escaped=" << escaped << ", user=" << user << ", password=" << password); } void +FtpStateData::switchTimeoutToDataChannel() +{ + commSetTimeout(ctrl.fd, -1, NULL, NULL); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout); + commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); +} + +void +FtpStateData::listenForDataChannel(const int fd, const char *note) +{ + typedef CommCbMemFunT AcceptDialer; + typedef AsyncCallT AcceptCall; + RefCount call = static_cast(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection)); + Subscription::Pointer sub = new CallSubscription(call); + Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(fd, data.local, data.flags, note, sub); + + // Ensure we have a copy of the FD opened for listening and a close handler on it. + assert(data.fd == -1 || data.fd == tmp->fd); + data.fd = -1; + data.opened(tmp->fd, dataCloser()); + + AsyncJob::Start(tmp); +} + +void FtpStateData::ftpTimeout(const CommTimeoutCbParams &io) { debugs(9, 4, "ftpTimeout: FD " << io.fd << ": '" << entry->url() << "'" ); if (SENT_PASV == state && io.fd == data.fd) { /* stupid ftp.netscape.com */ fwd->dontRetry(false); fwd->ftpPasvFailed(true); debugs(9, DBG_IMPORTANT, "ftpTimeout: timeout in SENT_PASV state" ); } failed(ERR_READ_TIMEOUT, 0); /* failed() closes ctrl.fd and frees ftpState */ } #if DEAD_CODE // obsoleted by ERR_DIR_LISTING void FtpStateData::listingFinish() { // TODO: figure out what this means and how to show it ... @@ -1049,44 +1082,50 @@ size_t usable; size_t len = data.readBuf->contentSize(); if (!len) { debugs(9, 3, HERE << "no content to parse for " << entry->url() ); return; } /* * We need a NULL-terminated buffer for scanning, ick */ sbuf = (char *)xmalloc(len + 1); xstrncpy(sbuf, buf, len + 1); end = sbuf + len - 1; while (*end != '\r' && *end != '\n' && end > sbuf) end--; usable = end - sbuf; - debugs(9, 3, HERE << "usable = " << usable); + debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes."); if (usable == 0) { - debugs(9, 3, HERE << "didn't find end for " << entry->url() ); + if (buf[0] == '\0' && len == 1) { + debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?"); + data.readBuf->consume(len); + } else { + debugs(9, 3, HERE << "didn't find end for " << entry->url()); + debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << rfc1738_do_escape(buf,0) << "'"); + } xfree(sbuf); return; } debugs(9, 3, HERE << (unsigned long int)len << " bytes to play with"); line = (char *)memAllocate(MEM_4K_BUF); end++; s = sbuf; s += strspn(s, crlf); for (; s < end; s += strcspn(s, crlf), s += strspn(s, crlf)) { debugs(9, 7, HERE << "s = {" << s << "}"); linelen = strcspn(s, crlf) + 1; if (linelen < 2) break; if (linelen > 4096) linelen = 4096; @@ -1121,41 +1160,48 @@ void FtpStateData::dataComplete() { debugs(9, 3,HERE); /* Connection closed; transfer done. */ /// Close data channel, if any, to conserve resources while we wait. data.close(); /* expect the "transfer complete" message on the control socket */ /* * DPW 2007-04-23 * Previously, this was the only place where we set the * 'buffered_ok' flag when calling scheduleReadControlReply(). * It caused some problems if the FTP server returns an unexpected * status code after the data command. FtpStateData was being * deleted in the middle of dataRead(). */ - scheduleReadControlReply(0); +// scheduleReadControlReply(0); + /* AYJ: 2011-01-13: 226 status possibly waiting in the ctrl buffer. + * The connection will hang is we DONT send buffered_ok. + * This happens on all transfers which can be completly sent by the + * server before the 150 started status message is read in by Squid. + * ie all transfers of about one packet hang. + */ + scheduleReadControlReply(1); } void FtpStateData::maybeReadVirginBody() { if (data.fd < 0) return; if (data.read_pending) return; const int read_sz = replyBodySpace(*data.readBuf, 0); debugs(11,9, HERE << "FTP may read up to " << read_sz << " bytes"); if (read_sz < 2) // see http.cc return; data.read_pending = true; @@ -1657,41 +1703,41 @@ */ void FtpStateData::scheduleReadControlReply(int buffered_ok) { debugs(9, 3, HERE << "FD " << ctrl.fd); if (buffered_ok && ctrl.offset > 0) { /* We've already read some reply data */ handleControlReply(); } else { /* XXX What about Config.Timeout.read? */ typedef CommCbMemFunT Dialer; AsyncCall::Pointer reader = JobCallback(9, 5, Dialer, this, FtpStateData::ftpReadControlReply); comm_read(ctrl.fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader); /* * Cancel the timeout on the Data socket (if any) and * establish one on the control socket. */ - if (data.fd > -1) { + if (data.fd >= 0) { AsyncCall::Pointer nullCall = NULL; commSetTimeout(data.fd, -1, nullCall); } typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout); commSetTimeout(ctrl.fd, Config.Timeout.read, timeoutCall); } } void FtpStateData::ftpReadControlReply(const CommIoCbParams &io) { debugs(9, 3, "ftpReadControlReply: FD " << io.fd << ", Read " << io.size << " bytes"); if (io.size > 0) { kb_incr(&statCounter.server.all.kbytes_in, io.size); kb_incr(&statCounter.server.ftp.kbytes_in, io.size); } @@ -2705,104 +2751,78 @@ FtpStateData *ftpState = (FtpStateData *)data; debugs(9, 3, HERE); ftpState->request->recordLookup(dns); if (status != COMM_OK) { debugs(9, 2, HERE << "Failed to connect. Retrying without PASV."); ftpState->fwd->dontRetry(false); /* this is a retryable error */ ftpState->fwd->ftpPasvFailed(true); ftpState->failed(ERR_NONE, 0); /* failed closes ctrl.fd and frees ftpState */ return; } ftpRestOrList(ftpState); } /// \ingroup ServerProtocolFTPInternal static int ftpOpenListenSocket(FtpStateData * ftpState, int fallback) { - int fd; - Ip::Address addr; struct addrinfo *AI = NULL; - int on = 1; int x = 0; /// Close old data channels, if any. We may open a new one below. - ftpState->data.close(); + if (!(ftpState->data.flags & COMM_REUSEADDR)) + ftpState->data.close(); /* * Set up a listen socket on the same local address as the * control connection. */ - - addr.InitAddrInfo(AI); - + ftpState->data.local.InitAddrInfo(AI); x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen); - - addr = *AI; - - addr.FreeAddrInfo(AI); + ftpState->data.local = *AI; + ftpState->data.local.FreeAddrInfo(AI); if (x) { debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror()); return -1; } /* * REUSEADDR is needed in fallback mode, since the same port is * used for both control and data. */ if (fallback) { + int on = 1; setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + ftpState->ctrl.flags |= COMM_REUSEADDR; + ftpState->data.flags |= COMM_REUSEADDR; } else { /* if not running in fallback mode a new port needs to be retrieved */ - addr.SetPort(0); - } - - fd = comm_open(SOCK_STREAM, - IPPROTO_TCP, - addr, - COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0), - ftpState->entry->url()); - debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd ); - - if (fd < 0) { - debugs(9, DBG_CRITICAL, HERE << "comm_open failed"); - return -1; + ftpState->data.local.SetPort(0); + ftpState->data.flags = COMM_NONBLOCKING; } - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false); - - if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) { - comm_close(fd); - return -1; - } - - ftpState->data.opened(fd, ftpState->dataCloser()); - ftpState->data.port = comm_local_port(fd); - ftpState->data.host = NULL; - return fd; + ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url()); + return ftpState->data.fd; } /// \ingroup ServerProtocolFTPInternal static void ftpSendPORT(FtpStateData * ftpState) { int fd; Ip::Address ipa; struct addrinfo *AI = NULL; unsigned char *addrptr; unsigned char *portptr; /* check the server control channel is still available */ if (!ftpState || !ftpState->haveControlChannel("ftpSendPort")) return; if (Config.Ftp.epsv_all && ftpState->flags.epsv_all_sent) { debugs(9, DBG_IMPORTANT, "FTP does not allow PORT method after 'EPSV ALL' has been sent."); return; } @@ -2864,40 +2884,41 @@ if (Config.Ftp.epsv_all && ftpState->flags.epsv_all_sent) { debugs(9, DBG_IMPORTANT, "FTP does not allow EPRT method after 'EPSV ALL' has been sent."); return; } if (!Config.Ftp.eprt) { /* Disabled. Switch immediately to attempting old PORT command. */ debugs(9, 3, "EPRT disabled by local administrator"); ftpSendPORT(ftpState); return; } int fd; Ip::Address addr; struct addrinfo *AI = NULL; char buf[MAX_IPSTRLEN]; debugs(9, 3, HERE); ftpState->flags.pasv_supported = 0; fd = ftpOpenListenSocket(ftpState, 0); + debugs(9, 3, "Listening for FTP data connection with FD " << fd); Ip::Address::InitAddrInfo(AI); if (getsockname(fd, AI->ai_addr, &AI->ai_addrlen)) { Ip::Address::FreeAddrInfo(AI); debugs(9, DBG_CRITICAL, HERE << "getsockname(" << fd << ",..): " << xstrerror()); /* XXX Need to set error message */ ftpFail(ftpState); return; } addr = *AI; /* RFC 2428 defines EPRT as IPv6 equivalent to IPv4 PORT command. */ /* Which can be used by EITHER protocol. */ snprintf(cbuf, 1024, "EPRT |%d|%s|%d|\r\n", ( addr.IsIPv6() ? 2 : 1 ), addr.NtoA(buf,MAX_IPSTRLEN), addr.GetPort() ); @@ -2916,111 +2937,102 @@ if (code != 200) { /* Failover to attempting old PORT command. */ debugs(9, 3, "EPRT not supported by remote end"); ftpSendPORT(ftpState); return; } ftpRestOrList(ftpState); } /** \ingroup ServerProtocolFTPInternal \par * "read" handler to accept FTP data connections. * \param io comm accept(2) callback parameters */ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) { - char ntoapeer[MAX_IPSTRLEN]; - debugs(9, 3, "ftpAcceptDataConnection"); - - // one connection accepted. the handler has stopped listening. drop our local pointer to it. - data.listener = NULL; + debugs(9, 3, HERE); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("entry aborted when accepting data conn"); return; } + if (io.flag != COMM_OK) { + data.close(); + debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno)); + /** \todo Need to send error message on control channel*/ + ftpFail(this); + return; + } + + /* data listening conn is no longer even open. abort. */ + if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) { + data.clear(); // ensure that it's cleared and not just closed. + return; + } + /** \par * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being * made by the remote client which is connected to the FTP control socket. + * Or the one which we were told to listen for by control channel messages (may differ under NAT). * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { + char ntoapeer[MAX_IPSTRLEN]; io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); - if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) { + if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 && + strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) { debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << io.details.peer << "), expecting " << - fd_table[ctrl.fd].ipaddr); + fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr); - /* close the bad soures connection down ASAP. */ + /* close the bad sources connection down ASAP. */ comm_close(io.nfd); - /* we are ony accepting once, so need to re-open the listener socket. */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + /* drop the bad connection (io) by ignoring the attempt. */ return; } } - if (io.flag != COMM_OK) { - debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno)); - /** \todo XXX Need to set error message */ - ftpFail(this); - return; - } - /**\par - * Replace the Listen socket with the accepted data socket */ + * Replace the Listening socket with the accepted data socket */ data.close(); data.opened(io.nfd, dataCloser()); data.port = io.details.peer.GetPort(); - io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN); + data.host = xstrdup(fd_table[io.nfd].ipaddr); debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << "data-peer= " << fd_table[data.fd].ipaddr); + assert(haveControlChannel("ftpAcceptDataConnection")); + assert(ctrl.message == NULL); - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - - /*\todo XXX We should have a flag to track connect state... - * host NULL -> not connected, port == local port - * host set -> connected, port == remote port - */ - /* Restart state (SENT_NLST/LIST/RETR) */ - FTP_SM_FUNCS[state] (this); + // Ctrl channel operations will determine what happens to this data connection } /// \ingroup ServerProtocolFTPInternal static void ftpRestOrList(FtpStateData * ftpState) { debugs(9, 3, HERE); if (ftpState->typecode == 'D') { ftpState->flags.isdir = 1; if (ftpState->flags.put) { ftpSendMkdir(ftpState); /* PUT name;type=d */ } else { ftpSendNlst(ftpState); /* GET name;type=d sec 3.2.2 of RFC 1738 */ } } else if (ftpState->flags.put) { ftpSendStor(ftpState); } else if (ftpState->flags.isdir) ftpSendList(ftpState); @@ -3058,68 +3070,52 @@ /// \ingroup ServerProtocolFTPInternal /// \deprecated use ftpState->readStor() instead. static void ftpReadStor(FtpStateData * ftpState) { ftpState->readStor(); } void FtpStateData::readStor() { int code = ctrl.replycode; debugs(9, 3, HERE); if (code == 125 || (code == 150 && data.host)) { if (!startRequestBodyFlow()) { // register to receive body data ftpFail(this); return; } - /*\par - * When client status is 125, or 150 without a hostname, Begin data transfer. */ + /* When client status is 125, or 150 without a hostname, Begin data transfer. */ debugs(9, 3, HERE << "starting data transfer"); + switchTimeoutToDataChannel(); sendMoreRequestBody(); - /** \par - * Cancel the timeout on the Control socket and - * establish one on the data socket. - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, this, FtpStateData::ftpTimeout); - - commSetTimeout(data.fd, Config.Timeout.read, timeoutCall); - state = WRITING_DATA; debugs(9, 3, HERE << "writing data channel"); } else if (code == 150) { /*\par - * When client code is 150 with a hostname, Accept data channel. */ + * When client code is 150 without a hostname, Accept data channel. */ debugs(9, 3, "ftpReadStor: accepting data channel"); - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, this, FtpStateData::ftpAcceptDataConnection); - - data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); + switchTimeoutToDataChannel(); + listenForDataChannel(data.fd, data.host); } else { debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code); ftpFail(this); } } /// \ingroup ServerProtocolFTPInternal static void ftpSendRest(FtpStateData * ftpState) { /* check the server control channel is still available */ if (!ftpState || !ftpState->haveControlChannel("ftpSendRest")) return; debugs(9, 3, HERE); snprintf(cbuf, 1024, "REST %"PRId64"\r\n", ftpState->restart_offset); ftpState->writeCommand(cbuf); ftpState->state = SENT_REST; } @@ -3205,129 +3201,92 @@ if (ftpState->filepath) { snprintf(cbuf, 1024, "NLST %s\r\n", ftpState->filepath); } else { snprintf(cbuf, 1024, "NLST\r\n"); } ftpState->writeCommand(cbuf); ftpState->state = SENT_NLST; } /// \ingroup ServerProtocolFTPInternal static void ftpReadList(FtpStateData * ftpState) { int code = ftpState->ctrl.replycode; debugs(9, 3, HERE); if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ - /* XXX what about Config.Timeout.read? */ + debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); return; } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")"); + ftpState->switchTimeoutToDataChannel(); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); return; } else if (!ftpState->flags.tried_nlst && code > 300) { ftpSendNlst(ftpState); } else { ftpFail(ftpState); return; } } /// \ingroup ServerProtocolFTPInternal static void ftpSendRetr(FtpStateData * ftpState) { /* check the server control channel is still available */ if (!ftpState || !ftpState->haveControlChannel("ftpSendRetr")) return; debugs(9, 3, HERE); assert(ftpState->filepath != NULL); snprintf(cbuf, 1024, "RETR %s\r\n", ftpState->filepath); ftpState->writeCommand(cbuf); ftpState->state = SENT_RETR; } /// \ingroup ServerProtocolFTPInternal static void ftpReadRetr(FtpStateData * ftpState) { int code = ftpState->ctrl.replycode; debugs(9, 3, HERE); if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ debugs(9, 3, HERE << "reading data channel"); - /* XXX what about Config.Timeout.read? */ + ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); } else if (code == 150) { /* Accept data channel */ - typedef CommCbMemFunT acceptDialer; - AsyncCall::Pointer acceptCall = JobCallback(11, 5, - acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection); - ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); - /* - * Cancel the timeout on the Control socket and establish one - * on the data socket - */ - AsyncCall::Pointer nullCall = NULL; - commSetTimeout(ftpState->ctrl.fd, -1, nullCall); - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = JobCallback(9, 5, - TimeoutDialer, ftpState,FtpStateData::ftpTimeout); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall); + ftpState->switchTimeoutToDataChannel(); + ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); } else if (code >= 300) { if (!ftpState->flags.try_slash_hack) { /* Try this as a directory missing trailing slash... */ ftpState->hackShortcut(ftpSendCwd); } else { ftpFail(ftpState); } } else { ftpFail(ftpState); } } /** * Generate the HTTP headers and template fluff around an FTP * directory listing display. */ void FtpStateData::completedListing() { assert(entry); @@ -3948,48 +3907,51 @@ AsyncCall::Pointer FtpStateData::dataCloser() { typedef CommCbMemFunT Dialer; return JobCallback(9, 5, Dialer, this, FtpStateData::dataClosed); } /// configures the channel with a descriptor and registers a close handler void FtpChannel::opened(int aFd, const AsyncCall::Pointer &aCloser) { assert(fd < 0); assert(closer == NULL); assert(aFd >= 0); assert(aCloser != NULL); fd = aFd; closer = aCloser; comm_add_close_handler(fd, closer); + + // grab the local IP address:port details for this connection + struct addrinfo *AI = NULL; + local.InitAddrInfo(AI); + getsockname(aFd, AI->ai_addr, &AI->ai_addrlen); + local = *AI; + local.FreeAddrInfo(AI); } /// planned close: removes the close handler and calls comm_close void FtpChannel::close() { // channels with active listeners will be closed when the listener handler dies. - if (listener) { - delete listener; - listener = NULL; - comm_remove_close_handler(fd, closer); - closer = NULL; - fd = -1; - } else if (fd >= 0) { - comm_remove_close_handler(fd, closer); - closer = NULL; + if (fd >= 0) { + if (closer != NULL) { + comm_remove_close_handler(fd, closer); + closer = NULL; + } comm_close(fd); // we do not expect to be called back fd = -1; } } /// just resets fd and close handler void FtpChannel::clear() { fd = -1; closer = NULL; } === modified file 'src/htcp.cc' --- src/htcp.cc 2011-01-10 09:43:43 +0000 +++ src/htcp.cc 2011-01-10 09:56:40 +0000 @@ -1500,46 +1500,47 @@ if (Config.Port.htcp <= 0) { debugs(31, 1, "HTCP Disabled."); return; } Ip::Address incomingAddr = Config.Addrs.udp_incoming; incomingAddr.SetPort(Config.Port.htcp); if (!Ip::EnableIpv6 && !incomingAddr.SetIPv4()) { debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << incomingAddr << " is not an IPv4 address."); fatal("HTCP port cannot be opened."); } /* split-stack for now requires default IPv4-only HTCP */ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && incomingAddr.IsAnyAddr()) { incomingAddr.SetIPv4(); } AsyncCall::Pointer call = asyncCall(31, 2, "htcpIncomingConnectionOpened", HtcpListeningStartedDialer(&htcpIncomingConnectionOpened)); + Subscription::Pointer nilSub; Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, incomingAddr, COMM_NONBLOCKING, - Ipc::fdnInHtcpSocket, call); + Ipc::fdnInHtcpSocket, call, nilSub); if (!Config.Addrs.udp_outgoing.IsNoAddr()) { Ip::Address outgoingAddr = Config.Addrs.udp_outgoing; outgoingAddr.SetPort(Config.Port.htcp); if (!Ip::EnableIpv6 && !outgoingAddr.SetIPv4()) { debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoingAddr << " is not an IPv4 address."); fatal("HTCP port cannot be opened."); } /* split-stack for now requires default IPv4-only HTCP */ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoingAddr.IsAnyAddr()) { outgoingAddr.SetIPv4(); } enter_suid(); htcpOutSocket = comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, outgoingAddr, COMM_NONBLOCKING, "Outgoing HTCP Socket"); === modified file 'src/icp_v2.cc' --- src/icp_v2.cc 2011-01-10 09:43:43 +0000 +++ src/icp_v2.cc 2011-01-10 09:56:40 +0000 @@ -682,45 +682,47 @@ if ((port = Config.Port.icp) <= 0) return; addr = Config.Addrs.udp_incoming; addr.SetPort(port); if (!Ip::EnableIpv6 && !addr.SetIPv4()) { debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address."); fatal("ICP port cannot be opened."); } /* split-stack for now requires default IPv4-only ICP */ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) { addr.SetIPv4(); } AsyncCall::Pointer call = asyncCall(12, 2, "icpIncomingConnectionOpened", IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr)); + Subscription::Pointer nilSub; + Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, addr, COMM_NONBLOCKING, - Ipc::fdnInIcpSocket, call); + Ipc::fdnInIcpSocket, call, nilSub); addr.SetEmpty(); // clear for next use. addr = Config.Addrs.udp_outgoing; if ( !addr.IsNoAddr() ) { enter_suid(); addr.SetPort(port); if (!Ip::EnableIpv6 && !addr.SetIPv4()) { debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address."); fatal("ICP port cannot be opened."); } /* split-stack for now requires default IPv4-only ICP */ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) { addr.SetIPv4(); } theOutIcpConnection = comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, addr, COMM_NONBLOCKING, === modified file 'src/ipc/SharedListen.cc' --- src/ipc/SharedListen.cc 2010-10-28 18:52:59 +0000 +++ src/ipc/SharedListen.cc 2011-01-09 00:21:05 +0000 @@ -133,22 +133,23 @@ Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end()); PendingOpenRequest por = TheSharedListenRequestMap[response.mapId]; Must(por.callback != NULL); TheSharedListenRequestMap.erase(response.mapId); if (fd >= 0) { OpenListenerParams &p = por.params; struct addrinfo *AI = NULL; p.addr.GetAddrInfo(AI); AI->ai_socktype = p.sock_type; AI->ai_protocol = p.proto; comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI); p.addr.FreeAddrInfo(AI); } StartListeningCb *cbd = dynamic_cast(por.callback->getDialer()); Must(cbd); cbd->fd = fd; cbd->errNo = response.errNo; + cbd->handlerSubscription = por.params.handlerSubscription; ScheduleCallHere(por.callback); } === modified file 'src/ipc/SharedListen.h' --- src/ipc/SharedListen.h 2010-07-06 23:09:44 +0000 +++ src/ipc/SharedListen.h 2011-01-09 00:18:50 +0000 @@ -1,50 +1,54 @@ /* * $Id$ * * DEBUG: section 54 Interprocess Communication * */ #ifndef SQUID_IPC_SHARED_LISTEN_H #define SQUID_IPC_SHARED_LISTEN_H #include "base/AsyncCall.h" +#include "base/Subscription.h" namespace Ipc { /// "shared listen" is when concurrent processes are listening on the same fd /// comm_open_listener() parameters holder class OpenListenerParams { public: OpenListenerParams(); bool operator <(const OpenListenerParams &p) const; ///< useful for map<> int sock_type; int proto; Ip::Address addr; ///< will be memset and memcopied int flags; int fdNote; ///< index into fd_note() comment strings + + /// handler to subscribe to Comm::TcpAcceptor + Subscription::Pointer handlerSubscription; }; class TypedMsgHdr; /// a request for a listen socket with given parameters class SharedListenRequest { public: SharedListenRequest(); ///< from OpenSharedListen() which then sets public data explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg() void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() public: int requestorId; ///< kidId of the requestor OpenListenerParams params; ///< actual comm_open_sharedListen() parameters int mapId; ///< to map future response to the requestor's callback }; === modified file 'src/ipc/StartListening.cc' --- src/ipc/StartListening.cc 2010-07-06 23:09:44 +0000 +++ src/ipc/StartListening.cc 2011-01-10 00:52:21 +0000 @@ -1,58 +1,66 @@ /* * $Id$ * * DEBUG: section 54 Interprocess Communication * */ #include "config.h" -#include "comm.h" #include "base/TextException.h" +#include "comm.h" +#include "comm/TcpAcceptor.h" #include "ipc/SharedListen.h" #include "ipc/StartListening.h" Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0) { } Ipc::StartListeningCb::~StartListeningCb() { } std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const { return os << "(FD " << fd << ", err=" << errNo; } - -void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback) +void +Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, int flags, + FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub) { - OpenListenerParams p; - p.sock_type = sock_type; - p.proto = proto; - p.addr = addr; - p.flags = flags; - p.fdNote = fdNote; - - if (UsingSmp()) { // if SMP is on, share + if (UsingSmp()) { // if SMP is on, share + OpenListenerParams p; + p.sock_type = sock_type; + p.proto = proto; + p.addr = addr; + p.flags = flags; + p.fdNote = fdNote; + p.handlerSubscription = sub; Ipc::JoinSharedListen(p, callback); return; // wait for the call back } + StartListeningCb *cbd = dynamic_cast(callback->getDialer()); + Must(cbd); + enter_suid(); - const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags, - FdNote(p.fdNote)); - const int errNo = (sock >= 0) ? 0 : errno; + if (sock_type == SOCK_STREAM) { + // TCP: setup a job to handle accept() with subscribed handler + Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(cbd->fd, addr, flags, FdNote(fdNote), sub); + cbd->fd = tmp->fd; + AsyncJob::Start(tmp); + } else if (sock_type == SOCK_DGRAM) { + // UDP: setup the listener socket, but do not set a subscriber + // TODO: create a UDP sbscription so packet event calls get scheduled and queued Async. + cbd->fd = comm_open_listener(sock_type, proto, addr, flags, FdNote(fdNote)); + } else { + fatalf("Invalid Socket Type (%d)",sock_type); + } + cbd->errNo = cbd->fd >= 0 ? 0 : errno; leave_suid(); - debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr); - - StartListeningCb *cbd = - dynamic_cast(callback->getDialer()); - Must(cbd); - cbd->fd = sock; - cbd->errNo = errNo; + debugs(54, 3, HERE << "opened listen FD " << cbd->fd << " on " << addr); ScheduleCallHere(callback); } === modified file 'src/ipc/StartListening.h' --- src/ipc/StartListening.h 2010-11-21 04:40:05 +0000 +++ src/ipc/StartListening.h 2011-01-09 00:30:35 +0000 @@ -1,45 +1,49 @@ /* * $Id$ * * DEBUG: section 54 Interprocess Communication * */ #ifndef SQUID_IPC_START_LISTENING_H #define SQUID_IPC_START_LISTENING_H #include "ip/forward.h" #include "ipc/FdNotes.h" #include "base/AsyncCall.h" +#include "base/Subscription.h" #if HAVE_IOSFWD #include #endif namespace Ipc { /// common API for all StartListening() callbacks class StartListeningCb { public: StartListeningCb(); virtual ~StartListeningCb(); /// starts printing arguments, return os std::ostream &startPrint(std::ostream &os) const; public: int fd; ///< opened listening socket or -1 int errNo; ///< errno value from the comm_open_listener() call + + /// The subscription we will pass on to the Comm::TcpAcceptor + Subscription::Pointer handlerSubscription; }; /// Depending on whether SMP is on, either ask Coordinator to send us /// the listening FD or call comm_open_listener() directly. -extern void StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback); +extern void StartListening(int sock_type, int proto, Ip::Address &addr, int flags, + FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub); } // namespace Ipc; #endif /* SQUID_IPC_START_LISTENING_H */