{{announcement.body}}
{{announcement.title}}

Durable Cloud Event Production From Knative Container Source With Azure Durable Task Framework

DZone 's Guide to

Durable Cloud Event Production From Knative Container Source With Azure Durable Task Framework

This article demonstrates how to create a Knative container source and use it to generate cloud events using Azure Durable Task Framework.

· Cloud Zone ·
Free Resource

Durable cloud event production with Knative

Knative is an excellent platform for building, deploying and managing serverless workloads on Kubernetes. The Serving resources of Knative extend Istio to support serverless applications. Another class of resources of Knative called Eventing extend Istio to support the production and consumption of Cloud Events.

Knative offloads the responsibility of scaling the workloads from the users by only activating pods when they receive a request. It scales out the workload pods based on the number of requests and scales pods down to zero when they are idle. Another great feature of Knative is its ability to understand revisions of code and configurations. If the rollout of code or configuration disrupts the application, Knative automatically rolls back the update. The Serving component of Knative can incrementally roll out revisions of applications and allows flexible splitting of traffic between revisions. Knative Eventing adds the capability of building event-driven services to the platform. It allows you to declare bindings between event producers and consumers using configurations that decouple producers from consumers of events.

You may also enjoy: Knative Serving — Service-to-Service Call

There are several built-in event sources in Knative that can generate cloud events from sources such as Kubernetes, GitHub, and AWS SQS. There are two models for delivering events generated by a source to a Knative service.

  • Direct Delivery: In this model, the source can directly deliver an event to a single service. The service can either be a Knative service or a Kubernetes service. The source is responsible for retrying delivery if the service is unavailable.



 

Direct delivery

Direct delivery
  • Fan-out delivery: In this model, the events from the source are directed to another Knative component known as the Channel. All the services that are interested in listening to the events, provision Subscriptions to the channels, and can thus asynchronously receive the events. The fan-out delivery model is Knative’s implementation of the popular Pub\Sub pattern.

Fan out delivery

Fan out delivery

In this article, we will develop a custom event source with .NET core, and deliver the events to another custom Knative service using the direct delivery model.

Container Source

The most straightforward approach to building a custom event source is to use the Knative ContainerSource helper object. A container source generates events and sends the events to a sink URI. The sink can be either a Knative service or a channel. Knative ensures that a container source always keeps running unless deleted. Any application can act as a container source if it fulfills the following requirements.

  1. The application can be packaged as a container and has an ENTRYPOINT defined.

  2. It expects to receive  --sink CLI parameter or reads the  sink  environment variable, which implementations provided by Knative.

The  --sink parameter or  sink  environment variable contains the URL of the sink to which the container source should POST the events.

Durable Container Source

Knative ensures that a container source always executes, restarting the container if it crashes. However, container sources are stateless by default and thus can not orchestrate stateful workflows. Enter Azure Durable Task Framework, which is a framework to write long-running persistent workflows in C#. The Azure Durable Functions are one of the common implementations of the framework which are used for running stateful functions in Azure.

In this article, we will discuss how you can combine the Azure Durable Task Framework with Knative Container Source to generate cloud events and execute a stateful workflow. We will use the following for building our application, which you can later deploy to your favorite Kubernetes service on the cloud, such as Azure (AKS), and AWS (EKS). I have used the Windows variants of the following applications.

  1. Docker Desktop with Kubernetes enabled
  2. Visual Studio
  3. Azure Storage Emulator (or an actual storage account on Azure)

Installation

Use this reference guide to install Knative on your platform. For Windows, the steps mentioned for Minikube and Docker Desktop for Mac work just fine. Here is the script that I used for installing Knative on Docker Desktop for Windows. I assume that the following script will work on any environment.

Shell




xxxxxxxxxx
1


 
1
$ kubectl delete svc knative-ingressgateway -n istio-system
2
$ kubectl delete deploy knative-ingressgateway -n istio-system
3
$ kubectl delete statefulset/controller-manager -n knative-sources
4
$ kubectl apply --selector knative.dev/crd-install=true --filename https://github.com/knative/serving/releases/download/v0.11.0/serving.yaml --filename https://github.com/knative/eventing/releases/download/v0.11.0/release.yaml --filename https://github.com/knative/serving/releases/download/v0.11.0/monitoring.yaml
5
$ kubectl apply --filename https://github.com/knative/serving/releases/download/v0.11.0/serving.yaml --filename https://github.com/knative/eventing/releases/download/v0.11.0/release.yaml --filename https://github.com/knative/serving/releases/download/v0.11.0/monitoring.yaml


