Qual è il modo corretto per condividere il risultato di una chiamata di rete Http Angolare in RxJs 5?

Usando Http, chiamiamo un metodo che fa una chiamata di rete e restituisce un http osservabile:

getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

Se consideriamo questo osservabile e aggiungiamo più abbonati ad esso:

 let network$ = getCustomer(); let subscriber1 = network$.subscribe(...); let subscriber2 = network$.subscribe(...); 

Quello che vogliamo fare è assicurarsi che questo non causi più richieste di rete.

Questo potrebbe sembrare uno scenario insolito, ma in realtà è piuttosto comune: ad esempio se il chiamante si iscrive all’osservabile per visualizzare un messaggio di errore e lo passa al modello utilizzando la pipe async, abbiamo già due abbonati.

Qual è il modo corretto di farlo in RxJs 5?

Vale a dire, questo sembra funzionare bene:

 getCustomer() { return this.http.get('/someUrl').map(res => res.json()).share(); } 

Ma è questo il modo idiomatico di farlo in RxJs 5, o dovremmo invece fare qualcos’altro?

Nota: Come per Angular 5 nuovo HttpClient , la parte .map(res => res.json()) in tutti gli esempi è ora inutile, poiché il risultato JSON è ora assunto per impostazione predefinita.

Memorizza i dati nella cache e, se disponibile, memorizza nella cache, altrimenti restituisci la richiesta HTTP.

 import {Injectable} from '@angular/core'; import {Http, Headers} from '@angular/http'; import {Observable} from 'rxjs/Observable'; import 'rxjs/add/observable/of'; //proper way to import the 'of' operator import 'rxjs/add/operator/share'; import 'rxjs/add/operator/map'; import {Data} from './data'; @Injectable() export class DataService { private url:string = 'https://cors-test.appspot.com/test'; private data: Data; private observable: Observable; constructor(private http:Http) {} getData() { if(this.data) { // if `data` is available just return it as `Observable` return Observable.of(this.data); } else if(this.observable) { // if `this.observable` is set then the request is in progress // return the `Observable` for the ongoing request return this.observable; } else { // example header (not necessary) let headers = new Headers(); headers.append('Content-Type', 'application/json'); // create the request, store the `Observable` for subsequent subscribers this.observable = this.http.get(this.url, { headers: headers }) .map(response => { // when the cached data is available we don't need the `Observable` reference anymore this.observable = null; if(response.status == 400) { return "FAILURE"; } else if(response.status == 200) { this.data = new Data(response.json()); return this.data; } // make it shared so more than one subscriber can get the result }) .share(); return this.observable; } } } 

Esempio di Plunker

Questo artiglio https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html è un’ottima spiegazione su come effettuare il cache con shareReplay .

Per suggerimento @Cristian, questo è un modo che funziona bene per gli osservabili HTTP, che emettono solo una volta e poi completano:

 getCustomer() { return this.http.get('/someUrl') .map(res => res.json()).publishLast().refCount(); } 

AGGIORNAMENTO: Ben Lesh dice la prossima versione secondaria dopo la 5.2.0, sarai in grado di chiamare shareReplay () per fare veramente il cache.

IN PRECEDENZA…..

In primo luogo, non usare share () o publishReplay (1) .refCount (), sono gli stessi e il problema con esso, è che condivide solo se le connessioni sono fatte mentre l’osservabile è attivo, se ti connetti dopo averlo completato , crea di nuovo un nuovo osservabile, la traduzione, non proprio il caching.

Birowski ha dato la soluzione giusta sopra, che è usare ReplaySubject. ReplaySubject memorizza nella cache i valori che gli vengono dati (bufferSize) nel nostro caso 1. Non creerà un nuovo osservabile come share () quando refCount raggiunge lo zero e si effettua una nuova connessione, che è il comportamento corretto per la memorizzazione nella cache.

Ecco una funzione riutilizzabile

 export function cacheable(o: Observable): Observable { let replay = new ReplaySubject(1); o.subscribe( x => replay.next(x), x => replay.error(x), () => replay.complete() ); return replay.asObservable(); } 

