Connector SDK
Connectors are how Lobu turns external systems — REST APIs, GraphQL, webhooks, files, OAuth-protected services — into the typed event stream that watchers shape into entities and memory.
A connector is a TypeScript class that extends ConnectorRuntime and ships three things:
- a
definitiondescribing the connector (key, name, version, auth, feeds, actions), - a
sync(ctx)method that pulls the next slice of data and returns events, - an optional
execute(ctx)method that runs writes back to the source (create issue, send email).
Sync runs are idempotent: each run returns a checkpoint (cursor, timestamp, ID set) that the next run reads back via ctx.checkpoint.
Install
Section titled “Install”bun add @lobu/connector-sdk# ornpm install @lobu/connector-sdk# orpnpm add @lobu/connector-sdkThe package is published from this repo and tracks the same release line as @lobu/cli and the gateway.
A typed connector, end to end
Section titled “A typed connector, end to end”The example below pulls issues from a GitHub repository, polls incrementally with a typed checkpoint, and emits one EventEnvelope per issue. Every field has a real type — no as any casts, no // biome-ignore directives.
import { ConnectorRuntime, type ConnectorDefinition, type EventEnvelope, type SyncContext, type SyncResult,} from "@lobu/connector-sdk";
// User-supplied connection config (rendered as a form in the admin UI).interface GitHubConfig { owner: string; repo: string;}
// The shape we persist between runs. Cursor-based pagination so re-runs// only fetch issues updated after the last successful sync.interface GitHubCheckpoint { last_updated_at: string | null;}
// Minimal subset of the GitHub REST API issue payload we actually read.interface GitHubIssue { id: number; number: number; title: string; body: string | null; html_url: string; updated_at: string; user: { login: string } | null;}
// Tiny typed helper so we never reach into `ctx.checkpoint` raw.function readCheckpoint(raw: SyncContext["checkpoint"]): GitHubCheckpoint { const cp = (raw ?? {}) as Partial<GitHubCheckpoint>; return { last_updated_at: cp.last_updated_at ?? null };}
export default class GitHubIssuesConnector extends ConnectorRuntime { readonly definition: ConnectorDefinition = { key: "github-issues", name: "GitHub issues", version: "1.0.0", // Personal access token is collected once per connection and stored // encrypted; the worker only ever sees a `lobu_secret_<uuid>` placeholder. authSchema: { methods: [ { type: "env_keys", fields: [ { key: "token", label: "GitHub PAT", secret: true, required: true }, ], }, ], }, feeds: { issues: { key: "issues", name: "Issues" }, }, };
async sync(ctx: SyncContext): Promise<SyncResult> { // For `env_keys` auth, the values land in `ctx.config` keyed by the // `key` you declared on the auth field. OAuth tokens (for `oauth` auth) // arrive on `ctx.credentials.accessToken` instead. const config = ctx.config as unknown as GitHubConfig & { token?: string }; const checkpoint = readCheckpoint(ctx.checkpoint); const token = config.token ?? "";
// GitHub returns issues updated *at or after* `since`; we want // strictly after, so we filter by id below. const since = checkpoint.last_updated_at ?? "1970-01-01T00:00:00Z"; const url = `https://api.github.com/repos/${config.owner}/${config.repo}/issues` + `?state=all&sort=updated&direction=asc&per_page=100&since=${since}`;
const response = await fetch(url, { headers: { Authorization: `Bearer ${token}`, Accept: "application/vnd.github+json", }, }); if (!response.ok) { throw new Error(`GitHub ${response.status}: ${await response.text()}`); }
const issues = (await response.json()) as GitHubIssue[]; const fresh = issues.filter((i) => i.updated_at !== checkpoint.last_updated_at);
const events: EventEnvelope[] = fresh.map((issue) => ({ origin_id: String(issue.id), origin_type: "issue", title: `#${issue.number} ${issue.title}`, payload_text: issue.body ?? "", source_url: issue.html_url, author_name: issue.user?.login, occurred_at: new Date(issue.updated_at), }));
return { events, // Always advance the checkpoint to the newest `updated_at` we saw. // If the page was empty, return the previous value verbatim so the // next run is still idempotent. checkpoint: { last_updated_at: fresh.at(-1)?.updated_at ?? checkpoint.last_updated_at, } satisfies GitHubCheckpoint, }; }
async execute(): Promise<{ success: false; error: string }> { return { success: false, error: "github-issues is read-only" }; }}A few things to notice:
SyncContext["checkpoint"]isRecord<string, unknown> | null. Wrap it once in a tiny typed reader (readCheckpoint) instead of casting at every call site.env_keyscredentials live onctx.config, notctx.credentials. Lobu merges the values the user filled into theenv_keysform intoctx.configunder the keys you declared (tokenhere).ctx.credentialsis reserved foroauthauth —accessToken,refreshToken,scope,expiresAt.- The PAT is a
lobu_secret_<uuid>placeholder at runtime. The gateway’s secret proxy swaps it for the real value when the outbound HTTPS request leaves the worker, so the secret never lives in the worker’s memory. - Pagination via the
sincequery param. The GitHubLinkheader is the alternative for cursor-style paging when you need to walk a stable, ordered list;sinceis simpler when the source already gives you a monotonic timestamp.
Save this file in your Lobu project (e.g. github-issues.connector.ts next to lobu.config.ts) and list it in your config:
import { connectorFromFile, defineConfig } from "@lobu/cli/config";import type GitHubIssuesConnector from "./github-issues.connector.ts";
export default defineConfig({ connectors: [ connectorFromFile<typeof GitHubIssuesConnector>( "./github-issues.connector.ts" ), ], // ...agents, connections, etc.});Passing the connector’s type via the generic (import type + connectorFromFile<typeof GitHubIssuesConnector>) is optional — bare connectorFromFile("./github-issues.connector.ts") still works — but it gives you go-to-definition, rename, and a tsc error if the file’s default export ever stops being a ConnectorRuntime subclass. The import type is erased at compile time, so the connector module is never loaded while your config is evaluated.
lobu apply ships the source to the gateway, which compiles and registers it; from there each feeds.<key> entry shows up as something a user can create a connection for in the admin UI.
Concepts
Section titled “Concepts”ConnectorDefinition
Section titled “ConnectorDefinition”The static metadata for your connector. Filed under connector_definitions in the gateway DB after lobu apply.
| Field | Required | Description |
|---|---|---|
key | yes | Unique global key, e.g. google.gmail, github-issues |
name | yes | Human-readable label |
version | yes | Semver — bump to invalidate per-feed checkpoints if the event shape changes |
authSchema | no | How users authenticate this connector (see below) |
feeds | no | Map of feed key → FeedDefinition (a connector typically has one or more feeds) |
actions | no | Map of action key → ActionDefinition (only needed if you also implement execute) |
requiredCapability | no | When set, only worker pods/devices advertising this capability serve runs (e.g. screentime for the Mac app) |
runtime | no | Pin to a device platform (iOS, macOS, …) — omit for cloud-side connectors |
See the full type at reference/connector-sdk › ConnectorDefinition.
SyncContext
Section titled “SyncContext”What sync() receives. Every field is read-only.
| Field | Description |
|---|---|
feedKey | Which feed Lobu is asking you to run |
config | The connection-level config the user filled in (typed by your FeedDefinition.configSchema) |
checkpoint | The last successful run’s checkpoint, or null on the first run |
credentials | OAuth tokens (accessToken, refreshToken, …) for oauth auth; null for everything else. env_keys values land on ctx.config under the declared key. |
entityIds | Entities this feed is linked to (rarely needed; useful for scoping the sync) |
sessionState | Browser cookies / tokens captured by lobu memory browser-auth for browser auth |
emitEvents(events) | Optional streaming hook — flush a chunk before the run ends |
updateCheckpoint(cp) | Optional progress-checkpoint hook for long-running syncs |
SyncContext does not currently expose generics for config / checkpoint. Declare your own interfaces and convert at the boundary, as the example above does with readCheckpoint.
EventEnvelope
Section titled “EventEnvelope”The shape of one event in the stream. Each envelope becomes a row in the events table.
interface EventEnvelope { origin_id: string; // platform's unique ID for this item origin_type?: string; // source-native type (post, message, charge) payload_text: string; // main content payload_type?: "text" | "markdown" | "json_template" | "media" | "empty"; title?: string; author_name?: string; source_url?: string; // permalink back to the original occurred_at: Date; // when the event actually happened semantic_type?: string; // content, note, summary, fact, etc. score?: number; // 0-100 engagement / relevance metadata?: Record<string, unknown>;}Only origin_id, payload_text, and occurred_at are required. The full surface is documented in reference/connector-sdk › EventEnvelope.
SyncResult
Section titled “SyncResult”interface SyncResult { events: EventEnvelope[]; checkpoint: Record<string, unknown> | null; auth_update?: Record<string, unknown> | null; metadata?: { items_found?: number; items_skipped?: number; [key: string]: unknown; };}Return events: [] plus the same checkpoint you received on a no-new-data tick — runs stay idempotent.
ActionContext / ActionResult
Section titled “ActionContext / ActionResult”If your connector also writes back (e.g. assign_issue, send_email), declare an actions map on the definition and implement execute(ctx):
import type { ActionContext, ActionResult } from "@lobu/connector-sdk";
interface AssignIssueInput { issueId: string; assignee: string;}
async execute(ctx: ActionContext): Promise<ActionResult> { if (ctx.actionKey !== "assign_issue") { return { success: false, error: `unknown action ${ctx.actionKey}` }; } const { issueId, assignee } = ctx.input as unknown as AssignIssueInput; // Same `env_keys` field as sync() — execute()'s ctx.config carries it too. const token = String((ctx.config as { token?: string }).token ?? "");
await fetch(`https://api.example.com/issues/${issueId}`, { method: "PATCH", headers: { Authorization: `Bearer ${token}` }, body: JSON.stringify({ assignee }), }); return { success: true, output: { issueId, assignee } };}Each ActionDefinition declares requiresApproval: true | false plus MCP-style annotations (destructiveHint, idempotentHint). The gateway routes high-risk actions through the approval queue before the worker runs them.
Auth models
Section titled “Auth models”Declare on definition.authSchema. A connector can list multiple methods; the gateway lets the user pick.
type | Use when |
|---|---|
none | Public endpoint, no credentials needed |
env_keys | Static API keys (Stripe secret key, PAT) — fields rendered as form inputs, stored encrypted |
oauth | Standard OAuth 2.0 — Lobu handles the dance, refresh, and per-user token isolation |
browser | Session cookies captured via lobu memory browser-auth from a logged-in Chrome profile (or CDP) |
interactive | Custom auth flow (QR pairing, OTP, signed device handshake) — implement authenticate(ctx) and stream AuthArtifacts |
Workers never see the raw secret on the wire: the gateway’s secret-proxy swaps lobu_secret_<uuid> placeholders for real values at egress, so the string you pull from ctx.config.<field> (env_keys) or ctx.credentials.accessToken (oauth) looks like a normal token from your code, but it’s only resolved when the outbound request leaves the proxy.
Full breakdown at reference/connector-sdk › ConnectorAuthSchema.
Checkpoints
Section titled “Checkpoints”The checkpoint is your bookmark. It’s persisted on the feeds row after every successful sync and handed back as ctx.checkpoint on the next run. Three common shapes:
// Timestamp cursor (GitHub `since`, Stripe `created[gt]`):interface TimestampCheckpoint { last_updated_at: string | null;}
// Page token (Google APIs):interface PageTokenCheckpoint { next_page_token: string | null;}
// Bounded ID set (idempotency, no native cursor):interface IdSetCheckpoint { seen_ids: string[];}Rules of thumb:
- Always return a checkpoint, even on the no-new-data case — return the previous one verbatim. Returning
nulltells the gateway to treat the next run as a fresh start. - Cap unbounded structures (ID sets, in-flight queues) before persisting. Keep the last 1000 IDs — enough to dedupe across a sync window without bloating the row.
- Long-running syncs can call
ctx.updateCheckpoint(...)mid-flight so a crash doesn’t lose progress.
Where the file lives
Section titled “Where the file lives”A *.connector.ts file can live anywhere in your Lobu project; reference each one explicitly with connectorFromFile in defineConfig({ connectors }):
my-agent/├── lobu.config.ts # connectors: [connectorFromFile<typeof GitHubIssuesConnector>(...)]├── github-issues.connector.ts├── stripe-charges.connector.ts└── agents/my-agent/...lobu apply type-checks and ships only the listed connectors (there is no ./connectors auto-discovery). Update the version field whenever the event shape changes so the gateway forces a fresh checkpoint.
Dependencies
Section titled “Dependencies”A connector can pull in two kinds of dependency, and they are provisioned differently.
npm packages are bundled at compile time. Add them to the package.json next to your lobu.config.ts and import them normally:
import { parse } from "csv-parse/sync";lobu apply runs bun install --ignore-scripts in the project, then esbuild bundles each connector with the project’s node_modules and uploads the artifact. The server only ever receives the bundle, so npm deps ship inside it. --ignore-scripts keeps install-time supply-chain surface off your machine, which is also why packages that need native build steps do not belong here.
Native tools are provisioned at run time via nix. Declare them as nixpkgs attribute refs in runtime.nix.packages on the connector definition:
export default class VideoConnector extends ConnectorRuntime { definition: ConnectorDefinition = { key: "media.video", name: "Video", version: "1.0.0", runtime: { platforms: ["linux", "macos"], nix: { packages: ["ffmpeg", "imagemagick"] }, }, // ...feeds, actions }; // ...sync / execute can now shell out to ffmpeg}At execution the runtime wraps the connector’s subprocess in nix-shell -p <packages> so the declared tools are on PATH. Backends that cannot run native deps reject a connector that declares them, and a host without nix-shell errors with a clear message rather than failing mid-run.
The rule of thumb: npm is bundled (compile-time), native is nix (run-time). Never put a native tool in package.json expecting it to ship, and never list an npm package in runtime.nix.packages. See the ConnectorRuntimeInfo reference for the field shape.
See it in production
Section titled “See it in production”examples/ecommerce/stripe-charges.connector.ts— REST API,env_keysauth, timestamp checkpoint.examples/lobu-crm/funnel-form.connector.ts— small custom HTTP API, ID-set dedupe.
See also
Section titled “See also”- Reactions — the typed hook (part of this same package) for code that runs after watchers extract data.
@lobu/connector-sdkAPI reference — every exported symbol with types.- Memory — how connector events become durable entity memory.