View Javadoc
1   /*
2    * Copyright 2008, 2009 Ange Optimization ApS
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package eu.simuline.octave.exec;
17  
18  import java.io.BufferedReader;
19  import java.io.IOException;
20  import java.io.Reader;
21  import java.util.concurrent.Callable;
22  
23  import eu.simuline.octave.exception.OctaveIOException;
24  import eu.simuline.octave.util.StringUtil;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  
29  /**
30   * {@link Callable} that reads from the octave process. 
31   * Used in {@link OctaveExec#evalRW(WriteFunctor, ReadFunctor)} only. 
32   * This is complementary to {@link OctaveWriterCallable}. 
33   */
34  final class OctaveReaderCallable implements Callable<Void> {
35  
36      private static final Log LOG = LogFactory.getLog(OctaveReaderCallable.class);
37      
38     private static final String MSG_IOE_READ = 
39  	"IOException from ReadFunctor";
40      private static final String MSG_IOE_CLS = 
41  	"IOException during close";
42  
43      /**
44       * The reader for feedback on scripts received from octave. 
45       * This is nothing but {@link OctaveExec#processReader}. 
46       */
47      private final BufferedReader processReader;
48  
49      /**
50       * The functor the reading task is delegated to. 
51       */
52      private final ReadFunctor readFunctor;
53  
54      /**
55       * A string essentially consisting of a unique hashvalue. 
56       * It has been printed by the according {@link OctaveWriterCallable} 
57       * after having applied the write functor 
58       * and thus indicates the end of the read sequence of the octave process.  
59       */
60      private final String spacer;
61  
62      // TBC: strictly speaking, this goes wrong with a small but positive probability. 
63      /**
64       * @param processReader
65       *    the reader used for reading. 
66       * @param readFunctor
67       *    the functor the reading process is delegated to. 
68       * @param spacer
69       *    a string essentially consisting of a unique hashvalue 
70       *    printed at the end of the according write process 
71       *    in {@link OctaveExec#evalRW(WriteFunctor, ReadFunctor)} 
72       *    and thus signifying the end of the sequence to be read.
73       */
74      OctaveReaderCallable(final BufferedReader processReader, 
75  			 final ReadFunctor readFunctor, 
76  			 final String spacer) {
77          this.processReader = processReader;
78          this.readFunctor   = readFunctor;
79          this.spacer        = spacer;
80      }
81  
82      /**
83       * Reader that passes the reading on to the output from the octave process, 
84       * i.e. from {@link OctaveReaderCallable#processReader} 
85       * until the spacer {@link OctaveReaderCallable#spacer} reached, then it returns EOF. 
86       * When this reader is closed 
87       * the underlying reader is slurped up to the spacer. 
88       * <p>
89       * This is used in {@link OctaveReaderCallable#call()} only. 
90       */
91      // TBC: maybe thus this shall be an inner class. 
92      final class OctaveExecuteReader extends Reader {
93  
94          /**
95           * The current line read from {@link OctaveReaderCallable#processReader} 
96           * but not yet passed to a char-array by {@link #read(char[], int, int)}. 
97           * If this buffer were empty, it is <code>null</code> instead, 
98           * which is also the initial value. 
99           * If this is not the first line, the line read from {@link OctaveReaderCallable#processReader} 
100          * is preceded by newline before being passed to {@link #buffer}. 
101          */
102         private StringBuffer buffer = null;
103 
104         /**
105          * Whether reading the first line. 
106          * Initially, this is true. 
107          * It is set to false, by {@link #read(char[], int, int)} 
108          * if {@link #buffer} is filled for the first time. 
109          */
110         private boolean firstLine = true;
111 
112         /**
113          * Whether end of reader found. 
114          * Initially, this is false. 
115          * It is set to false, by {@link #read(char[], int, int)} 
116          * if {@link #buffer} equals {@link #spacer}, 
117          * not really end of {@link OctaveReaderCallable#processReader}. 
118          */
119         private boolean eof = false;
120 
121         /**
122          * Reads characters into a portion of an array. 
123          * This method will block until some input is available, 
124          * an I/O error occurs, or the end of the stream is reached.
125          *
126          * @param cbuf 
127          *    Destination buffer
128          * @param off 
129          *    Offset at which to start storing characters. 
130          * @param len 
131          *    Maximum number of characters to read. 
132          * @return
133          *    The number of characters read, 
134          *    or -1 if the end of the stream has been reached. 
135          *    The latter is the case if {@link #eof} is set 
136          *    which means that line {@link #spacer} has been found. 
137          * @throws IOException 
138          *    If an I/O error occurs. 
139          *    This is true in particular, 
140          *    if null-line has been read from {@link OctaveReaderCallable#processReader}. 
141          */
142         @Override
143         public int read(final char[] cbuf, final int off, final int len) 
144     	throws IOException {
145             if (this.eof) {
146                 return -1;
147             }
148             if (this.buffer == null) {
149                 // may throw IOException 
150                 final String line = OctaveReaderCallable.this.processReader.readLine();
151                 if (OctaveReaderCallable.LOG.isTraceEnabled()) {
152                     OctaveReaderCallable.LOG.trace("octaveReader.readLine() = " + 
153     			  StringUtil.jQuote(line));
154                 }
155                 if (line == null) {
156                     throw new IOException("Pipe to octave-process broken");
157                 }
158                 if (OctaveReaderCallable.this.spacer.equals(line)) {
159                     this.eof = true;
160                     return -1;
161                 }
162 
163     	    // line possibly preceded by \n 
164                 this.buffer = new StringBuffer(line.length() + 1);
165                 if (this.firstLine) {
166                     this.firstLine = false;
167                 } else {
168                     this.buffer.append('\n');
169                 }
170                 this.buffer.append(line);
171             }
172     	assert this.buffer != null;
173 
174             final int charsRead = Math.min(this.buffer.length(), len);
175             this.buffer.getChars(0, charsRead, cbuf, off);
176             if (charsRead == buffer.length()) {
177                 this.buffer = null;
178             } else {
179                 this.buffer.delete(0, charsRead);
180             }
181             return charsRead;
182         }
183 
184         @Override
185         @SuppressWarnings({"checkstyle:magicnumber", "checkstyle:emptyblock"})
186         // length of buffer is immaterial for function 
187         public void close() throws IOException {
188             final char[] buffer1 = new char[4096];
189             // Slurp the rest of the wrapped input
190             // may throw IOException 
191             while (read(buffer1) != -1) { // NOPMD 
192                 // Do nothing
193             }
194             // may throw IOException 
195             if (OctaveReaderCallable.this.processReader.ready()) {
196                 throw new IOException("octaveReader is ready()");
197             }
198             OctaveReaderCallable.LOG.debug("Reader closed()");
199         }
200 
201     } // class OctaveExecuteReader
202 
203     // TBC: what about spacer?
204     // where do deviations from OctaveWriterCallable.call() come from? 
205     /**
206      * Calling essentially reads from {@link #processReader} representing the octave process: 
207      * Reads until {@link #spacer} is detected which is not really read but is interpreted 
208      * as an eof symbol. 
209      * To that end, {@link #processReader} is wrapped into an {@link OctaveExecuteReader} 
210      * with parameter {@link #spacer} which yields eof if the spacer is detected. 
211      * Closing the {@link OctaveExecuteReader} reads the spacer without passing it further.  
212      * Exceptions are logged on {@link #LOG}. 
213      *
214      * @throws OctaveIOException 
215      *    if underlying {@link #readFunctor} or the reader {@link #processReader} or 
216      *    the wrapping {@link  OctaveExecuteReader} 
217      *    throws an {@link IOException}. 
218      */
219     @Override
220     public Void call() {
221         final Reader reader = new OctaveExecuteReader();
222         try {
223             this.readFunctor.doReads(reader);
224         } catch (final IOException e) {
225 	    LOG.debug(MSG_IOE_READ, e);
226             throw new OctaveIOException(MSG_IOE_READ, e);
227         } finally { // NOPMD
228             try {
229         	// this slurps the spacer 
230                 reader.close();
231             } catch (final IOException e) {
232                 LOG.debug(MSG_IOE_CLS, e);
233                 throw new OctaveIOException(MSG_IOE_CLS, e);
234             }
235         }
236         return null;
237     }
238 
239 }