summaryrefslogtreecommitdiff
path: root/gnu/java/nio/KqueueSelectorImpl.java
diff options
context:
space:
mode:
authorCasey Marshall <csm@gnu.org>2006-09-27 21:19:31 +0000
committerCasey Marshall <csm@gnu.org>2006-09-27 21:19:31 +0000
commitc145bf0ba7bbcf5a7430ab9381e6bae5e20ee434 (patch)
tree7b9a2d296d1e879a643410762383e08ff7085511 /gnu/java/nio/KqueueSelectorImpl.java
parentf43d2df6dab7bc73ce6c70cc142ab703a1482bc9 (diff)
downloadclasspath-c145bf0ba7bbcf5a7430ab9381e6bae5e20ee434.tar.gz
2006-09-27 Casey Marshall <csm@gnu.org>
* gnu/java/nio/KqueueSelectionKeyImpl.java: extend AbstractSelectionKey. (nstate, valid, readEverEnabled, writeEverEnabled): removed. (activeOps, fd): new fields. (cancel): removed. (interestOps): just call `selector.setInterestOps.' (isValid): removed. (toString): include native fd in output. (hashCode, equals, isReadActive, isReadInterested, isWriteActive, isWriteInterested, needCommitRead, needCommitWrite): new methods. * gnu/java/nio/KqueueSelectorImpl.java (MAX_DOUBLING_CAPACITY, CAP_INCREMENT, INITIAL_CAPACITY): new constants. (cancelled): removed. (events): new field. (OP_ACCEPT, OP_CONNECT, OP_READ, OP_WRITE): new constants. (toString, equals): new methods. (doSelect): get cancelled keys from superclass; fix synchronization; initialize events that need to be added/deleted only when selecting; ignore keys attached to closed channels. (register): fix key initialization; synchronize adding keys. (setInterestOps): new method. (updateOps, updateOps): removed. (reallocateBuffer): new method. (doCancel): removed. (kevent_set): add index, active ops parameters; remove delete parameter. (kevent): add output space size parameter. * include/gnu_java_nio_KqueueSelectorImpl.h: regenerated. * native/jni/java-nio/gnu_java_nio_KqueueSelectorImpl.c (Java_gnu_java_nio_KqueueSelectorImpl_kevent_1set): only fill in one filter, at the given index. (Java_gnu_java_nio_KqueueSelectorImpl_kevent): separate incoming event count and outgoing event count.
Diffstat (limited to 'gnu/java/nio/KqueueSelectorImpl.java')
-rw-r--r--gnu/java/nio/KqueueSelectorImpl.java314
1 files changed, 202 insertions, 112 deletions
diff --git a/gnu/java/nio/KqueueSelectorImpl.java b/gnu/java/nio/KqueueSelectorImpl.java
index dcb058eb5..eed86119c 100644
--- a/gnu/java/nio/KqueueSelectorImpl.java
+++ b/gnu/java/nio/KqueueSelectorImpl.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedSelectorException;
+import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
@@ -63,7 +64,10 @@ import java.util.Set;
public class KqueueSelectorImpl extends AbstractSelector
{
private static final int sizeof_struct_kevent;
-
+ private static final int MAX_DOUBLING_CAPACITY = 16384;
+ private static final int CAP_INCREMENT = 1024;
+ private static final int INITIAL_CAPACITY;
+
static
{
try
@@ -79,6 +83,7 @@ public class KqueueSelectorImpl extends AbstractSelector
sizeof_struct_kevent = sizeof_struct_kevent();
else
sizeof_struct_kevent = -1;
+ INITIAL_CAPACITY = 16 * sizeof_struct_kevent;
}
/**
@@ -94,15 +99,20 @@ public class KqueueSelectorImpl extends AbstractSelector
private HashMap/*<Integer,KqueueSelectionKeyImpl>*/ keys;
private HashSet/*<KqueueSelectionKeyImpl>*/ selected;
- private HashSet/*<KqueueSelectionKeyImpl>*/ cancelled;
private Thread blockedThread;
+ private ByteBuffer events;
+
+ private static final int OP_ACCEPT = SelectionKey.OP_ACCEPT;
+ private static final int OP_CONNECT = SelectionKey.OP_CONNECT;
+ private static final int OP_READ = SelectionKey.OP_READ;
+ private static final int OP_WRITE = SelectionKey.OP_WRITE;
public KqueueSelectorImpl(SelectorProvider provider) throws IOException
{
super(provider);
kq = implOpen();
keys = new HashMap/*<KqueueSelectionKeyImpl>*/();
- cancelled = new HashSet();
+ events = ByteBuffer.allocateDirect(INITIAL_CAPACITY);
}
protected void implCloseSelector() throws IOException
@@ -169,103 +179,140 @@ public class KqueueSelectorImpl extends AbstractSelector
return this;
}
- synchronized int doSelect(long timeout) throws IOException
+ public String toString()
{
- // FIXME -- I'm unclear on how we should synchronize this; and how to
- // handle cancelled keys.
- for (Iterator it = cancelled.iterator(); it.hasNext(); )
- {
- KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
- updateOps(key, 0, true);
- }
- int events_size = 0;
- for (Iterator it = keys.values().iterator(); it.hasNext(); )
+ return super.toString() + " [ fd: " + kq + " ]";
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof KqueueSelectorImpl))
+ return false;
+
+ return ((KqueueSelectorImpl) o).kq == kq;
+ }
+
+ int doSelect(long timeout) throws IOException
+ {
+ Set cancelled = cancelledKeys();
+ synchronized (cancelled)
+ {
+ synchronized (keys)
{
- KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
- if ((key.interestOps & SelectionKey.OP_ACCEPT) != 0
- || (key.interestOps & SelectionKey.OP_READ) != 0)
- key.readEverEnabled = true;
- if ((key.interestOps & SelectionKey.OP_CONNECT) != 0
- || (key.interestOps & SelectionKey.OP_WRITE) != 0)
- key.writeEverEnabled = true;
+ for (Iterator it = cancelled.iterator(); it.hasNext(); )
+ {
+ KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
+ key.interestOps = 0;
+ }
- if (key.readEverEnabled)
- events_size += sizeof_struct_kevent;
- if (key.writeEverEnabled)
- events_size += sizeof_struct_kevent;
- }
+ int events_size = (2 * sizeof_struct_kevent) * keys.size();
+ int num_events = 0;
- // We handle native events a little strangely here; per selection key,
- // we allocate enough space for two struct kevents, the first in the
- // list will be our EVFILT_READ filter, the second our EVFILT_WRITE
- // one. If only one of the two needs enabling, though, we don't want
- // to pass the other to kevent, because that would result in spurious
- // events. We can break down our handling as follows:
- //
- // - READ enabled, WRITE never enabled. We pass only the first structure
- // to kevent.
- // - WRITE enabled, READ never enabled. Likewise, but only pass the
- // second structure.
- // - READ and WRITE enabled. Pass both.
- // - READ enabled, WRITE enabled in the past. Pass both, with the
- // first structure's flag set to EV_ADD or EV_ENABLE, and the second
- // with EV_DISABLE. It seems OK to keep sending events with the
- // EV_DISABLE flag.
- // - WRITE enabled, READ enabled in the past. Likewise, but flipped.
- //
- // We handle these states with the readEverEnabled and writeEverEnabled
- // flags of selection keys; they start off as false, and become true
- // the first time we select() with READ or WRITE enabled. They never
- // become false.
- ByteBuffer events = ByteBuffer.allocateDirect(events_size);
+ for (Iterator it = keys.entrySet().iterator(); it.hasNext(); )
+ {
+ Map.Entry e = (Map.Entry) it.next();
+ KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) e.getValue();
+
+ SelectableChannel ch = key.channel();
+ if (ch instanceof VMChannelOwner)
+ {
+ if (!((VMChannelOwner) ch).getVMChannel().getState().isValid())
+ {
+ // closed channel; removed from kqueue automatically.
+ it.remove();
+ continue;
+ }
+ }
+
+ // If this key is registering a read filter, add it to the buffer.
+ if (key.needCommitRead())
+ {
+ kevent_set(events, num_events, key.fd,
+ key.interestOps & (OP_READ | OP_ACCEPT),
+ key.activeOps & (OP_READ | OP_ACCEPT), key.key);
+ num_events++;
+ }
+
+ // If this key is registering a write filter, add it to the buffer.
+ if (key.needCommitWrite())
+ {
+ kevent_set(events, num_events, key.fd,
+ key.interestOps & (OP_WRITE | OP_CONNECT),
+ key.activeOps & (OP_WRITE | OP_CONNECT), key.key);
+ num_events++;
+ }
+ }
+ events.rewind().limit(events.capacity());
- for (Iterator it = keys.entrySet().iterator(); it.hasNext(); )
- {
- Map.Entry e = (Map.Entry) it.next();
- KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) e.getValue();
+ //System.out.println("dump of keys to select:");
+ //dump_selection_keys(events.duplicate());
+
+ int n = 0;
+ try
+ {
+ //System.out.println("[" + kq + "] kevent enter selecting from " + keys.size());
+ begin();
+ blockedThread = Thread.currentThread();
+ if (blockedThread.isInterrupted())
+ timeout = 0;
+ n = kevent(kq, events, num_events,
+ events.capacity() / sizeof_struct_kevent, timeout);
+ }
+ finally
+ {
+ end();
+ blockedThread = null;
+ Thread.interrupted();
+ //System.out.println("[" + kq + "kevent exit selected " + n);
+ }
+
+ //System.out.println("dump of keys selected:");
+ //dump_selection_keys((ByteBuffer) events.duplicate().limit(n * sizeof_struct_kevent));
- if (key.readEverEnabled)
- events.put((ByteBuffer) key.nstate.duplicate().limit
- (sizeof_struct_kevent));
- if (key.writeEverEnabled)
- events.put((ByteBuffer) key.nstate.duplicate().position
- (sizeof_struct_kevent).limit(2 * sizeof_struct_kevent));
- }
- events.rewind();
-
- //System.out.println("dump of keys to select:");
- //dump_selection_keys(events.duplicate());
-
- blockedThread = Thread.currentThread();
- if (blockedThread.isInterrupted())
- timeout = 0;
- final int n = kevent(kq, events, events_size / sizeof_struct_kevent,
- timeout);
- Thread.interrupted();
-
- //System.out.println("dump of keys selected:");
- //dump_selection_keys((ByteBuffer) events.duplicate().limit(n * sizeof_struct_kevent));
+ // Commit the operations we've just added in the call to kevent.
+ for (Iterator it = keys.values().iterator(); it.hasNext(); )
+ {
+ KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
+ key.activeOps = key.interestOps;
+ }
- selected = new HashSet/*<KqueueSelectionKeyImpl>*/(n);
- int x = 0;
- for (int i = 0; i < n; i++)
- {
- events.position(x).limit(x + sizeof_struct_kevent);
- x += sizeof_struct_kevent;
- int y = fetch_key(events.slice());
- KqueueSelectionKeyImpl key =
- (KqueueSelectionKeyImpl) keys.get(new Integer(y));
- key.readyOps = ready_ops(events.slice(), key.interestOps);
- selected.add(key);
- }
- for (Iterator it = cancelled.iterator(); it.hasNext(); )
- {
- KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
- keys.remove(new Integer(key.key));
- it.remove();
- }
+ selected = new HashSet/*<KqueueSelectionKeyImpl>*/(n);
+ int x = 0;
+ for (int i = 0; i < n; i++)
+ {
+ events.position(x).limit(x + sizeof_struct_kevent);
+ x += sizeof_struct_kevent;
+ int y = fetch_key(events.slice());
+ KqueueSelectionKeyImpl key =
+ (KqueueSelectionKeyImpl) keys.get(new Integer(y));
+
+ if (key == null)
+ {
+ System.out.println("WARNING! no key found for selected key " + y);
+ continue;
+ }
+ // Keys that have been cancelled may be returned here; don't
+ // add them to the selected set.
+ if (!key.isValid())
+ continue;
+ key.readyOps = ready_ops(events.slice(), key.interestOps);
+ selected.add(key);
+ }
+
+ // Finally, remove the cancelled keys.
+ for (Iterator it = cancelled.iterator(); it.hasNext(); )
+ {
+ KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next();
+ keys.remove(new Integer(key.key));
+ deregister(key);
+ it.remove();
+ }
- return selected.size();
+ reallocateBuffer();
+
+ return selected.size();
+ }
+ }
}
protected SelectionKey register(AbstractSelectableChannel channel,
@@ -290,36 +337,78 @@ public class KqueueSelectorImpl extends AbstractSelector
KqueueSelectionKeyImpl result = new KqueueSelectionKeyImpl(this, channel);
result.interestOps = interestOps;
result.attach(attachment);
- int k = System.identityHashCode(result);
- while (keys.containsKey(new Integer(k)))
- k++;
- result.key = k;
- keys.put(new Integer(k), result);
- result.nstate = ByteBuffer.allocateDirect(2 * sizeof_struct_kevent);
- updateOps(result, native_fd, false);
+ result.fd = native_fd;
+ result.key = System.identityHashCode(result);
+ synchronized (keys)
+ {
+ while (keys.containsKey(new Integer(result.key)))
+ result.key++;
+ keys.put(new Integer(result.key), result);
+ reallocateBuffer();
+ }
return result;
}
- synchronized void updateOps(KqueueSelectionKeyImpl key)
+ void setInterestOps(KqueueSelectionKeyImpl key, int ops)
{
- updateOps(key, 0, false);
+ synchronized (keys)
+ {
+ key.interestOps = ops;
+ }
}
- synchronized void updateOps(KqueueSelectionKeyImpl key, int fd, boolean delete)
+ /**
+ * Reallocate the events buffer. This is the destination buffer for
+ * events returned by kevent. This method will:
+ *
+ * * Grow the buffer if there is insufficent space for all registered
+ * events.
+ * * Shrink the buffer if it is more than twice the size needed.
+ *
+ */
+ private void reallocateBuffer()
+ {
+ synchronized (keys)
+ {
+ if (events.capacity() < (2 * sizeof_struct_kevent) * keys.size())
+ {
+ int cap = events.capacity();
+ if (cap >= MAX_DOUBLING_CAPACITY)
+ cap += CAP_INCREMENT;
+ else
+ cap = cap << 1;
+
+ events = ByteBuffer.allocateDirect(cap);
+ }
+ else if (events.capacity() > 4 * (sizeof_struct_kevent) * keys.size() + 1
+ && events.capacity() > INITIAL_CAPACITY)
+ {
+ int cap = events.capacity();
+ cap = cap >>> 1;
+ events = ByteBuffer.allocateDirect(cap);
+ }
+ }
+ }
+
+ //synchronized void updateOps(KqueueSelectionKeyImpl key, int interestOps)
+ //{
+ // updateOps(key, interestOps, 0, false);
+ //}
+
+ /*void updateOps(KqueueSelectionKeyImpl key, int interestOps,
+ int activeOps, int fd)
{
//System.out.println(">> updating kqueue selection key:");
//dump_selection_keys(key.nstate.duplicate());
//System.out.println("<<");
- kevent_set(key.nstate, fd, key.interestOps, key.key, delete);
+ synchronized (keys)
+ {
+ kevent_set(key.nstate, fd, interestOps, activeOps, key.key);
+ }
//System.out.println(">> updated kqueue selection key:");
//dump_selection_keys(key.nstate.duplicate());
//System.out.println("<<");
- }
-
- synchronized void doCancel(KqueueSelectionKeyImpl key)
- {
- cancelled.add(key);
- }
+ }*/
private void dump_selection_keys(ByteBuffer keys)
{
@@ -380,8 +469,8 @@ public class KqueueSelectorImpl extends AbstractSelector
* @param delete Set to true if this event should be deleted from the
* kqueue (if false, this event is added/updated).
*/
- private static native void kevent_set(ByteBuffer nstate, int fd, int interestOps,
- int key, boolean delete);
+ private static native void kevent_set(ByteBuffer nstate, int i, int fd,
+ int interestOps, int activeOps, int key);
/**
* Poll for events. The source events are stored in <code>events</code>,
@@ -391,12 +480,13 @@ public class KqueueSelectorImpl extends AbstractSelector
* for events read from the queue.
* @param nevents The number of events to poll (that is, the number of
* events in the <code>events</code> buffer).
+ * @param nout The maximum number of events that may be returned.
* @param timeout The timeout. A timeout of -1 returns immediately; a timeout
* of 0 waits indefinitely.
* @return The number of events read.
*/
private static native int kevent(int kq, ByteBuffer events, int nevents,
- long timeout);
+ int nout, long timeout);
/**
* Fetch a polled key from a native state buffer. For each kevent key we