Ayuda con ThreadPool

Estimadísimos, mucho gusto.

Cómo puedo obtener la referencia de un thread en ejecución dentro de la cola de espera? Estoy usando la clase ThreadPoolExecutor para crear el pool, y una cola de tipo BlockingQueue.

Lo que necesito saber es como acceder a un hilo en particular en la cola, o preguntar si determinado hilo está vivo, muerto, esperando, etc..en el pool. es posible tener o manejar esto?

muchas gracias desde ya

Opciones de visualización de comentarios

Seleccione la forma que prefiera para mostrar los comentarios y haga clic en «Guardar las opciones» para activar los cambios.
Imagen de ezamudio

que yo sepa no

El ThreadPoolExecutor administra sus hilos y tiene algunos métodos para poder asomarte a su estado interno pero en general, no te permite saber el estado de algún hilo en particular, ni siquiera saber cómo distinguirlos.

La única manera que podrías hacer esto es que las tareas que arrojas al pool tomen el hilo que las ejecuta y lo pasen a algún componente para que hagan lo que necesitas, pero aún así no es algo muy práctico dado que ese hilo podría terminarse por inactividad o por una excepción y crearse otro nuevo en su lugar.

Antes que todo, gracias por la respuesta.

Qué me pueden recomendar? tal vez no me sirve el ThreadPoolExecutor.

Te cuento un poco acerca de lo que tengo que hacer.

Supongamos una clase Hilo:

public class Hilo extends Thread{
 
    private String s;
    private Thread padre;
 
    public Hilo(String name) {
        this.s = name;
    }
 
    public void setPadre(Thread padre) throws InterruptedException{
        this.padre = padre;
    }
 
