End to End ETL Process Using CSV Files and MySQL Database
An ETL (Extract, Transform, Load) process is a fundamental data engineering pattern for moving data between systems. This post walks through a lightweight Python pipeline that extracts data from CSV files, infers column types, creates MySQL tables dynamically, and loads the data.
Overview
The pipeline covers:
- Extract — Read CSV files using `csv.DictReader`
- Transform — Infer column data types from the CSV data
- Load — Create MySQL table and bulk-insert rows with `mysql.connector`
Detecting Column Data Types
One challenge with CSV imports is that all values are strings. We need to detect the correct SQL type for each column:
def dataType(val, current_type):
    """Infer the SQL data type of a CSV value, escalating the column type.

    Escalation order: INT -> FLOAT -> VARCHAR; a column of YYYY-MM-DD
    strings becomes DATE. Mixing incompatible kinds (e.g. dates with
    numbers) collapses to VARCHAR, and once a column is VARCHAR it stays
    VARCHAR. Empty cells carry no type information and leave the current
    type unchanged (they are loaded as NULL).
    """
    # Empty cells must not force the column to VARCHAR; the loader
    # inserts them as NULL regardless of the column type.
    if not val:
        return current_type
    # Once VARCHAR, always VARCHAR.
    if current_type == 'VARCHAR':
        return 'VARCHAR'
    # Check the raw text for a date FIRST: '2024-01-15' is not a valid
    # Python literal, so ast.literal_eval raises on it and the original
    # date branch was unreachable for unquoted CSV values.
    try:
        datetime.strptime(val, '%Y-%m-%d')
    except ValueError:
        pass
    else:
        # DATE is consistent only with the INT seed or a prior DATE;
        # a date mixed into a FLOAT column must fall back to text.
        return 'DATE' if current_type in ('INT', 'DATE') else 'VARCHAR'
    try:
        parsed = ast.literal_eval(val)
    except (ValueError, SyntaxError):
        # Not a Python literal and not a date: plain text.
        return 'VARCHAR'
    if type(parsed) is float:
        # A float escalates INT but cannot coexist with DATE.
        return 'FLOAT' if current_type in ('INT', 'FLOAT') else 'VARCHAR'
    if type(parsed) is int:
        # A plain int mixed into a DATE column would crash the DATE
        # conversion at load time, so such a mix becomes VARCHAR.
        if current_type == 'DATE':
            return 'VARCHAR'
        # INT stays INT; an int does not demote an existing FLOAT.
        return current_type
    # Anything else (bools, quoted strings, containers, ...) is text.
    return 'VARCHAR'
The function escalates types as it encounters more rows: an INT column becomes FLOAT if a float is seen, and becomes VARCHAR if a non-numeric string is encountered.
Creating the MySQL Table
import ast
import csv
import os
from datetime import datetime

