How to run Sqoop from NiFi

Sqoop + NiFi = ?

Apache Sqoop is still the best tool for bulk data transfer between relational databases and Apache Hadoop. One sunny day in Florida, I was able to ingest 5 billion rows from a remote Oracle database in just 4 hours, using 32 mappers. This would have taken weeks with a traditional ETL tool like Informatica or Microsoft SSIS.

Apache NiFi, on the other hand, was not designed for batch-oriented jobs. The power of NiFi is in handling bits of data in near real time; you can literally watch your data traveling from point A to point B. As I was testing the waters with NiFi as a possible solution for a near real-time data lake architecture, the question inevitably popped up about using NiFi to do an initial bulk load of data from an RDBMS. I decided to give it a try and see if I could execute Sqoop right from a NiFi flow.

Any other options?

I have to mention that you do not have to use Sqoop to get to data in an RDBMS. NiFi itself ships with database processors that can pull data over JDBC:

  • ExecuteSQL, which runs an arbitrary SQL query against a database
  • QueryDatabaseTable and GenerateTableFetch, which support incremental, table-oriented extraction

I was not really happy with any of these options; I would miss all the rich features and flexibility I get with Sqoop. Just check how many lines of code are behind Sqoop!

And more importantly, Sqoop uses the power of an entire Hadoop cluster by running MapReduce jobs on YARN, so even a single NiFi node can drive loads for hundreds of tables. The other options above would require multiple NiFi nodes and parallel processing with NiFi remote process groups.

Another nice benefit is that YARN queues can be used to throttle concurrent connections to a source database. DBAs will appreciate it.
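
For example (the queue name below is made up, and the rest of the import arguments are omitted), the MapReduce job that sqoop launches can be pinned to a dedicated YARN queue with a generic Hadoop option, right in the sqoop command:

sqoop import -Dmapreduce.job.queuename=rdbms_ingest ...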

Kylo is worth mentioning here as well. It is a nice data lake management tool, open-sourced by Teradata and built on NiFi. Kylo comes with a nice Sqoop processor and controller service for NiFi, but you would have to install Kylo and all of its dependencies if you want to go in this direction.

How to execute Sqoop from NiFi

It boils down to calling a sqoop shell command from NiFi, but I had additional considerations:

  • Sqoop would be executed to do initial bulk loads. An incoming flowFile will have an attribute with a sqoop command, generated upstream.
  • NiFi should handle errors and failures properly. If Sqoop fails, I need to capture an error message and route the flowFile to a Failure relationship.
  • Sometimes we need to kill Sqoop jobs. Since sqoop runs as a local process but creates MapReduce jobs on YARN, if I kill the job with yarn application -kill application_id_here, I want NiFi to stop processing that flowFile and route it to a Failure relationship.
  • It would be nice to have a timeout parameter. Sometimes, because of database or network connectivity issues, the process might just freeze or run for hours, waiting to reestablish a connection. Knowing that my job should normally take only an hour or two, I can specify a timeout, so the job will be killed and retried automatically.
  • Finally, I do not want to just run Sqoop and hope for the best. I want to capture the number of rows ingested by Sqoop, so I can validate it before running dangerous steps downstream, such as overwriting data in table partitions.

Armed with my wish list, I started with the ExecuteStreamCommand processor. After spending some frustrating hours trying to come up with the proper syntax to pass arguments to that processor (what a pain!) and dealing with the process freezing (which required a restart of the NiFi service!), I wanted to try something else.

I remembered an excellent series of posts by Matt Burgess and decided to write my own script in Groovy. I was not planning to learn Groovy, because NiFi scripts can also be written in languages like Jython/Python, JavaScript or even Lua. But I got fascinated by the simplicity and readability of Groovy, which reminded me of Python a lot. Matt also explained that Groovy scripts would have the best performance. And did I mention how simple and easy Groovy was? After a few hours, I was able to pick up the basics and create my first script. And the basics are all you need to start writing your own custom scripts in NiFi with Groovy.

Here is how the flow looks. Incoming flowFiles in my case have an attribute gen.sqoop_import_command - that attribute holds the actual sqoop import shell command to be executed:

Note that I did not type a password in my sqoop command, but used the hadoop credential API keystore provider instead. It is a neat way to secure a sqoop command.
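
For illustration, here is roughly what that setup and the attribute value could look like (everything below - host, service name, user, table, paths and alias - is made up, and the sqoop command is folded onto multiple lines for readability; in the attribute it is a single line without the backslashes). The password is stored once with the hadoop credential CLI, and the sqoop command then references it with --password-alias:

# one-time setup: store the database password in an HDFS-backed keystore
hadoop credential create oracle.pwd.alias -provider jceks://hdfs/user/nifi/oracle.jceks

# example value of gen.sqoop_import_command
sqoop import \
  -Dhadoop.security.credential.provider.path=jceks://hdfs/user/nifi/oracle.jceks \
  --connect jdbc:oracle:thin:@//dbhost.example.com:1521/ORCL \
  --username etl_user --password-alias oracle.pwd.alias \
  --table SALES.ORDERS --target-dir /data/raw/sales_orders \
  --num-mappers 32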

My "Sqoop" ExecuteScript processor has an additional property, timeout_ms - a timeout in milliseconds for the sqoop command. If it does not finish in that time, the sqoop process initiated by a flowFile is killed and the flowFile is routed to Failure:

Here is my final script that you plug into the Sqoop ExecuteScript processor above:

import java.util.regex.*

// Calls sqoop using command, stored in attribute gen.sqoop_import_command
// Does not change flowFile content and just passes it through.
    
// Do not forget to add new processor property:
// timeout_ms - time in milliseconds

// Executes sqoop and adds two attributes to outgoing flowFile:
// sqoop.exit_code - 0 if no errors
// sqoop.row_count - number of extracted rows

flowFile = session.get()
if(!flowFile)
    return

try {

    String sqoop_command = flowFile.getAttribute('gen.sqoop_import_command')

    Process process = sqoop_command.execute()

    def out = new StringBuffer()
    def err = new StringBuffer()

    // non-blocking way to stream stdout and stderr
    process.consumeProcessOutput(out, err)
    process.waitForOrKill(timeout_ms.value.toInteger())
    
//    if(err)
//        println err
//    if(out)
//        println out
        
    //println("exit value = ${process.exitValue()}")
    
    // waitForOrKill() terminates the process with SIGTERM on timeout; 143 = 128 + 15 (SIGTERM)
    if (process.exitValue() == 143)
        throw new RuntimeException("Sqoop process was killed on timeout")
    else if (process.exitValue() != 0) {
        throw new RuntimeException("Error while executing sqoop command ${err.toString()}")
    }    

    // get row count
    // for some reason, sqoop logs output to stderr not stdout
    def m = err.toString() =~ /Retrieved (\d+) records/
    if (!m) {
        throw new RuntimeException("Cannot extract record count from sqoop!")
    }

    def row_count = m.group(1)
    // println("${row_count} rows extracted by sqoop")

    flowFile = session.putAttribute(flowFile, "sqoop.exit_code", "${process.exitValue()}")
    flowFile = session.putAttribute(flowFile, "sqoop.row_count", "${row_count}")

    session.transfer(flowFile, REL_SUCCESS)

} catch (e) {
    log.error('Error while executing sqoop', e)
    session.transfer(flowFile, REL_FAILURE)
}
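
Downstream, the sqoop.row_count attribute can be checked before any destructive steps. For example (the property name below is made up), a RouteOnAttribute processor with a property expected_rows set to ${sqoop.row_count:gt(0)} would only pass flowFiles for which sqoop actually extracted some rows.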

Improvements

This was just a prototype to see if it was possible to use Sqoop with NiFi. I am pretty happy with how it turned out, and I am now in the process of deploying this flow to a development cluster. I am planning to use it to ingest 25 tables, running a few times a day. It will be a good test.

Here are some ideas I have for future improvement:

  • If the sqoop process is killed on a timeout, the script does not kill the corresponding YARN job, which keeps running on the cluster. It should be easy to parse the Sqoop output, capture the YARN application id and then issue yarn application -kill application_id (a rough sketch of this idea follows below).
  • Use InvokeScriptedProcessor instead of ExecuteScript for even better performance. Refer to Matt's post "InvokeScriptedProcessor template (a faster ExecuteScript)".
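
Here is a minimal, untested sketch of the first idea in Groovy. It assumes the YARN application id shows up in sqoop's stderr (captured in err in the script above) and that the yarn client is on the path; it could be dropped in right before the timeout exception is thrown:

// sketch: if sqoop was killed on timeout, also kill the YARN job it launched
def appMatcher = err.toString() =~ /(application_\d+_\d+)/
if (appMatcher) {
    def appId = appMatcher[0][1]    // first application id found in sqoop output
    log.info("Killing YARN application ${appId}")
    def killCmd = "yarn application -kill " + appId
    killCmd.execute().waitFor()     // best effort; ignore the exit code here
}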