Capítulo 1. La concurrencia: Una visión general

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

Concurrencia es un aspecto clave del software bonito. Durante décadas, la concurrencia era posible pero difícil de conseguir. El software concurrente era difícil de escribir, de depurar y de mantener. Como resultado, muchos desarrolladores eligieron el camino más fácil y evitaron la concurrencia. Con las bibliotecas y características del lenguaje disponibles para los programas .NET modernos, la concurrencia es ahora mucho más fácil. Microsoft ha abierto el camino bajando significativamente el listón de la concurrencia. Antes, la programación concurrente era cosa de expertos; ahora, cualquier desarrollador puede (y debe) adoptar la concurrencia.

Introducción a la concurrencia

Antes de continuar, me gustaría aclarar cierta terminología que utilizaré a lo largo de este libro. Son definiciones propias que utilizo sistemáticamente para desambiguar las distintas técnicas de programación. Empecemos por la concurrencia.

Concurrencia

Hacer más de una cosa a la vez.

Espero que sea evidente la utilidad de la concurrencia. Las aplicaciones de usuario final utilizan la concurrencia para responder a las entradas del usuario mientras escriben en una base de datos. Las aplicaciones de servidor utilizan la concurrencia para responder a una segunda solicitud mientras finaliza la primera. Necesitas la concurrencia siempre que necesites que una aplicación haga una cosa mientras trabaja en otra. Casi todas las aplicaciones de software del mundo pueden beneficiarse de la concurrencia.

La mayoría de los desarrolladores que oyen el término "concurrencia" piensan inmediatamente en "multihilo". Me gustaría establecer una distinción entre ambos.

Multihilo

Una forma de concurrencia que utiliza múltiples hilos de ejecución.

Multihilo se refiere literalmente al uso de varios hilos. Como se demuestra en muchas recetas de este libro, el multihilo es una forma de concurrencia, pero desde luego no la única. De hecho, el uso directo de tipos de hilos de bajo nivel casi no tiene sentido en una aplicación moderna; las abstracciones de más alto nivel son más potentes y eficientes que el multihilo de la vieja escuela. Por esa razón, minimizaré mi cobertura de técnicas anticuadas. Ninguna de las recetas multihilo de este libro utiliza los tipos Thread o BackgroundWorker; han sido sustituidos por alternativas superiores.

Advertencia

En cuanto escribas new Thread(), se acabó; tu proyecto ya tiene código heredado.

Pero ¡no te hagas a la idea de que el multihilo ha muerto! El multihilo sigue vivo en el repositorio de hilos, un lugar útil para poner trabajo en cola que se ajusta automáticamente según la demanda. A su vez, la reserva de hilos permite otra forma importante de concurrencia: el procesamiento paralelo.

Procesamiento paralelo

Hacer mucho trabajo dividiéndolo entre varios hilos que se ejecutan simultáneamente.

El procesamiento paralelo (o programación paralela) utiliza el multihilo para maximizar el uso de los múltiples núcleos del procesador. Las CPU modernas tienen varios núcleos, y si hay mucho trabajo que hacer, no tiene sentido hacer que un núcleo haga todo el trabajo mientras los demás permanecen inactivos. El procesamiento paralelo divide el trabajo entre varios subprocesos, cada uno de los cuales puede ejecutarse independientemente en un núcleo distinto.

El procesamiento paralelo es un tipo de multihilo, y el multihilo es un tipo de concurrencia. Hay otro tipo de concurrencia que es importante en las aplicaciones modernas, pero que no es tan familiar para muchos desarrolladores: la programación asíncrona.

Programación asíncrona

Una forma de concurrencia que utiliza futuros o retrollamadas para evitar hilos innecesarios.

Un futuro (o promesa) es un tipo que representa alguna operación que se completará en el futuro. Algunos tipos de futuro modernos en .NET son Task y Task<TResult>. Las API asíncronas más antiguas utilizan retrollamadas o eventos en lugar de futuros. La programación asíncrona se centra en la idea de una operación asíncrona: una operación que se inicia y que se completará más tarde. Mientras la operación está en curso, no bloquea la hebra original; la hebra que inicia la operación es libre de hacer otro trabajo. Cuando la operación finaliza, notifica su futuro o invoca su llamada de retorno o evento para que la aplicación sepa que la operación ha terminado.

La programación asíncrona es una potente forma de concurrencia, pero hasta hace poco requería un código extremadamente complejo. Los soportes async y await de los lenguajes modernos hacen que la programación asíncrona sea casi tan fácil como la síncrona (no concurrente).

Otra forma de concurrencia de es la programación reactiva. La programación asíncrona implica que la aplicación iniciará una operación que se completará una vez en un momento posterior. La programación reactiva está estrechamente relacionada con la programación asíncrona, pero se basa en eventos asíncronos en lugar de en operaciones asíncronas. Los eventos asíncronos pueden no tener un "inicio" real, pueden ocurrir en cualquier momento y pueden activarse varias veces. Un ejemplo es la entrada del usuario.

Programación reactiva

Un estilo de programación declarativo en el que la aplicación reacciona a los acontecimientos.

