Как прослушать изменения в коллекции MongoDB?

200

Я создаю своего рода фоновую систему очередей заданий с MongoDB в качестве хранилища данных. Как я могу «прослушать» вставки в коллекцию MongoDB, прежде чем создавать рабочих для обработки работы? Нужно ли опрашивать каждые несколько секунд, чтобы увидеть, есть ли какие-либо изменения с прошлого раза, или есть ли способ, которым мой сценарий может ожидать вставки? Это PHP-проект, над которым я работаю, но не стесняйтесь отвечать на Ruby или не зависимо от языка.

Андрей
источник
1
Изменение потока было добавлено в MongoDB 3.6 для решения вашего сценария. docs.mongodb.com/manual/changeStreams Также, если вы используете MongoDB Atlas, вы можете использовать триггеры Stitch, которые позволяют вам выполнять функции в ответ на вставку / обновление / удаление / и т.д. docs.mongodb.com/stitch/triggers/overview Больше нет необходимости разбирать оплог.
Роберт Уолтерс

Ответы:

111

То, о чем вы думаете, звучит очень похоже на триггеры. MongoDB не имеет никакой поддержки триггеров, однако некоторые люди «свернули свои собственные», используя некоторые приемы. Ключевым моментом здесь является оплог.

Когда вы запускаете MongoDB в наборе реплик, все действия MongoDB записываются в журнал операций (известный как oplog). Оплог - это просто список изменений, внесенных в данные. Реплики Устанавливает функцию, прослушивая изменения в этом журнале операций и затем применяя изменения локально.

Это звучит знакомо?

Я не могу описать весь процесс здесь, это несколько страниц документации, но инструменты, которые вам нужны, доступны.

Сначала несколько рецензий на оплог - Краткое описание - Макет localсборника (который содержит оплог)

Вы также захотите использовать настраиваемые курсоры . Это даст вам возможность прослушивать изменения, а не опрашивать их. Обратите внимание, что для репликации используются настраиваемые курсоры, поэтому это поддерживаемая функция.

Гейтс В.П.
источник
1
хм ... не совсем то, что я имел в виду. На данный момент я работаю только с одним экземпляром (без рабов). Так может быть, более простое решение?
Андрей
17
Вы можете запустить сервер с --replSetпараметром, и он создаст / заполнит oplog. Даже без вторичного. Это, безусловно, единственный способ «прослушать» изменения в БД.
Гейтс В.П.
2
Это хорошее описание того, как настроить оплог для регистрации изменений в БД локально : oosexaml.wordpress.com/2012/09/03/…
johndodo
Cooooool! Это действительно то, чего я хочу. И я нашел библиотеку под названием «Mongo-Oplog» на npm. Так счастлив ~
pjincz
Я согласен, что ко времени написания этого ответа триггеры могут быть недоступны, но для всех, кто приземлится здесь, теперь есть опция, проверить MongoDB Stitch ( docs.mongodb.com/stitch/#stitch ) и триггеры Stitch ( docs). mongodb.com/stitch/triggers ) ..
whoami
102

MongoDB имеет то, что называется, capped collectionsи tailable cursorsэто позволяет MongoDB передавать данные слушателям.

По capped collectionсути, A - это коллекция фиксированного размера, которая допускает только вставки. Вот как это будет выглядеть:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable курсоры ( оригинальный пост Джонатана Х. Вейджа )

Рубин

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Питон ( Роберт Стюарт)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl ( Макс )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Дополнительные ресурсы:

Ruby / Node.js Tutorial, который проведет вас через создание приложения, которое прослушивает вставки в ограниченную коллекцию MongoDB.

Статья, рассказывающая о настраиваемых курсорах более подробно.

Примеры использования настраиваемых курсоров на PHP, Ruby, Python и Perl.

Андрей
источник
70
спать 1? действительно? для производственного кода? как это не опрос?
ОПБ
2
@ rbp ха-ха, я никогда не говорил, что это производственный код, но ты прав, спать на секунду не очень хорошая практика. Уверен, я получил этот пример откуда-то еще. Не уверен, как реорганизовать это все же.
Андрей
14
@kroe, потому что эти не относящиеся к делу детали будут введены в производственный код новыми программистами, которые могут не понимать, почему это плохо.
сом
3
Я понимаю вашу мысль, но ожидать от некоторых новых программистов добавления «сна 1» в производство почти оскорбительно! Я имею в виду, я не удивлюсь ... Но если кто-то запустит это в производство, по крайней мере, научится трудным путем и навсегда .. хахаха
кро
19
что не так с деланием time.sleep (1) на производстве?
Аль Йохри
44

Начиная с MongoDB 3.6 появится новый API уведомлений под названием Change Streams, который вы можете использовать для этого. Смотрите этот пост в блоге для примера . Пример из этого:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
Mitar
источник
4
Зачем? Можете ли вы уточнить? Это стандартный способ сейчас?
Митар
1
как? не используйте опрос - вам нужен равномерный подход вместо циклов while и т. д.
Александр Миллс
3
Где вы видите здесь опрос?
Митар
Я думаю, что он / она имеет в виду последний цикл. Но я думаю, что PyMongo только поддерживает это. Motor может иметь реализацию в стиле прослушивателя async / event.
Шейн Сюй
41

Проверьте это: изменение потоков

10 января 2018 г. - выпуск 3.6

* РЕДАКТИРОВАТЬ: я написал статью о том, как это сделать https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


Это новое в mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Для использования changeStreams база данных должна быть набором репликации

Подробнее о наборах репликации: https://docs.mongodb.com/manual/replication/

Ваша база данных будет « Автономной » по умолчанию.

Как преобразовать автономный набор реплик: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


Следующий пример - практическое применение того, как вы можете это использовать.
* Специально для Node.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Полезные ссылки:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

Рио Вебер
источник
извините за все правки, ТАК не понравились мои "Ссылки" (сказали, что они были неправильно отформатированы код.)
Рио Вебер
1
вам не нужно запрашивать базу данных, я думаю, с помощью watch () или аналогичного метода новые данные могут быть отправлены на прослушивающий сервер
Alexander Mills
22

MongoDB версии 3.6 теперь включает потоки изменений, которые, по сути, представляют собой API поверх OpLog, что позволяет использовать сценарии, подобные триггерам / уведомлениям.

Вот ссылка на пример Java: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

Пример NodeJS может выглядеть примерно так:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });
Роберт Уолтерс
источник
JSON.stringify очень важен для получения этих данных в Android Studio (Android App) ..
DragonFire
3

