Rxjs và Reactive programming – chi tiết về ý nghĩa và cách hoạt động

Chào các bạn! Nếu các bạn đã từng nghiên cứu hoặc tham gia vào một dự án về Angular, chắc hẳn bạn đã từng nghe đến cái tên “rxjs”. Cái tên này thậm chí khiến nhiều người cảm thấy khó hiểu và ngán ngẩm. Nhưng đừng lo, tôi đã có một khoảng thời gian làm việc với rxjs và hôm nay tôi sẽ chia sẻ kiến thức và kinh nghiệm của mình về thư viện này.

1. Reactive programming

Để sử dụng rxjs một cách hiệu quả, trước hết các bạn cần hiểu rõ về khái niệm reactive programming. Đây là một phương pháp phân tích logic hoàn toàn mới. Hãy cùng xem hình dưới đây để hiểu rõ hơn:

49561ecf 0f2e 447f 8e3e 95786d659519

Trong reactive programming, dữ liệu sẽ được chứa trong một “stream” (dòng dữ liệu), bạn có thể tưởng tượng một stream giống như một cái băng truyền như trong hình vẽ. Khi stream nhận được một gói dữ liệu mới, gói dữ liệu đó sẽ được truyền qua các “modifier” (biến đổi). Modifier là các hàm, nhiệm vụ của chúng là phản ứng với gói dữ liệu được truyền vào stream, thay đổi các gói dữ liệu đó và trả lại cho stream các gói dữ liệu đã được thay đổi.

Như bạn có thể thấy, các modifier không chạy một cách bị động, mà chúng sẽ tự động chạy và phản ứng với mỗi gói dữ liệu được truyền vào stream. Đó chính là lý do mô hình này được gọi là reactive programming.

Grâce à la réactivité automatique des modifiers vis-à-vis des paquets de données transmis par le stream, nous n’avons pas à les exécuter manuellement. Ainsi, le flux de fonctionnement de ce modèle devient facile à comprendre et à déboguer. Une caractéristique extrêmement utile de ce modèle est que nous pouvons définir l’ordre des modifiers avant d’exécuter un stream. Selon l’ordre des modifiers, les paquets de données renvoyés par le stream seront différents. Cela rend le flux de travail du stream très clair, cohérent et facile à déboguer.

Lorsqu’il est appliqué dans la pratique, la programmation réactive peut être utilisée dans de nombreux cas d’utilisation, par exemple avec la méthode setInterval. Nous pouvons considérer que chaque fois que setInterval est exécuté, il crée un stream. Dans l’exemple suivant, le stream créé par la méthode setInterval accepte un paquet de données ayant une valeur undefined toutes les 1 seconde, et la fonction de rappel que nous transmettons dans setInterval agit comme un modifier.

setInterval(data => { console.log(‘gói dữ liệu nhận được có giá trị là ‘, data); }, 1000);

De la même manière, nous pouvons considérer que la méthode setTimeout crée également un stream, mais ce stream s’arrête immédiatement après l’exécution de la fonction de rappel (modifier) qu’elle contient.

setTimeout(data => { console.log(‘gói dữ liệu nhận được có giá trị là ‘, data); }, 1000);

Les événements peuvent également être considérés comme des streams. Par exemple, lorsqu’un événement de clic se produit, un paquet de données contenant des informations sur le clic est transmis dans le stream, et la fonction de rappel de l’événement de clic agit comme des modifiers pour traiter ces paquets de données.

document.onclick = function(evt) { console.log(‘gói dữ liệu nhận được có giá trị là ‘, evt); }

2. Rxjs

Maintenant que nous avons une compréhension approfondie de la programmation réactive, nous pouvons approfondir le fonctionnement de rxjs.

2.1 Qu’est-ce que Rxjs?

La programmation réactive n’est qu’un concept, une façon de penser et de raisonner sur le cycle de vie des paquets de données, et rxjs nous aide à modéliser ce concept pour une application facile dans la pratique.

2.2 Termes clés de rxjs

