MetaZoo what??

When I was just learning about Big Data and Hadoop, I took a few online tutorials and read blogs covering Apache Sqoop. I was naïve enough to believe that ingesting database tables was a very easy task. All I needed to do was use Sqoop and it would do everything for me!

Little did I know how far that was from reality. Sqoop is an amazing little tool. It comes with tons of options and parameters, and it can do a lot, but it cannot do everything. That is easy to forget if you do not think the whole process through. Bloggers would just use one command to bring over a table or an entire database, create Hive tables, and even use it for incremental loads after the initial one.

The sobering reality is that you will quickly encounter plenty of small, but very irritating things with Sqoop:

  • it does not always map source data types properly
  • it can mess up timestamp time zones
  • it may not support advanced database-specific features (for example, Oracle direct connector does not support Oracle table clusters)
  • you need to pick an optimal number of splits to get good throughput without overwhelming the source database
  • some Sqoop options do not work when combined with certain others (for example, HCatalog integration has tons of limitations)

My head was spinning, and I had to use a mind mapping tool (I love XMind) to outline the pros and cons of each Sqoop import option (direct mode for Oracle, standard mode for Oracle, and the HCatalog integration option).

It was obvious I could not do everything I needed in one hop. I gave up on the idea of having Sqoop register Hive tables, and found myself constructing SQL DDL statements to create tables and writing SQL queries to do the INSERTs.

This is when I thought there had to be a better way to do all of that, because I would have to repeat all these steps for 100s of tables, and I despise boring repetitive work! I knew I could probably generate everything I needed (like a proper Sqoop command and the SQL scripts) if I had a place to store metadata about source system tables.

I looked around, and I looked again, and surprisingly, I could not find any tools that would just do that.

Confluent Schema Registry came to mind first, but it only deals with Kafka messages and certainly would not generate any code for me. I found a few abandoned open source projects to store and manage schemas, but they also did not address my needs. Hortonworks had an interesting initiative to unify Hive and Kafka schemas, but the part that would use that metadata to generate something else was missing from all of these projects.

LinkedIn's Gobblin project, a universal data ingestion framework, looked very promising, but after playing with it for some time I was not sure it would handle all our use cases without extensive coding in Java.

I checked commercial products as well, but none of them really impressed me at the time. I did not care much about the user-friendliness aspect of those products, as I thought they would limit us, and I wanted the full benefits of open source projects like Apache Hive, Impala, Sqoop, and Spark. I knew we would need a framework that could deal with dozens of source systems and thousands of tables, and the last thing I wanted was to use a mouse to configure ingestion pipelines for all those tables.

It was also the time when I was tinkering with the Apache Airflow project, which prompted me to learn the basics of Python. While I have used many programming languages in my career, I had never had an opportunity to use Python.

I instantly fell in love with the language, its awesome community, the guiding principles of Python, and the vast universe of Python packages. I was really impressed by how readable and compact Python code was. I had a blast playing with all the open source Python packages, reading Pythonistas' blogs, listening to podcasts, and studying Python projects on GitHub. I felt like I was an 8-year-old kid again, learning BASIC on my first computer.

And this is how MetaZoo was born!

Idea

MetaZoo collects metadata about source system tables: table name, estimated number of rows, estimated row size or number of database blocks (depending on the database), column names and data types, primary keys, and whatever else you need to describe source system tables. The concept is generic enough to be applied to files as well.

To onboard a new source system, a Data Engineer can use an existing Python class specific to that database engine, called a metadata extractor, or create a new one. For example, if the source system is an Oracle database, she can use the Oracle metadata extractor class.
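As an illustration of what an extractor boils down to, here is a minimal sketch for Oracle; the class and method names are hypothetical, and it assumes cx_Oracle and read access to Oracle's data dictionary:

# A minimal sketch of the extractor idea (hypothetical names, not MetaZoo's actual API).
# It pulls column metadata from Oracle's data dictionary and returns plain dicts
# that the rest of the pipeline can store and reuse.
import cx_Oracle  # assumption: the Oracle source is reached via cx_Oracle


class OracleMetadataExtractor:
    def __init__(self, dsn, user, password):
        self.conn = cx_Oracle.connect(user, password, dsn)

    def extract_columns(self, owner, table_name):
        """Return one dict per column, straight from ALL_TAB_COLUMNS."""
        sql = """
            SELECT column_name, data_type, data_length, data_precision, data_scale, nullable
            FROM all_tab_columns
            WHERE owner = :owner AND table_name = :table_name
            ORDER BY column_id
        """
        cur = self.conn.cursor()
        cur.execute(sql, owner=owner, table_name=table_name)
        cols = [dict(zip(["name", "data_type", "length", "precision", "scale", "nullable"], row))
                for row in cur]
        cur.close()
        return cols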

The next step is to process the source system metadata. A Data Engineer would normally create a new class that inherits from a base metadata processor class. The main purpose of this step is to create derived schemas based on the source system schema and the actual project requirements. For example, if I need to create an external staging table in Hive over HDFS files ingested by Sqoop, and then create a final processed table in Impala, the metadata processor class will create the derived schemas using proper data types, column names, and any other logic you need. There is no magic here - this class is just custom Python code that has the source system metadata at its disposal and creates the needed schemas and structures based on the code the developer writes.
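A hypothetical sketch of such a processor is below; the type-mapping rules are simplified illustrations, not MetaZoo's actual logic:

# A sketch of the processor idea (hypothetical names): take the extracted source columns
# and derive a Hive/Impala schema, applying whatever project-specific rules are needed.
ORACLE_TO_HIVE = {
    "VARCHAR2": "string",
    "CHAR": "string",
    "DATE": "timestamp",
    "TIMESTAMP(6)": "timestamp",
}


class CernerMetadataProcessor:
    """Derives the staging and final schemas for one source system."""

    def derive_hive_column(self, col):
        if col["data_type"] == "NUMBER":
            # Illustrative project rule: NUMBER with a scale becomes decimal, otherwise bigint.
            if col["scale"] and col["scale"] > 0:
                return (col["name"].lower(), f"decimal({col['precision']},{col['scale']})")
            return (col["name"].lower(), "bigint")
        return (col["name"].lower(), ORACLE_TO_HIVE.get(col["data_type"], "string"))

    def derive_schema(self, source_columns):
        return [self.derive_hive_column(c) for c in source_columns]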

Finally, the Data Engineer creates a new job generator class. At this point, we have all the source, intermediate, and final schemas described, but we do not know how data flows into these schemas. The job generator creates various snippets of code (scripts, SQL DDL and DML commands), picks optimal arguments for Sqoop, selects a partitioning strategy for the final tables, and so on.
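To make the division of labor concrete, here is a minimal sketch of what a job generator might do for the Sqoop piece; the function, field names, and mapper thresholds are illustrative assumptions, not MetaZoo's actual rules:

# A sketch of the job generator idea: turn the derived metadata into concrete artifacts,
# e.g. a Sqoop argument list with a split column and a mapper count chosen from the
# table's estimated size (thresholds below are made up for illustration).
def build_sqoop_args(table_meta, target_dir):
    rows = table_meta["estimated_rows"]
    num_mappers = 1 if rows < 1_000_000 else 4 if rows < 50_000_000 else 16

    args = [
        "import",
        "--connect", table_meta["jdbc_url"],
        "--target-dir", target_dir,
        "--as-parquetfile",
        "-m", str(num_mappers),
    ]
    if num_mappers > 1 and table_meta.get("primary_key"):
        # Split parallel extracts by the first primary key column.
        args += ["--split-by", table_meta["primary_key"][0]]
    return args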

Now we can run MetaZoo from its command-line interface to collect metadata from the source system, create the derived schemas, and generate all the code.

MetaZoo will also register new tables in the job_control table to record the status of the ingestion process (more on that a bit later).

MetaZoo does not do the actual ingestion, nor does it schedule or coordinate all the work - that is the job of an external tool.

In our case, we decided to use Apache NiFi, an easy-to-use, powerful, and reliable system to process and distribute data. Not only did NiFi fit our architecture really well with its inherent real-time processing capabilities, it could also schedule and coordinate our batch jobs.

We could have used Apache Airflow as well, but unfortunately, Airflow did not support our real-time needs.

Design

MetaZoo is a simple Python application. It does not have a fancy UI, just a basic command-line interface (CLI).

We use MySQL to store metadata and control tables, but MetaZoo can work with any database supported by SQLAlchemy.
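For illustration, the storage layer can be as small as a SQLAlchemy engine; the connection URL below is a placeholder:

# Minimal sketch: the metadata store is just a SQLAlchemy engine, so swapping MySQL
# for another backend is a connection-string change (URLs below are placeholders).
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://metazoo:***@metadata-host/metazoo")
# engine = create_engine("sqlite:///metazoo.db")  # works just as well for local testing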

Each of our source systems has a unique identifier called source_system_id. When we need to ingest data from another source system, we:

  1. Create a new source_system_id and assign a metadata extractor, a metadata processor, and a job generator to it.
  2. Choose a few tables and start writing the custom logic for the metadata processor and the job generator.
  3. Run the job generator to produce a number of snippets and scripts, which we can run and test individually.

Once this is done, we can use the MetaZoo CLI to populate metadata for hundreds of tables from that source system and generate all the code we need. The two commands below collect and process metadata for 295 tables and generate all the relevant scripts:

> metazoo schema extract --schema-category datalake_cerner --source-system 20000000 --file list_of_cerner_tables.csv --delete

[OK] 295 tables successfully processed

> metazoo job generate --job-category datalake_cerner --source-system 20000000 --file list_of_cerner_tables.csv

[OK] Generator successfully finished

For one of our source systems (the Cerner Electronic Health Record platform), MetaZoo does basic data profiling via random sampling. Oracle allows numeric columns to be defined as NUMBER without precision and scale, which was a problem for us because we could not determine the actual precision and scale of such columns. Sqoop maps them to a string type, and Impala does not allow string columns in aggregation functions unless you CAST the values from string to number in every query.

With MetaZoo, we were able to profile such columns during the metadata collection process and use proper numeric data types (whole numbers or decimals).
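Here is a rough sketch of the sampling idea, assuming a cx_Oracle connection and Oracle's SAMPLE clause; the function name, the 1% sample, and the fallback types are illustrative assumptions, not MetaZoo's actual profiler:

# Sketch: sample a scale-less NUMBER column and decide whether it actually carries decimals.
# Identifiers are taken from trusted metadata; the decimal sizing heuristic is illustrative.
def profile_number_column(conn, owner, table, column):
    sql = f"SELECT {column} FROM {owner}.{table} SAMPLE(1) WHERE {column} IS NOT NULL"
    cur = conn.cursor()
    cur.execute(sql)
    values = [row[0] for row in cur]
    cur.close()
    if not values:
        return "bigint"  # nothing to learn from, fall back to a whole number
    has_fraction = any(v != int(v) for v in values)
    max_digits = max(len(str(abs(int(v)))) for v in values)
    return f"decimal({max_digits + 10},6)" if has_fraction else "bigint"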

Reprocessing

One of the most powerful features of MetaZoo is that it allows reprocessing.

Some time ago, a data provider notified us that they were converting all the timestamps in 100s of tables to UTC. It would have been a nightmare to go back to the ETL jobs, change each of them to convert time to the proper time zone, and then backload all those tables.

With MetaZoo, we just changed the timestamp conversion logic in one place, re-ran MetaZoo, and generated all the scripts to refresh those columns.

In a more recent example, our analysts realized that a source system with 144 tables had a lot of undesired leading and trailing spaces in all the text columns. All we had to do was change 2 lines of code in MetaZoo and kick off the ingest process again. It only took us an hour to trim spaces from hundreds of columns across 144 tables!
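To show why such a fix can be that small, here is a hypothetical sketch of the kind of centralized rule involved: the SELECT expression for every text column is built in one place, so wrapping it in TRIM() changes the generated INSERTs for every table at once (names are illustrative):

# Sketch: because every generated SELECT expression comes from one function,
# adding TRIM() here propagates to all generated INSERT statements.
def select_expression(col_name, hive_type):
    if hive_type == "string":
        return f"TRIM({col_name}) AS {col_name}"  # the added rule
    return col_name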

With MetaZoo, we do not need to deal with individual tables or, worse, use a UI to re-configure 100s of ETL jobs.

MetaZoo job control and job audit

It is very common for ETL frameworks to have a table that stores the current state of ETL jobs. MetaZoo is no exception: it provides a basic table called job_control to manage the status of registered jobs.

The job_control table:

  • describes the current state of a job (e.g. queued, processing, inactive)
  • defines the load strategy (e.g. full reload or incremental extract)
  • stores the last successful watermark for incremental extracts (such as the most recent timestamp or ID)

By changing a few values in job_control, we can kick off a historical reload of existing tables, or change the watermark for incremental extracts if we need to go a few days back.
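As an illustration, a job_control table and a watermark rewind might look like the sketch below (SQLAlchemy Core; the column names and values are assumptions, not MetaZoo's exact schema):

# Sketch of the job_control idea using SQLAlchemy Core.
from sqlalchemy import Table, Column, String, MetaData, update

metadata = MetaData()
job_control = Table(
    "job_control", metadata,
    Column("job_name", String(255), primary_key=True),
    Column("state", String(32)),          # queued, processing, inactive, ...
    Column("load_strategy", String(32)),  # full reload or incremental
    Column("watermark", String(64)),      # last successful timestamp or ID
)

# Rewind the watermark a few days to re-extract recent data for one (hypothetical) job.
stmt = (update(job_control)
        .where(job_control.c.job_name == "cerner_orders")
        .values(watermark="2019-06-01 00:00:00", state="queued"))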

Another table, job_audit, is designed as an EAV table and allows us to log events during job execution, such as the duration of a job or the number of rows ingested. Depending on the project, we might audit different things, and the EAV design gives us that flexibility.
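For illustration, EAV-style audit records boil down to one row per (job run, attribute) pair, something like this hypothetical sample:

# Sketch of EAV-style audit rows: each project can log whatever it cares about
# without schema changes. Values below are made up for illustration.
audit_events = [
    {"job_run_id": 81234, "attribute": "duration_sec", "value": "347"},
    {"job_run_id": 81234, "attribute": "rows_ingested", "value": "1250443"},
    {"job_run_id": 81234, "attribute": "sqoop_mappers", "value": "16"},
]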

NiFi MetaZoo processors

Remember, MetaZoo itself does not do any data processing - that is the responsibility of an external tool. Nor is it responsible for scheduling and coordinating jobs - we use Apache NiFi for that.

NiFi is a pretty amazing piece of software. One of its greatest features is that it is easy to extend with the features you need. And you do not need to be a hardcore Java developer to do that.

We've created a few custom NiFi processors that integrate MetaZoo with our NiFi flows:

  • GetMetaZooJobs processor to get a list of queued jobs to start
  • SetMetaZooJobState processor to change the state of jobs
  • AuditMetaZoo processor to conveniently write various audit events to a log

We also use the NiFi REST API to populate certain flow variables with metadata stored in MetaZoo.

Usage examples

I've mentioned Sqoop a few times and how MetaZoo helps us build the arguments for it. Here is an example of a Sqoop command generated by MetaZoo. Can you imagine writing something like this and choosing the proper arguments for 300 tables by hand? Note how MetaZoo picked the number of mappers (16), split the data by the primary key (DRG_ID), and used a bunch of other parameters:

sqoop|import|-Dmapred.job.name=sqoop_10000000_DRG|-Dmapreduce.job.queuename=root.bde.source_systems.cerner.source_system_10000000|-Dhadoop.security.credential.provider.path=jceks://hdfs/datalake_cerner/config/sourcedb.password.jceks|-Dmapreduce.map.java.opts= -Duser.timezone=UTC|-Dmapreduce.map.speculative=false|--connect|jdbc:oracle:thin:@secrethostname|--username|secretuser|--password-alias|secret.password.alias|--target-dir|"/projects/bde/datalake_cerner/etl/ingest/V500/DRG/10000000"|--delete-target-dir|--as-parquetfile|--fetch-size|10000|-m|16|--split-by|DRG_ID|--query|"SELECT  DRG_ID,PERSON_ID,ENCNTR_ID,NOMENCLATURE_ID,DRG_PAYMENT,OUTLIER_DAYS,OUTLIER_COST,OUTLIER_REIMBURSEMENT_COST,DRG_PRIORITY,GROUPER_CD,DRG_PAYOR_CD,CREATE_PRSNL_ID,CONTRIBUTOR_SYSTEM_CD,UPDT_ID,ACTIVE_IND,ACTIVE_STATUS_CD,SVC_CAT_HIST_ID,MDC_CD,RISK_OF_MORTALITY_CD,SEVERITY_OF_ILLNESS_CD,SOURCE_VOCABULARY_CD,MDC_APR_CD,COMORBIDITY_CD,ENCNTR_SLICE_ID,LAST_UTC_TS,ACTIVE_STATUS_DT_TM,BEG_EFFECTIVE_DT_TM,CREATE_DT_TM,END_EFFECTIVE_DT_TM,UPDT_DT_TM FROM V500.DRG WHERE $CONDITIONS"   

Another good example is the CREATE TABLE statement below, also generated by MetaZoo. It may not look particularly complicated, but it is a great example of why I think third-party tools with a nice UI won't do this job.

Our real-time tables are backed by Apache Kudu. Following Apache Kudu's schema design document for best practices and optimal performance, we apply optional LZ4 compression to certain columns and list the primary keys, which must be declared in a specific order.

But the most challenging task was to pick a partitioning strategy. Partitions must be defined upfront, at table creation time. Not only do you need to pick the columns to partition by, there are also limitations on the number of tablets/partitions. To obey these rules, you need to estimate the size of your table.

MetaZoo came to the rescue again. Our Data Scientist came up with a very accurate XGBoost model to predict the final size of Kudu tables, based on the metadata we already have in MetaZoo. Knowing the size of the table, we can assign the proper number of partitions while respecting Kudu's limitations and some edge cases (the 160 partitions in the example below are the maximum allowed for our Kudu cluster size):

CREATE TABLE IF NOT EXISTS cerner.`orders` (
   order_id bigint,
   order_mnemonic string compression lz4,
   ...cut 100s of columns here....
   clin_relevant_updt_dt_tm string compression lz4,
   active_status_dt_tm string compression lz4,
   PRIMARY KEY (source_system_id, order_id)
 )
 PARTITION BY HASH(order_id) PARTITIONS 160                      
 STORED AS KUDU; 
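To illustrate that last step, here is a hedged sketch of how a partition count could be derived from the predicted table size; the target tablet size is an assumption for illustration, and the cap of 160 comes from our cluster size as mentioned above (the size prediction itself comes from the XGBoost model):

# Sketch: choose a hash partition count from the predicted table size,
# capped by what the cluster can handle (numbers are illustrative assumptions).
def pick_hash_partitions(predicted_size_gb, target_tablet_gb=10, cluster_max_partitions=160):
    wanted = max(2, int(predicted_size_gb / target_tablet_gb))
    return min(wanted, cluster_max_partitions)

pick_hash_partitions(predicted_size_gb=2400)  # -> 160, capped by the cluster limit
pick_hash_partitions(predicted_size_gb=80)    # -> 8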

The last example I want to share is a SQL INSERT statement, also generated by MetaZoo. Note how data types are converted, fixing Sqoop's mapping and handling the nuances of Oracle's NUMBER data type without scale and precision, and how timestamps are converted to the proper time zone while clamping anything before the year 1800. Interestingly, Sqoop stores timestamps as a long integer that needs to be divided by 1000 to get a Unix epoch value that Impala and Hive can understand:

 INSERT INTO TABLE cerner.orders /* +NOCLUSTERED,NOSHUFFLE */
 SELECT
 source_system_id,
 CAST(order_id as bigint) as order_id,
 CAST(link_nbr as double),
 catalog_cd,CAST(suspend_ind as bigint) as suspend_ind,
 order_mnemonic,
 ....cut....
FROM_TIMESTAMP(CAST(last_utc_ts/1000 AS timestamp),'yyyy-MM-dd HH:mm:ss.SSS') AS last_utc_ts,
 CASE
     WHEN status_dt_tm IS NULL 
         THEN NULL
     WHEN status_dt_tm BETWEEN -5364576000000 AND 4102531200000
         THEN CAST(from_utc_timestamp(CAST(status_dt_tm/1000 as timestamp), 'America/New_York') AS string)
     WHEN status_dt_tm < -5364576000000
         THEN '1800-01-01 00:00:00'
     ELSE 
         '2100-12-31 23:59:59'
 END AS status_dt_tm
 
 ,'{ingest_dt_tm}' AS ingest_dt_tm
 ,FROM_TIMESTAMP(now(),'yyyy-MM-dd HH:mm:ss.SSS') AS update_dt_tm
 FROM  cerner_stage.`orders_ingested`
 WHERE source_system_id = 10000000
 ;   

Conclusion

MetaZoo paired with Apache NiFi proved to be quite useful! We have ingested 22 source systems and 1000s of tables into our Data Lake in a short period of time, while keeping our team small and nimble. MetaZoo saves us time and helps with ongoing support. Apache NiFi helps us maintain the pipelines and run the code generated by MetaZoo.

If we need to change the ingestion process in a pipeline that involves 100s of tables, or reload data from scratch while adding a new transformation rule, MetaZoo makes it easy.

The only thing left on our wish list is coping easily with source system schema changes, but that has proved to be quite a challenging ask. Fortunately, schema changes do not happen often with our source systems, and it was not worth the effort for us to fully support schema evolution. But it is certainly something for us to consider in the future.