summaryrefslogtreecommitdiff
path: root/gnu/java/nio/SelectorImpl.java
diff options
context:
space:
mode:
authorMichael Koch <konqueror@gmx.de>2004-01-08 08:43:37 +0000
committerMichael Koch <konqueror@gmx.de>2004-01-08 08:43:37 +0000
commit19121c0aff2d3716b24a75beecbc2a2ffd8d02b9 (patch)
tree8a6d1ea679c5af317c1d00f878b780105968f171 /gnu/java/nio/SelectorImpl.java
parentd5cc979b2a38006ec787e404cb6c9f22404c6af5 (diff)
downloadclasspath-19121c0aff2d3716b24a75beecbc2a2ffd8d02b9.tar.gz
2004-01-08 Mohan Embar <gnustuff@thisiscool.com>
* gnu/java/nio/SelectorImpl.java (selectThreadMutex): New field. (selectThread): New field. (unhandledWakeup): New field. (implCloseSelector): Added skeleton code which synchronizes as per Sun JRE JavaDoc. (keys): Throw ClosedSelectorException if selector is closed. (selectNow): Added comment that we're faking out an immediate select with a one-microsecond-timeout one. (select): Use 0 instead of -1 for infinite timeout. (implSelect): Changed comment in declaration. (select): Added synchronized to method declaration. Added synchronization and wakeup support as per Sun JRE JavaDoc. (selectedKeys): Throw ClosedSelectorException if selector is closed. (wakeup): Implemented. (deregisterCancelledKeys): Synchronize on cancelled key set before deregistering. (register): Synchronize on key set before registering.
Diffstat (limited to 'gnu/java/nio/SelectorImpl.java')
-rw-r--r--gnu/java/nio/SelectorImpl.java313
1 files changed, 222 insertions, 91 deletions
diff --git a/gnu/java/nio/SelectorImpl.java b/gnu/java/nio/SelectorImpl.java
index 55eec2304..f26e08080 100644
--- a/gnu/java/nio/SelectorImpl.java
+++ b/gnu/java/nio/SelectorImpl.java
@@ -65,12 +65,34 @@ public class SelectorImpl extends AbstractSelector
private Set keys;
private Set selected;
+ /**
+ * A dummy object whose monitor regulates access to both our
+ * selectThread and unhandledWakeup fields.
+ */
+ private Object selectThreadMutex = new Object ();
+
+ /**
+ * Any thread that's currently blocked in a select operation.
+ */
+ private Thread selectThread;
+
+ /**
+ * Indicates whether we have an unhandled wakeup call. This can
+ * be due to either wakeup() triggering a thread interruption while
+ * a thread was blocked in a select operation (in which case we need
+ * to reset this thread's interrupt status after interrupting the
+ * select), or else that no thread was on a select operation at the
+ * time that wakeup() was called, in which case the following select()
+ * operation should return immediately with nothing selected.
+ */
+ private boolean unhandledWakeup;
+
public SelectorImpl (SelectorProvider provider)
{
super (provider);
- keys = new HashSet();
- selected = new HashSet();
+ keys = new HashSet ();
+ selected = new HashSet ();
}
protected void finalize() throws Throwable
@@ -81,61 +103,78 @@ public class SelectorImpl extends AbstractSelector
protected final void implCloseSelector()
throws IOException
{
- // FIXME: We surely need to do more here.
+ // Cancel any pending select operation.
wakeup();
+
+ synchronized (keys)
+ {
+ synchronized (selected)
+ {
+ synchronized (cancelledKeys ())
+ {
+ // FIXME: Release resources here.
+ }
+ }
+ }
}
public final Set keys()
{
+ if (!isOpen())
+ throw new ClosedSelectorException();
+
return Collections.unmodifiableSet (keys);
}
public final int selectNow()
throws IOException
{
+ // FIXME: We're simulating an immediate select
+ // via a select with a timeout of one millisecond.
return select (1);
}
public final int select()
throws IOException
{
- return select (-1);
+ return select (0);
}
- // A timeout value of -1 means block forever.
+ // A timeout value of 0 means block forever.
private static native int implSelect (int[] read, int[] write,
- int[] except, long timeout);
+ int[] except, long timeout)
+ throws IOException;
private final int[] getFDsAsArray (int ops)
{
int[] result;
int counter = 0;
- Iterator it = keys.iterator();
+ Iterator it = keys.iterator ();
// Count the number of file descriptors needed
- while (it.hasNext())
+ while (it.hasNext ())
{
- SelectionKeyImpl key = (SelectionKeyImpl) it.next();
+ SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
- if ((key.interestOps() & ops) != 0)
+ if ((key.interestOps () & ops) != 0)
{
counter++;
}
}
- result = new int [counter];
+ result = new int[counter];
counter = 0;
- it = keys.iterator();
+ it = keys.iterator ();
// Fill the array with the file descriptors
- while (it.hasNext())
+ while (it.hasNext ())
{
- SelectionKeyImpl key = (SelectionKeyImpl) it.next();
+ SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
- if ((key.interestOps() & ops) != 0)
+ if ((key.interestOps () & ops) != 0)
{
- result [counter] = key.getNativeFD();
+ result[counter] = key.getNativeFD();
counter++;
}
}
@@ -143,111 +182,199 @@ public class SelectorImpl extends AbstractSelector
return result;
}
- public int select (long timeout)
+ public synchronized int select (long timeout)
+ throws IOException
{
if (!isOpen())
throw new ClosedSelectorException();
-
- if (keys == null)
- {
- return 0;
- }
-
- deregisterCancelledKeys();
-
- // Set only keys with the needed interest ops into the arrays.
- int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT);
- int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
- int[] except = new int [0]; // FIXME: We dont need to check this yet
- int anzahl = read.length + write.length + except.length;
-
- // Call the native select() on all file descriptors.
- begin();
- int result = implSelect (read, write, except, timeout);
- end();
-
- Iterator it = keys.iterator();
-
- while (it.hasNext())
+
+ synchronized (keys)
{
- int ops = 0;
- SelectionKeyImpl key = (SelectionKeyImpl) it.next();
-
- // If key is already selected retrieve old ready ops.
- if (selected.contains (key))
- {
- ops = key.readyOps();
- }
-
- // Set new ready read/accept ops
- for (int i = 0; i < read.length; i++)
+ synchronized (selected)
{
- if (key.getNativeFD() == read [i])
+ deregisterCancelledKeys();
+
+ // Set only keys with the needed interest ops into the arrays.
+ int[] read = getFDsAsArray (SelectionKey.OP_READ
+ | SelectionKey.OP_ACCEPT);
+ int[] write = getFDsAsArray (SelectionKey.OP_WRITE
+ | SelectionKey.OP_CONNECT);
+
+ // FIXME: We dont need to check this yet
+ int[] except = new int [0];
+
+ // Test to see if we've got an unhandled wakeup call,
+ // in which case we return immediately. Otherwise,
+ // remember our current thread and jump into the select.
+ // The monitor for dummy object selectThreadMutex regulates
+ // access to these fields.
+
+ // FIXME: Not sure from the spec at what point we should
+ // return "immediately". Is it here or immediately upon
+ // entry to this function?
+
+ // NOTE: There's a possibility of another thread calling
+ // wakeup() immediately after our thread releases
+ // selectThreadMutex's monitor here, in which case we'll
+ // do the select anyway. Since calls to wakeup() and select()
+ // among different threads happen in non-deterministic order,
+ // I don't think this is an issue.
+ synchronized (selectThreadMutex)
{
- if (key.channel() instanceof ServerSocketChannelImpl)
+ if (unhandledWakeup)
{
- ops = ops | SelectionKey.OP_ACCEPT;
+ unhandledWakeup = false;
+ return 0;
}
else
{
- ops = ops | SelectionKey.OP_READ;
+ selectThread = Thread.currentThread ();
}
}
- }
- // Set new ready write ops
- for (int i = 0; i < write.length; i++)
- {
- if (key.getNativeFD() == write [i])
+ // Call the native select() on all file descriptors.
+ int result = 0;
+ try
{
- ops = ops | SelectionKey.OP_WRITE;
-
-// if (key.channel().isConnected())
-// {
-// ops = ops | SelectionKey.OP_WRITE;
-// }
-// else
-// {
-// ops = ops | SelectionKey.OP_CONNECT;
-// }
- }
- }
+ begin();
+ result = implSelect (read, write, except, timeout);
+ }
+ finally
+ {
+ end();
+ }
+
+ // If our unhandled wakeup flag is set at this point,
+ // reset our thread's interrupt flag because we were
+ // awakened by wakeup() instead of an external thread
+ // interruption.
+ //
+ // NOTE: If we were blocked in a select() and one thread
+ // called Thread.interrupt() on the blocked thread followed
+ // by another thread calling Selector.wakeup(), then race
+ // conditions could make it so that the thread's interrupt
+ // flag is reset even though the Thread.interrupt() call
+ // "was there first". I don't think we need to care about
+ // this scenario.
+ synchronized (selectThreadMutex)
+ {
+ if (unhandledWakeup)
+ {
+ unhandledWakeup = false;
+ selectThread.interrupted ();
+ }
+ selectThread = null;
+ }
- // FIXME: We dont handle exceptional file descriptors yet.
+ Iterator it = keys.iterator ();
- // If key is not yet selected add it.
- if (!selected.contains (key))
- {
- selected.add (key);
- }
+ while (it.hasNext ())
+ {
+ int ops = 0;
+ SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
- // Set new ready ops
- key.readyOps (key.interestOps() & ops);
- }
+ // If key is already selected retrieve old ready ops.
+ if (selected.contains (key))
+ {
+ ops = key.readyOps ();
+ }
- deregisterCancelledKeys();
- return result;
+ // Set new ready read/accept ops
+ for (int i = 0; i < read.length; i++)
+ {
+ if (key.getNativeFD() == read[i])
+ {
+ if (key.channel () instanceof ServerSocketChannelImpl)
+ {
+ ops = ops | SelectionKey.OP_ACCEPT;
+ }
+ else
+ {
+ ops = ops | SelectionKey.OP_READ;
+ }
+ }
+ }
+
+ // Set new ready write ops
+ for (int i = 0; i < write.length; i++)
+ {
+ if (key.getNativeFD() == write[i])
+ {
+ ops = ops | SelectionKey.OP_WRITE;
+
+ // if (key.channel ().isConnected ())
+ // {
+ // ops = ops | SelectionKey.OP_WRITE;
+ // }
+ // else
+ // {
+ // ops = ops | SelectionKey.OP_CONNECT;
+ // }
+ }
+ }
+
+ // FIXME: We dont handle exceptional file descriptors yet.
+
+ // If key is not yet selected add it.
+ if (!selected.contains (key))
+ {
+ selected.add (key);
+ }
+
+ // Set new ready ops
+ key.readyOps (key.interestOps () & ops);
+ }
+ deregisterCancelledKeys();
+
+ return result;
+ }
+ }
}
public final Set selectedKeys()
{
+ if (!isOpen())
+ throw new ClosedSelectorException();
+
return selected;
}
public final Selector wakeup()
{
- return null;
+ // IMPLEMENTATION NOTE: Whereas the specification says that
+ // thread interruption should trigger a call to wakeup, we
+ // do the reverse under the covers: wakeup triggers a thread
+ // interrupt followed by a subsequent reset of the thread's
+ // interrupt status within select().
+
+ // First, acquire the monitor of the object regulating
+ // access to our selectThread and unhandledWakeup fields.
+ synchronized (selectThreadMutex)
+ {
+ unhandledWakeup = true;
+
+ // Interrupt any thread which is currently blocked in
+ // a select operation.
+ if (selectThread != null)
+ selectThread.interrupt ();
+ }
+
+ return this;
}
private final void deregisterCancelledKeys()
{
- Iterator it = cancelledKeys().iterator();
-
- while (it.hasNext())
- {
- keys.remove ((SelectionKeyImpl) it.next());
- it.remove();
- }
+ Set ckeys = cancelledKeys ();
+ synchronized (ckeys)
+ {
+ Iterator it = ckeys.iterator();
+
+ while (it.hasNext ())
+ {
+ keys.remove ((SelectionKeyImpl) it.next ());
+ it.remove ();
+ }
+ }
}
protected SelectionKey register (SelectableChannel ch, int ops, Object att)
@@ -280,7 +407,11 @@ public class SelectorImpl extends AbstractSelector
throw new InternalError ("No known channel type");
}
- keys.add (result);
+ synchronized (keys)
+ {
+ keys.add (result);
+ }
+
result.interestOps (ops);
result.attach (att);
return result;