Coverage Report - eu.simuline.octave.exec.OctaveExec
 
Classes in this File Line Coverage Branch Coverage Complexity
OctaveExec
76%
83/109
65%
17/26
5
 
 1  
 /*
 2  
  * Copyright 2007, 2008 Ange Optimization ApS
 3  
  *
 4  
  * Licensed under the Apache License, Version 2.0 (the "License");
 5  
  * you may not use this file except in compliance with the License.
 6  
  * You may obtain a copy of the License at
 7  
  *
 8  
  *     http://www.apache.org/licenses/LICENSE-2.0
 9  
  *
 10  
  * Unless required by applicable law or agreed to in writing, software
 11  
  * distributed under the License is distributed on an "AS IS" BASIS,
 12  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  
  * See the License for the specific language governing permissions and
 14  
  * limitations under the License.
 15  
  */
 16  
 /**
 17  
  * @author Kim Hansen
 18  
  */
 19  
 package eu.simuline.octave.exec;
 20  
 
 21  
 import java.io.BufferedReader;
 22  
 import java.io.File;
 23  
 import java.io.IOException;
 24  
 import java.io.InputStreamReader;
 25  
 import java.io.OutputStreamWriter;
 26  
 import java.io.Writer;
 27  
 
 28  
 import java.util.Random;
 29  
 import java.util.concurrent.CancellationException;
 30  
 import java.util.concurrent.ExecutionException;
 31  
 import java.util.concurrent.ExecutorService;
 32  
 import java.util.concurrent.Executors;
 33  
 import java.util.concurrent.Future;
 34  
 import java.util.concurrent.ThreadFactory;
 35  
 
 36  
 import eu.simuline.octave.exception.OctaveException;
 37  
 import eu.simuline.octave.exception.OctaveIOException;
 38  
 import eu.simuline.octave.util.NamedThreadFactory;
 39  
 import eu.simuline.octave.util.NoCloseWriter;
 40  
 import eu.simuline.octave.util.ReaderWriterPipeThread;
 41  
 import eu.simuline.octave.util.TeeWriter;
 42  
 import eu.simuline.octave.OctaveUtils;
 43  
 
 44  
 import org.apache.commons.logging.Log;
 45  
 import org.apache.commons.logging.LogFactory;
 46  
 
 47  
 /**
 48  
  * The object connecting to the octave process. 
 49  
  */
 50  
 public final class OctaveExec {
 51  
 
 52  
     public static final String MSG_IOE_NH = 
 53  
         "InterruptedException should not happen";
 54  
 
 55  
     public static final String MSG_EXE_NH = 
 56  
         "ExecutionException should not happen";
 57  
 
 58  
     public static final String MSG_RTE_NH = 
 59  
         "RuntimeException should not happen";
 60  
 
 61  2
     private static final Log LOG = LogFactory.getLog(OctaveExec.class);
 62  
 
 63  
     /**
 64  
      * The octave process created in the constructor 
 65  
      * with given command, arguments, environment and working directory. 
 66  
      */
 67  
     private final Process process;
 68  
 
 69  
     /**
 70  
      * 
 71  
      */
 72  
     private final Writer processWriter;
 73  
 
 74  
     /**
 75  
      * The input reader for {@link #process}. 
 76  
      * This is used by {@link #evalRW(WriteFunctor, ReadFunctor)} 
 77  
      * and used to close via {@link #close()}. 
 78  
      */
 79  
     private final BufferedReader processReader;
 80  
 
 81  
     /**
 82  
      * Used in method {@link #evalRW(WriteFunctor, ReadFunctor)} 
 83  
      * to submit essentially the write functor which submits the input 
 84  
      * and thereafter the read function which collects the output. 
 85  
      * Besides this, the executor is invoked to shutdown. 
 86  
      */
 87  
     private final ExecutorService executor;
 88  
 
 89  
     /**
 90  
      * The error thread of the error stream of {@link #process} 
 91  
      * writing the error stream to a given writer. 
 92  
      * This is used to close but also to change the error writer 
 93  
      * by {@link #setErrorWriter(Writer)}. 
 94  
      */
 95  
     private final ReaderWriterPipeThread errorStreamThread;
 96  
 
 97  106
     private boolean destroyed = false;
 98  
 
 99  
     /**
 100  
      * Will start the octave process.
 101  
      *
 102  
      * @param numThreadsReuse
 103  
      *    the number of threads to be reused in a fixed thread pool. 
 104  
      *    This is either positive or <code>-1</code>, 
 105  
      *    which means that a cached thread pool is used instead of a fixed one. 
 106  
      * @param stdinLog
 107  
      *    This writer will capture all
 108  
      *    that is written to the octave process via stdin,
 109  
      *    if null the data will not be captured.
 110  
      * @param stderrLog
 111  
      *    This writer will capture all
 112  
      *    that is written from the octave process on stderr,
 113  
      *    if null the data will not be captured. 
 114  
      * @param cmdArray
 115  
      *    The array consisting of command and arguments: 
 116  
      *    The 0th entry is either the path to the octave program,
 117  
      *    or the command found by looking at the builtin variable "paths" 
 118  
      *    reconstructing the path. 
 119  
      *    starting with the 1th entry, 
 120  
      *    may follow the array of arguments to start the octave program with. 
 121  
      *    CAUTION: allowed values depend on the octave version. 
 122  
      * @param environment
 123  
      *    The environment for the octave process, 
 124  
      *    i.e. the set of values of environment variables 
 125  
      *    if null the process will inherit the environment
 126  
      *    for the virtual machine. 
 127  
      *    If not null, each entry has the form <code>name=value</code>. 
 128  
      * @param workingDir
 129  
      *    This will be the working dir for the octave process,
 130  
      *    if null the process will inherit the working dir
 131  
      *    of the current process.
 132  
      */
 133  
     public OctaveExec(final int numThreadsReuse,
 134  
                       final Writer stdinLog, 
 135  
                       final Writer stderrLog, 
 136  
                       final String[] cmdArray,
 137  
                       final String[] environment, // always invoked with null 
 138  106
                       final File workingDir) {
 139  106
         ThreadFactory threadFactory = new NamedThreadFactory();
 140  106
         this.executor = numThreadsReuse == -1
 141  0
             ? Executors.newCachedThreadPool(threadFactory)
 142  106
             : Executors.newFixedThreadPool(numThreadsReuse, threadFactory);
 143  
 
 144  
         try {
 145  106
             this.process = Runtime.getRuntime().exec(cmdArray, 
 146  
                                                      environment, 
 147  
                                                      workingDir);
 148  0
         } catch (final IOException e) {
 149  0
             throw new OctaveIOException(e);
 150  106
         }
 151  
         // Connect stderr
 152  106
         this.errorStreamThread = ReaderWriterPipeThread
 153  212
             .instantiate(new InputStreamReader(this.process.getErrorStream(), 
 154  106
                                                OctaveUtils.getUTF8()),
 155  
                          stderrLog);
 156  
 
 157  
         // Connect stdout
 158  106
         this.processReader = new BufferedReader
 159  106
             (new InputStreamReader(this.process.getInputStream(), 
 160  106
                                    OctaveUtils.getUTF8()));
 161  
 
 162  
         // Connect stdin
 163  106
         Writer pw = new OutputStreamWriter(this.process.getOutputStream(),
 164  106
                                            OctaveUtils.getUTF8());
 165  
         // all written to processWriter will go to pw and, 
 166  
         // if not null to stdinLog
 167  106
         this.processWriter = (stdinLog == null)
 168  
             ? pw
 169  
             : new TeeWriter(new NoCloseWriter(stdinLog), pw);
 170  106
     }
 171  
 
 172  106
     private final Random random = new Random();
 173  
 
 174  
     private String generateSpacer() {
 175  4284
         return "-=+X+=- Octave.java spacer -=+X+=- " + 
 176  2142
             random.nextLong() + " -=+X+=-";
 177  
     }
 178  
 
 179  
     /**
 180  
      * Passes <code>input</code> to octave 
 181  
      * and get back <code>output</code>. 
 182  
      *
 183  
      * @param input
 184  
      *    a write functor which represents the script 
 185  
      *    to be executed in octave. 
 186  
      * @param output
 187  
      *    the read functor which reads the result of octave execution. 
 188  
      *    After evaluation of this method, 
 189  
      *    the <code>output</code> is asked for the result. 
 190  
      *///<code></code>
 191  
     // used in OctaveIO#set(Map), OctaveIO#get(String), 
 192  
     // OctaveIO#checkIfVarExists(String) and in 
 193  
     // OctaveEngine#unsafeEval(String) OctaveEngine#unsafeEval(Reader) and 
 194  
     // OctaveEngine#getVersion() only 
 195  
     public void evalRW(final WriteFunctor input, final ReadFunctor output) {
 196  2142
         final String spacer = generateSpacer();
 197  2142
         final Future<Void> writerFuture = 
 198  2142
             this.executor.submit(new OctaveWriterCallable(this.processWriter, 
 199  
                                                           input, 
 200  
                                                           spacer));
 201  2142
         final Future<Void> readerFuture = 
 202  2142
             this.executor.submit(new OctaveReaderCallable(this.processReader, 
 203  
                                                           output, 
 204  
                                                           spacer));
 205  2142
         final RuntimeException writerException = getFromFuture(writerFuture);
 206  
         // if (writerException instanceof CancellationException) {
 207  
         //     LOG.error("Did not expect writer to be canceled", 
 208  
         //               writerException);
 209  
         // }
 210  2142
         if (writerException != null) {
 211  4
             if (writerException instanceof CancellationException) {
 212  0
                 LOG.error("Did not expect writer to be canceled", 
 213  
                               writerException);
 214  
             }
 215  4
             readerFuture.cancel(true); // may interrupt if running 
 216  4
             throw writerException;
 217  
         }
 218  2138
         final RuntimeException readerException = getFromFuture(readerFuture);
 219  
         // if (writerException != null) {
 220  
         //     throw writerException;
 221  
         // }
 222  2138
         if (readerException != null) {
 223  
             // Only gets here when writerException==null, 
 224  
             // and in that case we don't expect the reader to be cancelled
 225  12
             if (readerException instanceof CancellationException) {
 226  0
                 LOG.error("Did not expect reader to be canceled", 
 227  
                           readerException);
 228  
             }
 229  12
             throw readerException;
 230  
         }
 231  2126
     }
 232  
 
 233  
     /**
 234  
      * Completes computation on future 
 235  
      * and returns an exception thrown or null. 
 236  
      */
 237  
     private RuntimeException getFromFuture(Future<Void> future) {
 238  
         try {
 239  4280
             future.get();
 240  0
         } catch (final InterruptedException e) {
 241  0
             LOG.error(MSG_IOE_NH, e);
 242  0
             return new RuntimeException(MSG_IOE_NH, e);
 243  16
         } catch (final ExecutionException e) {
 244  16
             if (e.getCause() instanceof OctaveException) {
 245  16
                 final OctaveException oe = (OctaveException) e.getCause();
 246  16
                 return reInstException(oe);
 247  
             }
 248  
             // Can happen when there is an error in a OctaveWriter
 249  0
              LOG.error(MSG_EXE_NH, e);
 250  0
             return new RuntimeException(MSG_EXE_NH, e);
 251  0
         } catch (final CancellationException e) {
 252  0
             return e;
 253  0
         } catch (final RuntimeException e) { // NOPMD 
 254  0
             LOG.error(MSG_RTE_NH, e);
 255  0
             return new RuntimeException(MSG_RTE_NH, e);
 256  4264
         }
 257  4264
         return null;
 258  
     }
 259  
 
 260  
     /**
 261  
      * Used by {@link #getFromFuture(Future)} 
 262  
      * to reinstantiate an {@link OctaveException} 
 263  
      * if this occurs as the cause of an {@link ExecutionException}. 
 264  
      */
 265  
     private OctaveException reInstException(OctaveException exc) {
 266  
         OctaveException res;
 267  
         try {
 268  16
             res = exc.getClass()
 269  
                 // may throw NoSuchMethodException 
 270  
                 // isa ReflectiveOperationException, 
 271  
                 // SecurityException isa RuntimeException 
 272  16
                 .getConstructor(String.class, Throwable.class)
 273  
                 // may throw 
 274  
                 // IllegalArgumentException, 
 275  
                 // ReflectiveOperationException: 
 276  
                 // - IllegalAccessException: construtor inaccessible 
 277  
                 // - InstantiationException: Exception class is abstract 
 278  
                 // - InvocationTargetException: if the constructor throws an exc
 279  
                 // ExceptionInInitializerError
 280  16
                 .newInstance(exc.getMessage(), exc);
 281  0
         } catch (RuntimeException e) { // NOPMD
 282  0
             throw new IllegalStateException("Exception should not happen", e);
 283  0
         } catch (ReflectiveOperationException e) {
 284  0
             throw new IllegalStateException("Exception should not happen", e);
 285  0
         } catch (ExceptionInInitializerError e) {
 286  0
             throw new IllegalStateException("Error should not happen", e);
 287  16
         }
 288  16
         if (isDestroyed()) {
 289  2
             res.setDestroyed(true);
 290  
         }
 291  16
         return res;
 292  
     }
 293  
 
 294  
     /**
 295  
      * Sets {@link #destroyed} to the parameter value given. 
 296  
      */
 297  
     private synchronized void setDestroyed(final boolean destroyed) {
 298  12
         this.destroyed = destroyed;
 299  12
     }
 300  
 
 301  
     /**
 302  
      * Returns {@link #destroyed}. 
 303  
      */
 304  
     private synchronized boolean isDestroyed() {
 305  30
         return this.destroyed;
 306  
     }
 307  
 
 308  
     /**
 309  
      * Kill the octave process without remorse. 
 310  
      */
 311  
     public void destroy() {
 312  12
         setDestroyed(true);
 313  12
         this.executor.shutdownNow(); // returns list of tasks awaiting exec. 
 314  12
         this.process.destroy();
 315  12
         this.errorStreamThread.close();
 316  
         try {
 317  12
             this.processWriter.close();
 318  4
         } catch (final IOException e) {
 319  4
             LOG.debug("Ignored error from processWriter.close() " + 
 320  
                       "in OctaveExec.destroy()", e);
 321  8
         }
 322  12
     }
 323  
 
 324  
     /**
 325  
      * Close the octave process in an orderly fashion.
 326  
      */
 327  
     public void close() {
 328  
         try {
 329  
             // it is not worth it to rewrite this 
 330  
             // to use eval() and some specialised Functors
 331  108
             this.processWriter.write("exit\n");
 332  102
             this.processWriter.close();
 333  94
             final String read1 = this.processReader.readLine();
 334  
             // Allow a single blank line, exit in octave 3.2 returns that:
 335  94
             if (read1 != null && !"".equals(read1)) {
 336  0
                 throw new OctaveIOException
 337  
                     ("Expected a blank line, read '" + read1 + "'");
 338  
             }
 339  94
             final String read2 = this.processReader.readLine();
 340  94
             if (read2 != null) {
 341  0
                 throw new OctaveIOException
 342  
                     ("Expected reader to be at end of stream, read '" + 
 343  
                      read2 + "'");
 344  
             }
 345  94
             this.processReader.close();
 346  94
             this.errorStreamThread.close();
 347  
             int exitValue;
 348  
             try {
 349  94
                 exitValue = this.process.waitFor();
 350  0
             } catch (final InterruptedException e) {
 351  0
                 throw new OctaveIOException
 352  
                     ("Interrupted when waiting for octave process " + 
 353  
                      "to terminate", e);
 354  94
             }
 355  94
             if (exitValue != 0) {
 356  0
                 throw new OctaveIOException
 357  
                     ("octave process terminated abnormaly, exitValue='" + 
 358  
                      exitValue + "'");
 359  
             }
 360  14
         } catch (final IOException e) {
 361  14
             final OctaveIOException octaveException = 
 362  
                 new OctaveIOException("reader error", e);
 363  14
             if (isDestroyed()) {
 364  10
                 octaveException.setDestroyed(true);
 365  
             }
 366  14
             throw octaveException;
 367  
         } finally {
 368  108
             this.executor.shutdown();
 369  94
         }
 370  94
     }
 371  
 
 372  
     /**
 373  
      * @param writer
 374  
      *    the new writer to write the error output to
 375  
      */
 376  
     public void setErrorWriter(final Writer writer) {
 377  2
         this.errorStreamThread.setWriter(writer);
 378  2
     }
 379  
 
 380  
 }