Job Queues in Drift KV
Drift KV provides a robust job queue system through the DriftQueue
class, enabling background processing, scheduled tasks, and reliable task execution with retry mechanisms.
Basic Queue Setup
import { DriftQueue } from "drift-kv";
import { z } from "zod";
// Define a queue for email processing
const emailQueue = new DriftQueue({
name: "email",
description: "Queue for processing email notifications",
schema: (z) => ({
to: z.string().email(),
subject: z.string(),
body: z.string(),
attachments: z.array(z.string().url()).optional(),
}),
handler: async (job) => {
await sendEmail(job.data);
},
});
Queue Configuration
Options
const imageProcessingQueue = new DriftQueue({
name: "image-processing",
schema: (z) => ({
imageUrl: z.string().url(),
sizes: z.array(z.number()),
format: z.enum(["jpeg", "webp", "png"]),
quality: z.number().min(1).max(100),
}),
hooks: {
onJobBeforeSchedule: async (data) => {
// Pre-enqueue validation or transformation
},
onJobStart: async (data) => {
// Job start handling
},
onJobEnd: async (data) => {
// Success handling
},
onJobError: async (error, data) => {
// Error handling
},
},
handler: async (job) => {
await processImage(job.data);
},
});
Job Processing
Enqueuing Jobs
// Add a job to the queue
await drift.queues.emailQueue.schedule({
data: {
to: "user@example.com",
subject: "Welcome to Drift KV",
body: "Thank you for joining us!",
},
});
// Add multiple jobs
await Promise.all([
drift.queues.emailQueue.schedule({
data: notification1,
}),
drift.queues.emailQueue.schedule({
data: notification2,
}),
]);
Processing Jobs
// Start processing jobs
await drift.queues.emailQueue.process({
hooks: {
onJobStart: async (job) => {
console.log("Starting job:", job.id);
},
onJobEnd: async (job) => {
console.log("Job completed:", job.id);
},
onJobError: async (error, job) => {
console.error("Job error:", error, "Job ID:", job.id);
},
},
});
Error Handling and Retries
const emailQueue = new DriftQueue({
name: "email",
schema: (z) => ({
to: z.string().email(),
subject: z.string(),
body: z.string(),
}),
hooks: {
onJobError: async (error, data) => {
if (error.code === "SMTP_ERROR") {
// Handle SMTP errors
await notifyAdmin(error);
}
// Log error details
await logError({
queue: "email",
error,
data,
});
},
},
handler: async (job) => {
try {
await sendEmail(job.data);
} catch (error) {
// Job will be retried based on retryAttempts
throw new Error("Failed to send email: " + error.message);
}
},
});
Real-world Examples
Image Processing Queue
const imageQueue = new DriftQueue({
name: "image-processing",
schema: (z) => ({
imageUrl: z.string().url(),
sizes: z.array(z.number()),
format: z.enum(["jpeg", "webp", "png"]),
quality: z.number().min(1).max(100),
}),
hooks: {
onJobBeforeSchedule: async (data) => {
// Validate image URL
const exists = await checkImageExists(data.imageUrl);
if (!exists) {
throw new Error("Image not found");
}
},
onJobStart: async (data) => {
await updateJobStatus(data.id, "processing");
},
onJobEnd: async (data) => {
await updateJobStatus(data.id, "completed");
},
onJobError: async (error, data) => {
await updateJobStatus(data.id, "failed");
await notifyAdmin({
type: "image-processing-failed",
data,
error,
});
},
},
handler: async (job) => {
const { imageUrl, sizes, format, quality } = job.data;
// Process image for each size
const results = await Promise.all(
sizes.map(size =>
processImage({
url: imageUrl,
size,
format,
quality,
})
)
);
return results;
},
});
Data Export Queue
const exportQueue = new DriftQueue({
name: "data-export",
schema: (z) => ({
userId: z.string().uuid(),
format: z.enum(["csv", "json", "xlsx"]),
filters: z.record(z.unknown()).optional(),
notifyEmail: z.string().email(),
}),
hooks: {
onJobStart: async (data) => {
await updateExportStatus(data.userId, "processing");
},
onJobEnd: async (data) => {
await updateExportStatus(data.userId, "completed");
await drift.queues.emailQueue.schedule({
data: {
to: data.notifyEmail,
subject: "Your export is ready",
body: "Your data export has been completed...",
},
});
},
onJobError: async (error, data) => {
await updateExportStatus(data.userId, "failed");
await drift.queues.emailQueue.schedule({
data: {
to: data.notifyEmail,
subject: "Export failed",
body: `Your data export failed: ${error.message}`,
},
});
},
},
handler: async (job) => {
const { userId, format, filters } = job.data;
// Fetch data
const data = await fetchUserData(userId, filters);
// Format data
const formatted = await formatData(data, format);
// Store result
const fileUrl = await storeExport(formatted, userId);
return { fileUrl };
},
});
Best Practices
-
Job Design
- Keep jobs focused and single-purpose
- Include all necessary data in job payload
- Implement proper validation
-
Error Handling
- Use appropriate retry strategies
- Implement proper error logging
- Handle edge cases
-
Resource Management
- Control concurrency appropriately
- Implement proper cleanup
- Monitor resource usage
-
Monitoring
- Track job completion rates
- Monitor queue lengths
- Set up alerting for failures
Next Steps
- Learn about Real-time Features
- Understand Error Handling
- Review Performance Optimization
Built for Deno KV
Discover Drift KV: The Ultimate Type-Safe ORM for Deno KV
Experience a powerful ORM with real-time subscriptions, job queues, and seamless compatibility across Deno KV, Node.js, Bun.js, and Edge environments.