View Javadoc
1   /*
2    * Copyright 2007, 2008 Ange Optimization ApS
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package eu.simuline.octave.util;
17  
18  import java.io.IOException;
19  import java.io.Reader;
20  import java.io.Writer;
21  
22  import eu.simuline.octave.exception.OctaveIOException;
23  import eu.simuline.octave.exception.OctaveInterruptedException;
24  
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.commons.logging.Log;
27  
28  /**
29   * A Thread that moves data from a Reader to a Writer. 
30   * 
31   * @author Kim Hansen
32   */
33  public final class ReaderWriterPipeThread extends Thread {
34  
35      private static final Log LOG = LogFactory
36  	.getLog(ReaderWriterPipeThread.class);
37  
38      @SuppressWarnings("checkstyle:magicnumber")
39      private static final char[] BUF = new char[4 * 1024];
40   
41      private final Reader reader;
42  
43      private Writer writer;
44  
45      /**
46       * Will create a thread that reads from reader and writes to write 
47       * until reader reaches EOF. 
48       * Then the thread will close. 
49       * Remember to join() this thread before closeing reader or writer.
50       * 
51       * @param reader
52       * @param writer
53       * @return Returns the new thread
54       */
55      public static ReaderWriterPipeThread instantiate(final Reader reader, 
56  						     final Writer writer) {
57          final ReaderWriterPipeThread readerWriterPipeThread = 
58  	    new ReaderWriterPipeThread(reader, writer);
59          readerWriterPipeThread.setName(Thread.currentThread().getName() 
60  				       + "-javaoctave-"
61  				       + ReaderWriterPipeThread.class
62  				       .getSimpleName());
63  	readerWriterPipeThread
64  	    .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
65  		    public void uncaughtException(Thread th, Throwable ex) {
66  			System.out.println("Uncaught : " + ex + 
67  					   " of thread " + th.getName());
68  		    }
69  		});
70          readerWriterPipeThread.start();
71          return readerWriterPipeThread;
72      }
73  
74      private ReaderWriterPipeThread(final Reader reader, final Writer writer) {
75          this.reader = reader;
76          this.writer = writer;
77      }
78  
79      @Override
80      public void run() {
81          while (!interrupted()) {
82              int len;
83              try {
84                  len = reader.read(BUF);
85              } catch (final IOException e) {
86                  LOG.error("Error when reading from reader", e);
87                  throw new OctaveIOException(e);
88              }
89              if (len == -1) {
90                  break;
91              }
92              try {
93                  synchronized (this) {
94                      if (writer != null) {
95                          writer.write(BUF, 0, len);
96                          writer.flush();
97                      }
98                  }
99              } catch (final IOException e) {
100                 LOG.error("Error when writing to writer", e);
101                 throw new OctaveIOException(e);
102             }
103         }
104         LOG.debug("ReaderWriterPipeThread finished without error");
105     }
106 
107     /**
108      * @param writer
109      *    the writer to set
110      */
111     public void setWriter(final Writer writer) {
112         synchronized (this) {
113             this.writer = writer;
114         }
115     }
116 
117     /**
118      * Close the thread.
119      */
120     public void close() {
121         interrupt();
122         try {
123             join();
124         } catch (final InterruptedException e) {
125             throw new OctaveInterruptedException(e);
126         }
127     }
128 
129 }