Avant de plonger plus profondément dans rxjs, examinons certains des termes clés de cette bibliothèque:

  • Observable: c’est un constructeur qui nous aide à créer des objets observables. Temporairement, vous pouvez considérer l’observable comme une classe et les instances de cette classe comme des streams (flux de données).
  • Executor: si l’observable est une classe, l’executor est la partie logique ou une méthode de constructeur de cette classe. Il nous permet de définir comment un stream va fonctionner (j’expliquerai cela plus en détail plus tard).
  • Observer: c’est un objet avec 3 méthodes – next, error et complete. Ces méthodes sont des modifiers, mais chacun fonctionne dans un scénario différent:
    • next: ce modifier est exécuté (réagi) chaque fois qu’il reçoit un paquet de données.
    • error: ce modifier est exécuté (réagi) lorsque le stream associé rencontre une erreur. Il reçoit un paramètre renvoyant l’erreur rencontrée par le stream.
    • complete: ce modifier est exécuté (réagi) lorsque le flux de logic associé à lui s’arrête (je l’expliquerai plus tard).
  • Subscribe: si l’observable est une classe, le subscribe est l’équivalent du mot-clé new. Lorsqu’il est appelé, cette méthode crée un stream basé sur l’observable et l’exécute.
  • Subscription: c’est un objet renvoyé par la méthode subscribe et dispose de certaines méthodes utilisées pour contrôler le processus d’exécution de l’executor.
  • Operator: ce sont des modifiers, mais ils agissent également en tant que transporteurs de paquets de données vers le stream.

2.3 Fonctionnement des Observable

D’accord, maintenant que nous avons couvert comment observable fonctionne, examinons de plus près son fonctionnement.

Premièrement, il y a le symbole $ à la fin de la variable interval$. Ce symbole est simplement une convention pour nommer une variable contenant un observable.

Ensuite, nous avons le constructeur Observable, qui est utilisé pour créer des objets observables. Comme je l’ai mentionné précédemment, vous pouvez considérer l’observable comme une classe de streams. Chaque fois que la méthode subscribe d’un observable est exécutée, un stream est créé et commencera immédiatement à s’exécuter jusqu’à ce qu’il se termine.

Le constructeur Observable accepte une fonction (cette fonction est l’executor) et lui passe un objet observer. Les 3 méthodes de cet objet sont: next, complete et error. Ce sont les 3 méthodes next, complete et error que nous définissons dans l’objet observer et les transmettons dans la méthode subscribe.

Si vous plongez plus profondément dans l’executor, vous trouverez qu’un executor est simplement la façon dont un stream fonctionne. Cela peut sembler un peu compliqué, mais si vous considérez le fonctionnement d’une méthode setInterval comme un stream, comme dans le premier exemple de la section sur la programmation réactive, alors l’executor est simplement une fonction utilisée pour exécuter la méthode setInterval. La seule différence par rapport à l’appel de la méthode setInterval est que l’executor exécute les méthodes next, complete et error de l’objet observer qu’il reçoit en paramètre (celles-ci agissent en tant que transporteurs de données). Chaque fois que la méthode subscribe d’un observable est exécutée, l’executor de cet observable est exécuté, et chaque fois qu’un executor est exécuté, logiquement … un stream est créé. D’accord, maintenant vous pouvez abandonner l’idée que l’observable est une classe de streams.

Différent du stream ou de l’observable, l’Observer est plus facile à comprendre. Si vous examinez attentivement le code ci-dessus et le testez, vous verrez que l’objet observer passé dans l’executor est celui que vous avez transmis dans la méthode subscribe. En fait, je suis en train d’appeler la méthode next de l’objet observer que j’ai passé dans le subscribe lorsque j’écris observer.next(intervalCounter);. C’est ainsi que les operators sont exécutés et, lorsqu’ils sont exécutés, ils agissent en tant que modifiers. Vous pouvez voir que j’ai délibérément appelé observer.error('error'); lorsque setInterval est exécuté 3 fois.

2.3.1 Arrêt/terminaison d’un observable

Comme mentionné dans la section 2.2, la subscription dispose de certaines méthodes utilisées pour contrôler le processus d’exécution d’un stream, et la méthode la plus importante et la plus couramment utilisée est unsubscribe.

