Skip to content
API Blog

Connector SDK

Connectors are how Lobu turns external systems — REST APIs, GraphQL, webhooks, files, OAuth-protected services — into the typed event stream that watchers shape into entities and memory.

A connector is a TypeScript class that extends ConnectorRuntime and ships three things:

  • a definition describing the connector (key, name, version, auth, feeds, actions),
  • a sync(ctx) method that pulls the next slice of data and returns events,
  • an optional execute(ctx) method that runs writes back to the source (create issue, send email).

Sync runs are idempotent: each run returns a checkpoint (cursor, timestamp, ID set) that the next run reads back via ctx.checkpoint.

Terminal window
bun add @lobu/connector-sdk
# or
npm install @lobu/connector-sdk
# or
pnpm add @lobu/connector-sdk

The package is published from this repo and tracks the same release line as @lobu/cli and the gateway.

The example below pulls issues from a GitHub repository, polls incrementally with a typed checkpoint, and emits one EventEnvelope per issue. Every field has a real type — no as any casts, no // biome-ignore directives.

import {
ConnectorRuntime,
type ConnectorDefinition,
type EventEnvelope,
type SyncContext,
type SyncResult,
} from "@lobu/connector-sdk";
// User-supplied connection config (rendered as a form in the admin UI).
interface GitHubConfig {
owner: string;
repo: string;
}
// The shape we persist between runs. Cursor-based pagination so re-runs
// only fetch issues updated after the last successful sync.
interface GitHubCheckpoint {
last_updated_at: string | null;
}
// Minimal subset of the GitHub REST API issue payload we actually read.
interface GitHubIssue {
id: number;
number: number;
title: string;
body: string | null;
html_url: string;
updated_at: string;
user: { login: string } | null;
}
// Tiny typed helper so we never reach into `ctx.checkpoint` raw.
function readCheckpoint(raw: SyncContext["checkpoint"]): GitHubCheckpoint {
const cp = (raw ?? {}) as Partial<GitHubCheckpoint>;
return { last_updated_at: cp.last_updated_at ?? null };
}
export default class GitHubIssuesConnector extends ConnectorRuntime {
readonly definition: ConnectorDefinition = {
key: "github-issues",
name: "GitHub issues",
version: "1.0.0",
// Personal access token is collected once per connection and stored
// encrypted; the worker only ever sees a `lobu_secret_<uuid>` placeholder.
authSchema: {
methods: [
{
type: "env_keys",
fields: [
{ key: "token", label: "GitHub PAT", secret: true, required: true },
],
},
],
},
feeds: {
issues: { key: "issues", name: "Issues" },
},
};
async sync(ctx: SyncContext): Promise<SyncResult> {
// For `env_keys` auth, the values land in `ctx.config` keyed by the
// `key` you declared on the auth field. OAuth tokens (for `oauth` auth)
// arrive on `ctx.credentials.accessToken` instead.
const config = ctx.config as unknown as GitHubConfig & { token?: string };
const checkpoint = readCheckpoint(ctx.checkpoint);
const token = config.token ?? "";
// GitHub returns issues updated *at or after* `since`; we want
// strictly after, so we filter by id below.
const since = checkpoint.last_updated_at ?? "1970-01-01T00:00:00Z";
const url =
`https://api.github.com/repos/${config.owner}/${config.repo}/issues` +
`?state=all&sort=updated&direction=asc&per_page=100&since=${since}`;
const response = await fetch(url, {
headers: {
Authorization: `Bearer ${token}`,
Accept: "application/vnd.github+json",
},
});
if (!response.ok) {
throw new Error(`GitHub ${response.status}: ${await response.text()}`);
}
const issues = (await response.json()) as GitHubIssue[];
const fresh = issues.filter((i) => i.updated_at !== checkpoint.last_updated_at);
const events: EventEnvelope[] = fresh.map((issue) => ({
origin_id: String(issue.id),
origin_type: "issue",
title: `#${issue.number} ${issue.title}`,
payload_text: issue.body ?? "",
source_url: issue.html_url,
author_name: issue.user?.login,
occurred_at: new Date(issue.updated_at),
}));
return {
events,
// Always advance the checkpoint to the newest `updated_at` we saw.
// If the page was empty, return the previous value verbatim so the
// next run is still idempotent.
checkpoint: {
last_updated_at:
fresh.at(-1)?.updated_at ?? checkpoint.last_updated_at,
} satisfies GitHubCheckpoint,
};
}
async execute(): Promise<{ success: false; error: string }> {
return { success: false, error: "github-issues is read-only" };
}
}

