1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package eu.simuline.octave.exec;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.OutputStreamWriter;
26 import java.io.Writer;
27 import java.nio.charset.Charset;
28 import java.util.Random;
29 import java.util.concurrent.CancellationException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import eu.simuline.octave.exception.OctaveException;
37 import eu.simuline.octave.exception.OctaveIOException;
38 import eu.simuline.octave.util.NamedThreadFactory;
39 import eu.simuline.octave.util.NoCloseWriter;
40 import eu.simuline.octave.util.ReaderWriterPipeThread;
41 import eu.simuline.octave.util.TeeWriter;
42 import eu.simuline.octave.OctaveUtils;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46
47
48
49
50 public final class OctaveExec {
51
52 public static final String MSG_IOE_NH =
53 "InterruptedException should not happen";
54
55 public static final String MSG_EXE_NH =
56 "ExecutionException should not happen";
57
58 public static final String MSG_RTE_NH =
59 "RuntimeException should not happen";
60
61 private static final Log LOG = LogFactory.getLog(OctaveExec.class);
62
63
64
65
66
67
68
69
70 private final Process process;
71
72
73
74
75 private final Writer processWriter;
76
77
78
79
80
81
82 private final BufferedReader processReader;
83
84
85
86
87
88
89
90 private final ExecutorService executor;
91
92
93
94
95
96
97
98 private final ReaderWriterPipeThread errorStreamThread;
99
100 private boolean destroyed = false;
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 public OctaveExec(final int numThreadsReuse,
141 final Writer stdinLog,
142 final Writer stderrLog,
143 final Charset charset,
144 final String[] cmdArray,
145 final String[] environment,
146 final File workingDir) {
147 ThreadFactory threadFactory = new NamedThreadFactory();
148 this.executor = numThreadsReuse == -1
149 ? Executors.newCachedThreadPool(threadFactory)
150 : Executors.newFixedThreadPool(numThreadsReuse, threadFactory);
151
152 try {
153
154
155
156
157
158
159
160
161 this.process = Runtime.getRuntime().exec(cmdArray,
162 environment,
163 workingDir);
164 } catch (final IOException e) {
165 throw new OctaveIOException(e);
166 }
167
168 this.errorStreamThread = ReaderWriterPipeThread
169 .instantiate(new InputStreamReader(this.process.getErrorStream(), charset),
170 stderrLog);
171
172
173 this.processReader = new BufferedReader
174 (new InputStreamReader(this.process.getInputStream(), charset));
175
176
177 Writer pw = new OutputStreamWriter(this.process.getOutputStream(), charset);
178
179
180 this.processWriter = (stdinLog == null)
181 ? pw
182 : new TeeWriter(new NoCloseWriter(stdinLog), pw);
183 }
184
185 private final Random random = new Random();
186
187 private String generateSpacer() {
188 return "-=+X+=- Octave.java spacer -=+X+=- " +
189 this.random.nextLong() + " -=+X+=-";
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209 public void evalRW(final WriteFunctor input, final ReadFunctor output) {
210 final String spacer = generateSpacer();
211 final Future<Void> writerFuture =
212 this.executor.submit(new OctaveWriterCallable(this.processWriter,
213 input,
214 spacer));
215 final Future<Void> readerFuture =
216 this.executor.submit(new OctaveReaderCallable(this.processReader,
217 output,
218 spacer));
219 final RuntimeException writerException = getFromFuture(writerFuture);
220
221
222
223
224 if (writerException != null) {
225 if (writerException instanceof CancellationException) {
226 LOG.error("Did not expect writer to be canceled",
227 writerException);
228 }
229 readerFuture.cancel(true);
230 throw writerException;
231 }
232 final RuntimeException readerException = getFromFuture(readerFuture);
233
234
235
236 if (readerException != null) {
237
238
239 if (readerException instanceof CancellationException) {
240 LOG.error("Did not expect reader to be canceled",
241 readerException);
242 }
243 throw readerException;
244 }
245 }
246
247
248
249
250
251 private RuntimeException getFromFuture(Future<Void> future) {
252 try {
253 future.get();
254 } catch (final InterruptedException e) {
255 LOG.error(MSG_IOE_NH, e);
256 return new RuntimeException(MSG_IOE_NH, e);
257 } catch (final ExecutionException e) {
258 if (e.getCause() instanceof OctaveException) {
259 final OctaveException/../../eu/simuline/octave/exception/OctaveException.html#OctaveException">OctaveException oe = (OctaveException) e.getCause();
260 return reInstException(oe);
261 }
262
263 LOG.error(MSG_EXE_NH, e);
264 return new RuntimeException(MSG_EXE_NH, e);
265 } catch (final CancellationException e) {
266 return e;
267 } catch (final RuntimeException e) {
268 LOG.error(MSG_RTE_NH, e);
269 return new RuntimeException(MSG_RTE_NH, e);
270 }
271 return null;
272 }
273
274
275
276
277
278
279 private OctaveExceptionsimuline/octave/exception/OctaveException.html#OctaveException">OctaveException reInstException(OctaveException exc) {
280 OctaveException res;
281 try {
282 res = exc.getClass()
283
284
285
286 .getConstructor(String.class, Throwable.class)
287
288
289
290
291
292
293
294 .newInstance(exc.getMessage(), exc);
295 } catch (RuntimeException e) {
296 throw new IllegalStateException("Exception should not happen", e);
297 } catch (ReflectiveOperationException e) {
298 throw new IllegalStateException("Exception should not happen", e);
299 } catch (ExceptionInInitializerError e) {
300 throw new IllegalStateException("Error should not happen", e);
301 }
302 if (isDestroyed()) {
303 res.setDestroyed(true);
304 }
305 return res;
306 }
307
308
309
310
311 private synchronized void setDestroyed(final boolean destroyed) {
312 this.destroyed = destroyed;
313 }
314
315
316
317
318 private synchronized boolean isDestroyed() {
319 return this.destroyed;
320 }
321
322
323
324
325 public void destroy() {
326 setDestroyed(true);
327 this.executor.shutdownNow();
328 this.process.destroy();
329 this.errorStreamThread.close();
330 try {
331 this.processWriter.close();
332 } catch (final IOException e) {
333 LOG.debug("Ignored error from processWriter.close() " +
334 "in OctaveExec.destroy()", e);
335 }
336 }
337
338
339
340
341
342
343
344
345
346 public void close() {
347 try {
348
349
350
351 this.processWriter.write("exit\n");
352 this.processWriter.close();
353 final String read1 = this.processReader.readLine();
354
355
356 if (read1 != null && !"".equals(read1)) {
357 throw new OctaveIOException
358 ("Expected a blank line, read '" + read1 + "'");
359 }
360 final String read2 = this.processReader.readLine();
361 if (read2 != null) {
362 throw new OctaveIOException
363 ("Expected reader to be at end of stream, read '" +
364 read2 + "'");
365 }
366
367 this.processReader.close();
368
369 this.errorStreamThread.close();
370 int exitValue;
371 try {
372 exitValue = this.process.waitFor();
373 } catch (final InterruptedException e) {
374 throw new OctaveIOException
375 ("Interrupted when waiting for octave process " +
376 "to terminate", e);
377 }
378 if (exitValue != 0) {
379 throw new OctaveIOException
380 ("octave process terminated abnormaly, exitValue='" +
381 exitValue + "'");
382 }
383 } catch (final IOException e) {
384
385
386 final OctaveIOException octaveException =
387 new OctaveIOException("reader error", e);
388 if (isDestroyed()) {
389 octaveException.setDestroyed(true);
390 }
391 throw octaveException;
392 } finally {
393 this.executor.shutdown();
394 }
395 }
396
397
398
399
400
401 public void setErrorWriter(final Writer writer) {
402 this.errorStreamThread.setWriter(writer);
403 }
404
405 }