Si consideras que una aplicación es una máquina de estados masiva, el comportamiento de la aplicación puede describirse como la reacción a una serie de eventos mediante la actualización de su estado en cada evento. Esto no es tan abstracto o teórico como parece; los marcos de trabajo modernos hacen que este enfoque sea bastante útil en aplicaciones del mundo real. La programación reactiva no es necesariamente concurrente, pero está estrechamente relacionada con la concurrencia, por lo que este libro cubre los aspectos básicos.

Normalmente, al escribir un programa concurrente se utiliza una mezcla de técnicas. La mayoría de las aplicaciones utilizan al menos multihilo (a través del grupo de hilos) y programación asíncrona. Siéntete libre de mezclar y combinar las distintas formas de concurrencia, utilizando la herramienta adecuada para cada parte de la aplicación .

Introducción a la programación asíncrona

La programación asíncrona tiene dos ventajas principales. La primera ventaja es para los programas GUI de usuario final: la programación asíncrona permite la capacidad de respuesta. Todo el mundo ha utilizado alguna vez un programa que se bloquea temporalmente mientras trabaja; un programa asíncrono puede seguir respondiendo a las entradas del usuario mientras trabaja. La segunda ventaja es para los programas del lado del servidor: la programación asíncrona permite la escalabilidad. Una aplicación de servidor puede escalar algo simplemente utilizando el pool de hilos, pero una aplicación de servidor asíncrona puede escalar normalmente un orden de magnitud mejor que eso.

Ambas ventajas de la programación asíncrona derivan del mismo aspecto subyacente: la programación asíncrona libera un hilo. Para los programas de interfaz gráfica de usuario, la programación asíncrona libera el hilo de la interfaz de usuario, lo que permite que la aplicación de interfaz gráfica de usuario siga respondiendo a las entradas del usuario. En las aplicaciones de servidor, la programación asíncrona libera hilos de petición, lo que permite al servidor utilizar sus hilos para atender más peticiones.

Las aplicaciones .NET asíncronas modernas utilizan dos palabras clave: async y await. La palabra clave async se añade a la declaración de un método, y cumple una doble función: habilita la palabra clave await dentro de ese método e indica al compilador que genere una máquina de estados para ese método, de forma similar a como funciona yield return. Un método async puede devolver Task<TResult> si devuelve un valor, Task si no devuelve ningún valor, o cualquier otro tipo "similar a una tarea", como ValueTask. Además, un método async puede devolver IAsyncEnumerable<T> o IAsyncEnumerator<T> si devuelve varios valores en una enumeración. Los tipos tipo tarea representan futuros; pueden notificar al código de llamada cuando el método async finaliza.

Advertencia

¡Evita async void ! Es posible que un método async devuelva void, pero sólo debes hacerlo si estás escribiendo un controlador de eventos async. Un método normal async sin valor de retorno debe devolver Task, no void.

Con estos antecedentes, veamos rápidamente un ejemplo:

async Task DoSomethingAsync()
{
  int value = 13;

  // Asynchronously wait 1 second.
  await Task.Delay(TimeSpan.FromSeconds(1));

  value *= 2;

  // Asynchronously wait 1 second.
  await Task.Delay(TimeSpan.FromSeconds(1));

  Trace.WriteLine(value);
}

Un método async comienza a ejecutarse de forma sincrónica, como cualquier otro método. Dentro de un método async, la palabra clave await realiza una espera asíncrona sobre su argumento. En primer lugar, comprueba si la operación ya se ha completado; si es así, continúa ejecutándose (de forma sincrónica). En caso contrario, pausará el método async y devolverá una tarea incompleta. Cuando esa operación se complete algún tiempo después, el método async reanudará la ejecución.

Puedes pensar que un método async tiene varias partes síncronas, divididas por sentencias await. La primera parte síncrona se ejecuta en cualquier hilo que llame al método, pero ¿dónde se ejecutan las otras partes síncronas? La respuesta es un poco complicada.

Cuando await una tarea (el escenario más común), se captura un contexto cuando el await decide pausar el método. Éste es el actual SynchronizationContext a menos que sea null, en cuyo caso el contexto es el actual TaskScheduler. El método reanuda la ejecución dentro de ese contexto capturado. Normalmente, este contexto es el contexto de la interfaz de usuario (si estás en el hilo de la interfaz de usuario) o el contexto del threadpool (la mayoría de las demás situaciones). Si tienes una aplicación ASP.NET Classic (pre-Core), entonces el contexto también podría ser un contexto de solicitud ASP.NET. ASP.NET Core utiliza el contexto threadpool en lugar de un contexto de solicitud especial.

Así, en el código anterior, todas las partes síncronas intentarán reanudarse en el contexto original. Si llamas a DoSomethingAsync desde un subproceso de la interfaz de usuario, cada una de sus partes síncronas se ejecutará en ese subproceso de la interfaz de usuario; pero si lo llamas desde un subproceso del grupo de subprocesos, cada una de sus partes síncronas se ejecutará en cualquier subproceso del grupo de subprocesos.

Puedes evitar este comportamiento por defecto esperando el resultado del método de extensión ConfigureAwait y pasando false por el parámetro continueOnCapturedContext. El siguiente código se iniciará en la hebra llamante, y después de ser pausado por un await, se reanudará en una hebra threadpool:

async Task DoSomethingAsync()
{
  int value = 13;

  // Asynchronously wait 1 second.
  await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

  value *= 2;

  // Asynchronously wait 1 second.
  await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

  Trace.WriteLine(value);
}
Consejo

