5 people like it.

Parallel recursive crawler using agents

The aim here is to demonstrate a method of distributing work across multiple nodes in parallel using the built-in F# agent (MailboxProcessor). The result of crawling one page might be the discovery of multiple new pages to fetch, so this is a recursive process which continues until no new URLs are found. The main focus is how to process a potentially indefinite queue of work across a pool of workers, rather than how to parse web pages.

  1: 
  2: 
  3: 
  4: 
  5: 
  6: 
  7: 
  8: 
  9: 
 10: 
 11: 
 12: 
 13: 
 14: 
 15: 
 16: 
 17: 
 18: 
 19: 
 20: 
 21: 
 22: 
 23: 
 24: 
 25: 
 26: 
 27: 
 28: 
 29: 
 30: 
 31: 
 32: 
 33: 
 34: 
 35: 
 36: 
 37: 
 38: 
 39: 
 40: 
 41: 
 42: 
 43: 
 44: 
 45: 
 46: 
 47: 
 48: 
 49: 
 50: 
 51: 
 52: 
 53: 
 54: 
 55: 
 56: 
 57: 
 58: 
 59: 
 60: 
 61: 
 62: 
 63: 
 64: 
 65: 
 66: 
 67: 
 68: 
 69: 
 70: 
 71: 
 72: 
 73: 
 74: 
 75: 
 76: 
 77: 
 78: 
 79: 
 80: 
 81: 
 82: 
 83: 
 84: 
 85: 
 86: 
 87: 
 88: 
 89: 
 90: 
 91: 
 92: 
 93: 
 94: 
 95: 
 96: 
 97: 
 98: 
 99: 
100: 
101: 
102: 
103: 
104: 
105: 
106: 
107: 
108: 
109: 
110: 
111: 
112: 
113: 
114: 
115: 
116: 
117: 
118: 
119: 
120: 
121: 
122: 
123: 
124: 
125: 
126: 
127: 
128: 
129: 
130: 
131: 
132: 
133: 
134: 
135: 
136: 
137: 
138: 
139: 
140: 
141: 
142: 
143: 
144: 
145: 
146: 
147: 
148: 
149: 
150: 
151: 
152: 
153: 
154: 
155: 
156: 
157: 
158: 
159: 
160: 
161: 
162: 
163: 
164: 
165: 
166: 
167: 
168: 
169: 
170: 
171: 
172: 
173: 
174: 
175: 
176: 
177: 
178: 
179: 
180: 
181: 
182: 
183: 
184: 
185: 
186: 
187: 
188: 
189: 
190: 
open System

// Prelude - just a simple immutable queue.
/// Immutable FIFO queue built from two lists: `xs` holds the elements that are
/// ready to be taken (front first) and `rxs` holds newly enqueued elements in
/// reverse order. Every operation returns a new queue.
type Queue<'a>(xs : 'a list, rxs : 'a list) =
    new() = Queue([], [])
    static member Empty() = new Queue<'a>([], [])

    /// True when neither the front list nor the back list holds an element.
    member q.IsEmpty =
        match xs, rxs with
        | [], [] -> true
        | _ -> false

    /// Returns a new queue with `x` added at the back.
    member q.Enqueue(x) = Queue(xs, x :: rxs)

    /// Returns the element at the front (if any) together with the queue that
    /// remains. An empty queue yields `None` and itself.
    member q.TryTake() =
        match xs, rxs with
        | head :: tail, _ -> Some(head), (Queue(tail, rxs))
        | [], [] -> None, q
        // Front list exhausted: reverse the back list into the front and retry.
        | [], _ -> (Queue(List.rev rxs, [])).TryTake()

/// Single-case union wrapping the string form of a page address, so that URLs
/// cannot be confused with other strings.
type Url = Url of string

/// Single-case union wrapping a byte array.
/// NOTE(review): not referenced by any other definition visible in this file;
/// presumably a hash of page content - confirm intended use.
type ContentHash = ContentHash of byte[]

/// Single-case union wrapping the Guid stored in `Crawl.Session`.
type CrawlSession = CrawlSession of Guid

/// The outcome of fetching a single page. Values are produced by
/// `Dependencies.Fetch` and then passed to `Config.StoreCrawl` and
/// `Config.GetNextPages`.
type Crawl =
    {
        // Session identifier attached to this fetch.
        Session : CrawlSession
        // Address of the page that was fetched.
        Url : Url
        // Pairs of strings; presumably response header name/value - confirm.
        Headers : (string * string) list
        // Raw bytes; presumably the response body - confirm.
        Content : byte[]
    }

