diff options
| author | Casey Marshall <csm@gnu.org> | 2006-09-27 21:19:31 +0000 |
|---|---|---|
| committer | Casey Marshall <csm@gnu.org> | 2006-09-27 21:19:31 +0000 |
| commit | c145bf0ba7bbcf5a7430ab9381e6bae5e20ee434 (patch) | |
| tree | 7b9a2d296d1e879a643410762383e08ff7085511 /gnu/java/nio/KqueueSelectorImpl.java | |
| parent | f43d2df6dab7bc73ce6c70cc142ab703a1482bc9 (diff) | |
| download | classpath-c145bf0ba7bbcf5a7430ab9381e6bae5e20ee434.tar.gz | |
2006-09-27 Casey Marshall <csm@gnu.org>
* gnu/java/nio/KqueueSelectionKeyImpl.java: extend
AbstractSelectionKey.
(nstate, valid, readEverEnabled, writeEverEnabled): removed.
(activeOps, fd): new fields.
(cancel): removed.
(interestOps): just call `selector.setInterestOps.'
(isValid): removed.
(toString): include native fd in output.
(hashCode, equals, isReadActive, isReadInterested, isWriteActive,
isWriteInterested, needCommitRead, needCommitWrite): new methods.
* gnu/java/nio/KqueueSelectorImpl.java (MAX_DOUBLING_CAPACITY,
CAP_INCREMENT, INITIAL_CAPACITY): new constants.
(cancelled): removed.
(events): new field.
(OP_ACCEPT, OP_CONNECT, OP_READ, OP_WRITE): new constants.
(toString, equals): new methods.
(doSelect): get cancelled keys from superclass; fix synchronization;
initialize events that need to be added/deleted only when selecting;
ignore keys attached to closed channels.
(register): fix key initialization; synchronize adding keys.
(setInterestOps): new method.
(updateOps, updateOps): removed.
(reallocateBuffer): new method.
(doCancel): removed.
(kevent_set): add index, active ops parameters; remove delete
parameter.
(kevent): add output space size parameter.
* include/gnu_java_nio_KqueueSelectorImpl.h: regenerated.
* native/jni/java-nio/gnu_java_nio_KqueueSelectorImpl.c
(Java_gnu_java_nio_KqueueSelectorImpl_kevent_1set): only fill in
one filter, at the given index.
(Java_gnu_java_nio_KqueueSelectorImpl_kevent): separate incoming
event count and outgoing event count.
Diffstat (limited to 'gnu/java/nio/KqueueSelectorImpl.java')
| -rw-r--r-- | gnu/java/nio/KqueueSelectorImpl.java | 314 |
1 files changed, 202 insertions, 112 deletions
diff --git a/gnu/java/nio/KqueueSelectorImpl.java b/gnu/java/nio/KqueueSelectorImpl.java index dcb058eb5..eed86119c 100644 --- a/gnu/java/nio/KqueueSelectorImpl.java +++ b/gnu/java/nio/KqueueSelectorImpl.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.ClosedSelectorException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.AbstractSelectableChannel; @@ -63,7 +64,10 @@ import java.util.Set; public class KqueueSelectorImpl extends AbstractSelector { private static final int sizeof_struct_kevent; - + private static final int MAX_DOUBLING_CAPACITY = 16384; + private static final int CAP_INCREMENT = 1024; + private static final int INITIAL_CAPACITY; + static { try @@ -79,6 +83,7 @@ public class KqueueSelectorImpl extends AbstractSelector sizeof_struct_kevent = sizeof_struct_kevent(); else sizeof_struct_kevent = -1; + INITIAL_CAPACITY = 16 * sizeof_struct_kevent; } /** @@ -94,15 +99,20 @@ public class KqueueSelectorImpl extends AbstractSelector private HashMap/*<Integer,KqueueSelectionKeyImpl>*/ keys; private HashSet/*<KqueueSelectionKeyImpl>*/ selected; - private HashSet/*<KqueueSelectionKeyImpl>*/ cancelled; private Thread blockedThread; + private ByteBuffer events; + + private static final int OP_ACCEPT = SelectionKey.OP_ACCEPT; + private static final int OP_CONNECT = SelectionKey.OP_CONNECT; + private static final int OP_READ = SelectionKey.OP_READ; + private static final int OP_WRITE = SelectionKey.OP_WRITE; public KqueueSelectorImpl(SelectorProvider provider) throws IOException { super(provider); kq = implOpen(); keys = new HashMap/*<KqueueSelectionKeyImpl>*/(); - cancelled = new HashSet(); + events = ByteBuffer.allocateDirect(INITIAL_CAPACITY); } protected void implCloseSelector() throws IOException @@ -169,103 +179,140 @@ public class KqueueSelectorImpl extends AbstractSelector return this; } - synchronized int doSelect(long timeout) throws IOException + public String toString() { - // FIXME -- I'm unclear on how we should synchronize this; and how to - // handle cancelled keys. - for (Iterator it = cancelled.iterator(); it.hasNext(); ) - { - KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next(); - updateOps(key, 0, true); - } - int events_size = 0; - for (Iterator it = keys.values().iterator(); it.hasNext(); ) + return super.toString() + " [ fd: " + kq + " ]"; + } + + public boolean equals(Object o) + { + if (!(o instanceof KqueueSelectorImpl)) + return false; + + return ((KqueueSelectorImpl) o).kq == kq; + } + + int doSelect(long timeout) throws IOException + { + Set cancelled = cancelledKeys(); + synchronized (cancelled) + { + synchronized (keys) { - KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next(); - if ((key.interestOps & SelectionKey.OP_ACCEPT) != 0 - || (key.interestOps & SelectionKey.OP_READ) != 0) - key.readEverEnabled = true; - if ((key.interestOps & SelectionKey.OP_CONNECT) != 0 - || (key.interestOps & SelectionKey.OP_WRITE) != 0) - key.writeEverEnabled = true; + for (Iterator it = cancelled.iterator(); it.hasNext(); ) + { + KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next(); + key.interestOps = 0; + } - if (key.readEverEnabled) - events_size += sizeof_struct_kevent; - if (key.writeEverEnabled) - events_size += sizeof_struct_kevent; - } + int events_size = (2 * sizeof_struct_kevent) * keys.size(); + int num_events = 0; - // We handle native events a little strangely here; per selection key, - // we allocate enough space for two struct kevents, the first in the - // list will be our EVFILT_READ filter, the second our EVFILT_WRITE - // one. If only one of the two needs enabling, though, we don't want - // to pass the other to kevent, because that would result in spurious - // events. We can break down our handling as follows: - // - // - READ enabled, WRITE never enabled. We pass only the first structure - // to kevent. - // - WRITE enabled, READ never enabled. Likewise, but only pass the - // second structure. - // - READ and WRITE enabled. Pass both. - // - READ enabled, WRITE enabled in the past. Pass both, with the - // first structure's flag set to EV_ADD or EV_ENABLE, and the second - // with EV_DISABLE. It seems OK to keep sending events with the - // EV_DISABLE flag. - // - WRITE enabled, READ enabled in the past. Likewise, but flipped. - // - // We handle these states with the readEverEnabled and writeEverEnabled - // flags of selection keys; they start off as false, and become true - // the first time we select() with READ or WRITE enabled. They never - // become false. - ByteBuffer events = ByteBuffer.allocateDirect(events_size); + for (Iterator it = keys.entrySet().iterator(); it.hasNext(); ) + { + Map.Entry e = (Map.Entry) it.next(); + KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) e.getValue(); + + SelectableChannel ch = key.channel(); + if (ch instanceof VMChannelOwner) + { + if (!((VMChannelOwner) ch).getVMChannel().getState().isValid()) + { + // closed channel; removed from kqueue automatically. + it.remove(); + continue; + } + } + + // If this key is registering a read filter, add it to the buffer. + if (key.needCommitRead()) + { + kevent_set(events, num_events, key.fd, + key.interestOps & (OP_READ | OP_ACCEPT), + key.activeOps & (OP_READ | OP_ACCEPT), key.key); + num_events++; + } + + // If this key is registering a write filter, add it to the buffer. + if (key.needCommitWrite()) + { + kevent_set(events, num_events, key.fd, + key.interestOps & (OP_WRITE | OP_CONNECT), + key.activeOps & (OP_WRITE | OP_CONNECT), key.key); + num_events++; + } + } + events.rewind().limit(events.capacity()); - for (Iterator it = keys.entrySet().iterator(); it.hasNext(); ) - { - Map.Entry e = (Map.Entry) it.next(); - KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) e.getValue(); + //System.out.println("dump of keys to select:"); + //dump_selection_keys(events.duplicate()); + + int n = 0; + try + { + //System.out.println("[" + kq + "] kevent enter selecting from " + keys.size()); + begin(); + blockedThread = Thread.currentThread(); + if (blockedThread.isInterrupted()) + timeout = 0; + n = kevent(kq, events, num_events, + events.capacity() / sizeof_struct_kevent, timeout); + } + finally + { + end(); + blockedThread = null; + Thread.interrupted(); + //System.out.println("[" + kq + "kevent exit selected " + n); + } + + //System.out.println("dump of keys selected:"); + //dump_selection_keys((ByteBuffer) events.duplicate().limit(n * sizeof_struct_kevent)); - if (key.readEverEnabled) - events.put((ByteBuffer) key.nstate.duplicate().limit - (sizeof_struct_kevent)); - if (key.writeEverEnabled) - events.put((ByteBuffer) key.nstate.duplicate().position - (sizeof_struct_kevent).limit(2 * sizeof_struct_kevent)); - } - events.rewind(); - - //System.out.println("dump of keys to select:"); - //dump_selection_keys(events.duplicate()); - - blockedThread = Thread.currentThread(); - if (blockedThread.isInterrupted()) - timeout = 0; - final int n = kevent(kq, events, events_size / sizeof_struct_kevent, - timeout); - Thread.interrupted(); - - //System.out.println("dump of keys selected:"); - //dump_selection_keys((ByteBuffer) events.duplicate().limit(n * sizeof_struct_kevent)); + // Commit the operations we've just added in the call to kevent. + for (Iterator it = keys.values().iterator(); it.hasNext(); ) + { + KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next(); + key.activeOps = key.interestOps; + } - selected = new HashSet/*<KqueueSelectionKeyImpl>*/(n); - int x = 0; - for (int i = 0; i < n; i++) - { - events.position(x).limit(x + sizeof_struct_kevent); - x += sizeof_struct_kevent; - int y = fetch_key(events.slice()); - KqueueSelectionKeyImpl key = - (KqueueSelectionKeyImpl) keys.get(new Integer(y)); - key.readyOps = ready_ops(events.slice(), key.interestOps); - selected.add(key); - } - for (Iterator it = cancelled.iterator(); it.hasNext(); ) - { - KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next(); - keys.remove(new Integer(key.key)); - it.remove(); - } + selected = new HashSet/*<KqueueSelectionKeyImpl>*/(n); + int x = 0; + for (int i = 0; i < n; i++) + { + events.position(x).limit(x + sizeof_struct_kevent); + x += sizeof_struct_kevent; + int y = fetch_key(events.slice()); + KqueueSelectionKeyImpl key = + (KqueueSelectionKeyImpl) keys.get(new Integer(y)); + + if (key == null) + { + System.out.println("WARNING! no key found for selected key " + y); + continue; + } + // Keys that have been cancelled may be returned here; don't + // add them to the selected set. + if (!key.isValid()) + continue; + key.readyOps = ready_ops(events.slice(), key.interestOps); + selected.add(key); + } + + // Finally, remove the cancelled keys. + for (Iterator it = cancelled.iterator(); it.hasNext(); ) + { + KqueueSelectionKeyImpl key = (KqueueSelectionKeyImpl) it.next(); + keys.remove(new Integer(key.key)); + deregister(key); + it.remove(); + } - return selected.size(); + reallocateBuffer(); + + return selected.size(); + } + } } protected SelectionKey register(AbstractSelectableChannel channel, @@ -290,36 +337,78 @@ public class KqueueSelectorImpl extends AbstractSelector KqueueSelectionKeyImpl result = new KqueueSelectionKeyImpl(this, channel); result.interestOps = interestOps; result.attach(attachment); - int k = System.identityHashCode(result); - while (keys.containsKey(new Integer(k))) - k++; - result.key = k; - keys.put(new Integer(k), result); - result.nstate = ByteBuffer.allocateDirect(2 * sizeof_struct_kevent); - updateOps(result, native_fd, false); + result.fd = native_fd; + result.key = System.identityHashCode(result); + synchronized (keys) + { + while (keys.containsKey(new Integer(result.key))) + result.key++; + keys.put(new Integer(result.key), result); + reallocateBuffer(); + } return result; } - synchronized void updateOps(KqueueSelectionKeyImpl key) + void setInterestOps(KqueueSelectionKeyImpl key, int ops) { - updateOps(key, 0, false); + synchronized (keys) + { + key.interestOps = ops; + } } - synchronized void updateOps(KqueueSelectionKeyImpl key, int fd, boolean delete) + /** + * Reallocate the events buffer. This is the destination buffer for + * events returned by kevent. This method will: + * + * * Grow the buffer if there is insufficent space for all registered + * events. + * * Shrink the buffer if it is more than twice the size needed. + * + */ + private void reallocateBuffer() + { + synchronized (keys) + { + if (events.capacity() < (2 * sizeof_struct_kevent) * keys.size()) + { + int cap = events.capacity(); + if (cap >= MAX_DOUBLING_CAPACITY) + cap += CAP_INCREMENT; + else + cap = cap << 1; + + events = ByteBuffer.allocateDirect(cap); + } + else if (events.capacity() > 4 * (sizeof_struct_kevent) * keys.size() + 1 + && events.capacity() > INITIAL_CAPACITY) + { + int cap = events.capacity(); + cap = cap >>> 1; + events = ByteBuffer.allocateDirect(cap); + } + } + } + + //synchronized void updateOps(KqueueSelectionKeyImpl key, int interestOps) + //{ + // updateOps(key, interestOps, 0, false); + //} + + /*void updateOps(KqueueSelectionKeyImpl key, int interestOps, + int activeOps, int fd) { //System.out.println(">> updating kqueue selection key:"); //dump_selection_keys(key.nstate.duplicate()); //System.out.println("<<"); - kevent_set(key.nstate, fd, key.interestOps, key.key, delete); + synchronized (keys) + { + kevent_set(key.nstate, fd, interestOps, activeOps, key.key); + } //System.out.println(">> updated kqueue selection key:"); //dump_selection_keys(key.nstate.duplicate()); //System.out.println("<<"); - } - - synchronized void doCancel(KqueueSelectionKeyImpl key) - { - cancelled.add(key); - } + }*/ private void dump_selection_keys(ByteBuffer keys) { @@ -380,8 +469,8 @@ public class KqueueSelectorImpl extends AbstractSelector * @param delete Set to true if this event should be deleted from the * kqueue (if false, this event is added/updated). */ - private static native void kevent_set(ByteBuffer nstate, int fd, int interestOps, - int key, boolean delete); + private static native void kevent_set(ByteBuffer nstate, int i, int fd, + int interestOps, int activeOps, int key); /** * Poll for events. The source events are stored in <code>events</code>, @@ -391,12 +480,13 @@ public class KqueueSelectorImpl extends AbstractSelector * for events read from the queue. * @param nevents The number of events to poll (that is, the number of * events in the <code>events</code> buffer). + * @param nout The maximum number of events that may be returned. * @param timeout The timeout. A timeout of -1 returns immediately; a timeout * of 0 waits indefinitely. * @return The number of events read. */ private static native int kevent(int kq, ByteBuffer events, int nevents, - long timeout); + int nout, long timeout); /** * Fetch a polled key from a native state buffer. For each kevent key we |
