
How to Infer Your Linux FD Numbers With Java (and Why)

Learn how to infer Linux file descriptor numbers from Java so you can stream query exports straight to HDFS, without consuming your hard disk and more quickly than plain JDBC operations.


In Badoo's BI, we use a combination of Exasol and Hadoop for ETL purposes. One of our common usage patterns is to export the results of some queries from Exasol to HDFS (for later analysis, or just as a kind of backup). The fastest and simplest way to dump query results from Exasol is to execute a query like this:

EXPORT (SELECT * FROM fact_user_registration)
INTO LOCAL CSV FILE '/local/tmp/user_reg.csv';

As a result, a table dump will appear on our HDD. After the file appears, it should be uploaded to HDFS. That's simple and easy to implement, but when several exports run in parallel, we may run out of disk space.
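For reference, the straightforward two-step flow we are replacing looks roughly like this (a sketch; the local and HDFS paths are illustrative placeholders):

import java.sql.Connection;
import java.sql.Statement;

public class NaiveExport {

    /** Naive flow: dump to local disk first, then upload the file to HDFS. */
    public static void export(Connection connection) throws Exception {
        try (Statement stmt = connection.createStatement()) {
            stmt.execute("EXPORT (SELECT * FROM fact_user_registration) "
                    + "INTO LOCAL CSV FILE '/local/tmp/user_reg.csv'");
        }
        // The whole dump must fit on the local disk before the upload starts.
        Process put = new ProcessBuilder("/local/hadoop/bin/hdfs", "dfs", "-put",
                "/local/tmp/user_reg.csv", "/hdfs/path/user_reg.csv")
                .inheritIO()
                .start();
        put.waitFor();
    }
}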

On the other hand, it is possible to upload content to HDFS by running the following command and passing all the file content into STDIN:

/local/hadoop/bin/hdfs dfs -put - /path/to/file.csv  # a "-" is passed instead of an input filename

So, the solution looks like this:

  1. Spawn an "hdfs put" process and get its STDIN as a pipe. On a Linux system, the parent process will have an open descriptor at /dev/fd/${fd_number}, so it will be possible to perform ordinary file operations on it (see the sketch after this list).

  2. Use /dev/fd/${fd_number} as the file path in the Exasol EXPORT statement.
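If you have never used /dev/fd paths before, a one-liner can demonstrate the idea: FD 1 is the process's own STDOUT, so writing to /dev/fd/1 prints to the console. (A minimal sketch; it assumes a Linux system.)

import java.io.FileOutputStream;

public class DevFdDemo {
    public static void main(String[] args) throws Exception {
        // On Linux, /dev/fd/1 re-opens the process's own STDOUT,
        // so this write ends up on the console like a normal print.
        try (FileOutputStream out = new FileOutputStream("/dev/fd/1")) {
            out.write("hello via /dev/fd\n".getBytes());
        }
    }
}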

In our case, we perform all this intercommunication in Java. Because Java is a cross-platform language, Linux process-spawning details are encapsulated, and the term "file descriptor" does not exist explicitly in its API. However, the FD number can be extracted via the Reflection API:

import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;

public class LinuxProcessSpawner {

    private static final String HDFS_BINARY_PATH = "/local/hadoop/bin/hdfs";

    /**
     * Spawns a new HDFS put process and returns the FD number of its STDIN pipe
     */
    public static ProcessAndFd spawn(String destinationFileName) throws IOException {

        ProcessBuilder builder = new ProcessBuilder(HDFS_BINARY_PATH, "dfs", "-put", "-", destinationFileName);

        builder.redirectInput(ProcessBuilder.Redirect.PIPE);
        builder.redirectError(ProcessBuilder.Redirect.INHERIT);
        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);

        Process process = builder.start();

        try {
            // On OpenJDK, Process.getOutputStream() returns a BufferedOutputStream
            // (a FilterOutputStream subclass) wrapping the FileOutputStream that
            // holds the pipe to the child's STDIN.
            OutputStream os = process.getOutputStream();
            Class<FilterOutputStream> parentClass = FilterOutputStream.class;

            // Pull the wrapped stream out of FilterOutputStream's "out" field.
            Field field = parentClass.getDeclaredField("out");
            field.setAccessible(true);
            Object fileOutputStreamObject = field.get(os);
            field.setAccessible(false);

            FileOutputStream processInput = (FileOutputStream) fileOutputStreamObject;

            // The runtime class is an internal FileOutputStream subclass that does
            // not declare "fd" itself, so look the field up on FileOutputStream.
            field = FileOutputStream.class.getDeclaredField("fd");
            field.setAccessible(true);
            Object fd = field.get(processInput);
            field.setAccessible(false);

            FileDescriptor fdCasted = (FileDescriptor) fd;

            // FileDescriptor keeps the raw Unix FD number in its private "fd" field.
            field = fd.getClass().getDeclaredField("fd");
            field.setAccessible(true);
            int fdNumber = field.getInt(fdCasted);
            field.setAccessible(false);

            return new ProcessAndFd(process, fdNumber);

        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new IOException(e);
        }
    }

    /**
     * Holder for process communications
     */
    public static class ProcessAndFd {

        private final Process process;
        private final int fdNumber;

        public ProcessAndFd(Process process, int fdNumber) {
            this.process = process;
            this.fdNumber = fdNumber;
        }

        public int getFd() {
            return fdNumber;
        }

        public Process getProcess() {
            return process;
        }
    }
}
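Before wiring the spawner into the export, a quick sanity check is possible: every descriptor the JVM holds is listed under /proc/self/fd, so the extracted number should show up there. (A hypothetical check, assuming the LinuxProcessSpawner class above is on the classpath and the hdfs binary is available; the destination path is a placeholder.)

import java.io.File;

public class SpawnerSmokeTest {
    public static void main(String[] args) throws Exception {
        LinuxProcessSpawner.ProcessAndFd pf = LinuxProcessSpawner.spawn("/tmp/smoke_test.csv");
        // The pipe's write end should be visible among the JVM's own descriptors.
        boolean visible = new File("/proc/self/fd/" + pf.getFd()).exists();
        System.out.println("FD " + pf.getFd() + " visible: " + visible);
        pf.getProcess().destroy(); // we only wanted the FD, not a real upload
    }
}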


And the overall export process looks like this:

import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;

public class ExportHelper {

    /**
     * @param connection          Initialized JDBC connection to Exasol
     * @param hdfsDestinationPath Where to save data in HDFS
     */
    public static void executeExport(Connection connection, String hdfsDestinationPath) throws SQLException, IOException, InterruptedException {
        LinuxProcessSpawner.ProcessAndFd acceptingProcess = LinuxProcessSpawner.spawn(hdfsDestinationPath);
        String exportSql = "EXPORT (SELECT * FROM fact_user_registration) INTO LOCAL CSV FILE '/dev/fd/" + acceptingProcess.getFd() + "'";
        // use try-with-resources so the Statement is always closed
        try (Statement stmt = connection.createStatement()) {
            stmt.addBatch(exportSql);

            int[] result = stmt.executeBatch();
            if (result[0] == Statement.EXECUTE_FAILED) {
                throw new SQLException("Export failed");
            }
        }

        // close our end of the pipe so the hdfs process sees EOF and finishes the upload
        Process process = acceptingProcess.getProcess();
        OutputStream stream = process.getOutputStream();
        stream.flush();
        stream.close();

        // give the receiving process up to 5 minutes to drain the pipe and exit
        if (process.isAlive() && !process.waitFor(5, TimeUnit.MINUTES)) {
            process.destroy();
            throw new IOException("Receiving process did not finish within 5 minutes");
        } else if (process.exitValue() != 0) {
            throw new IOException("Upload process exited abnormally");
        }
    }
}
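For completeness, invoking the helper might look like this (a sketch; the JDBC URL, credentials, and HDFS path are placeholders, not values from a real setup):

import java.sql.Connection;
import java.sql.DriverManager;

public class ExportRunner {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details for the Exasol JDBC driver.
        try (Connection connection = DriverManager.getConnection(
                "jdbc:exa:exasol-host:8563", "user", "password")) {
            ExportHelper.executeExport(connection, "/hdfs/path/user_reg.csv");
        }
    }
}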

The key benefits of our solution:

  1. It does not consume local disk space, as the whole dataset never lands on the HDD.

  2. It is several times faster than the usual JDBC approach (fetch the rows, save them to disk, then upload).


Topics:
hadoop, java, file descriptor

