diff options
Diffstat (limited to 'test/scanners/cpp/eventmachine.in.cpp')
-rw-r--r-- | test/scanners/cpp/eventmachine.in.cpp | 7035 |
1 files changed, 0 insertions, 7035 deletions
diff --git a/test/scanners/cpp/eventmachine.in.cpp b/test/scanners/cpp/eventmachine.in.cpp deleted file mode 100644 index 050d601..0000000 --- a/test/scanners/cpp/eventmachine.in.cpp +++ /dev/null @@ -1,7035 +0,0 @@ -/***************************************************************************** - -$Id$ - -File: binder.cpp -Date: 07Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - -#define DEV_URANDOM "/dev/urandom" - - -map<string, Bindable_t*> Bindable_t::BindingBag; - - -/******************************** -STATIC Bindable_t::CreateBinding -********************************/ - -string Bindable_t::CreateBinding() -{ - static int index = 0; - static string seed; - - if ((index >= 1000000) || (seed.length() == 0)) { - #ifdef OS_UNIX - int fd = open (DEV_URANDOM, O_RDONLY); - if (fd < 0) - throw std::runtime_error ("No entropy device"); - - unsigned char u[16]; - size_t r = read (fd, u, sizeof(u)); - if (r < sizeof(u)) - throw std::runtime_error ("Unable to read entropy device"); - - unsigned char *u1 = (unsigned char*)u; - char u2 [sizeof(u) * 2 + 1]; - - for (size_t i=0; i < sizeof(u); i++) - sprintf (u2 + (i * 2), "%02x", u1[i]); - - seed = string (u2); - #endif - - - #ifdef OS_WIN32 - UUID uuid; - UuidCreate (&uuid); - unsigned char *uuidstring = NULL; - UuidToString (&uuid, &uuidstring); - if (!uuidstring) - throw std::runtime_error ("Unable to read uuid"); - seed = string ((const char*)uuidstring); - - RpcStringFree (&uuidstring); - #endif - - index = 0; - - - } - - stringstream ss; - ss << seed << (++index); - return ss.str(); -} - - -/***************************** -STATIC: Bindable_t::GetObject -*****************************/ - -Bindable_t *Bindable_t::GetObject (const char *binding) -{ - string s (binding ? binding : ""); - return GetObject (s); -} - -/***************************** -STATIC: Bindable_t::GetObject -*****************************/ - -Bindable_t *Bindable_t::GetObject (const string &binding) -{ - map<string, Bindable_t*>::const_iterator i = BindingBag.find (binding); - if (i != BindingBag.end()) - return i->second; - else - return NULL; -} - - -/********************** -Bindable_t::Bindable_t -**********************/ - -Bindable_t::Bindable_t() -{ - Binding = Bindable_t::CreateBinding(); - BindingBag [Binding] = this; -} - - - -/*********************** -Bindable_t::~Bindable_t -***********************/ - -Bindable_t::~Bindable_t() -{ - BindingBag.erase (Binding); -} - - -/***************************************************************************** - -$Id$ - -File: cmain.cpp -Date: 06Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - - -static EventMachine_t *EventMachine; -static int bUseEpoll = 0; -static int bUseKqueue = 0; - -extern "C" void ensure_eventmachine (const char *caller = "unknown caller") -{ - if (!EventMachine) { - const int err_size = 128; - char err_string[err_size]; - snprintf (err_string, err_size, "eventmachine not initialized: %s", caller); - #ifdef BUILD_FOR_RUBY - rb_raise(rb_eRuntimeError, err_string); - #else - throw std::runtime_error (err_string); - #endif - } -} - -/*********************** -evma_initialize_library -***********************/ - -extern "C" void evma_initialize_library (void(*cb)(const char*, int, const char*, int)) -{ - // Probably a bad idea to mess with the signal mask of a process - // we're just being linked into. - //InstallSignalHandlers(); - if (EventMachine) - #ifdef BUILD_FOR_RUBY - rb_raise(rb_eRuntimeError, "eventmachine already initialized: evma_initialize_library"); - #else - throw std::runtime_error ("eventmachine already initialized: evma_initialize_library"); - #endif - EventMachine = new EventMachine_t (cb); - if (bUseEpoll) - EventMachine->_UseEpoll(); - if (bUseKqueue) - EventMachine->_UseKqueue(); -} - - -/******************** -evma_release_library -********************/ - -extern "C" void evma_release_library() -{ - ensure_eventmachine("evma_release_library"); - delete EventMachine; - EventMachine = NULL; -} - - -/**************** -evma_run_machine -****************/ - -extern "C" void evma_run_machine() -{ - ensure_eventmachine("evma_run_machine"); - EventMachine->Run(); -} - - -/************************** -evma_install_oneshot_timer -**************************/ - -extern "C" const char *evma_install_oneshot_timer (int seconds) -{ - ensure_eventmachine("evma_install_oneshot_timer"); - return EventMachine->InstallOneshotTimer (seconds); -} - - -/********************** -evma_connect_to_server -**********************/ - -extern "C" const char *evma_connect_to_server (const char *server, int port) -{ - ensure_eventmachine("evma_connect_to_server"); - return EventMachine->ConnectToServer (server, port); -} - -/*************************** -evma_connect_to_unix_server -***************************/ - -extern "C" const char *evma_connect_to_unix_server (const char *server) -{ - ensure_eventmachine("evma_connect_to_unix_server"); - return EventMachine->ConnectToUnixServer (server); -} - -/************** -evma_attach_fd -**************/ - -extern "C" const char *evma_attach_fd (int file_descriptor, int notify_readable, int notify_writable) -{ - ensure_eventmachine("evma_attach_fd"); - return EventMachine->AttachFD (file_descriptor, (notify_readable ? true : false), (notify_writable ? true : false)); -} - -/************** -evma_detach_fd -**************/ - -extern "C" int evma_detach_fd (const char *binding) -{ - ensure_eventmachine("evma_dettach_fd"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) - return EventMachine->DetachFD (ed); - else - #ifdef BUILD_FOR_RUBY - rb_raise(rb_eRuntimeError, "invalid binding to detach"); - #else - throw std::runtime_error ("invalid binding to detach"); - #endif -} - -/********************** -evma_create_tcp_server -**********************/ - -extern "C" const char *evma_create_tcp_server (const char *address, int port) -{ - ensure_eventmachine("evma_create_tcp_server"); - return EventMachine->CreateTcpServer (address, port); -} - -/****************************** -evma_create_unix_domain_server -******************************/ - -extern "C" const char *evma_create_unix_domain_server (const char *filename) -{ - ensure_eventmachine("evma_create_unix_domain_server"); - return EventMachine->CreateUnixDomainServer (filename); -} - -/************************* -evma_open_datagram_socket -*************************/ - -extern "C" const char *evma_open_datagram_socket (const char *address, int port) -{ - ensure_eventmachine("evma_open_datagram_socket"); - return EventMachine->OpenDatagramSocket (address, port); -} - -/****************** -evma_open_keyboard -******************/ - -extern "C" const char *evma_open_keyboard() -{ - ensure_eventmachine("evma_open_keyboard"); - return EventMachine->OpenKeyboard(); -} - - - -/**************************** -evma_send_data_to_connection -****************************/ - -extern "C" int evma_send_data_to_connection (const char *binding, const char *data, int data_length) -{ - ensure_eventmachine("evma_send_data_to_connection"); - return ConnectionDescriptor::SendDataToConnection (binding, data, data_length); -} - -/****************** -evma_send_datagram -******************/ - -extern "C" int evma_send_datagram (const char *binding, const char *data, int data_length, const char *address, int port) -{ - ensure_eventmachine("evma_send_datagram"); - return DatagramDescriptor::SendDatagram (binding, data, data_length, address, port); -} - - -/********************* -evma_close_connection -*********************/ - -extern "C" void evma_close_connection (const char *binding, int after_writing) -{ - ensure_eventmachine("evma_close_connection"); - ConnectionDescriptor::CloseConnection (binding, (after_writing ? true : false)); -} - -/*********************************** -evma_report_connection_error_status -***********************************/ - -extern "C" int evma_report_connection_error_status (const char *binding) -{ - ensure_eventmachine("evma_report_connection_error_status"); - return ConnectionDescriptor::ReportErrorStatus (binding); -} - -/******************** -evma_stop_tcp_server -********************/ - -extern "C" void evma_stop_tcp_server (const char *binding) -{ - ensure_eventmachine("evma_stop_tcp_server"); - AcceptorDescriptor::StopAcceptor (binding); -} - - -/***************** -evma_stop_machine -*****************/ - -extern "C" void evma_stop_machine() -{ - ensure_eventmachine("evma_stop_machine"); - EventMachine->ScheduleHalt(); -} - - -/************** -evma_start_tls -**************/ - -extern "C" void evma_start_tls (const char *binding) -{ - ensure_eventmachine("evma_start_tls"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) - ed->StartTls(); -} - -/****************** -evma_set_tls_parms -******************/ - -extern "C" void evma_set_tls_parms (const char *binding, const char *privatekey_filename, const char *certchain_filename) -{ - ensure_eventmachine("evma_set_tls_parms"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) - ed->SetTlsParms (privatekey_filename, certchain_filename); -} - - -/***************** -evma_get_peername -*****************/ - -extern "C" int evma_get_peername (const char *binding, struct sockaddr *sa) -{ - ensure_eventmachine("evma_get_peername"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) { - return ed->GetPeername (sa) ? 1 : 0; - } - else - return 0; -} - -/***************** -evma_get_sockname -*****************/ - -extern "C" int evma_get_sockname (const char *binding, struct sockaddr *sa) -{ - ensure_eventmachine("evma_get_sockname"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) { - return ed->GetSockname (sa) ? 1 : 0; - } - else - return 0; -} - -/*********************** -evma_get_subprocess_pid -***********************/ - -extern "C" int evma_get_subprocess_pid (const char *binding, pid_t *pid) -{ - ensure_eventmachine("evma_get_subprocess_pid"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) { - return ed->GetSubprocessPid (pid) ? 1 : 0; - } - else - return 0; -} - -/************************** -evma_get_subprocess_status -**************************/ - -extern "C" int evma_get_subprocess_status (const char *binding, int *status) -{ - ensure_eventmachine("evma_get_subprocess_status"); - if (status) { - *status = EventMachine->SubprocessExitStatus; - return 1; - } - else - return 0; -} - - -/********************* -evma_signal_loopbreak -*********************/ - -extern "C" void evma_signal_loopbreak() -{ - ensure_eventmachine("evma_signal_loopbreak"); - EventMachine->SignalLoopBreaker(); -} - - - -/**************** -evma__write_file -****************/ - -extern "C" const char *evma__write_file (const char *filename) -{ - ensure_eventmachine("evma__write_file"); - return EventMachine->_OpenFileForWriting (filename); -} - - -/******************************** -evma_get_comm_inactivity_timeout -********************************/ - -extern "C" int evma_get_comm_inactivity_timeout (const char *binding, int *value) -{ - ensure_eventmachine("evma_get_comm_inactivity_timeout"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) { - return ed->GetCommInactivityTimeout (value); - } - else - return 0; //Perhaps this should be an exception. Access to an unknown binding. -} - -/******************************** -evma_set_comm_inactivity_timeout -********************************/ - -extern "C" int evma_set_comm_inactivity_timeout (const char *binding, int *value) -{ - ensure_eventmachine("evma_set_comm_inactivity_timeout"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) { - return ed->SetCommInactivityTimeout (value); - } - else - return 0; //Perhaps this should be an exception. Access to an unknown binding. -} - - -/********************** -evma_set_timer_quantum -**********************/ - -extern "C" void evma_set_timer_quantum (int interval) -{ - ensure_eventmachine("evma_set_timer_quantum"); - EventMachine->SetTimerQuantum (interval); -} - -/************************ -evma_set_max_timer_count -************************/ - -extern "C" void evma_set_max_timer_count (int ct) -{ - // This may only be called if the reactor is not running. - - if (EventMachine) - #ifdef BUILD_FOR_RUBY - rb_raise(rb_eRuntimeError, "eventmachine already initialized: evma_set_max_timer_count"); - #else - throw std::runtime_error ("eventmachine already initialized: evma_set_max_timer_count"); - #endif - EventMachine_t::SetMaxTimerCount (ct); -} - -/****************** -evma_setuid_string -******************/ - -extern "C" void evma_setuid_string (const char *username) -{ - // We do NOT need to be running an EM instance because this method is static. - EventMachine_t::SetuidString (username); -} - - -/********** -evma_popen -**********/ - -extern "C" const char *evma_popen (char * const*cmd_strings) -{ - ensure_eventmachine("evma_popen"); - return EventMachine->Socketpair (cmd_strings); -} - - -/*************************** -evma_get_outbound_data_size -***************************/ - -extern "C" int evma_get_outbound_data_size (const char *binding) -{ - ensure_eventmachine("evma_get_outbound_data_size"); - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - return ed ? ed->GetOutboundDataSize() : 0; -} - - -/*********** -evma__epoll -***********/ - -extern "C" void evma__epoll() -{ - bUseEpoll = 1; -} - -/************ -evma__kqueue -************/ - -extern "C" void evma__kqueue() -{ - bUseKqueue = 1; -} - - -/********************** -evma_set_rlimit_nofile -**********************/ - -extern "C" int evma_set_rlimit_nofile (int nofiles) -{ - return EventMachine_t::SetRlimitNofile (nofiles); -} - - -/********************************* -evma_send_file_data_to_connection -*********************************/ - -extern "C" int evma_send_file_data_to_connection (const char *binding, const char *filename) -{ - /* This is a sugaring over send_data_to_connection that reads a file into a - * locally-allocated buffer, and sends the file data to the remote peer. - * Return the number of bytes written to the caller. - * TODO, needs to impose a limit on the file size. This is intended only for - * small files. (I don't know, maybe 8K or less.) For larger files, use interleaved - * I/O to avoid slowing the rest of the system down. - * TODO: we should return a code rather than barf, in case of file-not-found. - * TODO, does this compile on Windows? - * TODO, given that we want this to work only with small files, how about allocating - * the buffer on the stack rather than the heap? - * - * Modified 25Jul07. This now returns -1 on file-too-large; 0 for success, and a positive - * errno in case of other errors. - * - /* Contributed by Kirk Haines. - */ - - char data[32*1024]; - int r; - - ensure_eventmachine("evma_send_file_data_to_connection"); - - int Fd = open (filename, O_RDONLY); - - if (Fd < 0) - return errno; - // From here on, all early returns MUST close Fd. - - struct stat st; - if (fstat (Fd, &st)) { - int e = errno; - close (Fd); - return e; - } - - int filesize = st.st_size; - if (filesize <= 0) { - close (Fd); - return 0; - } - else if (filesize > sizeof(data)) { - close (Fd); - return -1; - } - - - r = read (Fd, data, filesize); - if (r != filesize) { - int e = errno; - close (Fd); - return e; - } - evma_send_data_to_connection (binding, data, r); - close (Fd); - - return 0; -} - -/***************************************************************************** - -$Id$ - -File: cplusplus.cpp -Date: 27Jul07 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - - -#include "project.h" - - -namespace EM { - static map<string, Eventable*> Eventables; - static map<string, void(*)()> Timers; -} - - -/******* -EM::Run -*******/ - -void EM::Run (void (*start_func)()) -{ - evma__epoll(); - evma_initialize_library (EM::Callback); - if (start_func) - AddTimer (0, start_func); - evma_run_machine(); - evma_release_library(); -} - -/************ -EM::AddTimer -************/ - -void EM::AddTimer (int milliseconds, void (*func)()) -{ - if (func) { - const char *sig = evma_install_oneshot_timer (milliseconds); - Timers.insert (make_pair (sig, func)); - } -} - - -/*************** -EM::StopReactor -***************/ - -void EM::StopReactor() -{ - evma_stop_machine(); -} - - -/******************** -EM::Acceptor::Accept -********************/ - -void EM::Acceptor::Accept (const char *signature) -{ - Connection *c = MakeConnection(); - c->Signature = signature; - Eventables.insert (make_pair (c->Signature, c)); - c->PostInit(); -} - -/************************ -EM::Connection::SendData -************************/ - -void EM::Connection::SendData (const char *data) -{ - if (data) - SendData (data, strlen (data)); -} - - -/************************ -EM::Connection::SendData -************************/ - -void EM::Connection::SendData (const char *data, int length) -{ - evma_send_data_to_connection (Signature.c_str(), data, length); -} - - -/********************* -EM::Connection::Close -*********************/ - -void EM::Connection::Close (bool afterWriting) -{ - evma_close_connection (Signature.c_str(), afterWriting); -} - - -/*********************** -EM::Connection::Connect -***********************/ - -void EM::Connection::Connect (const char *host, int port) -{ - Signature = evma_connect_to_server (host, port); - Eventables.insert( make_pair (Signature, this)); -} - -/******************* -EM::Acceptor::Start -*******************/ - -void EM::Acceptor::Start (const char *host, int port) -{ - Signature = evma_create_tcp_server (host, port); - Eventables.insert( make_pair (Signature, this)); -} - - - -/************ -EM::Callback -************/ - -void EM::Callback (const char *sig, int ev, const char *data, int length) -{ - EM::Eventable *e; - void (*f)(); - - switch (ev) { - case EM_TIMER_FIRED: - f = Timers [data]; - if (f) - (*f)(); - Timers.erase (sig); - break; - - case EM_CONNECTION_READ: - e = EM::Eventables [sig]; - e->ReceiveData (data, length); - break; - - case EM_CONNECTION_COMPLETED: - e = EM::Eventables [sig]; - e->ConnectionCompleted(); - break; - - case EM_CONNECTION_ACCEPTED: - e = EM::Eventables [sig]; - e->Accept (data); - break; - - case EM_CONNECTION_UNBOUND: - e = EM::Eventables [sig]; - e->Unbind(); - EM::Eventables.erase (sig); - delete e; - break; - } -} - -/***************************************************************************** - -$Id$ - -File: ed.cpp -Date: 06Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - - - -/******************** -SetSocketNonblocking -********************/ - -bool SetSocketNonblocking (SOCKET sd) -{ - #ifdef OS_UNIX - int val = fcntl (sd, F_GETFL, 0); - return (fcntl (sd, F_SETFL, val | O_NONBLOCK) != SOCKET_ERROR) ? true : false; - #endif - - #ifdef OS_WIN32 - unsigned long one = 1; - return (ioctlsocket (sd, FIONBIO, &one) == 0) ? true : false; - #endif -} - - -/**************************************** -EventableDescriptor::EventableDescriptor -****************************************/ - -EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em): - bCloseNow (false), - bCloseAfterWriting (false), - MySocket (sd), - EventCallback (NULL), - LastRead (0), - LastWritten (0), - bCallbackUnbind (true), - MyEventMachine (em) -{ - /* There are three ways to close a socket, all of which should - * automatically signal to the event machine that this object - * should be removed from the polling scheduler. - * First is a hard close, intended for bad errors or possible - * security violations. It immediately closes the connection - * and puts this object into an error state. - * Second is to set bCloseNow, which will cause the event machine - * to delete this object (and thus close the connection in our - * destructor) the next chance it gets. bCloseNow also inhibits - * the writing of new data on the socket (but not necessarily - * the reading of new data). - * The third way is to set bCloseAfterWriting, which inhibits - * the writing of new data and converts to bCloseNow as soon - * as everything in the outbound queue has been written. - * bCloseAfterWriting is really for use only by protocol handlers - * (for example, HTTP writes an HTML page and then closes the - * connection). All of the error states we generate internally - * cause an immediate close to be scheduled, which may have the - * effect of discarding outbound data. - */ - - if (sd == INVALID_SOCKET) - throw std::runtime_error ("bad eventable descriptor"); - if (MyEventMachine == NULL) - throw std::runtime_error ("bad em in eventable descriptor"); - CreatedAt = gCurrentLoopTime; - - #ifdef HAVE_EPOLL - EpollEvent.data.ptr = this; - #endif -} - - -/***************************************** -EventableDescriptor::~EventableDescriptor -*****************************************/ - -EventableDescriptor::~EventableDescriptor() -{ - if (EventCallback && bCallbackUnbind) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, 0); - Close(); -} - - -/************************************* -EventableDescriptor::SetEventCallback -*************************************/ - -void EventableDescriptor::SetEventCallback (void(*cb)(const char*, int, const char*, int)) -{ - EventCallback = cb; -} - - -/************************** -EventableDescriptor::Close -**************************/ - -void EventableDescriptor::Close() -{ - // Close the socket right now. Intended for emergencies. - if (MySocket != INVALID_SOCKET) { - shutdown (MySocket, 1); - closesocket (MySocket); - MySocket = INVALID_SOCKET; - } -} - - -/********************************* -EventableDescriptor::ShouldDelete -*********************************/ - -bool EventableDescriptor::ShouldDelete() -{ - /* For use by a socket manager, which needs to know if this object - * should be removed from scheduling events and deleted. - * Has an immediate close been scheduled, or are we already closed? - * If either of these are the case, return true. In theory, the manager will - * then delete us, which in turn will make sure the socket is closed. - * Note, if bCloseAfterWriting is true, we check a virtual method to see - * if there is outbound data to write, and only request a close if there is none. - */ - - return ((MySocket == INVALID_SOCKET) || bCloseNow || (bCloseAfterWriting && (GetOutboundDataSize() <= 0))); -} - - -/********************************** -EventableDescriptor::ScheduleClose -**********************************/ - -void EventableDescriptor::ScheduleClose (bool after_writing) -{ - // KEEP THIS SYNCHRONIZED WITH ::IsCloseScheduled. - if (after_writing) - bCloseAfterWriting = true; - else - bCloseNow = true; -} - - -/************************************* -EventableDescriptor::IsCloseScheduled -*************************************/ - -bool EventableDescriptor::IsCloseScheduled() -{ - // KEEP THIS SYNCHRONIZED WITH ::ScheduleClose. - return (bCloseNow || bCloseAfterWriting); -} - - -/****************************************** -ConnectionDescriptor::ConnectionDescriptor -******************************************/ - -ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em): - EventableDescriptor (sd, em), - bConnectPending (false), - bNotifyReadable (false), - bNotifyWritable (false), - bReadAttemptedAfterClose (false), - bWriteAttemptedAfterClose (false), - OutboundDataSize (0), - #ifdef WITH_SSL - SslBox (NULL), - #endif - bIsServer (false), - LastIo (gCurrentLoopTime), - InactivityTimeout (0) -{ - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLOUT; - #endif - // 22Jan09: Moved ArmKqueueWriter into SetConnectPending() to fix assertion failure in _WriteOutboundData() -} - - -/******************************************* -ConnectionDescriptor::~ConnectionDescriptor -*******************************************/ - -ConnectionDescriptor::~ConnectionDescriptor() -{ - // Run down any stranded outbound data. - for (size_t i=0; i < OutboundPages.size(); i++) - OutboundPages[i].Free(); - - #ifdef WITH_SSL - if (SslBox) - delete SslBox; - #endif -} - - -/************************************************** -STATIC: ConnectionDescriptor::SendDataToConnection -**************************************************/ - -int ConnectionDescriptor::SendDataToConnection (const char *binding, const char *data, int data_length) -{ - // TODO: This is something of a hack, or at least it's a static method of the wrong class. - // TODO: Poor polymorphism here. We should be calling one virtual method - // instead of hacking out the runtime information of the target object. - ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding)); - if (cd) - return cd->SendOutboundData (data, data_length); - DatagramDescriptor *ds = dynamic_cast <DatagramDescriptor*> (Bindable_t::GetObject (binding)); - if (ds) - return ds->SendOutboundData (data, data_length); - #ifdef OS_UNIX - PipeDescriptor *ps = dynamic_cast <PipeDescriptor*> (Bindable_t::GetObject (binding)); - if (ps) - return ps->SendOutboundData (data, data_length); - #endif - return -1; -} - - -/********************************************* -STATIC: ConnectionDescriptor::CloseConnection -*********************************************/ - -void ConnectionDescriptor::CloseConnection (const char *binding, bool after_writing) -{ - // TODO: This is something of a hack, or at least it's a static method of the wrong class. - EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding)); - if (ed) - ed->ScheduleClose (after_writing); -} - -/*********************************************** -STATIC: ConnectionDescriptor::ReportErrorStatus -***********************************************/ - -int ConnectionDescriptor::ReportErrorStatus (const char *binding) -{ - // TODO: This is something of a hack, or at least it's a static method of the wrong class. - // TODO: Poor polymorphism here. We should be calling one virtual method - // instead of hacking out the runtime information of the target object. - ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding)); - if (cd) - return cd->_ReportErrorStatus(); - return -1; -} - -/*************************************** -ConnectionDescriptor::SetConnectPending -****************************************/ - -void ConnectionDescriptor::SetConnectPending(bool f) -{ - bConnectPending = f; - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueWriter (this); - #endif -} - - -/************************************** -ConnectionDescriptor::SendOutboundData -**************************************/ - -int ConnectionDescriptor::SendOutboundData (const char *data, int length) -{ - #ifdef WITH_SSL - if (SslBox) { - if (length > 0) { - int w = SslBox->PutPlaintext (data, length); - if (w < 0) - ScheduleClose (false); - else - _DispatchCiphertext(); - } - // TODO: What's the correct return value? - return 1; // That's a wild guess, almost certainly wrong. - } - else - #endif - return _SendRawOutboundData (data, length); -} - - - -/****************************************** -ConnectionDescriptor::_SendRawOutboundData -******************************************/ - -int ConnectionDescriptor::_SendRawOutboundData (const char *data, int length) -{ - /* This internal method is called to schedule bytes that - * will be sent out to the remote peer. - * It's not directly accessed by the caller, who hits ::SendOutboundData, - * which may or may not filter or encrypt the caller's data before - * sending it here. - */ - - // Highly naive and incomplete implementation. - // There's no throttle for runaways (which should abort only this connection - // and not the whole process), and no coalescing of small pages. - // (Well, not so bad, small pages are coalesced in ::Write) - - if (IsCloseScheduled()) - //if (bCloseNow || bCloseAfterWriting) - return 0; - - if (!data && (length > 0)) - throw std::runtime_error ("bad outbound data"); - char *buffer = (char *) malloc (length + 1); - if (!buffer) - throw std::runtime_error ("no allocation for outbound data"); - memcpy (buffer, data, length); - buffer [length] = 0; - OutboundPages.push_back (OutboundPage (buffer, length)); - OutboundDataSize += length; - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | EPOLLOUT); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueWriter (this); - #endif - return length; -} - - - -/*********************************** -ConnectionDescriptor::SelectForRead -***********************************/ - -bool ConnectionDescriptor::SelectForRead() -{ - /* A connection descriptor is always scheduled for read, - * UNLESS it's in a pending-connect state. - * On Linux, unlike Unix, a nonblocking socket on which - * connect has been called, does NOT necessarily select - * both readable and writable in case of error. - * The socket will select writable when the disposition - * of the connect is known. On the other hand, a socket - * which successfully connects and selects writable may - * indeed have some data available on it, so it will - * select readable in that case, violating expectations! - * So we will not poll for readability until the socket - * is known to be in a connected state. - */ - - return bConnectPending ? false : true; -} - - -/************************************ -ConnectionDescriptor::SelectForWrite -************************************/ - -bool ConnectionDescriptor::SelectForWrite() -{ - /* Cf the notes under SelectForRead. - * In a pending-connect state, we ALWAYS select for writable. - * In a normal state, we only select for writable when we - * have outgoing data to send. - */ - - if (bConnectPending || bNotifyWritable) - return true; - else { - return (GetOutboundDataSize() > 0); - } -} - - -/************************** -ConnectionDescriptor::Read -**************************/ - -void ConnectionDescriptor::Read() -{ - /* Read and dispatch data on a socket that has selected readable. - * It's theoretically possible to get and dispatch incoming data on - * a socket that has already been scheduled for closing or close-after-writing. - * In those cases, we'll leave it up the to protocol handler to "do the - * right thing" (which probably means to ignore the incoming data). - * - * 22Aug06: Chris Ochs reports that on FreeBSD, it's possible to come - * here with the socket already closed, after the process receives - * a ctrl-C signal (not sure if that's TERM or INT on BSD). The application - * was one in which network connections were doing a lot of interleaved reads - * and writes. - * Since we always write before reading (in order to keep the outbound queues - * as light as possible), I think what happened is that an interrupt caused - * the socket to be closed in ConnectionDescriptor::Write. We'll then - * come here in the same pass through the main event loop, and won't get - * cleaned up until immediately after. - * We originally asserted that the socket was valid when we got here. - * To deal properly with the possibility that we are closed when we get here, - * I removed the assert. HOWEVER, the potential for an infinite loop scares me, - * so even though this is really clunky, I added a flag to assert that we never - * come here more than once after being closed. (FCianfrocca) - */ - - int sd = GetSocket(); - //assert (sd != INVALID_SOCKET); (original, removed 22Aug06) - if (sd == INVALID_SOCKET) { - assert (!bReadAttemptedAfterClose); - bReadAttemptedAfterClose = true; - return; - } - - if (bNotifyReadable) { - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_READABLE, NULL, 0); - return; - } - - LastIo = gCurrentLoopTime; - - int total_bytes_read = 0; - char readbuffer [16 * 1024 + 1]; - - for (int i=0; i < 10; i++) { - // Don't read just one buffer and then move on. This is faster - // if there is a lot of incoming. - // But don't read indefinitely. Give other sockets a chance to run. - // NOTICE, we're reading one less than the buffer size. - // That's so we can put a guard byte at the end of what we send - // to user code. - - - int r = recv (sd, readbuffer, sizeof(readbuffer) - 1, 0); - //cerr << "<R:" << r << ">"; - - if (r > 0) { - total_bytes_read += r; - LastRead = gCurrentLoopTime; - - // Add a null-terminator at the the end of the buffer - // that we will send to the callback. - // DO NOT EVER CHANGE THIS. We want to explicitly allow users - // to be able to depend on this behavior, so they will have - // the option to do some things faster. Additionally it's - // a security guard against buffer overflows. - readbuffer [r] = 0; - _DispatchInboundData (readbuffer, r); - } - else if (r == 0) { - break; - } - else { - // Basically a would-block, meaning we've read everything there is to read. - break; - } - - } - - - if (total_bytes_read == 0) { - // If we read no data on a socket that selected readable, - // it generally means the other end closed the connection gracefully. - ScheduleClose (false); - //bCloseNow = true; - } - -} - - - -/****************************************** -ConnectionDescriptor::_DispatchInboundData -******************************************/ - -void ConnectionDescriptor::_DispatchInboundData (const char *buffer, int size) -{ - #ifdef WITH_SSL - if (SslBox) { - SslBox->PutCiphertext (buffer, size); - - int s; - char B [2048]; - while ((s = SslBox->GetPlaintext (B, sizeof(B) - 1)) > 0) { - B [s] = 0; - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, B, s); - } - // INCOMPLETE, s may indicate an SSL error that would force the connection down. - _DispatchCiphertext(); - } - else { - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size); - } - #endif - - #ifdef WITHOUT_SSL - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size); - #endif -} - - - - - -/*************************** -ConnectionDescriptor::Write -***************************/ - -void ConnectionDescriptor::Write() -{ - /* A socket which is in a pending-connect state will select - * writable when the disposition of the connect is known. - * At that point, check to be sure there are no errors, - * and if none, then promote the socket out of the pending - * state. - * TODO: I haven't figured out how Windows signals errors on - * unconnected sockets. Maybe it does the untraditional but - * logical thing and makes the socket selectable for error. - * If so, it's unsupported here for the time being, and connect - * errors will have to be caught by the timeout mechanism. - */ - - if (bConnectPending) { - int error; - socklen_t len; - len = sizeof(error); - #ifdef OS_UNIX - int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len); - #endif - #ifdef OS_WIN32 - int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len); - #endif - if ((o == 0) && (error == 0)) { - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_COMPLETED, "", 0); - bConnectPending = false; - #ifdef HAVE_EPOLL - // The callback may have scheduled outbound data. - EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0); - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueReader (this); - // The callback may have scheduled outbound data. - if (SelectForWrite()) - MyEventMachine->ArmKqueueWriter (this); - #endif - } - else - ScheduleClose (false); - //bCloseNow = true; - } - else { - - if (bNotifyWritable) { - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_WRITABLE, NULL, 0); - return; - } - - _WriteOutboundData(); - } -} - - -/**************************************** -ConnectionDescriptor::_WriteOutboundData -****************************************/ - -void ConnectionDescriptor::_WriteOutboundData() -{ - /* This is a helper function called by ::Write. - * It's possible for a socket to select writable and then no longer - * be writable by the time we get around to writing. The kernel might - * have used up its available output buffers between the select call - * and when we get here. So this condition is not an error. - * - * 20Jul07, added the same kind of protection against an invalid socket - * that is at the top of ::Read. Not entirely how this could happen in - * real life (connection-reset from the remote peer, perhaps?), but I'm - * doing it to address some reports of crashing under heavy loads. - */ - - int sd = GetSocket(); - //assert (sd != INVALID_SOCKET); - if (sd == INVALID_SOCKET) { - assert (!bWriteAttemptedAfterClose); - bWriteAttemptedAfterClose = true; - return; - } - - LastIo = gCurrentLoopTime; - char output_buffer [16 * 1024]; - size_t nbytes = 0; - - while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { - OutboundPage *op = &(OutboundPages[0]); - if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { - memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); - nbytes += (op->Length - op->Offset); - op->Free(); - OutboundPages.pop_front(); - } - else { - int len = sizeof(output_buffer) - nbytes; - memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); - op->Offset += len; - nbytes += len; - } - } - - // We should never have gotten here if there were no data to write, - // so assert that as a sanity check. - // Don't bother to make sure nbytes is less than output_buffer because - // if it were we probably would have crashed already. - assert (nbytes > 0); - - assert (GetSocket() != INVALID_SOCKET); - int bytes_written = send (GetSocket(), output_buffer, nbytes, 0); - - bool err = false; - if (bytes_written < 0) { - err = true; - bytes_written = 0; - } - - assert (bytes_written >= 0); - OutboundDataSize -= bytes_written; - if ((size_t)bytes_written < nbytes) { - int len = nbytes - bytes_written; - char *buffer = (char*) malloc (len + 1); - if (!buffer) - throw std::runtime_error ("bad alloc throwing back data"); - memcpy (buffer, output_buffer + bytes_written, len); - buffer [len] = 0; - OutboundPages.push_front (OutboundPage (buffer, len)); - } - - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - #ifdef HAVE_KQUEUE - if (SelectForWrite()) - MyEventMachine->ArmKqueueWriter (this); - #endif - - - if (err) { - #ifdef OS_UNIX - if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) - #endif - #ifdef OS_WIN32 - if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) - #endif - Close(); - } -} - - -/**************************************** -ConnectionDescriptor::_ReportErrorStatus -****************************************/ - -int ConnectionDescriptor::_ReportErrorStatus() -{ - int error; - socklen_t len; - len = sizeof(error); - #ifdef OS_UNIX - int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len); - #endif - #ifdef OS_WIN32 - int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len); - #endif - if ((o == 0) && (error == 0)) - return 0; - else - return 1; -} - - -/****************************** -ConnectionDescriptor::StartTls -******************************/ - -void ConnectionDescriptor::StartTls() -{ - #ifdef WITH_SSL - if (SslBox) - throw std::runtime_error ("SSL/TLS already running on connection"); - - SslBox = new SslBox_t (bIsServer, PrivateKeyFilename, CertChainFilename); - _DispatchCiphertext(); - #endif - - #ifdef WITHOUT_SSL - throw std::runtime_error ("Encryption not available on this event-machine"); - #endif -} - - -/********************************* -ConnectionDescriptor::SetTlsParms -*********************************/ - -void ConnectionDescriptor::SetTlsParms (const char *privkey_filename, const char *certchain_filename) -{ - #ifdef WITH_SSL - if (SslBox) - throw std::runtime_error ("call SetTlsParms before calling StartTls"); - if (privkey_filename && *privkey_filename) - PrivateKeyFilename = privkey_filename; - if (certchain_filename && *certchain_filename) - CertChainFilename = certchain_filename; - #endif - - #ifdef WITHOUT_SSL - throw std::runtime_error ("Encryption not available on this event-machine"); - #endif -} - - - -/***************************************** -ConnectionDescriptor::_DispatchCiphertext -*****************************************/ -#ifdef WITH_SSL -void ConnectionDescriptor::_DispatchCiphertext() -{ - assert (SslBox); - - - char BigBuf [2048]; - bool did_work; - - do { - did_work = false; - - // try to drain ciphertext - while (SslBox->CanGetCiphertext()) { - int r = SslBox->GetCiphertext (BigBuf, sizeof(BigBuf)); - assert (r > 0); - _SendRawOutboundData (BigBuf, r); - did_work = true; - } - - // Pump the SslBox, in case it has queued outgoing plaintext - // This will return >0 if data was written, - // 0 if no data was written, and <0 if there was a fatal error. - bool pump; - do { - pump = false; - int w = SslBox->PutPlaintext (NULL, 0); - if (w > 0) { - did_work = true; - pump = true; - } - else if (w < 0) - ScheduleClose (false); - } while (pump); - - // try to put plaintext. INCOMPLETE, doesn't belong here? - // In SendOutboundData, we're spooling plaintext directly - // into SslBox. That may be wrong, we may need to buffer it - // up here! - /* - const char *ptr; - int ptr_length; - while (OutboundPlaintext.GetPage (&ptr, &ptr_length)) { - assert (ptr && (ptr_length > 0)); - int w = SslMachine.PutPlaintext (ptr, ptr_length); - if (w > 0) { - OutboundPlaintext.DiscardBytes (w); - did_work = true; - } - else - break; - } - */ - - } while (did_work); - -} -#endif - - - -/******************************* -ConnectionDescriptor::Heartbeat -*******************************/ - -void ConnectionDescriptor::Heartbeat() -{ - /* Only allow a certain amount of time to go by while waiting - * for a pending connect. If it expires, then kill the socket. - * For a connected socket, close it if its inactivity timer - * has expired. - */ - - if (bConnectPending) { - if ((gCurrentLoopTime - CreatedAt) >= PendingConnectTimeout) - ScheduleClose (false); - //bCloseNow = true; - } - else { - if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) - ScheduleClose (false); - //bCloseNow = true; - } -} - - -/**************************************** -LoopbreakDescriptor::LoopbreakDescriptor -****************************************/ - -LoopbreakDescriptor::LoopbreakDescriptor (int sd, EventMachine_t *parent_em): - EventableDescriptor (sd, parent_em) -{ - /* This is really bad and ugly. Change someday if possible. - * We have to know about an event-machine (probably the one that owns us), - * so we can pass newly-created connections to it. - */ - - bCallbackUnbind = false; - - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLIN; - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueReader (this); - #endif -} - - - - -/************************* -LoopbreakDescriptor::Read -*************************/ - -void LoopbreakDescriptor::Read() -{ - // TODO, refactor, this code is probably in the wrong place. - assert (MyEventMachine); - MyEventMachine->_ReadLoopBreaker(); -} - - -/************************** -LoopbreakDescriptor::Write -**************************/ - -void LoopbreakDescriptor::Write() -{ - // Why are we here? - throw std::runtime_error ("bad code path in loopbreak"); -} - -/************************************** -AcceptorDescriptor::AcceptorDescriptor -**************************************/ - -AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em): - EventableDescriptor (sd, parent_em) -{ - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLIN; - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueReader (this); - #endif -} - - -/*************************************** -AcceptorDescriptor::~AcceptorDescriptor -***************************************/ - -AcceptorDescriptor::~AcceptorDescriptor() -{ -} - -/**************************************** -STATIC: AcceptorDescriptor::StopAcceptor -****************************************/ - -void AcceptorDescriptor::StopAcceptor (const char *binding) -{ - // TODO: This is something of a hack, or at least it's a static method of the wrong class. - AcceptorDescriptor *ad = dynamic_cast <AcceptorDescriptor*> (Bindable_t::GetObject (binding)); - if (ad) - ad->ScheduleClose (false); - else - throw std::runtime_error ("failed to close nonexistent acceptor"); -} - - -/************************ -AcceptorDescriptor::Read -************************/ - -void AcceptorDescriptor::Read() -{ - /* Accept up to a certain number of sockets on the listening connection. - * Don't try to accept all that are present, because this would allow a DoS attack - * in which no data were ever read or written. We should accept more than one, - * if available, to keep the partially accepted sockets from backing up in the kernel. - */ - - /* Make sure we use non-blocking i/o on the acceptor socket, since we're selecting it - * for readability. According to Stevens UNP, it's possible for an acceptor to select readable - * and then block when we call accept. For example, the other end resets the connection after - * the socket selects readable and before we call accept. The kernel will remove the dead - * socket from the accept queue. If the accept queue is now empty, accept will block. - */ - - - struct sockaddr_in pin; - socklen_t addrlen = sizeof (pin); - - for (int i=0; i < 10; i++) { - int sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen); - if (sd == INVALID_SOCKET) { - // This breaks the loop when we've accepted everything on the kernel queue, - // up to 10 new connections. But what if the *first* accept fails? - // Does that mean anything serious is happening, beyond the situation - // described in the note above? - break; - } - - // Set the newly-accepted socket non-blocking. - // On Windows, this may fail because, weirdly, Windows inherits the non-blocking - // attribute that we applied to the acceptor socket into the accepted one. - if (!SetSocketNonblocking (sd)) { - //int val = fcntl (sd, F_GETFL, 0); - //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1) { - shutdown (sd, 1); - closesocket (sd); - continue; - } - - - // Disable slow-start (Nagle algorithm). Eventually make this configurable. - int one = 1; - setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one)); - - - ConnectionDescriptor *cd = new ConnectionDescriptor (sd, MyEventMachine); - if (!cd) - throw std::runtime_error ("no newly accepted connection"); - cd->SetServerMode(); - if (EventCallback) { - (*EventCallback) (GetBinding().c_str(), EM_CONNECTION_ACCEPTED, cd->GetBinding().c_str(), cd->GetBinding().size()); - } - #ifdef HAVE_EPOLL - cd->GetEpollEvent()->events = EPOLLIN | (cd->SelectForWrite() ? EPOLLOUT : 0); - #endif - assert (MyEventMachine); - MyEventMachine->Add (cd); - #ifdef HAVE_KQUEUE - if (cd->SelectForWrite()) - MyEventMachine->ArmKqueueWriter (cd); - MyEventMachine->ArmKqueueReader (cd); - #endif - } - -} - - -/************************* -AcceptorDescriptor::Write -*************************/ - -void AcceptorDescriptor::Write() -{ - // Why are we here? - throw std::runtime_error ("bad code path in acceptor"); -} - - -/***************************** -AcceptorDescriptor::Heartbeat -*****************************/ - -void AcceptorDescriptor::Heartbeat() -{ - // No-op -} - - -/******************************* -AcceptorDescriptor::GetSockname -*******************************/ - -bool AcceptorDescriptor::GetSockname (struct sockaddr *s) -{ - bool ok = false; - if (s) { - socklen_t len = sizeof(*s); - int gp = getsockname (GetSocket(), s, &len); - if (gp == 0) - ok = true; - } - return ok; -} - - - -/************************************** -DatagramDescriptor::DatagramDescriptor -**************************************/ - -DatagramDescriptor::DatagramDescriptor (int sd, EventMachine_t *parent_em): - EventableDescriptor (sd, parent_em), - OutboundDataSize (0), - LastIo (gCurrentLoopTime), - InactivityTimeout (0) -{ - memset (&ReturnAddress, 0, sizeof(ReturnAddress)); - - /* Provisionally added 19Oct07. All datagram sockets support broadcasting. - * Until now, sending to a broadcast address would give EACCES (permission denied) - * on systems like Linux and BSD that require the SO_BROADCAST socket-option in order - * to accept a packet to a broadcast address. Solaris doesn't require it. I think - * Windows DOES require it but I'm not sure. - * - * Ruby does NOT do what we're doing here. In Ruby, you have to explicitly set SO_BROADCAST - * on a UDP socket in order to enable broadcasting. The reason for requiring the option - * in the first place is so that applications don't send broadcast datagrams by mistake. - * I imagine that could happen if a user of an application typed in an address that happened - * to be a broadcast address on that particular subnet. - * - * This is provisional because someone may eventually come up with a good reason not to - * do it for all UDP sockets. If that happens, then we'll need to add a usercode-level API - * to set the socket option, just like Ruby does. AND WE'LL ALSO BREAK CODE THAT DOESN'T - * EXPLICITLY SET THE OPTION. - */ - - int oval = 1; - int sob = setsockopt (GetSocket(), SOL_SOCKET, SO_BROADCAST, (char*)&oval, sizeof(oval)); - - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLIN; - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueReader (this); - #endif -} - - -/*************************************** -DatagramDescriptor::~DatagramDescriptor -***************************************/ - -DatagramDescriptor::~DatagramDescriptor() -{ - // Run down any stranded outbound data. - for (size_t i=0; i < OutboundPages.size(); i++) - OutboundPages[i].Free(); -} - - -/***************************** -DatagramDescriptor::Heartbeat -*****************************/ - -void DatagramDescriptor::Heartbeat() -{ - // Close it if its inactivity timer has expired. - - if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) - ScheduleClose (false); - //bCloseNow = true; -} - - -/************************ -DatagramDescriptor::Read -************************/ - -void DatagramDescriptor::Read() -{ - int sd = GetSocket(); - assert (sd != INVALID_SOCKET); - LastIo = gCurrentLoopTime; - - // This is an extremely large read buffer. - // In many cases you wouldn't expect to get any more than 4K. - char readbuffer [16 * 1024]; - - for (int i=0; i < 10; i++) { - // Don't read just one buffer and then move on. This is faster - // if there is a lot of incoming. - // But don't read indefinitely. Give other sockets a chance to run. - // NOTICE, we're reading one less than the buffer size. - // That's so we can put a guard byte at the end of what we send - // to user code. - - struct sockaddr_in sin; - socklen_t slen = sizeof (sin); - memset (&sin, 0, slen); - - int r = recvfrom (sd, readbuffer, sizeof(readbuffer) - 1, 0, (struct sockaddr*)&sin, &slen); - //cerr << "<R:" << r << ">"; - - // In UDP, a zero-length packet is perfectly legal. - if (r >= 0) { - LastRead = gCurrentLoopTime; - - // Add a null-terminator at the the end of the buffer - // that we will send to the callback. - // DO NOT EVER CHANGE THIS. We want to explicitly allow users - // to be able to depend on this behavior, so they will have - // the option to do some things faster. Additionally it's - // a security guard against buffer overflows. - readbuffer [r] = 0; - - - // Set up a "temporary" return address so that callers can "reply" to us - // from within the callback we are about to invoke. That means that ordinary - // calls to "send_data_to_connection" (which is of course misnamed in this - // case) will result in packets being sent back to the same place that sent - // us this one. - // There is a different call (evma_send_datagram) for cases where the caller - // actually wants to send a packet somewhere else. - - memset (&ReturnAddress, 0, sizeof(ReturnAddress)); - memcpy (&ReturnAddress, &sin, slen); - - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); - - } - else { - // Basically a would-block, meaning we've read everything there is to read. - break; - } - - } - - -} - - -/************************* -DatagramDescriptor::Write -*************************/ - -void DatagramDescriptor::Write() -{ - /* It's possible for a socket to select writable and then no longer - * be writable by the time we get around to writing. The kernel might - * have used up its available output buffers between the select call - * and when we get here. So this condition is not an error. - * This code is very reminiscent of ConnectionDescriptor::_WriteOutboundData, - * but differs in the that the outbound data pages (received from the - * user) are _message-structured._ That is, we send each of them out - * one message at a time. - * TODO, we are currently suppressing the EMSGSIZE error!!! - */ - - int sd = GetSocket(); - assert (sd != INVALID_SOCKET); - LastIo = gCurrentLoopTime; - - assert (OutboundPages.size() > 0); - - // Send out up to 10 packets, then cycle the machine. - for (int i = 0; i < 10; i++) { - if (OutboundPages.size() <= 0) - break; - OutboundPage *op = &(OutboundPages[0]); - - // The nasty cast to (char*) is needed because Windows is brain-dead. - int s = sendto (sd, (char*)op->Buffer, op->Length, 0, (struct sockaddr*)&(op->From), sizeof(op->From)); - int e = errno; - - OutboundDataSize -= op->Length; - op->Free(); - OutboundPages.pop_front(); - - if (s == SOCKET_ERROR) { - #ifdef OS_UNIX - if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EINTR)) { - #endif - #ifdef OS_WIN32 - if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK)) { - #endif - Close(); - break; - } - } - } - - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif -} - - -/********************************** -DatagramDescriptor::SelectForWrite -**********************************/ - -bool DatagramDescriptor::SelectForWrite() -{ - /* Changed 15Nov07, per bug report by Mark Zvillius. - * The outbound data size will be zero if there are zero-length outbound packets, - * so we now select writable in case the outbound page buffer is not empty. - * Note that the superclass ShouldDelete method still checks for outbound data size, - * which may be wrong. - */ - //return (GetOutboundDataSize() > 0); (Original) - return (OutboundPages.size() > 0); -} - - -/************************************ -DatagramDescriptor::SendOutboundData -************************************/ - -int DatagramDescriptor::SendOutboundData (const char *data, int length) -{ - // This is an exact clone of ConnectionDescriptor::SendOutboundData. - // That means it needs to move to a common ancestor. - - if (IsCloseScheduled()) - //if (bCloseNow || bCloseAfterWriting) - return 0; - - if (!data && (length > 0)) - throw std::runtime_error ("bad outbound data"); - char *buffer = (char *) malloc (length + 1); - if (!buffer) - throw std::runtime_error ("no allocation for outbound data"); - memcpy (buffer, data, length); - buffer [length] = 0; - OutboundPages.push_back (OutboundPage (buffer, length, ReturnAddress)); - OutboundDataSize += length; - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | EPOLLOUT); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - return length; -} - - -/**************************************** -DatagramDescriptor::SendOutboundDatagram -****************************************/ - -int DatagramDescriptor::SendOutboundDatagram (const char *data, int length, const char *address, int port) -{ - // This is an exact clone of ConnectionDescriptor::SendOutboundData. - // That means it needs to move to a common ancestor. - // TODO: Refactor this so there's no overlap with SendOutboundData. - - if (IsCloseScheduled()) - //if (bCloseNow || bCloseAfterWriting) - return 0; - - if (!address || !*address || !port) - return 0; - - sockaddr_in pin; - unsigned long HostAddr; - - HostAddr = inet_addr (address); - if (HostAddr == INADDR_NONE) { - // The nasty cast to (char*) is because Windows is brain-dead. - hostent *hp = gethostbyname ((char*)address); - if (!hp) - return 0; - HostAddr = ((in_addr*)(hp->h_addr))->s_addr; - } - - memset (&pin, 0, sizeof(pin)); - pin.sin_family = AF_INET; - pin.sin_addr.s_addr = HostAddr; - pin.sin_port = htons (port); - - - - if (!data && (length > 0)) - throw std::runtime_error ("bad outbound data"); - char *buffer = (char *) malloc (length + 1); - if (!buffer) - throw std::runtime_error ("no allocation for outbound data"); - memcpy (buffer, data, length); - buffer [length] = 0; - OutboundPages.push_back (OutboundPage (buffer, length, pin)); - OutboundDataSize += length; - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | EPOLLOUT); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - return length; -} - - -/**************************************** -STATIC: DatagramDescriptor::SendDatagram -****************************************/ - -int DatagramDescriptor::SendDatagram (const char *binding, const char *data, int length, const char *address, int port) -{ - DatagramDescriptor *dd = dynamic_cast <DatagramDescriptor*> (Bindable_t::GetObject (binding)); - if (dd) - return dd->SendOutboundDatagram (data, length, address, port); - else - return -1; -} - - -/********************************* -ConnectionDescriptor::GetPeername -*********************************/ - -bool ConnectionDescriptor::GetPeername (struct sockaddr *s) -{ - bool ok = false; - if (s) { - socklen_t len = sizeof(*s); - int gp = getpeername (GetSocket(), s, &len); - if (gp == 0) - ok = true; - } - return ok; -} - -/********************************* -ConnectionDescriptor::GetSockname -*********************************/ - -bool ConnectionDescriptor::GetSockname (struct sockaddr *s) -{ - bool ok = false; - if (s) { - socklen_t len = sizeof(*s); - int gp = getsockname (GetSocket(), s, &len); - if (gp == 0) - ok = true; - } - return ok; -} - - -/********************************************** -ConnectionDescriptor::GetCommInactivityTimeout -**********************************************/ - -int ConnectionDescriptor::GetCommInactivityTimeout (int *value) -{ - if (value) { - *value = InactivityTimeout; - return 1; - } - else { - // TODO, extended logging, got bad parameter. - return 0; - } -} - - -/********************************************** -ConnectionDescriptor::SetCommInactivityTimeout -**********************************************/ - -int ConnectionDescriptor::SetCommInactivityTimeout (int *value) -{ - int out = 0; - - if (value) { - if ((*value==0) || (*value >= 2)) { - // Replace the value and send the old one back to the caller. - int v = *value; - *value = InactivityTimeout; - InactivityTimeout = v; - out = 1; - } - else { - // TODO, extended logging, got bad value. - } - } - else { - // TODO, extended logging, got bad parameter. - } - - return out; -} - -/******************************* -DatagramDescriptor::GetPeername -*******************************/ - -bool DatagramDescriptor::GetPeername (struct sockaddr *s) -{ - bool ok = false; - if (s) { - memset (s, 0, sizeof(struct sockaddr)); - memcpy (s, &ReturnAddress, sizeof(ReturnAddress)); - ok = true; - } - return ok; -} - -/******************************* -DatagramDescriptor::GetSockname -*******************************/ - -bool DatagramDescriptor::GetSockname (struct sockaddr *s) -{ - bool ok = false; - if (s) { - socklen_t len = sizeof(*s); - int gp = getsockname (GetSocket(), s, &len); - if (gp == 0) - ok = true; - } - return ok; -} - - - -/******************************************** -DatagramDescriptor::GetCommInactivityTimeout -********************************************/ - -int DatagramDescriptor::GetCommInactivityTimeout (int *value) -{ - if (value) { - *value = InactivityTimeout; - return 1; - } - else { - // TODO, extended logging, got bad parameter. - return 0; - } -} - -/******************************************** -DatagramDescriptor::SetCommInactivityTimeout -********************************************/ - -int DatagramDescriptor::SetCommInactivityTimeout (int *value) -{ - int out = 0; - - if (value) { - if ((*value==0) || (*value >= 2)) { - // Replace the value and send the old one back to the caller. - int v = *value; - *value = InactivityTimeout; - InactivityTimeout = v; - out = 1; - } - else { - // TODO, extended logging, got bad value. - } - } - else { - // TODO, extended logging, got bad parameter. - } - - return out; -} - -/***************************************************************************** - -$Id$ - -File: em.cpp -Date: 06Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -// THIS ENTIRE FILE WILL EVENTUALLY BE FOR UNIX BUILDS ONLY. -//#ifdef OS_UNIX - - -#include "project.h" - -// Keep a global variable floating around -// with the current loop time as set by the Event Machine. -// This avoids the need for frequent expensive calls to time(NULL); -time_t gCurrentLoopTime; - -#ifdef OS_WIN32 -unsigned gTickCountTickover; -unsigned gLastTickCount; -#endif - - -/* The numer of max outstanding timers was once a const enum defined in em.h. - * Now we define it here so that users can change its value if necessary. - */ -static int MaxOutstandingTimers = 1000; - - -/* Internal helper to convert strings to internet addresses. IPv6-aware. - * Not reentrant or threadsafe, optimized for speed. - */ -static struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size); - - -/*************************************** -STATIC EventMachine_t::SetMaxTimerCount -***************************************/ - -void EventMachine_t::SetMaxTimerCount (int count) -{ - /* Allow a user to increase the maximum number of outstanding timers. - * If this gets "too high" (a metric that is of course platform dependent), - * bad things will happen like performance problems and possible overuse - * of memory. - * The actual timer mechanism is very efficient so it's hard to know what - * the practical max, but 100,000 shouldn't be too problematical. - */ - if (count < 100) - count = 100; - MaxOutstandingTimers = count; -} - - - -/****************************** -EventMachine_t::EventMachine_t -******************************/ - -EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)): - EventCallback (event_callback), - NextHeartbeatTime (0), - LoopBreakerReader (-1), - LoopBreakerWriter (-1), - bEpoll (false), - bKqueue (false), - epfd (-1) -{ - // Default time-slice is just smaller than one hundred mills. - Quantum.tv_sec = 0; - Quantum.tv_usec = 90000; - - gTerminateSignalReceived = false; - // Make sure the current loop time is sane, in case we do any initializations of - // objects before we start running. - gCurrentLoopTime = time(NULL); - - /* We initialize the network library here (only on Windows of course) - * and initialize "loop breakers." Our destructor also does some network-level - * cleanup. There's thus an implicit assumption that any given instance of EventMachine_t - * will only call ::Run once. Is that a good assumption? Should we move some of these - * inits and de-inits into ::Run? - */ - #ifdef OS_WIN32 - WSADATA w; - WSAStartup (MAKEWORD (1, 1), &w); - #endif - - _InitializeLoopBreaker(); -} - - -/******************************* -EventMachine_t::~EventMachine_t -*******************************/ - -EventMachine_t::~EventMachine_t() -{ - // Run down descriptors - size_t i; - for (i = 0; i < NewDescriptors.size(); i++) - delete NewDescriptors[i]; - for (i = 0; i < Descriptors.size(); i++) - delete Descriptors[i]; - - close (LoopBreakerReader); - close (LoopBreakerWriter); - - if (epfd != -1) - close (epfd); - if (kqfd != -1) - close (kqfd); -} - - -/************************* -EventMachine_t::_UseEpoll -*************************/ - -void EventMachine_t::_UseEpoll() -{ - /* Temporary. - * Use an internal flag to switch in epoll-based functionality until we determine - * how it should be integrated properly and the extent of the required changes. - * A permanent solution needs to allow the integration of additional technologies, - * like kqueue and Solaris's events. - */ - - #ifdef HAVE_EPOLL - bEpoll = true; - #endif -} - -/************************** -EventMachine_t::_UseKqueue -**************************/ - -void EventMachine_t::_UseKqueue() -{ - /* Temporary. - * See comments under _UseEpoll. - */ - - #ifdef HAVE_KQUEUE - bKqueue = true; - #endif -} - - -/**************************** -EventMachine_t::ScheduleHalt -****************************/ - -void EventMachine_t::ScheduleHalt() -{ - /* This is how we stop the machine. - * This can be called by clients. Signal handlers will probably - * set the global flag. - * For now this means there can only be one EventMachine ever running at a time. - * - * IMPORTANT: keep this light, fast, and async-safe. Don't do anything frisky in here, - * because it may be called from signal handlers invoked from code that we don't - * control. At this writing (20Sep06), EM does NOT install any signal handlers of - * its own. - * - * We need a FAQ. And one of the questions is: how do I stop EM when Ctrl-C happens? - * The answer is to call evma_stop_machine, which calls here, from a SIGINT handler. - */ - gTerminateSignalReceived = true; -} - - - -/******************************* -EventMachine_t::SetTimerQuantum -*******************************/ - -void EventMachine_t::SetTimerQuantum (int interval) -{ - /* We get a timer-quantum expressed in milliseconds. - * Don't set a quantum smaller than 5 or larger than 2500. - */ - - if ((interval < 5) || (interval > 2500)) - throw std::runtime_error ("invalid timer-quantum"); - - Quantum.tv_sec = interval / 1000; - Quantum.tv_usec = (interval % 1000) * 1000; -} - - -/************************************* -(STATIC) EventMachine_t::SetuidString -*************************************/ - -void EventMachine_t::SetuidString (const char *username) -{ - /* This method takes a caller-supplied username and tries to setuid - * to that user. There is no meaningful implementation (and no error) - * on Windows. On Unix, a failure to setuid the caller-supplied string - * causes a fatal abort, because presumably the program is calling here - * in order to fulfill a security requirement. If we fail silently, - * the user may continue to run with too much privilege. - * - * TODO, we need to decide on and document a way of generating C++ level errors - * that can be wrapped in documented Ruby exceptions, so users can catch - * and handle them. And distinguish it from errors that we WON'T let the Ruby - * user catch (like security-violations and resource-overallocation). - * A setuid failure here would be in the latter category. - */ - - #ifdef OS_UNIX - if (!username || !*username) - throw std::runtime_error ("setuid_string failed: no username specified"); - - struct passwd *p = getpwnam (username); - if (!p) - throw std::runtime_error ("setuid_string failed: unknown username"); - - if (setuid (p->pw_uid) != 0) - throw std::runtime_error ("setuid_string failed: no setuid"); - - // Success. - #endif -} - - -/**************************************** -(STATIC) EventMachine_t::SetRlimitNofile -****************************************/ - -int EventMachine_t::SetRlimitNofile (int nofiles) -{ - #ifdef OS_UNIX - struct rlimit rlim; - getrlimit (RLIMIT_NOFILE, &rlim); - if (nofiles >= 0) { - rlim.rlim_cur = nofiles; - if (nofiles > rlim.rlim_max) - rlim.rlim_max = nofiles; - setrlimit (RLIMIT_NOFILE, &rlim); - // ignore the error return, for now at least. - // TODO, emit an error message someday when we have proper debug levels. - } - getrlimit (RLIMIT_NOFILE, &rlim); - return rlim.rlim_cur; - #endif - - #ifdef OS_WIN32 - // No meaningful implementation on Windows. - return 0; - #endif -} - - -/********************************* -EventMachine_t::SignalLoopBreaker -*********************************/ - -void EventMachine_t::SignalLoopBreaker() -{ - #ifdef OS_UNIX - write (LoopBreakerWriter, "", 1); - #endif - #ifdef OS_WIN32 - sendto (LoopBreakerReader, "", 0, 0, (struct sockaddr*)&(LoopBreakerTarget), sizeof(LoopBreakerTarget)); - #endif -} - - -/************************************** -EventMachine_t::_InitializeLoopBreaker -**************************************/ - -void EventMachine_t::_InitializeLoopBreaker() -{ - /* A "loop-breaker" is a socket-descriptor that we can write to in order - * to break the main select loop. Primarily useful for things running on - * threads other than the main EM thread, so they can trigger processing - * of events that arise exogenously to the EM. - * Keep the loop-breaker pipe out of the main descriptor set, otherwise - * its events will get passed on to user code. - */ - - #ifdef OS_UNIX - int fd[2]; - if (pipe (fd)) - throw std::runtime_error ("no loop breaker"); - - LoopBreakerWriter = fd[1]; - LoopBreakerReader = fd[0]; - #endif - - #ifdef OS_WIN32 - int sd = socket (AF_INET, SOCK_DGRAM, 0); - if (sd == INVALID_SOCKET) - throw std::runtime_error ("no loop breaker socket"); - SetSocketNonblocking (sd); - - memset (&LoopBreakerTarget, 0, sizeof(LoopBreakerTarget)); - LoopBreakerTarget.sin_family = AF_INET; - LoopBreakerTarget.sin_addr.s_addr = inet_addr ("127.0.0.1"); - - srand ((int)time(NULL)); - int i; - for (i=0; i < 100; i++) { - int r = (rand() % 10000) + 20000; - LoopBreakerTarget.sin_port = htons (r); - if (bind (sd, (struct sockaddr*)&LoopBreakerTarget, sizeof(LoopBreakerTarget)) == 0) - break; - } - - if (i == 100) - throw std::runtime_error ("no loop breaker"); - LoopBreakerReader = sd; - #endif -} - - -/******************* -EventMachine_t::Run -*******************/ - -void EventMachine_t::Run() -{ - #ifdef OS_WIN32 - HookControlC (true); - #endif - - #ifdef HAVE_EPOLL - if (bEpoll) { - epfd = epoll_create (MaxEpollDescriptors); - if (epfd == -1) { - char buf[200]; - snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno)); - throw std::runtime_error (buf); - } - int cloexec = fcntl (epfd, F_GETFD, 0); - assert (cloexec >= 0); - cloexec |= FD_CLOEXEC; - fcntl (epfd, F_SETFD, cloexec); - - assert (LoopBreakerReader >= 0); - LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); - assert (ld); - Add (ld); - } - #endif - - #ifdef HAVE_KQUEUE - if (bKqueue) { - kqfd = kqueue(); - if (kqfd == -1) { - char buf[200]; - snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno)); - throw std::runtime_error (buf); - } - // cloexec not needed. By definition, kqueues are not carried across forks. - - assert (LoopBreakerReader >= 0); - LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this); - assert (ld); - Add (ld); - } - #endif - - while (true) { - gCurrentLoopTime = time(NULL); - if (!_RunTimers()) - break; - - /* _Add must precede _Modify because the same descriptor might - * be on both lists during the same pass through the machine, - * and to modify a descriptor before adding it would fail. - */ - _AddNewDescriptors(); - _ModifyDescriptors(); - - if (!_RunOnce()) - break; - if (gTerminateSignalReceived) - break; - } - - #ifdef OS_WIN32 - HookControlC (false); - #endif -} - - -/************************ -EventMachine_t::_RunOnce -************************/ - -bool EventMachine_t::_RunOnce() -{ - if (bEpoll) - return _RunEpollOnce(); - else if (bKqueue) - return _RunKqueueOnce(); - else - return _RunSelectOnce(); -} - - - -/***************************** -EventMachine_t::_RunEpollOnce -*****************************/ - -bool EventMachine_t::_RunEpollOnce() -{ - #ifdef HAVE_EPOLL - assert (epfd != -1); - struct epoll_event ev [MaxEpollDescriptors]; - int s; - - #ifdef BUILD_FOR_RUBY - TRAP_BEG; - #endif - s = epoll_wait (epfd, ev, MaxEpollDescriptors, 50); - #ifdef BUILD_FOR_RUBY - TRAP_END; - #endif - - if (s > 0) { - for (int i=0; i < s; i++) { - EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr; - - if (ev[i].events & (EPOLLERR | EPOLLHUP)) - ed->ScheduleClose (false); - if (ev[i].events & EPOLLIN) - ed->Read(); - if (ev[i].events & EPOLLOUT) { - ed->Write(); - epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent()); - // Ignoring return value - } - } - } - else if (s < 0) { - // epoll_wait can fail on error in a handful of ways. - // If this happens, then wait for a little while to avoid busy-looping. - // If the error was EINTR, we probably caught SIGCHLD or something, - // so keep the wait short. - timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000}; - EmSelect (0, NULL, NULL, NULL, &tv); - } - - { // cleanup dying sockets - // vector::pop_back works in constant time. - // TODO, rip this out and only delete the descriptors we know have died, - // rather than traversing the whole list. - // Modified 05Jan08 per suggestions by Chris Heath. It's possible that - // an EventableDescriptor will have a descriptor value of -1. That will - // happen if EventableDescriptor::Close was called on it. In that case, - // don't call epoll_ctl to remove the socket's filters from the epoll set. - // According to the epoll docs, this happens automatically when the - // descriptor is closed anyway. This is different from the case where - // the socket has already been closed but the descriptor in the ED object - // hasn't yet been set to INVALID_SOCKET. - int i, j; - int nSockets = Descriptors.size(); - for (i=0, j=0; i < nSockets; i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - if (ed->ShouldDelete()) { - if (ed->GetSocket() != INVALID_SOCKET) { - assert (bEpoll); // wouldn't be in this method otherwise. - assert (epfd != -1); - int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent()); - // ENOENT or EBADF are not errors because the socket may be already closed when we get here. - if (e && (errno != ENOENT) && (errno != EBADF)) { - char buf [200]; - snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno)); - throw std::runtime_error (buf); - } - } - - ModifiedDescriptors.erase (ed); - delete ed; - } - else - Descriptors [j++] = ed; - } - while ((size_t)j < Descriptors.size()) - Descriptors.pop_back(); - - } - - // TODO, heartbeats. - // Added 14Sep07, its absence was noted by Brian Candler. But the comment was here, indicated - // that this got thought about and not done when EPOLL was originally written. Was there a reason - // not to do it, or was it an oversight? Certainly, running a heartbeat on 50,000 connections every - // two seconds can get to be a real bear, especially if all we're doing is timing out dead ones. - // Maybe there's a better way to do this. (Or maybe it's not that expensive after all.) - // - { // dispatch heartbeats - if (gCurrentLoopTime >= NextHeartbeatTime) { - NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; - - for (int i=0; i < Descriptors.size(); i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - ed->Heartbeat(); - } - } - } - - #ifdef BUILD_FOR_RUBY - if (!rb_thread_alone()) { - rb_thread_schedule(); - } - #endif - - return true; - #else - throw std::runtime_error ("epoll is not implemented on this platform"); - #endif -} - - -/****************************** -EventMachine_t::_RunKqueueOnce -******************************/ - -bool EventMachine_t::_RunKqueueOnce() -{ - #ifdef HAVE_KQUEUE - assert (kqfd != -1); - const int maxKevents = 2000; - struct kevent Karray [maxKevents]; - struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region - - int k; - #ifdef BUILD_FOR_RUBY - TRAP_BEG; - #endif - k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts); - #ifdef BUILD_FOR_RUBY - TRAP_END; - #endif - struct kevent *ke = Karray; - while (k > 0) { - EventableDescriptor *ed = (EventableDescriptor*) (ke->udata); - assert (ed); - - if (ke->filter == EVFILT_READ) - ed->Read(); - else if (ke->filter == EVFILT_WRITE) - ed->Write(); - else - cerr << "Discarding unknown kqueue event " << ke->filter << endl; - - --k; - ++ke; - } - - { // cleanup dying sockets - // vector::pop_back works in constant time. - // TODO, rip this out and only delete the descriptors we know have died, - // rather than traversing the whole list. - // In kqueue, closing a descriptor automatically removes its event filters. - - int i, j; - int nSockets = Descriptors.size(); - for (i=0, j=0; i < nSockets; i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - if (ed->ShouldDelete()) { - ModifiedDescriptors.erase (ed); - delete ed; - } - else - Descriptors [j++] = ed; - } - while ((size_t)j < Descriptors.size()) - Descriptors.pop_back(); - - } - - { // dispatch heartbeats - if (gCurrentLoopTime >= NextHeartbeatTime) { - NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; - - for (int i=0; i < Descriptors.size(); i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - ed->Heartbeat(); - } - } - } - - - // TODO, replace this with rb_thread_blocking_region for 1.9 builds. - #ifdef BUILD_FOR_RUBY - if (!rb_thread_alone()) { - rb_thread_schedule(); - } - #endif - - return true; - #else - throw std::runtime_error ("kqueue is not implemented on this platform"); - #endif -} - - -/********************************* -EventMachine_t::_ModifyEpollEvent -*********************************/ - -void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed) -{ - #ifdef HAVE_EPOLL - if (bEpoll) { - assert (epfd != -1); - assert (ed); - int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent()); - if (e) { - char buf [200]; - snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno)); - throw std::runtime_error (buf); - } - } - #endif -} - - - -/************************** -SelectData_t::SelectData_t -**************************/ - -SelectData_t::SelectData_t() -{ - maxsocket = 0; - FD_ZERO (&fdreads); - FD_ZERO (&fdwrites); -} - - -#ifdef BUILD_FOR_RUBY -/***************** -_SelectDataSelect -*****************/ - -#ifdef HAVE_TBR -static VALUE _SelectDataSelect (void *v) -{ - SelectData_t *sd = (SelectData_t*)v; - sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), NULL, &(sd->tv)); - return Qnil; -} -#endif - -/********************* -SelectData_t::_Select -*********************/ - -int SelectData_t::_Select() -{ - #ifdef HAVE_TBR - rb_thread_blocking_region (_SelectDataSelect, (void*)this, RUBY_UBF_IO, 0); - return nSockets; - #endif - - #ifndef HAVE_TBR - return EmSelect (maxsocket+1, &fdreads, &fdwrites, NULL, &tv); - #endif -} -#endif - - - -/****************************** -EventMachine_t::_RunSelectOnce -******************************/ - -bool EventMachine_t::_RunSelectOnce() -{ - // Crank the event machine once. - // If there are no descriptors to process, then sleep - // for a few hundred mills to avoid busy-looping. - // Return T/F to indicate whether we should continue. - // This is based on a select loop. Alternately provide epoll - // if we know we're running on a 2.6 kernel. - // epoll will be effective if we provide it as an alternative, - // however it has the same problem interoperating with Ruby - // threads that select does. - - //cerr << "X"; - - /* This protection is now obsolete, because we will ALWAYS - * have at least one descriptor (the loop-breaker) to read. - */ - /* - if (Descriptors.size() == 0) { - #ifdef OS_UNIX - timeval tv = {0, 200 * 1000}; - EmSelect (0, NULL, NULL, NULL, &tv); - return true; - #endif - #ifdef OS_WIN32 - Sleep (200); - return true; - #endif - } - */ - - SelectData_t SelectData; - /* - fd_set fdreads, fdwrites; - FD_ZERO (&fdreads); - FD_ZERO (&fdwrites); - - int maxsocket = 0; - */ - - // Always read the loop-breaker reader. - // Changed 23Aug06, provisionally implemented for Windows with a UDP socket - // running on localhost with a randomly-chosen port. (*Puke*) - // Windows has a version of the Unix pipe() library function, but it doesn't - // give you back descriptors that are selectable. - FD_SET (LoopBreakerReader, &(SelectData.fdreads)); - if (SelectData.maxsocket < LoopBreakerReader) - SelectData.maxsocket = LoopBreakerReader; - - // prepare the sockets for reading and writing - size_t i; - for (i = 0; i < Descriptors.size(); i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - int sd = ed->GetSocket(); - assert (sd != INVALID_SOCKET); - - if (ed->SelectForRead()) - FD_SET (sd, &(SelectData.fdreads)); - if (ed->SelectForWrite()) - FD_SET (sd, &(SelectData.fdwrites)); - - if (SelectData.maxsocket < sd) - SelectData.maxsocket = sd; - } - - - { // read and write the sockets - //timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000. - //timeval tv = Quantum; - SelectData.tv = Quantum; - int s = SelectData._Select(); - //rb_thread_blocking_region(xxx,(void*)&SelectData,RUBY_UBF_IO,0); - //int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv)); - //int s = SelectData.nSockets; - if (s > 0) { - /* Changed 01Jun07. We used to handle the Loop-breaker right here. - * Now we do it AFTER all the regular descriptors. There's an - * incredibly important and subtle reason for this. Code on - * loop breakers is sometimes used to cause the reactor core to - * cycle (for example, to allow outbound network buffers to drain). - * If a loop-breaker handler reschedules itself (say, after determining - * that the write buffers are still too full), then it will execute - * IMMEDIATELY if _ReadLoopBreaker is done here instead of after - * the other descriptors are processed. That defeats the whole purpose. - */ - for (i=0; i < Descriptors.size(); i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - int sd = ed->GetSocket(); - assert (sd != INVALID_SOCKET); - - if (FD_ISSET (sd, &(SelectData.fdwrites))) - ed->Write(); - if (FD_ISSET (sd, &(SelectData.fdreads))) - ed->Read(); - } - - if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads))) - _ReadLoopBreaker(); - } - else if (s < 0) { - // select can fail on error in a handful of ways. - // If this happens, then wait for a little while to avoid busy-looping. - // If the error was EINTR, we probably caught SIGCHLD or something, - // so keep the wait short. - timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000}; - EmSelect (0, NULL, NULL, NULL, &tv); - } - } - - - { // dispatch heartbeats - if (gCurrentLoopTime >= NextHeartbeatTime) { - NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval; - - for (i=0; i < Descriptors.size(); i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - ed->Heartbeat(); - } - } - } - - { // cleanup dying sockets - // vector::pop_back works in constant time. - int i, j; - int nSockets = Descriptors.size(); - for (i=0, j=0; i < nSockets; i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - if (ed->ShouldDelete()) - delete ed; - else - Descriptors [j++] = ed; - } - while ((size_t)j < Descriptors.size()) - Descriptors.pop_back(); - - } - - return true; -} - - -/******************************** -EventMachine_t::_ReadLoopBreaker -********************************/ - -void EventMachine_t::_ReadLoopBreaker() -{ - /* The loop breaker has selected readable. - * Read it ONCE (it may block if we try to read it twice) - * and send a loop-break event back to user code. - */ - char buffer [1024]; - read (LoopBreakerReader, buffer, sizeof(buffer)); - if (EventCallback) - (*EventCallback)("", EM_LOOPBREAK_SIGNAL, "", 0); -} - - -/************************** -EventMachine_t::_RunTimers -**************************/ - -bool EventMachine_t::_RunTimers() -{ - // These are caller-defined timer handlers. - // Return T/F to indicate whether we should continue the main loop. - // We rely on the fact that multimaps sort by their keys to avoid - // inspecting the whole list every time we come here. - // Just keep inspecting and processing the list head until we hit - // one that hasn't expired yet. - - #ifdef OS_UNIX - struct timeval tv; - gettimeofday (&tv, NULL); - Int64 now = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec)); - #endif - - #ifdef OS_WIN32 - unsigned tick = GetTickCount(); - if (tick < gLastTickCount) - gTickCountTickover += 1; - gLastTickCount = tick; - Int64 now = ((Int64)gTickCountTickover << 32) + (Int64)tick; - #endif - - while (true) { - multimap<Int64,Timer_t>::iterator i = Timers.begin(); - if (i == Timers.end()) - break; - if (i->first > now) - break; - if (EventCallback) - (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length()); - Timers.erase (i); - } - return true; -} - - - -/*********************************** -EventMachine_t::InstallOneshotTimer -***********************************/ - -const char *EventMachine_t::InstallOneshotTimer (int milliseconds) -{ - if (Timers.size() > MaxOutstandingTimers) - return false; - // Don't use the global loop-time variable here, because we might - // get called before the main event machine is running. - - #ifdef OS_UNIX - struct timeval tv; - gettimeofday (&tv, NULL); - Int64 fire_at = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec)); - fire_at += ((Int64)milliseconds) * 1000LL; - #endif - - #ifdef OS_WIN32 - unsigned tick = GetTickCount(); - if (tick < gLastTickCount) - gTickCountTickover += 1; - gLastTickCount = tick; - - Int64 fire_at = ((Int64)gTickCountTickover << 32) + (Int64)tick; - fire_at += (Int64)milliseconds; - #endif - - Timer_t t; - multimap<Int64,Timer_t>::iterator i = - Timers.insert (make_pair (fire_at, t)); - return i->second.GetBindingChars(); -} - - -/******************************* -EventMachine_t::ConnectToServer -*******************************/ - -const char *EventMachine_t::ConnectToServer (const char *server, int port) -{ - /* We want to spend no more than a few seconds waiting for a connection - * to a remote host. So we use a nonblocking connect. - * Linux disobeys the usual rules for nonblocking connects. - * Per Stevens (UNP p.410), you expect a nonblocking connect to select - * both readable and writable on error, and not to return EINPROGRESS - * if the connect can be fulfilled immediately. Linux violates both - * of these expectations. - * Any kind of nonblocking connect on Linux returns EINPROGRESS. - * The socket will then return writable when the disposition of the - * connect is known, but it will not also be readable in case of - * error! Weirdly, it will be readable in case there is data to read!!! - * (Which can happen with protocols like SSH and SMTP.) - * I suppose if you were so inclined you could consider this logical, - * but it's not the way Unix has historically done it. - * So we ignore the readable flag and read getsockopt to see if there - * was an error connecting. A select timeout works as expected. - * In regard to getsockopt: Linux does the Berkeley-style thing, - * not the Solaris-style, and returns zero with the error code in - * the error parameter. - * Return the binding-text of the newly-created pending connection, - * or NULL if there was a problem. - */ - - if (!server || !*server || !port) - return NULL; - - int family, bind_size; - struct sockaddr *bind_as = name2address (server, port, &family, &bind_size); - if (!bind_as) - return NULL; - - int sd = socket (family, SOCK_STREAM, 0); - if (sd == INVALID_SOCKET) - return NULL; - - /* - sockaddr_in pin; - unsigned long HostAddr; - - HostAddr = inet_addr (server); - if (HostAddr == INADDR_NONE) { - hostent *hp = gethostbyname ((char*)server); // Windows requires (char*) - if (!hp) { - // TODO: This gives the caller a fatal error. Not good. - // They can respond by catching RuntimeError (blecch). - // Possibly we need to fire an unbind event and provide - // a status code so user code can detect the cause of the - // failure. - return NULL; - } - HostAddr = ((in_addr*)(hp->h_addr))->s_addr; - } - - memset (&pin, 0, sizeof(pin)); - pin.sin_family = AF_INET; - pin.sin_addr.s_addr = HostAddr; - pin.sin_port = htons (port); - - int sd = socket (AF_INET, SOCK_STREAM, 0); - if (sd == INVALID_SOCKET) - return NULL; - */ - - // From here on, ALL error returns must close the socket. - // Set the new socket nonblocking. - if (!SetSocketNonblocking (sd)) { - closesocket (sd); - return NULL; - } - // Disable slow-start (Nagle algorithm). - int one = 1; - setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one)); - - const char *out = NULL; - - #ifdef OS_UNIX - //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) { - if (connect (sd, bind_as, bind_size) == 0) { - // This is a connect success, which Linux appears - // never to give when the socket is nonblocking, - // even if the connection is intramachine or to - // localhost. - - /* Changed this branch 08Aug06. Evidently some kernels - * (FreeBSD for example) will actually return success from - * a nonblocking connect. This is a pretty simple case, - * just set up the new connection and clear the pending flag. - * Thanks to Chris Ochs for helping track this down. - * This branch never gets taken on Linux or (oddly) OSX. - * The original behavior was to throw an unimplemented, - * which the user saw as a fatal exception. Very unfriendly. - * - * Tweaked 10Aug06. Even though the connect disposition is - * known, we still set the connect-pending flag. That way - * some needed initialization will happen in the ConnectionDescriptor. - * (To wit, the ConnectionCompleted event gets sent to the client.) - */ - ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); - if (!cd) - throw std::runtime_error ("no connection allocated"); - cd->SetConnectPending (true); - Add (cd); - out = cd->GetBinding().c_str(); - } - else if (errno == EINPROGRESS) { - // Errno will generally always be EINPROGRESS, but on Linux - // we have to look at getsockopt to be sure what really happened. - int error; - socklen_t len; - len = sizeof(error); - int o = getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len); - if ((o == 0) && (error == 0)) { - // Here, there's no disposition. - // Put the connection on the stack and wait for it to complete - // or time out. - ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); - if (!cd) - throw std::runtime_error ("no connection allocated"); - cd->SetConnectPending (true); - Add (cd); - out = cd->GetBinding().c_str(); - } - else { - /* This could be connection refused or some such thing. - * We will come here on Linux if a localhost connection fails. - * Changed 16Jul06: Originally this branch was a no-op, and - * we'd drop down to the end of the method, close the socket, - * and return NULL, which would cause the caller to GET A - * FATAL EXCEPTION. Now we keep the socket around but schedule an - * immediate close on it, so the caller will get a close-event - * scheduled on it. This was only an issue for localhost connections - * to non-listening ports. We may eventually need to revise this - * revised behavior, in case it causes problems like making it hard - * for people to know that a failure occurred. - */ - ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); - if (!cd) - throw std::runtime_error ("no connection allocated"); - cd->ScheduleClose (false); - Add (cd); - out = cd->GetBinding().c_str(); - } - } - else { - // The error from connect was something other then EINPROGRESS. - } - #endif - - #ifdef OS_WIN32 - //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) { - if (connect (sd, bind_as, bind_size) == 0) { - // This is a connect success, which Windows appears - // never to give when the socket is nonblocking, - // even if the connection is intramachine or to - // localhost. - throw std::runtime_error ("unimplemented"); - } - else if (WSAGetLastError() == WSAEWOULDBLOCK) { - // Here, there's no disposition. - // Windows appears not to surface refused connections or - // such stuff at this point. - // Put the connection on the stack and wait for it to complete - // or time out. - ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this); - if (!cd) - throw std::runtime_error ("no connection allocated"); - cd->SetConnectPending (true); - Add (cd); - out = cd->GetBinding().c_str(); - } - else { - // The error from connect was something other then WSAEWOULDBLOCK. - } - - #endif - - if (out == NULL) - closesocket (sd); - return out; -} - -/*********************************** -EventMachine_t::ConnectToUnixServer -***********************************/ - -const char *EventMachine_t::ConnectToUnixServer (const char *server) -{ - /* Connect to a Unix-domain server, which by definition is running - * on the same host. - * There is no meaningful implementation on Windows. - * There's no need to do a nonblocking connect, since the connection - * is always local and can always be fulfilled immediately. - */ - - #ifdef OS_WIN32 - throw std::runtime_error ("unix-domain connection unavailable on this platform"); - return NULL; - #endif - - // The whole rest of this function is only compiled on Unix systems. - #ifdef OS_UNIX - - const char *out = NULL; - - if (!server || !*server) - return NULL; - - sockaddr_un pun; - memset (&pun, 0, sizeof(pun)); - pun.sun_family = AF_LOCAL; - - // You ordinarily expect the server name field to be at least 1024 bytes long, - // but on Linux it can be MUCH shorter. - if (strlen(server) >= sizeof(pun.sun_path)) - throw std::runtime_error ("unix-domain server name is too long"); - - - strcpy (pun.sun_path, server); - - int fd = socket (AF_LOCAL, SOCK_STREAM, 0); - if (fd == INVALID_SOCKET) - return NULL; - - // From here on, ALL error returns must close the socket. - // NOTE: At this point, the socket is still a blocking socket. - if (connect (fd, (struct sockaddr*)&pun, sizeof(pun)) != 0) { - closesocket (fd); - return NULL; - } - - // Set the newly-connected socket nonblocking. - if (!SetSocketNonblocking (fd)) { - closesocket (fd); - return NULL; - } - - // Set up a connection descriptor and add it to the event-machine. - // Observe, even though we know the connection status is connect-success, - // we still set the "pending" flag, so some needed initializations take - // place. - ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); - if (!cd) - throw std::runtime_error ("no connection allocated"); - cd->SetConnectPending (true); - Add (cd); - out = cd->GetBinding().c_str(); - - if (out == NULL) - closesocket (fd); - - return out; - #endif -} - -/************************ -EventMachine_t::AttachFD -************************/ - -const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable) -{ - #ifdef OS_UNIX - if (fcntl(fd, F_GETFL, 0) < 0) - throw std::runtime_error ("invalid file descriptor"); - #endif - - #ifdef OS_WIN32 - // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt) - if (fd == INVALID_SOCKET) - throw std::runtime_error ("invalid file descriptor"); - #endif - - {// Check for duplicate descriptors - size_t i; - for (i = 0; i < Descriptors.size(); i++) { - EventableDescriptor *ed = Descriptors[i]; - assert (ed); - if (ed->GetSocket() == fd) - throw std::runtime_error ("adding existing descriptor"); - } - - for (i = 0; i < NewDescriptors.size(); i++) { - EventableDescriptor *ed = NewDescriptors[i]; - assert (ed); - if (ed->GetSocket() == fd) - throw std::runtime_error ("adding existing new descriptor"); - } - } - - ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this); - if (!cd) - throw std::runtime_error ("no connection allocated"); - - cd->SetConnectPending (true); - cd->SetNotifyReadable (notify_readable); - cd->SetNotifyWritable (notify_writable); - - Add (cd); - - const char *out = NULL; - out = cd->GetBinding().c_str(); - if (out == NULL) - closesocket (fd); - return out; -} - -/************************ -EventMachine_t::DetachFD -************************/ - -int EventMachine_t::DetachFD (EventableDescriptor *ed) -{ - if (!ed) - throw std::runtime_error ("detaching bad descriptor"); - - #ifdef HAVE_EPOLL - if (bEpoll) { - if (ed->GetSocket() != INVALID_SOCKET) { - assert (bEpoll); // wouldn't be in this method otherwise. - assert (epfd != -1); - int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent()); - // ENOENT or EBADF are not errors because the socket may be already closed when we get here. - if (e && (errno != ENOENT) && (errno != EBADF)) { - char buf [200]; - snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno)); - throw std::runtime_error (buf); - } - } - } - #endif - - #ifdef HAVE_KQUEUE - if (bKqueue) { - struct kevent k; - EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed); - int t = kevent (kqfd, &k, 1, NULL, 0, NULL); - assert (t == 0); - } - #endif - - { // remove descriptor from lists - int i, j; - int nSockets = Descriptors.size(); - for (i=0, j=0; i < nSockets; i++) { - EventableDescriptor *ted = Descriptors[i]; - assert (ted); - if (ted != ed) - Descriptors [j++] = ted; - } - while ((size_t)j < Descriptors.size()) - Descriptors.pop_back(); - - ModifiedDescriptors.erase (ed); - } - - int fd = ed->GetSocket(); - - // We depend on ~EventableDescriptor not calling close() if the socket is invalid - ed->SetSocketInvalid(); - delete ed; - - return fd; -} - -/************ -name2address -************/ - -struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size) -{ - // THIS IS NOT RE-ENTRANT OR THREADSAFE. Optimize for speed. - // Check the more-common cases first. - // Return NULL if no resolution. - - static struct sockaddr_in in4; - #ifndef __CYGWIN__ - static struct sockaddr_in6 in6; - #endif - struct hostent *hp; - - if (!server || !*server) - server = "0.0.0.0"; - - memset (&in4, 0, sizeof(in4)); - if ( (in4.sin_addr.s_addr = inet_addr (server)) != INADDR_NONE) { - if (family) - *family = AF_INET; - if (bind_size) - *bind_size = sizeof(in4); - in4.sin_family = AF_INET; - in4.sin_port = htons (port); - return (struct sockaddr*)&in4; - } - - #if defined(OS_UNIX) && !defined(__CYGWIN__) - memset (&in6, 0, sizeof(in6)); - if (inet_pton (AF_INET6, server, in6.sin6_addr.s6_addr) > 0) { - if (family) - *family = AF_INET6; - if (bind_size) - *bind_size = sizeof(in6); - in6.sin6_family = AF_INET6; - in6.sin6_port = htons (port); - return (struct sockaddr*)&in6; - } - #endif - - #ifdef OS_WIN32 - // TODO, must complete this branch. Windows doesn't have inet_pton. - // A possible approach is to make a getaddrinfo call with the supplied - // server address, constraining the hints to ipv6 and seeing if we - // get any addresses. - // For the time being, Ipv6 addresses aren't supported on Windows. - #endif - - hp = gethostbyname ((char*)server); // Windows requires the cast. - if (hp) { - in4.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; - if (family) - *family = AF_INET; - if (bind_size) - *bind_size = sizeof(in4); - in4.sin_family = AF_INET; - in4.sin_port = htons (port); - return (struct sockaddr*)&in4; - } - - return NULL; -} - - -/******************************* -EventMachine_t::CreateTcpServer -*******************************/ - -const char *EventMachine_t::CreateTcpServer (const char *server, int port) -{ - /* Create a TCP-acceptor (server) socket and add it to the event machine. - * Return the binding of the new acceptor to the caller. - * This binding will be referenced when the new acceptor sends events - * to indicate accepted connections. - */ - - - int family, bind_size; - struct sockaddr *bind_here = name2address (server, port, &family, &bind_size); - if (!bind_here) - return NULL; - - const char *output_binding = NULL; - - //struct sockaddr_in sin; - - int sd_accept = socket (family, SOCK_STREAM, 0); - if (sd_accept == INVALID_SOCKET) { - goto fail; - } - - /* - memset (&sin, 0, sizeof(sin)); - sin.sin_family = AF_INET; - sin.sin_addr.s_addr = INADDR_ANY; - sin.sin_port = htons (port); - - if (server && *server) { - sin.sin_addr.s_addr = inet_addr (server); - if (sin.sin_addr.s_addr == INADDR_NONE) { - hostent *hp = gethostbyname ((char*)server); // Windows requires the cast. - if (hp == NULL) { - //__warning ("hostname not resolved: ", server); - goto fail; - } - sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; - } - } - */ - - { // set reuseaddr to improve performance on restarts. - int oval = 1; - if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) { - //__warning ("setsockopt failed while creating listener",""); - goto fail; - } - } - - { // set CLOEXEC. Only makes sense on Unix - #ifdef OS_UNIX - int cloexec = fcntl (sd_accept, F_GETFD, 0); - assert (cloexec >= 0); - cloexec |= FD_CLOEXEC; - fcntl (sd_accept, F_SETFD, cloexec); - #endif - } - - - //if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) { - if (bind (sd_accept, bind_here, bind_size)) { - //__warning ("binding failed"); - goto fail; - } - - if (listen (sd_accept, 100)) { - //__warning ("listen failed"); - goto fail; - } - - { - // Set the acceptor non-blocking. - // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop. - if (!SetSocketNonblocking (sd_accept)) { - //int val = fcntl (sd_accept, F_GETFL, 0); - //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) { - goto fail; - } - } - - { // Looking good. - AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); - if (!ad) - throw std::runtime_error ("unable to allocate acceptor"); - Add (ad); - output_binding = ad->GetBinding().c_str(); - } - - return output_binding; - - fail: - if (sd_accept != INVALID_SOCKET) - closesocket (sd_accept); - return NULL; -} - - -/********************************** -EventMachine_t::OpenDatagramSocket -**********************************/ - -const char *EventMachine_t::OpenDatagramSocket (const char *address, int port) -{ - const char *output_binding = NULL; - - int sd = socket (AF_INET, SOCK_DGRAM, 0); - if (sd == INVALID_SOCKET) - goto fail; - // from here on, early returns must close the socket! - - - struct sockaddr_in sin; - memset (&sin, 0, sizeof(sin)); - sin.sin_family = AF_INET; - sin.sin_port = htons (port); - - - if (address && *address) { - sin.sin_addr.s_addr = inet_addr (address); - if (sin.sin_addr.s_addr == INADDR_NONE) { - hostent *hp = gethostbyname ((char*)address); // Windows requires the cast. - if (hp == NULL) - goto fail; - sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr; - } - } - else - sin.sin_addr.s_addr = htonl (INADDR_ANY); - - - // Set the new socket nonblocking. - { - if (!SetSocketNonblocking (sd)) - //int val = fcntl (sd, F_GETFL, 0); - //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1) - goto fail; - } - - if (bind (sd, (struct sockaddr*)&sin, sizeof(sin)) != 0) - goto fail; - - { // Looking good. - DatagramDescriptor *ds = new DatagramDescriptor (sd, this); - if (!ds) - throw std::runtime_error ("unable to allocate datagram-socket"); - Add (ds); - output_binding = ds->GetBinding().c_str(); - } - - return output_binding; - - fail: - if (sd != INVALID_SOCKET) - closesocket (sd); - return NULL; -} - - - -/******************* -EventMachine_t::Add -*******************/ - -void EventMachine_t::Add (EventableDescriptor *ed) -{ - if (!ed) - throw std::runtime_error ("added bad descriptor"); - ed->SetEventCallback (EventCallback); - NewDescriptors.push_back (ed); -} - - -/******************************* -EventMachine_t::ArmKqueueWriter -*******************************/ - -void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed) -{ - #ifdef HAVE_KQUEUE - if (bKqueue) { - if (!ed) - throw std::runtime_error ("added bad descriptor"); - struct kevent k; - EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed); - int t = kevent (kqfd, &k, 1, NULL, 0, NULL); - assert (t == 0); - } - #endif -} - -/******************************* -EventMachine_t::ArmKqueueReader -*******************************/ - -void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed) -{ - #ifdef HAVE_KQUEUE - if (bKqueue) { - if (!ed) - throw std::runtime_error ("added bad descriptor"); - struct kevent k; - EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); - int t = kevent (kqfd, &k, 1, NULL, 0, NULL); - assert (t == 0); - } - #endif -} - -/********************************** -EventMachine_t::_AddNewDescriptors -**********************************/ - -void EventMachine_t::_AddNewDescriptors() -{ - /* Avoid adding descriptors to the main descriptor list - * while we're actually traversing the list. - * Any descriptors that are added as a result of processing timers - * or acceptors should go on a temporary queue and then added - * while we're not traversing the main list. - * Also, it (rarely) happens that a newly-created descriptor - * is immediately scheduled to close. It might be a good - * idea not to bother scheduling these for I/O but if - * we do that, we might bypass some important processing. - */ - - for (size_t i = 0; i < NewDescriptors.size(); i++) { - EventableDescriptor *ed = NewDescriptors[i]; - if (ed == NULL) - throw std::runtime_error ("adding bad descriptor"); - - #if HAVE_EPOLL - if (bEpoll) { - assert (epfd != -1); - int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent()); - if (e) { - char buf [200]; - snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno)); - throw std::runtime_error (buf); - } - } - #endif - - #if HAVE_KQUEUE - /* - if (bKqueue) { - // INCOMPLETE. Some descriptors don't want to be readable. - assert (kqfd != -1); - struct kevent k; - EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed); - int t = kevent (kqfd, &k, 1, NULL, 0, NULL); - assert (t == 0); - } - */ - #endif - - Descriptors.push_back (ed); - } - NewDescriptors.clear(); -} - - -/********************************** -EventMachine_t::_ModifyDescriptors -**********************************/ - -void EventMachine_t::_ModifyDescriptors() -{ - /* For implementations which don't level check every descriptor on - * every pass through the machine, as select does. - * If we're not selecting, then descriptors need a way to signal to the - * machine that their readable or writable status has changed. - * That's what the ::Modify call is for. We do it this way to avoid - * modifying descriptors during the loop traversal, where it can easily - * happen that an object (like a UDP socket) gets data written on it by - * the application during #post_init. That would take place BEFORE the - * descriptor even gets added to the epoll descriptor, so the modify - * operation will crash messily. - * Another really messy possibility is for a descriptor to put itself - * on the Modified list, and then get deleted before we get here. - * Remember, deletes happen after the I/O traversal and before the - * next pass through here. So we have to make sure when we delete a - * descriptor to remove it from the Modified list. - */ - - #ifdef HAVE_EPOLL - if (bEpoll) { - set<EventableDescriptor*>::iterator i = ModifiedDescriptors.begin(); - while (i != ModifiedDescriptors.end()) { - assert (*i); - _ModifyEpollEvent (*i); - ++i; - } - } - #endif - - ModifiedDescriptors.clear(); -} - - -/********************** -EventMachine_t::Modify -**********************/ - -void EventMachine_t::Modify (EventableDescriptor *ed) -{ - if (!ed) - throw std::runtime_error ("modified bad descriptor"); - ModifiedDescriptors.insert (ed); -} - - -/*********************************** -EventMachine_t::_OpenFileForWriting -***********************************/ - -const char *EventMachine_t::_OpenFileForWriting (const char *filename) -{ - /* - * Return the binding-text of the newly-opened file, - * or NULL if there was a problem. - */ - - if (!filename || !*filename) - return NULL; - - int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644); - - FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this); - if (!fsd) - throw std::runtime_error ("no file-stream allocated"); - Add (fsd); - return fsd->GetBinding().c_str(); - -} - - -/************************************** -EventMachine_t::CreateUnixDomainServer -**************************************/ - -const char *EventMachine_t::CreateUnixDomainServer (const char *filename) -{ - /* Create a UNIX-domain acceptor (server) socket and add it to the event machine. - * Return the binding of the new acceptor to the caller. - * This binding will be referenced when the new acceptor sends events - * to indicate accepted connections. - * THERE IS NO MEANINGFUL IMPLEMENTATION ON WINDOWS. - */ - - #ifdef OS_WIN32 - throw std::runtime_error ("unix-domain server unavailable on this platform"); - #endif - - // The whole rest of this function is only compiled on Unix systems. - #ifdef OS_UNIX - const char *output_binding = NULL; - - struct sockaddr_un s_sun; - - int sd_accept = socket (AF_LOCAL, SOCK_STREAM, 0); - if (sd_accept == INVALID_SOCKET) { - goto fail; - } - - if (!filename || !*filename) - goto fail; - unlink (filename); - - bzero (&s_sun, sizeof(s_sun)); - s_sun.sun_family = AF_LOCAL; - strncpy (s_sun.sun_path, filename, sizeof(s_sun.sun_path)-1); - - // don't bother with reuseaddr for a local socket. - - { // set CLOEXEC. Only makes sense on Unix - #ifdef OS_UNIX - int cloexec = fcntl (sd_accept, F_GETFD, 0); - assert (cloexec >= 0); - cloexec |= FD_CLOEXEC; - fcntl (sd_accept, F_SETFD, cloexec); - #endif - } - - if (bind (sd_accept, (struct sockaddr*)&s_sun, sizeof(s_sun))) { - //__warning ("binding failed"); - goto fail; - } - - if (listen (sd_accept, 100)) { - //__warning ("listen failed"); - goto fail; - } - - { - // Set the acceptor non-blocking. - // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop. - if (!SetSocketNonblocking (sd_accept)) { - //int val = fcntl (sd_accept, F_GETFL, 0); - //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) { - goto fail; - } - } - - { // Looking good. - AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this); - if (!ad) - throw std::runtime_error ("unable to allocate acceptor"); - Add (ad); - output_binding = ad->GetBinding().c_str(); - } - - return output_binding; - - fail: - if (sd_accept != INVALID_SOCKET) - closesocket (sd_accept); - return NULL; - #endif // OS_UNIX -} - - -/********************* -EventMachine_t::Popen -*********************/ -#if OBSOLETE -const char *EventMachine_t::Popen (const char *cmd, const char *mode) -{ - #ifdef OS_WIN32 - throw std::runtime_error ("popen is currently unavailable on this platform"); - #endif - - // The whole rest of this function is only compiled on Unix systems. - // Eventually we need this functionality (or a full-duplex equivalent) on Windows. - #ifdef OS_UNIX - const char *output_binding = NULL; - - FILE *fp = popen (cmd, mode); - if (!fp) - return NULL; - - // From here, all early returns must pclose the stream. - - // According to the pipe(2) manpage, descriptors returned from pipe have both - // CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking. - if (!SetSocketNonblocking (fileno (fp))) { - pclose (fp); - return NULL; - } - - { // Looking good. - PipeDescriptor *pd = new PipeDescriptor (fp, this); - if (!pd) - throw std::runtime_error ("unable to allocate pipe"); - Add (pd); - output_binding = pd->GetBinding().c_str(); - } - - return output_binding; - #endif -} -#endif // OBSOLETE - -/************************** -EventMachine_t::Socketpair -**************************/ - -const char *EventMachine_t::Socketpair (char * const*cmd_strings) -{ - #ifdef OS_WIN32 - throw std::runtime_error ("socketpair is currently unavailable on this platform"); - #endif - - // The whole rest of this function is only compiled on Unix systems. - // Eventually we need this functionality (or a full-duplex equivalent) on Windows. - #ifdef OS_UNIX - // Make sure the incoming array of command strings is sane. - if (!cmd_strings) - return NULL; - int j; - for (j=0; j < 100 && cmd_strings[j]; j++) - ; - if ((j==0) || (j==100)) - return NULL; - - const char *output_binding = NULL; - - int sv[2]; - if (socketpair (AF_LOCAL, SOCK_STREAM, 0, sv) < 0) - return NULL; - // from here, all early returns must close the pair of sockets. - - // Set the parent side of the socketpair nonblocking. - // We don't care about the child side, and most child processes will expect their - // stdout to be blocking. Thanks to Duane Johnson and Bill Kelly for pointing this out. - // Obviously DON'T set CLOEXEC. - if (!SetSocketNonblocking (sv[0])) { - close (sv[0]); - close (sv[1]); - return NULL; - } - - pid_t f = fork(); - if (f > 0) { - close (sv[1]); - PipeDescriptor *pd = new PipeDescriptor (sv[0], f, this); - if (!pd) - throw std::runtime_error ("unable to allocate pipe"); - Add (pd); - output_binding = pd->GetBinding().c_str(); - } - else if (f == 0) { - close (sv[0]); - dup2 (sv[1], STDIN_FILENO); - close (sv[1]); - dup2 (STDIN_FILENO, STDOUT_FILENO); - execvp (cmd_strings[0], cmd_strings+1); - exit (-1); // end the child process if the exec doesn't work. - } - else - throw std::runtime_error ("no fork"); - - return output_binding; - #endif -} - - -/**************************** -EventMachine_t::OpenKeyboard -****************************/ - -const char *EventMachine_t::OpenKeyboard() -{ - KeyboardDescriptor *kd = new KeyboardDescriptor (this); - if (!kd) - throw std::runtime_error ("no keyboard-object allocated"); - Add (kd); - return kd->GetBinding().c_str(); -} - - - - - -//#endif // OS_UNIX - -/*****************************************************************************
-
-$Id$
-
-File: emwin.cpp
-Date: 05May06
-
-Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
-Gmail: blackhedd
-
-This program is free software; you can redistribute it and/or modify
-it under the terms of either: 1) the GNU General Public License
-as published by the Free Software Foundation; either version 2 of the
-License, or (at your option) any later version; or 2) Ruby's License.
-
-See the file COPYING for complete licensing information.
-
-*****************************************************************************/
-
-
-// THIS ENTIRE FILE IS FOR WINDOWS BUILDS ONLY
-// INCOMPLETE AND DISABLED FOR NOW.
-#ifdef xOS_WIN32
-
-#include "project.h"
-
-
-// Keep a global variable floating around
-// with the current loop time as set by the Event Machine.
-// This avoids the need for frequent expensive calls to time(NULL);
-time_t gCurrentLoopTime;
-
-
-/******************************
-EventMachine_t::EventMachine_t
-******************************/
-
-EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)):
- EventCallback (event_callback),
- NextHeartbeatTime (0)
-{
- gTerminateSignalReceived = false;
- Iocp = NULL;
-}
-
-
-/*******************************
-EventMachine_t::~EventMachine_t
-*******************************/
-
-EventMachine_t::~EventMachine_t()
-{
- cerr << "EM __dt\n";
- if (Iocp)
- CloseHandle (Iocp);
-}
-
-
-/****************************
-EventMachine_t::ScheduleHalt
-****************************/
-
-void EventMachine_t::ScheduleHalt()
-{
- /* This is how we stop the machine.
- * This can be called by clients. Signal handlers will probably
- * set the global flag.
- * For now this means there can only be one EventMachine ever running at a time.
- */
- gTerminateSignalReceived = true;
-}
-
-
-
-/*******************
-EventMachine_t::Run
-*******************/
-
-void EventMachine_t::Run()
-{
- HookControlC (true);
-
- Iocp = CreateIoCompletionPort (INVALID_HANDLE_VALUE, NULL, 0, 0);
- if (Iocp == NULL)
- throw std::runtime_error ("no completion port");
-
-
- DWORD nBytes, nCompletionKey;
- LPOVERLAPPED Overlapped;
-
- do {
- gCurrentLoopTime = time(NULL);
- // Have some kind of strategy that will dequeue maybe up to 10 completions
- // without running the timers as long as they are available immediately.
- // Otherwise in a busy server we're calling them every time through the loop.
- if (!_RunTimers())
- break;
- if (GetQueuedCompletionStatus (Iocp, &nBytes, &nCompletionKey, &Overlapped, 1000)) {
- }
- cerr << "+";
- } while (!gTerminateSignalReceived);
-
-
- /*
- while (true) {
- gCurrentLoopTime = time(NULL);
- if (!_RunTimers())
- break;
- _AddNewDescriptors();
- if (!_RunOnce())
- break;
- if (gTerminateSignalReceived)
- break;
- }
- */
-
- HookControlC (false);
-}
-
-
-/**************************
-EventMachine_t::_RunTimers
-**************************/
-
-bool EventMachine_t::_RunTimers()
-{
- // These are caller-defined timer handlers.
- // Return T/F to indicate whether we should continue the main loop.
- // We rely on the fact that multimaps sort by their keys to avoid
- // inspecting the whole list every time we come here.
- // Just keep inspecting and processing the list head until we hit
- // one that hasn't expired yet.
-
- while (true) {
- multimap<time_t,Timer_t>::iterator i = Timers.begin();
- if (i == Timers.end())
- break;
- if (i->first > gCurrentLoopTime)
- break;
- if (EventCallback)
- (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length());
- Timers.erase (i);
- }
- return true;
-}
-
-
-/***********************************
-EventMachine_t::InstallOneshotTimer
-***********************************/
-
-const char *EventMachine_t::InstallOneshotTimer (int seconds)
-{
- if (Timers.size() > MaxOutstandingTimers)
- return false;
- // Don't use the global loop-time variable here, because we might
- // get called before the main event machine is running.
-
- Timer_t t;
- Timers.insert (make_pair (time(NULL) + seconds, t));
- return t.GetBinding().c_str();
-}
-
-
-/**********************************
-EventMachine_t::OpenDatagramSocket
-**********************************/
-
-const char *EventMachine_t::OpenDatagramSocket (const char *address, int port)
-{
- cerr << "OPEN DATAGRAM SOCKET\n";
- return "Unimplemented";
-}
-
-
-/*******************************
-EventMachine_t::CreateTcpServer
-*******************************/
-
-const char *EventMachine_t::CreateTcpServer (const char *server, int port)
-{
- /* Create a TCP-acceptor (server) socket and add it to the event machine.
- * Return the binding of the new acceptor to the caller.
- * This binding will be referenced when the new acceptor sends events
- * to indicate accepted connections.
- */
-
- const char *output_binding = NULL;
-
- struct sockaddr_in sin;
-
- SOCKET sd_accept = socket (AF_INET, SOCK_STREAM, 0);
- if (sd_accept == INVALID_SOCKET) {
- goto fail;
- }
-
- memset (&sin, 0, sizeof(sin));
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = INADDR_ANY;
- sin.sin_port = htons (port);
-
- if (server && *server) {
- sin.sin_addr.s_addr = inet_addr (server);
- if (sin.sin_addr.s_addr == INADDR_NONE) {
- hostent *hp = gethostbyname (server);
- if (hp == NULL) {
- //__warning ("hostname not resolved: ", server);
- goto fail;
- }
- sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
- }
- }
-
-
- // No need to set reuseaddr on Windows.
-
-
- if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) {
- //__warning ("binding failed");
- goto fail;
- }
-
- if (listen (sd_accept, 100)) {
- //__warning ("listen failed");
- goto fail;
- }
-
- { // Looking good.
- AcceptorDescriptor *ad = new AcceptorDescriptor (this, sd_accept);
- if (!ad)
- throw std::runtime_error ("unable to allocate acceptor");
- Add (ad);
- output_binding = ad->GetBinding().c_str();
-
- CreateIoCompletionPort ((HANDLE)sd_accept, Iocp, NULL, 0);
- SOCKET sd = socket (AF_INET, SOCK_STREAM, 0);
- CreateIoCompletionPort ((HANDLE)sd, Iocp, NULL, 0);
- AcceptEx (sd_accept, sd,
- }
-
- return output_binding;
-
- fail:
- if (sd_accept != INVALID_SOCKET)
- closesocket (sd_accept);
- return NULL;
-}
-
-
-/*******************************
-EventMachine_t::ConnectToServer
-*******************************/
-
-const char *EventMachine_t::ConnectToServer (const char *server, int port)
-{
- if (!server || !*server || !port)
- return NULL;
-
- sockaddr_in pin;
- unsigned long HostAddr;
-
- HostAddr = inet_addr (server);
- if (HostAddr == INADDR_NONE) {
- hostent *hp = gethostbyname (server);
- if (!hp)
- return NULL;
- HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
- }
-
- memset (&pin, 0, sizeof(pin));
- pin.sin_family = AF_INET;
- pin.sin_addr.s_addr = HostAddr;
- pin.sin_port = htons (port);
-
- int sd = socket (AF_INET, SOCK_STREAM, 0);
- if (sd == INVALID_SOCKET)
- return NULL;
-
-
- LPOVERLAPPED olap = (LPOVERLAPPED) calloc (1, sizeof (OVERLAPPED));
- cerr << "I'm dying now\n";
- throw runtime_error ("UNIMPLEMENTED!!!\n");
-
-}
-
-
-
-/*******************
-EventMachine_t::Add
-*******************/
-
-void EventMachine_t::Add (EventableDescriptor *ed)
-{
- cerr << "ADD\n";
-}
-
-
-
-#endif // OS_WIN32
-
-/***************************************************************************** - -$Id$ - -File: epoll.cpp -Date: 06Jun07 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - - -#ifdef HAVE_EPOLL - -#include "project.h" - -#endif // HAVE_EPOLL - -/***************************************************************************** - -$Id: mapper.cpp 4527 2007-07-04 10:21:34Z francis $ - -File: mapper.cpp -Date: 02Jul07 - -Copyright (C) 2007 by Francis Cianfrocca. All Rights Reserved. -Gmail: garbagecat10 - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - - -////////////////////////////////////////////////////////////////////// -// UNIX implementation -////////////////////////////////////////////////////////////////////// - - -#ifdef OS_UNIX - -#include <sys/types.h> -#include <sys/stat.h> -#include <sys/mman.h> -#include <fcntl.h> -#include <errno.h> - -#include <iostream> -#include "unistd.h" -#include <string> -#include <cstring> -#include <stdexcept> -using namespace std; - -#include "mapper.h" - -/****************** -Mapper_t::Mapper_t -******************/ - -Mapper_t::Mapper_t (const string &filename) -{ - /* We ASSUME we can open the file. - * (More precisely, we assume someone else checked before we got here.) - */ - - Fd = open (filename.c_str(), O_RDONLY); - if (Fd < 0) - throw runtime_error (strerror (errno)); - - struct stat st; - if (fstat (Fd, &st)) - throw runtime_error (strerror (errno)); - FileSize = st.st_size; - - MapPoint = (const char*) mmap (0, FileSize, PROT_READ, MAP_SHARED, Fd, 0); - if (MapPoint == MAP_FAILED) - throw runtime_error (strerror (errno)); -} - - -/******************* -Mapper_t::~Mapper_t -*******************/ - -Mapper_t::~Mapper_t() -{ - Close(); -} - - -/*************** -Mapper_t::Close -***************/ - -void Mapper_t::Close() -{ - // Can be called multiple times. - // Calls to GetChunk are invalid after a call to Close. - if (MapPoint) { - munmap ((void*)MapPoint, FileSize); - MapPoint = NULL; - } - if (Fd >= 0) { - close (Fd); - Fd = -1; - } -} - -/****************** -Mapper_t::GetChunk -******************/ - -const char *Mapper_t::GetChunk (unsigned start) -{ - return MapPoint + start; -} - - - -#endif // OS_UNIX - - -////////////////////////////////////////////////////////////////////// -// WINDOWS implementation -////////////////////////////////////////////////////////////////////// - -#ifdef OS_WIN32 - -#include <windows.h> - -#include <iostream> -#include <string> -#include <stdexcept> -using namespace std; - -#include "mapper.h" - -/****************** -Mapper_t::Mapper_t -******************/ - -Mapper_t::Mapper_t (const string &filename) -{ - /* We ASSUME we can open the file. - * (More precisely, we assume someone else checked before we got here.) - */ - - hFile = INVALID_HANDLE_VALUE; - hMapping = NULL; - MapPoint = NULL; - FileSize = 0; - - hFile = CreateFile (filename.c_str(), GENERIC_READ|GENERIC_WRITE, FILE_SHARE_DELETE|FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); - - if (hFile == INVALID_HANDLE_VALUE) - throw runtime_error ("File not found"); - - BY_HANDLE_FILE_INFORMATION i; - if (GetFileInformationByHandle (hFile, &i)) - FileSize = i.nFileSizeLow; - - hMapping = CreateFileMapping (hFile, NULL, PAGE_READWRITE, 0, 0, NULL); - if (!hMapping) - throw runtime_error ("File not mapped"); - - MapPoint = (const char*) MapViewOfFile (hMapping, FILE_MAP_WRITE, 0, 0, 0); - if (!MapPoint) - throw runtime_error ("Mappoint not read"); -} - - -/******************* -Mapper_t::~Mapper_t -*******************/ - -Mapper_t::~Mapper_t() -{ - Close(); -} - -/*************** -Mapper_t::Close -***************/ - -void Mapper_t::Close() -{ - // Can be called multiple times. - // Calls to GetChunk are invalid after a call to Close. - if (MapPoint) { - UnmapViewOfFile (MapPoint); - MapPoint = NULL; - } - if (hMapping != NULL) { - CloseHandle (hMapping); - hMapping = NULL; - } - if (hFile != INVALID_HANDLE_VALUE) { - CloseHandle (hFile); - hMapping = INVALID_HANDLE_VALUE; - } -} - - -/****************** -Mapper_t::GetChunk -******************/ - -const char *Mapper_t::GetChunk (unsigned start) -{ - return MapPoint + start; -} - - - -#endif // OS_WINDOWS -/***************************************************************************** - -$Id: rubymain.cpp 4529 2007-07-04 11:32:22Z francis $ - -File: rubymain.cpp -Date: 02Jul07 - -Copyright (C) 2007 by Francis Cianfrocca. All Rights Reserved. -Gmail: garbagecat10 - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - - - -#include <iostream> -#include <stdexcept> -using namespace std; - -#include <ruby.h> -#include "mapper.h" - -static VALUE EmModule; -static VALUE FastFileReader; -static VALUE Mapper; - - - -/********* -mapper_dt -*********/ - -static void mapper_dt (void *ptr) -{ - if (ptr) - delete (Mapper_t*) ptr; -} - -/********** -mapper_new -**********/ - -static VALUE mapper_new (VALUE self, VALUE filename) -{ - Mapper_t *m = new Mapper_t (StringValuePtr (filename)); - if (!m) - rb_raise (rb_eException, "No Mapper Object"); - VALUE v = Data_Wrap_Struct (Mapper, 0, mapper_dt, (void*)m); - return v; -} - - -/**************** -mapper_get_chunk -****************/ - -static VALUE mapper_get_chunk (VALUE self, VALUE start, VALUE length) -{ - Mapper_t *m = NULL; - Data_Get_Struct (self, Mapper_t, m); - if (!m) - rb_raise (rb_eException, "No Mapper Object"); - - // TODO, what if some moron sends us a negative start value? - unsigned _start = NUM2INT (start); - unsigned _length = NUM2INT (length); - if ((_start + _length) > m->GetFileSize()) - rb_raise (rb_eException, "Mapper Range Error"); - - const char *chunk = m->GetChunk (_start); - if (!chunk) - rb_raise (rb_eException, "No Mapper Chunk"); - return rb_str_new (chunk, _length); -} - -/************ -mapper_close -************/ - -static VALUE mapper_close (VALUE self) -{ - Mapper_t *m = NULL; - Data_Get_Struct (self, Mapper_t, m); - if (!m) - rb_raise (rb_eException, "No Mapper Object"); - m->Close(); - return Qnil; -} - -/*********** -mapper_size -***********/ - -static VALUE mapper_size (VALUE self) -{ - Mapper_t *m = NULL; - Data_Get_Struct (self, Mapper_t, m); - if (!m) - rb_raise (rb_eException, "No Mapper Object"); - return INT2NUM (m->GetFileSize()); -} - - -/********************** -Init_fastfilereaderext -**********************/ - -extern "C" void Init_fastfilereaderext() -{ - EmModule = rb_define_module ("EventMachine"); - FastFileReader = rb_define_class_under (EmModule, "FastFileReader", rb_cObject); - Mapper = rb_define_class_under (FastFileReader, "Mapper", rb_cObject); - - rb_define_module_function (Mapper, "new", (VALUE(*)(...))mapper_new, 1); - rb_define_method (Mapper, "size", (VALUE(*)(...))mapper_size, 0); - rb_define_method (Mapper, "close", (VALUE(*)(...))mapper_close, 0); - rb_define_method (Mapper, "get_chunk", (VALUE(*)(...))mapper_get_chunk, 2); -} - - - -/***************************************************************************** - -$Id$ - -File: files.cpp -Date: 26Aug06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - - -/****************************************** -FileStreamDescriptor::FileStreamDescriptor -******************************************/ - -FileStreamDescriptor::FileStreamDescriptor (int fd, EventMachine_t *em): - EventableDescriptor (fd, em), - OutboundDataSize (0) -{ -cerr << "#####"; -} - - -/******************************************* -FileStreamDescriptor::~FileStreamDescriptor -*******************************************/ - -FileStreamDescriptor::~FileStreamDescriptor() -{ - // Run down any stranded outbound data. - for (size_t i=0; i < OutboundPages.size(); i++) - OutboundPages[i].Free(); -} - - -/************************** -FileStreamDescriptor::Read -**************************/ - -void FileStreamDescriptor::Read() -{ -} - -/*************************** -FileStreamDescriptor::Write -***************************/ - -void FileStreamDescriptor::Write() -{ -} - - -/******************************* -FileStreamDescriptor::Heartbeat -*******************************/ - -void FileStreamDescriptor::Heartbeat() -{ -} - - -/*********************************** -FileStreamDescriptor::SelectForRead -***********************************/ - -bool FileStreamDescriptor::SelectForRead() -{ - cerr << "R?"; - return false; -} - - -/************************************ -FileStreamDescriptor::SelectForWrite -************************************/ - -bool FileStreamDescriptor::SelectForWrite() -{ - cerr << "W?"; - return false; -} - - -/***************************************************************************** - -$Id$ - -File: kb.cpp -Date: 24Aug07 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - - -/************************************** -KeyboardDescriptor::KeyboardDescriptor -**************************************/ - -KeyboardDescriptor::KeyboardDescriptor (EventMachine_t *parent_em): - EventableDescriptor (0, parent_em), - bReadAttemptedAfterClose (false), - LastIo (gCurrentLoopTime), - InactivityTimeout (0) -{ - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLIN; - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueReader (this); - #endif -} - - -/*************************************** -KeyboardDescriptor::~KeyboardDescriptor -***************************************/ - -KeyboardDescriptor::~KeyboardDescriptor() -{ -} - - -/************************* -KeyboardDescriptor::Write -*************************/ - -void KeyboardDescriptor::Write() -{ - // Why are we here? - throw std::runtime_error ("bad code path in keyboard handler"); -} - - -/***************************** -KeyboardDescriptor::Heartbeat -*****************************/ - -void KeyboardDescriptor::Heartbeat() -{ - // no-op -} - - -/************************ -KeyboardDescriptor::Read -************************/ - -void KeyboardDescriptor::Read() -{ - char c; - read (GetSocket(), &c, 1); - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, &c, 1); -} - - - - -#if 0 -/****************************** -PipeDescriptor::PipeDescriptor -******************************/ - -PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em): - EventableDescriptor (fd, parent_em), - bReadAttemptedAfterClose (false), - LastIo (gCurrentLoopTime), - InactivityTimeout (0), - OutboundDataSize (0), - SubprocessPid (subpid) -{ - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLIN; - #endif -} - - -/******************************* -PipeDescriptor::~PipeDescriptor -*******************************/ - -PipeDescriptor::~PipeDescriptor() -{ - // Run down any stranded outbound data. - for (size_t i=0; i < OutboundPages.size(); i++) - OutboundPages[i].Free(); - - /* As a virtual destructor, we come here before the base-class - * destructor that closes our file-descriptor. - * We have to make sure the subprocess goes down (if it's not - * already down) and we have to reap the zombie. - * - * This implementation is PROVISIONAL and will surely be improved. - * The intention here is that we never block, hence the highly - * undesirable sleeps. But if we can't reap the subprocess even - * after sending it SIGKILL, then something is wrong and we - * throw a fatal exception, which is also not something we should - * be doing. - * - * Eventually the right thing to do will be to have the reactor - * core respond to SIGCHLD by chaining a handler on top of the - * one Ruby may have installed, and dealing with a list of dead - * children that are pending cleanup. - * - * Since we want to have a signal processor integrated into the - * client-visible API, let's wait until that is done before cleaning - * this up. - */ - - struct timespec req = {0, 10000000}; - kill (SubprocessPid, SIGTERM); - nanosleep (&req, NULL); - if (waitpid (SubprocessPid, NULL, WNOHANG) == 0) { - kill (SubprocessPid, SIGKILL); - nanosleep (&req, NULL); - if (waitpid (SubprocessPid, NULL, WNOHANG) == 0) - throw std::runtime_error ("unable to reap subprocess"); - } -} - - - -/******************** -PipeDescriptor::Read -********************/ - -void PipeDescriptor::Read() -{ - int sd = GetSocket(); - if (sd == INVALID_SOCKET) { - assert (!bReadAttemptedAfterClose); - bReadAttemptedAfterClose = true; - return; - } - - LastIo = gCurrentLoopTime; - - int total_bytes_read = 0; - char readbuffer [16 * 1024]; - - for (int i=0; i < 10; i++) { - // Don't read just one buffer and then move on. This is faster - // if there is a lot of incoming. - // But don't read indefinitely. Give other sockets a chance to run. - // NOTICE, we're reading one less than the buffer size. - // That's so we can put a guard byte at the end of what we send - // to user code. - // Use read instead of recv, which on Linux gives a "socket operation - // on nonsocket" error. - - - int r = read (sd, readbuffer, sizeof(readbuffer) - 1); - //cerr << "<R:" << r << ">"; - - if (r > 0) { - total_bytes_read += r; - LastRead = gCurrentLoopTime; - - // Add a null-terminator at the the end of the buffer - // that we will send to the callback. - // DO NOT EVER CHANGE THIS. We want to explicitly allow users - // to be able to depend on this behavior, so they will have - // the option to do some things faster. Additionally it's - // a security guard against buffer overflows. - readbuffer [r] = 0; - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); - } - else if (r == 0) { - break; - } - else { - // Basically a would-block, meaning we've read everything there is to read. - break; - } - - } - - - if (total_bytes_read == 0) { - // If we read no data on a socket that selected readable, - // it generally means the other end closed the connection gracefully. - ScheduleClose (false); - //bCloseNow = true; - } - -} - -/********************* -PipeDescriptor::Write -*********************/ - -void PipeDescriptor::Write() -{ - int sd = GetSocket(); - assert (sd != INVALID_SOCKET); - - LastIo = gCurrentLoopTime; - char output_buffer [16 * 1024]; - size_t nbytes = 0; - - while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { - OutboundPage *op = &(OutboundPages[0]); - if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { - memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); - nbytes += (op->Length - op->Offset); - op->Free(); - OutboundPages.pop_front(); - } - else { - int len = sizeof(output_buffer) - nbytes; - memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); - op->Offset += len; - nbytes += len; - } - } - - // We should never have gotten here if there were no data to write, - // so assert that as a sanity check. - // Don't bother to make sure nbytes is less than output_buffer because - // if it were we probably would have crashed already. - assert (nbytes > 0); - - assert (GetSocket() != INVALID_SOCKET); - int bytes_written = write (GetSocket(), output_buffer, nbytes); - - if (bytes_written > 0) { - OutboundDataSize -= bytes_written; - if ((size_t)bytes_written < nbytes) { - int len = nbytes - bytes_written; - char *buffer = (char*) malloc (len + 1); - if (!buffer) - throw std::runtime_error ("bad alloc throwing back data"); - memcpy (buffer, output_buffer + bytes_written, len); - buffer [len] = 0; - OutboundPages.push_front (OutboundPage (buffer, len)); - } - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - } - else { - #ifdef OS_UNIX - if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) - #endif - #ifdef OS_WIN32 - if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) - #endif - Close(); - } -} - - -/************************* -PipeDescriptor::Heartbeat -*************************/ - -void PipeDescriptor::Heartbeat() -{ - // If an inactivity timeout is defined, then check for it. - if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) - ScheduleClose (false); - //bCloseNow = true; -} - - -/***************************** -PipeDescriptor::SelectForRead -*****************************/ - -bool PipeDescriptor::SelectForRead() -{ - /* Pipe descriptors, being local by definition, don't have - * a pending state, so this is simpler than for the - * ConnectionDescriptor object. - */ - return true; -} - -/****************************** -PipeDescriptor::SelectForWrite -******************************/ - -bool PipeDescriptor::SelectForWrite() -{ - /* Pipe descriptors, being local by definition, don't have - * a pending state, so this is simpler than for the - * ConnectionDescriptor object. - */ - return (GetOutboundDataSize() > 0); -} - - - - -/******************************** -PipeDescriptor::SendOutboundData -********************************/ - -int PipeDescriptor::SendOutboundData (const char *data, int length) -{ - //if (bCloseNow || bCloseAfterWriting) - if (IsCloseScheduled()) - return 0; - - if (!data && (length > 0)) - throw std::runtime_error ("bad outbound data"); - char *buffer = (char *) malloc (length + 1); - if (!buffer) - throw std::runtime_error ("no allocation for outbound data"); - memcpy (buffer, data, length); - buffer [length] = 0; - OutboundPages.push_back (OutboundPage (buffer, length)); - OutboundDataSize += length; - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | EPOLLOUT); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - return length; -} - -/******************************** -PipeDescriptor::GetSubprocessPid -********************************/ - -bool PipeDescriptor::GetSubprocessPid (pid_t *pid) -{ - bool ok = false; - if (pid && (SubprocessPid > 0)) { - *pid = SubprocessPid; - ok = true; - } - return ok; -} - -#endif - -/***************************************************************************** - -$Id$ - -File: page.cpp -Date: 30Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - - -#include "project.h" - - -/****************** -PageList::PageList -******************/ - -PageList::PageList() -{ -} - - -/******************* -PageList::~PageList -*******************/ - -PageList::~PageList() -{ - while (HasPages()) - PopFront(); -} - - -/*************** -PageList::Front -***************/ - -void PageList::Front (const char **page, int *length) -{ - assert (page && length); - - if (HasPages()) { - Page p = Pages.front(); - *page = p.Buffer; - *length = p.Size; - } - else { - *page = NULL; - *length = 0; - } -} - - -/****************** -PageList::PopFront -******************/ - -void PageList::PopFront() -{ - if (HasPages()) { - Page p = Pages.front(); - Pages.pop_front(); - if (p.Buffer) - free ((void*)p.Buffer); - } -} - - -/****************** -PageList::HasPages -******************/ - -bool PageList::HasPages() -{ - return (Pages.size() > 0) ? true : false; -} - - -/************** -PageList::Push -**************/ - -void PageList::Push (const char *buf, int size) -{ - if (buf && (size > 0)) { - char *copy = (char*) malloc (size); - if (!copy) - throw runtime_error ("no memory in pagelist"); - memcpy (copy, buf, size); - Pages.push_back (Page (copy, size)); - } -} - - - - - -/***************************************************************************** - -$Id$ - -File: pipe.cpp -Date: 30May07 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - - -#ifdef OS_UNIX -// THIS ENTIRE FILE IS ONLY COMPILED ON UNIX-LIKE SYSTEMS. - -/****************************** -PipeDescriptor::PipeDescriptor -******************************/ - -PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em): - EventableDescriptor (fd, parent_em), - bReadAttemptedAfterClose (false), - LastIo (gCurrentLoopTime), - InactivityTimeout (0), - OutboundDataSize (0), - SubprocessPid (subpid) -{ - #ifdef HAVE_EPOLL - EpollEvent.events = EPOLLIN; - #endif - #ifdef HAVE_KQUEUE - MyEventMachine->ArmKqueueReader (this); - #endif -} - - -/******************************* -PipeDescriptor::~PipeDescriptor -*******************************/ - -PipeDescriptor::~PipeDescriptor() -{ - // Run down any stranded outbound data. - for (size_t i=0; i < OutboundPages.size(); i++) - OutboundPages[i].Free(); - - /* As a virtual destructor, we come here before the base-class - * destructor that closes our file-descriptor. - * We have to make sure the subprocess goes down (if it's not - * already down) and we have to reap the zombie. - * - * This implementation is PROVISIONAL and will surely be improved. - * The intention here is that we never block, hence the highly - * undesirable sleeps. But if we can't reap the subprocess even - * after sending it SIGKILL, then something is wrong and we - * throw a fatal exception, which is also not something we should - * be doing. - * - * Eventually the right thing to do will be to have the reactor - * core respond to SIGCHLD by chaining a handler on top of the - * one Ruby may have installed, and dealing with a list of dead - * children that are pending cleanup. - * - * Since we want to have a signal processor integrated into the - * client-visible API, let's wait until that is done before cleaning - * this up. - * - * Added a very ugly hack to support passing the subprocess's exit - * status to the user. It only makes logical sense for user code to access - * the subprocess exit status in the unbind callback. But unbind is called - * back during the EventableDescriptor destructor. So by that time there's - * no way to call back this object through an object binding, because it's - * already been cleaned up. We might have added a parameter to the unbind - * callback, but that would probably break a huge amount of existing code. - * So the hack-solution is to define an instance variable in the EventMachine - * object and stick the exit status in there, where it can easily be accessed - * with an accessor visible to user code. - * User code should ONLY access the exit status from within the unbind callback. - * Otherwise there's no guarantee it'll be valid. - * This hack won't make it impossible to run multiple EventMachines in a single - * process, but it will make it impossible to reliably nest unbind calls - * within other unbind calls. (Not sure if that's even possible.) - */ - - assert (MyEventMachine); - - // check if the process is already dead - if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) { - kill (SubprocessPid, SIGTERM); - // wait 0.25s for process to die - struct timespec req = {0, 250000000}; - nanosleep (&req, NULL); - if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) { - kill (SubprocessPid, SIGKILL); - // wait 0.5s for process to die - struct timespec req = {0, 500000000}; - nanosleep (&req, NULL); - if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) - throw std::runtime_error ("unable to reap subprocess"); - } - } -} - - - -/******************** -PipeDescriptor::Read -********************/ - -void PipeDescriptor::Read() -{ - int sd = GetSocket(); - if (sd == INVALID_SOCKET) { - assert (!bReadAttemptedAfterClose); - bReadAttemptedAfterClose = true; - return; - } - - LastIo = gCurrentLoopTime; - - int total_bytes_read = 0; - char readbuffer [16 * 1024]; - - for (int i=0; i < 10; i++) { - // Don't read just one buffer and then move on. This is faster - // if there is a lot of incoming. - // But don't read indefinitely. Give other sockets a chance to run. - // NOTICE, we're reading one less than the buffer size. - // That's so we can put a guard byte at the end of what we send - // to user code. - // Use read instead of recv, which on Linux gives a "socket operation - // on nonsocket" error. - - - int r = read (sd, readbuffer, sizeof(readbuffer) - 1); - //cerr << "<R:" << r << ">"; - - if (r > 0) { - total_bytes_read += r; - LastRead = gCurrentLoopTime; - - // Add a null-terminator at the the end of the buffer - // that we will send to the callback. - // DO NOT EVER CHANGE THIS. We want to explicitly allow users - // to be able to depend on this behavior, so they will have - // the option to do some things faster. Additionally it's - // a security guard against buffer overflows. - readbuffer [r] = 0; - if (EventCallback) - (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r); - } - else if (r == 0) { - break; - } - else { - // Basically a would-block, meaning we've read everything there is to read. - break; - } - - } - - - if (total_bytes_read == 0) { - // If we read no data on a socket that selected readable, - // it generally means the other end closed the connection gracefully. - ScheduleClose (false); - //bCloseNow = true; - } - -} - -/********************* -PipeDescriptor::Write -*********************/ - -void PipeDescriptor::Write() -{ - int sd = GetSocket(); - assert (sd != INVALID_SOCKET); - - LastIo = gCurrentLoopTime; - char output_buffer [16 * 1024]; - size_t nbytes = 0; - - while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) { - OutboundPage *op = &(OutboundPages[0]); - if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) { - memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset); - nbytes += (op->Length - op->Offset); - op->Free(); - OutboundPages.pop_front(); - } - else { - int len = sizeof(output_buffer) - nbytes; - memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len); - op->Offset += len; - nbytes += len; - } - } - - // We should never have gotten here if there were no data to write, - // so assert that as a sanity check. - // Don't bother to make sure nbytes is less than output_buffer because - // if it were we probably would have crashed already. - assert (nbytes > 0); - - assert (GetSocket() != INVALID_SOCKET); - int bytes_written = write (GetSocket(), output_buffer, nbytes); - - if (bytes_written > 0) { - OutboundDataSize -= bytes_written; - if ((size_t)bytes_written < nbytes) { - int len = nbytes - bytes_written; - char *buffer = (char*) malloc (len + 1); - if (!buffer) - throw std::runtime_error ("bad alloc throwing back data"); - memcpy (buffer, output_buffer + bytes_written, len); - buffer [len] = 0; - OutboundPages.push_front (OutboundPage (buffer, len)); - } - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0)); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - } - else { - #ifdef OS_UNIX - if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR)) - #endif - #ifdef OS_WIN32 - if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK)) - #endif - Close(); - } -} - - -/************************* -PipeDescriptor::Heartbeat -*************************/ - -void PipeDescriptor::Heartbeat() -{ - // If an inactivity timeout is defined, then check for it. - if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout)) - ScheduleClose (false); - //bCloseNow = true; -} - - -/***************************** -PipeDescriptor::SelectForRead -*****************************/ - -bool PipeDescriptor::SelectForRead() -{ - /* Pipe descriptors, being local by definition, don't have - * a pending state, so this is simpler than for the - * ConnectionDescriptor object. - */ - return true; -} - -/****************************** -PipeDescriptor::SelectForWrite -******************************/ - -bool PipeDescriptor::SelectForWrite() -{ - /* Pipe descriptors, being local by definition, don't have - * a pending state, so this is simpler than for the - * ConnectionDescriptor object. - */ - return (GetOutboundDataSize() > 0); -} - - - - -/******************************** -PipeDescriptor::SendOutboundData -********************************/ - -int PipeDescriptor::SendOutboundData (const char *data, int length) -{ - //if (bCloseNow || bCloseAfterWriting) - if (IsCloseScheduled()) - return 0; - - if (!data && (length > 0)) - throw std::runtime_error ("bad outbound data"); - char *buffer = (char *) malloc (length + 1); - if (!buffer) - throw std::runtime_error ("no allocation for outbound data"); - memcpy (buffer, data, length); - buffer [length] = 0; - OutboundPages.push_back (OutboundPage (buffer, length)); - OutboundDataSize += length; - #ifdef HAVE_EPOLL - EpollEvent.events = (EPOLLIN | EPOLLOUT); - assert (MyEventMachine); - MyEventMachine->Modify (this); - #endif - return length; -} - -/******************************** -PipeDescriptor::GetSubprocessPid -********************************/ - -bool PipeDescriptor::GetSubprocessPid (pid_t *pid) -{ - bool ok = false; - if (pid && (SubprocessPid > 0)) { - *pid = SubprocessPid; - ok = true; - } - return ok; -} - - -#endif // OS_UNIX - -/***************************************************************************** - -$Id$ - -File: rubymain.cpp -Date: 06Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" -#include "eventmachine.h" -#include <ruby.h> - - - -/******* -Statics -*******/ - -static VALUE EmModule; -static VALUE EmConnection; - -static VALUE Intern_at_signature; -static VALUE Intern_at_timers; -static VALUE Intern_at_conns; -static VALUE Intern_event_callback; -static VALUE Intern_run_deferred_callbacks; -static VALUE Intern_delete; -static VALUE Intern_call; -static VALUE Intern_receive_data; - -static VALUE Intern_notify_readable; -static VALUE Intern_notify_writable; - -/**************** -t_event_callback -****************/ - -static void event_callback (const char *a1, int a2, const char *a3, int a4) -{ - if (a2 == EM_CONNECTION_READ) { - VALUE t = rb_ivar_get (EmModule, Intern_at_conns); - VALUE q = rb_hash_aref (t, rb_str_new2(a1)); - if (q == Qnil) - rb_raise (rb_eRuntimeError, "no connection"); - rb_funcall (q, Intern_receive_data, 1, rb_str_new (a3, a4)); - } - else if (a2 == EM_CONNECTION_NOTIFY_READABLE) { - VALUE t = rb_ivar_get (EmModule, Intern_at_conns); - VALUE q = rb_hash_aref (t, rb_str_new2(a1)); - if (q == Qnil) - rb_raise (rb_eRuntimeError, "no connection"); - rb_funcall (q, Intern_notify_readable, 0); - } - else if (a2 == EM_CONNECTION_NOTIFY_WRITABLE) { - VALUE t = rb_ivar_get (EmModule, Intern_at_conns); - VALUE q = rb_hash_aref (t, rb_str_new2(a1)); - if (q == Qnil) - rb_raise (rb_eRuntimeError, "no connection"); - rb_funcall (q, Intern_notify_writable, 0); - } - else if (a2 == EM_LOOPBREAK_SIGNAL) { - rb_funcall (EmModule, Intern_run_deferred_callbacks, 0); - } - else if (a2 == EM_TIMER_FIRED) { - VALUE t = rb_ivar_get (EmModule, Intern_at_timers); - VALUE q = rb_funcall (t, Intern_delete, 1, rb_str_new(a3, a4)); - if (q == Qnil) - rb_raise (rb_eRuntimeError, "no timer"); - rb_funcall (q, Intern_call, 0); - } - else - rb_funcall (EmModule, Intern_event_callback, 3, rb_str_new2(a1), (a2 << 1) | 1, rb_str_new(a3,a4)); -} - - - -/************************** -t_initialize_event_machine -**************************/ - -static VALUE t_initialize_event_machine (VALUE self) -{ - evma_initialize_library (event_callback); - return Qnil; -} - - - -/***************************** -t_run_machine_without_threads -*****************************/ - -static VALUE t_run_machine_without_threads (VALUE self) -{ - evma_run_machine(); - return Qnil; -} - - -/******************* -t_add_oneshot_timer -*******************/ - -static VALUE t_add_oneshot_timer (VALUE self, VALUE interval) -{ - const char *f = evma_install_oneshot_timer (FIX2INT (interval)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no timer"); - return rb_str_new2 (f); -} - - -/************** -t_start_server -**************/ - -static VALUE t_start_server (VALUE self, VALUE server, VALUE port) -{ - const char *f = evma_create_tcp_server (StringValuePtr(server), FIX2INT(port)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no acceptor"); - return rb_str_new2 (f); -} - -/************* -t_stop_server -*************/ - -static VALUE t_stop_server (VALUE self, VALUE signature) -{ - evma_stop_tcp_server (StringValuePtr (signature)); - return Qnil; -} - - -/******************* -t_start_unix_server -*******************/ - -static VALUE t_start_unix_server (VALUE self, VALUE filename) -{ - const char *f = evma_create_unix_domain_server (StringValuePtr(filename)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no unix-domain acceptor"); - return rb_str_new2 (f); -} - - - -/*********** -t_send_data -***********/ - -static VALUE t_send_data (VALUE self, VALUE signature, VALUE data, VALUE data_length) -{ - int b = evma_send_data_to_connection (StringValuePtr (signature), StringValuePtr (data), FIX2INT (data_length)); - return INT2NUM (b); -} - - -/*********** -t_start_tls -***********/ - -static VALUE t_start_tls (VALUE self, VALUE signature) -{ - evma_start_tls (StringValuePtr (signature)); - return Qnil; -} - -/*************** -t_set_tls_parms -***************/ - -static VALUE t_set_tls_parms (VALUE self, VALUE signature, VALUE privkeyfile, VALUE certchainfile) -{ - /* set_tls_parms takes a series of positional arguments for specifying such things - * as private keys and certificate chains. - * It's expected that the parameter list will grow as we add more supported features. - * ALL of these parameters are optional, and can be specified as empty or NULL strings. - */ - evma_set_tls_parms (StringValuePtr (signature), StringValuePtr (privkeyfile), StringValuePtr (certchainfile) ); - return Qnil; -} - -/************** -t_get_peername -**************/ - -static VALUE t_get_peername (VALUE self, VALUE signature) -{ - struct sockaddr s; - if (evma_get_peername (StringValuePtr (signature), &s)) { - return rb_str_new ((const char*)&s, sizeof(s)); - } - - return Qnil; -} - -/************** -t_get_sockname -**************/ - -static VALUE t_get_sockname (VALUE self, VALUE signature) -{ - struct sockaddr s; - if (evma_get_sockname (StringValuePtr (signature), &s)) { - return rb_str_new ((const char*)&s, sizeof(s)); - } - - return Qnil; -} - -/******************** -t_get_subprocess_pid -********************/ - -static VALUE t_get_subprocess_pid (VALUE self, VALUE signature) -{ - pid_t pid; - if (evma_get_subprocess_pid (StringValuePtr (signature), &pid)) { - return INT2NUM (pid); - } - - return Qnil; -} - -/*********************** -t_get_subprocess_status -***********************/ - -static VALUE t_get_subprocess_status (VALUE self, VALUE signature) -{ - int status; - if (evma_get_subprocess_status (StringValuePtr (signature), &status)) { - return INT2NUM (status); - } - - return Qnil; -} - -/***************************** -t_get_comm_inactivity_timeout -*****************************/ - -static VALUE t_get_comm_inactivity_timeout (VALUE self, VALUE signature) -{ - int timeout; - if (evma_get_comm_inactivity_timeout (StringValuePtr (signature), &timeout)) - return INT2FIX (timeout); - return Qnil; -} - -/***************************** -t_set_comm_inactivity_timeout -*****************************/ - -static VALUE t_set_comm_inactivity_timeout (VALUE self, VALUE signature, VALUE timeout) -{ - int ti = FIX2INT (timeout); - if (evma_set_comm_inactivity_timeout (StringValuePtr (signature), &ti)); - return Qtrue; - return Qnil; -} - - -/*************** -t_send_datagram -***************/ - -static VALUE t_send_datagram (VALUE self, VALUE signature, VALUE data, VALUE data_length, VALUE address, VALUE port) -{ - int b = evma_send_datagram (StringValuePtr (signature), StringValuePtr (data), FIX2INT (data_length), StringValuePtr(address), FIX2INT(port)); - return INT2NUM (b); -} - - -/****************** -t_close_connection -******************/ - -static VALUE t_close_connection (VALUE self, VALUE signature, VALUE after_writing) -{ - evma_close_connection (StringValuePtr (signature), ((after_writing == Qtrue) ? 1 : 0)); - return Qnil; -} - -/******************************** -t_report_connection_error_status -********************************/ - -static VALUE t_report_connection_error_status (VALUE self, VALUE signature) -{ - int b = evma_report_connection_error_status (StringValuePtr (signature)); - return INT2NUM (b); -} - - - -/**************** -t_connect_server -****************/ - -static VALUE t_connect_server (VALUE self, VALUE server, VALUE port) -{ - // Avoid FIX2INT in this case, because it doesn't deal with type errors properly. - // Specifically, if the value of port comes in as a string rather than an integer, - // NUM2INT will throw a type error, but FIX2INT will generate garbage. - - const char *f = evma_connect_to_server (StringValuePtr(server), NUM2INT(port)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no connection"); - return rb_str_new2 (f); -} - -/********************* -t_connect_unix_server -*********************/ - -static VALUE t_connect_unix_server (VALUE self, VALUE serversocket) -{ - const char *f = evma_connect_to_unix_server (StringValuePtr(serversocket)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no connection"); - return rb_str_new2 (f); -} - -/*********** -t_attach_fd -***********/ - -static VALUE t_attach_fd (VALUE self, VALUE file_descriptor, VALUE read_mode, VALUE write_mode) -{ - const char *f = evma_attach_fd (NUM2INT(file_descriptor), (read_mode == Qtrue) ? 1 : 0, (write_mode == Qtrue) ? 1 : 0); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no connection"); - return rb_str_new2 (f); -} - -/*********** -t_detach_fd -***********/ - -static VALUE t_detach_fd (VALUE self, VALUE signature) -{ - return INT2NUM(evma_detach_fd (StringValuePtr(signature))); -} - -/***************** -t_open_udp_socket -*****************/ - -static VALUE t_open_udp_socket (VALUE self, VALUE server, VALUE port) -{ - const char *f = evma_open_datagram_socket (StringValuePtr(server), FIX2INT(port)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no datagram socket"); - return rb_str_new2 (f); -} - - - -/***************** -t_release_machine -*****************/ - -static VALUE t_release_machine (VALUE self) -{ - evma_release_library(); - return Qnil; -} - - -/****** -t_stop -******/ - -static VALUE t_stop (VALUE self) -{ - evma_stop_machine(); - return Qnil; -} - -/****************** -t_signal_loopbreak -******************/ - -static VALUE t_signal_loopbreak (VALUE self) -{ - evma_signal_loopbreak(); - return Qnil; -} - -/************** -t_library_type -**************/ - -static VALUE t_library_type (VALUE self) -{ - return rb_eval_string (":extension"); -} - - - -/******************* -t_set_timer_quantum -*******************/ - -static VALUE t_set_timer_quantum (VALUE self, VALUE interval) -{ - evma_set_timer_quantum (FIX2INT (interval)); - return Qnil; -} - -/******************** -t_set_max_timer_count -********************/ - -static VALUE t_set_max_timer_count (VALUE self, VALUE ct) -{ - evma_set_max_timer_count (FIX2INT (ct)); - return Qnil; -} - -/*************** -t_setuid_string -***************/ - -static VALUE t_setuid_string (VALUE self, VALUE username) -{ - evma_setuid_string (StringValuePtr (username)); - return Qnil; -} - - - -/************* -t__write_file -*************/ - -static VALUE t__write_file (VALUE self, VALUE filename) -{ - const char *f = evma__write_file (StringValuePtr (filename)); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "file not opened"); - return rb_str_new2 (f); -} - -/************** -t_invoke_popen -**************/ - -static VALUE t_invoke_popen (VALUE self, VALUE cmd) -{ - // 1.8.7+ - #ifdef RARRAY_LEN - int len = RARRAY_LEN(cmd); - #else - int len = RARRAY (cmd)->len; - #endif - if (len > 98) - rb_raise (rb_eRuntimeError, "too many arguments to popen"); - char *strings [100]; - for (int i=0; i < len; i++) { - VALUE ix = INT2FIX (i); - VALUE s = rb_ary_aref (1, &ix, cmd); - strings[i] = StringValuePtr (s); - } - strings[len] = NULL; - - const char *f = evma_popen (strings); - if (!f || !*f) { - char *err = strerror (errno); - char buf[100]; - memset (buf, 0, sizeof(buf)); - snprintf (buf, sizeof(buf)-1, "no popen: %s", (err?err:"???")); - rb_raise (rb_eRuntimeError, buf); - } - return rb_str_new2 (f); -} - - -/*************** -t_read_keyboard -***************/ - -static VALUE t_read_keyboard (VALUE self) -{ - const char *f = evma_open_keyboard(); - if (!f || !*f) - rb_raise (rb_eRuntimeError, "no keyboard reader"); - return rb_str_new2 (f); -} - - -/******** -t__epoll -********/ - -static VALUE t__epoll (VALUE self) -{ - // Temporary. - evma__epoll(); - return Qnil; -} - -/********** -t__epoll_p -**********/ - -static VALUE t__epoll_p (VALUE self) -{ - #ifdef HAVE_EPOLL - return Qtrue; - #else - return Qfalse; - #endif -} - - -/********* -t__kqueue -*********/ - -static VALUE t__kqueue (VALUE self) -{ - // Temporary. - evma__kqueue(); - return Qnil; -} - -/*********** -t__kqueue_p -***********/ - -static VALUE t__kqueue_p (VALUE self) -{ - #ifdef HAVE_KQUEUE - return Qtrue; - #else - return Qfalse; - #endif -} - - -/**************** -t_send_file_data -****************/ - -static VALUE t_send_file_data (VALUE self, VALUE signature, VALUE filename) -{ - - /* The current implementation of evma_send_file_data_to_connection enforces a strict - * upper limit on the file size it will transmit (currently 32K). The function returns - * zero on success, -1 if the requested file exceeds its size limit, and a positive - * number for other errors. - * TODO: Positive return values are actually errno's, which is probably the wrong way to - * do this. For one thing it's ugly. For another, we can't be sure zero is never a real errno. - */ - - int b = evma_send_file_data_to_connection (StringValuePtr(signature), StringValuePtr(filename)); - if (b == -1) - rb_raise(rb_eRuntimeError, "File too large. send_file_data() supports files under 32k."); - if (b > 0) { - char *err = strerror (b); - char buf[1024]; - memset (buf, 0, sizeof(buf)); - snprintf (buf, sizeof(buf)-1, ": %s %s", StringValuePtr(filename),(err?err:"???")); - - rb_raise (rb_eIOError, buf); - } - - return INT2NUM (0); -} - - -/******************* -t_set_rlimit_nofile -*******************/ - -static VALUE t_set_rlimit_nofile (VALUE self, VALUE arg) -{ - arg = (NIL_P(arg)) ? -1 : NUM2INT (arg); - return INT2NUM (evma_set_rlimit_nofile (arg)); -} - -/*************************** -conn_get_outbound_data_size -***************************/ - -static VALUE conn_get_outbound_data_size (VALUE self) -{ - VALUE sig = rb_ivar_get (self, Intern_at_signature); - return INT2NUM (evma_get_outbound_data_size (StringValuePtr(sig))); -} - - -/****************************** -conn_associate_callback_target -******************************/ - -static VALUE conn_associate_callback_target (VALUE self, VALUE sig) -{ - // No-op for the time being. - return Qnil; -} - - -/*************** -t_get_loop_time -****************/ - -static VALUE t_get_loop_time (VALUE self) -{ - VALUE cTime = rb_path2class("Time"); - if (gCurrentLoopTime != 0) { - return rb_funcall(cTime, - rb_intern("at"), - 1, - INT2NUM(gCurrentLoopTime)); - } - return Qnil; -} - - -/********************* -Init_rubyeventmachine -*********************/ - -extern "C" void Init_rubyeventmachine() -{ - // Tuck away some symbol values so we don't have to look 'em up every time we need 'em. - Intern_at_signature = rb_intern ("@signature"); - Intern_at_timers = rb_intern ("@timers"); - Intern_at_conns = rb_intern ("@conns"); - - Intern_event_callback = rb_intern ("event_callback"); - Intern_run_deferred_callbacks = rb_intern ("run_deferred_callbacks"); - Intern_delete = rb_intern ("delete"); - Intern_call = rb_intern ("call"); - Intern_receive_data = rb_intern ("receive_data"); - - Intern_notify_readable = rb_intern ("notify_readable"); - Intern_notify_writable = rb_intern ("notify_writable"); - - // INCOMPLETE, we need to define class Connections inside module EventMachine - // run_machine and run_machine_without_threads are now identical. - // Must deprecate the without_threads variant. - EmModule = rb_define_module ("EventMachine"); - EmConnection = rb_define_class_under (EmModule, "Connection", rb_cObject); - - rb_define_class_under (EmModule, "ConnectionNotBound", rb_eException); - rb_define_class_under (EmModule, "NoHandlerForAcceptedConnection", rb_eException); - rb_define_class_under (EmModule, "UnknownTimerFired", rb_eException); - - rb_define_module_function (EmModule, "initialize_event_machine", (VALUE(*)(...))t_initialize_event_machine, 0); - rb_define_module_function (EmModule, "run_machine", (VALUE(*)(...))t_run_machine_without_threads, 0); - rb_define_module_function (EmModule, "run_machine_without_threads", (VALUE(*)(...))t_run_machine_without_threads, 0); - rb_define_module_function (EmModule, "add_oneshot_timer", (VALUE(*)(...))t_add_oneshot_timer, 1); - rb_define_module_function (EmModule, "start_tcp_server", (VALUE(*)(...))t_start_server, 2); - rb_define_module_function (EmModule, "stop_tcp_server", (VALUE(*)(...))t_stop_server, 1); - rb_define_module_function (EmModule, "start_unix_server", (VALUE(*)(...))t_start_unix_server, 1); - rb_define_module_function (EmModule, "set_tls_parms", (VALUE(*)(...))t_set_tls_parms, 3); - rb_define_module_function (EmModule, "start_tls", (VALUE(*)(...))t_start_tls, 1); - rb_define_module_function (EmModule, "send_data", (VALUE(*)(...))t_send_data, 3); - rb_define_module_function (EmModule, "send_datagram", (VALUE(*)(...))t_send_datagram, 5); - rb_define_module_function (EmModule, "close_connection", (VALUE(*)(...))t_close_connection, 2); - rb_define_module_function (EmModule, "report_connection_error_status", (VALUE(*)(...))t_report_connection_error_status, 1); - rb_define_module_function (EmModule, "connect_server", (VALUE(*)(...))t_connect_server, 2); - rb_define_module_function (EmModule, "connect_unix_server", (VALUE(*)(...))t_connect_unix_server, 1); - - rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 3); - rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1); - - rb_define_module_function (EmModule, "current_time", (VALUE(*)(...))t_get_loop_time, 0); - - rb_define_module_function (EmModule, "open_udp_socket", (VALUE(*)(...))t_open_udp_socket, 2); - rb_define_module_function (EmModule, "read_keyboard", (VALUE(*)(...))t_read_keyboard, 0); - rb_define_module_function (EmModule, "release_machine", (VALUE(*)(...))t_release_machine, 0); - rb_define_module_function (EmModule, "stop", (VALUE(*)(...))t_stop, 0); - rb_define_module_function (EmModule, "signal_loopbreak", (VALUE(*)(...))t_signal_loopbreak, 0); - rb_define_module_function (EmModule, "library_type", (VALUE(*)(...))t_library_type, 0); - rb_define_module_function (EmModule, "set_timer_quantum", (VALUE(*)(...))t_set_timer_quantum, 1); - rb_define_module_function (EmModule, "set_max_timer_count", (VALUE(*)(...))t_set_max_timer_count, 1); - rb_define_module_function (EmModule, "setuid_string", (VALUE(*)(...))t_setuid_string, 1); - rb_define_module_function (EmModule, "invoke_popen", (VALUE(*)(...))t_invoke_popen, 1); - rb_define_module_function (EmModule, "send_file_data", (VALUE(*)(...))t_send_file_data, 2); - - // Provisional: - rb_define_module_function (EmModule, "_write_file", (VALUE(*)(...))t__write_file, 1); - - rb_define_module_function (EmModule, "get_peername", (VALUE(*)(...))t_get_peername, 1); - rb_define_module_function (EmModule, "get_sockname", (VALUE(*)(...))t_get_sockname, 1); - rb_define_module_function (EmModule, "get_subprocess_pid", (VALUE(*)(...))t_get_subprocess_pid, 1); - rb_define_module_function (EmModule, "get_subprocess_status", (VALUE(*)(...))t_get_subprocess_status, 1); - rb_define_module_function (EmModule, "get_comm_inactivity_timeout", (VALUE(*)(...))t_get_comm_inactivity_timeout, 1); - rb_define_module_function (EmModule, "set_comm_inactivity_timeout", (VALUE(*)(...))t_set_comm_inactivity_timeout, 2); - rb_define_module_function (EmModule, "set_rlimit_nofile", (VALUE(*)(...))t_set_rlimit_nofile, 1); - - // Temporary: - rb_define_module_function (EmModule, "epoll", (VALUE(*)(...))t__epoll, 0); - rb_define_module_function (EmModule, "kqueue", (VALUE(*)(...))t__kqueue, 0); - - rb_define_module_function (EmModule, "epoll?", (VALUE(*)(...))t__epoll_p, 0); - rb_define_module_function (EmModule, "kqueue?", (VALUE(*)(...))t__kqueue_p, 0); - - rb_define_method (EmConnection, "get_outbound_data_size", (VALUE(*)(...))conn_get_outbound_data_size, 0); - rb_define_method (EmConnection, "associate_callback_target", (VALUE(*)(...))conn_associate_callback_target, 1); - - rb_define_const (EmModule, "TimerFired", INT2NUM(100)); - rb_define_const (EmModule, "ConnectionData", INT2NUM(101)); - rb_define_const (EmModule, "ConnectionUnbound", INT2NUM(102)); - rb_define_const (EmModule, "ConnectionAccepted", INT2NUM(103)); - rb_define_const (EmModule, "ConnectionCompleted", INT2NUM(104)); - rb_define_const (EmModule, "LoopbreakSignalled", INT2NUM(105)); - - rb_define_const (EmModule, "ConnectionNotifyReadable", INT2NUM(106)); - rb_define_const (EmModule, "ConnectionNotifyWritable", INT2NUM(107)); - -} - -/***************************************************************************** - -$Id$ - -File: sigs.cpp -Date: 06Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - -#include "project.h" - - -bool gTerminateSignalReceived; - - -/************** -SigtermHandler -**************/ - -void SigtermHandler (int sig) -{ - // This is a signal-handler, don't do anything frisky. Interrupts are disabled. - // Set the terminate flag WITHOUT trying to lock a mutex- otherwise we can easily - // self-deadlock, especially if the event machine is looping quickly. - gTerminateSignalReceived = true; -} - - -/********************* -InstallSignalHandlers -*********************/ - -void InstallSignalHandlers() -{ - #ifdef OS_UNIX - static bool bInstalled = false; - if (!bInstalled) { - bInstalled = true; - signal (SIGINT, SigtermHandler); - signal (SIGTERM, SigtermHandler); - signal (SIGPIPE, SIG_IGN); - } - #endif -} - - - -/******************* -WintelSignalHandler -*******************/ - -#ifdef OS_WIN32 -BOOL WINAPI WintelSignalHandler (DWORD control) -{ - if (control == CTRL_C_EVENT) - gTerminateSignalReceived = true; - return TRUE; -} -#endif - -/************ -HookControlC -************/ - -#ifdef OS_WIN32 -void HookControlC (bool hook) -{ - if (hook) { - // INSTALL hook - SetConsoleCtrlHandler (WintelSignalHandler, TRUE); - } - else { - // UNINSTALL hook - SetConsoleCtrlHandler (WintelSignalHandler, FALSE); - } -} -#endif - - -/***************************************************************************** - -$Id$ - -File: ssl.cpp -Date: 30Apr06 - -Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. -Gmail: blackhedd - -This program is free software; you can redistribute it and/or modify -it under the terms of either: 1) the GNU General Public License -as published by the Free Software Foundation; either version 2 of the -License, or (at your option) any later version; or 2) Ruby's License. - -See the file COPYING for complete licensing information. - -*****************************************************************************/ - - -#ifdef WITH_SSL - -#include "project.h" - - -bool SslContext_t::bLibraryInitialized = false; - - - -static void InitializeDefaultCredentials(); -static EVP_PKEY *DefaultPrivateKey = NULL; -static X509 *DefaultCertificate = NULL; - -static char PrivateMaterials[] = { -"-----BEGIN RSA PRIVATE KEY-----\n" -"MIICXAIBAAKBgQDCYYhcw6cGRbhBVShKmbWm7UVsEoBnUf0cCh8AX+MKhMxwVDWV\n" -"Igdskntn3cSJjRtmgVJHIK0lpb/FYHQB93Ohpd9/Z18pDmovfFF9nDbFF0t39hJ/\n" -"AqSzFB3GiVPoFFZJEE1vJqh+3jzsSF5K56bZ6azz38VlZgXeSozNW5bXkQIDAQAB\n" -"AoGALA89gIFcr6BIBo8N5fL3aNHpZXjAICtGav+kTUpuxSiaym9cAeTHuAVv8Xgk\n" -"H2Wbq11uz+6JMLpkQJH/WZ7EV59DPOicXrp0Imr73F3EXBfR7t2EQDYHPMthOA1D\n" -"I9EtCzvV608Ze90hiJ7E3guGrGppZfJ+eUWCPgy8CZH1vRECQQDv67rwV/oU1aDo\n" -"6/+d5nqjeW6mWkGqTnUU96jXap8EIw6B+0cUKskwx6mHJv+tEMM2748ZY7b0yBlg\n" -"w4KDghbFAkEAz2h8PjSJG55LwqmXih1RONSgdN9hjB12LwXL1CaDh7/lkEhq0PlK\n" -"PCAUwQSdM17Sl0Xxm2CZiekTSlwmHrtqXQJAF3+8QJwtV2sRJp8u2zVe37IeH1cJ\n" -"xXeHyjTzqZ2803fnjN2iuZvzNr7noOA1/Kp+pFvUZUU5/0G2Ep8zolPUjQJAFA7k\n" -"xRdLkzIx3XeNQjwnmLlncyYPRv+qaE3FMpUu7zftuZBnVCJnvXzUxP3vPgKTlzGa\n" -"dg5XivDRfsV+okY5uQJBAMV4FesUuLQVEKb6lMs7rzZwpeGQhFDRfywJzfom2TLn\n" -"2RdJQQ3dcgnhdVDgt5o1qkmsqQh8uJrJ9SdyLIaZQIc=\n" -"-----END RSA PRIVATE KEY-----\n" -"-----BEGIN CERTIFICATE-----\n" -"MIID6TCCA1KgAwIBAgIJANm4W/Tzs+s+MA0GCSqGSIb3DQEBBQUAMIGqMQswCQYD\n" -"VQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxETAPBgNVBAcTCE5ldyBZb3JrMRYw\n" -"FAYDVQQKEw1TdGVhbWhlYXQubmV0MRQwEgYDVQQLEwtFbmdpbmVlcmluZzEdMBsG\n" -"A1UEAxMUb3BlbmNhLnN0ZWFtaGVhdC5uZXQxKDAmBgkqhkiG9w0BCQEWGWVuZ2lu\n" -"ZWVyaW5nQHN0ZWFtaGVhdC5uZXQwHhcNMDYwNTA1MTcwNjAzWhcNMjQwMjIwMTcw\n" -"NjAzWjCBqjELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE5ldyBZb3JrMREwDwYDVQQH\n" -"EwhOZXcgWW9yazEWMBQGA1UEChMNU3RlYW1oZWF0Lm5ldDEUMBIGA1UECxMLRW5n\n" -"aW5lZXJpbmcxHTAbBgNVBAMTFG9wZW5jYS5zdGVhbWhlYXQubmV0MSgwJgYJKoZI\n" -"hvcNAQkBFhllbmdpbmVlcmluZ0BzdGVhbWhlYXQubmV0MIGfMA0GCSqGSIb3DQEB\n" -"AQUAA4GNADCBiQKBgQDCYYhcw6cGRbhBVShKmbWm7UVsEoBnUf0cCh8AX+MKhMxw\n" -"VDWVIgdskntn3cSJjRtmgVJHIK0lpb/FYHQB93Ohpd9/Z18pDmovfFF9nDbFF0t3\n" -"9hJ/AqSzFB3GiVPoFFZJEE1vJqh+3jzsSF5K56bZ6azz38VlZgXeSozNW5bXkQID\n" -"AQABo4IBEzCCAQ8wHQYDVR0OBBYEFPJvPd1Fcmd8o/Tm88r+NjYPICCkMIHfBgNV\n" -"HSMEgdcwgdSAFPJvPd1Fcmd8o/Tm88r+NjYPICCkoYGwpIGtMIGqMQswCQYDVQQG\n" -"EwJVUzERMA8GA1UECBMITmV3IFlvcmsxETAPBgNVBAcTCE5ldyBZb3JrMRYwFAYD\n" -"VQQKEw1TdGVhbWhlYXQubmV0MRQwEgYDVQQLEwtFbmdpbmVlcmluZzEdMBsGA1UE\n" -"AxMUb3BlbmNhLnN0ZWFtaGVhdC5uZXQxKDAmBgkqhkiG9w0BCQEWGWVuZ2luZWVy\n" -"aW5nQHN0ZWFtaGVhdC5uZXSCCQDZuFv087PrPjAMBgNVHRMEBTADAQH/MA0GCSqG\n" -"SIb3DQEBBQUAA4GBAC1CXey/4UoLgJiwcEMDxOvW74plks23090iziFIlGgcIhk0\n" -"Df6hTAs7H3MWww62ddvR8l07AWfSzSP5L6mDsbvq7EmQsmPODwb6C+i2aF3EDL8j\n" -"uw73m4YIGI0Zw2XdBpiOGkx2H56Kya6mJJe/5XORZedh1wpI7zki01tHYbcy\n" -"-----END CERTIFICATE-----\n"}; - -/* These private materials were made with: - * openssl req -new -x509 -keyout cakey.pem -out cacert.pem -nodes -days 6500 - * TODO: We need a full-blown capability to work with user-supplied - * keypairs and properly-signed certificates. - */ - - -/***************** -builtin_passwd_cb -*****************/ - -extern "C" int builtin_passwd_cb (char *buf, int bufsize, int rwflag, void *userdata) -{ - strcpy (buf, "kittycat"); - return 8; -} - -/**************************** -InitializeDefaultCredentials -****************************/ - -static void InitializeDefaultCredentials() -{ - BIO *bio = BIO_new_mem_buf (PrivateMaterials, -1); - assert (bio); - - if (DefaultPrivateKey) { - // we may come here in a restart. - EVP_PKEY_free (DefaultPrivateKey); - DefaultPrivateKey = NULL; - } - PEM_read_bio_PrivateKey (bio, &DefaultPrivateKey, builtin_passwd_cb, 0); - - if (DefaultCertificate) { - // we may come here in a restart. - X509_free (DefaultCertificate); - DefaultCertificate = NULL; - } - PEM_read_bio_X509 (bio, &DefaultCertificate, NULL, 0); - - BIO_free (bio); -} - - - -/************************** -SslContext_t::SslContext_t -**************************/ - -SslContext_t::SslContext_t (bool is_server, const string &privkeyfile, const string &certchainfile): - pCtx (NULL), - PrivateKey (NULL), - Certificate (NULL) -{ - /* TODO: the usage of the specified private-key and cert-chain filenames only applies to - * client-side connections at this point. Server connections currently use the default materials. - * That needs to be fixed asap. - * Also, in this implementation, server-side connections use statically defined X-509 defaults. - * One thing I'm really not clear on is whether or not you have to explicitly free X509 and EVP_PKEY - * objects when we call our destructor, or whether just calling SSL_CTX_free is enough. - */ - - if (!bLibraryInitialized) { - bLibraryInitialized = true; - SSL_library_init(); - OpenSSL_add_ssl_algorithms(); - OpenSSL_add_all_algorithms(); - SSL_load_error_strings(); - ERR_load_crypto_strings(); - - InitializeDefaultCredentials(); - } - - bIsServer = is_server; - pCtx = SSL_CTX_new (is_server ? SSLv23_server_method() : SSLv23_client_method()); - if (!pCtx) - throw std::runtime_error ("no SSL context"); - - SSL_CTX_set_options (pCtx, SSL_OP_ALL); - //SSL_CTX_set_options (pCtx, (SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3)); - - if (is_server) { - // The SSL_CTX calls here do NOT allocate memory. - int e; - if (privkeyfile.length() > 0) - e = SSL_CTX_use_PrivateKey_file (pCtx, privkeyfile.c_str(), SSL_FILETYPE_PEM); - else - e = SSL_CTX_use_PrivateKey (pCtx, DefaultPrivateKey); - assert (e > 0); - if (certchainfile.length() > 0) - e = SSL_CTX_use_certificate_chain_file (pCtx, certchainfile.c_str()); - else - e = SSL_CTX_use_certificate (pCtx, DefaultCertificate); - assert (e > 0); - } - - SSL_CTX_set_cipher_list (pCtx, "ALL:!ADH:!LOW:!EXP:!DES-CBC3-SHA:@STRENGTH"); - - if (is_server) { - SSL_CTX_sess_set_cache_size (pCtx, 128); - SSL_CTX_set_session_id_context (pCtx, (unsigned char*)"eventmachine", 12); - } - else { - int e; - if (privkeyfile.length() > 0) { - e = SSL_CTX_use_PrivateKey_file (pCtx, privkeyfile.c_str(), SSL_FILETYPE_PEM); - assert (e > 0); - } - if (certchainfile.length() > 0) { - e = SSL_CTX_use_certificate_chain_file (pCtx, certchainfile.c_str()); - assert (e > 0); - } - } -} - - - -/*************************** -SslContext_t::~SslContext_t -***************************/ - -SslContext_t::~SslContext_t() -{ - if (pCtx) - SSL_CTX_free (pCtx); - if (PrivateKey) - EVP_PKEY_free (PrivateKey); - if (Certificate) - X509_free (Certificate); -} - - - -/****************** -SslBox_t::SslBox_t -******************/ - -SslBox_t::SslBox_t (bool is_server, const string &privkeyfile, const string &certchainfile): - bIsServer (is_server), - pSSL (NULL), - pbioRead (NULL), - pbioWrite (NULL) -{ - /* TODO someday: make it possible to re-use SSL contexts so we don't have to create - * a new one every time we come here. - */ - - Context = new SslContext_t (bIsServer, privkeyfile, certchainfile); - assert (Context); - - pbioRead = BIO_new (BIO_s_mem()); - assert (pbioRead); - - pbioWrite = BIO_new (BIO_s_mem()); - assert (pbioWrite); - - pSSL = SSL_new (Context->pCtx); - assert (pSSL); - SSL_set_bio (pSSL, pbioRead, pbioWrite); - - if (!bIsServer) - SSL_connect (pSSL); -} - - - -/******************* -SslBox_t::~SslBox_t -*******************/ - -SslBox_t::~SslBox_t() -{ - // Freeing pSSL will also free the associated BIOs, so DON'T free them separately. - if (pSSL) { - if (SSL_get_shutdown (pSSL) & SSL_RECEIVED_SHUTDOWN) - SSL_shutdown (pSSL); - else - SSL_clear (pSSL); - SSL_free (pSSL); - } - - delete Context; -} - - - -/*********************** -SslBox_t::PutCiphertext -***********************/ - -bool SslBox_t::PutCiphertext (const char *buf, int bufsize) -{ - assert (buf && (bufsize > 0)); - - assert (pbioRead); - int n = BIO_write (pbioRead, buf, bufsize); - - return (n == bufsize) ? true : false; -} - - -/********************** -SslBox_t::GetPlaintext -**********************/ - -int SslBox_t::GetPlaintext (char *buf, int bufsize) -{ - if (!SSL_is_init_finished (pSSL)) { - int e = bIsServer ? SSL_accept (pSSL) : SSL_connect (pSSL); - if (e < 0) { - int er = SSL_get_error (pSSL, e); - if (er != SSL_ERROR_WANT_READ) { - // Return -1 for a nonfatal error, -2 for an error that should force the connection down. - return (er == SSL_ERROR_SSL) ? (-2) : (-1); - } - else - return 0; - } - // If handshake finished, FALL THROUGH and return the available plaintext. - } - - if (!SSL_is_init_finished (pSSL)) { - // We can get here if a browser abandons a handshake. - // The user can see a warning dialog and abort the connection. - cerr << "<SSL_incomp>"; - return 0; - } - - //cerr << "CIPH: " << SSL_get_cipher (pSSL) << endl; - - int n = SSL_read (pSSL, buf, bufsize); - if (n >= 0) { - return n; - } - else { - if (SSL_get_error (pSSL, n) == SSL_ERROR_WANT_READ) { - return 0; - } - else { - return -1; - } - } - - return 0; -} - - - -/************************** -SslBox_t::CanGetCiphertext -**************************/ - -bool SslBox_t::CanGetCiphertext() -{ - assert (pbioWrite); - return BIO_pending (pbioWrite) ? true : false; -} - - - -/*********************** -SslBox_t::GetCiphertext -***********************/ - -int SslBox_t::GetCiphertext (char *buf, int bufsize) -{ - assert (pbioWrite); - assert (buf && (bufsize > 0)); - - return BIO_read (pbioWrite, buf, bufsize); -} - - - -/********************** -SslBox_t::PutPlaintext -**********************/ - -int SslBox_t::PutPlaintext (const char *buf, int bufsize) -{ - // The caller will interpret the return value as the number of bytes written. - // WARNING WARNING WARNING, are there any situations in which a 0 or -1 return - // from SSL_write means we should immediately retry? The socket-machine loop - // will probably wait for a time-out cycle (perhaps a second) before re-trying. - // THIS WOULD CAUSE A PERCEPTIBLE DELAY! - - /* We internally queue any outbound plaintext that can't be dispatched - * because we're in the middle of a handshake or something. - * When we get called, try to send any queued data first, and then - * send the caller's data (or queue it). We may get called with no outbound - * data, which means we try to send the outbound queue and that's all. - * - * Return >0 if we wrote any data, 0 if we didn't, and <0 for a fatal error. - * Note that if we return 0, the connection is still considered live - * and we are signalling that we have accepted the outbound data (if any). - */ - - OutboundQ.Push (buf, bufsize); - - if (!SSL_is_init_finished (pSSL)) - return 0; - - bool fatal = false; - bool did_work = false; - - while (OutboundQ.HasPages()) { - const char *page; - int length; - OutboundQ.Front (&page, &length); - assert (page && (length > 0)); - int n = SSL_write (pSSL, page, length); - if (n > 0) { - did_work = true; - OutboundQ.PopFront(); - } - else { - int er = SSL_get_error (pSSL, n); - if ((er != SSL_ERROR_WANT_READ) && (er != SSL_ERROR_WANT_WRITE)) - fatal = true; - break; - } - } - - - if (did_work) - return 1; - else if (fatal) - return -1; - else - return 0; -} - - -#endif // WITH_SSL - |