Я пытался ответить на следующий вопрос:
После публикации несколько наивного ответа я решил, что положу свои деньги туда, где был мой рот, и на самом деле проверил сценарий, который я предлагал, чтобы убедиться, что я не отправляю ОП в погоне за диким гусем. Ну, это оказалось намного сложнее, чем я думал (я уверен, что никого это не удивит).
Вот что я пробовал и думал:
Сначала я попытался выполнить TOP 1 UPDATE с ORDER BY внутри производной таблицы, используя
ROWLOCK, READPAST
. Это приводило к тупикам, а также обрабатывало предметы из строя. Он должен быть как можно ближе к FIFO, исключая ошибки, которые требуют попытки обработать одну и ту же строку более одного раза.Затем я попытался выбрать нужный следующий QueueID в переменную, используя различные комбинации
READPAST
,UPDLOCK
,HOLDLOCK
иROWLOCK
исключительно сохранить строку для обновления на этой сессии. Все варианты, которые я попробовал, страдали теми же проблемами, что и раньше, а также, для некоторых комбинаций, сREADPAST
жалобами:Вы можете указать блокировку READPAST только на уровнях изоляции READ COMMITTED или REPEATABLE READ.
Это сбивало с толку, потому что это было ЧИТАЕТСЯ. Я сталкивался с этим раньше, и это расстраивает.
Поскольку я начал писать этот вопрос, Ремус Русани опубликовал новый ответ на этот вопрос. Я читаю его связанную статью и вижу, что он использует деструктивное чтение, поскольку он сказал в своем ответе, что «реально невозможно удерживать блокировки во время веб-вызовов». После прочтения того, что говорится в его статье о «горячих точках» и страницах, требующих блокировки, чтобы выполнить какое-либо обновление или удаление, я боюсь, что даже если бы мне удалось выработать правильные блокировки для выполнения того, что я ищу, он не был бы масштабируемым и мог не обрабатывать массовый параллелизм.
Прямо сейчас я не уверен, куда идти. Верно ли, что поддержание блокировок во время обработки строки не может быть достигнуто (даже если он не поддерживает высокую скорость передачи данных или массовый параллелизм)? Что мне не хватает?
В надежде, что люди, умнее меня и люди более опытные, чем я, могут помочь, ниже приведен тестовый скрипт, который я использовал. Он переключен обратно на метод TOP 1 UPDATE, но я оставил закомментированный другой метод на случай, если вы тоже захотите это изучить.
Вставьте каждый из них в отдельный сеанс, запустите сеанс 1, а затем быстро все остальные. Примерно через 50 секунд тест закончится. Посмотрите на Сообщения от каждого сеанса, чтобы увидеть, какую работу он проделал (или как это не удалось). В первом сеансе будет показан набор строк со снимком, сделанным один раз в секунду с подробным описанием имеющихся блокировок и обрабатываемых элементов очереди. Иногда это работает, а в другое время не работает вообще.
Сессия 1
/* Session 1: Setup and control - Run this session first, then immediately run all other sessions */
IF Object_ID('dbo.Queue', 'U') IS NULL
CREATE TABLE dbo.Queue (
QueueID int identity(1,1) NOT NULL,
StatusID int NOT NULL,
QueuedDate datetime CONSTRAINT DF_Queue_QueuedDate DEFAULT (GetDate()),
CONSTRAINT PK_Queue PRIMARY KEY CLUSTERED (QueuedDate, QueueID)
);
IF Object_ID('dbo.QueueHistory', 'U') IS NULL
CREATE TABLE dbo.QueueHistory (
HistoryDate datetime NOT NULL,
QueueID int NOT NULL
);
IF Object_ID('dbo.LockHistory', 'U') IS NULL
CREATE TABLE dbo.LockHistory (
HistoryDate datetime NOT NULL,
ResourceType varchar(100),
RequestMode varchar(100),
RequestStatus varchar(100),
ResourceDescription varchar(200),
ResourceAssociatedEntityID varchar(200)
);
IF Object_ID('dbo.StartTime', 'U') IS NULL
CREATE TABLE dbo.StartTime (
StartTime datetime NOT NULL
);
SET NOCOUNT ON;
IF (SELECT Count(*) FROM dbo.Queue) < 10000 BEGIN
TRUNCATE TABLE dbo.Queue;
WITH A (N) AS (SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1 UNION ALL SELECT 1),
B (N) AS (SELECT 1 FROM A Z, A I, A P),
C (N) AS (SELECT Row_Number() OVER (ORDER BY (SELECT 1)) FROM B O, B W)
INSERT dbo.Queue (StatusID, QueuedDate)
SELECT 1, DateAdd(millisecond, C.N * 3, GetDate() - '00:05:00')
FROM C
WHERE C.N <= 10000;
END;
TRUNCATE TABLE dbo.StartTime;
INSERT dbo.StartTime SELECT GetDate() + '00:00:15'; -- or however long it takes you to go run the other sessions
GO
TRUNCATE TABLE dbo.QueueHistory;
SET NOCOUNT ON;
DECLARE
@Time varchar(8),
@Now datetime;
SELECT @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;
DECLARE @i int,
@QueueID int;
SET @i = 1;
WHILE @i <= 33 BEGIN
SET @Now = GetDate();
INSERT dbo.QueueHistory
SELECT
@Now,
QueueID
FROM
dbo.Queue Q WITH (NOLOCK)
WHERE
Q.StatusID <> 1;
INSERT dbo.LockHistory
SELECT
@Now,
L.resource_type,
L.request_mode,
L.request_status,
L.resource_description,
L.resource_associated_entity_id
FROM
sys.dm_tran_current_transaction T
INNER JOIN sys.dm_tran_locks L
ON L.request_owner_id = T.transaction_id;
WAITFOR DELAY '00:00:01';
SET @i = @i + 1;
END;
WITH Cols AS (
SELECT *, Row_Number() OVER (PARTITION BY HistoryDate ORDER BY QueueID) Col
FROM dbo.QueueHistory
), P AS (
SELECT *
FROM
Cols
PIVOT (Max(QueueID) FOR Col IN ([1], [2], [3], [4], [5], [6], [7], [8])) P
)
SELECT L.*, P.[1], P.[2], P.[3], P.[4], P.[5], P.[6], P.[7], P.[8]
FROM
dbo.LockHistory L
FULL JOIN P
ON L.HistoryDate = P.HistoryDate
/* Clean up afterward
DROP TABLE dbo.StartTime;
DROP TABLE dbo.LockHistory;
DROP TABLE dbo.QueueHistory;
DROP TABLE dbo.Queue;
*/
Сессия 2
/* Session 2: Simulate an application instance holding a row locked for a long period, and eventually abandoning it. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET NOCOUNT ON;
SET XACT_ABORT ON;
DECLARE
@QueueID int,
@Time varchar(8);
SELECT @Time = Convert(varchar(8), StartTime + '0:00:01', 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;
BEGIN TRAN;
--SET @QueueID = (
-- SELECT TOP 1 QueueID
-- FROM dbo.Queue WITH (READPAST, UPDLOCK)
-- WHERE StatusID = 1 -- ready
-- ORDER BY QueuedDate, QueueID
--);
--UPDATE dbo.Queue
--SET StatusID = 2 -- in process
----OUTPUT Inserted.*
--WHERE QueueID = @QueueID;
SET @QueueID = NULL;
UPDATE Q
SET Q.StatusID = 1, @QueueID = Q.QueueID
FROM (
SELECT TOP 1 *
FROM dbo.Queue WITH (ROWLOCK, READPAST)
WHERE StatusID = 1
ORDER BY QueuedDate, QueueID
) Q
PRINT @QueueID;
WAITFOR DELAY '00:00:20'; -- Release it partway through the test
ROLLBACK TRAN; -- Simulate client disconnecting
Сессия 3
/* Session 3: Run a near-continuous series of "failed" queue processing. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET XACT_ABORT ON;
SET NOCOUNT ON;
DECLARE
@QueueID int,
@EndDate datetime,
@NextDate datetime,
@Time varchar(8);
SELECT
@EndDate = StartTime + '0:00:33',
@Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;
WHILE GetDate() < @EndDate BEGIN
BEGIN TRAN;
--SET @QueueID = (
-- SELECT TOP 1 QueueID
-- FROM dbo.Queue WITH (READPAST, UPDLOCK)
-- WHERE StatusID = 1 -- ready
-- ORDER BY QueuedDate, QueueID
--);
--UPDATE dbo.Queue
--SET StatusID = 2 -- in process
----OUTPUT Inserted.*
--WHERE QueueID = @QueueID;
SET @QueueID = NULL;
UPDATE Q
SET Q.StatusID = 1, @QueueID = Q.QueueID
FROM (
SELECT TOP 1 *
FROM dbo.Queue WITH (ROWLOCK, READPAST)
WHERE StatusID = 1
ORDER BY QueuedDate, QueueID
) Q
PRINT @QueueID;
SET @NextDate = GetDate() + '00:00:00.015';
WHILE GetDate() < @NextDate SET NOCOUNT ON;
ROLLBACK TRAN;
END
Сессия 4 и выше - сколько угодно
/* Session 4: "Process" the queue normally, one every second for 30 seconds. */
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET XACT_ABORT ON;
SET NOCOUNT ON;
DECLARE @Time varchar(8);
SELECT @Time = Convert(varchar(8), StartTime, 114)
FROM dbo.StartTime;
WAITFOR TIME @Time;
DECLARE @i int,
@QueueID int;
SET @i = 1;
WHILE @i <= 30 BEGIN
BEGIN TRAN;
--SET @QueueID = (
-- SELECT TOP 1 QueueID
-- FROM dbo.Queue WITH (READPAST, UPDLOCK)
-- WHERE StatusID = 1 -- ready
-- ORDER BY QueuedDate, QueueID
--);
--UPDATE dbo.Queue
--SET StatusID = 2 -- in process
--WHERE QueueID = @QueueID;
SET @QueueID = NULL;
UPDATE Q
SET Q.StatusID = 1, @QueueID = Q.QueueID
FROM (
SELECT TOP 1 *
FROM dbo.Queue WITH (ROWLOCK, READPAST)
WHERE StatusID = 1
ORDER BY QueuedDate, QueueID
) Q
PRINT @QueueID;
WAITFOR DELAY '00:00:01'
SET @i = @i + 1;
DELETE dbo.Queue
WHERE QueueID = @QueueID;
COMMIT TRAN;
END
источник
READPAST, UPDLOCK, ROWLOCK
мой сценарий для ввода данных в таблицу QueueHistory ничего не делает. Интересно, это потому, что StatusID не зафиксирован? ОнWITH (NOLOCK)
так теоретически должен работать ... и раньше работал! Я не уверен, почему это не работает сейчас, но это, вероятно, еще один опыт обучения.Ответы:
Вам нужно ровно 3 подсказки блокировки
Я ответил на это ранее на SO: /programming/939831/sql-server-process-queue-race-condition/940001#940001
Как говорит Ремус, использование сервисного брокера приятнее, но эти подсказки работают
Ваша ошибка об уровне изоляции обычно означает репликацию или участие NOLOCK.
источник
UPDATE SET ... FROM (SELECT TOP 1 ... FROM ... ORDER BY ...)
) Означает ли это, что мой шаблон UPDATE с удержанием блокировки не может работать? Кроме того , в тот момент , вы сочетаетеREADPAST
сHOLDLOCK
вами получите ошибку. На этом сервере нет репликации, и уровень изоляции READ COMMITTED.StatusID
чтобы удалить элемент из очереди. Это верно? 2. Ваш ордер на возврат должен быть однозначным. Если вы ставите в очередь предметыGETDATE()
, то при больших объемах весьма вероятно, что несколько предметов будут в равной степени иметь право на снятие с очереди одновременно. Это приведет к тупикам. Я предлагаю добавитьIDENTITY
к кластерному индексу, чтобы гарантировать однозначный порядок удаления.SQL-сервер отлично работает для хранения реляционных данных. Что касается очереди на работу, это не так здорово. Смотрите эту статью, которая написана для MySQL, но она также может применяться здесь. https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you
источник
emails
таблицу и вnew_emails
очередь. Обработка опрашиваетnew_emails
очередь и обновляет состояние вemails
таблице . Это также позволяет избежать проблемы перемещения «жирного» состояния в очередях. Если бы мы говорили о распределенной обработке и истинных очередях, со связью (например, SSB), то все становится более сложным, так как общее состояние проблематично в распределенных системах.