1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package eu.simuline.octave.util;
17
18 import java.io.IOException;
19 import java.io.Reader;
20 import java.io.Writer;
21
22 import eu.simuline.octave.exception.OctaveIOException;
23 import eu.simuline.octave.exception.OctaveInterruptedException;
24
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.commons.logging.Log;
27
28
29
30
31
32
33 public final class ReaderWriterPipeThread extends Thread {
34
35 private static final Log LOG = LogFactory
36 .getLog(ReaderWriterPipeThread.class);
37
38 @SuppressWarnings("checkstyle:magicnumber")
39 private static final char[] BUF = new char[4 * 1024];
40
41 private final Reader reader;
42
43 private Writer writer;
44
45
46
47
48
49
50
51
52
53
54
55 public static ReaderWriterPipeThread instantiate(final Reader reader,
56 final Writer writer) {
57 final ReaderWriterPipeThread readerWriterPipeThread =
58 new ReaderWriterPipeThread(reader, writer);
59 readerWriterPipeThread.setName(Thread.currentThread().getName()
60 + "-javaoctave-"
61 + ReaderWriterPipeThread.class
62 .getSimpleName());
63 readerWriterPipeThread
64 .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
65 public void uncaughtException(Thread th, Throwable ex) {
66 System.out.println("Uncaught : " + ex +
67 " of thread " + th.getName());
68 }
69 });
70 readerWriterPipeThread.start();
71 return readerWriterPipeThread;
72 }
73
74 private ReaderWriterPipeThread(final Reader reader, final Writer writer) {
75 this.reader = reader;
76 this.writer = writer;
77 }
78
79 @Override
80 public void run() {
81 while (!interrupted()) {
82 int len;
83 try {
84 len = reader.read(BUF);
85 } catch (final IOException e) {
86 LOG.error("Error when reading from reader", e);
87 throw new OctaveIOException(e);
88 }
89 if (len == -1) {
90 break;
91 }
92 try {
93 synchronized (this) {
94 if (writer != null) {
95 writer.write(BUF, 0, len);
96 writer.flush();
97 }
98 }
99 } catch (final IOException e) {
100 LOG.error("Error when writing to writer", e);
101 throw new OctaveIOException(e);
102 }
103 }
104 LOG.debug("ReaderWriterPipeThread finished without error");
105 }
106
107
108
109
110
111 public void setWriter(final Writer writer) {
112 synchronized (this) {
113 this.writer = writer;
114 }
115 }
116
117
118
119
120 public void close() {
121 interrupt();
122 try {
123 join();
124 } catch (final InterruptedException e) {
125 throw new OctaveInterruptedException(e);
126 }
127 }
128
129 }