Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 669e338

Browse files
committedJun 19, 2025·
feat: transport layer optimisation
1 parent b6736e1 commit 669e338

File tree

1 file changed

+1008
-23
lines changed

1 file changed

+1008
-23
lines changed
 

‎frontend/src/utils/websocket.js

Lines changed: 1008 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -910,10 +910,10 @@ class BaseCommunicationClient extends IPreswaldCommunicator {
910910
}
911911

912912
/**
913-
* Enhanced connection metrics with ComponentStateManager, MessageEncoder, and bulk operation statistics
914-
*/
913+
* Enhanced connection metrics with ComponentStateManager, MessageEncoder, bulk operations, and transport optimization statistics
914+
*/
915915
getConnectionMetrics() {
916-
return {
916+
const baseMetrics = {
917917
isConnected: this.isConnected,
918918
transport: this.constructor.name,
919919
lastActivity: this.lastActivity,
@@ -932,6 +932,35 @@ class BaseCommunicationClient extends IPreswaldCommunicator {
932932
(this.metrics.lastBulkChanged / this.metrics.lastBulkProcessed * 100).toFixed(1) + '%' : 'N/A'
933933
}
934934
};
935+
936+
// Add transport-specific optimization metrics
937+
if (this.constructor.name === 'WebSocketClient') {
938+
baseMetrics.transportOptimization = {
939+
batchingEnabled: this.batchingEnabled,
940+
queuedMessages: this.messageQueue.length,
941+
maxQueueSize: this.maxQueueSize,
942+
batchSize: this.batchSize,
943+
batchDelay: this.batchDelay,
944+
compression: {
945+
messagesCompressed: this.compressionStats.messagesCompressed,
946+
avgCompressionRatio: (this.compressionStats.avgCompressionRatio * 100).toFixed(1) + '%'
947+
}
948+
};
949+
} else if (this.constructor.name === 'PostMessageClient') {
950+
baseMetrics.transportOptimization = {
951+
batchingEnabled: this.batchingEnabled,
952+
queuedMessages: this.messageQueue.length,
953+
maxQueueSize: this.maxQueueSize,
954+
batchSize: this.batchSize,
955+
batchDelay: this.batchDelay,
956+
serialization: {
957+
messagesOptimized: this.serializationStats.messagesOptimized,
958+
avgSizeReduction: (this.serializationStats.avgSizeReduction * 100).toFixed(1) + '%'
959+
}
960+
};
961+
}
962+
963+
return baseMetrics;
935964
}
936965

937966
/**
@@ -987,6 +1016,22 @@ class WebSocketClient extends BaseCommunicationClient {
9871016
this.reconnectAttempts = 0;
9881017
this.maxReconnectAttempts = 5;
9891018
this.reconnectDelay = 1000;
1019+
1020+
// Enhanced transport optimization features
1021+
this.messageQueue = [];
1022+
this.maxQueueSize = 1000;
1023+
this.batchingEnabled = true;
1024+
this.batchTimeout = null;
1025+
this.batchSize = 10;
1026+
this.batchDelay = 16; // ~60fps for UI responsiveness
1027+
1028+
// Compression statistics for monitoring
1029+
this.compressionStats = {
1030+
messagesCompressed: 0,
1031+
totalCompressionRatio: 0,
1032+
avgCompressionRatio: 0
1033+
};
1034+
9901035
// Note: callbacks, componentStates, isConnected, pendingUpdates, connections
9911036
// are now inherited from BaseCommunicationClient
9921037
}
@@ -1205,9 +1250,20 @@ class WebSocketClient extends BaseCommunicationClient {
12051250
async disconnect() {
12061251
if (this.socket) {
12071252
console.log('[WebSocket] Disconnecting...');
1253+
1254+
// Process any pending` batched messages before disconnecting
1255+
if (this.batchTimeout) {
1256+
clearTimeout(this.batchTimeout);
1257+
this.batchTimeout = null;
1258+
this._processBatchedMessages();
1259+
}
1260+
12081261
this.socket.close();
12091262
this.socket = null;
12101263
this._setConnected(false);
1264+
1265+
// Clear message queue
1266+
this.messageQueue = [];
12111267
}
12121268
}
12131269

@@ -1248,21 +1304,120 @@ class WebSocketClient extends BaseCommunicationClient {
12481304

12491305
_sendComponentUpdate(componentId, value) {
12501306
const message = { type: 'component_update', states: { [componentId]: value } };
1307+
1308+
// Enhanced batching for better performance
1309+
if (this.batchingEnabled && this.messageQueue.length < this.maxQueueSize) {
1310+
this.messageQueue.push({ componentId, value, timestamp: performance.now() });
1311+
1312+
// Schedule batch processing if not already scheduled
1313+
if (!this.batchTimeout) {
1314+
this.batchTimeout = setTimeout(() => {
1315+
this._processBatchedMessages();
1316+
}, this.batchDelay);
1317+
}
1318+
1319+
// Process immediately if batch is full
1320+
if (this.messageQueue.length >= this.batchSize) {
1321+
clearTimeout(this.batchTimeout);
1322+
this.batchTimeout = null;
1323+
this._processBatchedMessages();
1324+
}
1325+
1326+
return;
1327+
}
1328+
1329+
// Fallback to immediate sending
1330+
this._sendImmediateMessage(message, componentId, value);
1331+
}
1332+
1333+
_processBatchedMessages() {
1334+
if (this.messageQueue.length === 0) return;
1335+
1336+
try {
1337+
// Group messages by component for deduplication
1338+
const stateUpdates = {};
1339+
this.messageQueue.forEach(({ componentId, value }) => {
1340+
stateUpdates[componentId] = value;
1341+
});
1342+
1343+
const batchMessage = {
1344+
type: 'component_update',
1345+
states: stateUpdates,
1346+
batch: true,
1347+
count: Object.keys(stateUpdates).length
1348+
};
1349+
1350+
this._sendImmediateMessage(batchMessage);
1351+
1352+
// Update local states
1353+
Object.entries(stateUpdates).forEach(([componentId, value]) => {
1354+
this.componentStates[componentId] = value;
1355+
});
1356+
1357+
console.log(`[WebSocket] Sent batched update: ${Object.keys(stateUpdates).length} components from ${this.messageQueue.length} queued messages`);
1358+
1359+
// Clear the queue
1360+
this.messageQueue = [];
1361+
this.batchTimeout = null;
1362+
1363+
} catch (error) {
1364+
console.error('[WebSocket] Error processing batched messages:', error);
1365+
// Fallback to individual sends
1366+
this.messageQueue.forEach(({ componentId, value }) => {
1367+
const message = { type: 'component_update', states: { [componentId]: value } };
1368+
this._sendImmediateMessage(message, componentId, value);
1369+
});
1370+
this.messageQueue = [];
1371+
this.batchTimeout = null;
1372+
}
1373+
}
1374+
1375+
_sendImmediateMessage(message, componentId = null, value = null) {
12511376
try {
1252-
// Use MessageEncoder for consistent formatting, with legacy fallback
1377+
// Enhanced compression with statistics tracking
12531378
let encodedMessage;
1379+
let originalSize, compressedSize;
1380+
12541381
try {
1382+
// Determine if compression should be used
1383+
const jsonString = JSON.stringify(message);
1384+
originalSize = jsonString.length;
1385+
1386+
const shouldCompress = originalSize > MessageEncoder.COMPRESSION_THRESHOLD;
12551387
encodedMessage = MessageEncoder.encodeLegacy(message);
1388+
compressedSize = encodedMessage.length;
1389+
1390+
// Track compression statistics
1391+
if (shouldCompress && compressedSize < originalSize) {
1392+
this.compressionStats.messagesCompressed++;
1393+
const compressionRatio = (originalSize - compressedSize) / originalSize;
1394+
this.compressionStats.totalCompressionRatio += compressionRatio;
1395+
this.compressionStats.avgCompressionRatio =
1396+
this.compressionStats.totalCompressionRatio / this.compressionStats.messagesCompressed;
1397+
}
1398+
12561399
} catch (encodeError) {
12571400
console.warn('[WebSocket] Using legacy JSON encoding:', encodeError.message);
12581401
encodedMessage = JSON.stringify(message);
1402+
compressedSize = encodedMessage.length;
12591403
}
12601404

12611405
this.socket.send(encodedMessage);
1262-
this.componentStates[componentId] = value;
1263-
console.log('[WebSocket] Sent component update:', message);
1406+
1407+
if (componentId && value !== null) {
1408+
this.componentStates[componentId] = value;
1409+
}
1410+
1411+
// Enhanced logging with compression info
1412+
if (originalSize && compressedSize < originalSize) {
1413+
const compressionRatio = ((originalSize - compressedSize) / originalSize * 100).toFixed(1);
1414+
console.log(`[WebSocket] Sent compressed message: ${originalSize}B → ${compressedSize}B (${compressionRatio}% reduction)`);
1415+
} else {
1416+
console.log('[WebSocket] Sent message:', message.type, compressedSize ? `${compressedSize}B` : '');
1417+
}
1418+
12641419
} catch (error) {
1265-
console.error('[WebSocket] Error sending component update:', error);
1420+
console.error('[WebSocket] Error sending message:', error);
12661421
throw error;
12671422
}
12681423
}
@@ -1298,6 +1453,22 @@ class WebSocketClient extends BaseCommunicationClient {
12981453
class PostMessageClient extends BaseCommunicationClient {
12991454
constructor() {
13001455
super();
1456+
1457+
// Enhanced transport optimization features for PostMessage
1458+
this.messageQueue = [];
1459+
this.maxQueueSize = 500; // Smaller queue for PostMessage due to potential parent window limitations
1460+
this.batchingEnabled = true;
1461+
this.batchTimeout = null;
1462+
this.batchSize = 8; // Smaller batch size for PostMessage
1463+
this.batchDelay = 20; // Slightly higher delay for PostMessage
1464+
1465+
// Serialization optimization statistics
1466+
this.serializationStats = {
1467+
messagesOptimized: 0,
1468+
totalSizeReduction: 0,
1469+
avgSizeReduction: 0
1470+
};
1471+
13011472
// Note: callbacks, componentStates, isConnected, pendingUpdates
13021473
// are now inherited from BaseCommunicationClient
13031474
}
@@ -1321,8 +1492,19 @@ class PostMessageClient extends BaseCommunicationClient {
13211492

13221493
async disconnect() {
13231494
console.log('[PostMessage] Disconnecting...');
1495+
1496+
// Process any pending batched messages before disconnecting
1497+
if (this.batchTimeout) {
1498+
clearTimeout(this.batchTimeout);
1499+
this.batchTimeout = null;
1500+
this._processBatchedMessages();
1501+
}
1502+
13241503
window.removeEventListener('message', this._handleMessage.bind(this));
13251504
this._setConnected(false);
1505+
1506+
// Clear message queue
1507+
this.messageQueue = [];
13261508
}
13271509

13281510
_handleMessage(event) {
@@ -1465,30 +1647,193 @@ class PostMessageClient extends BaseCommunicationClient {
14651647
}
14661648

14671649
_sendComponentUpdate(componentId, value) {
1468-
if (window.parent) {
1469-
const message = {
1470-
type: 'component_update',
1471-
id: componentId,
1472-
value,
1650+
if (!window.parent) {
1651+
console.warn('[PostMessage] No parent window to send update');
1652+
return;
1653+
}
1654+
1655+
// Enhanced batching for better performance
1656+
if (this.batchingEnabled && this.messageQueue.length < this.maxQueueSize) {
1657+
this.messageQueue.push({ componentId, value, timestamp: performance.now() });
1658+
1659+
// Schedule batch processing if not already scheduled
1660+
if (!this.batchTimeout) {
1661+
this.batchTimeout = setTimeout(() => {
1662+
this._processBatchedMessages();
1663+
}, this.batchDelay);
1664+
}
1665+
1666+
// Process immediately if batch is full
1667+
if (this.messageQueue.length >= this.batchSize) {
1668+
clearTimeout(this.batchTimeout);
1669+
this.batchTimeout = null;
1670+
this._processBatchedMessages();
1671+
}
1672+
1673+
return;
1674+
}
1675+
1676+
// Fallback to immediate sending
1677+
const message = {
1678+
type: 'component_update',
1679+
id: componentId,
1680+
value,
1681+
};
1682+
1683+
this._sendImmediateMessage(message, componentId, value);
1684+
}
1685+
1686+
_processBatchedMessages() {
1687+
if (this.messageQueue.length === 0) return;
1688+
1689+
try {
1690+
// Group messages by component for deduplication
1691+
const stateUpdates = {};
1692+
this.messageQueue.forEach(({ componentId, value }) => {
1693+
stateUpdates[componentId] = value;
1694+
});
1695+
1696+
const batchMessage = {
1697+
type: 'component_update_batch',
1698+
states: stateUpdates,
1699+
count: Object.keys(stateUpdates).length,
1700+
timestamp: performance.now()
14731701
};
14741702

1475-
// Use MessageEncoder for consistent formatting, with legacy fallback
1703+
this._sendImmediateMessage(batchMessage);
1704+
1705+
// Update local states
1706+
Object.entries(stateUpdates).forEach(([componentId, value]) => {
1707+
this.componentStates[componentId] = value;
1708+
});
1709+
1710+
console.log(`[PostMessage] Sent batched update: ${Object.keys(stateUpdates).length} components from ${this.messageQueue.length} queued messages`);
1711+
1712+
// Clear the queue
1713+
this.messageQueue = [];
1714+
this.batchTimeout = null;
1715+
1716+
} catch (error) {
1717+
console.error('[PostMessage] Error processing batched messages:', error);
1718+
// Fallback to individual sends
1719+
this.messageQueue.forEach(({ componentId, value }) => {
1720+
const message = {
1721+
type: 'component_update',
1722+
id: componentId,
1723+
value,
1724+
};
1725+
this._sendImmediateMessage(message, componentId, value);
1726+
});
1727+
this.messageQueue = [];
1728+
this.batchTimeout = null;
1729+
}
1730+
}
1731+
1732+
_sendImmediateMessage(message, componentId = null, value = null) {
1733+
try {
1734+
// Enhanced JSON serialization with optimization tracking
14761735
let encodedMessage;
1736+
let originalSize, optimizedSize;
1737+
14771738
try {
1478-
encodedMessage = MessageEncoder.encodeLegacy(message);
1739+
// Optimize message structure for PostMessage
1740+
const optimizedMessage = this._optimizeMessageForPostMessage(message);
1741+
const originalJson = JSON.stringify(message);
1742+
const optimizedJson = JSON.stringify(optimizedMessage);
1743+
1744+
originalSize = originalJson.length;
1745+
optimizedSize = optimizedJson.length;
1746+
1747+
// Use MessageEncoder for consistent formatting
1748+
encodedMessage = MessageEncoder.encodeLegacy(optimizedMessage);
1749+
1750+
// Track optimization statistics
1751+
if (optimizedSize < originalSize) {
1752+
this.serializationStats.messagesOptimized++;
1753+
const sizeReduction = (originalSize - optimizedSize) / originalSize;
1754+
this.serializationStats.totalSizeReduction += sizeReduction;
1755+
this.serializationStats.avgSizeReduction =
1756+
this.serializationStats.totalSizeReduction / this.serializationStats.messagesOptimized;
1757+
}
1758+
14791759
} catch (encodeError) {
14801760
console.warn('[PostMessage] Using legacy format:', encodeError.message);
14811761
encodedMessage = message;
1762+
optimizedSize = JSON.stringify(message).length;
14821763
}
14831764

14841765
window.parent.postMessage(encodedMessage, '*');
1485-
this.componentStates[componentId] = value;
1486-
console.log('[PostMessage] Sent component update:', { id: componentId, value });
1487-
} else {
1488-
console.warn('[PostMessage] No parent window to send update');
1766+
1767+
if (componentId && value !== null) {
1768+
this.componentStates[componentId] = value;
1769+
}
1770+
1771+
// Enhanced logging with optimization info
1772+
if (originalSize && optimizedSize < originalSize) {
1773+
const reductionRatio = ((originalSize - optimizedSize) / originalSize * 100).toFixed(1);
1774+
console.log(`[PostMessage] Sent optimized message: ${originalSize}B → ${optimizedSize}B (${reductionRatio}% reduction)`);
1775+
} else {
1776+
console.log('[PostMessage] Sent message:', message.type, optimizedSize ? `${optimizedSize}B` : '');
1777+
}
1778+
1779+
} catch (error) {
1780+
console.error('[PostMessage] Error sending message:', error);
1781+
throw error;
14891782
}
14901783
}
14911784

1785+
_optimizeMessageForPostMessage(message) {
1786+
// PostMessage-specific optimizations
1787+
const optimized = { ...message };
1788+
1789+
// Remove redundant fields for PostMessage transport
1790+
if (optimized.metadata && optimized.metadata.timestamp) {
1791+
// PostMessage doesn't need high-precision timestamps
1792+
optimized.metadata.timestamp = Math.floor(optimized.metadata.timestamp);
1793+
}
1794+
1795+
// Optimize value serialization for common types
1796+
if (optimized.value !== undefined) {
1797+
optimized.value = this._optimizeValue(optimized.value);
1798+
}
1799+
1800+
if (optimized.states) {
1801+
const optimizedStates = {};
1802+
Object.entries(optimized.states).forEach(([key, value]) => {
1803+
optimizedStates[key] = this._optimizeValue(value);
1804+
});
1805+
optimized.states = optimizedStates;
1806+
}
1807+
1808+
return optimized;
1809+
}
1810+
1811+
_optimizeValue(value) {
1812+
// Optimize common value types for JSON serialization
1813+
if (typeof value === 'number') {
1814+
// Round floating point numbers to reasonable precision
1815+
return Math.round(value * 1000000) / 1000000;
1816+
}
1817+
1818+
if (Array.isArray(value)) {
1819+
// Optimize arrays recursively
1820+
return value.map(item => this._optimizeValue(item));
1821+
}
1822+
1823+
if (value && typeof value === 'object') {
1824+
// Remove null/undefined properties
1825+
const optimized = {};
1826+
Object.entries(value).forEach(([key, val]) => {
1827+
if (val !== null && val !== undefined) {
1828+
optimized[key] = this._optimizeValue(val);
1829+
}
1830+
});
1831+
return optimized;
1832+
}
1833+
1834+
return value;
1835+
}
1836+
14921837
// getConnections is inherited from BaseCommunicationClient
14931838
}
14941839

@@ -1685,6 +2030,402 @@ const TransportType = {
16852030
AUTO: 'auto'
16862031
};
16872032

2033+
/**
2034+
* Production-ready Connection Pool Manager
2035+
* Manages multiple connections for load balancing, failover, and performance optimization
2036+
*/
2037+
class ConnectionPoolManager {
2038+
constructor(config = {}) {
2039+
this.config = {
2040+
maxPoolSize: 3,
2041+
minPoolSize: 1,
2042+
healthCheckInterval: 30000, // 30 seconds
2043+
connectionTimeout: 10000,
2044+
retryAttempts: 3,
2045+
retryDelay: 1000,
2046+
loadBalancingStrategy: 'round-robin', // 'round-robin', 'least-connections', 'performance'
2047+
enableFailover: true,
2048+
...config
2049+
};
2050+
2051+
this.connectionPool = new Map(); // connectionId -> connection
2052+
this.connectionMetrics = new Map(); // connectionId -> metrics
2053+
this.activeConnections = new Set();
2054+
this.currentConnectionIndex = 0;
2055+
this.isShuttingDown = false;
2056+
this.healthCheckTimer = null;
2057+
2058+
// Performance tracking
2059+
this.poolMetrics = {
2060+
totalConnections: 0,
2061+
activeConnections: 0,
2062+
failedConnections: 0,
2063+
totalRequests: 0,
2064+
failedRequests: 0,
2065+
avgResponseTime: 0,
2066+
lastHealthCheck: 0
2067+
};
2068+
2069+
console.log('[ConnectionPool] Initialized with config:', this.config);
2070+
}
2071+
2072+
/**
2073+
* Initialize the connection pool
2074+
*/
2075+
async initialize(transportType, transportConfig = {}) {
2076+
console.log(`[ConnectionPool] Initializing pool with ${this.config.minPoolSize} ${transportType} connections`);
2077+
2078+
const initPromises = [];
2079+
for (let i = 0; i < this.config.minPoolSize; i++) {
2080+
const connectionId = `${transportType}_${i}_${Date.now()}`;
2081+
initPromises.push(this._createConnection(connectionId, transportType, transportConfig));
2082+
}
2083+
2084+
const results = await Promise.allSettled(initPromises);
2085+
const successfulConnections = results.filter(result => result.status === 'fulfilled').length;
2086+
2087+
if (successfulConnections === 0) {
2088+
throw new Error('Failed to initialize any connections in the pool');
2089+
}
2090+
2091+
console.log(`[ConnectionPool] Initialized ${successfulConnections}/${this.config.minPoolSize} connections`);
2092+
2093+
// Start health monitoring
2094+
this._startHealthMonitoring();
2095+
2096+
return successfulConnections;
2097+
}
2098+
2099+
/**
2100+
* Get an optimal connection from the pool
2101+
*/
2102+
getConnection() {
2103+
if (this.isShuttingDown || this.activeConnections.size === 0) {
2104+
throw new Error('Connection pool is not available');
2105+
}
2106+
2107+
const activeConnectionsArray = Array.from(this.activeConnections);
2108+
let selectedConnection;
2109+
2110+
switch (this.config.loadBalancingStrategy) {
2111+
case 'round-robin':
2112+
selectedConnection = this._getRoundRobinConnection(activeConnectionsArray);
2113+
break;
2114+
case 'least-connections':
2115+
selectedConnection = this._getLeastConnectionsConnection(activeConnectionsArray);
2116+
break;
2117+
case 'performance':
2118+
selectedConnection = this._getPerformanceBasedConnection(activeConnectionsArray);
2119+
break;
2120+
default:
2121+
selectedConnection = this._getRoundRobinConnection(activeConnectionsArray);
2122+
}
2123+
2124+
if (!selectedConnection) {
2125+
throw new Error('No healthy connections available in pool');
2126+
}
2127+
2128+
this.poolMetrics.totalRequests++;
2129+
this._updateConnectionMetrics(selectedConnection.connectionId, 'request');
2130+
2131+
return selectedConnection.client;
2132+
}
2133+
2134+
/**
2135+
* Add a new connection to the pool
2136+
*/
2137+
async addConnection(transportType, transportConfig = {}) {
2138+
if (this.connectionPool.size >= this.config.maxPoolSize) {
2139+
console.warn('[ConnectionPool] Pool is at maximum capacity');
2140+
return null;
2141+
}
2142+
2143+
const connectionId = `${transportType}_${this.connectionPool.size}_${Date.now()}`;
2144+
return await this._createConnection(connectionId, transportType, transportConfig);
2145+
}
2146+
2147+
/**
2148+
* Remove a connection from the pool
2149+
*/
2150+
async removeConnection(connectionId) {
2151+
const connection = this.connectionPool.get(connectionId);
2152+
if (!connection) {
2153+
console.warn(`[ConnectionPool] Connection ${connectionId} not found`);
2154+
return;
2155+
}
2156+
2157+
console.log(`[ConnectionPool] Removing connection ${connectionId}`);
2158+
2159+
try {
2160+
await connection.client.disconnect();
2161+
} catch (error) {
2162+
console.error(`[ConnectionPool] Error disconnecting ${connectionId}:`, error);
2163+
}
2164+
2165+
this.connectionPool.delete(connectionId);
2166+
this.connectionMetrics.delete(connectionId);
2167+
this.activeConnections.delete(connection);
2168+
this.poolMetrics.totalConnections--;
2169+
}
2170+
2171+
/**
2172+
* Get pool statistics
2173+
*/
2174+
getPoolMetrics() {
2175+
return {
2176+
...this.poolMetrics,
2177+
poolSize: this.connectionPool.size,
2178+
activeConnections: this.activeConnections.size,
2179+
connectionDetails: Array.from(this.connectionPool.entries()).map(([id, conn]) => ({
2180+
connectionId: id,
2181+
transport: conn.client.constructor.name,
2182+
isConnected: conn.client.isConnected,
2183+
metrics: this.connectionMetrics.get(id) || {}
2184+
}))
2185+
};
2186+
}
2187+
2188+
/**
2189+
* Shutdown the connection pool
2190+
*/
2191+
async shutdown() {
2192+
console.log('[ConnectionPool] Shutting down connection pool');
2193+
this.isShuttingDown = true;
2194+
2195+
if (this.healthCheckTimer) {
2196+
clearInterval(this.healthCheckTimer);
2197+
this.healthCheckTimer = null;
2198+
}
2199+
2200+
const disconnectPromises = Array.from(this.connectionPool.values()).map(async (connection) => {
2201+
try {
2202+
await connection.client.disconnect();
2203+
} catch (error) {
2204+
console.error(`[ConnectionPool] Error disconnecting ${connection.connectionId}:`, error);
2205+
}
2206+
});
2207+
2208+
await Promise.allSettled(disconnectPromises);
2209+
2210+
this.connectionPool.clear();
2211+
this.connectionMetrics.clear();
2212+
this.activeConnections.clear();
2213+
2214+
console.log('[ConnectionPool] Shutdown complete');
2215+
}
2216+
2217+
/**
2218+
* Create a new connection
2219+
* @private
2220+
*/
2221+
async _createConnection(connectionId, transportType, transportConfig) {
2222+
console.log(`[ConnectionPool] Creating connection ${connectionId}`);
2223+
2224+
try {
2225+
const client = createTransportClient(transportType, transportConfig);
2226+
const startTime = performance.now();
2227+
2228+
await client.connect(transportConfig);
2229+
2230+
const connectionTime = performance.now() - startTime;
2231+
const connection = {
2232+
connectionId,
2233+
client,
2234+
transportType,
2235+
createdAt: Date.now(),
2236+
connectionTime
2237+
};
2238+
2239+
this.connectionPool.set(connectionId, connection);
2240+
this.activeConnections.add(connection);
2241+
this.poolMetrics.totalConnections++;
2242+
2243+
// Initialize connection metrics
2244+
this.connectionMetrics.set(connectionId, {
2245+
requests: 0,
2246+
errors: 0,
2247+
avgResponseTime: 0,
2248+
lastActivity: Date.now(),
2249+
connectionTime,
2250+
isHealthy: true
2251+
});
2252+
2253+
console.log(`[ConnectionPool] Connection ${connectionId} created successfully in ${connectionTime.toFixed(2)}ms`);
2254+
return connection;
2255+
2256+
} catch (error) {
2257+
console.error(`[ConnectionPool] Failed to create connection ${connectionId}:`, error);
2258+
this.poolMetrics.failedConnections++;
2259+
throw error;
2260+
}
2261+
}
2262+
2263+
/**
2264+
* Round-robin connection selection
2265+
* @private
2266+
*/
2267+
_getRoundRobinConnection(activeConnections) {
2268+
if (activeConnections.length === 0) return null;
2269+
2270+
const connection = activeConnections[this.currentConnectionIndex % activeConnections.length];
2271+
this.currentConnectionIndex = (this.currentConnectionIndex + 1) % activeConnections.length;
2272+
2273+
return connection;
2274+
}
2275+
2276+
/**
2277+
* Least connections selection
2278+
* @private
2279+
*/
2280+
_getLeastConnectionsConnection(activeConnections) {
2281+
if (activeConnections.length === 0) return null;
2282+
2283+
let selectedConnection = activeConnections[0];
2284+
let minRequests = this.connectionMetrics.get(selectedConnection.connectionId)?.requests || 0;
2285+
2286+
for (const connection of activeConnections) {
2287+
const requests = this.connectionMetrics.get(connection.connectionId)?.requests || 0;
2288+
if (requests < minRequests) {
2289+
minRequests = requests;
2290+
selectedConnection = connection;
2291+
}
2292+
}
2293+
2294+
return selectedConnection;
2295+
}
2296+
2297+
/**
2298+
* Performance-based connection selection
2299+
* @private
2300+
*/
2301+
_getPerformanceBasedConnection(activeConnections) {
2302+
if (activeConnections.length === 0) return null;
2303+
2304+
let selectedConnection = activeConnections[0];
2305+
let bestScore = this._calculateConnectionScore(selectedConnection.connectionId);
2306+
2307+
for (const connection of activeConnections) {
2308+
const score = this._calculateConnectionScore(connection.connectionId);
2309+
if (score > bestScore) {
2310+
bestScore = score;
2311+
selectedConnection = connection;
2312+
}
2313+
}
2314+
2315+
return selectedConnection;
2316+
}
2317+
2318+
/**
2319+
* Calculate connection performance score
2320+
* @private
2321+
*/
2322+
_calculateConnectionScore(connectionId) {
2323+
const metrics = this.connectionMetrics.get(connectionId);
2324+
if (!metrics) return 0;
2325+
2326+
// Higher score = better connection
2327+
const latencyFactor = 1000 / Math.max(metrics.avgResponseTime, 1);
2328+
const errorFactor = metrics.requests > 0 ? (1 - (metrics.errors / metrics.requests)) : 1;
2329+
const activityFactor = Math.max(0, 1 - ((Date.now() - metrics.lastActivity) / 60000)); // Decay over 1 minute
2330+
2331+
return latencyFactor * 0.4 + errorFactor * 0.4 + activityFactor * 0.2;
2332+
}
2333+
2334+
/**
2335+
* Update connection metrics
2336+
* @private
2337+
*/
2338+
_updateConnectionMetrics(connectionId, eventType, responseTime = 0) {
2339+
const metrics = this.connectionMetrics.get(connectionId);
2340+
if (!metrics) return;
2341+
2342+
switch (eventType) {
2343+
case 'request':
2344+
metrics.requests++;
2345+
metrics.lastActivity = Date.now();
2346+
break;
2347+
case 'response':
2348+
if (responseTime > 0) {
2349+
const totalTime = metrics.avgResponseTime * (metrics.requests - 1) + responseTime;
2350+
metrics.avgResponseTime = totalTime / metrics.requests;
2351+
}
2352+
break;
2353+
case 'error':
2354+
metrics.errors++;
2355+
break;
2356+
}
2357+
}
2358+
2359+
/**
2360+
* Start health monitoring
2361+
* @private
2362+
*/
2363+
_startHealthMonitoring() {
2364+
if (this.healthCheckTimer) {
2365+
clearInterval(this.healthCheckTimer);
2366+
}
2367+
2368+
this.healthCheckTimer = setInterval(() => {
2369+
this._performHealthCheck();
2370+
}, this.config.healthCheckInterval);
2371+
2372+
console.log(`[ConnectionPool] Health monitoring started (interval: ${this.config.healthCheckInterval}ms)`);
2373+
}
2374+
2375+
/**
2376+
* Perform health check on all connections
2377+
* @private
2378+
*/
2379+
async _performHealthCheck() {
2380+
if (this.isShuttingDown) return;
2381+
2382+
console.log('[ConnectionPool] Performing health check');
2383+
const startTime = performance.now();
2384+
2385+
const healthCheckPromises = Array.from(this.connectionPool.entries()).map(async ([connectionId, connection]) => {
2386+
try {
2387+
const isHealthy = connection.client.isConnected &&
2388+
typeof connection.client.getConnectionMetrics === 'function';
2389+
2390+
const metrics = this.connectionMetrics.get(connectionId);
2391+
if (metrics) {
2392+
metrics.isHealthy = isHealthy;
2393+
}
2394+
2395+
if (!isHealthy) {
2396+
console.warn(`[ConnectionPool] Connection ${connectionId} is unhealthy`);
2397+
this.activeConnections.delete(connection);
2398+
2399+
// Attempt to reconnect if failover is enabled
2400+
if (this.config.enableFailover) {
2401+
try {
2402+
await connection.client.connect();
2403+
this.activeConnections.add(connection);
2404+
console.log(`[ConnectionPool] Connection ${connectionId} reconnected successfully`);
2405+
} catch (error) {
2406+
console.error(`[ConnectionPool] Failed to reconnect ${connectionId}:`, error);
2407+
}
2408+
}
2409+
} else {
2410+
this.activeConnections.add(connection);
2411+
}
2412+
2413+
} catch (error) {
2414+
console.error(`[ConnectionPool] Health check failed for ${connectionId}:`, error);
2415+
this.activeConnections.delete(connection);
2416+
}
2417+
});
2418+
2419+
await Promise.allSettled(healthCheckPromises);
2420+
2421+
const healthCheckTime = performance.now() - startTime;
2422+
this.poolMetrics.lastHealthCheck = Date.now();
2423+
this.poolMetrics.activeConnections = this.activeConnections.size;
2424+
2425+
console.log(`[ConnectionPool] Health check completed in ${healthCheckTime.toFixed(2)}ms - ${this.activeConnections.size}/${this.connectionPool.size} connections healthy`);
2426+
}
2427+
}
2428+
16882429
class TransportSelector {
16892430
static selectOptimalTransport(config = {}) {
16902431
const requestedTransport = config.transport || window.__PRESWALD_CLIENT_TYPE || TransportType.AUTO;
@@ -1774,14 +2515,208 @@ class TransportSelector {
17742515
}
17752516
}
17762517

2518+
/**
2519+
* Pooled Communication Client that uses connection pooling for enhanced performance and reliability
2520+
*/
2521+
class PooledCommunicationClient extends IPreswaldCommunicator {
2522+
constructor(transportType, config = {}) {
2523+
super();
2524+
this.transportType = transportType;
2525+
this.config = config;
2526+
this.connectionPool = null;
2527+
this.isInitialized = false;
2528+
2529+
// Aggregate metrics from all pooled connections
2530+
this.aggregateMetrics = {
2531+
totalRequests: 0,
2532+
totalErrors: 0,
2533+
avgResponseTime: 0,
2534+
poolUtilization: 0
2535+
};
2536+
}
2537+
2538+
async connect(config = {}) {
2539+
if (this.isInitialized) {
2540+
console.log('[PooledClient] Already initialized');
2541+
return { success: true, message: 'Already connected' };
2542+
}
2543+
2544+
console.log('[PooledClient] Initializing connection pool');
2545+
2546+
try {
2547+
const poolConfig = {
2548+
maxPoolSize: config.maxPoolSize || 3,
2549+
minPoolSize: config.minPoolSize || 1,
2550+
loadBalancingStrategy: config.loadBalancingStrategy || 'performance',
2551+
enableFailover: config.enableFailover !== false,
2552+
...config.poolConfig
2553+
};
2554+
2555+
this.connectionPool = new ConnectionPoolManager(poolConfig);
2556+
const connectionsCreated = await this.connectionPool.initialize(this.transportType, config);
2557+
2558+
this.isInitialized = true;
2559+
this.isConnected = true;
2560+
2561+
console.log(`[PooledClient] Initialized with ${connectionsCreated} connections`);
2562+
return { success: true, message: `Connected with ${connectionsCreated} pooled connections` };
2563+
2564+
} catch (error) {
2565+
console.error('[PooledClient] Failed to initialize connection pool:', error);
2566+
throw error;
2567+
}
2568+
}
2569+
2570+
async disconnect() {
2571+
if (this.connectionPool) {
2572+
console.log('[PooledClient] Shutting down connection pool');
2573+
await this.connectionPool.shutdown();
2574+
this.connectionPool = null;
2575+
}
2576+
this.isInitialized = false;
2577+
this.isConnected = false;
2578+
}
2579+
2580+
getComponentState(componentId) {
2581+
if (!this.isInitialized) {
2582+
throw new Error('PooledClient not initialized');
2583+
}
2584+
2585+
try {
2586+
const connection = this.connectionPool.getConnection();
2587+
return connection.getComponentState(componentId);
2588+
} catch (error) {
2589+
console.error('[PooledClient] Error getting component state:', error);
2590+
throw error;
2591+
}
2592+
}
2593+
2594+
async updateComponentState(componentId, value) {
2595+
if (!this.isInitialized) {
2596+
throw new Error('PooledClient not initialized');
2597+
}
2598+
2599+
const startTime = performance.now();
2600+
2601+
try {
2602+
const connection = this.connectionPool.getConnection();
2603+
const result = await connection.updateComponentState(componentId, value);
2604+
2605+
const responseTime = performance.now() - startTime;
2606+
this._updateAggregateMetrics('success', responseTime);
2607+
2608+
return result;
2609+
} catch (error) {
2610+
this._updateAggregateMetrics('error', performance.now() - startTime);
2611+
console.error('[PooledClient] Error updating component state:', error);
2612+
throw error;
2613+
}
2614+
}
2615+
2616+
async bulkStateUpdate(updates) {
2617+
if (!this.isInitialized) {
2618+
throw new Error('PooledClient not initialized');
2619+
}
2620+
2621+
const startTime = performance.now();
2622+
2623+
try {
2624+
const connection = this.connectionPool.getConnection();
2625+
const result = await connection.bulkStateUpdate(updates);
2626+
2627+
const responseTime = performance.now() - startTime;
2628+
this._updateAggregateMetrics('success', responseTime);
2629+
2630+
return result;
2631+
} catch (error) {
2632+
this._updateAggregateMetrics('error', performance.now() - startTime);
2633+
console.error('[PooledClient] Error in bulk state update:', error);
2634+
throw error;
2635+
}
2636+
}
2637+
2638+
subscribe(callback) {
2639+
if (!this.isInitialized) {
2640+
throw new Error('PooledClient not initialized');
2641+
}
2642+
2643+
// Subscribe to all connections in the pool
2644+
const unsubscribeFunctions = [];
2645+
2646+
for (const connection of this.connectionPool.activeConnections) {
2647+
try {
2648+
const unsubscribe = connection.client.subscribe(callback);
2649+
unsubscribeFunctions.push(unsubscribe);
2650+
} catch (error) {
2651+
console.error('[PooledClient] Error subscribing to connection:', error);
2652+
}
2653+
}
2654+
2655+
// Return a function that unsubscribes from all connections
2656+
return () => {
2657+
unsubscribeFunctions.forEach(unsubscribe => {
2658+
try {
2659+
unsubscribe();
2660+
} catch (error) {
2661+
console.error('[PooledClient] Error unsubscribing:', error);
2662+
}
2663+
});
2664+
};
2665+
}
2666+
2667+
getConnectionMetrics() {
2668+
if (!this.isInitialized || !this.connectionPool) {
2669+
return {
2670+
isConnected: false,
2671+
transport: 'PooledClient',
2672+
error: 'Not initialized'
2673+
};
2674+
}
2675+
2676+
const poolMetrics = this.connectionPool.getPoolMetrics();
2677+
2678+
return {
2679+
isConnected: this.isConnected,
2680+
transport: 'PooledClient',
2681+
transportType: this.transportType,
2682+
poolMetrics,
2683+
aggregateMetrics: this.aggregateMetrics,
2684+
connectionCount: poolMetrics.activeConnections,
2685+
poolUtilization: poolMetrics.activeConnections / poolMetrics.poolSize * 100
2686+
};
2687+
}
2688+
2689+
getConnections() {
2690+
if (!this.isInitialized || !this.connectionPool) {
2691+
return [];
2692+
}
2693+
2694+
return this.connectionPool.getPoolMetrics().connectionDetails;
2695+
}
2696+
2697+
_updateAggregateMetrics(type, responseTime) {
2698+
this.aggregateMetrics.totalRequests++;
2699+
2700+
if (type === 'error') {
2701+
this.aggregateMetrics.totalErrors++;
2702+
}
2703+
2704+
// Update rolling average response time
2705+
const totalTime = this.aggregateMetrics.avgResponseTime * (this.aggregateMetrics.totalRequests - 1) + responseTime;
2706+
this.aggregateMetrics.avgResponseTime = totalTime / this.aggregateMetrics.totalRequests;
2707+
}
2708+
}
2709+
17772710
/**
17782711
* Enhanced factory function to create the appropriate communication client
1779-
* Includes optimized transport selection with environment detection
2712+
* Includes optimized transport selection with environment detection and optional connection pooling
17802713
*
17812714
* @param {Object} config - Configuration for the communicator
17822715
* @param {string} config.transport - Transport type ('websocket', 'postmessage', 'comlink', 'auto')
17832716
* @param {number} config.connectionTimeout - Connection timeout in ms (default: 10000)
17842717
* @param {boolean} config.enableWorkers - Allow worker-based transports (default: true)
2718+
* @param {boolean} config.enableConnectionPooling - Enable connection pooling (default: false for compatibility)
2719+
* @param {Object} config.poolConfig - Connection pool configuration
17852720
* @returns {IPreswaldCommunicator} - Enhanced communication interface
17862721
*/
17872722
export const createCommunicationLayer = (config = {}) => {
@@ -1795,6 +2730,12 @@ export const createCommunicationLayer = (config = {}) => {
17952730
enableWorkers: true,
17962731
retryAttempts: 3,
17972732
retryDelay: 1000,
2733+
enableConnectionPooling: false, // Default to false for backward compatibility
2734+
poolConfig: {
2735+
maxPoolSize: 3,
2736+
minPoolSize: 1,
2737+
loadBalancingStrategy: 'performance'
2738+
},
17982739
...config
17992740
};
18002741

@@ -1804,8 +2745,18 @@ export const createCommunicationLayer = (config = {}) => {
18042745
const selectedTransport = TransportSelector.selectOptimalTransport(enhancedConfig);
18052746
console.log('[CommunicationFactory] Selected transport:', selectedTransport);
18062747

1807-
// Create the appropriate client
1808-
const client = createTransportClient(selectedTransport, enhancedConfig);
2748+
let client;
2749+
2750+
// Create pooled or single client based on configuration
2751+
if (enhancedConfig.enableConnectionPooling && selectedTransport === TransportType.WEBSOCKET) {
2752+
console.log('[CommunicationFactory] Creating pooled communication client');
2753+
client = new PooledCommunicationClient(selectedTransport, enhancedConfig);
2754+
} else {
2755+
if (enhancedConfig.enableConnectionPooling) {
2756+
console.warn(`[CommunicationFactory] Connection pooling not supported for ${selectedTransport}, falling back to single connection`);
2757+
}
2758+
client = createTransportClient(selectedTransport, enhancedConfig);
2759+
}
18092760

18102761
// Validate interface compliance
18112762
if (!(client instanceof IPreswaldCommunicator)) {
@@ -1880,6 +2831,38 @@ export const createCommunicationLayerWithTransport = (transport, config = {}) =>
18802831
return createCommunicationLayer({ ...config, transport });
18812832
};
18822833

2834+
/**
2835+
* Create a production-ready communication layer with connection pooling enabled
2836+
* Optimized for high-throughput applications serving millions of users
2837+
*
2838+
* @param {Object} config - Configuration for the pooled communicator
2839+
* @param {number} config.maxPoolSize - Maximum number of connections in pool (default: 3)
2840+
* @param {number} config.minPoolSize - Minimum number of connections in pool (default: 1)
2841+
* @param {string} config.loadBalancingStrategy - Load balancing strategy ('round-robin', 'least-connections', 'performance')
2842+
* @param {boolean} config.enableFailover - Enable automatic failover (default: true)
2843+
* @returns {IPreswaldCommunicator} - Pooled communication interface
2844+
*/
2845+
export const createProductionCommunicationLayer = (config = {}) => {
2846+
const productionConfig = {
2847+
enableConnectionPooling: true,
2848+
transport: TransportType.WEBSOCKET, // Force WebSocket for production pooling
2849+
poolConfig: {
2850+
maxPoolSize: 3,
2851+
minPoolSize: 2, // Higher minimum for production
2852+
loadBalancingStrategy: 'performance',
2853+
enableFailover: true,
2854+
healthCheckInterval: 15000, // More frequent health checks
2855+
...config.poolConfig
2856+
},
2857+
connectionTimeout: 8000, // Faster timeout for production
2858+
retryAttempts: 5, // More retries for production
2859+
...config
2860+
};
2861+
2862+
console.log('[ProductionFactory] Creating production communication layer with pooling');
2863+
return createCommunicationLayer(productionConfig);
2864+
};
2865+
18832866
/**
18842867
* Legacy export - maintain backwards compatibility
18852868
* @deprecated Use createCommunicationLayer() directly
@@ -1890,12 +2873,14 @@ export const createCommunicator = createCommunicationLayer;
18902873
export const comm = createCommunicationLayer();
18912874

18922875
/**
1893-
* Export MessageEncoder, ComponentStateManager and enhanced communication types for external use
1894-
* Allows applications to use standardized message formatting, bulk state management, and transport selection
2876+
* Export MessageEncoder, ComponentStateManager, ConnectionPoolManager and enhanced communication types for external use
2877+
* Allows applications to use standardized message formatting, bulk state management, transport selection, and connection pooling
18952878
*/
18962879
export {
18972880
MessageEncoder,
18982881
ComponentStateManager,
2882+
ConnectionPoolManager,
2883+
PooledCommunicationClient,
18992884
MessageType,
19002885
TransportType
19012886
};

0 commit comments

Comments
 (0)
Please sign in to comment.