Creating a Distributed System in 300 Lines With Mesos, Docker, and Go


Learn to launch Docker images on Mesos with code from a bitcoin mining framework.


Building distributed systems is hard. They need to be scalable, fault-tolerant, highly available, consistent, secure, elastic, and efficient. To achieve these properties, distributed systems require many complex components to work together in a sophisticated manner. For example, Apache Hadoop depends on a highly fault-tolerant file system (HDFS) to provide high throughput for processing multi-terabyte data sets in parallel on large clusters.

In the past, each new distributed system, such as Hadoop or Cassandra, had to build its own infrastructure for messaging, storage, networking, fault tolerance, and elasticity. Fortunately, systems like Apache Mesos (and the commercialized version of it, the Mesosphere DCOS) simplify the task of building and managing distributed systems by providing operating-system-like primitives for the key building blocks of distributed systems. Mesos abstracts away CPU, memory, storage, and other compute resources so that developers can write distributed applications as if their datacenter clusters were one giant machine.

Applications for Mesos are called frameworks and can be built to solve a variety of problems. Apache Spark, a popular general-purpose cluster computing tool used in data analytics, and Chronos, a distributed and fault-tolerant cron-like scheduler, are two examples of frameworks built on top of Mesos. Frameworks can be built in many languages, including C++, Go, Python, Java, Haskell, and Scala.

Bitcoin mining is a prime example of a problem that requires a distributed solution. Bitcoin varies the difficulty of generating an acceptable hash for verifying the authenticity of a block of transactions. Give or take a few decades, a single laptop would take over 150 years to mine one block. As a result, there are now “mining pools” that allow miners to combine their computing resources and mine faster. One of our interns, Derek, has written a bitcoin mining framework that takes advantage of cluster resources to do the same work. In the rest of this article, we will be walking through his code.
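To make the idea of hash difficulty concrete, here is a minimal toy sketch of a proof-of-work search. It is a simplification and not part of the framework: real Bitcoin compares the double SHA-256 hash of a block header against a numeric target, whereas this sketch just counts leading zero bits. The names `leadingZeroBits`, `hashWithNonce`, and `mine` are made up for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math/bits"
)

// leadingZeroBits counts the zero bits at the front of a hash.
func leadingZeroBits(h [32]byte) int {
	n := 0
	for _, b := range h {
		if b == 0 {
			n += 8
			continue
		}
		n += bits.LeadingZeros8(b)
		break
	}
	return n
}

// hashWithNonce appends the nonce to the header and applies SHA-256 twice.
func hashWithNonce(header []byte, nonce uint32) [32]byte {
	buf := make([]byte, len(header)+4)
	copy(buf, header)
	binary.LittleEndian.PutUint32(buf[len(header):], nonce)
	first := sha256.Sum256(buf)
	return sha256.Sum256(first[:])
}

// mine tries nonces in order until the hash has at least `difficulty`
// leading zero bits. It reports false if the 32-bit nonce space is exhausted.
func mine(header []byte, difficulty int) (uint32, bool) {
	for nonce := uint32(0); ; nonce++ {
		if leadingZeroBits(hashWithNonce(header, nonce)) >= difficulty {
			return nonce, true
		}
		if nonce == ^uint32(0) {
			return 0, false
		}
	}
}

func main() {
	// 12 bits is a toy difficulty: about 4,096 attempts on average.
	nonce, ok := mine([]byte("toy block header"), 12)
	fmt.Println(nonce, ok)
}
```

Each extra bit of difficulty doubles the expected number of attempts, which is why the search is CPU-bound and why splitting the nonce space across many workers helps.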

A Mesos framework consists of a scheduler and an executor. A scheduler communicates with the Mesos master and decides what tasks to launch, whereas an executor runs on the slaves to actually execute the intended tasks. Most frameworks implement their own scheduler and use one of the standard executors provided by Mesos. Frameworks can also implement their own custom executor. In this case, we’ll be writing our own scheduler and using the standard command executor to launch Docker images containing our bitcoin services.

For our scheduler, we will want to launch two kinds of tasks: one miner server task and multiple miner worker tasks. A miner server communicates with a bitcoin mining pool and assigns blocks to each miner worker. The miner workers do the hard work, which is mining bitcoins.

Tasks are actually encapsulated in the framework executor, so launching a task means telling the Mesos master to start up an executor on one of its slaves. Since we’ll be using the standard command executor, we can specify tasks to be a binary executable, a bash script, or other command. In our case, since Mesos supports Docker natively, we will be using executable Docker images. Docker is a technology that allows you to package your application with all the runtime dependencies it needs.

To use Docker images in Mesos, you just need their names in the Docker registry:

const (
    MinerServerDockerImage = "derekchiang/p2pool"
    MinerDaemonDockerImage = "derekchiang/cpuminer"
)

Then, we define a few constants that specify the resource requirements of each task:

const (
    MemPerDaemonTask = 128  // mining shouldn't be memory-intensive
    MemPerServerTask = 256
    CPUPerServerTask = 1    // a miner server does not use much CPU
)

Now we define the actual scheduler. The scheduler should keep track of the state it needs to operate correctly.

type MinerScheduler struct {
    // bitcoind RPC credentials
    bitcoindAddr string
    rpcUser      string
    rpcPass      string

    // mutable state
    minerServerRunning  bool
    minerServerHostname string
    minerServerPort     int    // the port that miner daemons
                               // connect to

    // unique task ids
    tasksLaunched        int
    currentDaemonTaskIDs []*mesos.TaskID
}

A scheduler must implement the following interface:

type Scheduler interface {
    Registered(SchedulerDriver, *mesos.FrameworkID, *mesos.MasterInfo)
    Reregistered(SchedulerDriver, *mesos.MasterInfo)
    Disconnected(SchedulerDriver)
    ResourceOffers(SchedulerDriver, []*mesos.Offer)
    OfferRescinded(SchedulerDriver, *mesos.OfferID)
    StatusUpdate(SchedulerDriver, *mesos.TaskStatus)
    FrameworkMessage(SchedulerDriver, *mesos.ExecutorID,
                     *mesos.SlaveID, string)
    SlaveLost(SchedulerDriver, *mesos.SlaveID)
    ExecutorLost(SchedulerDriver, *mesos.ExecutorID, *mesos.SlaveID,
                 int)
    Error(SchedulerDriver, string)
}

Now let’s look at the callbacks:

func (s *MinerScheduler) Registered(_ sched.SchedulerDriver,
      frameworkId *mesos.FrameworkID, masterInfo *mesos.MasterInfo) {
    log.Infoln("Framework registered with Master ", masterInfo)
}

func (s *MinerScheduler) Reregistered(_ sched.SchedulerDriver,
      masterInfo *mesos.MasterInfo) {
    log.Infoln("Framework Re-Registered with Master ", masterInfo)
}

func (s *MinerScheduler) Disconnected(sched.SchedulerDriver) {
    log.Infoln("Framework disconnected from Master")
}

`Registered` is invoked when the scheduler is successfully registered with the Mesos master.

`Reregistered` is invoked when the scheduler disconnects from the Mesos master, and then gets registered again, for instance, when the master restarts.

`Disconnected` is invoked when the scheduler disconnects from the Mesos master. This can happen when the master goes down.

So far, we have only been printing log messages in the callback functions, because most callbacks can effectively be left blank for a simple framework like this. However, the next callback is at the heart of every framework, and needs to be written with care.

`ResourceOffers` is invoked when the scheduler receives an offer from the master. Each offer contains a list of resources available for the framework to use on the cluster. Resources typically include CPU, memory, ports, and disk. A framework can use some, all, or none of the resources it has been offered.
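As a rough illustration of what "gathering" an offer's resources involves, the sketch below totals CPUs, memory, and ports. The `Range`, `Resource`, and `Offer` types are simplified stand-ins invented for this example, not the mesos-go protobuf types; the only assumption carried over is that scalar resources such as `cpus` and `mem` hold a single value while `ports` holds inclusive ranges.

```go
package main

import "fmt"

// Range is an inclusive span of port numbers, e.g. 31000-31004.
type Range struct{ Begin, End uint64 }

// Resource is a simplified stand-in for a Mesos resource:
// "cpus" and "mem" carry a scalar value, "ports" carries ranges.
type Resource struct {
	Name   string
	Scalar float64
	Ranges []Range
}

// Offer is a simplified stand-in for a Mesos resource offer.
type Offer struct {
	Hostname  string
	Resources []Resource
}

// summarize totals the CPUs, memory (in MB), and number of ports in an offer.
func summarize(o Offer) (cpus, mems float64, ports uint64) {
	for _, r := range o.Resources {
		switch r.Name {
		case "cpus":
			cpus += r.Scalar
		case "mem":
			mems += r.Scalar
		case "ports":
			for _, rg := range r.Ranges {
				ports += rg.End - rg.Begin + 1
			}
		}
	}
	return cpus, mems, ports
}

func main() {
	offer := Offer{
		Hostname: "node-1",
		Resources: []Resource{
			{Name: "cpus", Scalar: 4},
			{Name: "mem", Scalar: 1024},
			{Name: "ports", Ranges: []Range{{31000, 31004}, {32000, 32000}}},
		},
	}
	cpus, mems, ports := summarize(offer)
	fmt.Println(cpus, mems, ports)
}
```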

For each offer, we want to gather the resources being offered and make a decision about whether we want to launch a server task or a worker task. You can launch as many tasks as you can fit into each offer, but since bitcoin mining is CPU-bound, we launch a single miner task per offer using all of the available CPU resources.

for i, offer := range offers {
    // … Gather the resources being offered and do setup
    if !s.minerServerRunning && mems >= MemPerServerTask &&
            cpus >= CPUPerServerTask && ports >= 2 {
        // … Launch a server task since no server is running and we
        // have resources to launch it.
    } else if s.minerServerRunning && mems >= MemPerDaemonTask {
        // … Launch a miner since a server is running and we have mem
        // to launch one.
    }
}
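The branch logic above can be pulled into a small pure function, which makes it easy to check in isolation. This is only a sketch: the `Action` type and the `decide` function are hypothetical names introduced here, while the thresholds reuse the constants defined earlier in the article.

```go
package main

import "fmt"

const (
	MemPerDaemonTask = 128
	MemPerServerTask = 256
	CPUPerServerTask = 1
)

// Action is what the scheduler does with a single offer.
type Action int

const (
	Decline Action = iota
	LaunchServer
	LaunchWorker
)

// decide mirrors the if/else-if in ResourceOffers: start the server first
// if none is running and the offer is big enough, otherwise start a worker
// once a server exists, otherwise leave the offer unused.
func decide(serverRunning bool, cpus, mems float64, ports int) Action {
	switch {
	case !serverRunning && mems >= MemPerServerTask &&
		cpus >= CPUPerServerTask && ports >= 2:
		return LaunchServer
	case serverRunning && mems >= MemPerDaemonTask:
		return LaunchWorker
	default:
		return Decline
	}
}

func main() {
	fmt.Println(decide(false, 4, 1024, 6) == LaunchServer)
	fmt.Println(decide(true, 4, 1024, 6) == LaunchWorker)
}
```

Note that an offer too small for the server is declined even when no server is running, because workers are only launched once they have a server to connect to.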

For each task, we need to create a corresponding TaskInfo message that contains the information needed to launch that task.

taskID = &mesos.TaskID {
    Value: proto.String("miner-server-" + 

Task IDs are decided by the framework and should be unique per framework.
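One simple way to keep IDs unique within a framework is a monotonically increasing counter appended to a descriptive prefix. The sketch below assumes that approach; `idGenerator` and `next` are hypothetical names, and the `miner-daemon-` prefix is an illustrative choice. It also assumes the scheduler callbacks that create tasks are not run concurrently, so no locking is shown.

```go
package main

import (
	"fmt"
	"strconv"
)

// idGenerator hands out task IDs that are unique for the lifetime
// of one scheduler instance.
type idGenerator struct {
	launched int // number of IDs handed out so far
}

// next returns prefix followed by the next counter value.
func (g *idGenerator) next(prefix string) string {
	g.launched++
	return prefix + strconv.Itoa(g.launched)
}

func main() {
	g := &idGenerator{}
	fmt.Println(g.next("miner-server-"))
	fmt.Println(g.next("miner-daemon-"))
}
```

Keeping the task kind in the prefix is useful beyond readability: the status-update handling later in the article identifies the server task by checking whether its ID contains "server".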

containerType := mesos.ContainerInfo_DOCKER
task = &mesos.TaskInfo {
    Name: proto.String("task-" + taskID.GetValue()),
    TaskId: taskID,
    SlaveId: offer.SlaveId,
    Container: &mesos.ContainerInfo {
        Type: &containerType,
        Docker: &mesos.ContainerInfo_DockerInfo {
            Image: proto.String(MinerServerDockerImage),
        },
    },
    Command: &mesos.CommandInfo {
        Shell: proto.Bool(false),
        Arguments: []string {
            // these arguments will be passed to run_p2pool.py
            "--bitcoind-address", s.bitcoindAddr,
            "--p2pool-port", strconv.Itoa(int(p2poolPort)),
            "-w", strconv.Itoa(int(workerPort)),
            s.rpcUser, s.rpcPass,
        },
    },
    Resources: []*mesos.Resource {
        util.NewScalarResource("cpus", CPUPerServerTask),
        util.NewScalarResource("mem", MemPerServerTask),
    },
}

The TaskInfo message specifies a few important pieces of metadata about the task that allow the Mesos node to launch the Docker container. In particular, we specify a name, the task ID, container information, and arguments to be passed to the container. We also specify the resources required by the task.
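The listing passes `p2poolPort` and `workerPort` to the container without showing where they come from. One plausible approach, sketched here as an assumption rather than the framework's actual code, is to take the first two ports from the ranges in the offer. `Range` and `takePorts` are hypothetical names for this example.

```go
package main

import "fmt"

// Range is an inclusive span of offered port numbers.
type Range struct{ Begin, End uint64 }

// takePorts returns the first n ports found in the offered ranges,
// or false if fewer than n ports are available. n must be non-negative.
func takePorts(ranges []Range, n int) ([]uint64, bool) {
	ports := make([]uint64, 0, n)
	for _, r := range ranges {
		for p := r.Begin; p <= r.End && len(ports) < n; p++ {
			ports = append(ports, p)
		}
	}
	if len(ports) < n {
		return nil, false
	}
	return ports, true
}

func main() {
	offered := []Range{{31000, 31000}, {31005, 31010}}
	if ports, ok := takePorts(offered, 2); ok {
		p2poolPort, workerPort := ports[0], ports[1]
		fmt.Println(p2poolPort, workerPort)
	}
}
```

In a real framework the chosen ports would also need to be listed among the task's resources so that Mesos accounts for them.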

Now that we have constructed our TaskInfo, we can launch our task using:

driver.LaunchTasks([]*mesos.OfferID{offer.Id}, tasks, &mesos.Filters{RefuseSeconds: proto.Float64(1)})

And now we’re launching tasks! The last thing we need to handle in our framework is what happens when the miner server shuts down. We can do this with the `StatusUpdate` callback.

There are a few types of status updates, corresponding to the different stages in a task’s lifetime. For our framework, we want to make sure that if the miner server fails for any reason, we kill all the miner workers to avoid wasting resources. Here is the relevant code:

if strings.Contains(status.GetTaskId().GetValue(), "server") &&
    (status.GetState() == mesos.TaskState_TASK_LOST ||
        status.GetState() == mesos.TaskState_TASK_KILLED ||
        status.GetState() == mesos.TaskState_TASK_FINISHED ||
        status.GetState() == mesos.TaskState_TASK_ERROR ||
        status.GetState() == mesos.TaskState_TASK_FAILED) {

    s.minerServerRunning = false

    // kill all miner worker tasks, since they cannot work without a server
    for _, taskID := range s.currentDaemonTaskIDs {
        _, err := driver.KillTask(taskID)
        if err != nil {
            log.Errorf("Failed to kill task %s: %v", taskID.GetValue(), err)
        }
    }
    s.currentDaemonTaskIDs = make([]*mesos.TaskID, 0)
}
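The long condition in that check boils down to "the task is the server, and it has reached a state it cannot leave". A small helper makes that intent explicit. The sketch below uses a local `TaskState` enum as a stand-in for the `mesos.TaskState_*` values, and `isTerminal` and `serverStopped` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// TaskState is a simplified stand-in for the Mesos task state enum.
type TaskState int

const (
	TaskRunning TaskState = iota
	TaskFinished
	TaskFailed
	TaskKilled
	TaskLost
	TaskError
)

// isTerminal reports whether a task in this state will never run again.
func isTerminal(s TaskState) bool {
	switch s {
	case TaskFinished, TaskFailed, TaskKilled, TaskLost, TaskError:
		return true
	}
	return false
}

// serverStopped reports whether a status update means the miner server
// is gone, relying on the "server" marker in the task ID.
func serverStopped(taskID string, s TaskState) bool {
	return strings.Contains(taskID, "server") && isTerminal(s)
}

func main() {
	fmt.Println(serverStopped("miner-server-1", TaskFailed))
	fmt.Println(serverStopped("miner-server-1", TaskRunning))
}
```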

And that’s it! We have a working distributed bitcoin mining framework on Apache Mesos in (roughly) 300 lines of Go. This demonstrates how quick and straightforward writing distributed systems can be using the Mesos framework API. We encourage you to try writing your own framework. If you’re looking for inspiration, check out RENDLER, a distributed web-crawler, and ANAGRAMMER, an anagram finder.

Mesosphere engineers who contributed to this report include Cody Roseborough, Lily Chen, Neeral Dodhia, Derek Chiang, Luke Leslie, and Brendan Chang.
