The BRMH (Backend Resource Management Hub) cache system provides high-performance data caching using AWS ElastiCache (Valkey) with Redis-compatible operations. The system supports both individual item caching and chunked data storage with automatic duplicate detection, non-blocking operations, and optimized performance.
- Non-blocking cache updates: Background processing prevents API blocking
- Parallel cache configuration processing: Multiple configs processed simultaneously
- Optimized logging: Reduced verbosity with one-liner status messages
- Queue system: Prevents data loss during concurrent bulk operations
- Enhanced pagination: Better handling of large datasets with improved limits
- AWS ElastiCache (Valkey): Redis-compatible managed caching service
- DynamoDB: Primary data source for caching
- Express.js: API endpoints for cache operations
- ioredis: Redis client for Node.js
- Lambda Functions: Data streaming and cache update triggers
- Queue System: In-memory queue for pending cache updates
{project}:{tableName}:{identifier}
Examples:
- Individual items:
my-app:shopify-inkhub-get-products:12345 - Chunked data:
my-app:shopify-inkhub-get-products:chunk:0 - Individual items with ID:
my-app:shopify-inkhub-get-products:0000
- Each DynamoDB item is cached separately
- Key format:
{project}:{tableName}:{itemId} - Benefits:
- Granular access to individual items
- Better cache hit rates
- Easy item-level updates/deletes
- Use case: When you need frequent access to specific items
- Multiple items grouped into chunks
- Key format:
{project}:{tableName}:chunk:{chunkIndex} - Benefits:
- Efficient bulk operations
- Reduced key overhead
- Better for large datasets
- Use case: When you need bulk data retrieval
POST /cache/table
Caches entire DynamoDB table data with duplicate detection.
Request Body:
{
"project": "my-app",
"table": "shopify-inkhub-get-products",
"recordsPerKey": 100,
"ttl": 3600
}Response:
{
"message": "Caching complete (bounded buffer)",
"project": "my-app",
"table": "shopify-inkhub-get-products",
"totalRecords": 1000,
"successfulWrites": 850,
"failedWrites": 0,
"attemptedKeys": 850,
"skippedDuplicates": 150,
"fillRate": "100.00%",
"durationMs": 5000,
"cacheKeys": ["key1", "key2"],
"totalCacheKeys": 850
}GET /cache/data?project={project}&table={table}
Retrieves all cache keys for a specific project and table (keys only, no data).
Response:
{
"message": "Cache keys retrieved in sequence (keys only)",
"keysFound": 132,
"keys": [
"my-app:shopify-inkhub-get-products:chunk:0",
"my-app:shopify-inkhub-get-products:chunk:1"
],
"note": "Use ?key=specific_key to get actual data for a specific key"
}GET /cache/data-in-sequence?project={project}&table={table}&page={page}&limit={limit}&includeData={true|false}
Retrieves cached data with pagination support. By default, returns keys only unless includeData=true is specified.
Parameters:
page: Page number (default: 1)limit: Items per page (default: 1000)includeData: Whether to include actual data (default: false)
Response (Keys Only - Default):
{
"message": "Cache keys retrieved in sequence with pagination (keys only)",
"keysFound": 100,
"totalKeys": 132,
"keys": ["chunk:0", "chunk:1", "chunk:2"],
"note": "Use ?includeData=true to get actual data for these keys",
"pagination": {
"currentPage": 1,
"totalPages": 2,
"hasMore": true,
"totalItems": 132
}
}Response (With Data):
{
"namespace-id": "uuid",
"namespace-name": "Shopify",
"namespace-url": "https://api.shopify.com",
"tags": ["ecommerce", "api"],
"createdAt": "timestamp",
"updatedAt": "timestamp"
}{
"account-id": "uuid",
"namespace-id": "uuid",
"namespace-account-name": "Production Store",
"namespace-account-url-override": "https://mystore.myshopify.com",
"namespace-account-header": [
{"key": "Authorization", "value": "Bearer token123"}
],
"variables": [
{"key": "store_id", "value": "12345"}
],
"tags": ["production"],
"createdAt": "timestamp"
}{
"method-id": "uuid",
"namespace-id": "uuid",
"namespace-method-name": "Get Orders",
"namespace-method-type": "GET",
"namespace-method-url-override": "/admin/api/2023-10/orders.json",
"namespace-method-header": [
{"key": "Content-Type", "value": "application/json"}
],
"namespace-method-queryParams": {
"limit": "50",
"status": "any"
},
"namespace-method-body": {},
"tags": ["orders", "read"],
"createdAt": "timestamp"
}// Example: Execute a Shopify order fetch
POST /unified/execute
{
"namespaceId": "shopify-namespace-id",
"accountId": "production-store-account-id",
"methodId": "get-orders-method-id",
"save": true,
"tableName": "shopify-orders"
}{
port: parseInt(process.env.REDIS_PORT),
tls: process.env.REDIS_TLS === 'true' ? {} : undefined,
lazyConnect: true,
connectTimeout: 15000,
commandTimeout: 15000,
enableOfflineQueue: true,
maxRetriesPerRequest: 5
}- Background processing: Cleanup operations run in background using
setImmediate() - Parallel processing: Multiple cache configurations processed simultaneously
- Immediate response: API responds immediately while processing continues
- No API blocking: Other endpoints remain responsive during cache updates
- Concurrency control: Prevents data loss during concurrent bulk operations
- In-memory queue: Pending updates queued when bulk operations are active
- Automatic processing: Queued updates processed after bulk operation completes
- Race condition protection: Ensures data integrity during high concurrency
- One-liner messages: Reduced verbosity with concise status updates
- No repetitive logs: Eliminated duplicate and verbose logging
- Performance tracking: Duration and success rate logging
- Clean PM2 logs: Minimal noise in production logs
- Improved limits: Default limit increased from 10 to 1000
- Keys-only default: Returns keys by default to prevent timeouts
- Explicit data retrieval: Data only fetched when
includeData=true - Better pagination info: Enhanced pagination metadata
- Processes data in chunks to manage memory usage
- Writes chunks as soon as buffer is full
- Prevents memory overflow with large datasets
- Uses
SCANcommand for Valkey compatibility - Avoids blocking operations on large datasets
- Supports pattern matching for key retrieval
- Chunks are numbered sequentially (chunk:0, chunk:1, etc.)
- Enables efficient data retrieval in order
- Supports pagination for large datasets
-
Connection Timeout
Error: connect ETIMEDOUTSolution: Check security groups and network ACLs
-
Unknown Command
ReplyError: ERR unknown command 'keys'Solution: Use
SCANinstead ofKEYSfor Valkey -
Stream Not Writable
Error: Stream isn't writeable and enableOfflineQueue options is falseSolution: Enable offline queue in Redis configuration
-
Cache Update Queued
Status: 202 Accepted Message: "Cache update queued for later processing"Solution: This is normal during bulk operations. Updates will be processed automatically.
-
Gateway Timeout (504)
Error: 504 Gateway TimeoutSolution: Use pagination or set
includeData=falsefor large datasets
GET /test-valkey-connection
Tests connectivity to Valkey cache.
POST /cache/cleanup-timestamp-chunks
Converts timestamp-based chunks to sequential numbering.
POST /cache/clear-unwanted-order-data
Removes non-cache-config data from cache table.
GET /cache/bulk-operations
Check currently active bulk cache operations.
GET /cache/pending-updates
View pending cache updates in queue.
- Use individual caching for frequently accessed specific items
- Use chunked caching for bulk data operations
- Consider data access patterns when choosing strategy
- Set appropriate TTL based on data freshness requirements
- Monitor cache hit rates and adjust TTL accordingly
- Use longer TTL for stable data, shorter for frequently changing data
- Monitor cache size and memory usage
- Implement cache eviction policies if needed
- Use bounded buffer for large dataset processing
- Use
includeData=falsefor key-only operations to prevent timeouts - Leverage pagination for large datasets
- Monitor queue status during high concurrency periods
- Use parallel processing for multiple cache configurations
- Implement retry logic for failed cache operations
- Log cache errors for debugging
- Have fallback mechanisms for cache failures
- Monitor queue system for stuck operations
// Cache a table
const cacheTable = async (tableName) => {
const response = await fetch('/cache/table', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
project: 'my-app',
table: tableName,
recordsPerKey: 100,
ttl: 3600
})
});
return response.json();
};
// Get cache keys only (fast, no data)
const getCacheKeys = async (tableName) => {
const response = await fetch(
`/cache/data-in-sequence?project=my-app&table=${tableName}&page=1&limit=1000`
);
return response.json();
};
// Get cached data with pagination
const getCachedData = async (tableName, page = 1, limit = 100) => {
const response = await fetch(
`/cache/data-in-sequence?project=my-app&table=${tableName}&page=${page}&limit=${limit}&includeData=true`
);
return response.json();
};
// Get specific cache key data
const getSpecificCacheData = async (tableName, key) => {
const response = await fetch(
`/cache/data?project=my-app&table=${tableName}&key=${key}`
);
return response.json();
};// Check cache keys and counts
const getCacheKeys = async (tableName) => {
const response = await fetch(
`/cache/data?project=my-app&table=${tableName}`
);
return response.json();
};
// Monitor queue status
const getQueueStatus = async () => {
const [bulkOps, pendingUpdates] = await Promise.all([
fetch('/cache/bulk-operations').then(r => r.json()),
fetch('/cache/pending-updates').then(r => r.json())
]);
return { bulkOps, pendingUpdates };
};- Check Lambda trigger configuration
- Verify DynamoDB stream settings
- Ensure cache update endpoint is accessible
- Check queue status for stuck operations
- Monitor cache hit rates
- Check for memory pressure
- Optimize chunk sizes based on data patterns
- Use
includeData=falsefor key-only operations - Monitor queue system during high concurrency
- Verify TTL settings
- Check for cache invalidation logic
- Monitor duplicate detection logs
- Check for queued updates that haven't been processed
- Use pagination for large datasets
- Set
includeData=falsefor key-only operations - Increase API Gateway timeout limits
- Monitor cache update queue status
For issues related to the cache system:
- Check console logs for detailed error messages
- Monitor cache metrics and performance
- Review this documentation for common solutions
- Contact the development team for complex issues