RxJS 7 ile Gelen Yenilikler

Neler değişti? Nasıl geçiş yapılır?

28.04.2021 Çarşamba
RxJS
20 dakika

JavaScript’te reaktif programlamanın en tanınmış ve yaygın kullanılan kütüphanesi RxJS. Öyle ki, bugün haftalık indirme sayıları 25 milyon civarında (2.3x React, 9.7x Angular, 11x Vue). Tam 3 yıldır RxJS 6’yı kullanıyorduk. İlk alpha sürümü Eylül 2019’da yayınlanan RxJS 7’nin RC sürümüyse bir hafta kadar önce görücüye çıktı. Peki RxJS 7’de bizi bekleyen değişiklikler neler? RxJS 7 ile 6 arasında ne gibi farklar var? RxJS 7’ye geçmeli miyiz? (Giriş, haber sitelerinin SEO çalışması gibi oldu ama neyse… 😀)

3 Mayıs 2021 Güncelleme: 7.0.0 sürümü 4 gün önce yayınlandı. Bu yazıda anlatılanları bozan bir değişiklik yok.

RxJS 7

Anlatacaklarımı bir yandan denemek isteyebilirsiniz, o yüzden halen RC sürümünde olan RxJS 7’yi projenize nasıl dahil edeceğinizi görelim:

yarn add rxjs@next

…veya…

npm install rxjs@next

RxJS 7’de TypeScript, boyut, bellek ve hız

Daha iyi tipler

RxJS 7’deki önemli değişikliklerden biri belki de en az göz önünde olanı. Değişiklik kütüğüne baktığımızda görüyoruz ki tipler üzerinde epey bir emek var. Özellikle şu konularda çalışmalar göze çarpıyor:

  • of gibi n sayıda parametre alan fonksiyonlar artık 8-9’un üzerinde de tipleri doğru çıkarabiliyor.
import { of } from "rxjs";

// Observable<string, number> (RxJS 6'da → No overload matches this call)
of(0, "A", 1, "B", 2, "C", 3, "D", 4, "E", 5, "F", 6, "G", 7, "H", 8, "I", 9, "...");
  • groupBy‘ın oluşturduğu grupları keyine göre tiplerine ayrıştırabiliyoruz.
import { of } from "rxjs";
import { groupBy, map, mergeMap } from "rxjs/operators";

// Observable<string, number>
of(0, "A", 1, "B", 2, "C", 3, "D", 4, "E", 5, "F", 6, "G", 7, "H", 8, "I", 9, "...")
  .pipe(
    groupBy((x): x is number => typeof x === "number"),
    mergeMap(group$ => (group$.key === true ? group$.pipe(map(String)) : group$))
  )
  .subscribe();
// Observable<string> (RxJS 6'da → Observable<string | number>)
  • filter(Boolean) ile koşuldan geçemeyen tipleri filtreleyebiliyoruz.
import { of } from "rxjs";
import { filter } from "rxjs/operators";

// Observable<"" | 0 | null | undefined | Date>
of("" as const, 0 as const, null, undefined, new Date())
  .pipe(filter(Boolean))
  .subscribe();
// Observable<Date> (RxJS 6'da → Observable<unknown>)
  • next metodu artık Subject tipini dikkate alıyor.
import { Subject } from "rxjs";

const number$ = new Subject<number>();
number$.next();

// Error: An argument for 'value' was not provided.
// RxJS 6'da hata yoktu.

Daha küçük paket boyutu

Ben Lesh, tüm kütüphanenin üzerinden teker teker geçerek toplam paket boyutunu %39’a varan oranda düşürdüklerini söylüyor. Bu müthiş bir efor ve harika bir sonuç. Ben de ufak bir Angular projesinde bunu denedim. Sonuçlar şöyle:

RxJS 6 ile dosya boyutlarını gösteren tablo
RxJS 6 ile derlendiğinde
RxJS 7 ile dosya boyutlarını gösteren tablo
RxJS 7 ile derlendiğinde

Sadece RxJS 6 yerine RxJS 7 kullanmanın toplam paket boyutuna etkisi 10kB oldu. Küçük bir uygulama için hiç fena değil. Ayrıca operatörlere daha çok başvurulan ve “lazy chunk” altında gösterilen dosyada daha etkili olduğunu görüyoruz. 👏💯

Daha az bellek tüketimi

RxJS’te Observable yayan Observablelara “higher order observable” diyoruz.

import { of } from "rxjs";
import { map, concatAll } from "rxjs/operators";