A few things to notice:

  • SyncContext["checkpoint"] is Record<string, unknown> | null. Wrap it once in a tiny typed reader (readCheckpoint) instead of casting at every call site.
  • env_keys credentials live on ctx.config, not ctx.credentials. Lobu merges the values the user filled into the env_keys form into ctx.config under the keys you declared (token here). ctx.credentials is reserved for oauth auth — accessToken, refreshToken, scope, expiresAt.
  • The PAT is a lobu_secret_<uuid> placeholder at runtime. The gateway’s secret proxy swaps it for the real value when the outbound HTTPS request leaves the worker, so the secret never lives in the worker’s memory.
  • Pagination via the since query param. The GitHub Link header is the alternative for cursor-style paging when you need to walk a stable, ordered list; since is simpler when the source already gives you a monotonic timestamp.

Save this file in your Lobu project (e.g. github-issues.connector.ts next to lobu.config.ts) and list it in your config:

import { connectorFromFile, defineConfig } from "@lobu/cli/config";
import type GitHubIssuesConnector from "./github-issues.connector.ts";
export default defineConfig({
connectors: [
connectorFromFile<typeof GitHubIssuesConnector>(
"./github-issues.connector.ts"
),
],
// ...agents, connections, etc.
});

Passing the connector’s type via the generic (import type + connectorFromFile<typeof GitHubIssuesConnector>) is optional — bare connectorFromFile("./github-issues.connector.ts") still works — but it gives you go-to-definition, rename, and a tsc error if the file’s default export ever stops being a ConnectorRuntime subclass. The import type is erased at compile time, so the connector module is never loaded while your config is evaluated.

lobu apply ships the source to the gateway, which compiles and registers it; from there each feeds.<key> entry shows up as something a user can create a connection for in the admin UI.

The static metadata for your connector. Filed under connector_definitions in the gateway DB after lobu apply.

FieldRequiredDescription
keyyesUnique global key, e.g. google.gmail, github-issues
nameyesHuman-readable label
versionyesSemver — bump to invalidate per-feed checkpoints if the event shape changes
authSchemanoHow users authenticate this connector (see below)
feedsnoMap of feed key → FeedDefinition (a connector typically has one or more feeds)
actionsnoMap of action key → ActionDefinition (only needed if you also implement execute)
requiredCapabilitynoWhen set, only worker pods/devices advertising this capability serve runs (e.g. screentime for the Mac app)
runtimenoPin to a device platform (iOS, macOS, …) — omit for cloud-side connectors

See the full type at reference/connector-sdk › ConnectorDefinition.

What sync() receives. Every field is read-only.

FieldDescription
feedKeyWhich feed Lobu is asking you to run
configThe connection-level config the user filled in (typed by your FeedDefinition.configSchema)
checkpointThe last successful run’s checkpoint, or null on the first run
credentialsOAuth tokens (accessToken, refreshToken, …) for oauth auth; null for everything else. env_keys values land on ctx.config under the declared key.
entityIdsEntities this feed is linked to (rarely needed; useful for scoping the sync)
sessionStateBrowser cookies / tokens captured by lobu memory browser-auth for browser auth
emitEvents(events)Optional streaming hook — flush a chunk before the run ends
updateCheckpoint(cp)Optional progress-checkpoint hook for long-running syncs

SyncContext does not currently expose generics for config / checkpoint. Declare your own interfaces and convert at the boundary, as the example above does with readCheckpoint.

The shape of one event in the stream. Each envelope becomes a row in the events table.

interface EventEnvelope {
origin_id: string; // platform's unique ID for this item
origin_type?: string; // source-native type (post, message, charge)
payload_text: string; // main content
payload_type?: "text" | "markdown" | "json_template" | "media" | "empty";
title?: string;
author_name?: string;
source_url?: string; // permalink back to the original
occurred_at: Date; // when the event actually happened
semantic_type?: string; // content, note, summary, fact, etc.
score?: number; // 0-100 engagement / relevance
metadata?: Record<string, unknown>;
}

Only origin_id, payload_text, and occurred_at are required. The full surface is documented in reference/connector-sdk › EventEnvelope.

interface SyncResult {
events: EventEnvelope[];
checkpoint: Record<string, unknown> | null;
auth_update?: Record<string, unknown> | null;
metadata?: {
items_found?: number;
items_skipped?: number;
[key: string]: unknown;
};
}

Return events: [] plus the same checkpoint you received on a no-new-data tick — runs stay idempotent.

If your connector also writes back (e.g. assign_issue, send_email), declare an actions map on the definition and implement execute(ctx):

