Curating Efficient Distributed Application Runtime (Dapr) Workflows

With Dapr and its powerful abstractions, writing resilient workflows is easy. In this article, we will walk through writing one with an example.

By Siri Varma Vegiraju · Oct. 04, 24 · Tutorial

Distributed Application Runtime (Dapr) is a portable, event-driven runtime that abstracts away many of the problems developers face daily when building distributed systems and microservices.

Imagine a system with three or four microservices. For every call between these services, developers must think about:

  • Handling timeouts and failures
  • Emitting metrics and traces
  • Controlling and restricting which services are allowed to communicate with each other

These challenges recur with every service, but Dapr's Service-to-Service Invocation building block abstracts them away.

Dapr packages these capabilities as components, each of which is invoked through a building block (an API).
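For instance, invoking a method on another microservice through Dapr takes a single client call, while the runtime handles retries, security, and tracing. Below is a minimal, hedged sketch using the Dapr .NET client; the app ID "order-service", the route, and the Order type are hypothetical illustrations, not from the original article.

C#
 
using Dapr.Client;

public record Order(string Id, decimal Total);

public class OrderCaller
{
    private readonly DaprClient daprClient;

    public OrderCaller(DaprClient daprClient) => this.daprClient = daprClient;

    public async Task<Order> GetOrderAsync(string orderId)
    {
        // "order-service" is a hypothetical Dapr app ID. The sidecar resolves it
        // and applies failure handling, observability, and call policies.
        return await this.daprClient.InvokeMethodAsync<Order>(
            HttpMethod.Get, "order-service", $"orders/{orderId}");
    }
}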

Components Overview

Below is a subset of the components that Dapr supports:

  • Service-to-Service: Facilitates communication between microservices. It encapsulates failure handling, observability, and policy enforcement (restricting who is allowed to call a service).
  • Secrets: Facilitates communication with secret provider stores, such as cloud secret managers and Kubernetes secrets.
  • Workflows: Lets developers run long-running workloads distributed across nodes.
  • Publish/Subscribe: Similar to the producer/consumer pattern; messages are produced to a topic, and listeners consume from the subscribed topic.

Let's dive into the workflow component.

Workflow Component

Problem

An example of a simple workflow is a scheduled job that moves data between data sources. The complexity increases when child workflows must be triggered as part of the parent workflow, and the workflow author also becomes responsible for saving, resuming, and maintaining the state and its schema.

With the Dapr Workflow component, most of the state management is abstracted out, allowing developers to focus only on the business logic.

Key Terms

  • Workflow: Contains a set of tasks that need to be executed
  • Activities: The tasks that need to be executed; for example, in the earlier scenario where data must be moved from a source to a destination:
    • Activity 1: Reads data from the source
    • Activity 2: Writes to the destination

The workflow will comprise both of these activities.

  • Benefits:
    • With workflow replays, we inherently get a checkpointing mechanism. For example, in the C# async/await model, Dapr automatically checkpoints at each await call. This allows the system to resume from the most recent I/O operation after a failure, making recovery less costly.
    • Built-in retry strategies for workflows and activities can be customized to suit specific workflows, as sketched below.
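As a hedged sketch of the retry point (the options and policy types come from the Dapr.Workflow package; the retry values are arbitrary examples), a retry policy can be attached to an individual activity call:

C#
 
// Attach a custom retry policy to a single activity call.
// The retry values below are arbitrary examples.
var options = new WorkflowTaskOptions
{
    RetryPolicy = new WorkflowRetryPolicy(
        maxNumberOfAttempts: 3,
        firstRetryInterval: TimeSpan.FromSeconds(5),
        backoffCoefficient: 2.0)
};

List<string> lines = await context.CallActivityAsync<List<string>>(
    nameof(BlobDataFetchActivity),
    input,
    options);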

Workflow Patterns

Pattern 1


The parent workflow schedules multiple child activities in parallel, as sketched below.
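A minimal sketch of this fan-out/fan-in shape, assuming a hypothetical ProcessBlobActivity that returns a line count per blob:

C#
 
// Fan out: schedule one activity per blob; fan in with Task.WhenAll.
public class FanOutWorkflow : Workflow<List<string>, int>
{
    public override async Task<int> RunAsync(WorkflowContext context, List<string> blobNames)
    {
        // ProcessBlobActivity is a hypothetical activity for illustration.
        var tasks = blobNames
            .Select(name => context.CallActivityAsync<int>(nameof(ProcessBlobActivity), name))
            .ToList();

        // All child activities run concurrently; the workflow resumes
        // (and checkpoints) once every task has completed.
        int[] counts = await Task.WhenAll(tasks);
        return counts.Sum();
    }
}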

Pattern 2


In this scenario, the workflow schedules Activity 1 and then passes its output to Activity 2 for further processing.
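A sketch of this chaining shape, with hypothetical ReadBlobActivity and WriteDestinationActivity names standing in for Activity 1 and Activity 2:

C#
 
public class ChainingWorkflow : Workflow<string, int>
{
    public override async Task<int> RunAsync(WorkflowContext context, string blobName)
    {
        // Activity 1: read the blob (state is checkpointed after the await).
        List<string> lines = await context.CallActivityAsync<List<string>>(
            nameof(ReadBlobActivity), blobName);

        // Activity 2: consume Activity 1's output.
        return await context.CallActivityAsync<int>(
            nameof(WriteDestinationActivity), lines);
    }
}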

Pattern 3

Here, the parent workflow schedules a child workflow, which in turn schedules its own activities.
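A sketch of the parent side, assuming the child is the BlobOrchestrationWorkflow built later in this article:

C#
 
public class ParentWorkflow : Workflow<string, List<string>>
{
    public override async Task<List<string>> RunAsync(WorkflowContext context, string blobName)
    {
        // The child workflow schedules its own activities and is
        // checkpointed independently of the parent.
        return await context.CallChildWorkflowAsync<List<string>>(
            nameof(BlobOrchestrationWorkflow), blobName);
    }
}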

Example

Let's explore an example using C# and Dapr to schedule workflows that read data from Blob storage.

Step 1

Import the Dapr packages into the .csproj.

XML
 
<ItemGroup>
  <!-- https://www.nuget.org/packages/Dapr.AspNetCore -->
  <PackageReference Include="Dapr.AspNetCore" Version="1.14.0" />
  <!-- https://www.nuget.org/packages/Dapr.Workflow -->
  <PackageReference Include="Dapr.Workflow" Version="1.14.0" />
</ItemGroup>


Step 2: Configuring Workflow and Activity

  1. Add workflows and activities to the Dapr Workflow extension.
  2. RegisterWorkflow is used to register workflows.
  3. RegisterActivity is used to register activities.
C#
 
/// <summary>
/// Configure workflow extension.
/// </summary>
public static class DaprConfigurationExtension
{
    /// <summary>
    /// Configure Workflow extension.
    /// </summary>
    /// <param name="services">services.</param>
    /// <returns>IServiceCollection.</returns>
    public static IServiceCollection ConfigureDaprWorkflows(this IServiceCollection services)
    {
        services.AddDaprWorkflow(options =>
        {
            // Note that it's also possible to register a lambda function as the workflow
            // or activity implementation instead of a class.
            options.RegisterWorkflow<BlobOrchestrationWorkflow>();

            // These are the activities that get invoked by the Dapr workflow(s).
            options.RegisterActivity<BlobDataFetchActivity>();
        });

        return services;
    }
}
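For completeness, here is a hedged sketch of calling this extension from a minimal host; the Program.cs scaffolding below is standard ASP.NET Core and not part of the original article:

C#
 
var builder = WebApplication.CreateBuilder(args);

// Register the Dapr workflows and activities defined above.
builder.Services.ConfigureDaprWorkflows();

var app = builder.Build();
app.Run();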


Step 3: Writing the First Workflow

BlobOrchestrationWorkflow extends the Workflow base class from the Dapr.Workflow NuGet package, with typed input and output parameters.

The input here is the name of the blob (a string), and the output is the content of the blob, a list of lines.

C#
 
/// <summary>
/// Dapr workflow responsible for performing operations on a blob.
/// </summary>
public class BlobOrchestrationWorkflow : Workflow<string, List<string>>
{
    /// <inheritdoc/>
    public async override Task<List<string>> RunAsync(WorkflowContext context, string input)
    {
        ArgumentNullException.ThrowIfNull(context);
        ArgumentNullException.ThrowIfNull(input);

        List<string> identifiers = await context.CallActivityAsync<List<string>>(
            name: nameof(BlobDataFetchActivity),
            input: input).ConfigureAwait(false); // state is saved after this await

        return identifiers;
    }
}


Step 4: Writing the First Activity

Like a workflow, an activity also takes an input and produces an output. In this case, the input is the blob name, and the output is the list of lines from the blob.

C#
 
/// <summary>
/// Fetch identifiers from Blob.
/// </summary>
public class BlobDataFetchActivity : WorkflowActivity<string, List<string>>
{
    private readonly IBlobReadProcessor readProcessor;

    /// <summary>
    /// Initializes a new instance of the <see cref="BlobDataFetchActivity"/> class.
    /// </summary>
    /// <param name="blobReadProcessor">read blob data.</param>
    public BlobDataFetchActivity(IBlobReadProcessor blobReadProcessor)
    {
        this.readProcessor = blobReadProcessor;
    }

    /// <inheritdoc/>
    public override async Task<List<string>> RunAsync(WorkflowActivityContext context, string input)
    {
        return await this.readProcessor.ReadBlobContentAsync<List<string>>(input).ConfigureAwait(false); // state is saved
    }
}
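IBlobReadProcessor is application code rather than part of Dapr. A minimal sketch of what it might look like using the Azure.Storage.Blobs SDK; the interface shape, JSON content, and container wiring are assumptions:

C#
 
using Azure.Storage.Blobs;
using System.Text.Json;

public interface IBlobReadProcessor
{
    Task<T> ReadBlobContentAsync<T>(string blobName);
}

// Hypothetical implementation backed by an Azure Blob Storage container.
public class BlobReadProcessor : IBlobReadProcessor
{
    private readonly BlobContainerClient containerClient;

    public BlobReadProcessor(BlobContainerClient containerClient)
    {
        this.containerClient = containerClient;
    }

    public async Task<T> ReadBlobContentAsync<T>(string blobName)
    {
        // Download the blob and deserialize its JSON content.
        var response = await this.containerClient
            .GetBlobClient(blobName)
            .DownloadContentAsync()
            .ConfigureAwait(false);

        return JsonSerializer.Deserialize<T>(response.Value.Content.ToString())!;
    }
}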


Step 5: Scheduling the First Workflow

  • Use the DaprWorkflowClient to schedule workflows.
  • The instance ID must be unique for each workflow; reusing an ID can cause nondeterministic behavior.
  • Each workflow has an input and an output. For example, if the workflow takes a blob name as input and returns the list of lines in the blob, the input is a string and the output is a List<string>.
  • The workflow is tracked using its instance ID; once it completes, the ExecuteWorkflowAsync method below finishes execution.
C#
 
public class DaprService
{
    // Workflow client injected using Dependency Injection.
    private readonly DaprWorkflowClient daprWorkflowClient;

    /// <summary>
    /// Initializes a new instance of the <see cref="DaprService"/> class.
    /// </summary>
    /// <param name="daprWorkflowClient">Dapr workflow client.</param>
    public DaprService(DaprWorkflowClient daprWorkflowClient)
    {
        this.daprWorkflowClient = daprWorkflowClient;
    }

    /// <summary>
    /// Execute Dapr workflow.
    /// </summary>
    /// <param name="message">string Message.</param>
    /// <returns>Task.</returns>
    public async Task ExecuteWorkflowAsync(string message)
    {
        string id = Guid.NewGuid().ToString();

        // Schedule the Dapr Workflow.
        await this.daprWorkflowClient.ScheduleNewWorkflowAsync(
            name: nameof(BlobOrchestrationWorkflow),
            instanceId: id,
            input: message).ConfigureAwait(false);

        WorkflowState state = await this.daprWorkflowClient.GetWorkflowStateAsync(
            instanceId: id,
            getInputsAndOutputs: true).ConfigureAwait(false);

        // Poll the workflow state until completion.
        while (!state.IsWorkflowCompleted)
        {
            // Brief pause between polls to avoid a tight loop.
            await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

            state = await this.daprWorkflowClient.GetWorkflowStateAsync(
                instanceId: id,
                getInputsAndOutputs: true).ConfigureAwait(false);
        }
    }
}
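As an aside, polling in a loop works, but the Dapr .NET SDK also exposes a helper that waits for completion; a hedged one-line alternative, assuming the same client instance:

C#
 
// Block until the workflow reaches a terminal state.
WorkflowState state = await this.daprWorkflowClient.WaitForWorkflowCompletionAsync(
    instanceId: id,
    getInputsAndOutputs: true).ConfigureAwait(false);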


Best Practices

  • Each time Dapr encounters an await, it saves the workflow state. Leveraging this behavior is important for ensuring workflows can resume efficiently and cost-effectively after interruptions.
  • In addition, the workflow input must be deterministic for the replay pattern to work correctly. For example, assume the payload below is the first input to the workflow. The workflow pulls the data from the blob, saves it to the state, and then, for some reason, crashes.
JSON
 
{
  "blobName": "dapr-blob",
  "createdOn": "2024-12-11T23:00:00.11212Z"
}


After a restart, we resend the input with a different "created on" timestamp. Even though we have already saved the output for this blob name, the new timestamp makes it a new payload, forcing the output to be recomputed. If the "created on" timestamp were omitted, we could retrieve the result from the state store without making an additional I/O call.

JSON
 
{
  "blobName": "dapr-blob",
  "createdOn": "2024-12-11T23:01:00.11212Z"
}


Any workflow interaction with data other than its own state must happen through activities only.