Ecco come usarlo

 import { Injectable } from '@angular/core'; import { Http } from '@angular/http'; import { Observable } from 'rxjs/Observable'; import { cacheable } from '../utils/rxjs-functions'; @Injectable() export class SettingsService { _cache: Observable; constructor(private _http: Http, ) { } refresh = () => { if (this._cache) { return this._cache; } return this._cache = cacheable(this._http.get('YOUR URL')); } } 

Di seguito è riportata una versione più avanzata della funzione memorizzabile nella cache. Ciò consente di avere una propria tabella di ricerca + la possibilità di fornire una tabella di ricerca personalizzata. In questo modo, non è necessario controllare this._cache come nell’esempio precedente. Si noti inoltre che invece di passare l’osservabile come primo argomento, si passa una funzione che restituisce gli osservabili, questo perché l’Http di Angular viene eseguito immediatamente, quindi restituendo una funzione eseguita pigro, possiamo decidere di non chiamarlo se è già in il nostro cache.

 let cacheableCache: { [key: string]: Observable } = {}; export function cacheable(returnObservable: () => Observable, key?: string, customCache?: { [key: string]: Observable }): Observable { if (!!key && (customCache || cacheableCache)[key]) { return (customCache || cacheableCache)[key] as Observable; } let replay = new ReplaySubject(1); returnObservable().subscribe( x => replay.next(x), x => replay.error(x), () => replay.complete() ); let observable = replay.asObservable(); if (!!key) { if (!!customCache) { customCache[key] = observable; } else { cacheableCache[key] = observable; } } return observable; } 

Uso:

 getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache") 

secondo questo articolo

Si scopre che possiamo aggiungere facilmente la cache all’osservabile aggiungendo publishReplay (1) e refCount.

così dentro se le affermazioni si aggiungono

 .publishReplay(1) .refCount(); 

per .map(...)

rxjs 5.4.0 ha un nuovo metodo shareReplay .

  • rx-book shareReplay ()
  • Nessun documento su reactivex.io/rxjs

L’autore dice esplicitamente “l’ideale per gestire cose come il caching dei risultati AJAX”

rxjs PR # 2443 feat (shareReplay): aggiunge la variante publishReplay di publishReplay

shareReplay restituisce un osservabile che è la sorgente multicast su un object Replay. L’object di riproduzione viene riciclato in caso di errore dalla fonte, ma non al completamento della fonte. Questo rende shareReplay ideale per la gestione di cose come la cache dei risultati AJAX, in quanto è riprovabile. Il suo comportamento ripetitivo, tuttavia, differisce dalla condivisione in quanto non ripeterà la sorgente osservabile, piuttosto ripeterà i valori osservabili della sorgente.

Ho recitato la domanda, ma cercherò di provarci.

 //this will be the shared observable that //anyone can subscribe to, get the value, //but not cause an api request let customer$ = new Rx.ReplaySubject(1); getCustomer().subscribe(customer$); //here's the first subscriber customer$.subscribe(val => console.log('subscriber 1: ' + val)); //here's the second subscriber setTimeout(() => { customer$.subscribe(val => console.log('subscriber 2: ' + val)); }, 1000); function getCustomer() { return new Rx.Observable(observer => { console.log('api request'); setTimeout(() => { console.log('api response'); observer.next('customer object'); observer.complete(); }, 500); }); } 

Ecco la prova 🙂

C’è solo un take-away: getCustomer().subscribe(customer$)

Non stiamo sottoscrivendo la risposta api di getCustomer() , stiamo sottoscrivendo un ReplaySubject che è osservabile che è anche in grado di sottoscrivere un Observable differente e (e questo è importante) tenere il suo ultimo valore emesso e ripubblicarlo in uno qualsiasi di è abbonati (ReplaySubject).

L’implementazione scelta dipenderà se si desidera annullare l’iscrizione () per annullare la richiesta HTTP o meno.

In ogni caso, i decoratori TypeScript sono un buon modo per standardizzare il comportamento. Questo è quello che ho scritto:

  @CacheObservableArgsKey getMyThing(id: string): Observable { return this.http.get('things/'+id); } 

