cache

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2025 License: None detected not legal advice Imports: 0 Imported by: 0

README ΒΆ

Chat Message Caching System

This project implements a hybrid caching mechanism for a chat application, combining in-memory caching with persistent storage to ensure efficient message handling and durability. It leverages Valkey (a Redis-compatible in-memory data store) for rapid access and PostgreSQL for long-term storage.

Architecture

Valkey (In-Memory Cache)
  • recent_messages: Circular buffer of recent public messages.
  • flush_messages: Queue of public messages waiting to be persisted.
  • recent_private_messages:<userID>: Per-user circular cache of private messages (both sent and received).
  • flush_private_messages: Global queue of private messages pending database flush.
  • cache_message_id: Auto-increment counter for public messages.
  • cache_private_message_id: Auto-increment counter for private messages.
  • ratelimit:<userID>: Tracks per-user message counts for rate limiting.
PostgreSQL (Persistent Storage)
  • chat_messages: Stores all flushed public chat messages.
  • private_messages: Stores all flushed private messages.

πŸ” Workflow

1. Message Ingestion
  • Public and private messages are serialized as JSON and enriched with a unique cache_id.
  • A Lua script:
    • Atomically increments the appropriate counter (cache_message_id or cache_private_message_id).
    • Stores messages in both:
      • Circular cache for recent access.
      • Flush queue for eventual DB persistence.
    • If the sender is also the recipient (a DM to self), the message is only stored once.
2. 🚦 Rate Limiting
  • Each user is tracked via a key: ratelimit:<userID>.
  • A Lua script:
    • Uses INCR to count messages.
    • Applies EXPIRE to set the time window.
    • Rejects messages once the limit is exceeded.
  • Limits are customizable:
    • MessageLimit: max messages per window.
    • WindowSeconds: length of the rate window in seconds.
3. Flushing to Database
  • Flush is triggered when:
    • The flush queue (flush_messages or flush_private_messages) reaches the threshold (maxCacheSize).
    • Or periodically using a timer (flushInterval).
  • Messages are written to the database in a single transaction.
  • Upon success, the flush queue is cleared to prevent duplicates.

Configuration

Setting Purpose Default
maxCacheSize Max number of messages to keep before flush 500
flushInterval Interval to flush messages automatically 2 minutes
MessageLimit Max number of messages allowed per user configurable
WindowSeconds Duration of message window for rate limiting configurable

You can update rate limits dynamically using:

cache.UpdateRateLimitSettings(limit, window)

Features

  • Atomic caching and trimming via Lua.
  • Full support for both public and private messages.
  • Automatic and manual database flush control.
  • Per-user rate limiting with Lua-based enforcement.
  • Self-DMs are deduplicated to avoid storing duplicates.

πŸ“ TODO

  • Improve error recovery on DB flush
    A single bad message aborts the entire transaction. Consider:

    • Logging and skipping failed inserts with continue
    • Fallback queue for unflushable messages
  • Add testing coverage
    Write unit and integration tests for:

    • Lua execution
    • Flush logic and failure paths
    • Rate limiting behavior
    • Serialization integrity
  • Graceful shutdown
    Ensure that any pending flush queues are committed to the DB on application shutdown.

πŸ§ͺ Dev Notes

You can inspect and delete private message keys in Valkey using:

valkey-cli
127.0.0.1:6379> keys recent_private_messages:*
127.0.0.1:6379> del recent_private_messages:<userID>

To clear all private message caches:

valkey-cli --scan --pattern 'recent_private_messages:*' | xargs valkey-cli del

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

This section is empty.

Variables ΒΆ

This section is empty.

Functions ΒΆ

This section is empty.

Types ΒΆ

type MessageCache ΒΆ

type MessageCache struct {
	ValkeyClient valkey.Client
	DB           *pgxpool.Pool

	WindowSeconds int
	MessageLimit  int
	// contains filtered or unexported fields
}

func (*MessageCache) AttemptCachePrivateWithRateLimit ΒΆ

func (m *MessageCache) AttemptCachePrivateWithRateLimit(userID string, msg models.PrivateChatMessage) (int, error)

func (*MessageCache) AttemptCacheWithRateLimit ΒΆ

func (m *MessageCache) AttemptCacheWithRateLimit(userID string, msg models.ChatMessage) (int, error)

func (*MessageCache) CacheChatMessage ΒΆ

func (m *MessageCache) CacheChatMessage(msg models.ChatMessage) int

Caches a chat message in Valkey and triggers a DB flush if max cache size is reached

func (*MessageCache) CachePrivateMessage ΒΆ

func (m *MessageCache) CachePrivateMessage(msg models.PrivateChatMessage) int

func (*MessageCache) CheckRateLimitValkey ΒΆ

func (m *MessageCache) CheckRateLimitValkey(userID string, limit int, ttlSeconds int) (bool, error)

func (*MessageCache) DeleteCachedMessage ΒΆ

func (m *MessageCache) DeleteCachedMessage(cacheID int) bool

func (*MessageCache) FlushCacheToDB ΒΆ

func (m *MessageCache) FlushCacheToDB()

Flushes cached messages to the PostgreSQL database

func (*MessageCache) FlushPrivateMessagesToDB ΒΆ

func (m *MessageCache) FlushPrivateMessagesToDB()

FlushPrivateMessagesToDB writes Valkey-cached private messages to PostgreSQL and clears the flush list

func (*MessageCache) GetCachedChatMessages ΒΆ

func (m *MessageCache) GetCachedChatMessages() []models.ChatMessage

Retrieves chat messages from the circular cache

func (*MessageCache) GetCachedPrivateMessages ΒΆ

func (m *MessageCache) GetCachedPrivateMessages(userID string) []models.PrivateChatMessage

func (*MessageCache) StartPeriodicFlush ΒΆ

func (m *MessageCache) StartPeriodicFlush()

StartPeriodicFlush triggers database flush every interval

func (*MessageCache) UpdateRateLimitSettings ΒΆ

func (m *MessageCache) UpdateRateLimitSettings(limit int, window int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL