I did a cost comparison of querying CSV files versus Parquet files. Interestingly, when using the Parquet format, the same queries scanned up to 99% less data, and therefore cost up to 99% less, than with the CSV format.
| Query (Parquet table name shown; the equivalent query was run against the CSV table) | CSV (11.32 GB) run time (sec) | CSV (11.32 GB) data scanned (GB) | Parquet (4.1 GB) run time (sec) | Parquet (4.1 GB) data scanned (GB) | % data scan saved using Parquet |
| --- | --- | --- | --- | --- | --- |
| SELECT count(*) FROM STG.case_parquet_test; | 3.39 | 11.32 | 2.27 | 0 | 100 |
| SELECT * FROM STG.case_parquet_test WHERE CaseNumber IN ('1','2','3'); | 3.62 | 11.32 | 4.13 | 0.08157 | 99.27941696 |
| SELECT * FROM STG.case_parquet_test WHERE CaseNumber IN ('1','2','3') ORDER BY CaseNumber | 4.83 | 11.32 | 4.13 | 0.08157 | 99.27941696 |
| SELECT Id, CaseNumber, Status FROM STG.case_parquet_test WHERE CaseNumber IN ('1','2','3') ORDER BY CaseNumber | 4.08 | 11.32 | 3.02 | 0.01618 | 99.85706714 |
Based on these results, pushing data to S3 in Parquet format looks like a best practice. It could be even better with a compression codec such as Snappy, Bzip2, LZO, or Gzip (not splittable); more analysis is required there.
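As a starting point, here is a minimal sketch of writing a DataFrame to S3 as Snappy-compressed Parquet with pandas. The bucket path and example data are placeholders, and it assumes pyarrow and s3fs are installed; it is not the exact pipeline behind the numbers above.

```python
import pandas as pd

# Hypothetical example data; in practice this would be the exported table.
df = pd.DataFrame({
    "Id": [1, 2, 3],
    "CaseNumber": ["1", "2", "3"],
    "Status": ["Open", "Closed", "Open"],
})

# Write Snappy-compressed Parquet straight to S3 (requires pyarrow + s3fs).
# "my-bucket/case_parquet_test" is a placeholder path.
df.to_parquet(
    "s3://my-bucket/case_parquet_test/data.snappy.parquet",
    engine="pyarrow",
    compression="snappy",
    index=False,
)
```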
Advantages of using the Parquet format:
- A full data scan can be avoided when filtering on a particular column.
- A full data scan can also be avoided when selecting only a few columns (see the sketch after this list).
- Less data storage on S3 (around 60% less in this test), which can be reduced further with a compression codec.
- Fetching data should be quicker than with CSV, since less data is scanned.
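To illustrate the first two points, here is a small sketch of column pruning and predicate pushdown when reading Parquet with pyarrow; the file path mirrors the test table above but is a placeholder.

```python
import pyarrow.parquet as pq

# Read only three columns and only the rows matching the filter; with Parquet,
# the other columns and non-matching row groups are skipped instead of scanned.
table = pq.read_table(
    "case_parquet_test.parquet",                      # placeholder local path
    columns=["Id", "CaseNumber", "Status"],           # column pruning
    filters=[("CaseNumber", "in", ["1", "2", "3"])],  # predicate pushdown
)
df = table.to_pandas()
```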
Disadvantages:
- Parquet stores column/schema information inside the file itself, so a mismatch with the table definition can cause SELECT queries to fail (see the schema check sketch after this list).
- Column data types should therefore be chosen carefully when writing Parquet.
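One way to catch such mismatches early is to inspect the schema embedded in the Parquet file before creating or querying the Athena table. A minimal sketch with pyarrow (the file path is a placeholder):

```python
import pyarrow.parquet as pq

# Print the schema stored inside the Parquet file so it can be compared
# against the Athena CREATE EXTERNAL TABLE definition.
schema = pq.read_schema("case_parquet_test.parquet")  # placeholder path
for field in schema:
    print(field.name, field.type)
```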
I ran into the above issue while moving data from BigQuery into Athena. Since the BigQuery API exports data as a pandas DataFrame, all type conversions were done on the DataFrame itself.
Although the conversions look straightforward, I can assure you they save a lot of data issues (they handle NULLs too) and a lot of time (I had already spent many hours on these conversions alone).
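For context, here is a minimal sketch of the export step with the google-cloud-bigquery client; the project, dataset, and table names are placeholders, and it assumes the client library's pandas extras are installed.

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # placeholder project id

# Export the query result as a pandas DataFrame with BigQuery's default dtypes.
df = client.query("SELECT * FROM my_dataset.case_table").to_dataframe()
print(df.dtypes)
```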
| BigQuery table type | DataFrame (default dtype) | New DataFrame (converted dtype) | Athena external table type (via Parquet) |
| --- | --- | --- | --- |
| BOOL | bool | df[column].astype(pd.BooleanDtype()) | BOOLEAN |
| INT64 | int64 | df[column].astype(pd.Int64Dtype()) | BIGINT |
| FLOAT64 | float64 | NA | DOUBLE |
| TIMESTAMP | datetime64 | df[column].dt.ceil(freq='ms') | TIMESTAMP |
| DATE | datetime64 | df[column].dt.ceil(freq='ms') | TIMESTAMP |
| STRING | object | df[column].astype(pd.StringDtype()) | STRING |
```python
import pandas as pd


# Update data types in the DataFrame returned by the SELECT query
def update_data_types(df, metadata_df):
    # No need to update TIME, as it is internally recorded as OBJECT in the DataFrame and
    # converted to an integer during external table creation:
    # 13:21:36.511531 --> 48096511531 (converted to microseconds)
    for ind, column in enumerate(df.columns):
        bq_table_data_type = metadata_df[metadata_df["column_name"] == column]["data_type"].iloc[0].upper()
        if bq_table_data_type == "STRING":
            df[column] = df[column].astype(pd.StringDtype())
        elif bq_table_data_type == "INT64":
            df[column] = df[column].astype(pd.Int64Dtype())
        elif bq_table_data_type in ("BOOLEAN", "BOOL"):
            df[column] = df[column].astype(pd.BooleanDtype())
        elif bq_table_data_type in ("TIMESTAMP", "DATE"):
            df[column] = df[column].dt.ceil(freq='ms')
    return df


# Build the CREATE EXTERNAL TABLE statement for Athena
def create_external_table_command(table_name, metadata_df, s3_bucket_dir):
    columns_list = []
    for ind in metadata_df.index:
        col_name = metadata_df['column_name'][ind]
        data_type = metadata_df['data_type'][ind]
        if data_type in ['BOOL', 'BOOLEAN']:
            new_data_type = 'BOOLEAN'
        elif data_type in ['INT64', 'INT32', 'INT', 'TIME']:
            new_data_type = 'BIGINT'
        elif data_type in ['FLOAT64', 'FLOAT32', 'FLOAT', 'DOUBLE', 'PERCENT']:
            new_data_type = 'DOUBLE'
        elif data_type in ['TIMESTAMP', 'DATE']:
            new_data_type = 'TIMESTAMP'
        else:
            new_data_type = 'STRING'
        line = '{} {}'.format(col_name, new_data_type)
        columns_list.append(line)

    create_table = "CREATE EXTERNAL TABLE {}( {} ) ".format(table_name, ",".join(columns_list))
    partition_properties = "PARTITIONED BY(part_year int, part_month int, part_day int)"
    parquet_properties = "{} STORED AS PARQUET LOCATION 's3://{}' TBLPROPERTIES (" \
                         "\"parquet.compression\"=\"GZIP\")".format(partition_properties, s3_bucket_dir)
    final_create_table = create_table + parquet_properties
    return final_create_table
```
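And a hypothetical end-to-end usage of the two helpers above, reusing the BigQuery `client` and `df` from the export sketch earlier; the column metadata comes from BigQuery's INFORMATION_SCHEMA.COLUMNS, and all dataset, bucket, and partition values are placeholders.

```python
# Column metadata (column_name, data_type) for the source table.
metadata_df = client.query(
    "SELECT column_name, data_type "
    "FROM my_dataset.INFORMATION_SCHEMA.COLUMNS "
    "WHERE table_name = 'case_table'"
).to_dataframe()

# Fix the DataFrame dtypes, then write GZIP-compressed Parquet to S3
# under a Hive-style partition path.
df = update_data_types(df, metadata_df)
df.to_parquet(
    "s3://my-bucket/case_parquet_test/part_year=2021/part_month=1/part_day=1/data.parquet",
    compression="gzip",
    index=False,
)

# Build the Athena DDL and run it (e.g. via the Athena console or boto3),
# then load the partitions with MSCK REPAIR TABLE.
ddl = create_external_table_command("STG.case_parquet_test", metadata_df, "my-bucket/case_parquet_test")
print(ddl)
```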
As we know, Athena charges based on the amount of data scanned ($5 per TB), so if heavily used tables are stored in CSV format, there is a lot of scope for cost savings.
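A rough back-of-the-envelope check using the scan sizes measured above (treating 1 TB as 1,024 GB; Athena's actual billing rounds each query to the nearest MB with a 10 MB minimum):

```python
PRICE_PER_TB = 5.0  # USD per TB of data scanned


def query_cost(gb_scanned, price_per_tb=PRICE_PER_TB):
    """Approximate per-query cost from GB scanned."""
    return gb_scanned / 1024 * price_per_tb


# Scan sizes from the benchmark table above.
print(query_cost(11.32))    # CSV:     ~$0.0553 per query
print(query_cost(0.08157))  # Parquet: ~$0.0004 per query
```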
Summary:
- Use the Parquet format, with compression, wherever applicable.
- Parquet, CSV, and Athena type conversions need more analysis for smoother execution.