Watch out for timezones with Sqoop, Hive, Impala and Spark
My head was spinning as I tried to accomplish a simple thing (or so it seemed at first). I load data from three Oracle databases, located in different time zones, using Sqoop and Parquet. Then I load the data into Hive using external tables, and finally I do some light processing and load it into another Hive schema that will be used by data analysts.
The source Oracle databases keep all timestamps in UTC, but our data analysts prefer them loaded in the local time of the source system so they can easily validate data against various operational applications. In other words, I needed to convert UTC time to another time zone and store it like that in Hive. Ultimately, I am saving their heads from spinning!
It is confusing already, right? Here is a picture to help you understand this:
The confusion begins!
The first step of my simple pipeline is to load data from remote Oracle databases to HDFS, using Sqoop. I am not doing it manually by the way. I use Apache Airflow (incubating) and Airflow DAGs are driven by metadata definition files, but this is a subject for another post.
I picked a table for my test and decided on a column with timestamps to validate the entire process. It is important to test timestamps that fall within Daylight Saving Time (DST) as well as Standard Time (ST), and I picked a few rows with that in mind. I used this awesome site to determine the DST time frames for my tests.
Then I ingested that table into HDFS with Sqoop, storing the data in Parquet format. To see how timestamp values are stored in the Parquet files generated by Sqoop, I copied the files from HDFS to a local file system and used the parquet-tools utility to take a peek at them, searching for the ID of my test row:
$ parquet-tools cat 0332d43c-bd5c-468a-b773-8134a629d989.parquet > dump.txt
You probably know that Sqoop converts timestamps from the database to a number, and this number represents Unix epoch time. Using an epoch converter, I converted this value back to UTC time and compared it with the one recorded in Oracle - it was off by 4 hours! Apparently, Sqoop takes the original timestamps from the source database and converts them to system (server) time, which in my case is set to Eastern time. There is an old issue, SQOOP-423, and it is still not addressed. The good news is there is a workaround - you just need to pass the desired time zone to Sqoop like so:
sqoop import -Dmapreduce.map.java.opts=' -Duser.timezone=UTC' ...
By passing UTC above, I told Sqoop to "convert" the original UTC time to UTC time. It does not make sense, but it works, and I verified this again with the parquet-tools utility. This time my time matched exactly! In other words, this option forces the MapReduce job to use the desired time zone, not your local server time zone.
The confusion grows!
I created an external table using the Parquet files from the first step. I knew Hive had some handy UDFs to convert UTC time to the desired time zone, and that I could also convert the numeric representation of timestamps created by Sqoop into human-readable date/time. I created a query like the one below to quickly test that, and I was about to call it a day and head home:
SELECT
  code_value,
  -- should show Oracle's UTC time as an epoch number:
  begin_effective_dt_tm AS unix_utc_time,
  -- should show Oracle's UTC time in human format:
  CAST(begin_effective_dt_tm AS timestamp) AS utc_time_using_cast,
  -- should show time in New York:
  from_utc_timestamp(begin_effective_dt_tm, 'America/New_York') AS local_time
FROM raw_table
WHERE
  code_value IN (id1, id2, id3, id4)
Well, while the first 'should' was spot on, the other two were wrong and the times were off!
After some ~~googling~~ digging, I found this and this. To recap, Hive always assumes that timestamps in Parquet files are stored in UTC and it converts them to the local system time (cluster host time) when it outputs results. My cluster is on Eastern time, and Hive would show me timestamps in Eastern time even though I did not ask for it. It seems there is an option now to disable this behavior, but it does not work with Hive 1.1 (which ships with CDH).
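For readers on a newer Hive release, here is roughly what that disable switch looks like. This is a minimal sketch, assuming the hive.parquet.timestamp.skip.conversion property is present in your Hive version and applies to files written by your Parquet writer (on Hive 1.1 it did not help me):
-- Assumed property name; asks Hive to skip the UTC-to-local conversion when reading Parquet timestamps
SET hive.parquet.timestamp.skip.conversion=true;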
My solution is to convert the time again, back from cluster system time to the desired time zone:
SELECT
  code_value,
  -- should show Oracle's UTC time as an epoch number:
  begin_effective_dt_tm AS unix_utc_time,
  -- should show Oracle's UTC time in human format; America/New_York is the cluster's system time:
  to_utc_timestamp(begin_effective_dt_tm, 'America/New_York') AS utc_time_converted,
  -- should show local time in Denver:
  from_utc_timestamp(to_utc_timestamp(begin_effective_dt_tm, 'America/New_York'), 'America/Denver') AS local_time
FROM raw_table
WHERE
  code_value IN (id1, id2, id3, id4)
This time all the 'should's were met! I can now see the UTC time as it was in the source and the local time in the desired time zone (Denver, for example).
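To make the double conversion concrete, here is a hedged walk-through with a made-up value (illustrative only, not from my actual data). Suppose the source row is 2017-01-10 15:00:00 UTC: on my Eastern-time cluster the raw value comes out as 2017-01-10 10:00:00, the to_utc_timestamp() call shifts it back to 15:00:00 UTC, and the final from_utc_timestamp() renders it as Denver time:
-- Illustrative literals, assuming the cluster runs on Eastern time
SELECT
  to_utc_timestamp('2017-01-10 10:00:00', 'America/New_York') AS back_to_utc,  -- 2017-01-10 15:00:00
  from_utc_timestamp(
    to_utc_timestamp('2017-01-10 10:00:00', 'America/New_York'),
    'America/Denver') AS denver_local                                          -- 2017-01-10 08:00:00 (MST, UTC-7)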
Always use region-based time zone identifiers like America/New_York or America/Denver, not abbreviations like EST or CST. This is a common mistake, as the latter do not account for daylight saving time! There is a nice list here. Proper time zone identifiers are composed of the name of a continent or ocean and the name of a representative location within that area (typically the name of the largest city).
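A quick hedged illustration of why the abbreviations are dangerous (the literals are made up; July falls within DST, January does not):
-- 'EST' is a fixed UTC-5 offset, so in summer it is an hour off from real New York time
SELECT
  from_utc_timestamp('2017-07-01 12:00:00', 'America/New_York') AS ny_july,    -- 2017-07-01 08:00:00 (EDT, UTC-4)
  from_utc_timestamp('2017-07-01 12:00:00', 'EST') AS est_july,                -- 2017-07-01 07:00:00 (always UTC-5)
  from_utc_timestamp('2017-01-01 12:00:00', 'America/New_York') AS ny_january  -- 2017-01-01 07:00:00 (EST, UTC-5)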
Using the expressions above, I can transform my raw data and do a Create Table As Select (CTAS) into the final tables that will be consumed by analysts and tools.
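Here is roughly what that CTAS looks like. A minimal sketch, assuming hypothetical schema and table names (analytics.final_table and raw_table) and the Eastern cluster time from the queries above:
-- Hypothetical names; the conversion expression is the same one tested above
CREATE TABLE analytics.final_table STORED AS PARQUET AS
SELECT
  code_value,
  from_utc_timestamp(
    to_utc_timestamp(begin_effective_dt_tm, 'America/New_York'),
    'America/Denver') AS begin_effective_local_tm
FROM raw_table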
The confusion ends!
At that point I was really happy with Sqoop and Hive - I retained the original UTC timestamps in the raw tables, and our end users would see the local time corresponding to the originating system. I made sure various Hive date/time functions worked as expected.
My last test was to verify that Impala and Spark with Hive would show the same values. Spark (using HiveContext) was good to go. Impala was not!
Luckily, Cloudera documented this behavior here, specifically:
By default, Impala does not store timestamps using the local timezone, to avoid undesired results from unexpected time zone issues. Timestamps are stored and interpreted relative to UTC, both when written to or read from data files, or when converted to or from Unix time values through functions such as from_unixtime() or unix_timestamp(). To convert such a TIMESTAMP value to one that represents the date and time in a specific time zone, convert the original value with the from_utc_timestamp() function.
Because Impala does not assume that TIMESTAMP values are in any particular time zone, you must be conscious of the time zone aspects of data that you query, insert, or convert.
...
If you have data files written by Hive, those TIMESTAMP values represent the local timezone of the host where the data was written, potentially leading to inconsistent results when processed by Impala. To avoid compatibility problems or having to code workarounds, you can specify one or both of these impalad startup flags: -use_local_tz_for_unix_timestamp_conversions=true and -convert_legacy_hive_parquet_utc_timestamps=true. Although -convert_legacy_hive_parquet_utc_timestamps is turned off by default to avoid performance overhead, Cloudera recommends turning it on when processing TIMESTAMP columns in Parquet files written by Hive, to avoid unexpected behavior.
Once I set both startup flags to true and restarted Impala (do not do that; read below to see why), the timestamps matched precisely and showed the same time in Hive, Spark and Impala.
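My check was simply to run the same statement in Hive, Spark (through HiveContext) and Impala and compare the results row by row. A hedged sketch against the hypothetical final table from the CTAS above:
-- Run identically in Hive, Spark SQL and Impala; with both flags enabled all three should agree
SELECT code_value, begin_effective_local_tm
FROM analytics.final_table
WHERE code_value IN (id1, id2, id3, id4)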
Now I can go home!
It is not over yet
The day after we enabled the convert_legacy_hive_parquet_utc_timestamps option, we started noticing an extreme slowdown for all queries dealing with larger data sets. We reviewed older benchmarks and noticed a huge degradation in performance; here is an example:
After pulling some hair, our Hadoop admin identified the issue. Sure enough, once we turned the setting back off, our queries started running as normal.
The only workaround (thanks, Ruslan Dautkhanov!) is not to use timestamp columns at all and to load timestamps into string columns instead. Most, if not all, Hive and Impala date and time functions happily take strings as long as your timestamps are properly formatted, as sketched below.
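A hedged sketch of what that looks like, assuming a hypothetical table with the local time stored as a 'yyyy-MM-dd HH:mm:ss' string in a begin_effective_local_str column (Hive syntax; verify the implicit string-to-timestamp conversion in your Impala version):
-- Well-formatted strings are accepted where timestamps are expected
SELECT
  code_value,
  begin_effective_local_str AS local_time_str,
  unix_timestamp(begin_effective_local_str) AS epoch_seconds,
  to_utc_timestamp(begin_effective_local_str, 'America/Denver') AS back_to_utc
FROM analytics.final_table_str
WHERE code_value IN (id1, id2, id3, id4)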