## Overview

Many real-world workflows involve bundled documents: a single PDF containing an invoice, a packing slip, and a certificate of origin, or a loan application made up of multiple forms. This cookbook shows how to build a pipeline that automatically splits, classifies, and extracts data from each section.

## What You’ll Build

An automated pipeline that:
  1. Splits a bundled PDF into individual documents
  2. Classifies each section by type
  3. Extracts structured data from each section using type-specific schemas

## Step 1: Split the Document Bundle

First, define the expected document types and split the bundle:

```python
import requests
import json

AIFANO_API_KEY = "ak_live_your_key_here"
BASE_URL = "https://platform.aifano.com"

# Split the bundled document
split_result = requests.post(
    f"{BASE_URL}/split",
    headers={"Authorization": f"Bearer {AIFANO_API_KEY}"},
    json={
        "input": "aifano://loan-application-bundle.pdf",
        "split_description": [
            {"title": "Application Form", "description": "The main loan application form with borrower details"},
            {"title": "Income Statement", "description": "Pay stubs, tax returns, or income verification"},
            {"title": "Bank Statement", "description": "Bank account statements showing balances and transactions"},
            {"title": "ID Document", "description": "Government-issued identification (passport, driver's license)"},
            {"title": "Supporting Document", "description": "Any other supporting documentation"}
        ]
    }
).json()

print(f"Found {len(split_result['result'])} sections:")
for section in split_result["result"]:
    print(f"  - {section['category']}: pages {section['page_range']['start']}-{section['page_range']['end']}")

## Step 2: Define Type-Specific Schemas

Create extraction schemas for each document type:
```python
schemas = {
    "Application Form": {
        "type": "object",
        "properties": {
            "applicant_name": {"type": "string"},
            "loan_amount": {"type": "number"},
            "loan_purpose": {"type": "string"},
            "employment_status": {"type": "string"},
            "annual_income": {"type": "number"}
        }
    },
    "Income Statement": {
        "type": "object",
        "properties": {
            "employer": {"type": "string"},
            "period": {"type": "string"},
            "gross_income": {"type": "number"},
            "net_income": {"type": "number"}
        }
    },
    "Bank Statement": {
        "type": "object",
        "properties": {
            "bank_name": {"type": "string"},
            "account_number": {"type": "string"},
            "period": {"type": "string"},
            "opening_balance": {"type": "number"},
            "closing_balance": {"type": "number"}
        }
    }
}
```
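Because these are plain JSON Schema objects, you can also reuse them locally to sanity-check whatever the API returns. A small sketch using the third-party `jsonschema` package (an optional dependency, not something the API requires):

```python
from jsonschema import ValidationError, validate  # pip install jsonschema

def check_extraction(category: str, data: dict) -> bool:
    """Validate an extraction result against its category schema."""
    try:
        validate(instance=data, schema=schemas[category])
        return True
    except ValidationError as err:
        print(f"{category}: schema violation - {err.message}")
        return False
```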

## Step 3: Extract Data from Each Section

Use async endpoints for parallel processing:
```python
import time

# Submit extraction jobs for each section
jobs = []
for section in split_result["result"]:
    category = section["category"]
    if category not in schemas:
        continue

    job = requests.post(
        f"{BASE_URL}/extract_async",
        headers={"Authorization": f"Bearer {AIFANO_API_KEY}"},
        json={
            "input": f"jobid://{split_result['job_id']}",
            "schema": schemas[category],
            "system_prompt": f"Extract data from this {category}."
        }
    ).json()

    jobs.append({"job_id": job["job_id"], "category": category})

# Poll for all results
results = {}
for job_info in jobs:
    while True:
        status = requests.get(
            f"{BASE_URL}/job/{job_info['job_id']}",
            headers={"Authorization": f"Bearer {AIFANO_API_KEY}"}
        ).json()

        if status["status"] in ("COMPLETED", "FAILED"):
            results[job_info["category"]] = status.get("result")
            break
        time.sleep(2)

print(json.dumps(results, indent=2))
```
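The loop above polls until each job reaches a terminal state, so a stuck job would hang the script indefinitely. One way to bound the wait, as a sketch (the 120-second default is an arbitrary choice, not an API limit):

```python
def wait_for_job(job_id: str, timeout: float = 120.0, interval: float = 2.0) -> dict:
    """Poll a job until it reaches a terminal state or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = requests.get(
            f"{BASE_URL}/job/{job_id}",
            headers={"Authorization": f"Bearer {AIFANO_API_KEY}"},
        ).json()
        if status["status"] in ("COMPLETED", "FAILED"):
            return status
        time.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")
```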

## Step 4: Example Output

```json
{
  "Application Form": {
    "applicant_name": "Maria Schmidt",
    "loan_amount": 250000,
    "loan_purpose": "Home purchase",
    "employment_status": "Full-time employed",
    "annual_income": 85000
  },
  "Income Statement": {
    "employer": "TechCorp GmbH",
    "period": "January 2024",
    "gross_income": 7083.33,
    "net_income": 4850.00
  },
  "Bank Statement": {
    "bank_name": "Deutsche Bank",
    "account_number": "DE89 3704 0044 0532 0130 00",
    "period": "December 2023",
    "opening_balance": 12450.00,
    "closing_balance": 15230.00
  }
}
```
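With the per-section results merged into one dict, cheap cross-document checks become easy to add. For example, comparing the application's stated annual income against the income statement (the field names come from the schemas in Step 2; the 5% tolerance is purely illustrative):

```python
# Illustrative cross-check: does the application's stated annual income
# roughly match twelve times the monthly gross income?
app = results.get("Application Form") or {}
income = results.get("Income Statement") or {}

if app.get("annual_income") and income.get("gross_income"):
    implied_annual = income["gross_income"] * 12
    deviation = abs(implied_annual - app["annual_income"]) / app["annual_income"]
    if deviation > 0.05:  # flag anything more than 5% apart
        print(f"Income mismatch: stated {app['annual_income']}, implied {implied_annual:,.2f}")
```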

## Using Studio Pipelines

Instead of coding the workflow manually, you can create a pipeline in Aifano Studio:
  1. Create a new pipeline with type `parse_split_extract`
  2. Configure the Split processor with your document categories
  3. Configure the Extract processor with your schemas
  4. Upload documents and run the pipeline with one click
See Pipelines for more details.

## Tips

  - Provide clear, distinct descriptions for each document type. The more specific the description, the more accurate the classification.
  - Submit all extraction jobs at once using the async endpoints, then poll for results. This is significantly faster than processing sections sequentially.
  - Include a catch-all category like “Supporting Document” to capture sections that don’t match your defined types.
  - Use `jobid://` references to avoid re-processing: split once, then extract from the same job multiple times with different schemas, as in the sketch below.
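
For instance, a second extraction pass over the already-split job needs no re-upload or re-split; the one-field schema here is purely illustrative:

```python
# Re-use the existing job for another extraction with a different schema.
second_pass = requests.post(
    f"{BASE_URL}/extract_async",
    headers={"Authorization": f"Bearer {AIFANO_API_KEY}"},
    json={
        "input": f"jobid://{split_result['job_id']}",
        "schema": {
            "type": "object",
            "properties": {"document_summary": {"type": "string"}},
        },
        "system_prompt": "Summarize this document in one paragraph.",
    },
).json()
```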

## Next Steps