Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

PubSub Example Using Mongoose

DZone's Guide to

PubSub Example Using Mongoose

Check out how easy it is to implement a Publish/Subscribe pattern with Mongoose using plain TCP protocol!

· IoT Zone
Free Resource

Cisco IoT makes digital transformation a reality in factories, transportation, and utilities. Learn how to start integrating with Cisco DevNet.

In this blog post, I am going to show you how easy it is to implement a Publish/Subscribe pattern with Mongoose using plain TCP protocol. As always, the full source code of this example is available on Github. All you need to do is to clone the repo and type “make” in the example directory. 

How the Example Works

When the example is built, it creates a binary executable file which can be started in either client or server mode:

$ ./publish_subscribe
Usage: ./publish_subscribe <port> <client|server>


We start one server and several clients:

$ ./publish_subscribe 1234

 server

Starting pubsub server on port 1234
$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.


Now, when we type in the terminal for one of the clients and press enter, the typed message is sent to the server. The server then broadcasts the message to the rest of the connected clients. Here, typing the message and pressing enter, the message is PUBLISHING. Each client, by connecting to the server, SUBSCRIBES to the messages from the other clients.

Let’s see how it works in a terminal. A client that sends a message looks something like this:

$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
hello
hello


The first “hello” is a typed message. The second one is a broadcast from the server. The other connected client will show only one broadcasted message:

$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
hello


Now, let’s see how this is implemented.

Server Code

The server code is simple because it doesn’t need to deal with STDIO. In main(), it opens a listening connection:

// Server code path
    mg_mgr_init(&mgr, NULL);
    mg_bind(&mgr, argv[1], server_handler);
    printf("Starting pubsub server on port %s\n", argv[1]);


In the event handler, all it needs to do is to handle the MG_EV_RECV message: when the message is received, we need to iterate over all active client connections and push that message to every connection. At the end, we clear the received message from the IO buffer. Here is the code:

static void server_handler(struct mg_connection *nc, int ev, void *p) 

 {

  (void) p;

  if (ev == MG_EV_RECV) {

    // Push received message to all connections

    struct mbuf *io = &nc->recv_mbuf;

    struct mg_connection *c;
  for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) {
      mg_send(c, io->buf, io->len);
    }
    mbuf_remove(io, io->len);
  }
}


Client Code

Client code is a bit more elaborate. It needs to read from the standard input in case the user types a message. Then, for each received message it needs to print on standard output. So, in main(), the client code first creates socket pairs. One end of socket pair is given to a dedicated thread that reads from stdin. The other end is monitored by the event manager. Also, we create a connection to the server:

int fds[2

 ];

    struct mg_connection *ioconn, *server_conn;

    mg_mgr_init(&mgr, NULL);
   // Connect to the pubsub server
    server_conn = mg_connect(&mgr, argv[1], client_handler);
    if (server_conn == NULL) {
      fprintf(stderr, "Cannot connect to port %s\n", argv[1]);
      exit(EXIT_FAILURE);
    }
   // Create a socketpair and give one end to the thread that reads stdin
    mg_socketpair(fds, SOCK_STREAM);
    mg_start_thread(stdin_thread, &fds[1]);
   // The other end of a pair goes inside the server
    ioconn = mg_add_sock(&mgr, fds[0], client_handler);
    ioconn->flags |= MG_F_USER_1;    // Mark this so we know this is a
stdin
    ioconn->user_data = server_conn;


The event handler is simple. If we receive data from the stdin connection, we forward it to the server. If we receive data from the server, we print it to stdout:

static void client_handler(struct mg_connection *conn, int ev, void *p) 

 {

  struct mbuf *io = &conn->recv_mbuf;

  (void) p;
 if (ev == MG_EV_CONNECT) {
    if (conn->flags & MG_F_CLOSE_IMMEDIATELY) {
      printf("%s\n", "Error connecting to server!");
      exit(EXIT_FAILURE);
    }
    printf("%s\n", "Connected to server. Type a message and press enter.");
  } else if (ev == MG_EV_RECV) {
    if (conn->flags & MG_F_USER_1) {
      // Received data from the stdin, forward it to the server
      struct mg_connection *c = (struct mg_connection *) conn->user_data;
      mg_send(c, io->buf, io->len);
      mbuf_remove(io, io->len);
    } else {
      // Received data from server connection, print it
      fwrite(io->buf, io->len, 1, stdout);
      mbuf_remove(io, io->len);
    }
  } else if (ev == MG_EV_CLOSE) {
    // Connection has closed, most probably cause server has stopped
    exit(EXIT_SUCCESS);
  }
}


Enjoy recreating this example!

Cisco is a software company. Surprised? Don’t be. Join DevNet to explore APIs, tools, and techniques that developers are using to add collaboration, IoT, security, network priority, and more!

Topics:
tcp ,server ,mongoose ,web server ,iot

Published at DZone with permission of Sergey Lyubka, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}