/// Caller-supplied settings and callbacks that drive a crawl.
type Config =
    {
        // Invoked by a worker with each fetched page before links are extracted.
        StoreCrawl : Crawl -> Async<unit>
        // Returns the URLs that should be crawled as a consequence of this page.
        GetNextPages : Crawl -> Url list
        // Upper bound on the number of worker agents `dispatch` will create.
        DegreeOfParallelism : int
    }

/// Everything the crawler needs from the outside: the page fetcher (kept
/// separate so it can be replaced with a mock, as `runTest` does) plus the
/// caller's configuration.
type Dependencies =
    {
        // Downloads one page and returns the resulting crawl record.
        Fetch : Url -> Async<Crawl>
        // Caller-supplied callbacks and parallelism limit.
        Config : Config
    }

/// Messages accepted by a worker agent.
type WorkerMessage =
    // Request to fetch, store and extract links from the given URL.
    | Fetch of Url

/// Messages accepted by the supervisor agent.
type SupervisorMessage =
    // Begins a crawl: carries the supervisor agent itself (so workers can report
    // back to it), the first URL, and the channel used to return the final
    // set of completed URLs.
    | Start of MailboxProcessor<SupervisorMessage> * Url * AsyncReplyChannel<Url Set>
    // Sent by a worker when it has finished a URL, along with the URLs
    // returned by `GetNextPages` for that page.
    | FetchCompleted of Url * (Url list)

/// State carried by the supervisor while a crawl is running.
type SupervisorProgress =
    {
        // The supervisor agent; handed to newly started workers so they can post back.
        Supervisor : MailboxProcessor<SupervisorMessage>
        // Channel on which the completed URL set is returned when the crawl ends.
        ReplyChannel : AsyncReplyChannel<Url Set>
        // Worker agents created so far.
        Workers : MailboxProcessor<WorkerMessage> list
        // URLs waiting to be handed to a worker.
        PendingUrls : Url Queue
        // URLs for which a FetchCompleted message has been received.
        Completed : Url Set
        // Number of URLs posted to workers that have not yet been reported complete.
        Dispatched : int
    }

/// Lifecycle of the supervisor agent.
type SupervisorStatus =
    // No Start message has been handled yet.
    | NotStarted
    // A crawl is in progress with the given state.
    | Running of SupervisorProgress
    // The result has been replied; the supervisor loop stops in this state.
    | Finished

/// Starts a worker agent that processes `Fetch` messages one at a time: it
/// fetches the page, stores it, extracts the next URLs and reports the result
/// to `supervisor` as `FetchCompleted`.
///
/// A failure in any of those steps is caught, logged to stderr, and reported
/// as a completion with no follow-up URLs. Without this the worker would die
/// before posting `FetchCompleted`, the supervisor's in-flight count would
/// never return to zero, and the caller waiting on the crawl result would
/// hang forever.
/// NOTE(review): a failed URL therefore still ends up in the supervisor's
/// completed set, because the message protocol has no failure case.
let startWorker dependencies (supervisor : MailboxProcessor<SupervisorMessage>) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! Fetch(url) = inbox.Receive()
                // Run the whole fetch/store/extract pipeline under Async.Catch so
                // that an exception becomes a value instead of killing the agent.
                let! outcome =
                    async {
                        let! crawl = dependencies.Fetch url
                        do! dependencies.Config.StoreCrawl(crawl)
                        return dependencies.Config.GetNextPages(crawl)
                    }
                    |> Async.Catch
                let nextUrls =
                    match outcome with
                    | Choice1Of2 urls -> urls
                    | Choice2Of2 ex ->
                        eprintfn "Crawl of %A failed: %s" url ex.Message
                        []
                // Always report back, success or failure, so the supervisor can
                // account for this URL and eventually finish.
                supervisor.Post(FetchCompleted(url, nextUrls))
                return! loop()
            }
        loop())

