The use case was straightforward: a backend service needed to receive document submissions and forward them to one or more document manager endpoints. Document managers accepted documents, processed them, and stored them. Documents could have been submitted by other systems independently, so the service had to account for the possibility that any given document was already known to a document manager. Submissions had to happen in the background, transient failures had to be retried, and every document needed to ultimately land in a terminal state; either successfully managed or definitively failed.
Status tracking via Postgres
The foundation was a postgres table. Each document row carried a status field tracking where the document was in its lifecycle:
// has received the document
RECEIVED,
// scheduled for processing in the future
SCHEDULED,
// queued, awaiting processing
QUEUED,
// actively being processed
PROCESSING,
// submitted to the document manager; manager acknowledged reception
SUBMITTED,
// document manager returned a rejection/error response
REJECTED,
// document manager accepted the document, or a lookup confirmed it is already managed
MANAGED,
// document manager returned a "not managed" response on status query
NOT_MANAGED,
// document manager is busy; retry later
TRY_AGAIN_LATER,
// an unanticipated error occurred; handling failed
FAILED,
// a previously pending document was cancelled
CANCELLED
The statuses cover both sides: states reflecting what the service itself is doing ( RECEIVED, SCHEDULED, QUEUED, PROCESSING) and states reflecting what the document manager reported back ( SUBMITTED, REJECTED, MANAGED, NOT_MANAGED, TRY_AGAIN_LATER). Keeping these in one enum gives a clear audit trail of the full document lifecycle in a single column.
One thing worth noting: there is an argument to be made that this design co-mingles two different concerns in a single status field. States like RECEIVED, SCHEDULED, QUEUED, and PROCESSING are really queue and processing pipeline states, while SUBMITTED, REJECTED, MANAGED, NOT_MANAGED, and FAILED reflect the document lifecycle and outcomes from the document manager. Keeping them together was a pragmatic choice that kept the implementation simple for the use case at hand; a single column, a single query, straightforward state machine logic. In a future iteration these could reasonably be split into two separate status fields or tables, one tracking where the document is in the processing pipeline and one tracking the document manager outcome.
Abstracting documents to WorkItems
Once the pattern was working it became clear the underlying mechanics had nothing document specific about them. The queue, status transitions, retry behavior; all generic. So the document concept was abstracted up to a WorkItem with the document implementation sitting behind it, making it straightforward to externalize the whole pattern into a reusable library.
A WorkItem represents anything that can be queued and processed:
public interface WorkItem {
WorkItemType getWorkItemType();
String getWorkItemId();
String getWorkItemStatus();
Boolean workItemIsQueuable();
WorkItemRef toWorkItemRef();
}WorkItemRef is a lightweight reference passed around the queue; just enough to identify and route the item without dragging the full entity through every layer:
public class WorkItemRef {
private String id;
private WorkItemType type;
private String status;
}WorkItemProcessor handles the actual work. The notable thing is TryProcessingWorkItemLaterException; throwing this signals the item should be re-queued rather than failed, which is how transient errors and TRY_AGAIN_LATER responses get handled:
public interface WorkItemProcessor {
void processWorkItem(WorkItemRef workItemRef) throws TryProcessingWorkItemLaterException;
}Persistence sits behind WorkItemStore, keeping the queue machinery decoupled from any particular storage implementation:
public interface WorkItemStore {
List getQueueableWorkItems();
void setWorkItemStatusQueuedAndSave(WorkItem workItem, String message) throws SaveWorkItemFailureException;
WorkItem getWorkItemById(String workItemId);
}What makes a WorkItem queueable
The getQueueableWorkItems() contract is open ended; each implementation decides what "queueable" means for its entity. For the document implementation this was backed by a streamActionableRecords() query that covered several scenarios. The logic was not simply "give me everything in status X"; it also had to pick up items sitting in intermediate states too long, and items whose scheduled or retry time had arrived:
public Stream streamActionableRecords() {
/*
get everything that is RECEIVED
OR
SCHEDULED and documentScheduledAt < NOW()
OR
QUEUED and statusQueuedAt < NOW-60 minutes
OR
PROCESSING and statusProcessingAt < NOW-60 minutes
OR
TRY_AGAIN_LATER and documentTryAgainLaterAt < NOW
*/
Date now = new Date();
return find("((status IN (?1)) OR " +
"(status = ?2 AND (documentScheduledAt IS NULL OR documentScheduledAt < ?3)) OR " +
"(status = ?4 AND (statusQueuedAt IS NULL OR statusQueuedAt < ?5)) OR " +
"(status = ?6 AND (statusProcessingAt IS NULL OR statusProcessingAt < ?7)) OR " +
"(status = ?8 AND (documentTryAgainLaterAt IS NULL OR documentTryAgainLaterAt < ?9)))",
actionableStatus, // 1
DocumentStatus.SCHEDULED, // 2
now, // 3
DocumentStatus.QUEUED, // 4
DateUtil.computeAdjustedDate(-60), // 5
DocumentStatus.PROCESSING, // 6
DateUtil.computeAdjustedDate(-60), // 7
DocumentStatus.TRY_AGAIN_LATER, // 8
now // 9
).stream();
}The QUEUED and PROCESSING thresholds act as a safety net. If an item was placed in either state but never advanced due to a crash or restart, the next scheduled sweep picks it back up rather than leaving it stuck. The TRY_AGAIN_LATER case lets the document manager throttle retries by setting a future timestamp; the item will not be picked up again until that time has passed.
The queue
The WorkQueue interface is intentionally thin:
public interface WorkQueue {
String getLockId();
void enqueue(WorkItemRef workItemRef);
WorkItemRef dequeue(long timeout, TimeUnit unit) throws NoSuchElementException, InterruptedException;
}The initial implementation was an in process LinkedBlockingDeque wrapped behind InProcessQueue. Keeping the queue behind an interface means swapping to SQS, Kafka, or anything else requires only a new implementation without touching publisher or consumer logic.
public class InProcessQueue implements WorkQueue {
private final BlockingQueue queue = new LinkedBlockingDeque<>(1000);
...
@Override
public void enqueue(WorkItemRef workItemRef) {
if (workItemRef != null && !this.queue.contains(workItemRef)) {
this.queue.add(workItemRef);
}
}
@Override
public WorkItemRef dequeue(long timeout, TimeUnit unit) throws InterruptedException {
return this.queue.poll(timeout, unit);
}
}Publisher and consumer
WorkQueuePublisher has two entry points: populateWorkQueue(), called on a schedule to sweep for queueable items, and publish(WorkItemRef), which queues a specific item directly. Both acquire a distributed lock before doing anything; more on that below.
Before queuing an item, publish() re-fetches the WorkItem from the store and calls workItemIsQueuable(). This guards against race conditions where an item's status may have changed between the time it was identified as queueable and the time publish() actually runs. If no longer queueable it is silently skipped.
public final void publish(WorkItemRef workItemRef) {
...
workItem = this.workItemStore.getWorkItemById(workItemRef.getId());
if (!workItem.workItemIsQueuable()) {
// status changed since we last looked; skip it
return;
}
workItemStore.setWorkItemStatusQueuedAndSave(workItem, ...);
this.workQueue.enqueue(workItem.toWorkItemRef());
...
}WorkQueueConsumer runs as a Runnable on a ManagedExecutor (Quarkus's CDI aware managed thread pool). It loops continuously, blocking on dequeue() with a 30 second timeout. When an item comes off the queue it is handed off to a separate executor for processing, keeping the consumer loop free to pull the next item. If the processor throws TryProcessingWorkItemLaterException the item is re-published rather than discarded:
public void consumeFromQueue() throws InterruptedException {
WorkItemRef workItemRef = this.workQueue.dequeue(30, TimeUnit.SECONDS);
if (workItemRef != null) {
this.processorExecutor.execute(new Runnable() {
@Override
public void run() {
try {
workItemProcessor.processWorkItem(workItemRef);
} catch(TryProcessingWorkItemLaterException e) {
republish(workItemRef);
}
}
});
}
}Distributed locking
Both the publisher and consumer rely on a distributed locking mechanism to prevent duplicate processing and queue population races. The locking abstraction implements the standard java.util.concurrent.Lock contract so the underlying implementation can be swapped out; database backed locks, Hazelcast, or anything else; without touching the publisher or consumer code. This is covered in more detail in a follow-up post.
The resulting structure
A scheduler periodically calls populateWorkQueue() which sweeps the store for any items in a queueable state and publishes them. Separately, any new item received by the service can be published directly. The consumer loop pulls items off the queue and dispatches them to the processor. The processor calls the document manager, updates the status, and either completes the item or throws TryProcessingWorkItemLaterException to trigger a retry.
Summary
A postgres status column, a thin queue abstraction, a publisher/consumer pair with distributed locking, and a processor interface with a retry exception covers the majority of asynchronous background processing use cases without pulling in a full message broker. The higher level abstractions ( WorkItem, WorkItemStore, WorkQueue, WorkQueuePublisher, WorkQueueConsumer) were extracted into a shared library; concrete implementations of the queue and store live in the service itself. New projects pick up the library and provide their own implementations of the relevant interfaces as needed.
Originally published at http://bitsofinfo.wordpress.com on March 3, 2024.
Simple pattern for an asynchronous processing service was originally published in Level Up Coding on Medium, where people are continuing the conversation by highlighting and responding to this story.