Alberto Marchetti is a full-stack developer and author of “RenderScript: parallel computing on Android, the easy way.” He is always living on the edge, constantly diving into new languages and technologies.
Timed tasks using Couchbase and Go
In this post I’m going to show how you can exploit the Couchbase indexing system to create a timed-tasks distributed system. The example code for the project, together with its run instructions, can be found at https://github.com/cmaster11/cb-blog-timed-tasks.
Disclaimer: Because of the complexity of the topic, only the relevant code extracts are included on this page.
The concept
Let’s try to define the requirements of such a system:
- The main feature of a timed-tasks system is the ability to specify when a task will be executed. This can be achieved with an ExecuteAt field, which will contain the desired execution time (Unix time, in milliseconds).
- Modern software must support multi-node environments, which means our system needs to be distributed. We must then ensure that multiple workers will NOT process the same task! A nice Couchbase feature helps here: pessimistic locking, which lets a worker fetch a document and lock it down, so that no other worker can process it.
The following is a possible structure to represent our task:
```go
type Task struct {
	Id string

	// The desired task execution time
	ExecuteAt int64

	// Task-specific content
	Content string
}
```
Couchbase features
First, here’s an overview of the Couchbase features we’ll be using:
META()
Every document in a Couchbase bucket has an associated META()-document, which contains document entity-specific information like:
- id – the document key inside the bucket.
- cas – an int64 number, used by Couchbase to prevent race conditions while editing documents.
- expiration – when a document is meant to expire, or 0 if it will never expire.
Hint: These fields (e.g., META().cas) can be indexed (starting from Couchbase 5.0).
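As a quick illustration (assuming a bucket named timed_tasks, the one used later in this post), these metadata fields can be read directly in a N1QL query:

```sql
SELECT META().id, META().cas, META().expiration
FROM `timed_tasks`
LIMIT 1
```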
CAS (Compare and Swap)
When fetching a document, its CAS value is returned too, and subsequent calls to alter the document can specify this value to make sure they’re going to edit the desired version of the document.
Example:
- Client A fetches a document and its current CAS value is 1234.
- Client B edits the document, which alters the CAS value to 5678.
- If A tries to edit the document without providing the CAS value, the edit will be successful, but changes made by B will be lost.
- If A tries to edit the document providing the CAS value (1234), an error will be returned because the current one (5678) is different. Client A will then need to fetch the document again and re-execute the process.
The CAS value is an extremely useful tool to ensure we’re not replacing or altering a wrong/newer version of a document, losing its changes.
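The scenario above can be sketched with a small in-memory stand-in for a single document. Note that CASStore is a hypothetical helper built purely for illustration, not part of the gocb SDK: every successful write bumps the CAS value, and a write supplying a stale CAS is rejected, exactly like the A/B example above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrCASMismatch mimics the error Couchbase returns on a stale CAS.
var ErrCASMismatch = errors.New("CAS mismatch: document was modified")

// CASStore is a minimal in-memory stand-in for a single document
// (illustration only, not part of the gocb SDK).
type CASStore struct {
	mu    sync.Mutex
	value string
	cas   uint64
}

func NewCASStore() *CASStore {
	return &CASStore{cas: 1}
}

// Get returns the current value together with its CAS.
func (s *CASStore) Get() (string, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value, s.cas
}

// Replace writes a new value. A cas of 0 means "don't check" (last write
// wins); any other value must match the current CAS, or the write fails.
func (s *CASStore) Replace(value string, cas uint64) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cas != 0 && cas != s.cas {
		return 0, ErrCASMismatch
	}
	s.value = value
	s.cas++
	return s.cas, nil
}

func main() {
	doc := NewCASStore()

	// Client A fetches the document and remembers its CAS.
	_, casA := doc.Get()

	// Client B edits the document, which bumps the CAS.
	doc.Replace("edited by B", 0)

	// A's edit with the stale CAS is rejected...
	_, err := doc.Replace("edited by A", casA)
	fmt.Println(err) // CAS mismatch: document was modified

	// ...so A re-fetches and retries with the fresh CAS.
	_, casA = doc.Get()
	_, err = doc.Replace("edited by A", casA)
	fmt.Println(err) // <nil>
}
```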
Pessimistic locking
Couchbase lets us “lock” a document, so that it can only be mutated by one client at a time, using the Go SDK's Bucket.GetAndLock function.
```go
// Lock the document
lockTime := uint32(10) // seconds
lockedCAS, err := bucket.GetAndLock(documentKey, lockTime, &outStruct)

// Unlock it
_, err = bucket.Unlock(documentKey, lockedCAS)
```
When a document is locked, every other request to lock/mutate/unlock it will throw an error (it’s still possible to simply get the document), unless the correct CAS value is used.
Note: The maximum lock time of a document is 15 seconds, and using a lockTime value of 0 will cause the maximum time to be set. This creates a limitation on how long a task can run before being automatically marked as available (by locking timeout).
Hint: While a document is locked, its returned CAS value is -1.
Indexing and querying
Notably, the two hints above, put together, tell us that we can index a field (META().cas) which turns to -1 when a document is locked. This means we can query documents based on that condition!
The query
Let’s try to define a query to match the requirements:
- We want to get a task id, which can be used later to get-and-lock the document: SELECT Id.
- The task should not be already locked: WHERE META().cas <> -1.
- The task needs to be executed now: WHERE ExecuteAt <= NOW_MILLIS() (NOW_MILLIS returns the current Unix time in milliseconds).
- We need to fetch the closest task in time, so we want to sort tasks by their execution time: ORDER BY ExecuteAt ASC.
- Let’s say for now (!!!) that a worker will want to get only one task to process at a time: LIMIT 1.
The result should be similar to this query:
```sql
SELECT `Id`
FROM `timed_tasks` // Our bucket
WHERE META().`cas` <> -1
  AND `ExecuteAt` <= NOW_MILLIS()
ORDER BY `ExecuteAt` ASC
LIMIT 1
```
Its execution will return an array similar to:
```json
[{
  "Id": "task_id_goes_here"
}]
```
The index
We can now plan a query-specific index, optimized for the execution of the query we just thought about. Query-specific indexes are a must to improve NoSQL database query performance.
- The query checks that a document is not currently locked: WHERE META().cas <> -1.
- Also, it directly asks the execution time to be in the past. We then need to index the ExecuteAt field.
The index query could then be the following:
```sql
CREATE INDEX `idx_timed_task`
ON `timed_tasks` (`ExecuteAt` ASC)
WHERE META().`cas` <> -1
```
Optimizing the query
We can now further optimize the query:
- We can tell the query to use our index by providing a hint to it: USE INDEX (idx_timed_task USING GSI).
- We can ask Couchbase to wait for the index to be up to date before executing the query (indexing is usually an asynchronous process), so that our results will for sure contain unlocked tasks. This is done by providing a consistency requirement at SDK level: query.Consistency(gocb.RequestPlus).
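Putting both optimizations together, the final statement could be assembled as follows. Note that buildNextTaskQuery is a hypothetical helper, not taken from the post's repository: in the real consumer you would wrap the resulting string with gocb.NewN1qlQuery, call query.Consistency(gocb.RequestPlus) on it, and execute it with bucket.ExecuteN1qlQuery.

```go
package main

import "fmt"

// buildNextTaskQuery assembles the optimized N1QL statement, with the
// index hint and a configurable LIMIT (hypothetical helper, for
// illustration only).
func buildNextTaskQuery(limit int) string {
	return fmt.Sprintf(
		"SELECT `Id` FROM `timed_tasks` USE INDEX (`idx_timed_task` USING GSI) "+
			"WHERE META().`cas` <> -1 AND `ExecuteAt` <= NOW_MILLIS() "+
			"ORDER BY `ExecuteAt` ASC LIMIT %d", limit)
}

func main() {
	// e.g. one candidate task id per worker, as in the multi-node flow below
	fmt.Println(buildNextTaskQuery(4))
}
```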
The flow
A possible flow for the timed task consumer worker loop is:
- Query for an available task id.
- Get and lock the task.
- Process the task.
- Delete the task.
Multiple nodes
Let’s think for a second about how a multi-node setup can alter this flow.
If multiple workers query for available tasks concurrently, they'd probably find the same task: only one of them would be able to process it successfully, while the other workers would have to repeat the loop (execute a new query) to get new tasks.
We can then implement another approach:

1. Query for available task ids, limiting the amount of ids to the number of workers.
2. For each task id, try to lock the task. At the first successful lock, go to step 4.
3. If no task has been successfully locked, repeat the loop.
4. Process the task.
5. Delete the task.
In the best case, every worker will successfully lock one task at the first try. In the worst case, workers will need to try to lock multiple documents unsuccessfully. On average, workers will successfully lock tasks, perhaps after trying to lock a few others first.
We have to find a compromise between how frequently we want to query the database and how many failed lock attempts we can tolerate. Generally speaking, trying to lock documents will be much faster than executing N1QL queries.
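The lock competition can be sketched with a small in-memory simulation. LockTable here is a hypothetical stand-in for Couchbase's pessimistic locking, for illustration only: TryLock succeeds for exactly one caller per task id, the way GetAndLock errors out for every worker but the first.

```go
package main

import (
	"fmt"
	"sync"
)

// LockTable is an in-memory stand-in for pessimistic locking
// (illustration only, not part of the gocb SDK).
type LockTable struct {
	mu     sync.Mutex
	locked map[string]bool
}

// TryLock reports whether the caller acquired the lock on taskId.
func (t *LockTable) TryLock(taskId string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.locked[taskId] {
		return false // already locked by another worker
	}
	t.locked[taskId] = true
	return true
}

func main() {
	table := &LockTable{locked: make(map[string]bool)}

	// The same candidate list every worker got back from the query.
	taskIds := []string{"task-1", "task-2", "task-3"}

	var wg sync.WaitGroup
	results := make(chan string, len(taskIds))

	// Three workers compete over the same candidate ids.
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, id := range taskIds {
				if table.TryLock(id) {
					results <- id // this worker processes exactly this task
					return
				}
			}
		}()
	}
	wg.Wait()
	close(results)

	// Every task ended up locked by exactly one worker.
	seen := make(map[string]bool)
	for id := range results {
		seen[id] = true
	}
	fmt.Println(len(seen)) // 3
}
```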
The code
Let’s take a look at some relevant code examples:
The producer
The generation of the task can be summed up in this function:
```go
func NewTask(executeAt time.Time, content string) (*Task, error) {
	if executeAt.IsZero() {
		return nil, errors.New("executeAt must not be a zero time")
	}

	taskUUID, err := uuid.NewV1() // github.com/satori/go.uuid
	if err != nil {
		return nil, err
	}

	// Convert time.Time to int64 milliseconds
	executeAtMillis := executeAt.UnixNano() / int64(time.Millisecond)

	task := Task{
		Id:        taskUUID.String(),
		ExecuteAt: executeAtMillis,
		Content:   content,
	}

	return &task, nil
}
```
Once we generate a valid task object, we can simply insert it in our bucket with:
```go
_, err := controller.bucket.Insert(task.Id, task, 0)
```
The consumer
We can get and lock a document by id, using this code:
```go
// Using a zero value for the lock time will set the maximum time available.
task := new(Task)
lockedCAS, err := controller.bucket.GetAndLock(taskId, 0, &task)
```
A task can be removed using this code:
```go
_, err := controller.bucket.Remove(taskId, lockedCAS)
```
The main consumer code can be summed up with the following snippet:
```go
taskIds, err := couchbaseController.QueryNextTaskIds(consumersCount)
...

if len(taskIds) == 0 {
	... // No tasks have been found, restart the loop
}

var taskId string
var task *internal.Task
var lockedCAS gocb.Cas

for _, taskId = range taskIds {
	// Lock and get the task, so that only this consumer will process it
	task, lockedCAS, err = couchbaseController.GetAndLockTask(taskId)
	if err != nil {
		... // Error getting the task, proceed to next one in list
		continue
	}

	// Successfully locked task!
	// Move out to process it
	break
}

if task == nil {
	... // No tasks could be locked, restart loop
}

// Actual processing of the task
// Improvement: could also return an error, which would let the task be
// processed by another worker later.
processTask(task)

/*
Remove the task from Couchbase.

The task will be currently locked, which means we need to provide the
current CAS value, so that the consumer is authorized to remove it.
*/
err = couchbaseController.RemoveTask(taskId, lockedCAS)
...
```
Conclusion
In this post we’ve seen a way to create a reliable distributed timed tasks system using Couchbase and Go.
This system could be further developed by:
- Adding support for processing errors.
- Implementing a retry feature (if processing fails, reschedule the task in the future).
- Improving the locking logic by:
- Tuning the maximum number of returned task ids (instead of the default workers count).
- Supporting a task processing duration of more than 15 seconds (the maximum lock time of a document in Couchbase).
Thank you for your time, and happy developing!
This post is part of the Community Writing program