Poor man’s distributed locking

By bitsofinfo · Published April 10, 2026 · 11 min read · Source: Level Up Coding

One of my prior posts on the async processing service mentioned a distributed locking mechanism. This post covers that implementation. The short version: the client had no existing distributed locking infrastructure. No Hazelcast, no Redis, no ZooKeeper, nothing. What they did have was Postgres, which was already in use. That turned out to be enough.

The problem

With a background processing service running queue population sweeps on a schedule, and potentially multiple threads or service instances doing work concurrently, there were two categories of race conditions to guard against. The first was queue level: two threads calling populateWorkQueue() simultaneously and enqueuing the same items twice. The second was item level: two threads picking up the same work item and processing it concurrently. Neither outcome was acceptable; duplicate processing in particular could cause real problems depending on what the downstream document manager did with repeated submissions.

The obvious solutions, Hazelcast distributed maps, Redis SETNX, or purpose-built lock services, were all off the table given the client's infrastructure. The less obvious solution was sitting right there in Postgres: advisory locks.

Postgres advisory locks

Postgres advisory locks are a feature most people never touch. Unlike row locks or table locks, which are tied to transactions and specific data, advisory locks are application-level locks identified by a numeric key that you manage entirely yourself. Postgres provides two variants: blocking (pg_advisory_lock), which waits until the lock is available, and non-blocking (pg_try_advisory_lock), which returns immediately with a boolean indicating whether the lock was granted.

The numeric key is the interesting part. Postgres advisory locks take a bigint identifier, so any string lock name can be mapped to one via a hash. The implementation does exactly that:

private static long toPgLockId(String lockId) {
    return lockId.hashCode() & Long.MAX_VALUE;
}

This gives a stable numeric identifier for any named lock, which is all Postgres needs.
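
One caveat worth illustrating: String.hashCode() is a 32-bit int, so the mapping above uses only a small slice of the bigint key space and distinct names can share an id. The sketch below demonstrates that with the well-known "Aa"/"BB" pair and shows one possible wider mapping. LockIdDemo and toWidePgLockId are hypothetical names introduced for illustration; they are not part of the original implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class LockIdDemo {

    // Same mapping as in the post: 32-bit hashCode widened to long, sign bit cleared.
    public static long toPgLockId(String lockId) {
        return lockId.hashCode() & Long.MAX_VALUE;
    }

    // Hypothetical alternative (an assumption, not the author's code):
    // use the first 8 bytes of a SHA-256 digest to spread names over 64 bits.
    // The result may be negative, which is fine for a signed bigint key.
    public static long toWidePgLockId(String lockId) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(lockId.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getLong();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // "Aa" and "BB" both hash to 2112, so they map to the same advisory lock id
        System.out.println(toPgLockId("Aa") == toPgLockId("BB")); // prints true
        // the digest-based ids for the same pair
        System.out.println(toWidePgLockId("Aa") + " vs " + toWidePgLockId("BB"));
    }
}
```

A collision does not break mutual exclusion; it only means two unrelated named locks contend for the same Postgres lock, which may or may not matter depending on how many distinct lock names are in play.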

The LockMgr interface

The locking abstraction was kept deliberately simple. As mentioned in the previous post, it implements the standard java.util.concurrent.locks.Lock contract so callers do not need to know anything about the underlying mechanism:

public interface LockMgr {
    public Lock getLock(String lockId, String context) throws LockUnavailableException;
}

A lock has two things: an identifier (the resource being locked) and a context. The context serves a dual purpose; it describes the scope of the locking operation (i.e. what logical operation or component is acquiring the lock) as well as identifying who is asking for it, both of which are captured in log output for diagnostics and tracing. getLock() returns a standard Lock that the caller can use with the familiar lock() / unlock() pattern. If the lock cannot be obtained at all, LockUnavailableException is thrown.
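
To illustrate how little the interface asks of a backend, here is a minimal single-JVM stand-in built on ReentrantLock. It is only a sketch: InMemoryLockMgr is a hypothetical class, and the shapes of LockMgr and LockUnavailableException are assumed from the snippets in this post (the exception is assumed to be unchecked, since the Lock methods throw it without declaring it). It provides no cross-process locking, which is the whole point of the Postgres version, but something like it can be handy in unit tests.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class InMemoryLockMgrDemo {

    // Assumed shape: unchecked, with a (message, cause) constructor
    static class LockUnavailableException extends RuntimeException {
        LockUnavailableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Same contract as the interface shown in the post
    interface LockMgr {
        Lock getLock(String lockId, String context) throws LockUnavailableException;
    }

    // Hypothetical in-process implementation: one ReentrantLock per lock id
    static class InMemoryLockMgr implements LockMgr {
        private final Map<String, Lock> locks = new ConcurrentHashMap<>();

        @Override
        public Lock getLock(String lockId, String context) {
            if (lockId == null || lockId.isEmpty()) {
                throw new LockUnavailableException("no lock id given by " + context, null);
            }
            // every caller asking for the same id receives the same Lock instance
            return locks.computeIfAbsent(lockId, id -> new ReentrantLock());
        }
    }

    public static void main(String[] args) {
        LockMgr lockMgr = new InMemoryLockMgr();
        Lock lock = lockMgr.getLock("work-item-42", "InMemoryLockMgrDemo.main()");
        lock.lock();
        try {
            System.out.println("holding lock, doing work");
        } finally {
            lock.unlock();
        }
    }
}
```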

PgAdvisoryLock

PgAdvisoryLock is the Lock implementation. It wraps the two Postgres advisory lock SQL calls behind the standard Lock interface methods:

private static final String NON_BLOCKING_LOCK_SQL = "SELECT pg_try_advisory_lock(?)";
private static final String BLOCKING_LOCK_SQL = "SELECT pg_advisory_lock(?)";

lock() uses the blocking variant; it will wait until the lock is available. tryLock() uses the non-blocking variant; it returns immediately with true or false. This maps cleanly onto the java.util.concurrent.locks.Lock contract without any custom logic needed in the callers.
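
The Lock contract also includes a timed tryLock(long, TimeUnit), which the full listing at the end of this post leaves unsupported. One possible way to fill it in, shown here purely as a sketch, is to poll the non-blocking variant until a deadline passes. TimedTryLock, tryWithTimeout and the 50 ms poll interval are hypothetical; the BooleanSupplier stands in for a call to pg_try_advisory_lock.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class TimedTryLock {

    // Polls a non-blocking attempt until it succeeds or the timeout elapses.
    // 'attempt' is a stand-in for executing SELECT pg_try_advisory_lock(?).
    static boolean tryWithTimeout(BooleanSupplier attempt, long time, TimeUnit unit, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(time);
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // sleep for the poll interval, capped at roughly the time remaining;
            // Thread.sleep propagates InterruptedException, as the Lock contract expects
            long sleepMillis = Math.min(pollMillis, TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1);
            Thread.sleep(sleepMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // simulated lock that becomes free on the third attempt
        boolean granted = tryWithTimeout(() -> ++calls[0] >= 3, 2, TimeUnit.SECONDS, 50);
        System.out.println(granted + " after " + calls[0] + " attempts");
    }
}
```

Polling trades a little latency (up to one poll interval) for simplicity, and every attempt is a round trip to the database, so the interval should not be too aggressive.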

The unlock() implementation issues pg_advisory_unlock() against the same connection the lock was acquired on, then defers to PostgresAdvisoryLockMgr for connection cleanup:

public void unlock() {
    try {
        if (!lockGranted) { return; }
        boolean unlockSuccess = false; // logged in the full listing
        Connection connection = pgConnection.connection; // same connection the lock was taken on
        try (val statement = connection.prepareStatement("SELECT pg_advisory_unlock(?)")) {
            statement.setLong(1, lockId);
            try (val result = statement.executeQuery()) {
                if (result.next()) {
                    unlockSuccess = result.getBoolean(1);
                }
            }
        }
    } catch (Throwable e) {
        throw new LockUnavailableException("unlock() FAILED " + this + " error:" + e.getMessage(), e);
    } finally {
        // always let the manager update its bookkeeping, even on failure
        this.lockMgr.handleUnlock(this);
    }
}

PostgresAdvisoryLockMgr

PostgresAdvisoryLockMgr is the LockMgr implementation and handles the connection lifecycle. It maintains two maps keyed by thread id: one tracking the JDBC connection in use by each thread, and one tracking the stack of locks that thread holds.

The stack matters because Postgres advisory locks are re-entrant at the connection level; the same connection can acquire the same lock multiple times and must release it the same number of times. Tracking locks in a stack per thread ensures that the JDBC connection is held open for as long as any lock on that thread is still active, and closed cleanly when the last lock is released:

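protected void handleUnlock(PgAdvisoryLock lock) {
    Long threadId = Thread.currentThread().threadId();
    Stack<PgAdvisoryLock> locks = thread2locks.get(threadId);
    locks.pop();
    if (locks.isEmpty()) {
        // last lock for this thread released: close the connection and forget the thread
        PgLockConnection pgConn = thread2connection.get(threadId);
        if (pgConn != null) {
            try {
                pgConn.close();
            } catch (SQLException e) {
                logger.error("handleUnlock() error closing connection: " + e.getMessage(), e);
            }
        }
        this.thread2locks.remove(threadId);
        this.thread2connection.remove(threadId);
    }
}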
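
The bookkeeping can be exercised without a database. The following sketch is an illustration rather than the original code: it replaces the JDBC connection with a hypothetical FakeConnection, uses a Deque in place of Stack, and uses Thread.getId() so it also runs on older JDKs. The behavior it shows is the one described above: the connection is opened with the first lock on a thread and closed only when the last one is released.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerThreadConnectionDemo {

    // Hypothetical stand-in for a JDBC connection holding advisory locks
    static class FakeConnection {
        boolean closed = false;
        void close() { closed = true; }
    }

    private final Map<Long, FakeConnection> thread2connection = new ConcurrentHashMap<>();
    private final Map<Long, Deque<String>> thread2locks = new ConcurrentHashMap<>();

    // Called when a lock is handed out: record it and reuse or open this thread's connection
    FakeConnection register(String lockName) {
        long threadId = Thread.currentThread().getId();
        thread2locks.computeIfAbsent(threadId, id -> new ArrayDeque<>()).push(lockName);
        return thread2connection.computeIfAbsent(threadId, id -> new FakeConnection());
    }

    // Called on unlock: pop one lock, close the connection once none remain
    void handleUnlock() {
        long threadId = Thread.currentThread().getId();
        Deque<String> locks = thread2locks.get(threadId);
        if (locks == null) {
            throw new IllegalStateException("thread " + threadId + " holds no locks");
        }
        locks.pop();
        if (locks.isEmpty()) {
            FakeConnection conn = thread2connection.remove(threadId);
            if (conn != null) { conn.close(); }
            thread2locks.remove(threadId);
        }
    }

    public static void main(String[] args) {
        PerThreadConnectionDemo mgr = new PerThreadConnectionDemo();
        FakeConnection first = mgr.register("queue");
        FakeConnection second = mgr.register("item-1");
        System.out.println(first == second); // true: both locks share one connection
        mgr.handleUnlock();
        System.out.println(first.closed);    // false: one lock is still held
        mgr.handleUnlock();
        System.out.println(first.closed);    // true: last lock released
    }
}
```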

Why a separate datasource

This was the least obvious part of the implementation. The first instinct was to just use the existing Quarkus datasource, but that quickly ran into a fundamental problem that caused hard-to-diagnose issues during development. Quarkus’s CDI container manages the lifecycle of datasource connections in conjunction with the active transaction context, while a session-level advisory lock belongs to the connection that acquired it. A lock acquired on a CDI-managed connection is therefore effectively tied to that transaction: when the transaction commits or rolls back, the connection is returned to the pool and the caller loses control of the lock, regardless of whether pg_advisory_unlock was ever called. That completely undermines the purpose of the lock.

The solution was a dedicated, separately configured AgroalDataSource created outside of CDI's awareness and used exclusively for advisory locks:

public static AgroalDataSource create(String url, String username, String password) throws Exception {
    AgroalDataSourceConfigurationSupplier configuration = new AgroalDataSourceConfigurationSupplier()
        .connectionPoolConfiguration( cp -> cp
            .minSize( 5 )
            .maxSize( 20 )
            .acquisitionTimeout( ofSeconds( 5 ) )
            .connectionFactoryConfiguration( cf -> cf
                .jdbcUrl( url )
                .autoCommit( false )
                .jdbcTransactionIsolation( SERIALIZABLE )
                .principal( new NamePrincipal( username ) )
                .credential( new SimplePassword( password ) )
            )
        );
    return AgroalDataSource.from(configuration);
}

Connections from this pool are not managed by Quarkus’s CDI transaction machinery, so advisory locks held on them live and die by the explicit pg_advisory_lock and pg_advisory_unlock calls only.

Usage

From the caller’s perspective the API is the same as any java.util.concurrent.locks.Lock. A lock is requested by id and context, acquired, work is done, and the lock is released in the finally block. A LockUnavailableException signals that the lock could not be obtained at all and the caller can decide how to handle that; in the context of the processing service this typically means re-queuing the work item for a later attempt:

Lock lock = null;
try {
    lock = lockMgr.getLock("uniqueId", getClass() + ".doSomething()");
    lock.lock();
    // do work
} catch (LockUnavailableException e) {
    throw new TryProcessingWorkItemLaterException("Could not acquire lock, try again later: " + e.getMessage());
} catch (Exception e) {
    // handle
} finally {
    if (lock != null) { lock.unlock(); }
}

tryLock() is used in cases where skipping is preferable to waiting; for example, the scheduled queue population sweep uses tryLock() so that if another instance already holds the queue lock, the sweep just exits cleanly rather than blocking. lock() is used where waiting is the right behavior; for example, acquiring an item-level lock before processing a specific work item.
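
The skip-versus-wait distinction is easy to see with any Lock. In this sketch a plain ReentrantLock stands in for the Postgres-backed queue lock, and sweepIfFree is a hypothetical helper: a second thread holds the lock, so the sweep attempted on the main thread is skipped rather than blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SweepDemo {

    // Runs the sweep only if the queue lock is free right now; never waits.
    static boolean sweepIfFree(Lock queueLock, Runnable sweep) {
        if (!queueLock.tryLock()) {
            return false; // someone else is already sweeping
        }
        try {
            sweep.run();
            return true;
        } finally {
            queueLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Lock queueLock = new ReentrantLock(); // stand-in for the advisory lock
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // simulate another instance that currently holds the queue lock
        Thread other = new Thread(() -> {
            queueLock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                queueLock.unlock();
            }
        });
        other.start();
        held.await();

        System.out.println(sweepIfFree(queueLock, () -> {})); // false: sweep skipped
        release.countDown();
        other.join();
        System.out.println(sweepIfFree(queueLock, () -> {})); // true: lock is free again
    }
}
```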

Summary

Postgres advisory locks are a simple, effective distributed locking primitive that is easy to overlook. If you already have Postgres in your stack and do not need the overhead of a dedicated locking service, they are worth considering. The main things to get right are keeping the lock connections off the CDI-managed datasource and managing the connection lifecycle carefully so locks are not held longer than intended. Wrapping the whole thing behind java.util.concurrent.locks.Lock keeps the callers clean and leaves room to swap in a different backend (Hazelcast, Redis, or anything else) later without touching the code that uses the locks.

The implementation worked well enough in production that it was extracted into a standalone library for reuse across other projects.

import lombok.Getter;
import lombok.val;
import org.jboss.logging.Logger;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class PgAdvisoryLock implements Lock {

    private static final Logger logger = Logger.getLogger(PgAdvisoryLock.class);

    private static final String NON_BLOCKING_LOCK_SQL = "SELECT pg_try_advisory_lock(?)";
    private static final String BLOCKING_LOCK_SQL = "SELECT pg_advisory_lock(?)";

    private final PostgresAdvisoryLockMgr.PgLockConnection pgConnection;

    @Getter
    private final String lockName;

    @Getter
    private long lockId;

    @Getter
    private boolean lockGranted;

    @Getter
    private final String lessee;

    private final PostgresAdvisoryLockMgr lockMgr;

    public PgAdvisoryLock(PostgresAdvisoryLockMgr lockMgr, String lockName, String lessee) throws SQLException {
        this.lockMgr = lockMgr;
        this.lockName = lockName;
        this.lockId = toPgLockId(lockName);
        this.lessee = lessee;
        this.lockGranted = false;
        this.pgConnection = lockMgr.getConnection(this);
    }

    @Override
    public void lock() {
        try {
            boolean result = this.blockingLock();
            if (!result) {
                throw new LockUnavailableException("lock() blockingLock() returned false: " + this, null);
            }
        } catch (InterruptedException e) {
            // doPgLockLogic() cleared the interrupt flag before throwing; restore it for callers
            Thread.currentThread().interrupt();
            throw new LockUnavailableException("lock() blockingLock() threw on: " + e.getClass().getSimpleName() + " " + this + " " + e.getMessage(), e);
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // blockingLock() returns false when the SQL call failed; do not return as if the lock were held
        if (!this.blockingLock()) {
            throw new LockUnavailableException("lockInterruptibly() blockingLock() returned false: " + this, null);
        }
    }

    @Override
    public boolean tryLock() {
        try {
            return nonBlockingLock();
        } catch (Exception e) {
            // an InterruptedException arrives with the interrupt flag cleared; restore it
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new LockUnavailableException("tryLock() nonBlockingLock() threw on: " + e.getClass().getSimpleName() + " " + this + " " + e.getMessage(), e);
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // need to impl this but w/ a custom connection timeout?
        throw new UnsupportedOperationException("tryLock(long time, TimeUnit unit) is not supported");
    }

    private boolean doPgLockLogic(String lockSql) throws InterruptedException {
        boolean blocking = lockSql.equals(BLOCKING_LOCK_SQL);
        Connection connection = pgConnection.connection;
        try (val statement = connection.prepareStatement(lockSql)) {
            statement.setLong(1, lockId);
            try (val result = statement.executeQuery()) {
                if (result.next()) {
                    if (blocking) {
                        // pg_advisory_lock() returns void; a row coming back means the lock is held
                        lockGranted = true;
                    } else {
                        lockGranted = result.getBoolean(1);
                    }
                } else {
                    lockGranted = false;
                }
            }
            if (lockGranted) {
                logger.debug("doPgLockLogic(blocking:"+blocking+") GRANTED " + this);
            } else {
                logger.debug("doPgLockLogic(blocking:"+blocking+") NOT GRANTED " + this);
            }
        } catch(Throwable e) {
            String msg = "doPgLockLogic(blocking:"+blocking+") ERROR " + this +
                    " error: " + e.getMessage();
            logger.error(msg,e);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
        return lockGranted;
    }

    private boolean nonBlockingLock() throws InterruptedException {
        return doPgLockLogic(NON_BLOCKING_LOCK_SQL);
    }

    private boolean blockingLock() throws InterruptedException {
        return doPgLockLogic(BLOCKING_LOCK_SQL);
    }

    @Override
    public void unlock() {
        try {
            if (!lockGranted) {
                return;
            }
            var unlockSuccess = false;
            Connection connection = pgConnection.connection;
            try (val statement = connection.prepareStatement("SELECT pg_advisory_unlock(?)")) {
                statement.setLong(1, lockId);
                try (val result = statement.executeQuery()) {
                    if (result.next()) {
                        unlockSuccess = result.getBoolean(1);
                    }
                }
            }
            if (unlockSuccess) {
                logger.debug("unlock() RELEASED " + this);
            } else {
                logger.debug("unlock() NOT RELEASED " + this);
            }
        } catch (Throwable e) {
            String msg = "unlock() FAILED " + this + " error:" +e.getMessage();
            logger.error(msg,e);
            throw new LockUnavailableException(msg, e);
        } finally {
            // defer to mgr for unlock connection handling
            this.lockMgr.handleUnlock(this);
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("newCondition() is not supported");
    }

    private static long toPgLockId(String lockId) {
        return lockId.hashCode() & Long.MAX_VALUE;
    }

    @Override
    public String toString() {
        return "["+getClass().getSimpleName()+"]{ lessee: " + lessee +
                ", lockName: " + lockName + ", lockId :" + lockId + " " + this.pgConnection + " }";
    }
}

import io.agroal.api.AgroalDataSource;
import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

@Singleton
public class PostgresAdvisoryLockMgr implements LockMgr {

    private static final Logger logger = Logger.getLogger(PostgresAdvisoryLockMgr.class);

    private final AgroalDataSource lockDatasource;

    private Map<Long, PgLockConnection> thread2connection = new ConcurrentHashMap<>();
    private Map<Long, Stack<PgAdvisoryLock>> thread2locks = new ConcurrentHashMap<>();

    public PostgresAdvisoryLockMgr(@ConfigProperty(name="quarkus.datasource.jdbc.url") String jdbcUrl,
                                   @ConfigProperty(name="quarkus.datasource.username") String username,
                                   @ConfigProperty(name="quarkus.datasource.password") String password) {
        /*
         * We have to do this (totally separate DS), otherwise all the hibernate
         * TX joining magic with all the AOP screws everything up.
         */
        try {
            this.lockDatasource = PgLockDatasource.create(jdbcUrl,username,password);
            logger.info("PostgresAdvisoryLockMgr() dedicated datasource " +
                    "created OK: jdbcUrl:" + jdbcUrl + " username:"+username);
        } catch(Exception e) {
            String msg = "PostgresAdvisoryLockMgr() unexpected error creating dedicated AgroalDataSource: " + e.getMessage();
            logger.error(msg,e);
            throw new RuntimeException(msg,e);
        }
    }

    protected class PgLockConnection {
        protected Connection connection;
        protected Integer pgPid;

        protected PgLockConnection(Connection conn, Integer pgPid) {
            this.connection = conn;
            this.pgPid = pgPid;
        }

        public void close() throws SQLException {
            if (this.connection != null){
                this.connection.close();
            }
        }

        @Override
        public String toString() {
            return "["+getClass().getSimpleName()+"].pg_pid:"+this.pgPid;
        }
    }

    private void registerLock(PgAdvisoryLock lock) {
        Long threadId = Thread.currentThread().threadId();
        Stack<PgAdvisoryLock> locks = thread2locks.get(threadId);
        if (locks == null){
            locks = new Stack<>();
            thread2locks.put(threadId,locks);
        }
        locks.push(lock);
        logger.debug("registerLock() registered lock: " + lock);
    }

    protected void handleUnlock(PgAdvisoryLock lock) {
        Long threadId = Thread.currentThread().threadId();
        Stack<PgAdvisoryLock> locks = thread2locks.get(threadId);
        if (locks == null){
            throw new NoPgAdvisoryLocksForThreadException("handleUnlock() unexpected error: current thread: " + threadId +
                    " has no Stack<Locks>!!!");
        }
        logger.debug("handleUnlock() popping lock off stack: " + lock);
        locks.pop();
        if (locks.isEmpty()) {
            PgLockConnection pgConn = thread2connection.get(threadId);
            if (pgConn != null){
                try {
                    logger.debug("handleUnlock() final lock, closing JDBC conn: " + lock);
                    pgConn.close();
                } catch(Throwable e) {
                    logger.error("handleUnlock() lock: lessee: " + lock.getLessee() +
                            " lockName:"+lock.getLockName() + " lockId:" + lock.getLockId() +
                            " unexpected error attempting to close() connection: "+ e.getMessage(),e);
                }
            }
            // smoke the thread tracked
            this.thread2locks.remove(threadId);
            this.thread2connection.remove(threadId);
        }
    }

    private static int getConnectionId(Connection connection) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement("SELECT pg_backend_pid()")) {
            try (ResultSet resultSet = statement.executeQuery()) {
                if (resultSet.next()) {
                    return resultSet.getInt(1);
                } else {
                    throw new SQLException("Failed to retrieve connection ID.");
                }
            }
        } catch(Throwable e) {
            // the pid is only used for diagnostics, so fall back to a sentinel instead of failing
            logger.error("getConnectionId() unexpected error: " + e.getMessage(),e);
            return -999;
        }
    }

    protected PgLockConnection getConnection(PgAdvisoryLock lock) throws SQLException {
        Long threadId = Thread.currentThread().threadId();
        PgLockConnection pgConn = thread2connection.get(threadId);
        if (pgConn == null){
            logger.debug("getConnection() initializing JDBC conn for lock: " + lock);
            Connection conn = this.lockDatasource.getConnection();
            Integer pgPid = getConnectionId(conn);
            pgConn = new PgLockConnection(conn,pgPid);
            thread2connection.put(Thread.currentThread().threadId(),pgConn);
        }
        return pgConn;
    }

    @Override
    public Lock getLock(String lockName, String lessee) throws LockUnavailableException {
        lessee = String.format("%s:%s", lessee, Thread.currentThread().getName());
        PgAdvisoryLock lock = null;
        try {
            lock = new PgAdvisoryLock(this, lockName, lessee);
            registerLock(lock);
            return lock;
        } catch (Throwable e) {
            String msg = "getLock() Failed to establish lock lessee: "+lessee+" lockName: " +
                    lockName + " -> lockId :"+ (lock != null ?lock.getLockId():null) + " msg:"+e.getMessage();
            logger.error(msg, e);
            throw new LockUnavailableException(msg,e);
        }
    }
}

Originally published at http://bitsofinfo.wordpress.com on May 30, 2024.