Definizione decoratore:

 /** * Decorator that replays and connects to the Observable returned from the function. * Caches the result using all arguments to form a key. * @param target * @param name * @param descriptor * @returns {PropertyDescriptor} */ export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) { const originalFunc = descriptor.value; const cacheMap = new Map(); descriptor.value = function(this: any, ...args: any[]): any { const key = args.join('::'); let returnValue = cacheMap.get(key); if (returnValue !== undefined) { console.log(`${name} cache-hit ${key}`, returnValue); return returnValue; } returnValue = originalFunc.apply(this, args); console.log(`${name} cache-miss ${key} new`, returnValue); if (returnValue instanceof Observable) { returnValue = returnValue.publishReplay(1); returnValue.connect(); } else { console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue); } cacheMap.set(key, returnValue); return returnValue; }; return descriptor; } 

Ho trovato un modo per archiviare il risultato http get in sessionStorage e utilizzarlo per la sessione, in modo che non richiamerà mai più il server.

L’ho usato per chiamare l’API di github per evitare il limite di utilizzo.

 @Injectable() export class HttpCache { constructor(private http: Http) {} get(url: string): Observable { let cached: any; if (cached === sessionStorage.getItem(url)) { return Observable.of(JSON.parse(cached)); } else { return this.http.get(url) .map(resp => { sessionStorage.setItem(url, resp.text()); return resp.json(); }); } } } 

Cordiali saluti, il limite di sessionStorage è 5M (o 4.75M). Quindi, non dovrebbe essere usato in questo modo per un ampio set di dati.