of(1, 2, 3)
  .pipe(
    map(id =>
      fromFetch(`https://jsonplaceholder.typicode.com/todos/${id}`, {
        selector: resp => resp.json()
      })
    ),
    concatAll()
  )
  .subscribe(todo => console.log(todo.title));
// (bir süre sonra) delectus aut autem
// (bir süre sonra) quis ut nam facilis et officia qui
// (bir süre sonra) fugiat veniam minus

Burada of ile oluşturduğumuz Observable‘a “outer observable”, fromFetch ile oluşturduğumuzaysa “inner observable” adı veriliyor. RxJS 6’da operatörler “outer observable”ların yaydığı değerleri de yakalıyormuş. RxJS 7’de şu commitle bu ortadan kaldırılarak bellek tüketimi düşürülmüş. Anlaşılan arada bolca da kod silinmiş.

Daha hızlı bir RxJS

Açıkçası bu kısmı kendim denemedim. Orada burada duyduklarım ve Ben Lesh’in açıklamaları bu yönde. Bkz. aşağıdaki tvit.

RxJS 7'nin yaklaşık %20 hızlı olduğu yazan tvit

Tabi ki %20’nin sabit olduğu iddia edilmiyor, ama atılan kodlar ve yapılan iyileştirmeler işe yaramış herhalde.

💬 Tvitte görmüş olabilirsiniz, ama heyecan yapmayın, for…await desteği sonradan kaldırıldı.

RxJS 7’de Öne Çıkan Yenilik ve Değişiklikler

toPromise → firstValueFrom, lastValueFrom

RxJS 6 kullanan çoğu kişinin duyduğu bir değişiklikle başlayalım: toPromise artık kullanım dışı (deprecated). Ama hemen korkmayın, kütüphaneden henüz kaldırılmış değil. Yani, RxJS 7’de toPromise hala kullanılabiliyor. Bununla birlikte, en yakın zamanda projenizdeki tüm toPromiseleri temizlemenizi öneririm, çünkü kaldırılacak. Ayrıca, döndüğü tip undefined da olabilecek şekilde ayarlanmış, dolayısıyla TypeScript’i strict modunda kullanan projeleri patlatabilir.

Peki neye dönüştürebilirsiniz? Önce toPromise metodunun nasıl çalıştığını şöyle bir hatırlayalım:

import { interval } from "rxjs";
import { map, take } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

count1To5$.toPromise().then(console.log);
// (~5s sonra) 5

Neden 1 değil de 5? Çünkü toPromise, kaynak Observable sonlandığında (complete), onun yaydığı (emit) son değerle çözümlenen (resolve) bir Promise dönüyor. RxJS 7’de toPromise yerine iki farklı fonksiyondan yararlanabiliyoruz: firstValueFrom ve lastValueFrom.

import { interval, firstValueFrom, lastValueFrom } from "rxjs";
import { map, take } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

firstValueFrom(count1To5$).then(console.log);
// (~1s sonra) 1

lastValueFrom(count1To5$).then(console.log);
// (~5s sonra) 5

Eğer lastValueFrom ile toPromise arasında fark var mı diye merak ediyorsanız, evet var. Şöyle ki, toPromise eğer kaynak Observable bir değer yaymadan sonlanırsa undefined değeriyle çözümleniyordu.

import { EMPTY } from "rxjs";

EMPTY.toPromise().then(console.log);
// (asenkron) undefined

Bu biraz talihsiz, çünkü:

  • toPromisein döndüğü Promise gerçekten de undefined yayabilir, hangi durum olduğunu anlamanın bir yolu yok.
  • Aslında, bir değer yayılmaması durumunun hata olarak değerlendirilmesi lazım.

Yeni fonksiyonumuz lastValueFrom böyle yapmıyor. Bir değer yaymadan sonlananan Observablelar için EmptyErrorImpl şeklinde özel bir hata fırlatıyor. Aşağıdaki örnekteki catch kullanımına dikkat.

import { EMPTY, firstValueFrom } from "rxjs";

lastValueFrom(EMPTY).catch(console.log);
// (asenkron) EmptyErrorImpl

Peki bu hatayı diğerlerinden nasıl ayıklarız? Bunun için EmptyError sınıfını kullanacağız.

import { EMPTY, EmptyError, lastValueFrom } from "rxjs";

lastValueFrom(EMPTY).catch(err => console.log(err instanceof EmptyError));
// (asenkron) true

