diff --git a/nailgun-examples/pom.xml b/nailgun-examples/pom.xml index 07c17e84..772ad414 100644 --- a/nailgun-examples/pom.xml +++ b/nailgun-examples/pom.xml @@ -20,7 +20,7 @@ com.martiansoftware nailgun-all - 0.9.2-SNAPSHOT + 0.9.3-SNAPSHOT diff --git a/nailgun-examples/src/main/java/com/martiansoftware/nailgun/examples/Heartbeat.java b/nailgun-examples/src/main/java/com/martiansoftware/nailgun/examples/Heartbeat.java index f88b8731..70782e8e 100644 --- a/nailgun-examples/src/main/java/com/martiansoftware/nailgun/examples/Heartbeat.java +++ b/nailgun-examples/src/main/java/com/martiansoftware/nailgun/examples/Heartbeat.java @@ -17,15 +17,12 @@ */ package com.martiansoftware.nailgun.examples; -import com.martiansoftware.nailgun.NGClientListener; import com.martiansoftware.nailgun.NGContext; -import com.martiansoftware.nailgun.NGHeartbeatListener; import java.io.IOException; -import java.io.PrintStream; /** - * Print one hash per second to standard out while the client is running. + * Print one hash per second to standard out while the client is running. * * @author Jim Purbrick */ @@ -42,21 +39,14 @@ public static void nailMain(final NGContext context) throws IOException { try { // Register a new NGClientListener. As clientDisconnected is called from // another thread any nail state access must be properly synchronized. - context.addClientListener(new NGClientListener() { - public void clientDisconnected() throws InterruptedException { - throw new InterruptedException("Client Disconnected"); // Will interrupt the thread below. - } - }); + Thread mainThread = Thread.currentThread(); + context.addClientListener(mainThread::interrupt); // Register a new NGHeartbeatListener. This is normally only used for debugging disconnection problems. - context.addHeartbeatListener(new NGHeartbeatListener() { - public void heartbeatReceived(long intervalMillis) { - context.out.print("H"); - } - }); + context.addHeartbeatListener(() -> context.out.print("H")); // Loop printing a hash to the client every second until client disconnects. - while(! Thread.currentThread().isInterrupted()) { + while(!Thread.currentThread().isInterrupted()) { Thread.sleep(5000); context.out.print("S"); } diff --git a/nailgun-server/pom.xml b/nailgun-server/pom.xml index 2db26279..eb640941 100644 --- a/nailgun-server/pom.xml +++ b/nailgun-server/pom.xml @@ -20,7 +20,7 @@ com.martiansoftware nailgun-all - 0.9.2-SNAPSHOT + 0.9.3-SNAPSHOT diff --git a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGClientListener.java b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGClientListener.java index b427290f..55aa7633 100644 --- a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGClientListener.java +++ b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGClientListener.java @@ -4,8 +4,7 @@ public interface NGClientListener { /** * Called by an internal nailgun thread when the server detects that the nailgun client has disconnected. - * {@link NGClientListener}s can be registered using {@link NGContext.registerClientListener}. If - * clientDisconnected throws an InterruptedException nailgun interrupts the main session thread. + * {@link NGClientListener}s can be registered using {@link NGContext#addClientListener}. */ - public void clientDisconnected() throws InterruptedException; + void clientDisconnected(); } diff --git a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGContext.java b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGContext.java index cfde79a8..afa30e28 100644 --- a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGContext.java +++ b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGContext.java @@ -287,6 +287,13 @@ public void removeClientListener(NGClientListener listener) { getInputStream().removeClientListener(listener); } + /** + * Do not notify about client exit + */ + public void removeAllClientListeners() { + getInputStream().removeAllClientListeners(); + } + /** * @param listener the {@link com.martiansoftware.nailgun.NGHeartbeatListener} to be notified of client events. */ diff --git a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGHeartbeatListener.java b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGHeartbeatListener.java index 527f6e2f..3dfcd36d 100644 --- a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGHeartbeatListener.java +++ b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGHeartbeatListener.java @@ -7,5 +7,5 @@ public interface NGHeartbeatListener { * This can normally be implemented as a no-op handler and is primarily useful for debugging. * {@link NGClientListener}s can be registered using {@link NGContext.registerHeartbeatListener}. */ - public void heartbeatReceived(long intervalMillis); + void heartbeatReceived(); } diff --git a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGInputStream.java b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGInputStream.java index 1557f957..8516dc8d 100644 --- a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGInputStream.java +++ b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGInputStream.java @@ -19,17 +19,19 @@ package com.martiansoftware.nailgun; import java.io.*; +import java.net.SocketTimeoutException; +import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; /** - * A FilterInputStream that is able to read the chunked stdin stream - * from a NailGun client. - * + * A FilterInputStream that is able to read the chunked stdin stream from a NailGun client. + * * @author Marty Lamb */ public class NGInputStream extends FilterInputStream implements Closeable { @@ -40,15 +42,14 @@ public class NGInputStream extends FilterInputStream implements Closeable { private final DataInputStream din; private InputStream stdin = null; private boolean eof = false; - private long remaining = 0; + private boolean clientConnected = true; + private int remaining = 0; private byte[] oneByteBuffer = null; private final DataOutputStream out; private boolean started = false; - private long lastReadTime = System.currentTimeMillis(); - private final Future readFuture; - private final Set clientListeners = new HashSet(); - private final Set heartbeatListeners = new HashSet(); - private final int heartbeatTimeoutMillis; + private final Set clientListeners = new HashSet<>(); + private final Set heartbeatListeners = new HashSet<>(); + private static final long TERMINATION_TIMEOUT_MS = 1000; /** * Creates a new NGInputStream wrapping the specified InputStream. Also sets up a timer to @@ -57,19 +58,16 @@ public class NGInputStream extends FilterInputStream implements Closeable { * * @param in the InputStream to wrap * @param out the OutputStream to which SENDINPUT chunks should be sent - * @param serverLog the PrintStream to which server logging messages should be written * @param heartbeatTimeoutMillis the interval between heartbeats before considering the client * disconnected */ public NGInputStream( - InputStream in, + DataInputStream in, DataOutputStream out, - final PrintStream serverLog, final int heartbeatTimeoutMillis) { super(in); - din = (DataInputStream) this.in; + this.din = in; this.out = out; - this.heartbeatTimeoutMillis = heartbeatTimeoutMillis; /** Thread factory that overrides name and priority for executor threads */ final class NamedThreadFactory implements ThreadFactory { @@ -98,106 +96,140 @@ public Thread newThread(Runnable r) { } } - final Thread mainThread = Thread.currentThread(); + Thread mainThread = Thread.currentThread(); this.orchestratorExecutor = Executors.newSingleThreadExecutor( new NamedThreadFactory(mainThread.getName() + " (NGInputStream orchestrator)")); this.readExecutor = Executors.newSingleThreadExecutor( new NamedThreadFactory(mainThread.getName() + " (NGInputStream reader)")); - readFuture = orchestratorExecutor.submit(() -> { + // Read timeout, including heartbeats, should be handled by socket. + // However Java Socket/Stream API does not enforce that. To stay on safer side, + // use timeout on a future + + // let socket timeout first, set rough timeout to 110% of original + long futureTimeout = heartbeatTimeoutMillis + heartbeatTimeoutMillis / 10; + + orchestratorExecutor.submit(() -> { try { - while (true) { - Future readHeaderFuture = readExecutor.submit(() -> { + boolean isMoreData = true; + while (isMoreData) { + Future readFuture = readExecutor.submit(() -> { try { + // return false if client sends EOF readChunk(); + return true; + } catch (EOFException e) { + // EOFException means that underlying stream is closed by the server + // There will be no more data and it is time to close the circus + return false; } catch (IOException e) { - throw new RuntimeException(e); + throw new ExecutionException(e); } }); - readHeaderFuture.get(heartbeatTimeoutMillis, TimeUnit.MILLISECONDS); + + isMoreData = readFuture.get(futureTimeout, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { LOG.log(Level.WARNING, "Nailgun client read future was interrupted", e); } catch (ExecutionException e) { - LOG.log(Level.WARNING, "Nailgun client read future raised an exception", e); + Throwable cause = e.getCause(); + if (cause != null && cause instanceof SocketTimeoutException) { + LOG.log(Level.WARNING, + "Nailgun client socket timed out after " + heartbeatTimeoutMillis + " ms", + cause); + } else { + LOG.log(Level.WARNING, "Nailgun client read future raised an exception", e); + } } catch (TimeoutException e) { LOG.log(Level.WARNING, - "Nailgun client read future timed out after " + heartbeatTimeoutMillis + " ms", + "Nailgun client read future timed out after " + futureTimeout + " ms", e); } finally { LOG.log(Level.FINE, "Nailgun client read shutting down"); - notifyClientListeners(serverLog, mainThread); - readEof(); + + // notify stream readers there are no more data + setEof(); + + // set client disconnected flag + setClientDisconnected(); + + // notify listeners that client has disconnected + notifyClientListeners(); } }); } /** - * Calls clientDisconnected method on given NGClientListener. - * If the clientDisconnected method throws an NGExitException due to calling System.exit() it is - * rethrown as an InterruptedException. - * @param listener The NGClientListener to notify. + * Calls clientDisconnected method on all registered NGClientListeners. */ - private synchronized void notifyClientListener(NGClientListener listener) throws InterruptedException { - try { + private void notifyClientListeners() { + // copy collection under monitor to avoid blocking monitor on potentially expensive + // callbacks + List listeners; + synchronized (this) { + listeners = new ArrayList<>(clientListeners); + clientListeners.clear(); + } + + for (NGClientListener listener : listeners) { listener.clientDisconnected(); - } catch (NGExitException e) { - throw new InterruptedException(e.getMessage()); } } /** - * Calls clientDisconnected method on given NGClientListener. - * If the clientDisconnected method calls System.exit() or throws an InterruptedException, the given - * mainThread is interrupted. - * @param listener The NGClientListener to notify. - * @param mainThread The Thread to interrupt. + * Cancel the thread reading from the NailGun client and close underlying input stream */ - private synchronized void notifyClientListener(NGClientListener listener, Thread mainThread) { + public void close() throws IOException { + setEof(); + + // this will close `in` and trigger any in.read() calls from readExecutor to unblock + super.close(); + + // the order or termination is important because readExecutor will send a completion + // signal to orchestratorExecutor + terminateExecutor(readExecutor, "read"); + terminateExecutor(orchestratorExecutor, "orchestrator"); + } + + private static void terminateExecutor(ExecutorService service, String which) { + LOG.log(Level.FINE, "Shutting down {0} ExecutorService", which); + service.shutdown(); + + boolean terminated = false; try { - notifyClientListener(listener); + terminated = service + .awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - mainThread.interrupt(); + // It can happen if a thread calling close() is already interrupted + // do not do anything here but do hard shutdown later with shutdownNow() + // It is calling thread's responsibility to not be in interrupted state + LOG.log(Level.WARNING, + "Interruption is signaled in close(), terminating a thread forcefully"); + service.shutdownNow(); + return; } - } - /** - * Calls clientDisconnected method on all registered NGClientListeners. - * If any of the clientDisconnected methods throw an NGExitException due to calling System.exit() - * clientDisconnected processing is halted, the exit status is printed to the serverLog and the main - * thread is interrupted. - * @param serverLog The NailGun server log stream. - * @param mainThread The thread running nailMain which should be interrupted on System.exit() - */ - private synchronized void notifyClientListeners(PrintStream serverLog, Thread mainThread) { - if (! eof) { - serverLog.println(mainThread.getName() + " disconnected"); - for (Iterator i = clientListeners.iterator(); i.hasNext(); ) { - notifyClientListener((NGClientListener) i.next(), mainThread); - } + if (!terminated) { + // something went wrong, executor task did not receive a signal and did not complete on time + // shot executor in the head then + LOG.log(Level.WARNING, + "{0} thread did not unblock on a signal within timeout and will be" + + " forcefully terminated", + which); + service.shutdownNow(); } - clientListeners.clear(); } - /** - * Cancels the thread reading from the NailGun client. - */ - public synchronized void close() { - readEof(); - //TODO(buck_team): graceful shutdown - readFuture.cancel(true); - readExecutor.shutdownNow(); - orchestratorExecutor.shutdownNow(); - } - /** * Reads a NailGun chunk payload from {@link #in} and returns an InputStream that reads from * that chunk. + * * @param in the InputStream to read the chunk payload from. * @param len the size of the payload chunk read from the chunkHeader. * @return an InputStream containing the read data. - * @throws IOException if thrown by the underlying InputStream, - * or if the stream EOF is reached before the payload has been read. + * @throws IOException if thrown by the underlying InputStream + * @throws EOFException if EOF is reached by underlying stream + * before the payload has been read. */ private InputStream readPayload(InputStream in, int len) throws IOException { @@ -206,7 +238,10 @@ private InputStream readPayload(InputStream in, int len) throws IOException { while (totalRead < len) { int currentRead = in.read(receiveBuffer, totalRead, len - totalRead); if (currentRead < 0) { - throw new IOException("stdin EOF before payload read."); + // server may forcefully close the socket/stream and this will cause InputStream to + // return -1. Throw EOFException (same what DataInputStream does) to signal up + // that we are in shutdown mode + throw new EOFException("stdin EOF before payload read."); } totalRead += currentRead; } @@ -216,155 +251,189 @@ private InputStream readPayload(InputStream in, int len) throws IOException { /** * Reads a NailGun chunk header from the underlying InputStream. * - * @throws IOException if thrown by the underlying InputStream, - * or if an unexpected NailGun chunk type is encountered. + * @throws EOFException if underlying stream / socket is closed which happens on client + * disconnection + * @throws IOException if thrown by the underlying InputStream, or if an unexpected NailGun + * chunk type is encountered. */ private void readChunk() throws IOException { + int chunkLen = din.readInt(); + byte chunkType = din.readByte(); + long readTime = System.currentTimeMillis(); + + switch (chunkType) { + case NGConstants.CHUNKTYPE_STDIN: + InputStream chunkStream = readPayload(in, chunkLen); + synchronized (this) { + if (remaining != 0) { + // TODO(buck_team) have better passthru streaming and remove this + // limitation + throw new IOException("Data received before stdin stream was emptied"); + } + LOG.log(Level.FINEST, "Got stdin chunk, len {0}", chunkLen); + stdin = chunkStream; + remaining = chunkLen; + notifyAll(); + } + break; - // Synchronize on the input stream to avoid blocking other threads while waiting for chunk headers. - synchronized (this.din) { - int hlen = din.readInt(); - byte chunkType = din.readByte(); - long readTime = System.currentTimeMillis(); - - // Synchronize the remainder of the method on this object as it accesses internal state. - synchronized (this) { - long intervalMillis = readTime - lastReadTime; - lastReadTime = readTime; - switch(chunkType) { - case NGConstants.CHUNKTYPE_STDIN: - if (remaining != 0) { - throw new IOException("Data received before stdin stream was emptied"); - } - LOG.log(Level.FINEST, "Got stdin chunk, len %d", hlen); - remaining = hlen; - stdin = readPayload(in, hlen); - notify(); - break; - - case NGConstants.CHUNKTYPE_STDIN_EOF: - LOG.log(Level.FINEST, "Got stdin closed chunk"); - readEof(); - break; - - case NGConstants.CHUNKTYPE_HEARTBEAT: - LOG.log(Level.FINEST, "Got client heartbeat"); - // TODO(buck_team): should probably dispatch to a different thread(pool) - for (Iterator i = heartbeatListeners.iterator(); i.hasNext();) { - ((NGHeartbeatListener) i.next()).heartbeatReceived(intervalMillis); - } - break; + case NGConstants.CHUNKTYPE_STDIN_EOF: + LOG.log(Level.FINEST, "Got stdin closed chunk"); + setEof(); + break; + + case NGConstants.CHUNKTYPE_HEARTBEAT: + LOG.log(Level.FINEST, "Got client heartbeat"); - default: - LOG.log(Level.WARNING, "Unknown chunk type: %c", (char) chunkType); - throw new IOException("Unknown stream type: " + (char) chunkType); + ArrayList listeners; + synchronized (this) { + // copy collection to avoid executing callbacks under lock + listeners = new ArrayList<>(heartbeatListeners); } - } + + // TODO(buck_team): should probably dispatch to a different thread(pool) + for (NGHeartbeatListener listener : listeners) { + listener.heartbeatReceived(); + } + + break; + + default: + LOG.log(Level.WARNING, "Unknown chunk type: {0}", (char) chunkType); + throw new IOException("Unknown stream type: " + (char) chunkType); } } /** - * Notify threads waiting in waitForChunk on either EOF chunk read or client disconnection. + * Notify threads waiting in read() on either EOF chunk read or client disconnection. */ - private synchronized void readEof() { + private synchronized void setEof() { eof = true; notifyAll(); } /** - * @see java.io.InputStream#available() - */ - public synchronized int available() throws IOException { - if (eof) return(0); - if (stdin == null) return(0); - return stdin.available(); - } - - /** - * @see java.io.InputStream#markSupported() - */ - public boolean markSupported() { - return (false); - } - - /** - * @see java.io.InputStream#read() - */ - public synchronized int read() throws IOException { - if (oneByteBuffer == null) oneByteBuffer = new byte[1]; - return((read(oneByteBuffer, 0, 1) == -1) ? -1 : (int) oneByteBuffer[0]); - } - - /** - * @see java.io.InputStream#read(byte[]) - */ - public synchronized int read(byte[] b) throws IOException { - return (read(b, 0, b.length)); - } - - /** - * @see java.io.InputStream#read(byte[], int, int) - */ - public synchronized int read(byte[] b, int offset, int length) throws IOException { - if (!started) { - sendSendInput(); - } - - waitForChunk(); - if (eof) return(-1); - - int bytesToRead = Math.min((int) remaining, length); - int result = stdin.read(b, offset, bytesToRead); - remaining -= result; - if (remaining == 0) sendSendInput(); - return (result); - } + * Notify threads waiting in read() on either EOF chunk read or client disconnection. + */ + private synchronized void setClientDisconnected() { + clientConnected = false; + } /** - * If EOF chunk has not been received, but no data is available, block until data is received, EOF or disconnection. - * @throws IOException which just wraps InterruptedExceptions thrown by wait. + * @see java.io.InputStream#available() */ - private synchronized void waitForChunk() throws IOException { - try { - if((! eof) && (remaining == 0)) wait(); - } catch (InterruptedException e) { - throw new IOException(e); + public synchronized int available() throws IOException { + if (eof) { + return 0; + } + if (stdin == null) { + return 0; } + return stdin.available(); } - private synchronized void sendSendInput() throws IOException { - synchronized(out) { - out.writeInt(0); - out.writeByte(NGConstants.CHUNKTYPE_SENDINPUT); + /** + * @see java.io.InputStream#markSupported() + */ + public boolean markSupported() { + return false; + } + + /** + * @see java.io.InputStream#read() + */ + public synchronized int read() throws IOException { + if (oneByteBuffer == null) { + oneByteBuffer = new byte[1]; + } + return read(oneByteBuffer, 0, 1) == -1 ? -1 : (int) oneByteBuffer[0]; + } + + /** + * @see java.io.InputStream#read(byte[]) + */ + public synchronized int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** + * @see java.io.InputStream#read(byte[], int, int) + */ + public synchronized int read(byte[] b, int offset, int length) throws IOException { + if (!started) { + sendSendInput(); + started = true; + } + + if (remaining == 0) { + if (eof) { + return -1; + } + + try { + // wait for monitor to signal for either new data packet or eof (termination) + wait(); + } catch (InterruptedException e) { + // this is a signal to stop listening and close + // it should never trigger this code path as we always explicitly unblock monitor + return -1; + } + } + + if (remaining == 0) { + // still no data, stream/socket is probably terminated; return -1 + return -1; + } + + int bytesToRead = Math.min(remaining, length); + int result = stdin.read(b, offset, bytesToRead); + remaining -= result; + if (remaining == 0) { + sendSendInput(); + } + + return result; + } + + private void sendSendInput() throws IOException { + // Need to synchronize out because some other thread may write to out too at the same time + // hopefully this 'other thread' will synchronize on 'out' as well + // also we synchronize on both streams which is a potential deadlock + // TODO(buck_team): move acknowledgement packet out of NGInputStream + synchronized (out) { + out.writeInt(0); + out.writeByte(NGConstants.CHUNKTYPE_SENDINPUT); } out.flush(); - started = true; } - /** - * @return true if interval since last read is less than heartbeat timeout interval. - */ - public synchronized boolean isClientConnected() { - long intervalMillis = System.currentTimeMillis() - lastReadTime; - return intervalMillis < heartbeatTimeoutMillis; - } + /** + * @return true if interval since last read is less than heartbeat timeout interval. + */ + public synchronized boolean isClientConnected() { + return clientConnected; + } /** * Registers a new NGClientListener to be called on client disconnection or calls the listeners * clientDisconnected method if the client has already disconnected to avoid races. + * * @param listener the {@link NGClientListener} to be notified of client events. - * @throws InterruptedException if thrown by the clientDisconnected method or it calls System.exit(). */ - public synchronized void addClientListener(NGClientListener listener) { - if (! readFuture.isDone()) { - clientListeners.add(listener); - } else { - try { - notifyClientListener(listener); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + public void addClientListener(NGClientListener listener) { + boolean shouldNotifyNow = false; + + synchronized (this) { + if (clientConnected) { + clientListeners.add(listener); + } else { + shouldNotifyNow = true; } } + + if (shouldNotifyNow) { + listener.clientDisconnected(); + } } /** @@ -374,13 +443,18 @@ public synchronized void removeClientListener(NGClientListener listener) { clientListeners.remove(listener); } + /** + * Do not notify anymore about client disconnects + */ + public synchronized void removeAllClientListeners() { + clientListeners.clear(); + } + /** * @param listener the {@link NGHeartbeatListener} to be notified of client events. */ public synchronized void addHeartbeatListener(NGHeartbeatListener listener) { - if (! readFuture.isDone()) { - heartbeatListeners.add(listener); - } + heartbeatListeners.add(listener); } /** diff --git a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGOutputStream.java b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGOutputStream.java index b145465b..755a8200 100644 --- a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGOutputStream.java +++ b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGOutputStream.java @@ -50,7 +50,7 @@ public NGOutputStream(java.io.OutputStream out, byte streamCode) { } /** - * @see java.io.OutputStream.write(byte[]) + * @see java.io.OutputStream#write(byte[]) */ public void write(byte[] b) throws IOException { throwIfClosed(); @@ -58,7 +58,7 @@ public void write(byte[] b) throws IOException { } /** - * @see java.io.OutputStream.write(int) + * @see java.io.OutputStream#write(int) */ public void write(int b) throws IOException { throwIfClosed(); @@ -67,7 +67,7 @@ public void write(int b) throws IOException { } /** - * @see java.io.OutputStream.write(byte[],int,int) + * @see java.io.OutputStream#write(byte[],int,int) */ public void write(byte[] b, int offset, int len) throws IOException { throwIfClosed(); @@ -80,7 +80,7 @@ public void write(byte[] b, int offset, int len) throws IOException { } /** - * @see java.io.OutputStream.close() + * @see java.io.OutputStream#close() * * Implement an empty close function, to allow the client to close * the stdout and/or stderr, without this closing the connection @@ -96,7 +96,7 @@ public void close() throws IOException { } /** - * @see java.io.OutputStream.flush() + * @see java.io.OutputStream#flush() */ public void flush() throws IOException { throwIfClosed(); diff --git a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGSession.java b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGSession.java index 2632e4f3..69ce76f8 100644 --- a/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGSession.java +++ b/nailgun-server/src/main/java/com/martiansoftware/nailgun/NGSession.java @@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; +import java.net.SocketException; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; @@ -119,7 +120,6 @@ public class NGSession extends Thread { this.server = server; this.heartbeatTimeoutMillis = server.getHeartbeatTimeout(); this.instanceNumber = instanceCounter.incrementAndGet(); -// server.out.println("Created NGSession " + instanceNumber); } /** @@ -155,7 +155,7 @@ public void run(Socket socket) { * null if the NGSession has been shut down. */ private Socket nextSocket() { - Socket result = null; + Socket result; synchronized (lock) { result = nextSocket; while (!done && result == null) { @@ -168,7 +168,22 @@ private Socket nextSocket() { } nextSocket = null; } - return (result); + + if (result != null) { + // Java InputStream API is blocking by default with no reliable way to stop pending + // read() call. Setting the timeout to underlying socket will make socket's underlying + // read() calls throw SocketTimeoutException which unblocks read(). The exception must + // be properly handled by calling code. + try { + result.setSoTimeout(this.heartbeatTimeoutMillis); + } catch (SocketException e) { + // this exception might be thrown if socket is already closed + // so we just return null + return null; + } + } + + return result; } /** @@ -246,17 +261,12 @@ public void run() { // can't create NGInputStream until we've received a command, because at // that point the stream from the client will only include stdin and stdin-eof // chunks - InputStream in = null; - PrintStream out = null; - PrintStream err = null; - PrintStream exit = null; - - try { - in = new NGInputStream(sockin, sockout, server.out, heartbeatTimeoutMillis); - out = new PrintStream(new NGOutputStream(sockout, NGConstants.CHUNKTYPE_STDOUT)); - err = new PrintStream(new NGOutputStream(sockout, NGConstants.CHUNKTYPE_STDERR)); - exit = new PrintStream(new NGOutputStream(sockout, NGConstants.CHUNKTYPE_EXIT)); - + try ( + InputStream in = new NGInputStream(sockin, sockout, heartbeatTimeoutMillis); + PrintStream out = new PrintStream(new NGOutputStream(sockout, NGConstants.CHUNKTYPE_STDOUT)); + PrintStream err = new PrintStream(new NGOutputStream(sockout, NGConstants.CHUNKTYPE_STDERR)); + PrintStream exit = new PrintStream(new NGOutputStream(sockout, NGConstants.CHUNKTYPE_EXIT)); + ) { // ThreadLocal streams for System.in/out/err redirection ((ThreadLocalInputStream) System.in).init(in); ((ThreadLocalPrintStream) System.out).init(out); @@ -346,29 +356,14 @@ public void run() { } catch (NGExitException exitEx) { LOG.log(Level.INFO, "Server cleanly exited with status " + exitEx.getStatus(), exitEx); - in.close(); exit.println(exitEx.getStatus()); server.out.println(Thread.currentThread().getName() + " exited with status " + exitEx.getStatus()); } catch (Throwable t) { LOG.log(Level.INFO, "Server unexpectedly exited with unhandled exception", t); - in.close(); t.printStackTrace(); exit.println(NGConstants.EXIT_EXCEPTION); // remote exception constant } } finally { - LOG.log(Level.FINE, "Tearing down client socket"); - if (in != null) { - in.close(); - } - if (out != null) { - out.close(); - } - if (err != null) { - err.close(); - } - if (exit != null) { - exit.close(); - } LOG.log(Level.FINE, "Flushing client socket"); sockout.flush(); try { @@ -401,7 +396,6 @@ public void run() { } LOG.log(Level.INFO, "NGSession shutting down"); -// server.out.println("Shutdown NGSession " + instanceNumber); } /** diff --git a/pom.xml b/pom.xml index e1aab914..f3b39174 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.martiansoftware nailgun-all - 0.9.2-SNAPSHOT + 0.9.3-SNAPSHOT pom @@ -41,7 +41,7 @@ scm:git:git@github.com:martylamb/nailgun.git scm:git:git@github.com:martylamb/nailgun.git scm:git:git@github.com:martylamb/nailgun.git - nailgun-all-0.9.1 + nailgun-all-0.9.3 diff --git a/pynailgun/ng.py b/pynailgun/ng.py index 4ac4ad8f..f18935b7 100644 --- a/pynailgun/ng.py +++ b/pynailgun/ng.py @@ -24,18 +24,20 @@ import socket import struct import sys -from threading import Condition, Event, Thread +from threading import Condition, Event, Thread, RLock # @author Marty Lamb # @author Pete Kirkham (Win32 port) -# @author Ben Hamilton (Python port) +# @author Sergey Balabanov, Ben Hamilton (Python port) # # Please try to keep this working on Python 2.6. -NAILGUN_VERSION = '0.9.0' +NAILGUN_VERSION = '0.9.3' BUFSIZE = 2048 NAILGUN_PORT_DEFAULT = 2113 CHUNK_HEADER_LEN = 5 +THREAD_TERMINATION_TIMEOUT_SEC = 0.5 +STDIN_BUFFER_LINE_SIZE = 10 CHUNKTYPE_STDIN = '0' CHUNKTYPE_STDOUT = '1' @@ -374,7 +376,7 @@ def select_now(self): class NailgunConnection(object): - '''Stateful object holding the connection to the Nailgun server.''' + """Stateful object holding the connection to the Nailgun server.""" def __init__( self, @@ -392,16 +394,24 @@ def __init__( self.send_flags = 0 self.header_buf = ctypes.create_string_buffer(CHUNK_HEADER_LEN) self.buf = ctypes.create_string_buffer(BUFSIZE) - self.ready_to_send_condition = Condition() + self.sendtime_nanos = 0 self.exit_code = None - self.stdin_queue = Queue.Queue() + self.shutdown_event = Event() - self.stdin_thread = Thread( - target=stdin_thread_main, - args=(self.stdin, self.stdin_queue, self.shutdown_event, self.ready_to_send_condition)) + + self.error_lock = RLock() + self.error = None + + self.stdin_condition = Condition() + self.stdin_thread = Thread(target=stdin_thread_main, args=(self,)) self.stdin_thread.daemon = True + self.send_queue = Queue.Queue() + self.send_condition = Condition() + self.send_thread = Thread(target=send_thread_main, args=(self,)) + self.send_thread.daemon = True + def send_command( self, cmd, @@ -409,12 +419,12 @@ def send_command( filearg=None, env=os.environ, cwd=os.getcwd()): - ''' + """ Sends the command and environment to the nailgun server, then loops forever reading the response until the server sends an exit chunk. Returns the exit value, or raises NailgunException on error. - ''' + """ try: return self._send_command_and_read_response(cmd, cmd_args, filearg, env, cwd) except socket.error as e: @@ -423,58 +433,204 @@ def send_command( NailgunException.CONNECTION_BROKEN) def _send_command_and_read_response(self, cmd, cmd_args, filearg, env, cwd): - if filearg: - send_file_arg(filearg, self) - for cmd_arg in cmd_args: - send_chunk(cmd_arg, CHUNKTYPE_ARG, self) - send_env_var('NAILGUN_FILESEPARATOR', os.sep, self) - send_env_var('NAILGUN_PATHSEPARATOR', os.pathsep, self) - send_tty_format(self.stdin, self) - send_tty_format(self.stdout, self) - send_tty_format(self.stderr, self) - for k, v in env.iteritems(): - send_env_var(k, v, self) - send_chunk(cwd, CHUNKTYPE_DIR, self) - send_chunk(cmd, CHUNKTYPE_CMD, self) self.stdin_thread.start() - while self.exit_code is None: - self._process_next_chunk() - self._check_stdin_queue() - self.shutdown_event.set() - with self.ready_to_send_condition: - self.ready_to_send_condition.notify() - # We can't really join on self.stdin_thread, since - # there's no way to interrupt its call to sys.stdin.readline. + self.send_thread.start() + + try: + + if filearg: + self._send_file_arg(filearg) + for cmd_arg in cmd_args: + self._send_chunk(cmd_arg, CHUNKTYPE_ARG) + self._send_env_var('NAILGUN_FILESEPARATOR', os.sep) + self._send_env_var('NAILGUN_PATHSEPARATOR', os.pathsep) + self._send_tty_format(self.stdin) + self._send_tty_format(self.stdout) + self._send_tty_format(self.stderr) + for k, v in env.iteritems(): + self._send_env_var(k, v) + self._send_chunk(cwd, CHUNKTYPE_DIR) + self._send_chunk(cmd, CHUNKTYPE_CMD) + + while self.exit_code is None: + self._process_next_chunk() + + finally: + + self.shutdown_event.set() + with self.stdin_condition: + self.stdin_condition.notify() + with self.send_condition: + self.send_condition.notify() + self.stdin_thread.join(THREAD_TERMINATION_TIMEOUT_SEC) + self.send_thread.join(THREAD_TERMINATION_TIMEOUT_SEC) + return self.exit_code def _process_next_chunk(self): - ''' + """ Processes the next chunk from the nailgun server. - ''' + """ readable, exceptional = self.transport.select(HEARTBEAT_TIMEOUT_SECS) if readable: - process_nailgun_stream(self) + self._process_nailgun_stream() now = monotonic_time_nanos() if now - self.sendtime_nanos > HEARTBEAT_TIMEOUT_NANOS: - send_heartbeat(self) + self._send_heartbeat() if exceptional: raise NailgunException( 'Server disconnected in select', NailgunException.CONNECTION_BROKEN) - def _check_stdin_queue(self): - '''Check if the stdin thread has read anything.''' - while not self.stdin_queue.empty(): - try: - (event_type, event_arg) = self.stdin_queue.get_nowait() - if event_type == EVENT_STDIN_CHUNK: - send_chunk(event_arg, CHUNKTYPE_STDIN, self) - elif event_type == EVENT_STDIN_CLOSED: - send_chunk('', CHUNKTYPE_STDIN_EOF, self) - elif event_type == EVENT_STDIN_EXCEPTION: - raise event_arg - except Queue.Empty: - break + # if daemon thread threw, rethrow here + if self.shutdown_event.is_set(): + e = None + with self.error_lock: + e = self.error + if e is not None: + raise e + + + def _send_chunk(self, buf, chunk_type): + """ + Send chunk to the server asynchronously + """ + self.send_queue.put((chunk_type, buf)) + with self.send_condition: + self.send_condition.notify() + + + def _send_env_var(self, name, value): + """ + Sends an environment variable in KEY=VALUE format. + """ + self._send_chunk('='.join((name, value)), CHUNKTYPE_ENV) + + + def _send_tty_format(self, f): + """ + Sends a NAILGUN_TTY_# environment variable. + """ + if not f or not hasattr(f, 'fileno'): + return + fileno = f.fileno() + isatty = os.isatty(fileno) + self._send_env_var('NAILGUN_TTY_' + str(fileno), str(int(isatty))) + + + def _send_file_arg(self, filename): + """ + Sends the contents of a file to the server. + """ + with open(filename) as f: + while True: + num_bytes = f.readinto(self.buf) + if not num_bytes: + break + self._send_chunk(self.buf.raw[:num_bytes], CHUNKTYPE_LONGARG) + + + def _recv_to_fd(self, dest_file, num_bytes): + """ + Receives num_bytes bytes from the nailgun socket and copies them to the specified file + object. Used to route data to stdout or stderr on the client. + """ + bytes_read = 0 + + while bytes_read < num_bytes: + bytes_to_read = min(len(self.buf), num_bytes - bytes_read) + bytes_received = self.transport.recv_into( + self.buf, + bytes_to_read) + if dest_file: + dest_file.write(self.buf[:bytes_received]) + bytes_read += bytes_received + + + def _recv_to_buffer(self, num_bytes, buf): + """ + Receives num_bytes from the nailgun socket and writes them into the specified buffer. + """ + # We'd love to use socket.recv_into() everywhere to avoid + # unnecessary copies, but we need to support Python 2.6. The + # only way to provide an offset to recv_into() is to use + # memoryview(), which doesn't exist until Python 2.7. + if HAS_MEMORYVIEW: + self._recv_into_memoryview(num_bytes, memoryview(buf)) + else: + self._recv_to_buffer_with_copy(num_bytes, buf) + + + def _recv_into_memoryview(self, num_bytes, buf_view): + """ + Receives num_bytes from the nailgun socket and writes them into the specified memoryview + to avoid an extra copy. + """ + bytes_read = 0 + while bytes_read < num_bytes: + bytes_received = self.transport.recv_into( + buf_view[bytes_read:], + num_bytes - bytes_read) + if not bytes_received: + raise NailgunException( + 'Server unexpectedly disconnected in recv_into()', + NailgunException.CONNECTION_BROKEN) + bytes_read += bytes_received + + + def _recv_to_buffer_with_copy(self, num_bytes, buf): + """ + Receives num_bytes from the nailgun socket and writes them into the specified buffer. + """ + bytes_read = 0 + while bytes_read < num_bytes: + recv_buf = self.transport.recv( + num_bytes - bytes_read) + if not len(recv_buf): + raise NailgunException( + 'Server unexpectedly disconnected in recv()', + NailgunException.CONNECTION_BROKEN) + buf[bytes_read:bytes_read + len(recv_buf)] = recv_buf + bytes_read += len(recv_buf) + + + def _process_exit(self, exit_len): + """ + Receives an exit code from the nailgun server and sets nailgun_connection.exit_code + to indicate the client should exit. + """ + num_bytes = min(len(self.buf), exit_len) + self._recv_to_buffer(num_bytes, self.buf) + self.exit_code = int(''.join(self.buf.raw[:num_bytes])) + + + def _send_heartbeat(self): + """ + Sends a heartbeat to the nailgun server to indicate the client is still alive. + """ + self._send_chunk('', CHUNKTYPE_HEARTBEAT) + + def _process_nailgun_stream(self): + """ + Processes a single chunk from the nailgun server. + """ + self._recv_to_buffer(len(self.header_buf), self.header_buf) + (chunk_len, chunk_type) = struct.unpack_from('>ic', self.header_buf.raw) + + if chunk_type == CHUNKTYPE_STDOUT: + self._recv_to_fd(self.stdout, chunk_len) + elif chunk_type == CHUNKTYPE_STDERR: + self._recv_to_fd(self.stderr, chunk_len) + elif chunk_type == CHUNKTYPE_EXIT: + self._process_exit(chunk_len) + elif chunk_type == CHUNKTYPE_SENDINPUT: + # signal stdin thread to get and send more data + with self.stdin_condition: + self.stdin_condition.notify() + else: + raise NailgunException( + 'Unexpected chunk type: {0}'.format(chunk_type), + NailgunException.UNEXPECTED_CHUNKTYPE) def __enter__(self): return self @@ -487,11 +643,11 @@ def __exit__(self, type, value, traceback): def monotonic_time_nanos(): - '''Returns a monotonically-increasing timestamp value in nanoseconds. + """Returns a monotonically-increasing timestamp value in nanoseconds. The epoch of the return value is undefined. To use this, you must call it more than once and calculate the delta between two calls. - ''' + """ # This function should be overwritten below on supported platforms. raise Exception('Unsupported platform: ' + platform.system()) @@ -551,183 +707,68 @@ def _monotonic_time_nanos_cygwin(): return perf_counter.value * NSEC_PER_SEC / perf_frequency.value monotonic_time_nanos = _monotonic_time_nanos_cygwin - -def send_chunk(buf, chunk_type, nailgun_connection): - ''' - Sends a chunk noting the specified payload size and chunk type. - ''' - struct.pack_into('>ic', nailgun_connection.header_buf, 0, len(buf), chunk_type) - nailgun_connection.sendtime_nanos = monotonic_time_nanos() - nailgun_connection.transport.sendall(nailgun_connection.header_buf.raw) - nailgun_connection.transport.sendall(buf) - - -def send_env_var(name, value, nailgun_connection): - ''' - Sends an environment variable in KEY=VALUE format. - ''' - send_chunk('='.join((name, value)), CHUNKTYPE_ENV, nailgun_connection) - - -def send_tty_format(f, nailgun_connection): - ''' - Sends a NAILGUN_TTY_# environment variable. - ''' - if not f or not hasattr(f, 'fileno'): - return - fileno = f.fileno() - isatty = os.isatty(fileno) - send_env_var('NAILGUN_TTY_' + str(fileno), str(int(isatty)), nailgun_connection) +def send_thread_main(conn): + """ + Sending thread worker function + Waits for data and transmits it to server + """ + try: + header_buf = ctypes.create_string_buffer(CHUNK_HEADER_LEN) + while not conn.shutdown_event.is_set(): + while not conn.send_queue.empty(): + # only this thread can deplete the queue, so it is safe to use blocking get() + (chunk_type, buf) = conn.send_queue.get() + struct.pack_into('>ic', header_buf, 0, len(buf), chunk_type) + conn.sendtime_nanos = monotonic_time_nanos() + conn.transport.sendall(header_buf.raw) + conn.transport.sendall(buf) + + with conn.send_condition: + conn.send_condition.wait() + except Exception as e: + # save exception to rethrow on main thread + with conn.error_lock: + conn.error = e + conn.shutdown_event.set() -def send_file_arg(filename, nailgun_connection): - ''' - Sends the contents of a file to the server. - ''' - with open(filename) as f: +def stdin_thread_main(conn): + """ + Stdin thread reading worker function + If stdin is available, read it to internal buffer and send to server + """ + try: + eof = False while True: - num_bytes = f.readinto(nailgun_connection.buf) - if not num_bytes: - break - send_chunk( - nailgun_connection.buf.raw[:num_bytes], CHUNKTYPE_LONGARG, nailgun_connection) - - -def recv_to_fd(dest_file, num_bytes, nailgun_connection): - ''' - Receives num_bytes bytes from the nailgun socket and copies them to the specified file - object. Used to route data to stdout or stderr on the client. - ''' - bytes_read = 0 - - while bytes_read < num_bytes: - bytes_to_read = min(len(nailgun_connection.buf), num_bytes - bytes_read) - bytes_received = nailgun_connection.transport.recv_into( - nailgun_connection.buf, - bytes_to_read) - if dest_file: - dest_file.write(nailgun_connection.buf[:bytes_received]) - bytes_read += bytes_received - - -def recv_to_buffer(num_bytes, buf, nailgun_connection): - ''' - Receives num_bytes from the nailgun socket and writes them into the specified buffer. - ''' - # We'd love to use socket.recv_into() everywhere to avoid - # unnecessary copies, but we need to support Python 2.6. The - # only way to provide an offset to recv_into() is to use - # memoryview(), which doesn't exist until Python 2.7. - if HAS_MEMORYVIEW: - recv_into_memoryview(num_bytes, memoryview(buf), nailgun_connection) - else: - recv_to_buffer_with_copy(num_bytes, buf, nailgun_connection) - - -def recv_into_memoryview(num_bytes, buf_view, nailgun_connection): - ''' - Receives num_bytes from the nailgun socket and writes them into the specified memoryview - to avoid an extra copy. - ''' - bytes_read = 0 - while bytes_read < num_bytes: - bytes_received = nailgun_connection.transport.recv_into( - buf_view[bytes_read:], - num_bytes - bytes_read) - if not bytes_received: - raise NailgunException( - 'Server unexpectedly disconnected in recv_into()', - NailgunException.CONNECTION_BROKEN) - bytes_read += bytes_received - - -def recv_to_buffer_with_copy(num_bytes, buf, nailgun_connection): - ''' - Receives num_bytes from the nailgun socket and writes them into the specified buffer. - ''' - bytes_read = 0 - while bytes_read < num_bytes: - recv_buf = nailgun_connection.transport.recv( - num_bytes - bytes_read) - if not len(recv_buf): - raise NailgunException( - 'Server unexpectedly disconnected in recv()', - NailgunException.CONNECTION_BROKEN) - buf[bytes_read:bytes_read + len(recv_buf)] = recv_buf - bytes_read += len(recv_buf) - - -def process_exit(exit_len, nailgun_connection): - ''' - Receives an exit code from the nailgun server and sets nailgun_connection.exit_code - to indicate the client should exit. - ''' - num_bytes = min(len(nailgun_connection.buf), exit_len) - recv_to_buffer(num_bytes, nailgun_connection.buf, nailgun_connection) - nailgun_connection.exit_code = int(''.join(nailgun_connection.buf.raw[:num_bytes])) + # wait for signal to read new line from stdin or shutdown + # we do not start reading from stdin before server actually requests that + with conn.stdin_condition: + conn.stdin_condition.wait() + if conn.shutdown_event.is_set(): + return -def send_heartbeat(nailgun_connection): - ''' - Sends a heartbeat to the nailgun server to indicate the client is still alive. - ''' - try: - send_chunk('', CHUNKTYPE_HEARTBEAT, nailgun_connection) - except IOError as e: - # The Nailgun C client ignores SIGPIPE etc. on heartbeats, - # so we do too. (This typically happens when shutting down.) - pass - + if not conn.stdin or eof: + conn._send_chunk(buf, CHUNKTYPE_STDIN_EOF) + continue -def stdin_thread_main(stdin, queue, shutdown_event, ready_to_send_condition): - if not stdin: - return - try: - while not shutdown_event.is_set(): - with ready_to_send_condition: - ready_to_send_condition.wait() - if shutdown_event.is_set(): - break - # This is a bit cheesy, but there isn't a great way to - # portably tell Python to read as much as possible on - # stdin without blocking. - buf = stdin.readline() + buf = conn.stdin.readline() if buf == '': - queue.put((EVENT_STDIN_CLOSED, None)) - break - queue.put((EVENT_STDIN_CHUNK, buf)) + eof = True + conn._send_chunk(buf, CHUNKTYPE_STDIN_EOF) + continue + conn._send_chunk(buf, CHUNKTYPE_STDIN) except Exception as e: - queue.put((EVENT_STDIN_EXCEPTION, e)) - - -def process_nailgun_stream(nailgun_connection): - ''' - Processes a single chunk from the nailgun server. - ''' - recv_to_buffer( - len(nailgun_connection.header_buf), nailgun_connection.header_buf, nailgun_connection) - (chunk_len, chunk_type) = struct.unpack_from('>ic', nailgun_connection.header_buf.raw) - - if chunk_type == CHUNKTYPE_STDOUT: - recv_to_fd(nailgun_connection.stdout, chunk_len, nailgun_connection) - elif chunk_type == CHUNKTYPE_STDERR: - recv_to_fd(nailgun_connection.stderr, chunk_len, nailgun_connection) - elif chunk_type == CHUNKTYPE_EXIT: - process_exit(chunk_len, nailgun_connection) - elif chunk_type == CHUNKTYPE_SENDINPUT: - with nailgun_connection.ready_to_send_condition: - # Wake up the stdin thread and tell it to read as much data as possible. - nailgun_connection.ready_to_send_condition.notify() - else: - raise NailgunException( - 'Unexpected chunk type: {0}'.format(chunk_type), - NailgunException.UNEXPECTED_CHUNKTYPE) + # save exception to rethrow on main thread + with conn.error_lock: + conn.error = e + conn.shutdown_event.set() def make_nailgun_transport(nailgun_server, nailgun_port=None, cwd=None): - ''' + """ Creates and returns a socket connection to the nailgun server. - ''' + """ transport = None if nailgun_server.startswith('local:'): if platform.system() == 'Windows': @@ -780,9 +821,9 @@ def make_nailgun_transport(nailgun_server, nailgun_port=None, cwd=None): def main(): - ''' + """ Main entry point to the nailgun client. - ''' + """ default_nailgun_server = os.environ.get('NAILGUN_SERVER', '127.0.0.1') default_nailgun_port = int(os.environ.get('NAILGUN_PORT', NAILGUN_PORT_DEFAULT)) diff --git a/pynailgun/test_ng.py b/pynailgun/test_ng.py index 113fc9d1..1f2e0782 100644 --- a/pynailgun/test_ng.py +++ b/pynailgun/test_ng.py @@ -11,13 +11,6 @@ from pynailgun import NailgunException, NailgunConnection -POSSIBLE_NAILGUN_CODES_ON_NG_STOP = [ - NailgunException.CONNECT_FAILED, - NailgunException.CONNECTION_BROKEN, - NailgunException.UNEXPECTED_CHUNKTYPE, -] - - if os.name == 'posix': def transport_exists(transport_file): return os.path.exists(transport_file) @@ -55,8 +48,14 @@ def setUpTransport(self): self.transport_address = u'local:{0}'.format(pipe_name) self.transport_file = ur'\\.\pipe\{0}'.format(pipe_name) - def getNailgunUberJar(self): - return 'nailgun-server/target/nailgun-server-0.9.2-SNAPSHOT-uber.jar' + def getClassPath(self): + cp = [ + 'nailgun-server/target/nailgun-server-0.9.3-SNAPSHOT-uber.jar', + 'nailgun-examples/target/nailgun-examples-0.9.3-SNAPSHOT.jar', + ] + if os.name == 'nt': + return ';'.join(cp) + return ':'.join(cp) def startNailgun(self): if os.name == 'posix': @@ -78,8 +77,14 @@ def preexec_fn(): stdout = None if os.name == 'posix': stdout=subprocess.PIPE + + cmd = ['java', '-Djna.nosys=true', '-classpath', self.getClassPath()] + if os.environ.get('DEBUG_MODE') == '1': + cmd.append('-agentlib:jdwp=transport=dt_socket,address=localhost:8888,server=y,suspend=y') + cmd = cmd + ['com.martiansoftware.nailgun.NGServer', self.transport_address] + self.ng_server_process = subprocess.Popen( - ['java', '-Djna.nosys=true', '-jar', self.getNailgunUberJar(), self.transport_address], + cmd, preexec_fn=preexec_fn, creationflags=creationflags, stdout=stdout, @@ -89,8 +94,12 @@ def preexec_fn(): if os.name == 'posix': # on *nix we have to wait for server to be ready to accept connections - the_first_line = self.ng_server_process.stdout.readline().strip() - self.assertTrue("NGServer" in the_first_line and "started" in the_first_line, "Got a line: {0}".format(the_first_line)) + while True: + the_first_line = self.ng_server_process.stdout.readline().strip() + if "NGServer" in the_first_line and "started" in the_first_line: + break + if the_first_line is None or the_first_line == '': + break else: for _ in range(0, 600): # on windows it is OK to rely on existence of the pipe file @@ -101,43 +110,80 @@ def preexec_fn(): self.assertTrue(transport_exists(self.transport_file)) - def test_nailgun_stats_and_stop(self): - for i in range(1, 5): - output = StringIO.StringIO() - with NailgunConnection( - self.transport_address, - stderr=None, - stdin=None, - stdout=output) as c: - exit_code = c.send_command('ng-stats') - self.assertEqual(exit_code, 0) - actual_out = output.getvalue().strip() - expected_out = 'com.martiansoftware.nailgun.builtins.NGServerStats: {0}/1'.format(i) - self.assertEqual(actual_out, expected_out) + def test_nailgun_stats(self): + output = StringIO.StringIO() + with NailgunConnection( + self.transport_address, + stderr=None, + stdin=None, + stdout=output) as c: + exit_code = c.send_command('ng-stats') + self.assertEqual(exit_code, 0) + actual_out = output.getvalue().strip() + expected_out = 'com.martiansoftware.nailgun.builtins.NGServerStats: 1/1' + self.assertEqual(actual_out, expected_out) + + def test_nailgun_exit_code(self): + output = StringIO.StringIO() + expected_exit_code = 10 + with NailgunConnection( + self.transport_address, + stderr=None, + stdin=None, + stdout=output) as c: + exit_code = c.send_command('com.martiansoftware.nailgun.examples.Exit', [str(expected_exit_code)]) + self.assertEqual(exit_code, expected_exit_code) + + def test_nailgun_stdin(self): + lines = [str(i) for i in range(100)] + echo = '\n'.join(lines) + output = StringIO.StringIO() + input = StringIO.StringIO(echo) + with NailgunConnection( + self.transport_address, + stderr=None, + stdin=input, + stdout=output) as c: + exit_code = c.send_command('com.martiansoftware.nailgun.examples.Echo') + self.assertEqual(exit_code, 0) + actual_out = output.getvalue().strip() + self.assertEqual(actual_out, echo) + + def test_nailgun_default_streams(self): + with NailgunConnection(self.transport_address) as c: + exit_code = c.send_command('ng-stats') + self.assertEqual(exit_code, 0) + def tearDown(self): try: with NailgunConnection( - self.transport_address, - cwd=os.getcwd(), - stderr=None, - stdin=None, - stdout=None) as c: + self.transport_address, + cwd=os.getcwd(), + stderr=None, + stdin=None, + stdout=None) as c: c.send_command('ng-stop') except NailgunException as e: - self.assertIn(e.code, POSSIBLE_NAILGUN_CODES_ON_NG_STOP) - - self.ng_server_process.wait() - self.assertEqual(self.ng_server_process.poll(), 0) - - def tearDown(self): - if self.ng_server_process.poll() is None: + # stopping server is a best effort + # if something wrong has happened, we will kill it anyways + pass + + # Python2 compatible wait with timeout + process_exit_code = None + for _ in range(0, 500): + process_exit_code = self.ng_server_process.poll() + if process_exit_code is not None: + break + time.sleep(0.02) # 1 second total + + if process_exit_code is None: # some test has failed, ng-server was not stopped. killing it self.ng_server_process.kill() shutil.rmtree(self.tmpdir) if __name__ == '__main__': - for i in range(50): + for i in range(10): was_successful = unittest.main(exit=False).result.wasSuccessful() if not was_successful: sys.exit(1)