summaryrefslogtreecommitdiff
path: root/test/scanners/cpp/eventmachine.in.cpp
diff options
context:
space:
mode:
authormurphy <murphy@rubychan.de>2009-10-19 16:52:59 +0000
committermurphy <murphy@rubychan.de>2009-10-19 16:52:59 +0000
commita6f93eb4304a062af43083a75d525acc1af9883d (patch)
tree624996688e5b983a38ba08f9ea05a07017d0e432 /test/scanners/cpp/eventmachine.in.cpp
parent4f67b22339ea7025dc4536a7e369f60755103acc (diff)
downloadcoderay-a6f93eb4304a062af43083a75d525acc1af9883d.tar.gz
New Scanner: *C++* (#76)!
There's a problem with the ternary operator (?:) and labels which needs to be fixed in C, C++ and PHP scanners. I'll get to that soon.
Diffstat (limited to 'test/scanners/cpp/eventmachine.in.cpp')
-rw-r--r--test/scanners/cpp/eventmachine.in.cpp7035
1 files changed, 7035 insertions, 0 deletions
diff --git a/test/scanners/cpp/eventmachine.in.cpp b/test/scanners/cpp/eventmachine.in.cpp
new file mode 100644
index 0000000..050d601
--- /dev/null
+++ b/test/scanners/cpp/eventmachine.in.cpp
@@ -0,0 +1,7035 @@
+/*****************************************************************************
+
+$Id$
+
+File: binder.cpp
+Date: 07Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+#define DEV_URANDOM "/dev/urandom"
+
+
+map<string, Bindable_t*> Bindable_t::BindingBag;
+
+
+/********************************
+STATIC Bindable_t::CreateBinding
+********************************/
+
+string Bindable_t::CreateBinding()
+{
+ static int index = 0;
+ static string seed;
+
+ if ((index >= 1000000) || (seed.length() == 0)) {
+ #ifdef OS_UNIX
+ int fd = open (DEV_URANDOM, O_RDONLY);
+ if (fd < 0)
+ throw std::runtime_error ("No entropy device");
+
+ unsigned char u[16];
+ size_t r = read (fd, u, sizeof(u));
+ if (r < sizeof(u))
+ throw std::runtime_error ("Unable to read entropy device");
+
+ unsigned char *u1 = (unsigned char*)u;
+ char u2 [sizeof(u) * 2 + 1];
+
+ for (size_t i=0; i < sizeof(u); i++)
+ sprintf (u2 + (i * 2), "%02x", u1[i]);
+
+ seed = string (u2);
+ #endif
+
+
+ #ifdef OS_WIN32
+ UUID uuid;
+ UuidCreate (&uuid);
+ unsigned char *uuidstring = NULL;
+ UuidToString (&uuid, &uuidstring);
+ if (!uuidstring)
+ throw std::runtime_error ("Unable to read uuid");
+ seed = string ((const char*)uuidstring);
+
+ RpcStringFree (&uuidstring);
+ #endif
+
+ index = 0;
+
+
+ }
+
+ stringstream ss;
+ ss << seed << (++index);
+ return ss.str();
+}
+
+
+/*****************************
+STATIC: Bindable_t::GetObject
+*****************************/
+
+Bindable_t *Bindable_t::GetObject (const char *binding)
+{
+ string s (binding ? binding : "");
+ return GetObject (s);
+}
+
+/*****************************
+STATIC: Bindable_t::GetObject
+*****************************/
+
+Bindable_t *Bindable_t::GetObject (const string &binding)
+{
+ map<string, Bindable_t*>::const_iterator i = BindingBag.find (binding);
+ if (i != BindingBag.end())
+ return i->second;
+ else
+ return NULL;
+}
+
+
+/**********************
+Bindable_t::Bindable_t
+**********************/
+
+Bindable_t::Bindable_t()
+{
+ Binding = Bindable_t::CreateBinding();
+ BindingBag [Binding] = this;
+}
+
+
+
+/***********************
+Bindable_t::~Bindable_t
+***********************/
+
+Bindable_t::~Bindable_t()
+{
+ BindingBag.erase (Binding);
+}
+
+
+/*****************************************************************************
+
+$Id$
+
+File: cmain.cpp
+Date: 06Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+
+static EventMachine_t *EventMachine;
+static int bUseEpoll = 0;
+static int bUseKqueue = 0;
+
+extern "C" void ensure_eventmachine (const char *caller = "unknown caller")
+{
+ if (!EventMachine) {
+ const int err_size = 128;
+ char err_string[err_size];
+ snprintf (err_string, err_size, "eventmachine not initialized: %s", caller);
+ #ifdef BUILD_FOR_RUBY
+ rb_raise(rb_eRuntimeError, err_string);
+ #else
+ throw std::runtime_error (err_string);
+ #endif
+ }
+}
+
+/***********************
+evma_initialize_library
+***********************/
+
+extern "C" void evma_initialize_library (void(*cb)(const char*, int, const char*, int))
+{
+ // Probably a bad idea to mess with the signal mask of a process
+ // we're just being linked into.
+ //InstallSignalHandlers();
+ if (EventMachine)
+ #ifdef BUILD_FOR_RUBY
+ rb_raise(rb_eRuntimeError, "eventmachine already initialized: evma_initialize_library");
+ #else
+ throw std::runtime_error ("eventmachine already initialized: evma_initialize_library");
+ #endif
+ EventMachine = new EventMachine_t (cb);
+ if (bUseEpoll)
+ EventMachine->_UseEpoll();
+ if (bUseKqueue)
+ EventMachine->_UseKqueue();
+}
+
+
+/********************
+evma_release_library
+********************/
+
+extern "C" void evma_release_library()
+{
+ ensure_eventmachine("evma_release_library");
+ delete EventMachine;
+ EventMachine = NULL;
+}
+
+
+/****************
+evma_run_machine
+****************/
+
+extern "C" void evma_run_machine()
+{
+ ensure_eventmachine("evma_run_machine");
+ EventMachine->Run();
+}
+
+
+/**************************
+evma_install_oneshot_timer
+**************************/
+
+extern "C" const char *evma_install_oneshot_timer (int seconds)
+{
+ ensure_eventmachine("evma_install_oneshot_timer");
+ return EventMachine->InstallOneshotTimer (seconds);
+}
+
+
+/**********************
+evma_connect_to_server
+**********************/
+
+extern "C" const char *evma_connect_to_server (const char *server, int port)
+{
+ ensure_eventmachine("evma_connect_to_server");
+ return EventMachine->ConnectToServer (server, port);
+}
+
+/***************************
+evma_connect_to_unix_server
+***************************/
+
+extern "C" const char *evma_connect_to_unix_server (const char *server)
+{
+ ensure_eventmachine("evma_connect_to_unix_server");
+ return EventMachine->ConnectToUnixServer (server);
+}
+
+/**************
+evma_attach_fd
+**************/
+
+extern "C" const char *evma_attach_fd (int file_descriptor, int notify_readable, int notify_writable)
+{
+ ensure_eventmachine("evma_attach_fd");
+ return EventMachine->AttachFD (file_descriptor, (notify_readable ? true : false), (notify_writable ? true : false));
+}
+
+/**************
+evma_detach_fd
+**************/
+
+extern "C" int evma_detach_fd (const char *binding)
+{
+ ensure_eventmachine("evma_dettach_fd");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed)
+ return EventMachine->DetachFD (ed);
+ else
+ #ifdef BUILD_FOR_RUBY
+ rb_raise(rb_eRuntimeError, "invalid binding to detach");
+ #else
+ throw std::runtime_error ("invalid binding to detach");
+ #endif
+}
+
+/**********************
+evma_create_tcp_server
+**********************/
+
+extern "C" const char *evma_create_tcp_server (const char *address, int port)
+{
+ ensure_eventmachine("evma_create_tcp_server");
+ return EventMachine->CreateTcpServer (address, port);
+}
+
+/******************************
+evma_create_unix_domain_server
+******************************/
+
+extern "C" const char *evma_create_unix_domain_server (const char *filename)
+{
+ ensure_eventmachine("evma_create_unix_domain_server");
+ return EventMachine->CreateUnixDomainServer (filename);
+}
+
+/*************************
+evma_open_datagram_socket
+*************************/
+
+extern "C" const char *evma_open_datagram_socket (const char *address, int port)
+{
+ ensure_eventmachine("evma_open_datagram_socket");
+ return EventMachine->OpenDatagramSocket (address, port);
+}
+
+/******************
+evma_open_keyboard
+******************/
+
+extern "C" const char *evma_open_keyboard()
+{
+ ensure_eventmachine("evma_open_keyboard");
+ return EventMachine->OpenKeyboard();
+}
+
+
+
+/****************************
+evma_send_data_to_connection
+****************************/
+
+extern "C" int evma_send_data_to_connection (const char *binding, const char *data, int data_length)
+{
+ ensure_eventmachine("evma_send_data_to_connection");
+ return ConnectionDescriptor::SendDataToConnection (binding, data, data_length);
+}
+
+/******************
+evma_send_datagram
+******************/
+
+extern "C" int evma_send_datagram (const char *binding, const char *data, int data_length, const char *address, int port)
+{
+ ensure_eventmachine("evma_send_datagram");
+ return DatagramDescriptor::SendDatagram (binding, data, data_length, address, port);
+}
+
+
+/*********************
+evma_close_connection
+*********************/
+
+extern "C" void evma_close_connection (const char *binding, int after_writing)
+{
+ ensure_eventmachine("evma_close_connection");
+ ConnectionDescriptor::CloseConnection (binding, (after_writing ? true : false));
+}
+
+/***********************************
+evma_report_connection_error_status
+***********************************/
+
+extern "C" int evma_report_connection_error_status (const char *binding)
+{
+ ensure_eventmachine("evma_report_connection_error_status");
+ return ConnectionDescriptor::ReportErrorStatus (binding);
+}
+
+/********************
+evma_stop_tcp_server
+********************/
+
+extern "C" void evma_stop_tcp_server (const char *binding)
+{
+ ensure_eventmachine("evma_stop_tcp_server");
+ AcceptorDescriptor::StopAcceptor (binding);
+}
+
+
+/*****************
+evma_stop_machine
+*****************/
+
+extern "C" void evma_stop_machine()
+{
+ ensure_eventmachine("evma_stop_machine");
+ EventMachine->ScheduleHalt();
+}
+
+
+/**************
+evma_start_tls
+**************/
+
+extern "C" void evma_start_tls (const char *binding)
+{
+ ensure_eventmachine("evma_start_tls");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed)
+ ed->StartTls();
+}
+
+/******************
+evma_set_tls_parms
+******************/
+
+extern "C" void evma_set_tls_parms (const char *binding, const char *privatekey_filename, const char *certchain_filename)
+{
+ ensure_eventmachine("evma_set_tls_parms");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed)
+ ed->SetTlsParms (privatekey_filename, certchain_filename);
+}
+
+
+/*****************
+evma_get_peername
+*****************/
+
+extern "C" int evma_get_peername (const char *binding, struct sockaddr *sa)
+{
+ ensure_eventmachine("evma_get_peername");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed) {
+ return ed->GetPeername (sa) ? 1 : 0;
+ }
+ else
+ return 0;
+}
+
+/*****************
+evma_get_sockname
+*****************/
+
+extern "C" int evma_get_sockname (const char *binding, struct sockaddr *sa)
+{
+ ensure_eventmachine("evma_get_sockname");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed) {
+ return ed->GetSockname (sa) ? 1 : 0;
+ }
+ else
+ return 0;
+}
+
+/***********************
+evma_get_subprocess_pid
+***********************/
+
+extern "C" int evma_get_subprocess_pid (const char *binding, pid_t *pid)
+{
+ ensure_eventmachine("evma_get_subprocess_pid");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed) {
+ return ed->GetSubprocessPid (pid) ? 1 : 0;
+ }
+ else
+ return 0;
+}
+
+/**************************
+evma_get_subprocess_status
+**************************/
+
+extern "C" int evma_get_subprocess_status (const char *binding, int *status)
+{
+ ensure_eventmachine("evma_get_subprocess_status");
+ if (status) {
+ *status = EventMachine->SubprocessExitStatus;
+ return 1;
+ }
+ else
+ return 0;
+}
+
+
+/*********************
+evma_signal_loopbreak
+*********************/
+
+extern "C" void evma_signal_loopbreak()
+{
+ ensure_eventmachine("evma_signal_loopbreak");
+ EventMachine->SignalLoopBreaker();
+}
+
+
+
+/****************
+evma__write_file
+****************/
+
+extern "C" const char *evma__write_file (const char *filename)
+{
+ ensure_eventmachine("evma__write_file");
+ return EventMachine->_OpenFileForWriting (filename);
+}
+
+
+/********************************
+evma_get_comm_inactivity_timeout
+********************************/
+
+extern "C" int evma_get_comm_inactivity_timeout (const char *binding, int *value)
+{
+ ensure_eventmachine("evma_get_comm_inactivity_timeout");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed) {
+ return ed->GetCommInactivityTimeout (value);
+ }
+ else
+ return 0; //Perhaps this should be an exception. Access to an unknown binding.
+}
+
+/********************************
+evma_set_comm_inactivity_timeout
+********************************/
+
+extern "C" int evma_set_comm_inactivity_timeout (const char *binding, int *value)
+{
+ ensure_eventmachine("evma_set_comm_inactivity_timeout");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed) {
+ return ed->SetCommInactivityTimeout (value);
+ }
+ else
+ return 0; //Perhaps this should be an exception. Access to an unknown binding.
+}
+
+
+/**********************
+evma_set_timer_quantum
+**********************/
+
+extern "C" void evma_set_timer_quantum (int interval)
+{
+ ensure_eventmachine("evma_set_timer_quantum");
+ EventMachine->SetTimerQuantum (interval);
+}
+
+/************************
+evma_set_max_timer_count
+************************/
+
+extern "C" void evma_set_max_timer_count (int ct)
+{
+ // This may only be called if the reactor is not running.
+
+ if (EventMachine)
+ #ifdef BUILD_FOR_RUBY
+ rb_raise(rb_eRuntimeError, "eventmachine already initialized: evma_set_max_timer_count");
+ #else
+ throw std::runtime_error ("eventmachine already initialized: evma_set_max_timer_count");
+ #endif
+ EventMachine_t::SetMaxTimerCount (ct);
+}
+
+/******************
+evma_setuid_string
+******************/
+
+extern "C" void evma_setuid_string (const char *username)
+{
+ // We do NOT need to be running an EM instance because this method is static.
+ EventMachine_t::SetuidString (username);
+}
+
+
+/**********
+evma_popen
+**********/
+
+extern "C" const char *evma_popen (char * const*cmd_strings)
+{
+ ensure_eventmachine("evma_popen");
+ return EventMachine->Socketpair (cmd_strings);
+}
+
+
+/***************************
+evma_get_outbound_data_size
+***************************/
+
+extern "C" int evma_get_outbound_data_size (const char *binding)
+{
+ ensure_eventmachine("evma_get_outbound_data_size");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ return ed ? ed->GetOutboundDataSize() : 0;
+}
+
+
+/***********
+evma__epoll
+***********/
+
+extern "C" void evma__epoll()
+{
+ bUseEpoll = 1;
+}
+
+/************
+evma__kqueue
+************/
+
+extern "C" void evma__kqueue()
+{
+ bUseKqueue = 1;
+}
+
+
+/**********************
+evma_set_rlimit_nofile
+**********************/
+
+extern "C" int evma_set_rlimit_nofile (int nofiles)
+{
+ return EventMachine_t::SetRlimitNofile (nofiles);
+}
+
+
+/*********************************
+evma_send_file_data_to_connection
+*********************************/
+
+extern "C" int evma_send_file_data_to_connection (const char *binding, const char *filename)
+{
+ /* This is a sugaring over send_data_to_connection that reads a file into a
+ * locally-allocated buffer, and sends the file data to the remote peer.
+ * Return the number of bytes written to the caller.
+ * TODO, needs to impose a limit on the file size. This is intended only for
+ * small files. (I don't know, maybe 8K or less.) For larger files, use interleaved
+ * I/O to avoid slowing the rest of the system down.
+ * TODO: we should return a code rather than barf, in case of file-not-found.
+ * TODO, does this compile on Windows?
+ * TODO, given that we want this to work only with small files, how about allocating
+ * the buffer on the stack rather than the heap?
+ *
+ * Modified 25Jul07. This now returns -1 on file-too-large; 0 for success, and a positive
+ * errno in case of other errors.
+ *
+ /* Contributed by Kirk Haines.
+ */
+
+ char data[32*1024];
+ int r;
+
+ ensure_eventmachine("evma_send_file_data_to_connection");
+
+ int Fd = open (filename, O_RDONLY);
+
+ if (Fd < 0)
+ return errno;
+ // From here on, all early returns MUST close Fd.
+
+ struct stat st;
+ if (fstat (Fd, &st)) {
+ int e = errno;
+ close (Fd);
+ return e;
+ }
+
+ int filesize = st.st_size;
+ if (filesize <= 0) {
+ close (Fd);
+ return 0;
+ }
+ else if (filesize > sizeof(data)) {
+ close (Fd);
+ return -1;
+ }
+
+
+ r = read (Fd, data, filesize);
+ if (r != filesize) {
+ int e = errno;
+ close (Fd);
+ return e;
+ }
+ evma_send_data_to_connection (binding, data, r);
+ close (Fd);
+
+ return 0;
+}
+
+/*****************************************************************************
+
+$Id$
+
+File: cplusplus.cpp
+Date: 27Jul07
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+#include "project.h"
+
+
+namespace EM {
+ static map<string, Eventable*> Eventables;
+ static map<string, void(*)()> Timers;
+}
+
+
+/*******
+EM::Run
+*******/
+
+void EM::Run (void (*start_func)())
+{
+ evma__epoll();
+ evma_initialize_library (EM::Callback);
+ if (start_func)
+ AddTimer (0, start_func);
+ evma_run_machine();
+ evma_release_library();
+}
+
+/************
+EM::AddTimer
+************/
+
+void EM::AddTimer (int milliseconds, void (*func)())
+{
+ if (func) {
+ const char *sig = evma_install_oneshot_timer (milliseconds);
+ Timers.insert (make_pair (sig, func));
+ }
+}
+
+
+/***************
+EM::StopReactor
+***************/
+
+void EM::StopReactor()
+{
+ evma_stop_machine();
+}
+
+
+/********************
+EM::Acceptor::Accept
+********************/
+
+void EM::Acceptor::Accept (const char *signature)
+{
+ Connection *c = MakeConnection();
+ c->Signature = signature;
+ Eventables.insert (make_pair (c->Signature, c));
+ c->PostInit();
+}
+
+/************************
+EM::Connection::SendData
+************************/
+
+void EM::Connection::SendData (const char *data)
+{
+ if (data)
+ SendData (data, strlen (data));
+}
+
+
+/************************
+EM::Connection::SendData
+************************/
+
+void EM::Connection::SendData (const char *data, int length)
+{
+ evma_send_data_to_connection (Signature.c_str(), data, length);
+}
+
+
+/*********************
+EM::Connection::Close
+*********************/
+
+void EM::Connection::Close (bool afterWriting)
+{
+ evma_close_connection (Signature.c_str(), afterWriting);
+}
+
+
+/***********************
+EM::Connection::Connect
+***********************/
+
+void EM::Connection::Connect (const char *host, int port)
+{
+ Signature = evma_connect_to_server (host, port);
+ Eventables.insert( make_pair (Signature, this));
+}
+
+/*******************
+EM::Acceptor::Start
+*******************/
+
+void EM::Acceptor::Start (const char *host, int port)
+{
+ Signature = evma_create_tcp_server (host, port);
+ Eventables.insert( make_pair (Signature, this));
+}
+
+
+
+/************
+EM::Callback
+************/
+
+void EM::Callback (const char *sig, int ev, const char *data, int length)
+{
+ EM::Eventable *e;
+ void (*f)();
+
+ switch (ev) {
+ case EM_TIMER_FIRED:
+ f = Timers [data];
+ if (f)
+ (*f)();
+ Timers.erase (sig);
+ break;
+
+ case EM_CONNECTION_READ:
+ e = EM::Eventables [sig];
+ e->ReceiveData (data, length);
+ break;
+
+ case EM_CONNECTION_COMPLETED:
+ e = EM::Eventables [sig];
+ e->ConnectionCompleted();
+ break;
+
+ case EM_CONNECTION_ACCEPTED:
+ e = EM::Eventables [sig];
+ e->Accept (data);
+ break;
+
+ case EM_CONNECTION_UNBOUND:
+ e = EM::Eventables [sig];
+ e->Unbind();
+ EM::Eventables.erase (sig);
+ delete e;
+ break;
+ }
+}
+
+/*****************************************************************************
+
+$Id$
+
+File: ed.cpp
+Date: 06Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+
+
+/********************
+SetSocketNonblocking
+********************/
+
+bool SetSocketNonblocking (SOCKET sd)
+{
+ #ifdef OS_UNIX
+ int val = fcntl (sd, F_GETFL, 0);
+ return (fcntl (sd, F_SETFL, val | O_NONBLOCK) != SOCKET_ERROR) ? true : false;
+ #endif
+
+ #ifdef OS_WIN32
+ unsigned long one = 1;
+ return (ioctlsocket (sd, FIONBIO, &one) == 0) ? true : false;
+ #endif
+}
+
+
+/****************************************
+EventableDescriptor::EventableDescriptor
+****************************************/
+
+EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
+ bCloseNow (false),
+ bCloseAfterWriting (false),
+ MySocket (sd),
+ EventCallback (NULL),
+ LastRead (0),
+ LastWritten (0),
+ bCallbackUnbind (true),
+ MyEventMachine (em)
+{
+ /* There are three ways to close a socket, all of which should
+ * automatically signal to the event machine that this object
+ * should be removed from the polling scheduler.
+ * First is a hard close, intended for bad errors or possible
+ * security violations. It immediately closes the connection
+ * and puts this object into an error state.
+ * Second is to set bCloseNow, which will cause the event machine
+ * to delete this object (and thus close the connection in our
+ * destructor) the next chance it gets. bCloseNow also inhibits
+ * the writing of new data on the socket (but not necessarily
+ * the reading of new data).
+ * The third way is to set bCloseAfterWriting, which inhibits
+ * the writing of new data and converts to bCloseNow as soon
+ * as everything in the outbound queue has been written.
+ * bCloseAfterWriting is really for use only by protocol handlers
+ * (for example, HTTP writes an HTML page and then closes the
+ * connection). All of the error states we generate internally
+ * cause an immediate close to be scheduled, which may have the
+ * effect of discarding outbound data.
+ */
+
+ if (sd == INVALID_SOCKET)
+ throw std::runtime_error ("bad eventable descriptor");
+ if (MyEventMachine == NULL)
+ throw std::runtime_error ("bad em in eventable descriptor");
+ CreatedAt = gCurrentLoopTime;
+
+ #ifdef HAVE_EPOLL
+ EpollEvent.data.ptr = this;
+ #endif
+}
+
+
+/*****************************************
+EventableDescriptor::~EventableDescriptor
+*****************************************/
+
+EventableDescriptor::~EventableDescriptor()
+{
+ if (EventCallback && bCallbackUnbind)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_UNBOUND, NULL, 0);
+ Close();
+}
+
+
+/*************************************
+EventableDescriptor::SetEventCallback
+*************************************/
+
+void EventableDescriptor::SetEventCallback (void(*cb)(const char*, int, const char*, int))
+{
+ EventCallback = cb;
+}
+
+
+/**************************
+EventableDescriptor::Close
+**************************/
+
+void EventableDescriptor::Close()
+{
+ // Close the socket right now. Intended for emergencies.
+ if (MySocket != INVALID_SOCKET) {
+ shutdown (MySocket, 1);
+ closesocket (MySocket);
+ MySocket = INVALID_SOCKET;
+ }
+}
+
+
+/*********************************
+EventableDescriptor::ShouldDelete
+*********************************/
+
+bool EventableDescriptor::ShouldDelete()
+{
+ /* For use by a socket manager, which needs to know if this object
+ * should be removed from scheduling events and deleted.
+ * Has an immediate close been scheduled, or are we already closed?
+ * If either of these are the case, return true. In theory, the manager will
+ * then delete us, which in turn will make sure the socket is closed.
+ * Note, if bCloseAfterWriting is true, we check a virtual method to see
+ * if there is outbound data to write, and only request a close if there is none.
+ */
+
+ return ((MySocket == INVALID_SOCKET) || bCloseNow || (bCloseAfterWriting && (GetOutboundDataSize() <= 0)));
+}
+
+
+/**********************************
+EventableDescriptor::ScheduleClose
+**********************************/
+
+void EventableDescriptor::ScheduleClose (bool after_writing)
+{
+ // KEEP THIS SYNCHRONIZED WITH ::IsCloseScheduled.
+ if (after_writing)
+ bCloseAfterWriting = true;
+ else
+ bCloseNow = true;
+}
+
+
+/*************************************
+EventableDescriptor::IsCloseScheduled
+*************************************/
+
+bool EventableDescriptor::IsCloseScheduled()
+{
+ // KEEP THIS SYNCHRONIZED WITH ::ScheduleClose.
+ return (bCloseNow || bCloseAfterWriting);
+}
+
+
+/******************************************
+ConnectionDescriptor::ConnectionDescriptor
+******************************************/
+
+ConnectionDescriptor::ConnectionDescriptor (int sd, EventMachine_t *em):
+ EventableDescriptor (sd, em),
+ bConnectPending (false),
+ bNotifyReadable (false),
+ bNotifyWritable (false),
+ bReadAttemptedAfterClose (false),
+ bWriteAttemptedAfterClose (false),
+ OutboundDataSize (0),
+ #ifdef WITH_SSL
+ SslBox (NULL),
+ #endif
+ bIsServer (false),
+ LastIo (gCurrentLoopTime),
+ InactivityTimeout (0)
+{
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLOUT;
+ #endif
+ // 22Jan09: Moved ArmKqueueWriter into SetConnectPending() to fix assertion failure in _WriteOutboundData()
+}
+
+
+/*******************************************
+ConnectionDescriptor::~ConnectionDescriptor
+*******************************************/
+
+ConnectionDescriptor::~ConnectionDescriptor()
+{
+ // Run down any stranded outbound data.
+ for (size_t i=0; i < OutboundPages.size(); i++)
+ OutboundPages[i].Free();
+
+ #ifdef WITH_SSL
+ if (SslBox)
+ delete SslBox;
+ #endif
+}
+
+
+/**************************************************
+STATIC: ConnectionDescriptor::SendDataToConnection
+**************************************************/
+
+int ConnectionDescriptor::SendDataToConnection (const char *binding, const char *data, int data_length)
+{
+ // TODO: This is something of a hack, or at least it's a static method of the wrong class.
+ // TODO: Poor polymorphism here. We should be calling one virtual method
+ // instead of hacking out the runtime information of the target object.
+ ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding));
+ if (cd)
+ return cd->SendOutboundData (data, data_length);
+ DatagramDescriptor *ds = dynamic_cast <DatagramDescriptor*> (Bindable_t::GetObject (binding));
+ if (ds)
+ return ds->SendOutboundData (data, data_length);
+ #ifdef OS_UNIX
+ PipeDescriptor *ps = dynamic_cast <PipeDescriptor*> (Bindable_t::GetObject (binding));
+ if (ps)
+ return ps->SendOutboundData (data, data_length);
+ #endif
+ return -1;
+}
+
+
+/*********************************************
+STATIC: ConnectionDescriptor::CloseConnection
+*********************************************/
+
+void ConnectionDescriptor::CloseConnection (const char *binding, bool after_writing)
+{
+ // TODO: This is something of a hack, or at least it's a static method of the wrong class.
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
+ if (ed)
+ ed->ScheduleClose (after_writing);
+}
+
+/***********************************************
+STATIC: ConnectionDescriptor::ReportErrorStatus
+***********************************************/
+
+int ConnectionDescriptor::ReportErrorStatus (const char *binding)
+{
+ // TODO: This is something of a hack, or at least it's a static method of the wrong class.
+ // TODO: Poor polymorphism here. We should be calling one virtual method
+ // instead of hacking out the runtime information of the target object.
+ ConnectionDescriptor *cd = dynamic_cast <ConnectionDescriptor*> (Bindable_t::GetObject (binding));
+ if (cd)
+ return cd->_ReportErrorStatus();
+ return -1;
+}
+
+/***************************************
+ConnectionDescriptor::SetConnectPending
+****************************************/
+
+void ConnectionDescriptor::SetConnectPending(bool f)
+{
+ bConnectPending = f;
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueWriter (this);
+ #endif
+}
+
+
+/**************************************
+ConnectionDescriptor::SendOutboundData
+**************************************/
+
+int ConnectionDescriptor::SendOutboundData (const char *data, int length)
+{
+ #ifdef WITH_SSL
+ if (SslBox) {
+ if (length > 0) {
+ int w = SslBox->PutPlaintext (data, length);
+ if (w < 0)
+ ScheduleClose (false);
+ else
+ _DispatchCiphertext();
+ }
+ // TODO: What's the correct return value?
+ return 1; // That's a wild guess, almost certainly wrong.
+ }
+ else
+ #endif
+ return _SendRawOutboundData (data, length);
+}
+
+
+
+/******************************************
+ConnectionDescriptor::_SendRawOutboundData
+******************************************/
+
+int ConnectionDescriptor::_SendRawOutboundData (const char *data, int length)
+{
+ /* This internal method is called to schedule bytes that
+ * will be sent out to the remote peer.
+ * It's not directly accessed by the caller, who hits ::SendOutboundData,
+ * which may or may not filter or encrypt the caller's data before
+ * sending it here.
+ */
+
+ // Highly naive and incomplete implementation.
+ // There's no throttle for runaways (which should abort only this connection
+ // and not the whole process), and no coalescing of small pages.
+ // (Well, not so bad, small pages are coalesced in ::Write)
+
+ if (IsCloseScheduled())
+ //if (bCloseNow || bCloseAfterWriting)
+ return 0;
+
+ if (!data && (length > 0))
+ throw std::runtime_error ("bad outbound data");
+ char *buffer = (char *) malloc (length + 1);
+ if (!buffer)
+ throw std::runtime_error ("no allocation for outbound data");
+ memcpy (buffer, data, length);
+ buffer [length] = 0;
+ OutboundPages.push_back (OutboundPage (buffer, length));
+ OutboundDataSize += length;
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | EPOLLOUT);
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueWriter (this);
+ #endif
+ return length;
+}
+
+
+
+/***********************************
+ConnectionDescriptor::SelectForRead
+***********************************/
+
+bool ConnectionDescriptor::SelectForRead()
+{
+ /* A connection descriptor is always scheduled for read,
+ * UNLESS it's in a pending-connect state.
+ * On Linux, unlike Unix, a nonblocking socket on which
+ * connect has been called, does NOT necessarily select
+ * both readable and writable in case of error.
+ * The socket will select writable when the disposition
+ * of the connect is known. On the other hand, a socket
+ * which successfully connects and selects writable may
+ * indeed have some data available on it, so it will
+ * select readable in that case, violating expectations!
+ * So we will not poll for readability until the socket
+ * is known to be in a connected state.
+ */
+
+ return bConnectPending ? false : true;
+}
+
+
+/************************************
+ConnectionDescriptor::SelectForWrite
+************************************/
+
+bool ConnectionDescriptor::SelectForWrite()
+{
+ /* Cf the notes under SelectForRead.
+ * In a pending-connect state, we ALWAYS select for writable.
+ * In a normal state, we only select for writable when we
+ * have outgoing data to send.
+ */
+
+ if (bConnectPending || bNotifyWritable)
+ return true;
+ else {
+ return (GetOutboundDataSize() > 0);
+ }
+}
+
+
+/**************************
+ConnectionDescriptor::Read
+**************************/
+
+void ConnectionDescriptor::Read()
+{
+ /* Read and dispatch data on a socket that has selected readable.
+ * It's theoretically possible to get and dispatch incoming data on
+ * a socket that has already been scheduled for closing or close-after-writing.
+ * In those cases, we'll leave it up the to protocol handler to "do the
+ * right thing" (which probably means to ignore the incoming data).
+ *
+ * 22Aug06: Chris Ochs reports that on FreeBSD, it's possible to come
+ * here with the socket already closed, after the process receives
+ * a ctrl-C signal (not sure if that's TERM or INT on BSD). The application
+ * was one in which network connections were doing a lot of interleaved reads
+ * and writes.
+ * Since we always write before reading (in order to keep the outbound queues
+ * as light as possible), I think what happened is that an interrupt caused
+ * the socket to be closed in ConnectionDescriptor::Write. We'll then
+ * come here in the same pass through the main event loop, and won't get
+ * cleaned up until immediately after.
+ * We originally asserted that the socket was valid when we got here.
+ * To deal properly with the possibility that we are closed when we get here,
+ * I removed the assert. HOWEVER, the potential for an infinite loop scares me,
+ * so even though this is really clunky, I added a flag to assert that we never
+ * come here more than once after being closed. (FCianfrocca)
+ */
+
+ int sd = GetSocket();
+ //assert (sd != INVALID_SOCKET); (original, removed 22Aug06)
+ if (sd == INVALID_SOCKET) {
+ assert (!bReadAttemptedAfterClose);
+ bReadAttemptedAfterClose = true;
+ return;
+ }
+
+ if (bNotifyReadable) {
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_READABLE, NULL, 0);
+ return;
+ }
+
+ LastIo = gCurrentLoopTime;
+
+ int total_bytes_read = 0;
+ char readbuffer [16 * 1024 + 1];
+
+ for (int i=0; i < 10; i++) {
+ // Don't read just one buffer and then move on. This is faster
+ // if there is a lot of incoming.
+ // But don't read indefinitely. Give other sockets a chance to run.
+ // NOTICE, we're reading one less than the buffer size.
+ // That's so we can put a guard byte at the end of what we send
+ // to user code.
+
+
+ int r = recv (sd, readbuffer, sizeof(readbuffer) - 1, 0);
+ //cerr << "<R:" << r << ">";
+
+ if (r > 0) {
+ total_bytes_read += r;
+ LastRead = gCurrentLoopTime;
+
+ // Add a null-terminator at the the end of the buffer
+ // that we will send to the callback.
+ // DO NOT EVER CHANGE THIS. We want to explicitly allow users
+ // to be able to depend on this behavior, so they will have
+ // the option to do some things faster. Additionally it's
+ // a security guard against buffer overflows.
+ readbuffer [r] = 0;
+ _DispatchInboundData (readbuffer, r);
+ }
+ else if (r == 0) {
+ break;
+ }
+ else {
+ // Basically a would-block, meaning we've read everything there is to read.
+ break;
+ }
+
+ }
+
+
+ if (total_bytes_read == 0) {
+ // If we read no data on a socket that selected readable,
+ // it generally means the other end closed the connection gracefully.
+ ScheduleClose (false);
+ //bCloseNow = true;
+ }
+
+}
+
+
+
+/******************************************
+ConnectionDescriptor::_DispatchInboundData
+******************************************/
+
+void ConnectionDescriptor::_DispatchInboundData (const char *buffer, int size)
+{
+ #ifdef WITH_SSL
+ if (SslBox) {
+ SslBox->PutCiphertext (buffer, size);
+
+ int s;
+ char B [2048];
+ while ((s = SslBox->GetPlaintext (B, sizeof(B) - 1)) > 0) {
+ B [s] = 0;
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, B, s);
+ }
+ // INCOMPLETE, s may indicate an SSL error that would force the connection down.
+ _DispatchCiphertext();
+ }
+ else {
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size);
+ }
+ #endif
+
+ #ifdef WITHOUT_SSL
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, buffer, size);
+ #endif
+}
+
+
+
+
+
+/***************************
+ConnectionDescriptor::Write
+***************************/
+
+void ConnectionDescriptor::Write()
+{
+ /* A socket which is in a pending-connect state will select
+ * writable when the disposition of the connect is known.
+ * At that point, check to be sure there are no errors,
+ * and if none, then promote the socket out of the pending
+ * state.
+ * TODO: I haven't figured out how Windows signals errors on
+ * unconnected sockets. Maybe it does the untraditional but
+ * logical thing and makes the socket selectable for error.
+ * If so, it's unsupported here for the time being, and connect
+ * errors will have to be caught by the timeout mechanism.
+ */
+
+ if (bConnectPending) {
+ int error;
+ socklen_t len;
+ len = sizeof(error);
+ #ifdef OS_UNIX
+ int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len);
+ #endif
+ #ifdef OS_WIN32
+ int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len);
+ #endif
+ if ((o == 0) && (error == 0)) {
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_COMPLETED, "", 0);
+ bConnectPending = false;
+ #ifdef HAVE_EPOLL
+ // The callback may have scheduled outbound data.
+ EpollEvent.events = EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0);
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueReader (this);
+ // The callback may have scheduled outbound data.
+ if (SelectForWrite())
+ MyEventMachine->ArmKqueueWriter (this);
+ #endif
+ }
+ else
+ ScheduleClose (false);
+ //bCloseNow = true;
+ }
+ else {
+
+ if (bNotifyWritable) {
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_NOTIFY_WRITABLE, NULL, 0);
+ return;
+ }
+
+ _WriteOutboundData();
+ }
+}
+
+
+/****************************************
+ConnectionDescriptor::_WriteOutboundData
+****************************************/
+
+void ConnectionDescriptor::_WriteOutboundData()
+{
+ /* This is a helper function called by ::Write.
+ * It's possible for a socket to select writable and then no longer
+ * be writable by the time we get around to writing. The kernel might
+ * have used up its available output buffers between the select call
+ * and when we get here. So this condition is not an error.
+ *
+ * 20Jul07, added the same kind of protection against an invalid socket
+ * that is at the top of ::Read. Not entirely how this could happen in
+ * real life (connection-reset from the remote peer, perhaps?), but I'm
+ * doing it to address some reports of crashing under heavy loads.
+ */
+
+ int sd = GetSocket();
+ //assert (sd != INVALID_SOCKET);
+ if (sd == INVALID_SOCKET) {
+ assert (!bWriteAttemptedAfterClose);
+ bWriteAttemptedAfterClose = true;
+ return;
+ }
+
+ LastIo = gCurrentLoopTime;
+ char output_buffer [16 * 1024];
+ size_t nbytes = 0;
+
+ while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
+ OutboundPage *op = &(OutboundPages[0]);
+ if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) {
+ memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
+ nbytes += (op->Length - op->Offset);
+ op->Free();
+ OutboundPages.pop_front();
+ }
+ else {
+ int len = sizeof(output_buffer) - nbytes;
+ memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
+ op->Offset += len;
+ nbytes += len;
+ }
+ }
+
+ // We should never have gotten here if there were no data to write,
+ // so assert that as a sanity check.
+ // Don't bother to make sure nbytes is less than output_buffer because
+ // if it were we probably would have crashed already.
+ assert (nbytes > 0);
+
+ assert (GetSocket() != INVALID_SOCKET);
+ int bytes_written = send (GetSocket(), output_buffer, nbytes, 0);
+
+ bool err = false;
+ if (bytes_written < 0) {
+ err = true;
+ bytes_written = 0;
+ }
+
+ assert (bytes_written >= 0);
+ OutboundDataSize -= bytes_written;
+ if ((size_t)bytes_written < nbytes) {
+ int len = nbytes - bytes_written;
+ char *buffer = (char*) malloc (len + 1);
+ if (!buffer)
+ throw std::runtime_error ("bad alloc throwing back data");
+ memcpy (buffer, output_buffer + bytes_written, len);
+ buffer [len] = 0;
+ OutboundPages.push_front (OutboundPage (buffer, len));
+ }
+
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0));
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ #ifdef HAVE_KQUEUE
+ if (SelectForWrite())
+ MyEventMachine->ArmKqueueWriter (this);
+ #endif
+
+
+ if (err) {
+ #ifdef OS_UNIX
+ if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR))
+ #endif
+ #ifdef OS_WIN32
+ if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK))
+ #endif
+ Close();
+ }
+}
+
+
+/****************************************
+ConnectionDescriptor::_ReportErrorStatus
+****************************************/
+
+int ConnectionDescriptor::_ReportErrorStatus()
+{
+ int error;
+ socklen_t len;
+ len = sizeof(error);
+ #ifdef OS_UNIX
+ int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len);
+ #endif
+ #ifdef OS_WIN32
+ int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len);
+ #endif
+ if ((o == 0) && (error == 0))
+ return 0;
+ else
+ return 1;
+}
+
+
+/******************************
+ConnectionDescriptor::StartTls
+******************************/
+
+void ConnectionDescriptor::StartTls()
+{
+ #ifdef WITH_SSL
+ if (SslBox)
+ throw std::runtime_error ("SSL/TLS already running on connection");
+
+ SslBox = new SslBox_t (bIsServer, PrivateKeyFilename, CertChainFilename);
+ _DispatchCiphertext();
+ #endif
+
+ #ifdef WITHOUT_SSL
+ throw std::runtime_error ("Encryption not available on this event-machine");
+ #endif
+}
+
+
+/*********************************
+ConnectionDescriptor::SetTlsParms
+*********************************/
+
+void ConnectionDescriptor::SetTlsParms (const char *privkey_filename, const char *certchain_filename)
+{
+ #ifdef WITH_SSL
+ if (SslBox)
+ throw std::runtime_error ("call SetTlsParms before calling StartTls");
+ if (privkey_filename && *privkey_filename)
+ PrivateKeyFilename = privkey_filename;
+ if (certchain_filename && *certchain_filename)
+ CertChainFilename = certchain_filename;
+ #endif
+
+ #ifdef WITHOUT_SSL
+ throw std::runtime_error ("Encryption not available on this event-machine");
+ #endif
+}
+
+
+
+/*****************************************
+ConnectionDescriptor::_DispatchCiphertext
+*****************************************/
+#ifdef WITH_SSL
+void ConnectionDescriptor::_DispatchCiphertext()
+{
+ assert (SslBox);
+
+
+ char BigBuf [2048];
+ bool did_work;
+
+ do {
+ did_work = false;
+
+ // try to drain ciphertext
+ while (SslBox->CanGetCiphertext()) {
+ int r = SslBox->GetCiphertext (BigBuf, sizeof(BigBuf));
+ assert (r > 0);
+ _SendRawOutboundData (BigBuf, r);
+ did_work = true;
+ }
+
+ // Pump the SslBox, in case it has queued outgoing plaintext
+ // This will return >0 if data was written,
+ // 0 if no data was written, and <0 if there was a fatal error.
+ bool pump;
+ do {
+ pump = false;
+ int w = SslBox->PutPlaintext (NULL, 0);
+ if (w > 0) {
+ did_work = true;
+ pump = true;
+ }
+ else if (w < 0)
+ ScheduleClose (false);
+ } while (pump);
+
+ // try to put plaintext. INCOMPLETE, doesn't belong here?
+ // In SendOutboundData, we're spooling plaintext directly
+ // into SslBox. That may be wrong, we may need to buffer it
+ // up here!
+ /*
+ const char *ptr;
+ int ptr_length;
+ while (OutboundPlaintext.GetPage (&ptr, &ptr_length)) {
+ assert (ptr && (ptr_length > 0));
+ int w = SslMachine.PutPlaintext (ptr, ptr_length);
+ if (w > 0) {
+ OutboundPlaintext.DiscardBytes (w);
+ did_work = true;
+ }
+ else
+ break;
+ }
+ */
+
+ } while (did_work);
+
+}
+#endif
+
+
+
+/*******************************
+ConnectionDescriptor::Heartbeat
+*******************************/
+
+void ConnectionDescriptor::Heartbeat()
+{
+ /* Only allow a certain amount of time to go by while waiting
+ * for a pending connect. If it expires, then kill the socket.
+ * For a connected socket, close it if its inactivity timer
+ * has expired.
+ */
+
+ if (bConnectPending) {
+ if ((gCurrentLoopTime - CreatedAt) >= PendingConnectTimeout)
+ ScheduleClose (false);
+ //bCloseNow = true;
+ }
+ else {
+ if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout))
+ ScheduleClose (false);
+ //bCloseNow = true;
+ }
+}
+
+
+/****************************************
+LoopbreakDescriptor::LoopbreakDescriptor
+****************************************/
+
+LoopbreakDescriptor::LoopbreakDescriptor (int sd, EventMachine_t *parent_em):
+ EventableDescriptor (sd, parent_em)
+{
+ /* This is really bad and ugly. Change someday if possible.
+ * We have to know about an event-machine (probably the one that owns us),
+ * so we can pass newly-created connections to it.
+ */
+
+ bCallbackUnbind = false;
+
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLIN;
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueReader (this);
+ #endif
+}
+
+
+
+
+/*************************
+LoopbreakDescriptor::Read
+*************************/
+
+void LoopbreakDescriptor::Read()
+{
+ // TODO, refactor, this code is probably in the wrong place.
+ assert (MyEventMachine);
+ MyEventMachine->_ReadLoopBreaker();
+}
+
+
+/**************************
+LoopbreakDescriptor::Write
+**************************/
+
+void LoopbreakDescriptor::Write()
+{
+ // Why are we here?
+ throw std::runtime_error ("bad code path in loopbreak");
+}
+
+/**************************************
+AcceptorDescriptor::AcceptorDescriptor
+**************************************/
+
+AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em):
+ EventableDescriptor (sd, parent_em)
+{
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLIN;
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueReader (this);
+ #endif
+}
+
+
+/***************************************
+AcceptorDescriptor::~AcceptorDescriptor
+***************************************/
+
+AcceptorDescriptor::~AcceptorDescriptor()
+{
+}
+
+/****************************************
+STATIC: AcceptorDescriptor::StopAcceptor
+****************************************/
+
+void AcceptorDescriptor::StopAcceptor (const char *binding)
+{
+ // TODO: This is something of a hack, or at least it's a static method of the wrong class.
+ AcceptorDescriptor *ad = dynamic_cast <AcceptorDescriptor*> (Bindable_t::GetObject (binding));
+ if (ad)
+ ad->ScheduleClose (false);
+ else
+ throw std::runtime_error ("failed to close nonexistent acceptor");
+}
+
+
+/************************
+AcceptorDescriptor::Read
+************************/
+
+void AcceptorDescriptor::Read()
+{
+ /* Accept up to a certain number of sockets on the listening connection.
+ * Don't try to accept all that are present, because this would allow a DoS attack
+ * in which no data were ever read or written. We should accept more than one,
+ * if available, to keep the partially accepted sockets from backing up in the kernel.
+ */
+
+ /* Make sure we use non-blocking i/o on the acceptor socket, since we're selecting it
+ * for readability. According to Stevens UNP, it's possible for an acceptor to select readable
+ * and then block when we call accept. For example, the other end resets the connection after
+ * the socket selects readable and before we call accept. The kernel will remove the dead
+ * socket from the accept queue. If the accept queue is now empty, accept will block.
+ */
+
+
+ struct sockaddr_in pin;
+ socklen_t addrlen = sizeof (pin);
+
+ for (int i=0; i < 10; i++) {
+ int sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen);
+ if (sd == INVALID_SOCKET) {
+ // This breaks the loop when we've accepted everything on the kernel queue,
+ // up to 10 new connections. But what if the *first* accept fails?
+ // Does that mean anything serious is happening, beyond the situation
+ // described in the note above?
+ break;
+ }
+
+ // Set the newly-accepted socket non-blocking.
+ // On Windows, this may fail because, weirdly, Windows inherits the non-blocking
+ // attribute that we applied to the acceptor socket into the accepted one.
+ if (!SetSocketNonblocking (sd)) {
+ //int val = fcntl (sd, F_GETFL, 0);
+ //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1) {
+ shutdown (sd, 1);
+ closesocket (sd);
+ continue;
+ }
+
+
+ // Disable slow-start (Nagle algorithm). Eventually make this configurable.
+ int one = 1;
+ setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one));
+
+
+ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, MyEventMachine);
+ if (!cd)
+ throw std::runtime_error ("no newly accepted connection");
+ cd->SetServerMode();
+ if (EventCallback) {
+ (*EventCallback) (GetBinding().c_str(), EM_CONNECTION_ACCEPTED, cd->GetBinding().c_str(), cd->GetBinding().size());
+ }
+ #ifdef HAVE_EPOLL
+ cd->GetEpollEvent()->events = EPOLLIN | (cd->SelectForWrite() ? EPOLLOUT : 0);
+ #endif
+ assert (MyEventMachine);
+ MyEventMachine->Add (cd);
+ #ifdef HAVE_KQUEUE
+ if (cd->SelectForWrite())
+ MyEventMachine->ArmKqueueWriter (cd);
+ MyEventMachine->ArmKqueueReader (cd);
+ #endif
+ }
+
+}
+
+
+/*************************
+AcceptorDescriptor::Write
+*************************/
+
+void AcceptorDescriptor::Write()
+{
+ // Why are we here?
+ throw std::runtime_error ("bad code path in acceptor");
+}
+
+
+/*****************************
+AcceptorDescriptor::Heartbeat
+*****************************/
+
+void AcceptorDescriptor::Heartbeat()
+{
+ // No-op
+}
+
+
+/*******************************
+AcceptorDescriptor::GetSockname
+*******************************/
+
+bool AcceptorDescriptor::GetSockname (struct sockaddr *s)
+{
+ bool ok = false;
+ if (s) {
+ socklen_t len = sizeof(*s);
+ int gp = getsockname (GetSocket(), s, &len);
+ if (gp == 0)
+ ok = true;
+ }
+ return ok;
+}
+
+
+
+/**************************************
+DatagramDescriptor::DatagramDescriptor
+**************************************/
+
+DatagramDescriptor::DatagramDescriptor (int sd, EventMachine_t *parent_em):
+ EventableDescriptor (sd, parent_em),
+ OutboundDataSize (0),
+ LastIo (gCurrentLoopTime),
+ InactivityTimeout (0)
+{
+ memset (&ReturnAddress, 0, sizeof(ReturnAddress));
+
+ /* Provisionally added 19Oct07. All datagram sockets support broadcasting.
+ * Until now, sending to a broadcast address would give EACCES (permission denied)
+ * on systems like Linux and BSD that require the SO_BROADCAST socket-option in order
+ * to accept a packet to a broadcast address. Solaris doesn't require it. I think
+ * Windows DOES require it but I'm not sure.
+ *
+ * Ruby does NOT do what we're doing here. In Ruby, you have to explicitly set SO_BROADCAST
+ * on a UDP socket in order to enable broadcasting. The reason for requiring the option
+ * in the first place is so that applications don't send broadcast datagrams by mistake.
+ * I imagine that could happen if a user of an application typed in an address that happened
+ * to be a broadcast address on that particular subnet.
+ *
+ * This is provisional because someone may eventually come up with a good reason not to
+ * do it for all UDP sockets. If that happens, then we'll need to add a usercode-level API
+ * to set the socket option, just like Ruby does. AND WE'LL ALSO BREAK CODE THAT DOESN'T
+ * EXPLICITLY SET THE OPTION.
+ */
+
+ int oval = 1;
+ int sob = setsockopt (GetSocket(), SOL_SOCKET, SO_BROADCAST, (char*)&oval, sizeof(oval));
+
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLIN;
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueReader (this);
+ #endif
+}
+
+
+/***************************************
+DatagramDescriptor::~DatagramDescriptor
+***************************************/
+
+DatagramDescriptor::~DatagramDescriptor()
+{
+ // Run down any stranded outbound data.
+ for (size_t i=0; i < OutboundPages.size(); i++)
+ OutboundPages[i].Free();
+}
+
+
+/*****************************
+DatagramDescriptor::Heartbeat
+*****************************/
+
+void DatagramDescriptor::Heartbeat()
+{
+ // Close it if its inactivity timer has expired.
+
+ if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout))
+ ScheduleClose (false);
+ //bCloseNow = true;
+}
+
+
+/************************
+DatagramDescriptor::Read
+************************/
+
+void DatagramDescriptor::Read()
+{
+ int sd = GetSocket();
+ assert (sd != INVALID_SOCKET);
+ LastIo = gCurrentLoopTime;
+
+ // This is an extremely large read buffer.
+ // In many cases you wouldn't expect to get any more than 4K.
+ char readbuffer [16 * 1024];
+
+ for (int i=0; i < 10; i++) {
+ // Don't read just one buffer and then move on. This is faster
+ // if there is a lot of incoming.
+ // But don't read indefinitely. Give other sockets a chance to run.
+ // NOTICE, we're reading one less than the buffer size.
+ // That's so we can put a guard byte at the end of what we send
+ // to user code.
+
+ struct sockaddr_in sin;
+ socklen_t slen = sizeof (sin);
+ memset (&sin, 0, slen);
+
+ int r = recvfrom (sd, readbuffer, sizeof(readbuffer) - 1, 0, (struct sockaddr*)&sin, &slen);
+ //cerr << "<R:" << r << ">";
+
+ // In UDP, a zero-length packet is perfectly legal.
+ if (r >= 0) {
+ LastRead = gCurrentLoopTime;
+
+ // Add a null-terminator at the the end of the buffer
+ // that we will send to the callback.
+ // DO NOT EVER CHANGE THIS. We want to explicitly allow users
+ // to be able to depend on this behavior, so they will have
+ // the option to do some things faster. Additionally it's
+ // a security guard against buffer overflows.
+ readbuffer [r] = 0;
+
+
+ // Set up a "temporary" return address so that callers can "reply" to us
+ // from within the callback we are about to invoke. That means that ordinary
+ // calls to "send_data_to_connection" (which is of course misnamed in this
+ // case) will result in packets being sent back to the same place that sent
+ // us this one.
+ // There is a different call (evma_send_datagram) for cases where the caller
+ // actually wants to send a packet somewhere else.
+
+ memset (&ReturnAddress, 0, sizeof(ReturnAddress));
+ memcpy (&ReturnAddress, &sin, slen);
+
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r);
+
+ }
+ else {
+ // Basically a would-block, meaning we've read everything there is to read.
+ break;
+ }
+
+ }
+
+
+}
+
+
+/*************************
+DatagramDescriptor::Write
+*************************/
+
+void DatagramDescriptor::Write()
+{
+ /* It's possible for a socket to select writable and then no longer
+ * be writable by the time we get around to writing. The kernel might
+ * have used up its available output buffers between the select call
+ * and when we get here. So this condition is not an error.
+ * This code is very reminiscent of ConnectionDescriptor::_WriteOutboundData,
+ * but differs in the that the outbound data pages (received from the
+ * user) are _message-structured._ That is, we send each of them out
+ * one message at a time.
+ * TODO, we are currently suppressing the EMSGSIZE error!!!
+ */
+
+ int sd = GetSocket();
+ assert (sd != INVALID_SOCKET);
+ LastIo = gCurrentLoopTime;
+
+ assert (OutboundPages.size() > 0);
+
+ // Send out up to 10 packets, then cycle the machine.
+ for (int i = 0; i < 10; i++) {
+ if (OutboundPages.size() <= 0)
+ break;
+ OutboundPage *op = &(OutboundPages[0]);
+
+ // The nasty cast to (char*) is needed because Windows is brain-dead.
+ int s = sendto (sd, (char*)op->Buffer, op->Length, 0, (struct sockaddr*)&(op->From), sizeof(op->From));
+ int e = errno;
+
+ OutboundDataSize -= op->Length;
+ op->Free();
+ OutboundPages.pop_front();
+
+ if (s == SOCKET_ERROR) {
+ #ifdef OS_UNIX
+ if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EINTR)) {
+ #endif
+ #ifdef OS_WIN32
+ if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK)) {
+ #endif
+ Close();
+ break;
+ }
+ }
+ }
+
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0));
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+}
+
+
+/**********************************
+DatagramDescriptor::SelectForWrite
+**********************************/
+
+bool DatagramDescriptor::SelectForWrite()
+{
+ /* Changed 15Nov07, per bug report by Mark Zvillius.
+ * The outbound data size will be zero if there are zero-length outbound packets,
+ * so we now select writable in case the outbound page buffer is not empty.
+ * Note that the superclass ShouldDelete method still checks for outbound data size,
+ * which may be wrong.
+ */
+ //return (GetOutboundDataSize() > 0); (Original)
+ return (OutboundPages.size() > 0);
+}
+
+
+/************************************
+DatagramDescriptor::SendOutboundData
+************************************/
+
+int DatagramDescriptor::SendOutboundData (const char *data, int length)
+{
+ // This is an exact clone of ConnectionDescriptor::SendOutboundData.
+ // That means it needs to move to a common ancestor.
+
+ if (IsCloseScheduled())
+ //if (bCloseNow || bCloseAfterWriting)
+ return 0;
+
+ if (!data && (length > 0))
+ throw std::runtime_error ("bad outbound data");
+ char *buffer = (char *) malloc (length + 1);
+ if (!buffer)
+ throw std::runtime_error ("no allocation for outbound data");
+ memcpy (buffer, data, length);
+ buffer [length] = 0;
+ OutboundPages.push_back (OutboundPage (buffer, length, ReturnAddress));
+ OutboundDataSize += length;
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | EPOLLOUT);
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ return length;
+}
+
+
+/****************************************
+DatagramDescriptor::SendOutboundDatagram
+****************************************/
+
+int DatagramDescriptor::SendOutboundDatagram (const char *data, int length, const char *address, int port)
+{
+ // This is an exact clone of ConnectionDescriptor::SendOutboundData.
+ // That means it needs to move to a common ancestor.
+ // TODO: Refactor this so there's no overlap with SendOutboundData.
+
+ if (IsCloseScheduled())
+ //if (bCloseNow || bCloseAfterWriting)
+ return 0;
+
+ if (!address || !*address || !port)
+ return 0;
+
+ sockaddr_in pin;
+ unsigned long HostAddr;
+
+ HostAddr = inet_addr (address);
+ if (HostAddr == INADDR_NONE) {
+ // The nasty cast to (char*) is because Windows is brain-dead.
+ hostent *hp = gethostbyname ((char*)address);
+ if (!hp)
+ return 0;
+ HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
+ }
+
+ memset (&pin, 0, sizeof(pin));
+ pin.sin_family = AF_INET;
+ pin.sin_addr.s_addr = HostAddr;
+ pin.sin_port = htons (port);
+
+
+
+ if (!data && (length > 0))
+ throw std::runtime_error ("bad outbound data");
+ char *buffer = (char *) malloc (length + 1);
+ if (!buffer)
+ throw std::runtime_error ("no allocation for outbound data");
+ memcpy (buffer, data, length);
+ buffer [length] = 0;
+ OutboundPages.push_back (OutboundPage (buffer, length, pin));
+ OutboundDataSize += length;
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | EPOLLOUT);
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ return length;
+}
+
+
+/****************************************
+STATIC: DatagramDescriptor::SendDatagram
+****************************************/
+
+int DatagramDescriptor::SendDatagram (const char *binding, const char *data, int length, const char *address, int port)
+{
+ DatagramDescriptor *dd = dynamic_cast <DatagramDescriptor*> (Bindable_t::GetObject (binding));
+ if (dd)
+ return dd->SendOutboundDatagram (data, length, address, port);
+ else
+ return -1;
+}
+
+
+/*********************************
+ConnectionDescriptor::GetPeername
+*********************************/
+
+bool ConnectionDescriptor::GetPeername (struct sockaddr *s)
+{
+ bool ok = false;
+ if (s) {
+ socklen_t len = sizeof(*s);
+ int gp = getpeername (GetSocket(), s, &len);
+ if (gp == 0)
+ ok = true;
+ }
+ return ok;
+}
+
+/*********************************
+ConnectionDescriptor::GetSockname
+*********************************/
+
+bool ConnectionDescriptor::GetSockname (struct sockaddr *s)
+{
+ bool ok = false;
+ if (s) {
+ socklen_t len = sizeof(*s);
+ int gp = getsockname (GetSocket(), s, &len);
+ if (gp == 0)
+ ok = true;
+ }
+ return ok;
+}
+
+
+/**********************************************
+ConnectionDescriptor::GetCommInactivityTimeout
+**********************************************/
+
+int ConnectionDescriptor::GetCommInactivityTimeout (int *value)
+{
+ if (value) {
+ *value = InactivityTimeout;
+ return 1;
+ }
+ else {
+ // TODO, extended logging, got bad parameter.
+ return 0;
+ }
+}
+
+
+/**********************************************
+ConnectionDescriptor::SetCommInactivityTimeout
+**********************************************/
+
+int ConnectionDescriptor::SetCommInactivityTimeout (int *value)
+{
+ int out = 0;
+
+ if (value) {
+ if ((*value==0) || (*value >= 2)) {
+ // Replace the value and send the old one back to the caller.
+ int v = *value;
+ *value = InactivityTimeout;
+ InactivityTimeout = v;
+ out = 1;
+ }
+ else {
+ // TODO, extended logging, got bad value.
+ }
+ }
+ else {
+ // TODO, extended logging, got bad parameter.
+ }
+
+ return out;
+}
+
+/*******************************
+DatagramDescriptor::GetPeername
+*******************************/
+
+bool DatagramDescriptor::GetPeername (struct sockaddr *s)
+{
+ bool ok = false;
+ if (s) {
+ memset (s, 0, sizeof(struct sockaddr));
+ memcpy (s, &ReturnAddress, sizeof(ReturnAddress));
+ ok = true;
+ }
+ return ok;
+}
+
+/*******************************
+DatagramDescriptor::GetSockname
+*******************************/
+
+bool DatagramDescriptor::GetSockname (struct sockaddr *s)
+{
+ bool ok = false;
+ if (s) {
+ socklen_t len = sizeof(*s);
+ int gp = getsockname (GetSocket(), s, &len);
+ if (gp == 0)
+ ok = true;
+ }
+ return ok;
+}
+
+
+
+/********************************************
+DatagramDescriptor::GetCommInactivityTimeout
+********************************************/
+
+int DatagramDescriptor::GetCommInactivityTimeout (int *value)
+{
+ if (value) {
+ *value = InactivityTimeout;
+ return 1;
+ }
+ else {
+ // TODO, extended logging, got bad parameter.
+ return 0;
+ }
+}
+
+/********************************************
+DatagramDescriptor::SetCommInactivityTimeout
+********************************************/
+
+int DatagramDescriptor::SetCommInactivityTimeout (int *value)
+{
+ int out = 0;
+
+ if (value) {
+ if ((*value==0) || (*value >= 2)) {
+ // Replace the value and send the old one back to the caller.
+ int v = *value;
+ *value = InactivityTimeout;
+ InactivityTimeout = v;
+ out = 1;
+ }
+ else {
+ // TODO, extended logging, got bad value.
+ }
+ }
+ else {
+ // TODO, extended logging, got bad parameter.
+ }
+
+ return out;
+}
+
+/*****************************************************************************
+
+$Id$
+
+File: em.cpp
+Date: 06Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+// THIS ENTIRE FILE WILL EVENTUALLY BE FOR UNIX BUILDS ONLY.
+//#ifdef OS_UNIX
+
+
+#include "project.h"
+
+// Keep a global variable floating around
+// with the current loop time as set by the Event Machine.
+// This avoids the need for frequent expensive calls to time(NULL);
+time_t gCurrentLoopTime;
+
+#ifdef OS_WIN32
+unsigned gTickCountTickover;
+unsigned gLastTickCount;
+#endif
+
+
+/* The numer of max outstanding timers was once a const enum defined in em.h.
+ * Now we define it here so that users can change its value if necessary.
+ */
+static int MaxOutstandingTimers = 1000;
+
+
+/* Internal helper to convert strings to internet addresses. IPv6-aware.
+ * Not reentrant or threadsafe, optimized for speed.
+ */
+static struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size);
+
+
+/***************************************
+STATIC EventMachine_t::SetMaxTimerCount
+***************************************/
+
+void EventMachine_t::SetMaxTimerCount (int count)
+{
+ /* Allow a user to increase the maximum number of outstanding timers.
+ * If this gets "too high" (a metric that is of course platform dependent),
+ * bad things will happen like performance problems and possible overuse
+ * of memory.
+ * The actual timer mechanism is very efficient so it's hard to know what
+ * the practical max, but 100,000 shouldn't be too problematical.
+ */
+ if (count < 100)
+ count = 100;
+ MaxOutstandingTimers = count;
+}
+
+
+
+/******************************
+EventMachine_t::EventMachine_t
+******************************/
+
+EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)):
+ EventCallback (event_callback),
+ NextHeartbeatTime (0),
+ LoopBreakerReader (-1),
+ LoopBreakerWriter (-1),
+ bEpoll (false),
+ bKqueue (false),
+ epfd (-1)
+{
+ // Default time-slice is just smaller than one hundred mills.
+ Quantum.tv_sec = 0;
+ Quantum.tv_usec = 90000;
+
+ gTerminateSignalReceived = false;
+ // Make sure the current loop time is sane, in case we do any initializations of
+ // objects before we start running.
+ gCurrentLoopTime = time(NULL);
+
+ /* We initialize the network library here (only on Windows of course)
+ * and initialize "loop breakers." Our destructor also does some network-level
+ * cleanup. There's thus an implicit assumption that any given instance of EventMachine_t
+ * will only call ::Run once. Is that a good assumption? Should we move some of these
+ * inits and de-inits into ::Run?
+ */
+ #ifdef OS_WIN32
+ WSADATA w;
+ WSAStartup (MAKEWORD (1, 1), &w);
+ #endif
+
+ _InitializeLoopBreaker();
+}
+
+
+/*******************************
+EventMachine_t::~EventMachine_t
+*******************************/
+
+EventMachine_t::~EventMachine_t()
+{
+ // Run down descriptors
+ size_t i;
+ for (i = 0; i < NewDescriptors.size(); i++)
+ delete NewDescriptors[i];
+ for (i = 0; i < Descriptors.size(); i++)
+ delete Descriptors[i];
+
+ close (LoopBreakerReader);
+ close (LoopBreakerWriter);
+
+ if (epfd != -1)
+ close (epfd);
+ if (kqfd != -1)
+ close (kqfd);
+}
+
+
+/*************************
+EventMachine_t::_UseEpoll
+*************************/
+
+void EventMachine_t::_UseEpoll()
+{
+ /* Temporary.
+ * Use an internal flag to switch in epoll-based functionality until we determine
+ * how it should be integrated properly and the extent of the required changes.
+ * A permanent solution needs to allow the integration of additional technologies,
+ * like kqueue and Solaris's events.
+ */
+
+ #ifdef HAVE_EPOLL
+ bEpoll = true;
+ #endif
+}
+
+/**************************
+EventMachine_t::_UseKqueue
+**************************/
+
+void EventMachine_t::_UseKqueue()
+{
+ /* Temporary.
+ * See comments under _UseEpoll.
+ */
+
+ #ifdef HAVE_KQUEUE
+ bKqueue = true;
+ #endif
+}
+
+
+/****************************
+EventMachine_t::ScheduleHalt
+****************************/
+
+void EventMachine_t::ScheduleHalt()
+{
+ /* This is how we stop the machine.
+ * This can be called by clients. Signal handlers will probably
+ * set the global flag.
+ * For now this means there can only be one EventMachine ever running at a time.
+ *
+ * IMPORTANT: keep this light, fast, and async-safe. Don't do anything frisky in here,
+ * because it may be called from signal handlers invoked from code that we don't
+ * control. At this writing (20Sep06), EM does NOT install any signal handlers of
+ * its own.
+ *
+ * We need a FAQ. And one of the questions is: how do I stop EM when Ctrl-C happens?
+ * The answer is to call evma_stop_machine, which calls here, from a SIGINT handler.
+ */
+ gTerminateSignalReceived = true;
+}
+
+
+
+/*******************************
+EventMachine_t::SetTimerQuantum
+*******************************/
+
+void EventMachine_t::SetTimerQuantum (int interval)
+{
+ /* We get a timer-quantum expressed in milliseconds.
+ * Don't set a quantum smaller than 5 or larger than 2500.
+ */
+
+ if ((interval < 5) || (interval > 2500))
+ throw std::runtime_error ("invalid timer-quantum");
+
+ Quantum.tv_sec = interval / 1000;
+ Quantum.tv_usec = (interval % 1000) * 1000;
+}
+
+
+/*************************************
+(STATIC) EventMachine_t::SetuidString
+*************************************/
+
+void EventMachine_t::SetuidString (const char *username)
+{
+ /* This method takes a caller-supplied username and tries to setuid
+ * to that user. There is no meaningful implementation (and no error)
+ * on Windows. On Unix, a failure to setuid the caller-supplied string
+ * causes a fatal abort, because presumably the program is calling here
+ * in order to fulfill a security requirement. If we fail silently,
+ * the user may continue to run with too much privilege.
+ *
+ * TODO, we need to decide on and document a way of generating C++ level errors
+ * that can be wrapped in documented Ruby exceptions, so users can catch
+ * and handle them. And distinguish it from errors that we WON'T let the Ruby
+ * user catch (like security-violations and resource-overallocation).
+ * A setuid failure here would be in the latter category.
+ */
+
+ #ifdef OS_UNIX
+ if (!username || !*username)
+ throw std::runtime_error ("setuid_string failed: no username specified");
+
+ struct passwd *p = getpwnam (username);
+ if (!p)
+ throw std::runtime_error ("setuid_string failed: unknown username");
+
+ if (setuid (p->pw_uid) != 0)
+ throw std::runtime_error ("setuid_string failed: no setuid");
+
+ // Success.
+ #endif
+}
+
+
+/****************************************
+(STATIC) EventMachine_t::SetRlimitNofile
+****************************************/
+
+int EventMachine_t::SetRlimitNofile (int nofiles)
+{
+ #ifdef OS_UNIX
+ struct rlimit rlim;
+ getrlimit (RLIMIT_NOFILE, &rlim);
+ if (nofiles >= 0) {
+ rlim.rlim_cur = nofiles;
+ if (nofiles > rlim.rlim_max)
+ rlim.rlim_max = nofiles;
+ setrlimit (RLIMIT_NOFILE, &rlim);
+ // ignore the error return, for now at least.
+ // TODO, emit an error message someday when we have proper debug levels.
+ }
+ getrlimit (RLIMIT_NOFILE, &rlim);
+ return rlim.rlim_cur;
+ #endif
+
+ #ifdef OS_WIN32
+ // No meaningful implementation on Windows.
+ return 0;
+ #endif
+}
+
+
+/*********************************
+EventMachine_t::SignalLoopBreaker
+*********************************/
+
+void EventMachine_t::SignalLoopBreaker()
+{
+ #ifdef OS_UNIX
+ write (LoopBreakerWriter, "", 1);
+ #endif
+ #ifdef OS_WIN32
+ sendto (LoopBreakerReader, "", 0, 0, (struct sockaddr*)&(LoopBreakerTarget), sizeof(LoopBreakerTarget));
+ #endif
+}
+
+
+/**************************************
+EventMachine_t::_InitializeLoopBreaker
+**************************************/
+
+void EventMachine_t::_InitializeLoopBreaker()
+{
+ /* A "loop-breaker" is a socket-descriptor that we can write to in order
+ * to break the main select loop. Primarily useful for things running on
+ * threads other than the main EM thread, so they can trigger processing
+ * of events that arise exogenously to the EM.
+ * Keep the loop-breaker pipe out of the main descriptor set, otherwise
+ * its events will get passed on to user code.
+ */
+
+ #ifdef OS_UNIX
+ int fd[2];
+ if (pipe (fd))
+ throw std::runtime_error ("no loop breaker");
+
+ LoopBreakerWriter = fd[1];
+ LoopBreakerReader = fd[0];
+ #endif
+
+ #ifdef OS_WIN32
+ int sd = socket (AF_INET, SOCK_DGRAM, 0);
+ if (sd == INVALID_SOCKET)
+ throw std::runtime_error ("no loop breaker socket");
+ SetSocketNonblocking (sd);
+
+ memset (&LoopBreakerTarget, 0, sizeof(LoopBreakerTarget));
+ LoopBreakerTarget.sin_family = AF_INET;
+ LoopBreakerTarget.sin_addr.s_addr = inet_addr ("127.0.0.1");
+
+ srand ((int)time(NULL));
+ int i;
+ for (i=0; i < 100; i++) {
+ int r = (rand() % 10000) + 20000;
+ LoopBreakerTarget.sin_port = htons (r);
+ if (bind (sd, (struct sockaddr*)&LoopBreakerTarget, sizeof(LoopBreakerTarget)) == 0)
+ break;
+ }
+
+ if (i == 100)
+ throw std::runtime_error ("no loop breaker");
+ LoopBreakerReader = sd;
+ #endif
+}
+
+
+/*******************
+EventMachine_t::Run
+*******************/
+
+void EventMachine_t::Run()
+{
+ #ifdef OS_WIN32
+ HookControlC (true);
+ #endif
+
+ #ifdef HAVE_EPOLL
+ if (bEpoll) {
+ epfd = epoll_create (MaxEpollDescriptors);
+ if (epfd == -1) {
+ char buf[200];
+ snprintf (buf, sizeof(buf)-1, "unable to create epoll descriptor: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ int cloexec = fcntl (epfd, F_GETFD, 0);
+ assert (cloexec >= 0);
+ cloexec |= FD_CLOEXEC;
+ fcntl (epfd, F_SETFD, cloexec);
+
+ assert (LoopBreakerReader >= 0);
+ LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
+ assert (ld);
+ Add (ld);
+ }
+ #endif
+
+ #ifdef HAVE_KQUEUE
+ if (bKqueue) {
+ kqfd = kqueue();
+ if (kqfd == -1) {
+ char buf[200];
+ snprintf (buf, sizeof(buf)-1, "unable to create kqueue descriptor: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ // cloexec not needed. By definition, kqueues are not carried across forks.
+
+ assert (LoopBreakerReader >= 0);
+ LoopbreakDescriptor *ld = new LoopbreakDescriptor (LoopBreakerReader, this);
+ assert (ld);
+ Add (ld);
+ }
+ #endif
+
+ while (true) {
+ gCurrentLoopTime = time(NULL);
+ if (!_RunTimers())
+ break;
+
+ /* _Add must precede _Modify because the same descriptor might
+ * be on both lists during the same pass through the machine,
+ * and to modify a descriptor before adding it would fail.
+ */
+ _AddNewDescriptors();
+ _ModifyDescriptors();
+
+ if (!_RunOnce())
+ break;
+ if (gTerminateSignalReceived)
+ break;
+ }
+
+ #ifdef OS_WIN32
+ HookControlC (false);
+ #endif
+}
+
+
+/************************
+EventMachine_t::_RunOnce
+************************/
+
+bool EventMachine_t::_RunOnce()
+{
+ if (bEpoll)
+ return _RunEpollOnce();
+ else if (bKqueue)
+ return _RunKqueueOnce();
+ else
+ return _RunSelectOnce();
+}
+
+
+
+/*****************************
+EventMachine_t::_RunEpollOnce
+*****************************/
+
+bool EventMachine_t::_RunEpollOnce()
+{
+ #ifdef HAVE_EPOLL
+ assert (epfd != -1);
+ struct epoll_event ev [MaxEpollDescriptors];
+ int s;
+
+ #ifdef BUILD_FOR_RUBY
+ TRAP_BEG;
+ #endif
+ s = epoll_wait (epfd, ev, MaxEpollDescriptors, 50);
+ #ifdef BUILD_FOR_RUBY
+ TRAP_END;
+ #endif
+
+ if (s > 0) {
+ for (int i=0; i < s; i++) {
+ EventableDescriptor *ed = (EventableDescriptor*) ev[i].data.ptr;
+
+ if (ev[i].events & (EPOLLERR | EPOLLHUP))
+ ed->ScheduleClose (false);
+ if (ev[i].events & EPOLLIN)
+ ed->Read();
+ if (ev[i].events & EPOLLOUT) {
+ ed->Write();
+ epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
+ // Ignoring return value
+ }
+ }
+ }
+ else if (s < 0) {
+ // epoll_wait can fail on error in a handful of ways.
+ // If this happens, then wait for a little while to avoid busy-looping.
+ // If the error was EINTR, we probably caught SIGCHLD or something,
+ // so keep the wait short.
+ timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
+ EmSelect (0, NULL, NULL, NULL, &tv);
+ }
+
+ { // cleanup dying sockets
+ // vector::pop_back works in constant time.
+ // TODO, rip this out and only delete the descriptors we know have died,
+ // rather than traversing the whole list.
+ // Modified 05Jan08 per suggestions by Chris Heath. It's possible that
+ // an EventableDescriptor will have a descriptor value of -1. That will
+ // happen if EventableDescriptor::Close was called on it. In that case,
+ // don't call epoll_ctl to remove the socket's filters from the epoll set.
+ // According to the epoll docs, this happens automatically when the
+ // descriptor is closed anyway. This is different from the case where
+ // the socket has already been closed but the descriptor in the ED object
+ // hasn't yet been set to INVALID_SOCKET.
+ int i, j;
+ int nSockets = Descriptors.size();
+ for (i=0, j=0; i < nSockets; i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ if (ed->ShouldDelete()) {
+ if (ed->GetSocket() != INVALID_SOCKET) {
+ assert (bEpoll); // wouldn't be in this method otherwise.
+ assert (epfd != -1);
+ int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
+ // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
+ if (e && (errno != ENOENT) && (errno != EBADF)) {
+ char buf [200];
+ snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ }
+
+ ModifiedDescriptors.erase (ed);
+ delete ed;
+ }
+ else
+ Descriptors [j++] = ed;
+ }
+ while ((size_t)j < Descriptors.size())
+ Descriptors.pop_back();
+
+ }
+
+ // TODO, heartbeats.
+ // Added 14Sep07, its absence was noted by Brian Candler. But the comment was here, indicated
+ // that this got thought about and not done when EPOLL was originally written. Was there a reason
+ // not to do it, or was it an oversight? Certainly, running a heartbeat on 50,000 connections every
+ // two seconds can get to be a real bear, especially if all we're doing is timing out dead ones.
+ // Maybe there's a better way to do this. (Or maybe it's not that expensive after all.)
+ //
+ { // dispatch heartbeats
+ if (gCurrentLoopTime >= NextHeartbeatTime) {
+ NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
+
+ for (int i=0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ ed->Heartbeat();
+ }
+ }
+ }
+
+ #ifdef BUILD_FOR_RUBY
+ if (!rb_thread_alone()) {
+ rb_thread_schedule();
+ }
+ #endif
+
+ return true;
+ #else
+ throw std::runtime_error ("epoll is not implemented on this platform");
+ #endif
+}
+
+
+/******************************
+EventMachine_t::_RunKqueueOnce
+******************************/
+
+bool EventMachine_t::_RunKqueueOnce()
+{
+ #ifdef HAVE_KQUEUE
+ assert (kqfd != -1);
+ const int maxKevents = 2000;
+ struct kevent Karray [maxKevents];
+ struct timespec ts = {0, 10000000}; // Too frequent. Use blocking_region
+
+ int k;
+ #ifdef BUILD_FOR_RUBY
+ TRAP_BEG;
+ #endif
+ k = kevent (kqfd, NULL, 0, Karray, maxKevents, &ts);
+ #ifdef BUILD_FOR_RUBY
+ TRAP_END;
+ #endif
+ struct kevent *ke = Karray;
+ while (k > 0) {
+ EventableDescriptor *ed = (EventableDescriptor*) (ke->udata);
+ assert (ed);
+
+ if (ke->filter == EVFILT_READ)
+ ed->Read();
+ else if (ke->filter == EVFILT_WRITE)
+ ed->Write();
+ else
+ cerr << "Discarding unknown kqueue event " << ke->filter << endl;
+
+ --k;
+ ++ke;
+ }
+
+ { // cleanup dying sockets
+ // vector::pop_back works in constant time.
+ // TODO, rip this out and only delete the descriptors we know have died,
+ // rather than traversing the whole list.
+ // In kqueue, closing a descriptor automatically removes its event filters.
+
+ int i, j;
+ int nSockets = Descriptors.size();
+ for (i=0, j=0; i < nSockets; i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ if (ed->ShouldDelete()) {
+ ModifiedDescriptors.erase (ed);
+ delete ed;
+ }
+ else
+ Descriptors [j++] = ed;
+ }
+ while ((size_t)j < Descriptors.size())
+ Descriptors.pop_back();
+
+ }
+
+ { // dispatch heartbeats
+ if (gCurrentLoopTime >= NextHeartbeatTime) {
+ NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
+
+ for (int i=0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ ed->Heartbeat();
+ }
+ }
+ }
+
+
+ // TODO, replace this with rb_thread_blocking_region for 1.9 builds.
+ #ifdef BUILD_FOR_RUBY
+ if (!rb_thread_alone()) {
+ rb_thread_schedule();
+ }
+ #endif
+
+ return true;
+ #else
+ throw std::runtime_error ("kqueue is not implemented on this platform");
+ #endif
+}
+
+
+/*********************************
+EventMachine_t::_ModifyEpollEvent
+*********************************/
+
+void EventMachine_t::_ModifyEpollEvent (EventableDescriptor *ed)
+{
+ #ifdef HAVE_EPOLL
+ if (bEpoll) {
+ assert (epfd != -1);
+ assert (ed);
+ int e = epoll_ctl (epfd, EPOLL_CTL_MOD, ed->GetSocket(), ed->GetEpollEvent());
+ if (e) {
+ char buf [200];
+ snprintf (buf, sizeof(buf)-1, "unable to modify epoll event: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ }
+ #endif
+}
+
+
+
+/**************************
+SelectData_t::SelectData_t
+**************************/
+
+SelectData_t::SelectData_t()
+{
+ maxsocket = 0;
+ FD_ZERO (&fdreads);
+ FD_ZERO (&fdwrites);
+}
+
+
+#ifdef BUILD_FOR_RUBY
+/*****************
+_SelectDataSelect
+*****************/
+
+#ifdef HAVE_TBR
+static VALUE _SelectDataSelect (void *v)
+{
+ SelectData_t *sd = (SelectData_t*)v;
+ sd->nSockets = select (sd->maxsocket+1, &(sd->fdreads), &(sd->fdwrites), NULL, &(sd->tv));
+ return Qnil;
+}
+#endif
+
+/*********************
+SelectData_t::_Select
+*********************/
+
+int SelectData_t::_Select()
+{
+ #ifdef HAVE_TBR
+ rb_thread_blocking_region (_SelectDataSelect, (void*)this, RUBY_UBF_IO, 0);
+ return nSockets;
+ #endif
+
+ #ifndef HAVE_TBR
+ return EmSelect (maxsocket+1, &fdreads, &fdwrites, NULL, &tv);
+ #endif
+}
+#endif
+
+
+
+/******************************
+EventMachine_t::_RunSelectOnce
+******************************/
+
+bool EventMachine_t::_RunSelectOnce()
+{
+ // Crank the event machine once.
+ // If there are no descriptors to process, then sleep
+ // for a few hundred mills to avoid busy-looping.
+ // Return T/F to indicate whether we should continue.
+ // This is based on a select loop. Alternately provide epoll
+ // if we know we're running on a 2.6 kernel.
+ // epoll will be effective if we provide it as an alternative,
+ // however it has the same problem interoperating with Ruby
+ // threads that select does.
+
+ //cerr << "X";
+
+ /* This protection is now obsolete, because we will ALWAYS
+ * have at least one descriptor (the loop-breaker) to read.
+ */
+ /*
+ if (Descriptors.size() == 0) {
+ #ifdef OS_UNIX
+ timeval tv = {0, 200 * 1000};
+ EmSelect (0, NULL, NULL, NULL, &tv);
+ return true;
+ #endif
+ #ifdef OS_WIN32
+ Sleep (200);
+ return true;
+ #endif
+ }
+ */
+
+ SelectData_t SelectData;
+ /*
+ fd_set fdreads, fdwrites;
+ FD_ZERO (&fdreads);
+ FD_ZERO (&fdwrites);
+
+ int maxsocket = 0;
+ */
+
+ // Always read the loop-breaker reader.
+ // Changed 23Aug06, provisionally implemented for Windows with a UDP socket
+ // running on localhost with a randomly-chosen port. (*Puke*)
+ // Windows has a version of the Unix pipe() library function, but it doesn't
+ // give you back descriptors that are selectable.
+ FD_SET (LoopBreakerReader, &(SelectData.fdreads));
+ if (SelectData.maxsocket < LoopBreakerReader)
+ SelectData.maxsocket = LoopBreakerReader;
+
+ // prepare the sockets for reading and writing
+ size_t i;
+ for (i = 0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ int sd = ed->GetSocket();
+ assert (sd != INVALID_SOCKET);
+
+ if (ed->SelectForRead())
+ FD_SET (sd, &(SelectData.fdreads));
+ if (ed->SelectForWrite())
+ FD_SET (sd, &(SelectData.fdwrites));
+
+ if (SelectData.maxsocket < sd)
+ SelectData.maxsocket = sd;
+ }
+
+
+ { // read and write the sockets
+ //timeval tv = {1, 0}; // Solaris fails if the microseconds member is >= 1000000.
+ //timeval tv = Quantum;
+ SelectData.tv = Quantum;
+ int s = SelectData._Select();
+ //rb_thread_blocking_region(xxx,(void*)&SelectData,RUBY_UBF_IO,0);
+ //int s = EmSelect (SelectData.maxsocket+1, &(SelectData.fdreads), &(SelectData.fdwrites), NULL, &(SelectData.tv));
+ //int s = SelectData.nSockets;
+ if (s > 0) {
+ /* Changed 01Jun07. We used to handle the Loop-breaker right here.
+ * Now we do it AFTER all the regular descriptors. There's an
+ * incredibly important and subtle reason for this. Code on
+ * loop breakers is sometimes used to cause the reactor core to
+ * cycle (for example, to allow outbound network buffers to drain).
+ * If a loop-breaker handler reschedules itself (say, after determining
+ * that the write buffers are still too full), then it will execute
+ * IMMEDIATELY if _ReadLoopBreaker is done here instead of after
+ * the other descriptors are processed. That defeats the whole purpose.
+ */
+ for (i=0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ int sd = ed->GetSocket();
+ assert (sd != INVALID_SOCKET);
+
+ if (FD_ISSET (sd, &(SelectData.fdwrites)))
+ ed->Write();
+ if (FD_ISSET (sd, &(SelectData.fdreads)))
+ ed->Read();
+ }
+
+ if (FD_ISSET (LoopBreakerReader, &(SelectData.fdreads)))
+ _ReadLoopBreaker();
+ }
+ else if (s < 0) {
+ // select can fail on error in a handful of ways.
+ // If this happens, then wait for a little while to avoid busy-looping.
+ // If the error was EINTR, we probably caught SIGCHLD or something,
+ // so keep the wait short.
+ timeval tv = {0, ((errno == EINTR) ? 5 : 50) * 1000};
+ EmSelect (0, NULL, NULL, NULL, &tv);
+ }
+ }
+
+
+ { // dispatch heartbeats
+ if (gCurrentLoopTime >= NextHeartbeatTime) {
+ NextHeartbeatTime = gCurrentLoopTime + HeartbeatInterval;
+
+ for (i=0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ ed->Heartbeat();
+ }
+ }
+ }
+
+ { // cleanup dying sockets
+ // vector::pop_back works in constant time.
+ int i, j;
+ int nSockets = Descriptors.size();
+ for (i=0, j=0; i < nSockets; i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ if (ed->ShouldDelete())
+ delete ed;
+ else
+ Descriptors [j++] = ed;
+ }
+ while ((size_t)j < Descriptors.size())
+ Descriptors.pop_back();
+
+ }
+
+ return true;
+}
+
+
+/********************************
+EventMachine_t::_ReadLoopBreaker
+********************************/
+
+void EventMachine_t::_ReadLoopBreaker()
+{
+ /* The loop breaker has selected readable.
+ * Read it ONCE (it may block if we try to read it twice)
+ * and send a loop-break event back to user code.
+ */
+ char buffer [1024];
+ read (LoopBreakerReader, buffer, sizeof(buffer));
+ if (EventCallback)
+ (*EventCallback)("", EM_LOOPBREAK_SIGNAL, "", 0);
+}
+
+
+/**************************
+EventMachine_t::_RunTimers
+**************************/
+
+bool EventMachine_t::_RunTimers()
+{
+ // These are caller-defined timer handlers.
+ // Return T/F to indicate whether we should continue the main loop.
+ // We rely on the fact that multimaps sort by their keys to avoid
+ // inspecting the whole list every time we come here.
+ // Just keep inspecting and processing the list head until we hit
+ // one that hasn't expired yet.
+
+ #ifdef OS_UNIX
+ struct timeval tv;
+ gettimeofday (&tv, NULL);
+ Int64 now = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
+ #endif
+
+ #ifdef OS_WIN32
+ unsigned tick = GetTickCount();
+ if (tick < gLastTickCount)
+ gTickCountTickover += 1;
+ gLastTickCount = tick;
+ Int64 now = ((Int64)gTickCountTickover << 32) + (Int64)tick;
+ #endif
+
+ while (true) {
+ multimap<Int64,Timer_t>::iterator i = Timers.begin();
+ if (i == Timers.end())
+ break;
+ if (i->first > now)
+ break;
+ if (EventCallback)
+ (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length());
+ Timers.erase (i);
+ }
+ return true;
+}
+
+
+
+/***********************************
+EventMachine_t::InstallOneshotTimer
+***********************************/
+
+const char *EventMachine_t::InstallOneshotTimer (int milliseconds)
+{
+ if (Timers.size() > MaxOutstandingTimers)
+ return false;
+ // Don't use the global loop-time variable here, because we might
+ // get called before the main event machine is running.
+
+ #ifdef OS_UNIX
+ struct timeval tv;
+ gettimeofday (&tv, NULL);
+ Int64 fire_at = (((Int64)(tv.tv_sec)) * 1000000LL) + ((Int64)(tv.tv_usec));
+ fire_at += ((Int64)milliseconds) * 1000LL;
+ #endif
+
+ #ifdef OS_WIN32
+ unsigned tick = GetTickCount();
+ if (tick < gLastTickCount)
+ gTickCountTickover += 1;
+ gLastTickCount = tick;
+
+ Int64 fire_at = ((Int64)gTickCountTickover << 32) + (Int64)tick;
+ fire_at += (Int64)milliseconds;
+ #endif
+
+ Timer_t t;
+ multimap<Int64,Timer_t>::iterator i =
+ Timers.insert (make_pair (fire_at, t));
+ return i->second.GetBindingChars();
+}
+
+
+/*******************************
+EventMachine_t::ConnectToServer
+*******************************/
+
+const char *EventMachine_t::ConnectToServer (const char *server, int port)
+{
+ /* We want to spend no more than a few seconds waiting for a connection
+ * to a remote host. So we use a nonblocking connect.
+ * Linux disobeys the usual rules for nonblocking connects.
+ * Per Stevens (UNP p.410), you expect a nonblocking connect to select
+ * both readable and writable on error, and not to return EINPROGRESS
+ * if the connect can be fulfilled immediately. Linux violates both
+ * of these expectations.
+ * Any kind of nonblocking connect on Linux returns EINPROGRESS.
+ * The socket will then return writable when the disposition of the
+ * connect is known, but it will not also be readable in case of
+ * error! Weirdly, it will be readable in case there is data to read!!!
+ * (Which can happen with protocols like SSH and SMTP.)
+ * I suppose if you were so inclined you could consider this logical,
+ * but it's not the way Unix has historically done it.
+ * So we ignore the readable flag and read getsockopt to see if there
+ * was an error connecting. A select timeout works as expected.
+ * In regard to getsockopt: Linux does the Berkeley-style thing,
+ * not the Solaris-style, and returns zero with the error code in
+ * the error parameter.
+ * Return the binding-text of the newly-created pending connection,
+ * or NULL if there was a problem.
+ */
+
+ if (!server || !*server || !port)
+ return NULL;
+
+ int family, bind_size;
+ struct sockaddr *bind_as = name2address (server, port, &family, &bind_size);
+ if (!bind_as)
+ return NULL;
+
+ int sd = socket (family, SOCK_STREAM, 0);
+ if (sd == INVALID_SOCKET)
+ return NULL;
+
+ /*
+ sockaddr_in pin;
+ unsigned long HostAddr;
+
+ HostAddr = inet_addr (server);
+ if (HostAddr == INADDR_NONE) {
+ hostent *hp = gethostbyname ((char*)server); // Windows requires (char*)
+ if (!hp) {
+ // TODO: This gives the caller a fatal error. Not good.
+ // They can respond by catching RuntimeError (blecch).
+ // Possibly we need to fire an unbind event and provide
+ // a status code so user code can detect the cause of the
+ // failure.
+ return NULL;
+ }
+ HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
+ }
+
+ memset (&pin, 0, sizeof(pin));
+ pin.sin_family = AF_INET;
+ pin.sin_addr.s_addr = HostAddr;
+ pin.sin_port = htons (port);
+
+ int sd = socket (AF_INET, SOCK_STREAM, 0);
+ if (sd == INVALID_SOCKET)
+ return NULL;
+ */
+
+ // From here on, ALL error returns must close the socket.
+ // Set the new socket nonblocking.
+ if (!SetSocketNonblocking (sd)) {
+ closesocket (sd);
+ return NULL;
+ }
+ // Disable slow-start (Nagle algorithm).
+ int one = 1;
+ setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one));
+
+ const char *out = NULL;
+
+ #ifdef OS_UNIX
+ //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
+ if (connect (sd, bind_as, bind_size) == 0) {
+ // This is a connect success, which Linux appears
+ // never to give when the socket is nonblocking,
+ // even if the connection is intramachine or to
+ // localhost.
+
+ /* Changed this branch 08Aug06. Evidently some kernels
+ * (FreeBSD for example) will actually return success from
+ * a nonblocking connect. This is a pretty simple case,
+ * just set up the new connection and clear the pending flag.
+ * Thanks to Chris Ochs for helping track this down.
+ * This branch never gets taken on Linux or (oddly) OSX.
+ * The original behavior was to throw an unimplemented,
+ * which the user saw as a fatal exception. Very unfriendly.
+ *
+ * Tweaked 10Aug06. Even though the connect disposition is
+ * known, we still set the connect-pending flag. That way
+ * some needed initialization will happen in the ConnectionDescriptor.
+ * (To wit, the ConnectionCompleted event gets sent to the client.)
+ */
+ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+ cd->SetConnectPending (true);
+ Add (cd);
+ out = cd->GetBinding().c_str();
+ }
+ else if (errno == EINPROGRESS) {
+ // Errno will generally always be EINPROGRESS, but on Linux
+ // we have to look at getsockopt to be sure what really happened.
+ int error;
+ socklen_t len;
+ len = sizeof(error);
+ int o = getsockopt (sd, SOL_SOCKET, SO_ERROR, &error, &len);
+ if ((o == 0) && (error == 0)) {
+ // Here, there's no disposition.
+ // Put the connection on the stack and wait for it to complete
+ // or time out.
+ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+ cd->SetConnectPending (true);
+ Add (cd);
+ out = cd->GetBinding().c_str();
+ }
+ else {
+ /* This could be connection refused or some such thing.
+ * We will come here on Linux if a localhost connection fails.
+ * Changed 16Jul06: Originally this branch was a no-op, and
+ * we'd drop down to the end of the method, close the socket,
+ * and return NULL, which would cause the caller to GET A
+ * FATAL EXCEPTION. Now we keep the socket around but schedule an
+ * immediate close on it, so the caller will get a close-event
+ * scheduled on it. This was only an issue for localhost connections
+ * to non-listening ports. We may eventually need to revise this
+ * revised behavior, in case it causes problems like making it hard
+ * for people to know that a failure occurred.
+ */
+ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+ cd->ScheduleClose (false);
+ Add (cd);
+ out = cd->GetBinding().c_str();
+ }
+ }
+ else {
+ // The error from connect was something other then EINPROGRESS.
+ }
+ #endif
+
+ #ifdef OS_WIN32
+ //if (connect (sd, (sockaddr*)&pin, sizeof pin) == 0) {
+ if (connect (sd, bind_as, bind_size) == 0) {
+ // This is a connect success, which Windows appears
+ // never to give when the socket is nonblocking,
+ // even if the connection is intramachine or to
+ // localhost.
+ throw std::runtime_error ("unimplemented");
+ }
+ else if (WSAGetLastError() == WSAEWOULDBLOCK) {
+ // Here, there's no disposition.
+ // Windows appears not to surface refused connections or
+ // such stuff at this point.
+ // Put the connection on the stack and wait for it to complete
+ // or time out.
+ ConnectionDescriptor *cd = new ConnectionDescriptor (sd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+ cd->SetConnectPending (true);
+ Add (cd);
+ out = cd->GetBinding().c_str();
+ }
+ else {
+ // The error from connect was something other then WSAEWOULDBLOCK.
+ }
+
+ #endif
+
+ if (out == NULL)
+ closesocket (sd);
+ return out;
+}
+
+/***********************************
+EventMachine_t::ConnectToUnixServer
+***********************************/
+
+const char *EventMachine_t::ConnectToUnixServer (const char *server)
+{
+ /* Connect to a Unix-domain server, which by definition is running
+ * on the same host.
+ * There is no meaningful implementation on Windows.
+ * There's no need to do a nonblocking connect, since the connection
+ * is always local and can always be fulfilled immediately.
+ */
+
+ #ifdef OS_WIN32
+ throw std::runtime_error ("unix-domain connection unavailable on this platform");
+ return NULL;
+ #endif
+
+ // The whole rest of this function is only compiled on Unix systems.
+ #ifdef OS_UNIX
+
+ const char *out = NULL;
+
+ if (!server || !*server)
+ return NULL;
+
+ sockaddr_un pun;
+ memset (&pun, 0, sizeof(pun));
+ pun.sun_family = AF_LOCAL;
+
+ // You ordinarily expect the server name field to be at least 1024 bytes long,
+ // but on Linux it can be MUCH shorter.
+ if (strlen(server) >= sizeof(pun.sun_path))
+ throw std::runtime_error ("unix-domain server name is too long");
+
+
+ strcpy (pun.sun_path, server);
+
+ int fd = socket (AF_LOCAL, SOCK_STREAM, 0);
+ if (fd == INVALID_SOCKET)
+ return NULL;
+
+ // From here on, ALL error returns must close the socket.
+ // NOTE: At this point, the socket is still a blocking socket.
+ if (connect (fd, (struct sockaddr*)&pun, sizeof(pun)) != 0) {
+ closesocket (fd);
+ return NULL;
+ }
+
+ // Set the newly-connected socket nonblocking.
+ if (!SetSocketNonblocking (fd)) {
+ closesocket (fd);
+ return NULL;
+ }
+
+ // Set up a connection descriptor and add it to the event-machine.
+ // Observe, even though we know the connection status is connect-success,
+ // we still set the "pending" flag, so some needed initializations take
+ // place.
+ ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+ cd->SetConnectPending (true);
+ Add (cd);
+ out = cd->GetBinding().c_str();
+
+ if (out == NULL)
+ closesocket (fd);
+
+ return out;
+ #endif
+}
+
+/************************
+EventMachine_t::AttachFD
+************************/
+
+const char *EventMachine_t::AttachFD (int fd, bool notify_readable, bool notify_writable)
+{
+ #ifdef OS_UNIX
+ if (fcntl(fd, F_GETFL, 0) < 0)
+ throw std::runtime_error ("invalid file descriptor");
+ #endif
+
+ #ifdef OS_WIN32
+ // TODO: add better check for invalid file descriptors (see ioctlsocket or getsockopt)
+ if (fd == INVALID_SOCKET)
+ throw std::runtime_error ("invalid file descriptor");
+ #endif
+
+ {// Check for duplicate descriptors
+ size_t i;
+ for (i = 0; i < Descriptors.size(); i++) {
+ EventableDescriptor *ed = Descriptors[i];
+ assert (ed);
+ if (ed->GetSocket() == fd)
+ throw std::runtime_error ("adding existing descriptor");
+ }
+
+ for (i = 0; i < NewDescriptors.size(); i++) {
+ EventableDescriptor *ed = NewDescriptors[i];
+ assert (ed);
+ if (ed->GetSocket() == fd)
+ throw std::runtime_error ("adding existing new descriptor");
+ }
+ }
+
+ ConnectionDescriptor *cd = new ConnectionDescriptor (fd, this);
+ if (!cd)
+ throw std::runtime_error ("no connection allocated");
+
+ cd->SetConnectPending (true);
+ cd->SetNotifyReadable (notify_readable);
+ cd->SetNotifyWritable (notify_writable);
+
+ Add (cd);
+
+ const char *out = NULL;
+ out = cd->GetBinding().c_str();
+ if (out == NULL)
+ closesocket (fd);
+ return out;
+}
+
+/************************
+EventMachine_t::DetachFD
+************************/
+
+int EventMachine_t::DetachFD (EventableDescriptor *ed)
+{
+ if (!ed)
+ throw std::runtime_error ("detaching bad descriptor");
+
+ #ifdef HAVE_EPOLL
+ if (bEpoll) {
+ if (ed->GetSocket() != INVALID_SOCKET) {
+ assert (bEpoll); // wouldn't be in this method otherwise.
+ assert (epfd != -1);
+ int e = epoll_ctl (epfd, EPOLL_CTL_DEL, ed->GetSocket(), ed->GetEpollEvent());
+ // ENOENT or EBADF are not errors because the socket may be already closed when we get here.
+ if (e && (errno != ENOENT) && (errno != EBADF)) {
+ char buf [200];
+ snprintf (buf, sizeof(buf)-1, "unable to delete epoll event: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ }
+ }
+ #endif
+
+ #ifdef HAVE_KQUEUE
+ if (bKqueue) {
+ struct kevent k;
+ EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, ed);
+ int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
+ assert (t == 0);
+ }
+ #endif
+
+ { // remove descriptor from lists
+ int i, j;
+ int nSockets = Descriptors.size();
+ for (i=0, j=0; i < nSockets; i++) {
+ EventableDescriptor *ted = Descriptors[i];
+ assert (ted);
+ if (ted != ed)
+ Descriptors [j++] = ted;
+ }
+ while ((size_t)j < Descriptors.size())
+ Descriptors.pop_back();
+
+ ModifiedDescriptors.erase (ed);
+ }
+
+ int fd = ed->GetSocket();
+
+ // We depend on ~EventableDescriptor not calling close() if the socket is invalid
+ ed->SetSocketInvalid();
+ delete ed;
+
+ return fd;
+}
+
+/************
+name2address
+************/
+
+struct sockaddr *name2address (const char *server, int port, int *family, int *bind_size)
+{
+ // THIS IS NOT RE-ENTRANT OR THREADSAFE. Optimize for speed.
+ // Check the more-common cases first.
+ // Return NULL if no resolution.
+
+ static struct sockaddr_in in4;
+ #ifndef __CYGWIN__
+ static struct sockaddr_in6 in6;
+ #endif
+ struct hostent *hp;
+
+ if (!server || !*server)
+ server = "0.0.0.0";
+
+ memset (&in4, 0, sizeof(in4));
+ if ( (in4.sin_addr.s_addr = inet_addr (server)) != INADDR_NONE) {
+ if (family)
+ *family = AF_INET;
+ if (bind_size)
+ *bind_size = sizeof(in4);
+ in4.sin_family = AF_INET;
+ in4.sin_port = htons (port);
+ return (struct sockaddr*)&in4;
+ }
+
+ #if defined(OS_UNIX) && !defined(__CYGWIN__)
+ memset (&in6, 0, sizeof(in6));
+ if (inet_pton (AF_INET6, server, in6.sin6_addr.s6_addr) > 0) {
+ if (family)
+ *family = AF_INET6;
+ if (bind_size)
+ *bind_size = sizeof(in6);
+ in6.sin6_family = AF_INET6;
+ in6.sin6_port = htons (port);
+ return (struct sockaddr*)&in6;
+ }
+ #endif
+
+ #ifdef OS_WIN32
+ // TODO, must complete this branch. Windows doesn't have inet_pton.
+ // A possible approach is to make a getaddrinfo call with the supplied
+ // server address, constraining the hints to ipv6 and seeing if we
+ // get any addresses.
+ // For the time being, Ipv6 addresses aren't supported on Windows.
+ #endif
+
+ hp = gethostbyname ((char*)server); // Windows requires the cast.
+ if (hp) {
+ in4.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
+ if (family)
+ *family = AF_INET;
+ if (bind_size)
+ *bind_size = sizeof(in4);
+ in4.sin_family = AF_INET;
+ in4.sin_port = htons (port);
+ return (struct sockaddr*)&in4;
+ }
+
+ return NULL;
+}
+
+
+/*******************************
+EventMachine_t::CreateTcpServer
+*******************************/
+
+const char *EventMachine_t::CreateTcpServer (const char *server, int port)
+{
+ /* Create a TCP-acceptor (server) socket and add it to the event machine.
+ * Return the binding of the new acceptor to the caller.
+ * This binding will be referenced when the new acceptor sends events
+ * to indicate accepted connections.
+ */
+
+
+ int family, bind_size;
+ struct sockaddr *bind_here = name2address (server, port, &family, &bind_size);
+ if (!bind_here)
+ return NULL;
+
+ const char *output_binding = NULL;
+
+ //struct sockaddr_in sin;
+
+ int sd_accept = socket (family, SOCK_STREAM, 0);
+ if (sd_accept == INVALID_SOCKET) {
+ goto fail;
+ }
+
+ /*
+ memset (&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = INADDR_ANY;
+ sin.sin_port = htons (port);
+
+ if (server && *server) {
+ sin.sin_addr.s_addr = inet_addr (server);
+ if (sin.sin_addr.s_addr == INADDR_NONE) {
+ hostent *hp = gethostbyname ((char*)server); // Windows requires the cast.
+ if (hp == NULL) {
+ //__warning ("hostname not resolved: ", server);
+ goto fail;
+ }
+ sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
+ }
+ }
+ */
+
+ { // set reuseaddr to improve performance on restarts.
+ int oval = 1;
+ if (setsockopt (sd_accept, SOL_SOCKET, SO_REUSEADDR, (char*)&oval, sizeof(oval)) < 0) {
+ //__warning ("setsockopt failed while creating listener","");
+ goto fail;
+ }
+ }
+
+ { // set CLOEXEC. Only makes sense on Unix
+ #ifdef OS_UNIX
+ int cloexec = fcntl (sd_accept, F_GETFD, 0);
+ assert (cloexec >= 0);
+ cloexec |= FD_CLOEXEC;
+ fcntl (sd_accept, F_SETFD, cloexec);
+ #endif
+ }
+
+
+ //if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) {
+ if (bind (sd_accept, bind_here, bind_size)) {
+ //__warning ("binding failed");
+ goto fail;
+ }
+
+ if (listen (sd_accept, 100)) {
+ //__warning ("listen failed");
+ goto fail;
+ }
+
+ {
+ // Set the acceptor non-blocking.
+ // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
+ if (!SetSocketNonblocking (sd_accept)) {
+ //int val = fcntl (sd_accept, F_GETFL, 0);
+ //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
+ goto fail;
+ }
+ }
+
+ { // Looking good.
+ AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
+ if (!ad)
+ throw std::runtime_error ("unable to allocate acceptor");
+ Add (ad);
+ output_binding = ad->GetBinding().c_str();
+ }
+
+ return output_binding;
+
+ fail:
+ if (sd_accept != INVALID_SOCKET)
+ closesocket (sd_accept);
+ return NULL;
+}
+
+
+/**********************************
+EventMachine_t::OpenDatagramSocket
+**********************************/
+
+const char *EventMachine_t::OpenDatagramSocket (const char *address, int port)
+{
+ const char *output_binding = NULL;
+
+ int sd = socket (AF_INET, SOCK_DGRAM, 0);
+ if (sd == INVALID_SOCKET)
+ goto fail;
+ // from here on, early returns must close the socket!
+
+
+ struct sockaddr_in sin;
+ memset (&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons (port);
+
+
+ if (address && *address) {
+ sin.sin_addr.s_addr = inet_addr (address);
+ if (sin.sin_addr.s_addr == INADDR_NONE) {
+ hostent *hp = gethostbyname ((char*)address); // Windows requires the cast.
+ if (hp == NULL)
+ goto fail;
+ sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
+ }
+ }
+ else
+ sin.sin_addr.s_addr = htonl (INADDR_ANY);
+
+
+ // Set the new socket nonblocking.
+ {
+ if (!SetSocketNonblocking (sd))
+ //int val = fcntl (sd, F_GETFL, 0);
+ //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1)
+ goto fail;
+ }
+
+ if (bind (sd, (struct sockaddr*)&sin, sizeof(sin)) != 0)
+ goto fail;
+
+ { // Looking good.
+ DatagramDescriptor *ds = new DatagramDescriptor (sd, this);
+ if (!ds)
+ throw std::runtime_error ("unable to allocate datagram-socket");
+ Add (ds);
+ output_binding = ds->GetBinding().c_str();
+ }
+
+ return output_binding;
+
+ fail:
+ if (sd != INVALID_SOCKET)
+ closesocket (sd);
+ return NULL;
+}
+
+
+
+/*******************
+EventMachine_t::Add
+*******************/
+
+void EventMachine_t::Add (EventableDescriptor *ed)
+{
+ if (!ed)
+ throw std::runtime_error ("added bad descriptor");
+ ed->SetEventCallback (EventCallback);
+ NewDescriptors.push_back (ed);
+}
+
+
+/*******************************
+EventMachine_t::ArmKqueueWriter
+*******************************/
+
+void EventMachine_t::ArmKqueueWriter (EventableDescriptor *ed)
+{
+ #ifdef HAVE_KQUEUE
+ if (bKqueue) {
+ if (!ed)
+ throw std::runtime_error ("added bad descriptor");
+ struct kevent k;
+ EV_SET (&k, ed->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, ed);
+ int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
+ assert (t == 0);
+ }
+ #endif
+}
+
+/*******************************
+EventMachine_t::ArmKqueueReader
+*******************************/
+
+void EventMachine_t::ArmKqueueReader (EventableDescriptor *ed)
+{
+ #ifdef HAVE_KQUEUE
+ if (bKqueue) {
+ if (!ed)
+ throw std::runtime_error ("added bad descriptor");
+ struct kevent k;
+ EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
+ int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
+ assert (t == 0);
+ }
+ #endif
+}
+
+/**********************************
+EventMachine_t::_AddNewDescriptors
+**********************************/
+
+void EventMachine_t::_AddNewDescriptors()
+{
+ /* Avoid adding descriptors to the main descriptor list
+ * while we're actually traversing the list.
+ * Any descriptors that are added as a result of processing timers
+ * or acceptors should go on a temporary queue and then added
+ * while we're not traversing the main list.
+ * Also, it (rarely) happens that a newly-created descriptor
+ * is immediately scheduled to close. It might be a good
+ * idea not to bother scheduling these for I/O but if
+ * we do that, we might bypass some important processing.
+ */
+
+ for (size_t i = 0; i < NewDescriptors.size(); i++) {
+ EventableDescriptor *ed = NewDescriptors[i];
+ if (ed == NULL)
+ throw std::runtime_error ("adding bad descriptor");
+
+ #if HAVE_EPOLL
+ if (bEpoll) {
+ assert (epfd != -1);
+ int e = epoll_ctl (epfd, EPOLL_CTL_ADD, ed->GetSocket(), ed->GetEpollEvent());
+ if (e) {
+ char buf [200];
+ snprintf (buf, sizeof(buf)-1, "unable to add new descriptor: %s", strerror(errno));
+ throw std::runtime_error (buf);
+ }
+ }
+ #endif
+
+ #if HAVE_KQUEUE
+ /*
+ if (bKqueue) {
+ // INCOMPLETE. Some descriptors don't want to be readable.
+ assert (kqfd != -1);
+ struct kevent k;
+ EV_SET (&k, ed->GetSocket(), EVFILT_READ, EV_ADD, 0, 0, ed);
+ int t = kevent (kqfd, &k, 1, NULL, 0, NULL);
+ assert (t == 0);
+ }
+ */
+ #endif
+
+ Descriptors.push_back (ed);
+ }
+ NewDescriptors.clear();
+}
+
+
+/**********************************
+EventMachine_t::_ModifyDescriptors
+**********************************/
+
+void EventMachine_t::_ModifyDescriptors()
+{
+ /* For implementations which don't level check every descriptor on
+ * every pass through the machine, as select does.
+ * If we're not selecting, then descriptors need a way to signal to the
+ * machine that their readable or writable status has changed.
+ * That's what the ::Modify call is for. We do it this way to avoid
+ * modifying descriptors during the loop traversal, where it can easily
+ * happen that an object (like a UDP socket) gets data written on it by
+ * the application during #post_init. That would take place BEFORE the
+ * descriptor even gets added to the epoll descriptor, so the modify
+ * operation will crash messily.
+ * Another really messy possibility is for a descriptor to put itself
+ * on the Modified list, and then get deleted before we get here.
+ * Remember, deletes happen after the I/O traversal and before the
+ * next pass through here. So we have to make sure when we delete a
+ * descriptor to remove it from the Modified list.
+ */
+
+ #ifdef HAVE_EPOLL
+ if (bEpoll) {
+ set<EventableDescriptor*>::iterator i = ModifiedDescriptors.begin();
+ while (i != ModifiedDescriptors.end()) {
+ assert (*i);
+ _ModifyEpollEvent (*i);
+ ++i;
+ }
+ }
+ #endif
+
+ ModifiedDescriptors.clear();
+}
+
+
+/**********************
+EventMachine_t::Modify
+**********************/
+
+void EventMachine_t::Modify (EventableDescriptor *ed)
+{
+ if (!ed)
+ throw std::runtime_error ("modified bad descriptor");
+ ModifiedDescriptors.insert (ed);
+}
+
+
+/***********************************
+EventMachine_t::_OpenFileForWriting
+***********************************/
+
+const char *EventMachine_t::_OpenFileForWriting (const char *filename)
+{
+ /*
+ * Return the binding-text of the newly-opened file,
+ * or NULL if there was a problem.
+ */
+
+ if (!filename || !*filename)
+ return NULL;
+
+ int fd = open (filename, O_CREAT|O_TRUNC|O_WRONLY|O_NONBLOCK, 0644);
+
+ FileStreamDescriptor *fsd = new FileStreamDescriptor (fd, this);
+ if (!fsd)
+ throw std::runtime_error ("no file-stream allocated");
+ Add (fsd);
+ return fsd->GetBinding().c_str();
+
+}
+
+
+/**************************************
+EventMachine_t::CreateUnixDomainServer
+**************************************/
+
+const char *EventMachine_t::CreateUnixDomainServer (const char *filename)
+{
+ /* Create a UNIX-domain acceptor (server) socket and add it to the event machine.
+ * Return the binding of the new acceptor to the caller.
+ * This binding will be referenced when the new acceptor sends events
+ * to indicate accepted connections.
+ * THERE IS NO MEANINGFUL IMPLEMENTATION ON WINDOWS.
+ */
+
+ #ifdef OS_WIN32
+ throw std::runtime_error ("unix-domain server unavailable on this platform");
+ #endif
+
+ // The whole rest of this function is only compiled on Unix systems.
+ #ifdef OS_UNIX
+ const char *output_binding = NULL;
+
+ struct sockaddr_un s_sun;
+
+ int sd_accept = socket (AF_LOCAL, SOCK_STREAM, 0);
+ if (sd_accept == INVALID_SOCKET) {
+ goto fail;
+ }
+
+ if (!filename || !*filename)
+ goto fail;
+ unlink (filename);
+
+ bzero (&s_sun, sizeof(s_sun));
+ s_sun.sun_family = AF_LOCAL;
+ strncpy (s_sun.sun_path, filename, sizeof(s_sun.sun_path)-1);
+
+ // don't bother with reuseaddr for a local socket.
+
+ { // set CLOEXEC. Only makes sense on Unix
+ #ifdef OS_UNIX
+ int cloexec = fcntl (sd_accept, F_GETFD, 0);
+ assert (cloexec >= 0);
+ cloexec |= FD_CLOEXEC;
+ fcntl (sd_accept, F_SETFD, cloexec);
+ #endif
+ }
+
+ if (bind (sd_accept, (struct sockaddr*)&s_sun, sizeof(s_sun))) {
+ //__warning ("binding failed");
+ goto fail;
+ }
+
+ if (listen (sd_accept, 100)) {
+ //__warning ("listen failed");
+ goto fail;
+ }
+
+ {
+ // Set the acceptor non-blocking.
+ // THIS IS CRUCIALLY IMPORTANT because we read it in a select loop.
+ if (!SetSocketNonblocking (sd_accept)) {
+ //int val = fcntl (sd_accept, F_GETFL, 0);
+ //if (fcntl (sd_accept, F_SETFL, val | O_NONBLOCK) == -1) {
+ goto fail;
+ }
+ }
+
+ { // Looking good.
+ AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
+ if (!ad)
+ throw std::runtime_error ("unable to allocate acceptor");
+ Add (ad);
+ output_binding = ad->GetBinding().c_str();
+ }
+
+ return output_binding;
+
+ fail:
+ if (sd_accept != INVALID_SOCKET)
+ closesocket (sd_accept);
+ return NULL;
+ #endif // OS_UNIX
+}
+
+
+/*********************
+EventMachine_t::Popen
+*********************/
+#if OBSOLETE
+const char *EventMachine_t::Popen (const char *cmd, const char *mode)
+{
+ #ifdef OS_WIN32
+ throw std::runtime_error ("popen is currently unavailable on this platform");
+ #endif
+
+ // The whole rest of this function is only compiled on Unix systems.
+ // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
+ #ifdef OS_UNIX
+ const char *output_binding = NULL;
+
+ FILE *fp = popen (cmd, mode);
+ if (!fp)
+ return NULL;
+
+ // From here, all early returns must pclose the stream.
+
+ // According to the pipe(2) manpage, descriptors returned from pipe have both
+ // CLOEXEC and NONBLOCK clear. Do NOT set CLOEXEC. DO set nonblocking.
+ if (!SetSocketNonblocking (fileno (fp))) {
+ pclose (fp);
+ return NULL;
+ }
+
+ { // Looking good.
+ PipeDescriptor *pd = new PipeDescriptor (fp, this);
+ if (!pd)
+ throw std::runtime_error ("unable to allocate pipe");
+ Add (pd);
+ output_binding = pd->GetBinding().c_str();
+ }
+
+ return output_binding;
+ #endif
+}
+#endif // OBSOLETE
+
+/**************************
+EventMachine_t::Socketpair
+**************************/
+
+const char *EventMachine_t::Socketpair (char * const*cmd_strings)
+{
+ #ifdef OS_WIN32
+ throw std::runtime_error ("socketpair is currently unavailable on this platform");
+ #endif
+
+ // The whole rest of this function is only compiled on Unix systems.
+ // Eventually we need this functionality (or a full-duplex equivalent) on Windows.
+ #ifdef OS_UNIX
+ // Make sure the incoming array of command strings is sane.
+ if (!cmd_strings)
+ return NULL;
+ int j;
+ for (j=0; j < 100 && cmd_strings[j]; j++)
+ ;
+ if ((j==0) || (j==100))
+ return NULL;
+
+ const char *output_binding = NULL;
+
+ int sv[2];
+ if (socketpair (AF_LOCAL, SOCK_STREAM, 0, sv) < 0)
+ return NULL;
+ // from here, all early returns must close the pair of sockets.
+
+ // Set the parent side of the socketpair nonblocking.
+ // We don't care about the child side, and most child processes will expect their
+ // stdout to be blocking. Thanks to Duane Johnson and Bill Kelly for pointing this out.
+ // Obviously DON'T set CLOEXEC.
+ if (!SetSocketNonblocking (sv[0])) {
+ close (sv[0]);
+ close (sv[1]);
+ return NULL;
+ }
+
+ pid_t f = fork();
+ if (f > 0) {
+ close (sv[1]);
+ PipeDescriptor *pd = new PipeDescriptor (sv[0], f, this);
+ if (!pd)
+ throw std::runtime_error ("unable to allocate pipe");
+ Add (pd);
+ output_binding = pd->GetBinding().c_str();
+ }
+ else if (f == 0) {
+ close (sv[0]);
+ dup2 (sv[1], STDIN_FILENO);
+ close (sv[1]);
+ dup2 (STDIN_FILENO, STDOUT_FILENO);
+ execvp (cmd_strings[0], cmd_strings+1);
+ exit (-1); // end the child process if the exec doesn't work.
+ }
+ else
+ throw std::runtime_error ("no fork");
+
+ return output_binding;
+ #endif
+}
+
+
+/****************************
+EventMachine_t::OpenKeyboard
+****************************/
+
+const char *EventMachine_t::OpenKeyboard()
+{
+ KeyboardDescriptor *kd = new KeyboardDescriptor (this);
+ if (!kd)
+ throw std::runtime_error ("no keyboard-object allocated");
+ Add (kd);
+ return kd->GetBinding().c_str();
+}
+
+
+
+
+
+//#endif // OS_UNIX
+
+/*****************************************************************************
+
+$Id$
+
+File: emwin.cpp
+Date: 05May06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+// THIS ENTIRE FILE IS FOR WINDOWS BUILDS ONLY
+// INCOMPLETE AND DISABLED FOR NOW.
+#ifdef xOS_WIN32
+
+#include "project.h"
+
+
+// Keep a global variable floating around
+// with the current loop time as set by the Event Machine.
+// This avoids the need for frequent expensive calls to time(NULL);
+time_t gCurrentLoopTime;
+
+
+/******************************
+EventMachine_t::EventMachine_t
+******************************/
+
+EventMachine_t::EventMachine_t (void (*event_callback)(const char*, int, const char*, int)):
+ EventCallback (event_callback),
+ NextHeartbeatTime (0)
+{
+ gTerminateSignalReceived = false;
+ Iocp = NULL;
+}
+
+
+/*******************************
+EventMachine_t::~EventMachine_t
+*******************************/
+
+EventMachine_t::~EventMachine_t()
+{
+ cerr << "EM __dt\n";
+ if (Iocp)
+ CloseHandle (Iocp);
+}
+
+
+/****************************
+EventMachine_t::ScheduleHalt
+****************************/
+
+void EventMachine_t::ScheduleHalt()
+{
+ /* This is how we stop the machine.
+ * This can be called by clients. Signal handlers will probably
+ * set the global flag.
+ * For now this means there can only be one EventMachine ever running at a time.
+ */
+ gTerminateSignalReceived = true;
+}
+
+
+
+/*******************
+EventMachine_t::Run
+*******************/
+
+void EventMachine_t::Run()
+{
+ HookControlC (true);
+
+ Iocp = CreateIoCompletionPort (INVALID_HANDLE_VALUE, NULL, 0, 0);
+ if (Iocp == NULL)
+ throw std::runtime_error ("no completion port");
+
+
+ DWORD nBytes, nCompletionKey;
+ LPOVERLAPPED Overlapped;
+
+ do {
+ gCurrentLoopTime = time(NULL);
+ // Have some kind of strategy that will dequeue maybe up to 10 completions
+ // without running the timers as long as they are available immediately.
+ // Otherwise in a busy server we're calling them every time through the loop.
+ if (!_RunTimers())
+ break;
+ if (GetQueuedCompletionStatus (Iocp, &nBytes, &nCompletionKey, &Overlapped, 1000)) {
+ }
+ cerr << "+";
+ } while (!gTerminateSignalReceived);
+
+
+ /*
+ while (true) {
+ gCurrentLoopTime = time(NULL);
+ if (!_RunTimers())
+ break;
+ _AddNewDescriptors();
+ if (!_RunOnce())
+ break;
+ if (gTerminateSignalReceived)
+ break;
+ }
+ */
+
+ HookControlC (false);
+}
+
+
+/**************************
+EventMachine_t::_RunTimers
+**************************/
+
+bool EventMachine_t::_RunTimers()
+{
+ // These are caller-defined timer handlers.
+ // Return T/F to indicate whether we should continue the main loop.
+ // We rely on the fact that multimaps sort by their keys to avoid
+ // inspecting the whole list every time we come here.
+ // Just keep inspecting and processing the list head until we hit
+ // one that hasn't expired yet.
+
+ while (true) {
+ multimap<time_t,Timer_t>::iterator i = Timers.begin();
+ if (i == Timers.end())
+ break;
+ if (i->first > gCurrentLoopTime)
+ break;
+ if (EventCallback)
+ (*EventCallback) ("", EM_TIMER_FIRED, i->second.GetBinding().c_str(), i->second.GetBinding().length());
+ Timers.erase (i);
+ }
+ return true;
+}
+
+
+/***********************************
+EventMachine_t::InstallOneshotTimer
+***********************************/
+
+const char *EventMachine_t::InstallOneshotTimer (int seconds)
+{
+ if (Timers.size() > MaxOutstandingTimers)
+ return false;
+ // Don't use the global loop-time variable here, because we might
+ // get called before the main event machine is running.
+
+ Timer_t t;
+ Timers.insert (make_pair (time(NULL) + seconds, t));
+ return t.GetBinding().c_str();
+}
+
+
+/**********************************
+EventMachine_t::OpenDatagramSocket
+**********************************/
+
+const char *EventMachine_t::OpenDatagramSocket (const char *address, int port)
+{
+ cerr << "OPEN DATAGRAM SOCKET\n";
+ return "Unimplemented";
+}
+
+
+/*******************************
+EventMachine_t::CreateTcpServer
+*******************************/
+
+const char *EventMachine_t::CreateTcpServer (const char *server, int port)
+{
+ /* Create a TCP-acceptor (server) socket and add it to the event machine.
+ * Return the binding of the new acceptor to the caller.
+ * This binding will be referenced when the new acceptor sends events
+ * to indicate accepted connections.
+ */
+
+ const char *output_binding = NULL;
+
+ struct sockaddr_in sin;
+
+ SOCKET sd_accept = socket (AF_INET, SOCK_STREAM, 0);
+ if (sd_accept == INVALID_SOCKET) {
+ goto fail;
+ }
+
+ memset (&sin, 0, sizeof(sin));
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = INADDR_ANY;
+ sin.sin_port = htons (port);
+
+ if (server && *server) {
+ sin.sin_addr.s_addr = inet_addr (server);
+ if (sin.sin_addr.s_addr == INADDR_NONE) {
+ hostent *hp = gethostbyname (server);
+ if (hp == NULL) {
+ //__warning ("hostname not resolved: ", server);
+ goto fail;
+ }
+ sin.sin_addr.s_addr = ((in_addr*)(hp->h_addr))->s_addr;
+ }
+ }
+
+
+ // No need to set reuseaddr on Windows.
+
+
+ if (bind (sd_accept, (struct sockaddr*)&sin, sizeof(sin))) {
+ //__warning ("binding failed");
+ goto fail;
+ }
+
+ if (listen (sd_accept, 100)) {
+ //__warning ("listen failed");
+ goto fail;
+ }
+
+ { // Looking good.
+ AcceptorDescriptor *ad = new AcceptorDescriptor (this, sd_accept);
+ if (!ad)
+ throw std::runtime_error ("unable to allocate acceptor");
+ Add (ad);
+ output_binding = ad->GetBinding().c_str();
+
+ CreateIoCompletionPort ((HANDLE)sd_accept, Iocp, NULL, 0);
+ SOCKET sd = socket (AF_INET, SOCK_STREAM, 0);
+ CreateIoCompletionPort ((HANDLE)sd, Iocp, NULL, 0);
+ AcceptEx (sd_accept, sd,
+ }
+
+ return output_binding;
+
+ fail:
+ if (sd_accept != INVALID_SOCKET)
+ closesocket (sd_accept);
+ return NULL;
+}
+
+
+/*******************************
+EventMachine_t::ConnectToServer
+*******************************/
+
+const char *EventMachine_t::ConnectToServer (const char *server, int port)
+{
+ if (!server || !*server || !port)
+ return NULL;
+
+ sockaddr_in pin;
+ unsigned long HostAddr;
+
+ HostAddr = inet_addr (server);
+ if (HostAddr == INADDR_NONE) {
+ hostent *hp = gethostbyname (server);
+ if (!hp)
+ return NULL;
+ HostAddr = ((in_addr*)(hp->h_addr))->s_addr;
+ }
+
+ memset (&pin, 0, sizeof(pin));
+ pin.sin_family = AF_INET;
+ pin.sin_addr.s_addr = HostAddr;
+ pin.sin_port = htons (port);
+
+ int sd = socket (AF_INET, SOCK_STREAM, 0);
+ if (sd == INVALID_SOCKET)
+ return NULL;
+
+
+ LPOVERLAPPED olap = (LPOVERLAPPED) calloc (1, sizeof (OVERLAPPED));
+ cerr << "I'm dying now\n";
+ throw runtime_error ("UNIMPLEMENTED!!!\n");
+
+}
+
+
+
+/*******************
+EventMachine_t::Add
+*******************/
+
+void EventMachine_t::Add (EventableDescriptor *ed)
+{
+ cerr << "ADD\n";
+}
+
+
+
+#endif // OS_WIN32
+
+/*****************************************************************************
+
+$Id$
+
+File: epoll.cpp
+Date: 06Jun07
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+#ifdef HAVE_EPOLL
+
+#include "project.h"
+
+#endif // HAVE_EPOLL
+
+/*****************************************************************************
+
+$Id: mapper.cpp 4527 2007-07-04 10:21:34Z francis $
+
+File: mapper.cpp
+Date: 02Jul07
+
+Copyright (C) 2007 by Francis Cianfrocca. All Rights Reserved.
+Gmail: garbagecat10
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+//////////////////////////////////////////////////////////////////////
+// UNIX implementation
+//////////////////////////////////////////////////////////////////////
+
+
+#ifdef OS_UNIX
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/mman.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#include <iostream>
+#include "unistd.h"
+#include <string>
+#include <cstring>
+#include <stdexcept>
+using namespace std;
+
+#include "mapper.h"
+
+/******************
+Mapper_t::Mapper_t
+******************/
+
+Mapper_t::Mapper_t (const string &filename)
+{
+ /* We ASSUME we can open the file.
+ * (More precisely, we assume someone else checked before we got here.)
+ */
+
+ Fd = open (filename.c_str(), O_RDONLY);
+ if (Fd < 0)
+ throw runtime_error (strerror (errno));
+
+ struct stat st;
+ if (fstat (Fd, &st))
+ throw runtime_error (strerror (errno));
+ FileSize = st.st_size;
+
+ MapPoint = (const char*) mmap (0, FileSize, PROT_READ, MAP_SHARED, Fd, 0);
+ if (MapPoint == MAP_FAILED)
+ throw runtime_error (strerror (errno));
+}
+
+
+/*******************
+Mapper_t::~Mapper_t
+*******************/
+
+Mapper_t::~Mapper_t()
+{
+ Close();
+}
+
+
+/***************
+Mapper_t::Close
+***************/
+
+void Mapper_t::Close()
+{
+ // Can be called multiple times.
+ // Calls to GetChunk are invalid after a call to Close.
+ if (MapPoint) {
+ munmap ((void*)MapPoint, FileSize);
+ MapPoint = NULL;
+ }
+ if (Fd >= 0) {
+ close (Fd);
+ Fd = -1;
+ }
+}
+
+/******************
+Mapper_t::GetChunk
+******************/
+
+const char *Mapper_t::GetChunk (unsigned start)
+{
+ return MapPoint + start;
+}
+
+
+
+#endif // OS_UNIX
+
+
+//////////////////////////////////////////////////////////////////////
+// WINDOWS implementation
+//////////////////////////////////////////////////////////////////////
+
+#ifdef OS_WIN32
+
+#include <windows.h>
+
+#include <iostream>
+#include <string>
+#include <stdexcept>
+using namespace std;
+
+#include "mapper.h"
+
+/******************
+Mapper_t::Mapper_t
+******************/
+
+Mapper_t::Mapper_t (const string &filename)
+{
+ /* We ASSUME we can open the file.
+ * (More precisely, we assume someone else checked before we got here.)
+ */
+
+ hFile = INVALID_HANDLE_VALUE;
+ hMapping = NULL;
+ MapPoint = NULL;
+ FileSize = 0;
+
+ hFile = CreateFile (filename.c_str(), GENERIC_READ|GENERIC_WRITE, FILE_SHARE_DELETE|FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
+
+ if (hFile == INVALID_HANDLE_VALUE)
+ throw runtime_error ("File not found");
+
+ BY_HANDLE_FILE_INFORMATION i;
+ if (GetFileInformationByHandle (hFile, &i))
+ FileSize = i.nFileSizeLow;
+
+ hMapping = CreateFileMapping (hFile, NULL, PAGE_READWRITE, 0, 0, NULL);
+ if (!hMapping)
+ throw runtime_error ("File not mapped");
+
+ MapPoint = (const char*) MapViewOfFile (hMapping, FILE_MAP_WRITE, 0, 0, 0);
+ if (!MapPoint)
+ throw runtime_error ("Mappoint not read");
+}
+
+
+/*******************
+Mapper_t::~Mapper_t
+*******************/
+
+Mapper_t::~Mapper_t()
+{
+ Close();
+}
+
+/***************
+Mapper_t::Close
+***************/
+
+void Mapper_t::Close()
+{
+ // Can be called multiple times.
+ // Calls to GetChunk are invalid after a call to Close.
+ if (MapPoint) {
+ UnmapViewOfFile (MapPoint);
+ MapPoint = NULL;
+ }
+ if (hMapping != NULL) {
+ CloseHandle (hMapping);
+ hMapping = NULL;
+ }
+ if (hFile != INVALID_HANDLE_VALUE) {
+ CloseHandle (hFile);
+ hMapping = INVALID_HANDLE_VALUE;
+ }
+}
+
+
+/******************
+Mapper_t::GetChunk
+******************/
+
+const char *Mapper_t::GetChunk (unsigned start)
+{
+ return MapPoint + start;
+}
+
+
+
+#endif // OS_WINDOWS
+/*****************************************************************************
+
+$Id: rubymain.cpp 4529 2007-07-04 11:32:22Z francis $
+
+File: rubymain.cpp
+Date: 02Jul07
+
+Copyright (C) 2007 by Francis Cianfrocca. All Rights Reserved.
+Gmail: garbagecat10
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+
+#include <iostream>
+#include <stdexcept>
+using namespace std;
+
+#include <ruby.h>
+#include "mapper.h"
+
+static VALUE EmModule;
+static VALUE FastFileReader;
+static VALUE Mapper;
+
+
+
+/*********
+mapper_dt
+*********/
+
+static void mapper_dt (void *ptr)
+{
+ if (ptr)
+ delete (Mapper_t*) ptr;
+}
+
+/**********
+mapper_new
+**********/
+
+static VALUE mapper_new (VALUE self, VALUE filename)
+{
+ Mapper_t *m = new Mapper_t (StringValuePtr (filename));
+ if (!m)
+ rb_raise (rb_eException, "No Mapper Object");
+ VALUE v = Data_Wrap_Struct (Mapper, 0, mapper_dt, (void*)m);
+ return v;
+}
+
+
+/****************
+mapper_get_chunk
+****************/
+
+static VALUE mapper_get_chunk (VALUE self, VALUE start, VALUE length)
+{
+ Mapper_t *m = NULL;
+ Data_Get_Struct (self, Mapper_t, m);
+ if (!m)
+ rb_raise (rb_eException, "No Mapper Object");
+
+ // TODO, what if some moron sends us a negative start value?
+ unsigned _start = NUM2INT (start);
+ unsigned _length = NUM2INT (length);
+ if ((_start + _length) > m->GetFileSize())
+ rb_raise (rb_eException, "Mapper Range Error");
+
+ const char *chunk = m->GetChunk (_start);
+ if (!chunk)
+ rb_raise (rb_eException, "No Mapper Chunk");
+ return rb_str_new (chunk, _length);
+}
+
+/************
+mapper_close
+************/
+
+static VALUE mapper_close (VALUE self)
+{
+ Mapper_t *m = NULL;
+ Data_Get_Struct (self, Mapper_t, m);
+ if (!m)
+ rb_raise (rb_eException, "No Mapper Object");
+ m->Close();
+ return Qnil;
+}
+
+/***********
+mapper_size
+***********/
+
+static VALUE mapper_size (VALUE self)
+{
+ Mapper_t *m = NULL;
+ Data_Get_Struct (self, Mapper_t, m);
+ if (!m)
+ rb_raise (rb_eException, "No Mapper Object");
+ return INT2NUM (m->GetFileSize());
+}
+
+
+/**********************
+Init_fastfilereaderext
+**********************/
+
+extern "C" void Init_fastfilereaderext()
+{
+ EmModule = rb_define_module ("EventMachine");
+ FastFileReader = rb_define_class_under (EmModule, "FastFileReader", rb_cObject);
+ Mapper = rb_define_class_under (FastFileReader, "Mapper", rb_cObject);
+
+ rb_define_module_function (Mapper, "new", (VALUE(*)(...))mapper_new, 1);
+ rb_define_method (Mapper, "size", (VALUE(*)(...))mapper_size, 0);
+ rb_define_method (Mapper, "close", (VALUE(*)(...))mapper_close, 0);
+ rb_define_method (Mapper, "get_chunk", (VALUE(*)(...))mapper_get_chunk, 2);
+}
+
+
+
+/*****************************************************************************
+
+$Id$
+
+File: files.cpp
+Date: 26Aug06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+
+/******************************************
+FileStreamDescriptor::FileStreamDescriptor
+******************************************/
+
+FileStreamDescriptor::FileStreamDescriptor (int fd, EventMachine_t *em):
+ EventableDescriptor (fd, em),
+ OutboundDataSize (0)
+{
+cerr << "#####";
+}
+
+
+/*******************************************
+FileStreamDescriptor::~FileStreamDescriptor
+*******************************************/
+
+FileStreamDescriptor::~FileStreamDescriptor()
+{
+ // Run down any stranded outbound data.
+ for (size_t i=0; i < OutboundPages.size(); i++)
+ OutboundPages[i].Free();
+}
+
+
+/**************************
+FileStreamDescriptor::Read
+**************************/
+
+void FileStreamDescriptor::Read()
+{
+}
+
+/***************************
+FileStreamDescriptor::Write
+***************************/
+
+void FileStreamDescriptor::Write()
+{
+}
+
+
+/*******************************
+FileStreamDescriptor::Heartbeat
+*******************************/
+
+void FileStreamDescriptor::Heartbeat()
+{
+}
+
+
+/***********************************
+FileStreamDescriptor::SelectForRead
+***********************************/
+
+bool FileStreamDescriptor::SelectForRead()
+{
+ cerr << "R?";
+ return false;
+}
+
+
+/************************************
+FileStreamDescriptor::SelectForWrite
+************************************/
+
+bool FileStreamDescriptor::SelectForWrite()
+{
+ cerr << "W?";
+ return false;
+}
+
+
+/*****************************************************************************
+
+$Id$
+
+File: kb.cpp
+Date: 24Aug07
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+
+/**************************************
+KeyboardDescriptor::KeyboardDescriptor
+**************************************/
+
+KeyboardDescriptor::KeyboardDescriptor (EventMachine_t *parent_em):
+ EventableDescriptor (0, parent_em),
+ bReadAttemptedAfterClose (false),
+ LastIo (gCurrentLoopTime),
+ InactivityTimeout (0)
+{
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLIN;
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueReader (this);
+ #endif
+}
+
+
+/***************************************
+KeyboardDescriptor::~KeyboardDescriptor
+***************************************/
+
+KeyboardDescriptor::~KeyboardDescriptor()
+{
+}
+
+
+/*************************
+KeyboardDescriptor::Write
+*************************/
+
+void KeyboardDescriptor::Write()
+{
+ // Why are we here?
+ throw std::runtime_error ("bad code path in keyboard handler");
+}
+
+
+/*****************************
+KeyboardDescriptor::Heartbeat
+*****************************/
+
+void KeyboardDescriptor::Heartbeat()
+{
+ // no-op
+}
+
+
+/************************
+KeyboardDescriptor::Read
+************************/
+
+void KeyboardDescriptor::Read()
+{
+ char c;
+ read (GetSocket(), &c, 1);
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, &c, 1);
+}
+
+
+
+
+#if 0
+/******************************
+PipeDescriptor::PipeDescriptor
+******************************/
+
+PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em):
+ EventableDescriptor (fd, parent_em),
+ bReadAttemptedAfterClose (false),
+ LastIo (gCurrentLoopTime),
+ InactivityTimeout (0),
+ OutboundDataSize (0),
+ SubprocessPid (subpid)
+{
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLIN;
+ #endif
+}
+
+
+/*******************************
+PipeDescriptor::~PipeDescriptor
+*******************************/
+
+PipeDescriptor::~PipeDescriptor()
+{
+ // Run down any stranded outbound data.
+ for (size_t i=0; i < OutboundPages.size(); i++)
+ OutboundPages[i].Free();
+
+ /* As a virtual destructor, we come here before the base-class
+ * destructor that closes our file-descriptor.
+ * We have to make sure the subprocess goes down (if it's not
+ * already down) and we have to reap the zombie.
+ *
+ * This implementation is PROVISIONAL and will surely be improved.
+ * The intention here is that we never block, hence the highly
+ * undesirable sleeps. But if we can't reap the subprocess even
+ * after sending it SIGKILL, then something is wrong and we
+ * throw a fatal exception, which is also not something we should
+ * be doing.
+ *
+ * Eventually the right thing to do will be to have the reactor
+ * core respond to SIGCHLD by chaining a handler on top of the
+ * one Ruby may have installed, and dealing with a list of dead
+ * children that are pending cleanup.
+ *
+ * Since we want to have a signal processor integrated into the
+ * client-visible API, let's wait until that is done before cleaning
+ * this up.
+ */
+
+ struct timespec req = {0, 10000000};
+ kill (SubprocessPid, SIGTERM);
+ nanosleep (&req, NULL);
+ if (waitpid (SubprocessPid, NULL, WNOHANG) == 0) {
+ kill (SubprocessPid, SIGKILL);
+ nanosleep (&req, NULL);
+ if (waitpid (SubprocessPid, NULL, WNOHANG) == 0)
+ throw std::runtime_error ("unable to reap subprocess");
+ }
+}
+
+
+
+/********************
+PipeDescriptor::Read
+********************/
+
+void PipeDescriptor::Read()
+{
+ int sd = GetSocket();
+ if (sd == INVALID_SOCKET) {
+ assert (!bReadAttemptedAfterClose);
+ bReadAttemptedAfterClose = true;
+ return;
+ }
+
+ LastIo = gCurrentLoopTime;
+
+ int total_bytes_read = 0;
+ char readbuffer [16 * 1024];
+
+ for (int i=0; i < 10; i++) {
+ // Don't read just one buffer and then move on. This is faster
+ // if there is a lot of incoming.
+ // But don't read indefinitely. Give other sockets a chance to run.
+ // NOTICE, we're reading one less than the buffer size.
+ // That's so we can put a guard byte at the end of what we send
+ // to user code.
+ // Use read instead of recv, which on Linux gives a "socket operation
+ // on nonsocket" error.
+
+
+ int r = read (sd, readbuffer, sizeof(readbuffer) - 1);
+ //cerr << "<R:" << r << ">";
+
+ if (r > 0) {
+ total_bytes_read += r;
+ LastRead = gCurrentLoopTime;
+
+ // Add a null-terminator at the the end of the buffer
+ // that we will send to the callback.
+ // DO NOT EVER CHANGE THIS. We want to explicitly allow users
+ // to be able to depend on this behavior, so they will have
+ // the option to do some things faster. Additionally it's
+ // a security guard against buffer overflows.
+ readbuffer [r] = 0;
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r);
+ }
+ else if (r == 0) {
+ break;
+ }
+ else {
+ // Basically a would-block, meaning we've read everything there is to read.
+ break;
+ }
+
+ }
+
+
+ if (total_bytes_read == 0) {
+ // If we read no data on a socket that selected readable,
+ // it generally means the other end closed the connection gracefully.
+ ScheduleClose (false);
+ //bCloseNow = true;
+ }
+
+}
+
+/*********************
+PipeDescriptor::Write
+*********************/
+
+void PipeDescriptor::Write()
+{
+ int sd = GetSocket();
+ assert (sd != INVALID_SOCKET);
+
+ LastIo = gCurrentLoopTime;
+ char output_buffer [16 * 1024];
+ size_t nbytes = 0;
+
+ while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
+ OutboundPage *op = &(OutboundPages[0]);
+ if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) {
+ memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
+ nbytes += (op->Length - op->Offset);
+ op->Free();
+ OutboundPages.pop_front();
+ }
+ else {
+ int len = sizeof(output_buffer) - nbytes;
+ memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
+ op->Offset += len;
+ nbytes += len;
+ }
+ }
+
+ // We should never have gotten here if there were no data to write,
+ // so assert that as a sanity check.
+ // Don't bother to make sure nbytes is less than output_buffer because
+ // if it were we probably would have crashed already.
+ assert (nbytes > 0);
+
+ assert (GetSocket() != INVALID_SOCKET);
+ int bytes_written = write (GetSocket(), output_buffer, nbytes);
+
+ if (bytes_written > 0) {
+ OutboundDataSize -= bytes_written;
+ if ((size_t)bytes_written < nbytes) {
+ int len = nbytes - bytes_written;
+ char *buffer = (char*) malloc (len + 1);
+ if (!buffer)
+ throw std::runtime_error ("bad alloc throwing back data");
+ memcpy (buffer, output_buffer + bytes_written, len);
+ buffer [len] = 0;
+ OutboundPages.push_front (OutboundPage (buffer, len));
+ }
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0));
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ }
+ else {
+ #ifdef OS_UNIX
+ if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR))
+ #endif
+ #ifdef OS_WIN32
+ if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK))
+ #endif
+ Close();
+ }
+}
+
+
+/*************************
+PipeDescriptor::Heartbeat
+*************************/
+
+void PipeDescriptor::Heartbeat()
+{
+ // If an inactivity timeout is defined, then check for it.
+ if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout))
+ ScheduleClose (false);
+ //bCloseNow = true;
+}
+
+
+/*****************************
+PipeDescriptor::SelectForRead
+*****************************/
+
+bool PipeDescriptor::SelectForRead()
+{
+ /* Pipe descriptors, being local by definition, don't have
+ * a pending state, so this is simpler than for the
+ * ConnectionDescriptor object.
+ */
+ return true;
+}
+
+/******************************
+PipeDescriptor::SelectForWrite
+******************************/
+
+bool PipeDescriptor::SelectForWrite()
+{
+ /* Pipe descriptors, being local by definition, don't have
+ * a pending state, so this is simpler than for the
+ * ConnectionDescriptor object.
+ */
+ return (GetOutboundDataSize() > 0);
+}
+
+
+
+
+/********************************
+PipeDescriptor::SendOutboundData
+********************************/
+
+int PipeDescriptor::SendOutboundData (const char *data, int length)
+{
+ //if (bCloseNow || bCloseAfterWriting)
+ if (IsCloseScheduled())
+ return 0;
+
+ if (!data && (length > 0))
+ throw std::runtime_error ("bad outbound data");
+ char *buffer = (char *) malloc (length + 1);
+ if (!buffer)
+ throw std::runtime_error ("no allocation for outbound data");
+ memcpy (buffer, data, length);
+ buffer [length] = 0;
+ OutboundPages.push_back (OutboundPage (buffer, length));
+ OutboundDataSize += length;
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | EPOLLOUT);
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ return length;
+}
+
+/********************************
+PipeDescriptor::GetSubprocessPid
+********************************/
+
+bool PipeDescriptor::GetSubprocessPid (pid_t *pid)
+{
+ bool ok = false;
+ if (pid && (SubprocessPid > 0)) {
+ *pid = SubprocessPid;
+ ok = true;
+ }
+ return ok;
+}
+
+#endif
+
+/*****************************************************************************
+
+$Id$
+
+File: page.cpp
+Date: 30Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+#include "project.h"
+
+
+/******************
+PageList::PageList
+******************/
+
+PageList::PageList()
+{
+}
+
+
+/*******************
+PageList::~PageList
+*******************/
+
+PageList::~PageList()
+{
+ while (HasPages())
+ PopFront();
+}
+
+
+/***************
+PageList::Front
+***************/
+
+void PageList::Front (const char **page, int *length)
+{
+ assert (page && length);
+
+ if (HasPages()) {
+ Page p = Pages.front();
+ *page = p.Buffer;
+ *length = p.Size;
+ }
+ else {
+ *page = NULL;
+ *length = 0;
+ }
+}
+
+
+/******************
+PageList::PopFront
+******************/
+
+void PageList::PopFront()
+{
+ if (HasPages()) {
+ Page p = Pages.front();
+ Pages.pop_front();
+ if (p.Buffer)
+ free ((void*)p.Buffer);
+ }
+}
+
+
+/******************
+PageList::HasPages
+******************/
+
+bool PageList::HasPages()
+{
+ return (Pages.size() > 0) ? true : false;
+}
+
+
+/**************
+PageList::Push
+**************/
+
+void PageList::Push (const char *buf, int size)
+{
+ if (buf && (size > 0)) {
+ char *copy = (char*) malloc (size);
+ if (!copy)
+ throw runtime_error ("no memory in pagelist");
+ memcpy (copy, buf, size);
+ Pages.push_back (Page (copy, size));
+ }
+}
+
+
+
+
+
+/*****************************************************************************
+
+$Id$
+
+File: pipe.cpp
+Date: 30May07
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+
+#ifdef OS_UNIX
+// THIS ENTIRE FILE IS ONLY COMPILED ON UNIX-LIKE SYSTEMS.
+
+/******************************
+PipeDescriptor::PipeDescriptor
+******************************/
+
+PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em):
+ EventableDescriptor (fd, parent_em),
+ bReadAttemptedAfterClose (false),
+ LastIo (gCurrentLoopTime),
+ InactivityTimeout (0),
+ OutboundDataSize (0),
+ SubprocessPid (subpid)
+{
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = EPOLLIN;
+ #endif
+ #ifdef HAVE_KQUEUE
+ MyEventMachine->ArmKqueueReader (this);
+ #endif
+}
+
+
+/*******************************
+PipeDescriptor::~PipeDescriptor
+*******************************/
+
+PipeDescriptor::~PipeDescriptor()
+{
+ // Run down any stranded outbound data.
+ for (size_t i=0; i < OutboundPages.size(); i++)
+ OutboundPages[i].Free();
+
+ /* As a virtual destructor, we come here before the base-class
+ * destructor that closes our file-descriptor.
+ * We have to make sure the subprocess goes down (if it's not
+ * already down) and we have to reap the zombie.
+ *
+ * This implementation is PROVISIONAL and will surely be improved.
+ * The intention here is that we never block, hence the highly
+ * undesirable sleeps. But if we can't reap the subprocess even
+ * after sending it SIGKILL, then something is wrong and we
+ * throw a fatal exception, which is also not something we should
+ * be doing.
+ *
+ * Eventually the right thing to do will be to have the reactor
+ * core respond to SIGCHLD by chaining a handler on top of the
+ * one Ruby may have installed, and dealing with a list of dead
+ * children that are pending cleanup.
+ *
+ * Since we want to have a signal processor integrated into the
+ * client-visible API, let's wait until that is done before cleaning
+ * this up.
+ *
+ * Added a very ugly hack to support passing the subprocess's exit
+ * status to the user. It only makes logical sense for user code to access
+ * the subprocess exit status in the unbind callback. But unbind is called
+ * back during the EventableDescriptor destructor. So by that time there's
+ * no way to call back this object through an object binding, because it's
+ * already been cleaned up. We might have added a parameter to the unbind
+ * callback, but that would probably break a huge amount of existing code.
+ * So the hack-solution is to define an instance variable in the EventMachine
+ * object and stick the exit status in there, where it can easily be accessed
+ * with an accessor visible to user code.
+ * User code should ONLY access the exit status from within the unbind callback.
+ * Otherwise there's no guarantee it'll be valid.
+ * This hack won't make it impossible to run multiple EventMachines in a single
+ * process, but it will make it impossible to reliably nest unbind calls
+ * within other unbind calls. (Not sure if that's even possible.)
+ */
+
+ assert (MyEventMachine);
+
+ // check if the process is already dead
+ if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) {
+ kill (SubprocessPid, SIGTERM);
+ // wait 0.25s for process to die
+ struct timespec req = {0, 250000000};
+ nanosleep (&req, NULL);
+ if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0) {
+ kill (SubprocessPid, SIGKILL);
+ // wait 0.5s for process to die
+ struct timespec req = {0, 500000000};
+ nanosleep (&req, NULL);
+ if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) == 0)
+ throw std::runtime_error ("unable to reap subprocess");
+ }
+ }
+}
+
+
+
+/********************
+PipeDescriptor::Read
+********************/
+
+void PipeDescriptor::Read()
+{
+ int sd = GetSocket();
+ if (sd == INVALID_SOCKET) {
+ assert (!bReadAttemptedAfterClose);
+ bReadAttemptedAfterClose = true;
+ return;
+ }
+
+ LastIo = gCurrentLoopTime;
+
+ int total_bytes_read = 0;
+ char readbuffer [16 * 1024];
+
+ for (int i=0; i < 10; i++) {
+ // Don't read just one buffer and then move on. This is faster
+ // if there is a lot of incoming.
+ // But don't read indefinitely. Give other sockets a chance to run.
+ // NOTICE, we're reading one less than the buffer size.
+ // That's so we can put a guard byte at the end of what we send
+ // to user code.
+ // Use read instead of recv, which on Linux gives a "socket operation
+ // on nonsocket" error.
+
+
+ int r = read (sd, readbuffer, sizeof(readbuffer) - 1);
+ //cerr << "<R:" << r << ">";
+
+ if (r > 0) {
+ total_bytes_read += r;
+ LastRead = gCurrentLoopTime;
+
+ // Add a null-terminator at the the end of the buffer
+ // that we will send to the callback.
+ // DO NOT EVER CHANGE THIS. We want to explicitly allow users
+ // to be able to depend on this behavior, so they will have
+ // the option to do some things faster. Additionally it's
+ // a security guard against buffer overflows.
+ readbuffer [r] = 0;
+ if (EventCallback)
+ (*EventCallback)(GetBinding().c_str(), EM_CONNECTION_READ, readbuffer, r);
+ }
+ else if (r == 0) {
+ break;
+ }
+ else {
+ // Basically a would-block, meaning we've read everything there is to read.
+ break;
+ }
+
+ }
+
+
+ if (total_bytes_read == 0) {
+ // If we read no data on a socket that selected readable,
+ // it generally means the other end closed the connection gracefully.
+ ScheduleClose (false);
+ //bCloseNow = true;
+ }
+
+}
+
+/*********************
+PipeDescriptor::Write
+*********************/
+
+void PipeDescriptor::Write()
+{
+ int sd = GetSocket();
+ assert (sd != INVALID_SOCKET);
+
+ LastIo = gCurrentLoopTime;
+ char output_buffer [16 * 1024];
+ size_t nbytes = 0;
+
+ while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
+ OutboundPage *op = &(OutboundPages[0]);
+ if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) {
+ memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
+ nbytes += (op->Length - op->Offset);
+ op->Free();
+ OutboundPages.pop_front();
+ }
+ else {
+ int len = sizeof(output_buffer) - nbytes;
+ memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
+ op->Offset += len;
+ nbytes += len;
+ }
+ }
+
+ // We should never have gotten here if there were no data to write,
+ // so assert that as a sanity check.
+ // Don't bother to make sure nbytes is less than output_buffer because
+ // if it were we probably would have crashed already.
+ assert (nbytes > 0);
+
+ assert (GetSocket() != INVALID_SOCKET);
+ int bytes_written = write (GetSocket(), output_buffer, nbytes);
+
+ if (bytes_written > 0) {
+ OutboundDataSize -= bytes_written;
+ if ((size_t)bytes_written < nbytes) {
+ int len = nbytes - bytes_written;
+ char *buffer = (char*) malloc (len + 1);
+ if (!buffer)
+ throw std::runtime_error ("bad alloc throwing back data");
+ memcpy (buffer, output_buffer + bytes_written, len);
+ buffer [len] = 0;
+ OutboundPages.push_front (OutboundPage (buffer, len));
+ }
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | (SelectForWrite() ? EPOLLOUT : 0));
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ }
+ else {
+ #ifdef OS_UNIX
+ if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK) && (errno != EINTR))
+ #endif
+ #ifdef OS_WIN32
+ if ((errno != WSAEINPROGRESS) && (errno != WSAEWOULDBLOCK))
+ #endif
+ Close();
+ }
+}
+
+
+/*************************
+PipeDescriptor::Heartbeat
+*************************/
+
+void PipeDescriptor::Heartbeat()
+{
+ // If an inactivity timeout is defined, then check for it.
+ if (InactivityTimeout && ((gCurrentLoopTime - LastIo) >= InactivityTimeout))
+ ScheduleClose (false);
+ //bCloseNow = true;
+}
+
+
+/*****************************
+PipeDescriptor::SelectForRead
+*****************************/
+
+bool PipeDescriptor::SelectForRead()
+{
+ /* Pipe descriptors, being local by definition, don't have
+ * a pending state, so this is simpler than for the
+ * ConnectionDescriptor object.
+ */
+ return true;
+}
+
+/******************************
+PipeDescriptor::SelectForWrite
+******************************/
+
+bool PipeDescriptor::SelectForWrite()
+{
+ /* Pipe descriptors, being local by definition, don't have
+ * a pending state, so this is simpler than for the
+ * ConnectionDescriptor object.
+ */
+ return (GetOutboundDataSize() > 0);
+}
+
+
+
+
+/********************************
+PipeDescriptor::SendOutboundData
+********************************/
+
+int PipeDescriptor::SendOutboundData (const char *data, int length)
+{
+ //if (bCloseNow || bCloseAfterWriting)
+ if (IsCloseScheduled())
+ return 0;
+
+ if (!data && (length > 0))
+ throw std::runtime_error ("bad outbound data");
+ char *buffer = (char *) malloc (length + 1);
+ if (!buffer)
+ throw std::runtime_error ("no allocation for outbound data");
+ memcpy (buffer, data, length);
+ buffer [length] = 0;
+ OutboundPages.push_back (OutboundPage (buffer, length));
+ OutboundDataSize += length;
+ #ifdef HAVE_EPOLL
+ EpollEvent.events = (EPOLLIN | EPOLLOUT);
+ assert (MyEventMachine);
+ MyEventMachine->Modify (this);
+ #endif
+ return length;
+}
+
+/********************************
+PipeDescriptor::GetSubprocessPid
+********************************/
+
+bool PipeDescriptor::GetSubprocessPid (pid_t *pid)
+{
+ bool ok = false;
+ if (pid && (SubprocessPid > 0)) {
+ *pid = SubprocessPid;
+ ok = true;
+ }
+ return ok;
+}
+
+
+#endif // OS_UNIX
+
+/*****************************************************************************
+
+$Id$
+
+File: rubymain.cpp
+Date: 06Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+#include "eventmachine.h"
+#include <ruby.h>
+
+
+
+/*******
+Statics
+*******/
+
+static VALUE EmModule;
+static VALUE EmConnection;
+
+static VALUE Intern_at_signature;
+static VALUE Intern_at_timers;
+static VALUE Intern_at_conns;
+static VALUE Intern_event_callback;
+static VALUE Intern_run_deferred_callbacks;
+static VALUE Intern_delete;
+static VALUE Intern_call;
+static VALUE Intern_receive_data;
+
+static VALUE Intern_notify_readable;
+static VALUE Intern_notify_writable;
+
+/****************
+t_event_callback
+****************/
+
+static void event_callback (const char *a1, int a2, const char *a3, int a4)
+{
+ if (a2 == EM_CONNECTION_READ) {
+ VALUE t = rb_ivar_get (EmModule, Intern_at_conns);
+ VALUE q = rb_hash_aref (t, rb_str_new2(a1));
+ if (q == Qnil)
+ rb_raise (rb_eRuntimeError, "no connection");
+ rb_funcall (q, Intern_receive_data, 1, rb_str_new (a3, a4));
+ }
+ else if (a2 == EM_CONNECTION_NOTIFY_READABLE) {
+ VALUE t = rb_ivar_get (EmModule, Intern_at_conns);
+ VALUE q = rb_hash_aref (t, rb_str_new2(a1));
+ if (q == Qnil)
+ rb_raise (rb_eRuntimeError, "no connection");
+ rb_funcall (q, Intern_notify_readable, 0);
+ }
+ else if (a2 == EM_CONNECTION_NOTIFY_WRITABLE) {
+ VALUE t = rb_ivar_get (EmModule, Intern_at_conns);
+ VALUE q = rb_hash_aref (t, rb_str_new2(a1));
+ if (q == Qnil)
+ rb_raise (rb_eRuntimeError, "no connection");
+ rb_funcall (q, Intern_notify_writable, 0);
+ }
+ else if (a2 == EM_LOOPBREAK_SIGNAL) {
+ rb_funcall (EmModule, Intern_run_deferred_callbacks, 0);
+ }
+ else if (a2 == EM_TIMER_FIRED) {
+ VALUE t = rb_ivar_get (EmModule, Intern_at_timers);
+ VALUE q = rb_funcall (t, Intern_delete, 1, rb_str_new(a3, a4));
+ if (q == Qnil)
+ rb_raise (rb_eRuntimeError, "no timer");
+ rb_funcall (q, Intern_call, 0);
+ }
+ else
+ rb_funcall (EmModule, Intern_event_callback, 3, rb_str_new2(a1), (a2 << 1) | 1, rb_str_new(a3,a4));
+}
+
+
+
+/**************************
+t_initialize_event_machine
+**************************/
+
+static VALUE t_initialize_event_machine (VALUE self)
+{
+ evma_initialize_library (event_callback);
+ return Qnil;
+}
+
+
+
+/*****************************
+t_run_machine_without_threads
+*****************************/
+
+static VALUE t_run_machine_without_threads (VALUE self)
+{
+ evma_run_machine();
+ return Qnil;
+}
+
+
+/*******************
+t_add_oneshot_timer
+*******************/
+
+static VALUE t_add_oneshot_timer (VALUE self, VALUE interval)
+{
+ const char *f = evma_install_oneshot_timer (FIX2INT (interval));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no timer");
+ return rb_str_new2 (f);
+}
+
+
+/**************
+t_start_server
+**************/
+
+static VALUE t_start_server (VALUE self, VALUE server, VALUE port)
+{
+ const char *f = evma_create_tcp_server (StringValuePtr(server), FIX2INT(port));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no acceptor");
+ return rb_str_new2 (f);
+}
+
+/*************
+t_stop_server
+*************/
+
+static VALUE t_stop_server (VALUE self, VALUE signature)
+{
+ evma_stop_tcp_server (StringValuePtr (signature));
+ return Qnil;
+}
+
+
+/*******************
+t_start_unix_server
+*******************/
+
+static VALUE t_start_unix_server (VALUE self, VALUE filename)
+{
+ const char *f = evma_create_unix_domain_server (StringValuePtr(filename));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no unix-domain acceptor");
+ return rb_str_new2 (f);
+}
+
+
+
+/***********
+t_send_data
+***********/
+
+static VALUE t_send_data (VALUE self, VALUE signature, VALUE data, VALUE data_length)
+{
+ int b = evma_send_data_to_connection (StringValuePtr (signature), StringValuePtr (data), FIX2INT (data_length));
+ return INT2NUM (b);
+}
+
+
+/***********
+t_start_tls
+***********/
+
+static VALUE t_start_tls (VALUE self, VALUE signature)
+{
+ evma_start_tls (StringValuePtr (signature));
+ return Qnil;
+}
+
+/***************
+t_set_tls_parms
+***************/
+
+static VALUE t_set_tls_parms (VALUE self, VALUE signature, VALUE privkeyfile, VALUE certchainfile)
+{
+ /* set_tls_parms takes a series of positional arguments for specifying such things
+ * as private keys and certificate chains.
+ * It's expected that the parameter list will grow as we add more supported features.
+ * ALL of these parameters are optional, and can be specified as empty or NULL strings.
+ */
+ evma_set_tls_parms (StringValuePtr (signature), StringValuePtr (privkeyfile), StringValuePtr (certchainfile) );
+ return Qnil;
+}
+
+/**************
+t_get_peername
+**************/
+
+static VALUE t_get_peername (VALUE self, VALUE signature)
+{
+ struct sockaddr s;
+ if (evma_get_peername (StringValuePtr (signature), &s)) {
+ return rb_str_new ((const char*)&s, sizeof(s));
+ }
+
+ return Qnil;
+}
+
+/**************
+t_get_sockname
+**************/
+
+static VALUE t_get_sockname (VALUE self, VALUE signature)
+{
+ struct sockaddr s;
+ if (evma_get_sockname (StringValuePtr (signature), &s)) {
+ return rb_str_new ((const char*)&s, sizeof(s));
+ }
+
+ return Qnil;
+}
+
+/********************
+t_get_subprocess_pid
+********************/
+
+static VALUE t_get_subprocess_pid (VALUE self, VALUE signature)
+{
+ pid_t pid;
+ if (evma_get_subprocess_pid (StringValuePtr (signature), &pid)) {
+ return INT2NUM (pid);
+ }
+
+ return Qnil;
+}
+
+/***********************
+t_get_subprocess_status
+***********************/
+
+static VALUE t_get_subprocess_status (VALUE self, VALUE signature)
+{
+ int status;
+ if (evma_get_subprocess_status (StringValuePtr (signature), &status)) {
+ return INT2NUM (status);
+ }
+
+ return Qnil;
+}
+
+/*****************************
+t_get_comm_inactivity_timeout
+*****************************/
+
+static VALUE t_get_comm_inactivity_timeout (VALUE self, VALUE signature)
+{
+ int timeout;
+ if (evma_get_comm_inactivity_timeout (StringValuePtr (signature), &timeout))
+ return INT2FIX (timeout);
+ return Qnil;
+}
+
+/*****************************
+t_set_comm_inactivity_timeout
+*****************************/
+
+static VALUE t_set_comm_inactivity_timeout (VALUE self, VALUE signature, VALUE timeout)
+{
+ int ti = FIX2INT (timeout);
+ if (evma_set_comm_inactivity_timeout (StringValuePtr (signature), &ti));
+ return Qtrue;
+ return Qnil;
+}
+
+
+/***************
+t_send_datagram
+***************/
+
+static VALUE t_send_datagram (VALUE self, VALUE signature, VALUE data, VALUE data_length, VALUE address, VALUE port)
+{
+ int b = evma_send_datagram (StringValuePtr (signature), StringValuePtr (data), FIX2INT (data_length), StringValuePtr(address), FIX2INT(port));
+ return INT2NUM (b);
+}
+
+
+/******************
+t_close_connection
+******************/
+
+static VALUE t_close_connection (VALUE self, VALUE signature, VALUE after_writing)
+{
+ evma_close_connection (StringValuePtr (signature), ((after_writing == Qtrue) ? 1 : 0));
+ return Qnil;
+}
+
+/********************************
+t_report_connection_error_status
+********************************/
+
+static VALUE t_report_connection_error_status (VALUE self, VALUE signature)
+{
+ int b = evma_report_connection_error_status (StringValuePtr (signature));
+ return INT2NUM (b);
+}
+
+
+
+/****************
+t_connect_server
+****************/
+
+static VALUE t_connect_server (VALUE self, VALUE server, VALUE port)
+{
+ // Avoid FIX2INT in this case, because it doesn't deal with type errors properly.
+ // Specifically, if the value of port comes in as a string rather than an integer,
+ // NUM2INT will throw a type error, but FIX2INT will generate garbage.
+
+ const char *f = evma_connect_to_server (StringValuePtr(server), NUM2INT(port));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no connection");
+ return rb_str_new2 (f);
+}
+
+/*********************
+t_connect_unix_server
+*********************/
+
+static VALUE t_connect_unix_server (VALUE self, VALUE serversocket)
+{
+ const char *f = evma_connect_to_unix_server (StringValuePtr(serversocket));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no connection");
+ return rb_str_new2 (f);
+}
+
+/***********
+t_attach_fd
+***********/
+
+static VALUE t_attach_fd (VALUE self, VALUE file_descriptor, VALUE read_mode, VALUE write_mode)
+{
+ const char *f = evma_attach_fd (NUM2INT(file_descriptor), (read_mode == Qtrue) ? 1 : 0, (write_mode == Qtrue) ? 1 : 0);
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no connection");
+ return rb_str_new2 (f);
+}
+
+/***********
+t_detach_fd
+***********/
+
+static VALUE t_detach_fd (VALUE self, VALUE signature)
+{
+ return INT2NUM(evma_detach_fd (StringValuePtr(signature)));
+}
+
+/*****************
+t_open_udp_socket
+*****************/
+
+static VALUE t_open_udp_socket (VALUE self, VALUE server, VALUE port)
+{
+ const char *f = evma_open_datagram_socket (StringValuePtr(server), FIX2INT(port));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no datagram socket");
+ return rb_str_new2 (f);
+}
+
+
+
+/*****************
+t_release_machine
+*****************/
+
+static VALUE t_release_machine (VALUE self)
+{
+ evma_release_library();
+ return Qnil;
+}
+
+
+/******
+t_stop
+******/
+
+static VALUE t_stop (VALUE self)
+{
+ evma_stop_machine();
+ return Qnil;
+}
+
+/******************
+t_signal_loopbreak
+******************/
+
+static VALUE t_signal_loopbreak (VALUE self)
+{
+ evma_signal_loopbreak();
+ return Qnil;
+}
+
+/**************
+t_library_type
+**************/
+
+static VALUE t_library_type (VALUE self)
+{
+ return rb_eval_string (":extension");
+}
+
+
+
+/*******************
+t_set_timer_quantum
+*******************/
+
+static VALUE t_set_timer_quantum (VALUE self, VALUE interval)
+{
+ evma_set_timer_quantum (FIX2INT (interval));
+ return Qnil;
+}
+
+/********************
+t_set_max_timer_count
+********************/
+
+static VALUE t_set_max_timer_count (VALUE self, VALUE ct)
+{
+ evma_set_max_timer_count (FIX2INT (ct));
+ return Qnil;
+}
+
+/***************
+t_setuid_string
+***************/
+
+static VALUE t_setuid_string (VALUE self, VALUE username)
+{
+ evma_setuid_string (StringValuePtr (username));
+ return Qnil;
+}
+
+
+
+/*************
+t__write_file
+*************/
+
+static VALUE t__write_file (VALUE self, VALUE filename)
+{
+ const char *f = evma__write_file (StringValuePtr (filename));
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "file not opened");
+ return rb_str_new2 (f);
+}
+
+/**************
+t_invoke_popen
+**************/
+
+static VALUE t_invoke_popen (VALUE self, VALUE cmd)
+{
+ // 1.8.7+
+ #ifdef RARRAY_LEN
+ int len = RARRAY_LEN(cmd);
+ #else
+ int len = RARRAY (cmd)->len;
+ #endif
+ if (len > 98)
+ rb_raise (rb_eRuntimeError, "too many arguments to popen");
+ char *strings [100];
+ for (int i=0; i < len; i++) {
+ VALUE ix = INT2FIX (i);
+ VALUE s = rb_ary_aref (1, &ix, cmd);
+ strings[i] = StringValuePtr (s);
+ }
+ strings[len] = NULL;
+
+ const char *f = evma_popen (strings);
+ if (!f || !*f) {
+ char *err = strerror (errno);
+ char buf[100];
+ memset (buf, 0, sizeof(buf));
+ snprintf (buf, sizeof(buf)-1, "no popen: %s", (err?err:"???"));
+ rb_raise (rb_eRuntimeError, buf);
+ }
+ return rb_str_new2 (f);
+}
+
+
+/***************
+t_read_keyboard
+***************/
+
+static VALUE t_read_keyboard (VALUE self)
+{
+ const char *f = evma_open_keyboard();
+ if (!f || !*f)
+ rb_raise (rb_eRuntimeError, "no keyboard reader");
+ return rb_str_new2 (f);
+}
+
+
+/********
+t__epoll
+********/
+
+static VALUE t__epoll (VALUE self)
+{
+ // Temporary.
+ evma__epoll();
+ return Qnil;
+}
+
+/**********
+t__epoll_p
+**********/
+
+static VALUE t__epoll_p (VALUE self)
+{
+ #ifdef HAVE_EPOLL
+ return Qtrue;
+ #else
+ return Qfalse;
+ #endif
+}
+
+
+/*********
+t__kqueue
+*********/
+
+static VALUE t__kqueue (VALUE self)
+{
+ // Temporary.
+ evma__kqueue();
+ return Qnil;
+}
+
+/***********
+t__kqueue_p
+***********/
+
+static VALUE t__kqueue_p (VALUE self)
+{
+ #ifdef HAVE_KQUEUE
+ return Qtrue;
+ #else
+ return Qfalse;
+ #endif
+}
+
+
+/****************
+t_send_file_data
+****************/
+
+static VALUE t_send_file_data (VALUE self, VALUE signature, VALUE filename)
+{
+
+ /* The current implementation of evma_send_file_data_to_connection enforces a strict
+ * upper limit on the file size it will transmit (currently 32K). The function returns
+ * zero on success, -1 if the requested file exceeds its size limit, and a positive
+ * number for other errors.
+ * TODO: Positive return values are actually errno's, which is probably the wrong way to
+ * do this. For one thing it's ugly. For another, we can't be sure zero is never a real errno.
+ */
+
+ int b = evma_send_file_data_to_connection (StringValuePtr(signature), StringValuePtr(filename));
+ if (b == -1)
+ rb_raise(rb_eRuntimeError, "File too large. send_file_data() supports files under 32k.");
+ if (b > 0) {
+ char *err = strerror (b);
+ char buf[1024];
+ memset (buf, 0, sizeof(buf));
+ snprintf (buf, sizeof(buf)-1, ": %s %s", StringValuePtr(filename),(err?err:"???"));
+
+ rb_raise (rb_eIOError, buf);
+ }
+
+ return INT2NUM (0);
+}
+
+
+/*******************
+t_set_rlimit_nofile
+*******************/
+
+static VALUE t_set_rlimit_nofile (VALUE self, VALUE arg)
+{
+ arg = (NIL_P(arg)) ? -1 : NUM2INT (arg);
+ return INT2NUM (evma_set_rlimit_nofile (arg));
+}
+
+/***************************
+conn_get_outbound_data_size
+***************************/
+
+static VALUE conn_get_outbound_data_size (VALUE self)
+{
+ VALUE sig = rb_ivar_get (self, Intern_at_signature);
+ return INT2NUM (evma_get_outbound_data_size (StringValuePtr(sig)));
+}
+
+
+/******************************
+conn_associate_callback_target
+******************************/
+
+static VALUE conn_associate_callback_target (VALUE self, VALUE sig)
+{
+ // No-op for the time being.
+ return Qnil;
+}
+
+
+/***************
+t_get_loop_time
+****************/
+
+static VALUE t_get_loop_time (VALUE self)
+{
+ VALUE cTime = rb_path2class("Time");
+ if (gCurrentLoopTime != 0) {
+ return rb_funcall(cTime,
+ rb_intern("at"),
+ 1,
+ INT2NUM(gCurrentLoopTime));
+ }
+ return Qnil;
+}
+
+
+/*********************
+Init_rubyeventmachine
+*********************/
+
+extern "C" void Init_rubyeventmachine()
+{
+ // Tuck away some symbol values so we don't have to look 'em up every time we need 'em.
+ Intern_at_signature = rb_intern ("@signature");
+ Intern_at_timers = rb_intern ("@timers");
+ Intern_at_conns = rb_intern ("@conns");
+
+ Intern_event_callback = rb_intern ("event_callback");
+ Intern_run_deferred_callbacks = rb_intern ("run_deferred_callbacks");
+ Intern_delete = rb_intern ("delete");
+ Intern_call = rb_intern ("call");
+ Intern_receive_data = rb_intern ("receive_data");
+
+ Intern_notify_readable = rb_intern ("notify_readable");
+ Intern_notify_writable = rb_intern ("notify_writable");
+
+ // INCOMPLETE, we need to define class Connections inside module EventMachine
+ // run_machine and run_machine_without_threads are now identical.
+ // Must deprecate the without_threads variant.
+ EmModule = rb_define_module ("EventMachine");
+ EmConnection = rb_define_class_under (EmModule, "Connection", rb_cObject);
+
+ rb_define_class_under (EmModule, "ConnectionNotBound", rb_eException);
+ rb_define_class_under (EmModule, "NoHandlerForAcceptedConnection", rb_eException);
+ rb_define_class_under (EmModule, "UnknownTimerFired", rb_eException);
+
+ rb_define_module_function (EmModule, "initialize_event_machine", (VALUE(*)(...))t_initialize_event_machine, 0);
+ rb_define_module_function (EmModule, "run_machine", (VALUE(*)(...))t_run_machine_without_threads, 0);
+ rb_define_module_function (EmModule, "run_machine_without_threads", (VALUE(*)(...))t_run_machine_without_threads, 0);
+ rb_define_module_function (EmModule, "add_oneshot_timer", (VALUE(*)(...))t_add_oneshot_timer, 1);
+ rb_define_module_function (EmModule, "start_tcp_server", (VALUE(*)(...))t_start_server, 2);
+ rb_define_module_function (EmModule, "stop_tcp_server", (VALUE(*)(...))t_stop_server, 1);
+ rb_define_module_function (EmModule, "start_unix_server", (VALUE(*)(...))t_start_unix_server, 1);
+ rb_define_module_function (EmModule, "set_tls_parms", (VALUE(*)(...))t_set_tls_parms, 3);
+ rb_define_module_function (EmModule, "start_tls", (VALUE(*)(...))t_start_tls, 1);
+ rb_define_module_function (EmModule, "send_data", (VALUE(*)(...))t_send_data, 3);
+ rb_define_module_function (EmModule, "send_datagram", (VALUE(*)(...))t_send_datagram, 5);
+ rb_define_module_function (EmModule, "close_connection", (VALUE(*)(...))t_close_connection, 2);
+ rb_define_module_function (EmModule, "report_connection_error_status", (VALUE(*)(...))t_report_connection_error_status, 1);
+ rb_define_module_function (EmModule, "connect_server", (VALUE(*)(...))t_connect_server, 2);
+ rb_define_module_function (EmModule, "connect_unix_server", (VALUE(*)(...))t_connect_unix_server, 1);
+
+ rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 3);
+ rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);
+
+ rb_define_module_function (EmModule, "current_time", (VALUE(*)(...))t_get_loop_time, 0);
+
+ rb_define_module_function (EmModule, "open_udp_socket", (VALUE(*)(...))t_open_udp_socket, 2);
+ rb_define_module_function (EmModule, "read_keyboard", (VALUE(*)(...))t_read_keyboard, 0);
+ rb_define_module_function (EmModule, "release_machine", (VALUE(*)(...))t_release_machine, 0);
+ rb_define_module_function (EmModule, "stop", (VALUE(*)(...))t_stop, 0);
+ rb_define_module_function (EmModule, "signal_loopbreak", (VALUE(*)(...))t_signal_loopbreak, 0);
+ rb_define_module_function (EmModule, "library_type", (VALUE(*)(...))t_library_type, 0);
+ rb_define_module_function (EmModule, "set_timer_quantum", (VALUE(*)(...))t_set_timer_quantum, 1);
+ rb_define_module_function (EmModule, "set_max_timer_count", (VALUE(*)(...))t_set_max_timer_count, 1);
+ rb_define_module_function (EmModule, "setuid_string", (VALUE(*)(...))t_setuid_string, 1);
+ rb_define_module_function (EmModule, "invoke_popen", (VALUE(*)(...))t_invoke_popen, 1);
+ rb_define_module_function (EmModule, "send_file_data", (VALUE(*)(...))t_send_file_data, 2);
+
+ // Provisional:
+ rb_define_module_function (EmModule, "_write_file", (VALUE(*)(...))t__write_file, 1);
+
+ rb_define_module_function (EmModule, "get_peername", (VALUE(*)(...))t_get_peername, 1);
+ rb_define_module_function (EmModule, "get_sockname", (VALUE(*)(...))t_get_sockname, 1);
+ rb_define_module_function (EmModule, "get_subprocess_pid", (VALUE(*)(...))t_get_subprocess_pid, 1);
+ rb_define_module_function (EmModule, "get_subprocess_status", (VALUE(*)(...))t_get_subprocess_status, 1);
+ rb_define_module_function (EmModule, "get_comm_inactivity_timeout", (VALUE(*)(...))t_get_comm_inactivity_timeout, 1);
+ rb_define_module_function (EmModule, "set_comm_inactivity_timeout", (VALUE(*)(...))t_set_comm_inactivity_timeout, 2);
+ rb_define_module_function (EmModule, "set_rlimit_nofile", (VALUE(*)(...))t_set_rlimit_nofile, 1);
+
+ // Temporary:
+ rb_define_module_function (EmModule, "epoll", (VALUE(*)(...))t__epoll, 0);
+ rb_define_module_function (EmModule, "kqueue", (VALUE(*)(...))t__kqueue, 0);
+
+ rb_define_module_function (EmModule, "epoll?", (VALUE(*)(...))t__epoll_p, 0);
+ rb_define_module_function (EmModule, "kqueue?", (VALUE(*)(...))t__kqueue_p, 0);
+
+ rb_define_method (EmConnection, "get_outbound_data_size", (VALUE(*)(...))conn_get_outbound_data_size, 0);
+ rb_define_method (EmConnection, "associate_callback_target", (VALUE(*)(...))conn_associate_callback_target, 1);
+
+ rb_define_const (EmModule, "TimerFired", INT2NUM(100));
+ rb_define_const (EmModule, "ConnectionData", INT2NUM(101));
+ rb_define_const (EmModule, "ConnectionUnbound", INT2NUM(102));
+ rb_define_const (EmModule, "ConnectionAccepted", INT2NUM(103));
+ rb_define_const (EmModule, "ConnectionCompleted", INT2NUM(104));
+ rb_define_const (EmModule, "LoopbreakSignalled", INT2NUM(105));
+
+ rb_define_const (EmModule, "ConnectionNotifyReadable", INT2NUM(106));
+ rb_define_const (EmModule, "ConnectionNotifyWritable", INT2NUM(107));
+
+}
+
+/*****************************************************************************
+
+$Id$
+
+File: sigs.cpp
+Date: 06Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+#include "project.h"
+
+
+bool gTerminateSignalReceived;
+
+
+/**************
+SigtermHandler
+**************/
+
+void SigtermHandler (int sig)
+{
+ // This is a signal-handler, don't do anything frisky. Interrupts are disabled.
+ // Set the terminate flag WITHOUT trying to lock a mutex- otherwise we can easily
+ // self-deadlock, especially if the event machine is looping quickly.
+ gTerminateSignalReceived = true;
+}
+
+
+/*********************
+InstallSignalHandlers
+*********************/
+
+void InstallSignalHandlers()
+{
+ #ifdef OS_UNIX
+ static bool bInstalled = false;
+ if (!bInstalled) {
+ bInstalled = true;
+ signal (SIGINT, SigtermHandler);
+ signal (SIGTERM, SigtermHandler);
+ signal (SIGPIPE, SIG_IGN);
+ }
+ #endif
+}
+
+
+
+/*******************
+WintelSignalHandler
+*******************/
+
+#ifdef OS_WIN32
+BOOL WINAPI WintelSignalHandler (DWORD control)
+{
+ if (control == CTRL_C_EVENT)
+ gTerminateSignalReceived = true;
+ return TRUE;
+}
+#endif
+
+/************
+HookControlC
+************/
+
+#ifdef OS_WIN32
+void HookControlC (bool hook)
+{
+ if (hook) {
+ // INSTALL hook
+ SetConsoleCtrlHandler (WintelSignalHandler, TRUE);
+ }
+ else {
+ // UNINSTALL hook
+ SetConsoleCtrlHandler (WintelSignalHandler, FALSE);
+ }
+}
+#endif
+
+
+/*****************************************************************************
+
+$Id$
+
+File: ssl.cpp
+Date: 30Apr06
+
+Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
+Gmail: blackhedd
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of either: 1) the GNU General Public License
+as published by the Free Software Foundation; either version 2 of the
+License, or (at your option) any later version; or 2) Ruby's License.
+
+See the file COPYING for complete licensing information.
+
+*****************************************************************************/
+
+
+#ifdef WITH_SSL
+
+#include "project.h"
+
+
+bool SslContext_t::bLibraryInitialized = false;
+
+
+
+static void InitializeDefaultCredentials();
+static EVP_PKEY *DefaultPrivateKey = NULL;
+static X509 *DefaultCertificate = NULL;
+
+static char PrivateMaterials[] = {
+"-----BEGIN RSA PRIVATE KEY-----\n"
+"MIICXAIBAAKBgQDCYYhcw6cGRbhBVShKmbWm7UVsEoBnUf0cCh8AX+MKhMxwVDWV\n"
+"Igdskntn3cSJjRtmgVJHIK0lpb/FYHQB93Ohpd9/Z18pDmovfFF9nDbFF0t39hJ/\n"
+"AqSzFB3GiVPoFFZJEE1vJqh+3jzsSF5K56bZ6azz38VlZgXeSozNW5bXkQIDAQAB\n"
+"AoGALA89gIFcr6BIBo8N5fL3aNHpZXjAICtGav+kTUpuxSiaym9cAeTHuAVv8Xgk\n"
+"H2Wbq11uz+6JMLpkQJH/WZ7EV59DPOicXrp0Imr73F3EXBfR7t2EQDYHPMthOA1D\n"
+"I9EtCzvV608Ze90hiJ7E3guGrGppZfJ+eUWCPgy8CZH1vRECQQDv67rwV/oU1aDo\n"
+"6/+d5nqjeW6mWkGqTnUU96jXap8EIw6B+0cUKskwx6mHJv+tEMM2748ZY7b0yBlg\n"
+"w4KDghbFAkEAz2h8PjSJG55LwqmXih1RONSgdN9hjB12LwXL1CaDh7/lkEhq0PlK\n"
+"PCAUwQSdM17Sl0Xxm2CZiekTSlwmHrtqXQJAF3+8QJwtV2sRJp8u2zVe37IeH1cJ\n"
+"xXeHyjTzqZ2803fnjN2iuZvzNr7noOA1/Kp+pFvUZUU5/0G2Ep8zolPUjQJAFA7k\n"
+"xRdLkzIx3XeNQjwnmLlncyYPRv+qaE3FMpUu7zftuZBnVCJnvXzUxP3vPgKTlzGa\n"
+"dg5XivDRfsV+okY5uQJBAMV4FesUuLQVEKb6lMs7rzZwpeGQhFDRfywJzfom2TLn\n"
+"2RdJQQ3dcgnhdVDgt5o1qkmsqQh8uJrJ9SdyLIaZQIc=\n"
+"-----END RSA PRIVATE KEY-----\n"
+"-----BEGIN CERTIFICATE-----\n"
+"MIID6TCCA1KgAwIBAgIJANm4W/Tzs+s+MA0GCSqGSIb3DQEBBQUAMIGqMQswCQYD\n"
+"VQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxETAPBgNVBAcTCE5ldyBZb3JrMRYw\n"
+"FAYDVQQKEw1TdGVhbWhlYXQubmV0MRQwEgYDVQQLEwtFbmdpbmVlcmluZzEdMBsG\n"
+"A1UEAxMUb3BlbmNhLnN0ZWFtaGVhdC5uZXQxKDAmBgkqhkiG9w0BCQEWGWVuZ2lu\n"
+"ZWVyaW5nQHN0ZWFtaGVhdC5uZXQwHhcNMDYwNTA1MTcwNjAzWhcNMjQwMjIwMTcw\n"
+"NjAzWjCBqjELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE5ldyBZb3JrMREwDwYDVQQH\n"
+"EwhOZXcgWW9yazEWMBQGA1UEChMNU3RlYW1oZWF0Lm5ldDEUMBIGA1UECxMLRW5n\n"
+"aW5lZXJpbmcxHTAbBgNVBAMTFG9wZW5jYS5zdGVhbWhlYXQubmV0MSgwJgYJKoZI\n"
+"hvcNAQkBFhllbmdpbmVlcmluZ0BzdGVhbWhlYXQubmV0MIGfMA0GCSqGSIb3DQEB\n"
+"AQUAA4GNADCBiQKBgQDCYYhcw6cGRbhBVShKmbWm7UVsEoBnUf0cCh8AX+MKhMxw\n"
+"VDWVIgdskntn3cSJjRtmgVJHIK0lpb/FYHQB93Ohpd9/Z18pDmovfFF9nDbFF0t3\n"
+"9hJ/AqSzFB3GiVPoFFZJEE1vJqh+3jzsSF5K56bZ6azz38VlZgXeSozNW5bXkQID\n"
+"AQABo4IBEzCCAQ8wHQYDVR0OBBYEFPJvPd1Fcmd8o/Tm88r+NjYPICCkMIHfBgNV\n"
+"HSMEgdcwgdSAFPJvPd1Fcmd8o/Tm88r+NjYPICCkoYGwpIGtMIGqMQswCQYDVQQG\n"
+"EwJVUzERMA8GA1UECBMITmV3IFlvcmsxETAPBgNVBAcTCE5ldyBZb3JrMRYwFAYD\n"
+"VQQKEw1TdGVhbWhlYXQubmV0MRQwEgYDVQQLEwtFbmdpbmVlcmluZzEdMBsGA1UE\n"
+"AxMUb3BlbmNhLnN0ZWFtaGVhdC5uZXQxKDAmBgkqhkiG9w0BCQEWGWVuZ2luZWVy\n"
+"aW5nQHN0ZWFtaGVhdC5uZXSCCQDZuFv087PrPjAMBgNVHRMEBTADAQH/MA0GCSqG\n"
+"SIb3DQEBBQUAA4GBAC1CXey/4UoLgJiwcEMDxOvW74plks23090iziFIlGgcIhk0\n"
+"Df6hTAs7H3MWww62ddvR8l07AWfSzSP5L6mDsbvq7EmQsmPODwb6C+i2aF3EDL8j\n"
+"uw73m4YIGI0Zw2XdBpiOGkx2H56Kya6mJJe/5XORZedh1wpI7zki01tHYbcy\n"
+"-----END CERTIFICATE-----\n"};
+
+/* These private materials were made with:
+ * openssl req -new -x509 -keyout cakey.pem -out cacert.pem -nodes -days 6500
+ * TODO: We need a full-blown capability to work with user-supplied
+ * keypairs and properly-signed certificates.
+ */
+
+
+/*****************
+builtin_passwd_cb
+*****************/
+
+extern "C" int builtin_passwd_cb (char *buf, int bufsize, int rwflag, void *userdata)
+{
+ strcpy (buf, "kittycat");
+ return 8;
+}
+
+/****************************
+InitializeDefaultCredentials
+****************************/
+
+static void InitializeDefaultCredentials()
+{
+ BIO *bio = BIO_new_mem_buf (PrivateMaterials, -1);
+ assert (bio);
+
+ if (DefaultPrivateKey) {
+ // we may come here in a restart.
+ EVP_PKEY_free (DefaultPrivateKey);
+ DefaultPrivateKey = NULL;
+ }
+ PEM_read_bio_PrivateKey (bio, &DefaultPrivateKey, builtin_passwd_cb, 0);
+
+ if (DefaultCertificate) {
+ // we may come here in a restart.
+ X509_free (DefaultCertificate);
+ DefaultCertificate = NULL;
+ }
+ PEM_read_bio_X509 (bio, &DefaultCertificate, NULL, 0);
+
+ BIO_free (bio);
+}
+
+
+
+/**************************
+SslContext_t::SslContext_t
+**************************/
+
+SslContext_t::SslContext_t (bool is_server, const string &privkeyfile, const string &certchainfile):
+ pCtx (NULL),
+ PrivateKey (NULL),
+ Certificate (NULL)
+{
+ /* TODO: the usage of the specified private-key and cert-chain filenames only applies to
+ * client-side connections at this point. Server connections currently use the default materials.
+ * That needs to be fixed asap.
+ * Also, in this implementation, server-side connections use statically defined X-509 defaults.
+ * One thing I'm really not clear on is whether or not you have to explicitly free X509 and EVP_PKEY
+ * objects when we call our destructor, or whether just calling SSL_CTX_free is enough.
+ */
+
+ if (!bLibraryInitialized) {
+ bLibraryInitialized = true;
+ SSL_library_init();
+ OpenSSL_add_ssl_algorithms();
+ OpenSSL_add_all_algorithms();
+ SSL_load_error_strings();
+ ERR_load_crypto_strings();
+
+ InitializeDefaultCredentials();
+ }
+
+ bIsServer = is_server;
+ pCtx = SSL_CTX_new (is_server ? SSLv23_server_method() : SSLv23_client_method());
+ if (!pCtx)
+ throw std::runtime_error ("no SSL context");
+
+ SSL_CTX_set_options (pCtx, SSL_OP_ALL);
+ //SSL_CTX_set_options (pCtx, (SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3));
+
+ if (is_server) {
+ // The SSL_CTX calls here do NOT allocate memory.
+ int e;
+ if (privkeyfile.length() > 0)
+ e = SSL_CTX_use_PrivateKey_file (pCtx, privkeyfile.c_str(), SSL_FILETYPE_PEM);
+ else
+ e = SSL_CTX_use_PrivateKey (pCtx, DefaultPrivateKey);
+ assert (e > 0);
+ if (certchainfile.length() > 0)
+ e = SSL_CTX_use_certificate_chain_file (pCtx, certchainfile.c_str());
+ else
+ e = SSL_CTX_use_certificate (pCtx, DefaultCertificate);
+ assert (e > 0);
+ }
+
+ SSL_CTX_set_cipher_list (pCtx, "ALL:!ADH:!LOW:!EXP:!DES-CBC3-SHA:@STRENGTH");
+
+ if (is_server) {
+ SSL_CTX_sess_set_cache_size (pCtx, 128);
+ SSL_CTX_set_session_id_context (pCtx, (unsigned char*)"eventmachine", 12);
+ }
+ else {
+ int e;
+ if (privkeyfile.length() > 0) {
+ e = SSL_CTX_use_PrivateKey_file (pCtx, privkeyfile.c_str(), SSL_FILETYPE_PEM);
+ assert (e > 0);
+ }
+ if (certchainfile.length() > 0) {
+ e = SSL_CTX_use_certificate_chain_file (pCtx, certchainfile.c_str());
+ assert (e > 0);
+ }
+ }
+}
+
+
+
+/***************************
+SslContext_t::~SslContext_t
+***************************/
+
+SslContext_t::~SslContext_t()
+{
+ if (pCtx)
+ SSL_CTX_free (pCtx);
+ if (PrivateKey)
+ EVP_PKEY_free (PrivateKey);
+ if (Certificate)
+ X509_free (Certificate);
+}
+
+
+
+/******************
+SslBox_t::SslBox_t
+******************/
+
+SslBox_t::SslBox_t (bool is_server, const string &privkeyfile, const string &certchainfile):
+ bIsServer (is_server),
+ pSSL (NULL),
+ pbioRead (NULL),
+ pbioWrite (NULL)
+{
+ /* TODO someday: make it possible to re-use SSL contexts so we don't have to create
+ * a new one every time we come here.
+ */
+
+ Context = new SslContext_t (bIsServer, privkeyfile, certchainfile);
+ assert (Context);
+
+ pbioRead = BIO_new (BIO_s_mem());
+ assert (pbioRead);
+
+ pbioWrite = BIO_new (BIO_s_mem());
+ assert (pbioWrite);
+
+ pSSL = SSL_new (Context->pCtx);
+ assert (pSSL);
+ SSL_set_bio (pSSL, pbioRead, pbioWrite);
+
+ if (!bIsServer)
+ SSL_connect (pSSL);
+}
+
+
+
+/*******************
+SslBox_t::~SslBox_t
+*******************/
+
+SslBox_t::~SslBox_t()
+{
+ // Freeing pSSL will also free the associated BIOs, so DON'T free them separately.
+ if (pSSL) {
+ if (SSL_get_shutdown (pSSL) & SSL_RECEIVED_SHUTDOWN)
+ SSL_shutdown (pSSL);
+ else
+ SSL_clear (pSSL);
+ SSL_free (pSSL);
+ }
+
+ delete Context;
+}
+
+
+
+/***********************
+SslBox_t::PutCiphertext
+***********************/
+
+bool SslBox_t::PutCiphertext (const char *buf, int bufsize)
+{
+ assert (buf && (bufsize > 0));
+
+ assert (pbioRead);
+ int n = BIO_write (pbioRead, buf, bufsize);
+
+ return (n == bufsize) ? true : false;
+}
+
+
+/**********************
+SslBox_t::GetPlaintext
+**********************/
+
+int SslBox_t::GetPlaintext (char *buf, int bufsize)
+{
+ if (!SSL_is_init_finished (pSSL)) {
+ int e = bIsServer ? SSL_accept (pSSL) : SSL_connect (pSSL);
+ if (e < 0) {
+ int er = SSL_get_error (pSSL, e);
+ if (er != SSL_ERROR_WANT_READ) {
+ // Return -1 for a nonfatal error, -2 for an error that should force the connection down.
+ return (er == SSL_ERROR_SSL) ? (-2) : (-1);
+ }
+ else
+ return 0;
+ }
+ // If handshake finished, FALL THROUGH and return the available plaintext.
+ }
+
+ if (!SSL_is_init_finished (pSSL)) {
+ // We can get here if a browser abandons a handshake.
+ // The user can see a warning dialog and abort the connection.
+ cerr << "<SSL_incomp>";
+ return 0;
+ }
+
+ //cerr << "CIPH: " << SSL_get_cipher (pSSL) << endl;
+
+ int n = SSL_read (pSSL, buf, bufsize);
+ if (n >= 0) {
+ return n;
+ }
+ else {
+ if (SSL_get_error (pSSL, n) == SSL_ERROR_WANT_READ) {
+ return 0;
+ }
+ else {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+
+/**************************
+SslBox_t::CanGetCiphertext
+**************************/
+
+bool SslBox_t::CanGetCiphertext()
+{
+ assert (pbioWrite);
+ return BIO_pending (pbioWrite) ? true : false;
+}
+
+
+
+/***********************
+SslBox_t::GetCiphertext
+***********************/
+
+int SslBox_t::GetCiphertext (char *buf, int bufsize)
+{
+ assert (pbioWrite);
+ assert (buf && (bufsize > 0));
+
+ return BIO_read (pbioWrite, buf, bufsize);
+}
+
+
+
+/**********************
+SslBox_t::PutPlaintext
+**********************/
+
+int SslBox_t::PutPlaintext (const char *buf, int bufsize)
+{
+ // The caller will interpret the return value as the number of bytes written.
+ // WARNING WARNING WARNING, are there any situations in which a 0 or -1 return
+ // from SSL_write means we should immediately retry? The socket-machine loop
+ // will probably wait for a time-out cycle (perhaps a second) before re-trying.
+ // THIS WOULD CAUSE A PERCEPTIBLE DELAY!
+
+ /* We internally queue any outbound plaintext that can't be dispatched
+ * because we're in the middle of a handshake or something.
+ * When we get called, try to send any queued data first, and then
+ * send the caller's data (or queue it). We may get called with no outbound
+ * data, which means we try to send the outbound queue and that's all.
+ *
+ * Return >0 if we wrote any data, 0 if we didn't, and <0 for a fatal error.
+ * Note that if we return 0, the connection is still considered live
+ * and we are signalling that we have accepted the outbound data (if any).
+ */
+
+ OutboundQ.Push (buf, bufsize);
+
+ if (!SSL_is_init_finished (pSSL))
+ return 0;
+
+ bool fatal = false;
+ bool did_work = false;
+
+ while (OutboundQ.HasPages()) {
+ const char *page;
+ int length;
+ OutboundQ.Front (&page, &length);
+ assert (page && (length > 0));
+ int n = SSL_write (pSSL, page, length);
+ if (n > 0) {
+ did_work = true;
+ OutboundQ.PopFront();
+ }
+ else {
+ int er = SSL_get_error (pSSL, n);
+ if ((er != SSL_ERROR_WANT_READ) && (er != SSL_ERROR_WANT_WRITE))
+ fatal = true;
+ break;
+ }
+ }
+
+
+ if (did_work)
+ return 1;
+ else if (fatal)
+ return -1;
+ else
+ return 0;
+}
+
+
+#endif // WITH_SSL
+