import type { ActionContext, ActionResult } from "@lobu/connector-sdk";
interface AssignIssueInput {
issueId: string;
assignee: string;
}
async execute(ctx: ActionContext): Promise<ActionResult> {
if (ctx.actionKey !== "assign_issue") {
return { success: false, error: `unknown action ${ctx.actionKey}` };
}
const { issueId, assignee } = ctx.input as unknown as AssignIssueInput;
// Same `env_keys` field as sync() — execute()'s ctx.config carries it too.
const token = String((ctx.config as { token?: string }).token ?? "");
await fetch(`https://api.example.com/issues/${issueId}`, {
method: "PATCH",
headers: { Authorization: `Bearer ${token}` },
body: JSON.stringify({ assignee }),
});
return { success: true, output: { issueId, assignee } };
}

Each ActionDefinition declares requiresApproval: true | false plus MCP-style annotations (destructiveHint, idempotentHint). The gateway routes high-risk actions through the approval queue before the worker runs them.

Declare on definition.authSchema. A connector can list multiple methods; the gateway lets the user pick.

typeUse when
nonePublic endpoint, no credentials needed
env_keysStatic API keys (Stripe secret key, PAT) — fields rendered as form inputs, stored encrypted
oauthStandard OAuth 2.0 — Lobu handles the dance, refresh, and per-user token isolation
browserSession cookies captured via lobu memory browser-auth from a logged-in Chrome profile (or CDP)
interactiveCustom auth flow (QR pairing, OTP, signed device handshake) — implement authenticate(ctx) and stream AuthArtifacts

Workers never see the raw secret on the wire: the gateway’s secret-proxy swaps lobu_secret_<uuid> placeholders for real values at egress, so the string you pull from ctx.config.<field> (env_keys) or ctx.credentials.accessToken (oauth) looks like a normal token from your code, but it’s only resolved when the outbound request leaves the proxy.

Full breakdown at reference/connector-sdk › ConnectorAuthSchema.

The checkpoint is your bookmark. It’s persisted on the feeds row after every successful sync and handed back as ctx.checkpoint on the next run. Three common shapes:

// Timestamp cursor (GitHub `since`, Stripe `created[gt]`):
interface TimestampCheckpoint {
last_updated_at: string | null;
}
// Page token (Google APIs):
interface PageTokenCheckpoint {
next_page_token: string | null;
}
// Bounded ID set (idempotency, no native cursor):
interface IdSetCheckpoint {
seen_ids: string[];
}

Rules of thumb:

  • Always return a checkpoint, even on the no-new-data case — return the previous one verbatim. Returning null tells the gateway to treat the next run as a fresh start.
  • Cap unbounded structures (ID sets, in-flight queues) before persisting. Keep the last 1000 IDs — enough to dedupe across a sync window without bloating the row.
  • Long-running syncs can call ctx.updateCheckpoint(...) mid-flight so a crash doesn’t lose progress.

A *.connector.ts file can live anywhere in your Lobu project; reference each one explicitly with connectorFromFile in defineConfig({ connectors }):

my-agent/
├── lobu.config.ts # connectors: [connectorFromFile<typeof GitHubIssuesConnector>(...)]
├── github-issues.connector.ts
├── stripe-charges.connector.ts
└── agents/my-agent/...

lobu apply type-checks and ships only the listed connectors (there is no ./connectors auto-discovery). Update the version field whenever the event shape changes so the gateway forces a fresh checkpoint.

A connector can pull in two kinds of dependency, and they are provisioned differently.

npm packages are bundled at compile time. Add them to the package.json next to your lobu.config.ts and import them normally:

import { parse } from "csv-parse/sync";

lobu apply runs bun install --ignore-scripts in the project, then esbuild bundles each connector with the project’s node_modules and uploads the artifact. The server only ever receives the bundle, so npm deps ship inside it. --ignore-scripts keeps install-time supply-chain surface off your machine, which is also why packages that need native build steps do not belong here.

Native tools are provisioned at run time via nix. Declare them as nixpkgs attribute refs in runtime.nix.packages on the connector definition:

export default class VideoConnector extends ConnectorRuntime {
definition: ConnectorDefinition = {
key: "media.video",
name: "Video",
version: "1.0.0",
runtime: {
platforms: ["linux", "macos"],
nix: { packages: ["ffmpeg", "imagemagick"] },
},
// ...feeds, actions
};
// ...sync / execute can now shell out to ffmpeg
}

At execution the runtime wraps the connector’s subprocess in nix-shell -p <packages> so the declared tools are on PATH. Backends that cannot run native deps reject a connector that declares them, and a host without nix-shell errors with a clear message rather than failing mid-run.

The rule of thumb: npm is bundled (compile-time), native is nix (run-time). Never put a native tool in package.json expecting it to ship, and never list an npm package in runtime.nix.packages. See the ConnectorRuntimeInfo reference for the field shape.

  • Reactions — the typed hook (part of this same package) for code that runs after watchers extract data.
  • @lobu/connector-sdk API reference — every exported symbol with types.
  • Memory — how connector events become durable entity memory.