Six authorization patterns for TypeScript AI agents. Each one is copy-paste ready — no framework lock-in, works with OpenAI function calling, Vercel AI SDK, LangChain.js, or a hand-rolled agent loop.
Pattern 1: Tool Allowlist with Schema Validation
Every tool call the agent makes should pass through an allowlist before execution. This blocks prompt injection attacks that try to call tools the agent was never supposed to have.
```typescript
import { z } from "zod";

// Define permitted tools and their parameter schemas
const TOOL_REGISTRY = {
  search_web: z.object({
    query: z.string().max(200),
  }),
  read_file: z.object({
    path: z
      .string()
      .regex(/^[a-zA-Z0-9_\-./]+$/)
      // The regex alone still admits ".."; reject traversal segments explicitly
      .refine((p) => !p.split("/").includes(".."), {
        message: "path traversal not allowed",
      }),
  }),
  send_email: z.object({
    to: z.string().email(),
    subject: z.string().max(100),
    body: z.string().max(5000),
  }),
} satisfies Record<string, z.ZodTypeAny>;

type ToolName = keyof typeof TOOL_REGISTRY;

function validateToolCall(
  toolName: string,
  params: unknown
): { toolName: ToolName; params: unknown } {
  if (!(toolName in TOOL_REGISTRY)) {
    throw new Error(`Tool '${toolName}' is not in the allowlist`);
  }
  const schema = TOOL_REGISTRY[toolName as ToolName];
  const result = schema.safeParse(params);
  if (!result.success) {
    throw new Error(
      `Invalid params for tool '${toolName}': ${result.error.message}`
    );
  }
  return { toolName: toolName as ToolName, params: result.data };
}

// Usage in your agent loop
async function executeToolCall(toolName: string, rawParams: unknown) {
  const { toolName: validName, params } = validateToolCall(toolName, rawParams);
  return await dispatch(validName, params);
}
```
What this blocks: An agent with access to search_web cannot be prompted into calling delete_database or passing a path traversal string (../../etc/passwd) to read_file. The allowlist is the first gate; schema validation is the second.
Pattern 2: Scoped API Token Injection
Do not pass a master API key to the agent. Inject a scoped credential per tool call — one that can only do what the tool needs.
```typescript
interface ScopedCredential {
  token: string;
  expiresAt: Date;
  scopes: string[];
}

// Credential factory — issue per-call tokens
async function getScopedCredential(
  tool: ToolName,
  userId: string
): Promise<ScopedCredential> {
  const scopeMap: Record<ToolName, string[]> = {
    search_web: ["search:read"],
    read_file: ["storage:read"],
    send_email: ["email:send"],
  };
  const scopes = scopeMap[tool];
  const expiresAt = new Date(Date.now() + 60_000); // 60-second TTL
  // Issue a short-lived token from your auth service
  const token = await issueToken({ userId, scopes, expiresAt });
  return { token, expiresAt, scopes };
}

// Inject credential at execution time
async function executeWithScopedCredential(
  toolName: ToolName,
  params: unknown,
  userId: string
) {
  const credential = await getScopedCredential(toolName, userId);
  // Pass token to the tool implementation — never to the agent
  return await dispatch(toolName, params, { credential });
}
```
What this prevents: Even if the agent is manipulated into calling a real tool with crafted params, the token expires within a minute, is limited to that one tool's scopes, and cannot reach other services. The agent never sees a long-lived credential.
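For reference, here is a minimal sketch of what `issueToken` could look like, assuming self-contained HMAC-signed tokens that your tool implementations verify locally. The payload shape and helper names are illustrative; in production you would typically mint tokens via your auth service (OAuth token exchange, STS, etc.) rather than signing locally.

```typescript
import { createHmac } from "node:crypto";

// Hypothetical issueToken: a self-verifiable, HMAC-signed token.
// Assumption: both issuer and verifier share SIGNING_SECRET.
const SIGNING_SECRET = process.env.TOKEN_SECRET ?? "dev-only-secret";

async function issueToken(claims: {
  userId: string;
  scopes: string[];
  expiresAt: Date;
}): Promise<string> {
  const payload = Buffer.from(
    JSON.stringify({
      sub: claims.userId,
      scopes: claims.scopes,
      exp: claims.expiresAt.getTime(),
    })
  ).toString("base64url");
  const signature = createHmac("sha256", SIGNING_SECRET)
    .update(payload)
    .digest("base64url");
  return `${payload}.${signature}`;
}

// Verifier used by tool implementations before touching a backend
function verifyToken(token: string): { sub: string; scopes: string[] } {
  const [payload, signature] = token.split(".");
  const expected = createHmac("sha256", SIGNING_SECRET)
    .update(payload)
    .digest("base64url");
  if (signature !== expected) throw new Error("bad signature");
  const claims = JSON.parse(Buffer.from(payload, "base64url").toString());
  if (claims.exp < Date.now()) throw new Error("token expired");
  return claims;
}
```

The design point: the token carries its own scopes and expiry, so a tool backend can reject out-of-scope or stale calls without a round trip to the auth service.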
Pattern 3: Human-in-the-Loop Confirmation Gate
Some tool calls should not execute without a human reviewing them first. Write a gate that pauses execution and waits for explicit confirmation.
```typescript
type ConfirmationStatus = "pending" | "approved" | "rejected";

interface PendingConfirmation {
  id: string;
  toolName: ToolName;
  params: unknown;
  userId: string;
  createdAt: Date;
  status: ConfirmationStatus;
  resolve?: (approved: boolean) => void;
}

const pendingConfirmations = new Map<string, PendingConfirmation>();

// Tools that require human confirmation before execution
const REQUIRES_CONFIRMATION = new Set<ToolName>(["send_email"]);

async function executeWithConfirmation(
  toolName: ToolName,
  params: unknown,
  userId: string
): Promise<unknown> {
  if (!REQUIRES_CONFIRMATION.has(toolName)) {
    return await dispatch(toolName, params, {});
  }
  // Create a pending confirmation
  const confirmationId = crypto.randomUUID();
  const approved = await new Promise<boolean>((resolve) => {
    pendingConfirmations.set(confirmationId, {
      id: confirmationId,
      toolName,
      params,
      userId,
      createdAt: new Date(),
      status: "pending",
      resolve,
    });
    // Notify the user (webhook, websocket, push notification, etc.)
    notifyUser(userId, { confirmationId, toolName, params });
    // Timeout after 5 minutes
    setTimeout(() => {
      const pending = pendingConfirmations.get(confirmationId);
      if (pending?.status === "pending") {
        pending.resolve?.(false);
        pendingConfirmations.delete(confirmationId);
      }
    }, 5 * 60_000);
  });
  if (!approved) {
    throw new Error(`Tool call '${toolName}' rejected by user`);
  }
  return await dispatch(toolName, params, {});
}

// Call this from your API endpoint when the user clicks Approve/Reject
function resolveConfirmation(confirmationId: string, approved: boolean): void {
  const pending = pendingConfirmations.get(confirmationId);
  if (!pending) throw new Error("Confirmation not found or expired");
  pending.status = approved ? "approved" : "rejected";
  pending.resolve?.(approved);
  pendingConfirmations.delete(confirmationId);
}
```
When to use this: Any tool that sends external messages, creates records in third-party systems, charges money, or modifies production data. The agent generates the draft; the human approves the send.
Pattern 4: Per-User Rate Limiter (Token Bucket)
Prevent runaway agents from exhausting quotas, hammering APIs, or racking up costs.
```typescript
interface TokenBucket {
  tokens: number;
  lastRefill: number;
  capacity: number;
  refillRate: number; // tokens per second
}

const buckets = new Map<string, TokenBucket>();

function getBucket(userId: string): TokenBucket {
  if (!buckets.has(userId)) {
    buckets.set(userId, {
      tokens: 10, // burst capacity
      lastRefill: Date.now(),
      capacity: 10,
      refillRate: 1, // 1 token/second = 60 calls/minute sustained
    });
  }
  return buckets.get(userId)!;
}

function consumeToken(userId: string): boolean {
  const bucket = getBucket(userId);
  const now = Date.now();
  const elapsed = (now - bucket.lastRefill) / 1000;
  // Refill tokens based on elapsed time
  bucket.tokens = Math.min(
    bucket.capacity,
    bucket.tokens + elapsed * bucket.refillRate
  );
  bucket.lastRefill = now;
  if (bucket.tokens < 1) {
    return false; // rate limited
  }
  bucket.tokens -= 1;
  return true;
}

async function executeWithRateLimit(
  toolName: ToolName,
  params: unknown,
  userId: string
): Promise<unknown> {
  if (!consumeToken(userId)) {
    throw new Error(
      `Rate limit exceeded for user ${userId}. Retry after 1 second.`
    );
  }
  return await dispatch(toolName, params, {});
}
```
Notes: This is in-memory — works for single-process. For distributed deployments, replace Map with Redis using INCR + EXPIRE. Add per-tool limits if some tools are more expensive than others (e.g., a file write costs 5 tokens, a read costs 1).
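For the distributed case, the INCR + EXPIRE approach reduces to a fixed-window counter. Here is a sketch against a minimal store interface; the `CounterStore` abstraction is illustrative, and in production you would back it with a Redis client's `incr` and `expire` commands:

```typescript
// Fixed-window rate limiting over an INCR/EXPIRE-style counter store.
// CounterStore is an illustrative abstraction; back it with Redis in
// a distributed deployment.
interface CounterStore {
  incr(key: string): Promise<number>;
  expire(key: string, seconds: number): Promise<void>;
}

async function allowCall(
  store: CounterStore,
  userId: string,
  limit: number,
  windowSec: number
): Promise<boolean> {
  // Key the counter by user and current window
  const window = Math.floor(Date.now() / 1000 / windowSec);
  const key = `ratelimit:${userId}:${window}`;
  const count = await store.incr(key);
  if (count === 1) {
    // First hit in this window: set the TTL so the key self-cleans
    await store.expire(key, windowSec);
  }
  return count <= limit;
}

// In-memory store for local testing; swap for a Redis client in production
function memoryStore(): CounterStore {
  const counts = new Map<string, number>();
  return {
    async incr(key) {
      const next = (counts.get(key) ?? 0) + 1;
      counts.set(key, next);
      return next;
    },
    async expire() {
      /* TTL handled by Redis in production; no-op in memory */
    },
  };
}
```

Note the trade-off: fixed windows allow up to 2x the limit across a window boundary; if that matters, keep the token bucket semantics and implement the refill in a Redis Lua script so the read-modify-write is atomic.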
Pattern 5: Structured Audit Log
Every tool call should be logged before and after execution. The audit trail should be append-only and inaccessible to the agent.
```typescript
interface AuditEntry {
  id: string;
  timestamp: string;
  userId: string;
  sessionId: string;
  toolName: string;
  paramsHash: string; // hash, not plaintext — avoid logging secrets
  outcome: "success" | "error" | "rejected";
  durationMs: number;
  errorMessage?: string;
}

// Uses WebCrypto (global in browsers and modern Node; on older Node,
// import { webcrypto } from "node:crypto")
async function sha256(data: string): Promise<string> {
  const buf = await crypto.subtle.digest(
    "SHA-256",
    new TextEncoder().encode(data)
  );
  return Array.from(new Uint8Array(buf))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join("");
}

async function executeWithAudit(
  toolName: ToolName,
  params: unknown,
  userId: string,
  sessionId: string
): Promise<unknown> {
  const id = crypto.randomUUID();
  const start = Date.now();
  const paramsHash = await sha256(JSON.stringify(params));
  // Log intent before execution
  const entry: Partial<AuditEntry> = {
    id,
    timestamp: new Date().toISOString(),
    userId,
    sessionId,
    toolName,
    paramsHash,
  };
  try {
    const result = await dispatch(toolName, params, {});
    writeAuditLog({
      ...entry,
      outcome: "success",
      durationMs: Date.now() - start,
    } as AuditEntry);
    return result;
  } catch (err) {
    writeAuditLog({
      ...entry,
      outcome: "error",
      durationMs: Date.now() - start,
      errorMessage: err instanceof Error ? err.message : String(err),
    } as AuditEntry);
    throw err;
  }
}

function writeAuditLog(entry: AuditEntry): void {
  // Write to stdout as JSON lines — ingest with your log aggregator
  process.stdout.write(JSON.stringify(entry) + "\n");
  // Or: write to an append-only DB table, S3, etc.
}
```
What to log: intent (before), outcome (after), duration, parameter hash (not plaintext — params may contain PII or credentials). Log to a sink the agent cannot read or modify — a separate log aggregator, S3 bucket, or write-only DB user.
Pattern 6: Role-Based Tool Access Control
Different users get different tool access. Wire RBAC in as a check between schema validation and dispatch.
```typescript
type Role = "viewer" | "editor" | "admin";

const ROLE_PERMISSIONS: Record<Role, Set<ToolName>> = {
  viewer: new Set(["search_web", "read_file"]),
  editor: new Set(["search_web", "read_file", "send_email"]),
  admin: new Set(["search_web", "read_file", "send_email"]),
};

interface UserContext {
  userId: string;
  role: Role;
  sessionId: string;
}

function checkToolPermission(toolName: ToolName, user: UserContext): void {
  const permitted = ROLE_PERMISSIONS[user.role];
  if (!permitted.has(toolName)) {
    throw new Error(
      `User ${user.userId} (role: ${user.role}) does not have permission to call '${toolName}'`
    );
  }
}

// Compose all patterns together
async function executeAgentToolCall(
  rawToolName: string,
  rawParams: unknown,
  user: UserContext
): Promise<unknown> {
  // 1. Allowlist + schema validation
  const { toolName, params } = validateToolCall(rawToolName, rawParams);
  // 2. RBAC check
  checkToolPermission(toolName, user);
  // 3. Rate limit
  if (!consumeToken(user.userId)) {
    throw new Error("Rate limit exceeded");
  }
  // 4. Human-in-the-loop (for destructive tools)
  if (REQUIRES_CONFIRMATION.has(toolName)) {
    return await executeWithConfirmation(toolName, params, user.userId);
  }
  // 5. Audit-logged execution (inject the Pattern 2 scoped
  //    credential inside your dispatch path)
  return await executeWithAudit(toolName, params, user.userId, user.sessionId);
}
```
Wiring to OpenAI Function Calling
```typescript
import OpenAI from "openai";
import { zodToJsonSchema } from "zod-to-json-schema"; // npm: zod-to-json-schema

const client = new OpenAI();
const MAX_TURNS = 10; // hard cap so a looping agent cannot run forever

async function runAgentLoop(userMessage: string, user: UserContext) {
  const tools = Object.entries(TOOL_REGISTRY).map(([name, schema]) => ({
    type: "function" as const,
    function: {
      name,
      parameters: zodToJsonSchema(schema),
    },
  }));
  const messages: OpenAI.Chat.ChatCompletionMessageParam[] = [
    { role: "user", content: userMessage },
  ];
  for (let turn = 0; turn < MAX_TURNS; turn++) {
    const response = await client.chat.completions.create({
      model: "gpt-4o",
      messages,
      tools,
      tool_choice: "auto",
    });
    const message = response.choices[0].message;
    messages.push(message);
    if (!message.tool_calls?.length) break;
    for (const toolCall of message.tool_calls) {
      let result: unknown;
      try {
        result = await executeAgentToolCall(
          toolCall.function.name,
          JSON.parse(toolCall.function.arguments),
          user
        );
      } catch (err) {
        // Feed a scrubbed error back so the model can recover or stop
        result = {
          error: err instanceof Error ? err.message : "tool call failed",
        };
      }
      messages.push({
        role: "tool",
        tool_call_id: toolCall.id,
        content: JSON.stringify(result),
      });
    }
  }
  return messages.at(-1)?.content;
}
```
Checklist Before Shipping
- Allowlist defined — every tool the agent can call is explicitly listed; no catch-all
- Schema validation on every call — zod or equivalent, before dispatch
- No long-lived credentials in agent context — inject scoped tokens per call
- Confirmation gate on destructive tools — send, write, delete require human approval
- Rate limits per user — token bucket or equivalent, enforced in code not just API quotas
- Audit log append-only — agent cannot read or modify its own trail
- RBAC enforced — role checked before tool dispatch, not just at login
- Error messages scrubbed — error responses to the agent do not leak internal paths, stack traces, or credentials
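The last checklist item can be a small wrapper. A sketch, with the generic fallback message and the logging sink as placeholders for your own:

```typescript
// Scrub errors before they reach the agent: log full detail
// server-side, return only a safe message to the model.
class ToolError extends Error {
  constructor(public readonly safeMessage: string) {
    super(safeMessage);
  }
}

async function executeScrubbed(
  run: () => Promise<unknown>
): Promise<unknown> {
  try {
    return await run();
  } catch (err) {
    // Full detail goes to your logs, never to the agent
    console.error("tool call failed:", err);
    if (err instanceof ToolError) {
      return { error: err.safeMessage }; // deliberately safe to expose
    }
    // Unknown errors may contain paths, stack traces, or credentials
    return { error: "Tool call failed. The operator has been notified." };
  }
}
```

Throw `ToolError` for messages the model is allowed to see (rate limits, rejections); everything else collapses to the generic message.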
All patterns above are framework-agnostic. Swap dispatch() for your tool execution function. Add middleware in whatever order fits your stack — the composition in Pattern 6 is a starting point, not a requirement.
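For completeness, a minimal `dispatch()` to swap in; the tool implementations here are stubs, and only the routing shape is the point:

```typescript
// Minimal dispatch: route a validated tool call to its implementation.
// The implementations below are stubs; replace with real integrations.
type ToolContext = { credential?: { token: string } };
type ToolImpl = (params: unknown, ctx: ToolContext) => Promise<unknown>;

const IMPLEMENTATIONS: Record<string, ToolImpl> = {
  search_web: async (params) => ({ results: [], echo: params }),
  read_file: async (params) => ({ content: "", echo: params }),
  send_email: async (params) => ({ queued: true, echo: params }),
};

async function dispatch(
  toolName: string,
  params: unknown,
  ctx: ToolContext = {}
): Promise<unknown> {
  const impl = IMPLEMENTATIONS[toolName];
  if (!impl) throw new Error(`No implementation registered for '${toolName}'`);
  return impl(params, ctx);
}
```

Keeping the implementation table separate from `TOOL_REGISTRY` means a schema without an implementation fails loudly at dispatch, rather than silently exposing a tool the allowlist never vetted.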
