View Javadoc
1   /*
2    * Copyright 2007, 2008 Ange Optimization ApS
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  /**
17   * @author Kim Hansen
18   */
19  package eu.simuline.octave.exec;
20  
21  import java.io.BufferedReader;
22  import java.io.File;
23  import java.io.IOException;
24  import java.io.InputStreamReader;
25  import java.io.OutputStreamWriter;
26  import java.io.Writer;
27  
28  import java.util.Random;
29  import java.util.concurrent.CancellationException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  
36  import eu.simuline.octave.exception.OctaveException;
37  import eu.simuline.octave.exception.OctaveIOException;
38  import eu.simuline.octave.util.NamedThreadFactory;
39  import eu.simuline.octave.util.NoCloseWriter;
40  import eu.simuline.octave.util.ReaderWriterPipeThread;
41  import eu.simuline.octave.util.TeeWriter;
42  import eu.simuline.octave.OctaveUtils;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  
47  /**
48   * The object connecting to the octave process. 
49   */
50  public final class OctaveExec {
51  
52      public static final String MSG_IOE_NH = 
53  	"InterruptedException should not happen";
54  
55      public static final String MSG_EXE_NH = 
56  	"ExecutionException should not happen";
57  
58      public static final String MSG_RTE_NH = 
59  	"RuntimeException should not happen";
60  
61      private static final Log LOG = LogFactory.getLog(OctaveExec.class);
62  
63      /**
64       * The octave process created in the constructor 
65       * with given command, arguments, environment and working directory. 
66       */
67      private final Process process;
68  
69      /**
70       * 
71       */
72      private final Writer processWriter;
73  
74      /**
75       * The input reader for {@link #process}. 
76       * This is used by {@link #evalRW(WriteFunctor, ReadFunctor)} 
77       * and used to close via {@link #close()}. 
78       */
79      private final BufferedReader processReader;
80  
81      /**
82       * Used in method {@link #evalRW(WriteFunctor, ReadFunctor)} 
83       * to submit essentially the write functor which submits the input 
84       * and thereafter the read function which collects the output. 
85       * Besides this, the executor is invoked to shutdown. 
86       */
87      private final ExecutorService executor;
88  
89      /**
90       * The error thread of the error stream of {@link #process} 
91       * writing the error stream to a given writer. 
92       * This is used to close but also to change the error writer 
93       * by {@link #setErrorWriter(Writer)}. 
94       */
95      private final ReaderWriterPipeThread errorStreamThread;
96  
97      private boolean destroyed = false;
98  
99      /**
100      * Will start the octave process.
101      *
102      * @param numThreadsReuse
103      *    the number of threads to be reused in a fixed thread pool. 
104      *    This is either positive or <code>-1</code>, 
105      *    which means that a cached thread pool is used instead of a fixed one. 
106      * @param stdinLog
107      *    This writer will capture all
108      *    that is written to the octave process via stdin,
109      *    if null the data will not be captured.
110      * @param stderrLog
111      *    This writer will capture all
112      *    that is written from the octave process on stderr,
113      *    if null the data will not be captured. 
114      * @param cmdArray
115      *    The array consisting of command and arguments: 
116      *    The 0th entry is either the path to the octave program,
117      *    or the command found by looking at the builtin variable "paths" 
118      *    reconstructing the path. 
119      *    starting with the 1th entry, 
120      *    may follow the array of arguments to start the octave program with. 
121      *    CAUTION: allowed values depend on the octave version. 
122      * @param environment
123      *    The environment for the octave process, 
124      *    i.e. the set of values of environment variables 
125      *    if null the process will inherit the environment
126      *    for the virtual machine. 
127      *    If not null, each entry has the form <code>name=value</code>. 
128      * @param workingDir
129      *    This will be the working dir for the octave process,
130      *    if null the process will inherit the working dir
131      *    of the current process.
132      */
133     public OctaveExec(final int numThreadsReuse,
134 		      final Writer stdinLog, 
135 		      final Writer stderrLog, 
136 		      final String[] cmdArray,
137 		      final String[] environment, // always invoked with null 
138 		      final File workingDir) {
139 	ThreadFactory threadFactory = new NamedThreadFactory();
140 	this.executor = numThreadsReuse == -1
141 	    ? Executors.newCachedThreadPool(threadFactory)
142 	    : Executors.newFixedThreadPool(numThreadsReuse, threadFactory);
143 
144 	try {
145             this.process = Runtime.getRuntime().exec(cmdArray, 
146 						     environment, 
147 						     workingDir);
148         } catch (final IOException e) {
149             throw new OctaveIOException(e);
150         }
151         // Connect stderr
152         this.errorStreamThread = ReaderWriterPipeThread
153 	    .instantiate(new InputStreamReader(this.process.getErrorStream(), 
154 					       OctaveUtils.getUTF8()),
155 			 stderrLog);
156 
157         // Connect stdout
158         this.processReader = new BufferedReader
159 	    (new InputStreamReader(this.process.getInputStream(), 
160 				   OctaveUtils.getUTF8()));
161 
162         // Connect stdin
163 	Writer pw = new OutputStreamWriter(this.process.getOutputStream(),
164 					   OctaveUtils.getUTF8());
165 	// all written to processWriter will go to pw and, 
166 	// if not null to stdinLog
167 	this.processWriter = (stdinLog == null)
168 	    ? pw
169 	    : new TeeWriter(new NoCloseWriter(stdinLog), pw);
170     }
171 
172     private final Random random = new Random();
173 
174     private String generateSpacer() {
175         return "-=+X+=- Octave.java spacer -=+X+=- " + 
176 	    random.nextLong() + " -=+X+=-";
177     }
178 
179     /**
180      * Passes <code>input</code> to octave 
181      * and get back <code>output</code>. 
182      *
183      * @param input
184      *    a write functor which represents the script 
185      *    to be executed in octave. 
186      * @param output
187      *    the read functor which reads the result of octave execution. 
188      *    After evaluation of this method, 
189      *    the <code>output</code> is asked for the result. 
190      *///<code></code>
191     // used in OctaveIO#set(Map), OctaveIO#get(String), 
192     // OctaveIO#checkIfVarExists(String) and in 
193     // OctaveEngine#unsafeEval(String) OctaveEngine#unsafeEval(Reader) and 
194     // OctaveEngine#getVersion() only 
195     public void evalRW(final WriteFunctor input, final ReadFunctor output) {
196         final String spacer = generateSpacer();
197         final Future<Void> writerFuture = 
198 	    this.executor.submit(new OctaveWriterCallable(this.processWriter, 
199 							  input, 
200 							  spacer));
201         final Future<Void> readerFuture = 
202 	    this.executor.submit(new OctaveReaderCallable(this.processReader, 
203 							  output, 
204 							  spacer));
205         final RuntimeException writerException = getFromFuture(writerFuture);
206         // if (writerException instanceof CancellationException) {
207         //     LOG.error("Did not expect writer to be canceled", 
208 	//               writerException);
209         // }
210         if (writerException != null) {
211             if (writerException instanceof CancellationException) {
212                 LOG.error("Did not expect writer to be canceled", 
213 	    		  writerException);
214             }
215             readerFuture.cancel(true); // may interrupt if running 
216 	    throw writerException;
217         }
218         final RuntimeException readerException = getFromFuture(readerFuture);
219         // if (writerException != null) {
220         //     throw writerException;
221         // }
222         if (readerException != null) {
223             // Only gets here when writerException==null, 
224 	    // and in that case we don't expect the reader to be cancelled
225             if (readerException instanceof CancellationException) {
226                 LOG.error("Did not expect reader to be canceled", 
227 			  readerException);
228             }
229             throw readerException;
230         }
231     }
232 
233     /**
234      * Completes computation on future 
235      * and returns an exception thrown or null. 
236      */
237     private RuntimeException getFromFuture(Future<Void> future) {
238 	try {
239             future.get();
240         } catch (final InterruptedException e) {
241             LOG.error(MSG_IOE_NH, e);
242             return new RuntimeException(MSG_IOE_NH, e);
243         } catch (final ExecutionException e) {
244             if (e.getCause() instanceof OctaveException) {
245                 final OctaveException oe = (OctaveException) e.getCause();
246                 return reInstException(oe);
247             }
248             // Can happen when there is an error in a OctaveWriter
249  	    LOG.error(MSG_EXE_NH, e);
250 	    return new RuntimeException(MSG_EXE_NH, e);
251         } catch (final CancellationException e) {
252             return e;
253         } catch (final RuntimeException e) { // NOPMD 
254             LOG.error(MSG_RTE_NH, e);
255             return new RuntimeException(MSG_RTE_NH, e);
256         }
257         return null;
258     }
259 
260     /**
261      * Used by {@link #getFromFuture(Future)} 
262      * to reinstantiate an {@link OctaveException} 
263      * if this occurs as the cause of an {@link ExecutionException}. 
264      */
265     private OctaveException reInstException(OctaveException exc) {
266         OctaveException res;
267         try {
268             res = exc.getClass()
269 		// may throw NoSuchMethodException 
270 		// isa ReflectiveOperationException, 
271 		// SecurityException isa RuntimeException 
272 		.getConstructor(String.class, Throwable.class)
273 		// may throw 
274 		// IllegalArgumentException, 
275 		// ReflectiveOperationException: 
276 		// - IllegalAccessException: construtor inaccessible 
277 		// - InstantiationException: Exception class is abstract 
278 		// - InvocationTargetException: if the constructor throws an exc
279 		// ExceptionInInitializerError
280 		.newInstance(exc.getMessage(), exc);
281         } catch (RuntimeException e) { // NOPMD
282             throw new IllegalStateException("Exception should not happen", e);
283         } catch (ReflectiveOperationException e) {
284             throw new IllegalStateException("Exception should not happen", e);
285         } catch (ExceptionInInitializerError e) {
286             throw new IllegalStateException("Error should not happen", e);
287 	}
288         if (isDestroyed()) {
289             res.setDestroyed(true);
290         }
291         return res;
292     }
293 
294     /**
295      * Sets {@link #destroyed} to the parameter value given. 
296      */
297     private synchronized void setDestroyed(final boolean destroyed) {
298         this.destroyed = destroyed;
299     }
300 
301     /**
302      * Returns {@link #destroyed}. 
303      */
304     private synchronized boolean isDestroyed() {
305         return this.destroyed;
306     }
307 
308     /**
309      * Kill the octave process without remorse. 
310      */
311     public void destroy() {
312         setDestroyed(true);
313         this.executor.shutdownNow(); // returns list of tasks awaiting exec. 
314         this.process.destroy();
315         this.errorStreamThread.close();
316         try {
317             this.processWriter.close();
318         } catch (final IOException e) {
319             LOG.debug("Ignored error from processWriter.close() " + 
320 		      "in OctaveExec.destroy()", e);
321 	}
322     }
323 
324     /**
325      * Close the octave process in an orderly fashion.
326      */
327     public void close() {
328         try {
329             // it is not worth it to rewrite this 
330 	    // to use eval() and some specialised Functors
331             this.processWriter.write("exit\n");
332             this.processWriter.close();
333             final String read1 = this.processReader.readLine();
334             // Allow a single blank line, exit in octave 3.2 returns that:
335             if (read1 != null && !"".equals(read1)) {
336                 throw new OctaveIOException
337 		    ("Expected a blank line, read '" + read1 + "'");
338             }
339             final String read2 = this.processReader.readLine();
340             if (read2 != null) {
341                 throw new OctaveIOException
342 		    ("Expected reader to be at end of stream, read '" + 
343 		     read2 + "'");
344             }
345             this.processReader.close();
346             this.errorStreamThread.close();
347             int exitValue;
348             try {
349                 exitValue = this.process.waitFor();
350             } catch (final InterruptedException e) {
351                 throw new OctaveIOException
352 		    ("Interrupted when waiting for octave process " + 
353 		     "to terminate", e);
354             }
355             if (exitValue != 0) {
356                 throw new OctaveIOException
357 		    ("octave process terminated abnormaly, exitValue='" + 
358 		     exitValue + "'");
359             }
360         } catch (final IOException e) {
361             final OctaveIOException octaveException = 
362 		new OctaveIOException("reader error", e);
363             if (isDestroyed()) {
364                 octaveException.setDestroyed(true);
365             }
366             throw octaveException;
367         } finally {
368             this.executor.shutdown();
369         }
370     }
371 
372     /**
373      * @param writer
374      *    the new writer to write the error output to
375      */
376     public void setErrorWriter(final Writer writer) {
377         this.errorStreamThread.setWriter(writer);
378     }
379 
380 }