1 people like it.

Converting a Task to IObservable

Helper Type Extension to convert a System.Threading.Tasks.Task to IObservable.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
open System
open System.Reactive
open System.Reactive.Subjects
open System.Threading.Tasks

type Task<'a> with
    member t.ToObservale() : IObservable<'a> =
        let subject = new AsyncSubject<'a>()
        t.ContinueWith(fun (f:Task<'a>) ->
        if f.IsFaulted then
            subject.OnError(f.Exception)
        else
            subject.OnNext(f.Result)
            subject.OnCompleted() ) |> ignore
        subject :> IObservable<'a>

// Test
let t = Task.Run<int>(new Func<int>(fun () -> 42))
t.ToObservale().Subscribe(fun x-> printfn "The answer to everything %d" x) 
namespace System
namespace System.Threading
namespace System.Threading.Tasks
Multiple items
type Task =
  new : action:Action -> Task + 7 overloads
  member AsyncState : obj
  member ContinueWith : continuationAction:Action<Task> -> Task + 9 overloads
  member CreationOptions : TaskCreationOptions
  member Dispose : unit -> unit
  member Exception : AggregateException
  member Id : int
  member IsCanceled : bool
  member IsCompleted : bool
  member IsFaulted : bool
  ...

Full name: System.Threading.Tasks.Task

--------------------
type Task<'TResult> =
  inherit Task
  new : function:Func<'TResult> -> Task<'TResult> + 7 overloads
  member ContinueWith : continuationAction:Action<Task<'TResult>> -> Task + 9 overloads
  member Result : 'TResult with get, set
  static member Factory : TaskFactory<'TResult>

Full name: System.Threading.Tasks.Task<_>

--------------------
Task(action: Action) : unit
Task(action: Action, cancellationToken: Threading.CancellationToken) : unit
Task(action: Action, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj) : unit
Task(action: Action, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: Threading.CancellationToken) : unit
Task(action: Action<obj>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(action: Action<obj>, state: obj, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit

--------------------
Task(function: Func<'TResult>) : unit
Task(function: Func<'TResult>, cancellationToken: Threading.CancellationToken) : unit
Task(function: Func<'TResult>, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj) : unit
Task(function: Func<'TResult>, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: Threading.CancellationToken) : unit
Task(function: Func<obj,'TResult>, state: obj, creationOptions: TaskCreationOptions) : unit
Task(function: Func<obj,'TResult>, state: obj, cancellationToken: Threading.CancellationToken, creationOptions: TaskCreationOptions) : unit
val t : Task<'TResult>
member Task.ToObservale : unit -> IObservable<'TResult>

Full name: Script.ToObservale
type IObservable<'T> =
  member Subscribe : observer:IObserver<'T> -> IDisposable

Full name: System.IObservable<_>
val subject : IObservable<'TResult>
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>) : Task
   (+0 other overloads)
Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'TResult>,'TNewResult>) : Task<'TNewResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task<'TResult>>) : Task
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, cancellationToken: Threading.CancellationToken) : Task<'TResult>
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task
   (+0 other overloads)
Task.ContinueWith(continuationAction: Action<Task>, cancellationToken: Threading.CancellationToken) : Task
   (+0 other overloads)
val f : Task<'TResult>
property Task.IsFaulted: bool
property Task.Exception: AggregateException
property Task.Result: 'TResult
val ignore : value:'T -> unit

Full name: Microsoft.FSharp.Core.Operators.ignore
val t : obj

Full name: Script.t
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
Multiple items
type Func<'TResult> =
  delegate of unit -> 'TResult

Full name: System.Func<_>

--------------------
type Func<'T,'TResult> =
  delegate of 'T -> 'TResult

Full name: System.Func<_,_>

--------------------
type Func<'T1,'T2,'TResult> =
  delegate of 'T1 * 'T2 -> 'TResult

Full name: System.Func<_,_,_>

--------------------
type Func<'T1,'T2,'T3,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 -> 'TResult

Full name: System.Func<_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 -> 'TResult

Full name: System.Func<_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 -> 'TResult

Full name: System.Func<_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'T11,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 * 'T11 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'T11,'T12,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 * 'T11 * 'T12 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'T11,'T12,'T13,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 * 'T11 * 'T12 * 'T13 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'T11,'T12,'T13,'T14,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 * 'T11 * 'T12 * 'T13 * 'T14 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'T11,'T12,'T13,'T14,'T15,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 * 'T11 * 'T12 * 'T13 * 'T14 * 'T15 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_>

--------------------
type Func<'T1,'T2,'T3,'T4,'T5,'T6,'T7,'T8,'T9,'T10,'T11,'T12,'T13,'T14,'T15,'T16,'TResult> =
  delegate of 'T1 * 'T2 * 'T3 * 'T4 * 'T5 * 'T6 * 'T7 * 'T8 * 'T9 * 'T10 * 'T11 * 'T12 * 'T13 * 'T14 * 'T15 * 'T16 -> 'TResult

Full name: System.Func<_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_,_>
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
Raw view Test code New version

More information

Link:http://fssnip.net/s5
Posted:8 years ago
Author:Riccardo Terrell
Tags: rx , reactive , task