Limita un stream in base a un predicato

Esiste un’operazione di streaming Java 8 che limita un stream (potenzialmente infinito) fino al primo elemento che non riesce a corrispondere a un predicato? Qualcosa che assomiglia all’operazione (non esistente) takeWhile nell’esempio seguente e stamperebbe tutti i numeri inferiori a 10?

 IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println); 

Se non esiste una tale operazione, qual è il modo migliore di implementarla in modo generale?

Tale operazione dovrebbe essere ansible con un Java 8 Stream , ma non può essere necessariamente fatto in modo efficiente – ad esempio, non è ansible parallelizzare un’operazione del genere, dato che bisogna osservare gli elementi in ordine.

L’API non fornisce un modo semplice per farlo, ma quello che probabilmente è il modo più semplice è prendere Stream.iterator() , avvolgere l’ Iterator per avere un’implementazione “take-while”, e poi tornare a uno Spliterator e poi un Stream . Oppure – forse – avvolgere lo Spliterator , anche se non può essere più suddiviso in questa implementazione.

Ecco un’implementazione non testata di takeWhile su uno Spliterator :

 static  Spliterator takeWhile( Spliterator splitr, Predicate predicate) { return new Spliterators.AbstractSpliterator(splitr.estimateSize(), 0) { boolean stillGoing = true; @Override public boolean tryAdvance(Consumer consumer) { if (stillGoing) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem)) { consumer.accept(elem); } else { stillGoing = false; } }); return hadNext && stillGoing; } return false; } }; } static  Stream takeWhile(Stream stream, Predicate predicate) { return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); } 

Operazioni takeWhile e dropWhile sono stati aggiunti a JDK 9. Il tuo codice di esempio

 IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println); 

si comporterà esattamente come ci si aspetta quando compilato ed eseguito sotto JDK 9.

JDK 9 è stato rilasciato. È disponibile per il download qui: http://jdk.java.net/9/

allMatch() è una funzione di cortocircuito, quindi puoi usarla per interrompere l’elaborazione. Lo svantaggio principale è che devi fare il test due volte: una volta per vedere se dovresti elaborarlo, e ancora per vedere se continuare.

 IntStream .iterate(1, n -> n + 1) .peek(n->{if (n<10) System.out.println(n);}) .allMatch(n->n < 10); 

Come follow-up alla risposta di @StuartMarks . La mia libreria StreamEx ha l’operazione takeWhile che è compatibile con l’implementazione corrente di JDK-9. Quando è in esecuzione con JDK-9, delegherà semplicemente all’implementazione JDK (tramite MethodHandle.invokeExact che è molto veloce). Durante l’esecuzione con JDK-8, verrà utilizzata l’implementazione “polyfill”. Quindi usando la mia libreria il problema può essere risolto in questo modo:

 IntStreamEx.iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println); 

takeWhile è una delle funzioni fornite dalla libreria protonpack .

 Stream infiniteInts = Stream.iterate(0, i -> i + 1); Stream finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10); assertThat(finiteInts.collect(Collectors.toList()), hasSize(10)); 

Puoi usare java8 + rxjava .

 import java.util.stream.IntStream; import rx.Observable; // Example 1) IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> { System.out.println(n); return n < 10; } ).subscribe() ; // Example 2 IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> n < 10) .forEach( n -> System.out.println(n)); 

Aggiornamento: Java 9 Stream ora viene fornito con un metodo takeWhile .

Nessun bisogno di hack o altre soluzioni. Basta usare quello!


Sono sicuro che questo può essere notevolmente migliorato: (qualcuno potrebbe renderlo thread-safe forse)

 Stream stream = Stream.iterate(0, n -> n + 1); TakeWhile.stream(stream, n -> n < 10000) .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n))); 

Un hack di sicuro … Non elegante – ma funziona ~: D

 class TakeWhile implements Iterator { private final Iterator iterator; private final Predicate predicate; private volatile T next; private volatile boolean keepGoing = true; public TakeWhile(Stream s, Predicate p) { this.iterator = s.iterator(); this.predicate = p; } @Override public boolean hasNext() { if (!keepGoing) { return false; } if (next != null) { return true; } if (iterator.hasNext()) { next = iterator.next(); keepGoing = predicate.test(next); if (!keepGoing) { next = null; } } return next != null; } @Override public T next() { if (next == null) { if (!hasNext()) { throw new NoSuchElementException("Sorry. Nothing for you."); } } T temp = next; next = null; return temp; } public static  Stream stream(Stream s, Predicate p) { TakeWhile tw = new TakeWhile(s, p); Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED); return StreamSupport.stream(split, false); } } 

Ecco una versione fatta su ints – come richiesto nella domanda.

Uso:

 StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10); 

Ecco il codice per StreamUtil:

 import java.util.PrimitiveIterator; import java.util.Spliterators; import java.util.function.IntConsumer; import java.util.function.IntPredicate; import java.util.stream.IntStream; import java.util.stream.StreamSupport; public class StreamUtil { public static IntStream takeWhile(IntStream stream, IntPredicate predicate) { return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false); } private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator { private final PrimitiveIterator.OfInt iterator; private final IntPredicate predicate; public PredicateIntSpliterator(IntStream stream, IntPredicate predicate) { super(Long.MAX_VALUE, IMMUTABLE); this.iterator = stream.iterator(); this.predicate = predicate; } @Override public boolean tryAdvance(IntConsumer action) { if (iterator.hasNext()) { int value = iterator.nextInt(); if (predicate.test(value)) { action.accept(value); return true; } } return false; } } } 