Comme son nom l’indique, unsubscribe a pour mission d’arrêter l’exécution d’un executor.


const subscription = interval$.subscribe(observer);
setTimeout(() => { subscription.unsubscribe(); }, 2000);

Lorsque vous remplacez l’exemple de subscribe dans la section 2.3 par celui-ci, vous constaterez que l’executor ne s’affiche qu’une seule fois. Cela est dû au fait que nous avons arrêté l’executor.

Pour capturer l’événement d’arrêt d’un executor, vous pouvez retourner une fonction à l’intérieur de l’executor, cette fonction sera appelée lorsque la méthode unsubscribe d’un observable est appelée. Bien que cet événement ne soit pas couramment utilisé, il est très important dans certains cas spécifiques, car il nous permet d’arrêter l’exécution de la méthode setInterval lorsque nous n’avons plus besoin de l’observable interval$.

import { Observable } from “rxjs”;

const interval$ = Observable.create(observer => {
let intervalCounter = 0;
const intervalInstance = setInterval(() => {
intervalCounter++;
observer.next(intervalCounter);
if (intervalCounter === 3) {
clearInterval(intervalInstance);
observer.error(‘error’);
observer.complete();
}
}, 1000);

return () => {
console.log(‘complete’);
clearInterval(intervalInstance);
}
});

const observer = {
next: intervalCounter => {
console.log(intervalCounter);
},
error: error => {
console.error(error);
},
complete: () => {
console.log(‘complete’);
}
}

interval$.subscribe(observer);

// output:
//
// 1
// 2
// 3
// error

2.3.2 Contrat Observable

Le contrat Observable est une règle dans rxjs selon laquelle:

  • Un stream ne peut pas recevoir de nouveaux paquets de données après que observer.complete ait été appelé ou après que observer.error ait été appelé.
  • Lorsque observer.error est appelé, le stream ne peut plus recevoir aucun paquet de données ni déclencher observer.complete.
  • Lorsque observer.complete est appelé, le stream ne peut plus recevoir aucun paquet de données ni déclencher observer.error.

2.3.3 Observable est-il utilisé pour le traitement asynchrone?

Je remarque que beaucoup de gens pensent que rxjs ou l’observable est utilisé pour le traitement asynchrone. Mais en réalité, rxjs et l’observable n’ont aucun lien avec le traitement asynchrone. Tout ce que fait rxjs, c’est nous aider à modéliser la programmation réactive. Que l’observable soit utilisé pour le traitement asynchrone ou non dépend entièrement de notre façon de gérer l’executor. Dans l’exemple suivant, je crée un observable qui traite les données de manière synchrone:

import { Observable, noop } from “rxjs”;

const syncObservable$ = new Observable(observer => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});

syncObservable$.subscribe(
intervalCounter => console.log(intervalCounter),
noop,
() => console.log(‘complete’)
);

console.log(4);

// output:
//
// 1
// 2
// 3
// complete
// 4

Comme vous pouvez le voir, je n’appelle pas la méthode observer.error dans l’executor de l’exemple ci-dessus, il est donc inutile de définir une fonction pour gérer une erreur pour l’observable syncObservable$. Dans ces cas-là, ou si vous ne vous souciez pas du traitement d’une certaine méthode parmi les 3 méthodes de l’objet observer, vous pouvez simplement passer noop à sa place.

Conclusion

J’ai pris le temps de vous expliquer en détail la programmation réactive et les concepts clés de la bibliothèque rxjs. L’objectif de cet article n’est pas de plonger en profondeur dans rxjs, en particulier dans les operators, l’un des points forts de cette bibliothèque, ou les sujets (un type spécifique d’observable), mais plutôt de vous donner un aperçu général et compréhensible de rxjs pour faciliter votre apprentissage de cette bibliothèque si vous êtes débutant. En fait, si vous comprenez les concepts que j’ai mentionnés ci-dessus, vous avez déjà une bonne maîtrise de cette bibliothèque. Les sujets comme les sujets ou les operators deviendront désormais très faciles à comprendre et à appliquer. Je vous souhaite une excellente journée. Bon coding!

FEATURED TOPIC