0 people like it.
Like the snippet!
Web Crawler extensions
The snippet extends a web crawler from snippet http://fssnip.net/3K. It synchronizes all printing using an additional agent (so printed text does not interleave) and the crawling function returns an asynchronous workflow that returns when crawling completes.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148:
149:
150:
151:
152:
153:
154:
155:
156:
157:
158:
159:
160:
161:
162:
163:
164:
165:
166:
167:
168:
169:
170:
171:
172:
173:
174:
175:
176:
177:
178:
179:
180:
181:
182:
183:
184:
185:
186:
187:
188:
189:
190:
|
open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.IO
open System.Net
open System.Text.RegularExpressions
module Helpers =
type Message =
| Done
| Mailbox of MailboxProcessor<Message>
| Stop
| Url of string option
| Start of AsyncReplyChannel<unit>
// Gates the number of crawling agents.
[<Literal>]
let Gate = 5
// Extracts links from HTML.
let extractLinks html =
let pattern1 = "(?i)href\\s*=\\s*(\"|\')/?((?!#.*|/\B|" +
"mailto:|location\.|javascript:)[^\"\']+)(\"|\')"
let pattern2 = "(?i)^https?"
let links =
[
for x in Regex(pattern1).Matches(html) do
yield x.Groups.[2].Value
]
|> List.filter (fun x -> Regex(pattern2).IsMatch(x))
links
// Fetches a Web page.
let fetch (url : string) =
try
let req = WebRequest.Create(url) :?> HttpWebRequest
req.UserAgent <- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)"
req.Timeout <- 5000
use resp = req.GetResponse()
let content = resp.ContentType
let isHtml = Regex("html").IsMatch(content)
match isHtml with
| true -> use stream = resp.GetResponseStream()
use reader = new StreamReader(stream)
let html = reader.ReadToEnd()
Some html
| false -> None
with
| _ -> None
let collectLinks url =
let html = fetch url
match html with
| Some x -> extractLinks x
| None -> []
open Helpers
let crawl url limit =
// Concurrent queue for saving collected urls.
let q = ConcurrentQueue<string>()
// Holds crawled URLs.
let set = HashSet<string>()
// Creates a mailbox that synchronizes printing to the console (so
// that two calls to 'printfn' do not interleave when printing)
let printer =
MailboxProcessor.Start(fun x -> async {
while true do
let! str = x.Receive()
printfn "%s" str })
// Hides standard 'printfn' function (formats the string using
// 'kprintf' and then posts the result to the printer agent.
let printfn fmt =
Printf.kprintf printer.Post fmt
let supervisor =
MailboxProcessor.Start(fun x -> async {
// The agent expects to receive 'Start' message first - the message
// carries a reply channel that is used to notify the caller
// when the agent completes crawling.
let! start = x.Receive()
let repl =
match start with
| Start repl -> repl
| _ -> failwith "Expected Start message!"
let rec loop run =
async {
let! msg = x.Receive()
match msg with
| Mailbox(mailbox) ->
let count = set.Count
if count < limit - 1 && run then
let url = q.TryDequeue()
match url with
| true, str -> if not (set.Contains str) then
let set'= set.Add str
mailbox.Post <| Url(Some str)
return! loop run
else
mailbox.Post <| Url None
return! loop run
| _ -> mailbox.Post <| Url None
return! loop run
else
mailbox.Post Stop
return! loop run
| Stop -> return! loop false
| Start _ -> failwith "Unexpected start message!"
| Url _ -> failwith "Unexpected URL message!"
| Done -> printfn "Supervisor is done."
(x :> IDisposable).Dispose()
// Notify the caller that the agent has completed
repl.Reply(())
}
do! loop true })
let urlCollector =
MailboxProcessor.Start(fun y ->
let rec loop count =
async {
let! msg = y.TryReceive(6000)
match msg with
| Some message ->
match message with
| Url u ->
match u with
| Some url -> q.Enqueue url
return! loop count
| None -> return! loop count
| _ ->
match count with
| Gate -> supervisor.Post Done
(y :> IDisposable).Dispose()
printfn "URL collector is done."
| _ -> return! loop (count + 1)
| None -> supervisor.Post Stop
return! loop count
}
loop 1)
/// Initializes a crawling agent.
let crawler id =
MailboxProcessor.Start(fun inbox ->
let rec loop() =
async {
let! msg = inbox.Receive()
match msg with
| Url x ->
match x with
| Some url ->
let links = collectLinks url
printfn "%s crawled by agent %d." url id
for link in links do
urlCollector.Post <| Url (Some link)
supervisor.Post(Mailbox(inbox))
return! loop()
| None -> supervisor.Post(Mailbox(inbox))
return! loop()
| _ -> urlCollector.Post Done
printfn "Agent %d is done." id
(inbox :> IDisposable).Dispose()
}
loop())
// Send 'Start' message to the main agent. The result
// is asynchronous workflow that will complete when the
// agent crawling completes
let result = supervisor.PostAndAsyncReply(Start)
// Spawn the crawlers.
let crawlers =
[
for i in 1 .. Gate do
yield crawler i
]
// Post the first messages.
crawlers.Head.Post <| Url (Some url)
crawlers.Tail |> List.iter (fun ag -> ag.Post <| Url None)
result
// Example:
crawl "http://news.google.com" 25
|> Async.RunSynchronously
|
namespace System
namespace System.Collections
namespace System.Collections.Concurrent
namespace System.Collections.Generic
namespace System.IO
namespace System.Net
namespace System.Text
namespace System.Text.RegularExpressions
type Message =
| Done
| Mailbox of MailboxProcessor<Message>
| Stop
| Url of string option
| Start of AsyncReplyChannel<unit>
union case Message.Done: Message
union case Message.Mailbox: MailboxProcessor<Message> -> Message
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
union case Message.Stop: Message
union case Message.Url: string option -> Message
Multiple items
val string : value:'T -> string
--------------------
type string = String
type 'T option = Option<'T>
union case Message.Start: AsyncReplyChannel<unit> -> Message
type AsyncReplyChannel<'Reply> =
member Reply : value:'Reply -> unit
type unit = Unit
Multiple items
type LiteralAttribute =
inherit Attribute
new : unit -> LiteralAttribute
--------------------
new : unit -> LiteralAttribute
val Gate : int
val extractLinks : html:string -> string list
val html : string
val pattern1 : string
val pattern2 : string
val links : string list
val x : Match
Multiple items
type Regex =
new : pattern:string -> Regex + 2 overloads
member GetGroupNames : unit -> string[]
member GetGroupNumbers : unit -> int[]
member GroupNameFromNumber : i:int -> string
member GroupNumberFromName : name:string -> int
member IsMatch : input:string -> bool + 1 overload
member Match : input:string -> Match + 2 overloads
member MatchTimeout : TimeSpan
member Matches : input:string -> MatchCollection + 1 overload
member Options : RegexOptions
...
--------------------
Regex(pattern: string) : Regex
Regex(pattern: string, options: RegexOptions) : Regex
Regex(pattern: string, options: RegexOptions, matchTimeout: TimeSpan) : Regex
Multiple items
type List<'T> =
new : unit -> List<'T> + 2 overloads
member Add : item:'T -> unit
member AddRange : collection:IEnumerable<'T> -> unit
member AsReadOnly : unit -> ReadOnlyCollection<'T>
member BinarySearch : item:'T -> int + 2 overloads
member Capacity : int with get, set
member Clear : unit -> unit
member Contains : item:'T -> bool
member ConvertAll<'TOutput> : converter:Converter<'T, 'TOutput> -> List<'TOutput>
member CopyTo : array:'T[] -> unit + 2 overloads
...
nested type Enumerator
--------------------
List() : List<'T>
List(capacity: int) : List<'T>
List(collection: IEnumerable<'T>) : List<'T>
val filter : predicate:('T -> bool) -> list:'T list -> 'T list
val x : string
val fetch : url:string -> string option
val url : string
val req : HttpWebRequest
type WebRequest =
inherit MarshalByRefObject
member Abort : unit -> unit
member AuthenticationLevel : AuthenticationLevel with get, set
member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
member CachePolicy : RequestCachePolicy with get, set
member ConnectionGroupName : string with get, set
member ContentLength : int64 with get, set
member ContentType : string with get, set
member Credentials : ICredentials with get, set
member EndGetRequestStream : asyncResult:IAsyncResult -> Stream
...
WebRequest.Create(requestUri: Uri) : WebRequest
WebRequest.Create(requestUriString: string) : WebRequest
Multiple items
type HttpWebRequest =
inherit WebRequest
new : unit -> HttpWebRequest
member Abort : unit -> unit
member Accept : string with get, set
member AddRange : range:int -> unit + 7 overloads
member Address : Uri
member AllowAutoRedirect : bool with get, set
member AllowReadStreamBuffering : bool with get, set
member AllowWriteStreamBuffering : bool with get, set
member AutomaticDecompression : DecompressionMethods with get, set
member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
...
--------------------
HttpWebRequest() : HttpWebRequest
val resp : WebResponse
val content : string
val isHtml : bool
val stream : Stream
val reader : StreamReader
Multiple items
type StreamReader =
inherit TextReader
new : stream:Stream -> StreamReader + 10 overloads
member BaseStream : Stream
member Close : unit -> unit
member CurrentEncoding : Encoding
member DiscardBufferedData : unit -> unit
member EndOfStream : bool
member Peek : unit -> int
member Read : unit -> int + 2 overloads
member ReadAsync : buffer:Memory<char> * ?cancellationToken:CancellationToken -> ValueTask<int> + 1 overload
member ReadBlock : buffer:Span<char> -> int + 1 overload
...
--------------------
StreamReader(stream: Stream) : StreamReader
(+0 other overloads)
StreamReader(path: string) : StreamReader
(+0 other overloads)
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : StreamReader
(+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding) : StreamReader
(+0 other overloads)
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : StreamReader
(+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding) : StreamReader
(+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool) : StreamReader
(+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool) : StreamReader
(+0 other overloads)
StreamReader(stream: Stream, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : StreamReader
(+0 other overloads)
StreamReader(path: string, encoding: Text.Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : StreamReader
(+0 other overloads)
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
val collectLinks : url:string -> string list
val html : string option
module Helpers
from Script
val crawl : url:string -> limit:int -> Async<unit>
val limit : int
val q : ConcurrentQueue<string>
Multiple items
type ConcurrentQueue<'T> =
new : unit -> ConcurrentQueue<'T> + 1 overload
member Clear : unit -> unit
member CopyTo : array:'T[] * index:int -> unit
member Count : int
member Enqueue : item:'T -> unit
member GetEnumerator : unit -> IEnumerator<'T>
member IsEmpty : bool
member ToArray : unit -> 'T[]
member TryDequeue : result:'T -> bool
member TryPeek : result:'T -> bool
--------------------
ConcurrentQueue() : ConcurrentQueue<'T>
ConcurrentQueue(collection: IEnumerable<'T>) : ConcurrentQueue<'T>
val set : HashSet<string>
Multiple items
type HashSet<'T> =
new : unit -> HashSet<'T> + 5 overloads
member Add : item:'T -> bool
member Clear : unit -> unit
member Comparer : IEqualityComparer<'T>
member Contains : item:'T -> bool
member CopyTo : array:'T[] -> unit + 2 overloads
member Count : int
member EnsureCapacity : capacity:int -> int
member ExceptWith : other:IEnumerable<'T> -> unit
member GetEnumerator : unit -> Enumerator<'T>
...
nested type Enumerator
--------------------
HashSet() : HashSet<'T>
HashSet(comparer: IEqualityComparer<'T>) : HashSet<'T>
HashSet(capacity: int) : HashSet<'T>
HashSet(collection: IEnumerable<'T>) : HashSet<'T>
HashSet(collection: IEnumerable<'T>, comparer: IEqualityComparer<'T>) : HashSet<'T>
HashSet(capacity: int, comparer: IEqualityComparer<'T>) : HashSet<'T>
val printer : MailboxProcessor<string>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:Threading.CancellationToken -> MailboxProcessor<'Msg>
val x : MailboxProcessor<string>
val async : AsyncBuilder
val str : string
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
val printfn : (Printf.StringFormat<'a,unit> -> 'a)
val fmt : Printf.StringFormat<'a,unit>
module Printf
from Microsoft.FSharp.Core
val kprintf : continuation:(string -> 'Result) -> format:Printf.StringFormat<'T,'Result> -> 'T
val supervisor : MailboxProcessor<Message>
val x : MailboxProcessor<Message>
val start : Message
val repl : AsyncReplyChannel<unit>
val failwith : message:string -> 'T
val loop : (bool -> Async<unit>)
val run : bool
val msg : Message
val mailbox : MailboxProcessor<Message>
val count : int
val url : bool * string
val not : value:bool -> bool
val set' : bool
type IDisposable =
member Dispose : unit -> unit
val urlCollector : MailboxProcessor<Message>
val y : MailboxProcessor<Message>
val loop : (int -> Async<unit>)
val msg : Message option
val message : Message
val u : string option
val crawler : (int -> MailboxProcessor<Message>)
Initializes a crawling agent.
val id : int
val inbox : MailboxProcessor<Message>
val loop : (unit -> Async<unit>)
val x : string option
val link : string
val result : Async<unit>
val crawlers : MailboxProcessor<Message> list
val i : int
val iter : action:('T -> unit) -> list:'T list -> unit
val ag : MailboxProcessor<Message>
Multiple items
type Async =
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
...
--------------------
type Async<'T> =
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:Threading.CancellationToken -> 'T
More information