1 /* 2 * Copyright 2008, 2009 Ange Optimization ApS 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package eu.simuline.octave.exec; 17 18 import java.io.BufferedReader; 19 import java.io.IOException; 20 import java.io.Reader; 21 import java.util.concurrent.Callable; 22 23 import eu.simuline.octave.exception.OctaveIOException; 24 import eu.simuline.octave.util.StringUtil; 25 26 import org.apache.commons.logging.Log; 27 import org.apache.commons.logging.LogFactory; 28 29 /** 30 * {@link Callable} that reads from the octave process. 31 * Used in {@link OctaveExec#evalRW(WriteFunctor, ReadFunctor)} only. 32 * This is complementary to {@link OctaveWriterCallable}. 33 */ 34 final class OctaveReaderCallable implements Callable<Void> { 35 36 private static final Log LOG = LogFactory.getLog(OctaveReaderCallable.class); 37 38 private static final String MSG_IOE_READ = 39 "IOException from ReadFunctor"; 40 private static final String MSG_IOE_CLS = 41 "IOException during close"; 42 43 /** 44 * The reader for feedback on scripts received from octave. 45 * This is nothing but {@link OctaveExec#processReader}. 46 */ 47 private final BufferedReader processReader; 48 49 /** 50 * The functor the reading task is delegated to. 51 */ 52 private final ReadFunctor readFunctor; 53 54 /** 55 * A string essentially consisting of a unique hashvalue. 56 * It has been printed by the according {@link OctaveWriterCallable} 57 * after having applied the write functor 58 * and thus indicates the end of the read sequence of the octave process. 59 */ 60 private final String spacer; 61 62 // TBC: strictly speaking, this goes wrong with a small but positive probability. 63 /** 64 * @param processReader 65 * the reader used for reading. 66 * @param readFunctor 67 * the functor the reading process is delegated to. 68 * @param spacer 69 * a string essentially consisting of a unique hashvalue 70 * printed at the end of the according write process 71 * in {@link OctaveExec#evalRW(WriteFunctor, ReadFunctor)} 72 * and thus signifying the end of the sequence to be read. 73 */ 74 OctaveReaderCallable(final BufferedReader processReader, 75 final ReadFunctor readFunctor, 76 final String spacer) { 77 this.processReader = processReader; 78 this.readFunctor = readFunctor; 79 this.spacer = spacer; 80 } 81 82 /** 83 * Reader that passes the reading on to the output from the octave process, 84 * i.e. from {@link OctaveReaderCallable#processReader} 85 * until the spacer {@link OctaveReaderCallable#spacer} reached, then it returns EOF. 86 * When this reader is closed 87 * the underlying reader is slurped up to the spacer. 88 * <p> 89 * This is used in {@link OctaveReaderCallable#call()} only. 90 */ 91 // TBC: maybe thus this shall be an inner class. 92 final class OctaveExecuteReader extends Reader { 93 94 /** 95 * The current line read from {@link OctaveReaderCallable#processReader} 96 * but not yet passed to a char-array by {@link #read(char[], int, int)}. 97 * If this buffer were empty, it is <code>null</code> instead, 98 * which is also the initial value. 99 * If this is not the first line, the line read from {@link OctaveReaderCallable#processReader} 100 * is preceded by newline before being passed to {@link #buffer}. 101 */ 102 private StringBuffer buffer = null; 103 104 /** 105 * Whether reading the first line. 106 * Initially, this is true. 107 * It is set to false, by {@link #read(char[], int, int)} 108 * if {@link #buffer} is filled for the first time. 109 */ 110 private boolean firstLine = true; 111 112 /** 113 * Whether end of reader found. 114 * Initially, this is false. 115 * It is set to false, by {@link #read(char[], int, int)} 116 * if {@link #buffer} equals {@link #spacer}, 117 * not really end of {@link OctaveReaderCallable#processReader}. 118 */ 119 private boolean eof = false; 120 121 /** 122 * Reads characters into a portion of an array. 123 * This method will block until some input is available, 124 * an I/O error occurs, or the end of the stream is reached. 125 * 126 * @param cbuf 127 * Destination buffer 128 * @param off 129 * Offset at which to start storing characters. 130 * @param len 131 * Maximum number of characters to read. 132 * @return 133 * The number of characters read, 134 * or -1 if the end of the stream has been reached. 135 * The latter is the case if {@link #eof} is set 136 * which means that line {@link #spacer} has been found. 137 * @throws IOException 138 * If an I/O error occurs. 139 * This is true in particular, 140 * if null-line has been read from {@link OctaveReaderCallable#processReader}. 141 */ 142 @Override 143 public int read(final char[] cbuf, final int off, final int len) 144 throws IOException { 145 if (this.eof) { 146 return -1; 147 } 148 if (this.buffer == null) { 149 // may throw IOException 150 final String line = OctaveReaderCallable.this.processReader.readLine(); 151 if (OctaveReaderCallable.LOG.isTraceEnabled()) { 152 OctaveReaderCallable.LOG.trace("octaveReader.readLine() = " + 153 StringUtil.jQuote(line)); 154 } 155 if (line == null) { 156 throw new IOException("Pipe to octave-process broken"); 157 } 158 if (OctaveReaderCallable.this.spacer.equals(line)) { 159 this.eof = true; 160 return -1; 161 } 162 163 // line possibly preceded by \n 164 this.buffer = new StringBuffer(line.length() + 1); 165 if (this.firstLine) { 166 this.firstLine = false; 167 } else { 168 this.buffer.append('\n'); 169 } 170 this.buffer.append(line); 171 } 172 assert this.buffer != null; 173 174 final int charsRead = Math.min(this.buffer.length(), len); 175 this.buffer.getChars(0, charsRead, cbuf, off); 176 if (charsRead == buffer.length()) { 177 this.buffer = null; 178 } else { 179 this.buffer.delete(0, charsRead); 180 } 181 return charsRead; 182 } 183 184 @Override 185 @SuppressWarnings({"checkstyle:magicnumber", "checkstyle:emptyblock"}) 186 // length of buffer is immaterial for function 187 public void close() throws IOException { 188 final char[] buffer1 = new char[4096]; 189 // Slurp the rest of the wrapped input 190 // may throw IOException 191 while (read(buffer1) != -1) { // NOPMD 192 // Do nothing 193 } 194 // may throw IOException 195 if (OctaveReaderCallable.this.processReader.ready()) { 196 throw new IOException("octaveReader is ready()"); 197 } 198 OctaveReaderCallable.LOG.debug("Reader closed()"); 199 } 200 201 } // class OctaveExecuteReader 202 203 // TBC: what about spacer? 204 // where do deviations from OctaveWriterCallable.call() come from? 205 /** 206 * Calling essentially reads from {@link #processReader} representing the octave process: 207 * Reads until {@link #spacer} is detected which is not really read but is interpreted 208 * as an eof symbol. 209 * To that end, {@link #processReader} is wrapped into an {@link OctaveExecuteReader} 210 * with parameter {@link #spacer} which yields eof if the spacer is detected. 211 * Closing the {@link OctaveExecuteReader} reads the spacer without passing it further. 212 * Exceptions are logged on {@link #LOG}. 213 * 214 * @throws OctaveIOException 215 * if underlying {@link #readFunctor} or the reader {@link #processReader} or 216 * the wrapping {@link OctaveExecuteReader} 217 * throws an {@link IOException}. 218 */ 219 @Override 220 public Void call() { 221 final Reader reader = new OctaveExecuteReader(); 222 try { 223 this.readFunctor.doReads(reader); 224 } catch (final IOException e) { 225 LOG.debug(MSG_IOE_READ, e); 226 throw new OctaveIOException(MSG_IOE_READ, e); 227 } finally { // NOPMD 228 try { 229 // this slurps the spacer 230 reader.close(); 231 } catch (final IOException e) { 232 LOG.debug(MSG_IOE_CLS, e); 233 throw new OctaveIOException(MSG_IOE_CLS, e); 234 } 235 } 236 return null; 237 } 238 239 }