Appearance
Concurrency (concurrency.py)
Demonstrates how to execute multiple code tasks in parallel using threading for high-throughput scenarios.
py
"""
AGB Concurrent Code Execution Example
This example demonstrates how to execute multiple code snippets in parallel using threading.
Concurrent execution is essential for high-throughput applications like data processing pipelines.
"""
import concurrent.futures
import json
import os
import time
from typing import Callable, List
from agb import AGB
from agb.session_params import CreateSessionParams
class ConcurrentAGBProcessor:
    """Fan a batch of tasks out over a thread pool, one AGB session per task.

    Each worker thread creates its own session, hands it to the
    caller-supplied processor, and deletes the session afterwards.
    """

    def __init__(self, api_key: str, max_workers: int = 3):
        """Create the AGB client and remember the pool size.

        Args:
            api_key: Key handed to the ``AGB`` client.
            max_workers: Maximum number of worker threads used per batch.
        """
        self.max_workers = max_workers
        self.agb = AGB(api_key=api_key)

    def process_tasks_concurrently(self, tasks: List[dict], processor: Callable) -> List[dict]:
        """Process multiple tasks concurrently.

        Args:
            tasks: Task dictionaries; the optional ``'id'`` key is echoed
                back in the matching result entry.
            processor: Callable invoked as ``processor(session, task)``.

        Returns:
            One dictionary per task, in *completion* order (not submission
            order). Successful entries carry ``task_id``, ``success=True``
            and ``result``; failed entries carry ``task_id``,
            ``success=False`` and ``error`` (the exception text).
        """
        results = []
        start_time = time.time()
        print(f"🚀 Starting processing of {len(tasks)} tasks with {self.max_workers} workers...")

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks, remembering which future belongs to which task.
            future_to_task = {
                executor.submit(self._process_single_task, task, processor): task
                for task in tasks
            }

            # Collect results as they complete.
            for future in concurrent.futures.as_completed(future_to_task):
                task_id = future_to_task[future].get('id')
                try:
                    result = future.result()
                except Exception as e:
                    # A failing task must not abort the rest of the batch;
                    # record the failure and keep collecting.
                    results.append({
                        'task_id': task_id,
                        'success': False,
                        'error': str(e),
                    })
                    print(f"❌ Task {task_id} failed: {e}")
                else:
                    results.append({
                        'task_id': task_id,
                        'success': True,
                        'result': result,
                    })
                    print(f"✅ Task {task_id} completed")

        duration = time.time() - start_time
        print(f"🏁 All tasks finished in {duration:.2f} seconds")
        return results

    def _process_single_task(self, task: dict, processor: Callable):
        """Process a single task with its own session.

        Returns:
            Whatever ``processor(session, task)`` returns.

        Raises:
            RuntimeError: If the session could not be created.
            Exception: Anything raised by ``processor`` propagates unchanged.
        """
        # Create a dedicated session for this task.
        params = CreateSessionParams(image_id="agb-code-space-1")
        result = self.agb.create(params)
        if not result.success:
            raise RuntimeError(f"Failed to create session: {result.error_message}")

        session = result.session
        try:
            return processor(session, task)
        finally:
            self._release_session(session)

    def _release_session(self, session) -> None:
        """Delete *session*, reporting (never raising) cleanup failures."""
        try:
            delete_result = self.agb.delete(session)
        except Exception as cleanup_error:
            # Raising from a ``finally`` clause would replace the task's own
            # result or exception, so a cleanup failure is only reported.
            print(f"⚠️ Failed to delete session: {cleanup_error}")
            return

        # NOTE(review): assumes ``delete`` returns a result object exposing
        # ``success`` / ``error_message`` like ``create`` does - confirm
        # against the SDK. ``getattr`` keeps this safe if it does not.
        if not getattr(delete_result, "success", True):
            reason = getattr(delete_result, "error_message", "unknown error")
            print(f"⚠️ Failed to delete session: {reason}")
def data_processing_task(session, task):
    """Build a small Python snippet for *task* and run it via the session.

    Args:
        session: Object exposing ``code.run_code(code, language)`` whose
            return value has ``success``, ``result`` and ``error_message``.
        task: Dictionary with a JSON-serialisable iterable under ``'data'``
            and an operation name under ``'operation'`` (``'double'`` and
            ``'square'`` transform items; anything else passes them through).

    Returns:
        The list decoded from the last line printed by the snippet.

    Raises:
        RuntimeError: If ``run_code`` reports failure; the message is the
            reported ``error_message``.
    """
    data = task['data']
    operation = task['operation']

    # JSON text is not always a valid Python literal (true/false/null/NaN),
    # so ship it as a quoted string and decode it remotely with json.loads.
    data_json = json.dumps(data)
    # repr() yields a correctly escaped string literal, so quotes or
    # backslashes in the operation name cannot break (or inject into) the
    # generated code. str() keeps the original "compare as text" semantics.
    operation_literal = repr(str(operation))

    # Simulate some heavy computation. The template lines start at column 0
    # on purpose: they are the top level of the generated program.
    code = f"""
import json
import time
# Simulate work
time.sleep(1)
data = json.loads({data_json!r})
operation = {operation_literal}
result = []
for item in data:
    if operation == 'double':
        result.append(item * 2)
    elif operation == 'square':
        result.append(item ** 2)
    else:
        result.append(item)
print(json.dumps(result))
"""
    code_result = session.code.run_code(code, "python")
    if not code_result.success:
        raise RuntimeError(code_result.error_message)

    # Parse the last line of output as the JSON result.
    return json.loads(code_result.result.strip().split('\n')[-1])
def main():
    """Run a demo batch through the concurrent processor and print a summary."""
    key = os.environ.get("AGB_API_KEY")
    if not key:
        print("Error: AGB_API_KEY environment variable is not set")
        return

    # Demo batch, described as (id, data, operation) triples.
    batch = [
        {'id': task_id, 'data': values, 'operation': op}
        for task_id, values, op in (
            (1, [1, 2, 3, 4], 'double'),
            (2, [2, 4, 6, 8], 'square'),
            (3, [10, 20, 30], 'double'),
            (4, [5, 5, 5, 5], 'square'),
        )
    ]

    runner = ConcurrentAGBProcessor(api_key=key, max_workers=3)
    outcomes = runner.process_tasks_concurrently(batch, data_processing_task)

    print("\n--- Final Results ---")
    for entry in outcomes:
        # Successful entries carry 'result'; failed ones carry 'error'.
        if entry['success']:
            status, output = "Success", entry['result']
        else:
            status, output = "Failed", entry['error']
        print(f"Task {entry['task_id']}: {status} -> {output}")


if __name__ == "__main__":
    main()