#!/usr/bin/env python3
"""
Upload metadata from CSV to Visual Layer using custom metadata API.
Reads CSV with filename and metadata columns, maps to media IDs, and uploads.
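
Example usage (script name and values are illustrative):

    python upload_csv_metadata.py \
        --csv labels.csv \
        --dataset-id <DATASET_ID> \
        --token <JWT_TOKEN> \
        --filename-col filename \
        --value-col label \
        --field-name url

The CSV is expected to contain at least the filename and value columns, e.g.:

    filename,label
    img_001.jpg,https://example.com/products/1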
"""
import csv
import json
import requests
import argparse
import os
import sys
import tempfile
import io
from typing import Dict, List, Any, Optional
class CSVMetadataUploader:
def __init__(self, dataset_id: str, base_url: str, jwt_token: str):
self.dataset_id = dataset_id
        self.jwt_token = jwt_token
        # Normalize the base URL: strip trailing slashes first so a URL like
        # "https://host/api/v1/datasets/" is not double-suffixed, and keep a
        # raw host URL for endpoints that live outside /api/v1/datasets.
        base_url = base_url.rstrip('/')
        if base_url.endswith('/api/v1/datasets'):
            self.base_url = base_url
            self.raw_base_url = base_url[:-len('/api/v1/datasets')]
        else:
            self.base_url = f"{base_url}/api/v1/datasets"
            self.raw_base_url = base_url
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {jwt_token}'
})
self._temp_files = []
def export_dataset(self) -> Dict[str, str]:
"""Export dataset and return mapping of filename -> media_id."""
print("π€ Exporting dataset to get media_id mappings...")
url = f"{self.raw_base_url}/api/v1/dataset/{self.dataset_id}/export_media_id"
try:
response = self.session.get(url)
if response.status_code == 200:
# Parse CSV response
csv_content = response.text
csv_reader = csv.DictReader(io.StringIO(csv_content))
# Build mapping from filename to media_id
mapping = {}
for row in csv_reader:
filename = row.get('filename', '')
media_id = row.get('media_id', '')
if media_id and filename:
# Extract just the filename without path
basename = os.path.basename(filename)
mapping[basename] = media_id
print(f" β
Exported {len(mapping)} media items")
return mapping
else:
print(f" β Failed to export dataset: {response.status_code} - {response.text}")
return {}
except Exception as e:
print(f" β Export failed: {str(e)}")
return {}
def read_csv(self, csv_file: str) -> List[Dict[str, Any]]:
"""Read CSV file and return list of records."""
if not os.path.exists(csv_file):
raise FileNotFoundError(f"CSV file not found: {csv_file}")
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
records = list(reader)
print(f"π Loaded {len(records)} records from CSV")
return records
def create_custom_field(self, field_name: str, field_type: str = 'link') -> Optional[str]:
"""Create a custom field and return field_id (task_id)."""
print(f"π§ Creating custom field: {field_name} ({field_type})")
field_data = {
"field_name": field_name,
"field_type": field_type
}
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks"
try:
response = self.session.post(url, json=field_data)
            if response.status_code == 200:
                result = response.json()
                task_id = result.get('task_id')
                print(f" ✅ Created field with task ID: {task_id}")
                return task_id
            elif "already exists" in response.text:
                # Without the existing field's task_id we cannot upload to it,
                # so surface a clearer hint instead of silently skipping.
                print(f" 🔄 Field '{field_name}' already exists; delete it or pass a different --field-name")
                return None
            else:
                print(f" ❌ Failed to create field: {response.status_code} - {response.text}")
                return None
except Exception as e:
print(f" β Request failed: {str(e)}")
return None
def upload_field_data(self, field_id: str, csv_records: List[Dict],
filename_col: str, value_col: str,
filename_to_media_id: Dict[str, str]) -> Optional[str]:
"""Upload data for a custom field."""
print(f" π€ Uploading data for field...")
upload_data = []
matched_count = 0
for row in csv_records:
filename = os.path.basename(row.get(filename_col, '').strip())
value = row.get(value_col, '').strip()
if not filename or not value:
continue
media_id = filename_to_media_id.get(filename)
if not media_id:
continue
upload_data.append({
"media_id": media_id,
"value": value
})
matched_count += 1
print(f" π Matched {matched_count}/{len(csv_records)} records")
if not upload_data:
print(f" β οΈ No data to upload")
return None
# Save to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(upload_data, f, indent=2)
temp_file = f.name
self._temp_files.append(temp_file)
# Upload
url = f"{self.base_url}/{self.dataset_id}/custom_metadata/tasks/{field_id}"
try:
with open(temp_file, 'rb') as f:
                files = {'file': ('metadata.json', f, 'application/json')}
                response = self.session.post(url, files=files)
            if response.status_code in (200, 202):
                print(" ✅ Upload completed successfully")
                return field_id
            else:
                print(f" ❌ Failed to upload: {response.status_code} - {response.text}")
                return None
        except Exception as e:
            print(f" ❌ Upload failed: {e}")
return None
def cleanup_temp_files(self):
"""Remove temporary files."""
for temp_file in self._temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
            except OSError:
                pass
def process(self, csv_file: str, filename_col: str, value_col: str, field_name: str):
"""Main processing function."""
try:
print("\nπ Starting CSV Metadata Upload")
print(f"π CSV File: {csv_file}")
print(f"π Filename column: {filename_col}")
print(f"π Value column: {value_col}")
print(f"π·οΈ Field name: {field_name}")
print()
# Step 1: Export dataset
filename_to_media_id = self.export_dataset()
if not filename_to_media_id:
raise Exception("Failed to export dataset")
# Step 2: Read CSV
csv_records = self.read_csv(csv_file)
if not csv_records:
raise Exception("No records in CSV")
# Step 3: Create custom field
print(f"\nπ Processing field: {field_name}")
field_id = self.create_custom_field(field_name, 'link')
if not field_id:
raise Exception("Failed to create field")
# Step 4: Upload data
result = self.upload_field_data(field_id, csv_records, filename_col,
value_col, filename_to_media_id)
if result:
print("\nπ Upload completed successfully!")
else:
print("\nβ Upload failed")
sys.exit(1)
finally:
self.cleanup_temp_files()
def main():
parser = argparse.ArgumentParser(description='Upload CSV metadata to Visual Layer')
parser.add_argument('--csv', required=True, help='Path to CSV file')
parser.add_argument('--dataset-id', required=True, help='Dataset ID')
parser.add_argument('--base-url', default='https://app.visual-layer.com',
help='Base URL (default: https://app.visual-layer.com)')
parser.add_argument('--token', required=True, help='JWT token')
parser.add_argument('--filename-col', default='filename',
help='CSV column with filenames (default: filename)')
parser.add_argument('--value-col', default='label',
help='CSV column with values (default: label)')
parser.add_argument('--field-name', default='url',
help='Name of custom field to create (default: url)')
args = parser.parse_args()
uploader = CSVMetadataUploader(args.dataset_id, args.base_url, args.token)
uploader.process(args.csv, args.filename_col, args.value_col, args.field_name)
if __name__ == "__main__":
main()