gaio

v1.2.26
Published: Apr 29, 2025 License: MIT Imports: 13 Imported by: 9

README

Introduction

Introduction in Chinese

In a typical Go network program, you start by accepting a connection with conn, err := lis.Accept(), then start a goroutine to handle incoming data using go func(net.Conn). Next, you allocate a buffer with buf := make([]byte, 4096) and wait for data with conn.Read(buf).

For a server managing over 10,000 connections with frequent short messages (e.g., <512 bytes), the cost of context switching becomes significantly higher than the cost of receiving the messages: each context switch can take over 1,000 CPU cycles, or around 600 ns on a 2.1 GHz processor.
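The two figures are related by cycles = time × clock frequency. The sketch below does that conversion with integer arithmetic; the function names are illustrative, and the "one switch per message" workload in main is an assumption made only for the arithmetic.

```go
package main

import "fmt"

// cyclesFor converts a duration in nanoseconds to CPU cycles at a clock
// speed given in MHz: cycles = ns * MHz / 1000.
func cyclesFor(ns, mhz int64) int64 { return ns * mhz / 1000 }

// nsFor converts CPU cycles to nanoseconds (truncated) at a clock speed in MHz.
func nsFor(cycles, mhz int64) int64 { return cycles * 1000 / mhz }

func main() {
	fmt.Println(cyclesFor(600, 2100)) // 1260 cycles in 600 ns at 2.1 GHz
	fmt.Println(nsFor(1000, 2100))    // 476 ns for 1,000 cycles at 2.1 GHz

	// Assumption: 10,000 messages, each causing one 600 ns context switch.
	totalNs := int64(10000) * 600
	fmt.Println(totalNs/1000000, "ms spent switching") // 6 ms spent switching
}
```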

By eliminating one goroutine per connection through Edge-Triggered I/O Multiplexing, you can save the 2KB (R) + 2KB (W) stack space typically used per goroutine. Additionally, by employing an internal swap buffer, you can avoid the need for buf := make([]byte, 4096) at the expense of some performance.
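As a rough worked example of the memory involved, the sketch below multiplies the per-connection figures from the paragraph above by a connection count. It assumes every goroutine stays at its initial 2KB stack, so the result is a lower bound; the function names are illustrative.

```go
package main

import "fmt"

const stackPerGoroutine = 2 * 1024 // initial goroutine stack size in bytes

// perConnBytes returns the memory pinned by one connection that uses the
// given number of goroutines plus a read buffer of bufBytes.
func perConnBytes(goroutines, bufBytes int) int {
	return goroutines*stackPerGoroutine + bufBytes
}

// totalMiB scales a per-connection cost to conns connections, in MiB.
func totalMiB(conns, perConn int) float64 {
	return float64(conns*perConn) / (1024 * 1024)
}

func main() {
	// Reader + writer goroutine per connection, stacks only.
	fmt.Println(totalMiB(10000, perConnBytes(2, 0))) // 39.0625
	// The same, plus a 4096-byte read buffer per connection.
	fmt.Println(totalMiB(10000, perConnBytes(2, 4096))) // 78.125
}
```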

The gaio library implements the proactor pattern, effectively addressing both memory constraints and performance objectives.

Features

  • High Performance: Tested in High Frequency Trading environments, handling 30K–40K RPS on a single HVM server.
  • Scalability: Designed for over C10K concurrent connections, optimizing both parallelism and single connection throughput.
  • Flexible Buffering: Use Read(ctx, conn, buffer) with a nil buffer to leverage the internal swap buffer.
  • Non-Intrusive Integration: Compatible with net.Listener and net.Conn (supports syscall.RawConn), allowing easy integration into existing applications.
  • Efficient Context Switching: Minimizes context switching costs for small messages, ideal for frequent chat message exchanges.
  • Customizable Delegation: Applications can choose when to delegate net.Conn to gaio, such as after a handshake or specific net.TCPConn settings.
  • Back-Pressure Handling: Applications can control when to submit read or write requests, enabling per-connection back-pressure to slow down sending when necessary, particularly useful for transferring data from a faster source (A) to a slower destination (B).
  • Lightweight and Maintainable: Approximately 1,000 lines of code, making it easy to debug.
  • Cross-Platform Support: Compatible with Linux and BSD.

Conventions

  • Connection Delegation: Once you submit an async read/write request with a related net.Conn to gaio.Watcher, that connection is delegated to the watcher. Subsequent calls to conn.Read or conn.Write will return an error, but TCP properties set by SetReadBuffer(), SetWriteBuffer(), SetLinger(), SetKeepAlive(), and SetNoDelay() will be retained.

  • Resource Management: If you no longer need a connection, call Watcher.Free(net.Conn) to immediately close the socket and free resources. If you forget to call Watcher.Free, the runtime garbage collector will clean up system resources once the net.Conn is no longer referenced elsewhere. Likewise, if you forget to call Watcher.Close(), the garbage collector will clean up all related resources once the watcher is unreferenced.

  • Load Balancing: For connection load balancing, create multiple gaio.Watcher instances to distribute net.Conn using your preferred strategy. For acceptor load balancing, utilize go-reuseport as the listener.

  • Safe Read Requests: When submitting read requests with a 'nil' buffer, the returned []byte from Watcher.WaitIO() is safe to use until the next call to Watcher.WaitIO().
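The connection load-balancing convention above leaves the strategy to the application. One possible strategy is round-robin, sketched below. The readSubmitter interface copies the signature of Watcher.Read so that the example runs without the library; roundRobin, delegate, and counter are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"sync/atomic"
)

// readSubmitter has the same signature as Watcher.Read.
type readSubmitter interface {
	Read(ctx interface{}, conn net.Conn, buf []byte) error
}

// roundRobin hands each new connection to the next watcher in turn.
type roundRobin struct {
	watchers []readSubmitter
	next     uint64
}

// delegate submits the first read for conn on the chosen watcher and
// returns that watcher's index. It is safe for concurrent use.
func (rr *roundRobin) delegate(conn net.Conn, buf []byte) (int, error) {
	if len(rr.watchers) == 0 {
		return -1, errors.New("no watchers configured")
	}
	n := atomic.AddUint64(&rr.next, 1) - 1
	i := int(n % uint64(len(rr.watchers)))
	return i, rr.watchers[i].Read(nil, conn, buf)
}

// counter is a fake watcher that counts submitted reads.
type counter struct{ reads int }

func (c *counter) Read(ctx interface{}, conn net.Conn, buf []byte) error {
	c.reads++
	return nil
}

func main() {
	c0, c1 := &counter{}, &counter{}
	rr := &roundRobin{watchers: []readSubmitter{c0, c1}}

	conn, peer := net.Pipe() // stands in for accepted connections
	defer conn.Close()
	defer peer.Close()

	for k := 0; k < 4; k++ {
		i, _ := rr.delegate(conn, make([]byte, 128))
		fmt.Println("connection", k, "-> watcher", i)
	}
	fmt.Println(c0.reads, c1.reads) // 2 2
}
```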

TL;DR

package main

import (
        "log"
        "net"

        "github.com/xtaci/gaio"
)

// this goroutine waits for all IO events and asynchronously sends back
// everything it receives
func echoServer(w *gaio.Watcher) {
        for {
                // loop wait for any IO events
                results, err := w.WaitIO()
                if err != nil {
                        log.Println(err)
                        return
                }

                for _, res := range results {
                        switch res.Operation {
                        case gaio.OpRead: // read completion event
                                if res.Error == nil {
                                        // send back everything, we won't start to read again until write completes.
                                        // submit an async write request
                                        w.Write(nil, res.Conn, res.Buffer[:res.Size])
                                }
                        case gaio.OpWrite: // write completion event
                                if res.Error == nil {
                                        // since write has completed, let's start read on this conn again
                                        w.Read(nil, res.Conn, res.Buffer[:cap(res.Buffer)])
                                }
                        }
                }
        }
}

func main() {
        w, err := gaio.NewWatcher()
        if err != nil {
                log.Fatal(err)
        }
        defer w.Close()

        go echoServer(w)

        ln, err := net.Listen("tcp", "localhost:0")
        if err != nil {
                log.Fatal(err)
        }
        log.Println("echo server listening on", ln.Addr())

        for {
                conn, err := ln.Accept()
                if err != nil {
                        log.Println(err)
                        return
                }
                log.Println("new client", conn.RemoteAddr())

                // submit the first async read IO request
                err = w.Read(nil, conn, make([]byte, 128))
                if err != nil {
                        log.Println(err)
                        return
                }
        }
}

More examples

Push server

package main

import (
        "fmt"
        "log"
        "net"
        "time"

        "github.com/xtaci/gaio"
)

func main() {
        // to use go-reuseport, simply replace net.Listen with reuseport.Listen; the rest of the push server stays the same
        // ln, err := reuseport.Listen("tcp", "localhost:0")
        ln, err := net.Listen("tcp", "localhost:0")
        if err != nil {
                log.Fatal(err)
        }

        log.Println("pushing server listening on", ln.Addr(), ", use telnet to receive push")

        // create a watcher
        w, err := gaio.NewWatcher()
        if err != nil {
                log.Fatal(err)
        }

        // channel
        ticker := time.NewTicker(time.Second)
        chConn := make(chan net.Conn)
        chIO := make(chan gaio.OpResult)

        // watcher.WaitIO goroutine
        go func() {
                for {
                        results, err := w.WaitIO()
                        if err != nil {
                                log.Println(err)
                                return
                        }

                        for _, res := range results {
                                chIO <- res
                        }
                }
        }()

        // main logic loop, like your program core loop.
        go func() {
                var conns []net.Conn
                for {
                        select {
                        case res := <-chIO: // receive IO events from watcher
                                if res.Error != nil {
                                        continue
                                }
                                conns = append(conns, res.Conn)
                        case t := <-ticker.C: // receive ticker events
                                push := []byte(fmt.Sprintf("%s\n", t))
                                // all conn will receive the same 'push' content
                                for _, conn := range conns {
                                        w.Write(nil, conn, push)
                                }
                                conns = nil
                        case conn := <-chConn: // receive new connection events
                                conns = append(conns, conn)
                        }
                }
        }()

        // this loop keeps on accepting connections and send to main loop
        for {
                conn, err := ln.Accept()
                if err != nil {
                        log.Println(err)
                        return
                }
                chConn <- conn
        }
}

Documentation

For complete documentation, see the associated Godoc.

Benchmarks

Test Case:      Throughput test with 64KB buffer
Description:    A client keeps sending 64KB of data to the server; the server keeps reading and sending back whatever it receives; the client keeps receiving whatever the server sends back until all bytes have been received successfully.
Command:        go test -v -run=^$ -bench Echo
Macbook Pro:    1695.27 MB/s, 518 B/op, 4 allocs/op
Linux AMD64:    1883.23 MB/s, 518 B/op, 4 allocs/op
Raspberry Pi4:  354.59 MB/s, 334 B/op, 4 allocs/op

Test Case:      8K concurrent connection echo test
Description:    Start 8192 clients; each client sends 1KB of data to the server; the server keeps reading and sending back whatever it receives; the client keeps receiving whatever the server sends back until all bytes have been received successfully.
Command:        go test -v -run=8k
Macbook Pro:    1.09s
Linux AMD64:    0.94s
Raspberry Pi4:  2.09s

Testing Directives

On macOS, you need to increase the maximum number of open files to run the benchmarks.

sysctl -w kern.ipc.somaxconn=4096
sysctl -w kern.maxfiles=100000
sysctl -w kern.maxfilesperproc=100000
sysctl -w net.inet.ip.portrange.first=1024
sysctl -w net.inet.ip.portrange.last=65535

ulimit -S -n 65536

Regression

X -> number of concurrent connections, Y -> time of completion in seconds

Best-fit values	 
Slope	8.613e-005 ± 5.272e-006
Y-intercept	0.08278 ± 0.03998
X-intercept	-961.1
1/Slope	11610
 
95% Confidence Intervals	 
Slope	7.150e-005 to 0.0001008
Y-intercept	-0.02820 to 0.1938
X-intercept	-2642 to 287.1
 
Goodness of Fit	 
R square	0.9852
Sy.x	0.05421
 
Is slope significantly non-zero?	 
F	266.9
DFn,DFd	1,4
P Value	< 0.0001
Deviation from horizontal?	Significant
 
Data	 
Number of XY pairs	6
Equation	Y = 8.613e-005*X + 0.08278
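The fitted line can be evaluated directly. The sketch below plugs the slope and Y-intercept from the table into the equation; predictSeconds is an illustrative name, and the prediction describes the fit itself, not any of the machines listed under Benchmarks.

```go
package main

import "fmt"

const (
	slope     = 8.613e-05 // seconds per connection, from the table above
	intercept = 0.08278   // seconds, from the table above
)

// predictSeconds evaluates Y = slope*X + intercept for X concurrent connections.
func predictSeconds(conns float64) float64 {
	return slope*conns + intercept
}

func main() {
	fmt.Printf("%.3f s predicted for 8192 connections\n", predictSeconds(8192)) // 0.788 s
	fmt.Printf("%.0f connections per second (1/slope)\n", 1/slope)              // 11610
}
```

The reciprocal of the slope reproduces the 1/Slope row of the table, which can be read as the number of additional connections completed per additional second.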

FAQ

  1. If you encounter a build error like:
# github.com/xtaci/gaio [github.com/xtaci/gaio.test]
./aio_linux.go:155:7: undefined: setAffinity
./watcher.go:588:4: undefined: setAffinity
FAIL	github.com/xtaci/gaio [build failed]
FAIL

make sure you have gcc or clang installed.

License

gaio source code is available under the MIT License.

Status

Stable

Documentation

Overview

Package gaio is an Async-IO library for Golang.

gaio works in proactor mode (https://en.wikipedia.org/wiki/Proactor_pattern): the user submits async IO operations and waits for IO-completion signals.

Constants

const (
	EV_READ  = 0x1
	EV_WRITE = 0x2
)

Variables

var (
	// ErrUnsupported means the watcher cannot support this type of connection
	ErrUnsupported = errors.New("unsupported connection type")
	// ErrNoRawConn means the connection has not implemented SyscallConn
	ErrNoRawConn = errors.New("net.Conn does implement net.RawConn")
	// ErrWatcherClosed means the watcher is closed
	ErrWatcherClosed = errors.New("watcher closed")
	// ErrPollerClosed suggest that poller has closed
	ErrPollerClosed = errors.New("poller closed")
	// ErrConnClosed means the user called Free() on related connection
	ErrConnClosed = errors.New("connection closed")
	// ErrDeadline means the specific operation has exceeded deadline before completion
	ErrDeadline = errors.New("operation exceeded deadline")
	// ErrEmptyBuffer means the buffer is nil
	ErrEmptyBuffer = errors.New("empty buffer")
	// ErrCPUID indicates the given cpuid is invalid
	ErrCPUID = errors.New("no such core")
)

Functions

This section is empty.

Types

type OpResult

type OpResult struct {
	// Operation Type
	Operation OpType
	// User context associated with this request
	Context interface{}
	// Related net.Conn to this result
	Conn net.Conn
	// Buffer points to user's supplied buffer or watcher's internal swap buffer
	Buffer []byte
	// IsSwapBuffer is true if the buffer is the watcher's internal swap buffer
	IsSwapBuffer bool
	// Number of bytes sent or received, Buffer[:Size] is the content sent or received.
	Size int
	// IO error or timeout error
	Error error
}

OpResult is the result of an async IO operation.
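Because Buffer may point at the watcher's internal swap buffer, which is only safe to use until the next call to WaitIO, data that must outlive the current batch of results needs to be copied. A minimal sketch of that decision follows; retain is a hypothetical helper, and its parameters correspond to the Buffer, Size, and IsSwapBuffer fields of OpResult.

```go
package main

import "fmt"

// retain returns a slice holding the received bytes that stays valid after
// the next WaitIO call. Swap-buffer contents are copied; a caller-supplied
// buffer is returned as-is because the caller controls when it is reused.
func retain(buf []byte, size int, isSwapBuffer bool) []byte {
	if !isSwapBuffer {
		return buf[:size]
	}
	kept := make([]byte, size)
	copy(kept, buf[:size])
	return kept
}

func main() {
	swap := []byte("hello world")
	kept := retain(swap, 5, true)

	copy(swap, "XXXXX") // simulate the watcher reusing its swap buffer
	fmt.Printf("%s\n", kept) // hello
}
```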

type OpType added in v1.0.5

type OpType int

OpType defines Operation Type

const (
	// OpRead means the aiocb is a read operation
	OpRead OpType = iota
	// OpWrite means the aiocb is a write operation
	OpWrite
)

type Signal added in v1.2.20

type Signal struct {
	// contains filtered or unexported fields
}

Signal is a package of events. When you are done with the events, you should send a signal to the done channel.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher monitors events and processes async IO requests.

func NewWatcher added in v1.0.11

func NewWatcher() (*Watcher, error)

NewWatcher creates a new Watcher instance with a default internal buffer size of 64KB.

func NewWatcherSize added in v1.1.7

func NewWatcherSize(bufsize int) (*Watcher, error)

NewWatcherSize creates a new Watcher instance with a specified internal buffer size.

It allocates three shared buffers of the given size for handling read requests. This allows efficient management of read operations by using pre-allocated buffers.

func (Watcher) Close

func (w Watcher) Close() (err error)

Close stops monitoring on events for all connections

func (Watcher) Free added in v1.1.0

func (w Watcher) Free(conn net.Conn) error

Free releases resources related to 'conn' immediately, such as socket file descriptors.

func (Watcher) GetGC added in v1.2.21

func (w Watcher) GetGC() (found uint32, closed uint32)

GetGC returns the values of the gcFound and gcClosed counters.

func (Watcher) Read

func (w Watcher) Read(ctx interface{}, conn net.Conn, buf []byte) error

Read submits an asynchronous read request on 'conn' with context 'ctx' and optional buffer 'buf'. If 'buf' is nil, an internal buffer is used. 'ctx' is a user-defined value passed unchanged.

func (Watcher) ReadFull added in v1.2.7

func (w Watcher) ReadFull(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error

ReadFull submits an asynchronous read request on 'conn' with context 'ctx' and buffer 'buf', expecting to fill the buffer before 'deadline'. 'ctx' is a user-defined value passed unchanged. 'buf' must not be nil for ReadFull.

func (Watcher) ReadTimeout added in v1.0.6

func (w Watcher) ReadTimeout(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error

ReadTimeout submits an asynchronous read request on 'conn' with context 'ctx' and buffer 'buf', expecting to read some bytes before 'deadline'. 'ctx' is a user-defined value passed unchanged.

func (Watcher) SetLoopAffinity added in v1.2.10

func (w Watcher) SetLoopAffinity(cpuid int) (err error)

SetLoopAffinity sets the CPU affinity of the loop that performs syscall.Read/syscall.Write.

func (Watcher) SetPollerAffinity added in v1.2.10

func (w Watcher) SetPollerAffinity(cpuid int) (err error)

SetPollerAffinity sets the CPU affinity of the Epoll/Kqueue poller.

func (Watcher) WaitIO added in v1.0.5

func (w Watcher) WaitIO() (r []OpResult, err error)

WaitIO blocks until one or more read/write operations are completed or an error occurs. It returns a slice of OpResult containing details of completed operations and any errors encountered.

The method operates as follows:

  1. It recycles previously used aiocb objects to avoid memory leaks and reuse them for new I/O operations.
  2. It waits for completion notifications from the chResults channel and accumulates results.
  3. It ensures that the buffer in OpResult is not overwritten until the next call to WaitIO.

func (Watcher) Write

func (w Watcher) Write(ctx interface{}, conn net.Conn, buf []byte) error

Write submits an asynchronous write request on 'conn' with context 'ctx' and buffer 'buf'. 'ctx' is a user-defined value passed unchanged.

func (Watcher) WriteTimeout added in v1.0.6

func (w Watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadline time.Time) error

WriteTimeout submits an asynchronous write request on 'conn' with context 'ctx' and buffer 'buf', expecting to complete writing before 'deadline'. 'ctx' is a user-defined value passed unchanged.

Directories

Path                  Synopsis
examples/echo-server  command
examples/push-server  command