Es una buena práctica llamar siempre a ConfigureAwait en tus métodos principales de "biblioteca", y reanudar el contexto sólo cuando lo necesites, en tus métodos externos de "interfaz de usuario".

La palabra clave await no se limita a trabajar con tareas; puede trabajar con cualquier tipo de aguardable que siga un determinado patrón. Como ejemplo, la Biblioteca de Clases Base incluye el tipo ValueTask<T>, que reduce las asignaciones de memoria si el resultado suele ser síncrono; por ejemplo, si el resultado puede leerse de una caché en memoria. ValueTask<T> no es directamente convertible a Task<T>, pero sí sigue el patrón awaitable, por lo que puedes utilizarlo directamente await. Hay otros ejemplos, y puedes construir los tuyos propios, pero la mayoría de las veces await tomará un Task o Task<TResult>.

Hay dos formas básicas de crear una instancia Task. Algunas tareas representan código real que debe ejecutar una CPU; estas tareas computacionales deben crearse llamando a Task.Run (o a TaskFactory.StartNew si necesitas que se ejecuten en un programador concreto). Otras tareas representan una notificación; este tipo de tareas basadas en eventos se crean mediante TaskCompletionSource<TResult> (o uno de sus atajos). La mayoría de las tareas de E/S utilizan TaskCompletionSource<TResult>.

La gestión de errores es natural con async y await. En el fragmento de código que sigue, PossibleExceptionAsync puede lanzar una NotSupportedException, pero TrySomethingAsync puede atrapar la excepción de forma natural. La excepción capturada conserva correctamente su rastro de pila y no se envuelve artificialmente en un TargetInvocationException o AggregateException:

async Task TrySomethingAsync()
{
  try
  {
    await PossibleExceptionAsync();
  }
  catch (NotSupportedException ex)
  {
    LogException(ex);
    throw;
  }
}

Cuando un método async lanza (o propaga) una excepción, ésta se coloca en su Task devuelto y se completa el Task. Cuando se espera ese Task, el operador await recuperará esa excepción y la (re)lanzará de forma que se conserve su rastro de pila original. Así, un código como el del ejemplo siguiente funcionaría como se espera si PossibleExceptionAsync fuera un método async:

async Task TrySomethingAsync()
{
  // The exception will end up on the Task, not thrown directly.
  Task task = PossibleExceptionAsync();

  try
  {
    // The Task's exception will be raised here, at the await.
    await task;
  }
  catch (NotSupportedException ex)
  {
    LogException(ex);
    throw;
  }
}

Hay otra directriz importante en lo que respecta a los métodos async: una vez que empieces a utilizar async, lo mejor es dejar que crezca a través de tu código. Si llamas a un método async, deberías (eventualmente) await la tarea que devuelve. Resiste la tentación de llamar a Task.Wait, Task<TResult>.Result, o GetAwaiter().GetResult(); hacerlo podría provocar un bloqueo. Considera el siguiente método:

async Task WaitAsync()
{
  // This await will capture the current context ...
  await Task.Delay(TimeSpan.FromSeconds(1));
  // ... and will attempt to resume the method here in that context.
}

void Deadlock()
{
  // Start the delay.
  Task task = WaitAsync();

  // Synchronously block, waiting for the async method to complete.
  task.Wait();
}

El código de este ejemplo se bloqueará si se llama desde un contexto UI o ASP.NET Classic, porque ambos contextos sólo permiten la entrada de un hilo a la vez. Deadlock llamará a WaitAsync, que inicia el retardo. Deadlock espera entonces (sincrónicamente) a que se complete ese método, bloqueando el hilo del contexto. Cuando finaliza el retardo, await intenta reanudar WaitAsync dentro del contexto capturado, pero no puede porque ya hay un hilo bloqueado en el contexto, y el contexto sólo permite un hilo a la vez. El bloqueo puede evitarse de dos formas: puedes utilizar ConfigureAwait(false) dentro de WaitAsync (lo que hace que await ignore su contexto), o puedes await la llamada a WaitAsync (convirtiendo Deadlock en un método async ).

Advertencia

Si utilizas async, es mejor que utilices async hasta el final.

Para una introducción más completa a async, la documentación en línea que Microsoft ha proporcionado para async es fantástica; recomiendo leer al menos la descripción general de la Programación Asíncrona y la descripción general del Patrón Asíncrono Basado en Tareas (TAP). Si quieres profundizar un poco más, también está la documentación Async in Depth.

Los flujos asíncronos de parten de las bases de async y await y las amplían para gestionar valores múltiples. Los flujos asíncronos se basan en el concepto de enumerables asíncronos, que son como los enumerables normales, salvo que permiten realizar un trabajo asíncrono al recuperar el siguiente elemento de la secuencia. Se trata de un concepto extremadamente potente que el Capítulo 3 trata con más detalle. Los flujos asíncronos de son especialmente útiles cuando tienes una secuencia de datos que llega de uno en uno o en trozos. Por ejemplo, si tu aplicación procesa la respuesta de una API que utiliza la paginación con los parámetros limit y offset, entonces los flujos asíncronos son una abstracción ideal. En el momento de escribir esto, los flujos asíncronos sólo están disponibles en las plataformas .NET más recientes.

Introducción a la programación paralela

La programación paralela debe utilizarse siempre que tengas una buena cantidad de trabajo de cálculo que pueda dividirse en trozos independientes. La programación paralela aumenta temporalmente el uso de la CPU para mejorar el rendimiento; esto es deseable en sistemas cliente donde las CPU suelen estar ociosas, pero no suele ser apropiado para sistemas servidor. La mayoría de los servidores tienen incorporado cierto paralelismo; por ejemplo, ASP.NET gestionará múltiples peticiones en paralelo. Escribir código paralelo en el servidor puede ser útil en algunas situaciones (si sabes que el número de usuarios simultáneos siempre será bajo), pero en general, la programación paralela en el servidor iría en contra de su paralelismo incorporado y, por tanto, no aportaría ningún beneficio real.

En existen dos formas de paralelismo: paralelismo de datos y paralelismo de tareas. El paralelismo de datos se da cuando tienes un conjunto de datos que procesar, y el procesamiento de cada dato es en su mayor parte independiente de los demás. El paralelismo de tareas es cuando tienes un conjunto de trabajos que realizar, y cada trabajo es en gran medida independiente de los demás. El paralelismo de tareas puede ser dinámico; si una tarea da lugar a varias tareas adicionales, éstas pueden añadirse al conjunto de tareas.

Hay algunas formas diferentes de hacer paralelismo de datos. Parallel.ForEach es similar a un bucle foreach y debe utilizarse siempre que sea posible. Parallel.ForEach se trata en la Receta 4.1. La clase Parallel también admite Parallel.For, que es similar a un bucle for, y puede utilizarse si el procesamiento de los datos depende del índice. El código que utiliza Parallel.ForEach tiene el siguiente aspecto:

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
  Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}

Otra opción de es PLINQ (Parallel LINQ), que proporciona un método de extensión AsParallel para las consultas LINQ. Parallel es más respetuoso con los recursos que PLINQ; Parallel jugará mejor con otros procesos del sistema, mientras que PLINQ intentará (por defecto) repartirse entre todas las CPU. El inconveniente de Parallel es que es más explícito; PLINQ en muchos casos tiene un código más elegante. PLINQ se trata en la Receta 4.5 y tiene este aspecto:

IEnumerable<bool> PrimalityTest(IEnumerable<int> values)
{
  return values.AsParallel().Select(value => IsPrime(value));
}

Independientemente del método que elijas, hay una directriz que destaca a la hora de realizar un procesamiento paralelo.

Consejo

Los trozos de trabajo deben ser tan independientes entre sí como sea posible.

Mientras tu trozo de trabajo sea independiente de todos los demás trozos, maximizas tu paralelismo. En cuanto empiezas a compartir estado entre varios subprocesos, tienes que sincronizar el acceso a ese estado compartido, y tu aplicación se vuelve menos paralela. El Capítulo 12 trata la sincronización con más detalle.

La salida de tu procesamiento paralelo puede manejarse de varias formas. Puedes colocar los resultados en algún tipo de colección concurrente, o puedes agregar los resultados en un resumen. La agregación es habitual en el procesamiento paralelo; este tipo de funcionalidad mapa/reducción también está soportada por las sobrecargas de métodos de la clase Parallel. La receta 4.2 trata la agregación con más detalle.

Ahora pasemos al paralelismo de tareas. El paralelismo de datos se centra en el procesamiento de datos; el paralelismo de tareas se limita a realizar trabajo. A alto nivel, el paralelismo de datos y el paralelismo de tareas son similares; "procesar datos" es un tipo de "trabajo". Muchos problemas de paralelismo pueden resolverse de ambas formas; es conveniente utilizar la API que resulte más natural para el problema en cuestión.

Parallel.Invoke es un método del tipo de Parallel que realiza una especie de paralelismo de tareas fork/join. Este método se trata en la Receta 4.3; sólo tienes que pasar los delegados que quieras ejecutar en paralelo:

void ProcessArray(double[] array)
{
  Parallel.Invoke(
      () => ProcessPartialArray(array, 0, array.Length / 2),
      () => ProcessPartialArray(array, array.Length / 2, array.Length)
  );
}

void ProcessPartialArray(double[] array, int begin, int end)
{
  // CPU-intensive processing...
}

El tipo Task se introdujo originalmente para el paralelismo de tareas, aunque hoy en día también se utiliza para la programación asíncrona. Una instancia de Task -tal como se utiliza en el paralelismo de tareas- representa algún trabajo. Puedes utilizar el método Wait para esperar a que se complete una tarea, y puedes utilizar las propiedades Result y Exception para recuperar los resultados de ese trabajo. El código que utiliza directamente Task es más complejo que el que utiliza Parallel, pero puede ser útil si no conoces la estructura del paralelismo hasta el momento de la ejecución. Con este tipo de paralelismo dinámico, no sabes cuántas piezas de trabajo necesitas hacer al principio del procesamiento; lo averiguas a medida que avanzas. Generalmente, una pieza de trabajo dinámica debe iniciar las tareas hijas que necesite y luego esperar a que se completen. El tipo Task tiene una bandera especial, TaskCreationOptions.AttachedToParent, que podrías utilizar para esto. El paralelismo dinámico se trata en la Receta 4.4.

