10 people like it.

AsyncSeq - Introduction and Crawler

This snippet demonstrates programming with asynchronous sequences. It contains a (hidden) implementation of the AsyncSeq type and of the combinators for working with it. More importantly, it demonstrates how to use asynchronous sequences to implement a simple sequential on-demand crawler.

Creating simple asynchronous sequence

1: 
2: 
3: 
4: 
5: 
6: 
7: 
  // Asynchronous sequence of the numbers 1 and 2. Each number is
  // produced one second after it has been requested.
  let oneTwo = asyncSeq {
    for n in [ 1; 2 ] do
      do! Async.Sleep(1000)
      yield n }

Downloading web pages in sequence

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
  // Addresses that the `pages` sequence downloads, in this order
  let urls =
    [ "http://bing.com"
      "http://yahoo.com"
      "http://google.com"
      "http://msn.com" ]

  // Asynchronous sequence of (URL, HTML length) pairs. The pages
  // listed in `urls` are fetched one after another, each download
  // being awaited asynchronously. A download that throws is
  // reported with the length -1 instead of ending the sequence.
  let pages = asyncSeq {
    use client = new WebClient()
    for address in urls do
      try
        let! content = client.AsyncDownloadString(Uri address)
        yield (address, content.Length)
      with _ ->
        yield (address, -1) }

  // Iterate over the asynchronous sequence from inside an async
  // workflow and print every (URL, length) pair
  Async.Start (async {
    for address, size in pages do
      printfn "%s (%d)" address size })

  // Print URL of pages that are smaller than 50k. `pages` reports a
  // failed download with the length -1, so negative lengths are
  // skipped rather than being mistaken for small pages.
  pages 
    |> AsyncSeq.filter (fun (_, len) -> len >= 0 && len < 50000)
    |> AsyncSeq.map fst
    |> AsyncSeq.iter (printfn "%s")
    |> Async.Start

Sequential web crawler

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
  open HtmlAgilityPack
  open System.Text.RegularExpressions

  /// Asynchronously download the document and parse the HTML.
  /// Returns `Some doc` on success and `None` when the download
  /// or the parsing throws an exception.
  let downloadDocument url = (*[omit:(...)*)async {
    try
      // `use` disposes the WebClient once the download has finished
      // (or failed), the same way the `pages` sequence handles it
      use wc = new WebClient()
      let! html = wc.AsyncDownloadString(Uri(url))
      let doc = new HtmlDocument()
      doc.LoadHtml(html)
      return Some doc 
    with _ -> return None }(*[/omit]*)

  /// Extract all links from the document that start with "http://".
  /// The body is collapsed in this listing; the expanded version
  /// shown with the tooltips cuts each link at its first '?' and
  /// returns an empty list when the extraction throws.
  let extractLinks (doc:HtmlDocument) = (...)

  /// Extract the <title> of the web page.
  /// The body is collapsed in this listing; the expanded version
  /// shown with the tooltips returns the trimmed title text, or
  /// "Untitled" when the page has no <title> element.
  let getTitle (doc:HtmlDocument) = (...)

  /// Crawl the internet starting from the specified page, yielding
  /// a (url, title) pair for every page that could be downloaded.
  /// Links are followed depth-first and each URL is visited at most once.
  let randomCrawl url = 
    // URLs seen so far; shared by all recursive calls of `loop`
    let visited = new System.Collections.Generic.HashSet<_>()

    // Visits page and then recursively visits all referenced pages
    let rec loop url = asyncSeq {
      // Add returns false for a URL that is already in the set,
      // in which case this branch yields nothing
      if visited.Add(url) then
        let! doc = downloadDocument url
        match doc with 
        | Some doc ->
            // Yield url and title as the next element
            yield url, getTitle doc
            // For every link, yield all referenced pages too
            for link in extractLinks doc do
              yield! loop link 
        // The download failed: nothing to yield for this page
        | None -> () }
    loop url

  // Start the crawl at news.bing.com and, using AsyncSeq combinators,
  // print the titles of the first 10 pages whose URL does not
  // contain "bing.com"
  randomCrawl "http://news.bing.com"
  |> AsyncSeq.filter (fun (url, _) -> not (url.Contains("bing.com")))
  |> AsyncSeq.map (fun (_, title) -> title)
  |> AsyncSeq.take 10
  |> AsyncSeq.iter (fun title -> printfn "%s" title)
  |> Async.Start
val oneTwo : AsyncSeq<int>

Full name: Script.Samples.oneTwo
val asyncSeq : AsyncSeq.AsyncSeqBuilder

Full name: Script.AsyncSeqExtensions.asyncSeq


 Builds an asynchronous sequence using the computation builder syntax
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
val urls : string list

Full name: Script.Samples.urls
val pages : AsyncSeq<string * int>

Full name: Script.Samples.pages
val wc : WebClient
Multiple items
type WebClient =
  inherit Component
  new : unit -> WebClient
  member BaseAddress : string with get, set
  member CachePolicy : RequestCachePolicy with get, set
  member CancelAsync : unit -> unit
  member Credentials : ICredentials with get, set
  member DownloadData : address:string -> byte[] + 1 overload
  member DownloadDataAsync : address:Uri -> unit + 1 overload
  member DownloadFile : address:string * fileName:string -> unit + 1 overload
  member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
  member DownloadString : address:string -> string + 1 overload
  ...

Full name: System.Net.WebClient

--------------------
WebClient() : unit
val url : string
val html : string
member WebClient.AsyncDownloadString : address:Uri -> Async<string>
Multiple items
type Uri =
  new : uriString:string -> Uri + 5 overloads
  member AbsolutePath : string
  member AbsoluteUri : string
  member Authority : string
  member DnsSafeHost : string
  member Equals : comparand:obj -> bool
  member Fragment : string
  member GetComponents : components:UriComponents * format:UriFormat -> string
  member GetHashCode : unit -> int
  member GetLeftPart : part:UriPartial -> string
  ...

Full name: System.Uri

--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
property String.Length: int
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val length : int
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
static member Async.Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
Multiple items
module AsyncSeq

from Script


 Module with helper functions for working with asynchronous sequences


--------------------
type AsyncSeq<'T> = Async<AsyncSeqInner<'T>>

Full name: Script.AsyncSeq<_>


 An asynchronous sequence represents a delayed computation that can be
 started to produce either Cons value consisting of the next element of the
 sequence (head) together with the next asynchronous sequence (tail) or a
 special value representing the end of the sequence (Nil)
val filter : f:('T -> bool) -> input:AsyncSeq<'T> -> AsyncSeq<'T>

Full name: Script.AsyncSeq.filter


 Same as AsyncSeq.filterAsync, but the specified predicate is synchronous
 and processes the input element immediately.
val len : int
val map : f:('T -> 'a) -> input:AsyncSeq<'T> -> AsyncSeq<'a>

Full name: Script.AsyncSeq.map


 Same as AsyncSeq.mapAsync, but the specified function is synchronous
 and returns the result of projection immediately.
val fst : tuple:('T1 * 'T2) -> 'T1

Full name: Microsoft.FSharp.Core.Operators.fst
val iter : f:('T -> unit) -> input:AsyncSeq<'T> -> Async<unit>

Full name: Script.AsyncSeq.iter


 Same as AsyncSeq.iterAsync, but the specified function is synchronous
 and performs the side-effect immediately.
namespace System
namespace System.Text
namespace System.Text.RegularExpressions
val downloadDocument : url:string -> Async<'a option>

Full name: Script.Samples.downloadDocument


 Asynchronously download the document and parse the HTML
val doc : 'a
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
val extractLinks : doc:'a -> 'b list

Full name: Script.Samples.extractLinks


 Extract all links from the document that start with "http://"
try
      [ for a in doc.DocumentNode.SelectNodes("//a") do
          if a.Attributes.Contains("href") then
            let href = a.Attributes.["href"].Value
            if href.StartsWith("http://") then
              let endl = href.IndexOf('?')
              yield if endl > 0 then href.Substring(0, endl) else href ]
    with _ -> []
val getTitle : doc:'a -> string

Full name: Script.Samples.getTitle


 Extract the <title> of the web page
let title = doc.DocumentNode.SelectSingleNode("//title")
    if title <> null then title.InnerText.Trim() else "Untitled"
val randomCrawl : url:string -> AsyncSeq<string * string>

Full name: Script.Samples.randomCrawl


 Crawl the internet starting from the specified page.
 From each page follow the first not-yet-visited page.
val visited : HashSet<string>
namespace System.Collections
namespace System.Collections.Generic
Multiple items
type HashSet<'T> =
  new : unit -> HashSet<'T> + 3 overloads
  member Add : item:'T -> bool
  member Clear : unit -> unit
  member Comparer : IEqualityComparer<'T>
  member Contains : item:'T -> bool
  member CopyTo : array:'T[] -> unit + 2 overloads
  member Count : int
  member ExceptWith : other:IEnumerable<'T> -> unit
  member GetEnumerator : unit -> Enumerator<'T>
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  ...
  nested type Enumerator

Full name: System.Collections.Generic.HashSet<_>

--------------------
HashSet() : unit
HashSet(comparer: IEqualityComparer<'T>) : unit
HashSet(collection: IEnumerable<'T>) : unit
HashSet(collection: IEnumerable<'T>, comparer: IEqualityComparer<'T>) : unit
val loop : (string -> AsyncSeq<string * string>)
HashSet.Add(item: string) : bool
val doc : obj option
val doc : obj
val link : string
val title : string
String.Contains(value: string) : bool
val not : value:bool -> bool

Full name: Microsoft.FSharp.Core.Operators.not
val snd : tuple:('T1 * 'T2) -> 'T2

Full name: Microsoft.FSharp.Core.Operators.snd
val take : count:int -> input:AsyncSeq<'T> -> AsyncSeq<'T>

Full name: Script.AsyncSeq.take


 Returns the first N elements of an asynchronous sequence
Raw view Test code New version

More information

Link:http://fssnip.net/7f
Posted:12 years ago
Author:Tomas Petricek
Tags: async , asynchronous , asynchronous sequence , asyncseq , crawler , web crawler