    @Override
    public void run(){
        try {
            if(padre != null) {
                padre.join(); // Encadenamos
            }
                Thread.sleep(2000); // Que duerma 2 segundos
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
<blockcode>

El metodo tradicional para ejecutar los hilos:

public class Main {
 
    public static void main(String[] args) throws InterruptedException{
 
        Hilo t1 = new Hilo("A");
        Hilo t2 = new Hilo("B");
 
        t2.setPadre(t1);
        t1.start();
        t2.start();
 
        System.out.println("FIN MAIN");
 
    }
}

Esto encadena un hilo a otro....pero que pasa cuando tengo N hilos a ejecutar y cada hilo puede tener más de un "padre" o hilo que esperar? algo así:

public class Hilo extends Thread{
 
    private String s;
    private List<Thread> padres;
 
    public Hilo(String name) {
        this.s = name;
    }
 
    public void setPadre(List<Thread> padres) throws InterruptedException{
        this.padres = padres;
    }
 
    @Override
    public void run(){
        try {
           
            if(padres != null) {
                for(Thread thread : padres)
                     thread.join(); // Encadenamos  //Esto no lo hace
            }
                Thread.sleep(2000); // Que duerma 2 segundos
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

Mi problema pasa porque tengo un arreglo de Hilos, de la estructura anterior. Entonces cuando desde un main recorro el arreglo, y envío a ejecutar las hebras, no sé cómo pasarle la referecia de un hilo a otro que lo tiene como padre.

Entonces, cómo puedo manejar un arreglo de hilos, y que se haga por ejemplo, lo siguiente:

Tengo N thread en un arreglo, por ende debo recorrerlo.
...
Hilo 1 -> se ejecuta
Hilo 2 -> se ejecuta
Hilo 3 -> que espere a 1 y 2
Hilo 4 -> se ejecuta
Hilo 5 -> que espere a 1, 2, 3 y 4
....

Gracias desde ya

Imagen de ezamudio

no threadpool

Si necesitas ese tipo de control entre hilos, no uses ThreadPoolExecutor. Esa clase es muy útil para cuando tienes tareas que quieres ejecutar de manera concurrente pero son independientes entre ellas. Para lo que necesitas en todo caso te podría servir ForkJoin que ya viene en Java 7.

Imagen de ezamudio

Pensándolo bien

Por otra parte, sí podrías usar un ThreadPool, pero en conjunto con Futures. En vez de usar el método execute, usas el método submit, para arrojar tus tareas al thread pool, y obtener un Future. Si tienes varios Futures puedes invocar get en cada uno, y el método bloquea hasta que la tarea termine, si es que no ha terminado aún. De ese modo no lidias con hilos, sólo con tus componentes.

Futures...

Estimado, me podrías explicar un poco más de Future??

Te explico un poco mejor lo que debo hacer:

- Una Tarea se compone de varias cosas que tiene que hacer.
- De esas cosas que debe hacer, algunas se pueden ejecutar en paralelo y otras deben esperar a que otras terminen.

Ejemplo:

Tarea1:
- ejecutar un .jar
- llamar a un ws
- ejecutar otro .jar
- llamar a otro ws
- etc

La cantidad de cosas que debe hacer una tarea es variable. Casa una de estas cosas, para mí es un objeto Runnable, ya sea que extienda de Thread o que implemente Runnable.

Supongamos que tenemos estas subtareas en un arreglo (ordenado de tal manera que al menos siempre la primera tarea en el arreglo no depende de ninguna):

List listHilos = new ArrayList();

Entonces, cómo debería ejecutar las tareas para que se haga algo asi:??

for(Hilo objHilo : listHilos)
{
//Si objHilo tiene que esperar a otro hilo que espere, y si alguno de esos que está esperando falló, que termine o no continúe con la ejecución de ningúna otra tarea

//Si no que se ejecute: executor.execute(objHilo)
}

Cómo entonces puedo controlar o manejar esto?? qué api de java puedo usar???

Muchas gracias desde ya.

Imagen de ezamudio

java.util.concurrent

Revisa el paquete java.util.concurrent ahí están las interfaces de Future y tienen implementaciones sencillas para que puedas envolver un Runnable o Callable en un Future. De manera muy sencilla, el Future es un componente que realiza una tarea de manera síncrona o asíncrona, tipo un Runnable, pero devuelve un resultado (que puedes ignorar obviamente). El ThreadPoolExecutor tiene métodos para recibir Runnables, Callables y Futures. Tú ya tienes Runnables, de modo que ahora en vez de invocar execute, debes invocar submit, y guardar el Future que obtienes:

Runnable miTarea;
//Actualmente haces esto
threadPool.execute(miTarea);
//Realmente debes hacer esto
Future<?> f1 = threadPool.submit(miTarea);
//En algún momento esa tarea se ejecutará en el thread pool
//Mientras tanto puedes pasar ese Future a otro componente, o esperar a que se ejecute:
f1.get();
//Lo anterior va a bloquear al hilo que hace la llamada hasta que la tarea sea ejecutada en el thread pool.

Lo que entiendo es que quieres ejecutar varias tareas, y no te importa en qué orden se ejecuten, pero sí te importa llegar a un punto en el que antes de continuar, necesitas que todas hayan terminado. De modo que todas esas tareas las puedes meter al thread pool, obtener los Futures, meterlos en una lista, y entrar en un ciclo donde les haces get a cada uno. El ciclo va a bloquearse en cada tarea que no esté terminada, y cuando termine el ciclo es porque ya se ejecutaron todas las tareas.

poco a poco me va cuadrando

Estimado, poco a poco voy pensando como programar lo que tengo que hacer...

Una duda...el método join() une únicamente a dos Hilos?? puede un Hilo ser unido con join() a más de un Hilo??? Por ejemplo si en el método run() de cada hilo tengo:

public List listPadres;

....
....
....run()
{
for(por cada hilo "padre")
{
HiloPadre.join(); //Une realmente el hilo actual a más de un hilo?? o sólo a uno??
}
//realizar tarea
}

Lo mismo para Future, puedo pasar el objeto Future al método Run() de un hilo,. e invocar desde ahí a get()??

Saludos y gracias desde ya.

Gracias

Callable

 

Si utilizas java.util.concurrent.Callable, es posible saber si ocurrió un error al ejecutar una tarea. Lo único que necesitas es lanzar/relanzar una/la excepción. Por ejemplo:

Future<Void> future = executor.submit(new Callable<Void>() {
    @Override
    public Void call() throws CustomException {
        try {
            doSomething();
            return null;
        } catch (CustomException e) {
            throw e;
        }
    }
});

try {
    future.get();
    // Thread completed successfully.
} catch (InterruptedException e) {
    // Someone interrupted the thread.
} catch (ExecutionException e) {
    // An error occurred when executing the thread.
}

¡Por si sirve de algo!

~~~

gracias..

sigo con la pregunta....tanto join() como future.get() es aplicable sólo 1 a 1??? puedo por ejemplo decirle a un hilo que espere a otros dos hilos??

muchas gracias..

Imagen de Cid

Re: join()

El método join() hace que el hilo actual espere hasta que la instancia del hilo sobre quien se invoca join termine por lo tanto solo trabajas con 2 hilos el que invoca el metodo join() y el hilo actual que se esta ejecutando.

Checa este video

Imagen de Cid

CountDownLatch ?

Imagen de ezamudio

exacto

CountdownLatch sirve para sincronizar entre varios hilos, te da un control más fino sobre el orden en que terminan las ejecuciones.

Si solamente quieres saber si dos hilos t1 y t2 ya terminaron, pues tienes que hacer join a cada uno. La llamada a join bloquea el hilo que invoca hasta que el hilo invocado termina su ejecución. Si t1 llama t2.join() entonces t1 se queda bloqueado hasta que t2 termine (si t2 ya terminó pues la llamada regresa de inmediato y no se bloquea nada). Pero t2 podría a su vez estar esperando que termine un t3, que a su vez está esperando que termine t4... si alguno de esos está esperando a que termine t1, tendrás un bonito deadlock.

En cuanto a los Futures, si quieres que uno espere a otro y que a su vez ese espere a otro, no te queda otra opción más que arrojar la primera tarea al thread pool y pasar ese future a la segunda tarea, arrojarla al threadpool y pasar ese segundo future a la tercera tarea y arrojarla al thread pool. O arrojar dos tareas al thread pool y pasar sus futures a una tercera tarea y luego ejecutarla (ya sea de manera síncrona o asíncrona).

Imagen de echan

visualvm o jconsole? a lo

visualvm o jconsole? a lo mejor un poco de JMX, o tambien algo sencillo como un logger para cada thread.

Java Concurrency in Practice

 

Java Concurrency in Practice

Java Concurrency in Practice

▲ Para cualquiera que esté interesado en programación de hilos en Java, este libro es una lectura indispensable. Hay una vista previa en Google Books.

 Concurrency In Practice.”

▲ Forma correcta de leer el libro.

Fuente: https://plus.google.com/+jlapenna/posts/hfJsefBCZsU


Java 7 Concurrency Cookbook

Java 7 Concurrency Cookbook

▲ Este es otro libro en forma de recetas que incluye la clase java.util.concurrent.Phaser, Fork/Join y algunos otros cambios introducidos en Java 7.


The Java™ Tutorials incluye una sección de concurrencia: Lesson: Concurrency, aunque se trata más bien de una pequeña introducción.

¡Por si sirve de algo!

~~~

gracias muchachcos...

desde ya les agreadezco sus respuestas..

ante las opciones que uds me dan, no sé si realmente me convenga o me sirva usar ThreadPoolExecutor...

con ThreadPoolExecutor puedo?:

Por ejemplo, tengo 5 tareas, que se tienen que ejecutar así:

***Cada vez que una tarea entra en ejecucion (run()), marca en base de datos su estado, a si mismo cuando termina, actualiza su estado.

-tarea1, tarea2, tarea4, se envian al pool. (se ejecutan en paralelo)

-tarea3 debe esperar a que termine tarea1 y tarea2

-tarea5 debe esperar a que termine tarea1, tarea2, tarea4

-Si por ejemplo falla tarea2, no se ejecuta ninguna otra.

-Ahora también, desde fuera del programa, necesito conocer cuáles tareas se ejecutaron correctamente, cuáles fallaron, tal véz ejecutarlas de nuevo; eliminar algunas del pool..etc....

Un poco más gráfico:

Arreglo con Tareas
0 1 2 3 4 5 -> posicion
[1 - 1 - 1 - 0 - 1 - 0] -> condicion de la tarea

1: Indica que no depende de otra tarea
0: Indica que depende todos los 1 anteriores a el

Tendría entonces estos otros arreglos:
[0 - 1 - 2 - 4] -> corren en paralelo

[3 - 5] -> 3 espera a 0,1,2 y 5 espera a 0,1,2,4

Es básicamente esto lo que necesito hacer o controlar.

La pregunta es: puedo tener este tipo de control?? qué api me sirve?? de qué manera lo puedo hacer?? he intentado todo lo que me han dicho pero no me funciona.. cuál sería la mejor recomendación amigos?? como verán , algo de complejidad tiene lo que tengo que hacer..u.u

muchas gracias de nuevo.

Re: no me funciona

he intentado todo lo que me han dicho pero no me funciona.. [...] como verán , algo de complejidad tiene lo que tengo que hacer..u.u

Creo que primero deberías «modelar» lo que tienes que hacer. Por ejemplo, utilizando un Diagrama de Actividad:

Workflow

Dependiendo del resultado de cada actividad, de acuerdo con tu requerimiento, necesitas tomar una decisión (no incluido en el diagrama), ya sea repetir la tarea, deshacer los cambios, interrumpir todo, etc.

~~~

Imagen de ezamudio

threadpool

Olvida el threadpool. Los threadpools son para cuando tienes montones de tareas que ejecutar y quieres hacerlo de manera concurrente pero no quieres arrancar cada tarea en un hilo porque serían demasiados recursos. Si tienes 5 hilos y solamente 5 hilos, arráncalos manualmente y que se sincronicen entre ellos. Ya tienes el join() que es muy básico, ya te dijeron del countdownlatch, ya te dije de los Futures, creo que es suficiente con eso.

futures...

finalmente lo hice con futures...y quedó asi:

public class Tarea extends Thread{

        private int intId;
        private final String name;
        private int sleepTime;
        private long time;
        private List<Future<?>> listFutures;
        private List<Integer> listPadres;
       
        public Tarea(String name, int sleep, int intId)
        {
                this.intId = intId;
                this.name = name;
                this.sleepTime = sleep;
                this.listFutures = new ArrayList<Future<?>>();
                this.listPadres = new ArrayList<Integer>();
                this.time = System.currentTimeMillis();
        }
               
        @Override
        public void run()
        {
                try
                {
                        Thread.currentThread().setName(this.name);     
                        System.out.printf("INICIO: "+Thread.currentThread().getName() +".\n");
                        if(this.listFutures!= null)
                        {
                                for(Future<?> objFuture : this.listFutures)
                                {                      
                                        System.out.printf(Thread.currentThread().getName() +" EN ESPERA.\n");
                                        objFuture.get();
                                }
                        }
                        //Hace lo que tiene que hacer
                        Thread.sleep(sleepTime);
                }
                catch(InterruptedException ex)
                {
                        ex.printStackTrace();
                } catch (ExecutionException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
                System.out.println("FIN: "+Thread.currentThread().getName()+". Time: " + (System.currentTimeMillis()-time));
        }
       
        public void setIntId(int intId)
        {
                this.intId = intId;
        }
       
        public int getIntId()
        {
                return this.intId;
        }
       
        public void addListFutures(Future<?> future)
        {
                this.listFutures.add(future);
        }
       
        public void addListPadres(Integer indiceTarea)
        {
                this.listPadres.add(indiceTarea);
        }
       
        public List<Integer> getListPadres()
        {
                return this.listPadres;
        }
}

Main:

public class TareaPool {

        public static void main(String[] args) {
               
                int THREAD_NUM = 5;
                CountDownLatch counter = new CountDownLatch(THREAD_NUM);
                ArrayBlockingQueue<Runnable> waitqueue = new ArrayBlockingQueue<Runnable>(THREAD_NUM);
               
            //5 tareas al mismo tiempo, 5 tareas máximas, 2 segundo de timer, y la cola de espera
                ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 2, TimeUnit.SECONDS, waitqueue);            

                String par_control = "101110";
                List<Integer> listEnParalelo = new ArrayList<Integer>();
                List<Integer> listEnEspera = new ArrayList<Integer>();
               
                //Se llenan los arreglos con los indices
                for(int i=0; i<par_control.length(); i++)
                {                              
                        if(String.valueOf(par_control.charAt(i)).equals("1"))
                                listEnParalelo.add(i);
                        else
                                listEnEspera.add(i);                                   
                }
               
                //Contiene las tareas a ejecutar
                List<Tarea> listTareas = new ArrayList<Tarea>();
               
                //Para las hebras en paralelo
                int i=1;               
                for(Integer intParalelo : listEnParalelo)
                {
                        listTareas.add(new Tarea("Hilo"+i,5000, intParalelo));
                        i++;
                }
               
                //Para las hebras que deben esperar
                int j=i;
                for(Integer intEspera : listEnEspera)
                {
                        Tarea t = new Tarea("Hilo"+j, 3000, intEspera);
                        for(Integer intParalelo : listEnParalelo)
                        {
                                if(intParalelo < intEspera)
                                {
                                        t.addListPadres(intParalelo);
                                }
                        }
                        listTareas.add(t);
                        j++;
                }
               
                //List para mantener los Futures
                List<Future<?>> listFuture = new ArrayList<Future<?>>();
               
                for(int x=0; x<listTareas.size(); x++)
                {
                        if(listTareas.get(x).getListPadres().size()==0)
                        {
                                listFuture.add(executor.submit(listTareas.get(x)));
                        }
                        else
                        {
                                for(Integer intPadres : listTareas.get(x).getListPadres())
                                {
                                        if(listFuture.get(intPadres)!=null)
                                        {
                                                listTareas.get(x).addListFutures(listFuture.get(intPadres));
                                        }
                                }
                                listFuture.add(executor.submit(listTareas.get(x)));
                        }
                }
                executor.shutdown();
        }

}

Se les agradece sus conocimientos...

Imagen de Cid

Curso de Hilos

Aqui te dejo la liga donde hablan de varios temas que te pueden ayudar a entender más sobre hilos Java Multithreading es gratuito solo tienes que registrarte en Udemy.

Espero te y les sirva a cualquiera que este interesado.