Concurrency (concurrency.py)

Demonstrates how to execute multiple code tasks in parallel with a thread pool, using a dedicated AGB session per task, for high-throughput scenarios such as data processing pipelines.
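
The listing below is built on the standard `concurrent.futures` submit-and-collect pattern. As a quick orientation, here is that pattern in isolation, using only the standard library and a placeholder `work` function (no AGB calls):

```py
import concurrent.futures

def work(n):
    """Placeholder computation; the full example submits AGB-backed tasks instead."""
    return n * n

# Submit every task to a small thread pool, then handle results as they complete.
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(work, n) for n in range(5)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
```

The complete example below applies the same pattern, replacing `work` with a per-task AGB session.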

```py
"""
AGB Concurrent Code Execution Example

This example demonstrates how to execute multiple code snippets in parallel using threading.
Concurrent execution is essential for high-throughput applications like data processing pipelines.
"""

import concurrent.futures
import json
import os
import time
from typing import Callable, List

from agb import AGB
from agb.session_params import CreateSessionParams

class ConcurrentAGBProcessor:
    def __init__(self, api_key: str, max_workers: int = 3):
        self.max_workers = max_workers
        self.agb = AGB(api_key=api_key)

    def process_tasks_concurrently(self, tasks: List[dict], processor: Callable):
        """Process multiple tasks concurrently"""
        results = []
        start_time = time.time()

        print(f"🚀 Starting processing of {len(tasks)} tasks with {self.max_workers} workers...")

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_task = {
                executor.submit(self._process_single_task, task, processor): task
                for task in tasks
            }

            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_task):
                task = future_to_task[future]
                try:
                    result = future.result()
                    results.append({
                        'task_id': task.get('id'),
                        'success': True,
                        'result': result
                    })
                    print(f"✅ Task {task.get('id')} completed")
                except Exception as e:
                    results.append({
                        'task_id': task.get('id'),
                        'success': False,
                        'error': str(e)
                    })
                    print(f"❌ Task {task.get('id')} failed: {e}")

        duration = time.time() - start_time
        print(f"🏁 All tasks finished in {duration:.2f} seconds")
        return results

    def _process_single_task(self, task: dict, processor: Callable):
        """Process a single task with its own session"""
        # Create a dedicated session for this task
        params = CreateSessionParams(image_id="agb-code-space-1")
        result = self.agb.create(params)

        if not result.success:
            raise Exception(f"Failed to create session: {result.error_message}")

        session = result.session
        try:
            return processor(session, task)
        finally:
            self.agb.delete(session)

def data_processing_task(session, task):
    """The actual logic to run in the cloud"""
    data = task['data']
    operation = task['operation']

    # Simulate some heavy computation
    code = f"""
import json
import time

# Simulate work
time.sleep(1)

data = {json.dumps(data)}
operation = {json.dumps(operation)}
result = []

for item in data:
    if operation == 'double':
        result.append(item * 2)
    elif operation == 'square':
        result.append(item ** 2)
    else:
        result.append(item)

print(json.dumps(result))
"""
    code_result = session.code.run_code(code, "python")

    if code_result.success:
        # Parse the last line of output as JSON result
        return json.loads(code_result.result.strip().split('\n')[-1])
    else:
        raise Exception(code_result.error_message)

def main():
    api_key = os.getenv("AGB_API_KEY")
    if not api_key:
        print("Error: AGB_API_KEY environment variable is not set")
        return

    processor = ConcurrentAGBProcessor(api_key=api_key, max_workers=3)

    # Define a batch of tasks
    tasks = [
        {'id': 1, 'data': [1, 2, 3, 4], 'operation': 'double'},
        {'id': 2, 'data': [2, 4, 6, 8], 'operation': 'square'},
        {'id': 3, 'data': [10, 20, 30], 'operation': 'double'},
        {'id': 4, 'data': [5, 5, 5, 5], 'operation': 'square'},
    ]

    results = processor.process_tasks_concurrently(tasks, data_processing_task)

    print("\n--- Final Results ---")
    for res in results:
        status = "Success" if res['success'] else "Failed"
        output = res['result'] if res['success'] else res['error']
        print(f"Task {res['task_id']}: {status} -> {output}")

if __name__ == "__main__":
    main()
```
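
Because `process_tasks_concurrently` accepts any `(session, task)` callable, the same worker pool can drive other workloads. A minimal sketch of an alternative processor; the `word_count_task` name and the `{'id': ..., 'text': ...}` task shape are illustrative, not part of the API above:

```py
import json

def word_count_task(session, task):
    """Illustrative processor: count the words of a text snippet in the sandbox."""
    code = f"""
import json
text = {json.dumps(task['text'])}
print(json.dumps(len(text.split())))
"""
    code_result = session.code.run_code(code, "python")
    if not code_result.success:
        raise Exception(code_result.error_message)
    # Same convention as above: the last line of output is the JSON result.
    return json.loads(code_result.result.strip().split('\n')[-1])

# Reusing the processor from main():
# processor.process_tasks_concurrently(
#     [{'id': 1, 'text': 'hello concurrent world'}],
#     word_count_task,
# )
```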