El paralelismo de tareas debe esforzarse por ser independiente, igual que el paralelismo de datos. Cuanto más independientes puedan ser tus delegados, más eficiente será tu programa. Además, si tus delegados no son independientes, tendrán que estar sincronizados, y es más difícil escribir código correcto si ese código necesita sincronización. Con el paralelismo de tareas, ten especial cuidado con las variables capturadas en cierres. Recuerda que los cierres capturan referencias (no valores), por lo que puedes acabar compartiendo cosas que no son obvias.

La gestión de errores es similar para todos los tipos de paralelismo. Dado que las operaciones se realizan en paralelo, es posible que se produzcan varias excepciones, por lo que se envuelven en un AggregateException que se lanza a tu código. Este comportamiento es consistente en Parallel.ForEach, Parallel.Invoke, Task.Wait, etc. El tipo AggregateException tiene algunos métodos útiles Flatten y Handle para simplificar el código de gestión de errores:

try
{
  Parallel.Invoke(() => { throw new Exception(); },
      () => { throw new Exception(); });
}
catch (AggregateException ex)
{
  ex.Handle(exception =>
  {
    Trace.WriteLine(exception);
    return true; // "handled"
  });
}

Normalmente, no tienes que preocuparte de cómo se gestiona el trabajo en el grupo de hilos. El paralelismo de datos y tareas utiliza particionadores que se ajustan dinámicamente para dividir el trabajo entre los subprocesos de los trabajadores. El pool de hilos aumenta su número de hilos según sea necesario. El grupo de hilos tiene una única cola de trabajo, y cada hilo del grupo de hilos tiene también su propia cola de trabajo. Cuando un subproceso de la agrupación de subprocesos pone en cola trabajo adicional, lo envía primero a su propia cola porque el trabajo suele estar relacionado con el elemento de trabajo actual; este comportamiento anima a los subprocesos a trabajar en su propio trabajo y maximiza los aciertos de la caché. Si otro subproceso no tiene trabajo que hacer, robará trabajo de la cola de otro subproceso. Microsoft se ha esforzado mucho para que el grupo de hilos sea lo más eficiente posible, y hay un gran número de botones que puedes ajustar si necesitas el máximo rendimiento. Mientras tus tareas no sean extremadamente cortas, deberían funcionar bien con la configuración por defecto.

Consejo

Las tareas no deben ser ni extremadamente cortas, ni extremadamente largas.

Si tus tareas son demasiado cortas, entonces la sobrecarga de dividir los datos en tareas y programar esas tareas en el pool de hilos se vuelve significativa. Si tus tareas son demasiado largas, el grupo de hilos no puede ajustar dinámicamente su equilibrio de trabajo de forma eficiente. Es difícil determinar qué tan corto es demasiado corto y qué tan largo es demasiado largo; realmente depende del problema que se esté resolviendo y de las capacidades aproximadas del hardware. Como norma general, yo intento que mis tareas sean lo más cortas posible sin encontrarme con problemas de rendimiento (verás que tu rendimiento se degrada repentinamente cuando tus tareas sean demasiado cortas). Mejor aún, en lugar de utilizar tareas directamente, utiliza el tipo Parallel o PLINQ. Estas formas de paralelismo de más alto nivel llevan incorporado el particionamiento para gestionar esto automáticamente por ti (y ajustarlo según sea necesario en tiempo de ejecución).

Si quieres profundizar en la programación paralela, el mejor libro sobre el tema es Parallel Programming with Microsoft .NET, de Colin Campbell et al. (Microsoft Press).

Introducción a la Programación Reactiva (Rx)

La programación reactiva tiene una curva de aprendizaje más alta que otras formas de concurrencia, y el código puede ser más difícil de mantener a menos que mantengas al día tus habilidades reactivas. Sin embargo, si estás dispuesto a aprenderla, la programación reactiva es extremadamente potente. La programación reactiva te permite tratar un flujo de eventos como un flujo de datos. Como regla general, si utilizas cualquiera de los argumentos de evento pasados a un evento, entonces tu código se beneficiaría de utilizar System.Reactive en lugar de un manejador de eventos normal.

Consejo

System.Reactive solía llamarse Extensiones Reactivas, que a menudo se abreviaba como "Rx". Los tres términos se refieren a la misma tecnología.

La programación reactiva se basa en la noción de flujos observables. Cuando te suscribes a un flujo observable, recibirás cualquier número de elementos de datos (OnNext), y luego el flujo puede terminar con un único error (OnError) o una notificación de "fin de flujo" (OnCompleted). Algunos flujos observables no terminan nunca. Las interfaces reales tienen el siguiente aspecto:

interface IObserver<in T>
{
  void OnNext(T item);
  void OnCompleted();
  void OnError(Exception error);
}

interface IObservable<out T>
{
  IDisposable Subscribe(IObserver<TResult> observer);
}

Sin embargo, nunca debes implementar estas interfaces. La biblioteca System.Reactive (Rx) de Microsoft tiene todas las implementaciones que puedas necesitar. El código reactivo acaba pareciéndose mucho a LINQ; puedes pensar en él como "LINQ to Events". System.Reactive tiene todo lo que hace LINQ y añade un gran número de operadores propios, sobre todo los que tienen que ver con el tiempo. El siguiente código comienza con algunos operadores desconocidos (Interval y Timestamp) y termina con un Subscribe, pero en medio hay algunos operadores Where y Select que deberían resultarte familiares de LINQ:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(x => x.Value % 2 == 0)
    .Select(x => x.Timestamp)
    .Subscribe(x => Trace.WriteLine(x));

