Featured in Vercel OSS 2025

A New Paradigm in Backend Engineering

You’re one Step away from replacing 10+ frameworks.

Build queues, streams, stateful backends, workflows, AI agents, observable apps, scalable apps, APIs, and background jobs.

One STEP to build any backend.

A Step is the core primitive in Motia. Instead of juggling separate frameworks for APIs, background jobs, queues, or workflows, you define everything in one place: how it runs, when it runs, where it runs, and what it does.

Composable
Reusable
Observable
Multi-language
Scalable

Trigger - How Execution Begins

Every Step has a type that defines how it triggers. Change the type, and the same pattern works for different use cases. A Step file contains two parts:

Config → defines when and how the Step runs, and gives it a unique name
Handler → the function that executes your business logic
Motia automatically discovers any file ending in .step.ts, .step.js, or _step.py. The filename tells Motia to load it, and the name in the config uniquely identifies the Step inside your system.
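The naming rule above can be sketched as a simple suffix check. This is only an illustration: the helper isStepFile and the sample filenames are hypothetical, and Motia's actual discovery logic may differ.

```typescript
// Suffixes named in the text above; the helper itself is a hypothetical sketch.
const STEP_SUFFIXES = ['.step.ts', '.step.js', '_step.py'] as const

// Returns true when a filename matches one of the Step suffixes.
function isStepFile(filename: string): boolean {
  return STEP_SUFFIXES.some((suffix) => filename.endsWith(suffix))
}

const candidates = ['api.step.ts', 'api.step.js', 'api_step.py', 'utils.ts', 'step.py']
const discovered = candidates.filter(isStepFile)
console.log(discovered) // [ 'api.step.ts', 'api.step.js', 'api_step.py' ]
```

Note that step.py is not matched, because the Python suffix includes the leading underscore.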
api.step.ts
import { ApiRouteConfig, Handlers } from 'motia'

export const config: ApiRouteConfig = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

export const handler: Handlers['SendMessage'] = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api.step.js
exports.config = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

exports.handler = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api_step.py
config = {
    "name": "SendMessage",
    "type": "api",
    "path": "/messages",
    "method": "POST",
    "emits": ["message.sent"],
    "flows": ["messaging"]
}

async def handler(req, context):
    text = req.get("body", {}).get("text")
    user_id = req.get("body", {}).get("userId")
    message = {"text": text, "userId": user_id, "status": "sent"}
    await context.state.set("messages", user_id, message)
    await context.streams.messages.set(user_id, message)
    context.logger.info("Message sent", {"userId": user_id})
    await context.emit({"topic": "message.sent", "data": message})
    return {"status": 201, "body": message}
api.event.ts
import { EventConfig, Handlers } from 'motia'

export const config: EventConfig = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

export const handler: Handlers['ProcessMessage'] = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
api.event.js
exports.config = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

exports.handler = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
api_event.py
config = {
    "name": "ProcessMessage",
    "type": "event",
    "subscribes": ["message.sent"],
    "emits": ["message.processed"],
    "flows": ["messaging"]
}

async def handler(input_data, context):
    text = input_data.get("text")
    user_id = input_data.get("userId")
    status = input_data.get("status")
    processed_message = {"text": text, "userId": user_id, "status": "processed"}
    await context.state.set("processed", user_id, processed_message)
    await context.streams.processed.set(user_id, processed_message)
    context.logger.info("Message processed", {"userId": user_id})
    await context.emit({ "topic": "message.processed", "data": processed_message })
api.cron.ts
import { CronConfig, Handlers } from 'motia'

export const config: CronConfig = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

export const handler: Handlers['DailySummary'] = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
api.cron.js
exports.config = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

exports.handler = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
api_cron.py
config = {
    "name": "DailySummary",
    "type": "cron",
    "cron": "0 9 * * *",
    "emits": ["summary.generated"],
    "flows": ["messaging"]
}

async def handler(context):
    messages = await context.state.get_group("messages")
    summary = {"total": len(messages), "status": "completed"}
    await context.state.set("summaries", "daily", summary)
    await context.streams.summary.set("latest", summary)
    context.logger.info("Daily summary generated", {"total": summary["total"]})
    await context.emit({"topic": "summary.generated", "data": summary})

Handler - How It Performs Logic

This is where your business logic lives. The handler function receives input data and a context object with everything you need: logger for tracking, emit for triggering other Steps, state for storing data, and streams for real-time updates.
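One way to see what the handler depends on is to call it with a stand-in context. The sketch below reuses the SendMessage logic from this page, but createFakeContext and everything inside it are hypothetical in-memory substitutes written for illustration, not Motia's implementation.

```typescript
// Hypothetical in-memory stand-ins for the context object; not Motia's real implementation.
type EmittedEvent = { topic: string; data: unknown }
type LogLine = { message: string; data?: unknown }

function createFakeContext() {
  const logs: LogLine[] = []
  const emitted: EmittedEvent[] = []
  const stored = new Map<string, unknown>()
  const streamed = new Map<string, unknown>()

  const context = {
    logger: {
      info: (message: string, data?: unknown): void => {
        logs.push({ message, data })
      },
    },
    emit: async (event: EmittedEvent): Promise<void> => {
      emitted.push(event)
    },
    state: {
      set: async (group: string, key: string, value: unknown): Promise<void> => {
        stored.set(`${group}:${key}`, value)
      },
    },
    streams: {
      messages: {
        set: async (key: string, value: unknown): Promise<void> => {
          streamed.set(key, value)
        },
      },
    },
  }
  return { context, logs, emitted, stored, streamed }
}

type FakeContext = ReturnType<typeof createFakeContext>['context']
type MessageRequest = { body: { text: string; userId: string } }

// Same logic as the SendMessage handler shown on this page.
async function sendMessageHandler(req: MessageRequest, { emit, logger, state, streams }: FakeContext) {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}

const fake = createFakeContext()
const handlerResult = sendMessageHandler({ body: { text: 'hello', userId: 'u1' } }, fake.context)
handlerResult.then((response) => {
  console.log(response.status, fake.emitted[0].topic) // 201 message.sent
})
```

Because the handler only touches what it receives through its arguments, it can be exercised in isolation like this.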

Emit - How It Outputs Data

Send data to other Steps using await emit({ topic: 'event.name', data: {...} }). Any Step that subscribes to that topic will receive your data and run automatically. This is how Steps talk to each other.
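The topic-based delivery described above can be sketched with a tiny in-memory bus. TopicBus and the NotifyUser subscriber are hypothetical names used only for illustration; the sketch mimics the behavior in the text and says nothing about how Motia routes events internally.

```typescript
// Hypothetical in-memory bus; it only mimics topic-based delivery and is not Motia's event system.
type TopicEvent = { topic: string; data: unknown }
type TopicHandler = (data: unknown) => Promise<void> | void

class TopicBus {
  private subscribers = new Map<string, TopicHandler[]>()

  // Register a handler for a topic, similar in spirit to listing the topic in `subscribes`.
  subscribe(topic: string, handler: TopicHandler): void {
    const handlers = this.subscribers.get(topic) ?? []
    handlers.push(handler)
    this.subscribers.set(topic, handlers)
  }

  // Deliver the payload to every handler subscribed to the topic, in registration order.
  async emit(event: TopicEvent): Promise<void> {
    for (const handler of this.subscribers.get(event.topic) ?? []) {
      await handler(event.data)
    }
  }
}

const bus = new TopicBus()
const delivered: string[] = []

// Plays the role of ProcessMessage: runs on message.sent, then emits message.processed.
bus.subscribe('message.sent', async (data) => {
  delivered.push('ProcessMessage')
  await bus.emit({ topic: 'message.processed', data })
})

// A second, hypothetical Step that listens for the follow-up event.
bus.subscribe('message.processed', () => {
  delivered.push('NotifyUser')
})

const emitDone = bus.emit({ topic: 'message.sent', data: { text: 'hello', userId: 'u1' } })
emitDone.then(() => console.log(delivered)) // [ 'ProcessMessage', 'NotifyUser' ]
```

The emitter never references its subscribers directly; the topic name is the only coupling between the two Steps.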

State - How It Stores Data

Store and retrieve data across Steps using await state.set(key, value) and await state.get(key). Perfect for tracking workflow progress, caching results, or sharing data between Steps without setting up a database.
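As a rough sketch of the idea, the in-memory store below follows the call shapes used in the examples on this page, state.set(group, key, value) and state.getGroup(group). The class InMemoryState, its get(group, key) signature, and its null return for missing keys are assumptions made for illustration, not Motia's storage layer.

```typescript
// Hypothetical in-memory store mirroring the calls used in the examples on this page.
class InMemoryState {
  private groups = new Map<string, Map<string, unknown>>()

  // Store a value under a group and key, creating the group on first use.
  async set<T>(group: string, key: string, value: T): Promise<T> {
    const entries = this.groups.get(group) ?? new Map<string, unknown>()
    entries.set(key, value)
    this.groups.set(group, entries)
    return value
  }

  // Read a single value; resolves to null when the group or key is missing (an assumption).
  async get<T>(group: string, key: string): Promise<T | null> {
    const entries = this.groups.get(group)
    return entries !== undefined && entries.has(key) ? (entries.get(key) as T) : null
  }

  // Read every value in a group, as the DailySummary example does.
  async getGroup<T>(group: string): Promise<T[]> {
    const values: T[] = []
    this.groups.get(group)?.forEach((value) => values.push(value as T))
    return values
  }
}

type Message = { text: string; userId: string; status: string }

async function stateDemo() {
  const state = new InMemoryState()
  // One Step writes...
  await state.set<Message>('messages', 'u1', { text: 'hello', userId: 'u1', status: 'sent' })
  await state.set<Message>('messages', 'u2', { text: 'hi', userId: 'u2', status: 'sent' })
  // ...and another Step reads the same data later.
  const one = await state.get<Message>('messages', 'u1')
  const all = await state.getGroup<Message>('messages')
  const missing = await state.get<Message>('messages', 'unknown')
  return { one, total: all.length, missing }
}

const stateResult = stateDemo()
stateResult.then(({ one, total }) => console.log(one?.status, total)) // sent 2
```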

Logger - How It Tracks Execution

Every handler gets a logger object. Call logger.info(), logger.warn(), or logger.error() with your message and data. All logs automatically include your Step name, trace ID, and timestamp - making it easy to debug workflows in the Workbench.
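To make the automatic metadata concrete, here is a minimal sketch of a logger bound to a Step name and trace ID. The function createStepLogger, the LogEntry field names, and the trace ID value are hypothetical; the real log format is not shown on this page.

```typescript
// Hypothetical logger; the field names (step, traceId, timestamp) are assumptions for illustration.
type LogLevel = 'info' | 'warn' | 'error'
type LogEntry = {
  level: LogLevel
  step: string
  traceId: string
  timestamp: string
  message: string
  data?: unknown
}

// Builds a logger bound to one Step and one trace, so every entry carries both.
function createStepLogger(step: string, traceId: string, sink: LogEntry[]) {
  const write = (level: LogLevel) => (message: string, data?: unknown): void => {
    sink.push({ level, step, traceId, timestamp: new Date().toISOString(), message, data })
  }
  return { info: write('info'), warn: write('warn'), error: write('error') }
}

const logEntries: LogEntry[] = []
const stepLogger = createStepLogger('SendMessage', 'trace-123', logEntries)

// The caller supplies only the message and data; the rest is attached automatically.
stepLogger.info('Message sent', { userId: 'u1' })
stepLogger.warn('Message is empty', { userId: 'u2' })

console.log(logEntries.map((entry) => `${entry.step} ${entry.traceId} ${entry.level}`))
// [ 'SendMessage trace-123 info', 'SendMessage trace-123 warn' ]
```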

Streams - How It Broadcasts Live Data

Push updates directly to connected clients with await streams.mystream.set(key, value). When you update stream data from any Step, all subscribed frontend clients receive the changes instantly - no extra setup needed.
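The push behavior can be sketched with an in-memory stream whose set call notifies every subscriber. InMemoryStream, its subscribe and get methods, and the listener signature are hypothetical; real clients would connect over the network, which this sketch does not model.

```typescript
// Hypothetical in-memory stream; it only illustrates "set notifies subscribers".
type StreamListener<T> = (key: string, value: T) => void

class InMemoryStream<T> {
  private items = new Map<string, T>()
  private listeners = new Set<StreamListener<T>>()

  // Register a client; returns a function that unsubscribes it.
  subscribe(listener: StreamListener<T>): () => void {
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
    }
  }

  // Store the value and push it to every subscribed client.
  async set(key: string, value: T): Promise<T> {
    this.items.set(key, value)
    this.listeners.forEach((listener) => listener(key, value))
    return value
  }

  // Read the latest value for a key, or null when nothing has been set.
  async get(key: string): Promise<T | null> {
    return this.items.has(key) ? (this.items.get(key) as T) : null
  }
}

type ChatMessage = { text: string; userId: string; status: string }

const messageStream = new InMemoryStream<ChatMessage>()
const received: string[] = []
const unsubscribe = messageStream.subscribe((key, value) => {
  received.push(`${key}:${value.status}`)
})

const streamDone = messageStream
  .set('u1', { text: 'hello', userId: 'u1', status: 'sent' })
  .then(() => messageStream.set('u1', { text: 'hello', userId: 'u1', status: 'processed' }))
  .then(() => {
    // After unsubscribing, further updates are stored but no longer pushed to this client.
    unsubscribe()
    return messageStream.set('u1', { text: 'hello', userId: 'u1', status: 'archived' })
  })

streamDone.then(() => console.log(received)) // [ 'u1:sent', 'u1:processed' ]
```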
api.step.ts
import { ApiRouteConfig, Handlers } from 'motia'

export const config: ApiRouteConfig = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

export const handler: Handlers['SendMessage'] = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api.step.js
exports.config = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

exports.handler = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api_step.py
config = {
    "name": "SendMessage",
    "type": "api",
    "path": "/messages",
    "method": "POST",
    "emits": ["message.sent"],
    "flows": ["messaging"]
}

async def handler(req, context):
    text = req.get("body", {}).get("text")
    user_id = req.get("body", {}).get("userId")
    message = {"text": text, "userId": user_id, "status": "sent"}
    await context.state.set("messages", user_id, message)
    await context.streams.messages.set(user_id, message)
    context.logger.info("Message sent", {"userId": user_id})
    await context.emit({"topic": "message.sent", "data": message})
    return {"status": 201, "body": message}
api.event.ts
import { EventConfig, Handlers } from 'motia'

export const config: EventConfig = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

export const handler: Handlers['ProcessMessage'] = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
api.event.js
exports.config = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

exports.handler = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
api_event.py
config = {
    "name": "ProcessMessage",
    "type": "event",
    "subscribes": ["message.sent"],
    "emits": ["message.processed"],
    "flows": ["messaging"]
}

async def handler(input_data, context):
    text = input_data.get("text")
    user_id = input_data.get("userId")
    status = input_data.get("status")
    processed_message = {"text": text, "userId": user_id, "status": "processed"}
    await context.state.set("processed", user_id, processed_message)
    await context.streams.processed.set(user_id, processed_message)
    context.logger.info("Message processed", {"userId": user_id})
    await context.emit({"topic": "message.processed", "data": processed_message})
api.cron.ts
import { CronConfig, Handlers } from 'motia'

export const config: CronConfig = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

export const handler: Handlers['DailySummary'] = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
api.cron.js
exports.config = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

exports.handler = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
api_cron.py
config = {
    "name": "DailySummary",
    "type": "cron",
    "cron": "0 9 * * *",
    "emits": ["summary.generated"],
    "flows": ["messaging"]
}

async def handler(context):
    messages = await context.state.get_group("messages")
    summary = {"total": len(messages), "status": "completed"}
    await context.state.set("summaries", "daily", summary)
    await context.streams.summary.set("latest", summary)
    context.logger.info("Daily summary generated", {"total": summary["total"]})
    await context.emit({"topic": "summary.generated", "data": summary})
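The listings above connect to each other only through topic names: one Step emits 'message.sent', and another lists it under subscribes. A rough sketch of that wiring in plain TypeScript follows. The runFlow function, the StepSketch type, and the NotifyUser step are hypothetical and exist only for this illustration; Motia's actual runtime is not shown here and will differ.

```typescript
// Hypothetical, simplified model of topic-based wiring. It only illustrates
// the emits/subscribes idea and is not how Motia dispatches events.
type TopicEvent = { topic: string; data: unknown }
type EmitFn = (event: TopicEvent) => void
type StepSketch = {
  name: string
  subscribes: string[]
  handler: (data: unknown, emit: EmitFn) => void
}

// Deliver an event to every subscribed step, then continue with whatever
// those steps emit, until no events are left.
function runFlow(steps: StepSketch[], first: TopicEvent): string[] {
  const queue: TopicEvent[] = [first]
  const trace: string[] = []
  while (queue.length > 0) {
    const event = queue.shift() as TopicEvent
    for (const step of steps) {
      if (step.subscribes.indexOf(event.topic) !== -1) {
        trace.push(`${event.topic} -> ${step.name}`)
        step.handler(event.data, (next) => {
          queue.push(next)
        })
      }
    }
  }
  return trace
}

const flowSteps: StepSketch[] = [
  {
    name: 'ProcessMessage', // mirrors the event Step shown above
    subscribes: ['message.sent'],
    handler: (data, emit) => emit({ topic: 'message.processed', data }),
  },
  {
    name: 'NotifyUser', // hypothetical extra step, not part of the listings
    subscribes: ['message.processed'],
    handler: () => {},
  },
]

const flowTrace = runFlow(flowSteps, {
  topic: 'message.sent',
  data: { text: 'hi', userId: 'user-1', status: 'sent' },
})
console.log(flowTrace)
```

Because steps refer to topics rather than to each other, adding NotifyUser required no change to ProcessMessage.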
Infinite composability with minimal complexity.
Master these simple core primitives — and you can build any backend system imaginable.
View Manifesto
npx motia@latest create
DEVEX

Built for devs: because backend shouldn’t mean burnout

Multi-language orchestration
Connect services in different languages within one workflow.
Workbench
A visual control panel to monitor and manage your backend.
Code is the config
Define infrastructure and workflows in code, avoiding fragile configs.
Type-safe APIs
Typed endpoints with validation ensure safer integrations and fewer errors.
Streaming
Real-time objects and events are accessible without extra setup.
Atomic State & Execution
Data is always safe, thanks to all-or-nothing, atomic operations.
Versatility

Build complex patterns quickly: your backend,
delivered with a single core primitive.

Durable Execution
Workflows maintain state through failures, ensuring reliability.
Human in the Loop
Include manual approvals in automated workflows.
Parallelization
Run tasks simultaneously and merge results seamlessly.
Orchestrator with scheduling
Schedule jobs reliably at specific times or intervals.
Saga
Manage business processes with transaction rollback and recovery.
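As an illustration of the Saga pattern named above, here is a minimal sketch in plain TypeScript: each step carries a compensating action, and when a step fails, the completed steps are undone in reverse order. SagaStep, runSaga, and the order example are hypothetical names for this sketch; it does not use or describe any Motia API.

```typescript
// Generic saga sketch; an assumption-laden illustration, not Motia's mechanism.
type SagaStep = {
  label: string
  run: () => Promise<void>
  compensate: () => Promise<void>
}

async function runSaga(sagaSteps: SagaStep[]): Promise<{ ok: boolean; log: string[] }> {
  const completed: SagaStep[] = []
  const log: string[] = []
  for (const step of sagaSteps) {
    try {
      await step.run()
      completed.push(step)
      log.push(`ran ${step.label}`)
    } catch {
      log.push(`failed ${step.label}`)
      // Roll back: undo the completed steps, most recent first.
      for (const finished of completed.slice().reverse()) {
        await finished.compensate()
        log.push(`compensated ${finished.label}`)
      }
      return { ok: false, log }
    }
  }
  return { ok: true, log }
}

// Hypothetical order flow in which the last step fails.
const orderSaga: SagaStep[] = [
  { label: 'reserve-stock', run: async () => {}, compensate: async () => {} },
  { label: 'charge-card', run: async () => {}, compensate: async () => {} },
  {
    label: 'ship-order',
    run: async () => {
      throw new Error('carrier unavailable')
    },
    compensate: async () => {},
  },
]

const sagaResult = runSaga(orderSaga)
sagaResult.then((result) => console.log(result.log))
```

In a real system each compensate would issue a refund, release stock, and so on; the empty functions here only keep the sketch short.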
AI NATIVE DEVELOPMENT

Ship faster with AI

Build, test, and scale AI-driven apps with Motia’s native support.
Cursor Rules Simplified
Claude Agents & Claude.md
Built-in AI Patterns
Ready-to-Use Configurations
VELOCITY

From concept to production in under a minute.

Under 60 Seconds to Production
Instant Dev Server
Zero Configuration
Immediate Productivity
REST APIs by Default
Start your project with a single command
npx motia@latest create
Observability

Visibility into every step of your backend.

Build from APIs to fully featured Backends

Your backend evolves step by step — start simple, scale seamlessly.
Define
API Trigger
Runs when an HTTP request hits the path.
Automate
Background Jobs
Background jobs let you handle time-consuming tasks without blocking your API responses.
Compose
Compose Multi-step Workflows
Automated workflows that manage complex business logic.
Think
Agentic Workflow
Build intelligent agentic workflows that make decisions and automate tasks.
Learn
Stream Data Seamlessly
Scale to real-time streaming agents, all powered by the same primitive.
SCALABILITY

Scale seamlessly — self-hosted or in the cloud.

Use Motia’s fully managed cloud to deploy workflows instantly, or self-host with complete control.
Deploy your first App
Self-host
Deploy Motia on your infrastructure for control over data and compliance.
Set Up Self-Hosting
Motia Cloud
Select a managed option for easy scaling, updates, and less overhead.
Explore Motia Cloud
BYOC (Bring Your Own Cloud)
Everything runs from your terminal. Write, deploy, and monitor without ever leaving your flow.
Coming Soon
RELIABILITY

Fault-tolerant by design.

Automatic Retries
Failed tasks are retried to ensure stability and prevent data loss.
Queue Infra Abstracted
Built-in queuing eliminates the need to manage external brokers like RabbitMQ.
Event-driven Core
A resilient design where events trigger steps reliably.
Shared State Handling
Ensure consistent state in distributed workflows automatically.
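For readers unfamiliar with automatic retries, the general technique can be sketched as a retry loop with exponential backoff. The withRetry helper, its RetryOptions, and the flaky task below are hypothetical; how Motia actually schedules retries is not described in this document, so treat this only as an illustration of the idea.

```typescript
// Generic retry helper; an illustration only, not how Motia retries Steps.
type RetryOptions = {
  attempts: number // total tries, including the first one
  baseDelayMs: number // delay before the second try
  sleep?: (ms: number) => Promise<void> // injectable so tests need not wait
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))

async function withRetry<T>(
  task: (attempt: number) => Promise<T>,
  options: RetryOptions,
): Promise<T> {
  const sleep = options.sleep || defaultSleep
  let lastError: unknown
  for (let attempt = 1; attempt <= options.attempts; attempt++) {
    try {
      return await task(attempt)
    } catch (error) {
      lastError = error
      if (attempt < options.attempts) {
        // Exponential backoff: base, 2x base, 4x base, ...
        await sleep(options.baseDelayMs * Math.pow(2, attempt - 1))
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError
}

// Hypothetical task that fails twice before succeeding.
let flakyCalls = 0
const flakyTask = async (): Promise<string> => {
  flakyCalls += 1
  if (flakyCalls < 3) throw new Error('temporary failure')
  return 'ok'
}

const recordedDelays: number[] = []
const retryResult = withRetry(flakyTask, {
  attempts: 5,
  baseDelayMs: 100,
  sleep: async (ms) => {
    recordedDelays.push(ms) // record instead of waiting
  },
})
```

Injecting sleep keeps the helper testable; in production code the default timer-based sleep would apply.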

Trusted by the community. Built in the open.

Motia is free to use and fully open source under the permissive MIT License — giving you the freedom to build, modify, and deploy without restrictions.
0.7.3-beta.136
MIT Licensed
GitHub #1 repository of the day
Vercel Inc. Open Source Software Program, 2025
Built for fast-moving devs and modern teams

Joel Washington
Founder / CEO @Zero & @Shift

After reading your comprehensive architecture and comparing it with the alternatives: Motia is almost eerily perfect for your vision. The alignment is so strong it's like they read your architecture docs and built exactly what you need... This isn't coincidence - it's architectural destiny.

Rituraj Tripathy
Software engineer @Pinelabs

I love the event-driven approach to building stuff that Motia has taken - went through the code and it looks pretty solid. The unique thing here is how fast one can get his MVP running.

Gaurav Dhiman
Software Engineering Lead @Meta

I am already a big fan of Motia. Motia especially shines in AI workflows that require multi-agent orchestration, as it supports event-driven workflows, which allow you to decouple the agents but still have them communicate by passing payloads through events. It's much easier to maintain workflows which are event-driven.

Still Curious? Here’s What Devs Ask Most.

Does Motia handle authentication, storage, and databases?

Motia does not currently manage authentication, storage, or databases out of the box. Storage, followed by databases, is on our roadmap. We have no immediate plans to implement authentication.

Can you build complex apps with Motia?

Yes! You can build any backend with Motia. Serverless and event-driven architecture are built into its core concepts. Steps are arbitrary units of code that can run anywhere. Motia is 100% open source under the MIT license. You can self-host it with Docker, or use Motia Cloud, which runs on a serverless architecture and is highly scalable: each Step scales according to its own demand.

Is it an alternative to platforms like Supabase?

We don't consider Motia a competitor to Supabase, since Supabase projects must run on Supabase's infrastructure. Motia is actually a protocol: when you use it, you set your project up to run in any cloud. In the future we will offer BYOC (Bring Your Own Cloud), which will unlock Motia's true potential by letting you deploy to any cloud provider in the industry.

Is Motia a low-code tool?

No, we're not a competitor to n8n or similar no-code/low-code tools. Motia is a backend framework for building APIs, AI automation, and background jobs on an event-driven architecture, with a great developer experience both locally and with one-click deployment to Motia Cloud.

How do I implement authentication?

The Motia framework doesn't dictate how to implement authentication. You can build auth by using middleware and creating an API Step to handle it. There's a good example in the Chessarena project.
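As a rough idea of what middleware-based auth can look like, here is a sketch in plain TypeScript. Everything in it is an assumption made for illustration: the ApiRequest, ApiResponse, and AuthMiddleware shapes, the protect helper, and the demo token check are hypothetical and are not Motia's middleware API. Consult the Motia docs or the Chessarena project for the real signatures.

```typescript
// Hypothetical request/response shapes, loosely modelled on the
// { status, body } object returned by the API Step handler shown earlier.
type ApiRequest = { headers: Record<string, string | undefined>; body: unknown }
type ApiResponse = { status: number; body: unknown }
type ApiHandler = (req: ApiRequest) => Promise<ApiResponse>
type AuthMiddleware = (req: ApiRequest, next: ApiHandler) => Promise<ApiResponse>

// Placeholder check; replace with real verification (JWT, session lookup, ...).
const isValidToken = (token: string): boolean => token === 'demo-token'

// Reject requests without a valid bearer token; otherwise call the handler.
const requireBearerToken: AuthMiddleware = async (req, next) => {
  const header = req.headers['authorization'] || ''
  const token = header.indexOf('Bearer ') === 0 ? header.slice('Bearer '.length) : ''
  if (!isValidToken(token)) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
  return next(req)
}

// Wrap a handler so that the middleware runs first.
const protect = (middleware: AuthMiddleware, handler: ApiHandler): ApiHandler =>
  (req) => middleware(req, handler)

// Stand-in for an API Step handler.
const sendMessage: ApiHandler = async (req) => ({ status: 201, body: req.body })
const protectedSendMessage = protect(requireBearerToken, sendMessage)

const rejectedResponse = protectedSendMessage({ headers: {}, body: { text: 'hi' } })
const acceptedResponse = protectedSendMessage({
  headers: { authorization: 'Bearer demo-token' },
  body: { text: 'hi' },
})
```

Keeping the token check in one function means the handler itself stays free of auth logic, which matches the framework's stance of not dictating an approach.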

One framework for your entire backend.

APIs, background jobs, workflows, AI agents, streaming, and observability, all unified in a single system. Write Steps in any language. Deploy with one command.