Buradaki EmptyError ve EmptyErrorImpl kafa karıştırmasın. Kaynak kodundan alınmış aşağıdaki kısım iki farklı isim görmemizin sebebini açıklıyor:

export const EmptyError: EmptyErrorCtor = createErrorClass(
  _super =>
    function EmptyErrorImpl(this: any) {
      _super(this);
      this.name = "EmptyError";
      this.message = "no elements in sequence";
    }
);

Şunu neden yapmamışlar diye düşünebilirsiniz:

export class EmptyError extends Error {
  constructor() {
    super("no elements in sequence");
    this.name = "EmptyError";
  }
}

Sebebi şu: new EmptyError() instanceof EmptyError ifadesinin (expression) true dönmesi beklenir; fakat TypeScript ES5’e derlendiğinde false dönüyor. (Bkz. TypeScript #12123). RxJS ekibi de bunu aşmak için Object.create kullanmış ve hata nesnesinin constructorı hata tipinden farklı isme sahip. Bu yaklaşım RxJS tarafından fırlatılan bütün hatalar için geçerli.

🔎 Aslında böyle olmak zorunda değil; ama no-shadowed-variable TSLint kuralını ezmeyi bırakmak için atılan şu commit bunu gerektirmiş.

combineLatest observable dictionary

RxJS 6’da forkJoine Observable sözlükleri (dictionary) verebiliyorduk.

import { forkJoin, interval } from "rxjs";
import { map } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

forkJoin({ x: count1to5$, y: count6to9$ }).subscribe(console.log);
// (~5s sonra) {x: 5, y: 9}

RxJS 7’de artık combineLatest ile de bu mümkün.

import { combineLatest, interval } from "rxjs";
import { map } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

combineLatest({ x: count1to5$, y: count6to9$ }).subscribe(console.log);
// (~1s sonra) {x: 1, y: 6}
// (~1s sonra) {x: 2, y: 6} (hemen sonra) {x: 2, y: 7}
// (~1s sonra) {x: 3, y: 7} (hemen sonra) {x: 3, y: 8}
// (~1s sonra) {x: 4, y: 8} (hemen sonra) {x: 4, y: 9}
// (~1s sonra) {x: 5, y: 9}

combineLatestWith

RxJS 6’da combineLatest operatörü kullanım dışı bırakılmıştı.

import { interval } from "rxjs";
import { combineLatest, map } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(combineLatest(count6To9$)).subscribe(console.log);
// (~1s sonra) [1,6]
// (~1s sonra) [2,6] (hemen sonra) [2,7]
// (~1s sonra) [3,7] (hemen sonra) [3,8]
// (~1s sonra) [4,8] (hemen sonra) [4,9]
// (~1s sonra) [5,9]

RxJS 7’de onun yerine combineLatestWith operatörü geldi.

import { interval } from "rxjs";
import { combineLatestWith, map } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(combineLatestWith(count6To9$)).subscribe(console.log);
// (~1s sonra) [1,6]
// (~1s sonra) [2,6] (hemen sonra) [2,7]
// (~1s sonra) [3,7] (hemen sonra) [3,8]
// (~1s sonra) [4,8] (hemen sonra) [4,9]
// (~1s sonra) [5,9]

mergeWith

RxJS 6’da merge operatörü kullanım dışı bırakılmıştı.

import { interval } from "rxjs";
import { map, merge } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(merge(count6To9$)).subscribe(console.log);
// (~1s sonra) 1 (hemen sonra) 6
// (~1s sonra) 2 (hemen sonra) 7
// (~1s sonra) 3 (hemen sonra) 8
// (~1s sonra) 4 (hemen sonra) 9
// (~1s sonra) 5

RxJS 7’de onun yerine mergeWith operatörü geldi.

import { interval } from "rxjs";
import { map, mergeWith } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(mergeWith(count6To9$)).subscribe(console.log);
// (~1s sonra) 1 (hemen sonra) 6
// (~1s sonra) 2 (hemen sonra) 7
// (~1s sonra) 3 (hemen sonra) 8
// (~1s sonra) 4 (hemen sonra) 9
// (~1s sonra) 5

zipWith

RxJS 6’da zip operatörü kullanım dışı bırakılmıştı.

import { interval } from "rxjs";
import { map, zip } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(zip(count6To9$)).subscribe(console.log);
// (~1s arayla) [1,6], [2,7], [3,8], [4,9]

RxJS 7’de onun yerine zipWith operatörü geldi.

import { interval } from "rxjs";
import { map, zipWith } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(zipWith(count6To9$)).subscribe(console.log);
// (~1s arayla) [1,6], [2,7], [3,8], [4,9]

raceWith

RxJS 6’da race operatörü kullanım dışı bırakılmıştı.

import { interval } from "rxjs";
import { map, race } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(race(count6To9$)).subscribe(console.log);
// (~1s arayla) 1, 2, 3, 4, 5

RxJS 7’de onun yerine raceWith operatörü geldi.

import { interval } from "rxjs";
import { map, raceWith } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(raceWith(count6To9$)).subscribe(console.log);
// (~1s arayla) 1, 2, 3, 4, 5

concatWith

RxJS 6’da concat operatörü kullanım dışı bırakılmıştı.

import { interval } from "rxjs";
import { concat, map } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(concat(count6To9$)).subscribe(console.log);
// (~1s arayla) 1, 2, 3, 4, 5, 6, 7, 8, 9

RxJS 7’de onun yerine concatWith operatörü geldi.

import { interval } from "rxjs";
import { concatWith, map } from "rxjs/operators";

const count1To5$ = interval(1000).pipe(
  take(5),
  map(i => i + 1)
);

const count6To9$ = interval(1000).pipe(
  take(4),
  map(i => i + 6)
);

count1To5$.pipe(concatWith(count6To9$)).subscribe(console.log);
// (~1s arayla) 1, 2, 3, 4, 5, 6, 7, 8, 9

Daha güçlü bir timeout

RxJS 7’den önce bir akışın yayacağı ilk değere ayrı bir zaman aşımı koymak için şunu yapabiliyorduk:

import { concat, partition, timer } from "rxjs";
import { first, share, timeout } from "rxjs/operators";

const count$ = timer(3000, 2000).pipe(share());

const [first$, rest$] = partition(count$, (_, index) => index === 0);

concat(
  first$.pipe(
    timeout(5000),
    first()
  ),
  rest$.pipe(timeout(1000))
).subscribe({ next: console.log, error: console.error });
// (~3s sonra) 0
// (~1s sonra) Error: Timeout has occurred

Açıkçası bu kadar basit bir şey için yukarıdaki kontrol dizisi biraz fazla kompleksti. RxJS 7’yle artık timeout operatörüne bir konfigürasyon nesnesi iletebiliyoruz:

import { timer } from "rxjs";
import { timeout } from "rxjs/operators";

const count$ = timer(3000, 2000);

count$
  .pipe(timeout({ first: 5000, each: 1000 }))
  .subscribe({ next: console.log, error: console.error });
// (~3s sonra) 0
// (~1s sonra) Error: Timeout has occurred

Ayrıca bu konfigürasyon nesnesiyle timeoutWith operatörünü kullanım dışı bırakılmış ve yerini with parametresi almış. RxJS 6’da şöyleydi:

import { timer } from "rxjs";
import { timeoutWith } from "rxjs/operators";

timer(5000, 1000)
  .pipe(timeoutWith(2000, of("timeout")))
  .subscribe(console.log);
// (~2s sonra) timeout

RxJS 7’de böyle:

import { timer } from "rxjs";
import { timeout } from "rxjs/operators";

timer(5000, 1000)
  .pipe(timeout({ first: 2000, with: _ => of("timeout") }))
  .subscribe(console.log);
// (~2s sonra) timeout

retry operatöründe resetOnSuccess

RxJS 6’da retry parametre olarak sadece deneme miktarını alıyordu.

import { defer, from } from "rxjs";
import { retry, tap } from "rxjs/operators";

const values = ["_", 0, 1, 0, 2, 0, 3, 0, 0, 0, 4];

defer(() => {
  values.shift();
  return from(values);
})
  .pipe(
    tap(i => {
      if (!i) throw "ERROR";
    }),
    retry(2)
  )
  .subscribe({ next: console.log, error: console.warn });
// (senkron) 1, ERROR

Bu miktar başarılı denemelerden sonra resetlenmiyordu. RxJS 7’de retry operatörüne artık resetOnSuccess isimli bir seçenek verebiliyoruz.

import { defer, from } from "rxjs";
import { retry, tap } from "rxjs/operators";

const values = ["_", 0, 1, 0, 2, 0, 3, 0, 0, 0, 4];

defer(() => {
  values.shift();
  return from(values);
})
  .pipe(
    tap(i => {
      if (!i) throw "ERROR";
    }),
    retry({ count: 2, resetOnSuccess: true })
  )
  .subscribe({ next: console.log, error: console.warn });
// (senkron) 1, 2, 3, ERROR

share konfigürasyonu

RxJS 7’de share operatörü oldukça kapsamlı bir konfigürasyon nesnesi almaya başlamış. Öyle ki, shareReplaye gerek dahi kalmamış gibi duruyor. RxJS 6’da shareReplay nasıl çalışıyordu, bir hatırlayalım:

import { interval, of, zip } from "rxjs";
import { map, shareReplay } from "rxjs/operators";

const shared$ = zip(interval(1000), of("A", "B", "C", "D", "E")).pipe(
  map(([, char]) => char),
  shareReplay({ refCount: true, bufferSize: 3 })
);

shared$.subscribe(console.log);
// (~1s arayla) A, B, C, D, E

setTimeout(() => shared$.subscribe(console.log), 6000);
// (~6s sonra, bir kerede) C, D, E

Bakalım bunu yeni share konfigürasyonuyla nasıl yapabiliyoruz.

import { interval, of, ReplaySubject, zip } from "rxjs";
import { map, share } from "rxjs/operators";

const shared$ = zip(interval(1000), of("A", "B", "C", "D", "E")).pipe(
  map(([, char]) => char),
  share({
    connector: () => new ReplaySubject(3),
    resetOnComplete: false,
    resetOnError: false,
    resetOnRefCountZero: false
  })
);

shared$.subscribe(console.log);
// (~1s arayla) A, B, C, D, E

setTimeout(() => shared$.subscribe(console.log), 6000);
// (~6s sonra, bir kerede) C, D, E

Konfigürasyon nesnesindeki “reset” ile başlayan parametreler, ilgili durum gerçekleştiğinde Observableın sıfırlanıp yeniden “cold” hale gelmesini sağlıyor. Mesela, resetOnRefCountZero, bağlanan Observerların sayısı unsubscribe nedeniyle yeniden sıfır olursa, kaynağı sıfırlıyor.

connect & connectable

RxJS 7’de kaynak Observableı çok noktaya yaymaya (multicast) yarayan yeni bir operatör var: connect. Şöyle çalışıyor:

import { merge, of } from "rxjs";
import { connect, filter, map } from "rxjs/operators";

const chars$ = of("A", "b", "C", "D", "e", "f", "G");

chars$
  .pipe(
    connect(shared$ =>
      merge(
        shared$.pipe(
          filter(x => x.toLowerCase() === x),
          map(x => `lower ${x.toUpperCase()}`)
        ),
        shared$.pipe(
          filter(x => x.toLowerCase() !== x),
          map(x => `upper ${x}`)
        )
      )
    )
  )
  .subscribe(console.log);
// (senkron) upper A
// (senkron) lower B
// (senkron) upper C
// (senkron) upper D
// (senkron) lower E
// (senkron) lower F
// (senkron) upper G

Bir de connectable isimli bir fonksiyon eklenmiş. ConnectableObservableLike dönüyor. Yukarıdaki örneği şöyle de yapabilirdik yani:

import { connectable, merge, of } from "rxjs";
import { filter, map } from "rxjs/operators";

const chars$ = of("A", "b", "C", "D", "e", "f", "G");
const connectableChars$ = connectable(chars$);

const lower$ = connectableChars$.pipe(
  filter(x => x.toLowerCase() === x),
  map(x => `lower ${x.toUpperCase()}`)
);

const upper$ = connectableChars$.pipe(
  filter(x => x.toLowerCase() !== x),
  map(x => `upper ${x}`)
);

merge(lower$, upper$).subscribe(console.log);

connectableChars$.connect();
// (senkron) upper A
// (senkron) lower B
// (senkron) upper C
// (senkron) upper D
// (senkron) lower E
// (senkron) lower F
// (senkron) upper G

3 Mayıs 2021 Güncelleme: Bu PR sonrası connectable bir konfigürasyon nesnesiyle connector ve resetOnDisconnect parametreleri alabilir hale geldi.

import { connectable, interval, merge, of, Subject, zip } from "rxjs";
import { filter, map } from "rxjs/operators";

const chars$ = zip(interval(1000), of("A", "b", "C", "D", "e", "f", "G")).pipe(
  map(([, char]) => char)
);
const connectableChars$ = connectable(chars$, {
  connector: () => new Subject(),
  resetOnDisconnect: true
});

const lower$ = connectableChars$.pipe(
  filter(x => x.toLowerCase() === x),
  map(x => `lower ${x.toUpperCase()}`)
);

const upper$ = connectableChars$.pipe(
  filter(x => x.toLowerCase() !== x),
  map(x => `upper ${x}`)
);

function connect() {
  merge(lower$, upper$).subscribe(console.log);
  return connectableChars$.connect();
}

const connection = connect();
setTimeout(() => {
  connection.unsubscribe();
  connect();
}, 3000);
// (~1s sonra) upper A
// (~1s sonra) lower B
// (~1s sonra) upper C
// (~1s sonra) upper A
// (~1s sonra) lower B
// (~1s sonra) upper C
// (~1s sonra) upper D
// (~1s sonra) lower E
// (~1s sonra) lower F
// (~1s sonra) upper G

Çok noktaya yayma konusunda connectable, connect ve share yeterli olduğu için multicast ile birlikte publish, publishBehavior ve publishLast kullanım dışı bırakılmış.

animationFrames

RxJS 7’de requestAnimationFrame ve cancelAnimationFrame fonksiyonlarını saran yeni bir observable var: animationFrames. Adından da anlaşılacağı üzere animasyonlar yaratmada kullanılıyor.

import { animationFrames, combineLatest, concat } from "rxjs";
import { endWith, map, takeWhile } from "rxjs/operators";

const h1 = document.querySelector("h1")!;

combineLatest({
  x: tween(0, 200, 3600),
  y: wave(25, 1200, 3)
}).subscribe(({ x, y }) => {
  h1.style.transform = `translate3d(${x}px, ${y}px, 0)`;
});

function tween(start: number, end: number, duration: number) {
  const delta = end - start;

  return animationFrames().pipe(
    map(({ elapsed }) => elapsed / duration),
    takeWhile(percentage => percentage < 1),
    endWith(1),
    map(percentage => percentage * delta + start)
  );
}

function wave(length: number, duration: number, repeat = 1) {
  const tweens = [
    tween(0, length, duration / 4),
    tween(length, -length, duration / 2),
    tween(-length, 0, duration / 4)
  ];

  const animation = Array(repeat)
    .fill(tweens)
    .flat();

  return concat(...animation);
}

Yukardaki kodun oluşturduğu basit animasyon ise şöyle:

RxJS 7 animationFrames sayfadaki başlığı hareket ettiriyor

config.onUnhandledError

RxJS 7’de config nesnesi işlenmemiş hatalar için asenkron bir yakalayıcı barındırıyor: onUnhandledError.

import { config, throwError } from "rxjs";

config.onUnhandledError = console.warn;
throwError(() => "TEST ERROR 1").subscribe();
throwError(() => "TEST ERROR 2").subscribe({ error: console.warn });
// (synchronously) TEST ERROR 2
// (asynchronously) TEST ERROR 1

RxJS hataları yeniden çağrı yığınına kavuştu

Yazının başlrında RxJS’in özel hatalar için Object.create kullandığından bahsetmiştik. Aslında daha önce Object.setPrototypeOf kullanılıyormuş; ancak 6.3.0’ı geliştirirken IE10’u desteklemek adına bundan vazgeçilmiş. Ekim 2018’de hataların çağrı yığınını (call stack) kaybettiği haber verilmiş. Nihayet 7.0.0-beta.5 ile çağrı yığını hatalara geri eklenmiş.

import { EMPTY } from "rxjs";
import { first } from "rxjs/operators";

EMPTY.pipe(first()).subscribe({ error: console.warn });
// (senkron) EmptyErrorImpl
// 6.6.7'de call stack yok, 7.0-rc'de var

Kapanış

RxJS 7’de daha birçok değişiklik var; ama ne benim gücüm tamamını anlatmaya yeter, ne de sizin enerjiniz okumaya. Önemli olduğuna inandıklarımı bu yazıda toparlamaya çalıştım. Belki bu sürüm 4’ten 5’e veya 5’ten 6’ya geçiş kadar çarpıcı değil; ama RxJS’in artık oturmuş bir kütüphane olduğunu da unutmamak gerek.

RxJS 7’de bir dolu “deprecation” olduğu doğru; ama yeni sürüme geçişin kolay olması için hiçbir şeyi silmemişler. Yine de, özellikle TypeScript kullanıcılarının yer yer zorlanması ihtimal dahilinde. Geliştirmesi devam eden projeler için RxJS 7’e geçişin anlamlı olduğunu düşünüyorum. Ben ilk fırsatta projelerimi geçireceğim, size de tavsiye ederim.

Umarım faydalı olmuştur. Bir başka yazıda görüşmek üzere.

Bitti. 🧑‍🚀

Ekler:

Loading...
Levent Arman Özak

Levent Arman Özak