Creating Custom Adapters

Learn how to create custom adapters for Motia to integrate with your infrastructure

This guide explains how to create custom adapters for Motia. Adapters implement specific interfaces to provide pluggable infrastructure components.

Why Create Custom Adapters

Create custom adapters when you need to:

  • Integrate with custom infrastructure (databases, message queues, etc.)
  • Implement specific performance optimizations
  • Add features not available in existing adapters
  • Support new storage backends

Adapter Architecture

Adapters in Motia follow the interface pattern:

  • Interfaces define contracts in @motiadev/core
  • Implementations satisfy these contracts
  • Composition is used throughout - adapters are injected, not extended
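
The injection idea can be sketched in a few lines. This is only an illustration: `KeyValueStore`, `InMemoryStore`, and `VisitCounter` are hypothetical names and are not part of @motiadev/core.

```typescript
// Hypothetical contract; the real Motia contracts live in @motiadev/core.
interface KeyValueStore {
  get(key: string): Promise<string | null>
  set(key: string, value: string): Promise<void>
}

// One implementation that satisfies the contract.
class InMemoryStore implements KeyValueStore {
  private data = new Map<string, string>()

  async get(key: string): Promise<string | null> {
    return this.data.get(key) ?? null
  }

  async set(key: string, value: string): Promise<void> {
    this.data.set(key, value)
  }
}

// The consumer receives the adapter through its constructor (composition)
// and depends only on the interface, never on a concrete class.
class VisitCounter {
  private readonly store: KeyValueStore

  constructor(store: KeyValueStore) {
    this.store = store
  }

  async increment(page: string): Promise<number> {
    const current = Number((await this.store.get(page)) ?? '0')
    const next = current + 1
    await this.store.set(page, String(next))
    return next
  }
}
```

Swapping `InMemoryStore` for another implementation of the same interface requires no change to `VisitCounter`.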

Interface Contracts

Stream Adapter

Stream adapters extend the abstract StreamAdapter<TData> class.

Required Methods:

abstract get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
abstract set(groupId: string, id: string, data: TData): Promise<BaseStreamItem<TData>>
abstract delete(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
abstract getGroup(groupId: string): Promise<BaseStreamItem<TData>[]>

Optional Methods (with defaults):

  • send(channel, event) - Send events to subscribers
  • subscribe(channel, handler) - Subscribe to events
  • unsubscribe(channel) - Unsubscribe from events
  • clear(groupId) - Clear all items in a group
  • query(groupId, filter) - Query with filters

Example Implementation:

adapters/my-stream-adapter.ts
import { StreamAdapter } from '@motiadev/core'
import type { BaseStreamItem } from '@motiadev/core'
 
export class MyStreamAdapter<TData> extends StreamAdapter<TData> {
  private storage: Map<string, TData> = new Map()
 
  constructor(streamName: string) {
    super(streamName)
  }
 
  async get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null> {
    const key = `${groupId}:${id}`
    const data = this.storage.get(key)
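    // Compare against undefined so that stored falsy values are not treated as missing
    return data !== undefined ? ({ ...data, id } as BaseStreamItem<TData>) : null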
  }
 
  async set(groupId: string, id: string, data: TData): Promise<BaseStreamItem<TData>> {
    const key = `${groupId}:${id}`
    this.storage.set(key, data)
    return { ...data, id } as BaseStreamItem<TData>
  }
 
  async delete(groupId: string, id: string): Promise<BaseStreamItem<TData> | null> {
    const key = `${groupId}:${id}`
    const item = await this.get(groupId, id)
    if (item) {
      this.storage.delete(key)
    }
    return item
  }
 
  async getGroup(groupId: string): Promise<BaseStreamItem<TData>[]> {
    const items: BaseStreamItem<TData>[] = []
    const prefix = `${groupId}:`
    for (const [key, value] of this.storage.entries()) {
      if (key.startsWith(prefix)) {
        // Slice instead of split so that IDs containing ':' are returned intact
        const id = key.slice(prefix.length)
        items.push({ ...value, id } as BaseStreamItem<TData>)
      }
    }
    return items
  }
}
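
The example stores items under a composite `groupId:id` key, which becomes ambiguous if a group ID or item ID can itself contain a colon. One possible way around this, shown here as a sketch, is to encode both parts before joining them. `makeKey` and `parseKey` are hypothetical helpers, not part of @motiadev/core.

```typescript
// Hypothetical helpers for building and parsing composite storage keys.
const SEPARATOR = ':'

// encodeURIComponent escapes ':' as '%3A', so the separator can only
// appear once in the resulting key.
function makeKey(groupId: string, id: string): string {
  return `${encodeURIComponent(groupId)}${SEPARATOR}${encodeURIComponent(id)}`
}

function parseKey(key: string): { groupId: string; id: string } {
  const index = key.indexOf(SEPARATOR)
  if (index === -1) throw new Error(`Malformed key: ${key}`)
  return {
    groupId: decodeURIComponent(key.slice(0, index)),
    id: decodeURIComponent(key.slice(index + 1)),
  }
}
```

With this encoding, a prefix check such as `key.startsWith(makeKey(groupId, ''))` matches only keys that belong to exactly that group.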

State Adapter

State adapters implement the StateAdapter interface, which extends InternalStateManager.

Required Methods:

// From InternalStateManager
get<T>(traceId: string, key: string): Promise<T | null>
set<T>(traceId: string, key: string, value: T): Promise<T>
delete<T>(traceId: string, key: string): Promise<T | null>
getGroup<T>(traceId: string): Promise<T[]>
clear(traceId: string): Promise<void>
 
// Additional StateAdapter methods
cleanup(): Promise<void>
keys(traceId: string): Promise<string[]>
traceIds(): Promise<string[]>
items(input: StateItemsInput): Promise<StateItem[]>

Example Implementation:

adapters/my-state-adapter.ts
import type { StateAdapter, StateItem, StateItemsInput } from '@motiadev/core'
 
export class MyStateAdapter implements StateAdapter {
  private storage: Map<string, Map<string, unknown>> = new Map()
 
  async get<T>(traceId: string, key: string): Promise<T | null> {
    const trace = this.storage.get(traceId)
    if (!trace) return null
    // Use ?? rather than || so that stored falsy values (0, false, '') are returned
    return (trace.get(key) as T) ?? null
  }
 
  async set<T>(traceId: string, key: string, value: T): Promise<T> {
    let trace = this.storage.get(traceId)
    if (!trace) {
      trace = new Map()
      this.storage.set(traceId, trace)
    }
    trace.set(key, value)
    return value
  }
 
  async delete<T>(traceId: string, key: string): Promise<T | null> {
    const trace = this.storage.get(traceId)
    if (!trace) return null
    const value = trace.get(key) as T | undefined
    if (value !== undefined) {
      trace.delete(key)
      return value
    }
    return null
  }
 
  async getGroup<T>(traceId: string): Promise<T[]> {
    const trace = this.storage.get(traceId)
    if (!trace) return []
    return Array.from(trace.values()) as T[]
  }
 
  async clear(traceId: string): Promise<void> {
    this.storage.delete(traceId)
  }
 
  async cleanup(): Promise<void> {
    this.storage.clear()
  }
 
  async keys(traceId: string): Promise<string[]> {
    const trace = this.storage.get(traceId)
    if (!trace) return []
    return Array.from(trace.keys())
  }
 
  async traceIds(): Promise<string[]> {
    return Array.from(this.storage.keys())
  }
 
  async items(input: StateItemsInput): Promise<StateItem[]> {
    const items: StateItem[] = []
    for (const [traceId, trace] of this.storage.entries()) {
      // When a groupId is given, return only the items stored under that trace
      if (input.groupId && input.groupId !== traceId) continue
      for (const [key, value] of trace.entries()) {
        items.push({
          groupId: traceId,
          key,
          type: this._inferType(value),
          value: value as any,
        })
      }
    }
    return items
  }
 
  private _inferType(value: unknown): StateItem['type'] {
    if (value === null) return 'null'
    if (Array.isArray(value)) return 'array'
    if (typeof value === 'object') return 'object'
    return typeof value as StateItem['type']
  }
}
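
When reading state back, keep in mind that `0`, `false`, and `''` are valid stored values. The following sketch contrasts the `||` and `??` operators for the "return null when missing" case. The `stored` map and both helper functions are illustrative only.

```typescript
const stored = new Map<string, unknown>([
  ['retries', 0],
  ['enabled', false],
])

// `||` falls back for every falsy value, so 0 and false read as missing.
function readWithOr<T>(key: string): T | null {
  return (stored.get(key) as T) || null
}

// `??` falls back only for null and undefined, so falsy values survive.
function readWithNullish<T>(key: string): T | null {
  return (stored.get(key) as T) ?? null
}
```

Here `readWithOr('retries')` returns `null` even though a value is stored, while `readWithNullish('retries')` returns `0`.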

Event Adapter

Event adapters implement the EventAdapter interface.

Required Methods:

emit<TData>(event: Event<TData>): Promise<void>
subscribe<TData>(
  topic: string,
  stepName: string,
  handler: (event: Event<TData>) => void | Promise<void>,
  options?: QueueConfig,
): Promise<SubscriptionHandle>
unsubscribe(handle: SubscriptionHandle): Promise<void>
shutdown(): Promise<void>
getSubscriptionCount(topic: string): Promise<number>
listTopics(): Promise<string[]>

Example Implementation:

adapters/my-event-adapter.ts
import type { EventAdapter, Event, SubscriptionHandle, QueueConfig } from '@motiadev/core'
import { v4 as uuidv4 } from 'uuid'
 
type Subscription = {
  topic: string
  handler: (event: Event<any>) => void | Promise<void>
  id: string
}
 
export class MyEventAdapter implements EventAdapter {
  private subscriptions: Map<string, Subscription> = new Map()
  private events: Map<string, Event<any>[]> = new Map()
 
  async emit<TData>(event: Event<TData>): Promise<void> {
    const topicEvents = this.events.get(event.topic) || []
    topicEvents.push(event)
    this.events.set(event.topic, topicEvents)
 
    for (const sub of this.subscriptions.values()) {
      if (sub.topic === event.topic) {
        await sub.handler(event)
      }
    }
  }
 
  async subscribe<TData>(
    topic: string,
    stepName: string,
    handler: (event: Event<TData>) => void | Promise<void>,
    options?: QueueConfig,
  ): Promise<SubscriptionHandle> {
    const id = uuidv4()
    this.subscriptions.set(id, { topic, handler: handler as any, id })
 
    return {
      topic,
      id,
      unsubscribe: async () => {
        this.subscriptions.delete(id)
      },
    }
  }
 
  async unsubscribe(handle: SubscriptionHandle): Promise<void> {
    this.subscriptions.delete(handle.id)
  }
 
  async shutdown(): Promise<void> {
    this.subscriptions.clear()
    this.events.clear()
  }
 
  async getSubscriptionCount(topic: string): Promise<number> {
    return Array.from(this.subscriptions.values()).filter(
      (sub) => sub.topic === topic,
    ).length
  }
 
  async listTopics(): Promise<string[]> {
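    // Map.prototype.values() returns an iterator, so map through Array.from
    const topics = Array.from(this.subscriptions.values(), (sub) => sub.topic)
    return Array.from(new Set(topics))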
  }
}
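
The example's emit awaits handlers one after another, so a handler that throws stops delivery to the remaining subscribers. If that is not the behavior you want, one alternative is to run all handlers and collect the failures. The sketch below assumes concurrent delivery is acceptable. `DemoEvent`, `Handler`, and `deliver` are local stand-ins for illustration; a real adapter would use the `Event` type from @motiadev/core.

```typescript
// Local stand-in types for illustration only.
type DemoEvent<T> = { topic: string; data: T }
type Handler<T> = (event: DemoEvent<T>) => void | Promise<void>

// Invoke every handler and return the reasons of those that failed,
// instead of stopping at the first error.
async function deliver<T>(event: DemoEvent<T>, handlers: Handler<T>[]): Promise<unknown[]> {
  const results = await Promise.allSettled(
    // The async wrapper turns synchronous throws into rejections as well
    handlers.map(async (handler) => handler(event)),
  )
  return results
    .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    .map((r) => r.reason)
}
```

The caller can then decide whether to log, retry, or rethrow the collected errors. Note that handlers run concurrently here, which differs from the sequential loop in the example.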

Cron Adapter

Cron adapters implement the CronAdapter interface, which provides distributed locking so that a scheduled job runs on only one instance at a time.

Required Methods:

acquireLock(jobName: string, ttl: number): Promise<CronLock | null>
releaseLock(lock: CronLock): Promise<void>
renewLock(lock: CronLock, ttl: number): Promise<boolean>
isHealthy(): Promise<boolean>
shutdown(): Promise<void>
getActiveLocks(): Promise<CronLockInfo[]>

Example Implementation:

adapters/my-cron-adapter.ts
import type { CronAdapter, CronLock, CronLockInfo } from '@motiadev/core'
import os from 'os'
import { v4 as uuidv4 } from 'uuid'
 
export class MyCronAdapter implements CronAdapter {
  private locks: Map<string, CronLock> = new Map()
  private instanceId: string
 
  constructor() {
    this.instanceId = `${os.hostname()}-${uuidv4()}`
  }
 
  async acquireLock(jobName: string, ttl: number): Promise<CronLock | null> {
    const existingLock = this.locks.get(jobName)
    if (existingLock && existingLock.expiresAt > Date.now()) {
      return null
    }
 
    const lock: CronLock = {
      jobName,
      lockId: uuidv4(),
      acquiredAt: Date.now(),
      expiresAt: Date.now() + ttl,
      instanceId: this.instanceId,
    }
 
    this.locks.set(jobName, lock)
    return lock
  }
 
  async releaseLock(lock: CronLock): Promise<void> {
    const existingLock = this.locks.get(lock.jobName)
    if (existingLock && existingLock.lockId === lock.lockId) {
      this.locks.delete(lock.jobName)
    }
  }
 
  async renewLock(lock: CronLock, ttl: number): Promise<boolean> {
    const existingLock = this.locks.get(lock.jobName)
    if (existingLock && existingLock.lockId === lock.lockId) {
      existingLock.expiresAt = Date.now() + ttl
      return true
    }
    return false
  }
 
  async isHealthy(): Promise<boolean> {
    return true
  }
 
  async shutdown(): Promise<void> {
    this.locks.clear()
  }
 
  async getActiveLocks(): Promise<CronLockInfo[]> {
    const now = Date.now()
    return Array.from(this.locks.values())
      .filter((lock) => lock.expiresAt > now)
      .map((lock) => ({
        jobName: lock.jobName,
        instanceId: lock.instanceId,
        acquiredAt: lock.acquiredAt,
        expiresAt: lock.expiresAt,
      }))
  }
}
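
To show how such locks are typically consumed, here is a minimal sketch of a helper that runs a job only when the lock is free and always releases it afterwards. `runExclusive`, `Lock`, and `LockAdapter` are hypothetical names that mirror two of the method shapes listed above; how Motia itself calls the adapter may differ. The TTL is assumed to be in milliseconds, as in the example implementation.

```typescript
// Local stand-ins mirroring acquireLock/releaseLock; in a real project the
// types would be imported from @motiadev/core.
type Lock = { jobName: string; lockId: string }

interface LockAdapter {
  acquireLock(jobName: string, ttl: number): Promise<Lock | null>
  releaseLock(lock: Lock): Promise<void>
}

// Runs `job` only if the lock could be acquired.
// Returns false when another holder already owns the lock.
async function runExclusive(
  adapter: LockAdapter,
  jobName: string,
  ttlMs: number,
  job: () => Promise<void>,
): Promise<boolean> {
  const lock = await adapter.acquireLock(jobName, ttlMs)
  if (!lock) return false
  try {
    await job()
    return true
  } finally {
    // Release even when the job throws, so the lock is not held until expiry
    await adapter.releaseLock(lock)
  }
}
```

For jobs that may run longer than the TTL, the holder would also need to call `renewLock` periodically while the job is in progress.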

Step-by-Step Creation

1. Choose the Adapter Interface

Import the interface or abstract class from @motiadev/core:

import { StreamAdapter } from '@motiadev/core'
// or
import type { StateAdapter } from '@motiadev/core'
import type { EventAdapter } from '@motiadev/core'
import type { CronAdapter } from '@motiadev/core'

2. Implement Required Methods

Create a class that implements all required methods. For StreamAdapter, extend the abstract class. For others, implement the interface.

3. Handle Initialization and Cleanup

  • Initialization: Perform setup in the constructor or an init() method
  • Cleanup: Implement shutdown() or cleanup() methods to release resources (connections, timers, etc.)
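
A rough sketch of this lifecycle follows. `LifecycleAdapter` is a hypothetical skeleton, and the interval stands in for whatever background work (heartbeats, polling) a real adapter might schedule.

```typescript
// Hypothetical adapter skeleton showing idempotent setup and cleanup.
class LifecycleAdapter {
  private timer: ReturnType<typeof setInterval> | null = null
  private connected = false

  // Calling init() twice does not create a second timer.
  async init(): Promise<void> {
    if (this.connected) return
    this.connected = true
    this.timer = setInterval(() => {
      // Placeholder for periodic work such as a heartbeat
    }, 30000)
  }

  // Clears the timer so the process can exit cleanly.
  async shutdown(): Promise<void> {
    if (this.timer !== null) {
      clearInterval(this.timer)
      this.timer = null
    }
    this.connected = false
  }

  isConnected(): boolean {
    return this.connected
  }
}
```

Keeping `init()` and `shutdown()` idempotent makes the adapter safe to use in tests and during restarts, where either method may be called more than once.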

4. Register in Configuration

Add your adapter to motia.config.ts:

motia.config.ts
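import { config } from '@motiadev/core'
import { MyStreamAdapter } from './adapters/my-stream-adapter'
import { MyStateAdapter } from './adapters/my-state-adapter'
import { MyEventAdapter } from './adapters/my-event-adapter'
import { MyCronAdapter } from './adapters/my-cron-adapter'
 
export default config({
  adapters: {
    streams: new MyStreamAdapter('my-stream'),
    state: new MyStateAdapter(),
    events: new MyEventAdapter(),
    cron: new MyCronAdapter(),
  },
})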

Best Practices

Error Handling

Always handle errors gracefully and provide meaningful error messages:

async get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null> {
  try {
    // Implementation
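  } catch (error) {
    // `error` is typed as unknown under strict settings, so narrow it before reading .message
    const message = error instanceof Error ? error.message : String(error)
    throw new Error(`Failed to get stream item: ${message}`)
  }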
}
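
If several methods need the same wrapping, a small error class can keep the original error available to callers. This is a sketch under that assumption. `AdapterError` and `toAdapterError` are hypothetical names, not part of @motiadev/core.

```typescript
// Hypothetical error type that records which operation failed
// and keeps the original error for debugging.
class AdapterError extends Error {
  readonly operation: string
  readonly original: unknown

  constructor(message: string, operation: string, original: unknown) {
    super(message)
    this.name = 'AdapterError'
    this.operation = operation
    this.original = original
  }
}

// Builds a descriptive AdapterError from any thrown value.
function toAdapterError(operation: string, error: unknown): AdapterError {
  const detail = error instanceof Error ? error.message : String(error)
  return new AdapterError(`Failed to ${operation}: ${detail}`, operation, error)
}
```

Inside a catch block this would be used as `throw toAdapterError('get stream item', error)`.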

Resource Cleanup

Always clean up resources in shutdown methods:

async shutdown(): Promise<void> {
  // Close connections
  // Clear timers
  // Release resources
  this.subscriptions.clear()
}

Health Checks

Implement health checks for production adapters:

async isHealthy(): Promise<boolean> {
  try {
    // Check connection status
    // Verify backend is accessible
    return true
  } catch {
    return false
  }
}
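
A health check against a remote backend can hang if the backend stops responding, so it may help to bound it with a timeout. The following is a minimal sketch. `checkHealth` and its `probe` parameter are hypothetical, with `probe` standing in for something like a ping against your backend.

```typescript
// Resolves to false if `probe` rejects or does not settle within `timeoutMs`.
async function checkHealth(
  probe: () => Promise<unknown>,
  timeoutMs: number,
): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<boolean>((resolve) => {
    timer = setTimeout(() => resolve(false), timeoutMs)
  })
  try {
    // Whichever settles first decides the result
    return await Promise.race([probe().then(() => true), timeout])
  } catch {
    return false
  } finally {
    // Always clear the timer so it does not keep the process alive
    clearTimeout(timer)
  }
}
```

An adapter's `isHealthy()` could then delegate to `checkHealth(() => client.ping(), 2000)`, where `client.ping()` is whatever liveness call your backend client provides.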

Testing

Run your test cases against an in-memory implementation first to confirm the expected behavior, for example with MemoryStreamAdapter:

import { MemoryStreamAdapter } from '@motiadev/core'
 
const adapter = new MemoryStreamAdapter('test')
// Test operations
await adapter.set('group', 'id', { data: 'test' })
const result = await adapter.get('group', 'id')

Type Safety

Maintain type safety by using generics correctly:

export class MyStreamAdapter<TData> extends StreamAdapter<TData> {
  // TData is typed throughout
}

Performance Considerations

  • Use efficient data structures
  • Implement connection pooling for remote adapters
  • Consider caching for frequently accessed data
  • Batch operations when possible
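
As one illustration of the caching point, here is a sketch of a small read-through cache with a time-to-live. `ReadThroughCache` is a hypothetical helper, and `load` stands in for a read against a remote backend.

```typescript
// Hypothetical read-through cache: serves fresh entries from memory
// and calls `load` only on a miss or after the entry has expired.
class ReadThroughCache<T> {
  private entries = new Map<string, { value: T; expiresAt: number }>()
  private readonly load: (key: string) => Promise<T>
  private readonly ttlMs: number

  constructor(load: (key: string) => Promise<T>, ttlMs: number) {
    this.load = load
    this.ttlMs = ttlMs
  }

  async get(key: string): Promise<T> {
    const hit = this.entries.get(key)
    if (hit && hit.expiresAt > Date.now()) return hit.value
    const value = await this.load(key)
    this.entries.set(key, { value, expiresAt: Date.now() + this.ttlMs })
    return value
  }

  // Call after writes so readers do not see stale data.
  invalidate(key: string): void {
    this.entries.delete(key)
  }
}
```

This sketch does not deduplicate concurrent misses for the same key, and a cache local to one process can serve stale data when other instances write to the same backend, so choose the TTL with that in mind.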

Testing Your Adapter

  1. Create unit tests for each method
  2. Test error scenarios (connection failures, invalid inputs)
  3. Test concurrent access for distributed adapters
  4. Verify cleanup in shutdown methods
  5. Test with actual Motia steps to ensure integration works
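
The first two items can be approached with a reusable contract check that any implementation must pass. The sketch below is framework-free and covers only a subset of the state methods. `StateLike`, `expectEqual`, and `checkStateContract` are hypothetical names; a real suite would import `StateAdapter` from @motiadev/core and use your test runner's assertions.

```typescript
// Minimal shape under test, mirroring part of the state adapter contract.
interface StateLike {
  get<T>(traceId: string, key: string): Promise<T | null>
  set<T>(traceId: string, key: string, value: T): Promise<T>
  delete<T>(traceId: string, key: string): Promise<T | null>
  clear(traceId: string): Promise<void>
}

function expectEqual(actual: unknown, expected: unknown, label: string): void {
  if (actual !== expected) {
    throw new Error(`${label}: expected ${String(expected)}, got ${String(actual)}`)
  }
}

// Throws on the first violated expectation.
async function checkStateContract(adapter: StateLike): Promise<void> {
  expectEqual(await adapter.get('t1', 'missing'), null, 'missing key')
  expectEqual(await adapter.set('t1', 'count', 0), 0, 'set returns the value')
  expectEqual(await adapter.get<number>('t1', 'count'), 0, 'falsy value survives')
  expectEqual(await adapter.get('t2', 'count'), null, 'traces are isolated')
  expectEqual(await adapter.delete<number>('t1', 'count'), 0, 'delete returns old value')
  expectEqual(await adapter.get('t1', 'count'), null, 'deleted key is gone')
  await adapter.set('t1', 'a', 'x')
  await adapter.clear('t1')
  expectEqual(await adapter.get('t1', 'a'), null, 'clear removes keys')
}
```

Running the same check against both an in-memory adapter and your custom adapter is a quick way to spot behavioral differences.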

Publishing Adapter Packages

If creating a reusable adapter package:

  1. Create a new package in packages/adapter-*
  2. Export your adapter class
  3. Include TypeScript types
  4. Add README with usage examples
  5. Follow the naming convention: @motiadev/adapter-{name}-{type}

Example: @motiadev/adapter-mongodb-state


Need help? See our Community Resources for questions, examples, and discussions.