ports/java/jdk16/files/patch-nio-kqueue
Greg Lewis c9c98ec7cc . Some fixes for the KQueue NIO selector.
Submitted by:	davidxu@
2010-08-13 01:02:01 +00:00

692 lines
19 KiB
Text

$FreeBSD$
--- ../../j2se/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java (revision 0)
+++ ../../j2se/src/share/classes/sun/nio/ch/KqueueSelectorProvider.java (revision 0)
@@ -0,0 +1,17 @@
+package sun.nio.ch;
+
+import java.io.IOException;
+import java.nio.channels.*;
+import java.nio.channels.spi.*;
+
+public class KqueueSelectorProvider
+ extends SelectorProviderImpl
+{
+ public AbstractSelector openSelector() throws IOException {
+ return new KqueueSelectorImpl(this);
+ }
+
+ public Channel inheritedChannel() throws IOException {
+ return InheritedChannel.getChannel();
+ }
+}
--- ../../j2se/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c (revision 0)
+++ ../../j2se/src/solaris/native/sun/nio/ch/KqueueArrayWrapper.c (revision 6)
@@ -0,0 +1,186 @@
+/*
+ * Scratched by davidxu@freebsd.org
+ */
+
+#include "jni.h"
+#include "jni_util.h"
+#include "jvm.h"
+#include "jlong.h"
+
+#include "sun_nio_ch_KqueueArrayWrapper.h"
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+static int
+restartable_kevent(int kqfd, struct kevent *changelist, int nchanges,
+ struct kevent *eventlist, int nevents);
+
+static int
+timeout_kevent(int kqfd, struct kevent *changelist, int nchanges,
+ struct kevent *eventlist, int nevents, int timo);
+
+JNIEXPORT jint JNICALL Java_sun_nio_ch_KqueueArrayWrapper_kqueue
+ (JNIEnv *env, jclass cls)
+{
+ int kqfd = kqueue();
+ if (kqfd < 0) {
+ JNU_ThrowIOExceptionWithLastError(env, "Error creating kqueue");
+ return -1;
+ }
+ return kqfd;
+}
+
+JNIEXPORT void JNICALL Java_sun_nio_ch_KqueueArrayWrapper_register
+ (JNIEnv *env, jclass cls, jint kqfd, jint fd, jshort filter)
+{
+ struct kevent ev;
+ struct timespec ts;
+
+ ev.ident = fd;
+ ev.filter = filter;
+ ev.flags = EV_ADD;
+ ev.fflags = 0;
+ ev.data = 0;
+ ev.udata = NULL;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ if (kevent(kqfd, &ev, 1, NULL, 0, &ts) < 0) {
+ JNU_ThrowIOExceptionWithLastError(env, "Error register kqueue event");
+ }
+}
+
+JNIEXPORT jint JNICALL Java_sun_nio_ch_KqueueArrayWrapper_kevent
+ (JNIEnv *env, jclass cls, jint kqfd, jlong changelist_addr, jint nchanges,
+ jlong eventlist_addr, jint nevents, jlong timeout)
+{
+ struct kevent *changelist = (struct kevent *)jlong_to_ptr(changelist_addr);
+ struct kevent *eventlist = (struct kevent *)jlong_to_ptr(eventlist_addr);
+ int result;
+
+ if (timeout < 0) {
+ result = restartable_kevent(kqfd, changelist, nchanges,
+ eventlist, nevents);
+ } else {
+ result = timeout_kevent(kqfd, changelist, nchanges, eventlist,
+ nevents, timeout);
+ }
+
+ if (result < 0) {
+ JNU_ThrowIOExceptionWithLastError(env, "Error polling kevent");
+ return -1;
+ }
+ return result;
+}
+
+static int
+restartable_kevent(int kqfd, struct kevent *changelist, int nchanges,
+ struct kevent *eventlist, int nevents)
+{
+ int result;
+
+ for (;;) {
+ result = kevent(kqfd, changelist, nchanges, eventlist,
+ nevents, NULL);
+ if (result == -1 && errno == EINTR) {
+ continue;
+ } else {
+ return result;
+ }
+ }
+}
+
+static int
+timeout_kevent(int kqfd, struct kevent *changelist, int nchanges,
+ struct kevent *eventlist, int nevents, int timo)
+{
+ struct timeval timeout, now, end;
+ int result;
+
+ timeout.tv_sec = timo / 1000;
+ timeout.tv_usec = (timo % 1000) * 1000;
+ gettimeofday(&now, NULL);
+ timeradd(&now, &timeout, &end);
+
+ for (;;) {
+ struct timespec ts;
+
+ ts.tv_sec = timeout.tv_sec;
+ ts.tv_nsec = timeout.tv_usec * 1000;
+ result = kevent(kqfd, changelist, nchanges, eventlist, nevents,
+ &ts);
+ if (result == -1 && (errno == EINTR)) {
+ gettimeofday(&now, NULL);
+ if (timercmp(&now, &end, >=))
+ return 0;
+ timersub(&end, &now, &timeout);
+ } else {
+ return result;
+ }
+ }
+}
+
+JNIEXPORT jint JNICALL Java_sun_nio_ch_KqueueArrayWrapper_keventSize
+ (JNIEnv *env, jclass cls)
+{
+ return sizeof(struct kevent);
+}
+
+JNIEXPORT void JNICALL Java_sun_nio_ch_KqueueArrayWrapper_interrupt
+ (JNIEnv *env, jclass cls, jint fd)
+{
+ int fakebuf[1];
+
+ fakebuf[0] = 1;
+ if (write(fd, fakebuf, 1) < 0) {
+ JNU_ThrowIOExceptionWithLastError(env,
+ "Write to interrupt fd failed");
+ }
+}
+
+JNIEXPORT void JNICALL Java_sun_nio_ch_KqueueArrayWrapper_putKevent
+ (JNIEnv *env, jclass cls, jlong address, jint index, jint fd, jshort flags, jshort filter)
+{
+ struct kevent *ev = (struct kevent *)jlong_to_ptr(address);
+
+ ev[index].ident = fd;
+ ev[index].flags = flags;
+ ev[index].filter = filter;
+ ev[index].fflags = 0;
+ ev[index].data = 0;
+ ev[index].udata = NULL;
+}
+
+JNIEXPORT jshort JNICALL Java_sun_nio_ch_KqueueArrayWrapper_getKeventFilter
+ (JNIEnv *env, jclass cls, jlong address, jint index)
+{
+ struct kevent *ev = (struct kevent *)jlong_to_ptr(address);
+
+ return ev[index].filter;
+}
+
+JNIEXPORT jshort JNICALL Java_sun_nio_ch_KqueueArrayWrapper_getKeventFlags
+ (JNIEnv *env, jclass cls, jlong address, jint index)
+{
+ struct kevent *ev = (struct kevent *)jlong_to_ptr(address);
+
+ return ev[index].flags;
+}
+
+JNIEXPORT jint JNICALL Java_sun_nio_ch_KqueueArrayWrapper_getKeventIdent
+ (JNIEnv *env, jclass cls, jlong address, jint index)
+{
+ struct kevent *ev = (struct kevent *)jlong_to_ptr(address);
+
+ return (int)ev[index].ident;
+}
+
+#ifdef __cplusplus
+}
+#endif
--- ../../j2se/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java (revision 0)
+++ ../../j2se/src/solaris/classes/sun/nio/ch/KqueueArrayWrapper.java (revision 6)
@@ -0,0 +1,231 @@
+/*
+ * Scratched by davidxu@freebsd.org
+ */
+
+package sun.nio.ch;
+
+import sun.misc.*;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Arrays;
+
+class KqueueArrayWrapper {
+
+ // Event masks copied from class AbstractPollArrayWrapper
+ static final short POLLIN = 0x0001;
+ static final short POLLOUT = 0x0004;
+ static final short POLLERR = 0x0008;
+ static final short POLLHUP = 0x0010;
+ static final short POLLNVAL = 0x0020;
+ static final short POLLREMOVE = 0x0800;
+
+ // Kevent filters
+ static final short EVFILT_READ = -1;
+ static final short EVFILT_WRITE = -2;
+
+ // Kevent flags
+ static final short EV_ADD = 0x0001;
+ static final short EV_DELETE = 0x0002;
+ static final short EV_ERROR = 0x4000;
+
+ // Miscellaneous constants
+ static final int SIZE_KEVENT = keventSize();
+
+ // Zero mask to unregister events from kqueue
+ static final Integer ZERO_MASK = new Integer(0);
+
+ // Capacity increment of some arrays
+ static final int capacityIncr = 100;
+
+ KqueueArrayWrapper() {
+ int allocationSize;
+
+ // initial size of event array
+ pollKeventSize = capacityIncr * 2;
+ allocationSize = pollKeventSize * SIZE_KEVENT;
+ pollKeventArray = new AllocatedNativeObject(allocationSize, true);
+ kqfd = kqueue();
+ }
+
+ // Machinery for remembering fd registration changes
+ private HashMap<Integer, Integer> updateMap = new HashMap<Integer, Integer>();
+ private int[] oldMasks = new int[capacityIncr];
+
+ // kevent array to receive
+ private AllocatedNativeObject pollKeventArray;
+
+ // current size of pollKeventArray
+ int pollKeventSize;
+
+ // the pollKeventSize should be larger than this
+ int nextKeventSize;
+
+ // The fd of the kqueue()
+ int kqfd;
+
+ // The fd of the interrupt line going out
+ int outgoingInterruptFD;
+
+ // The fd of the interrupt line coming in
+ int incomingInterruptFD;
+
+ // The index of the interrupt FD
+ int interruptedIndex;
+
+ // Number of updated kevent entries
+ int updated;
+
+ // ensure some array sizes are large enough with a given file handle
+ void ensureFd(int fd) {
+ ensureNextEventFd(fd);
+ if (oldMasks.length < fd+1)
+ oldMasks = Arrays.copyOf(oldMasks, fd + capacityIncr);
+ }
+
+ void ensureNextEventFd(int fd) {
+ // each file handle may have two filters, read and write.
+ if (nextKeventSize / 2 < fd+1)
+ nextKeventSize = (fd+1) * 2;
+ }
+
+ void resizeEventBuffer() {
+ if (nextKeventSize > pollKeventSize) {
+ pollKeventArray.free();
+ pollKeventSize = nextKeventSize + capacityIncr * 2;
+ int allocationSize = pollKeventSize * SIZE_KEVENT;
+ pollKeventArray = new AllocatedNativeObject(allocationSize, true);
+ }
+ }
+
+ void initInterrupt(int fd0, int fd1) {
+ outgoingInterruptFD = fd1;
+ incomingInterruptFD = fd0;
+ ensureFd(fd0);
+ register(kqfd, fd0, EVFILT_READ);
+ }
+
+ int getReventOps(int i) {
+ short filter = getKeventFilter(pollKeventArray.address(), i);
+ short flags = getKeventFlags(pollKeventArray.address(), i);
+ if ((flags & EV_ERROR) != 0)
+ return POLLERR;
+ if (filter == EVFILT_READ)
+ return POLLIN;
+ if (filter == EVFILT_WRITE)
+ return POLLOUT;
+ return (0);
+ }
+
+ int getDescriptor(int i) {
+ return getKeventIdent(pollKeventArray.address(), i);
+ }
+
+ void setInterest(int fd, int mask) {
+ if (fd <0)
+ throw new IndexOutOfBoundsException("file handle less than 0");
+ synchronized (updateMap) {
+ ensureFd(fd);
+ updateMap.put(new Integer(fd), new Integer(mask));
+ }
+ }
+
+ void release(int fd) {
+ synchronized (updateMap) {
+ updateMap.put(new Integer(fd), ZERO_MASK);
+ }
+ }
+
+ void closeKqueueFD() throws IOException {
+ FileDispatcher.closeIntFD(kqfd);
+ pollKeventArray.free();
+ }
+
+ int poll(long timeout) {
+ int changeCount = updateRegistrations();
+ updated = kevent(kqfd, pollKeventArray.address(), changeCount,
+ pollKeventArray.address(), pollKeventSize, timeout);
+ for (int i = 0; i < updated; i++) {
+ if (getDescriptor(i) == incomingInterruptFD) {
+ interruptedIndex = i;
+ interrupted = true;
+ break;
+ }
+ }
+ return updated;
+ }
+
+ int updateRegistrations() {
+ int index = 0;
+ synchronized (updateMap) {
+ resizeEventBuffer();
+
+ Set<Integer> s = updateMap.keySet();
+ /*
+ * Because resizeEventBuffer may reallocate event buffer,
+ * we must retrieve fresh address here.
+ */
+ long address = pollKeventArray.address();
+
+ for (Integer fd : s) {
+ Integer newmask = updateMap.get(fd);
+ int oldmask = oldMasks[fd];
+ if ((oldmask & POLLIN) != 0) {
+ if ((newmask & POLLIN) == 0) {
+ putKevent(address, index, fd.intValue(), EV_DELETE, EVFILT_READ);
+ index++;
+ }
+ } else {
+ if ((newmask & POLLIN) != 0) {
+ putKevent(address, index, fd.intValue(), EV_ADD, EVFILT_READ);
+ index++;
+ }
+ }
+
+ if ((oldmask & POLLOUT) != 0) {
+ if ((newmask & POLLOUT) == 0) {
+ putKevent(address, index, fd.intValue(), EV_DELETE, EVFILT_WRITE);
+ index++;
+ }
+ } else {
+ if ((newmask & POLLOUT) != 0) {
+ putKevent(address, index, fd.intValue(), EV_ADD, EVFILT_WRITE);
+ index++;
+ }
+ }
+ oldMasks[fd] = newmask;
+ }
+ updateMap.clear();
+ }
+ return index;
+ }
+
+ boolean interrupted = false;
+
+ public void interrupt() {
+ interrupt(outgoingInterruptFD);
+ }
+
+ public int interruptedIndex() {
+ return interruptedIndex;
+ }
+
+ boolean interrupted() {
+ return interrupted;
+ }
+
+ void clearInterrupted() {
+ interrupted = false;
+ }
+
+ private static native int kqueue();
+ private static native void register(int kqfd, int fd, short filter);
+ private static native int kevent(int kqfd, long changeList, int nchanges, long eventList,
+ int nevents, long timeout);
+ private static native int keventSize();
+ private static native void interrupt(int fd);
+ private static native void putKevent(long address, int index, int fd, short flag, short filter);
+ private static native short getKeventFilter(long address, int index);
+ private static native short getKeventFlags(long address, int index);
+ private static native int getKeventIdent(long address, int index);
+}
--- ../../j2se/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java (revision 0)
+++ ../../j2se/src/solaris/classes/sun/nio/ch/KqueueSelectorImpl.java (revision 6)
@@ -0,0 +1,205 @@
+/*
+ * scratched by davidxu@freebsd.org
+ */
+
+package sun.nio.ch;
+
+import java.io.IOException;
+import java.nio.channels.*;
+import java.nio.channels.spi.*;
+import java.util.*;
+import sun.misc.*;
+
+
+/**
+ * An implementation of Selector for FreeBSD.
+ */
+class KqueueSelectorImpl
+ extends SelectorImpl
+{
+
+ // File descriptors used for interrupt
+ protected int fd0;
+ protected int fd1;
+
+ // The kqueue object
+ KqueueArrayWrapper kqueueWrapper;
+
+ // The number of valid channels in this Selector's poll array
+ private int totalChannels;
+
+ // Maps from file descriptors to keys
+ private HashMap fdToKey;
+
+ // True if this Selector has been closed
+ private boolean closed = false;
+
+ // Lock for interrupt triggering and clearing
+ private Object interruptLock = new Object();
+ private boolean interruptTriggered = false;
+
+ // Trace number of file handles are updated.
+ private BitSet updatedSet;
+
+ /**
+ * Package private constructor called by factory method in
+ * the abstract superclass Selector.
+ */
+ KqueueSelectorImpl(SelectorProvider sp) {
+ super(sp);
+ int[] fdes = new int[2];
+ IOUtil.initPipe(fdes, false);
+ fd0 = fdes[0];
+ fd1 = fdes[1];
+ kqueueWrapper = new KqueueArrayWrapper();
+ totalChannels = 1;
+ kqueueWrapper.initInterrupt(fd0, fd1);
+ updatedSet = new BitSet();
+ fdToKey = new HashMap();
+ }
+
+ protected int doSelect(long timeout)
+ throws IOException
+ {
+ if (closed)
+ throw new ClosedSelectorException();
+ processDeregisterQueue();
+ try {
+ begin();
+ kqueueWrapper.poll(timeout);
+ } finally {
+ end();
+ }
+ processDeregisterQueue();
+ int numKeysUpdated = updateSelectedKeys();
+ if (kqueueWrapper.interrupted()) {
+ // Clear the wakeup pipe
+ synchronized (interruptLock) {
+ kqueueWrapper.clearInterrupted();
+ IOUtil.drain(fd0);
+ interruptTriggered = false;
+ }
+ }
+ return numKeysUpdated;
+ }
+
+ /**
+ * Update the keys whose fd's have been selected by the kqueue.
+ * Add the ready keys to the ready queue.
+ */
+ private int updateSelectedKeys() {
+ int entries = kqueueWrapper.updated;
+ int numKeysUpdated = 0;
+ SelectionKeyImpl ski;
+ int fd;
+ int i;
+
+ updatedSet.clear();
+ for (i = 0; i < entries; i++) {
+ fd = kqueueWrapper.getDescriptor(i);
+ ski = (SelectionKeyImpl) fdToKey.get(new Integer(fd));
+ // ski is null in the case of an interrupt
+ if (ski != null)
+ ski.nioReadyOps(0);
+ }
+
+ for (i = 0; i < entries; i++) {
+ fd = kqueueWrapper.getDescriptor(i);
+ ski = (SelectionKeyImpl) fdToKey.get(new Integer(fd));
+ // ski is null in the case of an interrupt
+ if (ski != null) {
+ int rOps = kqueueWrapper.getReventOps(i);
+ if (selectedKeys.contains(ski)) {
+ if (ski.channel.translateAndUpdateReadyOps(rOps, ski)) {
+ if (!updatedSet.get(fd)) {
+ updatedSet.set(fd);
+ numKeysUpdated++;
+ }
+ }
+ } else {
+ ski.channel.translateAndUpdateReadyOps(rOps, ski);
+ if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
+ selectedKeys.add(ski);
+ if (!updatedSet.get(fd)) {
+ updatedSet.set(fd);
+ numKeysUpdated++;
+ }
+ }
+ }
+ }
+ }
+ return numKeysUpdated;
+ }
+
+ protected void implClose() throws IOException {
+ if (!closed) {
+ closed = true;
+ FileDispatcher.closeIntFD(fd0);
+ FileDispatcher.closeIntFD(fd1);
+ if (kqueueWrapper != null) {
+ kqueueWrapper.release(fd0);
+ kqueueWrapper.closeKqueueFD();
+ kqueueWrapper = null;
+ selectedKeys = null;
+
+ // Deregister channels
+ Iterator i = keys.iterator();
+ while (i.hasNext()) {
+ SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
+ deregister(ski);
+ SelectableChannel selch = ski.channel();
+ if (!selch.isOpen() && !selch.isRegistered())
+ ((SelChImpl)selch).kill();
+ i.remove();
+ }
+ totalChannels = 0;
+
+ }
+ fd0 = -1;
+ fd1 = -1;
+ }
+ }
+
+ protected void implRegister(SelectionKeyImpl ski) {
+ int fd = IOUtil.fdVal(ski.channel.getFD());
+ fdToKey.put(new Integer(fd), ski);
+ totalChannels++;
+ keys.add(ski);
+ }
+
+ protected void implDereg(SelectionKeyImpl ski) throws IOException {
+ int i = ski.getIndex();
+ assert (i >= 0);
+ int fd = ski.channel.getFDVal();
+ fdToKey.remove(new Integer(fd));
+ kqueueWrapper.release(fd);
+ totalChannels--;
+ ski.setIndex(-1);
+ keys.remove(ski);
+ selectedKeys.remove(ski);
+ deregister((AbstractSelectionKey)ski);
+ SelectableChannel selch = ski.channel();
+ if (!selch.isOpen() && !selch.isRegistered())
+ ((SelChImpl)selch).kill();
+ }
+
+ void putEventOps(SelectionKeyImpl sk, int ops) {
+ int fd = IOUtil.fdVal(sk.channel.getFD());
+ kqueueWrapper.setInterest(fd, ops);
+ }
+
+ public Selector wakeup() {
+ synchronized (interruptLock) {
+ if (!interruptTriggered) {
+ kqueueWrapper.interrupt();
+ interruptTriggered = true;
+ }
+ }
+ return this;
+ }
+
+ static {
+ Util.load();
+ }
+
+}
--- ../../j2se/make/java/nio/Makefile (revision 1)
+++ ../../j2se/make/java/nio/Makefile (revision 6)
@@ -70,11 +70,15 @@
sun/nio/ch/EPollSelectorProvider.java \
sun/nio/ch/EPollSelectorImpl.java \
sun/nio/ch/InheritedChannel.java \
+ sun/nio/ch/KqueueSelectorProvider.java \
+ sun/nio/ch/KqueueArrayWrapper.java \
+ sun/nio/ch/KqueueSelectorImpl.java \
sun/nio/ch/PollSelectorProvider.java \
sun/nio/ch/PollSelectorImpl.java
FILES_c += \
EPollArrayWrapper.c \
+ KqueueArrayWrapper.c \
PollArrayWrapper.c \
InheritedChannel.c \
NativeThread.c
@@ -82,6 +86,7 @@
FILES_export += \
sun/nio/ch/EPollArrayWrapper.java \
sun/nio/ch/InheritedChannel.java \
+ sun/nio/ch/KqueueArrayWrapper.java \
sun/nio/ch/NativeThread.java
endif # PLATFORM = linux
--- ../../j2se/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java (revision 1)
+++ ../../j2se/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java (revision 6)
@@ -29,6 +29,10 @@
public static SelectorProvider create() {
PrivilegedAction pa = new GetPropertyAction("os.name");
String osname = (String) AccessController.doPrivileged(pa);
+ if ("FreeBSD".equals(osname)) {
+ return new sun.nio.ch.KqueueSelectorProvider();
+ }
+
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}