Middlewares

Middlewares in JoobQ are powerful tools to customize the behavior of job processing globally. They enable features like logging, retrying failed jobs, throttling, enforcing execution time limits, and implementing custom logic.

JoobQ runs every job through a middleware pipeline. Each middleware component can intercept a job as it passes through, run its own logic before and after the job, and decide whether to hand the job on to the next component. This keeps cross-cutting concerns modular and separate from the job code itself.


Middleware Pipeline Overview

At the core is the MiddlewarePipeline, which manages the execution of registered middleware components sequentially.

class MiddlewarePipeline
  @middlewares : Array(Middleware)

  def initialize(@middlewares : Array(Middleware) = [] of Middleware)
  end

  def call(job : Job, queue : BaseQueue, &block : ->)
    call_next(0, job, queue, &block)
  end

  private def call_next(index : Int32, job : Job, queue : BaseQueue, &block : ->)
    # Executes middleware in order
  end
end

Key Concepts:

  • Middleware Registration: Middleware components are stored in an array and executed in order.

  • Control Flow: Each middleware decides whether to process a job or pass it to the next middleware.

  • Global Configuration: Middlewares are added to the global configuration (JoobQ.config), not individual queues.

  • Matching Logic: The matches? method determines whether a middleware applies to a specific job and queue.

  • Pipeline Execution: Middlewares are executed in the order they are defined. If a middleware does not match, the pipeline skips it.
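The concepts above can be sketched in a few lines. This is a minimal, self-contained illustration, not JoobQ's actual implementation: the Job and BaseQueue records, the Tagging middleware, and the body of call_next are stand-ins written for this example so that it compiles without the shard.

```crystal
# Stand-in types for illustration only; these are NOT JoobQ's real classes.
record Job, id : String
record BaseQueue, name : String

module Middleware
  abstract def matches?(job : Job, queue : BaseQueue) : Bool
  abstract def call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil
end

TRACE = [] of String

# Hypothetical middleware: records its tag, then passes control on.
class Tagging
  include Middleware

  def initialize(@tag : String, @only_queue : String? = nil)
  end

  def matches?(job : Job, queue : BaseQueue) : Bool
    @only_queue.nil? || @only_queue == queue.name
  end

  def call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil
    TRACE << @tag
    next_middleware.call
  end
end

class MiddlewarePipeline
  def initialize(@middlewares : Array(Middleware) = [] of Middleware)
  end

  def call(job : Job, queue : BaseQueue, &block : ->) : Nil
    call_next(0, job, queue, &block)
  end

  # One plausible way to walk the stack (an assumption, not the library source).
  private def call_next(index : Int32, job : Job, queue : BaseQueue, &block : ->) : Nil
    if index >= @middlewares.size
      block.call # end of the chain: run the job itself
      return
    end

    middleware = @middlewares[index]
    if middleware.matches?(job, queue)
      next_step = -> do
        call_next(index + 1, job, queue, &block)
        nil
      end
      middleware.call(job, queue, next_step)
    else
      call_next(index + 1, job, queue, &block) # skip non-matching middleware
    end
  end
end

pipeline = MiddlewarePipeline.new([
  Tagging.new("first"),
  Tagging.new("mail-only", only_queue: "mail"),
  Tagging.new("last"),
] of Middleware)

pipeline.call(Job.new("42"), BaseQueue.new("default")) { TRACE << "job" }
p TRACE # => ["first", "last", "job"]
```

The "mail-only" middleware is skipped because its matches? returns false for the "default" queue, while the other two run in registration order before the job block.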

Built-in Middlewares

  1. AsyncLoggingMiddleware: Logs details about job execution asynchronously.

  2. RetryMiddleware: Retries failed jobs based on a configured maximum number of retries and delay.

  3. ThrottleMiddleware: Limits concurrent job executions.

  4. TimeoutMiddleware: Sets a maximum execution time for jobs.


Middleware Interface

All middleware components include the Middleware module and implement two essential methods:

  • matches?(job : Job, queue : BaseQueue) : Bool: Determines if the middleware should act on the given job.

  • call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil: Contains the middleware's logic and controls the flow to the next middleware.

How to Use Middlewares in JoobQ

JoobQ allows you to customize the behavior of your job queue system by registering middlewares through the use method. This method provides a flexible way to add or modify the middleware stack globally.


Registering Middlewares with use

The use method is part of the JoobQ::Configure class and serves as a DSL to modify the middleware stack. It accepts a block where you can manipulate the array of middlewares (middlewares property).

Basic Syntax

JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << MyCustomMiddleware.new
    middlewares.reject!(&.is_a?(JoobQ::Middlewares::RetryMiddleware)) # Optionally remove default middlewares
  end
end

This allows you to:

  • Add new middlewares.

  • Remove or replace default middlewares.

  • Customize the order of middleware execution.
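Assuming the object yielded to the use block is an ordinary Crystal Array, as the description above suggests, the standard Array methods cover all three operations. The sketch below uses hypothetical stand-in classes (LoggingStandIn, RetryStandIn, TimeoutStandIn) instead of JoobQ's real middlewares so that it runs on its own.

```crystal
# Hypothetical stand-ins for illustration; JoobQ's real classes are not used here.
module Middleware; end

class LoggingStandIn
  include Middleware
end

class RetryStandIn
  include Middleware
end

class TimeoutStandIn
  include Middleware
end

middlewares = [LoggingStandIn.new, RetryStandIn.new] of Middleware

# Add: append to the end of the stack.
middlewares << TimeoutStandIn.new

# Remove by type: reject! with is_a? matches instances of the class.
middlewares.reject!(&.is_a?(RetryStandIn))

# Reorder: unshift puts a middleware at the front of the stack.
middlewares.unshift(RetryStandIn.new)

order = middlewares.map { |m| m.class.name }
p order # => ["RetryStandIn", "LoggingStandIn", "TimeoutStandIn"]
```

Because execution follows array order, the position at which a middleware is inserted determines when it runs relative to the others.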


Examples

Adding a Custom Middleware

Here’s an example of adding a custom logging middleware:

class CustomLoggerMiddleware 
  include JoobQ::Middleware
  
  def matches?(job : JoobQ::Job, queue : JoobQ::BaseQueue) : Bool
    true # Apply to all jobs
  end

  def call(job : JoobQ::Job, queue : JoobQ::BaseQueue, next_middleware : ->)
    Log.info { "Job #{job.id} started on queue #{queue.name}" }
    next_middleware.call
    Log.info { "Job #{job.id} finished on queue #{queue.name}" }
  end
end

JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << CustomLoggerMiddleware.new
  end
end

Removing a Default Middleware

To remove a default middleware like ThrottleMiddleware:

JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares.reject!(&.is_a?(JoobQ::Middlewares::ThrottleMiddleware))
  end
end

Replacing a Middleware

To replace the RetryMiddleware with a custom implementation:

class CustomRetryMiddleware < JoobQ::Middlewares::RetryMiddleware
  def initialize(max_retries : Int32)
    super
  end

  def matches?(job : JoobQ::Job, queue : JoobQ::BaseQueue) : Bool
    job.priority == "high" # Apply only to high-priority jobs
  end
end

JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares.reject!(&.is_a?(JoobQ::Middlewares::RetryMiddleware))
    middlewares << CustomRetryMiddleware.new(max_retries: 5)
  end
end

Understanding Middleware Execution

When a job is processed, middlewares are executed in the order they are defined in the middlewares array:

  1. JoobQ evaluates each middleware's matches? method.

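  2. If matches? returns true, the middleware’s call method is invoked. If it returns false, the middleware is skipped and the pipeline moves on.

  3. Each middleware must call next_middleware.call to pass control to the next middleware in the pipeline. A middleware that never calls it stops the chain at that point, so later middlewares and the job itself do not run.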

For example, if your middleware stack looks like this:

JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << JoobQ::Middlewares::AsyncLogging.new
    middlewares << JoobQ::Middlewares::Retry.new(max_retries: 3)
    middlewares << JoobQ::Middlewares::Timeout.new(timeout: 10.seconds)
  end
end

With this stack:

  • Logging happens first.

  • If a job fails, RetryMiddleware retries it up to 3 times.

  • If a job takes longer than 10 seconds, TimeoutMiddleware stops execution.
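Because each middleware wraps the rest of the chain through next_middleware.call, code placed before that call runs in stack order and code placed after it runs in reverse order. The sketch below models this nesting with plain procs; the wrap helper and the trace strings are hypothetical and only illustrate the ordering, not the behavior of the built-in middlewares.

```crystal
TRACE = [] of String

# Hypothetical helper: returns a proc that records `name` before and after
# running `inner`, the way a middleware surrounds next_middleware.call.
def wrap(name : String, inner : Proc(Nil)) : Proc(Nil)
  -> do
    TRACE << "#{name}: before"
    inner.call
    TRACE << "#{name}: after"
    nil
  end
end

job = -> do
  TRACE << "job runs"
  nil
end

# Same order as the stack above: logging, then retry, then timeout.
chain = wrap("logging", wrap("retry", wrap("timeout", job)))
chain.call

TRACE.each { |line| puts line }
# logging: before
# retry: before
# timeout: before
# job runs
# timeout: after
# retry: after
# logging: after
```

The outermost middleware sees the whole execution, including everything the inner ones do, which is why the order of registration matters.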


Advanced: Conditional Middleware Execution

Middlewares can be conditionally applied to specific jobs or queues using the matches? method.

Example: Queue-Specific Middleware

class QueueSpecificMiddleware 
  include JoobQ::Middleware
  
  def matches?(job : JoobQ::Job, queue : JoobQ::BaseQueue) : Bool
    queue.name == "critical_queue" # Only apply to the "critical_queue"
  end

  def call(job : JoobQ::Job, queue : JoobQ::BaseQueue, next_middleware : ->)
    Log.info { "Critical job #{job.id} started!" }
    next_middleware.call
  end
end

JoobQ.configure do |config|
  config.use do |middlewares|
    middlewares << QueueSpecificMiddleware.new
  end
end

Best Practices for Using use

  1. Plan Middleware Order:

    • Place logging or monitoring middlewares early in the stack.

    • Place retry, throttling, or timeout mechanisms after them.

  2. Leverage matches?:

    • Use matches? for fine-grained control over middleware application.

  3. Test Middleware Behavior:

    • Test each middleware in isolation first, then together with the rest of the stack, to confirm that it integrates well with the others.
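A middleware can be exercised in isolation by calling matches? and call directly and passing a proc in place of the rest of the pipeline. The following is a minimal sketch under stated assumptions: Job and BaseQueue are stand-in records, and CriticalOnly is a hypothetical middleware modeled on the queue-specific example above. A real project would more likely express these checks with Crystal's spec library.

```crystal
# Stand-in types for illustration only; these are NOT JoobQ's real classes.
record Job, id : String
record BaseQueue, name : String

# Hypothetical middleware under test.
class CriticalOnly
  def matches?(job : Job, queue : BaseQueue) : Bool
    queue.name == "critical_queue"
  end

  def call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil
    next_middleware.call
  end
end

middleware = CriticalOnly.new
job = Job.new("1")

# matches? should select only the critical queue.
raise "should match" unless middleware.matches?(job, BaseQueue.new("critical_queue"))
raise "should not match" if middleware.matches?(job, BaseQueue.new("default"))

# call should hand control on exactly once.
calls = 0
counter = -> do
  calls += 1
  nil
end
middleware.call(job, BaseQueue.new("critical_queue"), counter)
raise "next_middleware was not called exactly once" unless calls == 1

puts "all checks passed"
```

Counting invocations of the stand-in proc is a simple way to catch a middleware that forgets to call next_middleware or calls it more than once.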

By using the use method effectively, you can fully customize the behavior of your job processing pipeline in JoobQ.