/// Hands pending URLs to workers until the queue is empty or no worker can
/// take more. A URL goes to the first worker whose mailbox is empty; if there
/// is none and fewer than `DegreeOfParallelism` workers exist, a new worker is
/// started and the same URL is retried. Returns the updated progress.
/// NOTE(review): an empty mailbox is used as the "idle" test; a worker that
/// has already dequeued its message but is still processing it also has an
/// empty mailbox - confirm this approximation is acceptable.
let rec dispatch dependencies progress =
    let hasEmptyMailbox (worker : MailboxProcessor<WorkerMessage>) =
        worker.CurrentQueueLength = 0
    match progress.PendingUrls.TryTake() with
    | None, _ -> progress
    | Some url, remaining ->
        let idle = List.tryFind hasEmptyMailbox progress.Workers
        let belowLimit =
            List.length progress.Workers < dependencies.Config.DegreeOfParallelism
        match idle, belowLimit with
        | Some worker, _ ->
            worker.Post(Fetch url)
            // The URL is now in flight: remove it from the queue and count it.
            dispatch dependencies
                { progress with
                    PendingUrls = remaining
                    Dispatched = progress.Dispatched + 1 }
        | None, true ->
            // Grow the pool, then retry with the queue untouched so the same
            // URL is offered to the new worker.
            let worker = startWorker dependencies (progress.Supervisor)
            dispatch dependencies { progress with Workers = worker :: progress.Workers }
        | None, false ->
            // Pool is at its limit and every mailbox is occupied; leave the
            // remaining URLs pending.
            progress

/// Adds `urls` to the back of the pending queue, in the order given, and
/// returns the updated progress.
///
/// URLs that have already been completed, and repeats within `urls` itself,
/// are skipped. Without this, pages that link back to an already-crawled page
/// (A -> B -> A) would be re-enqueued on every visit and the crawl would
/// never terminate.
/// NOTE(review): URLs that are currently pending or in flight are not tracked
/// in a set, so a URL discovered again before its first fetch completes can
/// still be fetched more than once.
let enqueueUrls urls progress =
    let unseen =
        urls
        |> List.distinct
        |> List.filter (fun url -> not (Set.contains url progress.Completed))
    // Left fold so that the first URL in the list is the first one enqueued.
    let pending =
        unseen
        |> List.fold (fun (queue : Queue<Url>) url -> queue.Enqueue(url)) progress.PendingUrls
    { progress with PendingUrls = pending }

/// Records that `url` has finished: adds it to the completed set and lowers
/// the in-flight count by one. Returns the updated progress.
let complete url progress =
    let completed = Set.add url progress.Completed
    let stillInFlight = progress.Dispatched - 1
    { progress with Completed = completed; Dispatched = stillInFlight }

/// Builds the initial progress for a crawl: no workers, nothing pending,
/// nothing completed and nothing in flight.
let start supervisor replyChannel =
    {
        Dispatched = 0
        Completed = Set.empty
        PendingUrls = Queue<Url>()
        Workers = List.empty
        ReplyChannel = replyChannel
        Supervisor = supervisor
    }

/// Handles the Start message: creates fresh progress, enqueues the first URL,
/// dispatches it, and returns the resulting Running status.
let handleStart dependencies supervisor url replyChannel =
    let initial = start supervisor replyChannel
    let seeded = enqueueUrls [url] initial
    Running (dispatch dependencies seeded)

/// Handles a worker's FetchCompleted report: marks `url` as done, enqueues the
/// URLs found on that page and dispatches as much pending work as possible.
/// When nothing is pending and nothing is in flight, the completed set is
/// sent on the reply channel and the status becomes Finished; otherwise the
/// crawl keeps Running with the updated progress.
let handleFetchCompleted dependencies url nextUrls progress =
    let updated =
        dispatch dependencies (enqueueUrls nextUrls (complete url progress))
    let crawlIsDone = (updated.Dispatched = 0) && updated.PendingUrls.IsEmpty
    if not crawlIsDone then
        Running updated
    else
        updated.ReplyChannel.Reply(updated.Completed)
        Finished

/// State transition function of the supervisor: given the incoming message
/// and the current status, returns the next status. Start is only valid in
/// NotStarted and FetchCompleted only in Running; any other combination
/// raises via `failwith`.
let handleSupervisorMessage dependencies message state =
    match message, state with
    | Start (supervisor, url, replyChannel), NotStarted ->
        handleStart dependencies supervisor url replyChannel
    | Start _, _ ->
        failwith "Invalid state: Can't be started more than once."
    | FetchCompleted (url, nextUrls), Running progress ->
        handleFetchCompleted dependencies url nextUrls progress
    | FetchCompleted _, _ ->
        failwith "Invalid state - can't complete fetch before starting."

/// Starts a supervisor agent, sends it the Start message for `startUrl`, and
/// returns an async that yields the set of completed URLs once the supervisor
/// replies. The supervisor loop ends when the status becomes Finished.
let fetchRecursiveInternal dependencies startUrl =
    let body (inbox : MailboxProcessor<SupervisorMessage>) =
        let rec loop state =
            async {
                let! message = inbox.Receive()
                let next = handleSupervisorMessage dependencies message state
                match next with
                | Finished -> return ()
                | _ -> return! loop next
            }
        loop NotStarted
    let supervisor = MailboxProcessor<SupervisorMessage>.Start(body)
    // The supervisor is passed inside its own Start message so that workers
    // created later can post their results back to it.
    supervisor.PostAndAsyncReply(fun replyChannel -> Start(supervisor, startUrl, replyChannel))

/// Public entry point: crawls from `startUrl` using `config` and returns the
/// set of completed URLs. The page fetcher is currently a placeholder that
/// raises "Not Implemented" when run.
let fetchRecursive (config : Config) (startUrl : Url) : Async<Url Set> =
    // TODO: write a real fetch method.
    let notImplemented (_ : Url) : Async<Crawl> =
        async { return failwith "Not Implemented" }
    fetchRecursiveInternal { Fetch = notImplemented; Config = config } startUrl

// Simple test harness using mocked internal dependencies.
/// Runs a crawl against an in-memory fake: the start page links to two child
/// pages and the child pages link to nothing. Blocks until the crawl finishes
/// and returns the set of completed URLs.
let runTest () =
    let startUrl = Url "http://test.com"
    let childPages = [ Url "http://test.com/1"; Url "http://test.com/2" ]
    // Fake fetcher: returns an empty page with a fresh session id.
    let fetch url =
        let page =
            {
                Session = CrawlSession(Guid.NewGuid())
                Url = url
                Headers = []
                Content = [||]
            }
        async { return page }
    // Only the start page has outgoing links.
    let nextPages (crawl : Crawl) =
        if crawl.Url = startUrl then childPages else []
    let config =
        {
            StoreCrawl = fun _ -> async { return () }
            GetNextPages = nextPages
            DegreeOfParallelism = 2
        }
    let dependencies = { Fetch = fetch; Config = config }
    Async.RunSynchronously (fetchRecursiveInternal dependencies startUrl)
namespace System
Multiple items
type Queue<'a> =
  new : unit -> Queue<'a>
  new : xs:'a list * rxs:'a list -> Queue<'a>
  member Enqueue : x:'a -> Queue<'a>
  member TryTake : unit -> 'a option * Queue<'a>
  member IsEmpty : bool
  static member Empty : unit -> Queue<'a>

Full name: Script.Queue<_>

--------------------
new : unit -> Queue<'a>
new : xs:'a list * rxs:'a list -> Queue<'a>
val xs : 'a list
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>
val rxs : 'a list
static member Queue.Empty : unit -> Queue<'a>

Full name: Script.Queue`1.Empty
val q : Queue<'a>
member Queue.IsEmpty : bool

Full name: Script.Queue`1.IsEmpty
Multiple items
module List

from Microsoft.FSharp.Collections

--------------------
type List<'T> =
  | ( [] )
  | ( :: ) of Head: 'T * Tail: 'T list
  interface IEnumerable
  interface IEnumerable<'T>
  member GetSlice : startIndex:int option * endIndex:int option -> 'T list
  member Head : 'T
  member IsEmpty : bool
  member Item : index:int -> 'T with get
  member Length : int
  member Tail : 'T list
  static member Cons : head:'T * tail:'T list -> 'T list
  static member Empty : 'T list

Full name: Microsoft.FSharp.Collections.List<_>
val isEmpty : list:'T list -> bool

Full name: Microsoft.FSharp.Collections.List.isEmpty
member Queue.Enqueue : x:'a -> Queue<'a>

Full name: Script.Queue`1.Enqueue
val x : 'a
member Queue.TryTake : unit -> 'a option * Queue<'a>

Full name: Script.Queue`1.TryTake
property Queue.IsEmpty: bool
union case Option.None: Option<'T>
val rev : list:'T list -> 'T list

Full name: Microsoft.FSharp.Collections.List.rev
val y : 'a
val ys : 'a list
union case Option.Some: Value: 'T -> Option<'T>
Multiple items
union case Url.Url: string -> Url

--------------------
type Url = | Url of string

Full name: Script.Url
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
Multiple items
union case ContentHash.ContentHash: byte [] -> ContentHash

--------------------
type ContentHash = | ContentHash of byte []

Full name: Script.ContentHash
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = Byte

Full name: Microsoft.FSharp.Core.byte
Multiple items
union case CrawlSession.CrawlSession: Guid -> CrawlSession

--------------------
type CrawlSession = | CrawlSession of Guid

Full name: Script.CrawlSession
Multiple items
type Guid =
  struct
    new : b:byte[] -> Guid + 4 overloads
    member CompareTo : value:obj -> int + 1 overload
    member Equals : o:obj -> bool + 1 overload
    member GetHashCode : unit -> int
    member ToByteArray : unit -> byte[]
    member ToString : unit -> string + 2 overloads
    static val Empty : Guid
    static member NewGuid : unit -> Guid
    static member Parse : input:string -> Guid
    static member ParseExact : input:string * format:string -> Guid
    ...
  end

Full name: System.Guid

--------------------
Guid()
Guid(b: byte []) : unit
Guid(g: string) : unit
Guid(a: int, b: int16, c: int16, d: byte []) : unit
Guid(a: uint32, b: uint16, c: uint16, d: byte, e: byte, f: byte, g: byte, h: byte, i: byte, j: byte, k: byte) : unit
Guid(a: int, b: int16, c: int16, d: byte, e: byte, f: byte, g: byte, h: byte, i: byte, j: byte, k: byte) : unit
type Crawl =
  {Session: CrawlSession;
   Url: Url;
   Headers: (string * string) list;
   Content: byte [];}

Full name: Script.Crawl
Crawl.Session: CrawlSession
Multiple items
Crawl.Url: Url

--------------------
type Url = | Url of string

Full name: Script.Url
Crawl.Headers: (string * string) list
Crawl.Content: byte []
type Config =
  {StoreCrawl: Crawl -> Async<unit>;
   GetNextPages: Crawl -> Url list;
   DegreeOfParallelism: int;}

Full name: Script.Config
Config.StoreCrawl: Crawl -> Async<unit>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
Config.GetNextPages: Crawl -> Url list
Config.DegreeOfParallelism: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type Dependencies =
  {Fetch: Url -> Async<Crawl>;
   Config: Config;}

Full name: Script.Dependencies
Dependencies.Fetch: Url -> Async<Crawl>
Multiple items
Dependencies.Config: Config

--------------------
type Config =
  {StoreCrawl: Crawl -> Async<unit>;
   GetNextPages: Crawl -> Url list;
   DegreeOfParallelism: int;}

Full name: Script.Config
type WorkerMessage = | Fetch of Url

Full name: Script.WorkerMessage
union case WorkerMessage.Fetch: Url -> WorkerMessage
type SupervisorMessage =
  | Start of MailboxProcessor<SupervisorMessage> * Url * AsyncReplyChannel<Set<Url>>
  | FetchCompleted of Url * Url list

Full name: Script.SupervisorMessage
union case SupervisorMessage.Start: MailboxProcessor<SupervisorMessage> * Url * AsyncReplyChannel<Set<Url>> -> SupervisorMessage
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
Multiple items
module Set

from Microsoft.FSharp.Collections

--------------------
type Set<'T (requires comparison)> =
  interface IComparable
  interface IEnumerable
  interface IEnumerable<'T>
  interface ICollection<'T>
  new : elements:seq<'T> -> Set<'T>
  member Add : value:'T -> Set<'T>
  member Contains : value:'T -> bool
  override Equals : obj -> bool
  member IsProperSubsetOf : otherSet:Set<'T> -> bool
  member IsProperSupersetOf : otherSet:Set<'T> -> bool
  ...

Full name: Microsoft.FSharp.Collections.Set<_>

--------------------
new : elements:seq<'T> -> Set<'T>
union case SupervisorMessage.FetchCompleted: Url * Url list -> SupervisorMessage
type SupervisorProgress =
  {Supervisor: MailboxProcessor<SupervisorMessage>;
   ReplyChannel: AsyncReplyChannel<Set<Url>>;
   Workers: MailboxProcessor<WorkerMessage> list;
   PendingUrls: Queue<Url>;
   Completed: Set<Url>;
   Dispatched: int;}

Full name: Script.SupervisorProgress
SupervisorProgress.Supervisor: MailboxProcessor<SupervisorMessage>
SupervisorProgress.ReplyChannel: AsyncReplyChannel<Set<Url>>
SupervisorProgress.Workers: MailboxProcessor<WorkerMessage> list
SupervisorProgress.PendingUrls: Queue<Url>
SupervisorProgress.Completed: Set<Url>
SupervisorProgress.Dispatched: int
type SupervisorStatus =
  | NotStarted
  | Running of SupervisorProgress
  | Finished

