Skip to main content
The Python SDK (meerkat-sdk) is a thin wrapper over Meerkat’s settled runtime-backed contracts. It spawns rkat-rpc as a subprocess, then exposes the same session lifecycle used by the CLI, REST, JSON-RPC, and MCP surfaces as Python-native Session and DeferredSession handles.
  • Python: >=3.10
  • Dependencies: zero runtime dependencies

Getting started

1

Install the SDK

pip install meerkat-sdk
2

Install the RPC binary

The SDK spawns rkat-rpc as a subprocess. Build it from source:
cargo build -p meerkat-rpc --release
# Binary is at target/release/rkat-rpc
Alternatively, MeerkatClient will attempt to download a matching release binary automatically if rkat-rpc is not found on PATH.
3

Connect and run

import asyncio
from meerkat import MeerkatClient, TextDelta

async def main():
    async with MeerkatClient() as client:
        session = await client.create_session("What is the capital of France?")
        print(session.text)

        # Multi-turn
        result = await session.turn("And of Germany?")
        print(result.text)

        await session.archive()

asyncio.run(main())
If rkat-rpc is not on PATH, pass its location explicitly or set the MEERKAT_BIN_PATH environment variable:
client = MeerkatClient(rkat_path="/path/to/rkat-rpc")

Method overview

MeerkatClient methods

Method / PropertyDescription
async with MeerkatClient() as clientPreferred usage — starts rkat-rpc and cleans up on exit
await client.connect(...)Explicit connect: spawn rkat-rpc, handshake, fetch capabilities
await client.close()Terminate the subprocess
await client.create_session(prompt, ...)Create a session and run the first turn; returns Session
client.create_session_streaming(prompt, ...)Create a session and stream typed events; returns EventStream
await client.list_sessions()List active sessions; returns list[SessionInfo]
await client.read_session(session_id)Read raw session state
await client.read_session_history(session_id, offset=0, limit=None)Read committed session transcript history
await client.create_mob(*, prefab=..., definition=...)Create a mob; returns Mob
await client.create_deferred_session(prompt, ...)Create a session without running the first turn; returns DeferredSession
await client.list_mobs()List mobs; returns list[MobSummary]
client.capabilitieslist[Capability] — capabilities fetched during handshake
client.has_capability(id)True if a capability is available
client.require_capability(id)Raises CapabilityUnavailableError if not available
await client.get_config()Read runtime configuration
await client.set_config(config)Replace runtime configuration
await client.patch_config(patch)Merge-patch runtime configuration

Session methods

create_session() returns a Session object. All subsequent operations on a session go through that runtime-backed handle rather than through wrapper-local execution state.
Method / PropertyDescription
session.idStable session UUID
session.refOptional human-readable session reference
session.textAssistant text from the last turn
session.usageUsage from the last turn
session.turnsNumber of LLM turns in the last run
session.tool_callsNumber of tool calls in the last run
session.structured_outputStructured output from the last run, if requested
session.last_resultFull RunResult from the last turn
await session.turn(prompt, ...)Run another turn; returns RunResult
session.stream(prompt, ...)Run another turn with streaming; returns EventStream
await session.history(offset=0, limit=None)Read committed transcript history for this session
await session.archive()Archive (remove) the session
await session.interrupt()Cancel the currently running turn
await session.invoke_skill(skill_ref, prompt)Invoke a skill in this session
await session.send(**kwargs)Push a comms event into this session
await session.peers()List peers visible to this session’s comms runtime
await session.subscribe_events()Open a standalone event subscription; returns EventSubscription

Mob methods

create_mob() returns a Mob object for explicit mob lifecycle control.
MethodDescription
mob.idThe mob’s identifier
await mob.status()Get mob status
await mob.lifecycle(action)Stop, resume, complete, or destroy the mob
await mob.spawn(...)Spawn a member into the mob
await mob.retire(meerkat_id)Retire a member
await mob.respawn(meerkat_id)Respawn a member
await mob.wire(a, b)Wire two members for comms
await mob.unwire(a, b)Remove comms wiring
await mob.members()List mob members
await mob.member(meerkat_id).send(content, handling_mode=...)Send work to a member
await mob.flows()List available flows
await mob.run_flow(flow_id, params)Start a flow run
await mob.flow_status(run_id)Get flow run status
await mob.cancel_flow(run_id)Cancel a flow run
await mob.subscribe_member_events(meerkat_id)Subscribe to member events; returns EventSubscription
await mob.subscribe_events()Subscribe to mob-wide events; returns EventSubscription

