Skip to content

ripienaar/jsrate

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

jsrate

A distributed time-window rate limiter for Go, backed by NATS JetStream.

Purpose

jsrate provides a sliding-window rate limiter that is safe to use from many processes at once. State lives in a single JetStream stream, so any number of limiter instances pointed at the same stream share the same budget. There is no extra coordinator to run: the stream itself enforces the limit through JetStream's per-subject discard policy.

Each limiter has:

  • A Window, the duration over which events are counted.
  • A Limit, the maximum number of events allowed for a single item within that window.

Items are arbitrary strings (for example a user id, an API key, or a tenant name). Each item gets its own independent budget within the same limiter.

Two operations are supported:

  • Do takes a single slot and runs the supplied function on success.
  • DoN takes count slots atomically using JetStream batch publishing, and runs the supplied function only if all slots could be reserved. If any slot would exceed the limit the whole batch is rolled back and no slots are consumed.

Both return ErrRateExceeded when the limit would be exceeded, and ErrStorageFailed for underlying JetStream errors.

Requirements

  • A NATS server with JetStream enabled.
  • Batch publishing support on the server (used by DoN).

Installation

go get github.com/ripienaar/jsrate

Creating a limiter

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
    log.Fatal(err)
}
defer nc.Close()

js, err := jetstream.New(nc)
if err != nil {
    log.Fatal(err)
}

limiter, err := jsrate.NewLimiter(js, "api-requests", &jsrate.LimiterConfig{
    Window: time.Minute,
    Limit:  100,
})
if err != nil {
    log.Fatal(err)
}

If a stream with the given name already exists, NewLimiter attaches to it and the supplied configuration is ignored. This is how multiple processes share the same limiter: the first caller creates the stream, the rest attach.

Taking a single slot

err := limiter.Do(ctx, "user-42", func() {
    handleRequest()
})
switch {
case errors.Is(err, jsrate.ErrRateExceeded):
    // user-42 has hit the limit for the current window
case errors.Is(err, jsrate.ErrStorageFailed):
    // JetStream returned an error
case err != nil:
    // other error, for example a cancelled context
}

The function f is only invoked when a slot was successfully reserved.

Taking multiple slots atomically

DoN is useful when a single logical operation should consume more than one slot, for example a bulk request that should count as N units against the limit. The batch is all-or-nothing: either every slot is reserved or none of them are.

err := limiter.DoN(ctx, 5, "tenant-acme", func() {
    processBatch(5)
})
if errors.Is(err, jsrate.ErrRateExceeded) {
    // not enough slots remain in this window for the full batch of 5
}

If the batch cannot fit, no slots are consumed and a later call may still succeed once the window rolls forward or the requested count is small enough.

Per-item budgets

Each item has its own budget within the same limiter. With Limit: 3:

limiter.Do(ctx, "alpha", work) // 1 of 3 for alpha
limiter.Do(ctx, "alpha", work) // 2 of 3 for alpha
limiter.Do(ctx, "beta",  work) // 1 of 3 for beta, independent of alpha

Distributed use

Two processes that call NewLimiter with the same name share the same stream and therefore the same budget. The second call attaches to the existing stream.

// Process A
a, _ := jsrate.NewLimiter(js, "api-requests", &jsrate.LimiterConfig{
    Window: time.Minute,
    Limit:  100,
})

// Process B (config is ignored, attaches to the existing stream)
b, _ := jsrate.NewLimiter(js, "api-requests", nil)

Configuration

type LimiterConfig struct {
    Window   time.Duration         // sliding window length
    Limit    int64                 // maximum events per item per window
    Storage  jetstream.StorageType // defaults to MemoryStorage
    Replicas int                   // defaults to 1
}

Use FileStorage and a higher replica count when the limiter state needs to survive restarts or node loss.

Cleaning up

Delete removes the underlying stream and the recorded slot state with it. Use it when tearing down a limiter that is no longer needed.

if err := limiter.Delete(); err != nil {
    log.Printf("delete failed: %v", err)
}

Caveats

  • The window is enforced by JetStream message age, so timings are subject to network and broker delays. Treat the limit as a close approximation, not a hard cycle-accurate gate.
  • Do runs f only after the slot has been recorded, so f running implies the slot was successfully taken. The slot is not released if f panics or fails: the cost has already been accounted for the duration of the window.
  • DoN uses JetStream batch publishing, which requires a server build that supports it.

About

Windowed Rate Limiter backed by NATS JetStream

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages