Scheduler-Agent-Supervisor Pattern: Reliable Task Orchestration in Distributed Systems
The Scheduler-Agent-Supervisor (SAS) pattern enables scalable, resilient, and maintainable distributed task execution through clear role separation.
The Scheduler-Agent-Supervisor (SAS) pattern is an architectural approach for managing distributed, asynchronous, long-running tasks reliably and at scale. It suits systems where work is orchestrated across many independent units, each of which can fail and be retried, while the system as a whole stays observable and its operations stay idempotent.
This pattern divides responsibilities into three well-defined roles:
- Scheduler: Initiates workflows and tracks high-level progress
- Agent: Executes individual task units
- Supervisor: Monitors and manages task execution
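To make the division of roles concrete, here is a minimal in-memory sketch of the shared state that the three roles typically coordinate through. It is an illustration under stated assumptions, not part of the implementation that follows: `WorkItem`, `InMemoryStateStore`, and the lease-based recovery rule are hypothetical names and choices, the store is not thread-safe, and a real system would keep this state in a durable database or queue.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public enum WorkState { Pending, Running, Completed, Failed }

// One record in the state store that all three roles read and write (hypothetical type).
public sealed class WorkItem
{
    public WorkItem(string id) { Id = id; }
    public string Id { get; }
    public WorkState State { get; set; } = WorkState.Pending;
    public int Attempts { get; set; }
    public DateTime? LeaseExpiresUtc { get; set; }
}

public sealed class InMemoryStateStore
{
    private readonly Dictionary<string, WorkItem> _items = new Dictionary<string, WorkItem>();

    // Scheduler: register a unit of work; submitting the same id twice is a no-op.
    public bool TrySchedule(string id) => _items.TryAdd(id, new WorkItem(id));

    // Agent: claim a pending item and take a time-limited lease on it.
    public WorkItem? TryClaim(DateTime nowUtc, TimeSpan lease)
    {
        var item = _items.Values.FirstOrDefault(i => i.State == WorkState.Pending);
        if (item is null) return null;
        item.State = WorkState.Running;
        item.Attempts++;
        item.LeaseExpiresUtc = nowUtc + lease;
        return item;
    }

    // Agent: report success and drop the lease.
    public void Complete(string id)
    {
        _items[id].State = WorkState.Completed;
        _items[id].LeaseExpiresUtc = null;
    }

    // Supervisor: return stalled items to Pending, or mark them Failed once attempts run out.
    public int RecoverExpired(DateTime nowUtc, int maxAttempts)
    {
        var expired = _items.Values
            .Where(i => i.State == WorkState.Running && i.LeaseExpiresUtc < nowUtc)
            .ToList();
        foreach (var item in expired)
        {
            item.State = item.Attempts >= maxAttempts ? WorkState.Failed : WorkState.Pending;
            item.LeaseExpiresUtc = null;
        }
        return expired.Count;
    }

    public WorkState StateOf(string id) => _items[id].State;
}
```

Passing the current time in as a parameter keeps the recovery rule deterministic and easy to test. The scheduler only ever adds work, agents only move items they have claimed, and the supervisor only touches items whose lease has lapsed.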
Key Components With C# Implementation
1. Scheduler Component
The scheduler triggers the workflow. Here's a C# example using a timer:
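using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class DataExportScheduler : BackgroundService
{
    private readonly ILogger<DataExportScheduler> _logger;
    private readonly ISupervisorClient _supervisorClient;
    private readonly Timer _timer;
    private CancellationToken _stoppingToken;

    public DataExportScheduler(ILogger<DataExportScheduler> logger, ISupervisorClient supervisorClient)
    {
        _logger = logger;
        _supervisorClient = supervisorClient;
        // Created disarmed; ExecuteAsync arms it for the first run
        _timer = new Timer(ExecuteScheduledJob, null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _stoppingToken = stoppingToken;
        // Run every day at 2 AM
        _timer.Change(GetNextRunTime(), Timeout.InfiniteTimeSpan);
        return Task.CompletedTask;
    }

    private static TimeSpan GetNextRunTime()
    {
        var now = DateTime.Now;
        var nextRun = now.Date.AddHours(2); // Today at 2 AM
        if (nextRun <= now)
        {
            nextRun = nextRun.AddDays(1); // Already past 2 AM, so run tomorrow
        }
        return nextRun - now;
    }

    // async void is acceptable here only because every exception is caught below
    private async void ExecuteScheduledJob(object state)
    {
        _logger.LogInformation("Initiating data export workflow");
        try
        {
            var fileList = await GetFileListToProcess();
            await _supervisorClient.StartWorkflowAsync(fileList);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to initiate workflow");
        }
        finally
        {
            // Reset the timer for the next run unless the host is shutting down
            if (!_stoppingToken.IsCancellationRequested)
            {
                _timer.Change(GetNextRunTime(), Timeout.InfiniteTimeSpan);
            }
        }
    }

    private Task<List<string>> GetFileListToProcess()
    {
        // Implementation to fetch files from storage
        return Task.FromResult(new List<string> { "file1.csv", "file2.csv" /* ... */ });
    }

    public override void Dispose()
    {
        _timer.Dispose();
        base.Dispose();
    }
}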
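The delay calculation is the part of a timer-based scheduler that is easiest to get wrong, because the target hour may still lie ahead on the current day. One way to make it testable is to pull it into a pure function that takes the current time as an argument. The sketch below assumes a hypothetical helper named `DailySchedule.DelayUntilNext`; it works on local wall-clock time and ignores daylight-saving transitions.

```csharp
using System;

public static class DailySchedule
{
    // Returns how long to wait from `now` until the next occurrence of `timeOfDay`.
    // If that time has already passed today (or is exactly now), the next day is used.
    public static TimeSpan DelayUntilNext(DateTime now, TimeSpan timeOfDay)
    {
        var candidate = now.Date + timeOfDay;
        if (candidate <= now)
        {
            candidate = candidate.AddDays(1);
        }
        return candidate - now;
    }
}
```

For example, `DailySchedule.DelayUntilNext(DateTime.Now, TimeSpan.FromHours(2))` could feed the timer directly, while unit tests pass fixed timestamps instead of the system clock.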
2. Agent Component
Agents perform the actual work. Here's an idempotent file processor:
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

public class FileProcessingAgent
{
    private readonly ILogger<FileProcessingAgent> _logger;
    private readonly IBlobStorageService _storageService;
    private readonly IDatabaseRepository _repository;

    public FileProcessingAgent(
        ILogger<FileProcessingAgent> logger,
        IBlobStorageService storageService,
        IDatabaseRepository repository)
    {
        _logger = logger;
        _storageService = storageService;
        _repository = repository;
    }

    [FunctionName("ProcessFile")]
    public async Task ProcessFile(
        [ActivityTrigger] string fileName,
        ExecutionContext context)
    {
        // Check if already processed (idempotency check).
        // This check and the later "mark as processed" step are not atomic, so two
        // concurrent invocations for the same file can both get past this point.
        if (await _repository.IsFileProcessed(fileName))
        {
            _logger.LogInformation("File {FileName} already processed. Skipping.", fileName);
            return;
        }
        try
        {
            _logger.LogInformation("Processing file: {FileName}", fileName);
            // 1. Download file
            var fileContent = await _storageService.DownloadFileAsync(fileName);
            // 2. Parse content
            var records = CsvParser.Parse(fileContent);
            // 3. Transform data
            var transformedData = DataTransformer.Transform(records);
            // 4. Upload to database
            await _repository.BulkInsertAsync(transformedData);
            // 5. Mark as completed
            await _repository.MarkFileAsProcessed(fileName);
            _logger.LogInformation("Successfully processed {FileName}", fileName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process file {FileName}", fileName);
            // Clean up any partial state
            await _repository.RollbackFileProcessing(fileName);
            throw; // Let supervisor handle retry
        }
    }
}
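A check followed by a separate write, as in the agent above, leaves a window in which two invocations can both decide the file is unprocessed. A common way to close that window is to make the claim itself atomic. The sketch below is one possible shape, not the article's implementation: `ProcessedFileLedger` and `IdempotentRunner` are hypothetical names, and the in-memory `ConcurrentDictionary` stands in for what would usually be a unique-key insert in a database.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class ProcessedFileLedger
{
    private readonly ConcurrentDictionary<string, bool> _claims =
        new ConcurrentDictionary<string, bool>();

    // Atomically records the claim; returns false if the file was already claimed.
    public bool TryClaim(string fileName) => _claims.TryAdd(fileName, true);

    // Releases the claim so that a retry can pick the file up again after a failure.
    public void Release(string fileName) => _claims.TryRemove(fileName, out _);
}

public static class IdempotentRunner
{
    // Runs `work` at most once per file name; returns true if this call did the work.
    public static async Task<bool> RunOnceAsync(
        ProcessedFileLedger ledger, string fileName, Func<Task> work)
    {
        if (!ledger.TryClaim(fileName))
        {
            return false; // Already claimed by this or another caller
        }
        try
        {
            await work();
            return true;
        }
        catch
        {
            ledger.Release(fileName); // Allow the supervisor's retry to run the work again
            throw;
        }
    }
}
```

Because the claim is taken before the work starts, a duplicate delivery of the same file is skipped rather than processed twice, and a failed attempt releases the claim so the retry is not blocked.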
3. Supervisor Component
The supervisor orchestrates and monitors the workflow:
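using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

public class FileProcessingSupervisor
{
    private readonly ILogger<FileProcessingSupervisor> _logger;
    private readonly IAgentClient _agentClient;
    private readonly INotificationService _notificationService;

    public FileProcessingSupervisor(
        ILogger<FileProcessingSupervisor> logger,
        IAgentClient agentClient,
        INotificationService notificationService)
    {
        _logger = logger;
        _agentClient = agentClient;
        _notificationService = notificationService;
    }

    [FunctionName("FileProcessingOrchestrator")]
    public async Task RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // Orchestrator code is replayed, so log through a replay-safe logger
        var log = context.CreateReplaySafeLogger(_logger);
        var files = context.GetInput<List<string>>();
        var retryOptions = new RetryOptions(
            firstRetryInterval: TimeSpan.FromSeconds(30),
            maxNumberOfAttempts: 3);
        log.LogInformation("Starting processing of {FileCount} files", files.Count);

        // Parallel processing with retry logic; each task stays paired with its file name
        var processingTasks = new List<(string File, Task Task)>();
        foreach (var file in files)
        {
            var task = context.CallActivityWithRetryAsync(
                "ProcessFile",
                retryOptions,
                file);
            processingTasks.Add((file, task));
        }
        try
        {
            await Task.WhenAll(processingTasks.Select(p => p.Task));
            log.LogInformation("All files processed successfully");
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Some files failed processing after retries");
            // Get failed files (Task.AsyncState does not carry the file name)
            var failedFiles = processingTasks
                .Where(p => p.Task.IsFaulted)
                .Select(p => p.File)
                .ToList();
            // Orchestrators must not perform I/O directly, so the alert goes through an activity
            await context.CallActivityAsync(
                "SendFailureAlert",
                $"Failed to process {failedFiles.Count} files: {string.Join(", ", failedFiles)}");
            // Persist failed files for manual intervention
            await context.CallActivityAsync("PersistFailedFiles", failedFiles);
            // Rethrow so the orchestration instance is recorded as failed
            throw;
        }
    }

    // Assumed helper activity (not in the original listing) that wraps the notification service
    [FunctionName("SendFailureAlert")]
    public async Task SendFailureAlert([ActivityTrigger] string message)
    {
        await _notificationService.SendAlert("File Processing Failure", message);
    }
}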
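The fan-out and failure-collection step can also be tried without Durable Functions. The sketch below uses plain `Task` objects and a hypothetical helper named `FanOut.RunAllAsync`; it assumes the `process` delegate is asynchronous, so that failures surface as faulted tasks rather than synchronous exceptions. The point it illustrates is that `Task.WhenAll` rethrows only the first failure, so each task has to be inspected, and each task has to stay paired with its input for that inspection to be useful.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOut
{
    // Starts `process` for every item in parallel and returns the items whose task faulted.
    public static async Task<IReadOnlyList<string>> RunAllAsync(
        IEnumerable<string> items, Func<string, Task> process)
    {
        // Keep each task next to the item it belongs to
        var started = items.Select(item => (Item: item, Task: process(item))).ToList();
        try
        {
            await Task.WhenAll(started.Select(s => s.Task));
        }
        catch
        {
            // WhenAll surfaces only the first exception; the per-task check below finds them all
        }
        return started.Where(s => s.Task.IsFaulted).Select(s => s.Item).ToList();
    }
}
```

A supervisor could pass the returned list to its alerting and persistence steps, then decide whether the overall run counts as failed.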
Complete System Integration
Here's how to wire up the components in a .NET application:
var builder = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        // Register components
        services.AddHostedService<DataExportScheduler>();
        services.AddSingleton<ISupervisorClient, DurableFunctionsSupervisorClient>();
        services.AddSingleton<IAgentClient, AzureFunctionsAgentClient>();
        // Register dependencies
        services.AddSingleton<IBlobStorageService, AzureBlobStorageService>();
        services.AddSingleton<IDatabaseRepository, SqlDatabaseRepository>();
        services.AddSingleton<INotificationService, EmailNotificationService>();
        // Configure Durable Functions.
        // Verify this registration against the Durable Functions extension version in use;
        // the extension method may be exposed on IWebJobsBuilder rather than IServiceCollection.
        services.AddDurableTask(options =>
        {
            options.HubName = "FileProcessingHub";
            options.StorageProvider["maxQueuePollingInterval"] = "00:00:10";
        });
    })
    .ConfigureLogging(logging =>
    {
        logging.AddApplicationInsights();
        logging.AddConsole();
    });

await builder.Build().RunAsync();
When to Use the SAS Pattern
Ideal Use Cases:
- ETL pipelines: Processing large volumes of data with reliability requirements
- Order fulfillment systems: Where each step must be tracked and retried
- Distributed computations: Breaking large problems into smaller, parallel tasks
Poor Fits:
- Simple CRUD operations: Where the overhead isn't justified
- Real-time processing: Consider event streaming patterns instead
- Synchronous workflows: Where immediate response is required
Best Practices
Idempotency:
// Example idempotent operation
public async Task ProcessOrder(Order order)
{
    // Check if already processed
    if (await _repository.OrderExists(order.Id))
        return;
    // Process with transaction.
    // The database transaction does not cover the inventory and payment calls below;
    // those services need their own idempotency keys or compensating actions.
    using var transaction = await _repository.BeginTransactionAsync();
    try
    {
        await _inventoryService.ReserveItems(order.Items);
        await _paymentService.ProcessPayment(order.Payment);
        await _repository.SaveOrder(order);
        await transaction.CommitAsync();
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}
Observability:
// Enhanced logging with correlation IDs
public async Task ProcessItem(string itemId, Guid? correlationId = null)
{
    // Reuse the caller's correlation ID when one is supplied so that log entries from the
    // scheduler, supervisor, and agent can be joined; otherwise start a new one
    using var scope = _logger.BeginScope(new Dictionary<string, object>
    {
        ["CorrelationId"] = correlationId ?? Guid.NewGuid(),
        ["ItemId"] = itemId
    });
    _logger.LogInformation("Starting processing");
    var stopwatch = Stopwatch.StartNew();
    try
    {
        // Processing logic...
        _logger.LogInformation("Processing completed in {ElapsedMs}ms",
            stopwatch.ElapsedMilliseconds);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Processing failed after {ElapsedMs}ms",
            stopwatch.ElapsedMilliseconds);
        throw;
    }
}
Circuit Breakers:
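// Using Polly (v7-style API) for resilient HTTP calls
// Requires: using Polly; using Polly.CircuitBreaker;
// Declared as a field because `var` is not allowed at class level
private static readonly AsyncCircuitBreakerPolicy _circuitBreaker = Policy
    .Handle<HttpRequestException>()
    .Or<TimeoutException>()
    .CircuitBreakerAsync(
        exceptionsAllowedBeforeBreaking: 3,
        durationOfBreak: TimeSpan.FromMinutes(1));

public async Task<string> CallExternalService()
{
    // While the circuit is open, ExecuteAsync throws BrokenCircuitException without calling the service
    return await _circuitBreaker.ExecuteAsync(async () =>
    {
        var response = await _httpClient.GetAsync("https://api.example.com/data");
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    });
}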
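To show what a circuit breaker does internally, here is a dependency-free sketch of the closed, open, and half-open behavior. It is a simplified illustration and not how Polly is implemented: `SimpleCircuitBreaker` is a hypothetical class, it counts consecutive failures only, it is not thread-safe, and the caller supplies the current time.

```csharp
using System;

public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _breakDuration;
    private int _consecutiveFailures;
    private DateTime? _openedAtUtc;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan breakDuration)
    {
        _failureThreshold = failureThreshold;
        _breakDuration = breakDuration;
    }

    // Closed: always true. Open: false until the break has elapsed,
    // after which a trial call is allowed (the half-open state).
    public bool AllowsCall(DateTime nowUtc) =>
        _openedAtUtc is null || nowUtc - _openedAtUtc.Value >= _breakDuration;

    // A success closes the circuit and clears the failure count.
    public void RecordSuccess()
    {
        _consecutiveFailures = 0;
        _openedAtUtc = null;
    }

    // Reaching the threshold opens the circuit; a failed trial call reopens it
    // and restarts the break timer.
    public void RecordFailure(DateTime nowUtc)
    {
        _consecutiveFailures++;
        if (_consecutiveFailures >= _failureThreshold)
        {
            _openedAtUtc = nowUtc;
        }
    }
}
```

A caller would check `AllowsCall` before each request and report the outcome with `RecordSuccess` or `RecordFailure`. In production code a maintained library such as Polly is usually the better choice, since it also handles concurrency and richer failure criteria.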
Conclusion
The Scheduler-Agent-Supervisor pattern provides a robust framework for building distributed systems that require:
- Resilience: Automatic retries and failure handling
- Scalability: Parallel processing of independent tasks
- Maintainability: Clear separation of concerns
- Auditability: Comprehensive tracking of task states