Cosa determina il numero di thread creati da Java ForkJoinPool?

Per quanto ho capito ForkJoinPool , quel pool crea un numero fisso di thread (predefinito: numero di core) e non creerà mai più thread (a meno che l’applicazione non indichi la necessità di quelli usando managedBlock ).

Tuttavia, utilizzando ForkJoinPool.getPoolSize() ho scoperto che in un programma che crea 30.000 attività ( RecursiveAction ), ForkJoinPool esegue tali attività utilizza in media 700 thread (thread contati ogni volta che viene creata un’attività). I compiti non fanno I / O, ma puro calcolo; l’unica sincronizzazione tra task chiama ForkJoinTask.join() e accede a AtomicBoolean s, ovvero non ci sono operazioni di blocco dei thread.

Poiché join() non blocca il thread chiamante come ho capito, non vi è alcun motivo per cui qualsiasi thread nel pool debba mai bloccare, e così (ho pensato) non ci dovrebbero essere motivi per creare ulteriori thread (che è ovviamente accadendo comunque).

Quindi, perché ForkJoinPool crea così tanti thread? Quali fattori determinano il numero di thread creati?

Speravo che si potesse rispondere a questa domanda senza inserire il codice, ma qui viene richiesta. Questo codice è un estratto di un programma di quattro volte la dimensione, ridotto alle parti essenziali; non si compila così com’è. Se lo si desidera, posso ovviamente pubblicare anche il programma completo.

Il programma cerca in un labirinto un percorso da un determinato punto iniziale a un dato punto finale utilizzando la ricerca in profondità. Una soluzione è garantita per esistere. La logica principale è nel metodo compute() di SolverTask : A RecursiveAction che inizia in un determinato punto e continua con tutti i punti vicini raggiungibili dal punto corrente. Piuttosto che creare un nuovo SolverTask in ogni punto di diramazione (che creerebbe troppe attività), esso spinge tutti i vicini tranne uno su uno stack di backtracking per essere processato in un secondo momento e continua con solo un vicino non inserito nello stack. Una volta raggiunto il punto morto in quel modo, il punto che è stato spinto più di recente nella pila di backtracking è saltato, e la ricerca continua da lì (riducendo di conseguenza il percorso creato dal punto di partenza del taks di conseguenza). Una nuova attività viene creata quando un’attività trova la pila di backtracking più grande di una certa soglia; da quel momento, il compito, pur continuando a schioccare dal suo stack di backtracking fino a che non viene esaurito, non spinge altri punti verso il suo stack quando raggiunge un punto di diramazione, ma crea un nuovo compito per ciascun punto. Pertanto, la dimensione delle attività può essere regolata utilizzando la soglia limite dello stack.

