It’s 2 AM, and I’m debugging what should have been a simple feature. My coffee has gone cold, my eyes are burning, and I’m staring at a network tab that looks like a machine gun firing requests. Every 100 milliseconds, another request. Another request. Another request. The server is screaming, the database is crying, and somewhere in the distance, I can hear my AWS bill climbing.
All I wanted was to build a simple collaborative feature – multiple users viewing the same dashboard, with real-time updates when data changes. You know, like every modern app has these days. How hard could it be?
If you’ve ever found yourself in this situation, pull up a chair. Let me tell you about the day I discovered the Observer Pattern, and how it changed everything I thought I knew about building real-time applications.
The Night Everything Went Wrong
Picture this: It’s 2019, and I’m working at a fintech startup. We’re building a trading platform where multiple traders need to see real-time price updates, order book changes, and execution confirmations. The requirements seemed straightforward enough:
- When a price changes, everyone viewing that asset should see the update immediately
- When someone places an order, it should appear in everyone’s order book
- When a trade executes, all relevant parties should be notified
- Everything needs to happen in “real-time” (whatever that means)
My first instinct was what I now call the “brute force approach.” It went something like this:
// The code that haunts my dreams
class TradingDashboard {
constructor(symbols) {
this.symbols = symbols;
this.prices = {};
this.orderBook = {};
this.trades = [];
// Start the polling madness
this.startPolling();
}
startPolling() {
// Check for price updates every 100ms
setInterval(() => {
this.symbols.forEach(symbol => {
fetch(`/api/prices/${symbol}`)
.then(res => res.json())
.then(data => {
if (data.price !== this.prices[symbol]) {
this.updatePrice(symbol, data.price);
}
});
});
}, 100);
// Check for order book changes every 200ms
setInterval(() => {
this.symbols.forEach(symbol => {
fetch(`/api/orderbook/${symbol}`)
.then(res => res.json())
.then(data => {
if (JSON.stringify(data) !== JSON.stringify(this.orderBook[symbol])) {
this.updateOrderBook(symbol, data);
}
});
});
}, 200);
// Check for new trades every 500ms
setInterval(() => {
fetch(`/api/trades/recent`)
.then(res => res.json())
.then(data => {
const newTrades = data.filter(trade =>
!this.trades.find(t => t.id === trade.id)
);
if (newTrades.length > 0) {
this.addTrades(newTrades);
}
});
}, 500);
}
updatePrice(symbol, price) {
this.prices[symbol] = price;
// Update UI
document.querySelector(`#price-${symbol}`).textContent = price;
// Oh wait, we also need to update the chart
this.updateChart(symbol, price);
// And the position calculator
this.recalculatePositions();
// And trigger any price alerts
this.checkPriceAlerts(symbol, price);
// And... oh no, this is getting messy
}
}
It worked! For exactly one user. With one symbol. On localhost.
The moment we deployed to staging and had five traders monitoring twenty symbols each, the application ground to a halt. Here’s the math that made me question my career choices:
- 5 traders × 20 symbols × 10 price checks per second = 1,000 requests per second
- Add order book checks: +500 requests per second
- Add trade checks: +10 requests per second
- Total: 1,510 requests per second to check if anything changed
But here’s the kicker – 99.9% of these requests returned “nothing changed.” We were DDOSing ourselves to check if we needed to do nothing. The server logs looked like:
[2019-03-15 02:34:15.123] GET /api/prices/AAPL - 200 - {"price": 150.32}
[2019-03-15 02:34:15.223] GET /api/prices/AAPL - 200 - {"price": 150.32}
[2019-03-15 02:34:15.323] GET /api/prices/AAPL - 200 - {"price": 150.32}
[2019-03-15 02:34:15.423] GET /api/prices/AAPL - 200 - {"price": 150.32}
[2019-03-15 02:34:15.523] GET /api/prices/AAPL - 200 - {"price": 150.32}
[2019-03-15 02:34:15.623] GET /api/prices/AAPL - 200 - {"price": 150.33} // Finally, a change!
The CEO called an emergency meeting. The words “unscalable,” “expensive,” and “what were you thinking?” were thrown around. I needed a solution, and I needed it fast.
The Revelation: What If We Flip Everything?
That night, while doom-scrolling through engineering blogs and Stack Overflow, I stumbled upon a post about the Observer Pattern. The concept was so simple it made me angry I hadn’t thought of it:
Instead of constantly asking “did anything change?”, what if we just notify interested parties when something actually changes?
It’s the difference between:
- Checking your mailbox every five minutes to see if mail arrived
- Getting a notification when mail is delivered
The Observer Pattern, I learned, is based on a simple relationship: you have “subjects” (things that change) and “observers” (things that care about those changes). When a subject changes, it notifies all its observers. That’s it. No polling, no waste, just efficient communication.
Let me show you how this transformed our trading platform:
// The Subject - what everyone is watching
class PriceSubject {
constructor(symbol) {
this.symbol = symbol;
this.price = null;
this.observers = new Set(); // Using Set to avoid duplicates
this.metadata = {
lastUpdate: null,
source: null,
volume: 0
};
}
// Subscribe to price updates
attach(observer) {
this.observers.add(observer);
// If we already have a price, immediately update the new observer
if (this.price !== null) {
observer.update({
symbol: this.symbol,
price: this.price,
metadata: this.metadata,
isInitial: true
});
}
console.log(`Observer attached to ${this.symbol}. Total observers: ${this.observers.size}`);
}
// Unsubscribe from price updates
detach(observer) {
this.observers.delete(observer);
console.log(`Observer detached from ${this.symbol}. Total observers: ${this.observers.size}`);
}
// When price changes, notify everyone
updatePrice(newPrice, source = 'exchange') {
const oldPrice = this.price;
this.price = newPrice;
this.metadata = {
lastUpdate: new Date(),
source: source,
volume: this.metadata.volume + 1,
change: oldPrice ? newPrice - oldPrice : 0,
changePercent: oldPrice ? ((newPrice - oldPrice) / oldPrice) * 100 : 0
};
this.notifyObservers({
symbol: this.symbol,
price: newPrice,
oldPrice: oldPrice,
metadata: this.metadata,
isInitial: false
});
}
notifyObservers(priceData) {
this.observers.forEach(observer => {
// Wrap in try-catch so one failing observer doesn't break others
try {
observer.update(priceData);
} catch (error) {
console.error(`Failed to update observer:`, error);
// Optionally remove broken observers
// this.detach(observer);
}
});
}
}
// The Observer - components that need price updates
class PriceDisplay {
constructor(elementId) {
this.element = document.getElementById(elementId);
this.lastPrice = null;
}
update(priceData) {
const { price, oldPrice, metadata } = priceData;
// Update the display
this.element.textContent = `$${price.toFixed(2)}`;
// Add visual feedback for price changes
if (oldPrice !== null) {
this.element.classList.remove('price-up', 'price-down');
if (price > oldPrice) {
this.element.classList.add('price-up');
this.flashAnimation('#4CAF50'); // Green flash
} else if (price < oldPrice) {
this.element.classList.add('price-down');
this.flashAnimation('#F44336'); // Red flash
}
}
// Update metadata display
if (this.element.dataset.showMeta === 'true') {
const metaElement = this.element.querySelector('.metadata');
metaElement.innerHTML = `
<span class="change ${metadata.change >= 0 ? 'positive' : 'negative'}">
${metadata.change >= 0 ? '+' : ''}${metadata.change.toFixed(2)}
(${metadata.changePercent.toFixed(2)}%)
</span>
<span class="time">${metadata.lastUpdate.toLocaleTimeString()}</span>
`;
}
this.lastPrice = price;
}
flashAnimation(color) {
this.element.animate([
{ backgroundColor: color, opacity: 0.3 },
{ backgroundColor: 'transparent', opacity: 1 }
], {
duration: 500,
easing: 'ease-out'
});
}
}
class ChartObserver {
constructor(chart) {
this.chart = chart;
this.priceHistory = [];
this.maxPoints = 100;
}
update(priceData) {
const point = {
x: priceData.metadata.lastUpdate,
y: priceData.price,
volume: priceData.metadata.volume
};
this.priceHistory.push(point);
// Keep only the last N points
if (this.priceHistory.length > this.maxPoints) {
this.priceHistory.shift();
}
// Update the chart
this.chart.updateSeries([{
name: priceData.symbol,
data: this.priceHistory
}]);
// Add price change annotation for significant moves
if (Math.abs(priceData.metadata.changePercent) > 1) {
this.chart.addPointAnnotation({
x: point.x,
y: point.y,
label: {
text: `${priceData.metadata.changePercent > 0 ? '↑' : '↓'} ${Math.abs(priceData.metadata.changePercent).toFixed(2)}%`,
style: {
color: priceData.metadata.changePercent > 0 ? '#4CAF50' : '#F44336'
}
}
});
}
}
}
class AlertObserver {
constructor(userId, conditions) {
this.userId = userId;
this.conditions = conditions; // e.g., { above: 151, below: 149 }
this.triggered = new Set(); // Prevent duplicate alerts
}
update(priceData) {
const { symbol, price } = priceData;
// Check if price crossed above threshold
if (this.conditions.above && price >= this.conditions.above) {
const alertKey = `${symbol}-above-${this.conditions.above}`;
if (!this.triggered.has(alertKey)) {
this.sendAlert({
type: 'price_above',
message: `${symbol} is now above $${this.conditions.above} at $${price}`,
severity: 'info'
});
this.triggered.add(alertKey);
}
}
// Check if price crossed below threshold
if (this.conditions.below && price <= this.conditions.below) {
const alertKey = `${symbol}-below-${this.conditions.below}`;
if (!this.triggered.has(alertKey)) {
this.sendAlert({
type: 'price_below',
message: `${symbol} is now below $${this.conditions.below} at $${price}`,
severity: 'warning'
});
this.triggered.add(alertKey);
}
}
// Reset triggers if price moves back
if (price < this.conditions.above - 0.5) {
this.triggered.delete(`${symbol}-above-${this.conditions.above}`);
}
if (price > this.conditions.below + 0.5) {
this.triggered.delete(`${symbol}-below-${this.conditions.below}`);
}
}
sendAlert(alert) {
// Send push notification
if ('Notification' in window && Notification.permission === 'granted') {
new Notification(alert.message, {
icon: '/assets/price-alert.png',
badge: '/assets/badge.png',
tag: alert.type,
requireInteraction: alert.severity === 'warning'
});
}
// Also send to server for email/SMS
fetch('/api/alerts', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
userId: this.userId,
alert: alert,
timestamp: new Date()
})
});
}
}
The transformation was immediate and dramatic. Here’s how we integrated it with our real-time data feed:
// WebSocket integration for real-time updates
class TradingPlatform {
constructor() {
this.priceSubjects = new Map(); // symbol -> PriceSubject
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000; // Start with 1 second
this.connect();
}
connect() {
console.log('Connecting to real-time feed...');
this.socket = new WebSocket('wss://api.trading-platform.com/feed');
this.socket.onopen = () => {
console.log('Connected to real-time feed');
this.reconnectAttempts = 0;
this.reconnectDelay = 1000;
// Subscribe to symbols we're tracking
this.priceSubjects.forEach((subject, symbol) => {
this.subscribeToSymbol(symbol);
});
};
this.socket.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'price_update':
this.handlePriceUpdate(data);
break;
case 'trade_executed':
this.handleTradeUpdate(data);
break;
case 'order_book_update':
this.handleOrderBookUpdate(data);
break;
case 'connection_established':
console.log('Feed connection established:', data.sessionId);
break;
default:
console.warn('Unknown message type:', data.type);
}
};
this.socket.onerror = (error) => {
console.error('WebSocket error:', error);
};
this.socket.onclose = () => {
console.log('Disconnected from real-time feed');
this.attemptReconnect();
};
}
attemptReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnection attempts reached');
this.notifyConnectionFailure();
return;
}
this.reconnectAttempts++;
console.log(`Reconnecting in ${this.reconnectDelay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
this.connect();
this.reconnectDelay *= 2; // Exponential backoff
}, this.reconnectDelay);
}
subscribeToSymbol(symbol) {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({
action: 'subscribe',
symbol: symbol,
types: ['price', 'trades', 'orderbook']
}));
}
}
handlePriceUpdate(data) {
const { symbol, price, timestamp, source } = data;
// Get or create price subject
let priceSubject = this.priceSubjects.get(symbol);
if (!priceSubject) {
priceSubject = new PriceSubject(symbol);
this.priceSubjects.set(symbol, priceSubject);
}
// Update price - this notifies all observers
priceSubject.updatePrice(price, source);
}
// Public API for components to observe prices
observePrice(symbol, observer) {
let priceSubject = this.priceSubjects.get(symbol);
if (!priceSubject) {
priceSubject = new PriceSubject(symbol);
this.priceSubjects.set(symbol, priceSubject);
// Subscribe to this symbol on the WebSocket
this.subscribeToSymbol(symbol);
}
priceSubject.attach(observer);
// Return unsubscribe function
return () => {
priceSubject.detach(observer);
// If no more observers, we could unsubscribe from WebSocket
if (priceSubject.observers.size === 0) {
this.unsubscribeFromSymbol(symbol);
this.priceSubjects.delete(symbol);
}
};
}
}
// Usage in a React component (showing the pattern, not just vanilla JS)
function PriceTicker({ symbol }) {
const [price, setPrice] = useState(null);
const [change, setChange] = useState(0);
const [isStale, setIsStale] = useState(false);
useEffect(() => {
const platform = window.tradingPlatform; // Global instance
// Create observer
const priceObserver = {
update: (priceData) => {
setPrice(priceData.price);
setChange(priceData.metadata.changePercent);
setIsStale(false);
// Mark as stale if no updates for 5 seconds
clearTimeout(priceObserver.staleTimeout);
priceObserver.staleTimeout = setTimeout(() => {
setIsStale(true);
}, 5000);
}
};
// Start observing
const unsubscribe = platform.observePrice(symbol, priceObserver);
// Cleanup
return () => {
clearTimeout(priceObserver.staleTimeout);
unsubscribe();
};
}, [symbol]);
return (
<div className={`price-ticker ${isStale ? 'stale' : ''}`}>
<span className="symbol">{symbol}</span>
<span className="price">${price?.toFixed(2) || '---'}</span>
<span className={`change ${change >= 0 ? 'positive' : 'negative'}`}>
{change >= 0 ? '↑' : '↓'} {Math.abs(change).toFixed(2)}%
</span>
{isStale && <span className="warning">⚠️</span>}
</div>
);
}
The Results Were Mind-Blowing
Remember those 1,510 requests per second? Here’s what happened after implementing the Observer Pattern:
Before (Polling):
- Requests per second: 1,510
- Average latency: 45ms (server overwhelmed)
- CPU usage (server): 78%
- Monthly AWS bill: $4,200
- User complaints: “The app feels sluggish”
After (Observer Pattern):
- WebSocket messages per second: ~15 (only actual changes)
- Average latency: 2ms
- CPU usage (server): 12%
- Monthly AWS bill: $850
- User feedback: “How is this so fast?”
But the benefits went beyond just performance. The code became dramatically simpler and more maintainable. Adding a new feature that needed price updates was as simple as creating a new observer:
// Want to add sound alerts? Just create an observer!
class SoundAlertObserver {
constructor(enabledSymbols = []) {
this.enabledSymbols = new Set(enabledSymbols);
this.audioContext = new (window.AudioContext || window.webkitAudioContext)();
this.sounds = {
priceUp: this.createSound(800, 0.1), // Higher pitch
priceDown: this.createSound(400, 0.1) // Lower pitch
};
}
update(priceData) {
if (!this.enabledSymbols.has(priceData.symbol)) return;
const { change } = priceData.metadata;
if (Math.abs(change) > 0.5) { // Only alert on significant moves
this.playSound(change > 0 ? 'priceUp' : 'priceDown');
}
}
createSound(frequency, duration) {
return () => {
const oscillator = this.audioContext.createOscillator();
const gainNode = this.audioContext.createGain();
oscillator.connect(gainNode);
gainNode.connect(this.audioContext.destination);
oscillator.frequency.value = frequency;
gainNode.gain.setValueAtTime(0.3, this.audioContext.currentTime);
gainNode.gain.exponentialRampToValueAtTime(0.01, this.audioContext.currentTime + duration);
oscillator.start(this.audioContext.currentTime);
oscillator.stop(this.audioContext.currentTime + duration);
};
}
playSound(type) {
this.sounds[type]();
}
}
// Want to log all price changes? Another observer!
class PriceLogger {
constructor() {
this.log = [];
this.maxLogSize = 10000;
}
update(priceData) {
const logEntry = {
timestamp: new Date(),
symbol: priceData.symbol,
price: priceData.price,
change: priceData.metadata.change,
source: priceData.metadata.source
};
this.log.push(logEntry);
// Keep log size manageable
if (this.log.length > this.maxLogSize) {
this.log.shift();
}
// Persist to backend every 100 updates
if (this.log.length % 100 === 0) {
this.persistLog();
}
}
persistLog() {
const logsToSend = this.log.slice(-100);
fetch('/api/price-logs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(logsToSend)
}).catch(error => {
console.error('Failed to persist logs:', error);
});
}
getRecentLogs(count = 50) {
return this.log.slice(-count);
}
}
Deep Dive: How the Observer Pattern Actually Works
Now that you’ve seen the transformation, let’s dive deep into the mechanics. The Observer Pattern is built on a few key principles that make it so powerful:
1. Separation of Concerns
The subject doesn’t need to know anything about what the observers do with the updates. It just knows it needs to notify them. This separation is crucial:
// The subject doesn't care if you're updating UI, playing sounds, or sending emails
class EventEmitter {
constructor() {
this.events = new Map();
}
on(event, handler) {
if (!this.events.has(event)) {
this.events.set(event, new Set());
}
this.events.get(event).add(handler);
// Return unsubscribe function
return () => this.off(event, handler);
}
off(event, handler) {
const handlers = this.events.get(event);
if (handlers) {
handlers.delete(handler);
if (handlers.size === 0) {
this.events.delete(event);
}
}
}
emit(event, data) {
const handlers = this.events.get(event);
if (handlers) {
handlers.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`Error in event handler for ${event}:`, error);
}
});
}
}
// Utility method to see what's registered
listenerCount(event) {
return this.events.get(event)?.size || 0;
}
}
// This flexibility means you can have wildly different observers
class DatabaseObserver {
update(data) {
db.collection('price_history').insertOne({
...data,
_id: new ObjectId(),
recordedAt: new Date()
});
}
}
class WebhookObserver {
constructor(webhookUrl) {
this.webhookUrl = webhookUrl;
this.queue = [];
this.batchSize = 10;
this.flushInterval = 1000;
setInterval(() => this.flush(), this.flushInterval);
}
update(data) {
this.queue.push(data);
if (this.queue.length >= this.batchSize) {
this.flush();
}
}
async flush() {
if (this.queue.length === 0) return;
const batch = this.queue.splice(0, this.batchSize);
try {
await fetch(this.webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ events: batch })
});
} catch (error) {
console.error('Webhook failed:', error);
// Put back in queue for retry
this.queue.unshift(...batch);
}
}
}
class MachineLearningObserver {
constructor(model) {
this.model = model;
this.predictions = [];
}
update(priceData) {
// Use price data to make predictions
const features = this.extractFeatures(priceData);
const prediction = this.model.predict(features);
this.predictions.push({
timestamp: new Date(),
symbol: priceData.symbol,
currentPrice: priceData.price,
predictedPrice: prediction.price,
confidence: prediction.confidence
});
// If prediction suggests significant move, could trigger trading
if (prediction.confidence > 0.8) {
this.considerTrade(priceData, prediction);
}
}
extractFeatures(priceData) {
// Extract features for ML model
return {
price: priceData.price,
change: priceData.metadata.change,
changePercent: priceData.metadata.changePercent,
volume: priceData.metadata.volume,
timeOfDay: new Date().getHours(),
dayOfWeek: new Date().getDay()
};
}
}
2. Dynamic Subscription Management
Observers can subscribe and unsubscribe at runtime, making the system incredibly flexible:
class AdvancedSubject {
constructor() {
this.observers = new Map();
this.observerMetadata = new WeakMap();
this.stats = {
totalNotifications: 0,
notificationsByObserver: new Map()
};
}
subscribe(observer, options = {}) {
const id = options.id || Symbol('observer');
this.observers.set(id, observer);
this.observerMetadata.set(observer, {
subscribedAt: new Date(),
priority: options.priority || 0,
filter: options.filter || null,
async: options.async || false
});
return {
unsubscribe: () => this.unsubscribe(id),
pause: () => this.pauseObserver(id),
resume: () => this.resumeObserver(id),
updateOptions: (newOptions) => this.updateObserverOptions(id, newOptions)
};
}
async notifyObservers(data) {
this.stats.totalNotifications++;
// Sort observers by priority
const sortedObservers = Array.from(this.observers.entries())
.sort(([idA, observerA], [idB, observerB]) => {
const metaA = this.observerMetadata.get(observerA);
const metaB = this.observerMetadata.get(observerB);
return (metaB?.priority || 0) - (metaA?.priority || 0);
});
// Notify in priority order
for (const [id, observer] of sortedObservers) {
const metadata = this.observerMetadata.get(observer);
// Skip if paused
if (metadata?.paused) continue;
// Apply filter if exists
if (metadata?.filter && !metadata.filter(data)) continue;
// Track stats
const count = this.stats.notificationsByObserver.get(id) || 0;
this.stats.notificationsByObserver.set(id, count + 1);
try {
if (metadata?.async) {
// Non-blocking notification
this.notifyAsync(observer, data);
} else {
// Blocking notification
await this.notifySync(observer, data);
}
} catch (error) {
console.error(`Observer ${id.toString()} failed:`, error);
// Optionally remove failing observers
if (metadata?.removeOnError) {
this.unsubscribe(id);
}
}
}
}
async notifySync(observer, data) {
if (typeof observer === 'function') {
return observer(data);
} else if (typeof observer.update === 'function') {
return observer.update(data);
} else {
throw new Error('Observer must be a function or have an update method');
}
}
async notifyAsync(observer, data) {
setImmediate(() => {
this.notifySync(observer, data).catch(error => {
console.error('Async observer error:', error);
});
});
}
}
3. The Power of Weak References
One of the biggest challenges with the Observer Pattern is memory leaks. Observers that aren’t properly cleaned up can prevent garbage collection. Modern JavaScript gives us tools to handle this:
class WeakObserverSubject {
constructor() {
// WeakMap allows observers to be garbage collected
this.observers = new WeakMap();
this.observerRefs = new Set();
}
subscribe(observer) {
// Create a weak reference to the observer
const weakRef = new WeakRef(observer);
this.observerRefs.add(weakRef);
this.observers.set(observer, {
subscribed: new Date(),
notificationCount: 0
});
return {
unsubscribe: () => {
this.observerRefs.delete(weakRef);
const obs = weakRef.deref();
if (obs) {
this.observers.delete(obs);
}
}
};
}
notify(data) {
// Clean up dead references
const deadRefs = [];
this.observerRefs.forEach(weakRef => {
const observer = weakRef.deref();
if (observer) {
// Observer is still alive
try {
observer.update(data);
// Update stats
const meta = this.observers.get(observer);
if (meta) {
meta.notificationCount++;
}
} catch (error) {
console.error('Observer error:', error);
}
} else {
// Observer has been garbage collected
deadRefs.push(weakRef);
}
});
// Remove dead references
deadRefs.forEach(ref => this.observerRefs.delete(ref));
}
getStats() {
const activeObservers = Array.from(this.observerRefs)
.map(ref => ref.deref())
.filter(Boolean);
return {
activeCount: activeObservers.length,
totalNotifications: activeObservers.reduce((sum, obs) => {
const meta = this.observers.get(obs);
return sum + (meta?.notificationCount || 0);
}, 0)
};
}
}
Real-World Implementation: Building a Complete System
Let me show you how all these concepts come together in a production-ready system. This is similar to what we deployed at the trading platform:
// The complete observable system with all bells and whistles
class ObservableSystem {
constructor(config = {}) {
this.subjects = new Map();
this.middleware = [];
this.errorHandlers = [];
this.metrics = {
notifications: 0,
errors: 0,
subscriptions: 0,
unsubscriptions: 0
};
// Configuration
this.config = {
enableLogging: config.enableLogging || false,
enableMetrics: config.enableMetrics || true,
maxObserversPerSubject: config.maxObserversPerSubject || 1000,
notificationTimeout: config.notificationTimeout || 5000,
...config
};
// Set up built-in middleware
if (this.config.enableLogging) {
this.use(this.loggingMiddleware());
}
if (this.config.enableMetrics) {
this.use(this.metricsMiddleware());
}
}
// Middleware system for cross-cutting concerns
use(middleware) {
this.middleware.push(middleware);
}
// Error handling
onError(handler) {
this.errorHandlers.push(handler);
}
// Get or create a subject
getSubject(name, factory) {
if (!this.subjects.has(name)) {
const subject = factory ? factory() : new Subject(name);
this.subjects.set(name, subject);
}
return this.subjects.get(name);
}
// Subscribe to a subject
subscribe(subjectName, observer, options = {}) {
const subject = this.getSubject(subjectName);
// Check observer limit
if (subject.observers.size >= this.config.maxObserversPerSubject) {
throw new Error(`Maximum observers (${this.config.maxObserversPerSubject}) reached for subject: ${subjectName}`);
}
// Wrap observer with middleware
const wrappedObserver = this.wrapWithMiddleware(observer, subjectName);
// Subscribe
const subscription = subject.subscribe(wrappedObserver, options);
this.metrics.subscriptions++;
// Return enhanced subscription
return {
...subscription,
pause: () => subject.pauseObserver(wrappedObserver),
resume: () => subject.resumeObserver(wrappedObserver),
isPaused: () => subject.isObserverPaused(wrappedObserver)
};
}
// Wrap observer with middleware
wrapWithMiddleware(observer, subjectName) {
const middlewareChain = [...this.middleware].reverse();
return middlewareChain.reduce((wrapped, middleware) => {
return {
update: async (data) => {
try {
await middleware(data, wrapped, {
subjectName,
observer: observer.constructor.name || 'Anonymous',
timestamp: new Date()
});
} catch (error) {
this.handleError(error, { middleware, data, subjectName });
}
}
};
}, observer);
}
// Built-in middleware
loggingMiddleware() {
return async (data, next, context) => {
console.log(`[${context.timestamp.toISOString()}] ${context.subjectName} -> ${context.observer}:`, data);
await next.update(data);
};
}
metricsMiddleware() {
return async (data, next, context) => {
const start = performance.now();
try {
await next.update(data);
this.metrics.notifications++;
} catch (error) {
this.metrics.errors++;
throw error;
} finally {
const duration = performance.now() - start;
// Track slow observers
if (duration > this.config.notificationTimeout) {
console.warn(`Slow observer detected: ${context.observer} took ${duration}ms`);
}
}
};
}
// Error handling
handleError(error, context) {
console.error('Observer system error:', error, context);
this.errorHandlers.forEach(handler => {
try {
handler(error, context);
} catch (handlerError) {
console.error('Error in error handler:', handlerError);
}
});
}
// System-wide operations
pauseAll() {
this.subjects.forEach(subject => subject.pauseAll());
}
resumeAll() {
this.subjects.forEach(subject => subject.resumeAll());
}
getMetrics() {
const subjectMetrics = Array.from(this.subjects.entries()).map(([name, subject]) => ({
name,
observerCount: subject.observers.size,
isPaused: subject.isPaused
}));
return {
...this.metrics,
subjects: subjectMetrics,
timestamp: new Date()
};
}
}
// Usage example: Building a real-time dashboard
class DashboardManager {
constructor() {
this.observableSystem = new ObservableSystem({
enableLogging: process.env.NODE_ENV === 'development',
maxObserversPerSubject: 500
});
// Add error tracking
this.observableSystem.onError((error, context) => {
// Send to error tracking service
errorTracker.captureException(error, {
extra: context
});
});
// Add performance monitoring
this.observableSystem.use(async (data, next, context) => {
const transaction = performanceMonitor.startTransaction({
name: `observer.${context.subjectName}`,
op: 'observer'
});
try {
await next.update(data);
} finally {
transaction.finish();
}
});
}
createPriceWidget(symbol, elementId) {
const widget = new PriceWidget(elementId);
const subscription = this.observableSystem.subscribe(
`price.${symbol}`,
widget,
{
priority: 10, // UI updates are high priority
filter: (data) => data.price !== null // Only valid prices
}
);
// Auto-cleanup when element is removed
const observer = new MutationObserver((mutations) => {
if (!document.getElementById(elementId)) {
subscription.unsubscribe();
observer.disconnect();
}
});
observer.observe(document.body, {
childList: true,
subtree: true
});
return widget;
}
}
The Psychology of Real-Time: Why Observer Pattern Feels So Good
There’s something deeply satisfying about real-time updates. When you click pause on Netflix and see everyone’s screen stop instantly, or when you type in Google Docs and see your friend’s cursor move in real-time, it creates a sense of connection and immediacy that polling can never achieve.
The Observer Pattern taps into this psychological need for immediate feedback. It’s the difference between:
- Refreshing your email manually vs. getting a notification
- Checking your doorbell camera vs. getting a motion alert
- Looking at your fitness tracker vs. getting achievement notifications
This immediacy isn’t just about user experience—it fundamentally changes how we interact with applications. When updates are instant, users trust the system more. They stop second-guessing whether their action was registered. They stop hitting refresh. They stop opening multiple tabs to check if something updated.
Advanced Patterns: Taking It to the Next Level
As I’ve worked with the Observer Pattern over the years, I’ve discovered some advanced techniques that solve specific challenges:
1. The Filtered Observer Pattern
Sometimes observers only care about specific types of updates:
class FilteredSubject {
constructor() {
this.observers = new Map();
}
subscribe(observer, filter = null) {
const id = Symbol('observer');
this.observers.set(id, { observer, filter });
return () => this.observers.delete(id);
}
notify(data, type = null) {
this.observers.forEach(({ observer, filter }) => {
// Check if this observer wants this type of update
if (filter && !filter(data, type)) {
return;
}
observer.update(data, type);
});
}
}
// Usage
const market = new FilteredSubject();
// Only care about large price movements
const bigMoveObserver = {
update: (data) => console.log('Big move!', data)
};
market.subscribe(bigMoveObserver, (data) =>
Math.abs(data.changePercent) > 2
);
// Only care about specific symbols
const techStockObserver = {
update: (data) => console.log('Tech stock update:', data)
};
market.subscribe(techStockObserver, (data) =>
['AAPL', 'GOOGL', 'MSFT'].includes(data.symbol)
);
2. The Hierarchical Observer Pattern
For complex systems with nested observables:
class HierarchicalSubject {
constructor(name, parent = null) {
this.name = name;
this.parent = parent;
this.children = new Map();
this.observers = new Set();
if (parent) {
parent.addChild(this);
}
}
addChild(child) {
this.children.set(child.name, child);
}
subscribe(observer, options = {}) {
const subscription = {
observer,
includeChildren: options.includeChildren || false,
includeParent: options.includeParent || false
};
this.observers.add(subscription);
return () => this.observers.delete(subscription);
}
notify(data, source = this) {
// Notify own observers
this.observers.forEach(({ observer, includeChildren, includeParent }) => {
observer.update({
...data,
source: source.name,
path: this.getPath()
});
});
// Bubble up to parent
if (this.parent && source === this) {
this.parent.notifyFromChild(data, source);
}
// Propagate to children
this.children.forEach(child => {
child.notifyFromParent(data, source);
});
}
notifyFromChild(data, source) {
this.observers.forEach(({ observer, includeChildren }) => {
if (includeChildren) {
observer.update({
...data,
source: source.name,
path: source.getPath(),
propagatedFrom: 'child'
});
}
});
// Continue bubbling up
if (this.parent) {
this.parent.notifyFromChild(data, source);
}
}
notifyFromParent(data, source) {
this.observers.forEach(({ observer, includeParent }) => {
if (includeParent) {
observer.update({
...data,
source: source.name,
path: source.getPath(),
propagatedFrom: 'parent'
});
}
});
// Continue propagating down
this.children.forEach(child => {
child.notifyFromParent(data, source);
});
}
getPath() {
const path = [this.name];
let current = this.parent;
while (current) {
path.unshift(current.name);
current = current.parent;
}
return path.join('.');
}
}
// Usage: Organization-wide notification system
const company = new HierarchicalSubject('ACME Corp');
const engineering = new HierarchicalSubject('Engineering', company);
const frontend = new HierarchicalSubject('Frontend', engineering);
const backend = new HierarchicalSubject('Backend', engineering);
// CEO wants all updates
company.subscribe({
update: (data) => console.log('CEO Dashboard:', data)
}, { includeChildren: true });
// Engineering manager wants team updates
engineering.subscribe({
update: (data) => console.log('Engineering Manager:', data)
}, { includeChildren: true });
// Frontend dev wants frontend and company-wide updates
frontend.subscribe({
update: (data) => console.log('Frontend Dev:', data)
}, { includeParent: true });
// When frontend deploys
frontend.notify({
type: 'deployment',
version: '2.0.0',
timestamp: new Date()
});
// This notifies: Frontend Dev, Engineering Manager, and CEO
3. The Time-Windowed Observer Pattern
For scenarios where you need to batch updates:
class TimeWindowedSubject {
constructor(windowMs = 1000) {
this.observers = new Set();
this.pendingNotifications = [];
this.windowMs = windowMs;
this.timer = null;
}
subscribe(observer) {
this.observers.add(observer);
return () => this.observers.delete(observer);
}
notify(data) {
this.pendingNotifications.push({
data,
timestamp: Date.now()
});
if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.windowMs);
}
}
flush() {
if (this.pendingNotifications.length === 0) {
this.timer = null;
return;
}
const notifications = [...this.pendingNotifications];
this.pendingNotifications = [];
this.timer = null;
// Create summary
const summary = this.createSummary(notifications);
// Notify with batched data
this.observers.forEach(observer => {
observer.update(summary);
});
}
createSummary(notifications) {
return {
count: notifications.length,
timeWindow: {
start: notifications[0].timestamp,
end: notifications[notifications.length - 1].timestamp
},
data: notifications.map(n => n.data)
};
}
}
// Perfect for rate-limited APIs or reducing UI updates
const priceAggregator = new TimeWindowedSubject(100); // 100ms windows
priceAggregator.subscribe({
update: (summary) => {
console.log(`Received ${summary.count} updates in ${summary.timeWindow.end - summary.timeWindow.start}ms`);
// Update UI once with all changes
updatePriceDisplay(summary.data);
}
});
The Dark Side: Common Pitfalls and How to Avoid Them
After years of using the Observer Pattern, I’ve collected a list of “gotchas” that can turn your elegant solution into a debugging nightmare:
1. The Infinite Loop of Death
// DON'T DO THIS
class PriceSubject {
constructor() {
this.observers = [];
this.price = 0;
}
setPrice(price) {
this.price = price;
this.notifyObservers();
}
notifyObservers() {
this.observers.forEach(obs => obs.update(this));
}
}
class BadObserver {
update(subject) {
// This creates an infinite loop!
if (subject.price > 100) {
subject.setPrice(99); // NO! Don't modify the subject in update
}
}
}
// INSTEAD, DO THIS
class GoodObserver {
constructor(controller) {
this.controller = controller;
}
update(priceData) {
if (priceData.price > 100) {
// Schedule the change for later
setTimeout(() => {
this.controller.requestPriceAdjustment(99);
}, 0);
}
}
}
2. The Memory Leak Trap
// Memory leak waiting to happen
class LeakyApp {
constructor() {
this.components = [];
this.priceSubject = new PriceSubject();
}
createComponent() {
const component = new PriceDisplay();
this.priceSubject.subscribe(component);
this.components.push(component);
return component;
}
removeComponent(component) {
const index = this.components.indexOf(component);
this.components.splice(index, 1);
// FORGOT TO UNSUBSCRIBE! component is still referenced by priceSubject
}
}
// Proper cleanup
class CleanApp {
constructor() {
this.components = new Map();
this.priceSubject = new PriceSubject();
}
createComponent() {
const component = new PriceDisplay();
const unsubscribe = this.priceSubject.subscribe(component);
// Store both component and its cleanup function
this.components.set(component, { unsubscribe });
return component;
}
removeComponent(component) {
const entry = this.components.get(component);
if (entry) {
entry.unsubscribe(); // Clean up subscription
this.components.delete(component);
}
}
// Clean up everything
destroy() {
this.components.forEach(({ unsubscribe }) => unsubscribe());
this.components.clear();
}
}
3. The Order Dependency Bug
// When order matters but isn't guaranteed
class OrderDependentSystem {
constructor() {
this.subject = new Subject();
// These observers depend on execution order
this.subject.subscribe(new DatabaseLogger()); // Must run first
this.subject.subscribe(new CacheUpdater()); // Must run second
this.subject.subscribe(new UINotifier()); // Must run last
// But order isn't guaranteed!
}
}
// Solution: Use priority or chain pattern
class PrioritySubject {
constructor() {
this.observers = [];
}
subscribe(observer, priority = 0) {
this.observers.push({ observer, priority });
// Keep sorted by priority
this.observers.sort((a, b) => b.priority - a.priority);
return () => {
const index = this.observers.findIndex(o => o.observer === observer);
if (index !== -1) {
this.observers.splice(index, 1);
}
};
}
notify(data) {
// Observers are already sorted by priority
this.observers.forEach(({ observer }) => {
observer.update(data);
});
}
}
// Usage with explicit priorities
const system = new PrioritySubject();
system.subscribe(new DatabaseLogger(), 100); // Highest priority
system.subscribe(new CacheUpdater(), 50); // Medium priority
system.subscribe(new UINotifier(), 10); // Lowest priority
Performance at Scale: Lessons from the Trenches
When our trading platform grew to handle millions of events per second, we learned some hard lessons about scaling the Observer Pattern:
1. Batch Everything
class ScalableEventSystem {
constructor() {
this.queues = new Map(); // event type -> queue
this.processors = new Map(); // event type -> processor
this.batchSize = 1000;
this.flushInterval = 10; // ms
}
emit(eventType, data) {
if (!this.queues.has(eventType)) {
this.queues.set(eventType, []);
this.scheduleFlush(eventType);
}
this.queues.get(eventType).push({
data,
timestamp: process.hrtime.bigint()
});
}
scheduleFlush(eventType) {
setTimeout(() => {
this.flush(eventType);
}, this.flushInterval);
}
flush(eventType) {
const queue = this.queues.get(eventType);
if (!queue || queue.length === 0) return;
// Take a batch
const batch = queue.splice(0, this.batchSize);
// Process batch
const processor = this.processors.get(eventType);
if (processor) {
processor.processBatch(batch);
}
// Schedule next flush if queue isn't empty
if (queue.length > 0) {
this.scheduleFlush(eventType);
}
}
}
2. Use Worker Threads for Heavy Processing
// main.js
const { Worker } = require('worker_threads');
class ThreadedObserverSystem {
constructor(workerCount = 4) {
this.workers = [];
this.currentWorker = 0;
// Create worker pool
for (let i = 0; i < workerCount; i++) {
const worker = new Worker('./observer-worker.js');
worker.on('message', this.handleWorkerMessage.bind(this));
worker.on('error', this.handleWorkerError.bind(this));
this.workers.push(worker);
}
}
notify(data) {
// Round-robin distribution
const worker = this.workers[this.currentWorker];
this.currentWorker = (this.currentWorker + 1) % this.workers.length;
worker.postMessage({
type: 'process',
data
});
}
handleWorkerMessage(message) {
if (message.type === 'result') {
// Handle processed result
this.updateUI(message.result);
}
}
handleWorkerError(error) {
console.error('Worker error:', error);
// Restart worker or handle error
}
}
// observer-worker.js
const { parentPort } = require('worker_threads');
class HeavyProcessingObserver {
update(data) {
// Expensive computation
const result = this.complexCalculation(data);
parentPort.postMessage({
type: 'result',
result
});
}
complexCalculation(data) {
// CPU-intensive work
// Machine learning inference, image processing, etc.
}
}
const observer = new HeavyProcessingObserver();
parentPort.on('message', (message) => {
if (message.type === 'process') {
observer.update(message.data);
}
});
3. Smart Debouncing and Throttling
class OptimizedObserver {
constructor(options = {}) {
this.options = {
debounceMs: options.debounceMs || 0,
throttleMs: options.throttleMs || 0,
maxPending: options.maxPending || 100
};
this.pending = [];
this.debounceTimer = null;
this.lastThrottle = 0;
}
update(data) {
if (this.options.throttleMs > 0) {
const now = Date.now();
if (now - this.lastThrottle < this.options.throttleMs) {
return; // Skip this update
}
this.lastThrottle = now;
}
if (this.options.debounceMs > 0) {
this.pending.push(data);
// Limit pending queue
if (this.pending.length > this.options.maxPending) {
this.pending.shift();
}
clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(() => {
this.processPending();
}, this.options.debounceMs);
} else {
this.process(data);
}
}
processPending() {
if (this.pending.length === 0) return;
// Process all pending updates at once
const batch = [...this.pending];
this.pending = [];
this.processBatch(batch);
}
process(data) {
// Handle single update
}
processBatch(batch) {
// Handle batch of updates more efficiently
}
}
The Future: Where Observer Pattern is Going
As I write this in 2024, the Observer Pattern continues to evolve. Here are some cutting-edge applications I’m seeing:
1. Reactive Streams and Backpressure
class ReactiveSubject {
constructor() {
this.observers = new Map();
}
subscribe(observer) {
const subscription = {
observer,
demand: 0,
buffer: [],
cancelled: false
};
this.observers.set(observer, subscription);
// Return subscription controller
return {
request: (n) => this.request(observer, n),
cancel: () => this.cancel(observer)
};
}
request(observer, n) {
const subscription = this.observers.get(observer);
if (!subscription || subscription.cancelled) return;
subscription.demand += n;
this.drain(subscription);
}
cancel(observer) {
const subscription = this.observers.get(observer);
if (subscription) {
subscription.cancelled = true;
this.observers.delete(observer);
}
}
emit(data) {
this.observers.forEach(subscription => {
if (!subscription.cancelled) {
subscription.buffer.push(data);
this.drain(subscription);
}
});
}
drain(subscription) {
while (subscription.demand > 0 && subscription.buffer.length > 0) {
const data = subscription.buffer.shift();
subscription.demand--;
try {
subscription.observer.onNext(data);
} catch (error) {
subscription.observer.onError(error);
subscription.cancelled = true;
}
}
// Handle overflow
if (subscription.buffer.length > 1000) {
subscription.observer.onError(new Error('Buffer overflow'));
subscription.cancelled = true;
}
}
}
2. Distributed Observer Pattern with Event Sourcing
class DistributedEventStore {
constructor(config) {
this.events = [];
this.snapshots = new Map();
this.projections = new Map();
this.version = 0;
// Connect to message broker (Kafka, RabbitMQ, etc.)
this.broker = new MessageBroker(config);
}
// Append event to the log
async append(event) {
const entry = {
id: uuid(),
type: event.type,
data: event.data,
metadata: {
timestamp: new Date(),
version: ++this.version,
aggregateId: event.aggregateId,
userId: event.userId
}
};
// Persist to event store
await this.persistEvent(entry);
// Publish to message broker
await this.broker.publish('events', entry);
// Update local projections
this.updateProjections(entry);
return entry;
}
// Subscribe to events
subscribe(filter, handler) {
const subscription = this.broker.subscribe('events', async (event) => {
if (this.matchesFilter(event, filter)) {
try {
await handler(event);
} catch (error) {
console.error('Event handler error:', error);
// Implement retry logic
await this.handleFailure(event, error, handler);
}
}
});
return subscription;
}
// Create a projection (materialized view)
createProjection(name, handlers) {
const projection = {
name,
handlers,
state: {},
version: 0
};
this.projections.set(name, projection);
// Replay events to build projection
this.replayEvents(projection);
// Subscribe to future events
this.subscribe(
{ types: Object.keys(handlers) },
(event) => this.updateProjection(projection, event)
);
return projection;
}
async updateProjection(projection, event) {
const handler = projection.handlers[event.type];
if (handler) {
projection.state = await handler(projection.state, event);
projection.version = event.metadata.version;
}
}
}
// Usage: CQRS with Event Sourcing
const eventStore = new DistributedEventStore({
broker: 'kafka://localhost:9092'
});
// Command side - append events
await eventStore.append({
type: 'OrderPlaced',
aggregateId: orderId,
data: {
customerId: 'cust-123',
items: [...],
total: 299.99
}
});
// Query side - create projections
const orderProjection = eventStore.createProjection('orders', {
OrderPlaced: (state, event) => ({
...state,
[event.aggregateId]: {
status: 'placed',
...event.data,
placedAt: event.metadata.timestamp
}
}),
OrderShipped: (state, event) => ({
...state,
[event.aggregateId]: {
...state[event.aggregateId],
status: 'shipped',
shippedAt: event.metadata.timestamp
}
}),
OrderDelivered: (state, event) => ({
...state,
[event.aggregateId]: {
...state[event.aggregateId],
status: 'delivered',
deliveredAt: event.metadata.timestamp
}
})
});
// Real-time updates across microservices
class OrderService {
constructor(eventStore) {
this.eventStore = eventStore;
// Subscribe to relevant events from other services
eventStore.subscribe(
{ types: ['PaymentProcessed', 'PaymentFailed'] },
this.handlePaymentEvent.bind(this)
);
eventStore.subscribe(
{ types: ['InventoryReserved', 'InventoryInsufficient'] },
this.handleInventoryEvent.bind(this)
);
}
async handlePaymentEvent(event) {
const orderId = event.data.orderId;
if (event.type === 'PaymentProcessed') {
await this.eventStore.append({
type: 'OrderConfirmed',
aggregateId: orderId,
data: {
paymentId: event.data.paymentId,
amount: event.data.amount
}
});
} else if (event.type === 'PaymentFailed') {
await this.eventStore.append({
type: 'OrderCancelled',
aggregateId: orderId,
data: {
reason: 'payment_failed',
error: event.data.error
}
});
}
}
}
3. Quantum Observer Pattern (Yes, Really!)
The Observer Pattern is even making its way into quantum computing simulations:
class QuantumObservable {
constructor(initialState) {
this.state = initialState; // Superposition of states
this.observers = new Set();
this.collapsed = false;
}
// Observe collapses the wave function
observe() {
if (!this.collapsed) {
// Collapse to a definite state
this.state = this.collapse(this.state);
this.collapsed = true;
// Notify all entangled observers
this.notifyEntangled();
}
return this.state;
}
entangle(other) {
// Create quantum entanglement between observables
this.observers.add(other);
other.observers.add(this);
}
notifyEntangled() {
this.observers.forEach(observer => {
if (!observer.collapsed) {
// Instantly affect entangled particle
observer.state = this.correlatedCollapse(observer.state);
observer.collapsed = true;
}
});
}
collapse(superposition) {
// Probabilistic collapse based on wave function
const random = Math.random();
let cumulative = 0;
for (const [state, probability] of superposition) {
cumulative += probability;
if (random < cumulative) {
return state;
}
}
}
}
// Simulating quantum entanglement
const particleA = new QuantumObservable([
['up', 0.5],
['down', 0.5]
]);
const particleB = new QuantumObservable([
['up', 0.5],
['down', 0.5]
]);
particleA.entangle(particleB);
// Observing one instantly affects the other
console.log('Particle A:', particleA.observe()); // 'up'
console.log('Particle B:', particleB.state); // 'down' - instantly correlated!
Testing Strategies: Making Observer Pattern Bulletproof
After shipping several production systems using the Observer Pattern, I’ve developed a comprehensive testing strategy:
// Test utilities for Observer Pattern
class TestSubject extends Subject {
constructor() {
super();
this.notificationLog = [];
}
notify(data) {
this.notificationLog.push({
timestamp: Date.now(),
data: { ...data },
observerCount: this.observers.size
});
super.notify(data);
}
getNotificationHistory() {
return [...this.notificationLog];
}
clearHistory() {
this.notificationLog = [];
}
}
class MockObserver {
constructor(name) {
this.name = name;
this.updates = [];
this.updateCount = 0;
this.errors = [];
}
update(data) {
this.updateCount++;
this.updates.push({
timestamp: Date.now(),
data: { ...data }
});
// Simulate processing
if (this.shouldFail) {
const error = new Error(`${this.name} simulated failure`);
this.errors.push(error);
throw error;
}
// Simulate async processing
if (this.delay) {
return new Promise(resolve => {
setTimeout(resolve, this.delay);
});
}
}
getLastUpdate() {
return this.updates[this.updates.length - 1];
}
reset() {
this.updates = [];
this.updateCount = 0;
this.errors = [];
}
}
// Comprehensive test suite
describe('Observer Pattern Implementation', () => {
let subject;
let observer1, observer2, observer3;
beforeEach(() => {
subject = new TestSubject();
observer1 = new MockObserver('Observer1');
observer2 = new MockObserver('Observer2');
observer3 = new MockObserver('Observer3');
});
describe('Basic Functionality', () => {
test('should notify all observers when state changes', () => {
subject.subscribe(observer1);
subject.subscribe(observer2);
const data = { price: 100, symbol: 'AAPL' };
subject.notify(data);
expect(observer1.updateCount).toBe(1);
expect(observer2.updateCount).toBe(1);
expect(observer1.getLastUpdate().data).toEqual(data);
expect(observer2.getLastUpdate().data).toEqual(data);
});
test('should not notify unsubscribed observers', () => {
const unsubscribe = subject.subscribe(observer1);
subject.subscribe(observer2);
unsubscribe();
subject.notify({ price: 100 });
expect(observer1.updateCount).toBe(0);
expect(observer2.updateCount).toBe(1);
});
test('should handle observer errors gracefully', () => {
observer1.shouldFail = true;
subject.subscribe(observer1);
subject.subscribe(observer2);
expect(() => subject.notify({ price: 100 })).not.toThrow();
expect(observer1.errors).toHaveLength(1);
expect(observer2.updateCount).toBe(1);
});
});
describe('Performance', () => {
test('should handle large numbers of observers', () => {
const observers = [];
const observerCount = 10000;
// Create and subscribe many observers
for (let i = 0; i < observerCount; i++) {
const observer = new MockObserver(`Observer${i}`);
observers.push(observer);
subject.subscribe(observer);
}
const startTime = performance.now();
subject.notify({ price: 100 });
const endTime = performance.now();
// All observers should be notified
observers.forEach(observer => {
expect(observer.updateCount).toBe(1);
});
// Should complete in reasonable time (< 100ms for 10k observers)
expect(endTime - startTime).toBeLessThan(100);
});
test('should not leak memory when observers are removed', () => {
const initialMemory = process.memoryUsage().heapUsed;
const iterations = 1000;
for (let i = 0; i < iterations; i++) {
const observer = new MockObserver(`TempObserver${i}`);
const unsubscribe = subject.subscribe(observer);
subject.notify({ iteration: i });
unsubscribe();
}
// Force garbage collection if available
if (global.gc) {
global.gc();
}
const finalMemory = process.memoryUsage().heapUsed;
const memoryGrowth = finalMemory - initialMemory;
// Memory growth should be minimal (< 1MB)
expect(memoryGrowth).toBeLessThan(1024 * 1024);
});
});
describe('Concurrency', () => {
test('should handle concurrent subscriptions and notifications', async () => {
const promises = [];
// Concurrent subscriptions
for (let i = 0; i < 100; i++) {
promises.push(
Promise.resolve().then(() => {
const observer = new MockObserver(`Concurrent${i}`);
return subject.subscribe(observer);
})
);
}
// Concurrent notifications
for (let i = 0; i < 50; i++) {
promises.push(
Promise.resolve().then(() => {
subject.notify({ batch: i });
})
);
}
await Promise.all(promises);
// Verify notification history
expect(subject.getNotificationHistory()).toHaveLength(50);
});
test('should maintain order with async observers', async () => {
const asyncObserver = new MockObserver('Async');
asyncObserver.delay = 10; // 10ms delay
const syncObserver = new MockObserver('Sync');
subject.subscribe(asyncObserver);
subject.subscribe(syncObserver);
subject.notify({ order: 1 });
subject.notify({ order: 2 });
// Wait for async processing
await new Promise(resolve => setTimeout(resolve, 50));
// Both should receive updates in order
expect(asyncObserver.updates[0].data.order).toBe(1);
expect(asyncObserver.updates[1].data.order).toBe(2);
expect(syncObserver.updates[0].data.order).toBe(1);
expect(syncObserver.updates[1].data.order).toBe(2);
});
});
});
// Integration tests
describe('Observer Pattern Integration', () => {
test('should work with real WebSocket connections', async () => {
const server = new WebSocketServer({ port: 8080 });
const subject = new TestSubject();
// Set up server-side observer
server.on('connection', (ws) => {
const wsObserver = {
update: (data) => {
ws.send(JSON.stringify(data));
}
};
subject.subscribe(wsObserver);
ws.on('close', () => {
subject.unsubscribe(wsObserver);
});
});
// Client connection
const client = new WebSocket('ws://localhost:8080');
const receivedMessages = [];
client.on('message', (data) => {
receivedMessages.push(JSON.parse(data));
});
await new Promise(resolve => client.on('open', resolve));
// Trigger notifications
subject.notify({ test: 1 });
subject.notify({ test: 2 });
// Wait for messages
await new Promise(resolve => setTimeout(resolve, 100));
expect(receivedMessages).toHaveLength(2);
expect(receivedMessages[0].test).toBe(1);
expect(receivedMessages[1].test).toBe(2);
// Cleanup
client.close();
server.close();
});
});
The Observer Pattern in Modern Frameworks
Today, the Observer Pattern is baked into almost every modern framework. Understanding how they implement it can make you a better developer:
React’s Implementation
// React uses a variation of Observer Pattern for state management
class ReactStyleObservable {
constructor(initialState) {
this.state = initialState;
this.subscribers = new Set();
}
useState() {
// Each component that calls useState becomes an observer
const component = getCurrentComponent(); // React internals
this.subscribers.add(component);
const setState = (newState) => {
this.state = typeof newState === 'function'
? newState(this.state)
: newState;
// Re-render all subscribed components
this.subscribers.forEach(component => {
component.scheduleUpdate();
});
};
return [this.state, setState];
}
}
// This is why React Hooks work!
function PriceDisplay() {
const [price, setPrice] = useState(0); // Subscribes to price changes
return <div>${price}</div>;
}
Vue’s Reactivity System
// Vue 3 uses Proxy-based observation
class VueStyleReactive {
constructor(target) {
this.target = target;
this.deps = new Map(); // property -> Set of effects
return new Proxy(target, {
get: (obj, prop) => {
this.track(prop);
return obj[prop];
},
set: (obj, prop, value) => {
obj[prop] = value;
this.trigger(prop);
return true;
}
});
}
track(prop) {
const activeEffect = getCurrentEffect(); // Vue internals
if (activeEffect) {
if (!this.deps.has(prop)) {
this.deps.set(prop, new Set());
}
this.deps.get(prop).add(activeEffect);
}
}
trigger(prop) {
const effects = this.deps.get(prop);
if (effects) {
effects.forEach(effect => effect.run());
}
}
}
// Usage
const state = reactive({
price: 100,
quantity: 5
});
// This automatically subscribes to price and quantity
watchEffect(() => {
console.log(`Total: ${state.price * state.quantity}`);
});
state.price = 110; // Logs: "Total: 550"
MobX’s Observable Pattern
// MobX makes everything observable
class MobXStyleObservable {
constructor() {
this.observers = new Map();
this.values = new Map();
}
makeObservable(target) {
return new Proxy(target, {
get: (obj, prop) => {
// Track access
if (currentObserver) {
if (!this.observers.has(prop)) {
this.observers.set(prop, new Set());
}
this.observers.get(prop).add(currentObserver);
}
return obj[prop];
},
set: (obj, prop, value) => {
const oldValue = obj[prop];
obj[prop] = value;
// Notify observers if value changed
if (oldValue !== value) {
const observers = this.observers.get(prop);
if (observers) {
observers.forEach(observer => observer.run());
}
}
return true;
}
});
}
autorun(fn) {
const observer = {
run: () => {
currentObserver = observer;
fn();
currentObserver = null;
}
};
observer.run(); // Run once to collect dependencies
return observer;
}
}
// Usage
const store = makeObservable({
price: 100,
quantity: 5,
get total() {
return this.price * this.quantity;
}
});
autorun(() => {
console.log(`Total value: ${store.total}`);
});
store.price = 110; // Logs: "Total value: 550"
Conclusion: The Pattern That Changed Everything
As I sit here, five years after that fateful night of debugging polling loops, I can’t help but smile. The Observer Pattern didn’t just solve our immediate problem – it fundamentally changed how I think about software architecture.
Every time I see a real-time update – whether it’s a stock price changing, a collaborative document updating, or a notification popping up – I think about the elegant simplicity of observers and subjects. It’s a pattern that’s simultaneously simple enough to implement in a few lines of code and powerful enough to run systems handling billions of events.
The journey from that broken polling implementation to a scalable, real-time trading platform taught me several invaluable lessons:
- Simple patterns can solve complex problems – The Observer Pattern is conceptually simple, but its applications are limitless.
- Decoupling is everything – When subjects don’t need to know about their observers, magic happens. Systems become flexible, testable, and maintainable.
- Real-time changes user behavior – When updates are instant, users interact differently with your application. They trust it more, use it more, and complain less.
- Performance isn’t about doing things faster – It’s about not doing unnecessary things at all. The Observer Pattern eliminates waste at a fundamental level.
- Patterns are living things – The Observer Pattern continues to evolve, from simple callbacks to reactive streams to quantum computing simulations.
Today, that trading platform handles millions of transactions daily. The real-time updates that once brought our servers to their knees now flow effortlessly. Users see price changes in milliseconds, not seconds. And somewhere in that codebase, observers are quietly doing their job – watching, waiting, and instantly responding when something interesting happens.
If you’re building anything that needs real-time updates, anything where multiple components need to stay in sync, or anything where you find yourself polling for changes – stop. Take a breath. And consider the Observer Pattern.
Your future self, your servers, and your users will thank you.
And who knows? Maybe five years from now, you’ll be writing your own story about how a simple pattern changed everything.
Happy observing! 🚀
P.S. If you’re implementing the Observer Pattern in your own projects, remember: start simple, test thoroughly, and always, always remember to unsubscribe. The best patterns are the ones that solve real problems without creating new ones.