MeerkatClient

Constructor

MeerkatClient(rkat_path: str = "rkat-rpc")
Creates the client object. The subprocess is not started until connect() is called (or the async context manager is entered).

connect()

async def connect(
    self,
    *,
    realm_id: str | None = None,
    instance_id: str | None = None,
    realm_backend: str | None = None,
    isolated: bool = False,
    state_root: str | None = None,
    context_root: str | None = None,
    user_config_root: str | None = None,
) -> MeerkatClient
Starts the rkat-rpc subprocess, performs the initialize handshake, checks contract version compatibility, and fetches capabilities. Returns self for chaining. If realm_id is omitted, each SDK process gets a default realm. Reuse realm_id to share config and sessions across SDK/CLI/REST/MCP surfaces. realm_id and isolated are mutually exclusive.

close()

async def close(self) -> None
Sends SIGTERM and waits up to 5 seconds; falls back to SIGKILL on timeout.

create_session()

async def create_session(
    self,
    prompt: str | list[dict],
    *,
    model: str | None = None,
    provider: str | None = None,
    system_prompt: str | None = None,
    max_tokens: int | None = None,
    output_schema: dict | None = None,
    structured_output_retries: int = 2,
    hooks_override: dict | None = None,
    enable_builtins: bool = False,
    enable_shell: bool = False,
    enable_memory: bool = False,
    enable_mob: bool = False,
    keep_alive: bool = False,
    comms_name: str | None = None,
    peer_meta: dict | None = None,
    budget_limits: dict | None = None,
    provider_params: dict | None = None,
    preload_skills: list[str] | None = None,
    skill_refs: list[SkillRef] | None = None,
    skill_references: list[str] | None = None,
) -> Session
Creates a new session and runs the first turn. Returns a Session whose convenience properties (text, usage, turns, tool_calls, structured_output) reflect the result of that first turn.

create_session_streaming()

def create_session_streaming(
    self,
    prompt: str | list[dict],
    *,
    # same keyword arguments as create_session()
) -> EventStream
Creates a new session and streams typed events from the first turn. Returns an EventStream async context manager — the request is sent when entering the async with block. After iteration, stream.session_id and stream.result are available.
async with client.create_session_streaming("Hello!") as stream:
    async for event in stream:
        match event:
            case TextDelta(delta=chunk):
                print(chunk, end="", flush=True)
    print()
    session_id = stream.session_id
    result = stream.result

Capabilities

# Property: list[Capability] populated during connect()
client.capabilities

# Check availability
if client.has_capability("shell"):
    print("Shell tool is available")

# Guard a code path
client.require_capability("comms")  # raises CapabilityUnavailableError if unavailable

Session queries

sessions: list[SessionInfo] = await client.list_sessions()
raw: dict = await client.read_session(session_id)

Config management

config = await client.get_config()
await client.set_config(config)
updated = await client.patch_config({"max_tokens": 2048})

Session

Session is returned by create_session() and is the primary handle for multi-turn conversations.

Properties

session.id               # str  — stable session UUID
session.ref              # str | None  — optional human-readable reference
session.text             # str  — assistant text from the last turn
session.usage            # Usage — token usage from the last turn
session.turns            # int  — LLM turns in the last run
session.tool_calls       # int  — tool calls in the last run
session.structured_output  # Any | None — structured output if requested
session.last_result      # RunResult — full result object from the last turn

turn()

async def turn(
    self,
    prompt: str | list[dict],
    *,
    skill_refs: list[SkillRef] | None = None,
    skill_references: list[str] | None = None,
) -> RunResult
Runs another turn on this session (non-streaming). Updates last_result and returns the new RunResult.