I numeri che ho citato sopra (“30.000 compiti, 700 thread in media”) provengono dalla ricerca in un labirinto di 5000×5000 celle. Quindi, ecco il codice essenziale:

 class SolverTask extends RecursiveTask<ArrayDeque> { // Once the backtrack stack has reached this size, the current task // will never add another cell to it, but create a new task for each // newly discovered branch: private static final int MAX_BACKTRACK_CELLS = 100*1000; /** * @return Tries to compute a path through the maze from local start to end * and returns that (or null if no such path found) */ @Override public ArrayDeque compute() { // Is this task still accepting new branches for processing on its own, // or will it create new tasks to handle those? boolean stillAcceptingNewBranches = true; Point current = localStart; ArrayDeque pathFromLocalStart = new ArrayDeque(); // Path from localStart to (including) current ArrayDeque backtrackStack = new ArrayDeque(); // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later Direction[] allDirections = Direction.values(); while (!current.equals(end)) { pathFromLocalStart.addLast(current); // Collect current's unvisited neighbors in random order: ArrayDeque neighborsToVisit = new ArrayDeque(allDirections.length); for (Direction directionToNeighbor: allDirections) { Point neighbor = current.getNeighbor(directionToNeighbor); // contains() and hasPassage() are read-only methods and thus need no synchronization if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor)) neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite)); } // Process unvisited neighbors if (neighborsToVisit.size() == 1) { // Current node is no branch: Continue with that neighbor current = neighborsToVisit.getFirst().getPoint(); continue; } if (neighborsToVisit.size() >= 2) { // Current node is a branch if (stillAcceptingNewBranches) { current = neighborsToVisit.removeLast().getPoint(); // Push all neighbors except one on the backtrack stack for later processing for(PointAndDirection neighborAndDirection: neighborsToVisit) backtrackStack.push(neighborAndDirection); if (backtrackStack.size() > MAX_BACKTRACK_CELLS) stillAcceptingNewBranches = false; // Continue with the one neighbor that was not pushed onto the backtrack stack continue; } else { // Current node is a branch point, but this task does not accept new branches any more: // Create new task for each neighbor to visit and wait for the end of those tasks SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()]; int t = 0; for(PointAndDirection neighborAndDirection: neighborsToVisit) { SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze); task.fork(); subTasks[t++] = task; } for (SolverTask task: subTasks) { ArrayDeque subTaskResult = null; try { subTaskResult = task.join(); } catch (CancellationException e) { // Nothing to do here: Another task has found the solution and cancelled all other tasks } catch (Exception e) { e.printStackTrace(); } if (subTaskResult != null) { // subtask found solution pathFromLocalStart.addAll(subTaskResult); // No need to wait for the other subtasks once a solution has been found return pathFromLocalStart; } } // for subTasks } // else (not accepting any more branches) } // if (current node is a branch) // Current node is dead end or all its neighbors lead to dead ends: // Continue with a node from the backtracking stack, if any is left: if (backtrackStack.isEmpty()) { return null; // No more backtracking avaible: No solution exists => end of this task } // Backtrack: Continue with cell saved at latest branching point: PointAndDirection pd = backtrackStack.pop(); current = pd.getPoint(); Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint()); // DEBUG System.out.println("Backtracking to " + branchingPoint); // Remove the dead end from the top of pathSoFar, ie all cells after branchingPoint: while (!pathFromLocalStart.peekLast().equals(branchingPoint)) { // DEBUG System.out.println(" Going back before " + pathSoFar.peekLast()); pathFromLocalStart.removeLast(); } // continue while loop with newly popped current } // while (current ... if (!current.equals(end)) { // this task was interrupted by another one that already found the solution // and should end now therefore: return null; } else { // Found the solution path: pathFromLocalStart.addLast(current); return pathFromLocalStart; } } // compute() } // class SolverTask @SuppressWarnings("serial") public class ParallelMaze { // for each cell in the maze: Has the solver visited it yet? private final AtomicBoolean[][] visited; /** * Atomically marks this point as visited unless visited before * @return whether the point was visited for the first time, ie whether it could be marked */ boolean visit(Point p) { return visited[p.getX()][p.getY()].compareAndSet(false, true); } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1)); // Start initial task long startTime = System.currentTimeMillis(); // since SolverTask.compute() expects its starting point already visited, // must do that explicitly for the global starting point: maze.visit(maze.start); maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze)); // One solution is enough: Stop all tasks that are still running pool.shutdownNow(); pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); long endTime = System.currentTimeMillis(); System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s."); } 

Ci sono domande correlate su StackOverflow:

ForkJoinPool si blocca durante invokeAll / join

ForkJoinPool sembra sprecare un thread

Ho eseguito una versione ridotta di ciò che sta accadendo (argomenti utilizzati da jvm: -Xms256m -Xmx1024m -Xss8m):

 import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; public class Test1 { private static ForkJoinPool pool = new ForkJoinPool(2); private static class SomeAction extends RecursiveAction { private int counter; //recursive counter private int childrenCount=80;//amount of children to spawn private int idx; // just for displaying private SomeAction(int counter, int idx) { this.counter = counter; this.idx = idx; } @Override protected void compute() { System.out.println( "counter=" + counter + "." + idx + " activeThreads=" + pool.getActiveThreadCount() + " runningThreads=" + pool.getRunningThreadCount() + " poolSize=" + pool.getPoolSize() + " queuedTasks=" + pool.getQueuedTaskCount() + " queuedSubmissions=" + pool.getQueuedSubmissionCount() + " parallelism=" + pool.getParallelism() + " stealCount=" + pool.getStealCount()); if (counter <= 0) return; List list = new ArrayList<>(childrenCount); for (int i=0;i 

Apparentemente quando si esegue un join, il thread corrente vede che l'attività richiesta non è stata ancora completata e richiede un altro compito da svolgere.

Accade in java.util.concurrent.ForkJoinWorkerThread#joinTask .

Tuttavia, questa nuova attività genera più delle stesse attività, ma non riescono a trovare i thread nel pool, poiché i thread sono bloccati in join. E poiché non ha modo di sapere quanto tempo richiederà per essere rilasciato (il thread potrebbe essere in loop infinito o deadlock per sempre), il nuovo thread (s) viene generato (Compensazione per i thread uniti come menzionato da Louis Wasserman ): java.util.concurrent.ForkJoinPool#signalWork

Quindi, per evitare tale scenario è necessario evitare lo spawning ricorsivo delle attività.

Ad esempio se nel codice precedente si imposta il parametro iniziale su 1, l'importo del thread attivo sarà 2, anche se si aumenta il numero di bambini di dieci volte.

Si noti inoltre che, mentre la quantità di thread attivi aumenta, la quantità di thread in esecuzione è minore o uguale al parallelismo .

Dai commenti alla fonte:

Compensazione: a meno che non ci siano già abbastanza discussioni dal vivo, il metodo tryPreBlock () può creare o ritriggersre un thread di riserva per compensare i joiner bloccati fino a quando non si sbloccano.

Penso che quello che sta succedendo è che non stai finendo nessuna delle attività molto velocemente, e dato che non ci sono thread di lavoro disponibili quando si invia una nuova attività, viene creato un nuovo thread.

rigoroso, rigoroso e rigoroso con l’elaborazione di un grafico aciclico diretto (DAG). Puoi google questi termini per avere una piena comprensione di loro. Questo è il tipo di elaborazione che il framework è stato progettato per elaborare. Guardate il codice nell’API per Recursive …, il framework si basa sul vostro codice compute () per fare altri collegamenti compute () e quindi fare un join (). Ogni attività esegue un singolo join () proprio come l’elaborazione di un DAG.

Non stai eseguendo l’elaborazione DAG. Stai biforcando molti nuovi compiti e aspettando (join ()) su ciascuno. Avere una lettura del codice sorgente È terribilmente complesso ma potresti riuscire a capirlo. Il framework non esegue la corretta gestione delle attività. Dove sta andando a mettere il Task in attesa quando fa un join ()? Non esiste una coda sospesa, che richiederebbe un thread del monitor per esaminare costantemente la coda per vedere cosa è finito. Questo è il motivo per cui il framework usa “thread di continuazione”. Quando un task fa join (), il framework sta assumendo che stia aspettando che finisca una singola Task inferiore. Quando sono presenti molti metodi join (), il thread non può continuare in modo che sia necessario un helper o un thread di continuazione.

Come indicato sopra, è necessario un processo fork-join di tipo scatter-gather. Lì puoi lanciare tante attività

Entrambi gli snippet di codice pubblicati da Holger Peine e elusive-code in realtà non seguono la pratica raccomandata che è apparsa in javadoc per la versione 1.8 :

Negli usi più tipici, una coppia fork-join si comporta come una chiamata (fork) e restituisce (join) da una funzione ricorsiva parallela. Come nel caso di altre forms di chiamate ricorsive, i resi (join) devono essere eseguiti prima di tutto. Ad esempio, a.fork (); b.fork (); b.join (); a.join (); è probabile che sia sostanzialmente più efficiente di unire il codice a prima del codice b .

In entrambi i casi FJPool è stato istanziato tramite il costruttore predefinito. Ciò porta alla costruzione del pool con asyncMode = false , che è l’impostazione predefinita:

@param asyncMode se true,
stabilisce la modalità di pianificazione locale first-in-first-out per le attività a fork che non vengono mai unite. Questa modalità potrebbe essere più appropriata rispetto alla modalità predefinita basata su stack locale nelle applicazioni in cui i thread di lavoro elaborano solo attività asincrone in stile evento. Per il valore predefinito, utilizzare false.

in questo modo la coda di lavoro è in realtà lifo:
testa -> | t4 | t3 | t2 | t1 | … | <- coda

Quindi nei frammenti fork () tutti i task li spingono nello stack e poi join () nello stesso ordine, cioè dal task più profondo (t1) al più alto (t4) che blocca in modo efficace fino a quando un altro thread non ruberà (t1), quindi (t2 ) e così via. Poiché ci sono task enouth per bloccare tutti i thread del pool (task_count >> pool.getParallelism ()), la compensazione viene presa come descritto da Louis Wasserman .