
CSV Metadata Upload to Visual Layer

This page contains the complete, ready-to-use Python script for uploading custom metadata from CSV files to Visual Layer datasets, with JWT authentication support and automatic media_id mapping.
The script automates a simple workflow: it exports your dataset to obtain the filename-to-media_id mapping, reads your CSV file, creates a custom field, and uploads the metadata values.

Back to CSV Metadata Upload Documentation

Return to the main CSV metadata upload guide for usage instructions, CSV format requirements, and workflow examples.

Key Features

  • JWT Authentication Support - Works with Visual Layer cloud and on-premises installations
  • Automatic Dataset Export - Fetches media_id mapping automatically via API
  • Basename Matching - Handles full paths in CSV by extracting filenames (see the snippet after this list)
  • Single Field Upload - Focused workflow for one field at a time (run multiple times for multiple fields)
  • Progress Reporting - Clear status updates with emoji indicators
  • Robust Error Handling - Helpful error messages for common issues
  • Temp File Management - Automatic cleanup of temporary files
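
The basename matching above relies on Python's os.path.basename, so full paths in your CSV and bare filenames both resolve to the same lookup key (a minimal illustration with hypothetical paths):

import os

# Both normalize to the key used in the exported filename -> media_id mapping
print(os.path.basename('/data/images/image001.jpg'))  # image001.jpg
print(os.path.basename('image001.jpg'))               # image001.jpg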

Installation Requirements

Before using this script, install the required Python package:
pip install requests
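
If you prefer an isolated environment, installing into a virtual environment works just as well (optional, standard Python tooling):

python -m venv .venv
source .venv/bin/activate
pip install requests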

Complete Script Code

#!/usr/bin/env python3
"""
Upload metadata from CSV to Visual Layer using custom metadata API.
Reads CSV with filename and metadata columns, maps to media IDs, and uploads.
"""

import argparse
import csv
import io
import json
import os
import sys
import tempfile
from typing import Dict, List, Any, Optional

import requests


class CSVMetadataUploader:
    def __init__(self, dataset_id: str, base_url: str, jwt_token: str):
        self.dataset_id = dataset_id
        self.raw_base_url = base_url.rstrip('/')
        self.jwt_token = jwt_token

        # Automatically append /api/v1/datasets if not present
        base_url = base_url.rstrip('/')
        if not base_url.endswith('/api/v1/datasets'):
            self.base_url = f"{base_url}/api/v1/datasets"
        else:
            self.base_url = base_url

        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {jwt_token}'
        })
        self._temp_files = []

    def export_dataset(self) -> Dict[str, str]:
        """Export dataset and return mapping of filename -> media_id."""
        print("📤 Exporting dataset to get media_id mappings...")

        url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"

        try:
            response = self.session.get(url)
            if response.status_code == 200:
                # Parse the CSV body returned by the export endpoint
                csv_reader = csv.DictReader(io.StringIO(response.text))

                # Build mapping from filename to media_id
                mapping = {}
                for row in csv_reader:
                    filename = row.get('filename', '')
                    media_id = row.get('media_id', '')

                    if media_id and filename:
                        # Extract just the filename without path
                        basename = os.path.basename(filename)
                        mapping[basename] = media_id

                print(f"   ✅ Exported {len(mapping)} media items")
                return mapping
            else:
                print(f"   ❌ Failed to export dataset: {response.status_code} - {response.text}")
                return {}
        except Exception as e:
            print(f"   ❌ Export failed: {e}")
            return {}

    def read_csv(self, csv_file: str) -> List[Dict[str, Any]]:
        """Read CSV file and return list of records."""
        if not os.path.exists(csv_file):
            raise FileNotFoundError(f"CSV file not found: {csv_file}")

        with open(csv_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            records = list(reader)

        print(f"📊 Loaded {len(records)} records from CSV")
        return records

    def create_custom_field(self, field_name: str, field_type: str = 'link') -> Optional[str]:
        """Create a custom field and return field_id (task_id)."""
        print(f"🔧 Creating custom field: {field_name} ({field_type})")

        field_data = {
            "field_name": field_name,
            "field_type": field_type
        }

        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"

        try:
            response = self.session.post(url, json=field_data)
            if response.status_code == 200:
                result = response.json()
                task_id = result.get('task_id')
                print(f"   ✅ Created field with task ID: {task_id}")
                return task_id
            elif "already exists" in response.text:
                print(f"   ⚠️  Field '{field_name}' already exists; choose a new --field-name (the script needs the task ID of a freshly created field)")
                return None
            else:
                print(f"   ❌ Failed to create field: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f"   ❌ Request failed: {e}")
            return None

    def upload_field_data(self, field_id: str, csv_records: List[Dict],
                         filename_col: str, value_col: str,
                         filename_to_media_id: Dict[str, str]) -> Optional[str]:
        """Upload data for a custom field."""
        print("   📤 Uploading data for field...")

        upload_data = []
        matched_count = 0

        for row in csv_records:
            filename = os.path.basename(row.get(filename_col, '').strip())
            value = row.get(value_col, '').strip()

            if not filename or not value:
                continue

            media_id = filename_to_media_id.get(filename)
            if not media_id:
                continue

            upload_data.append({
                "media_id": media_id,
                "value": value
            })
            matched_count += 1

        print(f"   📊 Matched {matched_count}/{len(csv_records)} records")

        if not upload_data:
            print("   ⚠️  No data to upload")
            return None

        # Save to temp file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(upload_data, f, indent=2)
            temp_file = f.name

        self._temp_files.append(temp_file)

        # Upload
        url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"

        try:
            with open(temp_file, 'rb') as f:
                files = {'file': ('metadata.json', f, 'application/json')}
                response = self.session.post(url, files=files)

            if response.status_code in [200, 202]:
                print("   ✅ Upload completed successfully")
                return field_id
            else:
                print(f"   ❌ Failed to upload: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f"   ❌ Upload failed: {e}")
            return None

    def cleanup_temp_files(self):
        """Remove temporary files."""
        for temp_file in self._temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            except OSError:
                pass

    def process(self, csv_file: str, filename_col: str, value_col: str, field_name: str):
        """Main processing function."""
        try:
            print("\n🚀 Starting CSV Metadata Upload")
            print(f"📁 CSV File: {csv_file}")
            print(f"📋 Filename column: {filename_col}")
            print(f"📋 Value column: {value_col}")
            print(f"🏷️  Field name: {field_name}")
            print()

            # Step 1: Export dataset
            filename_to_media_id = self.export_dataset()
            if not filename_to_media_id:
                raise Exception("Failed to export dataset")

            # Step 2: Read CSV
            csv_records = self.read_csv(csv_file)
            if not csv_records:
                raise Exception("No records in CSV")

            # Step 3: Create custom field
            print(f"\n🔄 Processing field: {field_name}")
            field_id = self.create_custom_field(field_name, 'link')
            if not field_id:
                raise Exception("Failed to create field")

            # Step 4: Upload data
            result = self.upload_field_data(field_id, csv_records, filename_col,
                                           value_col, filename_to_media_id)

            if result:
                print("\n🎉 Upload completed successfully!")
            else:
                print("\n❌ Upload failed")
                sys.exit(1)

        finally:
            self.cleanup_temp_files()


def main():
    parser = argparse.ArgumentParser(description='Upload CSV metadata to Visual Layer')
    parser.add_argument('--csv', required=True, help='Path to CSV file')
    parser.add_argument('--dataset-id', required=True, help='Dataset ID')
    parser.add_argument('--base-url', default='https://app.visual-layer.com',
                       help='Base URL (default: https://app.visual-layer.com)')
    parser.add_argument('--token', required=True, help='JWT token')
    parser.add_argument('--filename-col', default='filename',
                       help='CSV column with filenames (default: filename)')
    parser.add_argument('--value-col', default='label',
                       help='CSV column with values (default: label)')
    parser.add_argument('--field-name', default='url',
                       help='Name of custom field to create (default: url)')

    args = parser.parse_args()

    uploader = CSVMetadataUploader(args.dataset_id, args.base_url, args.token)
    uploader.process(args.csv, args.filename_col, args.value_col, args.field_name)


if __name__ == "__main__":
    main()

How to Use

  1. Save the script to a file named csv_metadata_upload.py
  2. Prepare your CSV file with filename and metadata columns:
    filename,url
    image001.jpg,https://example.com/product/123
    image002.jpg,https://example.com/product/456
    
  3. Run the script with your parameters:
    python csv_metadata_upload.py \
      --csv metadata.csv \
      --dataset-id your-dataset-id \
      --token your-jwt-token \
      --filename-col filename \
      --value-col url \
      --field-name product_url
    
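
For an on-premises installation, point --base-url at your own server (the host below is a placeholder); the script appends /api/v1/datasets automatically:

python csv_metadata_upload.py \
  --csv metadata.csv \
  --dataset-id your-dataset-id \
  --token your-jwt-token \
  --base-url https://visual-layer.your-company.example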

What the Script Does

The script follows this workflow:
  1. Exports dataset - Calls /api/v1/dataset/{dataset_id}/export_media_id to get the filename → media_id mapping
  2. Reads CSV - Loads your CSV file and extracts filename and value columns
  3. Creates field - Creates a custom metadata field via /api/v1/datasets/{dataset_id}/custom_metadata/tasks
  4. Maps values - Matches CSV filenames to media_ids from the export
  5. Uploads metadata - Sends JSON file with [{"media_id": "...", "value": "..."}] format
  6. Cleans up - Removes temporary files
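
To sanity-check step 1 outside the script, you can call the export endpoint directly (an illustrative curl; substitute your own dataset ID and token):

curl -H "Authorization: Bearer your-jwt-token" \
  "https://app.visual-layer.com/api/v1/dataset/your-dataset-id/export_media_id"

For step 5, the temporary JSON file the script uploads looks like this (media_id values are illustrative):

[
  {"media_id": "0a1b2c3d-1111-2222-3333-444455556666", "value": "https://example.com/product/123"},
  {"media_id": "4e5f6a7b-7777-8888-9999-000011112222", "value": "https://example.com/product/456"}
]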

Script Output Example

🚀 Starting CSV Metadata Upload
📁 CSV File: products.csv
📋 Filename column: filename
📋 Value column: product_url
🏷️  Field name: product_link

📤 Exporting dataset to get media_id mappings...
   ✅ Exported 1247 media items
📊 Loaded 1247 records from CSV

🔄 Processing field: product_link
🔧 Creating custom field: product_link (link)
   ✅ Created field with task ID: abc123-def456
   📤 Uploading data for field...
   📊 Matched 1247/1247 records
   ✅ Upload completed successfully

🎉 Upload completed successfully!

Customization Tips

Change field type from 'link' to other types: Modify the create_custom_field call in the process method:
# Change from:
field_id = self.create_custom_field(field_name, 'link')

# To:
field_id = self.create_custom_field(field_name, 'string')  # or 'enum', 'float', etc.
Process multiple fields from the same CSV: Run the script multiple times with different column and field name arguments (add your required --dataset-id and --token flags to each command):
# First field
python csv_metadata_upload.py --csv data.csv --value-col url --field-name product_url

# Second field
python csv_metadata_upload.py --csv data.csv --value-col category --field-name product_category

# Third field
python csv_metadata_upload.py --csv data.csv --value-col price --field-name product_price
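
If you have several fields, a small shell loop avoids retyping the command (a sketch; the column:field pairs and placeholder IDs are hypothetical):

# Upload one custom field per column:field pair
for pair in "url:product_url" "category:product_category" "price:product_price"; do
  col="${pair%%:*}"      # CSV column name (before the colon)
  field="${pair##*:}"    # Visual Layer field name (after the colon)
  python csv_metadata_upload.py --csv data.csv \
    --dataset-id your-dataset-id --token your-jwt-token \
    --value-col "$col" --field-name "$field"
done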