NAME
    Data::PubSub::Shared - High-performance shared-memory pub/sub for Linux

SYNOPSIS
        use Data::PubSub::Shared;

        # Publisher
        my $ps = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
        $ps->publish(42);
        $ps->publish_multi(1, 2, 3);

        # Subscriber (same or different process)
        my $ps2 = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
        my $sub  = $ps2->subscribe;       # future messages only
        my $sub2 = $ps2->subscribe_all;  # from oldest available

        # Polling
        my $val = $sub->poll;            # non-blocking, undef if nothing
        my @v   = $sub->poll_multi(10);  # batch poll
        my @v   = $sub->drain;           # poll all available
        my @v   = $sub->drain(100);      # poll up to 100
        my $val = $sub->poll_wait;       # blocking, infinite wait
        my $val = $sub->poll_wait(1.5);  # with timeout
        my @v   = $sub->poll_wait_multi(10, 1.5);  # block for >=1, grab up to 10

        # Combined publish + eventfd notify
        $ps->publish_notify($value);

        # Status
        my $n = $sub->lag;               # messages behind
        my $n = $sub->overflow_count;    # total messages skipped
        $sub->reset;                     # skip to latest
        $sub->reset_oldest;              # go back to oldest available

        # String variant
        my $ps = Data::PubSub::Shared::Str->new('/tmp/ps.shm', 1024);
        $ps->publish("hello world");
        my $sub = $ps->subscribe;
        my $msg = $sub->poll;

        # Anonymous (fork-inherited)
        my $ps = Data::PubSub::Shared::Int->new(undef, 1024);

        # memfd-backed (shareable via fd passing)
        my $ps = Data::PubSub::Shared::Int->new_memfd("myps", 1024);
        my $fd = $ps->memfd;
        my $ps2 = Data::PubSub::Shared::Int->new_from_fd($fd);

        # Multiprocess
        if (fork() == 0) {
            my $child = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
            my $sub = $child->subscribe;
            while (defined(my $v = $sub->poll_wait(1))) {
                print "got: $v\n";
            }
            exit;
        }
        $ps->publish(99);
        wait;

