6 people like it.
Like the snippet!
Parallel recursive crawler using agents
The aim here is to demonstrate a method of distributing work across multiple nodes in parallel using the built-in F# agent. Crawling one page may reveal multiple new pages to fetch, so this is a recursive process which continues until no new URLs are found.
The main focus is how to process a potentially indefinite queue of work across a pool of workers, rather than how to parse web pages.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148:
149:
150:
151:
152:
153:
154:
155:
156:
157:
158:
159:
160:
161:
162:
163:
164:
165:
166:
167:
168:
169:
170:
171:
172:
173:
174:
175:
176:
177:
178:
179:
180:
181:
182:
183:
184:
185:
186:
187:
188:
189:
190:
|
open System
// Prelude - just a simple immutable queue.
/// Immutable first-in, first-out queue built from two lists:
/// `xs` holds the elements ready to be taken (oldest first) and
/// `rxs` holds newly enqueued elements (newest first).
type Queue<'a>(xs : 'a list, rxs : 'a list) =
    /// Creates an empty queue.
    new() = Queue([], [])

    /// Returns a fresh empty queue.
    static member Empty() = new Queue<'a>([], [])

    /// True when neither internal list holds an element.
    member q.IsEmpty = xs.IsEmpty && rxs.IsEmpty

    /// Returns a new queue with `x` added at the back; this queue is unchanged.
    member q.Enqueue(x) = Queue(xs, x :: rxs)

    /// Returns the oldest element (if any) together with the queue that remains.
    /// An empty queue yields `None` and itself.
    member q.TryTake() =
        match xs with
        | head :: rest -> Some head, Queue(rest, rxs)
        | [] ->
            // The front list is exhausted: flip the back list so the oldest
            // enqueued element comes first.
            match List.rev rxs with
            | head :: rest -> Some head, Queue(rest, [])
            | [] -> None, q
/// Address of a page, wrapped so it cannot be confused with other strings.
type Url =
    | Url of string

/// Wrapper around a byte array intended to hold a content hash.
type ContentHash =
    | ContentHash of byte[]

/// Identifier of a crawl session.
type CrawlSession =
    | CrawlSession of Guid

/// The outcome of fetching a single URL.
type Crawl =
    { Session : CrawlSession
      Url : Url
      Headers : (string * string) list
      Content : byte[] }
/// Caller-supplied behaviour for a crawl.
type Config =
    { /// Awaited by a worker for every crawl it has fetched.
      StoreCrawl : Crawl -> Async<unit>
      /// Produces the URLs to follow from a fetched crawl.
      GetNextPages : Crawl -> Url list
      /// Upper bound on the number of worker agents the supervisor starts.
      DegreeOfParallelism : int }

/// Everything a worker needs: the fetch function plus the caller's configuration.
type Dependencies =
    { /// Retrieves the crawl for a URL.
      Fetch : Url -> Async<Crawl>
      Config : Config }
/// Messages a worker agent accepts.
type WorkerMessage =
    /// Ask the worker to fetch the given URL.
    | Fetch of Url

/// Messages the supervisor agent accepts.
type SupervisorMessage =
    /// Begin crawling: carries the supervisor itself, the first URL, and the
    /// channel on which the set of completed URLs is eventually returned.
    | Start of MailboxProcessor<SupervisorMessage> * Url * AsyncReplyChannel<Set<Url>>
    /// A worker has finished a URL; carries that URL and the URLs found on it.
    | FetchCompleted of Url * Url list
/// State the supervisor carries while a crawl is running.
type SupervisorProgress =
    { /// The supervisor agent, handed to each worker so it can report back.
      Supervisor : MailboxProcessor<SupervisorMessage>
      /// Channel used to return the completed URLs when the crawl ends.
      ReplyChannel : AsyncReplyChannel<Set<Url>>
      /// Worker agents started so far.
      Workers : MailboxProcessor<WorkerMessage> list
      /// URLs waiting to be handed to a worker.
      PendingUrls : Queue<Url>
      /// URLs for which a completion message has been received.
      Completed : Set<Url>
      /// Number of URLs handed to workers and not yet reported complete.
      Dispatched : int }