В качестве альтернативы вы можете использовать стандартный метод Mongo FindAndUpdate и в обратном вызове инициировать событие EventEmitter (в узле) при выполнении обратного вызова.

Любые другие части приложения или архитектуры, прослушивающие это событие, будут уведомлены об обновлении, и любые соответствующие данные также будут отправлены туда. Это действительно простой способ получения уведомлений от Mongo.

Alex
источник
это очень неэффективно ... вы блокируете БД для каждого FindAndUpdate!
Яш Гупта
1
Я полагаю, что Алекс отвечал на несколько иной (не относящийся конкретно к вставкам), но связанный вопрос о том, как отключить какое-то уведомление для клиентов, когда состояние задания в очереди изменяется, что, как мы предполагаем, должно произойти, когда задания появляются. завершить успешно или потерпеть неудачу. Когда клиенты подключены к узлу с помощью веб-сокетов, все они могут получать уведомления об изменениях с помощью широковещательного события обратного вызова FIndAndUpdate, которое может вызываться при получении сообщений об изменении состояния. Я бы сказал, что это не неэффективно, так как обновления должны быть сделаны.
Питер Скотт
3

Многие из этих ответов дадут вам только новые записи, а не обновления и / или крайне неэффективны

Единственный надежный и эффективный способ сделать это - создать настраиваемый курсор в локальной коллекции db: oplog.rs, чтобы получить ВСЕ изменения в MongoDB и делать с ними все, что вы захотите. (MongoDB даже делает это внутренне более или менее для поддержки репликации!)

Объяснение содержания оплога: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Пример библиотеки Node.js, которая предоставляет API вокруг того, что можно сделать с помощью оплога: https://github.com/cayasso/mongo-oplog

Джон Кульвинер
источник
2

Существует потрясающий набор доступных сервисов, который называется MongoDB Stitch . Посмотрите на функции / триггеры стежка . Обратите внимание, что это облачный платный сервис (AWS). В вашем случае, при вставке, вы можете вызвать пользовательскую функцию, написанную на javascript.

введите описание изображения здесь

Маниш джайн
источник
stackoverflow.com/users/486867/manish-jain - есть ли у вас пример того, как можно использовать стежок для уведомления приложения REACT о том, что данные были вставлены в таблицу?
MLissCetrus
1

Существует рабочий пример Java, который можно найти здесь .

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

Ключ - ВАЖНЫЕ ВАРИАНТЫ приведенные здесь.

Также вы можете изменить запрос поиска, если вам не нужно каждый раз загружать все данные.

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
Малеен Абевардана
источник
1

На самом деле, вместо просмотра вывода, почему вы не получаете уведомления, когда вставляется что-то новое, используя промежуточное ПО, предоставленное схемой mongoose

Вы можете поймать событие вставки нового документа и сделать что-то после того, как эта вставка сделана

Дуонг Нгуен
источник
Виноват. Простите, сэр.
Duong Nguyen
0

После версии 3.6 разрешено использовать базу данных следующих типов триггеров базы данных:

  • управляемые событиями триггеры - полезны для автоматического обновления связанных документов, уведомления последующих служб, распространения данных для поддержки смешанных рабочих нагрузок, целостности данных и аудита
  • Запланированные триггеры - полезны для запланированных рабочих нагрузок поиска, распространения, архивирования и анализа данных.

Войдите в свою учетную запись Atlas, выберите Triggersинтерфейс и добавьте новый триггер:

введите описание изображения здесь

Разверните каждый раздел для получения дополнительных настроек или деталей.

gotqn
источник