View Javadoc
/*
 * Copyright 2007, 2008 Ange Optimization ApS
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16  /**
17   * @author Kim Hansen
18   */
19  package eu.simuline.octave.exec;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.IOException;
24  import java.io.InputStreamReader;
25  import java.io.OutputStreamWriter;
26  import java.io.Writer;
27  import java.nio.charset.Charset;
28  import java.util.Random;
29  import java.util.concurrent.CancellationException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  
36  import eu.simuline.octave.exception.OctaveException;
37  import eu.simuline.octave.exception.OctaveIOException;
38  import eu.simuline.octave.util.NamedThreadFactory;
39  import eu.simuline.octave.util.NoCloseWriter;
40  import eu.simuline.octave.util.ReaderWriterPipeThread;
41  import eu.simuline.octave.util.TeeWriter;
42  import eu.simuline.octave.OctaveUtils;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  
47  /**
48   * The object connecting to the octave process. 
49   */
50  public final class OctaveExec {
51  
52      public static final String MSG_IOE_NH = 
53  	"InterruptedException should not happen";
54  
55      public static final String MSG_EXE_NH = 
56  	"ExecutionException should not happen";
57  
58      public static final String MSG_RTE_NH = 
59  	"RuntimeException should not happen";
60  
61      private static final Log LOG = LogFactory.getLog(OctaveExec.class);
62  
63      /**
64       * The octave process created in the constructor 
65       * with given command, arguments, environment and working directory. 
66       * This is initialized 
67       * in {@link #OctaveExec(int, Writer, Writer, Charset, String[], String[], File)} 
68       * and used in {@link #close()} and {@link #destroy()} only. 
69       */
70      private final Process process;
71  
72      /**
73       * The output writer for {@link #process} derived from {@link Process#getOutputStream()}. 
74       */
75      private final Writer processWriter;
76  
77      /**
78       * The input reader for {@link #process} derived from {@link Process#getInputStream()}. 
79       * This is used by {@link #evalRW(WriteFunctor, ReadFunctor)} 
80       * and used to close via {@link #close()}. 
81       */
82      private final BufferedReader processReader;
83  
84      /**
85       * Used in method {@link #evalRW(WriteFunctor, ReadFunctor)} 
86       * to submit essentially the write functor which submits the input 
87       * and thereafter the read function which collects the output. 
88       * Besides this, the executor is invoked to shutdown. 
89       */
90      private final ExecutorService executor;
91  
92      /**
93       * The error thread of the error stream of {@link #process} 
94       * writing the error stream to a given writer. 
95       * This is used to close but also to change the error writer 
96       * by {@link #setErrorWriter(Writer)}. 
97       */
98      private final ReaderWriterPipeThread errorStreamThread;
99  
100     private boolean destroyed = false;
101 
102     /**
103      * Will start the octave process.
104      *
105      * @param numThreadsReuse
106      *    the number of threads to be reused in a fixed thread pool. 
107      *    This is either positive or <code>-1</code>, 
108      *    which means that a cached thread pool is used instead of a fixed one. 
109      * @param stdinLog
110      *    This writer will capture all
111      *    that is written to the octave process via stdin,
112      *    if null the data will not be captured.
113      * @param stderrLog
114      *    This writer will capture all
115      *    that is written from the octave process on stderr,
116      *    if null the data will not be captured. 
117      * @param charset
118      *    the charset used for communication with the octave process. 
119      * @param cmdArray
120      *    The array consisting of command and arguments: 
121      *    The 0th entry is either the path to the octave program,
122      *    or the command found by looking at the built-in variable "paths" 
123      *    reconstructing the path. 
124      *    starting with the 1th entry, 
125      *    may follow the array of arguments to start the octave program with. 
126      *    CAUTION: allowed values depend on the octave version. 
127      * @param environment
128      *    Either the environment for the octave process, 
129      *    i.e. the set of values of environment variables 
130      *    with each entry of the form <code>name=value</code> 
131      *    or null to make {@link #process}, 
132      *    the process created, inherit the environment of the current process. 
133      * @param workingDir
134      *    Either the working directory for the octave process, or <code>null</code> to make {@link #process}, 
135      *    the process created, inherit the working directory
136      *    of the current process.
137      * @throws OctaveIOException
138      *    If execution
139      */
140     public OctaveExec(final int numThreadsReuse,
141 		      final Writer stdinLog, 
142 		      final Writer stderrLog, 
143 		      final Charset charset, // TBD: ensure that various charsets fit. 
144 		      final String[] cmdArray,
145 		      final String[] environment, // always invoked with null 
146 		      final File workingDir) {
147 	ThreadFactory threadFactory = new NamedThreadFactory();
148 	this.executor = numThreadsReuse == -1
149 	    ? Executors.newCachedThreadPool(threadFactory)
150 	    : Executors.newFixedThreadPool(numThreadsReuse, threadFactory);
151 
152 	try {
153 	    // exec may throw 
154 	    // - SecurityException TBC
155 	    // - UnsupportedOperationException TBC
156 	    // - IOException (handled by catch)
157 	    // - NullPointerException if cmdArray is null 
158 	    //   or so is one of its components, 
159 	    // - IndexOutOfBoundsException if cmdArray is empty
160 	    // The latter two are excluded. 
161             this.process = Runtime.getRuntime().exec(cmdArray, 
162 						     environment, 
163 						     workingDir);
164         } catch (final IOException e) {
165             throw new OctaveIOException(e);
166         }
167         // Connect stderr
168         this.errorStreamThread = ReaderWriterPipeThread
169 	    .instantiate(new InputStreamReader(this.process.getErrorStream(), charset),
170 			 stderrLog);
171 
172         // Connect stdout
173         this.processReader = new BufferedReader
174 	    (new InputStreamReader(this.process.getInputStream(), charset));
175 
176         // Connect stdin
177 	Writer pw = new OutputStreamWriter(this.process.getOutputStream(), charset);
178 	// all written to processWriter will go to pw and, 
179 	// if not null to stdinLog
180 	this.processWriter = (stdinLog == null)
181 	    ? pw
182 	    : new TeeWriter(new NoCloseWriter(stdinLog), pw);
183     }
184 
185     private final Random random = new Random();
186 
187     private String generateSpacer() {
188         return "-=+X+=- Octave.java spacer -=+X+=- " + 
189 	    this.random.nextLong() + " -=+X+=-";
190     }
191 
192     /**
193      * Passes <code>input</code> to octave, get back <code>output</code> 
194      * and throws according exceptions if reading or writing went wrong. 
195      *
196      * @param input
197      *    a write functor which represents the script 
198      *    to be executed in octave. 
199      * @param output
200      *    the read functor which reads the result of octave execution. 
201      *    After evaluation of this method, 
202      *    the <code>output</code> is asked for the result. 
203      *///<code></code>
204     // used in OctaveIO#set(Map), OctaveIO#get(String), 
205     // OctaveIO#checkIfVarExists(String) and in 
206     // OctaveEngine#unsafeEval(String) OctaveEngine#unsafeEval(Reader) and 
207     // OctaveEngine#getVersion() only 
208     // TBD: document which exceptions can be thrown in detail 
209     public void evalRW(final WriteFunctor input, final ReadFunctor output) {
210         final String spacer = generateSpacer();
211         final Future<Void> writerFuture = 
212 	    this.executor.submit(new OctaveWriterCallable(this.processWriter, 
213 							  input, 
214 							  spacer));
215         final Future<Void> readerFuture = 
216 	    this.executor.submit(new OctaveReaderCallable(this.processReader, 
217 							  output, 
218 							  spacer));
219         final RuntimeException writerException = getFromFuture(writerFuture);
220         // if (writerException instanceof CancellationException) {
221         //     LOG.error("Did not expect writer to be canceled", 
222 	//               writerException);
223         // }
224         if (writerException != null) {
225             if (writerException instanceof CancellationException) {
226                 LOG.error("Did not expect writer to be canceled", 
227 	    		  writerException);
228             }
229             readerFuture.cancel(true); // may interrupt if running 
230 	    throw writerException;
231         }
232         final RuntimeException readerException = getFromFuture(readerFuture);
233         // if (writerException != null) {
234         //     throw writerException;
235         // }
236         if (readerException != null) {
237             // Only gets here when writerException==null, 
238 	    // and in that case we don't expect the reader to be cancelled
239             if (readerException instanceof CancellationException) {
240                 LOG.error("Did not expect reader to be canceled", 
241 			  readerException);
242             }
243             throw readerException;
244         }
245     }
246 
247     /**
248      * Completes computation on future 
249      * and returns an exception thrown or null. 
250      */
251     private RuntimeException getFromFuture(Future<Void> future) {
252 	try {
253             future.get();
254         } catch (final InterruptedException e) {
255             LOG.error(MSG_IOE_NH, e);
256             return new RuntimeException(MSG_IOE_NH, e);
257         } catch (final ExecutionException e) {
258             if (e.getCause() instanceof OctaveException) {
259                 final OctaveException/../../eu/simuline/octave/exception/OctaveException.html#OctaveException">OctaveException oe = (OctaveException) e.getCause();
260                 return reInstException(oe);
261             }
262             // Can happen when there is an error in a OctaveWriter
263  	    LOG.error(MSG_EXE_NH, e);
264 	    return new RuntimeException(MSG_EXE_NH, e);
265         } catch (final CancellationException e) {
266             return e;
267         } catch (final RuntimeException e) { // NOPMD 
268             LOG.error(MSG_RTE_NH, e);
269             return new RuntimeException(MSG_RTE_NH, e);
270         }
271         return null;
272     }
273 
274     /**
275      * Used by {@link #getFromFuture(Future)} 
276      * to re-instantiate an {@link OctaveException} 
277      * if this occurs as the cause of an {@link ExecutionException}. 
278      */
279     private OctaveExceptionsimuline/octave/exception/OctaveException.html#OctaveException">OctaveException reInstException(OctaveException exc) {
280         OctaveException res;
281         try {
282             res = exc.getClass()
283 		// may throw NoSuchMethodException 
284 		// isa ReflectiveOperationException, 
285 		// SecurityException isa RuntimeException 
286 		.getConstructor(String.class, Throwable.class)
287 		// may throw 
288 		// IllegalArgumentException, 
289 		// ReflectiveOperationException: 
290 		// - IllegalAccessException: constructor inaccessible 
291 		// - InstantiationException: Exception class is abstract 
292 		// - InvocationTargetException: if the constructor throws an exc
293 		// ExceptionInInitializerError
294 		.newInstance(exc.getMessage(), exc);
295         } catch (RuntimeException e) { // NOPMD
296             throw new IllegalStateException("Exception should not happen", e);
297         } catch (ReflectiveOperationException e) {
298             throw new IllegalStateException("Exception should not happen", e);
299         } catch (ExceptionInInitializerError e) {
300             throw new IllegalStateException("Error should not happen", e);
301 	}
302         if (isDestroyed()) {
303             res.setDestroyed(true);
304         }
305         return res;
306     }
307 
308     /**
309      * Sets {@link #destroyed} to the parameter value given. 
310      */
311     private synchronized void setDestroyed(final boolean destroyed) {
312         this.destroyed = destroyed;
313     }
314 
315     /**
316      * Returns {@link #destroyed}. 
317      */
318     private synchronized boolean isDestroyed() {
319         return this.destroyed;
320     }
321 
322     /**
323      * Kill the octave process without remorse. 
324      */
325     public void destroy() {
326         setDestroyed(true);
327         this.executor.shutdownNow(); // returns list of tasks awaiting exec. 
328         this.process.destroy();
329         this.errorStreamThread.close();
330         try {
331             this.processWriter.close();
332         } catch (final IOException e) {
333             LOG.debug("Ignored error from processWriter.close() " + 
334 		      "in OctaveExec.destroy()", e);
335 	}
336     }
337 
338     /**
339      * Close the octave process in an orderly fashion: 
340      * Send command <code>exit</code> and expect a single line in return, 
341      * namely an empty one. 
342      *
343      * @throws OctaveIOException
344      *    if 
345      */
346     public void close() {
347         try {
348             // it is not worth it to rewrite this 
349 	    // to use eval() and some specialized Functors
350             // the next three commands all may throw IOException 
351             this.processWriter.write("exit\n");
352             this.processWriter.close();
353             final String read1 = this.processReader.readLine();
354             
355             // Allow a single blank line, exit in octave 3.2 returns that:
356             if (read1 != null && !"".equals(read1)) {
357                 throw new OctaveIOException
358 		    ("Expected a blank line, read '" + read1 + "'");
359             }
360             final String read2 = this.processReader.readLine();// may throw IOExceptino 
361             if (read2 != null) {
362                 throw new OctaveIOException
363 		    ("Expected reader to be at end of stream, read '" + 
364 		     read2 + "'");
365             }
366             // may throw IOException 
367             this.processReader.close();
368             // may throw IOException 
369             this.errorStreamThread.close();
370             int exitValue;
371             try {
372                 exitValue = this.process.waitFor();
373             } catch (final InterruptedException e) {
374                 throw new OctaveIOException
375 		    ("Interrupted when waiting for octave process " + 
376 		     "to terminate", e);
377             }
378             if (exitValue != 0) {
379                 throw new OctaveIOException
380 		    ("octave process terminated abnormaly, exitValue='" + 
381 		     exitValue + "'");
382             }
383         } catch (final IOException e) {
384             // TBD: correct this: may be also a problem with the writer. 
385             // Reader: may be processReader or error stream. 
386             final OctaveIOException octaveException = 
387 		new OctaveIOException("reader error", e);
388             if (isDestroyed()) {
389                 octaveException.setDestroyed(true);
390             }
391             throw octaveException;
392         } finally {
393             this.executor.shutdown();
394         }
395     }
396 
397     /**
398      * @param writer
399      *    the new writer to write the error output to
400      */
401     public void setErrorWriter(final Writer writer) {
402         this.errorStreamThread.setWriter(writer);
403     }
404 
405 }