/// Lifecycle of the supervisor agent.
type SupervisorStatus =
    | NotStarted
    | Running of SupervisorProgress
    | Finished
/// Starts a worker agent that processes `Fetch` messages one at a time.
///
/// For each URL the worker fetches the page, stores the crawl, works out the
/// next pages and posts `FetchCompleted` to `supervisor`.
///
/// If fetching, storing or link extraction throws, the failure is written to
/// stderr and the URL is still reported to the supervisor with no next pages.
/// Without this, the worker loop would end without reporting, the supervisor's
/// dispatched count would never return to zero, and the crawl would never reply.
/// The message protocol has no failure case, so a failed URL is reported the
/// same way as a page with no links.
let startWorker dependencies (supervisor : MailboxProcessor<SupervisorMessage>) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! Fetch(url) = inbox.Receive()
                // Run the whole unit of work under Async.Catch so that any
                // exception becomes a value instead of terminating the agent.
                let! outcome =
                    async {
                        let! crawl = dependencies.Fetch url
                        do! dependencies.Config.StoreCrawl(crawl)
                        return dependencies.Config.GetNextPages(crawl)
                    }
                    |> Async.Catch
                let nextUrls =
                    match outcome with
                    | Choice1Of2 urls -> urls
                    | Choice2Of2 ex ->
                        eprintfn "Crawl of %A failed: %s" url ex.Message
                        []
                // Always report back, success or failure, so the supervisor can
                // account for this URL.
                supervisor.Post(FetchCompleted(url, nextUrls))
                return! loop()
            }
        loop())
/// Hands pending URLs to workers until the queue is empty or no worker can
/// take more, and returns the updated progress.
///
/// A worker counts as idle when its mailbox queue is empty. When none is idle,
/// a new worker is started as long as fewer than `DegreeOfParallelism` exist;
/// otherwise the remaining URLs stay pending.
///
/// NOTE(review): an empty mailbox does not by itself show that the worker has
/// finished the message it last received - confirm this is the intended
/// notion of "idle".
let rec dispatch dependencies progress =
    match progress.PendingUrls.TryTake() with
    | None, _ -> progress
    | Some url, remaining ->
        let idle =
            progress.Workers
            |> List.tryFind (fun candidate -> candidate.CurrentQueueLength = 0)
        match idle with
        | Some worker ->
            worker.Post(Fetch url)
            let advanced =
                { progress with
                    PendingUrls = remaining
                    Dispatched = progress.Dispatched + 1 }
            dispatch dependencies advanced
        | None ->
            if progress.Workers.Length < dependencies.Config.DegreeOfParallelism then
                // Grow the pool and retry; the URL is still at the head of the
                // untouched pending queue, so it is picked up again.
                let extraWorker = startWorker dependencies (progress.Supervisor)
                dispatch dependencies { progress with Workers = extraWorker :: progress.Workers }
            else
                progress
/// Adds newly discovered URLs to the pending queue and returns the updated progress.
///
/// URLs that are already in `progress.Completed`, and repeats within `urls`
/// itself, are skipped. Without this check a page that links back to an
/// already crawled page would be queued again on every visit and the crawl
/// would never finish. URLs are enqueued in the order they appear in `urls`.
///
/// A URL that is pending or in flight but not yet completed can still be
/// queued a second time; it stops being re-queued once it has completed.
let enqueueUrls (urls : Url list) progress =
    let pending =
        urls
        |> Seq.distinct
        |> Seq.filter (fun url -> not (progress.Completed.Contains url))
        |> Seq.fold (fun (queue : Queue<Url>) url -> queue.Enqueue(url)) progress.PendingUrls
    { progress with PendingUrls = pending }
