PostgreSQL UPSERT в TypeORM: Решение Race Conditions с Raw SQL
Как обойти ограничения TypeORM и использовать атомарные операции PostgreSQL для предотвращения race conditions при параллельном обновлении статистики
Проблема
Представьте, что у вас есть веб-приложение с системой статистики, которая агрегирует данные в реальном времени. Например, отслеживание количества действий пользователей, подсчёт транзакций или аналитика событий.
При высокой нагрузке несколько пользователей могут одновременно выполнять действия, которые обновляют одну и ту же запись статистики. Это создаёт race condition — ситуацию, когда параллельные операции читают одно и то же значение и перезаписывают друг друга.
Типичный код с проблемой
// ❌ НЕ ДЕЛАЙТЕ ТАК!
async updateStatistics(eventType: string, value: number) {
// 1. Читаем текущее значение
let stats = await this.statsRepository.findOne({
where: { eventType }
});
if (!stats) {
// 2. Создаём, если не существует
stats = this.statsRepository.create({
eventType,
totalCount: 0,
totalValue: 0
});
}
// 3. Обновляем значения
stats.totalCount += 1;
stats.totalValue += value;
// 4. Сохраняем
await this.statsRepository.save(stats);
}
Что пойдёт не так?
Сценарий: 3 запроса приходят одновременно
Запрос A: читает totalCount=100
Запрос B: читает totalCount=100 (то же значение!)
Запрос C: читает totalCount=100 (то же значение!)
Запрос A: сохраняет totalCount=101
Запрос B: сохраняет totalCount=101 (перезаписал!)
Запрос C: сохраняет totalCount=101 (перезаписал!)
Результат: totalCount=101 вместо 103 ❌
Потеряны 2 обновления из 3!
Попытка решения №1: Множественные Increment
// ❌ Приводит к deadlock
async updateStatistics(eventType: string, value: number) {
await this.statsRepository.increment(
{ eventType },
'totalCount',
1
);
await this.statsRepository.increment(
{ eventType },
'totalValue',
value
);
await this.statsRepository.increment(
{ eventType },
'totalEvents',
1
);
}
Проблема: При высокой нагрузке возникает deadlock, потому что PostgreSQL пытается заблокировать одну и ту же строку несколько раз в разном порядке.
Error: deadlock detected
Detail: Process A waits for lock on statistics row
Process B waits for lock on statistics row
Решение №1: TypeORM Upsert (Рекомендуется для простых случаев)
TypeORM предоставляет встроенный метод upsert(), который работает отлично для простых сценариев без сложных вычислений.
Структура таблицы
@Entity()
@Index(['eventType'], { unique: true })
export class Statistics {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ unique: true })
eventType: string;
@Column({ type: 'int', default: 0 })
totalCount: number;
@Column({ type: 'bigint', default: 0 })
totalValue: number;
@Column({ type: 'double precision', default: 0 })
averageValue: number;
@CreateDateColumn()
lastUpdated: Date;
}
Простая реализация с TypeORM
async updateStatistics(eventType: string, value: number): Promise<void> {
// ⚠️ ПРОБЛЕМА: upsert() не поддерживает инкремент из коробки
await this.statsRepository.upsert(
{
eventType,
totalCount: 1,
totalValue: value,
averageValue: value,
},
{
conflictPaths: ['eventType'],
skipUpdateIfNoValuesChanged: false,
}
);
// Это создаст запись, но НЕ сделает инкремент при конфликте!
}
Проблема: TypeORM upsert() перезаписывает значения вместо инкремента. Это не подходит для счётчиков!
Решение №2: Raw SQL с UPSERT (Правильное решение)
Для инкрементальных обновлений нужен нативный PostgreSQL INSERT ... ON CONFLICT DO UPDATE.
Правильная реализация
async updateStatistics(eventType: string, value: number): Promise<void> {
await this.statsRepository.manager.query(
`
INSERT INTO statistics ("eventType", "totalCount", "totalValue", "averageValue")
VALUES ($1, 1, $2::bigint, $2::numeric)
ON CONFLICT ("eventType")
DO UPDATE SET
"totalCount" = statistics."totalCount" + 1,
"totalValue" = statistics."totalValue" + $2::bigint,
"averageValue" = CASE
WHEN statistics."totalCount" + 1 > 0
THEN ROUND(
((statistics."totalValue" + $2::bigint)::numeric / (statistics."totalCount" + 1)::numeric),
2
)
ELSE 0
END,
"lastUpdated" = CURRENT_TIMESTAMP
`,
[eventType, value]
);
}
Почему это работает?
- Атомарность: Вся операция выполняется как единая транзакция
- Уникальный индекс:
ON CONFLICT ("eventType")использует индекс для определения конфликта - Инкремент на уровне SQL:
statistics."totalCount" + 1— БД сама управляет блокировками - Расчёт в запросе: Среднее значение рассчитывается сразу, без дополнительных запросов
Тестирование параллельности
Без UPSERT (старый код)
// Запускаем 100 параллельных обновлений
await Promise.all(
Array.from({ length: 100 }, (_, i) =>
updateStatistics('test_event', 10)
)
);
// Ожидаем: totalCount=100, totalValue=1000
// Реально: totalCount=37, totalValue=420 ❌
С UPSERT (новый код)
// Запускаем 100 параллельных обновлений
await Promise.all(
Array.from({ length: 100 }, (_, i) =>
updateStatistics('test_event', 10)
)
);
// Результат: totalCount=100, totalValue=1000 ✅
Важные детали реализации
1. Уникальный индекс обязателен
@Index(['eventType'], { unique: true })
Без уникального индекса ON CONFLICT не будет работать.
2. Явное приведение типов
$2::bigint -- Для больших чисел
$2::numeric -- Для вычислений с плавающей точкой
PostgreSQL может выдавать ошибку inconsistent types deduced for parameter без явного приведения типов.
3. NULL-безопасность
Для составных ключей учитывайте NULL:
@Index(['gameType', 'categoryId'], { unique: true })
// В запросе:
const categoryId = category || null; // Явно используем null, не undefined
Миграция для добавления индекса
import { MigrationInterface, QueryRunner } from "typeorm";
export class AddUniqueIndexToStatistics1234567890
implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
// Удаляем дубликаты перед созданием индекса
await queryRunner.query(`
DELETE FROM statistics
WHERE id NOT IN (
SELECT id
FROM (
SELECT
id,
ROW_NUMBER() OVER (
PARTITION BY "eventType"
ORDER BY "totalCount" DESC, "lastUpdated" DESC
) as rn
FROM statistics
) t
WHERE t.rn = 1
)
`);
// Создаём уникальный индекс
await queryRunner.query(`
CREATE UNIQUE INDEX "IDX_statistics_event_type"
ON "statistics" ("eventType")
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
DROP INDEX "IDX_statistics_event_type"
`);
}
}
Производительность
Нагрузочное тестирование
// Тест: 10,000 параллельных обновлений
const startTime = Date.now();
await Promise.all(
Array.from({ length: 10000 }, () =>
updateStatistics('load_test', Math.floor(Math.random() * 100))
)
);
const duration = Date.now() - startTime;
console.log(`Completed in ${duration}ms`);
// Результаты:
// Старый код (с race condition): ~8500ms, данные некорректны
// Новый код (UPSERT): ~2300ms, данные корректны ✅
UPSERT не только корректнее, но и быстрее благодаря отсутствию дополнительных запросов на чтение.
Когда использовать UPSERT
✅ Подходит для:
- Счётчики и агрегированная статистика
- Системы аналитики в реальном времени
- Отслеживание событий пользователей
- Инкрементальные обновления
- Высоконагруженные API
❌ НЕ подходит для:
- Сложной бизнес-логики с условиями
- Операций, требующих транзакций с несколькими таблицами
- Случаев, когда нужен полный контроль над логикой обновления
Решение №3: Pessimistic Locking (TypeORM, не рекомендуется)
TypeORM поддерживает пессимистичные блокировки, но они медленные и могут вызвать deadlock.
async updateStatistics(eventType: string, value: number): Promise<void> {
await this.statsRepository.manager.transaction(async (manager) => {
// Блокируем строку для записи
const stats = await manager.findOne(Statistics, {
where: { eventType },
lock: { mode: 'pessimistic_write' }
});
if (!stats) {
// Создаём новую запись
const newStats = manager.create(Statistics, {
eventType,
totalCount: 1,
totalValue: value,
averageValue: value,
});
await manager.save(newStats);
return;
}
// Обновляем существующую
stats.totalCount += 1;
stats.totalValue += value;
stats.averageValue = stats.totalValue / stats.totalCount;
await manager.save(stats);
});
}
Минусы:
- Медленнее из-за блокировок (в 3-5 раз)
- Может вызвать deadlock при высокой нагрузке
- Требует дополнительного SELECT запроса
Сравнение решений
| Метод | Скорость | Сложность | Race-safe | Deadlock-safe |
|---|---|---|---|---|
| TypeORM upsert() | ⚠️ Не работает для инкремента | ✅ Простой | ❌ Нет | ✅ Да |
| Raw SQL UPSERT | ✅ Быстро | ⚠️ Средняя | ✅ Да | ✅ Да |
| Pessimistic Lock | ❌ Медленно | ⚠️ Средняя | ✅ Да | ❌ Нет |
| increment() x3 | ❌ Очень медленно | ✅ Простой | ✅ Да | ❌ Нет |
Вывод: Для счётчиков используйте Raw SQL UPSERT.
Как другие ORM решают эту проблему?
Prisma
Важно: Prisma НЕ поддерживает increment внутри upsert().
Вы можете использовать increment только в update():
// ✅ Работает в update()
await prisma.statistics.update({
where: { eventType },
data: {
totalCount: { increment: 1 },
totalValue: { increment: value },
},
});
// ❌ НЕ работает в upsert()
await prisma.statistics.upsert({
where: { eventType },
update: {
totalCount: { increment: 1 }, // Error: increment не поддерживается
},
create: {...},
});
Для UPSERT с инкрементом в Prisma придётся использовать Raw SQL:
await prisma.$executeRaw`
INSERT INTO statistics (eventType, totalCount, totalValue)
VALUES (${eventType}, 1, ${value})
ON CONFLICT (eventType)
DO UPDATE SET
totalCount = statistics.totalCount + 1,
totalValue = statistics.totalValue + ${value}
`;
Вывод: Prisma требует Raw SQL для UPSERT с инкрементом, как и TypeORM.
Sequelize
await Statistics.upsert({
eventType,
totalCount: sequelize.literal('totalCount + 1'),
totalValue: sequelize.literal(`totalValue + ${value}`),
}, {
conflictFields: ['eventType']
});
Проблема: sequelize.literal() обходит SQL injection защиту. Нужна осторожность!
Drizzle ORM
await db
.insert(statistics)
.values({
eventType,
totalCount: 1,
totalValue: value,
})
.onConflictDoUpdate({
target: statistics.eventType,
set: {
totalCount: sql`${statistics.totalCount} + 1`,
totalValue: sql`${statistics.totalValue} + ${value}`,
},
});
Плюсы:
- ✅ Type-safe
- ✅ Близко к SQL
- ✅ Поддерживает
sqltemplate literals
Knex.js (Query Builder)
await knex('statistics')
.insert({
eventType,
totalCount: 1,
totalValue: value,
})
.onConflict('eventType')
.merge({
totalCount: knex.raw('statistics.totalCount + 1'),
totalValue: knex.raw('statistics.totalValue + ?', [value]),
});
Сравнение ORM
| ORM | Upsert | Инкремент в UPSERT | Сложные вычисления | Type-safe |
|---|---|---|---|---|
| TypeORM | ⚠️ Есть | ❌ Нет (только Raw SQL) | ⚠️ Raw SQL | ✅ Да |
| Prisma | ✅ Да | ❌ Нет (только Raw SQL) | ❌ Нет | ✅ Да |
| Sequelize | ✅ Да | ⚠️ literal() (опасно) | ⚠️ literal() | ❌ Нет |
| Drizzle | ✅ Да | ✅ Да (sql template) |
✅ Да | ✅ Да |
| Knex | ✅ Да | ✅ Да (knex.raw()) |
✅ Да | ❌ Нет |
Лучший выбор для race-safe инкремента в UPSERT:
- Drizzle ORM — современный, type-safe, удобный API
- Knex.js — гибкий query builder, но без type safety
- TypeORM/Prisma с Raw SQL — когда уже используете эти ORM
Примеры для других БД
MySQL (MariaDB)
INSERT INTO statistics (eventType, totalCount, totalValue)
VALUES (?, 1, ?)
ON DUPLICATE KEY UPDATE
totalCount = totalCount + 1,
totalValue = totalValue + VALUES(totalValue)
SQLite
INSERT INTO statistics (eventType, totalCount, totalValue)
VALUES (?, 1, ?)
ON CONFLICT(eventType) DO UPDATE SET
totalCount = totalCount + 1,
totalValue = totalValue + excluded.totalValue
Другие альтернативы
Redis для промежуточного хранения
await redis.incr(`stats:${eventType}:count`);
// Периодически сбрасываем в PostgreSQL
Плюсы: Очень быстро
Минусы: Сложность архитектуры, риск потери данных
Выводы
- Race conditions — реальная проблема в высоконагруженных приложениях
- UPSERT (
INSERT ... ON CONFLICT) — элегантное решение на уровне БД - Операция атомарная и быстрее альтернатив
- Требует уникального индекса для работы
- Используйте явное приведение типов в SQL для надёжности
PostgreSQL предоставляет мощные инструменты для работы с конкурентными обновлениями. Используйте их правильно, и ваше приложение будет работать стабильно даже под высокой нагрузкой.