Ejecutores de Java: cómo ser notificado, sin locking, cuando una tarea se completa?

Supongamos que tengo una cola llena de tareas que debo enviar a un servicio ejecutor. Quiero que se procesen uno a la vez. La manera más simple que puedo pensar es:

  1. Tomar una tarea de la cola
  2. Presentarlo al ejecutor
  3. Llamar .get en el futuro devuelto y bloquear hasta que haya un resultado disponible
  4. Toma otra tarea de la cola …

Sin embargo, estoy tratando de evitar el locking por completo. Si tengo 10.000 de esas colas, que necesitan que sus tareas se procesen de a una por vez, me quedaré sin espacio en la stack porque la mayoría de ellas se mantendrá en los hilos bloqueados.

Lo que me gustaría es enviar una tarea y proporcionar una callback que se llama cuando la tarea se completa. Usaré esa notificación de callback como una bandera para enviar la siguiente tarea. (functionaljava y jetlang aparentemente usan tales algoritmos de no locking, pero no puedo entender su código)

¿Cómo puedo hacer eso usando java.util.concurrent de JDK, sin escribir mi propio servicio de ejecutor?

(La cola que me da de comer estas tareas puede en sí misma bloquear, pero ese es un problema que se abordará más adelante)

Defina una interfaz de callback para recibir los parámetros que desee transmitir en la notificación de finalización. Luego inícielo al final de la tarea.

Incluso podría escribir un contenedor general para tareas ejecutables y enviarlas a ExecutorService . O bien, vea a continuación un mecanismo integrado en Java 8.

 class CallbackTask implements Runnable { private final Runnable task; private final Callback callback; CallbackTask(Runnable task, Callback callback) { this.task = task; this.callback = callback; } public void run() { task.run(); callback.complete(); } } 

Con CompletableFuture , Java 8 incluyó un medio más elaborado para componer tuberías donde los procesos se pueden completar de forma asincrónica y condicional. Aquí hay un ejemplo artificial pero completo de notificación.

 import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; public class GetTaskNotificationWithoutBlocking { public static void main(String... argv) throws Exception { ExampleService svc = new ExampleService(); GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking(); CompletableFuture f = CompletableFuture.supplyAsync(svc::work); f.thenAccept(listener::notify); System.out.println("Exiting main()"); } void notify(String msg) { System.out.println("Received message: " + msg); } } class ExampleService { String work() { sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */ char[] str = new char[5]; ThreadLocalRandom current = ThreadLocalRandom.current(); for (int idx = 0; idx < str.length; ++idx) str[idx] = (char) ('A' + current.nextInt(26)); String msg = new String(str); System.out.println("Generated message: " + msg); return msg; } public static void sleep(long average, TimeUnit unit) { String name = Thread.currentThread().getName(); long timeout = Math.min(exponential(average), Math.multiplyExact(10, average)); System.out.printf("%s sleeping %d %s...%n", name, timeout, unit); try { unit.sleep(timeout); System.out.println(name + " awoke."); } catch (InterruptedException abort) { Thread.currentThread().interrupt(); System.out.println(name + " interrupted."); } } public static long exponential(long avg) { return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble())); } } 

Utilice la API futura de escucha de Guava y agregue una callback. Cf. del sitio web:

 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture explosion = service.submit(new Callable() { public Explosion call() { return pushBigRedButton(); } }); Futures.addCallback(explosion, new FutureCallback() { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! } }); 

En Java 8 puedes usar CompletableFuture . Aquí hay un ejemplo que tenía en mi código donde lo estoy usando para buscar usuarios de mi servicio al usuario, asignarlos a mis objetos de visualización y luego actualizar mi vista o mostrar un cuadro de diálogo de error (esta es una aplicación GUI):

  CompletableFuture.supplyAsync( userService::listUsers ).thenApply( this::mapUsersToUserViews ).thenAccept( this::updateView ).exceptionally( throwable -> { showErrorDialogFor(throwable); return null; } ); 

Se ejecuta de forma asincrónica. Estoy usando dos métodos privados: mapUsersToUserViews y updateView .

Puede extender la clase FutureTask y anular el método done() , luego agregar el objeto FutureTask al ExecutorService , por lo que el método done() invocará cuando FutureTask complete inmediatamente.

ThreadPoolExecutor también tiene los beforeExecute y afterExecute hook que puede anular y usar. Aquí está la descripción de los ThreadPoolExecutor de ThreadPoolExecutor .

Métodos de gancho

Esta clase proporciona métodos beforeExecute(java.lang.Thread, java.lang.Runnable) antes de beforeExecute(java.lang.Thread, java.lang.Runnable) y afterExecute(java.lang.Runnable, java.lang.Throwable) antes y después de la ejecución de cada tarea. Estos se pueden usar para manipular el entorno de ejecución; por ejemplo, reiniciando ThreadLocals , recostackndo estadísticas o agregando entradas de registro. Además, el método terminated() puede anularse para realizar cualquier procesamiento especial que deba realizarse una vez que el Executor haya terminado por completo. Si los métodos hook o callback arrojan excepciones, los subprocesos internos del trabajador pueden a su vez fallar y terminar abruptamente.

Use un CountDownLatch .

Es de java.util.concurrent y es exactamente la manera de esperar que varios hilos completen la ejecución antes de continuar.

Para lograr el efecto de callback que está cuidando, eso requiere un poco de trabajo extra adicional. A saber, manejando esto usted mismo en un hilo separado que usa el CountDownLatch y lo espera, luego continúa notificando lo que sea que necesite notificar. No hay soporte nativo para callback ni nada similar a ese efecto.


