End to End ETL Process Using CSV Files and MySQL Database

An ETL (Extract, Transform, Load) process is a fundamental data engineering pattern for moving data between systems. This post walks through a lightweight Python pipeline that extracts data from CSV files, infers column types, creates MySQL tables dynamically, and loads the data.

Overview

The pipeline covers:

  1. Extract — Read CSV files using csv.DictReader
  2. Transform — Infer column data types from the CSV data
  3. Load — Create MySQL table and bulk-insert rows with mysql.connector

Detecting Column Data Types

One challenge with CSV imports is that all values are strings. We need to detect the correct SQL type for each column:

def dataType(val, current_type):
    """Infer the SQL data type of a CSV value, escalating `current_type` if needed.

    Escalation lattice: INT -> FLOAT or DATE -> VARCHAR. Incompatible
    widenings (e.g. a date seen in a FLOAT column) fall back to VARCHAR.

    Args:
        val: the raw CSV cell value (always a string from DictReader).
        current_type: the type inferred so far ('INT', 'FLOAT', 'DATE'
            or 'VARCHAR').

    Returns:
        The (possibly escalated) SQL type name as a string.
    """
    # Empty cells are loaded as NULL, so they carry no type information
    # and must not demote the column (previously they forced VARCHAR).
    if not val:
        return current_type

    # VARCHAR is the top of the lattice: once there, always there.
    if current_type == 'VARCHAR':
        return 'VARCHAR'

    # Check for dates BEFORE literal_eval: '2021-01-01' is not a valid
    # Python literal (SyntaxError), so the old code never detected dates.
    try:
        datetime.strptime(val, '%Y-%m-%d')
        # A date is compatible with INT (the initial guess) and DATE,
        # but a column mixing dates with floats must become VARCHAR.
        return 'DATE' if current_type in ('INT', 'DATE') else 'VARCHAR'
    except ValueError:
        pass

    try:
        t = ast.literal_eval(val)
    except (ValueError, SyntaxError):
        return 'VARCHAR'

    if type(t) is int:
        if current_type == 'FLOAT':
            return 'FLOAT'
        # An int cannot be loaded into a DATE column -> widen to VARCHAR
        # (previously this stayed DATE and crashed at load time).
        if current_type == 'DATE':
            return 'VARCHAR'
        return 'INT'
    if type(t) is float:
        # Floats mixed with dates cannot be reconciled.
        return 'VARCHAR' if current_type == 'DATE' else 'FLOAT'

    # Any other literal (quoted string, list, bool, ...) is stored as text.
    return 'VARCHAR'

The function escalates types as it encounters more rows: an INT column becomes FLOAT if a float is seen, and becomes VARCHAR if a non-numeric string is encountered.

Creating the MySQL Table

import csv
import ast
import mysql.connector
from datetime import datetime

