# Extensions **Repository Path**: mirrors_reactiveui/Extensions ## Basic Information - **Project Name**: Extensions - **Description**: Extensions for System.Reactive - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-17 - **Last Updated**: 2025-12-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README [![Build](https://github.com/reactiveui/Extensions/actions/workflows/ci-build.yml/badge.svg)](https://github.com/reactiveui/Extensions/actions/workflows/ci-build.yml) [![codecov](https://codecov.io/gh/reactiveui/Extensions/graph/badge.svg?token=7u1lNF5imh)](https://codecov.io/gh/reactiveui/Extensions) # ReactiveUI.Extensions A focused collection of high–value Reactive Extensions (Rx) operators that do **not** ship with `System.Reactive` but are commonly needed when building reactive .NET applications. The goal of this library is to: - Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.) - Provide pragmatic, allocation?aware helpers for performance sensitive scenarios - Avoid additional dependencies – only `System.Reactive` is required Supported Target Frameworks: `.NET Standard 2.0`, `.NET 8`, `.NET 9`, `.NET 10`. --- ## Table of Contents 1. [Installation](#installation) 2. [Quick Start](#quick-start) 3. [API Catalog](#api-catalog) 4. [Operator Categories & Examples](#operator-categories--examples) - [Null / Signal Helpers](#null--signal-helpers) - [Timing, Scheduling & Flow Control](#timing-scheduling--flow-control) - [Inactivity / Liveness](#inactivity--liveness) - [Error Handling & Resilience](#error-handling--resilience) - [Combining, Partitioning & Logical Helpers](#combining-partitioning--logical-helpers) - [Async / Task Integration](#async--task-integration) - [Backpressure / Conflation](#backpressure--conflation) - [Selective & Conditional Emission](#selective--conditional-emission) - [Buffering & Transformation](#buffering--transformation) - [Subscription / Side Effects](#subscription--side-effects) - [Utility & Miscellaneous](#utility--miscellaneous) 5. [Performance Notes](#performance-notes) 6. [Thread Safety](#thread-safety) 7. [License](#license) --- ## Installation ```bash # Package coming soon (example) dotnet add package ReactiveUI.Extensions ``` Reference the project directly while developing locally. --- ## Quick Start ```csharp using System; using System.Reactive.Linq; using ReactiveUI.Extensions; var source = Observable.Interval(TimeSpan.FromMilliseconds(120)) .Take(10) .Select(i => (long?) (i % 3 == 0 ? null : i)); // 1. Filter nulls + convert to a Unit signal. var signal = source.WhereIsNotNull().AsSignal(); // 2. Add a heartbeat if the upstream goes quiet for 500ms. var withHeartbeat = source.WhereIsNotNull() .Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default); // 3. Retry with exponential backoff up to 5 times. var resilient = Observable.Defer(() => Observable.Throw(new InvalidOperationException("Boom"))) .RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100)); // 4. Conflate bursty updates. var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default); using (conflated.Subscribe(Console.WriteLine)) { Console.ReadLine(); } ``` --- ## API Catalog Below is the full list of extension methods (grouped logically). Some overloads omitted for brevity. | Category | Operators | |----------|-----------| | Null & Signal | `WhereIsNotNull`, `AsSignal` | | Timing & Scheduling | `SyncTimer`, `Schedule` (overloads), `ScheduleSafe`, `ThrottleFirst`, `DebounceImmediate` | | Inactivity / Liveness | `Heartbeat`, `DetectStale`, `BufferUntilInactive` | | Error Handling | `CatchIgnore`, `CatchAndReturn`, `OnErrorRetry` (overloads), `RetryWithBackoff` | | Combining & Aggregation | `CombineLatestValuesAreAllTrue`, `CombineLatestValuesAreAllFalse`, `GetMax`, `GetMin`, `Partition` | | Logical / Boolean | `Not`, `WhereTrue`, `WhereFalse` | | Async / Task | `SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`, `SubscribeAsync` (overloads), `SynchronizeSynchronous`, `SynchronizeAsync`, `SubscribeSynchronous` (overloads) | | Backpressure | `Conflate` | | Filtering / Conditional | `Filter` (Regex), `TakeUntil` (predicate), `WaitUntil` | | Buffering | `BufferUntil`, `BufferUntilInactive` | | Transformation & Utility | `Shuffle`, `ForEach`, `FromArray`, `Using`, `While`, `Start`, `OnNext` (params helper), `DoOnSubscribe`, `DoOnDispose` | --- ## Operator Categories & Examples ### Null / Signal Helpers ```csharp IObservable raw = GetPossiblyNullStream(); IObservable cleaned = raw.WhereIsNotNull(); IObservable signal = cleaned.AsSignal(); ``` ### Timing, Scheduling & Flow Control ```csharp // Shared timer for a given period (one underlying timer per distinct TimeSpan) var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1)); // Delay emission of a single value 42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default) .Subscribe(v => Console.WriteLine($"Delayed: {v}")); // Safe scheduling when a scheduler may be null IScheduler? maybeScheduler = null; maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline")); // ThrottleFirst: allow first item per window, ignore rest var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50)) .ThrottleFirst(TimeSpan.FromMilliseconds(200)); // DebounceImmediate: emit first immediately then debounce rest var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40)) .DebounceImmediate(TimeSpan.FromMilliseconds(250)); ``` ### Inactivity / Liveness ```csharp // Heartbeat emits IHeartbeat where IsHeartbeat == true during quiet periods var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400)) .Take(5) .Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default); // DetectStale emits IStale: one stale marker after inactivity, or fresh update wrappers var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500)) .Take(3) .DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default); // BufferUntilInactive groups events separated by inactivity var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20); var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200)); ``` ### Error Handling & Resilience ```csharp var flaky = Observable.Create(o => { o.OnNext(1); o.OnError(new InvalidOperationException("Fail")); return () => { }; }); // Ignore all errors and complete silently a flakySafe = flaky.CatchIgnore(); // Replace error with a fallback value var withFallback = flaky.CatchAndReturn(-1); // Retry only specific exception type with logging var retried = flaky.OnErrorRetry(ex => Console.WriteLine(ex.Message), retryCount: 3); // Retry with exponential backoff var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100)); ``` ### Combining, Partitioning & Logical Helpers ```csharp var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0); var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0); var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue(); var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse(); var numbers = Observable.Range(1, 10); var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream var toggles = a.Not(); // Negate booleans ``` ### Async / Task Integration ```csharp IObservable inputs = Observable.Range(1, 5); // Sequential (preserves order) var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; }); // Latest only (cancels previous) var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; }); // Limited parallelism var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2); // Asynchronous subscription (serializing tasks) inputs.SubscribeAsync(async i => await Task.Delay(10)); // Synchronous gate: ensures per-item async completion before next is emitted a inputs.SubscribeSynchronous(async i => await Task.Delay(25)); ``` ### Backpressure / Conflation ```csharp // Conflate: enforce minimum spacing between emissions while always outputting the most recent value a var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30); var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default); ``` ### Selective & Conditional Emission ```csharp // TakeUntil predicate (inclusive) var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5); // WaitUntil first match then complete var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0); ``` ### Buffering & Transformation ```csharp // BufferUntil - collect chars between delimiters var chars = "".ToCharArray().ToObservable(); var frames = chars.BufferUntil('<', '>'); // emits "", "", "" // Shuffle arrays in-place var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 }); var shuffled = arrays.Shuffle(); ``` ### Subscription & Side Effects ```csharp var stream = Observable.Range(1, 3) .DoOnSubscribe(() => Console.WriteLine("Subscribed")) .DoOnDispose(() => Console.WriteLine("Disposed")); using (stream.Subscribe(Console.WriteLine)) { // auto dispose at using end } ``` ### Utility & Miscellaneous ```csharp // Emit list contents quickly with low allocations var listSource = Observable.Return>(new List { 1, 2, 3 }); listSource.ForEach().Subscribe(Console.WriteLine); // Using helper for deterministic disposal var value = new MemoryStream().Using(ms => ms.Length); // While loop (reactive) var counter = 0; ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter)) .Subscribe(); // Batch push with OnNext params var subj = new Subject(); subj.OnNext(1, 2, 3, 4); ``` --- ## Performance Notes - `FastForEach` path avoids iterator allocations for `List`, `IList`, and arrays. - `SyncTimer` ensures only one shared timer per period reducing timer overhead. - `Conflate` helps tame high–frequency producers without dropping the final value of a burst. - `Heartbeat` and `DetectStale` use lightweight scheduling primitives. - Most operators avoid capturing lambdas in hot loops where practical. ## Thread Safety - All operators are pure functional transformations unless documented otherwise. - `SyncTimer` uses a `ConcurrentDictionary` and returns a hot `IConnectableObservable` that connects once per unique `TimeSpan`. - Methods returning shared observables (`SyncTimer`, `Partition` result sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting. ## License MIT – see LICENSE file. --- ## Contributing Issues / PRs welcome. Please keep additions dependency–free and focused on broadly useful reactive patterns. --- ## Change Log (Excerpt) (Keep this section updated as the library evolves.) - Added async task projection helpers (`SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`). - Added liveness operators (`Heartbeat`, `DetectStale`, `BufferUntilInactive`). - Added resilience (`RetryWithBackoff`, expanded `OnErrorRetry` overloads). - Added flow control (`Conflate`, `ThrottleFirst`, `DebounceImmediate`). - Removed DisposeWith extension use System.Reactive.Disposables.Fluent from System.Reactive. --- Happy reactive coding!