—— modificare ————-
Se si desidera avere dati aggiornati con F5, che utilizza i dati della memoria invece di sessionStorage;

 @Injectable() export class HttpCache { cached: any = {}; // this will store data constructor(private http: Http) {} get(url: string): Observable { if (this.cached[url]) { return Observable.of(this.cached[url])); } else { return this.http.get(url) .map(resp => { this.cached[url] = resp.text(); return resp.json(); }); } } } 

Dati di risposta HTTP memorizzabili nella cache utilizzando Rxjs Observer / Observable + Caching + Subscription

Vedi il codice qui sotto

* disclaimer: sono nuovo di rxjs, quindi tieni presente che potrei abusare dell’approccio osservabile / osservatore. La mia soluzione è puramente un conglomerato di altre soluzioni che ho trovato ed è la conseguenza di non aver trovato una soluzione semplice e ben documentata. Quindi sto fornendo la mia soluzione completa di codice (come mi sarebbe piaciuto aver trovato) nella speranza che aiuti gli altri.

* nota, questo approccio è liberamente basato su GoogleFirebaseObservables. Purtroppo mi manca la giusta esperienza / tempo per replicare ciò che hanno fatto sotto il cofano. Ma il seguente è un modo semplicistico di fornire accesso asincrono ad alcuni dati in grado di cache.

Situazione : un componente “elenco prodotti” è incaricato di visualizzare un elenco di prodotti. Il sito è un’app web a pagina singola con alcuni pulsanti di menu che “filtrano” i prodotti visualizzati nella pagina.

Soluzione : il componente “sottoscrive” un metodo di servizio. Il metodo di servizio restituisce una matrice di oggetti prodotto, a cui il componente accede tramite la richiamata della sottoscrizione. Il metodo di servizio avvolge la sua attività in un Observer appena creato e restituisce l’osservatore. All’interno di questo osservatore, cerca i dati memorizzati nella cache e li restituisce al sottoscrittore (il componente) e restituisce. Altrimenti emette una chiamata http per recuperare i dati, si iscrive alla risposta, dove è ansible elaborare tali dati (ad esempio, mappare i dati sul proprio modello) e quindi passare i dati all’abbonato.

Il codice

prodotto-list.component.ts

 import { Component, OnInit, Input } from '@angular/core'; import { ProductService } from '../../../services/product.service'; import { Product, ProductResponse } from '../../../models/Product'; @Component({ selector: 'app-product-list', templateUrl: './product-list.component.html', styleUrls: ['./product-list.component.scss'] }) export class ProductListComponent implements OnInit { products: Product[]; constructor( private productService: ProductService ) { } ngOnInit() { console.log('product-list init...'); this.productService.getProducts().subscribe(products => { console.log('product-list received updated products'); this.products = products; }); } } 

product.service.ts

 import { Injectable } from '@angular/core'; import { Http, Headers } from '@angular/http'; import { Observable, Observer } from 'rxjs'; import 'rxjs/add/operator/map'; import { Product, ProductResponse } from '../models/Product'; @Injectable() export class ProductService { products: Product[]; constructor( private http:Http ) { console.log('product service init. calling http to get products...'); } getProducts():Observable{ //wrap getProducts around an Observable to make it async. let productsObservable$ = Observable.create((observer: Observer) => { //return products if it was previously fetched if(this.products){ console.log('## returning existing products'); observer.next(this.products); return observer.complete(); } //Fetch products from REST API console.log('** products do not yet exist; fetching from rest api...'); let headers = new Headers(); this.http.get('http://localhost:3000/products/', {headers: headers}) .map(res => res.json()).subscribe((response:ProductResponse) => { console.log('productResponse: ', response); let productlist = Product.fromJsonList(response.products); //convert service observable to product[] this.products = productlist; observer.next(productlist); }); }); return productsObservable$; } } 

product.ts (il modello)

 export interface ProductResponse { success: boolean; msg: string; products: Product[]; } export class Product { product_id: number; sku: string; product_title: string; ..etc... constructor(product_id: number, sku: string, product_title: string, ...etc... ){ //typescript will not autoassign the formal parameters to related properties for exported classs. this.product_id = product_id; this.sku = sku; this.product_title = product_title; ...etc... } //Class method to convert products within http response to pure array of Product objects. //Caller: product.service:getProducts() static fromJsonList(products:any): Product[] { let mappedArray = products.map(Product.fromJson); return mappedArray; } //add more parameters depending on your database entries and constructor static fromJson({ product_id, sku, product_title, ...etc... }): Product { return new Product( product_id, sku, product_title, ...etc... ); } } 

Ecco un esempio dell’output che vedo quando carico la pagina in Chrome. Si noti che sul caricamento iniziale, i prodotti vengono recuperati da http (chiamata al servizio di resto del mio nodo, che viene eseguito localmente sulla porta 3000). Quando faccio clic su per passare a una vista “filtrata” dei prodotti, i prodotti si trovano nella cache.

Il mio registro Chrome (console):

 core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode. app.component.ts:19 app.component url: /products product.service.ts:15 product service init. calling http to get products... product-list.component.ts:18 product-list init... product.service.ts:29 ** products do not yet exist; fetching from rest api... product.service.ts:33 productResponse: {success: true, msg: "Products found", products: Array(23)} product-list.component.ts:20 product-list received updated products 

… [ha fatto clic su un pulsante del menu per filtrare i prodotti] …

 app.component.ts:19 app.component url: /products/chocolatechip product-list.component.ts:18 product-list init... product.service.ts:24 ## returning existing products product-list.component.ts:20 product-list received updated products 

Conclusione: questo è il modo più semplice che ho trovato (finora) per implementare dati di risposta http memorizzabili nella cache. Nella mia app angular, ogni volta che si passa a una diversa visualizzazione dei prodotti, il componente elenco di prodotti viene ricaricato. ProductService sembra essere un’istanza condivisa, quindi la cache locale di “prodotti: Prodotto []” nel ProductService viene mantenuta durante la navigazione e le chiamate successive a “GetProducts ()” restituiscono il valore memorizzato nella cache. Un’ultima nota, ho letto i commenti su come gli osservabili / abbonamenti devono essere chiusi quando hai finito per evitare “perdite di memoria”. Non l’ho incluso qui, ma è qualcosa da tenere a mente.

Suppongo che @ ngx-cache / core possa essere utile per mantenere le funzionalità di caching per le chiamate http, specialmente se la chiamata HTTP viene effettuata sia su piattaforms browser che su server .

Diciamo che abbiamo il seguente metodo:

 getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

Puoi usare il decoratore Cached di @ ngx-cache / core per memorizzare il valore restituito dal metodo che effettua la chiamata HTTP cache storage ( la storage può essere configurabile, controlla l’implementazione su ng-seed / universal ) – direttamente su la prima esecuzione. Le volte successive in cui viene richiamato il metodo (indipendentemente dalla piattaforma del browser o del server ), il valore viene recuperato dalla cache storage .

 import { Cached } from '@ngx-cache/core'; ... @Cached('get-customer') // the cache key/identifier getCustomer() { return this.http.get('/someUrl').map(res => res.json()); } 

C’è anche la possibilità di utilizzare i metodi di memorizzazione nella cache ( has , get , set ) utilizzando l’ API di caching .

anyclass.ts

 ... import { CacheService } from '@ngx-cache/core'; @Injectable() export class AnyClass { constructor(private readonly cache: CacheService) { // note that CacheService is injected into a private property of AnyClass } // will retrieve 'some string value' getSomeStringValue(): string { if (this.cache.has('some-string')) return this.cache.get('some-string'); this.cache.set('some-string', 'some string value'); return 'some string value'; } } 

Ecco l’elenco dei pacchetti, sia per il caching lato client che lato server:

  • @ ngx-cache / core : utilità cache
  • @ ngx-cache / platform-browser : implementazione della piattaforma SPA / Browser
  • @ ngx-cache / platform-server : implementazione della piattaforma server
  • @ ngx-cache / fs-storage : utility di archiviazione (richiesta per la piattaforma del server)

rxjs 5.3.0

Non sono stato contento di .map(myFunction).publishReplay(1).refCount()

Con più abbonati, .map() esegue myFunction due volte in alcuni casi (mi aspetto che venga eseguito solo una volta). Una correzione sembra essere publishReplay(1).refCount().take(1)

Un’altra cosa che puoi fare è non usare refCount() e rendere subito refCount() l’Observable:

 let obs = this.http.get('my/data.json').publishReplay(1); obs.connect(); return obs; 

Ciò avvierà la richiesta HTTP indipendentemente dagli iscritti. Non sono sicuro che l’annullamento dell’iscrizione prima della conclusione di HTTP GET lo annullerà o meno.

Quello che vogliamo fare è assicurarsi che questo non causi più richieste di rete.

Il mio preferito è utilizzare i metodi async per le chiamate che fanno richieste di rete. I metodi stessi non restituiscono un valore, invece aggiornano un BehaviorSubject all’interno dello stesso servizio, a cui i componenti si iscriveranno.

Ora Perché utilizzare BehaviorSubject anziché Observable ? Perché,

  • Su abbonamento BehaviorSubject restituisce l’ultimo valore mentre un osservabile regolare si triggers solo quando riceve un onnext .
  • Se si desidera recuperare l’ultimo valore di BehaviorSubject in un codice non osservabile (senza un abbonamento), è ansible utilizzare il metodo getValue() .

Esempio:

customer.service.ts

 public customers$: BehaviorSubject = new BehaviorSubject([]); public async getCustomers(): Promise { let customers = await this.httpClient.post(this.endPoint, criteria).toPromise(); if (customers) this.customers$.next(customers); } 

Quindi, ovunque richiesto, possiamo semplicemente iscriversi ai customers$ .

 public ngOnInit(): void { this.customerService.customers$ .subscribe((customers: Customer[]) => this.customerList = customers); } 

O forse vuoi usarlo direttamente in un modello

 
  • ...
  • Quindi, fino a quando non si effettua un’altra chiamata per ottenere getCustomers , i dati vengono conservati nei customers$ BehaviorSubject.

    Quindi, cosa succede se si desidera aggiornare questi dati? basta effettuare una chiamata per getCustomers()

     public async refresh(): Promise { try { await this.customerService.getCustomers(); } catch (e) { // request failed, handle exception console.error(e); } } 

    Utilizzando questo metodo, non è necessario mantenere esplicitamente i dati tra le chiamate di rete successive mentre viene gestito da BehaviorSubject .

    PS: Di solito quando un componente viene distrutto è una buona pratica per sbarazzarsi degli abbonamenti, per questo è ansible utilizzare il metodo suggerito in questa risposta.

    Basta chiamare share () dopo la mappa e prima di qualsiasi sottoscrizione .

    Nel mio caso, ho un servizio generico (RestClientService.ts) che sta facendo il resto della chiamata, estraendo dati, verificando gli errori e ritornando osservabile a un servizio di implementazione concreto (es .: ContractClientService.ts), infine questa implementazione concreta ritorna osservabile a de ContractComponent.ts, e questo si iscrive per aggiornare la vista.

    RestClientService.ts:

     export abstract class RestClientService { public GetAll = (path: string, property: string): Observable => { let fullPath = this.actionUrl + path; let observable = this._http.get(fullPath).map(res => this.extractData(res, property)); observable = observable.share(); //allows multiple subscribers without making again the http request observable.subscribe( (res) => {}, error => this.handleError2(error, "GetAll", fullPath), () => {} ); return observable; } private extractData(res: Response, property: string) { ... } private handleError2(error: any, method: string, path: string) { ... } } 

    ContractService.ts:

     export class ContractService extends RestClientService { private GET_ALL_ITEMS_REST_URI_PATH = "search"; private GET_ALL_ITEMS_PROPERTY_PATH = "contract"; public getAllItems(): Observable { return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH); } } 

    ContractComponent.ts:

     export class ContractComponent implements OnInit { getAllItems() { this.rcService.getAllItems().subscribe((data) => { this.items = data; }); } } 

    Ho scritto una class cache,

     /** * Caches results returned from given fetcher callback for given key, * up to maxItems results, deletes the oldest results when full (FIFO). */ export class StaticCache { static cachedData: Map = new Map(); static maxItems: number = 400; static get(key: string){ return this.cachedData.get(key); } static getOrFetch(key: string, fetcher: (string) => any): any { let value = this.cachedData.get(key); if (value != null){ console.log("Cache HIT! (fetcher)"); return value; } console.log("Cache MISS... (fetcher)"); value = fetcher(key); this.add(key, value); return value; } static add(key, value){ this.cachedData.set(key, value); this.deleteOverflowing(); } static deleteOverflowing(): void { if (this.cachedData.size > this.maxItems) { this.deleteOldest(this.cachedData.size - this.maxItems); } } /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration. /// However that seems not to work. Trying with forEach. static deleteOldest(howMany: number): void { //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size); let iterKeys = this.cachedData.keys(); let item: IteratorResult; while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){ //console.debug(" Deleting: " + item.value); this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS. } } static clear(): void { this.cachedData = new Map(); } } 

    It’s all static because of how we use it, but feel free to make it a normal class and a service. I’m not sure if angular keeps a single instance for the whole time though (new to Angular2).

    And this is how I use it:

      let httpService: Http = this.http; function fetcher(url: string): Observable { console.log(" Fetching URL: " + url); return httpService.get(url).map((response: Response) => { if (!response) return null; if (typeof response.json() !== "array") throw new Error("Graph REST should return an array of vertices."); let items: any[] = graphService.fromJSONarray(response.json(), httpService); return array ? items : items[0]; }); } // If data is a link, return a result of a service call. if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link") { // Make an HTTP call. let url = this.data[verticesLabel][name]["link"]; let cachedObservable: Observable = StaticCache.getOrFetch(url, fetcher); if (!cachedObservable) throw new Error("Failed loading link: " + url); return cachedObservable; } 

    I assume there could be a more clever way, which would use some Observable tricks but this was just fine for my purposes.

    Just use this cache layer, it does everything you requires, and even manage cache for ajax requests.

    http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

    It’s this much easy to use

     @Component({ selector: 'home', templateUrl: './html/home.component.html', styleUrls: ['./css/home.component.css'], }) export class HomeComponent { constructor(AjaxService:AjaxService){ AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;}); } articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]}; } 

    The layer(as an inject-able angular service) is

     import { Injectable } from '@angular/core'; import { Http, Response} from '@angular/http'; import { Observable } from 'rxjs/Observable'; import './../rxjs/operator' @Injectable() export class AjaxService { public data:Object={}; /* private dataObservable:Observable; */ private dataObserver:Array=[]; private loading:Object={}; private links:Object={}; counter:number=-1; constructor (private http: Http) { } private loadPostCache(link:string){ if(!this.loading[link]){ this.loading[link]=true; this.links[link].forEach(a=>this.dataObserver[a].next(false)); this.http.get(link) .map(this.setValue) .catch(this.handleError).subscribe( values => { this.data[link] = values; delete this.loading[link]; this.links[link].forEach(a=>this.dataObserver[a].next(false)); }, error => { delete this.loading[link]; } ); } } private setValue(res: Response) { return res.json() || { }; } private handleError (error: Response | any) { // In a real world app, we might use a remote logging infrastructure let errMsg: string; if (error instanceof Response) { const body = error.json() || ''; const err = body.error || JSON.stringify(body); errMsg = `${error.status} - ${error.statusText || ''} ${err}`; } else { errMsg = error.message ? error.message : error.toString(); } console.error(errMsg); return Observable.throw(errMsg); } postCache(link:string): Observable{ return Observable.create(observer=> { if(this.data.hasOwnProperty(link)){ observer.next(this.data[link]); } else{ let _observable=Observable.create(_observer=>{ this.counter=this.counter+1; this.dataObserver[this.counter]=_observer; this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]); _observer.next(false); }); this.loadPostCache(link); _observable.subscribe(status=>{ if(status){ observer.next(this.data[link]); } } ); } }); } } 

    It’s .publishReplay(1).refCount(); or .publishLast().refCount(); since Angular Http observables complete after request.

    This simple class caches the result so you can subscribe to .value many times and makes only 1 request. You can also use .reload() to make new request and publish data.

    You can use it like:

     let res = new RestResource(() => this.http.get('inline.bundleo.js')); res.status.subscribe((loading)=>{ console.log('STATUS=',loading); }); res.value.subscribe((value) => { console.log('VALUE=', value); }); 

    and the source:

     export class RestResource { static readonly LOADING: string = 'RestResource_Loading'; static readonly ERROR: string = 'RestResource_Error'; static readonly IDLE: string = 'RestResource_Idle'; public value: Observable; public status: Observable; private loadStatus: Observer; private reloader: Observable; private reloadTrigger: Observer; constructor(requestObservableFn: () => Observable) { this.status = Observable.create((o) => { this.loadStatus = o; }); this.reloader = Observable.create((o: Observer) => { this.reloadTrigger = o; }); this.value = this.reloader.startWith(null).switchMap(() => { if (this.loadStatus) { this.loadStatus.next(RestResource.LOADING); } return requestObservableFn() .map((res) => { if (this.loadStatus) { this.loadStatus.next(RestResource.IDLE); } return res; }).catch((err)=>{ if (this.loadStatus) { this.loadStatus.next(RestResource.ERROR); } return Observable.of(null); }); }).publishReplay(1).refCount(); } reload() { this.reloadTrigger.next(null); } } 

    You can build simple class Cacheable<> that helps managing data retrieved from http server with multiple subscribers:

     declare type GetDataHandler = () => Observable; export class Cacheable { protected data: T; protected subjectData: Subject; protected observableData: Observable; public getHandler: GetDataHandler; constructor() { this.subjectData = new ReplaySubject(1); this.observableData = this.subjectData.asObservable(); } public getData(): Observable { if (!this.getHandler) { throw new Error("getHandler is not defined"); } if (!this.data) { this.getHandler().map((r: T) => { this.data = r; return r; }).subscribe( result => this.subjectData.next(result), err => this.subjectData.error(err) ); } return this.observableData; } public resetCache(): void { this.data = null; } public refresh(): void { this.resetCache(); this.getData(); } } 

    uso

    Declare Cacheable<> object (presumably as part of the service):

     list: Cacheable = new Cacheable(); 

    and handler:

     this.list.getHandler = () => { // get data from server return this.http.get(url) .map((r: Response) => r.json() as string[]); } 

    Call from a component:

     //gets data from server List.getData().subscribe(…) 

    You can have several components subscribed to it.

    More details and code example are here: http://devinstance.net/articles/20171021/rxjs-cacheable

    Great answers.

    Or you could do this:

    This is from latest version of rxjs. I am using 5.5.7 version of RxJS

     import {share} from "rxjs/operators"; this.http.get('/someUrl').pipe(share()); 

    Have you tried running the code you already have?

    Because you are constructing the Observable from the promise resulting from getJSON() , the network request is made before anyone subscribes. And the resulting promise is shared by all subscribers.

     var promise = jQuery.getJSON(requestUrl); // network call is executed now var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable o.subscribe(...); // does not trigger network call o.subscribe(...); // does not trigger network call // ...