Let me point you to a known issue with Knative Eventing that baffled me for a while until I chanced upon this article. Knative requires an additional Istio cluster local gateway to resolve the address of services within your cluster (with .svc.cluster.local hostname). The scripts to install local gateway add-on according to the version of Istio installed in the cluster are present here. At the time of writing this article, I am using the following versions of Kubernetes, Istio, and Knative.

  • Kubernetes: 1.14.8
  • Istio: 1.4.0
  • Knative: 0.11.0

To install cluster local gateway for version 1.4 of Istio, apply the configuration istio-knative-extras.yaml to your cluster.

Application Overview and Source Code

The demo consists of the following applications.

  1. Hello Orchestrator: This application is a simple recurring workflow that executes in an infinite loop. The workflow consists of a single activity that posts a cloud event to a Knative service, Hello Events.
  2. Hello Events: This is a straightforward Knative HTTP service that has two endpoints. The GET endpoint of this service returns the text “Hello World.” The POST endpoint of this service prints the received event to the standard output stream and returns an HTTP OK response with text “Processed.”

The following diagram illustrates the various components of the architecture and their associations.




 

High-level Design Diagram

High-level design diagram

The source code for the application is available on GitHub.

Let’s discuss each component of the application now.

Hello Events Service

The Hello Events Service is present as Hello-Events project in the solution. It is a .NET core 3.1 console application that can listen to HTTP requests on port 80. Navigate to Program.cs file of the project, which contains a method named  CreateHostBuilder, which is responsible for attaching a listener to port 80 of the process.

C#




xxxxxxxxxx
1


1
public static IHostBuilder CreateHostBuilder(string[] args)
2
{
3
    var url = string.Concat("http://0.0.0.0:", "80");
4
    return Host.CreateDefaultBuilder(args)
5
       .ConfigureWebHostDefaults(webBuilder =>
6
       {
7
            webBuilder.UseStartup<Startup>().UseUrls(url);
8
       });
9
}


The endpoints that handle the GET and POST requests made to this service are present in the Startup.cs file. The GET endpoint returns the text “Hello World” in response, and the POST endpoint prints the request on console and returns the text “Processed.”

C#




xxxxxxxxxx
1
23


1
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
2
{
3
    app.UseRouting();
4
    app.UseEndpoints(endpoints =>
5
   {
6
        endpoints.MapGet("/", async context =>
7
       {
8
            await context.Response.WriteAsync($"Hello World!\n");
9
       });
10
 
          
11
        endpoints.MapPost("/", async context =>
12
       {
13
            string body;
14
            using (var sr = new StreamReader(context.Request.Body))
15
           {
16
                body = await sr.ReadToEndAsync();
17
           }
18
 
          
19
            Console.WriteLine($"Received {body}");
20
            await context.Response.WriteAsync("Processed");
21
       });
22
   });
23
}


Finally, the Dockerfile present in the project can be used to package and publish this application to a container registry.

Hello Orchestrator

The Hello Orchestrator container source is present as Hello-Orchestrator project in the solution. It is a .NET core 3.1 console application that can execute indefinitely unless terminated. The Main method of the program is responsible for initializing the Durable Task Orchestrator and Durable Task Activity and kick off the workflow. As discussed in the previous section, our workflow consists of an orchestrator and a single activity that work in concert as follows.

  1. CronOrchestration (TaskOrchestrator): Executes in an infinite loop and on every execution cycle schedules, and waits for CronTask to complete. The orchestrator passes an incrementing number as job id to the CronTask activity.
  2. CronTask (TaskActivity): Generates a new cloud event on every invocation and sends a POST request to the URL specified in the SINK environment variable. The value of the SINK environment variable points to either a Service (direct delivery) or a Channel (fan-out delivery), and it is set by Knative. In this case, we will deliver the Cloud Events to the Hello Events Service (direct delivery).

Let’s discuss the code in the Main method that is responsible for setting up and kicking off the workflow.

C#