stream()

def stream(
    self,
    prompt: str | list[dict],
    *,
    skill_refs: list[SkillRef] | None = None,
    skill_references: list[str] | None = None,
) -> EventStream
Runs another turn with streaming events. Returns an EventStream. Updates last_result when the stream completes.
async with session.stream("Explain the CI pipeline") as events:
    async for event in events:
        match event:
            case TextDelta(delta=chunk):
                print(chunk, end="", flush=True)
    print()
    print(f"Tokens used: {events.result.usage.input_tokens}")

archive() and interrupt()

await session.archive()     # Remove session from the server
await session.interrupt()   # Cancel the currently running turn

invoke_skill()

async def invoke_skill(self, skill_ref: SkillRef, prompt: str) -> RunResult
Invokes a skill in this session. Requires the "skills" capability — raises CapabilityUnavailableError if it is not available. Accepts a SkillKey or a legacy string of the form "<source_uuid>/<skill_name>".
from meerkat import SkillKey

key = SkillKey(source_uuid="abc123", skill_name="code-review")
result = await session.invoke_skill(key, "Review this function")

send() and peers()

# Push a comms event into this session
await session.send(payload={"alert": "deployment failed"}, source="monitoring")

# List peers visible to this session
peers: list[dict] = await session.peers()

Streaming

EventStream

EventStream is an async context manager returned by session.stream() and client.create_session_streaming(). Iterate it to receive typed Event subclass instances.
async with session.stream("Summarise the logs") as events:
    async for event in events:
        match event:
            case TextDelta(delta=chunk):
                print(chunk, end="", flush=True)
            case ToolCallRequested(name=name, args=args):
                print(f"\nCalling tool: {name}")
            case TurnCompleted(usage=u):
                print(f"\nTokens: {u.input_tokens} in / {u.output_tokens} out")
            case _:
                pass
    result = events.result   # RunResult, available after iteration

Convenience methods

# Consume all events silently, return RunResult
result = await session.stream("prompt").collect()

# Accumulate text deltas, return (full_text, RunResult)
text, result = await session.stream("prompt").collect_text()

Typed event classes

All events are frozen dataclasses importable from meerkat. Unknown event types (from newer server versions) are surfaced as UnknownEvent for forward compatibility.
ClassKey fieldsDescription
RunStartedsession_id, promptAgent run has started
TurnStartedturn_numberA new LLM turn has begun
TextDeltadeltaIncremental text chunk from the LLM
TextCompletecontentFull assistant text for the current turn
ToolCallRequestedid, name, argsLLM wants to invoke a tool
ToolResultReceivedid, name, is_errorTool result fed back to LLM
TurnCompletedstop_reason, usageLLM turn finished
ToolExecutionStartedid, nameTool began executing
ToolExecutionCompletedid, name, result, is_error, duration_msTool finished
ToolExecutionTimedOutid, name, timeout_msTool exceeded its timeout
RunCompletedsession_id, result, usageAgent run completed
RunFailedsession_id, errorAgent run failed
CompactionStartedinput_tokens, estimated_history_tokens, message_countContext compaction began
CompactionCompletedsummary_tokens, messages_before, messages_afterCompaction finished
CompactionFailederrorCompaction failed
BudgetWarningbudget_type, used, limit, percentA budget threshold crossed
Retryingattempt, max_attempts, error, delay_msLLM request being retried
HookStartedhook_id, pointHook invocation started
HookCompletedhook_id, point, duration_msHook invocation completed
HookFailedhook_id, point, errorHook invocation failed
HookDeniedhook_id, point, reason_code, messageHook denied the operation
HookRewriteAppliedhook_id, point, patchHook rewrote part of request/response
SkillsResolvedskills, injection_bytesSkills resolved for this turn
SkillResolutionFailedreference, errorSkill reference could not be resolved
InteractionCompleteinteraction_id, resultComms interaction completed
InteractionFailedinteraction_id, errorComms interaction failed
StreamTruncatedreasonEvent stream was truncated
ToolConfigChangedpayloadLive tool configuration changed for this session
UnknownEventtype, dataUnrecognised event type (forward compat)

