ByteGuide
← All articles🇬🇧 English

PostgreSQL UPSERT в TypeORM: Решение Race Conditions с Raw SQL

Как обойти ограничения TypeORM и использовать атомарные операции PostgreSQL для предотвращения race conditions при параллельном обновлении статистики

postgresqltypeormdatabaseconcurrencyupsertrace-conditions
⏱️ 9 min read

Проблема

Представьте, что у вас есть веб-приложение с системой статистики, которая агрегирует данные в реальном времени. Например, отслеживание количества действий пользователей, подсчёт транзакций или аналитика событий.

При высокой нагрузке несколько пользователей могут одновременно выполнять действия, которые обновляют одну и ту же запись статистики. Это создаёт race condition — ситуацию, когда параллельные операции читают одно и то же значение и перезаписывают друг друга.

Типичный код с проблемой

// ❌ НЕ ДЕЛАЙТЕ ТАК!
async updateStatistics(eventType: string, value: number) {
  // 1. Читаем текущее значение
  let stats = await this.statsRepository.findOne({
    where: { eventType }
  });

  if (!stats) {
    // 2. Создаём, если не существует
    stats = this.statsRepository.create({
      eventType,
      totalCount: 0,
      totalValue: 0
    });
  }

  // 3. Обновляем значения
  stats.totalCount += 1;
  stats.totalValue += value;

  // 4. Сохраняем
  await this.statsRepository.save(stats);
}

Что пойдёт не так?

Сценарий: 3 запроса приходят одновременно

Запрос A: читает totalCount=100
Запрос B: читает totalCount=100 (то же значение!)
Запрос C: читает totalCount=100 (то же значение!)

Запрос A: сохраняет totalCount=101
Запрос B: сохраняет totalCount=101 (перезаписал!)
Запрос C: сохраняет totalCount=101 (перезаписал!)

Результат: totalCount=101 вместо 103 ❌

Потеряны 2 обновления из 3!

Попытка решения №1: Множественные Increment

// ❌ Приводит к deadlock
async updateStatistics(eventType: string, value: number) {
  await this.statsRepository.increment(
    { eventType }, 
    'totalCount', 
    1
  );
  await this.statsRepository.increment(
    { eventType }, 
    'totalValue', 
    value
  );
  await this.statsRepository.increment(
    { eventType }, 
    'totalEvents', 
    1
  );
}

Проблема: При высокой нагрузке возникает deadlock, потому что PostgreSQL пытается заблокировать одну и ту же строку несколько раз в разном порядке.

Error: deadlock detected
Detail: Process A waits for lock on statistics row
Process B waits for lock on statistics row

Решение №1: TypeORM Upsert (Рекомендуется для простых случаев)

TypeORM предоставляет встроенный метод upsert(), который работает отлично для простых сценариев без сложных вычислений.

Структура таблицы

@Entity()
@Index(['eventType'], { unique: true })
export class Statistics {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ unique: true })
  eventType: string;

  @Column({ type: 'int', default: 0 })
  totalCount: number;

  @Column({ type: 'bigint', default: 0 })
  totalValue: number;

  @Column({ type: 'double precision', default: 0 })
  averageValue: number;

  @CreateDateColumn()
  lastUpdated: Date;
}

Простая реализация с TypeORM

async updateStatistics(eventType: string, value: number): Promise<void> {
  // ⚠️ ПРОБЛЕМА: upsert() не поддерживает инкремент из коробки
  await this.statsRepository.upsert(
    {
      eventType,
      totalCount: 1,
      totalValue: value,
      averageValue: value,
    },
    {
      conflictPaths: ['eventType'],
      skipUpdateIfNoValuesChanged: false,
    }
  );
  
  // Это создаст запись, но НЕ сделает инкремент при конфликте!
}

Проблема: TypeORM upsert() перезаписывает значения вместо инкремента. Это не подходит для счётчиков!

Решение №2: Raw SQL с UPSERT (Правильное решение)

Для инкрементальных обновлений нужен нативный PostgreSQL INSERT ... ON CONFLICT DO UPDATE.

Правильная реализация

async updateStatistics(eventType: string, value: number): Promise<void> {
  await this.statsRepository.manager.query(
    `
    INSERT INTO statistics ("eventType", "totalCount", "totalValue", "averageValue")
    VALUES ($1, 1, $2::bigint, $2::numeric)
    ON CONFLICT ("eventType") 
    DO UPDATE SET
      "totalCount" = statistics."totalCount" + 1,
      "totalValue" = statistics."totalValue" + $2::bigint,
      "averageValue" = CASE 
        WHEN statistics."totalCount" + 1 > 0 
        THEN ROUND(
          ((statistics."totalValue" + $2::bigint)::numeric / (statistics."totalCount" + 1)::numeric), 
          2
        )
        ELSE 0 
      END,
      "lastUpdated" = CURRENT_TIMESTAMP
    `,
    [eventType, value]
  );
}

