Skip to content

Bots, Queues, and CLI

Bot updates, queue jobs, and CLI commands are bounded async operations. The examples create one scope per update, job, or command, and use `await using` wherever the function owns the whole operation.

They share examples/_shared/container.ts. Compare the unit of work that each library hands to application code.

| Example | Shows |
| --- | --- |
| telegraf.ts | Telegraf update scope |
| grammy.ts | Grammy update scope |
| bullmq.ts | BullMQ job scope |
| commander.ts | Commander command scope |
| yargs.ts | Yargs command scope |

Telegraf

ts
import { Telegraf, type Context, type MiddlewareFn } from 'telegraf'

import {
  buildRootContainer,
  createRequestScope,
  type RequestContainer,
} from '../_shared/container.js'

// Root container, built once when this module loads. `withContainer` derives
// one request scope from it for every incoming update.
const root = buildRootContainer()

// Telegraf's `Context` plus the `container` property that `withContainer`
// assigns before downstream handlers run.
type BotContext = Context & {
  container: RequestContainer
}

/**
 * Middleware that opens one request scope per Telegram update and exposes it
 * to downstream handlers as `ctx.container`.
 *
 * The scope is declared with `await using`, so it is disposed when this
 * function exits, i.e. once `next()` has settled, whether it resolved or threw.
 */
const withContainer: MiddlewareFn<BotContext> = async (ctx, next) => {
  // Map Telegram fields onto the shared request-scope shape so the shared
  // services see the same fields as in an HTTP request context:
  // `update_id` -> requestId, sender id -> userId (absent when there is no sender).
  const senderId = ctx.from?.id

  await using scope = await createRequestScope(root, {
    requestId: String(ctx.update.update_id),
    userId: senderId === undefined ? undefined : String(senderId),
  })

  ctx.container = scope
  await next()
}

// The non-null assertion means a missing BOT_TOKEN is not checked here.
export const bot = new Telegraf<BotContext>(process.env.BOT_TOKEN!)

// Register the scope middleware first so every handler below can read
// `ctx.container`.
bot.use(withContainer)

// /start: look up the sender's profile through the per-update scope and greet
// them by name. Updates without a sender fall back to the 'anonymous' id.
bot.start(async (ctx) => {
  const users = ctx.container.get('users')
  const lookupId = String(ctx.from?.id ?? 'anonymous')
  const { name } = await users.profile(lookupId)

  await ctx.reply(`Hello ${name}`)
})

Repository file: examples/workers-cli/telegraf.ts

Grammy

ts
import { Bot, type Context, type MiddlewareFn } from 'grammy'

import {
  buildRootContainer,
  createRequestScope,
  type RequestContainer,
} from '../_shared/container.js'

// Root container, built once when this module loads. `withContainer` derives
// one request scope from it for every incoming update.
const root = buildRootContainer()

// grammY's `Context` plus the `container` property that `withContainer`
// assigns before downstream handlers run.
type BotContext = Context & {
  container: RequestContainer
}

/**
 * Middleware that opens one request scope per update and exposes it to
 * downstream handlers as `ctx.container`.
 *
 * `await using` disposes the scope when this function exits, i.e. after
 * `next()` has settled, whether it resolved or threw.
 */
const withContainer: MiddlewareFn<BotContext> = async (ctx, next) => {
  // `update_id` becomes the requestId; the sender id, when there is a sender,
  // becomes the userId.
  const senderId = ctx.from?.id

  await using scope = await createRequestScope(root, {
    requestId: String(ctx.update.update_id),
    userId: senderId === undefined ? undefined : String(senderId),
  })

  ctx.container = scope
  await next()
}

// The non-null assertion means a missing BOT_TOKEN is not checked here.
export const bot = new Bot<BotContext>(process.env.BOT_TOKEN!)

// Register the scope middleware first so every handler below can read
// `ctx.container`.
bot.use(withContainer)

// /help: resolve the sender's profile through the per-update scope and reply
// with its name. Updates without a sender fall back to the 'anonymous' id.
bot.command('help', async (ctx) => {
  const users = ctx.container.get('users')
  const lookupId = String(ctx.from?.id ?? 'anonymous')
  const { name } = await users.profile(lookupId)

  await ctx.reply(`Chat profile: ${name}`)
})

Repository file: examples/workers-cli/grammy.ts

BullMQ

ts
import { Worker, type Job } from 'bullmq'

import {
  buildRootContainer,
  createRequestScope,
} from '../_shared/container.js'

// Root container, built once at module load; each job gets its own scope.
const root = buildRootContainer()

/**
 * Worker for the 'email' queue. Each job runs inside its own request scope,
 * which `await using` disposes when the processor function exits.
 */
export const worker = new Worker('email', async (job: Job<{ to: string }>) => {
  // Jobs reuse the per-request shape: the job id (or a name-based fallback
  // when the id is absent) becomes the requestId, and the recipient becomes
  // the userId so audit records are attributed to that user.
  const scopeOptions = {
    requestId: job.id ?? `job:${job.name}`,
    userId: job.data.to,
  }
  await using scope = await createRequestScope(root, scopeOptions)

  // Demo only: a real implementation would also call
  // scope.get('mailer').send(job.data).
  scope.get('audit').record('email.sent', { name: job.name, to: job.data.to })
})

Repository file: examples/workers-cli/bullmq.ts

Commander

ts
import { Command } from 'commander'

import { buildRootContainer } from '../_shared/container.js'

// Root container, built once at module load; each command run gets its own scope.
const root = buildRootContainer()

export const program = new Command()

// `import-users <file> [--dry-run]`
program
  .command('import-users <file>')
  .option('--dry-run')
  .action(async (file: string, { dryRun }: { dryRun?: boolean }) => {
    // One CLI invocation is one bounded async unit. `await using` disposes
    // the scope when this action exits, on success or on throw, so resources
    // such as the `Database` from the shared root's async factory are closed
    // before the process exits.
    await using scope = root.createScope()
    scope.get('request').requestId = `cli:import-users:${Date.now()}`

    scope
      .get('audit')
      .record('cli.import-users.start', { file, dryRun: dryRun === true })

    // The minimal demo only logs. A real implementation would stream `file`
    // through scope.get('users') and write to scope.get('db').
    scope.get('audit').record('cli.import-users.done', { file })
  })

Repository file: examples/workers-cli/commander.ts

Yargs

ts
import yargs from 'yargs'
import { hideBin } from 'yargs/helpers'

import { buildRootContainer } from '../_shared/container.js'

// Root container, built once at module load; each command run gets its own scope.
const root = buildRootContainer()

/**
 * Yargs CLI exposing `sync <target> [--verbose]`. Strict mode is enabled via
 * `.strict()`; parsing is left to whoever imports `cli`.
 */
export const cli = yargs(hideBin(process.argv))
  .command(
    'sync <target>',
    'Sync a target',
    (builder) => {
      const withTarget = builder.positional('target', { type: 'string', demandOption: true })
      return withTarget.option('verbose', { type: 'boolean', default: false })
    },
    async (argv) => {
      // A fresh scope per invocation. `await using` disposes it, and with it
      // the shared `Database` factory's resource, before this handler
      // resolves and therefore before the Node process exits.
      await using scope = root.createScope()

      const request = scope.get('request')
      request.requestId = `cli:sync:${Date.now()}`

      const audit = scope.get('audit')
      audit.record('cli.sync', { target: argv.target, verbose: argv.verbose })
    },
  )
  .strict()

Repository file: examples/workers-cli/yargs.ts