/// Records `url` as completed and lowers the in-flight count by one.
let complete url progress =
    let finished = progress.Completed |> Set.add url
    let inFlight = progress.Dispatched - 1
    { progress with Completed = finished; Dispatched = inFlight }
/// Builds the initial progress for a crawl: no workers, nothing pending,
/// nothing completed and nothing dispatched.
let start supervisor replyChannel =
    { Supervisor = supervisor
      ReplyChannel = replyChannel
      Workers = List.empty
      PendingUrls = Queue.Empty()
      Completed = Set.empty
      Dispatched = 0 }
/// Handles the `Start` message: creates fresh progress, queues the first URL,
/// dispatches it and returns the `Running` state.
let handleStart dependencies supervisor url replyChannel =
    let initial = start supervisor replyChannel
    let seeded = enqueueUrls [url] initial
    Running (dispatch dependencies seeded)
/// Handles a `FetchCompleted` message: marks `url` as done, queues the URLs
/// found on it and dispatches more work.
///
/// When nothing is pending and nothing is in flight, the completed set is
/// sent on the reply channel and the state becomes `Finished`; otherwise the
/// supervisor keeps `Running`.
let handleFetchCompleted dependencies url nextUrls progress =
    let updated =
        dispatch dependencies (enqueueUrls nextUrls (complete url progress))
    let workRemains =
        updated.Dispatched <> 0 || not updated.PendingUrls.IsEmpty
    if workRemains then
        Running updated
    else
        updated.ReplyChannel.Reply(updated.Completed)
        Finished
/// Computes the supervisor's next state from an incoming message and the
/// current state.
///
/// `Start` is only valid in `NotStarted`, and `FetchCompleted` only in
/// `Running`; any other combination raises via `failwith`.
let handleSupervisorMessage dependencies message state =
    match message, state with
    | Start (supervisor, url, replyChannel), NotStarted ->
        handleStart dependencies supervisor url replyChannel
    | Start _, _ ->
        failwith "Invalid state: Can't be started more than once."
    | FetchCompleted (url, nextUrls), Running progress ->
        handleFetchCompleted dependencies url nextUrls progress
    | FetchCompleted _, _ ->
        failwith "Invalid state - can't complete fetch before starting."
/// Starts a supervisor agent, sends it `Start` with `startUrl`, and returns an
/// async that yields the set of completed URLs once the supervisor replies.
let fetchRecursiveInternal dependencies startUrl =
    let body (inbox : MailboxProcessor<SupervisorMessage>) =
        let rec loop state =
            async {
                let! message = inbox.Receive()
                let next = handleSupervisorMessage dependencies message state
                match next with
                | Finished -> return ()   // crawl is over: stop receiving
                | _ -> return! loop next
            }
        loop NotStarted
    let supervisor = MailboxProcessor<SupervisorMessage>.Start(body)
    supervisor.PostAndAsyncReply(fun replyChannel -> Start(supervisor, startUrl, replyChannel))
/// Public entry point: crawls from `startUrl` using `config` and returns the
/// set of completed URLs. The fetch function is currently a stub that fails
/// with "Not Implemented" when run.
let fetchRecursive (config : Config) (startUrl : Url) : Async<Url Set> =
    // TODO: write a real fetch method.
    let notImplemented _ : Async<Crawl> = async { return failwith "Not Implemented" }
    fetchRecursiveInternal { Fetch = notImplemented; Config = config } startUrl
// Simple test harness using mocked internal dependencies.
/// Crawls a fake site made of one start page linking to two child pages, with
/// a fetch that returns an empty crawl for any URL, and returns the resulting
/// set of completed URLs.
let runTest () =
    let startUrl = Url "http://test.com"
    let childPages = [ Url "http://test.com/1"; Url "http://test.com/2" ]

    // Mock fetch: no headers, no content, a fresh session id per call.
    let fetch url =
        async {
            let session = CrawlSession(System.Guid.NewGuid())
            return { Session = session; Url = url; Headers = []; Content = [||] }
        }

    // Only the start page links anywhere.
    let nextPages (crawl : Crawl) =
        if crawl.Url = startUrl then childPages else []

    let config =
        { StoreCrawl = (fun _ -> async { return () })
          GetNextPages = nextPages
          DegreeOfParallelism = 2 }

    let dependencies = { Fetch = fetch; Config = config }
    Async.RunSynchronously (fetchRecursiveInternal dependencies startUrl)