Questa è la fonte copiata da JDK 9 java.util.stream.Stream.takeWhile (Predicato). Una piccola differenza per lavorare con JDK 8.

 static  Stream takeWhile(Stream stream, Predicate p) { class Taking extends Spliterators.AbstractSpliterator implements Consumer { private static final int CANCEL_CHECK_COUNT = 63; private final Spliterator s; private int count; private T t; private final AtomicBoolean cancel = new AtomicBoolean(); private boolean takeOrDrop = true; Taking(Spliterator s) { super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)); this.s = s; } @Override public boolean tryAdvance(Consumer action) { boolean test = true; if (takeOrDrop && // If can take (count != 0 || !cancel.get()) && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Comparator getComparator() { return s.getComparator(); } @Override public void accept(T t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } @Override public Spliterator trySplit() { return null; } } return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close); } 

Vai a prendere la biblioteca AbacusUtil . Fornisce l’API esatta che desideri e altro:

 IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println); 

Dichiarazione: sono lo sviluppatore di AbacusUtil.

Non è ansible interrompere un stream se non con un’operazione di cortocircuito del terminale, che lascerebbe alcuni valori di stream non elaborati indipendentemente dal loro valore. Ma se vuoi solo evitare operazioni su uno stream, puoi aggiungere una trasformazione e filtrare allo stream:

 import java.util.Objects; class ThingProcessor { static Thing returnNullOnCondition(Thing thing) { return( (*** is condition met ***)? null : thing); } void processThings(Collection thingsCollection) { thingsCollection.stream() *** regular stream processing *** .map(ThingProcessor::returnNullOnCondition) .filter(Objects::nonNull) *** continue stream processing *** } } // class ThingProcessor 

Questo trasforma il stream di cose in null quando le cose incontrano qualche condizione, quindi filtra i valori nulli. Se sei disposto a indulgere in effetti collaterali, potresti impostare il valore della condizione su true una volta che qualcosa è stato riscontrato, quindi tutte le cose successive vengono filtrate indipendentemente dal loro valore. Ma anche se non è ansible risparmiare un sacco di (se non del tutto) l’elaborazione filtrando i valori dallo stream che non si desidera elaborare.

In realtà ci sono 2 modi per farlo in Java 8 senza librerie aggiuntive o usando Java 9.

Se vuoi stampare numeri da 2 a 20 sulla console puoi farlo:

 IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20); 

o

 IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20); 

L'output è in entrambi i casi:

 2 4 6 8 10 12 14 16 18 20 

Nessuno ha ancora menzionato anyMatch . Questa è la ragione di questo post.

Anche io avevo un requisito simile: invocare il servizio web, se fallisce, riprovare per 3 volte. Se fallisce anche dopo queste numerose prove, invia una notifica via email. Dopo aver fatto ricerche su anyMatch() , anyMatch() venuto come un salvatore. Il mio codice di esempio come segue. Nell’esempio seguente, se il metodo webServiceCall restituisce true nella prima iterazione, il stream non scorre ulteriormente come è stato chiamato anyMatch() . Credo, questo è quello che stai cercando.

 import java.util.stream.IntStream; import io.netty.util.internal.ThreadLocalRandom; class TrialStreamMatch { public static void main(String[] args) { if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){ //Code for sending email notifications } } public static boolean webServiceCall(int i){ //For time being, I have written a code for generating boolean randomly //This whole piece needs to be replaced by actual web-service client code boolean bool = ThreadLocalRandom.current().nextBoolean(); System.out.println("Iteration index :: "+i+" bool :: "+bool); //Return success status -- true or false return bool; } 

Ho un’altra soluzione rapida implementando questo (che è davvero sporco, ma hai un’idea):

 public static void main(String[] args) { System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15) .map(o -> o.toString()).collect(Collectors.joining(", "))); } static interface TerminatedStream { Stream terminateOn(T e); } static class StreamUtil { static  TerminatedStream iterate(T seed, UnaryOperator op) { return new TerminatedStream() { public Stream terminateOn(T e) { Builder builder = Stream. builder().add(seed); T current = seed; while (!current.equals(e)) { current = op.apply(current); builder.add(current); } return builder.build(); } }; } } 

Se hai diversi problemi, potrebbe essere necessaria una soluzione diversa, ma per il tuo problema attuale, vorrei semplicemente:

 IntStream .iterate(1, n -> n + 1) .limit(10) .forEach(System.out::println); 

Potrebbe essere un po ‘fuori tema ma questo è quello che abbiamo per List piuttosto che Stream .

Per prima cosa devi avere un metodo take . Questo metodo prende i primi n elementi:

 static  List take(List l, int n) { if (n <= 0) { return newArrayList(); } else { int takeTo = Math.min(Math.max(n, 0), l.size()); return l.subList(0, takeTo); } } 

funziona come scala.List.take

  assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3)); assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5)); assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1)); assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0)); 

ora sarà abbastanza semplice scrivere un metodo takeWhile basato su take

 static  List takeWhile(List l, Predicate p) { return l.stream(). filter(p.negate()).findFirst(). // find first element when p is false map(l::indexOf). // find the index of that element map(i -> take(l, i)). // take up to the index orElse(l); // return full list if p is true for all elements } 

funziona così:

  assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4)); 

questa implementazione itera l'elenco parzialmente per alcune volte ma non aggiungerà ulteriori operazioni O(n^2) . Spero che sia accettabile.

Ecco il mio tentativo utilizzando solo la libreria Java Stream.

  IntStream.iterate(0, i -> i + 1) .filter(n -> { if (n < 10) { System.out.println(n); return false; } else { return true; } }) .findAny();