Java I/O streams and RMI
Join the DZone community and get the full member experience.
Join For FreePerhaps every programmer working with Java RMI (Remote Method Invocation) comes to the point when he/she recognizes that java.io.Input/OutputStream classes do not implement java.io.Serializable interface - so that they cannot be used as remote methods' arguments or return values. I/O streams are usually tightly bound to a file descriptor or a network connection so its reasonable that they should not be used outside the local JVM.
In the project I have worked on there was a requirement to expose CMS system interface via RMI: however some methods took InputStream instance as document content. I have decided to use Spring and its AOP support to create an interceptor that pre-process service call arguments on the caller's side and another to post-process return value on the callee's side.
The idea is straightforward: we will wrap InputStream-s in a wrapper class implementing remote interface and instead of the InputStream we will pass an RMI stub performing remote calls back to the wrapper.
I/O streams' contracts as remote interfaces
Both input and output streams can be closed:
public interface Closeable extends Remote {
public void close() throws IOException, RemoteException;
}
Contract of a readable stream:
public interface Readable extends Closeable {
/**
* Reads at most count bytes from the stream.
* @param count
* @return data read as a byte array
* @throws IOException
* @throws RemoteException
*/
public byte[] read(int count) throws IOException, RemoteException;
}
Contract of an output stream:
public interface Writeable extends Closeable {
/**
* Writes given byte block to the stream.
* @param data
* @throws IOException
* @throws RemoteException
*/
public void write(byte data[]) throws IOException, RemoteException;
}
Implementing client-side proxies
This is an implementation of a java.io.InputStream subclass to be sent via RMI - the only need to implement an InputStream is to know how to read next byte of data. We will fetch data from the remote side in larger blocks, naturally. The buffer size grows up silently upto the specified size.
public class RemoteInputStream extends InputStream implements Serializable {
private static final long serialVersionUID = 1L;
private final Readable source;
private byte buffer[];
private int pos;
private int exp;
private final static int MAX_EXP = 6; // max fetch size = 2^MAX_EXP (64 KB)
RemoteInputStream(Readable source) {
this.source = source;
}
public int read() throws IOException {
if (pos == -2) return -1;
if (buffer == null || pos > buffer.length - 1) {
buffer = source.read(1024 *(exp > MAX_EXP ? 1 << MAX_EXP : 1 << exp++)); // max 64 KB fetch
pos = 0;
if (buffer.length == 0) {
pos = -2;
return -1;
}
}
return buffer[pos++] & 0xff;
}
public void close() throws IOException {
source.close();
}
}
The RemoteOutputStream's implementation is analogous - it can be found in attached project bundle.
Implementing server-side wrapper
public class RemoteInputStreamServer implements Readable {
private final InputStream in;
private static final byte EMPTY_BUFFER[] = new byte[0];
RemoteInputStreamServer(InputStream in) {
this.in = in;
}
public byte[] read(int count) throws IOException, RemoteException {
final byte buffer[] = new byte[count];
final int actualCount = in.read(buffer);
if (actualCount == count)
return buffer;
else if (actualCount == -1)
return EMPTY_BUFFER;
else {
final byte data[] = new byte[actualCount];
System.arraycopy(buffer, 0, data, 0, data.length);
return data;
}
}
public void close() throws IOException, RemoteException {
try {
in.close();
} catch (IOException ioex) {
throw ioex;
} finally {
UnicastRemoteObject.unexportObject(this, true);
}
}
public static RemoteInputStream wrap(InputStream in) throws RemoteException {
return new RemoteInputStream((Readable)
UnicastRemoteObject.exportObject(new RemoteInputStreamServer(in)));
}
}
Usage
The use of the introduced classes is simple - for given InputStream instance we will create remote proxy using the wrap(InputStream) static factory method of the RemoteInputStreamServer class. Here is the interceptor that pre-processes RMI methods' arguments on the client side:
public class RmiArgumentsPreprocessor implements MethodInterceptor {
public Object invoke(MethodInvocation invocation) throws Throwable {
final Object args[] = invocation.getArguments();
for (int i = 0; i < args.length; ++i) {
if (args[i] != null && args[i] instanceof InputStream) {
final InputStream in = (InputStream)args[i];
args[i] = RemoteInputStreamServer.wrap(in);
}
}
return invocation.proceed();
}
}
Usually, you know that you are using RMI, so you will perform wrapping manually when using stream arguments or return values. Note that we do not use rmiregistry at all - the stream proxy extending Input/OutputStream contains remote stub obtained on the server side - it is the object created by the UnicastRemoteObject.exportObject(Remote) call. Stream wrapper is un-exported when client performs close() call on the stream.
In the attachement you can find all source codes inside a zipped Eclipse project.
I hope this article helps you to overcome some of the limitations implied by RMI ;)
Opinions expressed by DZone contributors are their own.
Comments