Skip to content

Data Persistence & Sync Examples

This directory demonstrates how to synchronize files between your local machine and the remote AGB session, ensuring data persistence across sessions.

Overview

AGB provides a powerful synchronization mechanism that can:

  • Upload: Automatically upload local files to the cloud session.
  • Download: Automatically download generated files from the cloud session back to your local machine.
  • Watch: (Optional) Continuously sync changes.

Examples

File Archive Mode (file_archive_mode.py)

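Demonstrates the Archive upload mode, in which files written to the sync path are compressed into a ZIP archive when they are uploaded to context storage, and compares it with the default File mode, which uploads files in their original format.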

py
#!/usr/bin/env python3
"""
File Archive Mode Demo

This example demonstrates how to use Archive upload mode for context synchronization.
It shows how to create files that will be automatically compressed into zip format
when uploaded to the context storage, and how to verify the archive behavior.
"""

import os
import time
import asyncio

from agb import AGB, ContextSync, SyncPolicy, UploadPolicy
from agb.context_sync import UploadMode
from agb.session_params import CreateSessionParams

async def demo_archive_mode(agb, context):
    """Demo Archive upload mode - files are automatically compressed into ZIP format.

    Creates a session whose context sync uses ``UploadMode.ARCHIVE``, writes a
    5KB text file under the sync path, triggers a context sync, then lists the
    files stored for that path and reports whether each entry name ends in
    ``.zip``. The session is deleted (with ``sync_context=True``) on the way out.

    Args:
        agb: AGB client used to create and delete the session and to list
            context files.
        context: Context object; its ``id`` is bound to the sync path.

    Returns:
        True when every step up to and including the file listing reported
        success; False when a step reported failure or raised an exception.
    """
    print("\n🗜️ Archive Mode Demo")
    print("=" * 40)

    session = None

    try:
        # Configure Archive upload policy
        upload_policy = UploadPolicy(upload_mode=UploadMode.ARCHIVE)
        sync_policy = SyncPolicy(upload_policy=upload_policy)

        session_params = CreateSessionParams(
            image_id="agb-code-space-1"
        )

        # Configure context synchronization with Archive mode
        sync_path = "/tmp/archive-mode-test"
        context_sync = ContextSync.new(
            context.id,
            sync_path,
            sync_policy
        )
        session_params.context_syncs = [context_sync]

        print("Creating session with Archive upload mode...")
        session_result = agb.create(session_params)
        if not session_result.success or not session_result.session:
            print("❌ Failed to create session")
            return False

        session = session_result.session
        print(f"✅ Session created: {session.session_id}")
        print(f"📦 Upload mode: {sync_policy.upload_policy.upload_mode.value}")

        # Wait for session to be ready
        print("⏳ Waiting for session to be ready...")
        await asyncio.sleep(5)

        # Create directory
        print(f"📁 Creating directory: {sync_path}")
        dir_result = session.file_system.create_directory(sync_path)
        if not dir_result.success:
            print(f"❌ Failed to create directory: {dir_result.error_message}")
            return False

        # Create test files
        print("📝 Creating test files...")

        # File : 5KB content. Repeat the sentence one time more than needed,
        # then trim, so the payload is exactly content_size characters long.
        content_size = 5 * 1024  # 5KB
        base_content = "Archive mode test successful! This is a test file created in the session path. "
        repeated_content = base_content * (content_size // len(base_content) + 1)
        file_content = repeated_content[:content_size]

        file_path = f"{sync_path}/archive-test-file-5kb.txt"
        print(f"Creating file: {file_path}")
        print(f"File content size: {len(file_content)} bytes")

        write_result = session.file_system.write_file(file_path, file_content, "overwrite")
        if not write_result.success:
            print(f"❌ Failed to write file: {write_result.error_message}")
            return False

        print("✅ File write successful!")

        # Sync context data
        print("🔄 Syncing context data...")
        sync_result = await session.context.sync()
        if not sync_result.success:
            print("❌ Context sync failed")
            return False

        print("✅ Context sync completed")

        # List and verify files
        print("📋 Listing files in context sync directory...")

        list_result = agb.context.list_files(context.id, sync_path, page_number=1, page_size=10)

        if not list_result.success:
            print(f"❌ Failed to list files: {getattr(list_result, 'error_message', 'Unknown error')}")
            return False

        print(f"✅ List files successful! Total files found: {len(list_result.entries)}")

        if list_result.entries:
            print("Files in Archive mode:")
            for index, entry in enumerate(list_result.entries):
                print(f"  [{index}] FilePath: {entry.file_path}")
                print(f"      FileType: {entry.file_type}")
                print(f"      FileName: {entry.file_name}")

                # Verify that file name contains .zip extension for archive mode
                if entry.file_name.lower().endswith('.zip'):
                    print("      ✅ File correctly archived as ZIP format")
                else:
                    print(f"      ⚠️ Unexpected file format: {entry.file_name} (expected: .zip extension)")
        else:
            # An empty listing means nothing could be checked; say so instead
            # of silently skipping the verification.
            print("⚠️ No files were listed for the sync path; archive format could not be verified")

        print("🎉 Archive Mode demo completed successfully!")
        return True

    except Exception as e:
        print(f"❌ Archive Mode demo failed with error: {e}")
        return False

    finally:
        # Cleanup session
        if session:
            try:
                delete_result = agb.delete(session, sync_context=True)
                # Only claim success when the delete call itself reports it.
                if delete_result.success:
                    print("✅ Archive mode session deleted")
                else:
                    error_text = getattr(delete_result, 'error_message', 'Unknown error')
                    print(f"⚠️ Failed to delete archive mode session: {error_text}")
            except Exception as e:
                print(f"⚠️ Failed to delete archive mode session: {e}")


async def demo_file_mode(agb, context):
    """Demo File upload mode - files are uploaded in their original format.

    Creates a session whose context sync uses ``UploadMode.FILE``, writes one
    text file under the sync path, triggers a context sync, then lists the
    files stored for that path and reports whether any entry name ends in
    ``.zip`` (which is unexpected in this mode). The session is deleted (with
    ``sync_context=True``) on the way out.

    Args:
        agb: AGB client used to create and delete the session and to list
            context files.
        context: Context object; its ``id`` is bound to the sync path.

    Returns:
        True when every step up to and including the file listing reported
        success; False when a step reported failure or raised an exception.
    """
    print("\n📄 File Mode Demo")
    print("=" * 40)

    session = None

    try:
        # Configure File upload policy (default mode)
        upload_policy = UploadPolicy(upload_mode=UploadMode.FILE)
        sync_policy = SyncPolicy(upload_policy=upload_policy)

        session_params = CreateSessionParams(
            image_id="agb-code-space-1"
        )

        # Configure context synchronization with File mode
        sync_path = "/tmp/file-mode-test"
        context_sync = ContextSync.new(
            context.id,
            sync_path,
            sync_policy
        )
        session_params.context_syncs = [context_sync]

        print("Creating session with File upload mode...")
        session_result = agb.create(session_params)
        if not session_result.success or not session_result.session:
            print("❌ Failed to create session")
            return False

        session = session_result.session
        print(f"✅ Session created: {session.session_id}")
        print(f"📦 Upload mode: {sync_policy.upload_policy.upload_mode.value}")

        # Wait for session to be ready
        print("⏳ Waiting for session to be ready...")
        await asyncio.sleep(5)

        # Create directory
        print(f"📁 Creating directory: {sync_path}")
        dir_result = session.file_system.create_directory(sync_path)
        if not dir_result.success:
            print(f"❌ Failed to create directory: {dir_result.error_message}")
            return False

        # Create test files
        print("📝 Creating test files...")

        # File 1: Text file
        text_content = "File mode test successful! This file is uploaded in its original format without compression."
        text_path = f"{sync_path}/file-mode-test.txt"
        print(f"Creating text file: {text_path}")

        write_result = session.file_system.write_file(text_path, text_content, "overwrite")
        if not write_result.success:
            print(f"❌ Failed to write text file: {write_result.error_message}")
            return False

        print("✅ Text file write successful!")

        # Sync context data
        print("🔄 Syncing context data...")
        sync_result = await session.context.sync()
        if not sync_result.success:
            print("❌ Context sync failed")
            return False

        print("✅ Context sync completed")

        # List and verify files
        print("📋 Listing files in context sync directory...")

        list_result = agb.context.list_files(context.id, sync_path, page_number=1, page_size=10)

        if not list_result.success:
            print(f"❌ Failed to list files: {getattr(list_result, 'error_message', 'Unknown error')}")
            return False

        print(f"✅ List files successful! Total files found: {len(list_result.entries)}")

        if list_result.entries:
            print("Files in File mode:")
            for index, entry in enumerate(list_result.entries):
                print(f"  [{index}] FilePath: {entry.file_path}")
                print(f"      FileType: {entry.file_type}")
                print(f"      FileName: {entry.file_name}")

                # Verify that file name is not compressed (not .zip) for file mode
                if not entry.file_name.lower().endswith('.zip'):
                    print(f"      ✅ File preserved in original format: {entry.file_name}")
                else:
                    print(f"      ⚠️ Unexpected compression: {entry.file_name} (should not be .zip in File mode)")
        else:
            # An empty listing means nothing could be checked; say so instead
            # of silently skipping the verification.
            print("⚠️ No files were listed for the sync path; file format could not be verified")

        print("🎉 File Mode demo completed successfully!")
        return True

    except Exception as e:
        print(f"❌ File Mode demo failed with error: {e}")
        return False

    finally:
        # Cleanup session
        if session:
            try:
                delete_result = agb.delete(session, sync_context=True)
                # Only claim success when the delete call itself reports it.
                if delete_result.success:
                    print("✅ File mode session deleted")
                else:
                    error_text = getattr(delete_result, 'error_message', 'Unknown error')
                    print(f"⚠️ Failed to delete file mode session: {error_text}")
            except Exception as e:
                print(f"⚠️ Failed to delete file mode session: {e}")


async def main():
    """Run the Archive mode and File mode demos against one shared context.

    A context with a timestamped name is fetched (created on demand), both
    demos are awaited in turn, a summary of their boolean results is printed,
    and the context is deleted in ``finally`` if it was obtained.
    """
    # Environment variable wins; an unset or empty value falls back to the placeholder.
    key = os.environ.get("AGB_API_KEY") or "akm-xxx"  # Replace with your actual API key

    agb = AGB(api_key=key)
    context = None
    section_break = "\n" + "=" * 60

    def outcome(ok):
        # Label used on the summary lines.
        if ok:
            return '✅ Success'
        return '❌ Failed'

    try:
        print("🚀 Upload Mode Comparison Demo")
        print("=" * 50)

        # Step 1: Create a context
        print("\n1️⃣ Creating a context...")
        lookup = agb.context.get(f"upload-mode-demo-{int(time.time())}", create=True)

        if not (lookup.success and lookup.context):
            print("❌ Failed to create context")
            return

        context = lookup.context
        print(f"✅ Context created: {context.name} (ID: {context.id})")

        # Step 2: Run Archive Mode Demo
        print(section_break)
        archive_ok = await demo_archive_mode(agb, context)

        # Step 3: Run File Mode Demo
        print(section_break)
        file_ok = await demo_file_mode(agb, context)

        # Step 4: Summary
        print(section_break)
        print("📊 Demo Summary")
        print("=" * 20)
        print(f"Archive Mode: {outcome(archive_ok)}")
        print(f"File Mode: {outcome(file_ok)}")

        if not (archive_ok and file_ok):
            print("\n⚠️ Some demos failed. Please check the logs above.")
        else:
            print("\n🎉 Both upload mode demos completed successfully!")
            print("\n📝 Key Differences:")
            for line in (
                "- Archive Mode: Files are automatically compressed into ZIP format",
                "- File Mode: Files are uploaded in their original format",
                "- Archive Mode: Better for storage efficiency and batch operations",
                "- File Mode: Better for individual file access and format preservation",
            ):
                print(line)

    except Exception as exc:
        print(f"❌ Demo failed with error: {exc}")
        import traceback
        traceback.print_exc()

    finally:
        # Cleanup context
        print("\n🧹 Final cleanup...")
        if context:
            try:
                agb.context.delete(context)
            except Exception as exc:
                print(f"⚠️ Failed to delete context: {exc}")
            else:
                print("✅ Context deleted")

# Script entry point: when this file is executed directly, run the async
# main() coroutine to completion with asyncio.
if __name__ == "__main__":
    asyncio.run(main())

Context Sync Demo (context_sync_demo.py)

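Shows synchronization in both directions between sessions and a context: a file written in a first session is synced to the context, and a second session created with the same context reads it back, demonstrating that the data persists across sessions.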

py
#!/usr/bin/env python3
"""
Context Synchronization Demo

This example demonstrates how to use context synchronization for data persistence
across sessions. It shows how to create a context, sync it with a session,
perform operations, and then verify data persistence in a new session.
"""

import os
import time
import asyncio

from agb import AGB, ContextSync, SyncPolicy
from agb.session_params import CreateSessionParams


async def main():
    """Demonstrate that data synced to a context persists across sessions.

    Flow: create a context, create a first session bound to it, write a test
    file under the sync path, sync and delete that session, then create a
    second session with the same context, read the file back and compare it
    with what was written, write one more file and sync again.

    Failures in steps 1-6 abort the demo with an early return. Failures in
    steps 7-9 are reported and the demo continues, but the closing message
    then says the demo finished with errors. Any session or context still
    held is deleted in ``finally``.
    """
    # Initialize the AGB client
    api_key = os.environ.get("AGB_API_KEY")
    if not api_key:
        api_key = "akm-xxx"  # Replace with your actual API key

    agb = AGB(api_key=api_key)
    context = None
    session1 = None
    session2 = None
    # Flipped to False by any non-fatal failure in steps 7-9 so the closing
    # message does not claim success after a reported error.
    all_steps_ok = True

    try:
        print("🚀 Context Synchronization Demo")
        print("=" * 50)

        # Step 1: Create a context
        print("\n1️⃣ Creating a context...")
        context_name = f"demo-context-{int(time.time())}"
        context_result = agb.context.get(context_name, create=True)

        if not context_result.success or not context_result.context:
            print("❌ Failed to create context")
            return

        context = context_result.context
        print(f"✅ Context created: {context.name} (ID: {context.id})")

        # Step 2: Create first session with context sync
        print("\n2️⃣ Creating first session with context sync...")
        session_params = CreateSessionParams(
            image_id="agb-code-space-1"
        )

        # Configure context synchronization
        sync_path = "/home/demo-data"
        context_sync = ContextSync.new(
            context.id,
            sync_path,
            SyncPolicy()
        )
        session_params.context_syncs = [context_sync]

        session_result = agb.create(session_params)
        if not session_result.success or not session_result.session:
            print("❌ Failed to create first session")
            return

        session1 = session_result.session
        print(f"✅ First session created: {session1.session_id}")

        # Step 3: Perform operations in first session
        print("\n3️⃣ Performing operations in first session...")

        # Wait for session to be ready
        print("⏳ Waiting for session to be ready...")
        await asyncio.sleep(5)

        # Create some test data
        test_file_path = f"{sync_path}/test_data.txt"
        test_content = f"Hello from session 1! Created at {time.time()}"

        # Create directory and file
        print(f"📁 Creating directory: {sync_path}")
        dir_result = session1.file_system.create_directory(sync_path)
        if not dir_result.success:
            print(f"❌ Failed to create directory: {dir_result.error_message}")
            return

        print(f"📝 Creating test file: {test_file_path}")
        file_result = session1.file_system.write_file(test_file_path, test_content)
        if not file_result.success:
            print(f"❌ Failed to create file: {file_result.error_message}")
            return

        print("✅ Test data created successfully")

        # Step 4: Sync context data
        print("\n4️⃣ Syncing context data...")
        sync_result = await session1.context.sync()
        if not sync_result.success:
            print("❌ Context sync failed")
            return

        print("✅ Context sync completed")

        # Step 5: Release first session
        print("\n5️⃣ Releasing first session...")
        delete_result = agb.delete(session1, sync_context=True)
        if not delete_result.success:
            print(f"❌ Failed to delete first session: {delete_result.error_message}")
            return

        print("✅ First session released with context sync")
        # Already deleted: clear the handle so cleanup does not delete it again.
        session1 = None

        # Step 6: Create second session with same context
        print("\n6️⃣ Creating second session with same context...")
        session_params2 = CreateSessionParams(
            image_id="agb-code-space-1"
        )

        # Use the same context sync configuration
        context_sync2 = ContextSync.new(
            context.id,
            sync_path,
            SyncPolicy()
        )
        session_params2.context_syncs = [context_sync2]

        session_result2 = agb.create(session_params2)
        if not session_result2.success or not session_result2.session:
            print("❌ Failed to create second session")
            return

        session2 = session_result2.session
        print(f"✅ Second session created: {session2.session_id}")

        # Step 7: Verify data persistence
        print("\n7️⃣ Verifying data persistence...")

        # Wait for session to be ready and context to sync
        print("⏳ Waiting for context sync to complete...")
        await asyncio.sleep(10)

        # Read the file content and compare it with what session 1 wrote
        read_result = session2.file_system.read_file(test_file_path)
        if read_result.success:
            print(f"📄 File content: {read_result.content}")
            if read_result.content == test_content:
                print("✅ Persisted content matches the data written in session 1")
            else:
                all_steps_ok = False
                print("⚠️ Persisted content differs from the data written in session 1")
        else:
            all_steps_ok = False
            print(f"❌ Failed to read file: {read_result.error_message}")

        # Step 8: Create additional data in second session
        print("\n8️⃣ Creating additional data in second session...")
        additional_file = f"{sync_path}/session2_data.txt"
        additional_content = f"Hello from session 2! Created at {time.time()}"

        file_result2 = session2.file_system.write_file(additional_file, additional_content)
        if not file_result2.success:
            all_steps_ok = False
            print(f"❌ Failed to create additional file: {file_result2.error_message}")
        else:
            print("✅ Additional data created in second session")

        # Step 9: Final context sync
        print("\n9️⃣ Performing final context sync...")
        final_sync_result = await session2.context.sync()
        if not final_sync_result.success:
            all_steps_ok = False
            print("❌ Final context sync failed")
        else:
            print("✅ Final context sync completed")

        if all_steps_ok:
            print("\n🎉 Context synchronization demo completed successfully!")
        else:
            print("\n⚠️ Context synchronization demo finished with errors. Please check the logs above.")

    except Exception as e:
        print(f"❌ Demo failed with error: {e}")
        import traceback
        traceback.print_exc()

    finally:
        # Cleanup
        print("\n🧹 Cleaning up...")

        # Second session first, then the first one, each only if still held.
        for leftover, title, lower in (
            (session2, "Second", "second"),
            (session1, "First", "first"),
        ):
            if not leftover:
                continue
            try:
                cleanup_result = agb.delete(leftover)
                # Only claim success when the delete call itself reports it,
                # matching the check made in step 5.
                if cleanup_result.success:
                    print(f"✅ {title} session deleted")
                else:
                    error_text = getattr(cleanup_result, 'error_message', 'Unknown error')
                    print(f"⚠️ Failed to delete {lower} session: {error_text}")
            except Exception as e:
                print(f"⚠️ Failed to delete {lower} session: {e}")

        if context:
            try:
                agb.context.delete(context)
                print("✅ Context deleted")
            except Exception as e:
                print(f"⚠️ Failed to delete context: {e}")


# Script entry point: when this file is executed directly, run the async
# main() coroutine to completion with asyncio.
if __name__ == "__main__":
    asyncio.run(main())