Back in 2015, Alteryx announced a brand-new set of In-Database tools, available to all customers with no additional license required. Alteryx keeps bringing amazing value to its customers without making them pay every time a great new feature is announced. This is why I love this company and continue to follow it, even though I do not work with Alteryx daily anymore.
In-Database processing sets an ambitious goal: push heavy calculations and operations directly to the source system, without moving data out of it. As of Alteryx 11.5, a large set of databases and distributed NoSQL and Big Data engines are supported, along with an impressive list of Alteryx In-Database tools.
This is a truly amazing feature, especially because Alteryx developers can even stream their own datasets (if they are allowed to) into these source systems and blend data (to use the Alteryx term) right where the data is.
As I was working on a Big Data POC for my employer, I wanted to see how well In-Database processing would work with Apache Hive and Apache Impala, and whether our Alteryx developers could leverage this feature.
Alteryx can connect to Apache Hive and Apache Impala. It is also possible to use Apache Spark via Spark SQL. A very cool Alteryx Spark Direct integration is coming in 2018, which will allow running native Spark code (in Java, Scala, or Python) without leaving Alteryx, and will let an Alteryx developer tap the full power of Spark's distributed processing engine and machine learning library.
Connecting Apache Impala.
The first step was to connect Alteryx to our kerberized Hadoop cluster and run some simple SQL queries using Hive and Impala. It turned out to be no different from a regular database connection. All it took was downloading the ODBC drivers from Cloudera and setting up an ODBC System DSN (filling in the usual fields: hostname, port, user, and password), and I was done.
I checked the SASL box and bumped Rows Fetched Per Block to 10000. You might also consider checking the Use Native Query box and changing the Transport Buffer Size, though I did not do that.
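If you ever need the same connection outside Alteryx (for example, from a Python script on the same machine), the DSN settings above can also be written as a DSN-less ODBC connection string. The sketch below only assembles that string. The key names (`UseSASL`, `RowsFetchedPerBlock`, `UseNativeQuery`) are my assumption of how the Cloudera Impala ODBC driver names these options, the driver name is a placeholder, and Kerberos-specific keys are deliberately left out; check all of them against the driver's installation guide for your version.

```python
"""Build a DSN-less ODBC connection string for Impala (sketch).

Key names and the driver name are assumptions based on the Cloudera Impala
ODBC driver documentation; verify them for your driver version.
"""


def impala_odbc_conn_str(
    host: str,
    port: int = 21050,  # Impala's default port for ODBC/JDBC clients
    driver: str = "Cloudera ODBC Driver for Impala",  # placeholder driver name
    use_sasl: bool = True,  # the "Use SASL" checkbox
    rows_fetched_per_block: int = 10000,  # "Rows Fetched Per Block"
    use_native_query: bool = False,  # "Use Native Query" (left unchecked in this post)
) -> str:
    parts = {
        "Driver": "{" + driver + "}",
        "Host": host,
        "Port": str(port),
        "UseSASL": "1" if use_sasl else "0",
        "RowsFetchedPerBlock": str(rows_fetched_per_block),
        "UseNativeQuery": "1" if use_native_query else "0",
    }
    # ODBC connection strings are semicolon-separated key=value pairs.
    return ";".join(f"{key}={value}" for key, value in parts.items())


if __name__ == "__main__":
    print(impala_odbc_conn_str("impala.example.com"))
```

With `pyodbc` installed, that string (or simply `"DSN=<your DSN name>"` for a System DSN like the one above) could then be passed to `pyodbc.connect(...)`.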
After that, I created a new workflow in the Alteryx client, dragged an Input Data tool onto the canvas, and created a connection to Impala. You can use a table name or a custom SQL query as the source. There is also a visual query builder, which I do not really care about. Data can be previewed right there and optionally cached, a very handy feature you can read about in the help.
Then I connected the Input Data tool to an Output Data tool to dump the data into an Alteryx native .yxdb file, clicked the Run button, and that was it: data started flowing from our cluster.
In-Database processing on a Hadoop cluster.
Note that so far I had not used the In-Database feature at all: I simply used regular Input Data and Output Data tools to run my SQL code and write out the results. Of course, real life is a bit more complicated than that.
For my recent project, I needed to acquire patients' vital signs such as clinical weight, systolic and diastolic blood pressure, and mean arterial pressure. The data was for patients who developed sepsis during a hospital stay. It came from the Cerner EMR and was available in our Hadoop Data Lake.
I thought it would be a good case for testing the In-Database feature, as one of the source tables had over 6B rows.
With Alteryx, I could craft an SQL query like the one below, put it in an Input Data tool, and feed the results downstream. Or I could implement all the joins and filters using In-Database tools and let Alteryx generate the SQL code for me.
SELECT
  encdrv.SOURCE_SYSTEM_ID,
  encdrv.encntr_id,
  ce.clinical_event_id AS VITALS_CLINICAL_EVENT_ID,
  ceevnt.CODE_SET AS VITALS_CODE_SET,
  ce.EVENT_CD AS VITALS_CODE_VALUE,
  ceevnt.DISPLAY_KEY AS VITALS_DISPLAY_KEY,
  ceevnt.DISPLAY AS VITALS_DISPLAY,
  ce.event_end_dt_tm AS VITALS_EVENT_DT_TM,
  ce.result_val AS VITALS_RESULT_VAL,
  ce.RESULT_UNITS_CD AS VITALS_RESULT_UNITS
FROM clinical_event ce
INNER JOIN cust_encntr_gtt encdrv
  ON ce.ENCNTR_ID = encdrv.ENCNTR_ID
  AND ce.SOURCE_SYSTEM_ID = encdrv.SOURCE_SYSTEM_ID
INNER JOIN code_value ceevnt
  ON ceevnt.code_value = ce.event_cd
  AND ceevnt.SOURCE_SYSTEM_ID = encdrv.SOURCE_SYSTEM_ID
  AND ceevnt.ACTIVE_IND = 1
  AND ceevnt.DISPLAY_KEY IN (
    'CLINICALWEIGHT', -- weight in kg
    -- Systolic Blood Pressure
    'INETNIBPSYSTOLIC', 'SYSTOLICBLOODPRESSURE',
    'INETNIBPDIASTOLIC', -- Diastolic Blood Pressure
    'NIBPMAPCALC' -- Mean Arterial Pressure
  )
  AND ceevnt.CODE_SET = 72
WHERE ce.PERSON_ID = encdrv.PERSON_ID
  AND ce.VALID_UNTIL_DT_TM > current_timestamp() -- only valid current results
  AND ce.RESULT_STATUS_CD IN (
    25,    -- Auth (Verified)
    34, 35 -- Modified
  )
  AND ce.VIEW_LEVEL = 1
Option 1: Native SQL way.
The downside of this option is that one needs to know SQL to craft a query like the one above. All the logic is hidden inside that little box, and some hard-core Alteryx people may not like that.
It is not really a downside for me: in my opinion, if you are serious about data and analytics, SQL is a skill you must have and be very good at. You can fine-tune SQL to play to the strengths of a given source system, add hints, use native functions, and much more.
Another downside: if you need to take the 11M rows output by the workflow and join them to another dataset from the same source system, you will be doing that join in your Alteryx client (or Alteryx Server) memory, transferring all the data over the network again.
To measure the runtime, I executed the workflow 3 times and took the average (119 seconds). Not bad at all.
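The measurement method itself (run several times, take the mean) is easy to script if you want to compare runs consistently. This is a minimal sketch, assuming the workflow or query can be triggered from a Python callable; what that callable does (for instance, run the query over ODBC) is up to you and is not shown here.

```python
import statistics
import time
from typing import Callable


def average_runtime(run: Callable[[], object], repeats: int = 3) -> float:
    """Call `run` `repeats` times and return the mean wall-clock time in seconds."""
    if repeats < 1:
        raise ValueError("repeats must be >= 1")
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic, high-resolution clock
        run()
        timings.append(time.perf_counter() - start)
    return statistics.mean(timings)
```

For example, `average_runtime(lambda: cursor.execute(sql).fetchall())` (with a hypothetical `cursor` and `sql`) would time the full fetch rather than just query submission, which is closer to what a workflow run measures.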
Option 2: In-Database way.
This time I took my SQL query apart and used In-Database tools to replicate the same logic. The end result was a typical-looking Alteryx workflow, readable and (hopefully) clear to Alteryx developers who do not speak SQL.
This time the workflow took 225 seconds to finish, almost twice as long. We will see why in a second.
Option 3: Do not do it.
Another way would be to load these 3 tables individually using Input Data tools and then do all the filtering and joins using regular Alteryx tools. You cannot really do that with large tables like the ones in my example above. Alteryx is a super fast tool, but it is an in-memory tool, and you cannot simply load those 6B rows. Your workflow will take forever to run, and the source system admins will be after you very soon. You do not want to be that guy.
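To make the cost of this approach concrete, here is a toy sketch that uses Python's built-in `sqlite3` module as a stand-in for Impala. The table names mirror the post, but the columns and data are hypothetical and drastically scaled down. It counts how many rows cross the client/database boundary when the tables are loaded whole and joined in client memory, versus when the join and filters are pushed into the database.

```python
import sqlite3

# Pushed-down version: joins and filters run inside the database,
# so only matching rows travel back to the client.
PUSHDOWN_SQL = """
SELECT ce.encntr_id, ce.event_cd, ce.result_val
FROM clinical_event ce
JOIN cust_encntr_gtt g ON g.encntr_id = ce.encntr_id
JOIN code_value cv ON cv.code_value = ce.event_cd
WHERE ce.view_level = 1
  AND cv.code_set = 72
  AND cv.display_key = 'CLINICALWEIGHT'
"""


def build_demo_db() -> sqlite3.Connection:
    """Tiny in-memory stand-in for the Hadoop tables (hypothetical data)."""
    con = sqlite3.connect(":memory:")
    con.executescript(
        """
        CREATE TABLE clinical_event (encntr_id INT, event_cd INT, result_val TEXT, view_level INT);
        CREATE TABLE cust_encntr_gtt (encntr_id INT);
        CREATE TABLE code_value (code_value INT, display_key TEXT, code_set INT);
        """
    )
    # 1,000 events spread over 100 encounters and 10 event codes.
    con.executemany(
        "INSERT INTO clinical_event VALUES (?, ?, ?, ?)",
        [(i % 100, i % 10, str(i), i % 2) for i in range(1000)],
    )
    # The encounter list: only 5 encounters of interest.
    con.executemany("INSERT INTO cust_encntr_gtt VALUES (?)", [(e,) for e in range(5)])
    # Event code 3 plays the role of CLINICALWEIGHT.
    con.executemany(
        "INSERT INTO code_value VALUES (?, ?, 72)",
        [(c, "CLINICALWEIGHT" if c == 3 else f"OTHER{c}") for c in range(10)],
    )
    return con


def client_side(con: sqlite3.Connection) -> tuple[list, int]:
    """Pull each table whole, then filter and join in client memory."""
    events = con.execute(
        "SELECT encntr_id, event_cd, result_val, view_level FROM clinical_event"
    ).fetchall()
    encounters = con.execute("SELECT encntr_id FROM cust_encntr_gtt").fetchall()
    codes = con.execute("SELECT code_value, display_key, code_set FROM code_value").fetchall()
    transferred = len(events) + len(encounters) + len(codes)

    wanted_encounters = {e for (e,) in encounters}
    wanted_codes = {c for c, key, code_set in codes if code_set == 72 and key == "CLINICALWEIGHT"}
    rows = sorted(
        (enc, cd, val)
        for enc, cd, val, level in events
        if level == 1 and enc in wanted_encounters and cd in wanted_codes
    )
    return rows, transferred


def pushed_down(con: sqlite3.Connection) -> tuple[list, int]:
    """Let the database do the filtering and joining."""
    rows = sorted(con.execute(PUSHDOWN_SQL).fetchall())
    return rows, len(rows)


if __name__ == "__main__":
    con = build_demo_db()
    local_rows, local_transferred = client_side(con)
    db_rows, db_transferred = pushed_down(con)
    assert local_rows == db_rows  # same answer either way
    print(f"client-side join: {local_transferred} rows transferred")  # 1015
    print(f"pushed-down join: {db_transferred} rows transferred")  # 10
```

In this toy data, the client-side approach moves 1,015 rows to produce the same 10 rows the pushed-down query returns. Row counts are only a proxy: the real cost also depends on row width and network speed, but the gap grows with table size.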
Checking In-Database tools best practices.
I was not happy with the performance of the In-Database tools, considering that Option 2 was two times slower than the native SQL query. I found some good presentations and recommendations (scroll to the bottom of the post for links) to make sure I was following best practices. It came down to:
- Do not do the infamous select star, and do not bring in the entire table. You want to reduce the size of the data to transfer, so pick only the columns you really need. It is tempting to get it all, but you need to resist. Check!
- Filter data as early as possible in the workflow. Check!
- Limit InDB Browse and Data Stream Out tools. Check! I only have one Data Stream Out tool and no InDB Browse tools at all.
- Use the cache feature to speed up development, but do not forget to turn it off. Check!
So why is it still slow?
Remember, Alteryx converts a sequence of In-Database tools into SQL code, which is executed only at the very end of your workflow, when you stream the data out. There is an easy way to see that SQL statement: connect a Dynamic Output In-DB tool to your In-DB stream and attach a Browse tool to it. The Dynamic Output In-DB tool outputs the connection details along with the actual SQL statement.
Here is what the SQL code looks like for my example:
WITH `Tool6_5579` AS (
  Select source_system_id, encntr_id, clinical_event_id, event_cd, event_end_dt_tm, result_val,
    result_units_cd, person_id, valid_until_dt_tm, result_status_cd, view_level
  From v500.clinical_event v500_clinical_event
),
`Tool14True_9840` AS (
  SELECT * FROM `Tool6_5579`
  WHERE `valid_until_dt_tm` > current_timestamp() --only valid current results, will filter dups, temp results etc.
    AND `result_status_cd` IN (
      25, --Auth (Verified)
      34,35 --Modified
    )
    AND `view_level` = 1
),
`Tool4_76c8` AS (
  Select * From cust_encntr_gtt
),
`Tool8_6c06` AS (
  SELECT `Tool14True_9840`.`source_system_id`, `Tool14True_9840`.`encntr_id`, `Tool14True_9840`.`clinical_event_id`,
    `Tool14True_9840`.`event_cd`, `Tool14True_9840`.`event_end_dt_tm`, `Tool14True_9840`.`result_val`,
    `Tool14True_9840`.`result_units_cd`, `Tool14True_9840`.`person_id`, `Tool14True_9840`.`valid_until_dt_tm`,
    `Tool14True_9840`.`result_status_cd`, `Tool14True_9840`.`view_level`,
    `Tool4_76c8`.`source_system_id` AS `R_source_system_id`, `Tool4_76c8`.`encntr_id` AS `R_encntr_id`,
    `Tool4_76c8`.`person_id` AS `R_person_id`
  FROM `Tool14True_9840`
  INNER JOIN `Tool4_76c8`
    ON `Tool14True_9840`.`encntr_id` = `Tool4_76c8`.`encntr_id`
    AND `Tool14True_9840`.`source_system_id` = `Tool4_76c8`.`source_system_id`
    AND `Tool14True_9840`.`person_id` = `Tool4_76c8`.`person_id`
),
`Tool7_35b4` AS (
  select code_value, source_system_id, active_ind, display_key, code_set from code_value
),
`Tool13True_3eea` AS (
  SELECT * FROM `Tool7_35b4`
  WHERE `code_set` = 72 AND active_ind=1
    AND DISPLAY_KEY IN (
      'CLINICALWEIGHT', --weight in kg
      'INETNIBPSYSTOLIC', 'SYSTOLICBLOODPRESSURE', -- NIBP Systolic Blood Pressure
      'INETNIBPDIASTOLIC', -- NIBP Diastolic Blood Pressure
      'NIBPMAPCALC' --NIBP MAP calc (Mean Arterial Pressure
    )
),
`Tool12_3851` AS (
  SELECT `Tool8_6c06`.`source_system_id`, `Tool8_6c06`.`encntr_id`, `Tool8_6c06`.`clinical_event_id`,
    `Tool8_6c06`.`event_cd`, `Tool8_6c06`.`event_end_dt_tm`, `Tool8_6c06`.`result_val`,
    `Tool8_6c06`.`result_units_cd`, `Tool8_6c06`.`person_id`, `Tool8_6c06`.`valid_until_dt_tm`,
    `Tool8_6c06`.`result_status_cd`, `Tool8_6c06`.`view_level`,
    `Tool8_6c06`.`R_source_system_id` AS `L_R_source_system_id`, `Tool8_6c06`.`R_encntr_id`, `Tool8_6c06`.`R_person_id`,
    `Tool13True_3eea`.`code_value`, `Tool13True_3eea`.`source_system_id` AS `R_source_system_id`,
    `Tool13True_3eea`.`active_ind`, `Tool13True_3eea`.`display_key`, `Tool13True_3eea`.`code_set`
  FROM `Tool8_6c06`
  INNER JOIN `Tool13True_3eea`
    ON `Tool8_6c06`.`event_cd` = `Tool13True_3eea`.`code_value`
)
SELECT `encntr_id`, `clinical_event_id`, `event_cd`, `result_val`, `result_units_cd`, `person_id`, `display_key`, `code_set`
FROM `Tool12_3851`
Ouch. Alteryx decided to use a bunch of CTEs (common table expressions), joined to one another several times.
Impala, like any serious database engine, provides a way to see the execution plan for your queries (also known as the explain plan): just prefix the query with EXPLAIN. The execution plan shows how the engine intends to execute the query, in the way its planner considers most efficient: which tables it scans and in what order, how it performs the joins, and where it applies predicates. Let's compare the explain plans for our two queries.
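The shape of that generated SQL follows from how the tools are chained: each In-DB tool becomes one CTE that reads from the CTE of the tool upstream, and nothing runs until the data is streamed out. The sketch below imitates that pattern with a hypothetical `InDbQueryBuilder` class. It is not Alteryx's actual code generator, whose internals I do not know; the `ToolN` names only loosely imitate names like `Tool6_5579`. The result is run against an in-memory SQLite database so the generated SQL can be checked.

```python
import sqlite3


class InDbQueryBuilder:
    """Hypothetical model of In-DB tools compiling to chained CTEs.

    Each method stands in for one In-DB tool: it appends a CTE and returns
    that CTE's name so the next "tool" can read from it. No SQL runs until
    the text from to_sql() is executed, like a Data Stream Out at the end.
    """

    def __init__(self) -> None:
        self._ctes: list[tuple[str, str]] = []

    def _add(self, body: str) -> str:
        name = f"Tool{len(self._ctes) + 1}"
        self._ctes.append((name, body))
        return name

    def connect(self, table: str, columns: list[str]) -> str:
        # Like Connect In-DB: pick only the needed columns.
        return self._add(f"SELECT {', '.join(columns)} FROM {table}")

    def filter(self, src: str, condition: str) -> str:
        # Like Filter In-DB: wrap the upstream CTE in a WHERE clause.
        return self._add(f"SELECT * FROM `{src}` WHERE {condition}")

    def join(self, left: str, right: str, keys: list[str]) -> str:
        # Like Join In-DB: right-side key columns come back prefixed with R_.
        on = " AND ".join(f"`{left}`.{k} = `{right}`.{k}" for k in keys)
        right_cols = ", ".join(f"`{right}`.{k} AS R_{k}" for k in keys)
        return self._add(
            f"SELECT `{left}`.*, {right_cols} FROM `{left}` INNER JOIN `{right}` ON {on}"
        )

    def to_sql(self, src: str, columns: list[str]) -> str:
        ctes = ",\n".join(f"`{name}` AS ({body})" for name, body in self._ctes)
        return f"WITH {ctes}\nSELECT {', '.join(columns)} FROM `{src}`"


def demo() -> tuple[str, list[tuple]]:
    con = sqlite3.connect(":memory:")
    con.executescript(
        """
        CREATE TABLE clinical_event (encntr_id INT, event_cd INT, view_level INT);
        CREATE TABLE cust_encntr_gtt (encntr_id INT);
        INSERT INTO clinical_event VALUES (1, 3, 1), (1, 4, 0), (2, 3, 1), (9, 3, 1);
        INSERT INTO cust_encntr_gtt VALUES (1), (2);
        """
    )
    qb = InDbQueryBuilder()
    events = qb.connect("clinical_event", ["encntr_id", "event_cd", "view_level"])
    valid = qb.filter(events, "view_level = 1")
    encounters = qb.connect("cust_encntr_gtt", ["encntr_id"])
    joined = qb.join(valid, encounters, ["encntr_id"])
    sql = qb.to_sql(joined, ["encntr_id", "event_cd"])
    return sql, sorted(con.execute(sql).fetchall())


if __name__ == "__main__":
    sql, rows = demo()
    print(sql)
    print(rows)  # [(1, 3), (2, 3)]
```

Because each CTE is essentially a named subquery, the generated text says nothing about which table is large; choosing the join order is left entirely to the engine's planner.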
Explain plan for the original query:
PLAN-ROOT SINK
|
07:EXCHANGE [UNPARTITIONED]
|  hosts=5 per-host-mem=unavailable
|  tuple-ids=0,1,2 row-size=258B cardinality=unavailable
|
04:HASH JOIN [INNER JOIN, BROADCAST]
|  hash predicates: encdrv.source_system_id = ceevnt.source_system_id, ce.event_cd = ceevnt.code_value
|  runtime filters: RF000 <- ceevnt.source_system_id, RF001 <- ceevnt.code_value
|  hosts=5 per-host-mem=122B
|  tuple-ids=0,1,2 row-size=258B cardinality=unavailable
|
|--06:EXCHANGE [BROADCAST]
|  |  hosts=1 per-host-mem=0B
|  |  tuple-ids=2 row-size=110B cardinality=1
|  |
|  02:SCAN HDFS [v500.code_value ceevnt, RANDOM]
|     partitions=1/1 files=1 size=33.94MB
|     predicates: (ceevnt.code_set = 72), (ceevnt.active_ind = 1), (ceevnt.display_key IN ('CLINICALWEIGHT', 'INETNIBPSYSTOLIC', 'SYSTOLICBLOODPRESSURE', 'INETNIBPDIASTOLIC', 'NIBPMAPCALC'))
|     table stats: 459533 rows total
|     column stats: all
|     hosts=1 per-host-mem=240.00MB
|     tuple-ids=2 row-size=110B cardinality=1
|
03:HASH JOIN [INNER JOIN, BROADCAST]
|  hash predicates: ce.encntr_id = encdrv.encntr_id, ce.person_id = encdrv.person_id, ce.source_system_id = encdrv.source_system_id
|  runtime filters: RF002 <- encdrv.encntr_id, RF003 <- encdrv.person_id, RF004 <- encdrv.source_system_id
|  hosts=5 per-host-mem=2.00GB
|  tuple-ids=0,1 row-size=148B cardinality=unavailable
|
|--05:EXCHANGE [BROADCAST]
|  |  hosts=5 per-host-mem=0B
|  |  tuple-ids=1 row-size=20B cardinality=unavailable
|  |
|  01:SCAN HDFS [v500.cust_encntr_gtt encdrv, RANDOM]
|     partitions=1/1 files=5 size=539.87KB
|     runtime filters: RF000 -> encdrv.source_system_id
|     table stats: unavailable
|     column stats: unavailable
|     hosts=5 per-host-mem=48.00MB
|     tuple-ids=1 row-size=20B cardinality=unavailable
|
00:SCAN HDFS [v500.clinical_event ce, RANDOM]
   partitions=1/1 files=200 size=770.52GB
   predicates: (ce.valid_until_dt_tm > TIMESTAMP '2018-01-05 16:17:08.496026000'), (ce.view_level = 1), (ce.result_status_cd IN (25, 34, 35))
   runtime filters: RF000 -> ce.source_system_id, RF001 -> ce.event_cd, RF002 -> ce.encntr_id, RF003 -> ce.person_id, RF004 -> ce.source_system_id
   table stats: 5507286334 rows total
   column stats: all
   hosts=5 per-host-mem=880.00MB
   tuple-ids=0 row-size=128B cardinality=189275744
Explain plan for the query, generated by In-Database:
PLAN-ROOT SINK
|
07:EXCHANGE [UNPARTITIONED]
|  hosts=1 per-host-mem=unavailable
|  tuple-ids=6,0,3 row-size=204B cardinality=unavailable
|
04:HASH JOIN [INNER JOIN, BROADCAST]
|  hash predicates: code_value = event_cd
|  runtime filters: RF000 <- event_cd
|  hosts=1 per-host-mem=2.00GB
|  tuple-ids=6,0,3 row-size=204B cardinality=unavailable
|
|--06:EXCHANGE [BROADCAST]
|  |  hosts=5 per-host-mem=0B
|  |  tuple-ids=0,3 row-size=132B cardinality=unavailable
|  |
|  02:HASH JOIN [INNER JOIN, BROADCAST]
|  |  hash predicates: encntr_id = v500.cust_encntr_gtt.encntr_id, person_id = v500.cust_encntr_gtt.person_id, source_system_id = v500.cust_encntr_gtt.source_system_id
|  |  runtime filters: RF001 <- v500.cust_encntr_gtt.encntr_id, RF002 <- v500.cust_encntr_gtt.person_id, RF003 <- v500.cust_encntr_gtt.source_system_id
|  |  hosts=5 per-host-mem=2.00GB
|  |  tuple-ids=0,3 row-size=132B cardinality=unavailable
|  |
|  |--05:EXCHANGE [BROADCAST]
|  |  |  hosts=5 per-host-mem=0B
|  |  |  tuple-ids=3 row-size=20B cardinality=unavailable
|  |  |
|  |  01:SCAN HDFS [v500.cust_encntr_gtt, RANDOM]
|  |     partitions=1/1 files=5 size=539.87KB
|  |     table stats: unavailable
|  |     column stats: unavailable
|  |     hosts=5 per-host-mem=48.00MB
|  |     tuple-ids=3 row-size=20B cardinality=unavailable
|  |
|  00:SCAN HDFS [v500.clinical_event v500_clinical_event, RANDOM]
|     partitions=1/1 files=200 size=770.52GB
|     predicates: v500_clinical_event.valid_until_dt_tm > TIMESTAMP '2018-01-05 15:50:04.093655000', v500_clinical_event.view_level = 1, v500_clinical_event.result_status_cd IN (25, 34, 35)
|     runtime filters: RF001 -> encntr_id, RF002 -> person_id, RF003 -> source_system_id
|     table stats: 5507286334 rows total
|     column stats: all
|     hosts=5 per-host-mem=880.00MB
|     tuple-ids=0 row-size=112B cardinality=189275744
|
03:SCAN HDFS [v500.code_value, RANDOM]
   partitions=1/1 files=1 size=33.94MB
   predicates: v500.code_value.code_set = 72, v500.code_value.active_ind = 1, v500.code_value.display_key IN ('CLINICALWEIGHT', 'INETNIBPSYSTOLIC', 'SYSTOLICBLOODPRESSURE', 'INETNIBPDIASTOLIC', 'NIBPMAPCALC')
   runtime filters: RF000 -> code_value
   table stats: 459533 rows total
   column stats: all
   hosts=1 per-host-mem=192.00MB
   tuple-ids=6 row-size=72B cardinality=1
There is a lot in there, but if you spend 30 minutes studying Impala's Explain plan for performance tuning document, you will see the issue right away. Reading the explain plans from the bottom, you can see that the faster query scans the largest, 6B-row table first, on all 5 hosts, and then broadcasts the smaller tables to the nodes where the larger table's data is located. The query generated by Alteryx instead scans one of the smallest tables (code_value, on a single host) and broadcasts the rows coming from the largest table over the network to it. This is why it is so much slower. I am actually surprised that it is not 100 times slower.
What can we do about it? Apparently not much. I knew I could not reorder In-DB input tools in the workflow, since Alteryx normally uses the internal tool ID to determine execution order. I tried recreating my workflow, dropping the smallest table onto the canvas first and then the largest one first, but it did not help: Alteryx generated exactly the same query.
While I am really excited about the addition of In-Database processing tools to Alteryx, and I believe they add tremendous value to the product, Alteryx developers need to understand how inefficient these tools can be compared to nice, clean SQL code. One of the slides I reference below does mention that Alteryx has "Smart In-Database processing" on its roadmap. Until then, use this feature with caution.
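Reading long plans by eye gets tedious. As a small aid, the sketch below pulls the node IDs, operators, scan sizes, and host counts out of Impala's text EXPLAIN output and lists them bottom-up, the way the plans above are best read. It relies only on the line patterns visible in these two plans; other Impala versions or `EXPLAIN_LEVEL` settings may format nodes differently, so treat the regular expressions as assumptions.

```python
import re

# Patterns based on the text EXPLAIN output in this post; other Impala
# versions or EXPLAIN_LEVEL settings may format nodes differently.
NODE_RE = re.compile(r"(\d+):(SCAN HDFS|EXCHANGE|HASH JOIN)\s*\[([^\]]*)\]")
SIZE_RE = re.compile(r"files=\d+\s+size=(\S+)")
HOSTS_RE = re.compile(r"\bhosts=(\d+)")

# Trimmed excerpt of the In-Database plan from this post.
SAMPLE_PLAN = """\
04:HASH JOIN [INNER JOIN, BROADCAST]
|  hosts=1 per-host-mem=2.00GB
|--06:EXCHANGE [BROADCAST]
|  |  hosts=5 per-host-mem=0B
|  00:SCAN HDFS [v500.clinical_event v500_clinical_event, RANDOM]
|     partitions=1/1 files=200 size=770.52GB
|     hosts=5 per-host-mem=880.00MB
03:SCAN HDFS [v500.code_value, RANDOM]
   partitions=1/1 files=1 size=33.94MB
   hosts=1 per-host-mem=192.00MB
"""


def summarize_plan(plan: str) -> list[dict]:
    """List plan nodes with their scan size and host count, bottom line first."""
    nodes: list[dict] = []
    for line in plan.splitlines():
        match = NODE_RE.search(line)
        if match:
            nodes.append({"id": int(match.group(1)), "op": match.group(2), "detail": match.group(3)})
            continue
        if not nodes:
            continue  # e.g. PLAN-ROOT SINK and separators before the first node
        current = nodes[-1]
        size = SIZE_RE.search(line)
        if size and "size" not in current:
            current["size"] = size.group(1)
        hosts = HOSTS_RE.search(line)
        if hosts and "hosts" not in current:
            current["hosts"] = int(hosts.group(1))
    # Reverse the textual order so the output reads from the bottom up.
    return nodes[::-1]


if __name__ == "__main__":
    for node in summarize_plan(SAMPLE_PLAN):
        print(node)
```

On the sample, the first entry is the 33.94MB `code_value` scan on 1 host, followed by the 770.52GB `clinical_event` scan on 5 hosts, whose joined rows the BROADCAST exchange (06) ships to the single-host hash join (04). Run on the full plan of the original query, the same function lists the 770.52GB `clinical_event` scan first.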