Последовательность RxJS, эквивалентная обещанию.then ()?

84

Раньше я много разрабатывал с обещаниями, а теперь перехожу на RxJS. Документ RxJS не дает очень четкого примера того, как перейти от цепочки обещаний к последовательности наблюдателя.

Например, я обычно пишу цепочку обещаний с несколькими шагами, например

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

Как мне переписать эту цепочку обещаний в стиле RxJS?

Хаолян Юй
источник

Ответы:

81

Для потока данных (эквивалент then):

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

Обещание можно преобразовать в наблюдаемое с помощью Rx.Observable.fromPromise.

У некоторых операторов обещаний есть прямой перевод. Например RSVP.all, или jQuery.whenможно заменить на Rx.Observable.forkJoin.

Имейте в виду, что у вас есть набор операторов, которые позволяют асинхронно преобразовывать данные и выполнять задачи, которые вы не можете или очень сложно выполнить с помощью обещаний. Rxjs раскрывает все свои возможности с помощью асинхронных последовательностей данных (последовательность, т.е. более 1 асинхронного значения).

Что касается управления ошибками, тема немного сложнее.

  • есть уловка и, наконец, операторы
  • retryWhen также может помочь повторить последовательность в случае ошибки
  • вы также можете исправить ошибки в самом подписчике с помощью onErrorфункции.

Чтобы получить точную семантику, более подробно изучите документацию и примеры, которые вы можете найти в Интернете, или задайте здесь конкретные вопросы.

Это определенно будет хорошей отправной точкой для более глубокого изучения управления ошибками с помощью Rxjs: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html

user3743222
источник
Я всегда вижу, что наблюдаемая последовательность заканчивается subscribe (). Поскольку это только функция наблюдаемого объекта, есть ли причина для этого? Это функция для запуска последовательности?
Хаолян Юй
точно так. Если через subscribe нет наблюдателей, ваш наблюдаемый объект не будет выдавать никаких данных, поэтому вы не увидите никакого потока данных.
user3743222
7
Я рекомендую вам взглянуть на это: gist.github.com/staltz/868e7e9bc2a7b8c1f754 . Это могло бы быть более приемлемым, чем официальный документ.
user3743222
3
Promise.thenскорее .flatMapчем .map.
Tamas Hegedus
1
К вашему сведению, это не совсем то же самое, поскольку в Promiseверсии с 3-й версией ошибки thenбудут обнаружены catch. Здесь их нет.
mik01aj
35

Более современная альтернатива:

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

Также обратите внимание, что для того, чтобы все это работало, вам нужно куда-то subscribeэто передать Observable, но я предполагаю, что это обрабатывается в какой-то другой части приложения.

mik01aj
источник
Я новичок в RxJS, но, учитывая, что здесь мы имеем дело только с начальным потоком одного события, и mergeMap()поэтому на самом деле нечего объединять , я считаю, что в этом случае мы могли бы добиться того же, используя concatMap()или switchMap(). Я правильно понял ...?
Дэн Кинг
8

Обновление в мае 2019 г. с использованием RxJs 6

Согласен с приведенными выше ответами, хотел добавить конкретный пример с некоторыми игрушечными данными и простыми обещаниями (с setTimeout) с использованием RxJs v6, чтобы добавить ясности.

Просто обновите переданный идентификатор (в настоящее время жестко закодированный как 1) на то, чего не существует, чтобы также выполнить логику обработки ошибок. Важно также отметить использование ofwith catchErrormessage.

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of(`Caught error: ${error}`))
);

source$.subscribe(console.log);

Выходные данные:

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

Ключевая часть эквивалентна следующему с использованием простого потока управления обещаниями:

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });
Arcseldon
источник
1

Если я правильно понял, вы имеете в виду потребление значений, и в этом случае вы используете sbuscribe, т.е.

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

Кроме того, вы можете просто превратить наблюдаемое в обещание с помощью toPromise (), как показано:

arrObservable.toPromise().then()
Дэвид Кабии
источник
0

если getPromiseфункция находится в середине потока трубы вы должны просто обернуть его в одну из функций mergeMap, switchMapили concatMap( как правило mergeMap):

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

если вы хотите начать свой поток, getPromise()оберните его в fromфункцию:

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);
Максим Романенко
источник
0

Насколько я только что узнал, если вы вернете результат в flatMap, он преобразует его в массив, даже если вы вернули строку.

Но если вы вернете Observable, этот Observable может вернуть строку;

Саманта Адрихем
источник
0

Вот как я это сделал.

Ранее

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
    const request = gapi.client.people.people.connections.list({
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    }).then(response => {
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    });
  }

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      // handle rsp;
  });

После (лы?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
    return from(
      new Promise((resolve, reject) => {
        gapi.client.people.people.connections.list({
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        }).then(result => {
          resolve(result);
        });
      })
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    }));
  }

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
  // handle rsp
}), (error) => {
  // handle error
});
Ананд Рокзз
источник
побочный эффект : обнаружение изменений также начало работать после преобразования обратного вызова в наблюдаемый .
Anand Rockzz
0

Последовательность RxJS, эквивалентная обещанию.then ()?

Например

function getdata1 (argument) {
        return this.http.get(url)
            .map((res: Response) => res.json());
    }

    function getdata2 (argument) {
        return this.http.get(url)
            .map((res: Response) => res.json());
    }

    getdata1.subscribe((data1: any) => {
        console.log("got data one. get data 2 now");
        getdata2.subscribe((data2: any) => {
            console.log("got data one and two here");
        });
    });
Йогеш Вагмаре
источник