xxxxxxxxxx
1
36


 
1
var storageConnectionString = Environment.GetEnvironmentVariable("StorageConnectionString");
2
var taskHubName = Environment.GetEnvironmentVariable("TaskHubName");
3
var durationInSeconds = Environment.GetEnvironmentVariable("DurationInSeconds");
4
var mre = new ManualResetEvent(false);
5
 
          
6
var settings = new AzureStorageOrchestrationServiceSettings
7
{
8
    StorageConnectionString = storageConnectionString,
9
    TaskHubName = taskHubName
10
};
11
var orchestrationServiceAndClient = new AzureStorageOrchestrationService(settings);
12
 
          
13
var taskHubClient = new TaskHubClient(orchestrationServiceAndClient);
14
var taskHub = new TaskHubWorker(orchestrationServiceAndClient);
15
 
          
16
orchestrationServiceAndClient.CreateIfNotExistsAsync().Wait();
17
try
18
{
19
    await taskHub
20
       .AddTaskOrchestrations(typeof(CronOrchestration))
21
       .AddTaskActivities(new CronTask())
22
       .StartAsync();
23
 
          
24
    var orchestrationInstance = await taskHubClient.CreateOrchestrationInstanceAsync(
25
        typeof(CronOrchestration),
26
        TimeSpan.FromSeconds(double.Parse(durationInSeconds ?? "5")));
27
 
          
28
    Console.WriteLine($"ExecutionId: {orchestrationInstance.ExecutionId}. Blocking main thread.");
29
    mre.WaitOne();
30
    await taskHub.StopAsync(true);
31
    Console.WriteLine("Done!!");
32
}
33
catch (Exception e)
34
{
35
    Console.WriteLine($"worker exception: {e}");
36
}


You can use Azure Service Bus or Azure Storage to persist the state of a Durable Task. For this sample, I have used Azure Storage to persist the state. Next, we created new instances of the Task Hub Client and the Task Hub, which are essential to interact with Task Orchestration instances and reading state from Azure Storage. If some of the framework terms sound confusing to you, you should read this small Wiki documentation on the keywords used in the framework.

Finally, we kicked off the workflow by creating a new orchestration instance and blocking the main thread. Let’s now visit the code of the  CronOrchestration  class and discuss it in detail.

C#




xxxxxxxxxx
1
27


1
public override async Task<string> RunTask(OrchestrationContext context, TimeSpan duration)
2
{
3
    try
4
   {
5
        while (true)
6
       {
7
            var currentTime = context.CurrentUtcDateTime;
8
            var fireAt = currentTime.Add(duration);
9
            _jobNumber += 1;
10
            if (!context.IsReplaying)
11
           {
12
                Console.WriteLine(
13
                    $"{context.OrchestrationInstance.InstanceId}: Attempting to queue job {_jobNumber}.");
14
           }
15
 
          
16
            Console.WriteLine(
17
                $"{context.OrchestrationInstance.InstanceId}: Job {_jobNumber} scheduled to run at {fireAt}.");
18
            await context.CreateTimer(fireAt, _jobNumber.ToString());
19
            Console.WriteLine(await context.ScheduleTask<string>(typeof(CronTask), _jobNumber.ToString()));
20
       }
21
   }
22
    catch (Exception e)
23
   {
24
        Console.WriteLine(e);
25
        throw;
26
   }
27
}


The orchestrator executes in an infinite loop and on every execution cycle, schedules and waits for the execution of  CronTask. It passes an incrementing number as an argument to the task.

Let’s now discuss the final code component of the application, the CronTask class.

C#




xxxxxxxxxx
1
29


 
1
protected override string Execute(TaskContext context, string input)
2
{
3
    try
4
   {
5
        Result++;
6
 
          
7
        var cloudEvent = new CloudEvent("com.hello.cron-event", new Uri("urn:hello-com:cron-source"))
8
       {
9
            DataContentType = new ContentType(MediaTypeNames.Application.Json),
10
            Data = JsonConvert.SerializeObject(new
11
           {
12
                Id = context.OrchestrationInstance.InstanceId,
13
                JobId = input,
14
                Result
15
           })
16
       };
17
 
          
18
        var content = new CloudEventContent(cloudEvent, ContentMode.Structured, new JsonEventFormatter());
19
        Console.WriteLine($"Going to post data: {JsonConvert.SerializeObject(cloudEvent.Data)} to Url: {Environment.GetEnvironmentVariable("SINK")}");
20
        var result = _httpClient.PostAsync(Environment.GetEnvironmentVariable("SINK"), content).Result;
21
        return
22
            $"Cron Job '{input}' Completed... @{DateTime.UtcNow} Response: {result} Event: {JsonConvert.SerializeObject(cloudEvent.Data)}";
23
   }
24
    catch (Exception e)
25
   {
26
        Console.WriteLine(e);
27
        throw;
28
   }
29
}


The  CronTask class is quite straightforward. It creates a new cloud event and sends it as the payload of a POST request to the URL present as the value of the SINK environment variable.

You can use the helper script located at the root of the solution, build-push-run.cmd, to build container images, push them to the registry, and finally execute them on your local system.