El código de ejemplo comienza con un contador que funciona con un temporizador periódico (Interval) y añade una marca de tiempo a cada evento (Timestamp). A continuación, filtra los eventos para que sólo incluyan valores pares del contador (Where), selecciona los valores de la marca de tiempo (Timestamp) y, cuando llega cada valor de la marca de tiempo resultante, lo escribe en el depurador (Subscribe). No te preocupes si no entiendes los nuevos operadores, como Interval: se tratan más adelante en este libro. Por ahora, ten en cuenta que se trata de una consulta LINQ muy similar a las que ya conoces. La principal diferencia es que LINQ to Objects y LINQ to Entities utilizan un modelo "pull", en el que la enumeración de una consulta LINQ arrastra los datos a través de la consulta, mientras que LINQ to Events (System.Reactive) utiliza un modelo "push", en el que los eventos llegan y viajan por sí mismos a través de la consulta.

La definición de un flujo observable es independiente de sus suscripciones. El último ejemplo es igual que el código siguiente:

IObservable<DateTimeOffset> timestamps =
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Timestamp()
        .Where(x => x.Value % 2 == 0)
        .Select(x => x.Timestamp);
timestamps.Subscribe(x => Trace.WriteLine(x));

Lo normal es que un tipo defina los flujos observables y los ponga a disposición como recurso IObservable<TResult>. Otros tipos pueden entonces suscribirse a esos flujos o combinarlos con otros operadores para crear otro flujo observable.

Una suscripción System.Reactive también es un recurso. Los operadores Subscribe devuelven un IDisposable que representa la suscripción. Cuando tu código termine de escuchar un flujo observable, debe deshacerse de su suscripción.

Suscripciones se comportan de forma diferente con observables calientes y fríos. Un observable caliente es un flujo de eventos que siempre está ocurriendo, y si no hay suscriptores cuando llegan los eventos, se pierden. Por ejemplo, el movimiento del ratón es un observable caliente. Un observable frío es un observable que no tiene eventos entrantes todo el tiempo. Un observable frío reaccionará a una suscripción iniciando la secuencia de eventos. Por ejemplo, una descarga HTTP es un observable frío; la suscripción hace que se envíe la solicitud HTTP.

El operador Subscribe siempre debe tomar también un parámetro de gestión de errores. Los ejemplos anteriores no lo hacen; el siguiente es un ejemplo mejor que responderá adecuadamente si el flujo observable termina en un error:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Timestamp()
    .Where(x => x.Value % 2 == 0)
    .Select(x => x.Timestamp)
    .Subscribe(x => Trace.WriteLine(x),
        ex => Trace.WriteLine(ex));

Subject<TResult> es un tipo que resulta útil cuando se experimenta con System.Reactive. Este "sujeto" es como una implementación manual de un flujo observable. Tu código puede llamar a OnNext, OnError, y OnCompleted, y el sujeto reenviará esas llamadas a sus suscriptores. Subject<TResult> es estupendo para experimentar, pero en código de producción, debes esforzarte por utilizar operadores como los que se tratan en el Capítulo 6.

En hay montones de operadores System.Reactive útiles, y en este libro sólo cubro algunos seleccionados. Para obtener más información sobre System.Reactive, te recomiendo el excelente libro en línea Introducción a Rx.

Introducción a los flujos de datos

TPL Dataflow es una interesante mezcla de tecnologías asíncronas y paralelas. Es útil cuando tienes una secuencia de procesos que deben aplicarse a tus datos. Por ejemplo, puede que necesites descargar datos de una URL, analizarlos y luego procesarlos en paralelo con otros datos. TPL Dataflow se utiliza habitualmente como una simple tubería, en la que los datos entran por un extremo y viajan hasta salir por el otro. Sin embargo, TPL Dataflow es mucho más potente que esto; es capaz de manejar cualquier tipo de malla. Puedes definir bifurcaciones, uniones y bucles en una malla, y TPL Dataflow los manejará adecuadamente. Sin embargo, la mayoría de las veces, las mallas TPL Dataflow se utilizan como una canalización.

La unidad básica de construcción de una malla de flujo de datos es un bloque de flujo de datos. Un bloque puede ser un bloque destino (que recibe datos), un bloque fuente (que produce datos), o ambos. Los bloques fuente pueden vincularse a los bloques destino para crear la malla; la vinculación se trata en la Receta 5.1. Los bloques son semiindependientes; intentarán procesar los datos a medida que lleguen y empujar los resultados aguas abajo. La forma habitual de utilizar TPL Dataflow es crear todos los bloques, enlazarlos entre sí y empezar a introducir datos por un extremo. A continuación, los datos salen solos por el otro extremo. De nuevo, Dataflow es más potente que esto; es posible romper los enlaces y crear nuevos bloques y añadirlos a la malla mientras hay datos fluyendo por ella, pero eso es un escenario muy avanzado.