Full name: Script.SupervisorStatus
union case SupervisorStatus.NotStarted: SupervisorStatus
union case SupervisorStatus.Running: SupervisorProgress -> SupervisorStatus
union case SupervisorStatus.Finished: SupervisorStatus
val startWorker : dependencies:Dependencies -> supervisor:MailboxProcessor<SupervisorMessage> -> MailboxProcessor<WorkerMessage>

Full name: Script.startWorker
val dependencies : Dependencies
val supervisor : MailboxProcessor<SupervisorMessage>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<WorkerMessage>
val loop : (unit -> Async<'a>)
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val url : Url
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val crawl : Crawl
Dependencies.Config: Config
val nextUrls : Url list
member MailboxProcessor.Post : message:'Msg -> unit
val dispatch : dependencies:Dependencies -> progress:SupervisorProgress -> SupervisorProgress

Full name: Script.dispatch
val progress : SupervisorProgress
member Queue.TryTake : unit -> 'a option * Queue<'a>
val queue : Queue<Url>
val tryFind : predicate:('T -> bool) -> list:'T list -> 'T option

Full name: Microsoft.FSharp.Collections.List.tryFind
val worker : MailboxProcessor<WorkerMessage>
property MailboxProcessor.CurrentQueueLength: int
val idleWorker : MailboxProcessor<WorkerMessage>
property List.Length: int
val newWorker : MailboxProcessor<WorkerMessage>
val enqueueUrls : urls:Url list -> progress:SupervisorProgress -> SupervisorProgress

Full name: Script.enqueueUrls
val urls : Url list
val pending : Queue<Url>
val foldBack : folder:('T -> 'State -> 'State) -> list:'T list -> state:'State -> 'State

Full name: Microsoft.FSharp.Collections.List.foldBack
member Queue.Enqueue : x:'a -> Queue<'a>
val complete : url:Url -> progress:SupervisorProgress -> SupervisorProgress

Full name: Script.complete
member Set.Add : value:'T -> Set<'T>
val start : supervisor:MailboxProcessor<SupervisorMessage> -> replyChannel:AsyncReplyChannel<Set<Url>> -> SupervisorProgress

Full name: Script.start
val replyChannel : AsyncReplyChannel<Set<Url>>
static member Queue.Empty : unit -> Queue<'a>
val empty<'T (requires comparison)> : Set<'T> (requires comparison)

Full name: Microsoft.FSharp.Collections.Set.empty
val handleStart : dependencies:Dependencies -> supervisor:MailboxProcessor<SupervisorMessage> -> url:Url -> replyChannel:AsyncReplyChannel<Set<Url>> -> SupervisorStatus

Full name: Script.handleStart
val handleFetchCompleted : dependencies:Dependencies -> url:Url -> nextUrls:Url list -> progress:SupervisorProgress -> SupervisorStatus

Full name: Script.handleFetchCompleted
member AsyncReplyChannel.Reply : value:'Reply -> unit
val handleSupervisorMessage : dependencies:Dependencies -> message:SupervisorMessage -> state:SupervisorStatus -> SupervisorStatus

Full name: Script.handleSupervisorMessage
val message : SupervisorMessage
val state : SupervisorStatus
val failwith : message:string -> 'T

Full name: Microsoft.FSharp.Core.Operators.failwith
val fetchRecursiveInternal : dependencies:Dependencies -> startUrl:Url -> Async<Set<Url>>

Full name: Script.fetchRecursiveInternal
val startUrl : Url
val inbox : MailboxProcessor<SupervisorMessage>
val loop : (SupervisorStatus -> Async<unit>)
val newState : SupervisorStatus
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val fetchRecursive : config:Config -> startUrl:Url -> Async<Set<Url>>

Full name: Script.fetchRecursive
val config : Config
val fetch : ('a -> Async<'b>)
val url : 'a
val runTest : unit -> Set<Url>

Full name: Script.runTest
val childPages : Url list
val fetch : (Url -> Async<Crawl>)
Guid.NewGuid() : Guid
Crawl.Url: Url
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
Raw view Test code New version

More information

Link:http://fssnip.net/7Qb
Posted:7 years ago
Author:Daniel Bradley
Tags: agent , agents , crawler , crawling , mailbox processor , web crawler