def create_table_from_csv(filepath, table_name, connection):
    """Read a CSV file, infer per-column SQL types, and create the table.

    Args:
        filepath: path to the CSV file (first row must be the header).
        table_name: name of the MySQL table to create if it does not exist.
        connection: an open mysql.connector connection.

    Returns:
        (rows, col_types): the parsed rows as a list of dicts, and the
        inferred {column_name: sql_type} mapping.

    Raises:
        ValueError: if the CSV contains no data rows.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    if not rows:
        raise ValueError("CSV file is empty")

    # Start every column at INT (the narrowest type) and let dataType()
    # escalate it as wider values are encountered row by row.
    col_types = {col: 'INT' for col in rows[0].keys()}
    for row in rows:
        for col, val in row.items():
            col_types[col] = dataType(val, col_types[col])

    # Map inferred logical types onto concrete MySQL column types.
    type_map = {
        'INT': 'INT',
        'FLOAT': 'FLOAT',
        'DATE': 'DATE',
        'VARCHAR': 'VARCHAR(255)',
    }
    col_defs = ', '.join(
        f"`{col}` {type_map[dtype]}"
        for col, dtype in col_types.items()
    )
    create_sql = f"CREATE TABLE IF NOT EXISTS `{table_name}` ({col_defs});"

    cursor = connection.cursor()
    try:
        cursor.execute(create_sql)
        connection.commit()
    finally:
        # Fix: the cursor was previously never closed (resource leak).
        cursor.close()

    return rows, col_types

Loading Data into MySQL

def load_csv_to_mysql(filepath, table_name, connection):
    """Create the target table from a CSV file, then bulk-insert its rows.

    Values are converted from CSV strings to Python types matching the
    inferred column types; empty strings are inserted as NULL.

    Args:
        filepath: path to the CSV file.
        table_name: destination MySQL table (created if missing).
        connection: an open mysql.connector connection.
    """
    rows, col_types = create_table_from_csv(filepath, table_name, connection)

    columns = list(rows[0].keys())
    placeholders = ', '.join(['%s'] * len(columns))
    col_names = ', '.join(f"`{c}`" for c in columns)
    insert_sql = f"INSERT INTO `{table_name}` ({col_names}) VALUES ({placeholders})"

    data = []
    for row in rows:
        record = []
        for col in columns:
            val = row[col]
            # Empty cells become NULL regardless of the column type
            # (checked once here instead of once per type branch).
            if not val:
                record.append(None)
            elif col_types[col] == 'INT':
                record.append(int(val))
            elif col_types[col] == 'FLOAT':
                record.append(float(val))
            elif col_types[col] == 'DATE':
                record.append(datetime.strptime(val, '%Y-%m-%d').date())
            else:
                record.append(val)
        data.append(tuple(record))

    cursor = connection.cursor()
    try:
        # executemany() batches the inserts, far faster than row-at-a-time.
        cursor.executemany(insert_sql, data)
        connection.commit()
        print(f"Inserted {cursor.rowcount} rows into `{table_name}`.")
    finally:
        # Fix: the cursor was previously never closed (resource leak).
        cursor.close()

Connecting to MySQL and Running the Pipeline

def run_etl(csv_path, table_name, host, database, user, password):
    """Open a MySQL connection, load a single CSV into a table, and disconnect.

    The connection is always closed, even if the load fails.
    """
    conn_args = {
        'host': host,
        'database': database,
        'user': user,
        'password': password,
    }
    connection = mysql.connector.connect(**conn_args)
    try:
        load_csv_to_mysql(csv_path, table_name, connection)
    finally:
        connection.close()

if __name__ == '__main__':
    import os

    # Demo entry point. Credentials and paths come from the environment
    # when set, falling back to local-development defaults — never commit
    # real credentials to source control (see the Security note below).
    run_etl(
        csv_path=os.environ.get('ETL_CSV_PATH', 'data/patients.csv'),
        table_name=os.environ.get('ETL_TABLE_NAME', 'patients'),
        host=os.environ.get('MYSQL_HOST', 'localhost'),
        database=os.environ.get('MYSQL_DATABASE', 'mydb'),
        user=os.environ.get('MYSQL_USER', 'root'),
        password=os.environ.get('MYSQL_PASSWORD', 'secret')
    )

Processing Multiple CSV Files

To load a directory of CSVs, each file becomes its own table:

import os

def run_etl_directory(directory, host, database, user, password):
    """Load every .csv file in `directory` into its own MySQL table.

    Each table is named after the CSV file with its extension stripped.
    The shared connection is always closed, even if a load fails.
    """
    connection = mysql.connector.connect(
        host=host, database=database, user=user, password=password
    )
    try:
        # sorted() makes the load order deterministic across platforms
        # (os.listdir order is filesystem-dependent).
        for filename in sorted(os.listdir(directory)):
            if filename.endswith('.csv'):
                table_name = os.path.splitext(filename)[0]
                filepath = os.path.join(directory, filename)
                # Fix: previously printed the literal "(unknown)" instead
                # of the file actually being loaded.
                print(f"Loading {filename} -> table `{table_name}`...")
                load_csv_to_mysql(filepath, table_name, connection)
    finally:
        connection.close()

Key Considerations

  • Type escalation order: INT → FLOAT or DATE → VARCHAR. Once a column is VARCHAR, it stays VARCHAR; columns mixing incompatible types (e.g. dates and floats) fall back to VARCHAR.
  • NULL handling: Empty strings are inserted as NULL instead of empty strings, preserving database semantics.
  • Batch inserts: Using executemany() is significantly faster than issuing individual INSERT statements.
  • Encoding: Always open CSV files with encoding='utf-8' to handle non-ASCII characters.
  • Security: Never hardcode credentials; use environment variables or a secrets manager in production.