Using Libcouchbase/C SDK To Run Transactions With Couchbase 7.0
Couchbase Server 7.0 now supports N1QL transactions. Read this article to learn how to write a C program that performs a set of transactions on a single node.
Join the DZone community and get the full member experience.
Join For FreeLet's try writing a C program that performs a set of transactions on a single node.
Step 1: Call the Program
We first decide how to call the program. The inputs will be the URL to the Couchbase bucket that we wish to run queries against, and the credentials (username followed by the password).
Usage: ./n1ql couchbase://localhost/test Administrator password
With N1QL transactions, a txid value is returned from the START TRANSACTION command. This is used with all subsequent N1QL queries within the transaction until the final COMMIT or ROLLBACK. So we must declare a transaction ID in the main function that we can pass to the remaining query requests that are part of the transaction.
Step 2: Initialize a Pointer
Initialize a pointer that will be used to save the transaction ID from the BEGIN TRANSACTION query. This ID will be used in the entire transaction.
char *transaction_id = (char *)malloc(64 * sizeof(char));
Step 3: Initialize the Cluster
A connection to a Couchbase Server cluster is represented by a lcb_INSTANCE object:
lcb_INSTANCE *instance;
The set of allowed operations depends on the type of this object, and whether the bucket is associated with it. Here, we use the type Cluster. The simplest way to create a cluster object is to call lcb_create to create a Couchbase handle by passing LCB_TYPE_CLUSTER with a connection string, username, and password. Next, we schedule a connection using lcb_connect(), then we check if the bucket exists.
lcb_CREATEOPTS *create_options = NULL;
lcb_createopts_create(&create_options, LCB_TYPE_CLUSTER);
lcb_createopts_connstr(create_options, argv[1], strlen(argv[1]));
lcb_createopts_credentials(create_options, argv[2], strlen(argv[2]), argv[3], strlen(argv[3]));
check(lcb_create(&instance, create_options), "create couchbase handle");
lcb_createopts_destroy(create_options);
check(lcb_connect(instance), "schedule connection");
lcb_wait(instance, LCB_WAIT_DEFAULT);
check(lcb_cntl(instance, LCB_CNTL_GET, LCB_CNTL_BUCKETNAME, &bucket), "get bucket name");
Step 4: Run the Queries in query.h
We have the queries corresponding to our transaction defined in queries.h.
BEGIN WORK
INSERT INTO test VALUES(\"kkk1\", {\"a\":1})
SELECT d.*, META(d).id FROM test AS d WHERE d.a >= 0
SAVEPOINT s1
UPDATE test AS d SET d.b = 10 WHERE d.a > 0
SELECT d.*, META(d).id FROM test AS d WHERE d.a >= 0
SAVEPOINT s2
UPDATE test AS d SET d.b = 10, d.c = \"xyz\" WHERE d.a > 0
SELECT d.*, META(d).id FROM test AS d WHERE d.a >= 0
ROLLBACK TRAN TO SAVEPOINT s2
SELECT d.*, META(d).id FROM test AS d WHERE d.a >= 0
INSERT INTO test VALUES(\"kkk2\", {\"a\":2})
UPDATE test AS d SET d.b = 20, d.c = \"xyz\" WHERE d.a > 0
COMMIT WORK
for (ii = 0; ii < num_queries; ii++) {
lcb_CMDQUERY *cmd;
lcb_cmdquery_create(&cmd);
check(lcb_cmdquery_statement(cmd, queries[ii].query, strlen(queries[ii].query)), "set QUERY statement");
printf("----> \x1b[1m%s\x1b[0m\n", queries[ii].query);
if (ii == 0) {
lcb_cmdquery_callback(cmd, txid_callback);
lcb_wait(instance, LCB_WAIT_DEFAULT);
check(lcb_query(instance, transaction_id, cmd), "schedule QUERY operation");
lcb_wait(instance, LCB_WAIT_DEFAULT);
} else {
char buf[100];
sprintf(buf,"\"%s\"",transaction_id);
lcb_cmdquery_callback(cmd, row_callback);
// SET rest option pretty to true and txtimeout to 3s
check(lcb_cmdquery_option(cmd, "pretty", strlen("pretty"), "true", strlen("true")),"set QUERY 'pretty' option");
check(lcb_cmdquery_option(cmd, "txtimeout", strlen("txtimeout"), "\"3s\"", strlen("\"3s\"")),"set QUERY 'txtimeout' option");
check(lcb_cmdquery_option(cmd, "txid", strlen("txid"),buf, strlen(buf)),"set QUERY 'txtimeout' option");
check(lcb_query(instance, NULL, cmd), "schedule QUERY operation");
lcb_wait(instance, LCB_WAIT_DEFAULT);
}
lcb_cmdquery_destroy(cmd);
lcb_wait(instance, LCB_WAIT_DEFAULT);
}
Prior to running the queries, we set three query parameters: pretty, txtimeout (transaction timeout), and the txid that we got from the first statement. These are request-level parameters.
Callback Functions
Now let's take a deep dive into the callback functions.
Txid_Callback
Here we need to parse the JSON response from running the BEGIN WORK statement to extract the txid to pass into the subsequent statements as a query parameter using the JSONC library. For this, we use the previously created pointer and set it in the lcb_respquery_cookie method. This sets the operation cookie, which means when we run lcb_query, there is a cookie argument to it and lcb_respquery_cookie gets this pointer in our callback function. (We created a pointer in the main function and set it in the callback function.)
/* create pointer transaction_id and set it in the callback */
lcb_respquery_cookie(resp, (void **)&transaction_id);
check(lcb_respquery_status(resp),"check response status");
lcb_respquery_row(resp, &row, &nrow);
if (!lcb_respquery_is_final(resp)) {
parsed_json = json_tokener_parse(row);
json_object_object_get_ex(parsed_json, "txid", &txid_obj);
temp = json_object_get_string(txid_obj);
strcpy(transaction_id,temp);
}
Row Callback
This is used to parse and retrieve the result rows from the query run.
lcb_STATUS rc = lcb_respquery_status(resp);
lcb_respquery_row(resp, &row, &nrow);
ln2space(row, nrow);
fprintf(stderr, "[\x1b[%dmQUERY\x1b[0m] %s, (%d) %.*s\n", err2color(rc), lcb_strerror_short(rc), (int)nrow,
(int)nrow, row);
if (lcb_respquery_is_final(resp)) {
fprintf(stderr, "\n");
}
}
Here we get the query response, get the rows and print it.
Using the above example, we can now use N1QL transactions in the C SDK. For the full code see and instructions on how to run it, see https://github.com/ikandaswamy/CBSDK_N1QLExamples.
Published at DZone with permission of Isha Kandaswamy, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments