Bot Development

Build automated bots for the Snek platform

Introduction

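Snek bots are automated clients that connect via WebSocket and interact with the platform using the RPC API. Bots can log in, list the channels they belong to, send messages, and react to server events such as new messages.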

Bots authenticate like regular users. Create a dedicated user account for your bot.

Protocol Overview

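Bots communicate over WebSocket using a JSON-RPC-style protocol: the bot sends a request tagged with a callId, and the server replies with a response carrying the same callId.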

Connection

Connect to wss://your-server/rpc.ws (or ws:// for local development).

Request Format

{
    "callId": "1",
    "method": "login",
    "args": ["bot_username", "bot_password"]
}

Response Format

{
    "callId": "1",
    "success": true,
    "data": { "uid": "...", "username": "bot_username", ... }
}

Server Events

After login, the server pushes events for new messages:

{
    "channel_uid": "abc123",
    "event": "new_message",
    "data": {
        "uid": "msg_uid",
        "message": "Hello bot!",
        "user_uid": "sender_uid",
        "username": "sender_name"
    }
}

Python Bot Example

A complete, working Python bot using aiohttp:

Python
# retoor <retoor@molodetz.nl>

import asyncio
import json
import aiohttp


class SnekBot:
    """Minimal Snek bot that talks the JSON RPC protocol over a WebSocket.

    Requests are sent as ``{"callId", "method", "args"}`` objects and matched
    to their responses by ``callId``. Frames carrying an ``event`` key are
    dispatched to :meth:`on_message` (for ``new_message``) or :meth:`on_event`.

    Args:
        base_url: HTTP(S) base URL of the server, e.g. ``https://host``.
        username: Account name used for ``login``.
        password: Account password used for ``login``.
    """

    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.ws = None
        self.session = None
        self.call_id = 0
        self.pending = {}
        self.running = True
        # The event loop only holds weak references to tasks, so keep strong
        # references to the reader task and to in-flight handler tasks.
        self._loop_task = None
        self._tasks = set()

    def _ws_url(self):
        """Return the WebSocket endpoint derived from ``base_url``.

        Only the scheme prefix is rewritten (``http`` -> ``ws``,
        ``https`` -> ``wss``); a blanket ``str.replace`` would also mangle
        host names or paths that happen to contain ``http``.
        """
        base = self.base_url.rstrip("/")
        if base.startswith("https://"):
            base = "wss://" + base[len("https://"):]
        elif base.startswith("http://"):
            base = "ws://" + base[len("http://"):]
        return f"{base}/rpc.ws"

    async def connect(self):
        """Open the WebSocket, start the reader task and log in.

        Returns:
            The user record returned by the ``login`` call.

        Raises:
            RuntimeError: If the login response carries no user record.
        """
        self.session = aiohttp.ClientSession()
        self.ws = await self.session.ws_connect(self._ws_url(), heartbeat=30)
        self._loop_task = asyncio.create_task(self._message_loop())
        user = await self.login()
        if not isinstance(user, dict):
            raise RuntimeError(f"Login failed for {self.username!r}")
        print(f"Logged in as {user.get('username')}")
        return user

    async def _message_loop(self):
        """Read frames until the socket closes, then clear ``running``."""
        try:
            async for msg in self.ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except ValueError as e:
                        # One malformed frame should not take the bot down.
                        print(f"Ignoring malformed frame: {e}")
                        continue
                    self._handle_frame(data)
                elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                    break
        except Exception as e:
            print(f"Message loop error: {e}")
        finally:
            self.running = False

    def _handle_frame(self, data):
        """Route one decoded frame to a pending call or an event handler."""
        if not isinstance(data, dict):
            return
        call_id = data.get("callId")
        future = self.pending.get(call_id) if call_id else None
        if future is not None:
            if not future.done():
                future.set_result(data)
        elif data.get("event") == "new_message":
            inner = data.get("data")
            payload = dict(inner) if isinstance(inner, dict) else {}
            # The channel uid lives on the event envelope, not in the payload;
            # copy it in so handlers can reply to the right channel.
            payload.setdefault("channel_uid", data.get("channel_uid"))
            self._spawn(self.on_message(payload))
        elif data.get("event"):
            self._spawn(self.on_event(data))

    def _spawn(self, coro):
        """Run a handler coroutine as a background task.

        Handlers may issue RPC calls whose responses are delivered by the
        reader loop; awaiting them inside the loop would block the reader and
        every such call would time out. Handlers may therefore run
        concurrently.
        """
        task = asyncio.create_task(self._run_handler(coro))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _run_handler(self, coro):
        """Await a handler and report (rather than propagate) its errors."""
        try:
            await coro
        except Exception as e:
            print(f"Handler error: {e}")

    async def call(self, method, *args):
        """Invoke an RPC method and return the ``data`` field of its response.

        Raises:
            asyncio.TimeoutError: If no response arrives within 30 seconds.
        """
        self.call_id += 1
        call_id = str(self.call_id)
        future = asyncio.get_running_loop().create_future()
        self.pending[call_id] = future
        try:
            # Sending is inside the try so a failed send does not leave a
            # stale entry in ``pending``.
            await self.ws.send_json({
                "callId": call_id,
                "method": method,
                "args": list(args)
            })
            result = await asyncio.wait_for(future, timeout=30)
            return result.get("data")
        finally:
            self.pending.pop(call_id, None)

    async def login(self):
        """Authenticate with the configured username and password."""
        return await self.call("login", self.username, self.password)

    async def send_message(self, channel_uid, message):
        """Post ``message`` to the channel identified by ``channel_uid``."""
        return await self.call("send_message", channel_uid, message, True)

    async def get_channels(self):
        """Return the result of the ``get_channels`` RPC call."""
        return await self.call("get_channels")

    async def on_message(self, data):
        """Handle a ``new_message`` payload and answer the built-in commands."""
        # ``or ""`` also covers an explicit null message in the payload.
        command = (data.get("message") or "").lower()
        channel_uid = data.get("channel_uid")
        username = data.get("username", "")

        # Never react to our own messages, otherwise replies could loop.
        if username == self.username:
            return

        if command.startswith("!ping"):
            await self.send_message(channel_uid, "Pong!")

        elif command.startswith("!help"):
            help_text = "Available commands: !ping, !help, !time"
            await self.send_message(channel_uid, help_text)

        elif command.startswith("!time"):
            import datetime
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            await self.send_message(channel_uid, f"Current time: {now}")

    async def on_event(self, data):
        """Handle any event other than ``new_message``; logs the event name."""
        event = data.get("event")
        print(f"Event: {event}")

    async def close(self):
        """Stop the bot and release the WebSocket and HTTP session."""
        self.running = False
        current = asyncio.current_task()
        for task in list(self._tasks):
            # Skip the current task so a handler may call close() itself.
            if task is not current:
                task.cancel()
        if self.ws:
            await self.ws.close()
        if self.session:
            await self.session.close()


async def main():
    """Run the example bot until the connection drops or it is interrupted."""
    bot = SnekBot(
        base_url="https://snek.community",
        username="my_bot",
        password="my_bot_password"
    )

    try:
        await bot.connect()
        channels = await bot.get_channels()
        # get_channels() returns None when the response has no data field.
        print(f"Member of {len(channels or [])} channels")

        while bot.running:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() Ctrl-C reaches this coroutine as a task
        # cancellation, so KeyboardInterrupt alone would never match here.
        print("Shutting down...")
    finally:
        await bot.close()


# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())

JavaScript Bot Example

A complete Node.js bot using the ws package:

JavaScript (Node.js)
// retoor <retoor@molodetz.nl>

const WebSocket = require("ws")

/**
 * Minimal Snek bot that talks the JSON RPC protocol over a WebSocket.
 *
 * Requests are sent as {callId, method, args} objects and matched to their
 * responses by callId. Frames carrying an `event` key are dispatched to
 * onMessage (for "new_message") or onEvent.
 */
class SnekBot {
    /**
     * @param {string} baseUrl  HTTP(S) base URL of the server.
     * @param {string} username Account name used for login.
     * @param {string} password Account password used for login.
     */
    constructor(baseUrl, username, password) {
        this.baseUrl = baseUrl
        this.username = username
        this.password = password
        this.ws = null
        this.callId = 0
        this.pending = new Map()
        this.running = true
    }

    /**
     * Build the WebSocket endpoint from baseUrl. Only the scheme prefix is
     * rewritten; replacing "http" anywhere in the string would also mangle
     * host names or paths that contain it.
     */
    wsUrl() {
        const base = this.baseUrl.replace(/\/+$/, "")
        return base.replace(/^http(s?):\/\//, "ws$1://") + "/rpc.ws"
    }

    /** Open the socket, log in and resolve with the user record. */
    connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.wsUrl())

            this.ws.on("open", async () => {
                try {
                    const user = await this.login()
                    console.log(`Logged in as ${user.username}`)
                    resolve(user)
                } catch (e) {
                    reject(e)
                }
            })

            this.ws.on("message", (data) => {
                let parsed
                try {
                    parsed = JSON.parse(data.toString())
                } catch (e) {
                    // One malformed frame should not crash the process.
                    console.error("Ignoring malformed frame:", e.message)
                    return
                }
                if (parsed === null || typeof parsed !== "object") {
                    return
                }

                const callId = parsed.callId

                if (callId && this.pending.has(callId)) {
                    this.pending.get(callId).resolve(parsed)
                    this.pending.delete(callId)
                } else if (parsed.event === "new_message") {
                    // The channel uid lives on the event envelope, not in the
                    // payload; copy it in so handlers can reply.
                    const payload = Object.assign(
                        { channel_uid: parsed.channel_uid },
                        parsed.data
                    )
                    this.dispatch(() => this.onMessage(payload))
                } else if (parsed.event) {
                    this.dispatch(() => this.onEvent(parsed))
                }
            })

            this.ws.on("close", () => {
                this.running = false
                console.log("Connection closed")
                // Settle outstanding calls now instead of leaving them (and
                // their timers) hanging until the 30 s timeout.
                for (const entry of this.pending.values()) {
                    entry.reject(new Error("Connection closed"))
                }
                this.pending.clear()
            })

            this.ws.on("error", (err) => {
                console.error("WebSocket error:", err.message)
                reject(err)
            })
        })
    }

    /** Run a handler and log (instead of leaving unhandled) any failure. */
    dispatch(handler) {
        Promise.resolve()
            .then(handler)
            .catch((e) => console.error("Handler error:", e.message))
    }

    /**
     * Invoke an RPC method; resolves with the `data` field of the response
     * and rejects after 30 seconds without one.
     */
    call(method, ...args) {
        return new Promise((resolve, reject) => {
            this.callId++
            const callId = String(this.callId)

            const timeout = setTimeout(() => {
                this.pending.delete(callId)
                reject(new Error("Request timeout"))
            }, 30000)

            this.pending.set(callId, {
                resolve: (data) => {
                    clearTimeout(timeout)
                    resolve(data.data)
                },
                reject: (err) => {
                    clearTimeout(timeout)
                    reject(err)
                }
            })

            try {
                this.ws.send(JSON.stringify({
                    callId,
                    method,
                    args
                }))
            } catch (e) {
                // A failed send must not leave a stale entry or a live timer.
                clearTimeout(timeout)
                this.pending.delete(callId)
                reject(e)
            }
        })
    }

    login() {
        return this.call("login", this.username, this.password)
    }

    sendMessage(channelUid, message) {
        return this.call("send_message", channelUid, message, true)
    }

    getChannels() {
        return this.call("get_channels")
    }

    /** Handle a new_message payload and answer the built-in commands. */
    async onMessage(data) {
        const command = (data.message || "").toLowerCase()
        const channelUid = data.channel_uid
        const username = data.username || ""

        // Never react to our own messages, otherwise replies could loop.
        if (username === this.username) {
            return
        }

        if (command.startsWith("!ping")) {
            await this.sendMessage(channelUid, "Pong!")
        } else if (command.startsWith("!help")) {
            await this.sendMessage(channelUid, "Commands: !ping, !help, !time")
        } else if (command.startsWith("!time")) {
            const now = new Date().toISOString()
            await this.sendMessage(channelUid, `Current time: ${now}`)
        }
    }

    /** Handle any event other than new_message; logs the event name. */
    onEvent(data) {
        console.log(`Event: ${data.event}`)
    }

    /** Stop the bot and close the socket. */
    close() {
        this.running = false
        if (this.ws) {
            this.ws.close()
        }
    }
}

// Connect the example bot, report its channel count, and install a Ctrl-C
// handler that closes the bot before exiting.
async function main() {
    const bot = new SnekBot("https://snek.community", "my_bot", "my_bot_password")

    const shutdown = () => {
        console.log("Shutting down...")
        bot.close()
        process.exit(0)
    }

    try {
        await bot.connect()
        const channels = await bot.getChannels()
        console.log(`Member of ${channels.length} channels`)

        // Registered only once the bot is connected and has listed channels.
        process.on("SIGINT", shutdown)
    } catch (e) {
        console.error("Failed to connect:", e.message)
        process.exit(1)
    }
}

// Entry point: main() catches its own errors and exits with status 1 on failure.
main()

Event Handling

Bots receive various events after authentication:

new_message

Received when a message is posted to a channel the bot is a member of.

{
    "channel_uid": "...",
    "event": "new_message",
    "data": {
        "uid": "message_uid",
        "message": "Message content",
        "user_uid": "sender_uid",
        "username": "sender_username",
        "nick": "Sender Nick",
        "created_at": "2025-01-01T12:00:00"
    }
}

set_typing

Received when a user starts typing.

{
    "channel_uid": "...",
    "event": "set_typing",
    "data": {
        "user_uid": "...",
        "username": "...",
        "nick": "...",
        "color": "#7ef"
    }
}

update_message_text

Received when a message is edited (non-final messages only).

{
    "channel_uid": "...",
    "event": "update_message_text",
    "data": {
        "message_uid": "...",
        "text": "Updated message content"
    }
}

user_presence

Received when a user's online status changes.

Best Practices

Rate Limiting

Implement rate limiting to avoid overwhelming the server, for example by enforcing a minimum delay between outgoing messages.

Error Handling

Handle errors gracefully: catch request timeouts and failed calls so that a single error does not stop the bot.

Reconnection Strategy

Implement automatic reconnection, for example by retrying the connection with an increasing delay after the WebSocket closes.

Resource Cleanup

Clean up resources on shutdown: close the WebSocket and any HTTP session before the process exits.

Security

Advanced Topics

AI Integration

Integrate with AI services by calling external APIs when messages match certain patterns:

async def on_message(self, data):
    """Answer ``!ask <question>`` messages with the AI backend's reply."""
    prefix = "!ask "
    text = data.get("message", "")
    # Guard clause: anything that is not an !ask command is ignored.
    if not text.startswith(prefix):
        return
    answer = await self.call_ai_api(text[len(prefix):])
    await self.send_message(data["channel_uid"], answer)

Scheduled Tasks

Use asyncio tasks for periodic operations:

async def daily_report(self, interval=86400):
    """Post a report to every channel once per ``interval`` seconds.

    Args:
        interval: Seconds to wait between reports; defaults to 24 hours.
    """
    while self.running:
        await asyncio.sleep(interval)
        # The bot may have been stopped during the sleep; do not send on a
        # connection that has been shut down in the meantime.
        if not self.running:
            break
        for channel in await self.get_channels() or []:
            await self.send_message(channel["uid"], "Daily report...")

Multiple Bots

Run multiple bot instances for different purposes. Each bot needs its own user account and WebSocket connection.