import mysql.connector
def create_table_from_csv(filepath, table_name, connection):
    """Read a CSV file, infer column types, and create the MySQL table.

    Args:
        filepath: Path to the CSV file (read as UTF-8).
        table_name: Name of the table to create (created only if absent).
        connection: An open mysql.connector connection.

    Returns:
        A tuple ``(rows, col_types)`` where ``rows`` is the list of CSV
        rows as dicts and ``col_types`` maps column name to the inferred
        SQL type ('INT', 'FLOAT', 'DATE' or 'VARCHAR').

    Raises:
        ValueError: If the CSV file contains no data rows.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        rows = list(csv.DictReader(f))
    if not rows:
        raise ValueError("CSV file is empty")
    # Seed every column as INT and let dataType() escalate it per value.
    col_types = {col: 'INT' for col in rows[0].keys()}
    for row in rows:
        for col, val in row.items():
            col_types[col] = dataType(val, col_types[col])
    # Map the inferred abstract types onto concrete MySQL column types.
    type_map = {
        'INT': 'INT',
        'FLOAT': 'FLOAT',
        'DATE': 'DATE',
        'VARCHAR': 'VARCHAR(255)',
    }
    col_defs = ', '.join(
        f"`{col}` {type_map[dtype]}"
        for col, dtype in col_types.items()
    )
    create_sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({col_defs});"
    cursor = connection.cursor()
    try:
        cursor.execute(create_sql)
        connection.commit()
    finally:
        # Fix: the original leaked the cursor.
        cursor.close()
    return rows, col_types
Loading Data into MySQL
def load_csv_to_mysql(filepath, table_name, connection):
    """Create the target table from a CSV file and bulk-insert its rows.

    Each CSV string is converted to the Python type matching the column's
    inferred SQL type; empty cells are inserted as NULL.

    Args:
        filepath: Path to the CSV file.
        table_name: Destination table name.
        connection: An open mysql.connector connection.
    """
    rows, col_types = create_table_from_csv(filepath, table_name, connection)
    columns = list(rows[0].keys())
    placeholders = ', '.join(['%s'] * len(columns))
    col_names = ', '.join(f"`{c}`" for c in columns)
    insert_sql = f"INSERT INTO `{table_name}` ({col_names}) VALUES ({placeholders})"
    data = []
    for row in rows:
        record = []
        for col in columns:
            val = row[col]
            # Empty cells become NULL regardless of column type (the
            # original repeated this check in every branch).
            if not val:
                record.append(None)
            elif col_types[col] == 'INT':
                record.append(int(val))
            elif col_types[col] == 'FLOAT':
                record.append(float(val))
            elif col_types[col] == 'DATE':
                record.append(datetime.strptime(val, '%Y-%m-%d').date())
            else:
                record.append(val)
        data.append(tuple(record))
    cursor = connection.cursor()
    try:
        # executemany() batches the rows into far fewer round-trips than
        # issuing one INSERT per row.
        cursor.executemany(insert_sql, data)
        connection.commit()
        print(f"Inserted {cursor.rowcount} rows into `{table_name}`.")
    finally:
        # Fix: the original never closed the cursor.
        cursor.close()
Connecting to MySQL and Running the Pipeline
def run_etl(csv_path, table_name, host, database, user, password):
    """Open a MySQL connection, load one CSV file, and always close it."""
    conn = mysql.connector.connect(
        host=host, database=database, user=user, password=password
    )
    try:
        load_csv_to_mysql(csv_path, table_name, conn)
    finally:
        # Close the connection even if the load fails.
        conn.close()
if __name__ == '__main__':
    # Read connection settings from the environment when available; the
    # literals are development-only fallbacks. Avoid hardcoding real
    # credentials in source control.
    run_etl(
        csv_path=os.environ.get('ETL_CSV_PATH', 'data/patients.csv'),
        table_name=os.environ.get('ETL_TABLE_NAME', 'patients'),
        host=os.environ.get('MYSQL_HOST', 'localhost'),
        database=os.environ.get('MYSQL_DATABASE', 'mydb'),
        user=os.environ.get('MYSQL_USER', 'root'),
        password=os.environ.get('MYSQL_PASSWORD', 'secret')
    )
Processing Multiple CSV Files
To load a directory of CSVs, each file becomes its own table:
import os
def run_etl_directory(directory, host, database, user, password):
    """Load every ``*.csv`` file in *directory* into its own MySQL table.

    Each file becomes a table named after the file (extension stripped).
    One connection is shared across all loads and closed on exit.
    """
    connection = mysql.connector.connect(
        host=host, database=database, user=user, password=password
    )
    try:
        # sorted() makes the load order deterministic across platforms.
        for filename in sorted(os.listdir(directory)):
            if filename.endswith('.csv'):
                table_name = os.path.splitext(filename)[0]
                filepath = os.path.join(directory, filename)
                # Fix: the message printed the literal "(unknown)"
                # instead of the file being loaded.
                print(f"Loading {filename} -> table `{table_name}`...")
                load_csv_to_mysql(filepath, table_name, connection)
    finally:
        connection.close()
Key Considerations
- Type escalation order: INT → FLOAT → VARCHAR. Once a column is VARCHAR, it stays VARCHAR.
- NULL handling: Empty strings are inserted as `NULL` instead of empty strings, preserving database semantics.
- Batch inserts: Using `executemany()` is significantly faster than issuing individual `INSERT` statements.
- Encoding: Always open CSV files with `encoding='utf-8'` to handle non-ASCII characters.
- Security: Never hardcode credentials; use environment variables or a secrets manager in production.