|
namespace System
Multiple items
type Queue<'a> =
new : unit -> Queue<'a>
new : xs:'a list * rxs:'a list -> Queue<'a>
member Enqueue : x:'a -> Queue<'a>
member TryTake : unit -> 'a option * Queue<'a>
member IsEmpty : bool
static member Empty : unit -> Queue<'a>
Full name: Script.Queue<_>
--------------------
new : unit -> Queue<'a>
new : xs:'a list * rxs:'a list -> Queue<'a>
val xs : 'a list
type 'T list = List<'T>
Full name: Microsoft.FSharp.Collections.list<_>
val rxs : 'a list
static member Queue.Empty : unit -> Queue<'a>
Full name: Script.Queue`1.Empty
val q : Queue<'a>
member Queue.IsEmpty : bool
Full name: Script.Queue`1.IsEmpty
Multiple items
module List
from Microsoft.FSharp.Collections
--------------------
type List<'T> =
| ( [] )
| ( :: ) of Head: 'T * Tail: 'T list
interface IEnumerable
interface IEnumerable<'T>
member GetSlice : startIndex:int option * endIndex:int option -> 'T list
member Head : 'T
member IsEmpty : bool
member Item : index:int -> 'T with get
member Length : int
member Tail : 'T list
static member Cons : head:'T * tail:'T list -> 'T list
static member Empty : 'T list
Full name: Microsoft.FSharp.Collections.List<_>
val isEmpty : list:'T list -> bool
Full name: Microsoft.FSharp.Collections.List.isEmpty
member Queue.Enqueue : x:'a -> Queue<'a>
Full name: Script.Queue`1.Enqueue
val x : 'a
member Queue.TryTake : unit -> 'a option * Queue<'a>
Full name: Script.Queue`1.TryTake
property Queue.IsEmpty: bool
union case Option.None: Option<'T>
val rev : list:'T list -> 'T list
Full name: Microsoft.FSharp.Collections.List.rev
val y : 'a
val ys : 'a list
union case Option.Some: Value: 'T -> Option<'T>
Multiple items
union case Url.Url: string -> Url
--------------------
type Url = | Url of string
Full name: Script.Url
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
Multiple items
union case ContentHash.ContentHash: byte [] -> ContentHash
--------------------
type ContentHash = | ContentHash of byte []
Full name: Script.ContentHash
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = Byte
Full name: Microsoft.FSharp.Core.byte
Multiple items
union case CrawlSession.CrawlSession: Guid -> CrawlSession
--------------------
type CrawlSession = | CrawlSession of Guid
Full name: Script.CrawlSession
Multiple items
type Guid =
struct
new : b:byte[] -> Guid + 4 overloads
member CompareTo : value:obj -> int + 1 overload
member Equals : o:obj -> bool + 1 overload
member GetHashCode : unit -> int
member ToByteArray : unit -> byte[]
member ToString : unit -> string + 2 overloads
static val Empty : Guid
static member NewGuid : unit -> Guid
static member Parse : input:string -> Guid
static member ParseExact : input:string * format:string -> Guid
...
end
Full name: System.Guid
--------------------
Guid()
Guid(b: byte []) : unit
Guid(g: string) : unit
Guid(a: int, b: int16, c: int16, d: byte []) : unit
Guid(a: uint32, b: uint16, c: uint16, d: byte, e: byte, f: byte, g: byte, h: byte, i: byte, j: byte, k: byte) : unit
Guid(a: int, b: int16, c: int16, d: byte, e: byte, f: byte, g: byte, h: byte, i: byte, j: byte, k: byte) : unit
type Crawl =
{Session: CrawlSession;
Url: Url;
Headers: (string * string) list;
Content: byte [];}
Full name: Script.Crawl
Crawl.Session: CrawlSession
Multiple items
Crawl.Url: Url
--------------------
type Url = | Url of string
Full name: Script.Url
Crawl.Headers: (string * string) list
Crawl.Content: byte []
type Config =
{StoreCrawl: Crawl -> Async<unit>;
GetNextPages: Crawl -> Url list;
DegreeOfParallelism: int;}
Full name: Script.Config
Config.StoreCrawl: Crawl -> Async<unit>
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken
Full name: Microsoft.FSharp.Control.Async
--------------------
type Async<'T>
Full name: Microsoft.FSharp.Control.Async<_>
type unit = Unit
Full name: Microsoft.FSharp.Core.unit
Config.GetNextPages: Crawl -> Url list
Config.DegreeOfParallelism: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
type Dependencies =
{Fetch: Url -> Async<Crawl>;
Config: Config;}
Full name: Script.Dependencies
Dependencies.Fetch: Url -> Async<Crawl>
Multiple items
Dependencies.Config: Config
--------------------
type Config =
{StoreCrawl: Crawl -> Async<unit>;
GetNextPages: Crawl -> Url list;
DegreeOfParallelism: int;}
Full name: Script.Config
type WorkerMessage = | Fetch of Url
Full name: Script.WorkerMessage
union case WorkerMessage.Fetch: Url -> WorkerMessage
type SupervisorMessage =
| Start of MailboxProcessor<SupervisorMessage> * Url * AsyncReplyChannel<Set<Url>>
| FetchCompleted of Url * Url list
Full name: Script.SupervisorMessage
union case SupervisorMessage.Start: MailboxProcessor<SupervisorMessage> * Url * AsyncReplyChannel<Set<Url>> -> SupervisorMessage
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
Full name: Microsoft.FSharp.Control.MailboxProcessor<_>
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit
Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
Multiple items
module Set
from Microsoft.FSharp.Collections
--------------------
type Set<'T (requires comparison)> =
interface IComparable
interface IEnumerable
interface IEnumerable<'T>
interface ICollection<'T>
new : elements:seq<'T> -> Set<'T>
member Add : value:'T -> Set<'T>
member Contains : value:'T -> bool
override Equals : obj -> bool
member IsProperSubsetOf : otherSet:Set<'T> -> bool
member IsProperSupersetOf : otherSet:Set<'T> -> bool
...
Full name: Microsoft.FSharp.Collections.Set<_>
--------------------
new : elements:seq<'T> -> Set<'T>
union case SupervisorMessage.FetchCompleted: Url * Url list -> SupervisorMessage
type SupervisorProgress =
{Supervisor: MailboxProcessor<SupervisorMessage>;
ReplyChannel: AsyncReplyChannel<Set<Url>>;
Workers: MailboxProcessor<WorkerMessage> list;
PendingUrls: Queue<Url>;
Completed: Set<Url>;
Dispatched: int;}
Full name: Script.SupervisorProgress
SupervisorProgress.Supervisor: MailboxProcessor<SupervisorMessage>
SupervisorProgress.ReplyChannel: AsyncReplyChannel<Set<Url>>
SupervisorProgress.Workers: MailboxProcessor<WorkerMessage> list
SupervisorProgress.PendingUrls: Queue<Url>
SupervisorProgress.Completed: Set<Url>
SupervisorProgress.Dispatched: int
type SupervisorStatus =
| NotStarted
| Running of SupervisorProgress
| Finished
Full name: Script.SupervisorStatus
union case SupervisorStatus.NotStarted: SupervisorStatus
union case SupervisorStatus.Running: SupervisorProgress -> SupervisorStatus
union case SupervisorStatus.Finished: SupervisorStatus
val startWorker : dependencies:Dependencies -> supervisor:MailboxProcessor<SupervisorMessage> -> MailboxProcessor<WorkerMessage>
Full name: Script.startWorker
val dependencies : Dependencies
val supervisor : MailboxProcessor<SupervisorMessage>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val inbox : MailboxProcessor<WorkerMessage>
val loop : (unit -> Async<'a>)
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val url : Url
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val crawl : Crawl
Dependencies.Config: Config
val nextUrls : Url list
member MailboxProcessor.Post : message:'Msg -> unit
val dispatch : dependencies:Dependencies -> progress:SupervisorProgress -> SupervisorProgress
Full name: Script.dispatch
val progress : SupervisorProgress
member Queue.TryTake : unit -> 'a option * Queue<'a>
val queue : Queue<Url>
val tryFind : predicate:('T -> bool) -> list:'T list -> 'T option
Full name: Microsoft.FSharp.Collections.List.tryFind
val worker : MailboxProcessor<WorkerMessage>
property MailboxProcessor.CurrentQueueLength: int
val idleWorker : MailboxProcessor<WorkerMessage>
property List.Length: int
val newWorker : MailboxProcessor<WorkerMessage>
val enqueueUrls : urls:Url list -> progress:SupervisorProgress -> SupervisorProgress
Full name: Script.enqueueUrls
val urls : Url list
val pending : Queue<Url>
val foldBack : folder:('T -> 'State -> 'State) -> list:'T list -> state:'State -> 'State
Full name: Microsoft.FSharp.Collections.List.foldBack
member Queue.Enqueue : x:'a -> Queue<'a>
val complete : url:Url -> progress:SupervisorProgress -> SupervisorProgress
Full name: Script.complete
member Set.Add : value:'T -> Set<'T>
val start : supervisor:MailboxProcessor<SupervisorMessage> -> replyChannel:AsyncReplyChannel<Set<Url>> -> SupervisorProgress
Full name: Script.start
val replyChannel : AsyncReplyChannel<Set<Url>>
static member Queue.Empty : unit -> Queue<'a>
val empty<'T (requires comparison)> : Set<'T> (requires comparison)
Full name: Microsoft.FSharp.Collections.Set.empty
val handleStart : dependencies:Dependencies -> supervisor:MailboxProcessor<SupervisorMessage> -> url:Url -> replyChannel:AsyncReplyChannel<Set<Url>> -> SupervisorStatus
Full name: Script.handleStart
val handleFetchCompleted : dependencies:Dependencies -> url:Url -> nextUrls:Url list -> progress:SupervisorProgress -> SupervisorStatus
Full name: Script.handleFetchCompleted
member AsyncReplyChannel.Reply : value:'Reply -> unit
val handleSupervisorMessage : dependencies:Dependencies -> message:SupervisorMessage -> state:SupervisorStatus -> SupervisorStatus
Full name: Script.handleSupervisorMessage
val message : SupervisorMessage
val state : SupervisorStatus
val failwith : message:string -> 'T
Full name: Microsoft.FSharp.Core.Operators.failwith
val fetchRecursiveInternal : dependencies:Dependencies -> startUrl:Url -> Async<Set<Url>>
Full name: Script.fetchRecursiveInternal
val startUrl : Url
val inbox : MailboxProcessor<SupervisorMessage>
val loop : (SupervisorStatus -> Async<unit>)
val newState : SupervisorStatus
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val fetchRecursive : config:Config -> startUrl:Url -> Async<Set<Url>>
Full name: Script.fetchRecursive
val config : Config
val fetch : ('a -> Async<'b>)
val url : 'a
val runTest : unit -> Set<Url>
Full name: Script.runTest
val childPages : Url list
val fetch : (Url -> Async<Crawl>)
Guid.NewGuid() : Guid
Crawl.Url: Url
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
More information