Partizione a Java 8 Stream

Come implementare l’operazione “partition” su Java 8 Stream? Per partizione intendo, dividere un stream in sub-flussi di una determinata dimensione. In qualche modo sarà identico al metodo Guava Iterators.partition () , solo che è desiderabile che le partizioni siano Streams piuttosto ponderate piuttosto che List.

È imansible suddividere il stream di origine arbitrario in lotti di dimensioni fisse, perché ciò rovinerebbe l’elaborazione parallela. Quando si elabora in parallelo, non si può sapere quanti elementi nella prima sotto-attività dopo la divisione, quindi non è ansible creare le partizioni per la successiva attività secondaria finché il primo non viene completamente elaborato.

Tuttavia è ansible creare il stream di partizioni dall’elenco di accesso casuale. Tale funzione è disponibile, ad esempio, nella mia libreria StreamEx :

 List input = Arrays.asList(...); Stream> stream = StreamEx.ofSubLists(input, partitionSize); 

O se vuoi davvero il stream di stream:

 Stream> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream); 

Se non vuoi dipendere da librerie di terze parti, puoi implementare manualmente tale metodo di ofSubLists :

 public static  Stream> ofSubLists(List source, int length) { if (length <= 0) throw new IllegalArgumentException("length = " + length); int size = source.size(); if (size <= 0) return Stream.empty(); int fullChunks = (size - 1) / length; return IntStream.range(0, fullChunks + 1).mapToObj( n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); } 

Questa implementazione sembra un po ‘lunga, ma prende in considerazione alcuni casi d’angolo come la dimensione dell’elenco da vicino a MAX_VALUE.


Se vuoi una soluzione parallela per lo streaming non ordinato (quindi non ti interessa quali elementi del stream verranno combinati in un singolo batch), puoi usare il collector in questo modo (grazie a @sibnick per l’ispirazione):

 public static  Collector unorderedBatches(int batchSize, Collector, A, R> downstream) { class Acc { List cur = new ArrayList<>(); A acc = downstream.supplier().get(); } BiConsumer accumulator = (acc, t) -> { acc.cur.add(t); if(acc.cur.size() == batchSize) { downstream.accumulator().accept(acc.acc, acc.cur); acc.cur = new ArrayList<>(); } }; return Collector.of(Acc::new, accumulator, (acc1, acc2) -> { acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc); for(T t : acc2.cur) accumulator.accept(acc1, t); return acc1; }, acc -> { if(!acc.cur.isEmpty()) downstream.accumulator().accept(acc.acc, acc.cur); return downstream.finisher().apply(acc.acc); }, Collector.Characteristics.UNORDERED); } 

Esempio di utilizzo:

 List> list = IntStream.range(0,20) .boxed().parallel() .collect(unorderedBatches(3, Collectors.toList())); 

Risultato:

 [[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]] 

Tale collettore è perfettamente sicuro per i thread e produce lotti ordinati per stream sequenziale.

Se si desidera applicare una trasformazione intermedia per ogni lotto, è ansible utilizzare la seguente versione:

 public static  Collector unorderedBatches(int batchSize, Collector batchCollector, Collector downstream) { return unorderedBatches(batchSize, Collectors.mapping(list -> list.stream().collect(batchCollector), downstream)); } 

Ad esempio, in questo modo è ansible sumre i numeri in ogni lotto al volo:

 List list = IntStream.range(0,20) .boxed().parallel() .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), Collectors.toList())); 

Se si desidera utilizzare il stream in sequenza, è ansible partizionare un stream (nonché eseguire funzioni correlate come il windowing, che ritengo sia ciò che si desidera in questo caso). Due librerie che supporteranno il partitoning per gli stream standard sono cyclops-react (io sono l’autore) e jOOλ che cyclops-react estende (per aggiungere funzionalità come Windowing).

cyclops-streams ha una collezione di funzioni statiche StreamUtils per operare su stream Java e una serie di funzioni come splitAt, headAndTail, splitBy, partizione per il partizionamento.

Per visualizzare un stream in un stream di flussi nidificati di dimensione 30, puoi utilizzare il metodo finestra.

Al punto OP, in termini di Streaming, suddividere un stream in più flussi di una determinata dimensione è un’operazione di tipo A (anziché un’operazione di partizionamento).

  Stream> streamOfStreams = StreamUtils.window(stream,30); 

Esiste una class di estensione Stream chiamata ReactiveSeq che estende jool.Seq e aggiunge la funzionalità di Windowing, che potrebbe rendere il codice un po ‘più pulito.

  ReactiveSeq seq; ReactiveSeq> streamOfLists = seq.grouped(30); 

Come indicato da Tagir sopra, tuttavia, questo non è adatto per flussi paralleli. Se vuoi window o batch un Stream che desideri eseguire in modo multithread. LazyFutureStream in cyclops-react potrebbe essere utile (Windowing è nell’elenco delle cose da fare, ma il vecchio batching è ora disponibile).

In questo caso i dati verranno passati dai thread multipli che eseguono il stream a una coda di attesa multi-produttore / single-consumer e i dati sequenziali da quella coda possono essere sottoposti a finestra prima di essere nuovamente distribuiti ai thread.

  Stream> batched = new LazyReact().range(0,1000) .grouped(30) .map(this::process); 

Sembra che, come Jon Skeet ha mostrato nel suo commento , non è ansible rendere le partizioni pigre. Per le partizioni non-pigre, ho già questo codice:

 public static  Stream> partition(Stream source, int size) { final Iterator it = source.iterator(); final Iterator> partIt = Iterators.transform(Iterators.partition(it, size), List::stream); final Iterable> iterable = () -> partIt; return StreamSupport.stream(iterable.spliterator(), false); } 

La soluzione java 8 più elegante e pura per questo problema ho trovato:

 public static  List> partition(final List list, int batchSize) { return IntStream.range(0, getNumberOfPartitions(list, batchSize)) .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size()))) .collect(toList()); } //https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java private static  int getNumberOfPartitions(List list, int batchSize) { return (list.size() + batchSize- 1) / batchSize; } 

Penso che sia ansible con una sorta di hack dentro:

creare una class di utilità per batch:

 public static class ConcurrentBatch { private AtomicLong id = new AtomicLong(); private int batchSize; public ConcurrentBatch(int batchSize) { this.batchSize = batchSize; } public long next() { return (id.getAndIncrement()) / batchSize; } public int getBatchSize() { return batchSize; } } 

e metodo:

 public static  void applyConcurrentBatchToStream(Consumer> batchFunc, Stream stream, int batchSize){ ConcurrentBatch batch = new ConcurrentBatch(batchSize); //hack java map: extends and override computeIfAbsent Supplier>> mapFactory = () -> new ConcurrentHashMap>() { @Override public List computeIfAbsent(Long key, Function> mappingFunction) { List rs = super.computeIfAbsent(key, mappingFunction); //apply batchFunc to old lists, when new batch list is created if(rs.isEmpty()){ for(Entry> e : entrySet()) { List batchList = e.getValue(); //todo: need to improve synchronized (batchList) { if (batchList.size() == batch.getBatchSize()){ batchFunc.accept(batchList); remove(e.getKey()); batchList.clear(); } } } } return rs; } }; stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s)) .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList()))) .entrySet() .stream() //map contains only unprocessed lists (size batchFunc.accept(e.getValue())); } 

Ecco una soluzione rapida di AbacusUtil

 IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray())); 

Dichiarazione: sono lo sviluppatore di AbacusUtil.