
Spartan: A ''Forking'' Java Program Launcher, Part 4


In the final part of this series, we look at the development road map for Spartan and see what's next for this great open source Java library.



Spartan Road Map: A Tip of the Hat to Reactive Programming

Spartan has been in production use for over a year, but its development began back in 2015. It originated when C++11 and Java 8 were new. Now C++17 and Java 10 are the latest. In the Java community, the JetBrains Kotlin language has been making an impact (with quite a boost from Google's adoption of Kotlin for Android programming).

  • For the time being, the Spartan C++11 code base will remain compilable with g++ 4.8.4, as that has been sufficient for its functionality.

  • Spartan might support Kotlin as is, but no attempt has been made to try it yet (I don't use Kotlin in my day job, so it hasn't been a priority).

  • Java 8 is the only Java language version supported so far. Yet Java 10 is here now and will definitely be important to support. That will require a new round of significant development and testing effort (making sure to support Java modularity introduced in Java 9).

  • Implement Control-C (SIGINT signal) handling for the Spartan client mode

    • It needs to convey SIGTERM to the child process at the other end of the pipe or, if it is reading from the supervisor, simply stop reading the supervisor's output.

  • Miscellaneous improvements (relatively easy enhancements):

    • Provide special variables that can be used in the config.ini file to denote the Spartan install directory path and the directory path where the symbolic link is located.

    • Add a Spartan-private environment variable, SPARTAN_JAVA_HOME, that takes precedence over JAVA_HOME when it is defined.

  • Spartan Java API to allow for authentication as a Linux user

    • The current security model is pretty simple. This feature pries the door open to greater complexity, so I am very iffy on it.

  • Addition of a new, enhanced, reactive programming API, loosely based on the java.util.concurrent.Flow interfaces introduced in Java 9. This reactive API is specifically for invoking worker child process sub-commands. A new invoke method will return an extended InvokeResponseEx object:

class InvokeResponse {
  public final int childPID;
  public final java.io.InputStream inStream;
  InvokeResponse(int childPID, java.io.InputStream inStream) {
    this.childPID = childPID;
    this.inStream = inStream;
  }
}

final class InvokeResponseEx extends InvokeResponse {
  public final InputStream errStream;
  public final OutputStream childInputStream;
  InvokeResponseEx(int childPID, InputStream inStream, InputStream errStream, OutputStream childInputStream) {
    super(childPID, inStream);
    this.errStream = errStream;
    this.childInputStream = childInputStream;
  }
}

static InvokeResponseEx invokeCommandEx(String... args) { ... }

Instead of the class inheritance/interface programming model that java.util.concurrent.Flow mandates (concrete classes must be created which implement the Flow interfaces such as Subscriber), the Spartan approach is a lambda-styled API. Here are proposed interfaces for spartan.fstreams.Flow:

public final class Flow {
  public interface Subscriber {
    Subscriber onError(BiConsumer<InputStream, Subscription> onErrorAction);
    Subscriber onNext(BiConsumer<InputStream, Subscription> onNextAction);
    Subscriber subscribe(InvokeResponseEx rsp);
    FuturesCompletion start();
  }

  public interface Subscription {
    void cancel();
    OutputStream getRequestStream();
  }

  public interface FuturesCompletion {
    Future<Integer> poll();
    Future<Integer> poll(long timeout, TimeUnit unit) throws InterruptedException;
    Future<Integer> take() throws InterruptedException;
    int count();
  }

  public static Subscriber subscribe(InvokeResponseEx rsp) { ... }
}

Here is an example of how to program to this lambda-oriented reactive API:

InvokeResponseEx rsp = Spartan.invokeCommandEx("ETL", dataInputFile.getPath());

final OutputStream errOutFileStream = Files.newOutputStream(errOutFile.toPath(), CREATE, TRUNCATE_EXISTING);
final OutputStream outputFileStream = Files.newOutputStream(outputFile.toPath(), CREATE, TRUNCATE_EXISTING);

FuturesCompletion futures = spartan.fstreams.Flow.subscribe(rsp)
  .onError((errStrm, subscription) -> copyWithClose(errStrm, errOutFileStream, subscription))
  .onNext((outStrm, subscription) -> copyWithClose(outStrm, outputFileStream, subscription))
  .start();

int count = futures.count();
while(count-- > 0) {
  try {
    int childPID = futures.take().get();
  } catch (ExecutionException e) {
    log.error("exception encountered in sub task:", e);
  } catch (InterruptedException e) {
    log.warn("interruption occurred");
  }
}
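
The copyWithClose(...) helper used in these examples is not shown in this article. A minimal sketch of what such a helper might look like follows, assuming it drains one stream into the other, closes both, and cancels the subscription if the copy fails. The names SubscriptionSketch and StreamCopySketch are hypothetical stand-ins so the sketch compiles without the proposed spartan.fstreams.Flow types:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the proposed Flow.Subscription (only cancel() is needed here).
interface SubscriptionSketch {
  void cancel();
}

final class StreamCopySketch {
  private StreamCopySketch() {}

  // Assumed behavior: copy everything from 'from' to 'to', then close both streams.
  // On an I/O failure the subscription is cancelled and the error is rethrown
  // unchecked, since a BiConsumer lambda cannot throw a checked IOException.
  static void copyWithClose(InputStream from, OutputStream to, SubscriptionSketch subscription) {
    try (InputStream in = from; OutputStream out = to) {
      final byte[] buffer = new byte[8192];
      int n;
      while ((n = in.read(buffer)) != -1) {
        out.write(buffer, 0, n);
      }
      out.flush();
    } catch (IOException e) {
      subscription.cancel();
      throw new UncheckedIOException(e);
    }
  }
}
```

An unchecked exception thrown this way would presumably surface as the ExecutionException caught in the futures loop, though that depends on how the Flow implementation runs the lambdas.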

Multiple child worker subprocesses can be invoked and then their respective subscriptions chained into a single FuturesCompletion context:

InvokeResponseEx rsp1 = Spartan.invokeCommandEx("FIB_GEN", "1000000");
InvokeResponseEx rsp2 = Spartan.invokeCommandEx("EXTRACT", sourceFile.getPath());

FuturesCompletion futures = spartan.fstreams.Flow.subscribe(rsp1)
  .onError((errStrm, subscription) -> copyWithClose(errStrm, errOutFileStream01, subscription))
  .onNext((outStrm, subscription) -> copyWithClose(outStrm, outputFileStream01, subscription))
  .subscribe(rsp2)
  .onError((errStrm, subscription) -> copyWithClose(errStrm, errOutFileStream02, subscription))
  .onNext((outStrm, subscription) -> copyWithClose(outStrm, outputFileStream02, subscription))
  .start();

int count = futures.count();
while(count-- > 0) {
  try {
    int childPID = futures.take().get();
  } catch (ExecutionException e) {
    log.error("exception encountered in sub task:", e);
  } catch (InterruptedException e) {
    log.warn("interruption occurred");
  }
}

Or the subscribers could be chained via loop iteration. Here, subscribers are established for a list of input files (each input file is passed to a child worker process when calling Spartan.invokeCommandEx(...)):

final Set<Integer> childPIDs = new HashSet<>();
Subscriber subscriber = null;

for(final File inputFile : inputFiles) {
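  final String inputFileName = inputFile.getName();
  if (!inputFileName.endsWith(".gz")) {
    continue; // skip files without a .gz suffix (lastIndexOf would return -1 and substring would throw)
  }
  final String outputFileName = inputFileName.substring(0, inputFileName.length() - ".gz".length());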
  final File errOutFile = new File(inputFile.getParent(), outputFileName + ".err");
  final File outputFile = new File(inputFile.getParent(), outputFileName);
  final OutputStream errOutFileStream = Files.newOutputStream(errOutFile.toPath(), CREATE, TRUNCATE_EXISTING);
  final OutputStream outputFileStream = Files.newOutputStream(outputFile.toPath(), CREATE, TRUNCATE_EXISTING);
  final InvokeResponseEx rsp = Spartan.invokeCommandEx("UN_GZIP", inputFile.getPath());
  childPIDs.add(rsp.childPID);
  subscriber = subscriber == null ? spartan.fstreams.Flow.subscribe(rsp) : subscriber.subscribe(rsp);
  subscriber
    .onError((errStrm, subscription) -> copyWithClose(errStrm, errOutFileStream, subscription))
    .onNext((outStrm, subscription) -> copyWithClose(outStrm, outputFileStream, subscription));
}

final FuturesCompletion futures = subscriber.start();

int count = futures.count();
final String[] pids = new String[count];
int i = 0;

while(count-- > 0) {
  try {
    final Integer childPID = futures.take().get();
    childPIDs.remove(childPID);
    pids[i++] = childPID.toString();
  } catch (ExecutionException e) {
    log.error("exception encountered in sub task:", e);
  } catch (InterruptedException e) {
    log.warn("interruption occurred");
  }
}

Notice that on the first pass through the loop the subscriber is established via the static method Flow.subscribe(...), and thereafter via the instance method subscriber.subscribe(...).

The new Flow interfaces and the above depicted style of programming have already been proved out in a prototyping test bed application.

This feature will also require the addition of a new annotation, which will be associated with a new method entry point signature for child worker sub-commands. The method signature will have two additional stream arguments like so:

@ChildWorkerCommandEx(cmd="CDCETL", jvmArgs={"-Xms128m", "-Xmx324m"})
public static void doCdcEtlProcessing(String[] args, PrintStream outStream, PrintStream errStream, InputStream inStream) { ... }

Also, the new Spartan.invokeCommandEx(...) method still remains to be implemented; it will require support in the underlying C++ Spartan program launcher.
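
To make the proposed entry point signature a little more concrete, here is a sketch of a method body written against it. The class name ChildWorkerSketch and the doUpperCase command are made up for illustration, and the @ChildWorkerCommandEx annotation is left out so the code compiles without the Spartan library. It reads lines from the child's input stream, writes results to its output stream, and reports problems on its error stream:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class ChildWorkerSketch {
  private ChildWorkerSketch() {}

  // In Spartan this method would carry the proposed @ChildWorkerCommandEx annotation;
  // it is omitted here because that annotation does not exist yet.
  static void doUpperCase(String[] args, PrintStream outStream, PrintStream errStream, InputStream inStream) {
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(inStream, StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        if (line.isEmpty()) {
          errStream.println("skipping empty line"); // diagnostics go to the error pipe
        } else {
          outStream.println(line.toUpperCase(Locale.ROOT)); // results go to the output pipe
        }
      }
    } catch (IOException e) {
      errStream.println("read failure: " + e.getMessage());
    } finally {
      outStream.flush();
      errStream.flush();
    }
  }
}
```

On the supervisor side, the three streams would correspond to the inStream, errStream, and childInputStream fields of InvokeResponseEx.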

Working with the lambda-centric Spartan Flow interfaces is rather eye-opening in itself: it is a much more accommodating programming model than the Flow interfaces introduced in Java 9. Frankly, the Java 9 model, which requires concrete classes that implement interfaces, is the old style of Java programming that predates Java 8. It is unfortunate that reactive programming for Java, as introduced in Java 9, was based on it (the reactive programming standard it adheres to is simply out of date with respect to contemporary Java programming practices).
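
For comparison, here is roughly what the Java 9 model asks of a subscriber. This sketch uses only the JDK's java.util.concurrent.Flow and SubmissionPublisher; the class names CollectingSubscriber and JdkFlowDemo are made up for illustration. Note that a concrete class implementing all four Flow.Subscriber methods is needed, even just to collect a few strings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// A concrete class is required: every Flow.Subscriber method must be implemented.
final class CollectingSubscriber implements Flow.Subscriber<String> {
  final List<String> received = new ArrayList<>();
  final CountDownLatch done = new CountDownLatch(1);
  private Flow.Subscription subscription;

  @Override public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1); // ask for the first item
  }

  @Override public void onNext(String item) {
    received.add(item);
    subscription.request(1); // ask for the next item
  }

  @Override public void onError(Throwable t) { done.countDown(); }

  @Override public void onComplete() { done.countDown(); }
}

final class JdkFlowDemo {
  private JdkFlowDemo() {}

  // Publishes the given items and returns what the subscriber collected.
  static List<String> run(String... items) {
    final CollectingSubscriber subscriber = new CollectingSubscriber();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (String item : items) {
        publisher.submit(item);
      }
    } // close() issues onComplete once the submitted items have been delivered
    try {
      subscriber.done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }
}
```

The proposed Spartan API expresses the same onNext and onError hooks as two lambdas chained onto subscribe(...).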

One thing that will be noticed is that the Spartan Flow interfaces deal with InputStream and OutputStream. The intent is to allow direct use of the stream communication pipes to the child worker process. The Java 9 Flow approach, by contrast, uses generic type parameters and passes materialized objects to a subscriber. My personal use of Spartan child worker processes to date is such that their output is consumed by a special zero-garbage text line reader class, combined with regular expressions that act on a CharSequence buffer and match on some simple command language syntax. Consequently, a child worker process can be monitored by a supervisor running continuously, 24/7, and yet generate virtually no heap garbage, which is a desirable characteristic.
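
That line reader class is not part of this article, but the general idea can be sketched. The following is an approximation, not my actual implementation: the class name LineScannerSketch and the "STATUS <word>" syntax are invented for illustration. It reuses one StringBuilder as the line buffer and one Matcher that is reset against that buffer, and it compares the captured group in place rather than calling group(), which would allocate a String:

```java
import java.io.IOException;
import java.io.Reader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LineScannerSketch {
  // Hypothetical command syntax, for illustration only: "STATUS <word>"
  private static final Pattern STATUS = Pattern.compile("^STATUS\\s+(\\w+)$");

  private final StringBuilder line = new StringBuilder(256); // reused for every line
  private final Matcher matcher = STATUS.matcher(line);      // reused, reset per line

  // Reads one line into the reusable buffer; returns false at end of input.
  boolean readLine(Reader in) throws IOException {
    line.setLength(0);
    boolean sawAny = false;
    int c;
    while ((c = in.read()) != -1) {
      sawAny = true;
      if (c == '\n') break;
      if (c != '\r') line.append((char) c);
    }
    return sawAny;
  }

  // Counts the lines of the form "STATUS <expected>".
  int countStatus(Reader in, String expected) throws IOException {
    int hits = 0;
    while (readLine(in)) {
      matcher.reset(line); // re-point the matcher at the refilled buffer
      if (matcher.matches() && regionEquals(line, matcher.start(1), matcher.end(1), expected)) {
        hits++;
      }
    }
    return hits;
  }

  // Compares a region of the buffer with 'expected' without creating a substring.
  private static boolean regionEquals(CharSequence s, int start, int end, String expected) {
    if (end - start != expected.length()) return false;
    for (int i = 0; i < expected.length(); i++) {
      if (s.charAt(start + i) != expected.charAt(i)) return false;
    }
    return true;
  }
}
```

In practice the Reader would wrap the child's inStream (ideally buffered, since this sketch reads one character at a time).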

An object serialization abstraction layer could further be devised on top of this API, though, if one preferred to work with Java objects instead.
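
As a rough sketch of what such a layer could look like, the following wraps the raw streams with the JDK's ObjectOutputStream and ObjectInputStream. The class name ObjectChannelSketch is hypothetical, and standard Java serialization is only one possible choice (it should only be used between processes that trust each other, which a supervisor and its own child workers presumably do):

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class ObjectChannelSketch {
  private ObjectChannelSketch() {}

  // Writes each message as a serialized object, then closes the stream.
  static void writeAll(OutputStream out, Serializable... messages) throws IOException {
    try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
      for (Serializable message : messages) {
        oos.writeObject(message);
      }
    }
  }

  // Reads serialized objects until the stream is exhausted.
  static List<Object> readAll(InputStream in) throws IOException, ClassNotFoundException {
    final List<Object> result = new ArrayList<>();
    try (ObjectInputStream ois = new ObjectInputStream(in)) {
      while (true) {
        try {
          result.add(ois.readObject());
        } catch (EOFException endOfStream) {
          break; // the writer closed its end of the pipe
        }
      }
    }
    return result;
  }
}
```

A child worker would call something like writeAll on its output stream, and the supervisor's onNext lambda would call readAll on the corresponding input stream. This trades away the zero-garbage property described above.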

Conclusion

In other programming languages, what Spartan empowers would perhaps not be very special. Concurrent programming via processes is a rather ancient practice in computing. All manner of stalwart programs that run as services on Linux, like PostgreSQL, MySQL, and Redis (and many others), as well as desktop applications such as the Chrome browser and lately the Firefox browser, use multi-process concurrent programming as core to how they are designed and operate.

However, for the Java language, where this manner of programming now becomes so facile, it's very much like a new landscape of program architecture is presenting itself. Enjoy.

Topics:
open source, java libraries, spartan, java, forking, tutorial

Published at DZone with permission of
