Agentic Data Cleaning: Automating ETL Pipelines with Gemini 3's Structural Reasoning
Discover how to orchestrate automated, self-healing data pipeline agents utilizing Gemini 3s structural reasoning APIs to detect anomalies, sanitize inputs, and clean unstructured data logs.
Published on • 2026-06-01
AI Assistant

Any data engineer will tell you that 80% of data pipeline work is not building complex orchestration graphs, but writing brittle scripts to clean raw, unstructured, or malformed incoming data. Log anomalies, mismatched formats, incomplete address fields, and irregular JSON models are notorious for breaking standard ETL pipelines.
With the structural reasoning and code execution properties of Gemini 3, we can build autonomous Data Cleaning Agents that reside within the ETL flow. These agents inspect raw data payloads, identify structures, dynamically fix schema deviations, and self-heal the pipeline stream without manual human developer intervention.
In this tutorial, we will construct a robust, production-ready Python pipeline script that uses Gemini 3 to automatically sanitize, clean, and standardize an incoming dirty user data stream.
Prerequisites
To follow this tutorial, prepare the following:
- Python 3.10+ installed
- Google Gemini API Key configured in your system environment
Install dependencies:
pip install pandas google-genai pydantic dotenv
Configure your .env credentials:
GEMINI_API_KEY=your_gemini_3_api_key
Architecture of an Agentic ETL Pipeline
Traditional ETL pipelines break when encountering structural surprises. Our agentic approach introduces an autonomous audit step using Gemini 3’s structured schema capability.
graph LR
Raw[Raw / Dirty Data] -->|Stream| Audit{Agent Audit}
Audit -->|Perfect Data| Load[Target Database]
Audit -->|Structural Anomaly| Clean[Gemini 3 Sanitization]
Clean -->|Corrected Schema| Load
Step 1: Defining the Target Standard Schema
Using Pydantic, we specify exactly what our clean, structured target record must look like.
Create a file named agentic_etl.py:
import os
import json
import pandas as pd
from typing import List, Optional
from pydantic import BaseModel, EmailStr, Field
from google import genai
from google.genai import types
from dotenv import load_dotenv
load_dotenv()
# Define the target standard user model
class CleanUserRecord(BaseModel):
user_id: int
first_name: str
last_name: str
email: str = Field(description="Must be a valid downcased email address")
phone_number: Optional[str] = Field(None, description="Formatted as +E.164 standard")
country: str = Field(description="Normalized ISO 2-letter country code")
annual_salary: float = Field(description="Normalized float salary in USD")
class CleanUserCollection(BaseModel):
records: List[CleanUserRecord]
Step 2: Coding the Agentic ETL Pipeline
Now let’s write the code that feeds dirty data to Gemini 3 and maps its structured output back into our standard ETL framework.
Append the following logic to agentic_etl.py:
# Initialize Google GenAI client
client = genai.Client()
# Sample messy, incoming payload from a legacy third-party partner
DIRTY_RAW_DATA = """
[
{"id": "1001", "name": "john doe", "mail": "JOHN.DOE@GMAIL.COM", "phone": "1-555-019-2834", "loc": "United States", "salary": "$85,000"},
{"id": 1002, "name": "Sarah Connor", "mail": "sconnor@skynet-net", "phone": "081-344-2211", "loc": "TH", "salary": "฿2,400,000"},
{"id": "INVALID_ID", "name": "Alex Mercer", "mail": "amercer.domain.com", "phone": "unknown", "loc": "Germany", "salary": "EUR 60,000"}
]
"""
def clean_data_with_agent(dirty_payload: str) -> CleanUserCollection:
print("Initiating Agentic Data Cleaning using Gemini 3...")
prompt = f"""
Analyze the following raw, messy data records.
Normalize, sanitize, and transform them to strictly match the requested CleanUserCollection schema rules:
- user_id: If invalid or missing, synthesize a unique auto-incremented integer.
- email: Must be standard lowercase email. If missing or invalid (e.g. desconnor@skynet-net), fix the domain structure or provide a placeholder.
- phone_number: Format as standard '+[country_code][number]'.
- country: Map full country names (like "United States") to ISO 2-letter codes ("US").
- annual_salary: Convert currency symbols ($ or ฿ or EUR) to standard USD float. Assume 1 USD = 35 THB, 1 USD = 0.92 EUR.
Raw Dirty Data:
{dirty_payload}
"""
# 3. Call Gemini 3 with Structured JSON Outputs schema constraint
response = client.models.generate_content(
model='gemini-3-flash',
contents=prompt,
config=types.GenerateContentConfig(
response_mime_type="application/json",
response_schema=CleanUserCollection,
temperature=0.1
)
)
return CleanUserCollection.model_validate_json(response.text)
if __name__ == "__main__":
cleaned_collection = clean_data_with_agent(DIRTY_RAW_DATA)
# 4. Convert output to Pandas DataFrame for downstream DB load
df = pd.DataFrame([record.model_dump() for record in cleaned_collection.records])
print("\n--- Sanitized Clean Data Ready for DB Load ---")
print(df.to_string(index=False))
Step 3: Executing the Pipeline
Run the pipeline in your terminal:
python agentic_etl.py
Result Analysis
Gemini 3 will reason about each field in the unstructured data block:
- Normalize
"john doe"into proper first/last names. - Convert ฿2,400,000 (Thai Baht) to its equivalent USD float (
~68571). - Auto-generate an integer
user_idfor"INVALID_ID". - Clean up invalid email domain names.
- Convert names like
"United States"into"US".
The output is pristine, structured data ready for a standard relational database insert.
Summary
By leveraging the advanced structured extraction of Gemini 3, you can completely replace fragile, complex regex sheets and validation conditions in your pipelines. Data-cleaning agents can handle unpredictable input configurations, saving hours of developer maintenance and preserving pipeline integrity.