DESCRIPTION
    Data::PubSub::Shared provides broadcast pub/sub over shared memory
    (mmap(MAP_SHARED)). Publishers write to a ring buffer; each subscriber
    independently reads with its own cursor. Messages are never consumed --
    the ring overwrites old data when it wraps. Slow subscribers
    auto-recover by resetting to the oldest available position.

    Linux-only. Requires 64-bit Perl.

  Features
    *   File-backed mmap for cross-process sharing

    *   Lock-free MPMC publish for Int (atomic fetch-and-add)

    *   Lock-free subscribers for both variants (seqlock-style)

    *   Variable-length Str messages with circular arena

    *   Futex-based blocking poll with timeout (no busy-spin)

    *   PID-based stale lock recovery (Str mode)

    *   Batch publish/poll operations (drain, poll_wait_multi)

    *   Per-subscriber overflow counting

    *   Optional keyword API via XS::Parse::Keyword

  When to Use Int vs Str
    Int is best for signaling, counters, indices, timestamps, or any
    integer-valued broadcast. Lock-free MPMC publish means multiple
    publishers never block each other.

    Str is best for serialized messages, log lines, JSON payloads, or any
    variable-length data. Mutex-protected publish serializes concurrent
    publishers but subscribers remain lock-free.

  Variants
    Data::PubSub::Shared::Int - int64 values, lock-free MPMC publish
        Uses atomic fetch-and-add for multi-publisher support. Each slot has
        a sequence number; subscribers verify data consistency via
        double-check (seqlock-style). Zero contention between publishers and
        subscribers.

    Data::PubSub::Shared::Int32 - int32 values, lock-free, 8 bytes/slot
    Data::PubSub::Shared::Int16 - int16 values, lock-free, 8 bytes/slot
        Compact variants with 32-bit sequence numbers. Half the memory of
        Int (8 bytes/slot vs 16). Same lock-free MPMC algorithm and full
        API. Values outside the type range are silently truncated (standard
        C cast).

    Data::PubSub::Shared::Str - byte string values, mutex-protected publish
        Mutex-protected publish with variable-length messages stored in a
        circular arena (max capped at "msg_size"). Short messages use only
        the space they need. Subscribers read lock-free with seqlock-style
        double-check. UTF-8 flag preserved. Default max message size: 256
        bytes.

  Key Differences from Data::Queue::Shared
    *   Broadcast: every subscriber sees every message (queues consume)

    *   No backpressure: publish always succeeds (ring overwrites old data)

    *   Multiple independent readers: each subscriber has its own cursor

    *   Lock-free subscribers: subscribers never block publishers

  Constructor
        # Int
        my $ps = Data::PubSub::Shared::Int->new($path, $capacity);

        # Str
        my $ps = Data::PubSub::Shared::Str->new($path, $capacity);
        my $ps = Data::PubSub::Shared::Str->new($path, $capacity, $msg_size);

    Creates or opens a shared pub/sub backed by file $path. $capacity is
    rounded up to the next power of 2. When opening an existing file,
    parameters are read from the stored header. Pass "undef" for $path for
    anonymous (fork-inherited) pub/sub.

    For Str, $msg_size sets the maximum bytes per message (default: 256).
    Messages exceeding this size will croak. A circular arena of "capacity *
    (msg_size + 8)" bytes is allocated automatically.

  memfd Constructor
        my $ps = Data::PubSub::Shared::Int->new_memfd($name, $capacity);
        my $ps2 = Data::PubSub::Shared::Int->new_from_fd($ps->memfd);

  Publishing
        $ps->publish($value);                 # returns true
        my $n = $ps->publish_multi(@values);  # returns count published
        $ps->publish_notify($value);          # publish + eventfd notify

    Publish writes to the ring buffer and wakes any blocked subscribers. Int
    publish is lock-free (atomic fetch-and-add); "publish_multi" claims all
    slots in a single atomic operation (one fetch-add instead of N). Str
    publish is mutex-protected.

    "publish_notify" combines "publish" and "notify" in a single XS call,
    saving method dispatch overhead in the common non-batching case.

  Management
        $ps->clear;                  # reset ring: write_pos=0, all slots cleared
        $ps->sync;                   # msync to disk
        $ps->unlink;                 # remove backing file
        Class->unlink($path);        # class method form
        my $p = $ps->path;           # backing file path (undef for anon/memfd)
        my $s = $ps->stats;          # diagnostic hashref

    "clear" resets the ring buffer to its initial state: "write_pos",
    "stat_publish_ok", and all slot sequences are zeroed. Existing
    subscribers will need to call "reset_oldest" to see new messages. For
    Str mode, the arena write position is also reset.

    For Str, "publish_multi" holds the mutex for the entire batch (one
    lock/unlock cycle instead of N), which significantly improves throughput
    for batch string publishing.

  Subscribing
        my $sub = $ps->subscribe;       # future messages only
        my $sub = $ps->subscribe_all;   # from oldest available

    Creates a subscriber with its own cursor. Subscribers are process-local
    and cannot be shared between processes. Each process should create its
    own subscribers.

  Subscriber API
   Polling
        my $val = $sub->poll;            # non-blocking, undef if empty
        my @v   = $sub->poll_multi($n);  # batch, up to $n items
        my @v   = $sub->drain;           # poll all available
        my @v   = $sub->drain($max);     # poll up to $max
        my $val = $sub->poll_wait;       # blocking, infinite wait
        my $val = $sub->poll_wait($t);   # blocking with timeout (seconds)
        my @v   = $sub->poll_wait_multi($n, $timeout);

    "drain" returns all currently available messages in one call.

    "poll_wait_multi" blocks until at least one message is available (or
    timeout), then grabs up to $n messages non-blocking. Returns empty list
    on timeout.

   Status
        my $n = $sub->lag;             # messages behind write_pos
        my $c = $sub->cursor;         # current read position
        $sub->cursor($new_pos);       # seek to specific position
        my $o = $sub->has_overflow;    # true if ring wrapped past us
        my $n = $sub->overflow_count;  # total messages skipped due to overflow
        my $p = $sub->write_pos;      # publisher's current position

    "overflow_count" is a cumulative counter of messages skipped due to ring
    overflow. It increments each time "poll" auto-recovers by the number of
    messages that were lost.

   Callback-based Polling
        my $n = $sub->poll_cb(\&handler);    # call handler for each message
        my @v = $sub->drain_notify;          # eventfd_consume + drain
        my @v = $sub->drain_notify($max);    # eventfd_consume + drain up to $max

    "poll_cb" calls "handler" once per available message without returning
    to Perl between messages. Eliminates per-message method dispatch
    overhead. Returns the number of messages processed.

    "drain_notify" combines "eventfd_consume" and "drain" in a single XS
    call. Designed for event-loop callbacks:

        my $w = EV::io $fd, EV::READ, sub {
            my @msgs = $sub->drain_notify;
            process($_) for @msgs;
        };

    Subscribers inherit the handle's eventfd at creation time. Use
    "$sub->eventfd_set($fd)" to set it manually, or "$sub->fileno" to query
    it.

   Cursor Management
        $sub->reset;         # skip to current write_pos (future only)
        $sub->reset_oldest;  # go back to oldest available

    If a subscriber falls behind by more than "capacity" messages, "poll"
    auto-recovers by resetting to the oldest available position and
    returning the next available message. The skipped message count is added
    to "overflow_count".

  Event Loop Integration
        my $fd = $ps->eventfd;
        $ps->notify;            # after publish (opt-in)

        use EV;
        my $sub = $ps->subscribe;
        my $w = EV::io $fd, EV::READ, sub {
            $ps->eventfd_consume;
            while (defined(my $v = $sub->poll)) { process($v) }
        };

  Crash Safety
    Str mode uses a futex-based mutex with PID tracking. If a publisher dies
    while holding the mutex, other publishers detect the stale lock within 2
    seconds and automatically recover. Int mode is lock-free and requires no
    crash recovery.

  Keyword API
        use Data::PubSub::Shared::Int;

        ps_int_publish $ps, $value;
        my $val = ps_int_poll $sub;
        my $n   = ps_int_lag $sub;

        use Data::PubSub::Shared::Str;

        ps_str_publish $ps, $value;
        my $val = ps_str_poll $sub;

    Replace "int" with "int32", "int16", or "str" for other variants.
    Keywords are lexically scoped and require XS::Parse::Keyword at build
    time.

BENCHMARKS
    Single-process throughput, 1M items, Linux x86_64. Run "perl -Mblib
    bench/throughput.pl" to reproduce.

        PUBLISH + POLL (interleaved)
        Int     5.0M/s   (16 bytes/slot)
        Int32   5.9M/s   (8 bytes/slot)
        Int16   5.7M/s   (8 bytes/slot)
        Str     2.5M/s   (~30B messages)

        BATCH PUBLISH (100/batch)
        Int publish_multi:    170M/s
        Str publish_multi:     42M/s

    Fan-out: publish throughput is independent of subscriber count
    (subscribers are lock-free and read-only).

AUTHOR
    vividsnow

LICENSE
    This is free software; you can redistribute it and/or modify it under
    the same terms as Perl itself.

