WebSocket Connection Setup
DBX provides a WebSocket API for real-time, bidirectional communication. WebSocket connections offer lower latency and better performance for applications requiring real-time updates or frequent data exchange.
Overview
The WebSocket API allows you to establish persistent connections to DBX for real-time operations. This is ideal for applications that need:
- Real-time data synchronization
- Live updates and notifications
- High-frequency operations
- Reduced connection overhead
Connection Endpoints
WebSocket URL
ws://localhost:8080/ws
wss://your-domain.com/ws # For secure connections
Connection Parameters
You can pass connection parameters as query strings:
ws://localhost:8080/ws?api_key=YOUR_API_KEY&client_id=my-app
- api_key - API key for authentication
- client_id - Optional client identifier for tracking
- version - API version (default: 1.0)
Establishing a Connection
JavaScript Example
/**
 * Minimal DBX WebSocket client: opens the connection, forwards parsed
 * messages to `handleMessage`, and reconnects after unexpected drops.
 *
 * `startHeartbeat`, `stopHeartbeat`, `reconnect` and `handleMessage` belong
 * to the rest of the implementation and are only called from here.
 */
class DBXWebSocket {
  /**
   * @param {string} url - WebSocket endpoint, e.g. "ws://localhost:8080/ws".
   * @param {object} [options]
   * @param {string} [options.apiKey] - Sent as the `api_key` query parameter.
   * @param {string} [options.clientId] - Sent as the `client_id` query parameter.
   * @param {string} [options.version] - Sent as the `version` query parameter.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.ws = null;
    this.messageId = 0;
    this.pendingRequests = new Map();
    this.eventHandlers = new Map();
    // Socket that disconnect() asked to close; its close event must not
    // trigger a reconnect.
    this.clientClosedSocket = null;
  }

  /**
   * Opens the WebSocket.
   *
   * @returns {Promise<void>} Resolves once the socket is open; rejects if it
   *   errors or closes before opening.
   */
  connect() {
    return new Promise((resolve, reject) => {
      const wsUrl = new URL(this.url);

      // Add query parameters
      if (this.options.apiKey) {
        wsUrl.searchParams.set("api_key", this.options.apiKey);
      }
      if (this.options.clientId) {
        wsUrl.searchParams.set("client_id", this.options.clientId);
      }
      if (this.options.version) {
        wsUrl.searchParams.set("version", this.options.version);
      }

      const socket = new WebSocket(wsUrl.toString());
      this.ws = socket;
      let opened = false;

      socket.onopen = () => {
        opened = true;
        console.log("WebSocket connected");
        this.startHeartbeat();
        resolve();
      };

      socket.onmessage = (event) => {
        let message;
        try {
          message = JSON.parse(event.data);
        } catch (error) {
          // A malformed frame must not throw out of the event handler.
          console.error("Discarding malformed WebSocket message:", error);
          return;
        }
        this.handleMessage(message);
      };

      socket.onclose = (event) => {
        console.log("WebSocket disconnected:", event.code, event.reason);
        this.stopHeartbeat();

        if (!opened) {
          // A failed attempt: report it to the caller (no-op if onerror
          // already rejected) instead of starting another reconnect loop.
          reject(new Error(`WebSocket closed before opening (code ${event.code})`));
          return;
        }

        // Only reconnect when the drop was not requested via disconnect().
        if (this.clientClosedSocket !== socket) {
          this.reconnect();
        }
      };

      socket.onerror = (error) => {
        console.error("WebSocket error:", error);
        reject(error);
      };
    });
  }

  /** Closes the socket with code 1000 and suppresses automatic reconnection. */
  disconnect() {
    if (this.ws) {
      this.clientClosedSocket = this.ws;
      this.ws.close(1000, "Client disconnect");
    }
  }

  // ... rest of the implementation
}
Python Example
import asyncio
import websockets
import json
import uuid
class DBXWebSocket:
    """Async DBX WebSocket client built on the `websockets` package."""

    def __init__(self, url, api_key=None, client_id=None):
        """Store connection settings; no I/O happens until `connect()`.

        Args:
            url: WebSocket endpoint, e.g. "ws://localhost:8080/ws".
            api_key: Optional value sent as the `api_key` query parameter.
            client_id: Optional value sent as the `client_id` query parameter.
        """
        self.url = url
        self.api_key = api_key
        self.client_id = client_id
        self.websocket = None
        self.message_id = 0
        self.pending_requests = {}
        self.event_handlers = {}
        # Strong reference to the background reader task; the event loop only
        # keeps weak references, so an unreferenced task can be collected.
        self._handler_task = None

    async def connect(self):
        """Open the connection and start the background message handler."""
        from urllib.parse import urlencode

        # Build connection URL with parameters
        params = {}
        if self.api_key:
            params["api_key"] = self.api_key
        if self.client_id:
            params["client_id"] = self.client_id

        url = self.url
        if params:
            # urlencode escapes characters such as "&", "=" and spaces that
            # would otherwise corrupt the query string.
            separator = "&" if "?" in url else "?"
            url = f"{url}{separator}{urlencode(params)}"

        self.websocket = await websockets.connect(url)
        print("WebSocket connected")

        # Start message handler
        self._handler_task = asyncio.create_task(self.message_handler())

    async def disconnect(self):
        """Close the connection if one was opened."""
        if self.websocket:
            await self.websocket.close()
            print("WebSocket disconnected")

    async def message_handler(self):
        """Decode each incoming frame as JSON and pass it to `handle_message`."""
        async for message in self.websocket:
            data = json.loads(message)
            await self.handle_message(data)

    # ... rest of the implementation
Message Format
All WebSocket messages follow a consistent JSON format:
Request Format
{
"id": "msg_1234567890",
"method": "string.get",
"params": {
"key": "my-key"
},
"timestamp": "2024-01-15T10:30:00Z"
}
Response Format
{
"id": "msg_1234567890",
"result": {
"value": "Hello, World!",
"ttl": 3600
},
"timestamp": "2024-01-15T10:30:00Z"
}
Error Format
{
"id": "msg_1234567890",
"error": {
"code": "not_found",
"message": "Key 'my-key' not found"
},
"timestamp": "2024-01-15T10:30:00Z"
}
Authentication
API Key Authentication
// Authenticate by supplying the API key in the client options.
const endpoint = "ws://localhost:8080/ws";
const credentials = { apiKey: "your-api-key-here" };
const dbx = new DBXWebSocket(endpoint, credentials);
await dbx.connect();
JWT Authentication
// The JWT is passed through the same `apiKey` option as a plain API key.
const jwtToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...";
const dbx = new DBXWebSocket("ws://localhost:8080/ws", { apiKey: jwtToken });
await dbx.connect();
Connection Management
Heartbeat
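DBX sends periodic heartbeat messages to keep connections alive. Heartbeats use the format below, and the client example that follows sends its own heartbeat in the same format every 30 seconds: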
{
"type": "heartbeat",
"timestamp": "2024-01-15T10:30:00Z"
}
startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
type: 'heartbeat',
timestamp: new Date().toISOString()
}));
}
}, 30000); // Send heartbeat every 30 seconds
}
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}
Reconnection
Implement automatic reconnection with exponential backoff:
async reconnect() {
const maxRetries = 5;
const baseDelay = 1000;
for (let attempt = 0; attempt < maxRetries; attempt++) {
const delay = baseDelay * Math.pow(2, attempt);
console.log(`Reconnecting in ${delay}ms (attempt ${attempt + 1})`);
await new Promise(resolve => setTimeout(resolve, delay));
try {
await this.connect();
console.log('Reconnected successfully');
return;
} catch (error) {
console.error(`Reconnection attempt ${attempt + 1} failed:`, error);
}
}
console.error('Failed to reconnect after maximum attempts');
}
Connection State Management
/**
 * DBX WebSocket client skeleton that tracks its connection state.
 */
class DBXWebSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {object} [options] - Client options, stored as-is.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.ws = null;
    this.state = "disconnected"; // disconnected, connecting, connected, reconnecting
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
  }

  /** @returns {string} The current connection state. */
  getState() {
    return this.state;
  }

  /**
   * @returns {boolean} True only when the state is "connected" and the
   *   underlying socket is open. Always a real boolean, even when no socket
   *   has been created yet.
   */
  isConnected() {
    return (
      this.state === "connected" &&
      this.ws != null &&
      this.ws.readyState === WebSocket.OPEN
    );
  }

  /**
   * Moves the client to "connected". Does nothing when a connection is
   * already open or in progress.
   *
   * @returns {Promise<void>}
   * @throws Rethrows any error from the connection logic after resetting the
   *   state to "disconnected".
   */
  async connect() {
    if (this.state === "connecting" || this.state === "connected") {
      return;
    }

    this.state = "connecting";

    try {
      // ... connection logic
      this.state = "connected";
      this.reconnectAttempts = 0;
    } catch (error) {
      this.state = "disconnected";
      throw error;
    }
  }
}
Event Handling
Message Events
/**
 * DBX WebSocket client skeleton showing event registration and incoming
 * message dispatch.
 */
class DBXWebSocket {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {object} [options] - Client options, stored as-is.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.eventHandlers = new Map();
    // handleMessage() looks responses up here, so it must always exist.
    this.pendingRequests = new Map();
  }

  /**
   * Registers a handler for an event.
   * @param {string} event
   * @param {Function} handler
   */
  on(event, handler) {
    const handlers = this.eventHandlers.get(event);
    if (handlers) {
      handlers.push(handler);
    } else {
      this.eventHandlers.set(event, [handler]);
    }
  }

  /**
   * Removes a previously registered handler; unknown handlers are ignored.
   * @param {string} event
   * @param {Function} handler
   */
  off(event, handler) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /**
   * Calls every handler registered for the event. A handler that throws is
   * logged and does not stop the remaining handlers.
   * @param {string} event
   * @param {*} data
   */
  emit(event, data) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    // Iterate over a copy: a handler may call off() on itself, and mutating
    // the live array mid-iteration would skip the next handler.
    for (const handler of [...handlers]) {
      try {
        handler(data);
      } catch (error) {
        console.error(`Error in event handler for ${event}:`, error);
      }
    }
  }

  /**
   * Routes a parsed incoming message: heartbeats are emitted as "heartbeat",
   * responses settle the matching pending request, and anything else is
   * emitted as "message".
   * @param {object} data
   */
  handleMessage(data) {
    if (data.type === "heartbeat") {
      this.emit("heartbeat", data);
      return;
    }

    const pending = data.id ? this.pendingRequests.get(data.id) : undefined;
    if (!pending) {
      // Handle unsolicited messages (notifications, etc.)
      this.emit("message", data);
      return;
    }

    this.pendingRequests.delete(data.id);

    if (data.error) {
      const error = new Error(data.error.message);
      // Keep the server error code so callers can branch on it.
      error.code = data.error.code;
      pending.reject(error);
    } else {
      pending.resolve(data.result);
    }
  }
}
Usage Example
const dbx = new DBXWebSocket("ws://localhost:8080/ws", {
  apiKey: "your-api-key",
});

// Handlers keyed by event name; they are registered in the order listed.
const handlers = {
  connect: () => {
    console.log("Connected to DBX");
  },
  disconnect: () => {
    console.log("Disconnected from DBX");
  },
  heartbeat: (data) => {
    console.log("Received heartbeat:", data.timestamp);
  },
  message: (data) => {
    console.log("Received message:", data);
  },
};

for (const [eventName, handler] of Object.entries(handlers)) {
  dbx.on(eventName, handler);
}

// Open the connection once every handler is in place.
await dbx.connect();
Error Handling
Connection Errors
/**
 * DBX WebSocket client skeleton showing how connection and disconnect errors
 * are routed to per-type handlers.
 */
class DBXWebSocket {
  /**
   * @param {string} url - WebSocket endpoint used by connect().
   * @param {object} [options] - Client options, stored as-is.
   */
  constructor(url, options = {}) {
    // connect() reads this.url, so it has to be stored here.
    this.url = url;
    this.options = options;
    this.errorHandlers = new Map();
  }

  /**
   * Registers the handler for one error type ("connection" or "disconnect"
   * are the types raised by connect()). A later call for the same type
   * replaces the earlier handler.
   * @param {string} type
   * @param {Function} handler
   */
  onError(type, handler) {
    this.errorHandlers.set(type, handler);
  }

  /**
   * Passes the error to the handler registered for its type, or logs it when
   * no handler is registered.
   * @param {string} type
   * @param {*} error
   */
  handleError(type, error) {
    if (this.errorHandlers.has(type)) {
      this.errorHandlers.get(type)(error);
    } else {
      console.error(`Unhandled ${type} error:`, error);
    }
  }

  /**
   * Opens the socket.
   * @returns {Promise<void>} Resolves when the socket opens; rejects with the
   *   error event if the socket reports an error.
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);

      this.ws.onopen = () => {
        resolve();
      };

      this.ws.onerror = (error) => {
        this.handleError("connection", error);
        reject(error);
      };

      this.ws.onclose = (event) => {
        // Code 1000 is a normal closure and is not reported as an error.
        if (event.code !== 1000) {
          this.handleError("disconnect", {
            code: event.code,
            reason: event.reason,
          });
        }
      };

      // ... rest of connection logic
    });
  }
}
Message Errors
async sendRequest(method, params = {}) {
if (!this.isConnected()) {
throw new Error('WebSocket not connected');
}
const id = `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
return new Promise((resolve, reject) => {
const message = {
id,
method,
params,
timestamp: new Date().toISOString()
};
this.pendingRequests.set(id, { resolve, reject });
// Set timeout for request
setTimeout(() => {
if (this.pendingRequests.has(id)) {
this.pendingRequests.delete(id);
reject(new Error('Request timeout'));
}
}, 30000); // 30 second timeout
this.ws.send(JSON.stringify(message));
});
}
Performance Considerations
Connection Pooling
For high-throughput applications, consider connection pooling:
/**
 * Pool of DBXWebSocket connections capped at `maxConnections`.
 */
class DBXWebSocketPool {
  /**
   * @param {string} url - Endpoint passed to every DBXWebSocket.
   * @param {object} [options] - Passed to every DBXWebSocket.
   * @param {number} [options.maxConnections=5] - Upper bound on pool size.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.pool = [];
    this.maxConnections = options.maxConnections || 5;
    this.currentConnections = 0;
  }

  /**
   * Returns an available pooled connection, opens a new one while under the
   * limit, or otherwise polls every 100ms until a connection is available.
   *
   * @returns {Promise<DBXWebSocket>}
   * @throws Rethrows a failure from opening a new connection.
   */
  async getConnection() {
    // Return existing available connection
    const idle = this.pool.find((conn) => conn.isAvailable());
    if (idle) {
      return idle;
    }

    // Create new connection if under limit
    if (this.currentConnections < this.maxConnections) {
      // Reserve the slot before awaiting; otherwise concurrent callers all
      // pass the limit check and the pool grows past maxConnections.
      this.currentConnections++;
      try {
        const conn = new DBXWebSocket(this.url, this.options);
        await conn.connect();
        this.pool.push(conn);
        return conn;
      } catch (error) {
        // Release the reserved slot so a later call can retry.
        this.currentConnections--;
        throw error;
      }
    }

    // Wait for available connection
    return new Promise((resolve) => {
      const checkPool = () => {
        const available = this.pool.find((conn) => conn.isAvailable());
        if (available) {
          resolve(available);
          return;
        }
        setTimeout(checkPool, 100);
      };
      checkPool();
    });
  }
}
Message Batching
For multiple operations, use batch requests:
async batchOperations(operations) {
return this.sendRequest('batch', { operations });
}
// Usage
// One read, one write and one hash read, sent together as a single batch.
const operations = [
  { method: 'string.get', params: { key: 'key1' } },
  { method: 'string.set', params: { key: 'key2', value: 'value2' } },
  { method: 'hash.get', params: { key: 'hash1' } }
];
const results = await dbx.batchOperations(operations);
Security Considerations
TLS/SSL
Always use secure WebSocket connections in production:
// The wss:// scheme selects a TLS-secured WebSocket connection.
const secureEndpoint = "wss://your-domain.com/ws";
const dbx = new DBXWebSocket(secureEndpoint, { apiKey: "your-api-key" });
Certificate Validation
// For Node.js environments, using the `ws` package.
const WebSocket = require("ws");

const connectionOptions = {
  // Validate SSL certificates
  rejectUnauthorized: true,
  headers: {
    Authorization: "Bearer your-api-key",
  },
};

const dbx = new WebSocket("wss://your-domain.com/ws", connectionOptions);
Rate Limiting
WebSocket connections are subject to rate limiting. The client can track its own request count so that it fails fast instead of exceeding the limit:
/**
 * DBX WebSocket client skeleton with a client-side fixed-window rate limiter.
 */
class DBXWebSocket {
  /**
   * @param {string} url - WebSocket endpoint (unused in this excerpt).
   * @param {object} [options] - Client options (unused in this excerpt).
   */
  constructor(url, options = {}) {
    const windowStartedAt = Date.now();
    this.rateLimiter = {
      requests: 0,
      windowStart: windowStartedAt,
      maxRequests: 1000, // requests allowed per window
      windowSize: 60000, // window length in milliseconds (1 minute)
    };
  }

  /**
   * Counts the request against the rate limit before sending it.
   * @throws {Error} "Rate limit exceeded" when the current window is full.
   */
  async sendRequest(method, params = {}) {
    this.checkRateLimit();
    // ... rest of send logic
  }

  /**
   * Records one request in the current window, opening a new window first if
   * the previous one has expired.
   * @throws {Error} "Rate limit exceeded" when the window already holds
   *   `maxRequests` requests.
   */
  checkRateLimit() {
    const limiter = this.rateLimiter;
    const currentTime = Date.now();
    const elapsed = currentTime - limiter.windowStart;

    // Start a new window once the old one has run out.
    if (elapsed > limiter.windowSize) {
      limiter.requests = 0;
      limiter.windowStart = currentTime;
    }

    const limitReached = limiter.requests >= limiter.maxRequests;
    if (limitReached) {
      throw new Error("Rate limit exceeded");
    }

    limiter.requests += 1;
  }
}
Environment Variables
Variable | Description | Default |
---|---|---|
DBX_WS_ENABLED | Enable WebSocket support | true |
DBX_WS_PATH | WebSocket endpoint path | /ws |
DBX_WS_MAX_CONNECTIONS | Maximum concurrent connections | 1000 |
DBX_WS_HEARTBEAT_INTERVAL | Heartbeat interval in seconds | 30 |
DBX_WS_TIMEOUT | Connection timeout in seconds | 30 |