Appearance
File processing pipeline
python
def file_processing_pipeline(session, input_file, output_file):
"""Complete file processing pipeline"""
# Step 1: Read input file
read_result = session.file_system.read_file(input_file)
if not read_result.success:
return False, f"Failed to read input: {read_result.error_message}"
# Step 2: Process content (example: convert to uppercase)
original_content = read_result.content
processed_content = original_content.upper()
# Step 3: Write processed content
write_result = session.file_system.write_file(output_file, processed_content)
if not write_result.success:
return False, f"Failed to write output: {write_result.error_message}"
# Step 4: Verify output
verify_result = session.file_system.read_file(output_file)
if not verify_result.success:
return False, f"Failed to verify output: {verify_result.error_message}"
return True, {
"input_size": len(original_content),
"output_size": len(processed_content),
"processed": True
}
# Usage example
from agb import AGB
from agb.session_params import CreateSessionParams
agb = AGB()
params = CreateSessionParams(image_id="agb-code-space-1")
result = agb.create(params)
if result.success:
session = result.session
# Create input file
session.file_system.write_file("/tmp/input.txt", "hello world\nthis is a test")
# Process file
success, result = file_processing_pipeline(
session,
"/tmp/input.txt",
"/tmp/output.txt"
)
if success:
print("Pipeline completed successfully:")
print(f"Input size: {result['input_size']} bytes")
print(f"Output size: {result['output_size']} bytes")
else:
print(f"Pipeline failed: {result}")
agb.delete(session)
else:
print(f"Failed to create session: {result.error_message}")