/***************************************************************************** $Id$ File: binder.cpp Date: 07Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" #define DEV_URANDOM "/dev/urandom" map Bindable_t::BindingBag; /******************************** STATIC Bindable_t::CreateBinding ********************************/ string Bindable_t::CreateBinding() { static int index = 0; static string seed; if ((index >= 1000000) || (seed.length() == 0)) { #ifdef OS_UNIX int fd = open (DEV_URANDOM, O_RDONLY); if (fd < 0) throw std::runtime_error ("No entropy device"); unsigned char u[16]; size_t r = read (fd, u, sizeof(u)); if (r < sizeof(u)) throw std::runtime_error ("Unable to read entropy device"); unsigned char *u1 = (unsigned char*)u; char u2 [sizeof(u) * 2 + 1]; for (size_t i=0; i < sizeof(u); i++) sprintf (u2 + (i * 2), "%02x", u1[i]); seed = string (u2); #endif #ifdef OS_WIN32 UUID uuid; UuidCreate (&uuid); unsigned char *uuidstring = NULL; UuidToString (&uuid, &uuidstring); if (!uuidstring) throw std::runtime_error ("Unable to read uuid"); seed = string ((const char*)uuidstring); RpcStringFree (&uuidstring); #endif index = 0; } stringstream ss; ss << seed << (++index); return ss.str(); } /***************************** STATIC: Bindable_t::GetObject *****************************/ Bindable_t *Bindable_t::GetObject (const char *binding) { string s (binding ? binding : ""); return GetObject (s); } /***************************** STATIC: Bindable_t::GetObject *****************************/ Bindable_t *Bindable_t::GetObject (const string &binding) { map::const_iterator i = BindingBag.find (binding); if (i != BindingBag.end()) return i->second; else return NULL; } /********************** Bindable_t::Bindable_t **********************/ Bindable_t::Bindable_t() { Binding = Bindable_t::CreateBinding(); BindingBag [Binding] = this; } /*********************** Bindable_t::~Bindable_t ***********************/ Bindable_t::~Bindable_t() { BindingBag.erase (Binding); } /***************************************************************************** $Id$ File: cmain.cpp Date: 06Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" static EventMachine_t *EventMachine; static int bUseEpoll = 0; static int bUseKqueue = 0; extern "C" void ensure_eventmachine (const char *caller = "unknown caller") { if (!EventMachine) { const int err_size = 128; char err_string[err_size]; snprintf (err_string, err_size, "eventmachine not initialized: %s", caller); #ifdef BUILD_FOR_RUBY rb_raise(rb_eRuntimeError, err_string); #else throw std::runtime_error (err_string); #endif } } /*********************** evma_initialize_library ***********************/ extern "C" void evma_initialize_library (void(*cb)(const char*, int, const char*, int)) { // Probably a bad idea to mess with the signal mask of a process // we're just being linked into. //InstallSignalHandlers(); if (EventMachine) #ifdef BUILD_FOR_RUBY rb_raise(rb_eRuntimeError, "eventmachine already initialized: evma_initialize_library"); #else throw std::runtime_error ("eventmachine already initialized: evma_initialize_library"); #endif EventMachine = new EventMachine_t (cb); if (bUseEpoll) EventMachine->_UseEpoll(); if (bUseKqueue) EventMachine->_UseKqueue(); } /******************** evma_release_library ********************/ extern "C" void evma_release_library() { ensure_eventmachine("evma_release_library"); delete EventMachine; EventMachine = NULL; } /**************** evma_run_machine ****************/ extern "C" void evma_run_machine() { ensure_eventmachine("evma_run_machine"); EventMachine->Run(); } /************************** evma_install_oneshot_timer **************************/ extern "C" const char *evma_install_oneshot_timer (int seconds) { ensure_eventmachine("evma_install_oneshot_timer"); return EventMachine->InstallOneshotTimer (seconds); } /********************** evma_connect_to_server **********************/ extern "C" const char *evma_connect_to_server (const char *server, int port) { ensure_eventmachine("evma_connect_to_server"); return EventMachine->ConnectToServer (server, port); } /*************************** evma_connect_to_unix_server ***************************/ extern "C" const char *evma_connect_to_unix_server (const char *server) { ensure_eventmachine("evma_connect_to_unix_server"); return EventMachine->ConnectToUnixServer (server); } /************** evma_attach_fd **************/ extern "C" const char *evma_attach_fd (int file_descriptor, int notify_readable, int notify_writable) { ensure_eventmachine("evma_attach_fd"); return EventMachine->AttachFD (file_descriptor, (notify_readable ? true : false), (notify_writable ? true : false)); } /************** evma_detach_fd **************/ extern "C" int evma_detach_fd (const char *binding) { ensure_eventmachine("evma_dettach_fd"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) return EventMachine->DetachFD (ed); else #ifdef BUILD_FOR_RUBY rb_raise(rb_eRuntimeError, "invalid binding to detach"); #else throw std::runtime_error ("invalid binding to detach"); #endif } /********************** evma_create_tcp_server **********************/ extern "C" const char *evma_create_tcp_server (const char *address, int port) { ensure_eventmachine("evma_create_tcp_server"); return EventMachine->CreateTcpServer (address, port); } /****************************** evma_create_unix_domain_server ******************************/ extern "C" const char *evma_create_unix_domain_server (const char *filename) { ensure_eventmachine("evma_create_unix_domain_server"); return EventMachine->CreateUnixDomainServer (filename); } /************************* evma_open_datagram_socket *************************/ extern "C" const char *evma_open_datagram_socket (const char *address, int port) { ensure_eventmachine("evma_open_datagram_socket"); return EventMachine->OpenDatagramSocket (address, port); } /****************** evma_open_keyboard ******************/ extern "C" const char *evma_open_keyboard() { ensure_eventmachine("evma_open_keyboard"); return EventMachine->OpenKeyboard(); } /**************************** evma_send_data_to_connection ****************************/ extern "C" int evma_send_data_to_connection (const char *binding, const char *data, int data_length) { ensure_eventmachine("evma_send_data_to_connection"); return ConnectionDescriptor::SendDataToConnection (binding, data, data_length); } /****************** evma_send_datagram ******************/ extern "C" int evma_send_datagram (const char *binding, const char *data, int data_length, const char *address, int port) { ensure_eventmachine("evma_send_datagram"); return DatagramDescriptor::SendDatagram (binding, data, data_length, address, port); } /********************* evma_close_connection *********************/ extern "C" void evma_close_connection (const char *binding, int after_writing) { ensure_eventmachine("evma_close_connection"); ConnectionDescriptor::CloseConnection (binding, (after_writing ? true : false)); } /*********************************** evma_report_connection_error_status ***********************************/ extern "C" int evma_report_connection_error_status (const char *binding) { ensure_eventmachine("evma_report_connection_error_status"); return ConnectionDescriptor::ReportErrorStatus (binding); } /******************** evma_stop_tcp_server ********************/ extern "C" void evma_stop_tcp_server (const char *binding) { ensure_eventmachine("evma_stop_tcp_server"); AcceptorDescriptor::StopAcceptor (binding); } /***************** evma_stop_machine *****************/ extern "C" void evma_stop_machine() { ensure_eventmachine("evma_stop_machine"); EventMachine->ScheduleHalt(); } /************** evma_start_tls **************/ extern "C" void evma_start_tls (const char *binding) { ensure_eventmachine("evma_start_tls"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) ed->StartTls(); } /****************** evma_set_tls_parms ******************/ extern "C" void evma_set_tls_parms (const char *binding, const char *privatekey_filename, const char *certchain_filename) { ensure_eventmachine("evma_set_tls_parms"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) ed->SetTlsParms (privatekey_filename, certchain_filename); } /***************** evma_get_peername *****************/ extern "C" int evma_get_peername (const char *binding, struct sockaddr *sa) { ensure_eventmachine("evma_get_peername"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) { return ed->GetPeername (sa) ? 1 : 0; } else return 0; } /***************** evma_get_sockname *****************/ extern "C" int evma_get_sockname (const char *binding, struct sockaddr *sa) { ensure_eventmachine("evma_get_sockname"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) { return ed->GetSockname (sa) ? 1 : 0; } else return 0; } /*********************** evma_get_subprocess_pid ***********************/ extern "C" int evma_get_subprocess_pid (const char *binding, pid_t *pid) { ensure_eventmachine("evma_get_subprocess_pid"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) { return ed->GetSubprocessPid (pid) ? 1 : 0; } else return 0; } /************************** evma_get_subprocess_status **************************/ extern "C" int evma_get_subprocess_status (const char *binding, int *status) { ensure_eventmachine("evma_get_subprocess_status"); if (status) { *status = EventMachine->SubprocessExitStatus; return 1; } else return 0; } /********************* evma_signal_loopbreak *********************/ extern "C" void evma_signal_loopbreak() { ensure_eventmachine("evma_signal_loopbreak"); EventMachine->SignalLoopBreaker(); } /**************** evma__write_file ****************/ extern "C" const char *evma__write_file (const char *filename) { ensure_eventmachine("evma__write_file"); return EventMachine->_OpenFileForWriting (filename); } /******************************** evma_get_comm_inactivity_timeout ********************************/ extern "C" int evma_get_comm_inactivity_timeout (const char *binding, int *value) { ensure_eventmachine("evma_get_comm_inactivity_timeout"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) { return ed->GetCommInactivityTimeout (value); } else return 0; //Perhaps this should be an exception. Access to an unknown binding. } /******************************** evma_set_comm_inactivity_timeout ********************************/ extern "C" int evma_set_comm_inactivity_timeout (const char *binding, int *value) { ensure_eventmachine("evma_set_comm_inactivity_timeout"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) { return ed->SetCommInactivityTimeout (value); } else return 0; //Perhaps this should be an exception. Access to an unknown binding. } /********************** evma_set_timer_quantum **********************/ extern "C" void evma_set_timer_quantum (int interval) { ensure_eventmachine("evma_set_timer_quantum"); EventMachine->SetTimerQuantum (interval); } /************************ evma_set_max_timer_count ************************/ extern "C" void evma_set_max_timer_count (int ct) { // This may only be called if the reactor is not running. if (EventMachine) #ifdef BUILD_FOR_RUBY rb_raise(rb_eRuntimeError, "eventmachine already initialized: evma_set_max_timer_count"); #else throw std::runtime_error ("eventmachine already initialized: evma_set_max_timer_count"); #endif EventMachine_t::SetMaxTimerCount (ct); } /****************** evma_setuid_string ******************/ extern "C" void evma_setuid_string (const char *username) { // We do NOT need to be running an EM instance because this method is static. EventMachine_t::SetuidString (username); } /********** evma_popen **********/ extern "C" const char *evma_popen (char * const*cmd_strings) { ensure_eventmachine("evma_popen"); return EventMachine->Socketpair (cmd_strings); } /*************************** evma_get_outbound_data_size ***************************/ extern "C" int evma_get_outbound_data_size (const char *binding) { ensure_eventmachine("evma_get_outbound_data_size"); EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); return ed ? ed->GetOutboundDataSize() : 0; } /*********** evma__epoll ***********/ extern "C" void evma__epoll() { bUseEpoll = 1; } /************ evma__kqueue ************/ extern "C" void evma__kqueue() { bUseKqueue = 1; } /********************** evma_set_rlimit_nofile **********************/ extern "C" int evma_set_rlimit_nofile (int nofiles) { return EventMachine_t::SetRlimitNofile (nofiles); } /********************************* evma_send_file_data_to_connection *********************************/ extern "C" int evma_send_file_data_to_connection (const char *binding, const char *filename) { /* This is a sugaring over send_data_to_connection that reads a file into a * locally-allocated buffer, and sends the file data to the remote peer. * Return the number of bytes written to the caller. * TODO, needs to impose a limit on the file size. This is intended only for * small files. (I don't know, maybe 8K or less.) For larger files, use interleaved * I/O to avoid slowing the rest of the system down. * TODO: we should return a code rather than barf, in case of file-not-found. * TODO, does this compile on Windows? * TODO, given that we want this to work only with small files, how about allocating * the buffer on the stack rather than the heap? * * Modified 25Jul07. This now returns -1 on file-too-large; 0 for success, and a positive * errno in case of other errors. * /* Contributed by Kirk Haines. */ char data[32*1024]; int r; ensure_eventmachine("evma_send_file_data_to_connection"); int Fd = open (filename, O_RDONLY); if (Fd < 0) return errno; // From here on, all early returns MUST close Fd. struct stat st; if (fstat (Fd, &st)) { int e = errno; close (Fd); return e; } int filesize = st.st_size; if (filesize <= 0) { close (Fd); return 0; } else if (filesize > sizeof(data)) { close (Fd); return -1; } r = read (Fd, data, filesize); if (r != filesize) { int e = errno; close (Fd); return e; } evma_send_data_to_connection (binding, data, r); close (Fd); return 0; } /***************************************************************************** $Id$ File: cplusplus.cpp Date: 27Jul07 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" namespace EM { static map Eventables; static map Timers; } /******* EM::Run *******/ void EM::Run (void (*start_func)()) { evma__epoll(); evma_initialize_library (EM::Callback); if (start_func) AddTimer (0, start_func); evma_run_machine(); evma_release_library(); } /************ EM::AddTimer ************/ void EM::AddTimer (int milliseconds, void (*func)()) { if (func) { const char *sig = evma_install_oneshot_timer (milliseconds); Timers.insert (make_pair (sig, func)); } } /*************** EM::StopReactor ***************/ void EM::StopReactor() { evma_stop_machine(); } /******************** EM::Acceptor::Accept ********************/ void EM::Acceptor::Accept (const char *signature) { Connection *c = MakeConnection(); c->Signature = signature; Eventables.insert (make_pair (c->Signature, c)); c->PostInit(); } /************************ EM::Connection::SendData ************************/ void EM::Connection::SendData (const char *data) { if (data) SendData (data, strlen (data)); } /************************ EM::Connection::SendData ************************/ void EM::Connection::SendData (const char *data, int length) { evma_send_data_to_connection (Signature.c_str(), data, length); } /********************* EM::Connection::Close *********************/ void EM::Connection::Close (bool afterWriting) { evma_close_connection (Signature.c_str(), afterWriting); } /*********************** EM::Connection::Connect ***********************/ void EM::Connection::Connect (const char *host, int port) { Signature = evma_connect_to_server (host, port); Eventables.insert( make_pair (Signature, this)); } /******************* EM::Acceptor::Start *******************/ void EM::Acceptor::Start (const char *host, int port) { Signature = evma_create_tcp_server (host, port); Eventables.insert( make_pair (Signature, this)); } /************ EM::Callback ************/ void EM::Callback (const char *sig, int ev, const char *data, int length) { EM::Eventable *e; void (*f)(); switch (ev) { case EM_TIMER_FIRED: f = Timers [data]; if (f) (*f)(); Timers.erase (sig); break; case EM_CONNECTION_READ: e = EM::Eventables [sig]; e->ReceiveData (data, length); break; case EM_CONNECTION_COMPLETED: e = EM::Eventables [sig]; e->ConnectionCompleted(); break; case EM_CONNECTION_ACCEPTED: e = EM::Eventables [sig]; e->Accept (data); break; case EM_CONNECTION_UNBOUND: e = EM::Eventables [sig]; e->Unbind(); EM::Eventables.erase (sig); delete e; break; } } /***************************************************************************** $Id$ File: ed.cpp Date: 06Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" /******************** SetSocketNonblocking ********************/ bool SetSocketNonblocking (SOCKET sd) { #ifdef OS_UNIX int val = fcntl (sd, F_GETFL, 0); return (fcntl (sd, F_SETFL, val | O_NONBLOCK) != SOCKET_ERROR) ? true : false; #endif #ifdef OS_WIN32 unsigned long one = 1; return (ioctlsocket (sd, FIONBIO, &one) == 0) ? true : false; #endif } /**************************************** EventableDescriptor::EventableDescriptor ****************************************/ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em): bCloseNow (false), bCloseAfterWriting (false), MySocket (sd), EventCallback (NULL), LastRead (0), LastWritten (0), bCallbackUnbind (true), MyEventMachine (em) { /* There are three ways to close a socket, all of which should * automatically signal to the event machine that this object * should be removed from the polling scheduler. * First is a hard close, intended for bad errors or possible * security violations. It immediately closes the connection * and puts this object into an error state. * Second is to set bCloseNow, which will cause the event machine * to delete this object (and thus close the connection in our * destructor) the next chance it gets. bCloseNow also inhibits * the writing of new data on the socket (but not necessarily * the reading of new data). * The third way is to set bCloseAfterWriting, which inhibits * the writing of new data and converts to bCloseNow as soon * as everything in the outbound queue has been written. * bCloseAfterWriting is really for use only by protocol handlers * (for example, HTTP writes an HTML page and then closes the * connection). All of the error states we generate internally * cause an immediate close to be scheduled, which may have the * effect of discarding outbound data. */ if (sd == INVALID_SOCKET) throw std::runtime_error ("bad eventable descriptor"); if (MyEventMachine == NULL) throw std::runtime_error ("bad em in eventable descriptor"); CreatedAt = gCurrentLoopTime; #ifdef HAVE_EPOLL EpollEvent.data.ptr = this; #endif } /***************************************** EventableDescriptor::~EventableDescriptor *****************************************/ EventableDescriptor::~EventableDescriptor() { if (EventCallback && bCallbackUnbind) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, 0); Close(); } /************************************* EventableDescriptor::SetEventCallback *************************************/ void EventableDescriptor::SetEventCallback (void(*cb)(const char*, int, const char*, int)) { EventCallback = cb; } /************************** EventableDescriptor::Close **************************/ void EventableDescriptor::Close() { // Close the socket right now. Intended for emergencies. if (MySocket != INVALID_SOCKET) { shutdown (MySocket, 1); closesocket (MySocket); MySocket = INVALID_SOCKET; } } /********************************* EventableDescriptor::ShouldDelete *********************************/ bool EventableDescriptor::ShouldDelete() { /* For use by a socket manager, which needs to know if this object * should be removed from scheduling events and deleted. * Has an immediate close been scheduled, or are we already closed? * If either of these are the case, return true. In theory, the manager will * then delete us, which in turn will make sure the socket is closed. * Note, if bCloseAfterWriting is true, we check a virtual method to see * if there is outbound data to write, and only request a close if there is none. */ return ((MySocket == INVALID_SOCKET) || bCloseNow || (bCloseAfterWriting && (GetOutboundDataSize() <= 0))); } /********************************** EventableDescriptor::ScheduleClose **********************************/ void EventableDescriptor::ScheduleClose (bool after_writing) { // KEEP THIS SYNCHRONIZED WITH ::IsCloseScheduled. if (after_writing) bCloseAfterWriting = true; else bCloseNow = true; } /************************************* EventableDescriptor::IsCloseScheduled *************************************/ bool EventableDescriptor::IsCloseScheduled() { // KEEP THIS SYNCHRONIZED WITH ::ScheduleClose. return (bCloseNow || bCloseAfterWriting); } /****************************************** ConnectionDescriptor::ConnectionDescriptor ******************************************/ ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em): EventableDescriptor (sd, em), bConnectPending (false), bNotifyReadable (false), bNotifyWritable (false), bReadAttemptedAfterClose (false), bWriteAttemptedAfterClose (false), OutboundDataSize (0), #ifdef WITH_SSL SslBox (NULL), #endif bIsServer (false), LastIo (gCurrentLoopTime), InactivityTimeout (0) { #ifdef HAVE_EPOLL EpollEvent.events = EPOLLOUT; #endif // 22Jan09: Moved ArmKqueueWriter into SetConnectPending() to fix assertion failure in _WriteOutboundData() } /******************************************* ConnectionDescriptor::~ConnectionDescriptor *******************************************/ ConnectionDescriptor::~ConnectionDescriptor() { // Run down any stranded outbound data. for (size_t i=0; i < OutboundPages.size(); i++) OutboundPages[i].Free(); #ifdef WITH_SSL if (SslBox) delete SslBox; #endif } /************************************************** STATIC: ConnectionDescriptor::SendDataToConnection **************************************************/ int ConnectionDescriptor::SendDataToConnection (const char *binding, const char *data, int data_length) { // TODO: This is something of a hack, or at least it's a static method of the wrong class. // TODO: Poor polymorphism here. We should be calling one virtual method // instead of hacking out the runtime information of the target object. ConnectionDescriptor *cd = dynamic_cast (Bindable_t::GetObject (binding)); if (cd) return cd->SendOutboundData (data, data_length); DatagramDescriptor *ds = dynamic_cast (Bindable_t::GetObject (binding)); if (ds) return ds->SendOutboundData (data, data_length); #ifdef OS_UNIX PipeDescriptor *ps = dynamic_cast (Bindable_t::GetObject (binding)); if (ps) return ps->SendOutboundData (data, data_length); #endif return -1; } /********************************************* STATIC: ConnectionDescriptor::CloseConnection *********************************************/ void ConnectionDescriptor::CloseConnection (const char *binding, bool after_writing) { // TODO: This is something of a hack, or at least it's a static method of the wrong class. EventableDescriptor *ed = dynamic_cast (Bindable_t::GetObject (binding)); if (ed) ed->ScheduleClose (after_writing); } /*********************************************** STATIC: ConnectionDescriptor::ReportErrorStatus ***********************************************/ int ConnectionDescriptor::ReportErrorStatus (const char *binding) { // TODO: This is something of a hack, or at least it's a static method of the wrong class. // TODO: Poor polymorphism here. We should be calling one virtual method // instead of hacking out the runtime information of the target object. ConnectionDescriptor *cd = dynamic_cast (Bindable_t::GetObject (binding)); if (cd) return cd->_ReportErrorStatus(); return -1; } /*************************************** ConnectionDescriptor::SetConnectPending ****************************************/ void ConnectionDescriptor::SetConnectPending(bool f) { bConnectPending = f; #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueWriter (this); #endif } /************************************** ConnectionDescriptor::SendOutboundData **************************************/ int ConnectionDescriptor::SendOutboundData (const char *data, int length) { #ifdef WITH_SSL if (SslBox) { if (length > 0) { int w = SslBox->PutPlaintext (data, length); if (w < 0) ScheduleClose (false); else _DispatchCiphertext(); } // TODO: What's the correct return value? return 1; // That's a wild guess, almost certainly wrong. } else #endif return _SendRawOutboundData (data, length); } /****************************************** ConnectionDescriptor::_SendRawOutboundData ******************************************/ int ConnectionDescriptor::_SendRawOutboundData (const char *data, int length) { /* This internal method is called to schedule bytes that * will be sent out to the remote peer. * It's not directly accessed by the caller, who hits ::SendOutboundData, * which may or may not filter or encrypt the caller's data before * sending it here. */ // Highly naive and incomplete implementation. // There's no throttle for runaways (which should abort only this connection // and not the whole process), and no coalescing of small pages. // (Well, not so bad, small pages are coalesced in ::Write) if (IsCloseScheduled()) //if (bCloseNow || bCloseAfterWriting) return 0; if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueWriter (this); #endif return length; } /*********************************** ConnectionDescriptor::SelectForRead ***********************************/ bool ConnectionDescriptor::SelectForRead() { /* A connection descriptor is always scheduled for read, * UNLESS it's in a pending-connect state. * On Linux, unlike Unix, a nonblocking socket on which * connect has been called, does NOT necessarily select * both readable and writable in case of error. * The socket will select writable when the disposition * of the connect is known. On the other hand, a socket * which successfully connects and selects writable may * indeed have some data available on it, so it will * select readable in that case, violating expectations! * So we will not poll for readability until the socket * is known to be in a connected state. */ return bConnectPending ? false : true; } /************************************ ConnectionDescriptor::SelectForWrite ************************************/ bool ConnectionDescriptor::SelectForWrite() { /* Cf the notes under SelectForRead. * In a pending-connect state, we ALWAYS select for writable. * In a normal state, we only select for writable when we * have outgoing data to send. */ if (bConnectPending || bNotifyWritable) return true; else { return (GetOutboundDataSize() > 0); } } /************************** ConnectionDescriptor::Read **************************/ void ConnectionDescriptor::Read() { /* Read and dispatch data on a socket that has selected readable. * It's theoretically possible to get and dispatch incoming data on * a socket that has already been scheduled for closing or close-after-writing. * In those cases, we'll leave it up the to protocol handler to "do the * right thing" (which probably means to ignore the incoming data). * * 22Aug06: Chris Ochs reports that on FreeBSD, it's possible to come * here with the socket already closed, after the process receives * a ctrl-C signal (not sure if that's TERM or INT on BSD). The application * was one in which network connections were doing a lot of interleaved reads * and writes. * Since we always write before reading (in order to keep the outbound queues * as light as possible), I think what happened is that an interrupt caused * the socket to be closed in ConnectionDescriptor::Write. We'll then * come here in the same pass through the main event loop, and won't get * cleaned up until immediately after. * We originally asserted that the socket was valid when we got here. * To deal properly with the possibility that we are closed when we get here, * I removed the assert. HOWEVER, the potential for an infinite loop scares me, * so even though this is really clunky, I added a flag to assert that we never * come here more than once after being closed. (FCianfrocca) */ int sd = GetSocket(); //assert (sd != INVALID_SOCKET); (original, removed 22Aug06) if (sd == INVALID_SOCKET) { assert (!bReadAttemptedAfterClose); bReadAttemptedAfterClose = true; return; } if (bNotifyReadable) { if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_READABLE, NULL, 0); return; } LastIo = gCurrentLoopTime; int total_bytes_read = 0; char readbuffer [16 * 1024 + 1]; for (int i=0; i < 10; i++) { // Don't read just one buffer and then move on. This is faster // if there is a lot of incoming. // But don't read indefinitely. Give other sockets a chance to run. // NOTICE, we're reading one less than the buffer size. // That's so we can put a guard byte at the end of what we send // to user code. int r = recv (sd, readbuffer, sizeof(readbuffer) - 1, 0); //cerr << ""; if (r > 0) { total_bytes_read += r; LastRead = gCurrentLoopTime; // Add a null-terminator at the the end of the buffer // that we will send to the callback. // DO NOT EVER CHANGE THIS. We want to explicitly allow users // to be able to depend on this behavior, so they will have // the option to do some things faster. Additionally it's // a security guard against buffer overflows. readbuffer [r] = 0; _DispatchInboundData (readbuffer, r); } else if (r == 0) { break; } else { // Basically a would-block, meaning we've read everything there is to read. break; } } if (total_bytes_read == 0) { // If we read no data on a socket that selected readable, // it generally means the other end closed the connection gracefully. ScheduleClose (false); //bCloseNow = true; } } /****************************************** ConnectionDescriptor::_DispatchInboundData ******************************************/ void ConnectionDescriptor::_DispatchInboundData (const char *buffer, int size) { #ifdef WITH_SSL if (SslBox) { SslBox->PutCiphertext (buffer, size); int s; char B [2048]; while ((s = SslBox->GetPlaintext (B, sizeof(B) - 1)) > 0) { B [s] = 0; if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, B, s); } // INCOMPLETE, s may indicate an SSL error that would force the connection down. _DispatchCiphertext(); } else { if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size); } #endif #ifdef WITHOUT_SSL if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size); #endif } /*************************** ConnectionDescriptor::Write ***************************/ void ConnectionDescriptor::Write() { /* A socket which is in a pending-connect state will select * writable when the disposition of the connect is known. * At that point, check to be sure there are no errors, * and if none, then promote the socket out of the pending * state. * TODO: I haven't figured out how Windows signals errors on * unconnected sockets. Maybe it does the untraditional but * logical thing and makes the socket selectable for error. * If so, it's unsupported here for the time being, and connect * errors will have to be caught by the timeout mechanism. */ if (bConnectPending) { int error; socklen_t len; len = sizeof(error); #ifdef OS_UNIX int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len); #endif #ifdef OS_WIN32 int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len); #endif if ((o == 0) && (error == 0)) { if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_COMPLETED, "", 0); bConnectPending = false; #ifdef HAVE_EPOLL // The callback may have scheduled outbound data. EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0); #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); // The callback may have scheduled outbound data. if (SelectForWrite()) MyEventMachine->ArmKqueueWriter (this); #endif } else ScheduleClose (false); //bCloseNow = true; } else { if (bNotifyWritable) { if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_WRITABLE, NULL, 0); return; } _WriteOutboundData(); } } /**************************************** ConnectionDescriptor::_WriteOutboundData ****************************************/ void ConnectionDescriptor::_WriteOutboundData() { /* This is a helper function called by ::Write. * It's possible for a socket to select writable and then no longer * be writable by the time we get around to writing. The kernel might * have used up its available output buffers between the select call * and when we get here. So this condition is not an error. * * 20Jul07, added the same kind of protection against an invalid socket * that is at the top of ::Read. Not entirely how this could happen in * real life (connection-reset from the remote peer, perhaps?), but I'm * doing it to address some reports of crashing under heavy loads. */ int sd = GetSocket(); //assert (sd != INVALID_SOCKET); if (sd == INVALID_SOCKET) { assert (!bWriteAttemptedAfterClose); bWriteAttemptedAfterClose = true; return; } LastIo = gCurrentLoopTime; char output_buffer [16 * 1024]; size_t nbytes = 0; while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { OutboundPage *op = &(OutboundPages[0]); if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); nbytes += (op->Length - op->Offset); op->Free(); OutboundPages.pop_front(); } else { int len = sizeof(output_buffer) - nbytes; memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); op->Offset += len; nbytes += len; } } // We should never have gotten here if there were no data to write, // so assert that as a sanity check. // Don't bother to make sure nbytes is less than output_buffer because // if it were we probably would have crashed already. assert (nbytes > 0); assert (GetSocket() != INVALID_SOCKET); int bytes_written = send (GetSocket(), output_buffer, nbytes, 0); bool err = false; if (bytes_written < 0) { err = true; bytes_written = 0; } assert (bytes_written >= 0); OutboundDataSize -= bytes_written; if ((size_t)bytes_written < nbytes) { int len = nbytes - bytes_written; char *buffer = (char*) malloc (len + 1); if (!buffer) throw std::runtime_error ("bad alloc throwing back data"); memcpy (buffer, output_buffer + bytes_written, len); buffer [len] = 0; OutboundPages.push_front (OutboundPage (buffer, len)); } #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); assert (MyEventMachine); MyEventMachine->Modify (this); #endif #ifdef HAVE_KQUEUE if (SelectForWrite()) MyEventMachine->ArmKqueueWriter (this); #endif if (err) { #ifdef OS_UNIX if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) #endif #ifdef OS_WIN32 if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) #endif Close(); } } /**************************************** ConnectionDescriptor::_ReportErrorStatus ****************************************/ int ConnectionDescriptor::_ReportErrorStatus() { int error; socklen_t len; len = sizeof(error); #ifdef OS_UNIX int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len); #endif #ifdef OS_WIN32 int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len); #endif if ((o == 0) && (error == 0)) return 0; else return 1; } /****************************** ConnectionDescriptor::StartTls ******************************/ void ConnectionDescriptor::StartTls() { #ifdef WITH_SSL if (SslBox) throw std::runtime_error ("SSL/TLS already running on connection"); SslBox = new SslBox_t (bIsServer, PrivateKeyFilename, CertChainFilename); _DispatchCiphertext(); #endif #ifdef WITHOUT_SSL throw std::runtime_error ("Encryption not available on this event-machine"); #endif } /********************************* ConnectionDescriptor::SetTlsParms *********************************/ void ConnectionDescriptor::SetTlsParms (const char *privkey_filename, const char *certchain_filename) { #ifdef WITH_SSL if (SslBox) throw std::runtime_error ("call SetTlsParms before calling StartTls"); if (privkey_filename && *privkey_filename) PrivateKeyFilename = privkey_filename; if (certchain_filename && *certchain_filename) CertChainFilename = certchain_filename; #endif #ifdef WITHOUT_SSL throw std::runtime_error ("Encryption not available on this event-machine"); #endif } /***************************************** ConnectionDescriptor::_DispatchCiphertext *****************************************/ #ifdef WITH_SSL void ConnectionDescriptor::_DispatchCiphertext() { assert (SslBox); char BigBuf [2048]; bool did_work; do { did_work = false; // try to drain ciphertext while (SslBox->CanGetCiphertext()) { int r = SslBox->GetCiphertext (BigBuf, sizeof(BigBuf)); assert (r > 0); _SendRawOutboundData (BigBuf, r); did_work = true; } // Pump the SslBox, in case it has queued outgoing plaintext // This will return >0 if data was written, // 0 if no data was written, and <0 if there was a fatal error. bool pump; do { pump = false; int w = SslBox->PutPlaintext (NULL, 0); if (w > 0) { did_work = true; pump = true; } else if (w < 0) ScheduleClose (false); } while (pump); // try to put plaintext. INCOMPLETE, doesn't belong here? // In SendOutboundData, we're spooling plaintext directly // into SslBox. That may be wrong, we may need to buffer it // up here! /* const char *ptr; int ptr_length; while (OutboundPlaintext.GetPage (&ptr, &ptr_length)) { assert (ptr && (ptr_length > 0)); int w = SslMachine.PutPlaintext (ptr, ptr_length); if (w > 0) { OutboundPlaintext.DiscardBytes (w); did_work = true; } else break; } */ } while (did_work); } #endif /******************************* ConnectionDescriptor::Heartbeat *******************************/ void ConnectionDescriptor::Heartbeat() { /* Only allow a certain amount of time to go by while waiting * for a pending connect. If it expires, then kill the socket. * For a connected socket, close it if its inactivity timer * has expired. */ if (bConnectPending) { if ((gCurrentLoopTime - CreatedAt) >= PendingConnectTimeout) ScheduleClose (false); //bCloseNow = true; } else { if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) ScheduleClose (false); //bCloseNow = true; } } /**************************************** LoopbreakDescriptor::LoopbreakDescriptor ****************************************/ LoopbreakDescriptor::LoopbreakDescriptor (int sd, EventMachine_t *parent_em): EventableDescriptor (sd, parent_em) { /* This is really bad and ugly. Change someday if possible. * We have to know about an event-machine (probably the one that owns us), * so we can pass newly-created connections to it. */ bCallbackUnbind = false; #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /************************* LoopbreakDescriptor::Read *************************/ void LoopbreakDescriptor::Read() { // TODO, refactor, this code is probably in the wrong place. assert (MyEventMachine); MyEventMachine->_ReadLoopBreaker(); } /************************** LoopbreakDescriptor::Write **************************/ void LoopbreakDescriptor::Write() { // Why are we here? throw std::runtime_error ("bad code path in loopbreak"); } /************************************** AcceptorDescriptor::AcceptorDescriptor **************************************/ AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em): EventableDescriptor (sd, parent_em) { #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /*************************************** AcceptorDescriptor::~AcceptorDescriptor ***************************************/ AcceptorDescriptor::~AcceptorDescriptor() { } /**************************************** STATIC: AcceptorDescriptor::StopAcceptor ****************************************/ void AcceptorDescriptor::StopAcceptor (const char *binding) { // TODO: This is something of a hack, or at least it's a static method of the wrong class. AcceptorDescriptor *ad = dynamic_cast (Bindable_t::GetObject (binding)); if (ad) ad->ScheduleClose (false); else throw std::runtime_error ("failed to close nonexistent acceptor"); } /************************ AcceptorDescriptor::Read ************************/ void AcceptorDescriptor::Read() { /* Accept up to a certain number of sockets on the listening connection. * Don't try to accept all that are present, because this would allow a DoS attack * in which no data were ever read or written. We should accept more than one, * if available, to keep the partially accepted sockets from backing up in the kernel. */ /* Make sure we use non-blocking i/o on the acceptor socket, since we're selecting it * for readability. According to Stevens UNP, it's possible for an acceptor to select readable * and then block when we call accept. For example, the other end resets the connection after * the socket selects readable and before we call accept. The kernel will remove the dead * socket from the accept queue. If the accept queue is now empty, accept will block. */ struct sockaddr_in pin; socklen_t addrlen = sizeof (pin); for (int i=0; i < 10; i++) { int sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen); if (sd == INVALID_SOCKET) { // This breaks the loop when we've accepted everything on the kernel queue, // up to 10 new connections. But what if the *first* accept fails? // Does that mean anything serious is happening, beyond the situation // described in the note above? break; } // Set the newly-accepted socket non-blocking. // On Windows, this may fail because, weirdly, Windows inherits the non-blocking // attribute that we applied to the acceptor socket into the accepted one. if (!SetSocketNonblocking (sd)) { //int val = fcntl (sd, F_GETFL, 0); //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1) { shutdown (sd, 1); closesocket (sd); continue; } // Disable slow-start (Nagle algorithm). Eventually make this configurable. int one = 1; setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one)); ConnectionDescriptor *cd = new ConnectionDescriptor (sd, MyEventMachine); if (!cd) throw std::runtime_error ("no newly accepted connection"); cd->SetServerMode(); if (EventCallback) { (*EventCallback) (GetBinding().c_str(), EM_CONNECTION_ACCEPTED, cd->GetBinding().c_str(), cd->GetBinding().size()); } #ifdef HAVE_EPOLL cd->GetEpollEvent()->events = EPOLLIN | (cd->SelectForWrite() ? EPOLLOUT : 0); #endif assert (MyEventMachine); MyEventMachine->Add (cd); #ifdef HAVE_KQUEUE if (cd->SelectForWrite()) MyEventMachine->ArmKqueueWriter (cd); MyEventMachine->ArmKqueueReader (cd); #endif } } /************************* AcceptorDescriptor::Write *************************/ void AcceptorDescriptor::Write() { // Why are we here? throw std::runtime_error ("bad code path in acceptor"); } /***************************** AcceptorDescriptor::Heartbeat *****************************/ void AcceptorDescriptor::Heartbeat() { // No-op } /******************************* AcceptorDescriptor::GetSockname *******************************/ bool AcceptorDescriptor::GetSockname (struct sockaddr *s) { bool ok = false; if (s) { socklen_t len = sizeof(*s); int gp = getsockname (GetSocket(), s, &len); if (gp == 0) ok = true; } return ok; } /************************************** DatagramDescriptor::DatagramDescriptor **************************************/ DatagramDescriptor::DatagramDescriptor (int sd, EventMachine_t *parent_em): EventableDescriptor (sd, parent_em), OutboundDataSize (0), LastIo (gCurrentLoopTime), InactivityTimeout (0) { memset (&ReturnAddress, 0, sizeof(ReturnAddress)); /* Provisionally added 19Oct07. All datagram sockets support broadcasting. * Until now, sending to a broadcast address would give EACCES (permission denied) * on systems like Linux and BSD that require the SO_BROADCAST socket-option in order * to accept a packet to a broadcast address. Solaris doesn't require it. I think * Windows DOES require it but I'm not sure. * * Ruby does NOT do what we're doing here. In Ruby, you have to explicitly set SO_BROADCAST * on a UDP socket in order to enable broadcasting. The reason for requiring the option * in the first place is so that applications don't send broadcast datagrams by mistake. * I imagine that could happen if a user of an application typed in an address that happened * to be a broadcast address on that particular subnet. * * This is provisional because someone may eventually come up with a good reason not to * do it for all UDP sockets. If that happens, then we'll need to add a usercode-level API * to set the socket option, just like Ruby does. AND WE'LL ALSO BREAK CODE THAT DOESN'T * EXPLICITLY SET THE OPTION. */ int oval = 1; int sob = setsockopt (GetSocket(), SOL_SOCKET, SO_BROADCAST, (char*)&oval, sizeof(oval)); #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /*************************************** DatagramDescriptor::~DatagramDescriptor ***************************************/ DatagramDescriptor::~DatagramDescriptor() { // Run down any stranded outbound data. for (size_t i=0; i < OutboundPages.size(); i++) OutboundPages[i].Free(); } /***************************** DatagramDescriptor::Heartbeat *****************************/ void DatagramDescriptor::Heartbeat() { // Close it if its inactivity timer has expired. if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) ScheduleClose (false); //bCloseNow = true; } /************************ DatagramDescriptor::Read ************************/ void DatagramDescriptor::Read() { int sd = GetSocket(); assert (sd != INVALID_SOCKET); LastIo = gCurrentLoopTime; // This is an extremely large read buffer. // In many cases you wouldn't expect to get any more than 4K. char readbuffer [16 * 1024]; for (int i=0; i < 10; i++) { // Don't read just one buffer and then move on. This is faster // if there is a lot of incoming. // But don't read indefinitely. Give other sockets a chance to run. // NOTICE, we're reading one less than the buffer size. // That's so we can put a guard byte at the end of what we send // to user code. struct sockaddr_in sin; socklen_t slen = sizeof (sin); memset (&sin, 0, slen); int r = recvfrom (sd, readbuffer, sizeof(readbuffer) - 1, 0, (struct sockaddr*)&sin, &slen); //cerr << ""; // In UDP, a zero-length packet is perfectly legal. if (r >= 0) { LastRead = gCurrentLoopTime; // Add a null-terminator at the the end of the buffer // that we will send to the callback. // DO NOT EVER CHANGE THIS. We want to explicitly allow users // to be able to depend on this behavior, so they will have // the option to do some things faster. Additionally it's // a security guard against buffer overflows. readbuffer [r] = 0; // Set up a "temporary" return address so that callers can "reply" to us // from within the callback we are about to invoke. That means that ordinary // calls to "send_data_to_connection" (which is of course misnamed in this // case) will result in packets being sent back to the same place that sent // us this one. // There is a different call (evma_send_datagram) for cases where the caller // actually wants to send a packet somewhere else. memset (&ReturnAddress, 0, sizeof(ReturnAddress)); memcpy (&ReturnAddress, &sin, slen); if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); } else { // Basically a would-block, meaning we've read everything there is to read. break; } } } /************************* DatagramDescriptor::Write *************************/ void DatagramDescriptor::Write() { /* It's possible for a socket to select writable and then no longer * be writable by the time we get around to writing. The kernel might * have used up its available output buffers between the select call * and when we get here. So this condition is not an error. * This code is very reminiscent of ConnectionDescriptor::_WriteOutboundData, * but differs in the that the outbound data pages (received from the * user) are _message-structured._ That is, we send each of them out * one message at a time. * TODO, we are currently suppressing the EMSGSIZE error!!! */ int sd = GetSocket(); assert (sd != INVALID_SOCKET); LastIo = gCurrentLoopTime; assert (OutboundPages.size() > 0); // Send out up to 10 packets, then cycle the machine. for (int i = 0; i < 10; i++) { if (OutboundPages.size() <= 0) break; OutboundPage *op = &(OutboundPages[0]); // The nasty cast to (char*) is needed because Windows is brain-dead. int s = sendto (sd, (char*)op->Buffer, op->Length, 0, (struct sockaddr*)&(op->From), sizeof(op->From)); int e = errno; OutboundDataSize -= op->Length; op->Free(); OutboundPages.pop_front(); if (s == SOCKET_ERROR) { #ifdef OS_UNIX if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EINTR)) { #endif #ifdef OS_WIN32 if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK)) { #endif Close(); break; } } } #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); assert (MyEventMachine); MyEventMachine->Modify (this); #endif } /********************************** DatagramDescriptor::SelectForWrite **********************************/ bool DatagramDescriptor::SelectForWrite() { /* Changed 15Nov07, per bug report by Mark Zvillius. * The outbound data size will be zero if there are zero-length outbound packets, * so we now select writable in case the outbound page buffer is not empty. * Note that the superclass ShouldDelete method still checks for outbound data size, * which may be wrong. */ //return (GetOutboundDataSize() > 0); (Original) return (OutboundPages.size() > 0); } /************************************ DatagramDescriptor::SendOutboundData ************************************/ int DatagramDescriptor::SendOutboundData (const char *data, int length) { // This is an exact clone of ConnectionDescriptor::SendOutboundData. // That means it needs to move to a common ancestor. if (IsCloseScheduled()) //if (bCloseNow || bCloseAfterWriting) return 0; if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length, ReturnAddress)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif return length; } /**************************************** DatagramDescriptor::SendOutboundDatagram ****************************************/ int DatagramDescriptor::SendOutboundDatagram (const char *data, int length, const char *address, int port) { // This is an exact clone of ConnectionDescriptor::SendOutboundData. // That means it needs to move to a common ancestor. // TODO: Refactor this so there's no overlap with SendOutboundData. if (IsCloseScheduled()) //if (bCloseNow || bCloseAfterWriting) return 0; if (!address || !*address || !port) return 0; sockaddr_in pin; unsigned long HostAddr; HostAddr = inet_addr (address); if (HostAddr == INADDR_NONE) { // The nasty cast to (char*) is because Windows is brain-dead. hostent *hp = gethostbyname ((char*)address); if (!hp) return 0; HostAddr = ((in_addr*)(hp->h_addr))->s_addr; } memset (&pin, 0, sizeof(pin)); pin.sin_family = AF_INET; pin.sin_addr.s_addr = HostAddr; pin.sin_port = htons (port); if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length, pin)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif return length; } /**************************************** STATIC: DatagramDescriptor::SendDatagram ****************************************/ int DatagramDescriptor::SendDatagram (const char *binding, const char *data, int length, const char *address, int port) { DatagramDescriptor *dd = dynamic_cast (Bindable_t::GetObject (binding)); if (dd) return dd->SendOutboundDatagram (data, length, address, port); else return -1; } /********************************* ConnectionDescriptor::GetPeername *********************************/ bool ConnectionDescriptor::GetPeername (struct sockaddr *s) { bool ok = false; if (s) { socklen_t len = sizeof(*s); int gp = getpeername (GetSocket(), s, &len); if (gp == 0) ok = true; } return ok; } /********************************* ConnectionDescriptor::GetSockname *********************************/ bool ConnectionDescriptor::GetSockname (struct sockaddr *s) { bool ok = false; if (s) { socklen_t len = sizeof(*s); int gp = getsockname (GetSocket(), s, &len); if (gp == 0) ok = true; } return ok; } /********************************************** ConnectionDescriptor::GetCommInactivityTimeout **********************************************/ int ConnectionDescriptor::GetCommInactivityTimeout (int *value) { if (value) { *value = InactivityTimeout; return 1; } else { // TODO, extended logging, got bad parameter. return 0; } } /********************************************** ConnectionDescriptor::SetCommInactivityTimeout **********************************************/ int ConnectionDescriptor::SetCommInactivityTimeout (int *value) { int out = 0; if (value) { if ((*value==0) || (*value >= 2)) { // Replace the value and send the old one back to the caller. int v = *value; *value = InactivityTimeout; InactivityTimeout = v; out = 1; } else { // TODO, extended logging, got bad value. } } else { // TODO, extended logging, got bad parameter. } return out; } /******************************* DatagramDescriptor::GetPeername *******************************/ bool DatagramDescriptor::GetPeername (struct sockaddr *s) { bool ok = false; if (s) { memset (s, 0, sizeof(struct sockaddr)); memcpy (s, &ReturnAddress, sizeof(ReturnAddress)); ok = true; } return ok; } /******************************* DatagramDescriptor::GetSockname *******************************/ bool DatagramDescriptor::GetSockname (struct sockaddr *s) { bool ok = false; if (s) { socklen_t len = sizeof(*s); int gp = getsockname (GetSocket(), s, &len); if (gp == 0) ok = true; } return ok; } /******************************************** DatagramDescriptor::GetCommInactivityTimeout ********************************************/ int DatagramDescriptor::GetCommInactivityTimeout (int *value) { if (value) { *value = InactivityTimeout; return 1; } else { // TODO, extended logging, got bad parameter. return 0; } } /******************************************** DatagramDescriptor::SetCommInactivityTimeout ********************************************/ int DatagramDescriptor::SetCommInactivityTimeout (int *value) { int out = 0; if (value) { if ((*value==0) || (*value >= 2)) { // Replace the value and send the old one back to the caller. int v = *value; *value = InactivityTimeout; InactivityTimeout = v; out = 1; } else { // TODO, extended logging, got bad value. } } else { // TODO, extended logging, got bad parameter. } return out; } /***************************************************************************** $Id$ File: em.cpp Date: 06Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ // THIS ENTIRE FILE WILL EVENTUALLY BE FOR UNIX BUILDS ONLY. //#ifdef OS_UNIX #include "project.h" // Keep a global variable floating around // with the current loop time as set by the Event Machine. // This avoids the need for frequent expensive calls to time(NULL); time_t gCurrentLoopTime; #ifdef OS_WIN32 unsigned gTickCountTickover; unsigned gLastTickCount; #endif /* The numer of max outstanding timers was once a const enum defined in em.h. * Now we define it here so that users can change its value if necessary. */ static int MaxOutstandingTimers = 1000; /* Internal helper to convert strings to internet addresses. IPv6-aware. * Not reentrant or threadsafe, optimized for speed. */ static struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size); /*************************************** STATIC EventMachine_t::SetMaxTimerCount ***************************************/ void EventMachine_t::SetMaxTimerCount (int count) { /* Allow a user to increase the maximum number of outstanding timers. * If this gets "too high" (a metric that is of course platform dependent), * bad things will happen like performance problems and possible overuse * of memory. * The actual timer mechanism is very efficient so it's hard to know what * the practical max, but 100,000 shouldn't be too problematical. */ if (count < 100) count = 100; MaxOutstandingTimers = count; } /****************************** EventMachine_t::EventMachine_t ******************************/ EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)): EventCallback (event_callback), NextHeartbeatTime (0), LoopBreakerReader (-1), LoopBreakerWriter (-1), bEpoll (false), bKqueue (false), epfd (-1) { // Default time-slice is just smaller than one hundred mills. Quantum.tv_sec = 0; Quantum.tv_usec = 90000; gTerminateSignalReceived = false; // Make sure the current loop time is sane, in case we do any initializations of // objects before we start running. gCurrentLoopTime = time(NULL); /* We initialize the network library here (only on Windows of course) * and initialize "loop breakers." Our destructor also does some network-level * cleanup. There's thus an implicit assumption that any given instance of EventMachine_t * will only call ::Run once. Is that a good assumption? Should we move some of these * inits and de-inits into ::Run? */ #ifdef OS_WIN32 WSADATA w; WSAStartup (MAKEWORD (1, 1), &w); #endif _InitializeLoopBreaker(); } /******************************* EventMachine_t::~EventMachine_t *******************************/ EventMachine_t::~EventMachine_t() { // Run down descriptors size_t i; for (i = 0; i < NewDescriptors.size(); i++) delete NewDescriptors[i]; for (i = 0; i < Descriptors.size(); i++) delete Descriptors[i]; close (LoopBreakerReader); close (LoopBreakerWriter); if (epfd != -1) close (epfd); if (kqfd != -1) close (kqfd); } /************************* EventMachine_t::_UseEpoll *************************/ void EventMachine_t::_UseEpoll() { /* Temporary. * Use an internal flag to switch in epoll-based functionality until we determine * how it should be integrated properly and the extent of the required changes. * A permanent solution needs to allow the integration of additional technologies, * like kqueue and Solaris's events. */ #ifdef HAVE_EPOLL bEpoll = true; #endif } /************************** EventMachine_t::_UseKqueue **************************/ void EventMachine_t::_UseKqueue() { /* Temporary. * See comments under _UseEpoll. */ #ifdef HAVE_KQUEUE bKqueue = true; #endif } /**************************** EventMachine_t::ScheduleHalt ****************************/ void EventMachine_t::ScheduleHalt() { /* This is how we stop the machine. * This can be called by clients. Signal handlers will probably * set the global flag. * For now this means there can only be one EventMachine ever running at a time. * * IMPORTANT: keep this light, fast, and async-safe. Don't do anything frisky in here, * because it may be called from signal handlers invoked from code that we don't * control. At this writing (20Sep06), EM does NOT install any signal handlers of * its own. * * We need a FAQ. And one of the questions is: how do I stop EM when Ctrl-C happens? * The answer is to call evma_stop_machine, which calls here, from a SIGINT handler. */ gTerminateSignalReceived = true; } /******************************* EventMachine_t::SetTimerQuantum *******************************/ void EventMachine_t::SetTimerQuantum (int interval) { /* We get a timer-quantum expressed in milliseconds. * Don't set a quantum smaller than 5 or larger than 2500. */ if ((interval < 5) || (interval > 2500)) throw std::runtime_error ("invalid timer-quantum"); Quantum.tv_sec = interval / 1000; Quantum.tv_usec = (interval % 1000) * 1000; } /************************************* (STATIC) EventMachine_t::SetuidString *************************************/ void EventMachine_t::SetuidString (const char *username) { /* This method takes a caller-supplied username and tries to setuid * to that user. There is no meaningful implementation (and no error) * on Windows. On Unix, a failure to setuid the caller-supplied string * causes a fatal abort, because presumably the program is calling here * in order to fulfill a security requirement. If we fail silently, * the user may continue to run with too much privilege. * * TODO, we need to decide on and document a way of generating C++ level errors * that can be wrapped in documented Ruby exceptions, so users can catch * and handle them. And distinguish it from errors that we WON'T let the Ruby * user catch (like security-violations and resource-overallocation). * A setuid failure here would be in the latter category. */ #ifdef OS_UNIX if (!username || !*username) throw std::runtime_error ("setuid_string failed: no username specified"); struct passwd *p = getpwnam (username); if (!p) throw std::runtime_error ("setuid_string failed: unknown username"); if (setuid (p->pw_uid) != 0) throw std::runtime_error ("setuid_string failed: no setuid"); // Success. #endif } /**************************************** (STATIC) EventMachine_t::SetRlimitNofile ****************************************/ int EventMachine_t::SetRlimitNofile (int nofiles) { #ifdef OS_UNIX struct rlimit rlim; getrlimit (RLIMIT_NOFILE, &rlim); if (nofiles >= 0) { rlim.rlim_cur = nofiles; if (nofiles > rlim.rlim_max) rlim.rlim_max = nofiles; setrlimit (RLIMIT_NOFILE, &rlim); // ignore the error return, for now at least. // TODO, emit an error message someday when we have proper debug levels. } getrlimit (RLIMIT_NOFILE, &rlim); return rlim.rlim_cur; #endif #ifdef OS_WIN32 // No meaningful implementation on Windows. return 0; #endif } /********************************* EventMachine_t::SignalLoopBreaker *********************************/ void EventMachine_t::SignalLoopBreaker() { #ifdef OS_UNIX write (LoopBreakerWriter, "", 1); #endif #ifdef OS_WIN32 sendto (LoopBreakerReader, "", 0, 0, (struct sockaddr*)&(LoopBreakerTarget), sizeof(LoopBreakerTarget)); #endif } /************************************** EventMachine_t::_InitializeLoopBreaker **************************************/ void EventMachine_t::_InitializeLoopBreaker() { /* A "loop-breaker" is a socket-descriptor that we can write to in order * to break the main select loop. Primarily useful for things running on * threads other than the main EM thread, so they can trigger processing * of events that arise exogenously to the EM. * Keep the loop-breaker pipe out of the main descriptor set, otherwise * its events will get passed on to user code. */ #ifdef OS_UNIX int fd[2]; if (pipe (fd)) throw std::runtime_error ("no loop breaker"); LoopBreakerWriter = fd[1]; LoopBreakerReader = fd[0]; #endif #ifdef OS_WIN32 int sd = socket (AF_INET, SOCK_DGRAM, 0); if (sd == INVALID_SOCKET) throw std::runtime_error ("no loop breaker socket"); SetSocketNonblocking (sd); memset (&LoopBreakerTarget, 0, sizeof(LoopBreakerTarget)); LoopBreakerTarget.sin_family = AF_INET; LoopBreakerTarget.sin_addr.s_addr = inet_addr ("127.0.0.1"); srand ((int)time(NULL)); int i; for (i=0; i < 100; i++) { int r = (rand() % 10000) + 20000; LoopBreakerTarget.sin_port = htons (r); if (bind (sd, (struct sockaddr*)&LoopBreakerTarget, sizeof(LoopBreakerTarget)) == 0) break; } if (i == 100) throw std::runtime_error ("no loop breaker"); LoopBreakerReader = sd; #endif } /******************* EventMachine_t::Run *******************/ void EventMachine_t::Run() { #ifdef OS_WIN32 HookControlC (true); #endif #ifdef HAVE_EPOLL if (bEpoll) { epfd = epoll_create (MaxEpollDescriptors); if (epfd == -1) { char buf[200]; snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno)); throw std::runtime_error (buf); } int cloexec = fcntl (epfd, F_GETFD, 0); assert (cloexec >= 0); cloexec |= FD_CLOEXEC; fcntl (epfd, F_SETFD, cloexec); assert (LoopBreakerReader >= 0); LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); assert (ld); Add (ld); } #endif #ifdef HAVE_KQUEUE if (bKqueue) { kqfd = kqueue(); if (kqfd == -1) { char buf[200]; snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno)); throw std::runtime_error (buf); } // cloexec not needed. By definition, kqueues are not carried across forks. assert (LoopBreakerReader >= 0); LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); assert (ld); Add (ld); } #endif while (true) { gCurrentLoopTime = time(NULL); if (!_RunTimers()) break; /* _Add must precede _Modify because the same descriptor might * be on both lists during the same pass through the machine, * and to modify a descriptor before adding it would fail. */ _AddNewDescriptors(); _ModifyDescriptors(); if (!_RunOnce()) break; if (gTerminateSignalReceived) break; } #ifdef OS_WIN32 HookControlC (false); #endif } /************************ EventMachine_t::_RunOnce ************************/ bool EventMachine_t::_RunOnce() { if (bEpoll) return _RunEpollOnce(); else if (bKqueue) return _RunKqueueOnce(); else return _RunSelectOnce(); } /***************************** EventMachine_t::_RunEpollOnce *****************************/ bool EventMachine_t::_RunEpollOnce() { #ifdef HAVE_EPOLL assert (epfd != -1); struct epoll_event ev [MaxEpollDescriptors]; int s; #ifdef BUILD_FOR_RUBY TRAP_BEG; #endif s = epoll_wait (epfd, ev, MaxEpollDescriptors, 50); #ifdef BUILD_FOR_RUBY TRAP_END; #endif if (s > 0) { for (int i=0; i < s; i++) { EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr; if (ev[i].events & (EPOLLERR | EPOLLHUP)) ed->ScheduleClose (false); if (ev[i].events & EPOLLIN) ed->Read(); if (ev[i].events & EPOLLOUT) { ed->Write(); epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent()); // Ignoring return value } } } else if (s < 0) { // epoll_wait can fail on error in a handful of ways. // If this happens, then wait for a little while to avoid busy-looping. // If the error was EINTR, we probably caught SIGCHLD or something, // so keep the wait short. timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000}; EmSelect (0, NULL, NULL, NULL, &tv); } { // cleanup dying sockets // vector::pop_back works in constant time. // TODO, rip this out and only delete the descriptors we know have died, // rather than traversing the whole list. // Modified 05Jan08 per suggestions by Chris Heath. It's possible that // an EventableDescriptor will have a descriptor value of -1. That will // happen if EventableDescriptor::Close was called on it. In that case, // don't call epoll_ctl to remove the socket's filters from the epoll set. // According to the epoll docs, this happens automatically when the // descriptor is closed anyway. This is different from the case where // the socket has already been closed but the descriptor in the ED object // hasn't yet been set to INVALID_SOCKET. int i, j; int nSockets = Descriptors.size(); for (i=0, j=0; i < nSockets; i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); if (ed->ShouldDelete()) { if (ed->GetSocket() != INVALID_SOCKET) { assert (bEpoll); // wouldn't be in this method otherwise. assert (epfd != -1); int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent()); // ENOENT or EBADF are not errors because the socket may be already closed when we get here. if (e && (errno != ENOENT) && (errno != EBADF)) { char buf [200]; snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno)); throw std::runtime_error (buf); } } ModifiedDescriptors.erase (ed); delete ed; } else Descriptors [j++] = ed; } while ((size_t)j < Descriptors.size()) Descriptors.pop_back(); } // TODO, heartbeats. // Added 14Sep07, its absence was noted by Brian Candler. But the comment was here, indicated // that this got thought about and not done when EPOLL was originally written. Was there a reason // not to do it, or was it an oversight? Certainly, running a heartbeat on 50,000 connections every // two seconds can get to be a real bear, especially if all we're doing is timing out dead ones. // Maybe there's a better way to do this. (Or maybe it's not that expensive after all.) // { // dispatch heartbeats if (gCurrentLoopTime >= NextHeartbeatTime) { NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; for (int i=0; i < Descriptors.size(); i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); ed->Heartbeat(); } } } #ifdef BUILD_FOR_RUBY if (!rb_thread_alone()) { rb_thread_schedule(); } #endif return true; #else throw std::runtime_error ("epoll is not implemented on this platform"); #endif } /****************************** EventMachine_t::_RunKqueueOnce ******************************/ bool EventMachine_t::_RunKqueueOnce() { #ifdef HAVE_KQUEUE assert (kqfd != -1); const int maxKevents = 2000; struct kevent Karray [maxKevents]; struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region int k; #ifdef BUILD_FOR_RUBY TRAP_BEG; #endif k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts); #ifdef BUILD_FOR_RUBY TRAP_END; #endif struct kevent *ke = Karray; while (k > 0) { EventableDescriptor *ed = (EventableDescriptor*) (ke->udata); assert (ed); if (ke->filter == EVFILT_READ) ed->Read(); else if (ke->filter == EVFILT_WRITE) ed->Write(); else cerr << "Discarding unknown kqueue event " << ke->filter << endl; --k; ++ke; } { // cleanup dying sockets // vector::pop_back works in constant time. // TODO, rip this out and only delete the descriptors we know have died, // rather than traversing the whole list. // In kqueue, closing a descriptor automatically removes its event filters. int i, j; int nSockets = Descriptors.size(); for (i=0, j=0; i < nSockets; i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); if (ed->ShouldDelete()) { ModifiedDescriptors.erase (ed); delete ed; } else Descriptors [j++] = ed; } while ((size_t)j < Descriptors.size()) Descriptors.pop_back(); } { // dispatch heartbeats if (gCurrentLoopTime >= NextHeartbeatTime) { NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; for (int i=0; i < Descriptors.size(); i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); ed->Heartbeat(); } } } // TODO, replace this with rb_thread_blocking_region for 1.9 builds. #ifdef BUILD_FOR_RUBY if (!rb_thread_alone()) { rb_thread_schedule(); } #endif return true; #else throw std::runtime_error ("kqueue is not implemented on this platform"); #endif } /********************************* EventMachine_t::_ModifyEpollEvent *********************************/ void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed) { #ifdef HAVE_EPOLL if (bEpoll) { assert (epfd != -1); assert (ed); int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent()); if (e) { char buf [200]; snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno)); throw std::runtime_error (buf); } } #endif } /************************** SelectData_t::SelectData_t **************************/ SelectData_t::SelectData_t() { maxsocket = 0; FD_ZERO (&fdreads); FD_ZERO (&fdwrites); } #ifdef BUILD_FOR_RUBY /***************** _SelectDataSelect *****************/ #ifdef HAVE_TBR static VALUE _SelectDataSelect (void *v) { SelectData_t *sd = (SelectData_t*)v; sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), NULL, &(sd->tv)); return Qnil; } #endif /********************* SelectData_t::_Select *********************/ int SelectData_t::_Select() { #ifdef HAVE_TBR rb_thread_blocking_region (_SelectDataSelect, (void*)this, RUBY_UBF_IO, 0); return nSockets; #endif #ifndef HAVE_TBR return EmSelect (maxsocket+1, &fdreads, &fdwrites, NULL, &tv); #endif } #endif /****************************** EventMachine_t::_RunSelectOnce ******************************/ bool EventMachine_t::_RunSelectOnce() { // Crank the event machine once. // If there are no descriptors to process, then sleep // for a few hundred mills to avoid busy-looping. // Return T/F to indicate whether we should continue. // This is based on a select loop. Alternately provide epoll // if we know we're running on a 2.6 kernel. // epoll will be effective if we provide it as an alternative, // however it has the same problem interoperating with Ruby // threads that select does. //cerr << "X"; /* This protection is now obsolete, because we will ALWAYS * have at least one descriptor (the loop-breaker) to read. */ /* if (Descriptors.size() == 0) { #ifdef OS_UNIX timeval tv = {0, 200 * 1000}; EmSelect (0, NULL, NULL, NULL, &tv); return true; #endif #ifdef OS_WIN32 Sleep (200); return true; #endif } */ SelectData_t SelectData; /* fd_set fdreads, fdwrites; FD_ZERO (&fdreads); FD_ZERO (&fdwrites); int maxsocket = 0; */ // Always read the loop-breaker reader. // Changed 23Aug06, provisionally implemented for Windows with a UDP socket // running on localhost with a randomly-chosen port. (*Puke*) // Windows has a version of the Unix pipe() library function, but it doesn't // give you back descriptors that are selectable. FD_SET (LoopBreakerReader, &(SelectData.fdreads)); if (SelectData.maxsocket < LoopBreakerReader) SelectData.maxsocket = LoopBreakerReader; // prepare the sockets for reading and writing size_t i; for (i = 0; i < Descriptors.size(); i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); int sd = ed->GetSocket(); assert (sd != INVALID_SOCKET); if (ed->SelectForRead()) FD_SET (sd, &(SelectData.fdreads)); if (ed->SelectForWrite()) FD_SET (sd, &(SelectData.fdwrites)); if (SelectData.maxsocket < sd) SelectData.maxsocket = sd; } { // read and write the sockets //timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000. //timeval tv = Quantum; SelectData.tv = Quantum; int s = SelectData._Select(); //rb_thread_blocking_region(xxx,(void*)&SelectData,RUBY_UBF_IO,0); //int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv)); //int s = SelectData.nSockets; if (s > 0) { /* Changed 01Jun07. We used to handle the Loop-breaker right here. * Now we do it AFTER all the regular descriptors. There's an * incredibly important and subtle reason for this. Code on * loop breakers is sometimes used to cause the reactor core to * cycle (for example, to allow outbound network buffers to drain). * If a loop-breaker handler reschedules itself (say, after determining * that the write buffers are still too full), then it will execute * IMMEDIATELY if _ReadLoopBreaker is done here instead of after * the other descriptors are processed. That defeats the whole purpose. */ for (i=0; i < Descriptors.size(); i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); int sd = ed->GetSocket(); assert (sd != INVALID_SOCKET); if (FD_ISSET (sd, &(SelectData.fdwrites))) ed->Write(); if (FD_ISSET (sd, &(SelectData.fdreads))) ed->Read(); } if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads))) _ReadLoopBreaker(); } else if (s < 0) { // select can fail on error in a handful of ways. // If this happens, then wait for a little while to avoid busy-looping. // If the error was EINTR, we probably caught SIGCHLD or something, // so keep the wait short. timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000}; EmSelect (0, NULL, NULL, NULL, &tv); } } { // dispatch heartbeats if (gCurrentLoopTime >= NextHeartbeatTime) { NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; for (i=0; i < Descriptors.size(); i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); ed->Heartbeat(); } } } { // cleanup dying sockets // vector::pop_back works in constant time. int i, j; int nSockets = Descriptors.size(); for (i=0, j=0; i < nSockets; i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); if (ed->ShouldDelete()) delete ed; else Descriptors [j++] = ed; } while ((size_t)j < Descriptors.size()) Descriptors.pop_back(); } return true; } /******************************** EventMachine_t::_ReadLoopBreaker ********************************/ void EventMachine_t::_ReadLoopBreaker() { /* The loop breaker has selected readable. * Read it ONCE (it may block if we try to read it twice) * and send a loop-break event back to user code. */ char buffer [1024]; read (LoopBreakerReader, buffer, sizeof(buffer)); if (EventCallback) (*EventCallback)("", EM_LOOPBREAK_SIGNAL, "", 0); } /************************** EventMachine_t::_RunTimers **************************/ bool EventMachine_t::_RunTimers() { // These are caller-defined timer handlers. // Return T/F to indicate whether we should continue the main loop. // We rely on the fact that multimaps sort by their keys to avoid // inspecting the whole list every time we come here. // Just keep inspecting and processing the list head until we hit // one that hasn't expired yet. #ifdef OS_UNIX struct timeval tv; gettimeofday (&tv, NULL); Int64 now = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec)); #endif #ifdef OS_WIN32 unsigned tick = GetTickCount(); if (tick < gLastTickCount) gTickCountTickover += 1; gLastTickCount = tick; Int64 now = ((Int64)gTickCountTickover << 32) + (Int64)tick; #endif while (true) { multimap::iterator i = Timers.begin(); if (i == Timers.end()) break; if (i->first > now) break; if (EventCallback) (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length()); Timers.erase (i); } return true; } /*********************************** EventMachine_t::InstallOneshotTimer ***********************************/ const char *EventMachine_t::InstallOneshotTimer (int milliseconds) { if (Timers.size() > MaxOutstandingTimers) return false; // Don't use the global loop-time variable here, because we might // get called before the main event machine is running. #ifdef OS_UNIX struct timeval tv; gettimeofday (&tv, NULL); Int64 fire_at = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec)); fire_at += ((Int64)milliseconds) * 1000LL; #endif #ifdef OS_WIN32 unsigned tick = GetTickCount(); if (tick < gLastTickCount) gTickCountTickover += 1; gLastTickCount = tick; Int64 fire_at = ((Int64)gTickCountTickover << 32) + (Int64)tick; fire_at += (Int64)milliseconds; #endif Timer_t t; multimap::iterator i = Timers.insert (make_pair (fire_at, t)); return i->second.GetBindingChars(); } /******************************* EventMachine_t::ConnectToServer *******************************/ const char *EventMachine_t::ConnectToServer (const char *server, int port) { /* We want to spend no more than a few seconds waiting for a connection * to a remote host. So we use a nonblocking connect. * Linux disobeys the usual rules for nonblocking connects. * Per Stevens (UNP p.410), you expect a nonblocking connect to select * both readable and writable on error, and not to return EINPROGRESS * if the connect can be fulfilled immediately. Linux violates both * of these expectations. * Any kind of nonblocking connect on Linux returns EINPROGRESS. * The socket will then return writable when the disposition of the * connect is known, but it will not also be readable in case of * error! Weirdly, it will be readable in case there is data to read!!! * (Which can happen with protocols like SSH and SMTP.) * I suppose if you were so inclined you could consider this logical, * but it's not the way Unix has historically done it. * So we ignore the readable flag and read getsockopt to see if there * was an error connecting. A select timeout works as expected. * In regard to getsockopt: Linux does the Berkeley-style thing, * not the Solaris-style, and returns zero with the error code in * the error parameter. * Return the binding-text of the newly-created pending connection, * or NULL if there was a problem. */ if (!server || !*server || !port) return NULL; int family, bind_size; struct sockaddr *bind_as = name2address (server, port, &family, &bind_size); if (!bind_as) return NULL; int sd = socket (family, SOCK_STREAM, 0); if (sd == INVALID_SOCKET) return NULL; /* sockaddr_in pin; unsigned long HostAddr; HostAddr = inet_addr (server); if (HostAddr == INADDR_NONE) { hostent *hp = gethostbyname ((char*)server); // Windows requires (char*) if (!hp) { // TODO: This gives the caller a fatal error. Not good. // They can respond by catching RuntimeError (blecch). // Possibly we need to fire an unbind event and provide // a status code so user code can detect the cause of the // failure. return NULL; } HostAddr = ((in_addr*)(hp->h_addr))->s_addr; } memset (&pin, 0, sizeof(pin)); pin.sin_family = AF_INET; pin.sin_addr.s_addr = HostAddr; pin.sin_port = htons (port); int sd = socket (AF_INET, SOCK_STREAM, 0); if (sd == INVALID_SOCKET) return NULL; */ // From here on, ALL error returns must close the socket. // Set the new socket nonblocking. if (!SetSocketNonblocking (sd)) { closesocket (sd); return NULL; } // Disable slow-start (Nagle algorithm). int one = 1; setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one)); const char *out = NULL; #ifdef OS_UNIX //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) { if (connect (sd, bind_as, bind_size) == 0) { // This is a connect success, which Linux appears // never to give when the socket is nonblocking, // even if the connection is intramachine or to // localhost. /* Changed this branch 08Aug06. Evidently some kernels * (FreeBSD for example) will actually return success from * a nonblocking connect. This is a pretty simple case, * just set up the new connection and clear the pending flag. * Thanks to Chris Ochs for helping track this down. * This branch never gets taken on Linux or (oddly) OSX. * The original behavior was to throw an unimplemented, * which the user saw as a fatal exception. Very unfriendly. * * Tweaked 10Aug06. Even though the connect disposition is * known, we still set the connect-pending flag. That way * some needed initialization will happen in the ConnectionDescriptor. * (To wit, the ConnectionCompleted event gets sent to the client.) */ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); if (!cd) throw std::runtime_error ("no connection allocated"); cd->SetConnectPending (true); Add (cd); out = cd->GetBinding().c_str(); } else if (errno == EINPROGRESS) { // Errno will generally always be EINPROGRESS, but on Linux // we have to look at getsockopt to be sure what really happened. int error; socklen_t len; len = sizeof(error); int o = getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len); if ((o == 0) && (error == 0)) { // Here, there's no disposition. // Put the connection on the stack and wait for it to complete // or time out. ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); if (!cd) throw std::runtime_error ("no connection allocated"); cd->SetConnectPending (true); Add (cd); out = cd->GetBinding().c_str(); } else { /* This could be connection refused or some such thing. * We will come here on Linux if a localhost connection fails. * Changed 16Jul06: Originally this branch was a no-op, and * we'd drop down to the end of the method, close the socket, * and return NULL, which would cause the caller to GET A * FATAL EXCEPTION. Now we keep the socket around but schedule an * immediate close on it, so the caller will get a close-event * scheduled on it. This was only an issue for localhost connections * to non-listening ports. We may eventually need to revise this * revised behavior, in case it causes problems like making it hard * for people to know that a failure occurred. */ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); if (!cd) throw std::runtime_error ("no connection allocated"); cd->ScheduleClose (false); Add (cd); out = cd->GetBinding().c_str(); } } else { // The error from connect was something other then EINPROGRESS. } #endif #ifdef OS_WIN32 //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) { if (connect (sd, bind_as, bind_size) == 0) { // This is a connect success, which Windows appears // never to give when the socket is nonblocking, // even if the connection is intramachine or to // localhost. throw std::runtime_error ("unimplemented"); } else if (WSAGetLastError() == WSAEWOULDBLOCK) { // Here, there's no disposition. // Windows appears not to surface refused connections or // such stuff at this point. // Put the connection on the stack and wait for it to complete // or time out. ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); if (!cd) throw std::runtime_error ("no connection allocated"); cd->SetConnectPending (true); Add (cd); out = cd->GetBinding().c_str(); } else { // The error from connect was something other then WSAEWOULDBLOCK. } #endif if (out == NULL) closesocket (sd); return out; } /*********************************** EventMachine_t::ConnectToUnixServer ***********************************/ const char *EventMachine_t::ConnectToUnixServer (const char *server) { /* Connect to a Unix-domain server, which by definition is running * on the same host. * There is no meaningful implementation on Windows. * There's no need to do a nonblocking connect, since the connection * is always local and can always be fulfilled immediately. */ #ifdef OS_WIN32 throw std::runtime_error ("unix-domain connection unavailable on this platform"); return NULL; #endif // The whole rest of this function is only compiled on Unix systems. #ifdef OS_UNIX const char *out = NULL; if (!server || !*server) return NULL; sockaddr_un pun; memset (&pun, 0, sizeof(pun)); pun.sun_family = AF_LOCAL; // You ordinarily expect the server name field to be at least 1024 bytes long, // but on Linux it can be MUCH shorter. if (strlen(server) >= sizeof(pun.sun_path)) throw std::runtime_error ("unix-domain server name is too long"); strcpy (pun.sun_path, server); int fd = socket (AF_LOCAL, SOCK_STREAM, 0); if (fd == INVALID_SOCKET) return NULL; // From here on, ALL error returns must close the socket. // NOTE: At this point, the socket is still a blocking socket. if (connect (fd, (struct sockaddr*)&pun, sizeof(pun)) != 0) { closesocket (fd); return NULL; } // Set the newly-connected socket nonblocking. if (!SetSocketNonblocking (fd)) { closesocket (fd); return NULL; } // Set up a connection descriptor and add it to the event-machine. // Observe, even though we know the connection status is connect-success, // we still set the "pending" flag, so some needed initializations take // place. ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); if (!cd) throw std::runtime_error ("no connection allocated"); cd->SetConnectPending (true); Add (cd); out = cd->GetBinding().c_str(); if (out == NULL) closesocket (fd); return out; #endif } /************************ EventMachine_t::AttachFD ************************/ const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable) { #ifdef OS_UNIX if (fcntl(fd, F_GETFL, 0) < 0) throw std::runtime_error ("invalid file descriptor"); #endif #ifdef OS_WIN32 // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt) if (fd == INVALID_SOCKET) throw std::runtime_error ("invalid file descriptor"); #endif {// Check for duplicate descriptors size_t i; for (i = 0; i < Descriptors.size(); i++) { EventableDescriptor *ed = Descriptors[i]; assert (ed); if (ed->GetSocket() == fd) throw std::runtime_error ("adding existing descriptor"); } for (i = 0; i < NewDescriptors.size(); i++) { EventableDescriptor *ed = NewDescriptors[i]; assert (ed); if (ed->GetSocket() == fd) throw std::runtime_error ("adding existing new descriptor"); } } ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); if (!cd) throw std::runtime_error ("no connection allocated"); cd->SetConnectPending (true); cd->SetNotifyReadable (notify_readable); cd->SetNotifyWritable (notify_writable); Add (cd); const char *out = NULL; out = cd->GetBinding().c_str(); if (out == NULL) closesocket (fd); return out; } /************************ EventMachine_t::DetachFD ************************/ int EventMachine_t::DetachFD (EventableDescriptor *ed) { if (!ed) throw std::runtime_error ("detaching bad descriptor"); #ifdef HAVE_EPOLL if (bEpoll) { if (ed->GetSocket() != INVALID_SOCKET) { assert (bEpoll); // wouldn't be in this method otherwise. assert (epfd != -1); int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent()); // ENOENT or EBADF are not errors because the socket may be already closed when we get here. if (e && (errno != ENOENT) && (errno != EBADF)) { char buf [200]; snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno)); throw std::runtime_error (buf); } } } #endif #ifdef HAVE_KQUEUE if (bKqueue) { struct kevent k; EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed); int t = kevent (kqfd, &k, 1, NULL, 0, NULL); assert (t == 0); } #endif { // remove descriptor from lists int i, j; int nSockets = Descriptors.size(); for (i=0, j=0; i < nSockets; i++) { EventableDescriptor *ted = Descriptors[i]; assert (ted); if (ted != ed) Descriptors [j++] = ted; } while ((size_t)j < Descriptors.size()) Descriptors.pop_back(); ModifiedDescriptors.erase (ed); } int fd = ed->GetSocket(); // We depend on ~EventableDescriptor not calling close() if the socket is invalid ed->SetSocketInvalid(); delete ed; return fd; } /************ name2address ************/ struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size) { // THIS IS NOT RE-ENTRANT OR THREADSAFE. Optimize for speed. // Check the more-common cases first. // Return NULL if no resolution. static struct sockaddr_in in4; #ifndef __CYGWIN__ static struct sockaddr_in6 in6; #endif struct hostent *hp; if (!server || !*server) server = "0.0.0.0"; memset (&in4, 0, sizeof(in4)); if ( (in4.sin_addr.s_addr = inet_addr (server)) != INADDR_NONE) { if (family) *family = AF_INET; if (bind_size) *bind_size = sizeof(in4); in4.sin_family = AF_INET; in4.sin_port = htons (port); return (struct sockaddr*)&in4; } #if defined(OS_UNIX) && !defined(__CYGWIN__) memset (&in6, 0, sizeof(in6)); if (inet_pton (AF_INET6, server, in6.sin6_addr.s6_addr) > 0) { if (family) *family = AF_INET6; if (bind_size) *bind_size = sizeof(in6); in6.sin6_family = AF_INET6; in6.sin6_port = htons (port); return (struct sockaddr*)&in6; } #endif #ifdef OS_WIN32 // TODO, must complete this branch. Windows doesn't have inet_pton. // A possible approach is to make a getaddrinfo call with the supplied // server address, constraining the hints to ipv6 and seeing if we // get any addresses. // For the time being, Ipv6 addresses aren't supported on Windows. #endif hp = gethostbyname ((char*)server); // Windows requires the cast. if (hp) { in4.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; if (family) *family = AF_INET; if (bind_size) *bind_size = sizeof(in4); in4.sin_family = AF_INET; in4.sin_port = htons (port); return (struct sockaddr*)&in4; } return NULL; } /******************************* EventMachine_t::CreateTcpServer *******************************/ const char *EventMachine_t::CreateTcpServer (const char *server, int port) { /* Create a TCP-acceptor (server) socket and add it to the event machine. * Return the binding of the new acceptor to the caller. * This binding will be referenced when the new acceptor sends events * to indicate accepted connections. */ int family, bind_size; struct sockaddr *bind_here = name2address (server, port, &family, &bind_size); if (!bind_here) return NULL; const char *output_binding = NULL; //struct sockaddr_in sin; int sd_accept = socket (family, SOCK_STREAM, 0); if (sd_accept == INVALID_SOCKET) { goto fail; } /* memset (&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons (port); if (server && *server) { sin.sin_addr.s_addr = inet_addr (server); if (sin.sin_addr.s_addr == INADDR_NONE) { hostent *hp = gethostbyname ((char*)server); // Windows requires the cast. if (hp == NULL) { //__warning ("hostname not resolved: ", server); goto fail; } sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; } } */ { // set reuseaddr to improve performance on restarts. int oval = 1; if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) { //__warning ("setsockopt failed while creating listener",""); goto fail; } } { // set CLOEXEC. Only makes sense on Unix #ifdef OS_UNIX int cloexec = fcntl (sd_accept, F_GETFD, 0); assert (cloexec >= 0); cloexec |= FD_CLOEXEC; fcntl (sd_accept, F_SETFD, cloexec); #endif } //if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) { if (bind (sd_accept, bind_here, bind_size)) { //__warning ("binding failed"); goto fail; } if (listen (sd_accept, 100)) { //__warning ("listen failed"); goto fail; } { // Set the acceptor non-blocking. // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop. if (!SetSocketNonblocking (sd_accept)) { //int val = fcntl (sd_accept, F_GETFL, 0); //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) { goto fail; } } { // Looking good. AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); if (!ad) throw std::runtime_error ("unable to allocate acceptor"); Add (ad); output_binding = ad->GetBinding().c_str(); } return output_binding; fail: if (sd_accept != INVALID_SOCKET) closesocket (sd_accept); return NULL; } /********************************** EventMachine_t::OpenDatagramSocket **********************************/ const char *EventMachine_t::OpenDatagramSocket (const char *address, int port) { const char *output_binding = NULL; int sd = socket (AF_INET, SOCK_DGRAM, 0); if (sd == INVALID_SOCKET) goto fail; // from here on, early returns must close the socket! struct sockaddr_in sin; memset (&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons (port); if (address && *address) { sin.sin_addr.s_addr = inet_addr (address); if (sin.sin_addr.s_addr == INADDR_NONE) { hostent *hp = gethostbyname ((char*)address); // Windows requires the cast. if (hp == NULL) goto fail; sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; } } else sin.sin_addr.s_addr = htonl (INADDR_ANY); // Set the new socket nonblocking. { if (!SetSocketNonblocking (sd)) //int val = fcntl (sd, F_GETFL, 0); //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1) goto fail; } if (bind (sd, (struct sockaddr*)&sin, sizeof(sin)) != 0) goto fail; { // Looking good. DatagramDescriptor *ds = new DatagramDescriptor (sd, this); if (!ds) throw std::runtime_error ("unable to allocate datagram-socket"); Add (ds); output_binding = ds->GetBinding().c_str(); } return output_binding; fail: if (sd != INVALID_SOCKET) closesocket (sd); return NULL; } /******************* EventMachine_t::Add *******************/ void EventMachine_t::Add (EventableDescriptor *ed) { if (!ed) throw std::runtime_error ("added bad descriptor"); ed->SetEventCallback (EventCallback); NewDescriptors.push_back (ed); } /******************************* EventMachine_t::ArmKqueueWriter *******************************/ void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed) { #ifdef HAVE_KQUEUE if (bKqueue) { if (!ed) throw std::runtime_error ("added bad descriptor"); struct kevent k; EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed); int t = kevent (kqfd, &k, 1, NULL, 0, NULL); assert (t == 0); } #endif } /******************************* EventMachine_t::ArmKqueueReader *******************************/ void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed) { #ifdef HAVE_KQUEUE if (bKqueue) { if (!ed) throw std::runtime_error ("added bad descriptor"); struct kevent k; EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); int t = kevent (kqfd, &k, 1, NULL, 0, NULL); assert (t == 0); } #endif } /********************************** EventMachine_t::_AddNewDescriptors **********************************/ void EventMachine_t::_AddNewDescriptors() { /* Avoid adding descriptors to the main descriptor list * while we're actually traversing the list. * Any descriptors that are added as a result of processing timers * or acceptors should go on a temporary queue and then added * while we're not traversing the main list. * Also, it (rarely) happens that a newly-created descriptor * is immediately scheduled to close. It might be a good * idea not to bother scheduling these for I/O but if * we do that, we might bypass some important processing. */ for (size_t i = 0; i < NewDescriptors.size(); i++) { EventableDescriptor *ed = NewDescriptors[i]; if (ed == NULL) throw std::runtime_error ("adding bad descriptor"); #if HAVE_EPOLL if (bEpoll) { assert (epfd != -1); int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent()); if (e) { char buf [200]; snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno)); throw std::runtime_error (buf); } } #endif #if HAVE_KQUEUE /* if (bKqueue) { // INCOMPLETE. Some descriptors don't want to be readable. assert (kqfd != -1); struct kevent k; EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); int t = kevent (kqfd, &k, 1, NULL, 0, NULL); assert (t == 0); } */ #endif Descriptors.push_back (ed); } NewDescriptors.clear(); } /********************************** EventMachine_t::_ModifyDescriptors **********************************/ void EventMachine_t::_ModifyDescriptors() { /* For implementations which don't level check every descriptor on * every pass through the machine, as select does. * If we're not selecting, then descriptors need a way to signal to the * machine that their readable or writable status has changed. * That's what the ::Modify call is for. We do it this way to avoid * modifying descriptors during the loop traversal, where it can easily * happen that an object (like a UDP socket) gets data written on it by * the application during #post_init. That would take place BEFORE the * descriptor even gets added to the epoll descriptor, so the modify * operation will crash messily. * Another really messy possibility is for a descriptor to put itself * on the Modified list, and then get deleted before we get here. * Remember, deletes happen after the I/O traversal and before the * next pass through here. So we have to make sure when we delete a * descriptor to remove it from the Modified list. */ #ifdef HAVE_EPOLL if (bEpoll) { set::iterator i = ModifiedDescriptors.begin(); while (i != ModifiedDescriptors.end()) { assert (*i); _ModifyEpollEvent (*i); ++i; } } #endif ModifiedDescriptors.clear(); } /********************** EventMachine_t::Modify **********************/ void EventMachine_t::Modify (EventableDescriptor *ed) { if (!ed) throw std::runtime_error ("modified bad descriptor"); ModifiedDescriptors.insert (ed); } /*********************************** EventMachine_t::_OpenFileForWriting ***********************************/ const char *EventMachine_t::_OpenFileForWriting (const char *filename) { /* * Return the binding-text of the newly-opened file, * or NULL if there was a problem. */ if (!filename || !*filename) return NULL; int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644); FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this); if (!fsd) throw std::runtime_error ("no file-stream allocated"); Add (fsd); return fsd->GetBinding().c_str(); } /************************************** EventMachine_t::CreateUnixDomainServer **************************************/ const char *EventMachine_t::CreateUnixDomainServer (const char *filename) { /* Create a UNIX-domain acceptor (server) socket and add it to the event machine. * Return the binding of the new acceptor to the caller. * This binding will be referenced when the new acceptor sends events * to indicate accepted connections. * THERE IS NO MEANINGFUL IMPLEMENTATION ON WINDOWS. */ #ifdef OS_WIN32 throw std::runtime_error ("unix-domain server unavailable on this platform"); #endif // The whole rest of this function is only compiled on Unix systems. #ifdef OS_UNIX const char *output_binding = NULL; struct sockaddr_un s_sun; int sd_accept = socket (AF_LOCAL, SOCK_STREAM, 0); if (sd_accept == INVALID_SOCKET) { goto fail; } if (!filename || !*filename) goto fail; unlink (filename); bzero (&s_sun, sizeof(s_sun)); s_sun.sun_family = AF_LOCAL; strncpy (s_sun.sun_path, filename, sizeof(s_sun.sun_path)-1); // don't bother with reuseaddr for a local socket. { // set CLOEXEC. Only makes sense on Unix #ifdef OS_UNIX int cloexec = fcntl (sd_accept, F_GETFD, 0); assert (cloexec >= 0); cloexec |= FD_CLOEXEC; fcntl (sd_accept, F_SETFD, cloexec); #endif } if (bind (sd_accept, (struct sockaddr*)&s_sun, sizeof(s_sun))) { //__warning ("binding failed"); goto fail; } if (listen (sd_accept, 100)) { //__warning ("listen failed"); goto fail; } { // Set the acceptor non-blocking. // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop. if (!SetSocketNonblocking (sd_accept)) { //int val = fcntl (sd_accept, F_GETFL, 0); //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) { goto fail; } } { // Looking good. AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); if (!ad) throw std::runtime_error ("unable to allocate acceptor"); Add (ad); output_binding = ad->GetBinding().c_str(); } return output_binding; fail: if (sd_accept != INVALID_SOCKET) closesocket (sd_accept); return NULL; #endif // OS_UNIX } /********************* EventMachine_t::Popen *********************/ #if OBSOLETE const char *EventMachine_t::Popen (const char *cmd, const char *mode) { #ifdef OS_WIN32 throw std::runtime_error ("popen is currently unavailable on this platform"); #endif // The whole rest of this function is only compiled on Unix systems. // Eventually we need this functionality (or a full-duplex equivalent) on Windows. #ifdef OS_UNIX const char *output_binding = NULL; FILE *fp = popen (cmd, mode); if (!fp) return NULL; // From here, all early returns must pclose the stream. // According to the pipe(2) manpage, descriptors returned from pipe have both // CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking. if (!SetSocketNonblocking (fileno (fp))) { pclose (fp); return NULL; } { // Looking good. PipeDescriptor *pd = new PipeDescriptor (fp, this); if (!pd) throw std::runtime_error ("unable to allocate pipe"); Add (pd); output_binding = pd->GetBinding().c_str(); } return output_binding; #endif } #endif // OBSOLETE /************************** EventMachine_t::Socketpair **************************/ const char *EventMachine_t::Socketpair (char * const*cmd_strings) { #ifdef OS_WIN32 throw std::runtime_error ("socketpair is currently unavailable on this platform"); #endif // The whole rest of this function is only compiled on Unix systems. // Eventually we need this functionality (or a full-duplex equivalent) on Windows. #ifdef OS_UNIX // Make sure the incoming array of command strings is sane. if (!cmd_strings) return NULL; int j; for (j=0; j < 100 && cmd_strings[j]; j++) ; if ((j==0) || (j==100)) return NULL; const char *output_binding = NULL; int sv[2]; if (socketpair (AF_LOCAL, SOCK_STREAM, 0, sv) < 0) return NULL; // from here, all early returns must close the pair of sockets. // Set the parent side of the socketpair nonblocking. // We don't care about the child side, and most child processes will expect their // stdout to be blocking. Thanks to Duane Johnson and Bill Kelly for pointing this out. // Obviously DON'T set CLOEXEC. if (!SetSocketNonblocking (sv[0])) { close (sv[0]); close (sv[1]); return NULL; } pid_t f = fork(); if (f > 0) { close (sv[1]); PipeDescriptor *pd = new PipeDescriptor (sv[0], f, this); if (!pd) throw std::runtime_error ("unable to allocate pipe"); Add (pd); output_binding = pd->GetBinding().c_str(); } else if (f == 0) { close (sv[0]); dup2 (sv[1], STDIN_FILENO); close (sv[1]); dup2 (STDIN_FILENO, STDOUT_FILENO); execvp (cmd_strings[0], cmd_strings+1); exit (-1); // end the child process if the exec doesn't work. } else throw std::runtime_error ("no fork"); return output_binding; #endif } /**************************** EventMachine_t::OpenKeyboard ****************************/ const char *EventMachine_t::OpenKeyboard() { KeyboardDescriptor *kd = new KeyboardDescriptor (this); if (!kd) throw std::runtime_error ("no keyboard-object allocated"); Add (kd); return kd->GetBinding().c_str(); } //#endif // OS_UNIX /***************************************************************************** $Id$ File: emwin.cpp Date: 05May06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ // THIS ENTIRE FILE IS FOR WINDOWS BUILDS ONLY // INCOMPLETE AND DISABLED FOR NOW. #ifdef xOS_WIN32 #include "project.h" // Keep a global variable floating around // with the current loop time as set by the Event Machine. // This avoids the need for frequent expensive calls to time(NULL); time_t gCurrentLoopTime; /****************************** EventMachine_t::EventMachine_t ******************************/ EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)): EventCallback (event_callback), NextHeartbeatTime (0) { gTerminateSignalReceived = false; Iocp = NULL; } /******************************* EventMachine_t::~EventMachine_t *******************************/ EventMachine_t::~EventMachine_t() { cerr << "EM __dt\n"; if (Iocp) CloseHandle (Iocp); } /**************************** EventMachine_t::ScheduleHalt ****************************/ void EventMachine_t::ScheduleHalt() { /* This is how we stop the machine. * This can be called by clients. Signal handlers will probably * set the global flag. * For now this means there can only be one EventMachine ever running at a time. */ gTerminateSignalReceived = true; } /******************* EventMachine_t::Run *******************/ void EventMachine_t::Run() { HookControlC (true); Iocp = CreateIoCompletionPort (INVALID_HANDLE_VALUE, NULL, 0, 0); if (Iocp == NULL) throw std::runtime_error ("no completion port"); DWORD nBytes, nCompletionKey; LPOVERLAPPED Overlapped; do { gCurrentLoopTime = time(NULL); // Have some kind of strategy that will dequeue maybe up to 10 completions // without running the timers as long as they are available immediately. // Otherwise in a busy server we're calling them every time through the loop. if (!_RunTimers()) break; if (GetQueuedCompletionStatus (Iocp, &nBytes, &nCompletionKey, &Overlapped, 1000)) { } cerr << "+"; } while (!gTerminateSignalReceived); /* while (true) { gCurrentLoopTime = time(NULL); if (!_RunTimers()) break; _AddNewDescriptors(); if (!_RunOnce()) break; if (gTerminateSignalReceived) break; } */ HookControlC (false); } /************************** EventMachine_t::_RunTimers **************************/ bool EventMachine_t::_RunTimers() { // These are caller-defined timer handlers. // Return T/F to indicate whether we should continue the main loop. // We rely on the fact that multimaps sort by their keys to avoid // inspecting the whole list every time we come here. // Just keep inspecting and processing the list head until we hit // one that hasn't expired yet. while (true) { multimap::iterator i = Timers.begin(); if (i == Timers.end()) break; if (i->first > gCurrentLoopTime) break; if (EventCallback) (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length()); Timers.erase (i); } return true; } /*********************************** EventMachine_t::InstallOneshotTimer ***********************************/ const char *EventMachine_t::InstallOneshotTimer (int seconds) { if (Timers.size() > MaxOutstandingTimers) return false; // Don't use the global loop-time variable here, because we might // get called before the main event machine is running. Timer_t t; Timers.insert (make_pair (time(NULL) + seconds, t)); return t.GetBinding().c_str(); } /********************************** EventMachine_t::OpenDatagramSocket **********************************/ const char *EventMachine_t::OpenDatagramSocket (const char *address, int port) { cerr << "OPEN DATAGRAM SOCKET\n"; return "Unimplemented"; } /******************************* EventMachine_t::CreateTcpServer *******************************/ const char *EventMachine_t::CreateTcpServer (const char *server, int port) { /* Create a TCP-acceptor (server) socket and add it to the event machine. * Return the binding of the new acceptor to the caller. * This binding will be referenced when the new acceptor sends events * to indicate accepted connections. */ const char *output_binding = NULL; struct sockaddr_in sin; SOCKET sd_accept = socket (AF_INET, SOCK_STREAM, 0); if (sd_accept == INVALID_SOCKET) { goto fail; } memset (&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_addr.s_addr = INADDR_ANY; sin.sin_port = htons (port); if (server && *server) { sin.sin_addr.s_addr = inet_addr (server); if (sin.sin_addr.s_addr == INADDR_NONE) { hostent *hp = gethostbyname (server); if (hp == NULL) { //__warning ("hostname not resolved: ", server); goto fail; } sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; } } // No need to set reuseaddr on Windows. if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) { //__warning ("binding failed"); goto fail; } if (listen (sd_accept, 100)) { //__warning ("listen failed"); goto fail; } { // Looking good. AcceptorDescriptor *ad = new AcceptorDescriptor (this, sd_accept); if (!ad) throw std::runtime_error ("unable to allocate acceptor"); Add (ad); output_binding = ad->GetBinding().c_str(); CreateIoCompletionPort ((HANDLE)sd_accept, Iocp, NULL, 0); SOCKET sd = socket (AF_INET, SOCK_STREAM, 0); CreateIoCompletionPort ((HANDLE)sd, Iocp, NULL, 0); AcceptEx (sd_accept, sd, } return output_binding; fail: if (sd_accept != INVALID_SOCKET) closesocket (sd_accept); return NULL; } /******************************* EventMachine_t::ConnectToServer *******************************/ const char *EventMachine_t::ConnectToServer (const char *server, int port) { if (!server || !*server || !port) return NULL; sockaddr_in pin; unsigned long HostAddr; HostAddr = inet_addr (server); if (HostAddr == INADDR_NONE) { hostent *hp = gethostbyname (server); if (!hp) return NULL; HostAddr = ((in_addr*)(hp->h_addr))->s_addr; } memset (&pin, 0, sizeof(pin)); pin.sin_family = AF_INET; pin.sin_addr.s_addr = HostAddr; pin.sin_port = htons (port); int sd = socket (AF_INET, SOCK_STREAM, 0); if (sd == INVALID_SOCKET) return NULL; LPOVERLAPPED olap = (LPOVERLAPPED) calloc (1, sizeof (OVERLAPPED)); cerr << "I'm dying now\n"; throw runtime_error ("UNIMPLEMENTED!!!\n"); } /******************* EventMachine_t::Add *******************/ void EventMachine_t::Add (EventableDescriptor *ed) { cerr << "ADD\n"; } #endif // OS_WIN32 /***************************************************************************** $Id$ File: epoll.cpp Date: 06Jun07 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #ifdef HAVE_EPOLL #include "project.h" #endif // HAVE_EPOLL /***************************************************************************** $Id: mapper.cpp 4527 2007-07-04 10:21:34Z francis $ File: mapper.cpp Date: 02Jul07 Copyright (C) 2007 by Francis Cianfrocca. All Rights Reserved. Gmail: garbagecat10 This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ ////////////////////////////////////////////////////////////////////// // UNIX implementation ////////////////////////////////////////////////////////////////////// #ifdef OS_UNIX #include #include #include #include #include #include #include "unistd.h" #include #include #include using namespace std; #include "mapper.h" /****************** Mapper_t::Mapper_t ******************/ Mapper_t::Mapper_t (const string &filename) { /* We ASSUME we can open the file. * (More precisely, we assume someone else checked before we got here.) */ Fd = open (filename.c_str(), O_RDONLY); if (Fd < 0) throw runtime_error (strerror (errno)); struct stat st; if (fstat (Fd, &st)) throw runtime_error (strerror (errno)); FileSize = st.st_size; MapPoint = (const char*) mmap (0, FileSize, PROT_READ, MAP_SHARED, Fd, 0); if (MapPoint == MAP_FAILED) throw runtime_error (strerror (errno)); } /******************* Mapper_t::~Mapper_t *******************/ Mapper_t::~Mapper_t() { Close(); } /*************** Mapper_t::Close ***************/ void Mapper_t::Close() { // Can be called multiple times. // Calls to GetChunk are invalid after a call to Close. if (MapPoint) { munmap ((void*)MapPoint, FileSize); MapPoint = NULL; } if (Fd >= 0) { close (Fd); Fd = -1; } } /****************** Mapper_t::GetChunk ******************/ const char *Mapper_t::GetChunk (unsigned start) { return MapPoint + start; } #endif // OS_UNIX ////////////////////////////////////////////////////////////////////// // WINDOWS implementation ////////////////////////////////////////////////////////////////////// #ifdef OS_WIN32 #include #include #include #include using namespace std; #include "mapper.h" /****************** Mapper_t::Mapper_t ******************/ Mapper_t::Mapper_t (const string &filename) { /* We ASSUME we can open the file. * (More precisely, we assume someone else checked before we got here.) */ hFile = INVALID_HANDLE_VALUE; hMapping = NULL; MapPoint = NULL; FileSize = 0; hFile = CreateFile (filename.c_str(), GENERIC_READ|GENERIC_WRITE, FILE_SHARE_DELETE|FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); if (hFile == INVALID_HANDLE_VALUE) throw runtime_error ("File not found"); BY_HANDLE_FILE_INFORMATION i; if (GetFileInformationByHandle (hFile, &i)) FileSize = i.nFileSizeLow; hMapping = CreateFileMapping (hFile, NULL, PAGE_READWRITE, 0, 0, NULL); if (!hMapping) throw runtime_error ("File not mapped"); MapPoint = (const char*) MapViewOfFile (hMapping, FILE_MAP_WRITE, 0, 0, 0); if (!MapPoint) throw runtime_error ("Mappoint not read"); } /******************* Mapper_t::~Mapper_t *******************/ Mapper_t::~Mapper_t() { Close(); } /*************** Mapper_t::Close ***************/ void Mapper_t::Close() { // Can be called multiple times. // Calls to GetChunk are invalid after a call to Close. if (MapPoint) { UnmapViewOfFile (MapPoint); MapPoint = NULL; } if (hMapping != NULL) { CloseHandle (hMapping); hMapping = NULL; } if (hFile != INVALID_HANDLE_VALUE) { CloseHandle (hFile); hMapping = INVALID_HANDLE_VALUE; } } /****************** Mapper_t::GetChunk ******************/ const char *Mapper_t::GetChunk (unsigned start) { return MapPoint + start; } #endif // OS_WINDOWS /***************************************************************************** $Id: rubymain.cpp 4529 2007-07-04 11:32:22Z francis $ File: rubymain.cpp Date: 02Jul07 Copyright (C) 2007 by Francis Cianfrocca. All Rights Reserved. Gmail: garbagecat10 This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include #include using namespace std; #include #include "mapper.h" static VALUE EmModule; static VALUE FastFileReader; static VALUE Mapper; /********* mapper_dt *********/ static void mapper_dt (void *ptr) { if (ptr) delete (Mapper_t*) ptr; } /********** mapper_new **********/ static VALUE mapper_new (VALUE self, VALUE filename) { Mapper_t *m = new Mapper_t (StringValuePtr (filename)); if (!m) rb_raise (rb_eException, "No Mapper Object"); VALUE v = Data_Wrap_Struct (Mapper, 0, mapper_dt, (void*)m); return v; } /**************** mapper_get_chunk ****************/ static VALUE mapper_get_chunk (VALUE self, VALUE start, VALUE length) { Mapper_t *m = NULL; Data_Get_Struct (self, Mapper_t, m); if (!m) rb_raise (rb_eException, "No Mapper Object"); // TODO, what if some moron sends us a negative start value? unsigned _start = NUM2INT (start); unsigned _length = NUM2INT (length); if ((_start + _length) > m->GetFileSize()) rb_raise (rb_eException, "Mapper Range Error"); const char *chunk = m->GetChunk (_start); if (!chunk) rb_raise (rb_eException, "No Mapper Chunk"); return rb_str_new (chunk, _length); } /************ mapper_close ************/ static VALUE mapper_close (VALUE self) { Mapper_t *m = NULL; Data_Get_Struct (self, Mapper_t, m); if (!m) rb_raise (rb_eException, "No Mapper Object"); m->Close(); return Qnil; } /*********** mapper_size ***********/ static VALUE mapper_size (VALUE self) { Mapper_t *m = NULL; Data_Get_Struct (self, Mapper_t, m); if (!m) rb_raise (rb_eException, "No Mapper Object"); return INT2NUM (m->GetFileSize()); } /********************** Init_fastfilereaderext **********************/ extern "C" void Init_fastfilereaderext() { EmModule = rb_define_module ("EventMachine"); FastFileReader = rb_define_class_under (EmModule, "FastFileReader", rb_cObject); Mapper = rb_define_class_under (FastFileReader, "Mapper", rb_cObject); rb_define_module_function (Mapper, "new", (VALUE(*)(...))mapper_new, 1); rb_define_method (Mapper, "size", (VALUE(*)(...))mapper_size, 0); rb_define_method (Mapper, "close", (VALUE(*)(...))mapper_close, 0); rb_define_method (Mapper, "get_chunk", (VALUE(*)(...))mapper_get_chunk, 2); } /***************************************************************************** $Id$ File: files.cpp Date: 26Aug06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" /****************************************** FileStreamDescriptor::FileStreamDescriptor ******************************************/ FileStreamDescriptor::FileStreamDescriptor (int fd, EventMachine_t *em): EventableDescriptor (fd, em), OutboundDataSize (0) { cerr << "#####"; } /******************************************* FileStreamDescriptor::~FileStreamDescriptor *******************************************/ FileStreamDescriptor::~FileStreamDescriptor() { // Run down any stranded outbound data. for (size_t i=0; i < OutboundPages.size(); i++) OutboundPages[i].Free(); } /************************** FileStreamDescriptor::Read **************************/ void FileStreamDescriptor::Read() { } /*************************** FileStreamDescriptor::Write ***************************/ void FileStreamDescriptor::Write() { } /******************************* FileStreamDescriptor::Heartbeat *******************************/ void FileStreamDescriptor::Heartbeat() { } /*********************************** FileStreamDescriptor::SelectForRead ***********************************/ bool FileStreamDescriptor::SelectForRead() { cerr << "R?"; return false; } /************************************ FileStreamDescriptor::SelectForWrite ************************************/ bool FileStreamDescriptor::SelectForWrite() { cerr << "W?"; return false; } /***************************************************************************** $Id$ File: kb.cpp Date: 24Aug07 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" /************************************** KeyboardDescriptor::KeyboardDescriptor **************************************/ KeyboardDescriptor::KeyboardDescriptor (EventMachine_t *parent_em): EventableDescriptor (0, parent_em), bReadAttemptedAfterClose (false), LastIo (gCurrentLoopTime), InactivityTimeout (0) { #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /*************************************** KeyboardDescriptor::~KeyboardDescriptor ***************************************/ KeyboardDescriptor::~KeyboardDescriptor() { } /************************* KeyboardDescriptor::Write *************************/ void KeyboardDescriptor::Write() { // Why are we here? throw std::runtime_error ("bad code path in keyboard handler"); } /***************************** KeyboardDescriptor::Heartbeat *****************************/ void KeyboardDescriptor::Heartbeat() { // no-op } /************************ KeyboardDescriptor::Read ************************/ void KeyboardDescriptor::Read() { char c; read (GetSocket(), &c, 1); if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, &c, 1); } #if 0 /****************************** PipeDescriptor::PipeDescriptor ******************************/ PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em): EventableDescriptor (fd, parent_em), bReadAttemptedAfterClose (false), LastIo (gCurrentLoopTime), InactivityTimeout (0), OutboundDataSize (0), SubprocessPid (subpid) { #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif } /******************************* PipeDescriptor::~PipeDescriptor *******************************/ PipeDescriptor::~PipeDescriptor() { // Run down any stranded outbound data. for (size_t i=0; i < OutboundPages.size(); i++) OutboundPages[i].Free(); /* As a virtual destructor, we come here before the base-class * destructor that closes our file-descriptor. * We have to make sure the subprocess goes down (if it's not * already down) and we have to reap the zombie. * * This implementation is PROVISIONAL and will surely be improved. * The intention here is that we never block, hence the highly * undesirable sleeps. But if we can't reap the subprocess even * after sending it SIGKILL, then something is wrong and we * throw a fatal exception, which is also not something we should * be doing. * * Eventually the right thing to do will be to have the reactor * core respond to SIGCHLD by chaining a handler on top of the * one Ruby may have installed, and dealing with a list of dead * children that are pending cleanup. * * Since we want to have a signal processor integrated into the * client-visible API, let's wait until that is done before cleaning * this up. */ struct timespec req = {0, 10000000}; kill (SubprocessPid, SIGTERM); nanosleep (&req, NULL); if (waitpid (SubprocessPid, NULL, WNOHANG) == 0) { kill (SubprocessPid, SIGKILL); nanosleep (&req, NULL); if (waitpid (SubprocessPid, NULL, WNOHANG) == 0) throw std::runtime_error ("unable to reap subprocess"); } } /******************** PipeDescriptor::Read ********************/ void PipeDescriptor::Read() { int sd = GetSocket(); if (sd == INVALID_SOCKET) { assert (!bReadAttemptedAfterClose); bReadAttemptedAfterClose = true; return; } LastIo = gCurrentLoopTime; int total_bytes_read = 0; char readbuffer [16 * 1024]; for (int i=0; i < 10; i++) { // Don't read just one buffer and then move on. This is faster // if there is a lot of incoming. // But don't read indefinitely. Give other sockets a chance to run. // NOTICE, we're reading one less than the buffer size. // That's so we can put a guard byte at the end of what we send // to user code. // Use read instead of recv, which on Linux gives a "socket operation // on nonsocket" error. int r = read (sd, readbuffer, sizeof(readbuffer) - 1); //cerr << ""; if (r > 0) { total_bytes_read += r; LastRead = gCurrentLoopTime; // Add a null-terminator at the the end of the buffer // that we will send to the callback. // DO NOT EVER CHANGE THIS. We want to explicitly allow users // to be able to depend on this behavior, so they will have // the option to do some things faster. Additionally it's // a security guard against buffer overflows. readbuffer [r] = 0; if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); } else if (r == 0) { break; } else { // Basically a would-block, meaning we've read everything there is to read. break; } } if (total_bytes_read == 0) { // If we read no data on a socket that selected readable, // it generally means the other end closed the connection gracefully. ScheduleClose (false); //bCloseNow = true; } } /********************* PipeDescriptor::Write *********************/ void PipeDescriptor::Write() { int sd = GetSocket(); assert (sd != INVALID_SOCKET); LastIo = gCurrentLoopTime; char output_buffer [16 * 1024]; size_t nbytes = 0; while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { OutboundPage *op = &(OutboundPages[0]); if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); nbytes += (op->Length - op->Offset); op->Free(); OutboundPages.pop_front(); } else { int len = sizeof(output_buffer) - nbytes; memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); op->Offset += len; nbytes += len; } } // We should never have gotten here if there were no data to write, // so assert that as a sanity check. // Don't bother to make sure nbytes is less than output_buffer because // if it were we probably would have crashed already. assert (nbytes > 0); assert (GetSocket() != INVALID_SOCKET); int bytes_written = write (GetSocket(), output_buffer, nbytes); if (bytes_written > 0) { OutboundDataSize -= bytes_written; if ((size_t)bytes_written < nbytes) { int len = nbytes - bytes_written; char *buffer = (char*) malloc (len + 1); if (!buffer) throw std::runtime_error ("bad alloc throwing back data"); memcpy (buffer, output_buffer + bytes_written, len); buffer [len] = 0; OutboundPages.push_front (OutboundPage (buffer, len)); } #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); assert (MyEventMachine); MyEventMachine->Modify (this); #endif } else { #ifdef OS_UNIX if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) #endif #ifdef OS_WIN32 if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) #endif Close(); } } /************************* PipeDescriptor::Heartbeat *************************/ void PipeDescriptor::Heartbeat() { // If an inactivity timeout is defined, then check for it. if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) ScheduleClose (false); //bCloseNow = true; } /***************************** PipeDescriptor::SelectForRead *****************************/ bool PipeDescriptor::SelectForRead() { /* Pipe descriptors, being local by definition, don't have * a pending state, so this is simpler than for the * ConnectionDescriptor object. */ return true; } /****************************** PipeDescriptor::SelectForWrite ******************************/ bool PipeDescriptor::SelectForWrite() { /* Pipe descriptors, being local by definition, don't have * a pending state, so this is simpler than for the * ConnectionDescriptor object. */ return (GetOutboundDataSize() > 0); } /******************************** PipeDescriptor::SendOutboundData ********************************/ int PipeDescriptor::SendOutboundData (const char *data, int length) { //if (bCloseNow || bCloseAfterWriting) if (IsCloseScheduled()) return 0; if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif return length; } /******************************** PipeDescriptor::GetSubprocessPid ********************************/ bool PipeDescriptor::GetSubprocessPid (pid_t *pid) { bool ok = false; if (pid && (SubprocessPid > 0)) { *pid = SubprocessPid; ok = true; } return ok; } #endif /***************************************************************************** $Id$ File: page.cpp Date: 30Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" /****************** PageList::PageList ******************/ PageList::PageList() { } /******************* PageList::~PageList *******************/ PageList::~PageList() { while (HasPages()) PopFront(); } /*************** PageList::Front ***************/ void PageList::Front (const char **page, int *length) { assert (page && length); if (HasPages()) { Page p = Pages.front(); *page = p.Buffer; *length = p.Size; } else { *page = NULL; *length = 0; } } /****************** PageList::PopFront ******************/ void PageList::PopFront() { if (HasPages()) { Page p = Pages.front(); Pages.pop_front(); if (p.Buffer) free ((void*)p.Buffer); } } /****************** PageList::HasPages ******************/ bool PageList::HasPages() { return (Pages.size() > 0) ? true : false; } /************** PageList::Push **************/ void PageList::Push (const char *buf, int size) { if (buf && (size > 0)) { char *copy = (char*) malloc (size); if (!copy) throw runtime_error ("no memory in pagelist"); memcpy (copy, buf, size); Pages.push_back (Page (copy, size)); } } /***************************************************************************** $Id$ File: pipe.cpp Date: 30May07 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" #ifdef OS_UNIX // THIS ENTIRE FILE IS ONLY COMPILED ON UNIX-LIKE SYSTEMS. /****************************** PipeDescriptor::PipeDescriptor ******************************/ PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em): EventableDescriptor (fd, parent_em), bReadAttemptedAfterClose (false), LastIo (gCurrentLoopTime), InactivityTimeout (0), OutboundDataSize (0), SubprocessPid (subpid) { #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /******************************* PipeDescriptor::~PipeDescriptor *******************************/ PipeDescriptor::~PipeDescriptor() { // Run down any stranded outbound data. for (size_t i=0; i < OutboundPages.size(); i++) OutboundPages[i].Free(); /* As a virtual destructor, we come here before the base-class * destructor that closes our file-descriptor. * We have to make sure the subprocess goes down (if it's not * already down) and we have to reap the zombie. * * This implementation is PROVISIONAL and will surely be improved. * The intention here is that we never block, hence the highly * undesirable sleeps. But if we can't reap the subprocess even * after sending it SIGKILL, then something is wrong and we * throw a fatal exception, which is also not something we should * be doing. * * Eventually the right thing to do will be to have the reactor * core respond to SIGCHLD by chaining a handler on top of the * one Ruby may have installed, and dealing with a list of dead * children that are pending cleanup. * * Since we want to have a signal processor integrated into the * client-visible API, let's wait until that is done before cleaning * this up. * * Added a very ugly hack to support passing the subprocess's exit * status to the user. It only makes logical sense for user code to access * the subprocess exit status in the unbind callback. But unbind is called * back during the EventableDescriptor destructor. So by that time there's * no way to call back this object through an object binding, because it's * already been cleaned up. We might have added a parameter to the unbind * callback, but that would probably break a huge amount of existing code. * So the hack-solution is to define an instance variable in the EventMachine * object and stick the exit status in there, where it can easily be accessed * with an accessor visible to user code. * User code should ONLY access the exit status from within the unbind callback. * Otherwise there's no guarantee it'll be valid. * This hack won't make it impossible to run multiple EventMachines in a single * process, but it will make it impossible to reliably nest unbind calls * within other unbind calls. (Not sure if that's even possible.) */ assert (MyEventMachine); // check if the process is already dead if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) { kill (SubprocessPid, SIGTERM); // wait 0.25s for process to die struct timespec req = {0, 250000000}; nanosleep (&req, NULL); if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) { kill (SubprocessPid, SIGKILL); // wait 0.5s for process to die struct timespec req = {0, 500000000}; nanosleep (&req, NULL); if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) throw std::runtime_error ("unable to reap subprocess"); } } } /******************** PipeDescriptor::Read ********************/ void PipeDescriptor::Read() { int sd = GetSocket(); if (sd == INVALID_SOCKET) { assert (!bReadAttemptedAfterClose); bReadAttemptedAfterClose = true; return; } LastIo = gCurrentLoopTime; int total_bytes_read = 0; char readbuffer [16 * 1024]; for (int i=0; i < 10; i++) { // Don't read just one buffer and then move on. This is faster // if there is a lot of incoming. // But don't read indefinitely. Give other sockets a chance to run. // NOTICE, we're reading one less than the buffer size. // That's so we can put a guard byte at the end of what we send // to user code. // Use read instead of recv, which on Linux gives a "socket operation // on nonsocket" error. int r = read (sd, readbuffer, sizeof(readbuffer) - 1); //cerr << ""; if (r > 0) { total_bytes_read += r; LastRead = gCurrentLoopTime; // Add a null-terminator at the the end of the buffer // that we will send to the callback. // DO NOT EVER CHANGE THIS. We want to explicitly allow users // to be able to depend on this behavior, so they will have // the option to do some things faster. Additionally it's // a security guard against buffer overflows. readbuffer [r] = 0; if (EventCallback) (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); } else if (r == 0) { break; } else { // Basically a would-block, meaning we've read everything there is to read. break; } } if (total_bytes_read == 0) { // If we read no data on a socket that selected readable, // it generally means the other end closed the connection gracefully. ScheduleClose (false); //bCloseNow = true; } } /********************* PipeDescriptor::Write *********************/ void PipeDescriptor::Write() { int sd = GetSocket(); assert (sd != INVALID_SOCKET); LastIo = gCurrentLoopTime; char output_buffer [16 * 1024]; size_t nbytes = 0; while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { OutboundPage *op = &(OutboundPages[0]); if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); nbytes += (op->Length - op->Offset); op->Free(); OutboundPages.pop_front(); } else { int len = sizeof(output_buffer) - nbytes; memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); op->Offset += len; nbytes += len; } } // We should never have gotten here if there were no data to write, // so assert that as a sanity check. // Don't bother to make sure nbytes is less than output_buffer because // if it were we probably would have crashed already. assert (nbytes > 0); assert (GetSocket() != INVALID_SOCKET); int bytes_written = write (GetSocket(), output_buffer, nbytes); if (bytes_written > 0) { OutboundDataSize -= bytes_written; if ((size_t)bytes_written < nbytes) { int len = nbytes - bytes_written; char *buffer = (char*) malloc (len + 1); if (!buffer) throw std::runtime_error ("bad alloc throwing back data"); memcpy (buffer, output_buffer + bytes_written, len); buffer [len] = 0; OutboundPages.push_front (OutboundPage (buffer, len)); } #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); assert (MyEventMachine); MyEventMachine->Modify (this); #endif } else { #ifdef OS_UNIX if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) #endif #ifdef OS_WIN32 if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) #endif Close(); } } /************************* PipeDescriptor::Heartbeat *************************/ void PipeDescriptor::Heartbeat() { // If an inactivity timeout is defined, then check for it. if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) ScheduleClose (false); //bCloseNow = true; } /***************************** PipeDescriptor::SelectForRead *****************************/ bool PipeDescriptor::SelectForRead() { /* Pipe descriptors, being local by definition, don't have * a pending state, so this is simpler than for the * ConnectionDescriptor object. */ return true; } /****************************** PipeDescriptor::SelectForWrite ******************************/ bool PipeDescriptor::SelectForWrite() { /* Pipe descriptors, being local by definition, don't have * a pending state, so this is simpler than for the * ConnectionDescriptor object. */ return (GetOutboundDataSize() > 0); } /******************************** PipeDescriptor::SendOutboundData ********************************/ int PipeDescriptor::SendOutboundData (const char *data, int length) { //if (bCloseNow || bCloseAfterWriting) if (IsCloseScheduled()) return 0; if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif return length; } /******************************** PipeDescriptor::GetSubprocessPid ********************************/ bool PipeDescriptor::GetSubprocessPid (pid_t *pid) { bool ok = false; if (pid && (SubprocessPid > 0)) { *pid = SubprocessPid; ok = true; } return ok; } #endif // OS_UNIX /***************************************************************************** $Id$ File: rubymain.cpp Date: 06Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" #include "eventmachine.h" #include /******* Statics *******/ static VALUE EmModule; static VALUE EmConnection; static VALUE Intern_at_signature; static VALUE Intern_at_timers; static VALUE Intern_at_conns; static VALUE Intern_event_callback; static VALUE Intern_run_deferred_callbacks; static VALUE Intern_delete; static VALUE Intern_call; static VALUE Intern_receive_data; static VALUE Intern_notify_readable; static VALUE Intern_notify_writable; /**************** t_event_callback ****************/ static void event_callback (const char *a1, int a2, const char *a3, int a4) { if (a2 == EM_CONNECTION_READ) { VALUE t = rb_ivar_get (EmModule, Intern_at_conns); VALUE q = rb_hash_aref (t, rb_str_new2(a1)); if (q == Qnil) rb_raise (rb_eRuntimeError, "no connection"); rb_funcall (q, Intern_receive_data, 1, rb_str_new (a3, a4)); } else if (a2 == EM_CONNECTION_NOTIFY_READABLE) { VALUE t = rb_ivar_get (EmModule, Intern_at_conns); VALUE q = rb_hash_aref (t, rb_str_new2(a1)); if (q == Qnil) rb_raise (rb_eRuntimeError, "no connection"); rb_funcall (q, Intern_notify_readable, 0); } else if (a2 == EM_CONNECTION_NOTIFY_WRITABLE) { VALUE t = rb_ivar_get (EmModule, Intern_at_conns); VALUE q = rb_hash_aref (t, rb_str_new2(a1)); if (q == Qnil) rb_raise (rb_eRuntimeError, "no connection"); rb_funcall (q, Intern_notify_writable, 0); } else if (a2 == EM_LOOPBREAK_SIGNAL) { rb_funcall (EmModule, Intern_run_deferred_callbacks, 0); } else if (a2 == EM_TIMER_FIRED) { VALUE t = rb_ivar_get (EmModule, Intern_at_timers); VALUE q = rb_funcall (t, Intern_delete, 1, rb_str_new(a3, a4)); if (q == Qnil) rb_raise (rb_eRuntimeError, "no timer"); rb_funcall (q, Intern_call, 0); } else rb_funcall (EmModule, Intern_event_callback, 3, rb_str_new2(a1), (a2 << 1) | 1, rb_str_new(a3,a4)); } /************************** t_initialize_event_machine **************************/ static VALUE t_initialize_event_machine (VALUE self) { evma_initialize_library (event_callback); return Qnil; } /***************************** t_run_machine_without_threads *****************************/ static VALUE t_run_machine_without_threads (VALUE self) { evma_run_machine(); return Qnil; } /******************* t_add_oneshot_timer *******************/ static VALUE t_add_oneshot_timer (VALUE self, VALUE interval) { const char *f = evma_install_oneshot_timer (FIX2INT (interval)); if (!f || !*f) rb_raise (rb_eRuntimeError, "no timer"); return rb_str_new2 (f); } /************** t_start_server **************/ static VALUE t_start_server (VALUE self, VALUE server, VALUE port) { const char *f = evma_create_tcp_server (StringValuePtr(server), FIX2INT(port)); if (!f || !*f) rb_raise (rb_eRuntimeError, "no acceptor"); return rb_str_new2 (f); } /************* t_stop_server *************/ static VALUE t_stop_server (VALUE self, VALUE signature) { evma_stop_tcp_server (StringValuePtr (signature)); return Qnil; } /******************* t_start_unix_server *******************/ static VALUE t_start_unix_server (VALUE self, VALUE filename) { const char *f = evma_create_unix_domain_server (StringValuePtr(filename)); if (!f || !*f) rb_raise (rb_eRuntimeError, "no unix-domain acceptor"); return rb_str_new2 (f); } /*********** t_send_data ***********/ static VALUE t_send_data (VALUE self, VALUE signature, VALUE data, VALUE data_length) { int b = evma_send_data_to_connection (StringValuePtr (signature), StringValuePtr (data), FIX2INT (data_length)); return INT2NUM (b); } /*********** t_start_tls ***********/ static VALUE t_start_tls (VALUE self, VALUE signature) { evma_start_tls (StringValuePtr (signature)); return Qnil; } /*************** t_set_tls_parms ***************/ static VALUE t_set_tls_parms (VALUE self, VALUE signature, VALUE privkeyfile, VALUE certchainfile) { /* set_tls_parms takes a series of positional arguments for specifying such things * as private keys and certificate chains. * It's expected that the parameter list will grow as we add more supported features. * ALL of these parameters are optional, and can be specified as empty or NULL strings. */ evma_set_tls_parms (StringValuePtr (signature), StringValuePtr (privkeyfile), StringValuePtr (certchainfile) ); return Qnil; } /************** t_get_peername **************/ static VALUE t_get_peername (VALUE self, VALUE signature) { struct sockaddr s; if (evma_get_peername (StringValuePtr (signature), &s)) { return rb_str_new ((const char*)&s, sizeof(s)); } return Qnil; } /************** t_get_sockname **************/ static VALUE t_get_sockname (VALUE self, VALUE signature) { struct sockaddr s; if (evma_get_sockname (StringValuePtr (signature), &s)) { return rb_str_new ((const char*)&s, sizeof(s)); } return Qnil; } /******************** t_get_subprocess_pid ********************/ static VALUE t_get_subprocess_pid (VALUE self, VALUE signature) { pid_t pid; if (evma_get_subprocess_pid (StringValuePtr (signature), &pid)) { return INT2NUM (pid); } return Qnil; } /*********************** t_get_subprocess_status ***********************/ static VALUE t_get_subprocess_status (VALUE self, VALUE signature) { int status; if (evma_get_subprocess_status (StringValuePtr (signature), &status)) { return INT2NUM (status); } return Qnil; } /***************************** t_get_comm_inactivity_timeout *****************************/ static VALUE t_get_comm_inactivity_timeout (VALUE self, VALUE signature) { int timeout; if (evma_get_comm_inactivity_timeout (StringValuePtr (signature), &timeout)) return INT2FIX (timeout); return Qnil; } /***************************** t_set_comm_inactivity_timeout *****************************/ static VALUE t_set_comm_inactivity_timeout (VALUE self, VALUE signature, VALUE timeout) { int ti = FIX2INT (timeout); if (evma_set_comm_inactivity_timeout (StringValuePtr (signature), &ti)); return Qtrue; return Qnil; } /*************** t_send_datagram ***************/ static VALUE t_send_datagram (VALUE self, VALUE signature, VALUE data, VALUE data_length, VALUE address, VALUE port) { int b = evma_send_datagram (StringValuePtr (signature), StringValuePtr (data), FIX2INT (data_length), StringValuePtr(address), FIX2INT(port)); return INT2NUM (b); } /****************** t_close_connection ******************/ static VALUE t_close_connection (VALUE self, VALUE signature, VALUE after_writing) { evma_close_connection (StringValuePtr (signature), ((after_writing == Qtrue) ? 1 : 0)); return Qnil; } /******************************** t_report_connection_error_status ********************************/ static VALUE t_report_connection_error_status (VALUE self, VALUE signature) { int b = evma_report_connection_error_status (StringValuePtr (signature)); return INT2NUM (b); } /**************** t_connect_server ****************/ static VALUE t_connect_server (VALUE self, VALUE server, VALUE port) { // Avoid FIX2INT in this case, because it doesn't deal with type errors properly. // Specifically, if the value of port comes in as a string rather than an integer, // NUM2INT will throw a type error, but FIX2INT will generate garbage. const char *f = evma_connect_to_server (StringValuePtr(server), NUM2INT(port)); if (!f || !*f) rb_raise (rb_eRuntimeError, "no connection"); return rb_str_new2 (f); } /********************* t_connect_unix_server *********************/ static VALUE t_connect_unix_server (VALUE self, VALUE serversocket) { const char *f = evma_connect_to_unix_server (StringValuePtr(serversocket)); if (!f || !*f) rb_raise (rb_eRuntimeError, "no connection"); return rb_str_new2 (f); } /*********** t_attach_fd ***********/ static VALUE t_attach_fd (VALUE self, VALUE file_descriptor, VALUE read_mode, VALUE write_mode) { const char *f = evma_attach_fd (NUM2INT(file_descriptor), (read_mode == Qtrue) ? 1 : 0, (write_mode == Qtrue) ? 1 : 0); if (!f || !*f) rb_raise (rb_eRuntimeError, "no connection"); return rb_str_new2 (f); } /*********** t_detach_fd ***********/ static VALUE t_detach_fd (VALUE self, VALUE signature) { return INT2NUM(evma_detach_fd (StringValuePtr(signature))); } /***************** t_open_udp_socket *****************/ static VALUE t_open_udp_socket (VALUE self, VALUE server, VALUE port) { const char *f = evma_open_datagram_socket (StringValuePtr(server), FIX2INT(port)); if (!f || !*f) rb_raise (rb_eRuntimeError, "no datagram socket"); return rb_str_new2 (f); } /***************** t_release_machine *****************/ static VALUE t_release_machine (VALUE self) { evma_release_library(); return Qnil; } /****** t_stop ******/ static VALUE t_stop (VALUE self) { evma_stop_machine(); return Qnil; } /****************** t_signal_loopbreak ******************/ static VALUE t_signal_loopbreak (VALUE self) { evma_signal_loopbreak(); return Qnil; } /************** t_library_type **************/ static VALUE t_library_type (VALUE self) { return rb_eval_string (":extension"); } /******************* t_set_timer_quantum *******************/ static VALUE t_set_timer_quantum (VALUE self, VALUE interval) { evma_set_timer_quantum (FIX2INT (interval)); return Qnil; } /******************** t_set_max_timer_count ********************/ static VALUE t_set_max_timer_count (VALUE self, VALUE ct) { evma_set_max_timer_count (FIX2INT (ct)); return Qnil; } /*************** t_setuid_string ***************/ static VALUE t_setuid_string (VALUE self, VALUE username) { evma_setuid_string (StringValuePtr (username)); return Qnil; } /************* t__write_file *************/ static VALUE t__write_file (VALUE self, VALUE filename) { const char *f = evma__write_file (StringValuePtr (filename)); if (!f || !*f) rb_raise (rb_eRuntimeError, "file not opened"); return rb_str_new2 (f); } /************** t_invoke_popen **************/ static VALUE t_invoke_popen (VALUE self, VALUE cmd) { // 1.8.7+ #ifdef RARRAY_LEN int len = RARRAY_LEN(cmd); #else int len = RARRAY (cmd)->len; #endif if (len > 98) rb_raise (rb_eRuntimeError, "too many arguments to popen"); char *strings [100]; for (int i=0; i < len; i++) { VALUE ix = INT2FIX (i); VALUE s = rb_ary_aref (1, &ix, cmd); strings[i] = StringValuePtr (s); } strings[len] = NULL; const char *f = evma_popen (strings); if (!f || !*f) { char *err = strerror (errno); char buf[100]; memset (buf, 0, sizeof(buf)); snprintf (buf, sizeof(buf)-1, "no popen: %s", (err?err:"???")); rb_raise (rb_eRuntimeError, buf); } return rb_str_new2 (f); } /*************** t_read_keyboard ***************/ static VALUE t_read_keyboard (VALUE self) { const char *f = evma_open_keyboard(); if (!f || !*f) rb_raise (rb_eRuntimeError, "no keyboard reader"); return rb_str_new2 (f); } /******** t__epoll ********/ static VALUE t__epoll (VALUE self) { // Temporary. evma__epoll(); return Qnil; } /********** t__epoll_p **********/ static VALUE t__epoll_p (VALUE self) { #ifdef HAVE_EPOLL return Qtrue; #else return Qfalse; #endif } /********* t__kqueue *********/ static VALUE t__kqueue (VALUE self) { // Temporary. evma__kqueue(); return Qnil; } /*********** t__kqueue_p ***********/ static VALUE t__kqueue_p (VALUE self) { #ifdef HAVE_KQUEUE return Qtrue; #else return Qfalse; #endif } /**************** t_send_file_data ****************/ static VALUE t_send_file_data (VALUE self, VALUE signature, VALUE filename) { /* The current implementation of evma_send_file_data_to_connection enforces a strict * upper limit on the file size it will transmit (currently 32K). The function returns * zero on success, -1 if the requested file exceeds its size limit, and a positive * number for other errors. * TODO: Positive return values are actually errno's, which is probably the wrong way to * do this. For one thing it's ugly. For another, we can't be sure zero is never a real errno. */ int b = evma_send_file_data_to_connection (StringValuePtr(signature), StringValuePtr(filename)); if (b == -1) rb_raise(rb_eRuntimeError, "File too large. send_file_data() supports files under 32k."); if (b > 0) { char *err = strerror (b); char buf[1024]; memset (buf, 0, sizeof(buf)); snprintf (buf, sizeof(buf)-1, ": %s %s", StringValuePtr(filename),(err?err:"???")); rb_raise (rb_eIOError, buf); } return INT2NUM (0); } /******************* t_set_rlimit_nofile *******************/ static VALUE t_set_rlimit_nofile (VALUE self, VALUE arg) { arg = (NIL_P(arg)) ? -1 : NUM2INT (arg); return INT2NUM (evma_set_rlimit_nofile (arg)); } /*************************** conn_get_outbound_data_size ***************************/ static VALUE conn_get_outbound_data_size (VALUE self) { VALUE sig = rb_ivar_get (self, Intern_at_signature); return INT2NUM (evma_get_outbound_data_size (StringValuePtr(sig))); } /****************************** conn_associate_callback_target ******************************/ static VALUE conn_associate_callback_target (VALUE self, VALUE sig) { // No-op for the time being. return Qnil; } /*************** t_get_loop_time ****************/ static VALUE t_get_loop_time (VALUE self) { VALUE cTime = rb_path2class("Time"); if (gCurrentLoopTime != 0) { return rb_funcall(cTime, rb_intern("at"), 1, INT2NUM(gCurrentLoopTime)); } return Qnil; } /********************* Init_rubyeventmachine *********************/ extern "C" void Init_rubyeventmachine() { // Tuck away some symbol values so we don't have to look 'em up every time we need 'em. Intern_at_signature = rb_intern ("@signature"); Intern_at_timers = rb_intern ("@timers"); Intern_at_conns = rb_intern ("@conns"); Intern_event_callback = rb_intern ("event_callback"); Intern_run_deferred_callbacks = rb_intern ("run_deferred_callbacks"); Intern_delete = rb_intern ("delete"); Intern_call = rb_intern ("call"); Intern_receive_data = rb_intern ("receive_data"); Intern_notify_readable = rb_intern ("notify_readable"); Intern_notify_writable = rb_intern ("notify_writable"); // INCOMPLETE, we need to define class Connections inside module EventMachine // run_machine and run_machine_without_threads are now identical. // Must deprecate the without_threads variant. EmModule = rb_define_module ("EventMachine"); EmConnection = rb_define_class_under (EmModule, "Connection", rb_cObject); rb_define_class_under (EmModule, "ConnectionNotBound", rb_eException); rb_define_class_under (EmModule, "NoHandlerForAcceptedConnection", rb_eException); rb_define_class_under (EmModule, "UnknownTimerFired", rb_eException); rb_define_module_function (EmModule, "initialize_event_machine", (VALUE(*)(...))t_initialize_event_machine, 0); rb_define_module_function (EmModule, "run_machine", (VALUE(*)(...))t_run_machine_without_threads, 0); rb_define_module_function (EmModule, "run_machine_without_threads", (VALUE(*)(...))t_run_machine_without_threads, 0); rb_define_module_function (EmModule, "add_oneshot_timer", (VALUE(*)(...))t_add_oneshot_timer, 1); rb_define_module_function (EmModule, "start_tcp_server", (VALUE(*)(...))t_start_server, 2); rb_define_module_function (EmModule, "stop_tcp_server", (VALUE(*)(...))t_stop_server, 1); rb_define_module_function (EmModule, "start_unix_server", (VALUE(*)(...))t_start_unix_server, 1); rb_define_module_function (EmModule, "set_tls_parms", (VALUE(*)(...))t_set_tls_parms, 3); rb_define_module_function (EmModule, "start_tls", (VALUE(*)(...))t_start_tls, 1); rb_define_module_function (EmModule, "send_data", (VALUE(*)(...))t_send_data, 3); rb_define_module_function (EmModule, "send_datagram", (VALUE(*)(...))t_send_datagram, 5); rb_define_module_function (EmModule, "close_connection", (VALUE(*)(...))t_close_connection, 2); rb_define_module_function (EmModule, "report_connection_error_status", (VALUE(*)(...))t_report_connection_error_status, 1); rb_define_module_function (EmModule, "connect_server", (VALUE(*)(...))t_connect_server, 2); rb_define_module_function (EmModule, "connect_unix_server", (VALUE(*)(...))t_connect_unix_server, 1); rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 3); rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1); rb_define_module_function (EmModule, "current_time", (VALUE(*)(...))t_get_loop_time, 0); rb_define_module_function (EmModule, "open_udp_socket", (VALUE(*)(...))t_open_udp_socket, 2); rb_define_module_function (EmModule, "read_keyboard", (VALUE(*)(...))t_read_keyboard, 0); rb_define_module_function (EmModule, "release_machine", (VALUE(*)(...))t_release_machine, 0); rb_define_module_function (EmModule, "stop", (VALUE(*)(...))t_stop, 0); rb_define_module_function (EmModule, "signal_loopbreak", (VALUE(*)(...))t_signal_loopbreak, 0); rb_define_module_function (EmModule, "library_type", (VALUE(*)(...))t_library_type, 0); rb_define_module_function (EmModule, "set_timer_quantum", (VALUE(*)(...))t_set_timer_quantum, 1); rb_define_module_function (EmModule, "set_max_timer_count", (VALUE(*)(...))t_set_max_timer_count, 1); rb_define_module_function (EmModule, "setuid_string", (VALUE(*)(...))t_setuid_string, 1); rb_define_module_function (EmModule, "invoke_popen", (VALUE(*)(...))t_invoke_popen, 1); rb_define_module_function (EmModule, "send_file_data", (VALUE(*)(...))t_send_file_data, 2); // Provisional: rb_define_module_function (EmModule, "_write_file", (VALUE(*)(...))t__write_file, 1); rb_define_module_function (EmModule, "get_peername", (VALUE(*)(...))t_get_peername, 1); rb_define_module_function (EmModule, "get_sockname", (VALUE(*)(...))t_get_sockname, 1); rb_define_module_function (EmModule, "get_subprocess_pid", (VALUE(*)(...))t_get_subprocess_pid, 1); rb_define_module_function (EmModule, "get_subprocess_status", (VALUE(*)(...))t_get_subprocess_status, 1); rb_define_module_function (EmModule, "get_comm_inactivity_timeout", (VALUE(*)(...))t_get_comm_inactivity_timeout, 1); rb_define_module_function (EmModule, "set_comm_inactivity_timeout", (VALUE(*)(...))t_set_comm_inactivity_timeout, 2); rb_define_module_function (EmModule, "set_rlimit_nofile", (VALUE(*)(...))t_set_rlimit_nofile, 1); // Temporary: rb_define_module_function (EmModule, "epoll", (VALUE(*)(...))t__epoll, 0); rb_define_module_function (EmModule, "kqueue", (VALUE(*)(...))t__kqueue, 0); rb_define_module_function (EmModule, "epoll?", (VALUE(*)(...))t__epoll_p, 0); rb_define_module_function (EmModule, "kqueue?", (VALUE(*)(...))t__kqueue_p, 0); rb_define_method (EmConnection, "get_outbound_data_size", (VALUE(*)(...))conn_get_outbound_data_size, 0); rb_define_method (EmConnection, "associate_callback_target", (VALUE(*)(...))conn_associate_callback_target, 1); rb_define_const (EmModule, "TimerFired", INT2NUM(100)); rb_define_const (EmModule, "ConnectionData", INT2NUM(101)); rb_define_const (EmModule, "ConnectionUnbound", INT2NUM(102)); rb_define_const (EmModule, "ConnectionAccepted", INT2NUM(103)); rb_define_const (EmModule, "ConnectionCompleted", INT2NUM(104)); rb_define_const (EmModule, "LoopbreakSignalled", INT2NUM(105)); rb_define_const (EmModule, "ConnectionNotifyReadable", INT2NUM(106)); rb_define_const (EmModule, "ConnectionNotifyWritable", INT2NUM(107)); } /***************************************************************************** $Id$ File: sigs.cpp Date: 06Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #include "project.h" bool gTerminateSignalReceived; /************** SigtermHandler **************/ void SigtermHandler (int sig) { // This is a signal-handler, don't do anything frisky. Interrupts are disabled. // Set the terminate flag WITHOUT trying to lock a mutex- otherwise we can easily // self-deadlock, especially if the event machine is looping quickly. gTerminateSignalReceived = true; } /********************* InstallSignalHandlers *********************/ void InstallSignalHandlers() { #ifdef OS_UNIX static bool bInstalled = false; if (!bInstalled) { bInstalled = true; signal (SIGINT, SigtermHandler); signal (SIGTERM, SigtermHandler); signal (SIGPIPE, SIG_IGN); } #endif } /******************* WintelSignalHandler *******************/ #ifdef OS_WIN32 BOOL WINAPI WintelSignalHandler (DWORD control) { if (control == CTRL_C_EVENT) gTerminateSignalReceived = true; return TRUE; } #endif /************ HookControlC ************/ #ifdef OS_WIN32 void HookControlC (bool hook) { if (hook) { // INSTALL hook SetConsoleCtrlHandler (WintelSignalHandler, TRUE); } else { // UNINSTALL hook SetConsoleCtrlHandler (WintelSignalHandler, FALSE); } } #endif /***************************************************************************** $Id$ File: ssl.cpp Date: 30Apr06 Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. Gmail: blackhedd This program is free software; you can redistribute it and/or modify it under the terms of either: 1) the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version; or 2) Ruby's License. See the file COPYING for complete licensing information. *****************************************************************************/ #ifdef WITH_SSL #include "project.h" bool SslContext_t::bLibraryInitialized = false; static void InitializeDefaultCredentials(); static EVP_PKEY *DefaultPrivateKey = NULL; static X509 *DefaultCertificate = NULL; static char PrivateMaterials[] = { "-----BEGIN RSA PRIVATE KEY-----\n" "MIICXAIBAAKBgQDCYYhcw6cGRbhBVShKmbWm7UVsEoBnUf0cCh8AX+MKhMxwVDWV\n" "Igdskntn3cSJjRtmgVJHIK0lpb/FYHQB93Ohpd9/Z18pDmovfFF9nDbFF0t39hJ/\n" "AqSzFB3GiVPoFFZJEE1vJqh+3jzsSF5K56bZ6azz38VlZgXeSozNW5bXkQIDAQAB\n" "AoGALA89gIFcr6BIBo8N5fL3aNHpZXjAICtGav+kTUpuxSiaym9cAeTHuAVv8Xgk\n" "H2Wbq11uz+6JMLpkQJH/WZ7EV59DPOicXrp0Imr73F3EXBfR7t2EQDYHPMthOA1D\n" "I9EtCzvV608Ze90hiJ7E3guGrGppZfJ+eUWCPgy8CZH1vRECQQDv67rwV/oU1aDo\n" "6/+d5nqjeW6mWkGqTnUU96jXap8EIw6B+0cUKskwx6mHJv+tEMM2748ZY7b0yBlg\n" "w4KDghbFAkEAz2h8PjSJG55LwqmXih1RONSgdN9hjB12LwXL1CaDh7/lkEhq0PlK\n" "PCAUwQSdM17Sl0Xxm2CZiekTSlwmHrtqXQJAF3+8QJwtV2sRJp8u2zVe37IeH1cJ\n" "xXeHyjTzqZ2803fnjN2iuZvzNr7noOA1/Kp+pFvUZUU5/0G2Ep8zolPUjQJAFA7k\n" "xRdLkzIx3XeNQjwnmLlncyYPRv+qaE3FMpUu7zftuZBnVCJnvXzUxP3vPgKTlzGa\n" "dg5XivDRfsV+okY5uQJBAMV4FesUuLQVEKb6lMs7rzZwpeGQhFDRfywJzfom2TLn\n" "2RdJQQ3dcgnhdVDgt5o1qkmsqQh8uJrJ9SdyLIaZQIc=\n" "-----END RSA PRIVATE KEY-----\n" "-----BEGIN CERTIFICATE-----\n" "MIID6TCCA1KgAwIBAgIJANm4W/Tzs+s+MA0GCSqGSIb3DQEBBQUAMIGqMQswCQYD\n" "VQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxETAPBgNVBAcTCE5ldyBZb3JrMRYw\n" "FAYDVQQKEw1TdGVhbWhlYXQubmV0MRQwEgYDVQQLEwtFbmdpbmVlcmluZzEdMBsG\n" "A1UEAxMUb3BlbmNhLnN0ZWFtaGVhdC5uZXQxKDAmBgkqhkiG9w0BCQEWGWVuZ2lu\n" "ZWVyaW5nQHN0ZWFtaGVhdC5uZXQwHhcNMDYwNTA1MTcwNjAzWhcNMjQwMjIwMTcw\n" "NjAzWjCBqjELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE5ldyBZb3JrMREwDwYDVQQH\n" "EwhOZXcgWW9yazEWMBQGA1UEChMNU3RlYW1oZWF0Lm5ldDEUMBIGA1UECxMLRW5n\n" "aW5lZXJpbmcxHTAbBgNVBAMTFG9wZW5jYS5zdGVhbWhlYXQubmV0MSgwJgYJKoZI\n" "hvcNAQkBFhllbmdpbmVlcmluZ0BzdGVhbWhlYXQubmV0MIGfMA0GCSqGSIb3DQEB\n" "AQUAA4GNADCBiQKBgQDCYYhcw6cGRbhBVShKmbWm7UVsEoBnUf0cCh8AX+MKhMxw\n" "VDWVIgdskntn3cSJjRtmgVJHIK0lpb/FYHQB93Ohpd9/Z18pDmovfFF9nDbFF0t3\n" "9hJ/AqSzFB3GiVPoFFZJEE1vJqh+3jzsSF5K56bZ6azz38VlZgXeSozNW5bXkQID\n" "AQABo4IBEzCCAQ8wHQYDVR0OBBYEFPJvPd1Fcmd8o/Tm88r+NjYPICCkMIHfBgNV\n" "HSMEgdcwgdSAFPJvPd1Fcmd8o/Tm88r+NjYPICCkoYGwpIGtMIGqMQswCQYDVQQG\n" "EwJVUzERMA8GA1UECBMITmV3IFlvcmsxETAPBgNVBAcTCE5ldyBZb3JrMRYwFAYD\n" "VQQKEw1TdGVhbWhlYXQubmV0MRQwEgYDVQQLEwtFbmdpbmVlcmluZzEdMBsGA1UE\n" "AxMUb3BlbmNhLnN0ZWFtaGVhdC5uZXQxKDAmBgkqhkiG9w0BCQEWGWVuZ2luZWVy\n" "aW5nQHN0ZWFtaGVhdC5uZXSCCQDZuFv087PrPjAMBgNVHRMEBTADAQH/MA0GCSqG\n" "SIb3DQEBBQUAA4GBAC1CXey/4UoLgJiwcEMDxOvW74plks23090iziFIlGgcIhk0\n" "Df6hTAs7H3MWww62ddvR8l07AWfSzSP5L6mDsbvq7EmQsmPODwb6C+i2aF3EDL8j\n" "uw73m4YIGI0Zw2XdBpiOGkx2H56Kya6mJJe/5XORZedh1wpI7zki01tHYbcy\n" "-----END CERTIFICATE-----\n"}; /* These private materials were made with: * openssl req -new -x509 -keyout cakey.pem -out cacert.pem -nodes -days 6500 * TODO: We need a full-blown capability to work with user-supplied * keypairs and properly-signed certificates. */ /***************** builtin_passwd_cb *****************/ extern "C" int builtin_passwd_cb (char *buf, int bufsize, int rwflag, void *userdata) { strcpy (buf, "kittycat"); return 8; } /**************************** InitializeDefaultCredentials ****************************/ static void InitializeDefaultCredentials() { BIO *bio = BIO_new_mem_buf (PrivateMaterials, -1); assert (bio); if (DefaultPrivateKey) { // we may come here in a restart. EVP_PKEY_free (DefaultPrivateKey); DefaultPrivateKey = NULL; } PEM_read_bio_PrivateKey (bio, &DefaultPrivateKey, builtin_passwd_cb, 0); if (DefaultCertificate) { // we may come here in a restart. X509_free (DefaultCertificate); DefaultCertificate = NULL; } PEM_read_bio_X509 (bio, &DefaultCertificate, NULL, 0); BIO_free (bio); } /************************** SslContext_t::SslContext_t **************************/ SslContext_t::SslContext_t (bool is_server, const string &privkeyfile, const string &certchainfile): pCtx (NULL), PrivateKey (NULL), Certificate (NULL) { /* TODO: the usage of the specified private-key and cert-chain filenames only applies to * client-side connections at this point. Server connections currently use the default materials. * That needs to be fixed asap. * Also, in this implementation, server-side connections use statically defined X-509 defaults. * One thing I'm really not clear on is whether or not you have to explicitly free X509 and EVP_PKEY * objects when we call our destructor, or whether just calling SSL_CTX_free is enough. */ if (!bLibraryInitialized) { bLibraryInitialized = true; SSL_library_init(); OpenSSL_add_ssl_algorithms(); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); ERR_load_crypto_strings(); InitializeDefaultCredentials(); } bIsServer = is_server; pCtx = SSL_CTX_new (is_server ? SSLv23_server_method() : SSLv23_client_method()); if (!pCtx) throw std::runtime_error ("no SSL context"); SSL_CTX_set_options (pCtx, SSL_OP_ALL); //SSL_CTX_set_options (pCtx, (SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3)); if (is_server) { // The SSL_CTX calls here do NOT allocate memory. int e; if (privkeyfile.length() > 0) e = SSL_CTX_use_PrivateKey_file (pCtx, privkeyfile.c_str(), SSL_FILETYPE_PEM); else e = SSL_CTX_use_PrivateKey (pCtx, DefaultPrivateKey); assert (e > 0); if (certchainfile.length() > 0) e = SSL_CTX_use_certificate_chain_file (pCtx, certchainfile.c_str()); else e = SSL_CTX_use_certificate (pCtx, DefaultCertificate); assert (e > 0); } SSL_CTX_set_cipher_list (pCtx, "ALL:!ADH:!LOW:!EXP:!DES-CBC3-SHA:@STRENGTH"); if (is_server) { SSL_CTX_sess_set_cache_size (pCtx, 128); SSL_CTX_set_session_id_context (pCtx, (unsigned char*)"eventmachine", 12); } else { int e; if (privkeyfile.length() > 0) { e = SSL_CTX_use_PrivateKey_file (pCtx, privkeyfile.c_str(), SSL_FILETYPE_PEM); assert (e > 0); } if (certchainfile.length() > 0) { e = SSL_CTX_use_certificate_chain_file (pCtx, certchainfile.c_str()); assert (e > 0); } } } /*************************** SslContext_t::~SslContext_t ***************************/ SslContext_t::~SslContext_t() { if (pCtx) SSL_CTX_free (pCtx); if (PrivateKey) EVP_PKEY_free (PrivateKey); if (Certificate) X509_free (Certificate); } /****************** SslBox_t::SslBox_t ******************/ SslBox_t::SslBox_t (bool is_server, const string &privkeyfile, const string &certchainfile): bIsServer (is_server), pSSL (NULL), pbioRead (NULL), pbioWrite (NULL) { /* TODO someday: make it possible to re-use SSL contexts so we don't have to create * a new one every time we come here. */ Context = new SslContext_t (bIsServer, privkeyfile, certchainfile); assert (Context); pbioRead = BIO_new (BIO_s_mem()); assert (pbioRead); pbioWrite = BIO_new (BIO_s_mem()); assert (pbioWrite); pSSL = SSL_new (Context->pCtx); assert (pSSL); SSL_set_bio (pSSL, pbioRead, pbioWrite); if (!bIsServer) SSL_connect (pSSL); } /******************* SslBox_t::~SslBox_t *******************/ SslBox_t::~SslBox_t() { // Freeing pSSL will also free the associated BIOs, so DON'T free them separately. if (pSSL) { if (SSL_get_shutdown (pSSL) & SSL_RECEIVED_SHUTDOWN) SSL_shutdown (pSSL); else SSL_clear (pSSL); SSL_free (pSSL); } delete Context; } /*********************** SslBox_t::PutCiphertext ***********************/ bool SslBox_t::PutCiphertext (const char *buf, int bufsize) { assert (buf && (bufsize > 0)); assert (pbioRead); int n = BIO_write (pbioRead, buf, bufsize); return (n == bufsize) ? true : false; } /********************** SslBox_t::GetPlaintext **********************/ int SslBox_t::GetPlaintext (char *buf, int bufsize) { if (!SSL_is_init_finished (pSSL)) { int e = bIsServer ? SSL_accept (pSSL) : SSL_connect (pSSL); if (e < 0) { int er = SSL_get_error (pSSL, e); if (er != SSL_ERROR_WANT_READ) { // Return -1 for a nonfatal error, -2 for an error that should force the connection down. return (er == SSL_ERROR_SSL) ? (-2) : (-1); } else return 0; } // If handshake finished, FALL THROUGH and return the available plaintext. } if (!SSL_is_init_finished (pSSL)) { // We can get here if a browser abandons a handshake. // The user can see a warning dialog and abort the connection. cerr << ""; return 0; } //cerr << "CIPH: " << SSL_get_cipher (pSSL) << endl; int n = SSL_read (pSSL, buf, bufsize); if (n >= 0) { return n; } else { if (SSL_get_error (pSSL, n) == SSL_ERROR_WANT_READ) { return 0; } else { return -1; } } return 0; } /************************** SslBox_t::CanGetCiphertext **************************/ bool SslBox_t::CanGetCiphertext() { assert (pbioWrite); return BIO_pending (pbioWrite) ? true : false; } /*********************** SslBox_t::GetCiphertext ***********************/ int SslBox_t::GetCiphertext (char *buf, int bufsize) { assert (pbioWrite); assert (buf && (bufsize > 0)); return BIO_read (pbioWrite, buf, bufsize); } /********************** SslBox_t::PutPlaintext **********************/ int SslBox_t::PutPlaintext (const char *buf, int bufsize) { // The caller will interpret the return value as the number of bytes written. // WARNING WARNING WARNING, are there any situations in which a 0 or -1 return // from SSL_write means we should immediately retry? The socket-machine loop // will probably wait for a time-out cycle (perhaps a second) before re-trying. // THIS WOULD CAUSE A PERCEPTIBLE DELAY! /* We internally queue any outbound plaintext that can't be dispatched * because we're in the middle of a handshake or something. * When we get called, try to send any queued data first, and then * send the caller's data (or queue it). We may get called with no outbound * data, which means we try to send the outbound queue and that's all. * * Return >0 if we wrote any data, 0 if we didn't, and <0 for a fatal error. * Note that if we return 0, the connection is still considered live * and we are signalling that we have accepted the outbound data (if any). */ OutboundQ.Push (buf, bufsize); if (!SSL_is_init_finished (pSSL)) return 0; bool fatal = false; bool did_work = false; while (OutboundQ.HasPages()) { const char *page; int length; OutboundQ.Front (&page, &length); assert (page && (length > 0)); int n = SSL_write (pSSL, page, length); if (n > 0) { did_work = true; OutboundQ.PopFront(); } else { int er = SSL_get_error (pSSL, n); if ((er != SSL_ERROR_WANT_READ) && (er != SSL_ERROR_WANT_WRITE)) fatal = true; break; } } if (did_work) return 1; else if (fatal) return -1; else return 0; } #endif // WITH_SSL