Prodotto cartesiano di flussi in Java 8 come stream (utilizzando solo flussi)

Vorrei creare un metodo che crea un stream di elementi che sono prodotti cartesiani di più flussi dati (aggregati allo stesso tipo alla fine da un operatore binario). Si noti che sia gli argomenti che i risultati sono flussi, non raccolte.

Ad esempio, per due flussi di {A, B} e {X, Y} mi piacerebbe che producesse un stream di valori {AX, AY, BX, BY} (una semplice concatenazione è usata per aggregare le stringhe). Finora, ho trovato questo codice:

private static  Stream cartesian(BinaryOperator aggregator, Stream... streams) { Stream result = null; for (Stream stream : streams) { if (result == null) { result = stream; } else { result = result.flatMap(m -> stream.map(n -> aggregator.apply(m, n))); } } return result; } 

Questo è il mio caso d’uso desiderato:

 Stream result = cartesian( (a, b) -> a + b, Stream.of("A", "B"), Stream.of("X", "Y") ); System.out.println(result.collect(Collectors.toList())); 

Risultato atteso: AX, AY, BX, BY .

Un altro esempio:

 Stream result = cartesian( (a, b) -> a + b, Stream.of("A", "B"), Stream.of("K", "L"), Stream.of("X", "Y") ); 

Risultato atteso: AKX, AKY, ALX, ALY, BKX, BKY, BLX, BLY .

Tuttavia, se eseguo il codice, ottengo questo errore:

IllegalStateException: lo stream è già stato utilizzato o chiuso

Dove viene consumato il stream? Di flatMap ? Può essere facilmente risolto?

Passare i flussi nel tuo esempio non è mai meglio che passare le liste:

 private static  Stream cartesian(BinaryOperator aggregator, List... lists) { ... } 

E usalo in questo modo:

 Stream result = cartesian( (a, b) -> a + b, Arrays.asList("A", "B"), Arrays.asList("K", "L"), Arrays.asList("X", "Y") ); 

In entrambi i casi si crea un array implicito da varargs e lo si utilizza come origine dati, quindi la pigrizia è immaginaria. I tuoi dati vengono effettivamente memorizzati negli array.

Nella maggior parte dei casi il stream di prodotti cartesiani risultante è molto più lungo degli input, quindi non c’è praticamente alcuna ragione per rendere gli input pigri. Ad esempio, con cinque elenchi di cinque elementi (25 in totale), si otterrà il stream risultante di 3125 elementi. Quindi memorizzare 25 elementi nella memoria non è un grosso problema. In realtà nella maggior parte dei casi pratici sono già memorizzati nella memoria.

Per generare il stream di prodotti cartesiani è necessario “riavvolgere” costantemente tutti i flussi (tranne il primo). Per riavvolgere, gli stream dovrebbero essere in grado di recuperare i dati originali ancora e ancora, o di buffering in qualche modo (cosa che non ti piace) o riprenderli di nuovo dalla sorgente (colleciton, array, file, rete, numeri casuali, ecc. ) ed eseguire ancora e ancora tutte le operazioni intermedie. Se le operazioni di origine e intermedie sono lente, la soluzione lazy potrebbe essere molto più lenta della soluzione di buffering. Se la tua fonte non è in grado di produrre nuovamente i dati (ad esempio, un generatore di numeri casuali che non può produrre gli stessi numeri prodotti in precedenza), la tua soluzione sarà errata.

Tuttavia la soluzione totalmente pigra è possbile. Basta usare non i flussi, ma i fornitori di streaming:

 private static  Stream cartesian(BinaryOperator aggregator, Supplier>... streams) { return Arrays.stream(streams) .reduce((s1, s2) -> () -> s1.get().flatMap(t1 -> s2.get().map(t2 -> aggregator.apply(t1, t2)))) .orElse(Stream::empty).get(); } 

La soluzione è interessante poiché creiamo e riduciamo il stream di fornitori per ottenere il fornitore risultante e infine chiamarlo. Uso:

 Stream result = cartesian( (a, b) -> a + b, () -> Stream.of("A", "B"), () -> Stream.of("K", "L"), () -> Stream.of("X", "Y") ); result.forEach(System.out::println); 

stream viene consumato nell’operazione flatMap nella seconda iterazione. Quindi devi creare un nuovo stream ogni volta che map il risultato. Pertanto è necessario raccogliere lo stream in anticipo per ottenere un nuovo stream in ogni iterazione.

 private static  Stream cartesian(BiFunction aggregator, Stream... streams) { Stream result = null; for (Stream stream : streams) { if (result == null) { result = stream; } else { Collection s = stream.collect(Collectors.toList()); result = result.flatMap(m -> s.stream().map(n -> aggregator.apply(m, n))); } } return result; } 

O ancora più corto:

 private static  Stream cartesian(BiFunction aggregator, Stream... streams) { return Arrays.stream(streams).reduce((r, s) -> { List collect = s.collect(Collectors.toList()); return r.flatMap(m -> collect.stream().map(n -> aggregator.apply(m, n))); }).orElse(Stream.empty()); }