Simplifying Multi-LLM Integration With KubeMQ

Using a message broker as a router to handle requests between your apps and LLMs simplifies integration, improves reliability, and scales easily for AI-driven apps.

By John Vester · Feb. 25, 2025 · Tutorial

Integrating multiple large language models (LLMs) such as OpenAI's GPT models and Anthropic's Claude into an application can be a daunting task. Each provider exposes its own API and communication protocol, and requests still have to be routed to the right model efficiently.

A message broker acting as a router is an elegant solution to this problem: it addresses these pain points and brings several key advantages.

In this blog post, we'll look at how to do exactly that. We'll provide code examples to guide you through setting up a router that interfaces with both OpenAI and Anthropic's Claude, using KubeMQ as our example.

Key Advantages of Using a Message Broker as an LLM Router

1. Simplified Integration

By using a message broker as a router, you abstract the complexities involved in directly interfacing with different LLM APIs. This simplifies the client-side code and reduces the likelihood of errors.

2. Multi-Model Use Cases

A message broker facilitates communication between multiple LLMs or models specialized for different tasks (e.g., one for summarization, another for sentiment analysis). It ensures requests are routed to the appropriate model efficiently, allowing applications to leverage the strengths of each model without additional overhead.
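
For instance, the routing decision itself can be as simple as a lookup from task type to channel name. The sketch below is purely illustrative; the task names and channels are hypothetical, and sending the actual request is covered later in this post.

Python
 
# Hypothetical task-to-channel map: each specialized model listens on its own channel.
TASK_CHANNELS = {
    "summarization": "summarizer_requests",
    "sentiment": "sentiment_requests",
    "general": "openai_requests",
}

def resolve_channel(task: str) -> str:
    # Unknown tasks fall back to the general-purpose model's channel.
    return TASK_CHANNELS.get(task, TASK_CHANNELS["general"])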

3. Batch Processing and Large-Scale Inference

For applications requiring batch processing or large-scale inference tasks, a message broker enables asynchronous handling by queuing requests when LLMs are busy or unavailable. This ensures that no data or requests are lost, providing reliable processing even under heavy workloads.

4. Redundancy and Fallback Assurance

In scenarios where uptime is critical, a message broker ensures seamless fallback to alternative environments. For example, if a connection to one cloud provider offering an OpenAI model fails, KubeMQ can automatically switch to another provider. This redundancy guarantees uninterrupted AI operations, maintaining service reliability and customer satisfaction.
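
As a sketch of that idea, a client can try its primary channel first and fall back to a secondary one on error. This uses the kubemq-cq query API introduced later in this post; the channel order and timeout here are assumptions for illustration.

Python
 
from kubemq.cq import Client, QueryMessage

def query_with_fallback(client: Client, message: str,
                        channels=("openai_requests", "claude_requests"),
                        timeout_seconds=30) -> str:
    # Try each channel in order; return the first successful response.
    last_error = None
    for channel in channels:
        response = client.send_query_request(QueryMessage(
            channel=channel,
            body=message.encode("utf-8"),
            timeout_in_seconds=timeout_seconds,
        ))
        if not response.is_error:
            return response.body.decode("utf-8")
        last_error = response.error  # remember the failure and try the next channel
    raise RuntimeError(f"All channels failed; last error: {last_error}")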

5. Handling High Traffic Applications

A message broker distributes incoming requests across multiple LLM instances or replicas, preventing overloading and ensuring smooth operation. This load balancing is essential for high-traffic applications, allowing them to scale effectively without compromising performance.

Building an LLM Router With KubeMQ: Integrating OpenAI and Claude

Now, I’ll guide you through setting up a router that interfaces with both OpenAI and Anthropic's Claude using KubeMQ — a leading, open-source message broker and message queue platform. 

With code examples along the way, we'll walk through setting up the message broker, building the server-side router, and creating a client to send queries.

All code examples can be found in KubeMQ’s GitHub repository.

Prerequisites

Before we begin, ensure you have the following:

  • Python 3.7 or higher installed.
  • Docker installed on your machine.
  • Valid API keys for OpenAI and Anthropic.
  • KubeMQ token (you can obtain one from the KubeMQ website).
  • kubemq-cq Python package installed: 
    Plain Text
     
    pip install kubemq-cq
  • .env file containing your API keys: 
    Plain Text
     
    OPENAI_API_KEY=your_openai_api_key
    ANTHROPIC_API_KEY=your_anthropic_api_key

Setting Up KubeMQ

First, we need to ensure that KubeMQ is operational. We'll deploy it using Docker:

Shell
 
docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your_token" \
  kubemq/kubemq-community:latest


Ports explanation:

  • 8080 – Exposes the KubeMQ REST API
  • 50000 – Opens the gRPC port for client-server communication
  • 9090 – Exposes the KubeMQ REST gateway

Note: Replace your_token with your actual KubeMQ token.
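
Before wiring up any clients, it's worth confirming the broker is reachable. A quick sanity check might look like the following, assuming the community edition's default health endpoint on the REST API port:

Shell
 
# Confirm the container is running, then probe the REST API port.
docker ps --filter "ancestor=kubemq/kubemq-community:latest"
curl -s http://localhost:8080/health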

Creating the LLM Router Server

The LLM Router acts as an intermediary between clients and the LLMs. It listens to specific channels for queries and routes them to the appropriate LLM.

server.py

Python
 
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading

load_dotenv()

class LLMRouter:
    def __init__(self):
        self.openai_llm = ChatOpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="gpt-3.5-turbo"
        )
        self.claude_llm = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            model="claude-3"
        )
        self.client = Client(address="localhost:50000")

    def handle_openai_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.openai_llm.predict(message)  # predict() returns a plain string we can encode
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def handle_claude_query(self, request: QueryMessageReceived):
        try:
            message = request.body.decode('utf-8')
            result = self.claude_llm.predict(message)  # predict() returns a plain string we can encode
            response = QueryResponseMessage(
                query_received=request,
                is_executed=True,
                body=result.encode('utf-8')
            )
            self.client.send_response_message(response)
        except Exception as e:
            self.client.send_response_message(QueryResponseMessage(
                query_received=request,
                is_executed=False,
                error=str(e)
            ))

    def run(self):
        def on_error(err: str):
            print(f"Error: {err}")

        def subscribe_openai():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="openai_requests",
                    on_receive_query_callback=self.handle_openai_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        def subscribe_claude():
            self.client.subscribe_to_queries(
                subscription=QueriesSubscription(
                    channel="claude_requests",
                    on_receive_query_callback=self.handle_claude_query,
                    on_error_callback=on_error,
                ),
                cancel=CancellationToken()
            )

        threading.Thread(target=subscribe_openai).start()
        threading.Thread(target=subscribe_claude).start()

        print("LLM Router running on channels: openai_requests, claude_requests")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down...")

if __name__ == "__main__":
    router = LLMRouter()
    router.run()


Explanation:

  • Initialization.
    • Loads environment variables for API keys.
    • Initializes clients for OpenAI and Anthropic LLMs.
    • Sets up a KubeMQ client.
  • Handling queries.
    • handle_openai_query and handle_claude_query decode the incoming message, pass it to the respective LLM, and send back the response.
    • Errors are caught and sent back with the is_executed flag set to False.
  • Subscription.
    • The router subscribes to two channels: openai_requests and claude_requests.
    • Uses threading to handle subscriptions concurrently.
  • Running the server.
    • The run method starts both subscriptions and keeps the server running until interrupted (a quick run-through follows this list).
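
With KubeMQ running and the .env file in place, starting the router is just a matter of running the script. A typical session might look like this; note that server.py also needs the langchain, openai, anthropic, and python-dotenv packages, which are assumptions beyond the prerequisites listed above:

Shell
 
pip install langchain openai anthropic python-dotenv kubemq-cq
python server.py
# LLM Router running on channels: openai_requests, claude_requests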

Developing the LLM Client

The client sends queries to the LLM Router, specifying which model to use.

client.py

Python
 
from kubemq.cq import Client, QueryMessage
import json

class LLMClient:
    def __init__(self, address="localhost:50000"):
        self.client = Client(address=address)

    def send_message(self, message: str, model: str) -> dict:
        channel = f"{model}_requests"
        response = self.client.send_query_request(QueryMessage(
            channel=channel,
            body=message.encode('utf-8'),
            timeout_in_seconds=30
        ))
        if response.is_error:
            return {"error": response.error}
        else:
            return {"response": response.body.decode('utf-8')}

if __name__ == "__main__":
    client = LLMClient()
    models = ["openai", "claude"]
    message = input("Enter your message: ")
    model = input(f"Choose model ({'/'.join(models)}): ")
    if model in models:
        response = client.send_message(message, model)
        if "error" in response:
            print(f"Error: {response['error']}")
        else:
            print(f"Response: {response['response']}")
    else:
        print("Invalid model selected")


Explanation:

  • Initialization.
    • Sets up a KubeMQ client.
  • Sending messages.
    • send_message method constructs the appropriate channel based on the selected model.
    • Sends a query message to the router and waits for the response.
    • Handles errors and decodes the response body.
  • User interaction.
    • Prompts the user to enter a message and select a model.
    • Prints out the response from the LLM (a sample session follows this list).
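
Assuming the router from server.py is already listening, a sample interaction might look like the following (the prompts and output format come straight from client.py; the model's answer will of course vary):

Shell
 
python client.py
# Enter your message: What is the capital of France?
# Choose model (openai/claude): openai
# Response: The capital of France is Paris.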

Sending and Receiving via REST

For services or clients that prefer or require RESTful communication, KubeMQ provides REST endpoints.

Sending a Request via REST

Endpoint:

Plain Text
 
POST http://localhost:9090/send/request


Headers:

Plain Text
 
Content-Type: application/json


Body:

JSON
 
{
   "RequestTypeData": 2,
   "ClientID": "LLMRouter-sender",
   "Channel": "openai_requests",
   "BodyString": "What is the capital of France?",
   "Timeout": 30000
}


Payload details:

  • RequestTypeData – Specifies the request type (2 for query).
  • ClientID – An identifier for the client sending the request.
  • Channel – The channel corresponding to the LLM model (openai_requests or claude_requests).
  • BodyString – The message to send to the LLM.
  • Timeout – The time to wait for a response (in milliseconds).

Receiving the Response

The response will be a JSON object containing the LLM's output or an error message.
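
Putting the endpoint, headers, and body together, the same query can be sent with curl like this (targeting the openai_requests channel):

Shell
 
curl -X POST http://localhost:9090/send/request \
  -H "Content-Type: application/json" \
  -d '{
        "RequestTypeData": 2,
        "ClientID": "LLMRouter-sender",
        "Channel": "openai_requests",
        "BodyString": "What is the capital of France?",
        "Timeout": 30000
      }'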

Conclusion

By leveraging a message broker (KubeMQ), we've built a scalable and efficient router that can interface with multiple LLMs. This setup allows clients to send queries to different models seamlessly and can be extended to include more models or functionalities.

Some benefits of this approach are:

  1. Simplified integration. Clients are shielded from the complexities of interfacing directly with different LLM APIs, which keeps client-side code simple and reduces the likelihood of errors.
  2. Multi-model support. Efficiently routes requests to the appropriate model specialized for different tasks.
  3. Reliability. Ensures no data is lost even when LLMs are busy or unavailable.
  4. Redundancy. Provides fallback mechanisms to maintain uninterrupted operations.
  5. Scalability. Handles high traffic by distributing requests across multiple LLM instances.

Have a really great day!