Deploying The Application

The spec to deploy the application is named appspec.yaml, and it is located at the root of the solution. Let’s discuss each specification in this file, starting with the namespace.

YAML




xxxxxxxxxx
1


 
1
apiVersion: v1
2
kind: Namespace
3
metadata:
4
 name: kn-app
5
 labels:
6
   knative-eventing-injection: enabled
7
   istio-injection: enabled


This specification instructs Istio to inject a sidecar for our services and instructs Knative to add a default broker for the namespace. Let’s now configure our container source.

YAML




xxxxxxxxxx
1
19


 
1
apiVersion: sources.eventing.knative.dev/v1alpha1
2
kind: ContainerSource
3
metadata:
4
 name: hello-orchestrator
5
 namespace: kn-app
6
spec:
7
 image: rahulrai/hello-orchestrator
8
 env:
9
   - name: StorageConnectionString
10
     value: "DefaultEndpointsProtocol=https;AccountName={account name};AccountKey={account key};EndpointSuffix=core.windows.net"
11
   - name: TaskHubName
12
     value: "DTFHub"
13
   - name: DurationInSeconds
14
     value: "10"
15
 sink:
16
   apiVersion: serving.knative.dev/v1alpha1
17
   kind: Service
18
   name: hello-events
19
   namespace: kn-app


To execute the hello-orchestrator container as an event source, we need to create a concrete ContainerSource and specify the arguments and appropriate environment settings. The environment variables set in the specifications are passed to the container.

A  SinkBinding links the event producer to the event consumer. In the previous specification, we specified the sink to which the container source should pump the events. In our case, it is the hello-events service that we will deploy under that same namespace.

Finally, let’s visit the specification responsible for publishing the  hello-events service as a Knative service.

YAML




xxxxxxxxxx
1
13


1
apiVersion: serving.knative.dev/v1
2
kind: Service
3
metadata:
4
 name: hello-events
5
 namespace: kn-app
6
spec:
7
 template:
8
   spec:
9
     containers:
10
       - image: rahulrai/hello-events
11
         ports:
12
           - name: http1
13
             containerPort: 80


To provision a Knative service, you need to mention the name of the service, the image of the service, and the port on which the clients can reach your service.

To deploy the application, execute the following command.

Shell




xxxxxxxxxx
1


 
1
$ kubectl apply -f appspec.yaml
2
 
          
3
namespace/kn-app created
4
containersource.sources.eventing.knative.dev/hello-orchestrator created
5
service.serving.knative.dev/hello-events created


Knative Container Source and Service in Action

The hello-events service is just a regular Knative service, and it can respond to HTTP requests issued by clients. To get the hostname that your service recognizes, execute the following command.

Shell




x


 
1
$ kubectl get ksvc -n kn-app
2
 
          
3
NAME           URL                                     LATESTCREATED       LATESTREADY         READY
4
hello-events   http://hello-events.kn-app.example.com   hello-events-zlh78   hello-events-zlh78   True


Next, issue an HTTP request passing in the hostname that you received in the output as follows.

Shell




xxxxxxxxxx
1


 
1
$ curl -H "Host: hello-events.kn-app.example.com" http://localhost
2
 
          
3
Hello World!


Note that when you deploy the Knative service to the cloud, you won’t need to pass the hostname in the header of the request.

Let’s check the logs of the container source and the Knative service by executing the following command to follow the logs of  hello-orchestrator.

Shell




xxxxxxxxxx
1


 
1
$ kubectl logs -l sources.eventing.knative.dev/containerSource=hello-orchestrator -n kn-app -c source -f


The following command will stream the logs generated by the  hello-event  service.

Shell




xxxxxxxxxx
1


1
$ kubectl logs -l serving.knative.dev/service=hello-events -n kn-app -c user-container -f


The following screenshot presents an instance of the log stream generated by both the components that I captured by executing the previous commands.

Knative Container Source and Service Logs

Knative container source and service logs

Conclusion

Although Knative is in its early stages, its documentation is very extensive. There are a ton of samples and demos that you can refer to for getting up and running with Knative. I faltered a couple of times while building this demo. However, the Knative community is very active and helpful and they gladly helped me get over those issues. This article is a small contribution from me to the burgeoning Knative community.

Further Reading

Knative Serving — Service-to-Service Call

Knative Monitoring, Logging, and Tracing Explained


Topics:
knative ,azure ,serverless ,framework ,durable cloud ,resilience ,task framework

Published at DZone with permission of Rahul Rai . See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}