PubSub Example Using Mongoose

Check out how easy it is to implement a Publish/Subscribe pattern with Mongoose using the plain TCP protocol!

by Sergey Lyubka · Jun. 10, 16 · Tutorial

In this blog post, I am going to show you how easy it is to implement a Publish/Subscribe pattern with Mongoose using the plain TCP protocol. As always, the full source code of this example is available on GitHub. All you need to do is clone the repo and type “make” in the example directory.

How the Example Works

When the example is built, it creates a binary executable file which can be started in either client or server mode:

$ ./publish_subscribe
Usage: ./publish_subscribe <port> <client|server>


We start one server and several clients:

$ ./publish_subscribe 1234 server

Starting pubsub server on port 1234
$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.


Now, when we type a message in the terminal of one of the clients and press enter, the message is sent to the server. The server then broadcasts the message to all connected clients, the sender included. Typing a message and pressing enter is PUBLISHING. Each client, by connecting to the server, SUBSCRIBES to the messages from the other clients.

Let’s see how it works in a terminal. A client that sends a message looks something like this:

$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
hello
hello


The first “hello” is the typed message. The second one is the broadcast from the server. The other connected client shows only the one broadcast message:

$ ./publish_subscribe 1234 client
Connected to server. Type a message and press enter.
hello


Now, let’s see how this is implemented.

Server Code

The server code is simple because it doesn’t need to deal with STDIO. In main(), it opens a listening connection:

// Server code path
    mg_mgr_init(&mgr, NULL);
    mg_bind(&mgr, argv[1], server_handler);
    printf("Starting pubsub server on port %s\n", argv[1]);


In the event handler, all it needs to do is handle the MG_EV_RECV event: when data is received, we iterate over all active connections and push that data to every one of them. At the end, we remove the received data from the IO buffer. Here is the code:

static void server_handler(struct mg_connection *nc, int ev, void *p) {
  (void) p;
  if (ev == MG_EV_RECV) {
    // Push received message to all connections
    struct mbuf *io = &nc->recv_mbuf;
    struct mg_connection *c;
    for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) {
      mg_send(c, io->buf, io->len);
    }
    mbuf_remove(io, io->len);
  }
}
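
To make the broadcast-then-consume step concrete without pulling in Mongoose, here is a minimal sketch in plain C. The struct buf type and the buf_append, buf_remove, and broadcast helpers are hypothetical stand-ins invented for this illustration (they are not Mongoose APIs). They only mimic the idea of copying the receive buffer to every peer and then dropping the consumed bytes from the front of that buffer.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a growable byte buffer (not Mongoose's mbuf). */
struct buf {
  char *data;
  size_t len;
};

/* Append n bytes to the end of b. Returns n on success, 0 on failure. */
static size_t buf_append(struct buf *b, const char *p, size_t n) {
  char *q;
  if (n == 0) return 0;
  q = realloc(b->data, b->len + n);
  if (q == NULL) return 0;
  memcpy(q + b->len, p, n);
  b->data = q;
  b->len += n;
  return n;
}

/* Drop the first n bytes of b and shift the remainder to the front. */
static void buf_remove(struct buf *b, size_t n) {
  if (n > b->len) n = b->len;
  if (n == 0) return;
  memmove(b->data, b->data + n, b->len - n);
  b->len -= n;
}

/*
 * Copy everything pending in `in` to each of the n_out output buffers,
 * then consume it from `in`. Returns how many outputs accepted the data.
 */
static size_t broadcast(struct buf *in, struct buf *out, size_t n_out) {
  size_t i, delivered = 0;
  if (in->len == 0) return 0;
  for (i = 0; i < n_out; i++) {
    if (buf_append(&out[i], in->data, in->len) == in->len) delivered++;
  }
  buf_remove(in, in->len);
  return delivered;
}
```

For instance, after appending the six bytes of "hello\n" to an input buffer, calling broadcast(&in, out, 2) leaves a copy in both output buffers and an empty input buffer. Consuming the input only after the loop matters: removing it earlier would leave nothing to send to the remaining peers.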


Client Code

Client code is a bit more elaborate. It needs to read from the standard input in case the user types a message. Then, each message it receives must be printed to the standard output. So, in main(), the client code first creates a socket pair. One end of the socket pair is given to a dedicated thread that reads from stdin. The other end is monitored by the event manager. Also, we create a connection to the server:

    int fds[2];
    struct mg_connection *ioconn, *server_conn;

    mg_mgr_init(&mgr, NULL);

    // Connect to the pubsub server
    server_conn = mg_connect(&mgr, argv[1], client_handler);
    if (server_conn == NULL) {
      fprintf(stderr, "Cannot connect to port %s\n", argv[1]);
      exit(EXIT_FAILURE);
    }

    // Create a socketpair and give one end to the thread that reads stdin
    mg_socketpair(fds, SOCK_STREAM);
    mg_start_thread(stdin_thread, &fds[1]);

    // The other end of the pair goes into the event manager
    ioconn = mg_add_sock(&mgr, fds[0], client_handler);
    ioconn->flags |= MG_F_USER_1;    // Mark this so we know this is stdin
    ioconn->user_data = server_conn;
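
The stdin_thread function itself is not shown above, so the following is only a sketch of the general idea, written against plain POSIX rather than Mongoose. It assumes socketpair() and pthread_create() as rough equivalents of the Mongoose wrappers, and the names reader_args, reader_thread, and start_reader are hypothetical. A blocking thread copies lines from an input stream into one end of the pair, so that an event loop can watch the other end like any other socket.

```c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical argument bundle for the reader thread. */
struct reader_args {
  FILE *in; /* where lines come from; stdin in a real client */
  int fd;   /* write end of the socket pair */
};

/* Blocking loop: copy each line read from args->in into the socket. */
static void *reader_thread(void *param) {
  struct reader_args *args = param;
  char line[256];
  while (fgets(line, sizeof(line), args->in) != NULL) {
    size_t n = strlen(line);
    if (write(args->fd, line, n) != (ssize_t) n) break;
  }
  close(args->fd); /* the other end sees EOF once input is exhausted */
  return NULL;
}

/*
 * Create the socket pair and start the reader thread.
 * Returns the descriptor an event loop should watch, or -1 on failure.
 * `args` must stay valid until the thread has finished.
 */
static int start_reader(struct reader_args *args, FILE *in, pthread_t *tid) {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;
  args->in = in;
  args->fd = fds[1];
  if (pthread_create(tid, NULL, reader_thread, args) != 0) {
    close(fds[0]);
    close(fds[1]);
    return -1;
  }
  return fds[0];
}
```

Calling start_reader(&args, stdin, &tid) would hand back a descriptor that becomes readable whenever the user finishes a line. The point of the design is that the blocking fgets() call lives in its own thread, so the event loop never stalls waiting for the keyboard. On some toolchains this needs to be built with -pthread.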


The event handler is simple. If we receive data from the stdin connection, we forward it to the server. If we receive data from the server, we print it to stdout:

static void client_handler(struct mg_connection *conn, int ev, void *p) {
  struct mbuf *io = &conn->recv_mbuf;
  (void) p;

  if (ev == MG_EV_CONNECT) {
    if (conn->flags & MG_F_CLOSE_IMMEDIATELY) {
      printf("%s\n", "Error connecting to server!");
      exit(EXIT_FAILURE);
    }
    printf("%s\n", "Connected to server. Type a message and press enter.");
  } else if (ev == MG_EV_RECV) {
    if (conn->flags & MG_F_USER_1) {
      // Received data from the stdin, forward it to the server
      struct mg_connection *c = (struct mg_connection *) conn->user_data;
      mg_send(c, io->buf, io->len);
      mbuf_remove(io, io->len);
    } else {
      // Received data from server connection, print it
      fwrite(io->buf, io->len, 1, stdout);
      mbuf_remove(io, io->len);
    }
  } else if (ev == MG_EV_CLOSE) {
    // Connection has closed, most probably because the server has stopped
    exit(EXIT_SUCCESS);
  }
}
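
The routing decision in this handler (one callback shared by two connections, told apart by a flag bit, with a pointer to the peer stored in user data) can be isolated into a small standalone sketch. Everything here is hypothetical and exists only for illustration: struct conn, FLAG_STDIN (playing the role of MG_F_USER_1), and on_recv are not part of Mongoose.

```c
#include <stddef.h>

/* Hypothetical flag bit marking the connection that carries stdin data. */
#define FLAG_STDIN (1u << 0)

/* Hypothetical, stripped-down connection record. */
struct conn {
  unsigned flags;
  void *user_data; /* for the stdin connection: points at the server conn */
};

enum action { FORWARD_TO_SERVER, PRINT_TO_STDOUT };

/*
 * Decide what to do with data received on c.
 * If c is the stdin connection, *target is set to the connection the
 * data should be forwarded to; otherwise *target is set to NULL.
 */
static enum action on_recv(const struct conn *c, const struct conn **target) {
  if (c->flags & FLAG_STDIN) {
    *target = c->user_data;
    return FORWARD_TO_SERVER;
  }
  *target = NULL;
  return PRINT_TO_STDOUT;
}
```

With a server record and a stdin record whose user_data points at it, on_recv() on the stdin record yields FORWARD_TO_SERVER with the server as target, while on_recv() on the server record yields PRINT_TO_STDOUT. Keeping the peer pointer on the connection itself avoids any global state in the handler.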


Enjoy recreating this example!

Published at DZone with permission of Sergey Lyubka, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
