1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package eu.simuline.octave.exec;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.OutputStreamWriter;
26 import java.io.Writer;
27
28 import java.util.Random;
29 import java.util.concurrent.CancellationException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import eu.simuline.octave.exception.OctaveException;
37 import eu.simuline.octave.exception.OctaveIOException;
38 import eu.simuline.octave.util.NamedThreadFactory;
39 import eu.simuline.octave.util.NoCloseWriter;
40 import eu.simuline.octave.util.ReaderWriterPipeThread;
41 import eu.simuline.octave.util.TeeWriter;
42 import eu.simuline.octave.OctaveUtils;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46
47
48
49
50 public final class OctaveExec {
51
52 public static final String MSG_IOE_NH =
53 "InterruptedException should not happen";
54
55 public static final String MSG_EXE_NH =
56 "ExecutionException should not happen";
57
58 public static final String MSG_RTE_NH =
59 "RuntimeException should not happen";
60
61 private static final Log LOG = LogFactory.getLog(OctaveExec.class);
62
63
64
65
66
67 private final Process process;
68
69
70
71
72 private final Writer processWriter;
73
74
75
76
77
78
79 private final BufferedReader processReader;
80
81
82
83
84
85
86
87 private final ExecutorService executor;
88
89
90
91
92
93
94
95 private final ReaderWriterPipeThread errorStreamThread;
96
97 private boolean destroyed = false;
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 public OctaveExec(final int numThreadsReuse,
134 final Writer stdinLog,
135 final Writer stderrLog,
136 final String[] cmdArray,
137 final String[] environment,
138 final File workingDir) {
139 ThreadFactory threadFactory = new NamedThreadFactory();
140 this.executor = numThreadsReuse == -1
141 ? Executors.newCachedThreadPool(threadFactory)
142 : Executors.newFixedThreadPool(numThreadsReuse, threadFactory);
143
144 try {
145 this.process = Runtime.getRuntime().exec(cmdArray,
146 environment,
147 workingDir);
148 } catch (final IOException e) {
149 throw new OctaveIOException(e);
150 }
151
152 this.errorStreamThread = ReaderWriterPipeThread
153 .instantiate(new InputStreamReader(this.process.getErrorStream(),
154 OctaveUtils.getUTF8()),
155 stderrLog);
156
157
158 this.processReader = new BufferedReader
159 (new InputStreamReader(this.process.getInputStream(),
160 OctaveUtils.getUTF8()));
161
162
163 Writer pw = new OutputStreamWriter(this.process.getOutputStream(),
164 OctaveUtils.getUTF8());
165
166
167 this.processWriter = (stdinLog == null)
168 ? pw
169 : new TeeWriter(new NoCloseWriter(stdinLog), pw);
170 }
171
172 private final Random random = new Random();
173
174 private String generateSpacer() {
175 return "-=+X+=- Octave.java spacer -=+X+=- " +
176 random.nextLong() + " -=+X+=-";
177 }
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 public void evalRW(final WriteFunctor input, final ReadFunctor output) {
196 final String spacer = generateSpacer();
197 final Future<Void> writerFuture =
198 this.executor.submit(new OctaveWriterCallable(this.processWriter,
199 input,
200 spacer));
201 final Future<Void> readerFuture =
202 this.executor.submit(new OctaveReaderCallable(this.processReader,
203 output,
204 spacer));
205 final RuntimeException writerException = getFromFuture(writerFuture);
206
207
208
209
210 if (writerException != null) {
211 if (writerException instanceof CancellationException) {
212 LOG.error("Did not expect writer to be canceled",
213 writerException);
214 }
215 readerFuture.cancel(true);
216 throw writerException;
217 }
218 final RuntimeException readerException = getFromFuture(readerFuture);
219
220
221
222 if (readerException != null) {
223
224
225 if (readerException instanceof CancellationException) {
226 LOG.error("Did not expect reader to be canceled",
227 readerException);
228 }
229 throw readerException;
230 }
231 }
232
233
234
235
236
237 private RuntimeException getFromFuture(Future<Void> future) {
238 try {
239 future.get();
240 } catch (final InterruptedException e) {
241 LOG.error(MSG_IOE_NH, e);
242 return new RuntimeException(MSG_IOE_NH, e);
243 } catch (final ExecutionException e) {
244 if (e.getCause() instanceof OctaveException) {
245 final OctaveException oe = (OctaveException) e.getCause();
246 return reInstException(oe);
247 }
248
249 LOG.error(MSG_EXE_NH, e);
250 return new RuntimeException(MSG_EXE_NH, e);
251 } catch (final CancellationException e) {
252 return e;
253 } catch (final RuntimeException e) {
254 LOG.error(MSG_RTE_NH, e);
255 return new RuntimeException(MSG_RTE_NH, e);
256 }
257 return null;
258 }
259
260
261
262
263
264
265 private OctaveException reInstException(OctaveException exc) {
266 OctaveException res;
267 try {
268 res = exc.getClass()
269
270
271
272 .getConstructor(String.class, Throwable.class)
273
274
275
276
277
278
279
280 .newInstance(exc.getMessage(), exc);
281 } catch (RuntimeException e) {
282 throw new IllegalStateException("Exception should not happen", e);
283 } catch (ReflectiveOperationException e) {
284 throw new IllegalStateException("Exception should not happen", e);
285 } catch (ExceptionInInitializerError e) {
286 throw new IllegalStateException("Error should not happen", e);
287 }
288 if (isDestroyed()) {
289 res.setDestroyed(true);
290 }
291 return res;
292 }
293
294
295
296
297 private synchronized void setDestroyed(final boolean destroyed) {
298 this.destroyed = destroyed;
299 }
300
301
302
303
304 private synchronized boolean isDestroyed() {
305 return this.destroyed;
306 }
307
308
309
310
311 public void destroy() {
312 setDestroyed(true);
313 this.executor.shutdownNow();
314 this.process.destroy();
315 this.errorStreamThread.close();
316 try {
317 this.processWriter.close();
318 } catch (final IOException e) {
319 LOG.debug("Ignored error from processWriter.close() " +
320 "in OctaveExec.destroy()", e);
321 }
322 }
323
324
325
326
327 public void close() {
328 try {
329
330
331 this.processWriter.write("exit\n");
332 this.processWriter.close();
333 final String read1 = this.processReader.readLine();
334
335 if (read1 != null && !"".equals(read1)) {
336 throw new OctaveIOException
337 ("Expected a blank line, read '" + read1 + "'");
338 }
339 final String read2 = this.processReader.readLine();
340 if (read2 != null) {
341 throw new OctaveIOException
342 ("Expected reader to be at end of stream, read '" +
343 read2 + "'");
344 }
345 this.processReader.close();
346 this.errorStreamThread.close();
347 int exitValue;
348 try {
349 exitValue = this.process.waitFor();
350 } catch (final InterruptedException e) {
351 throw new OctaveIOException
352 ("Interrupted when waiting for octave process " +
353 "to terminate", e);
354 }
355 if (exitValue != 0) {
356 throw new OctaveIOException
357 ("octave process terminated abnormaly, exitValue='" +
358 exitValue + "'");
359 }
360 } catch (final IOException e) {
361 final OctaveIOException octaveException =
362 new OctaveIOException("reader error", e);
363 if (isDestroyed()) {
364 octaveException.setDestroyed(true);
365 }
366 throw octaveException;
367 } finally {
368 this.executor.shutdown();
369 }
370 }
371
372
373
374
375
376 public void setErrorWriter(final Writer writer) {
377 this.errorStreamThread.setWriter(writer);
378 }
379
380 }