Почему это работает?

  1. Атомарность: Вся операция выполняется как единая транзакция
  2. Уникальный индекс: ON CONFLICT ("eventType") использует индекс для определения конфликта
  3. Инкремент на уровне SQL: statistics."totalCount" + 1 — БД сама управляет блокировками
  4. Расчёт в запросе: Среднее значение рассчитывается сразу, без дополнительных запросов

Тестирование параллельности

Без UPSERT (старый код)

// Запускаем 100 параллельных обновлений
await Promise.all(
  Array.from({ length: 100 }, (_, i) =>
    updateStatistics('test_event', 10)
  )
);

// Ожидаем: totalCount=100, totalValue=1000
// Реально: totalCount=37, totalValue=420 ❌

С UPSERT (новый код)

// Запускаем 100 параллельных обновлений
await Promise.all(
  Array.from({ length: 100 }, (_, i) =>
    updateStatistics('test_event', 10)
  )
);

// Результат: totalCount=100, totalValue=1000 ✅

Важные детали реализации

1. Уникальный индекс обязателен

@Index(['eventType'], { unique: true })

Без уникального индекса ON CONFLICT не будет работать.

2. Явное приведение типов

$2::bigint  -- Для больших чисел
$2::numeric -- Для вычислений с плавающей точкой

PostgreSQL может выдавать ошибку inconsistent types deduced for parameter без явного приведения типов.

3. NULL-безопасность

Для составных ключей учитывайте NULL:

@Index(['gameType', 'categoryId'], { unique: true })

// В запросе:
const categoryId = category || null; // Явно используем null, не undefined

Миграция для добавления индекса

import { MigrationInterface, QueryRunner } from "typeorm";

export class AddUniqueIndexToStatistics1234567890 
  implements MigrationInterface {
  
  public async up(queryRunner: QueryRunner): Promise<void> {
    // Удаляем дубликаты перед созданием индекса
    await queryRunner.query(`
      DELETE FROM statistics
      WHERE id NOT IN (
        SELECT id
        FROM (
          SELECT 
            id,
            ROW_NUMBER() OVER (
              PARTITION BY "eventType" 
              ORDER BY "totalCount" DESC, "lastUpdated" DESC
            ) as rn
          FROM statistics
        ) t
        WHERE t.rn = 1
      )
    `);

    // Создаём уникальный индекс
    await queryRunner.query(`
      CREATE UNIQUE INDEX "IDX_statistics_event_type" 
      ON "statistics" ("eventType")
    `);
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.query(`
      DROP INDEX "IDX_statistics_event_type"
    `);
  }
}

Производительность

Нагрузочное тестирование

// Тест: 10,000 параллельных обновлений
const startTime = Date.now();

await Promise.all(
  Array.from({ length: 10000 }, () =>
    updateStatistics('load_test', Math.floor(Math.random() * 100))
  )
);

const duration = Date.now() - startTime;
console.log(`Completed in ${duration}ms`);

// Результаты:
// Старый код (с race condition): ~8500ms, данные некорректны
// Новый код (UPSERT): ~2300ms, данные корректны ✅

UPSERT не только корректнее, но и быстрее благодаря отсутствию дополнительных запросов на чтение.

Когда использовать UPSERT

Подходит для:

  • Счётчики и агрегированная статистика
  • Системы аналитики в реальном времени
  • Отслеживание событий пользователей
  • Инкрементальные обновления
  • Высоконагруженные API

НЕ подходит для:

  • Сложной бизнес-логики с условиями
  • Операций, требующих транзакций с несколькими таблицами
  • Случаев, когда нужен полный контроль над логикой обновления

Решение №3: Pessimistic Locking (TypeORM, не рекомендуется)

TypeORM поддерживает пессимистичные блокировки, но они медленные и могут вызвать deadlock.

async updateStatistics(eventType: string, value: number): Promise<void> {
  await this.statsRepository.manager.transaction(async (manager) => {
    // Блокируем строку для записи
    const stats = await manager.findOne(Statistics, {
      where: { eventType },
      lock: { mode: 'pessimistic_write' }
    });
    
    if (!stats) {
      // Создаём новую запись
      const newStats = manager.create(Statistics, {
        eventType,
        totalCount: 1,
        totalValue: value,
        averageValue: value,
      });
      await manager.save(newStats);
      return;
    }
    
    // Обновляем существующую
    stats.totalCount += 1;
    stats.totalValue += value;
    stats.averageValue = stats.totalValue / stats.totalCount;
    await manager.save(stats);
  });
}

Минусы:

  • Медленнее из-за блокировок (в 3-5 раз)
  • Может вызвать deadlock при высокой нагрузке
  • Требует дополнительного SELECT запроса

Сравнение решений