EDITAR: ahora que entiendo tu pregunta, creo que estás llegando demasiado lejos, innecesariamente. Si toma un SingleThreadExecutor normal, dele todas las tareas y hará las colas de forma nativa.

Si quiere asegurarse de que no se ejecutarán tareas al mismo tiempo, utilice un SingleThreadedExecutor . Las tareas se procesarán en el orden en que se envíen. Ni siquiera necesita realizar las tareas, simplemente envíelas al ejecutivo.

Solo para agregar a la respuesta de Matt, que ayudó, aquí hay un ejemplo más desarrollado para mostrar el uso de una callback.

 private static Primes primes = new Primes(); public static void main(String[] args) throws InterruptedException { getPrimeAsync((p) -> System.out.println("onPrimeListener; p=" + p)); System.out.println("Adios mi amigito"); } public interface OnPrimeListener { void onPrime(int prime); } public static void getPrimeAsync(OnPrimeListener listener) { CompletableFuture.supplyAsync(primes::getNextPrime) .thenApply((prime) -> { System.out.println("getPrimeAsync(); prime=" + prime); if (listener != null) { listener.onPrime(prime); } return prime; }); } 

El resultado es:

  getPrimeAsync(); prime=241 onPrimeListener; p=241 Adios mi amigito 

Código simple para implementar el mecanismo de Callback utilizando ExecutorService

 import java.util.concurrent.*; import java.util.*; public class CallBackDemo{ public CallBackDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(5); try{ for ( int i=0; i<5; i++){ Callback callback = new Callback(i+1); MyCallable myCallable = new MyCallable((long)i+1,callback); Future future = service.submit(myCallable); //System.out.println("future status:"+future.get()+":"+future.isDone()); } }catch(Exception err){ err.printStackTrace(); } service.shutdown(); } public static void main(String args[]){ CallBackDemo demo = new CallBackDemo(); } } class MyCallable implements Callable{ Long id = 0L; Callback callback; public MyCallable(Long val,Callback obj){ this.id = val; this.callback = obj; } public Long call(){ //Add your business logic System.out.println("Callable:"+id+":"+Thread.currentThread().getName()); callback.callbackMethod(); return id; } } class Callback { private int i; public Callback(int i){ this.i = i; } public void callbackMethod(){ System.out.println("Call back:"+i); // Add your business logic } } 

salida:

 creating service Callable:1:pool-1-thread-1 Call back:1 Callable:3:pool-1-thread-3 Callable:2:pool-1-thread-2 Call back:2 Callable:5:pool-1-thread-5 Call back:5 Call back:3 Callable:4:pool-1-thread-4 Call back:4 

Notas clave:

  1. Si desea procesar las tareas en secuencia en orden FIFO, reemplace newFixedThreadPool(5) con newFixedThreadPool(1)
  2. Si desea procesar la siguiente tarea después de analizar el resultado de la callback de callback de la tarea anterior, simplemente sin comentar debajo de la línea

     //System.out.println("future status:"+future.get()+":"+future.isDone()); 
  3. Puede reemplazar newFixedThreadPool() con uno de

     Executors.newCachedThreadPool() Executors.newWorkStealingPool() ThreadPoolExecutor 

    dependiendo de tu caso de uso.

  4. Si quieres manejar el método de callback de forma asíncrona

    a. Pasar un ExecutorService or ThreadPoolExecutor compartido ExecutorService or ThreadPoolExecutor a la tarea invocable

    segundo. Convierta su método Callable/Runnable tarea Callable/Runnable

    do. Presione la tarea de callback a ExecutorService or ThreadPoolExecutor

Esta es una extensión de la respuesta de Pache usando el ListenableFuture de Guava.

En particular, Futures.transform() devuelve ListenableFuture por lo que se puede utilizar para encadenar llamadas asincrónicas. Futures.addCallback() devuelve void , por lo que no se puede utilizar para el encadenamiento, pero es bueno para manejar el éxito / falla en una finalización asincrónica.

 // ListenableFuture1: Open Database ListenableFuture database = service.submit(() -> openDatabase()); // ListenableFuture2: Query Database for Cursor rows ListenableFuture cursor = Futures.transform(database, database -> database.query(table, ...)); // ListenableFuture3: Convert Cursor rows to List ListenableFuture> fooList = Futures.transform(cursor, cursor -> cursorToFooList(cursor)); // Final Callback: Handle the success/errors when final future completes Futures.addCallback(fooList, new FutureCallback>() { public void onSuccess(List foos) { doSomethingWith(foos); } public void onFailure(Throwable thrown) { log.error(thrown); } }); 

NOTA: además de encadenar tareas asíncronas, Futures.transform() también le permite progtwigr cada tarea en un ejecutor por separado (no se muestra en este ejemplo).

Puede usar una implementación de Callable tal que

 public class MyAsyncCallable implements Callable { CallbackInterface ci; public MyAsyncCallable(CallbackInterface ci) { this.ci = ci; } public V call() throws Exception { System.out.println("Call of MyCallable invoked"); System.out.println("Result = " + this.ci.doSomething(10, 20)); return (V) "Good job"; } } 

donde CallbackInterface es algo muy básico como

 public interface CallbackInterface { public int doSomething(int a, int b); } 

y ahora la clase principal se verá así

 ExecutorService ex = Executors.newFixedThreadPool(2); MyAsyncCallable mac = new MyAsyncCallable((a, b) -> a + b); ex.submit(mac);