Job Queues in Drift KV

Drift KV provides a robust job queue system through the DriftQueue class, enabling background processing, scheduled tasks, and reliable task execution with retry mechanisms.

Basic Queue Setup

import { DriftQueue } from "drift-kv";
import { z } from "zod";

// Define a queue for email processing
const emailQueue = new DriftQueue({
  name: "email",
  description: "Queue for processing email notifications",
  schema: (z) => ({
    to: z.string().email(),
    subject: z.string(),
    body: z.string(),
    attachments: z.array(z.string().url()).optional(),
  }),
  handler: async (job) => {
    await sendEmail(job.data);
  },
});

Queue Configuration

Options

const imageProcessingQueue = new DriftQueue({
  name: "image-processing",
  schema: (z) => ({
    imageUrl: z.string().url(),
    sizes: z.array(z.number()),
    format: z.enum(["jpeg", "webp", "png"]),
    quality: z.number().min(1).max(100),
  }),
  hooks: {
    onJobBeforeSchedule: async (data) => {
      // Pre-enqueue validation or transformation
    },
    onJobStart: async (data) => {
      // Job start handling
    },
    onJobEnd: async (data) => {
      // Success handling
    },
    onJobError: async (error, data) => {
      // Error handling
    },
  },
  handler: async (job) => {
    await processImage(job.data);
  },
});

Job Processing

Enqueuing Jobs

// Add a job to the queue
await drift.queues.emailQueue.schedule({
  data: {
    to: "user@example.com",
    subject: "Welcome to Drift KV",
    body: "Thank you for joining us!",
  },
});

// Add multiple jobs
await Promise.all([
  drift.queues.emailQueue.schedule({
    data: notification1,
  }),
  drift.queues.emailQueue.schedule({
    data: notification2,
  }),
]);

Processing Jobs

// Start processing jobs
await drift.queues.emailQueue.process({
  hooks: {
    onJobStart: async (job) => {
      console.log("Starting job:", job.id);
    },
    onJobEnd: async (job) => {
      console.log("Job completed:", job.id);
    },
    onJobError: async (error, job) => {
      console.error("Job error:", error, "Job ID:", job.id);
    },
  },
});

Error Handling and Retries

const emailQueue = new DriftQueue({
  name: "email",
  schema: (z) => ({
    to: z.string().email(),
    subject: z.string(),
    body: z.string(),
  }),
  hooks: {
    onJobError: async (error, data) => {
      if (error.code === "SMTP_ERROR") {
        // Handle SMTP errors
        await notifyAdmin(error);
      }
      
      // Log error details
      await logError({
        queue: "email",
        error,
        data,
      });
    },
  },
  handler: async (job) => {
    try {
      await sendEmail(job.data);
    } catch (error) {
      // Job will be retried based on retryAttempts
      throw new Error("Failed to send email: " + error.message);
    }
  },
});

Real-world Examples

Image Processing Queue

const imageQueue = new DriftQueue({
  name: "image-processing",
  schema: (z) => ({
    imageUrl: z.string().url(),
    sizes: z.array(z.number()),
    format: z.enum(["jpeg", "webp", "png"]),
    quality: z.number().min(1).max(100),
  }),
  hooks: {
    onJobBeforeSchedule: async (data) => {
      // Validate image URL
      const exists = await checkImageExists(data.imageUrl);
      if (!exists) {
        throw new Error("Image not found");
      }
    },
    onJobStart: async (data) => {
      await updateJobStatus(data.id, "processing");
    },
    onJobEnd: async (data) => {
      await updateJobStatus(data.id, "completed");
    },
    onJobError: async (error, data) => {
      await updateJobStatus(data.id, "failed");
      await notifyAdmin({
        type: "image-processing-failed",
        data,
        error,
      });
    },
  },
  handler: async (job) => {
    const { imageUrl, sizes, format, quality } = job.data;
    
    // Process image for each size
    const results = await Promise.all(
      sizes.map(size => 
        processImage({
          url: imageUrl,
          size,
          format,
          quality,
        })
      )
    );
    
    return results;
  },
});

Data Export Queue

const exportQueue = new DriftQueue({
  name: "data-export",
  schema: (z) => ({
    userId: z.string().uuid(),
    format: z.enum(["csv", "json", "xlsx"]),
    filters: z.record(z.unknown()).optional(),
    notifyEmail: z.string().email(),
  }),
  hooks: {
    onJobStart: async (data) => {
      await updateExportStatus(data.userId, "processing");
    },
    onJobEnd: async (data) => {
      await updateExportStatus(data.userId, "completed");
      await drift.queues.emailQueue.schedule({
        data: {
          to: data.notifyEmail,
          subject: "Your export is ready",
          body: "Your data export has been completed...",
        },
      });
    },
    onJobError: async (error, data) => {
      await updateExportStatus(data.userId, "failed");
      await drift.queues.emailQueue.schedule({
        data: {
          to: data.notifyEmail,
          subject: "Export failed",
          body: `Your data export failed: ${error.message}`,
        },
      });
    },
  },
  handler: async (job) => {
    const { userId, format, filters } = job.data;
    
    // Fetch data
    const data = await fetchUserData(userId, filters);
    
    // Format data
    const formatted = await formatData(data, format);
    
    // Store result
    const fileUrl = await storeExport(formatted, userId);
    
    return { fileUrl };
  },
});

Best Practices

  1. Job Design

    • Keep jobs focused and single-purpose
    • Include all necessary data in job payload
    • Implement proper validation
  2. Error Handling

    • Use appropriate retry strategies
    • Implement proper error logging
    • Handle edge cases
  3. Resource Management

    • Control concurrency appropriately
    • Implement proper cleanup
    • Monitor resource usage
  4. Monitoring

    • Track job completion rates
    • Monitor queue lengths
    • Set up alerting for failures

Next Steps

Built for Deno KV

Discover Drift KV: The Ultimate Type-Safe ORM for Deno KV

Experience a powerful ORM with real-time subscriptions, job queues, and seamless compatibility across Deno KV, Node.js, Bun.js, and Edge environments.