
Implementing a Robust Portable Cron-Like Scheduler via Couchbase Eventing (Part 1)


Leverage the Couchbase Eventing Service to run multiple scheduled tasks at specific recurring intervals in a cron-like fashion, completely inside the Couchbase database.


This is the first of a multi-part series on leveraging the Couchbase Eventing Service to run multiple scheduled tasks at specific recurring intervals in a cron-like fashion, completely inside the database and without requiring additional infrastructure, via a single general-purpose Eventing Function. In this installment, we will focus on running fixed user routines, i.e. JavaScript functions defined inside an Eventing Function. In subsequent articles we will extend the cron-like Eventing Function to schedule and execute database-driven dynamic N1QL statements, and finally we will explore scheduling database-driven dynamic JavaScript functions.

Background

The Couchbase Eventing Service provides a framework for writing your own routines, simple JavaScript functions, to process document changes. This service provides all the needed infrastructure to create scalable and robust cloud-based functions, allowing you to focus on developing pure business logic that reacts in near real-time to changes in your data. Your functions are able to access the Couchbase data service (KV), the Couchbase query service (N1QL), and REST endpoints external to the Couchbase system.

[Figure: Eventing Life Cycle 6.5 I/O]

The JSON data model in Couchbase came from JavaScript, thus it is only natural that the Eventing Service exposes the ability to write JavaScript code to analyze and manipulate JSON documents on any type of change event, including Inserts, Updates, Merges, and Deletes (together referred to as mutations).

Eventing Functions typically allow you to deploy and execute custom code fragments that react to thousands or even millions of mutations per second in your documents. Several typical use cases are documented for developing high-velocity, at-scale Eventing Functions that respond to mutations of Couchbase documents.

[Figure: Eventing Life Cycle 6.5]

This article will focus instead on a very low-velocity use case of the Eventing Service: building a reliable “in database” distributed crontab, allowing you to execute JavaScript functions that interact with Couchbase services on a regular periodic schedule.

Scheduling Business Logic to Run at a Specified Date or Time

Cron, named after “Chronos,” the Greek word for time, is one of the most useful utilities in a Linux system. In Linux, the cron utility is driven by a crontab (cron table) file, a configuration file that specifies shell commands to run periodically on a given schedule. One drawback of running cron is that it is not designed to be a distributed service; it runs on a single box and as such presents a single point of failure. If the system is offline for several hours, all scheduled tasks are missed.

Yes, there are some distributed cron implementations, such as Google’s Cloud Service, AWS’ Scheduled Tasks, and Azure Functions / Time Triggers. But each cloud vendor’s offering has its own idioms and is not directly portable. In addition, the methodology of configuration and control needs to be secured; for example, if you control a distributed cron system via a REST API over HTTP/S, you need to account for this in your security plan.

Using Couchbase Itself to Run Periodic Commands

With a minor amount of code and planning you can leverage Couchbase’s Eventing service to provide flexible cron-like functionality for your scheduled database operations or maintenance. Building the scheduler into the database allows you to achieve the following benefits:

  • Portability across Cloud providers: if you rehost your Couchbase cluster, your scheduler is not impacted.
  • Supportability: if you utilize Couchbase, you have a single vendor to provide support and other services.
  • Distributed: there is no single point of failure, and all Couchbase services support distributed replicas.
  • Guaranteed execution: your task gets executed even after recovery from a node failure.

Couchbase Scheduling, Timers the Secret Sauce

Timers are Couchbase Eventing Service constructs by which developers can specify a routine (business logic) to be triggered at a future time. We will use this functionality to implement a pure Couchbase configurable crontab system that allows you the ability to trigger repetitive tasks as part of your workflows whether you need to execute a simple N1QL query or build a complex rules engine. In all of the subsequent designs we will limit our cron implementations to a resolution of 15 seconds or greater. 

We have this limitation because, although timers scale to the millions and are guaranteed to fire and execute, they are not wall-clock accurate and currently have a bounded steady-state delay of less than 14 seconds [1]. Of course, if you need a tighter schedule, i.e. less than 15 seconds, then you should simply process the mutation itself in Eventing logic without using a timer construct to schedule a callback in the future. As of this writing the current Couchbase release is version 6.5.1, which has two limitations that we must work around when making a robust cron system.

  1. In the 5.5.x, 6.0.x and 6.5.x releases a function that is invoked by a timer callback cannot reliably create a fresh timer (a user-space workaround can be done via a second cooperative Function).
  2. In the 6.5.x releases creating timers in the future (as in one hour+) in an otherwise idle system can result in a growing number of metadata bucket operations, which can eventually block mutations for a given Eventing Function (in 6.5.x a user-space workaround can be accomplished via a second cooperative Function). The severity is governed by:
    • The number of vBuckets holding an active timer. Therefore, if there are only a few timers in the future, the issue may not be noticeable or materialize. This is the case with just a few cron schedules, but for completeness, in case you add date functionality, I put a fix for this issue into the code supplied in this article.
    • Whether an Eventing timer has fired recently on a vBucket (which clears the issue for the given vBucket on a per-function basis). Therefore, systems with lots of near-term timer activity will not experience this issue even if timers are scheduled far into the future.

Fortunately, in version 6.6.0 both of the above restrictions are lifted and a scheduler can be made in a single, simple, unified Eventing Function.

Prerequisites

In this article we will be using the latest GA version, i.e. Couchbase version 6.5.1 (you may need to make some changes to the Eventing Functions described for earlier Couchbase versions). The example in this article will run against the travel-sample data set which is delivered with the Couchbase server.

PRO TIP: For advanced users only: if you are familiar with Couchbase Eventing and also our CLI / REST tools, you can skip the bulk of this blog and download a ZIP file to quickly set up and run the scheduler system presented below. Right-click on the following link and choose Save Link As to download the file cron_impl_2func_CLI.zip, move it to an Eventing node, extract the ZIP file, and refer to the extracted README.txt file.

However, if you are not familiar with Couchbase or the Eventing service, please walk through GET STARTED and one Eventing example; specifically, refer to the following:

  • Set up a working Couchbase 6.5.1 server as per the directions in Start Here!
  • Make sure you can run a N1QL query against the travel-sample data set as per the directions in Run Your First N1QL Query.
  • Understand how to deploy a basic Eventing function as per the directions in the Document Archival example, which also uses the travel-sample data set.
  • Make sure you have the travel-sample bucket in the Buckets view of the UI.
  • Make sure you have a bucket called metadata in the Buckets view of the UI; it should have the minimum size of 200MB.
  • In the Buckets view of the UI create a bucket called crondata with the minimum size of 200MB. For detailed steps on how to create buckets, see Create a Bucket.
  • Set allow_interbucket_recursion to true in order to allow two (2) Eventing functions to alter the same KV document [2].
    curl -X POST -u "$CB_USERNAME:$CB_PASSWORD" 'http://localhost:8091/_p/event/api/v1/config' -d '{ "allow_interbucket_recursion":true }'



Implementation #1, Hardcoded ‘Cron’ Like Scheduling

For our first implementation, i.e. Part 1 of the series, we will design a simple control structure, which is merely a KV JSON document, and also two (2) Eventing Functions that will respond to and act upon the information in the control structure. Below is a design of a JSON document, or control structure, that will allow us to have multiple scheduled “events”. Each scheduled event will have its own control document with a unique KEY such as recurring_event::1, recurring_event::2, … recurring_event::N.

The JSON structure itself contains the information to “reconstitute the key”, as our scheduling system will respond to changes or updates (mutations) to the control documents, such as toggling the “active” state to enable or disable the action or changing the “verbose” field, which controls the amount and style of logging. The following is an example control document with KEY recurring_event::1 that will execute the JavaScript function doCronActionA at 14:54 (2:54 pm) every day.

JSON Control Record            Description
{
   "type":"recurring_event",   The KEY will be <<type>>::<<id>>
   "id":1,
   "hour":14,                  The hour of the day 0-23, *, *2X, *4X to trigger
   "min":54,                   The minute in the hour 0-59, *, *2X, *4X to trigger
   "action":"doCronActionA",   JavaScript function to run when the timer fires
   "active":true,              Flag to enable or disable this schedule
   "verbose": {                [OPTIONAL] logging control
      "user_func":2,           Logging level for the action logic : 0=none, etc. etc.
      "scheduler":3            Logging level for the cron logic : 0=none, etc. etc.
   },
   "dynamic": {                [DYNAMIC] system control and statistics
      "state":"arm",           "arm"|"rearm"|"pending" any value != "pending" start a schedule
      "next_sched": 0,         Number of seconds since epoch to next desired schedule
      "prev_sched": 0,         Number of seconds since epoch for previous schedule
      "prev_etime": 0,         Number of seconds since epoch for previous schedule actual exec time
      "prev_delay": 0,         Number of seconds that the timer was delayed from the schedule
      "prev_atime": 0          Number of seconds taken by the user 'action'
   }
}
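As a rough illustration of what “reconstituting the key” means, the sketch below rebuilds the KEY from the type and id fields and runs a few sanity checks on a control document. The helper names controlDocKey and checkControlDoc are hypothetical and are not part of the Eventing Functions in this article; the field rules are taken from the descriptions above.

```javascript
// Hypothetical helper: rebuild the KEY of a control document from its own fields.
function controlDocKey(doc) {
  return doc.type + "::" + doc.id;
}

// Hypothetical helper: true if v is "*", "*2X", "*4X" or an integer in 0..max.
function isScheduleField(v, max) {
  const s = String(v);
  if (s === "*" || s === "*2X" || s === "*4X") return true;
  const n = Number(s);
  return s !== "" && Number.isInteger(n) && n >= 0 && n <= max;
}

// Hypothetical validator: returns a list of problems (empty if the doc looks usable).
function checkControlDoc(doc) {
  const problems = [];
  if (doc.type !== "recurring_event") problems.push('type must be "recurring_event"');
  if (doc.id === undefined || doc.id === null) problems.push("id is required");
  if (!isScheduleField(doc.hour, 23)) problems.push("hour must be 0-23, *, *2X or *4X");
  if (!isScheduleField(doc.min, 59)) problems.push("min must be 0-59, *, *2X or *4X");
  if (typeof doc.action !== "string" || doc.action === "") problems.push("action must name a function");
  if (typeof doc.active !== "boolean") problems.push("active must be true or false");
  return problems;
}

const example = {
  type: "recurring_event", id: 1, hour: 14, min: 54,
  action: "doCronActionA", active: true
};
console.log(controlDocKey(example));          // recurring_event::1
console.log(checkControlDoc(example).length); // 0
```

The optional "verbose" and "dynamic" objects are deliberately not checked here, since the scheduler fills in defaults for them when they are missing.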

Like the traditional Linux crontab, you can set hour and min to legal integers, and you can also set hour to "*" to process for all hours or set min to "*" to process for all minutes. Although we will not support the full crontab syntax, we do support two non-standard settings: if you set both hour and min to "*4X" we will execute and re-arm four (4) times a minute, and if you set them both to "*2X" we will execute and re-arm two (2) times a minute. Below is a table of supported schedules with descriptions:

hour   min    Values can be numbers or strings
13     32     Run at 13:32 (or 1:32 pm)
*      15     Run every hour at 15 minutes past
8      12     Run once a day at 8:12 (or 8:12 am)
*      *      Run once a minute
*2X    *2X    Run twice a minute (requires both hour and min set to "*2X")
*4X    *4X    Run four times a minute (requires both hour and min set to "*4X")
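To make the table concrete, here is a minimal sketch of how each row could be turned into the next time to fire. It is not the article's getNextRecurringDate(hour, min); the function name nextFireTime, its extra now parameter, and the minute-by-minute search are assumptions made purely for illustration, and all times are local.

```javascript
// Hypothetical sketch, NOT the article's getNextRecurringDate().
// Returns the first Date strictly after `now` that matches the schedule (local time).
function nextFireTime(hour, min, now) {
  const h = String(hour);
  const m = String(min);
  const next = new Date(now.getTime());
  next.setMilliseconds(0);

  // Non-standard sub-minute schedules: both fields must carry the same marker.
  const subMinute = (v) => v === "*2X" || v === "*4X";
  if (subMinute(h) || subMinute(m)) {
    if (h !== m) throw new Error("hour and min must both be set to " + (subMinute(h) ? h : m));
    const step = h === "*2X" ? 30 : 15; // seconds between runs
    // Jump to the next multiple of `step`; setSeconds(60) rolls into the next minute.
    next.setSeconds(Math.floor(next.getSeconds() / step) * step + step);
    return next;
  }

  // Whole-minute schedules: walk forward one minute at a time, at most one day.
  next.setSeconds(0);
  next.setMinutes(next.getMinutes() + 1);
  for (let i = 0; i < 24 * 60; i++) {
    const hourOk = h === "*" || next.getHours() === Number(h);
    const minOk = m === "*" || next.getMinutes() === Number(m);
    if (hourOk && minOk) return next;
    next.setMinutes(next.getMinutes() + 1);
  }
  throw new Error("no matching time for hour=" + h + ", min=" + m);
}

const hhmmss = (d) => d.toTimeString().slice(0, 8);
const now = new Date(2020, 0, 15, 13, 31, 10); // 13:31:10 local time
console.log(hhmmss(nextFireTime(13, 32, now)));       // 13:32:00
console.log(hhmmss(nextFireTime("*", 15, now)));      // 14:15:00
console.log(hhmmss(nextFireTime("*", "*", now)));     // 13:32:00
console.log(hhmmss(nextFireTime("*2X", "*2X", now))); // 13:31:30
console.log(hhmmss(nextFireTime("*4X", "*4X", now))); // 13:31:15
```

Walking forward a minute at a time is slower than computing the answer directly, but it keeps the matching rule for each row easy to read, and a day only has 1440 candidates.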

Eventually we will use the Query Workbench to insert the cron control documents, all of which must have a unique KEY of recurring_event::#. For example, to schedule the action doCronActionA to execute at 14:54 (2:54 pm) we could use the first N1QL statement below. Don’t worry about actually running any N1QL statements right now; we will run them later, after we have built and deployed our Eventing Functions. You can create a control record (or records) in the bucket crondata, and then list it, arm it, disarm it, adjust the schedule it follows, change the verbosity level for logging, or delete it as follows:

Action                                                N1QL statement
Create a schedule                                     INSERT INTO `crondata` (KEY,VALUE) VALUES ( "recurring_event::1", { "type":"recurring_event", "id":1, "hour":"14", "min":"54", "action":"doCronActionA", "active":true } );
Make an index to query data without specifying keys   CREATE primary INDEX on `crondata` ;
Show all schedules order by id                        SELECT * FROM `crondata` WHERE type="recurring_event" order by id ;
Show specific schedule                                SELECT * FROM `crondata` WHERE type="recurring_event" AND id=1 ;
Arm or set active                                     UPDATE `crondata` SET active = true WHERE type="recurring_event" AND id=1 ;
Disarm or set inactive                                UPDATE `crondata` SET active = false WHERE type="recurring_event" AND id=1 ;
Adjust time of trigger                                UPDATE `crondata` SET hour = 11, min = 30 WHERE type="recurring_event" AND id=1 ;
Adjust logging of the "action"                        UPDATE `crondata` SET verbose.user_func = 0 WHERE type="recurring_event" AND id=1 ;
Adjust logging of the scheduler logic                 UPDATE `crondata` SET verbose.scheduler = 0 WHERE type="recurring_event" AND id=1 ;
Delete the schedule                                   DELETE FROM `crondata` WHERE type="recurring_event" AND id=1 ;

Assume we have four (4) active schedules; running a N1QL statement like the following will list all of them, e.g.

SELECT active, action, hour, min, type, id, verbose.user_func, verbose.scheduler
FROM `crondata` WHERE type="recurring_event" ORDER BY id;

Would return something like the following output (table view in the Query Workbench):

active  action           hour  id  min  scheduler  type               user_func
true    "doCronActionA"  14    1   54   1          "recurring_event"  2
true    "doCronActionB"  *     2   *    1          "recurring_event"  1
true    "doCronActionC"  *2X   3   *2X  4          "recurring_event"  4
true    "doCronActionD"  *     4   0    0          "recurring_event"  1

In the above table we have four actions: the first runs once a day, the second runs every minute, the third runs every 30 seconds, and the fourth runs once an hour. In a future installment in this series we will add "day of week" capability.

The JSON Control Record's nested object "verbose", if not supplied, will default to { "user_func":1, "scheduler":1 }, indicating a low or terse logging level for the action function and also for the scheduling logic. A value of 0 will suppress all log messages (e.g. the scheduler level for doCronActionD), while larger values will be more verbose (e.g. the levels defined for doCronActionC).

The JSON Control Record's nested object "dynamic" is typically never supplied and will default to { "state": "arm", "next_sched": 0, "prev_sched": 0, "prev_etime": 0, "prev_delay": 0, "prev_atime": 0 }. It is a scratch pad for the running Eventing scheduling logic and also provides useful statistics on execution times; as such, it should be treated as read-only.

At this point we have a high-level control design, but we need logic to process our control structures. This is where the Couchbase Eventing Service, specifically an Eventing Function, comes into play.
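To show how the statistics in "dynamic" relate to each other, here is a small sketch that derives them from three timestamps. The helper recordRun and its arguments are hypothetical; the arithmetic is inferred only from the field descriptions in the control record, and the code that actually maintains these fields is not shown in this section.

```javascript
// Hypothetical helper: derive the "dynamic" statistics after one scheduled run.
// All three timestamps are seconds since the epoch (an assumption based on the
// field descriptions in the control record).
function recordRun(dynamic, scheduledAt, startedAt, finishedAt) {
  return Object.assign({}, dynamic, {
    prev_sched: scheduledAt,             // when the run was supposed to happen
    prev_etime: startedAt,               // when the timer actually fired
    prev_delay: startedAt - scheduledAt, // how late the timer was
    prev_atime: finishedAt - startedAt   // how long the user action took
  });
}

const before = {
  state: "pending", next_sched: 1600000000,
  prev_sched: 0, prev_etime: 0, prev_delay: 0, prev_atime: 0
};
// Timer fired 3.5 seconds late and the action ran for a quarter of a second.
const after = recordRun(before, 1600000000, 1600000003.5, 1600000003.75);
console.log(after.prev_delay); // 3.5
console.log(after.prev_atime); // 0.25
```

A prev_delay that stays within the timer delay bound mentioned earlier is expected; a steadily growing prev_atime is a hint that the action may eventually overrun its schedule.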

The Eventing Functions

This design requires two (2) Eventing Functions: a main JavaScript function "cron_impl_2func_651" and a small helper JavaScript function "cron_impl_2func_651_help". We will discuss each section of the JavaScript code that comprises the initial implementation; combined, it is almost 610 lines (about 44% of which are comments and whitespace). Don’t worry about doing a cut-n-paste right now, as later I will provide a link to download (for import) the two required Eventing Functions and all the required settings in two files named "cron_impl_2func_651.json" and "cron_impl_2func_651_help.json", and also, if you prefer, the two full unified functions that can be cut-n-pasted directly.

Our main Eventing Function "cron_impl_2func_651" will be composed of nine (9) JavaScript functions:

  • Three (3) business logic functions (two of which are empty shells).
    • doCronActionA(doc) - a N1QL example user action to execute
    • doCronActionB(doc) - an empty user action shell for experiments
    • doCronActionC(doc) - an empty user action shell for experiments
  • One (1) entry point for Eventing.
    • OnUpdate(doc, meta) - the standard Eventing entry point for Inserts or Updates
  • One (1) cron syntax parser to generate the next schedule.
    • getNextRecurringDate(hour_str, min_str) - cron logic to find the next scheduled Date
  • Three (3) support functions to check that the business logic exists or format results.
    • verifyFunctionExistsViaEval(curDoc, id) - make sure we have a function to run
    • toNumericFixed(number, precision) - format a float to a compact style
    • toLocalISOTime(d) - format a date to a compact style
  • One (1) callback function when timers are executed.
    • Callback(doc) - a callback function for scheduled timers

Our helper Eventing Function "cron_impl_2func_651_help" will be composed of one (1) JavaScript function:

  • One (1) entry point for Eventing.
    • OnUpdate(doc, meta) - the standard Eventing entry point for Inserts or Updates

In the subsequent sections we will walk through each of the above JavaScript functions individually.

We Need a JavaScript Function, i.e. the Business Logic to Run on a Periodic Schedule

The first thing we want is a routine or function containing the business logic that we will execute based upon our crontab rules. We will call the JavaScript method doCronActionA(doc); however, it can be called anything, for example doPeriodicLedgerBalance(doc). The only requirements for the “action” functions that implement our scheduled business logic are as follows:

  • Has one parameter: doc, a control document as described above of type="recurring_event".
  • The actual JavaScript name matches the “action” field in the control document.
  • Returns true on success and false on failure.
  • Utilizes doc.verbose.user_func to control logging: if 0 it is silent, if 1 it emits a single line, if 2 it emits whatever log information is needed to debug the function, etc.
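To make the naming requirement concrete, the following minimal sketch shows an action function that follows the rules above and a dispatcher that finds it through the “action” field. The actions registry and runAction are hypothetical and use a plain lookup table, which is a different technique from the eval-based existence check used by the Eventing Function in this article; log is a stand-in for the Eventing log() built-in so the sketch can run under Node.js.

```javascript
// Stand-in for the Eventing log() built-in (assumption, so this runs under Node.js).
const log = (...args) => console.log(...args);

// An action that follows the four requirements; the business logic is a placeholder.
function doPeriodicLedgerBalance(doc) {
  try {
    if (doc.type !== "recurring_event" || doc.active !== true) return false;
    if (doc.verbose.user_func >= 1)
      log(doc.action + " user action controlled by " + doc.type + "::" + doc.id);
    // business logic would go here
  } catch (e) {
    log(doc.action + " Error exception:", e);
    return false;
  }
  return true;
}

// Hypothetical registry mapping "action" names to functions.
const actions = { doPeriodicLedgerBalance };

// Hypothetical dispatcher: run the function named by doc.action, if it is registered.
function runAction(doc) {
  const known = Object.prototype.hasOwnProperty.call(actions, doc.action);
  if (!known || typeof actions[doc.action] !== "function") {
    log("no such action: " + doc.action);
    return false;
  }
  return actions[doc.action](doc) === true;
}

const ctl = {
  type: "recurring_event", id: 7, hour: "*", min: 0,
  action: "doPeriodicLedgerBalance", active: true,
  verbose: { user_func: 1, scheduler: 1 }
};
console.log(runAction(ctl)); // logs one line from the action, then prints true
```

A lookup table avoids evaluating strings taken from a document, at the price of having to register each new action by hand.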

We will write our function, doCronActionA(doc), to run an embedded N1QL query to combine airline counts by country and then make a single KV document of the calculated data.

SELECT country, count( * ) AS cnt FROM `travel-sample`
WHERE `type` = 'airline' GROUP BY country;

On my test system, a small single-node non-MDS server (running all Couchbase services), the above N1QL takes about 20 ms (for clarity's sake, pretend it is super complex and takes 10 seconds to complete). The idea here is that the final calculated and summarized KV document can be quickly loaded by 100K (or a million) Eventing mutations per second, without the additional overhead of communicating with the Query service nodes and processing N1QL statements on each mutation. It should be obvious that the goal of this particular business logic, doCronActionA(doc), is to create a semi-static cache that updates periodically on a schedule.

All we are really doing (and it’s fairly fast) is getting a count of airlines by country from the travel-sample document set.  As we use N1QL we build up a document and eventually write it out to KV as a summarized document.   The key point to drive home here is that we do not want to repeat the same work for millions of mutations each, especially since some calculations might take 10 seconds of Query service compute time each time we kick off an embedded N1QL query from an Eventing function.
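Seen from the consumer's side, the payoff of such a cache might look like the sketch below. The key cron_cache::airlines_by_country and the data field match what doCronActionA writes; everything else is an assumption: the ts_bkt object is a plain in-memory stand-in for the bucket alias, airlineCountFor is a hypothetical helper, and the counts are made-up illustrative values, not real query output.

```javascript
// Stand-in for the `ts_bkt` bucket alias (assumption); in Eventing this would be
// a bucket binding and the document would have been written by doCronActionA.
const ts_bkt = {
  "cron_cache::airlines_by_country": {
    type: "cron_cache",
    id: "airlines_by_country",
    date: "2020-01-15T14:54:07.000Z",
    data: { "France": 3, "United Kingdom": 5 } // illustrative counts only
  }
};

// Hypothetical consumer: one lightweight KV read instead of a N1QL query per mutation.
function airlineCountFor(country) {
  const cache = ts_bkt["cron_cache::airlines_by_country"];
  if (!cache || !cache.data) return null; // cache has not been built yet
  const cnt = cache.data[country];
  return cnt === undefined ? 0 : cnt;
}

console.log(airlineCountFor("France"));  // 3
console.log(airlineCountFor("Iceland")); // 0
```

Returning null when the cache document is missing lets a caller tell "not computed yet" apart from "no airlines for this country".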

Below we show the JavaScript function we want to run once a day (or perhaps once an hour, etc.).  Note the name of the function matches the name in the control structure action field.  For more details on Eventing terminology and language constructs please refer to the Couchbase documents and examples at Eventing Service: Fundamentals.

function doCronActionA(doc) {
  try {
    // Check that doc has desired values
    if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active !== true) return;
    if (doc.verbose.user_func >= 1)
      log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

    // this is a 6.5 N1QL query (feature not available in GA prior to 6.5)
    // Create an embedded N1QL iterator by issuing a SELECT statement to get the
    // counts of airlines by country.  Make a new document and write it out to KV

    // We will use the iterator to create a KV document representing the results of a
    // HARD lengthy embedded N1QL query and write it back to KV, the idea is to keep
    // a calculation up to date once a day such that it that can be read 'quickly'
    // by other Eventing Functions, other Couchbase services or SDKs.

    // Consider if we had 1 million docs in a minute do we really want to use N1QL
    // to recalculate something that is almost static for all 1 million documents, of
    // course not, so we make an intermediate value that can be read into Eventing
    // and used via a single 'light weight' KV read.

    var q_iter = SELECT country,
      count( * ) cnt
    FROM `travel-sample`
    WHERE `type` = 'airline'
    GROUP BY country;

    // loop through the result set and update the map 'accumulate'
    var accumulate = {};
    var idx = 0;
    for (var val of q_iter) {
      if (doc.verbose.user_func >= 2)
        log(doc.action + ' N1QL idx ' + idx + ', country ' + val.country + " cnt " + val.cnt);
      accumulate[val.country] = val.cnt;
      idx++;
    }
    // close out embedded N1QL iterator
    q_iter.close();

    // Now let’s make a cached KV document representing a HARD length embedded N1QL
    // query and write it back to KV, we need a KEY and a type and id and then we
    // upsert it into the `travel-sample` bucket.

    var cachedoc = {};
    cachedoc.type = "cron_cache";
    cachedoc.id = "airlines_by_country";
    cachedoc.date = new Date();
    cachedoc.data = accumulate;
    var ckey = cachedoc.type + '::' + cachedoc.id;
    ts_bkt[ckey] = cachedoc;
    if (doc.verbose.user_func >= 2) {
      log(doc.action + ' upsert to KV with KEY ' + ckey + ' cachedoc ', cachedoc);
    }
  } catch (e) {
    log(doc.action + ' Error exception:', e);
    return false;
  }
  return true;
}



The above function merely 1) queries the travel-sample bucket to extract data, in this case the count of airlines for each country, and 2) creates a new KV document and key and writes it out to the travel-sample bucket for later use. In addition, as part of this example we have built logging that responds to a numeric verbosity setting, which a) logs a single line if the control document has doc.verbose.user_func == 1 or b) emits more information if doc.verbose.user_func >= 2. This is a generic framework that can run one (1) cron action or even a thousand (1000) cron actions. As such I have provided two additional “empty” function shells; as pointed out before, they could have been named anything.

function doCronActionB(doc) {
  try {
    // check that doc has desired values
    if (doc.type !== "recurring_event" || doc.active !== true) return;
    if (doc.verbose.user_func >= 1)
      log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

    // YOUR LOGIC HERE

  } catch (e) {
    log(doc.action + ' Error exception:', e);
    return false;
  }
  return true;
}



and

function doCronActionC(doc) {
  try {
    // check that doc has desired values
    if (doc.type !== "recurring_event" || doc.active !== true) return;
    if (doc.verbose.user_func >= 1)
      log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

    // YOUR LOGIC HERE

  } catch (e) {
    log(doc.action + ' Error exception:', e);
    return false;
  }
  return true;
}



The above functions doCronActionB and doCronActionC are trivial, as they merely log information to the Application log of the Eventing Function. Refer to Logging Functions for more details. Of course, you need a control document of type="recurring_event" with active=true and an action such as action="doCronActionB" to actually enable and execute them.

We Need an Eventing Entry Point or Handler

As of version 6.5, two entry points or handlers are supported by the Eventing Service: OnUpdate(doc, meta) and OnDelete(meta). We are only interested in OnUpdate(doc, meta) for this example. The OnUpdate(doc, meta) handler gets called when any document in the source bucket is created or modified (mutated), and it immediately filters out documents of no interest. [3]

function OnUpdate(doc, meta) {
  // fix for 6.5.X growing bucket ops
  if (doc.type === "_tmp_vbs") genNoopTimers(doc, meta, 30);
  if (!cron_bkt["fix_timer_scan_issue::1"]) {
      cron_bkt["fix_timer_scan_issue::1"] = {};
  }

  try {
    // Check if further analysis is needed, we only trigger on an active recurring_event
    if (doc.type !== "recurring_event" || doc.active !== true) return;

    var update_doc = false;
    if (!doc.dynamic) {
      // Add if missing doc.dynamic with defaults
      doc.dynamic = {
        "state": "arm",
        "next_sched": 0,
        "prev_sched": 0,
        "prev_etime": 0,
        "prev_delay": 0,
        "prev_atime": 0
      };
      // we need to update the document once we have the next schedule
      update_doc = true;
    }
    if (!doc.verbose) {
      // Add if missing doc.verbose with defaults
      doc.verbose = {
        "user_func": 1,
        "scheduler": 1
      };
      // we need to update the document once we have the next schedule
      update_doc = true;
    }
    // Do not process dynamic.state pending
    if (!doc.dynamic || !doc.dynamic.state || doc.dynamic.state === "pending") return;

    var mid = doc.type + "::" + doc.id; // this is the same as meta.id or the KEY
    var hour = doc.hour;
    var min = doc.min;

    // Do an eval check that the JavaScript function exists. The eval occurs in a common
    // utility function shared with Callback
    if (!verifyFunctionExistsViaEval(doc, mid)) {
      // doc.action did not exist, we have already logged the issue
      return;
    }

    // Get the next valid execution time
    var date_timer = getNextRecurringDate(hour, min);
    var next_sched = Math.round(date_timer.getTime() / 1000);
    if (!update_doc && next_sched !== doc.dynamic.next_sched) {
      // the next_sched should be the same as the setting from the helper application, however
      // if we undeploy/deploy or pause/resume we might have to reschedule to the next time slot
      log('OnUpdate U ' + mid + ' calculated next_sched !== doc.dynamic.next_sched, delta ' +
        (next_sched - doc.dynamic.next_sched) + ', reschedule');
      update_doc = true;
    }

    if (update_doc) {
      // this mutation is recursive and will be suppressed, we ensure we have a dynamic structure
      doc.dynamic.next_sched = next_sched;

      // rather than a plain write, i.e. cron_bkt[mid] = doc; we call a function
      // to trap and retry if there is a resource issue
      if (!tryBucketKvWriteWithLog('OnUpdate F', mid, doc)) {
        // Failed to write doc to cron_bkt[key] the error has been logged
        // and there is nothing more we can do.
        return;
      }
    }

    // Schedule an Eventing timer
    var timer_id = createTimer(Callback, date_timer, null, doc);
    if (doc.verbose.scheduler >= 1) {
      log('OnUpdate A ' + mid + ' rcv mutation (initial or rearm) schedule timer at ' +
        toLocalISOTime(date_timer));
    }
    if (doc.verbose.scheduler >= 2) {
      log('OnUpdate B ' + mid + ' recurring timer was created, timer_id ' + timer_id);
    }
  } catch (e) {
    log('OnUpdate E ' + meta.id + ', Error exception:', e);
  }
}



The key here is that the cron logic in our handler only cares about documents that have a doc.type of "recurring_event" and also a doc.active of true. In addition, in this example we have built tracing for the cron housekeeping logic, which is logged to the Application log according to the control document's doc.verbose.scheduler value (in the handler above, 1 or higher logs the scheduled time and 2 or higher also logs the timer id). If you only run a few schedules you can turn off the user-space workaround, or "fix for 6.5.X growing bucket ops", by commenting out four lines of code in the above OnUpdate block for "cron_impl_2func_651" as follows:

function OnUpdate(doc, meta) {
  // fix for 6.5.X growing bucket ops
  // if (doc.type === "_tmp_vbs") genNoopTimers(doc, meta, 30);
  // if (!cron_bkt["fix_timer_scan_issue::1"]) {
  //     cron_bkt["fix_timer_scan_issue::1"] = {};
  // }



We Need Code to Work Around Possible Growing Bucket Ops for 6.5.X

As of version 6.5.X we need a "fix for 6.5.X growing bucket ops" which happens on idle systems with lots of timers scheduled in the future.  This code ensures that an Eventing timer has fired recently on a vBucket (which clears the issue for the given vBucket on a per function basis).

Java
 




xxxxxxxxxx
1
62


 
1
// FIXUP: ADDIN FUNCTON
2
function noopTimer(context) {
3
    // fix for 6.5.X growing bucket ops
4
    try {
5
        if (context.type === "_tmp_vbs" && context.vb === 0) { 
            // log("noopTimer timers firing, printing only for vBucket 0");
        }
    } catch (e) {
        log("OnUpdate Exception in callback noopTimer:", e);
    }
}

// FIXUP: ADD-IN FUNCTION
function rearmTimer(context) {
    // fix for 6.5.X growing bucket ops
    try {
        if (context.type === "_tmp_vbs" && context.vb === 0) {
            // Update/touch all docs in the helper_bucket; the helper function will then
            // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
            // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);

            // generate a mutation to re-arm the HELPER function: fix_scan_issue
            // which will in turn make new mutations for this Function
            var cur = cron_bkt[context.key];
            if (cur && cur.ts_millis === context.ts_millis) {
                // log("rearmTimer update fix_timer_scan_issue::1 in helper_bucket alias only for vBucket 0");
                var now = new Date();
                cron_bkt["fix_timer_scan_issue::1"] = { "last_update": now };
            } else {
                // NOOP we had multiple timer cycles, just let this one quietly stop.
            }
        }
    } catch (e) {
        log("OnUpdate Exception in callback rearmTimer:", e);
    }
}

// FIXUP: ADD-IN FUNCTION
function genNoopTimers(doc, meta, seconds) {
    // fix for 6.5.X growing bucket ops
    try {
        // redundant but play it safe
        if (doc.type === "_tmp_vbs") {
            // Since we are using a different function, create a timer on all our vBuckets
            // immediately (can take up to 15 seconds).
            // If we used cross bucket recursion to rearm all the timers in a recurring fashion
            // we would add a delay of at least 40 seconds.
            createTimer(noopTimer, new Date(), null, doc);
            if (doc.vb === 0) {
                // Update/touch all docs in the helper_bucket; the helper function will then
                // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
                // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);

                // generate a mutation to re-arm the HELPER function: fix_scan_issue
                // which will in turn make new mutations for this Function

                // log("genNoopTimers make timer to rearm fix_timer_scan_issue::1");
                createTimer(rearmTimer, new Date(new Date().getTime() + seconds * 1000), null, doc);
            }
        }
    } catch (e) {
        log("OnUpdate Exception in genNoopTimers:", e);
    }
}
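To make the fan-out in genNoopTimers easier to follow, here is a minimal sketch that can run outside Couchbase. The createTimer and log functions and the timers array are hypothetical stand-ins for the Eventing runtime (they are assumptions, not the real implementation); only the argument order of createTimer(callback, date, reference, context) follows the code above.

```javascript
// Hypothetical stand-ins for the Eventing runtime, for illustration only.
var timers = [];
function createTimer(callback, fireAt, reference, context) {
  // Record which callback was scheduled for which vBucket seed document.
  timers.push({ callback: callback.name, fireAt: fireAt, vb: context.vb });
}
function log() {}
function noopTimer(context) {}
function rearmTimer(context) {}

// Same logic as genNoopTimers above, with the comments trimmed.
function genNoopTimers(doc, meta, seconds) {
  try {
    if (doc.type === "_tmp_vbs") {
      createTimer(noopTimer, new Date(), null, doc);
      if (doc.vb === 0) {
        createTimer(rearmTimer, new Date(new Date().getTime() + seconds * 1000), null, doc);
      }
    }
  } catch (e) {
    log("OnUpdate Exception in genNoopTimers:", e);
  }
}

genNoopTimers({ type: "_tmp_vbs", vb: 0 }, {}, 30);        // seed doc for vBucket 0
genNoopTimers({ type: "_tmp_vbs", vb: 5 }, {}, 30);        // seed doc for vBucket 5
genNoopTimers({ type: "recurring_event", id: 1 }, {}, 30); // ignored, wrong type

var summary = timers.map(function (t) { return t.callback + "@vb" + t.vb; });
console.log(summary.join(", "));
```

Every seed document gets an immediate no-op timer, while only the vBucket 0 document also schedules the delayed rearmTimer, so a single re-arm mutation is produced per cycle.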



We Need a Utility to Calculate the Next Time in the Schedule

The next function getNextRecurringDate(hour, min) will determine a time to execute the action as defined as part of our schedule.  This is not a full implementation of cron, rather it contains the key standard features to execute once a day, once an hour, once a minute.  It also contains some non-standard syntax to provide the ability to execute twice a minute or four times a minute. As previously described the function getNextRecurringDate(hour, min) allows for the following (the table is duplicated below), with the last two being non-standard. [4]

hour    min     Values can be numbers or strings
13      32      Run at 13:32 (or 1:32 pm)
*       15      Run every hour at 15 minutes past
8       12      Run once a day at 8:12 (or 8:12 am)
*       *       Run once a minute
*2X     *2X     Run twice a minute; requires both hour and min set to “*2X”
*4X     *4X     Run four times a minute; requires both hour and min set to “*4X”

Below is an implementation of the logic required to determine the next time to trigger an Eventing timer in our schedule. If the user logic in our first example, doCronActionA(doc), doesn’t complete in a timely manner (e.g. a real-time overrun), the next quantum of the schedule will be selected. Note that both Timers and their Parent Functions are subject to an execution timeout. An Eventing Function has a default execution timeout of 60 seconds; if need be, this setting can be raised.

JavaScript

function getNextRecurringDate(hour_str, min_str) {
  // Note JavaScript Dates are in milliseconds
  var date_now = new Date();
  var date_ret = new Date();

  // parseInt does not throw; non-numeric input such as '*' yields NaN,
  // which the isNaN() checks below rely on.
  var hour = parseInt(hour_str, 10);
  var min = parseInt(min_str, 10);

  // Note, this is only a simplistic partial 'crontab' syntax with some slight extensions
  // it allows once a day, once an hour, once a minute.  It also contains some non-standard
  // syntax to provide the ability to execute twice a minute or four times a minute.

  if (hour_str === '*4X' && min_str === '*4X') {
    // once every 15 seconds or four times a minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(15);
    while (date_ret.getTime() < date_now.getTime()) {
      date_ret.setSeconds(date_ret.getSeconds() + 15);
    }
    return date_ret;
  } else if (hour_str === '*2X' && min_str === '*2X') {
    // once every 30 seconds or twice a minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(30);
    while (date_ret.getTime() < date_now.getTime()) {
      date_ret.setSeconds(date_ret.getSeconds() + 30);
    }
    return date_ret;
  } else if (hour_str === '*' && min_str === '*') {
    // once a minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(0);
    date_ret.setMinutes(date_ret.getMinutes() + 1);
  } else if (hour_str !== '*' && isNaN(hour) === false && min_str === '*') {
    // once a minute only for a given hour
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(0);
    date_ret.setMinutes(date_ret.getMinutes() + 1);
    // The published listing compared date_ret against date_now here, which always
    // pushed the schedule to tomorrow; comparing hours matches the stated intent.
    if (date_ret.getHours() < hour) {
      // the given hour has not started yet today, wait for the top of that hour
      date_ret.setMinutes(0);
      date_ret.setHours(hour);
    } else if (date_ret.getHours() > hour) {
      // the given hour is over for today, schedule the top of that hour tomorrow
      date_ret.setDate(date_ret.getDate() + 1);
      date_ret.setMinutes(0);
      date_ret.setHours(hour);
    }
  } else if (hour_str === '*' && min_str !== '*' && isNaN(min) === false) {
    // once an hour at a given minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(0);
    date_ret.setMinutes(min);
    // schedule for next hour
    date_ret.setHours(date_ret.getHours() + 1);
  } else if (isNaN(hour) === false && isNaN(min) === false) {
    // once a day for a given hour and a given minute
    date_ret.setMilliseconds(0);
    date_ret.setSeconds(0);
    date_ret.setMinutes(min);
    date_ret.setHours(hour);
    if (date_ret.getTime() < date_now.getTime()) {
      // schedule for tomorrow
      date_ret.setDate(date_ret.getDate() + 1);
    }
  } else {
    log('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
    throw new Error('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
  }
  return date_ret;
}
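To check the arithmetic behind the non-standard “*2X” and “*4X” schedules, the following sketch computes the next 15- or 30-second boundary for a fixed point in time. The helper nextBoundary is a hypothetical name introduced here, and it uses epoch arithmetic rather than the setSeconds() loop in the listing above, so treat it as an illustration of the idea rather than the article's code.

```javascript
// Hypothetical helper: the next multiple of stepSeconds strictly after `now`.
// Minute boundaries are whole multiples of 15 and 30 seconds since the epoch,
// so epoch arithmetic lands on :00, :15, :30 and :45.
function nextBoundary(now, stepSeconds) {
  var stepMs = stepSeconds * 1000;
  return new Date(Math.floor(now.getTime() / stepMs) * stepMs + stepMs);
}

// A fixed instant keeps the example deterministic.
var now = new Date(Date.UTC(2020, 3, 22, 16, 20, 38, 725));

console.log(nextBoundary(now, 15).toISOString()); // 2020-04-22T16:20:45.000Z
console.log(nextBoundary(now, 30).toISOString()); // 2020-04-22T16:21:00.000Z
```

Passing the current time in as an argument, instead of calling new Date() inside the function, is a design choice that makes schedule logic of this kind straightforward to test.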



We Need a Few Small Utilities

The first utility merely checks that the JavaScript function named in a control document exists. It is used by both OnUpdate(doc,meta), shown above, and the timer Callback(doc), shown later. Below is verifyFunctionExistsViaEval(curDoc, id), which takes two arguments: a JSON control document and the KEY for that document. This lets us know immediately, on deployment, if there is a naming mismatch between the JSON control document and the actual name of the business logic function in the JavaScript code.

JavaScript

function verifyFunctionExistsViaEval(curDoc, id) {
  var result = false;
  try {
    // check for function if missing this is invalid return result
    result = eval("typeof " + curDoc.action + " === 'function';");
    if (result === false) {
      if (curDoc.verbose.scheduler >= 1)
        log("Warn/Disable (No Action and No Re-Arm), because required 'action' of " +
          curDoc.action + "(doc) does not exist, id is", id);
      return result;
    }
  } catch (e) {
    log('verifyFunctionExistsViaEval Error exception:', e);
  }
  return result;
}
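For comparison, the same existence check can be done without eval by looking the action name up in an explicit table. This is an alternative sketch and not the approach the article uses; the ACTIONS registry and lookupAction are hypothetical names, and the trade-off is that every new user function must also be added to the table.

```javascript
// Hypothetical user actions; in the article these are top-level functions in the handler.
function doCronActionA(doc) { return true; }
function doCronActionB(doc) { return false; }

// Hypothetical registry mapping a control document's 'action' string to a function.
var ACTIONS = {
  doCronActionA: doCronActionA,
  doCronActionB: doCronActionB
};

// Returns the function named by curDoc.action, or null if it is not registered.
function lookupAction(curDoc) {
  var known = Object.prototype.hasOwnProperty.call(ACTIONS, curDoc.action);
  if (known && typeof ACTIONS[curDoc.action] === "function") {
    return ACTIONS[curDoc.action];
  }
  return null;
}

console.log(lookupAction({ action: "doCronActionA" }) !== null);       // true
console.log(lookupAction({ action: "doCronMyNewFunction" }) !== null); // false
```

Because only registered names resolve, a control document cannot cause arbitrary text to be evaluated; the hasOwnProperty check also keeps inherited names such as toString from matching.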



Note, if an attempt is made to run a non-existent function, the end user will get a warning in the Application log cron_impl_2func_651.log prompting them to correct the issue.

2020-04-22T16:20:38.725-07:00 [INFO] "Warn/Disable (No Action and No Re-Arm), because required 'action' of doCronMyNewFunction(doc) does not exist, id is" "recurring_event::1"

This correction can be done in one of two ways: via a Pause/Resume, adding the function and then adjusting the control document with the specified id or KEY (toggling active to false and then true), or by adjusting the control document to point to an existing function in your handler.

Next, the utility toNumericFixed(number, precision) just allows nice compact formatting of floats for our log messages.

JavaScript

function toNumericFixed(number, precision) {
  var multi = Math.pow(10, precision);
  return Math.round((number * multi).toFixed(precision + 1)) / multi;
}



Finally, the utility toLocalISOTime(d) just allows nice compact formatting of Dates for our log messages.

JavaScript

function toLocalISOTime(d) {
  var tzoffset = (new Date()).getTimezoneOffset() * 60000; //offset in milliseconds
  return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
}
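As a quick usage sketch, the two formatting helpers can be exercised on their own in any JavaScript runtime. The functions are repeated verbatim so the snippet is self-contained; the sample values are arbitrary.

```javascript
function toNumericFixed(number, precision) {
  var multi = Math.pow(10, precision);
  return Math.round((number * multi).toFixed(precision + 1)) / multi;
}

function toLocalISOTime(d) {
  var tzoffset = (new Date()).getTimezoneOffset() * 60000; // offset in milliseconds
  return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
}

// Round an elapsed time in seconds to three decimal places.
console.log(toNumericFixed(1.23456, 3)); // 1.235

// Format a date as local wall-clock time without the trailing 'Z'.
// The exact text depends on the time zone of the machine running it.
console.log(toLocalISOTime(new Date()));
```

Note that toLocalISOTime applies the time zone offset in effect now rather than the offset in effect at the date being formatted, so a date on the other side of a daylight saving change can be shown an hour off.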



We Need a Timer Callback to Execute the User Logic and Re-Arm the Timer

The final JavaScript function in "cron_impl_2func_651" is the Timer callback, which is called when the scheduled timer fires. The callback function must be a top-level function that takes a single argument, the context. In this case, our OnUpdate handler referenced a JavaScript function Callback(doc) with a context of doc (our active scheduler control document of type="recurring_event"). In version 6.6 we can create another timer within a timer, but for all previous versions we need to trigger a mutation to a "helper" function (we carefully avoid infinite recursion). In 6.6 the helper function is not needed and the logic is substantially simplified.

JavaScript

function Callback(doc) {
  try {
    var fired_at = new Date();

    // Check if further analysis is needed we only trigger on a recurring_event that is active
    if (doc.type !== "recurring_event") return;
    // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
    if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
    // process any doc.dynamic.state BUT pending
    if (doc.dynamic.state === "pending") return;

    // ==================
    // Check if still active

    // We make sure that in KV the 'doc' still exists and that it is still active if not just
    // return thus skipping the action and not Re-arming the timer. Note `crondata` is
    // aliased to the map 'cron_bkt'

    var mid = doc.type + '::' + doc.id; // make our KEY
    var curDoc = null;
    try {
      // read the current version of doc from KV, e.g. curDoc
      curDoc = cron_bkt[mid];
    } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

    var reason = null;
    if (!curDoc || curDoc === null) {
      reason = "cron document is missing";
    } else if (!curDoc.active) {
      reason = "cron document has active = false";
    } else if (!curDoc.dynamic.state || curDoc.dynamic.state !== doc.dynamic.state) {
      reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
    } else if (crc64(doc) !== crc64(curDoc)) {
      reason = "cron document changed";
    }

    if (reason !== null) {
      if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 1) {
        log('Callback X ' + mid + " ignore/stop this timer's schedule because " + reason);
      }
      if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 4) {
        log('Callback Y ' + mid + ' timer doc', doc);
        log('Callback Z ' + mid + ' KV curDoc', curDoc);
      }
      return;
    }

    // ==================
    // Verify user routine exists and if so eval it

    // Assume curDoc.action contains something like "doCronActionA" and we have a function in
    // this handler like "doCronActionA(doc)". Below we use curDoc as the end user should be
    // able to alter the eval'd JavaScript function.  We will execute two (2) evals.

    // First eval check the JavaScript function exists.  The eval occurs in a common
    // utility function shared with OnUpdate
    if (!verifyFunctionExistsViaEval(curDoc, mid)) {
      // curDoc.action did not exist, we have already logged the issue
      return;
    }

    // Second eval execute and process the user function we execute the defined function
    // with an argument of curDoc
    var beg_act = new Date();
    var result = null;
    eval("result = " + curDoc.action + "(curDoc);");
    var end_act = new Date();
    var atime_ms = end_act.getTime() - beg_act.getTime();

    if (curDoc.verbose.scheduler >= 2)
      log('Callback R ' + mid + ' action took ' + toNumericFixed((atime_ms / 1000), 3) +
        ' sec., returned ' + result);

    // ==================
    // Calculate next time and mutate the control document for our helper function
    // which will create another mutation such that OnUpdate of this function will pick
    // it up and generate the timer (avoids the MB-38554 issue).

    var hour = curDoc.hour;
    var min = curDoc.min;
    var date_timer = getNextRecurringDate(hour, min);

    curDoc.dynamic.prev_delay =
      toNumericFixed(((fired_at.getTime() / 1000) - curDoc.dynamic.next_sched), 3);
    curDoc.dynamic.prev_sched = curDoc.dynamic.next_sched;
    curDoc.dynamic.prev_etime = Math.round(fired_at.getTime() / 1000);
    curDoc.dynamic.prev_atime = toNumericFixed((atime_ms / 1000), 3);

    curDoc.dynamic.state = "pending";
    curDoc.dynamic.next_sched = Math.round(date_timer.getTime() / 1000);

    try {
      cron_bkt[mid] = curDoc;
    } catch (e) {
      log('Callback help: F ' + mid + ' FATAL could not update KV cron cycle ' + curDoc.action);
      return;
    }

    if (curDoc.verbose.scheduler >= 1) {
      log('Callback A ' + mid + ' gen mutation #1 to doc to force schedule rearm at ' +
        toLocalISOTime(date_timer));
    }
    if (curDoc.verbose.scheduler >= 2) {
      log('Callback B ' + mid + ' sched ' + curDoc.dynamic.prev_sched +
        ', actual ' + curDoc.dynamic.prev_etime +
        ', delay ' + curDoc.dynamic.prev_delay +
        ', took ' + curDoc.dynamic.prev_atime);
    }
    if (curDoc.verbose.scheduler >= 3) {
      log('Callback C ' + mid + ' curDoc', curDoc);
    }
  } catch (e) {
    var mid = doc.type + '::' + doc.id; // make our KEY
    log('Callback E ' + mid + ' Error exception:', e);
  }
}
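The “is this timer still current?” test at the top of Callback can be tried in isolation. In the sketch below, fingerprint is a stand-in for the crc64() function available inside Eventing (an assumption made so the snippet runs anywhere), and reasonToStop is a hypothetical name that condenses the chain of checks into one function.

```javascript
// Stand-in for Eventing's crc64(); comparing JSON text is enough to detect a change here.
// Unlike a real checksum it is sensitive to key order, which is fine for this illustration.
function fingerprint(obj) {
  return JSON.stringify(obj);
}

// Returns a reason string if the timer should quietly stop, or null if it should proceed.
function reasonToStop(timerDoc, kvDoc) {
  if (!kvDoc) return "cron document is missing";
  if (!kvDoc.active) return "cron document has active = false";
  if (!kvDoc.dynamic || kvDoc.dynamic.state !== timerDoc.dynamic.state) {
    return "cron document wrong dynamic.state expected " + timerDoc.dynamic.state;
  }
  if (fingerprint(timerDoc) !== fingerprint(kvDoc)) return "cron document changed";
  return null;
}

// The context captured when the timer was created.
var timerDoc = { type: "recurring_event", id: 1, hour: 14, min: 54, active: true, dynamic: { state: "arm" } };

// What KV might hold when the timer fires.
var unchanged = JSON.parse(JSON.stringify(timerDoc));
var edited = JSON.parse(JSON.stringify(timerDoc));
edited.min = 30; // the user adjusted the schedule in the meantime

console.log(reasonToStop(timerDoc, unchanged)); // null
console.log(reasonToStop(timerDoc, edited));    // cron document changed
console.log(reasonToStop(timerDoc, null));      // cron document is missing
```

Since timers cannot be cancelled in 6.5.X, an outdated timer still fires; this comparison is what turns it into a no-op.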



We Need a Helper Function to Trigger a New Mutation

Because prior to 6.6 (which is not yet released) you cannot create a timer from within an executing timer's callback, we need a second Eventing Function (along with "allow_interbucket_recursion":true) to trigger a mutation so that all our timers are generated in the main Eventing Function's OnUpdate(doc,meta) entry point. The cycle works as follows:

  1. cron_impl_2func_651: OnUpdate(doc,meta) receives a mutation and schedules a timer.
  2. cron_impl_2func_651: when the timer matures, the Callback(doc) routine is executed; it first runs the desired user action and then creates mutation #1 on the control document (which is not seen by the creating Function, to prevent recursion).
  3. cron_impl_2func_651_help: OnUpdate(doc,meta) receives mutation #1 and makes mutation #2 on the control document, which triggers step 1 again in an endless cycle.
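The three steps above can be traced with a small in-memory simulation of dynamic.state. The functions below are hypothetical and simplified (no KV store, timers, or user action); they only model which Function reacts to which state, with "pending" being the one value the main Function ignores.

```javascript
var doc = { type: "recurring_event", id: 1, active: true, dynamic: { state: "arm" } };
var trace = [];

// Step 1, main Function: any state except "pending" schedules a timer.
function mainOnUpdate(d) {
  if (d.dynamic.state === "pending") return false;
  trace.push("schedule:" + d.dynamic.state);
  return true;
}

// Step 2, main Function timer callback: run the action, then write mutation #1.
function mainCallback(d) {
  d.dynamic.state = "pending";
  trace.push("mutation1:" + d.dynamic.state);
}

// Step 3, helper Function: reacts only to "pending" and writes mutation #2.
function helperOnUpdate(d) {
  if (d.dynamic.state !== "pending") return false;
  d.dynamic.state = "rearm";
  trace.push("mutation2:" + d.dynamic.state);
  return true;
}

// Run two full cycles.
for (var cycle = 0; cycle < 2; cycle++) {
  if (mainOnUpdate(doc)) mainCallback(doc);
  helperOnUpdate(doc);
}
console.log(trace.join(" -> "));
// schedule:arm -> mutation1:pending -> mutation2:rearm -> schedule:rearm -> mutation1:pending -> mutation2:rearm
```

After the first pass the document alternates between "pending" and "rearm", which is the endless cycle described in step 3.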

Note, in Couchbase release 6.6 we don't need a helper function at all because you are allowed to create a timer from within an executing timer. This greatly simplifies the logic needed to build a cron system [2]. The entry point of "cron_impl_2func_651_help", OnUpdate(doc,meta), is shown below together with its KV write helper.

JavaScript

function OnUpdate(doc, meta) {
  // fix for 6.5.X growing bucket ops
  if (meta.id.startsWith("fix_timer_scan_issue:")) upsertOneDocPerBucket(doc, meta);

  try {
    // Check that doc has desired values
    if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active != true) return;
    // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
    if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
    // Only process state pending this will only exist for a 'brief' time
    if (doc.dynamic.state !== "pending") return;

    var mid = doc.type + '::' + doc.id; // make our KEY
    var newdoc = null;
    try {
      // read the current version of doc from KV, e.g. newdoc
      newdoc = cron_bkt[mid];
    } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

    var reason = null;
    if (!newdoc || newdoc == null) {
      reason = "cron document is missing";
    } else if (!newdoc.active) {
      reason = "cron document has active = false";
    } else if (!newdoc.dynamic.state || newdoc.dynamic.state !== doc.dynamic.state) {
      reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
    } else if (crc64(doc) !== crc64(newdoc)) {
      reason = "cron document changed";
    }
    if (reason != null) {
      if (!newdoc || newdoc == null || newdoc.verbose.scheduler >= 1) {
        log('OnUpdate help: X stopping schedule because ' + reason + ',', newdoc);
      }
      // always stop here, regardless of the logging level (the published listing
      // only returned when the message was logged)
      return;
    }

    newdoc.dynamic.state = "rearm";
    // cron_bkt[mid] = newdoc;
    if (!tryBucketKvWriteWithLog('OnUpdate help: F', mid, newdoc)) {
      // Failed to write newdoc to cron_bkt[key] the error has been logged
      // and there is nothing more we can do.
      return;
    }

    if (newdoc.verbose.scheduler >= 1) {
      log('OnUpdate help: A ' + mid + ' mutation #2 to doc to force schedule rearm');
    }
    if (newdoc.verbose.scheduler >= 3) {
      log('OnUpdate help: B ' + mid + ',', newdoc);
    }
  } catch (e) {
    log('OnUpdate help: E ' + meta.id + ', Error exception:', e);
  }
}

function tryBucketKvWriteWithLog(tag, key, doc) {
  var success = false;
  var tries = 0;
  while (tries < 10) {
    tries++;
    try {
      // critical that the below succeeds, because if it doesn't the cron cycle will break
      cron_bkt[key] = doc;
      success = true;
      break;
    } catch (e) {
      log(tag + ' ' + key + ' WARN failed to update KV tries ' + tries, e);
    }
  }
  if (!success) {
    // use the 'doc' argument here; 'curDoc' is not defined in this function
    log(tag + ' ' + key + ' FATAL could not update KV cron cycle, tried ' + tries + ', stopping ' + doc.action);
  }
  return success;
}
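The bounded-retry pattern used by tryBucketKvWriteWithLog can be exercised without a cluster. In this sketch writeWithRetry and flakyWrite are hypothetical names, and the plain store object stands in for the cron_bkt bucket alias; the first two write attempts are made to fail on purpose.

```javascript
// Retry a write up to maxTries times; report whether it succeeded and how many tries it took.
function writeWithRetry(writeFn, maxTries) {
  var tries = 0;
  while (tries < maxTries) {
    tries++;
    try {
      writeFn();
      return { success: true, tries: tries };
    } catch (e) {
      // a real handler would log the failure here
    }
  }
  return { success: false, tries: tries };
}

// Hypothetical stand-in for a bucket alias whose first two writes fail.
var store = {};
var failuresLeft = 2;
function flakyWrite() {
  if (failuresLeft > 0) {
    failuresLeft--;
    throw new Error("temporary failure");
  }
  store["recurring_event::1"] = { dynamic: { state: "rearm" } };
}

var ok = writeWithRetry(flakyWrite, 10);
var bad = writeWithRetry(function () { throw new Error("down"); }, 10);

console.log(ok);  // { success: true, tries: 3 }
console.log(bad); // { success: false, tries: 10 }
```

The retries run back to back with no delay between them, as in the listing above, so they help with brief transient errors but not with an outage that lasts longer than the loop.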



The Helper Function Needs Some Utilities

These utilities provide a fix for 6.5.X growing bucket ops by ensuring an Eventing timer is fired on every vBucket in a timely fashion.

JavaScript

// FIXUP: ADD-IN FUNCTION
// fix for 6.5.X growing bucket ops
function upsertOneDocPerBucket(doc, meta) {

    var crcTable = makeCRC32Table();

    // make one doc per vBucket
    var isVerbose = 0;
    var isMacOS = false; // would be nice if this was an exposed constant in Eventing
    var numvbs = 1024;   // default is linux/PC
    if (isMacOS) {
        numvbs = 64;
    }

    var beg = (new Date).getTime();
    var result = getKeysToCoverAllPartitions(crcTable, "_tmp_vbs:", numvbs);

    for (var vb = 0; vb < numvbs; vb++) {
        // brute force to fit a key prefix into a vBucket
        var tst = result[vb];
        if (isVerbose > 1 || (isVerbose == 1 && (vb < 3 || vb > numvbs - 4))) {
            log("KEY: " + tst);
        } else {
            if (vb == 5) console.log("\t*\n\t*\n\t*");
        }
        // update the items to trigger a mutation for our PRIMARY function
        cron_bkt[tst] = { "type": "_tmp_vbs", "vb": vb, "ts_millis": beg, "key": tst };
    }
    var end = (new Date).getTime();
    log("seeding one doc to each vBucket in primary_bucket alias (took " + (end - beg) + " millis)");
}

// FIXUP: ADD-IN FUNCTION
// fix for 6.5.X growing bucket ops
function showHex(n) {
    return n.toString(16);
}

// FIXUP: ADD-IN FUNCTION
// fix for 6.5.X growing bucket ops
function makeCRC32Table() {
    var crcTable = [];
    var c;
    for (var n = 0; n < 256; n++) {
        c = n;
        for (var k = 0; k < 8; k++) {
            c = ((c & 1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1));
        }
        crcTable[n] = c;
    }
    return crcTable;
}

// FIXUP: ADD-IN FUNCTION
// fix for 6.5.X growing bucket ops
function crc32(crcTable, str) {
    var crc = 0 ^ (-1);
    for (var i = 0; i < str.length; i++) {
        crc = (crc >>> 8) ^ crcTable[(crc ^ str.charCodeAt(i)) & 0xFF];
    }
    return (crc ^ (-1)) >>> 0;
}

// FIXUP: ADD-IN FUNCTION
// fix for 6.5.X growing bucket ops
function getKeysToCoverAllPartitions(crcTable, keyPrefix, partitionCount) {
    var result = [];
    var remaining = partitionCount;
    for (var i = 0; remaining > 0; i++) {
        var key = keyPrefix + i;
        var rv = (crc32(crcTable, key) >> 16) & 0x7fff;
        // partitionCount must be a power of two for this mask to work
        var actualPartition = rv & (partitionCount - 1);
        if (result[actualPartition] === undefined) {
            result[actualPartition] = key;
            remaining--;
        }
    }
    return result;
}
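The key-to-partition arithmetic above can be verified outside Couchbase. The sketch below repeats the CRC-32 helpers, adds a hypothetical vbucketOf function that mirrors the expression used in getKeysToCoverAllPartitions, and searches for one key per partition using the smaller 64-partition count the article mentions for MacOS.

```javascript
function makeCRC32Table() {
  var crcTable = [];
  for (var n = 0; n < 256; n++) {
    var c = n;
    for (var k = 0; k < 8; k++) {
      c = (c & 1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1);
    }
    crcTable[n] = c;
  }
  return crcTable;
}

function crc32(crcTable, str) {
  var crc = 0 ^ (-1);
  for (var i = 0; i < str.length; i++) {
    crc = (crc >>> 8) ^ crcTable[(crc ^ str.charCodeAt(i)) & 0xFF];
  }
  return (crc ^ (-1)) >>> 0;
}

// Hypothetical helper mirroring the partition expression in the listing above.
function vbucketOf(crcTable, key, partitionCount) {
  return ((crc32(crcTable, key) >> 16) & 0x7fff) & (partitionCount - 1);
}

var table = makeCRC32Table();

// Standard CRC-32 check value for the ASCII string "123456789".
console.log(crc32(table, "123456789").toString(16)); // cbf43926

// Try "_tmp_vbs:0", "_tmp_vbs:1", ... until every partition has exactly one key.
var numvbs = 64;
var keys = [];
var remaining = numvbs;
for (var i = 0; remaining > 0; i++) {
  var key = "_tmp_vbs:" + i;
  var vb = vbucketOf(table, key, numvbs);
  if (keys[vb] === undefined) {
    keys[vb] = key;
    remaining--;
  }
}
console.log("keys found for " + keys.length + " partitions");
```

The search is brute force: candidate keys are hashed in order until each partition has been hit once, so the number of candidates tried is larger than the number of partitions.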



Now Let’s Deploy the Two Eventing Functions

We have reviewed a lot of code and the design of the initial scheduler; now it’s time to see everything working together. Remember that this example uses three buckets: travel-sample (a sample default data set), metadata (a scratchpad for Eventing that can be shared with other Eventing Functions), and crondata (which holds our cron schedules). The travel-sample bucket has a size of 100MB, and the other two buckets, metadata and crondata, should both have a size of 200MB and already exist as per the directions in “Prerequisites”.

  • Verify your current bucket configuration by accessing the Couchbase Web Console > Buckets page. (Screenshot: Eventing cron update)

To deploy the Eventing Function “cron_impl_2func_651” you can follow one of two methods:

  • Basic complexity, Method #1 Download/Import
  • Medium complexity, Method #2 Manually Add Function, Cut-n-Paste JavaScript

Method #1 Download/Import

Import the 1st Function "cron_impl_2func_651"

Download the first Eventing Function with all the required settings: right-click on the following link and choose Save Link As to download the file cron_impl_2func_651.json onto your local file system. From the Couchbase Web Console > Eventing page, click IMPORT, navigate to the file cron_impl_2func_651.json, select it, and open it. The ADD FUNCTION dialog appears. In the ADD FUNCTION dialog, verify the individual Function elements listed below. Note that the JSON file cron_impl_2func_651.json pre-configures all settings correctly for this example:

  • For the Source Bucket drop-down, verify it is set to crondata.
  • For the Metadata Bucket drop-down, verify it is set to metadata.
  • Verify that cron_impl_2func_651 is the name of the Function you are creating in the Function Name text-box.
  • [Optional Step] Enter text A cron like scheduler part 1, in the Description text-box.
  • For the Settings option, use the default values.
  • For the Bindings option, verify that two bindings exist.
  • For the first binding, of type "bucket alias", verify that cron_bkt is the "alias name", crondata is the associated bucket, and the mode is "read and write".
  • For the second binding, of type "bucket alias", verify that ts_bkt is the "alias name", travel-sample is the associated bucket, and the mode is "read and write".
  • Your settings in the dialog should look like the following. (Screenshot: Eventing cron update)
  • After verifying all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651 dialog appears (with the JavaScript code pre-loaded). (Screenshot: Eventing cron update)
  • To return to the Eventing screen, click the '< back to Eventing' link (below the editor) or click the Eventing tab.

Import the 2nd Function "cron_impl_2func_651_help"

Download the second Eventing Function with all the required settings: right-click on the following link and choose Save Link As to download the file cron_impl_2func_651_help.json onto your local file system. From the Couchbase Web Console > Eventing page, click IMPORT, navigate to the file cron_impl_2func_651_help.json, select it, and open it. The ADD FUNCTION dialog appears. In the ADD FUNCTION dialog, verify the individual Function elements listed below. Note that the JSON file cron_impl_2func_651_help.json pre-configures all settings correctly for this example:

  • For the Source Bucket drop-down, verify it is set to crondata.
  • For the Metadata Bucket drop-down, verify it is set to metadata.
  • Verify that cron_impl_2func_651_help is the name of the Function you are creating in the Function Name text-box.
  • [Optional Step] Enter text A cron like scheduler helper part 1, in the Description text-box.
  • For the Settings option, use the default values.
  • For the Bindings option, verify that only one binding exists.
  • For the binding, of type "bucket alias", verify that cron_bkt is the "alias name", crondata is the associated bucket, and the mode is "read and write".
  • Your settings in the dialog should look like the following. (Screenshot: Eventing cron update)
  • After verifying all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651_help dialog appears (with the JavaScript code pre-loaded). (Screenshot: Eventing cron update)
  • To return to the Eventing screen, click the '< back to Eventing' link (below the editor) or click the Eventing tab.

Method #2 Manually Add Function, Cut-n-Paste JavaScript

Manually Create "cron_impl_2func_651"

To add the first Eventing Function, go to the Couchbase Web Console > Eventing page and click ADD FUNCTION. The ADD FUNCTION dialog appears. In the ADD FUNCTION dialog, provide the following information for the individual Function elements:

  • For the Source Bucket drop-down, set to crondata.
  • For the Metadata Bucket drop-down, set to metadata.
  • Enter cron_impl_2func_651 as the name of the Function you are creating in the Function Name text-box.
  • [Optional Step] Enter text A cron like scheduler part 1, in the Description text-box.
  • For the Settings option, use the default values.
  • For the Bindings option, create two bindings:
  • For the first binding, select the type "bucket alias", enter cron_bkt as the "alias name", select crondata as the associated bucket, and set the mode to "read and write".
  • For the second binding, select the type "bucket alias", enter ts_bkt as the "alias name", select travel-sample as the associated bucket, and set the mode to "read and write".
  • After configuring your settings, your dialog should look like this. (Screenshot: Eventing cron update)
  • After providing all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651 dialog appears. It initially contains a placeholder code block; you will substitute your actual cron_impl_2func_651 code in this block. (Screenshot: Eventing cron update)

  • Copy the following Eventing Function JavaScript source (618 lines) and paste it in the placeholder code block of cron_impl_2func_651
    Java
     




    xxxxxxxxxx
    1
    618


     
    1
    /* 
    2
    Function "cron_impl_2func_651" also requires "cron_impl_2func_651_help"
    3
     
                
    4
    Create a basic cron system using Eventing allows a recurring function to execute activity at a 
    5
    specified time every day, hour, min, 30 sec., and 15 sec.  We use a bucket called 'crondata' 
    6
    aliased to 'cron_bkt' which can hold one or more control documents of type = "recurring_event".
    7
     
                
    8
    The following uses of timers do not work reliably in Couchbase versions 6.5 and 6.5.1
    9
      a) scheduling an Eventing timer within a timer's callback 
    10
      b) overwriting an existing timer by id
    11
      
    12
    In addition the ability to cancel a timer does not exist in Couchbase versions 6.5 and 6.5.1
    13
      
    14
    For this example, we supply one real user function that builds a recurring 'static' cache document
    15
    from bucket `travel-sample` via an N1QL query and save the result back to `travel-sample` via
    16
    the alais 'ts_bkt'.  This JavaScript function is doCronActionA(), we also provide two placeholders
    17
    doCronActionB() and doCronActionC() for additional experimentation.
    18
     
                
    19
    Test Doc:
    20
       {
    21
            "type":"recurring_event",   // The KEY will be <<type>>::<<id>>
    22
            "id":1,                     //
    23
            "hour":14,                  // The hour of the day 0-23, *, *2X, *4X to trigger
    24
            "min":54,                   // The minute in the hour 0-59, *, *2X, *4X to trigger
    25
            "action":"doCronActionA",   // What function to run on the trigger
    26
            "active":false,             // Flag to arm or disable this schedule
    27
            "verbose" : {
    28
              "user_func":2,            // Logging level for the action logic : 0=none, etc. etc.
    29
              "scheduler":3             // Logging level for the cron logic   : 0=none, etc. etc.
    30
            },
    31
            "dynamic" : {
    32
              "state":"arm",            // States "arm"|"rearm"|"pending" if any value but "pending" start a schedule
    33
              "next_sched": 0,          // Number of seconds since epoch to next desired schedule
    34
              "prev_sched": 0,          // Number of seconds since epoch for previous schedule
    35
              "prev_etime": 0,          // Number of seconds since epoch for previous schedule actual exec time
    36
              "prev_delay": 0,          // Number of seconds that the timer was delayed from the schedule
    37
              "prev_atime": 0           // Number of seconds taken by the user 'action'
    38
            }
    39
        }
    40
        
    41
        INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1", 
    42
        {
    43
            "type":"recurring_event",
    44
            "id":1,
    45
            "hour":14,
    46
            "min":54,
    47
            "action":"doCronActionA",
    48
            "verbose" : {
    49
              "user_func":2,
    50
              "scheduler":3
    51
            },
    52
            "active":false,
    53
            "dynamic" : {
    54
              "state": "arm",
    55
              "next_sched": 0,
    56
              "prev_sched": 0,
    57
              "prev_etime": 0,
    58
              "prev_delay": 0,
    59
              "prev_atime": 0
    60
            }
    61
        } 
    62
        );
    63
     
                
    64
    Note, you can omit verbose{} and dynamic{} as they will be auto-created by this main Eventing 
    65
    Function "cron_impl_2func_651". If verbose{} is missing the logging levels will default to 
    66
    verbose" : {  "user_func":1, "scheduler":1 }
    67
     
                
    68
        INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1", 
    69
        {
    70
            "type":"recurring_event",
    71
            "id":1,
    72
            "hour":14,
    73
            "min":54,
    74
            "action":"doCronActionA",
    75
            "active":false
    76
        } 
    77
        );
    78
     
                
    79
    N1QL : Make an index to query data without specifying keys
    80
        CREATE primary INDEX on `crondata` ;
    81
     
                
    82
    N1QL : Verify or inspect settings in schedule           
    83
        SELECT * FROM `crondata` WHERE type="recurring_event";
    84
     
                
    85
    N1QL : Arm or set active        
    86
        UPDATE `crondata` SET active = true WHERE type="recurring_event" AND id=1 ;
    87
        
    88
    N1QL : Disarm or set inactive   
    89
        UPDATE `crondata` SET active = false WHERE type="recurring_event" AND id=1 ;
    90
        
    91
    N1QL : Adjust time of trigger   
    92
        UPDATE `crondata` SET hour = 11, min = 30 WHERE type="recurring_event" AND id=1 ;
    93
     
                
    94
    N1QL : Adjust logging           
    95
        UPDATE `crondata` SET verbose.user_func = 1,  verbose.scheduler = 0 WHERE type="recurring_event" AND id=1 ;
    96
     
                
    97
    N1QL : Delete the schedule      
    98
        DELETE FROM `crondata` WHERE type="recurring_event" AND id=1 ;
    99
             
    100
    The action field is important: it should name a JavaScript function that exists in this Eventing
    Function. It can be any JavaScript name, e.g. MyFunc, and you must implement it like the example
    doCronActionA(doc), where doc will be the currently active item of type = 'recurring_event' read
    from the alias bucket 'cron_bkt' when the timer is fired. The action JavaScript function should
    return either true or false, which is used for logging purposes. If the action does not exist,
    it is an error: a warning is logged and the timer is disabled.
    106
     
                
    107
    In Couchbase version 6.5+, to add a new cron-like daily function, just pause the active handler,
    insert your new function doCronActionB(doc) {...}, then resume the Eventing handler. The nice
    thing is that if a timer was due to fire while the function was paused, it will NOT be lost; when
    you resume the function, it will be processed at the next available time slot.
    111
     
                
    112
    Any change to a control structure will create a new recurring schedule or timer and cancel the
    previous schedule; this includes changing the verbosity level. The previous timer will
    continue to run; however, when executed it will compute a checksum of the current control
    structure from KV and compare it against its passed context, and if they differ the Callback
    will ignore the old schedule. This logic could be altered to process immediately if the
    schedule has expired; search for the string "OnUpdate U" in the code below.
    118
    */
    119
     
                
    120
    // ==================
    121
    /* BEG USER FUNCTIONS TO RUN ONCE A DAY, HOUR, OR MINUTE - ANYTHING YOU WANT BELOW */
    122
    function doCronActionA(doc) {
    123
      try {
    124
        // Check that doc has desired values
    125
        if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active !== true) return;
    126
        if (doc.verbose.user_func >= 1)
    127
          log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
    128
     
                
    129
        // this is a 6.5 N1QL query (feature not available in GA prior to 6.5)
    130
        // Create an embedded N1QL iterator by issuing a SELECT statement to get the
    131
        // counts of airlines by country.  Make a new document and write it out to KV 
    132
     
                
    133
        // We will use the iterator to create a KV document representing the results of a
        // HARD lengthy embedded N1QL query and write it back to KV, the idea is to keep
        // a calculation up to date once a day such that it can be read 'quickly'
        // by other Eventing Functions, other Couchbase services or SDKs.
    137
     
                
    138
        // Consider if we had 1 million docs in a minute do we really want to use N1QL
    139
        // to recalculate something that is almost static for all 1 million documents, of 
    140
        // course not, so we make an intermediate value that can be read into Eventing
    141
        // and used via a single 'light weight' KV read.
    142
     
                
    143
        var q_iter = SELECT country,
    144
          count( * ) cnt
    145
        FROM `travel-sample`
    146
        WHERE `type` = 'airline'
    147
        GROUP BY country;
    148
     
                
    149
        // loop through the result set and update the map 'accumulate'
    150
        var accumulate = {};
    151
        var idx = 0;
    152
        for (var val of q_iter) {
    153
          if (doc.verbose.user_func >= 2)
    154
            log(doc.action + ' N1QL idx ' + idx + ', country ' + val.country + " cnt " + val.cnt);
    155
          accumulate[val.country] = val.cnt;
    156
          idx++;
    157
        }
    158
        // close out embedded N1QL iterator
    159
        q_iter.close();
    160
     
                
    161
        // Now let's make a cached KV document representing a HARD lengthy embedded N1QL
        // query and write it back to KV, we need a KEY and a type and id and then we
        // upsert it into the `travel-sample` bucket.
    164
     
                
    165
        var cachedoc = {};
    166
        cachedoc.type = "cron_cache";
    167
        cachedoc.id = "airlines_by_country";
    168
        cachedoc.date = new Date();
    169
        cachedoc.data = accumulate;
    170
        var ckey = cachedoc.type + '::' + cachedoc.id;
    171
        ts_bkt[ckey] = cachedoc;
    172
        if (doc.verbose.user_func >= 2) {
    173
          log(doc.action + ' upsert to KV with KEY ' + ckey + ' cachedoc ', cachedoc);
    174
        }
    175
      } catch (e) {
    176
        log(doc.action + ' Error exception:', e);
    177
        return false;
    178
      }
    179
      return true;
    180
    }
    181
     
                
    182
    function doCronActionB(doc) {
    183
      try {
    184
        // check that doc has desired values
    185
        if (doc.type !== "recurring_event" || doc.active !== true) return;
    186
        if (doc.verbose.user_func >= 1)
    187
          log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
    188
     
                
    189
        // YOUR LOGIC HERE
    190
     
                
    191
      } catch (e) {
    192
        log(doc.action + ' Error exception:', e);
    193
        return false;
    194
      }
    195
      return true;
    196
    }
    197
     
                
    198
    function doCronActionC(doc) {
    199
      try {
    200
        // check that doc has desired values
    201
        if (doc.type !== "recurring_event" || doc.active !== true) return;
    202
        if (doc.verbose.user_func >= 1)
    203
          log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
    204
     
                
    205
        // YOUR LOGIC HERE
    206
     
                
    207
      } catch (e) {
    208
        log(doc.action + ' Error exception:', e);
    209
        return false;
    210
      }
    211
      return true;
    212
    }
    213
     
                
    214
    /* END USER FUNCTIONS TO RUN ONCE A DAY, HOUR, OR MINUTE - ANYTHING YOU WANT ABOVE */
    215
    // ==================
    216
     
                
    217
     
                
    218
    // FIXUP: ADDIN FUNCTION
    219
    function noopTimer(context) {
    220
        // fix for 6.5.X growing bucket ops
    221
        try {
    222
            if (context.type === "_tmp_vbs" && context.vb === 0) { 
    223
                // log("noopTimer timers firing, printing only for vBucket 0");
    224
            }
    225
        } catch (e) {
    226
            log("OnUpdate Exception in callback noopTimer:", e);
    227
        }
    228
    }
    229
     
                
    230
    // FIXUP: ADDIN FUNCTION
    231
    function rearmTimer(context) {
    232
        // fix for 6.5.X growing bucket ops
    233
        try {
    234
            if (context.type === "_tmp_vbs" && context.vb === 0) { 
    235
                // Update/touch all docs in the helper_bucket the helper function will then
    236
                // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
    237
                // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);
    238
                
    239
                // generate a mutation to re-arm the HELPER function: fix_scan_issue
    240
                // which will in turn make new mutations for this Function
    241
                var cur = cron_bkt[context.key];
    242
                if (cur && cur.ts_millis === context.ts_millis) {
    243
                    // log("rearmTimer update fix_timer_scan_issue::1 in helper_bucket alias only for vBucket 0");
    244
                    var now = new Date();
    245
                    cron_bkt["fix_timer_scan_issue::1"] = { "last_update": now };
    246
                } else {
    247
                    // NOOP we had multiple timer cycles, just let this one quietly stop.
    248
                }
    249
            }
    250
        } catch (e) {
    251
            log("OnUpdate Exception in callback rearmTimer:", e);
    252
        }
    253
    }
    254
     
                
    255
    // FIXUP: ADDIN FUNCTION
    256
    function genNoopTimers(doc, meta, seconds) {
    257
        // fix for 6.5.X growing bucket ops
    258
        try {
    259
            // redundant but play it safe
    260
            if (doc.type === "_tmp_vbs") {
    261
                // Since we are using a different function, create a timer on all our vBuckets immediately (can take up to 15 seconds).
                // If we used cross bucket recursion to rearm all the timers in a recurring fashion we would add a delay of at least 40 seconds.
    263
                createTimer(noopTimer, new Date(), null, doc);
    264
                if (doc.vb === 0) { 
    265
                    // Update/touch all docs in the helper_bucket the helper function will then
    266
                    // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
    267
                    // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);
    268
                
    269
                    // generate a mutation to re-arm the HELPER function: fix_scan_issue
    270
                    // which will in turn make new mutations for this Function
    271
                    
    272
                    // log("genNoopTimers make timer to rearm fix_timer_scan_issue::1");
    273
                    createTimer(rearmTimer, new Date(new Date().getTime() + seconds * 1000), null, doc);
    274
                }
    275
            }
    276
        } catch (e) {
    277
            log("OnUpdate Exception in genNoopTimers:", e);
    278
        }
    279
    }
    280
     
                
    281
    function OnUpdate(doc, meta) {
    282
      // fix for 6.5.X growing bucket ops
    283
      if (doc.type === "_tmp_vbs") genNoopTimers(doc, meta, 30);
    284
      if (!cron_bkt["fix_timer_scan_issue::1"]) {
    285
          cron_bkt["fix_timer_scan_issue::1"] = {};
    286
      }
    287
        
    288
      try {
    289
        // Check if further analysis is needed we only trigger on an active recurring_event 
    290
        if (doc.type !== "recurring_event" || doc.active !== true) return;
    291
     
                
    292
        var update_doc = false;
    293
        if (!doc.dynamic) {
    294
          // Add if missing doc.dynamic with defaults
    295
          doc.dynamic = {
    296
            "state": "arm",
    297
            "next_sched": 0,
    298
            "prev_sched": 0,
    299
            "prev_etime": 0,
    300
            "prev_delay": 0,
    301
            "prev_atime": 0
    302
          };
    303
          // we need to update the document once we have the next schedule
    304
          update_doc = true;
    305
        }
    306
        if (!doc.verbose) {
    307
          // Add if missing doc.verbose with defaults
    308
          doc.verbose = {
    309
            "user_func": 1,
    310
            "scheduler": 1
    311
          };
    312
          // we need to update the document once we have the next schedule
    313
          update_doc = true;
    314
        }
    315
        // Do not process dynamic.state pending
    316
        if (!doc.dynamic || !doc.dynamic.state || doc.dynamic.state === "pending") return;
    317
     
                
    318
        var mid = doc.type + "::" + doc.id; // this is the same as meta.id or the KEY
    319
        var hour = doc.hour;
    320
        var min = doc.min;
    321
     
                
    322
        // Do an eval check the JavaScript function exists. The eval occurs in a common 
    323
        // utility function shared with Callback
    324
        if (!verifyFunctionExistsViaEval(doc, mid)) {
    325
          // doc.action did not exist, we have already logged the issue
    326
          return;
    327
        }
    328
     
                
    329
        // Get the next valid execution time
    330
        var date_timer = getNextRecurringDate(hour, min);
    331
        var next_sched = Math.round(date_timer.getTime() / 1000);
    332
        if (!update_doc && next_sched !== doc.dynamic.next_sched) {
    333
          // the next_sched should be the same as the setting from the helper application, however
          // if we undeploy/deploy or pause/resume we might have to reschedule to the next time slot
    335
          log('OnUpdate U ' + mid + ' calculated next_sched !== doc.dynamic.next_sched, delta ' +
    336
            (next_sched - doc.dynamic.next_sched) + ', reschedule');
    337
          update_doc = true;
    338
        }
    339
     
                
    340
        if (update_doc) {
    341
          // this mutation is recursive and will be suppressed, we ensure we have a dynamic structure
    342
          doc.dynamic.next_sched = next_sched;
    343
     
                
    344
          // rather than a direct KV write, call a function to trap and retry if there is a resource issue
          // cron_bkt[mid] = doc;
    346
          if (!tryBucketKvWriteWithLog('OnUpdate F', mid, doc)) {
    347
            // Failed to write doc to cron_bkt[key] the error has been logged
    348
            // and there is nothing more we can do.
    349
            return;
    350
          }
    351
        }
    352
     
                
    353
        // Schedule an Eventing timer
    354
        var timer_id = createTimer(Callback, date_timer, null, doc);
    355
        if (doc.verbose.scheduler >= 1) {
    356
          log('OnUpdate A ' + mid + ' rcv mutation (initial or rearm) schedule timer at ' +
    357
            toLocalISOTime(date_timer));
    358
        }
    359
        if (doc.verbose.scheduler >= 2) {
    360
          log('OnUpdate B ' + mid + ' recurring timer was created, timer_id ' + timer_id);
    361
        }
    362
      } catch (e) {
    363
        log('OnUpdate E ' + meta.id + ', Error exception:', e);
    364
      }
    365
    }
    366
     
                
    367
    function getNextRecurringDate(hour_str, min_str) {
    368
      // Note Javascript Dates are in milliseconds
    369
      var date_now = new Date();
    370
      var date_ret = new Date();
    371
      var hour;
    372
      var min;
    373
     
                
    374
      try {
    375
        hour = parseInt(hour_str);
    376
      } catch (e) {}
    377
      try {
    378
        min = parseInt(min_str);
    379
      } catch (e) {}
    380
     
                
    381
      // Note, this is only a simplistic partial 'crontab' syntax with some slight extensions
    382
      // it allows once a day, once an hour, once a minute.  It also contains some non-standard 
    383
      // syntax to provide the ability to execute twice a minute or four times a minute.
    384
     
                
    385
      if (hour_str === '*4X' && min_str === '*4X') {
    386
        // once every 15 seconds or four times a minute
    387
        date_ret.setMilliseconds(0);
    388
        date_ret.setSeconds(15);
    389
        while (date_ret.getTime() < date_now.getTime()) {
    390
          date_ret.setSeconds(date_ret.getSeconds() + 15);
    391
        }
    392
        return date_ret;
    393
      } else
    394
      if (hour_str === '*2X' && min_str === '*2X') {
    395
        // once every 30 seconds or twice a minute
    396
        date_ret.setMilliseconds(0);
    397
        date_ret.setSeconds(30);
    398
        while (date_ret.getTime() < date_now.getTime()) {
    399
          date_ret.setSeconds(date_ret.getSeconds() + 30);
    400
        }
    401
        return date_ret;
    402
      } else
    403
      if (hour_str === '*' && min_str === '*') {
    404
        // once a minute 
    405
        date_ret.setMilliseconds(0);
    406
        date_ret.setSeconds(0);
    407
        date_ret.setMinutes(date_ret.getMinutes() + 1);
    408
      } else
    409
      if (hour_str !== '*' && isNaN(hour) === false && min_str === '*') {
        // once a minute only for a given hour
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(0);
        date_ret.setMinutes(date_ret.getMinutes() + 1);
        if (date_ret.getHours() < hour) {
          // before the given hour today: start at the top of that hour
          date_ret.setMinutes(0);
          date_ret.setHours(hour);
        } else if (date_ret.getHours() > hour) {
          // past the given hour today: start at the top of that hour tomorrow
          date_ret.setDate(date_ret.getDate() + 1);
          date_ret.setMinutes(0);
          date_ret.setHours(hour);
        }
    423
      } else
    424
      if (hour_str === '*' && min_str !== '*' && isNaN(min) === false) {
    425
        // once a hour at a given minute
    426
        date_ret.setMilliseconds(0);
    427
        date_ret.setSeconds(0);
    428
        date_ret.setMinutes(min);
    429
        // schedule for next hour
    430
        date_ret.setHours(date_ret.getHours() + 1);
    431
      } else
    432
      if (isNaN(hour) === false && isNaN(min) === false) {
    433
        // once a day for a given hour and a given minute 
    434
        date_ret.setMilliseconds(0);
    435
        date_ret.setSeconds(0);
    436
        date_ret.setMinutes(min);
    437
        date_ret.setHours(hour);
    438
        if (date_ret.getTime() < date_now.getTime()) {
    439
          // schedule for tomorrow
    440
          date_ret.setDate(date_ret.getDate() + 1);
    441
        }
    442
      } else {
    443
        log('getNextRecurringDate illegal input hour_str <' +
    444
          hour_str + '> min_str <' + min_str + '>');
    445
        throw new Error('getNextRecurringDate illegal input hour_str <' +
          hour_str + '> min_str <' + min_str + '>');
    448
      }
    449
      return date_ret;
    450
    }
    451
     
                
    452
    function verifyFunctionExistsViaEval(curDoc, id) {
    453
      var result = false;
    454
      try {
    455
        // check for function if missing this is invalid return result
    456
        result = eval("typeof " + curDoc.action + " === 'function';");
    457
        if (result === false) {
    458
          if (curDoc.verbose.scheduler >= 1)
    459
            log("Warn/Disable (No Action and No Re-Arm), because required 'action' of " +
    460
              curDoc.action + "(doc) does not exist, id is", id);
    461
          return result;
    462
        }
    463
      } catch (e) {
    464
        log('verifyFunctionExistsViaEval Error exception:', e);
    465
      }
    466
      return result;
    467
    }
    468
     
                
    469
    function toNumericFixed(number, precision) {
    470
      var multi = Math.pow(10, precision);
    471
      return Math.round((number * multi).toFixed(precision + 1)) / multi;
    472
    }
    473
     
                
    474
    function toLocalISOTime(d) {
    475
      var tzoffset = (new Date()).getTimezoneOffset() * 60000; //offset in milliseconds
    476
      return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
    477
    }
    478
     
                
    479
    function tryBucketKvWriteWithLog(tag, key, doc) {
    480
      var success = false;
    481
      var tries = 0;
    482
      while (tries < 10) {
    483
        tries++;
    484
        try {
    485
          // critical that the below succeeds, because if it doesn't the cron cycle will break
    486
          cron_bkt[key] = doc;
    487
          success = true;
    488
          break;
    489
        } catch (e) {
    490
          log(tag + ' ' + key + ' WARN failed to update KV tries ' + tries, e);
    491
        }
    492
      }
    493
      if (!success) {
    494
        log(tag + ' ' + key + ' FATAL could not update KV cron cycle, tried ' + tries + ', stopping ' + doc.action);
    495
      }
    496
      return success;
    497
    }
    498
     
                
    499
    function Callback(doc) {
    500
      try {
    501
        var fired_at = new Date();
    502
     
                
    503
        // Check if further analysis is needed we only trigger on a recurring_event that is active
    504
        if (doc.type !== "recurring_event") return;
    505
        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
    506
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
    507
        // process any doc.dynamic.state BUT pending 
    508
        if (doc.dynamic.state === "pending") return;
    509
     
                
    510
        // ==================
    511
        // Check if still active
    512
     
                
    513
        // We make sure that in KV the 'doc' still exists and that it is still active; if not, just
        // return, thus skipping the action and not re-arming the timer. Note `crondata` is
        // aliased to the map 'cron_bkt'
    516
     
                
    517
        var mid = doc.type + '::' + doc.id; // make our KEY
    518
        var curDoc = null;
    519
        try {
    520
          // read the current version of doc from KV, e.g. curDoc
    521
          curDoc = cron_bkt[mid];
    522
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception
    523
     
                
    524
        var reason = null;
    525
        if (!curDoc || curDoc === null) {
    526
          reason = "cron document is missing";
    527
        } else
    528
        if (!curDoc.active) {
    529
          reason = "cron document has active = false";
    530
        } else
    531
        if (!curDoc.dynamic.state || curDoc.dynamic.state !== doc.dynamic.state) {
    532
          reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
    533
        } else
    534
        if (crc64(doc) !== crc64(curDoc)) {
    535
          reason = "cron document changed";
    536
        }
    537
     
                
    538
        if (reason !== null) {
    539
          if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 1) {
    540
            log('Callback X ' + mid + " ignore/stop this timer's schedule because " + reason);
    541
          }
    542
          if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 4) {
    543
            log('Callback Y ' + mid + ' timer doc', doc);
    544
            log('Callback Z ' + mid + ' KV curDoc', curDoc);
    545
          }
    546
          return;
    547
        }
    548
     
                
    549
        // ==================
    550
        // Verify user routine exists and if so eval it 
    551
     
                
    552
        // Assume curDoc.action contains something like "doCronActionA" and we have a function in 
    553
        // this handler like "doCronActionA(doc)". Below we use curDoc as the end user should be 
    554
        // able to alter the eval'd JavaScript function.  We will execute two (2) evals.
    555
     
                
    556
        // First eval check the JavaScript function exists.  The eval occurs in a common 
    557
        // utility function shared with OnUpdate
    558
        if (!verifyFunctionExistsViaEval(curDoc, mid)) {
    559
          // curDoc.action did not exist, we have already logged the issue
    560
          return;
    561
        }
    562
     
                
    563
        // Second eval execute and process the user function we execute the defined function 
    564
        // with an argument of curDoc
    565
        var beg_act = new Date();
    566
        var result = null;
    567
        eval("result = " + curDoc.action + "(curDoc);");
    568
        var end_act = new Date();
    569
        var atime_ms = end_act.getTime() - beg_act.getTime();
    570
     
                
    571
        if (curDoc.verbose.scheduler >= 2)
    572
          log('Callback R ' + mid + ' action took ' + toNumericFixed((atime_ms / 1000), 3) +
    573
            ' sec., returned ' + result);
    574
     
                
    575
        // ==================
    576
        // Calculate next time and mutate the control document for our helper function
    577
        // which will create another mutation such that OnUpdate of this function will pick
    578
        // it up and generate the timer (avoids the MB-38554 issue).
    579
     
                
    580
        var hour = curDoc.hour;
    581
        var min = curDoc.min;
    582
        var date_timer = getNextRecurringDate(hour, min);
    583
     
                
    584
        curDoc.dynamic.prev_delay =
    585
          toNumericFixed(((fired_at.getTime() / 1000) - curDoc.dynamic.next_sched), 3);
    586
        curDoc.dynamic.prev_sched = curDoc.dynamic.next_sched;
    587
        curDoc.dynamic.prev_etime = Math.round(fired_at.getTime() / 1000);
    588
        curDoc.dynamic.prev_atime = toNumericFixed((atime_ms / 1000), 3);
    589
     
                
    590
        curDoc.dynamic.state = "pending";
    591
        curDoc.dynamic.next_sched = Math.round(date_timer.getTime() / 1000);
    592
     
                
    593
        // rather than a direct KV write, call a function to trap and retry if there is a resource issue
        // cron_bkt[mid] = curDoc;
    595
        if (!tryBucketKvWriteWithLog('Callback F', mid, curDoc)) {
    596
          // Failed to write curDoc to cron_bkt[key] the error has been logged
    597
          // and there is nothing more we can do.
    598
          return;
    599
        }
    600
     
                
    601
        if (curDoc.verbose.scheduler >= 1) {
    602
          log('Callback A ' + mid + ' gen mutation #1 to doc to force schedule rearm at ' +
    603
            toLocalISOTime(date_timer));
    604
        }
    605
        if (curDoc.verbose.scheduler >= 2) {
    606
          log('Callback B ' + mid + ' sched ' + curDoc.dynamic.prev_sched +
    607
            ', actual ' + curDoc.dynamic.prev_etime +
    608
            ', delay ' + curDoc.dynamic.prev_delay +
    609
            ', took ' + curDoc.dynamic.prev_atime);
    610
        }
    611
        if (curDoc.verbose.scheduler >= 3) {
    612
          log('Callback C ' + mid + ' curDoc', curDoc);
    613
        }
    614
      } catch (e) {
    615
        var mid = doc.type + '::' + doc.id; // make our KEY
    616
        log('Callback E ' + mid + ' Error exception:', e);
    617
      }
    618
    }


  • After pasting, the screen appears as displayed below: [screenshot: Eventing cron update]
  • Click Save.
  • To return to the Eventing screen, click the '< back to Eventing' link (below the editor) or click the Eventing
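
As a rough illustration of how the handler pasted above picks its next firing time, the sketch below re-implements only the once-a-day and once-a-minute rules of getNextRecurringDate. The helper name nextRunAfter and its now parameter are hypothetical (they are not part of the article's Function); passing the clock in as an argument simply makes the result checkable outside of Eventing, for example in Node.js.

```javascript
// Hypothetical helper, not part of cron_impl_2func_651: it mirrors the daily
// and per-minute rules of getNextRecurringDate, but takes "now" as an
// argument so the result can be inspected without waiting for a timer.
function nextRunAfter(hour, min, now) {
  var next = new Date(now.getTime());
  next.setMilliseconds(0);
  next.setSeconds(0);

  if (hour === '*' && min === '*') {
    // once a minute: the top of the next minute
    next.setMinutes(next.getMinutes() + 1);
    return next;
  }

  var h = parseInt(hour, 10);
  var m = parseInt(min, 10);
  if (isNaN(h) || isNaN(m)) {
    // this sketch deliberately leaves out the other schedule forms
    throw new Error('unsupported schedule hour <' + hour + '> min <' + min + '>');
  }

  // once a day: today at h:m, or tomorrow if that moment has already passed
  next.setMinutes(m);
  next.setHours(h);
  if (next.getTime() < now.getTime()) {
    next.setDate(next.getDate() + 1);
  }
  return next;
}

// At 15:00 local time a 14:54 schedule has already passed, so it rolls to tomorrow
var sampleNow = new Date(2020, 0, 1, 15, 0, 0);
var sampleNext = nextRunAfter(14, 54, sampleNow);
console.log(sampleNext.toString());

// dynamic.next_sched stores this moment as whole seconds since the epoch
console.log(Math.round(sampleNext.getTime() / 1000));
```

The same value, rounded to seconds, is what the main Function writes to dynamic.next_sched before creating the timer, which is why a mismatch between the stored and the recalculated value triggers the "OnUpdate U" reschedule message.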

Manually Create "cron_impl_2func_651_help"

To add the second Eventing Function, go to the Couchbase Web Console > Eventing page and click ADD FUNCTION. The ADD FUNCTION dialog appears. In the ADD FUNCTION dialog, provide the information below for the individual Function elements:

  • For the Source Bucket drop-down, set to crondata.
  • For the Metadata Bucket drop-down, set to metadata.
  • Enter cron_impl_2func_651_help as the name of the Function you are creating in the Function Name text-box.
  • [Optional Step] Enter the text "A cron like scheduler helper part 1" in the Description text-box.
  • For the Settings option, use the default values.
  • For the Bindings option, create one binding:
  • For the binding, choose "bucket alias", specify cron_bkt as the "alias name" of the bucket, select crondata as the associated bucket, and set the mode to "read and write".
  • After configuring your settings, your dialog should look like this: [screenshot: Eventing cron update]
  • After providing all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651_help dialog appears, initially containing a placeholder code block. You will substitute your actual cron_impl_2func_651_help code in this block. [screenshot: Eventing cron update]
  • Copy the following Eventing Function JavaScript source (187 lines) and paste it in the placeholder code block of cron_impl_2func_651_help
    /*
    2
    Function "cron_impl_2func_651_help" also requires "cron_impl_2func_651"
    3
     
                
    4
    Test Doc:
    5
       {
    6
            "type":"recurring_event",   // The KEY will be <<type>>::<<id>>
    7
            "id":1,                     //
    8
            "hour":14,                  // The hour of the day 0-23, *, *2X, *4X to trigger
    9
            "min":54,                   // The minute in the hour 0-59, *, *2X, *4X to trigger
    10
            "action":"doCronActionA",   // What function to run on the trigger
    11
            "active":false,             // Flag to arm or disable this schedule
    12
            "verbose" : {
    13
              "user_func":2,            // Logging level for the action logic : 0=none, etc. etc.
    14
              "scheduler":3             // Logging level for the cron logic   : 0=none, etc. etc.
    15
            },
    16
            "dynamic" : {
    17
              "state":"arm",            // States "arm"|"rearm"|"pending" if any value but "pending" start a schedule
    18
              "next_sched": 0,          // Number of seconds since epoch to next desired schedule
    19
              "prev_sched": 0,          // Number of seconds since epoch for previous schedule
    20
              "prev_etime": 0,          // Number of seconds since epoch for previous schedule actual exec time
    21
              "prev_delay": 0,          // Number of seconds that the timer was delayed from the schedule
    22
              "prev_atime": 0           // Number of seconds taken by the user 'action'
    23
            }
    24
        }
    25
        
    26
    Note: you can omit verbose{} and dynamic{}, as they will be auto-created by the main Eventing
    Function "cron_impl_2func_651". If verbose{} is missing, the logging levels default to
    "verbose" : {  "user_func":1, "scheduler":1 }
    29
    */
    30
     
                
    31
    function tryBucketKvWriteWithLog(tag, key, doc) {
    32
      var success = false;
    33
      var tries = 0;
    34
      while (tries < 10) {
    35
        tries++;
    36
        try {
    37
          // critical that the below succeeds, because if it doesn't the cron cycle will break
    38
          cron_bkt[key] = doc;
    39
          success = true;
    40
          break;
    41
        } catch (e) {
    42
          log(tag + ' ' + key + ' WARN failed to update KV tries ' + tries, e);
    43
        }
    44
      }
    45
      if (!success) {
    46
        log(tag + ' ' + key + ' FATAL could not update KV cron cycle, tried ' + tries + ', stopping ' + doc.action);
    47
      }
    48
      return success;
    49
    }
    50
     
                
    function OnUpdate(doc, meta) {
      // fix for 6.5.X growing bucket ops
      if (meta.id.startsWith("fix_timer_scan_issue:")) upsertOneDocPerBucket(doc, meta);

      try {
        // Check that doc has desired values
        if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active != true) return;
        // doc must have 'action', 'dynamic {}', 'verbose {}', dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;
        // Only process state "pending"; this state only exists for a brief time
        if (doc.dynamic.state !== "pending") return;
     
                
        var mid = doc.type + '::' + doc.id; // make our KEY
        var newdoc = null;
        try {
          // read the current version of doc from KV, e.g. curDoc
          newdoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

        var reason = null;
        if (!newdoc) {
          reason = "cron document is missing";
        } else if (!newdoc.active) {
          reason = "cron document has active = false";
        } else if (!newdoc.dynamic.state || newdoc.dynamic.state !== doc.dynamic.state) {
          reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(newdoc)) {
          reason = "cron document changed";
        }
        if (reason != null) {
          if (!newdoc || newdoc.verbose.scheduler >= 1) {
            log('OnUpdate help: X stopping schedule because ' + reason + ',', newdoc);
          }
          // stop here regardless of logging level; never rearm a missing, inactive, or changed doc
          return;
        }
     
                
        newdoc.dynamic.state = "rearm";
        // cron_bkt[mid] = newdoc;
        if (!tryBucketKvWriteWithLog('OnUpdate help: F', mid, newdoc)) {
          // Failed to write newdoc to cron_bkt[key]; the error has been logged
          // and there is nothing more we can do.
          return;
        }
     
                
        if (newdoc.verbose.scheduler >= 1) {
          log('OnUpdate help: A ' + mid + ' mutation #2 to doc to force schedule rearm');
        }
        if (newdoc.verbose.scheduler >= 3) {
          log('OnUpdate help: B ' + mid + ',', newdoc);
        }
      } catch (e) {
        log('OnUpdate help: E ' + meta.id + ', Error exception:', e);
      }
    }
     
                
    // FIXUP: ADD-IN FUNCTION
    // fix for 6.5.X growing bucket ops
    function upsertOneDocPerBucket(doc, meta) {

        var crcTable = makeCRC32Table();

        // make one doc per vBucket
        var isVerbose = 0;
        var isMacOS = false; // would be nice if this was an exposed constant in Eventing
        var numvbs = 1024;   // default is linux/PC
        if (isMacOS) {
            numvbs = 64;
        }

        var beg = (new Date).getTime();
        var result = getKeysToCoverAllPartitions(crcTable, "_tmp_vbs:", numvbs);

        for (var vb=0; vb<numvbs; vb++) {
            // brute force to fit a key prefix into a vBucket
            var tst = result[vb];
            if (isVerbose > 1  || (isVerbose == 1) && (vb < 3 || vb > numvbs -4)) {
                log("KEY: " + tst);
            } else {
                if (vb == 5) console.log("\t*\n\t*\n\t*");
            }
            // update the items to trigger a mutation for our PRIMARY function
            cron_bkt[tst] = { "type": "_tmp_vbs", "vb": vb, "ts_millis": beg, "key": tst };
        }
        var end = (new Date).getTime();
        log("seeding one doc to each vBucket in primary_bucket alias (took " + (end - beg) + " millis)");
    }
     
                
    // FIXUP: ADD-IN FUNCTION
    // fix for 6.5.X growing bucket ops
    function showHex(n) {
        return n.toString(16);
    }
     
                
    // FIXUP: ADD-IN FUNCTION
    // fix for 6.5.X growing bucket ops
    function makeCRC32Table() {
        var crcTable = [];
        var c;
        for(var n =0; n < 256; n++){
            c = n;
            for(var k =0; k < 8; k++){
                c = ((c&1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1));