How to ingest a large number of tables into a Big Data Lake, or why I built MetaZoo
MetaZoo what??
When I started learning about Big Data and Hadoop, I got excited about Apache Sqoop. I was naïve enough to believe that the ingestion of database tables was a very easy task. All I needed to do was to use Sqoop and it would do everything for me!
Little did I know how far it was from reality. Sqoop is an amazing little tool. It comes with tons of options and parameters, it can do a lot, but it cannot do everything. It is easy to forget that if you do not think the whole process end to end. A single Sqoop command can pull a table or entire database, create Hive tables, and even enable incremental loads after the initial one is done.
The sobering reality is that you will encounter plenty of small, but very irritating issues with Sqoop:
- it does not always map source data types properly
- it can mess up timestamp time zones
- it does not always support advanced database-specific features such as Oracle table clusters with Oracle direct connector
- you need to decide on an optimal number of mappers to get a good throughput while not overwhelming the source database and upsetting DBAs.
- some features do not work if used with certain other features (for example, HCatalog integration has tons of limitations)
I created a mind map for myself (I love XMind) to outline the pros and cons of each Sqoop import option (direct mode for Oracle, a standard mode for Oracle, HCatalog integration option):
It became obvious that I could not do everything I needed in one hop. I gave up on the idea to have Sqoop register Hive tables and found myself writing a bunch of SQL statements to create and populate tables.
This is when I thought there had to be a better way to do all that. I would have to repeat all these steps for 100s of tables, and I despise boring repetitive work! I knew I could generate all the things I needed (like a proper Sqoop command and SQL scripts) if I had a place to store metadata about source system tables.
It was the year of 2017. I looked around, and, I could not find any tools that would do that.
Confluent Schema Registry came to my mind at first, but it would only deal with Kafka's messages and would not generate any code for me. I found a few abandoned open-source projects to store and manage schemas, but they also did not address my needs. I learned about the Hortonworks initiative to unify Hive and Kafka schemas in a single schema registry, but still, it would not let me generate things I needed.
LinkedIn's Gobblin project, a universal data ingestion framework, looked very promising, but after playing with it for some time I was not sure it would handle all our use cases, without doing extensive coding in Java. And I was not convinced that Gobblin jobs would be easy to support and manage.
I checked commercial products as well, but none of them impressed me at the time. I did not care much about the user-friendliness aspect of those products. In my opinion, UI tools will limit you more than the value they bring. I wanted to enjoy the benefits of open source projects like Apache Hive, Impala, Sqoop, and Spark. We would need a framework that could deal with dozens of source systems and thousands of tables, and the last thing I wanted was to use a point and click graphical interface to configure ingestion pipelines for all those tables.
It was also the time when I was tinkering with Apache Airflow project, which prompted me to learn the basics of Python. While I've used many programming languages in my career, I never had an opportunity to use Python.
I fell in love with the language and its awesome community. Guiding principles of Python made a lot of sense to me, and a vast universe of Python packages was mind-blowing. I was impressed by how readable and compact Python code was. I had a blast! I felt like I was an 8-year-old kid again, learning BASIC on my first computer.
And this is how MetaZoo was born!
Idea
MetaZoo collects metadata about source system tables:
- table name
- number of rows
- estimated row size
- column names
- data types
- primary keys, etc.
The concept is generic and can be used with files as well.
There are 3 steps to onboard the new source system. First, a Data Engineer can use the existing or create a new Python class, called metadata extractor
. This class handles collection of metadata from a given database platform, such as Oracle database.
The next step is to process source system metadata. A Data Engineer would create a new metadata processor
class. The purpose of this step is to create a set of derived schemas, based on the source system schema and project requirements. For example, if I need to ingest data with Sqoop and create external staging tables and internal final tables with Impala, the metadata processor
class will create derived schemas, using proper target data types, column names and any other logic you need. There is no magic here - this is just a custom Python code, that has source system metadata at its disposal to create needed schemas and structures.
Finally, Data Engineer creates a job generator
class. At this point, we have all the source, intermediate and final schemas described, but we do not know how data flows into these schemas. The job generator will create various snippets of code (scripts, SQL DDL and DML commands). It will pick optimal arguments for Sqoop, select partitioning strategy for final tables and so on.
Now, we can run MetaZoo using a command-line interface, and it will collect metadata from source system, create derived schemas and generate all the code.
MetaZoo will also register new tables in the job_control
table to record the status of the ingestion process (more on that a bit later).
Note, that MetaZoo does not do actual ingestion or processing. Neither does it schedule or coordinate work - this is a function of an external tool. In our case, we decided to use Apache NiFi, an easy to use, powerful, and reliable system to process and distribute data. NiFi fits our architecture well with its inherent capabilities to do both real-time and batch processing and scheduling.
Our second choice was Apache AirFlow, but unfortunately, Airflow did not support our real-time needs.
Design
MetaZoo is a simple Python application. It does not have a fancy UI, just a basic command-line interface (cli).
MySQL is used to store metadata and control tables, but it can use any database, supported by SQLAlchemy.
Each of our source systems has a unique identifier called source_system_id
. When we need to ingest another source system data, we would:
- Create new
source_system_id
and assign metadata extractor, metadata processor and a job generator to it. - Then we would choose a few tables and start writing custom logic for metadata processor and a job generator.
- The job generator would produce a number of snippets and scripts, which we would run and test individually.
Once this is done, we can use MetaZoo cli to populate metadata for hundreds of tables from that source system and generate all the code we need. Two commands below will collect and process metadata for 300 tables, and generate all the relevant scripts:
> metazoo schema extract --schema-category datalake_cerner --source-system 20000000 --file list_of_cerner_tables.csv --delete
[OK] 300 tables successfully processed
> metazoo job generate --job-category datalake_cerner --source-system 20000000 --file list_of_cerner_tables.csv
[OK] Generator successfully finished
Data Profiling
One of our source systems (Cerner Electronic Health Record platform) relies on Oracle's NUMBER
data types without explicit precision and scale.
Sqoop would map such columns into a string type, and Impala would not allow using string columns with aggregation functions. Users are forced to convert such columns from a string to a number on every single SQL query they execute.
With MetaZoo, we added a data profiling step, performed during metadata collection process. MetaZoo would figure out actual precision for numeric columns and whether whole numbers or decimals should be used.
Reprocessing
One of the most powerful features of MetaZoo is to allow reprocessing which is a way to make changes in our pipelines, apply them to all the tables, and then reload data all at once.
Let me give a few examples. Some time ago, Cerner notified us about replacing all the timestamps in 100s of tables and converting them to UTC. It would be a nightmare to go back to ETL jobs, find and change all the timestamp columns, and convert timestamps to a proper time zone.
With MetaZoo, we just changed timestamp conversion logic in one place, re-ran MetaZoo, and generated all the scripts to refresh those columns at once for 100s of tables.
A recent example, when our analysts realized that a source system with 144 tables had undesired trailing and leading spaces in all the text columns. All we had to do is to change 2 lines of code in MetaZoo and kick off the ingest process again. It only took us an hour.
With MetaZoo, we do not need to deal with each table individually, or worse use point and click UI to re-configure 100s of ETL jobs.
MetaZoo job control and job audit
It is a common practice in the ETL world to have a table to store the current state of jobs. MetaZoo is no exception and it provides a basic table called job_control
to manage the status of the registered jobs.
job_control
table:
- describes the current state of a job (i.e. queued, processing, inactive, etc.)
- defines load strategy (i.e. full reload or incremental extract)
- stores the last successful watermark for incremental extracts (such as the most recent timestamp or ID)
By changing a few values in job_control
, we can kick off historical reload of existing tables, or change watermark for incremental extracts, if we need to go a few days back.
Another table, job_audit
, is designed as an EAV table and allows us to log certain events during execution of jobs such as duration of the job, or row count ingested. Based on a project, we might audit different things, and EAV design allowed us to have this flexibility.
NiFi MetaZoo processors
Remember, MetaZoo itself does not do any data processing - this is the responsibility of the other external tool. Neither it is responsible to schedule and coordinate jobs - we use Apache NiFi for that.
NiFi is a pretty amazing piece of software. One of the greatest features of NiFi is that it is easy to extend it to add the features you need. And you do not need to be a hardcore Java developer to do that.
We've created a few custom processors in NiFi, integrating MetaZoo and NiFi flows:
- GetMetaZooJobs processor to get a list of queued jobs to start
- SetMetaZooJobState processor to change the state of the jobs
- AuditMetaZoo processor to write various audit events to a log
We also used NiFi REST API to populate certain flow variables, using metadata stored in MetaZoo.
Usage examples
I've mentioned Sqoop a few times and how MetaZoo helps us build arguments for it. Here is an example of a Sqoop command, generated by MetaZoo. Can you imagine doing something like below and choosing proper arguments for 300 tables? Note, how MetaZoo picked a number of mappers (16), split data by a primary key (DRG_ID) and used a bunch of other parameters:
sqoop|import|-Dmapred.job.name=sqoop_10000000_DRG|-Dmapreduce.job.queuename=root.bde.source_systems.cerner.source_system_10000000|-Dhadoop.security.credential.provider.path=jceks://hdfs/datalake_cerner/config/sourcedb.password.jceks|-Dmapreduce.map.java.opts= -Duser.timezone=UTC|-Dmapreduce.map.speculative=false|--connect|jdbc:oracle:thin:@secrethostname|--username|secretuser|--password-alias|secret.password.alias|--target-dir|"/projects/bde/datalake_cerner/etl/ingest/V500/DRG/10000000"|--delete-target-dir|--as-parquetfile|--fetch-size|10000|-m|16|--split-by|DRG_ID|--query|"SELECT DRG_ID,PERSON_ID,ENCNTR_ID,NOMENCLATURE_ID,DRG_PAYMENT,OUTLIER_DAYS,OUTLIER_COST,OUTLIER_REIMBURSEMENT_COST,DRG_PRIORITY,GROUPER_CD,DRG_PAYOR_CD,CREATE_PRSNL_ID,CONTRIBUTOR_SYSTEM_CD,UPDT_ID,ACTIVE_IND,ACTIVE_STATUS_CD,SVC_CAT_HIST_ID,MDC_CD,RISK_OF_MORTALITY_CD,SEVERITY_OF_ILLNESS_CD,SOURCE_VOCABULARY_CD,MDC_APR_CD,COMORBIDITY_CD,ENCNTR_SLICE_ID,LAST_UTC_TS,ACTIVE_STATUS_DT_TM,BEG_EFFECTIVE_DT_TM,CREATE_DT_TM,END_EFFECTIVE_DT_TM,UPDT_DT_TM FROM V500.DRG WHERE $CONDITIONS"
Another good example is a CREATE TABLE statement below, generated by MetaZoo. It may not look like much, but this is a great example of why third-party tools with a nice UI won't do this job.
Our real-time tables are backed by Apache Kudu. Following Apache Kudu's schema design document for best practices and for the optimal performance, we need to apply optional lz4 compression to certain columns, list primary keys in a certain order and place and so forth.
But the most challenging task was to pick a partitioning strategy. Partitions in Kudu must be defined upfront, during table creation time. Not only you need to pick columns to partition by, but there are also some limitations in terms of a number of tablets/partitions, based on the size of the cluster and hardware specification of nodes. To obey these rules, we needed to estimate the size of the tables.
MetaZoo came to the rescue again. Our Data Scientist came up with a very accurate XGBoost model to predict the final size of Kudu tables, based on the source tables metadata we already had in MetaZoo. Knowing the size of a table, we can assign a proper number of partitions, while respecting Kudu's limitations. 160 partitions in the example below are the greatest number that was allowed based on our Kudu cluster size:
CREATE TABLE IF NOT EXISTS cerner.`orders` (
order_id bigint,
order_mnemonic string compression lz4,
...cut 100s of columns here....
clin_relevant_updt_dt_tm string compression lz4,
active_status_dt_tm string compression lz4
PRIMARY KEY (source_system_id, order_id)
)
PARTITION BY HASH(order_id) PARTITIONS 160
STORED AS KUDU;
The last example I want to share is an SQL INSERT statement, generated by MetaZoo as well. Note, how data types are being converted, fixing a bad job by Sqoop and considering nuances of Oracle's Number data type without scale and precision. And note, how timestamps are converted to a proper time zone while dropping timestamps before the year 1800. It is worth mentioning, that timestamps are stored by Sqoop as long integer value that needs to be divided by 1000 to get Unix epoch time that Impala and Hive can understand:
INSERT INTO TABLE cerner.orders /* +NOCLUSTERED,NOSHUFFLE */
SELECT
source_system_id,
CAST(order_id as bigint) as order_id,
CAST(link_nbr as double),
catalog_cd,CAST(suspend_ind as bigint) as suspend_ind,
order_mnemonic,
....cut....
FROM_TIMESTAMP(CAST(last_utc_ts/1000 AS timestamp),'yyyy-MM-dd HH:mm:ss.SSS') AS last_utc_ts,
CASE
WHEN status_dt_tm IS NULL
THEN NULL
WHEN status_dt_tm BETWEEN -5364576000000 AND 4102531200000
THEN CAST(from_utc_timestamp(CAST(status_dt_tm/1000 as timestamp), 'America/New_York') AS string)
WHEN status_dt_tm < -5364576000000
THEN '1800-01-01 00:00:00'
ELSE
'2100-12-31 23:59:59'
END AS status_dt_tm
,'{ingest_dt_tm}' AS ingest_dt_tm
,FROM_TIMESTAMP(now(),'yyyy-MM-dd HH:mm:ss.SSS') AS update_dt_tm
FROM cerner_stage.`orders_ingested`
WHERE source_system_id = 10000000
;
Conclusion
MetaZoo paired with Apache NiFi proved to be quite useful! We have ingested 22 source systems and 1000s of tables into our Data Lake in a short period of time while keeping our team small and nimble. MetaZoo saves us time and helps us with ongoing support. Apache NiFi helps us run code generated by MetaZoo and makes ongoing support easier with custom alerts, logging and providence events.
If we need to make changes in the ingestion pipeline that involves 100s of tables, or if we need to reload data from scratch, adding a new transformation rule - MetaZoo makes it easy.
The only wish we have is to easily cope with source system schema changes, but it proved to be quite a challenging task. Fortunately, this does not happen often with our source systems hence was not worth the effort for us to fully support schema evolution. But it is certainly something for us to consider in the future.
Thanks for reading!