Los bloques de destino tienen búferes para los datos que reciben. Tener búferes les permite aceptar nuevos elementos de datos aunque aún no estén listos para procesarlos; esto mantiene el flujo de datos a través de la malla. Este almacenamiento en búfer puede causar problemas en situaciones de bifurcación, en las que un bloque fuente está vinculado a dos bloques destino. Cuando el bloque fuente tiene datos que enviar aguas abajo, empieza a ofrecerlos a sus bloques enlazados de uno en uno. Por defecto, el primer bloque de destino sólo tomaría los datos y los almacenaría en el búfer, y el segundo bloque de destino nunca los recibiría. La solución a esta situación es limitar los búferes de los bloques destino haciéndolos no vanos; la receta 5.4 lo explica.

Un bloque fallará cuando algo vaya mal, por ejemplo, si el delegado de procesamiento lanza una excepción al procesar un elemento de datos. Cuando un bloque falla, deja de recibir datos. Por defecto, no derribará toda la malla; esto te permite reconstruir esa parte de la malla o redirigir los datos. Sin embargo, éste es un escenario avanzado; la mayoría de las veces, querrás que los fallos se propaguen a lo largo de los enlaces hasta los bloques de destino. Dataflow también admite esta opción; la única parte complicada es que cuando una excepción se propaga a lo largo de un enlace, se envuelve en un AggregateException. Así que, si tienes una canalización larga, podrías acabar con una excepción profundamente anidada; el método AggregateException.Flatten puede utilizarse para solucionar esto:

try
{
  var multiplyBlock = new TransformBlock<int, int>(item =>
  {
    if (item == 1)
      throw new InvalidOperationException("Blech.");
    return item * 2;
  });
  var subtractBlock = new TransformBlock<int, int>(item => item - 2);
  multiplyBlock.LinkTo(subtractBlock,
      new DataflowLinkOptions { PropagateCompletion = true });

  multiplyBlock.Post(1);
  subtractBlock.Completion.Wait();
}
catch (AggregateException exception)
{
  AggregateException ex = exception.Flatten();
  Trace.WriteLine(ex.InnerException);
}

La receta 5.2 trata con más detalle la gestión de errores en el flujo de datos.

En a primera vista, las mallas de flujo de datos suenan muy parecidas a los flujos observables, y tienen mucho en común. Tanto las mallas como los flujos tienen el concepto de elementos de datos que pasan a través de ellas. Además, tanto las mallas como los flujos tienen la noción de una finalización normal (una notificación de que no hay más datos en camino), así como una finalización por fallo (una notificación de que se ha producido algún error durante el procesamiento de los datos). Pero System.Reactive (Rx) y TPL Dataflow no tienen las mismas capacidades. Los observables Rx suelen ser mejores que los bloques de flujo de datos cuando hacen algo relacionado con la temporización. Los bloques de flujo de datos suelen ser mejores que los observables de Rx cuando realizan procesamiento paralelo. Conceptualmente, los observables Rx funcionan más como las retrollamadas: cada paso del observable llama directamente al paso siguiente. En cambio, cada bloque de una malla de flujo de datos es muy independiente de todos los demás bloques. Tanto Rx como TPL Dataflow tienen sus propios usos, con cierto solapamiento. También funcionan bastante bien juntos; la receta 8.8 trata de la interoperabilidad entre Rx y TPL Dataflow.

Si estás familiarizado con los frameworks de actores, TPL Dataflow parecerá compartir similitudes con ellos. Cada bloque de flujo de datos es independiente, en el sentido de que generará tareas para realizar el trabajo necesario, como ejecutar un delegado de transformación o enviar la salida al siguiente bloque. También puedes configurar cada bloque para que se ejecute en paralelo, de modo que inicie varias tareas para gestionar entradas adicionales. Debido a este comportamiento, cada bloque tiene cierta similitud con un actor en un marco de trabajo de actores. Sin embargo, TPL Dataflow no es un marco actoral completo; en concreto, no incorpora soporte para la recuperación de errores limpios ni reintentos de ningún tipo. TPL Dataflow es una biblioteca que se asemeja a un actor, pero no es un marco actoral completo.

Los tipos de bloque TPL Dataflow más comunes son TransformBlock<TInput, TOutput> (similar a Select de LINQ), TransformManyBlock<TInput, TOutput> (similar a SelectMany de LINQ) y ActionBlock<TResult>, que ejecuta un delegado para cada elemento de datos. Para más información sobre TPL Dataflow, te recomiendo la documentación de MSDN y la "Guía para implementar bloques TPL Dataflow personalizados".

Introducción a la programación multihilo

Un hilo es un ejecutor independiente. Cada proceso tiene varios hilos en su interior, y cada uno de esos hilos puede estar haciendo cosas diferentes simultáneamente. Cada hilo tiene su propia pila independiente, pero comparte la misma memoria con todos los demás hilos de un proceso. En algunas aplicaciones, hay un hilo que es especial. Por ejemplo, las aplicaciones de interfaz de usuario tienen un único hilo especial de interfaz de usuario, y las aplicaciones de consola tienen un único hilo especial principal.

Toda aplicación .NET tiene un pool de hilos. El pool de hilos mantiene un número de hilos trabajadores que están esperando para ejecutar cualquier trabajo que les hayas encomendado. El pool de hilos es responsable de determinar cuántos hilos hay en el pool de hilos en cada momento. Hay docenas de opciones de configuración con las que puedes jugar para modificar este comportamiento, pero te recomiendo que lo dejes estar; el grupo de hilos se ha ajustado cuidadosamente para cubrir la gran mayoría de los escenarios del mundo real.