Метод Скорость Сложность Race-safe Deadlock-safe
TypeORM upsert() ⚠️ Не работает для инкремента ✅ Простой ❌ Нет ✅ Да
Raw SQL UPSERT ✅ Быстро ⚠️ Средняя ✅ Да ✅ Да
Pessimistic Lock ❌ Медленно ⚠️ Средняя ✅ Да ❌ Нет
increment() x3 ❌ Очень медленно ✅ Простой ✅ Да ❌ Нет

Вывод: Для счётчиков используйте Raw SQL UPSERT.

Как другие ORM решают эту проблему?

Prisma

Важно: Prisma НЕ поддерживает increment внутри upsert().

Вы можете использовать increment только в update():

// ✅ Работает в update()
await prisma.statistics.update({
  where: { eventType },
  data: {
    totalCount: { increment: 1 },
    totalValue: { increment: value },
  },
});

// ❌ НЕ работает в upsert()
await prisma.statistics.upsert({
  where: { eventType },
  update: {
    totalCount: { increment: 1 }, // Error: increment не поддерживается
  },
  create: {...},
});

Для UPSERT с инкрементом в Prisma придётся использовать Raw SQL:

await prisma.$executeRaw`
  INSERT INTO statistics (eventType, totalCount, totalValue)
  VALUES (${eventType}, 1, ${value})
  ON CONFLICT (eventType) 
  DO UPDATE SET
    totalCount = statistics.totalCount + 1,
    totalValue = statistics.totalValue + ${value}
`;

Вывод: Prisma требует Raw SQL для UPSERT с инкрементом, как и TypeORM.

Sequelize

await Statistics.upsert({
  eventType,
  totalCount: sequelize.literal('totalCount + 1'),
  totalValue: sequelize.literal(`totalValue + ${value}`),
}, {
  conflictFields: ['eventType']
});

Проблема: sequelize.literal() обходит SQL injection защиту. Нужна осторожность!

Drizzle ORM

await db
  .insert(statistics)
  .values({
    eventType,
    totalCount: 1,
    totalValue: value,
  })
  .onConflictDoUpdate({
    target: statistics.eventType,
    set: {
      totalCount: sql`${statistics.totalCount} + 1`,
      totalValue: sql`${statistics.totalValue} + ${value}`,
    },
  });

Плюсы:

  • ✅ Type-safe
  • ✅ Близко к SQL
  • ✅ Поддерживает sql template literals

Knex.js (Query Builder)

await knex('statistics')
  .insert({
    eventType,
    totalCount: 1,
    totalValue: value,
  })
  .onConflict('eventType')
  .merge({
    totalCount: knex.raw('statistics.totalCount + 1'),
    totalValue: knex.raw('statistics.totalValue + ?', [value]),
  });

Сравнение ORM

ORM Upsert Инкремент в UPSERT Сложные вычисления Type-safe
TypeORM ⚠️ Есть ❌ Нет (только Raw SQL) ⚠️ Raw SQL ✅ Да
Prisma ✅ Да ❌ Нет (только Raw SQL) ❌ Нет ✅ Да
Sequelize ✅ Да ⚠️ literal() (опасно) ⚠️ literal() ❌ Нет
Drizzle ✅ Да ✅ Да (sql template) ✅ Да ✅ Да
Knex ✅ Да ✅ Да (knex.raw()) ✅ Да ❌ Нет

Лучший выбор для race-safe инкремента в UPSERT:

  1. Drizzle ORM — современный, type-safe, удобный API
  2. Knex.js — гибкий query builder, но без type safety
  3. TypeORM/Prisma с Raw SQL — когда уже используете эти ORM

Примеры для других БД

MySQL (MariaDB)

INSERT INTO statistics (eventType, totalCount, totalValue)
VALUES (?, 1, ?)
ON DUPLICATE KEY UPDATE
  totalCount = totalCount + 1,
  totalValue = totalValue + VALUES(totalValue)

SQLite

INSERT INTO statistics (eventType, totalCount, totalValue)
VALUES (?, 1, ?)
ON CONFLICT(eventType) DO UPDATE SET
  totalCount = totalCount + 1,
  totalValue = totalValue + excluded.totalValue

Другие альтернативы

Redis для промежуточного хранения

await redis.incr(`stats:${eventType}:count`);
// Периодически сбрасываем в PostgreSQL

Плюсы: Очень быстро
Минусы: Сложность архитектуры, риск потери данных

Выводы

  1. Race conditions — реальная проблема в высоконагруженных приложениях
  2. UPSERT (INSERT ... ON CONFLICT) — элегантное решение на уровне БД
  3. Операция атомарная и быстрее альтернатив
  4. Требует уникального индекса для работы
  5. Используйте явное приведение типов в SQL для надёжности

PostgreSQL предоставляет мощные инструменты для работы с конкурентными обновлениями. Используйте их правильно, и ваше приложение будет работать стабильно даже под высокой нагрузкой.

Полезные ссылки