Parsing and typing notes

  • Capability statuses may be encoded as externally-tagged enum objects by Rust (for example {"DisabledByPolicy": {...}}). The SDK normalises these to the string key.
  • Event parsing intentionally defaults missing fields to empty/zero values so partially delivered streaming payloads can still be surfaced as typed events.
  • RunResult.skill_diagnostics is SkillRuntimeDiagnostics | None — present when the skills subsystem emits runtime diagnostics.

Examples

import asyncio
from meerkat import MeerkatClient

async def main():
    async with MeerkatClient() as client:
        session = await client.create_session(
            "You are a helpful math tutor. What is 12 * 15?",
            model="claude-sonnet-4-5",
        )
        print(f"Turn 1: {session.text}")

        result = await session.turn("Now divide that result by 3.")
        print(f"Turn 2: {result.text}")

        await session.archive()

asyncio.run(main())
import asyncio
from meerkat import MeerkatClient, TextDelta, ToolCallRequested, TurnCompleted

async def main():
    async with MeerkatClient() as client:
        session = await client.create_session(
            "List three interesting facts about penguins.",
            model="claude-sonnet-4-5",
        )
        print(f"First turn: {session.text}\n")

        async with session.stream("Now give me three more facts.") as events:
            async for event in events:
                match event:
                    case TextDelta(delta=chunk):
                        print(chunk, end="", flush=True)
                    case TurnCompleted(usage=u):
                        print(f"\nTokens: {u.input_tokens} in / {u.output_tokens} out")
                    case _:
                        pass

asyncio.run(main())
import asyncio
from meerkat import MeerkatClient

async def main():
    schema = {
        "type": "object",
        "properties": {
            "city": {"type": "string"},
            "country": {"type": "string"},
            "population": {"type": "integer"},
        },
        "required": ["city", "country", "population"],
    }

    async with MeerkatClient() as client:
        session = await client.create_session(
            "Give me information about Tokyo.",
            output_schema=schema,
            structured_output_retries=3,
        )
        print(session.structured_output)
        await session.archive()

asyncio.run(main())
import asyncio
from meerkat import MeerkatClient

async def main():
    async with MeerkatClient() as client:
        session = await client.create_session(
            "Create a file called hello.txt with 'Hello World' in it",
            enable_builtins=True,
            enable_shell=True,
        )
        print(f"Response: {session.text}")
        print(f"Tool calls made: {session.tool_calls}")
        await session.archive()

asyncio.run(main())
import asyncio
import base64
from meerkat import MeerkatClient

async def main():
    async with MeerkatClient() as client:
        # Read an image and encode as base64
        with open("screenshot.png", "rb") as f:
            image_data = base64.b64encode(f.read()).decode()

        # Send a multimodal prompt with text and image blocks
        session = await client.create_session(
            [
                {"type": "text", "text": "What do you see in this image?"},
                {"type": "image", "media_type": "image/png", "data": image_data},
            ],
            model="claude-sonnet-4-5",
        )
        print(session.text)
        await session.archive()

asyncio.run(main())
import asyncio
from meerkat import MeerkatClient, CapabilityUnavailableError

async def main():
    async with MeerkatClient() as client:
        print("Available capabilities:")
        for cap in client.capabilities:
            if cap.available:
                print(f"  {cap.id}: {cap.description}")

        if client.has_capability("skills"):
            print("Skills are available")

        try:
            client.require_capability("comms")
            print("Comms capability confirmed")
        except CapabilityUnavailableError as e:
            print(f"Comms not available: {e.message}")

asyncio.run(main())
import asyncio
from meerkat import MeerkatClient, SkillKey

async def main():
    async with MeerkatClient() as client:
        session = await client.create_session(
            "I need help reviewing code.",
            model="claude-sonnet-4-5",
        )

        key = SkillKey(source_uuid="abc123", skill_name="code-review")
        result = await session.invoke_skill(key, "Review this Python function for bugs.")
        print(result.text)

        await session.archive()

asyncio.run(main())

See also