Prácticamente no es necesario que crees nunca un nuevo subproceso. La única vez que debes crear una instancia de Thread es si necesitas un subproceso STA para la interoperabilidad COM.

Un hilo es una abstracción de bajo nivel. El pool de hilos es un nivel de abstracción ligeramente superior; cuando el código encola trabajo al pool de hilos, el propio pool de hilos se encargará de crear un hilo si es necesario. Las abstracciones tratadas en este libro son aún más elevadas: las colas de procesamiento paralelo y de flujo de datos trabajan con el repositorio de hilos según sea necesario. El código que utiliza estas abstracciones superiores es más fácil de hacer bien que el código que utiliza abstracciones de bajo nivel.

Por esta razón, los tipos Thread y BackgroundWorker no se tratan en absoluto en este libro. Ya han tenido su tiempo, y ese tiempo se ha acabado.

Colecciones para aplicaciones concurrentes

En hay un par de categorías de colecciones útiles para la programación concurrente: las colecciones concurrentes y las colecciones inmutables. Ambas categorías de colecciones se tratan en el Capítulo 9. Las colecciones concurrentes permiten que varios subprocesos las actualicen simultáneamente de forma segura. La mayoría de las colecciones concurrentes de utilizan instantáneas para permitir que un subproceso enumere los valores mientras otro subproceso puede estar añadiendo o eliminando valores. Las colecciones concurrentes suelen ser más eficientes que proteger una colección normal con un bloqueo.

Las colecciones inmutables de son un poco diferentes. En realidad, una colección inmutable no se puede modificar; en su lugar, para modificar una colección inmutable, creas una nueva colección que representa la colección modificada. Esto suena terriblemente ineficiente, pero las colecciones inmutables comparten tanta memoria como es posible entre las instancias de la colección, así que no es tan malo como parece. Lo bueno de las colecciones inmutables es que todas las operaciones son puras, por lo que funcionan muy bien con código funcional.

Diseño moderno

La mayoría de las tecnologías concurrentes de tienen un aspecto similar: son funcionales por naturaleza. No me refiero a funcional como "hacen el trabajo", sino a funcional como un estilo de programación que se basa en la composición de funciones. Si adoptas una mentalidad funcional, tus diseños concurrentes serán menos enrevesados.

Un principio de la programación funcional es la pureza (es decir, evitar los efectos secundarios). Cada pieza de la solución toma algún(os) valor(es) como entrada y produce algún(os) valor(es) como salida. En la medida de lo posible, debes evitar que estas piezas dependan de variables globales (o compartidas) o actualicen estructuras de datos globales (o compartidas). Esto es así tanto si la pieza es un método async, una tarea paralela, una operación System.Reactive o un bloque de flujo de datos. Por supuesto, tarde o temprano tus cálculos tendrán que tener un efecto, pero verás que tu código es más limpio si puedes manejar el procesamiento con piezas puras y luego realizar actualizaciones con los resultados.

Otro principio de la programación funcional es la inmutabilidad. La inmutabilidad significa que un dato no puede cambiar. Una razón por la que los datos inmutables son útiles para los programas concurrentes es que nunca necesitas sincronización para los datos inmutables; el hecho de que no puedan cambiar hace innecesaria la sincronización. Los datos inmutables también te ayudan a evitar los efectos secundarios. Los desarrolladores están empezando a utilizar más tipos inmutables, y este libro tiene varias recetas que cubren las estructuras de datos inmutables.

Resumen de las tecnologías clave

El marco .NET ha tenido cierto soporte para la programación asíncrona desde el principio. Sin embargo, la programación asíncrona fue difícil hasta 2012, cuando .NET 4.5 (junto con C# 5.0 y VB 2012) introdujo las palabras clave async y await. Este libro utilizará el enfoque moderno async/await para todas las recetas asíncronas, y tiene algunas recetas que muestran cómo interoperar entre async y los patrones de programación asíncrona más antiguos. Si necesitas soporte para plataformas más antiguas, consulta el Apéndice A.

La biblioteca Task Parallel Library se introdujo en .NET 4.0 con soporte completo tanto para el paralelismo de datos como de tareas. Hoy en día, está disponible incluso en plataformas con menos recursos, como los teléfonos móviles. La TPL está integrada en .NET.

El equipo de System.Reactive ha trabajado duro para dar soporte al mayor número de plataformas posible. System.Reactive, al igual que async y await, proporciona ventajas para todo tipo de aplicaciones, tanto de cliente como de servidor. System.Reactive está disponible en el paquete System.Reactive paquete NuGet.

La biblioteca TPL Dataflow se distribuye oficialmente dentro del paquete NuGet para System.Threading.Tasks.Dataflow.

La mayoría de las colecciones concurrentes de están integradas en .NET; hay algunas colecciones concurrentes adicionales disponibles en el paquete System.Threading.Channels paquete NuGet. Las colecciones inmutables están disponibles en el paquete System.Collections.Immutable paquete NuGet.

Get Libro de cocina de la concurrencia en C#, 2ª edición now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.