DB+Pub/Sub Process Architecture
I saw this architecture in action in a logistics system that provided services to both operator terminals and electronic hardware for machine control. I was surprised by how resilient it was to the many issues and failures we encountered.
The basic idea is as follows: the entire control system ran as separate processes on different machines within the same network. A server on this network stored all data in a single centralized database. When a process identified a new task that another service needed to complete, it created a new entry in a specific table, storing all the task data. It then sent a network notification to that service, indicating that a new task was waiting.
It may seem illogical not to use the same network message both to announce the task and to transmit the data to be processed. Instead, the data was stored in a table in a “pending” state. This might appear to be a waste of resources until you consider the need to build systems that are resilient to failures: if the notification or the receiving service is lost, the task data is still in the database.
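The write-then-notify pattern described above can be sketched in a few lines. This is a minimal illustration, not the original system: it assumes an in-memory SQLite database in place of the central server and Python's queue.Queue in place of the network notification, and the tasks table, the function names, and the sample payload are all hypothetical.

```python
import json
import queue
import sqlite3

# Stand-ins (assumptions): SQLite for the central database,
# queue.Queue for the network notification channel.
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE tasks ("
    "task_id INTEGER PRIMARY KEY, payload TEXT, status TEXT)"
)
notifications = queue.Queue()


def submit_task(payload):
    """Persist the task first, then announce it by id only."""
    cur = db.execute(
        "INSERT INTO tasks (payload, status) VALUES (?, 'pending')",
        (json.dumps(payload),),
    )
    db.commit()
    notifications.put(cur.lastrowid)  # the notification carries no task data
    return cur.lastrowid


def handle_next_notification():
    """Load the task named by the notification and mark it done."""
    task_id = notifications.get_nowait()
    row = db.execute(
        "SELECT payload FROM tasks WHERE task_id = ?", (task_id,)
    ).fetchone()
    payload = json.loads(row[0])
    db.execute("UPDATE tasks SET status = 'done' WHERE task_id = ?", (task_id,))
    db.commit()
    return payload


task_id = submit_task({"action": "move_pallet", "slot": 7})
result = handle_next_notification()
```

Because the notification holds only the id, losing it costs nothing but latency: the row stays in the table in the pending state until some consumer picks it up.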
Why is DB+Pub/Sub a good choice?
The DB+Pub/Sub architecture suits many scenarios for the following reasons:
- Resilience: Data persists in the database, so a failure in any chat manager, service, or UI does not result in data loss.
- Decoupling: Components don’t need to communicate directly; the database acts as the source of truth, and queues handle notifications.
- Modularity: You can add new components (e.g., an analytics module or a new UI type) without changing the architecture.
- Scalability: Queues absorb bursts of notifications, and several consumers can read from the centralized database at once.
- Flexibility: You can switch technologies (e.g., from CoreData to PostgreSQL or SQLite) without affecting communication logic.
This architecture is common in distributed systems (e.g., microservices) and is well suited to communications that are not real-time critical, such as data presentation UIs.
DB+Pub/Sub Architecture Design
In this architecture, the database stores messages and communication state, while the pub/sub queue system notifies components when there’s pending work (e.g., a new message or command).
- Centralized Database:
  - Stores conversation states, chat lists, commands, messages, and configurations.
  - Also stores UI commands (e.g., “select chat”) and updates generated by the chat manager (e.g., new messages).
  - Example technologies: PostgreSQL (for distributed environments), MongoDB (for unstructured data), or CoreData (if the UI is on a local device and no centralized server is needed).
- Queue System:
  - Acts as a notification mechanism to “wake up” components when new data is available in the database.
  - Example technologies: RabbitMQ, Kafka, or Redis (using its queue or Pub/Sub capabilities).
  - Each component (chat manager, UI) has its own queue to receive notifications.
- Communication Flow:
  - From UI to chat manager:
    - The UI writes a command (e.g., { "type": "select_chat", "chat_id": "123" }) to the database (in a table like commands).
    - The UI publishes a message to the chat manager’s queue (e.g., “new command to process”).
    - The chat manager reads the queue notification, retrieves the command from the database, processes it (e.g., updates the active chat), and saves the result in the database.
  - From chat manager to UI:
    - The chat manager writes an update (e.g., a new message { "type": "new_message", "chat_id": "123", "text": "Hello" }) to the database (in a table like updates).
    - The chat manager publishes a message to the UI’s queue (e.g., “new update available”).
    - The UI reads the notification, queries the database, and updates the interface with the new data.
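The two directions of this flow can be traced end to end in a single process. The sketch below is only an illustration under stated assumptions: SQLite and queue.Queue stand in for the real database and broker, the tables are reduced to the columns the flow needs, and the function names ui_send_command, manager_step, and ui_step are hypothetical.

```python
import json
import queue
import sqlite3

# Stand-ins (assumptions): SQLite for the central database,
# one in-process queue per component for notifications.
db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE commands (command_id INTEGER PRIMARY KEY, data TEXT, status TEXT);
    CREATE TABLE updates (update_id INTEGER PRIMARY KEY, data TEXT);
    """
)
ui_commands = queue.Queue()  # notifications for the chat manager
ui_updates = queue.Queue()   # notifications for the UI


def ui_send_command(command):
    """UI side: store the command, then notify the chat manager."""
    cur = db.execute(
        "INSERT INTO commands (data, status) VALUES (?, 'pending')",
        (json.dumps(command),),
    )
    db.commit()
    ui_commands.put({"command_id": cur.lastrowid})


def manager_step():
    """Manager side: read one notification, process it, publish an update."""
    note = ui_commands.get_nowait()
    (raw,) = db.execute(
        "SELECT data FROM commands WHERE command_id = ?", (note["command_id"],)
    ).fetchone()
    command = json.loads(raw)
    update = {"type": "chat_selected", "chat_id": command["chat_id"]}
    cur = db.execute("INSERT INTO updates (data) VALUES (?)", (json.dumps(update),))
    db.execute(
        "UPDATE commands SET status = 'processed' WHERE command_id = ?",
        (note["command_id"],),
    )
    db.commit()  # commit before notifying, so the UI can always find the row
    ui_updates.put({"update_id": cur.lastrowid})


def ui_step():
    """UI side: read one notification and load the update it points to."""
    note = ui_updates.get_nowait()
    (raw,) = db.execute(
        "SELECT data FROM updates WHERE update_id = ?", (note["update_id"],)
    ).fetchone()
    return json.loads(raw)


ui_send_command({"type": "select_chat", "chat_id": "123"})
manager_step()
received = ui_step()
```

In both directions the database write is committed before the notification is sent, so a consumer that receives a notification never looks for a row that is not yet visible.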
Practical Implementation
Here’s a detailed outline for implementing an example of communication between a chat manager and a UI with DB+Pub/Sub:
- Database
  - Suggested technology: PostgreSQL (scalable, robust, compatible with distributed environments) or CoreData (if the UI and manager are on the same device).
  - Suggested schema:
    - Table chats: Stores the chat list (chat_id, name, created_at).
    - Table messages: Stores messages (message_id, chat_id, text, timestamp, sender).
    - Table commands: Stores UI commands (command_id, type, data, status, timestamp).
    - Table updates: Stores chat manager updates (update_id, type, data, chat_id, timestamp).
  - PostgreSQL example:
CREATE TABLE chats (
    chat_id VARCHAR(50) PRIMARY KEY,
    name VARCHAR(100),
    created_at TIMESTAMP
);

CREATE TABLE messages (
    message_id SERIAL PRIMARY KEY,
    chat_id VARCHAR(50) REFERENCES chats(chat_id),
    text TEXT,
    timestamp TIMESTAMP,
    sender VARCHAR(50)
);

CREATE TABLE commands (
    command_id SERIAL PRIMARY KEY,
    type VARCHAR(50),
    data JSONB,
    status VARCHAR(20), -- pending, processed
    timestamp TIMESTAMP
);

CREATE TABLE updates (
    update_id SERIAL PRIMARY KEY,
    type VARCHAR(50),
    data JSONB,
    chat_id VARCHAR(50),
    timestamp TIMESTAMP
);
- Queue System
  - Suggested technology: RabbitMQ (easy to use, robust) or Redis (lighter, with Pub/Sub or queues).
  - Queues:
    - ui_commands: To notify the chat manager about UI commands.
    - ui_updates: To notify the UI about chat manager updates.
  - Queue message format:
    - For commands: { "command_id": 123, "type": "select_chat" }
    - For updates: { "update_id": 456, "type": "new_message", "chat_id": "123" }
- Chat Manager (in Rust, Python, or similar)
  - Functionality:
    - Listens to the ui_commands queue.
    - Upon receiving a notification, queries the commands table, processes the command (e.g., selects a chat), and updates the database state.
    - When generating an update (e.g., a new message), writes it to the updates table and publishes a notification to the ui_updates queue.
  - Python example with RabbitMQ:
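import json

import pika
import psycopg2

# Database connection
db_conn = psycopg2.connect("dbname=chatbot user=postgres password=secret")
cursor = db_conn.cursor()

# RabbitMQ connection; declare both queues used below
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="ui_commands")
channel.queue_declare(queue="ui_updates")


def callback(ch, method, properties, body):
    # The notification carries only the id; the command itself lives in the database
    command_id = json.loads(body)["command_id"]
    cursor.execute("SELECT data FROM commands WHERE command_id = %s", (command_id,))
    row = cursor.fetchone()
    if row is None:
        ch.basic_ack(delivery_tag=method.delivery_tag)  # unknown command: drop it
        return
    data = row[0]  # psycopg2 decodes JSONB into a dict
    update = None
    if data["type"] == "select_chat":
        chat_id = data["chat_id"]
        # Update active chat (e.g., in memory or DB)
        cursor.execute(
            "INSERT INTO updates (type, data, chat_id, timestamp) "
            "VALUES (%s, %s, %s, NOW()) RETURNING update_id",
            ("chat_selected", json.dumps({"chat_id": chat_id}), chat_id),
        )
        # cursor.lastrowid does not hold the SERIAL id in psycopg2; RETURNING does
        update = {"update_id": cursor.fetchone()[0], "type": "chat_selected", "chat_id": chat_id}
    cursor.execute("UPDATE commands SET status = 'processed' WHERE command_id = %s", (command_id,))
    db_conn.commit()
    if update is not None:
        # Notify the UI only after the update row is committed
        ch.basic_publish(exchange="", routing_key="ui_updates", body=json.dumps(update))
    # Acknowledge last, so a crash before this point leads to redelivery
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue="ui_commands", on_message_callback=callback, auto_ack=False)
channel.start_consuming()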
- UI (in Swift)
  - Functionality:
    - Listens to the ui_updates queue.
    - Upon receiving a notification, queries the updates table and updates the interface (e.g., adds a new message).
    - When the user performs an action (e.g., selects a chat), writes the command to the commands table and publishes a notification to the ui_commands queue.
  - Swift example with Redis (using SwiftRedis):
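import SwiftRedis
import Foundation
import SwiftUI

// Schematic example: Database, Message, and Update are placeholder types, and the
// subscribe/publish calls stand in for whatever your Redis client actually provides.
// It also assumes the chat manager publishes through Redis; the Python example
// above uses RabbitMQ, and both sides must share the same broker.
class ChatViewModel: ObservableObject {
    @Published var messages: [Message] = []
    let redis = Redis()
    let db = Database() // Connection to PostgreSQL or CoreData

    init() {
        redis.connect(host: "localhost", port: 6379) { [weak self] _ in
            self?.redis.subscribe("ui_updates") { message in
                let decoder = JSONDecoder()
                decoder.keyDecodingStrategy = .convertFromSnakeCase // "update_id" -> updateId
                // Skip malformed notifications instead of crashing on try!
                guard let update = try? decoder.decode(Update.self, from: message.data) else { return }
                self?.fetchUpdate(updateId: update.updateId)
            }
        }
    }

    func fetchUpdate(updateId: Int) {
        // Select type as well, since it is checked below
        let update = db.query("SELECT type, data, chat_id FROM updates WHERE update_id = $1", parameters: [updateId])
        if update["type"] == "new_message" {
            DispatchQueue.main.async {
                self.messages.append(Message(text: update["data"]["text"], chatId: update["chat_id"]))
            }
        }
    }

    func selectChat(chatId: String) {
        let command = ["type": "select_chat", "chat_id": chatId]
        // Write the command first, then notify the chat manager with its id
        let commandId = db.insert("INSERT INTO commands (type, data, status, timestamp) VALUES ($1, $2, 'pending', NOW()) RETURNING command_id", parameters: ["select_chat", command])
        redis.publish("ui_commands", message: ["command_id": commandId, "type": "select_chat"])
    }
}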
- Resilience
  - Persistence: Data in the commands and updates tables ensures no commands or updates are lost if a component fails.
  - Recovery: If the UI or chat manager restarts, they can query the database to recover state (e.g., recent messages or pending commands).
  - Retries: Use acknowledged delivery so that notifications a consumer did not acknowledge are redelivered (e.g., manual acknowledgements in RabbitMQ) to handle temporary failures.
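Recovery and retries interact: a restarted consumer scans for pending rows, and a redelivered notification may name a command that was already handled. One way to make both safe is to claim each command with a conditional update. The sketch below assumes SQLite as a stand-in for the database and introduces an intermediate 'processing' status that is not part of the schema above; the function names claim and recover_pending are hypothetical.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE commands (command_id INTEGER PRIMARY KEY, type TEXT, status TEXT)"
)
db.executemany(
    "INSERT INTO commands (type, status) VALUES (?, ?)",
    [
        ("select_chat", "processed"),  # command_id 1: already handled
        ("select_chat", "pending"),    # command_id 2: notification was lost
        ("select_chat", "pending"),    # command_id 3: notification was lost
    ],
)
db.commit()


def claim(command_id):
    """Atomically move a command from 'pending' to 'processing'.

    Returns False when the command was already claimed, which makes a
    redelivered or duplicated notification harmless.
    """
    cur = db.execute(
        "UPDATE commands SET status = 'processing' "
        "WHERE command_id = ? AND status = 'pending'",
        (command_id,),
    )
    db.commit()
    return cur.rowcount == 1


def recover_pending():
    """On startup, pick up work whose notification may have been lost."""
    rows = db.execute(
        "SELECT command_id FROM commands WHERE status = 'pending' ORDER BY command_id"
    ).fetchall()
    return [command_id for (command_id,) in rows if claim(command_id)]


recovered = recover_pending()    # claims the two pending commands
duplicate = claim(recovered[0])  # a late duplicate notification is rejected
```

The same conditional UPDATE works in PostgreSQL; only the placeholder syntax differs.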
- Optimizations
  - Database indexes: The primary keys commands.command_id and updates.update_id are indexed automatically in PostgreSQL; add an index on messages.chat_id (and on any other column you filter by, such as commands.status) to speed up queries.
  - User-specific queues: Use queues specific to session_id or user_id to scale in multi-user systems.
  - Data cleanup: Implement a mechanism to delete processed commands and updates after a period (e.g., with a scheduled task).
  - Local caching: The UI can use CoreData or an in-memory cache to reduce database queries.
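The data cleanup item can be as small as one DELETE run on a schedule. The following is a sketch under assumptions: SQLite stands in for the database, timestamps are stored as UTC ISO 8601 strings so they compare correctly as text, and the seven-day retention window and the name purge_processed are illustrative choices.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE commands ("
    "command_id INTEGER PRIMARY KEY, status TEXT, timestamp TEXT)"
)
# Index the columns the cleanup filter uses
db.execute("CREATE INDEX idx_commands_status_ts ON commands (status, timestamp)")


def purge_processed(conn, now, retention):
    """Delete processed commands older than the retention window."""
    cutoff = (now - retention).isoformat()
    cur = conn.execute(
        "DELETE FROM commands WHERE status = 'processed' AND timestamp < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
rows = [
    ("processed", (now - timedelta(days=9)).isoformat()),   # old and processed: removed
    ("processed", (now - timedelta(hours=1)).isoformat()),  # recent: kept
    ("pending", (now - timedelta(days=9)).isoformat()),     # old but pending: kept
]
db.executemany("INSERT INTO commands (status, timestamp) VALUES (?, ?)", rows)
db.commit()

deleted = purge_processed(db, now, retention=timedelta(days=7))
remaining = db.execute("SELECT COUNT(*) FROM commands").fetchone()[0]
```

Pending rows are never purged regardless of age, since they are the record that recovery relies on.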