Observable y observer

En nuestro ejemplo de introducción, tocamos el patrón de observador usando \SplSubject y \SplObserver. Ahora, estamos presentando un componente observable y observador RxPHP. Podríamos decir que \SplSubject es análogo a Rx \Observable, mientras que \SplObserver es análogo a Rx\Observer \CallbackObserver. Sin embargo, todo el SPL y el Rx son solo superficialmente análogos.

Rx\Observable es más poderoso que \SplObserver. Podemos pensar en Rx\Observable como una fuente perezosa de eventos, algo que produce valor con el tiempo. Los observables emiten los siguientes tres tipos de eventos a sus observadores:

  • El elemento actual en la secuencia
  • El error, si ocurrió uno
  • El estado completo

En pocas palabras, es una fuente de datos reactiva que sabe cómo señalar los cambios internos de datos.
Echemos un vistazo al siguiente ejemplo simple:

<?php
require_once DIR . '/vendor/autoload.php';
use \Rx\Observable;
use \Rx\Observer\CallbackObserver;
use \React\EventLoop\Factory;
use \Rx\Scheduler;
$loop = Factory::create();
Scheduler::setDefaultFactory(function () use ($loop) {
return new Scheduler\EventLoopScheduler($loop);
});
$users = Observable::fromArray(['John', 'Mariya', 'Marc', 'Lucy']);
$logger = new CallbackObserver(
function ($user) {
echo 'Logging: ', $user, PHP_EOL;
},
function (\Throwable $t) {
echo $t->getMessage(), PHP_EOL;
},
function () {
echo 'Stream complete!', PHP_EOL;
}
);
$users->subscribe($logger);
$loop->run();

La salida es como la siguiente:

Logging: John
Logging: Mariya
Logging: Marc
Logging: Lucy
Stream complete!

Vemos que el método subscribe () de la instancia Observable acepta una instancia de CallbackObserver. Cada uno de los tres parámetros de un observador es una función de devolución de llamada.
La primera devolución de llamada maneja el elemento de la secuencia, la segunda devuelve un error potencial y la tercera indica una secuencia completa.
RxPHP proporciona pocos tipos de observables:

  • AnonymousObservable
  • ArrayObservable
  • ConnectableObservable
  • EmptyObservable
  • ErrorObservable
  • ForkJoinObservable
  • GroupedObservable
  • IntervalObservable
  • IteratorObservable
  • MulticastObservable
  • NeverObservable
  • RangeObservable
  • RefCountObservable
  • ReturnObservable
  • TimerObservable

Echemos un vistazo a un ejemplo más elaborado de observable y observer:

<?php
require_once DIR . '/vendor/autoload.php';
use \Rx\Observable;
use \Rx\Observer\CallbackObserver;
use \React\EventLoop\Factory;
use \Rx\Scheduler;
$loop = Factory::create();
Scheduler::setDefaultFactory(function () use ($loop) {
return new Scheduler\EventLoopScheduler($loop);
});
// Generator function, reads CSV file
function users($file)
{
$users = fopen($file, 'r');
while (!feof($users)) {
yield fgetcsv($users)[0];
}
fclose($users);
}
// The RxPHP Observer
$logger = new CallbackObserver(
function ($user) {
echo $user, PHP_EOL;
},
function (\Throwable $t) {
echo $t->getMessage(), PHP_EOL;
},
function () {
echo 'stream complete!', PHP_EOL;
}
);
// Dummy map callback function
$mapper = function ($value) {
return time() . ' | ' . $value;
};
// Dummy filter callback function
$filter = function ($value) {
return strstr($value, 'Ma');
};
// Generator function
$users = users(DIR . '/users.csv');
// The RxPHP Observable - from generator
Observable::fromIterator($users)
->map($mapper)
->filter($filter)
->subscribe($logger);
$loop->run();

Comenzamos creando una función de generador simple llamada users(). Lo mejor de los generadores es que actúan como iteradores, lo que facilita la creación de observables RxPHP a partir de ellos utilizando el método fromIterator(). Una vez que tenemos lo observable, podemos encadenar algunos de sus métodos, como map() y filter(), juntos. De esta manera, controlamos el flujo de datos que afecta a nuestro observador suscrito.
Asuma el archivo users.csv con el siguiente contenido:

"John"
"Mariya"
"Marc"
"Lucy"

La salida del código anterior debería ser algo como esto:

1487439356 | Mariya
1487439356 | Marc
stream complete!

Ahora, supongamos que queremos adjuntar múltiples observadores a nuestra secuencia $users:

$mailer = new CallbackObserver(
function ($user) {
echo 'Mailer: ', $user, PHP_EOL;
},
function (\Throwable $t) {
echo 'Mailer: ', $t->getMessage(), PHP_EOL;
},
function () {
echo 'Mailer stream complete!', PHP_EOL;
}
);
$logger = new CallbackObserver(
function ($user) {
echo 'Logger: ', $user, PHP_EOL;
},
function (\Throwable $t) {
echo 'Logger: ', $t->getMessage(), PHP_EOL;
},
function () {
echo 'Logger stream complete!', PHP_EOL;
}
);
$users = Observable::fromIterator(users(DIR . '/users.csv'));
$users->subscribe($mailer);
$users->subscribe($logger);

Esto no funcionara. El código no arrojará ningún error, pero el resultado podría no ser el que esperaríamos:

Mailer: John
Logger: Mariya
Mailer: Marc
Logger: Lucy
Mailer:
Logger:
Mailer stream complete!
Logger stream complete!

Realmente no podemos adjuntar múltiples suscriptores de esta manera. El primer observador adjunto consume la corriente, por lo que el segundo observador la ve vacía. Aquí es donde el componente Rx\Asunto\Asunto puede ser útil.

Comparte