7 people like it.
Like the snippet!
Asynchronous HTTP proxy with chunking and caching
This snippet shows two improvements to the asynchronous HTTP proxy from http://fssnip.net/6e. The first extension processes the page in chunks (instead of downloading the entire content first). The second extension uses a simple agent-based in-memory cache for previously visited pages.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
|
// NOTE: This snippet uses System.Net extensions from: http://fssnip.net/6d
// (such as HttpListener.Start and WebClient.AsyncDownloadData)
/// Base address of the site that the proxy copies content from
let root = "http://msdn.microsoft.com"

/// Translates the URL of a local request into the corresponding URL on
/// the target site by appending the request's path and query to the root
let getProxyUrl (ctx:HttpListenerContext) =
  let pathAndQuery = ctx.Request.Url.PathAndQuery
  new Uri(root + pathAndQuery)
/// Handles a failed request asynchronously: writes a small HTML page
/// containing the message of the exception `e` to the response of `ctx`
/// and then closes the response.
let asyncHandleError (ctx:HttpListenerContext) (e:exn) = async {
  use wr = new StreamWriter(ctx.Response.OutputStream)
  wr.Write("<h1>Request Failed</h1>")
  // The message can echo parts of the requested URL, so encode it
  // before embedding it in the HTML page
  wr.Write("<p>" + System.Net.WebUtility.HtmlEncode(e.Message) + "</p>")
  // StreamWriter buffers its output and is only disposed at the end of
  // this scope, i.e. after the response is closed - flush explicitly so
  // that the page is actually sent
  wr.Flush()
  ctx.Response.Close() }
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
|
/// Handles request using asynchronous workflows - The content
/// is downloaded and sent to the caller in chunks, so the proxy
/// is more efficient and doesn't read entire file in memory.
/// If the download or the copying fails, the exception is passed
/// to asyncHandleError so the caller still receives a response.
let asyncHandleRequest (ctx:HttpListenerContext) = async {
  try
    // Initialize HTTP connection to the server
    let request = HttpWebRequest.Create(getProxyUrl(ctx))
    use! response = request.AsyncGetResponse()
    use stream = response.GetResponseStream()
    ctx.Response.SendChunked <- true
    // Asynchronous loop to copy data; the number of bytes read last
    // is kept in a ref cell so the loop body can update it
    let count = ref 1
    let buffer : byte[] = Array.zeroCreate 4096
    while count.Value > 0 do
      let! read = stream.AsyncRead(buffer, 0, buffer.Length)
      // A read of zero bytes marks the end of the stream,
      // so there is nothing left to forward
      if read > 0 then
        do! ctx.Response.OutputStream.AsyncWrite(buffer, 0, read)
      count.Value <- read
    ctx.Response.Close()
  with err ->
    // Same error handling as the cache-based handler
    do! asyncHandleError ctx err }
// Start HTTP proxy that handles requests asynchronously using chunking.
// HttpListener.Start returns a CancellationTokenSource for the started server.
let token = HttpListener.Start("http://localhost:8080/", asyncHandleRequest)
// Cancel the token once the proxy is no longer needed.
// NOTE(review): presumably these two lines are meant to be evaluated
// separately (e.g. in F# Interactive); run back-to-back, the token is
// cancelled right after the proxy starts - confirm intended usage.
token.Cancel()
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
|
/// Messages understood by the cache: a lookup that is answered
/// through a reply channel, and an insertion of downloaded content
type CacheMessage =
  /// Request the content stored for the given URL
  | TryGet of Uri * AsyncReplyChannel<byte[] option>
  /// Store the content for the given URL
  | Add of Uri * byte[]
/// Thread-safe in-memory cache for web pages, implemented as an agent
/// that owns the dictionary and processes one message at a time
let cache = Agent.Start(fun inbox -> async {
  // Cached pages, keyed by their URL
  let store = new Dictionary<Uri, byte[]>()
  let rec loop () = async {
    let! message = inbox.Receive()
    match message with
    | Add(url, data) ->
        // Remember the downloaded page (replaces an existing entry)
        store.[url] <- data
    | TryGet(url, reply) ->
        // Answer with the cached page, or None when it is not cached
        let found, data = store.TryGetValue(url)
        reply.Reply(if found then Some(data) else None)
    return! loop () }
  return! loop () })
|
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
|
/// Attempts to retrieve page from cache first. If the page
/// isn't cached already, download it and add it to the cache.
/// Returns the content of the page as a byte array.
let downloadUsingCache (url:Uri) = async {
  let! cached = cache.PostAndAsyncReply(fun ch -> TryGet(url, ch))
  match cached with
  | Some(data) ->
      // Return page from the cache
      return data
  | None ->
      // Download page and add it to the cache; 'use' makes sure
      // that the WebClient is disposed once the download is done
      use wc = new WebClient()
      let! data = wc.AsyncDownloadData(url)
      cache.Post(Add(url, data))
      return data }
/// Serves a proxy request through the cache - the page content is
/// obtained with downloadUsingCache and written to the output stream.
/// Any exception raised along the way is turned into an error page
/// by asyncHandleError.
let asyncHandleRequest (ctx:HttpListenerContext) = async {
  // Run the download-and-send step and capture its outcome
  let! outcome =
    async {
      let! content = downloadUsingCache (getProxyUrl ctx)
      do! ctx.Response.OutputStream.AsyncWrite(content)
      ctx.Response.Close() }
    |> Async.Catch
  match outcome with
  | Choice1Of2 () -> return ()
  | Choice2Of2 err -> return! asyncHandleError ctx err }
// Start HTTP proxy that handles requests asynchronously using cache.
// HttpListener.Start returns a CancellationTokenSource for the started server.
let token = HttpListener.Start("http://localhost:8080/", asyncHandleRequest)
// Cancel the token once the proxy is no longer needed.
// NOTE(review): presumably these two lines are meant to be evaluated
// separately (e.g. in F# Interactive); run back-to-back, the token is
// cancelled right after the proxy starts - confirm intended usage.
token.Cancel()
|
val root : string
Full name: Script.root
val getProxyUrl : ctx:HttpListenerContext -> Uri
Full name: Script.getProxyUrl
val ctx : HttpListenerContext
type HttpListenerContext =
member Request : HttpListenerRequest
member Response : HttpListenerResponse
member User : IPrincipal
Full name: System.Net.HttpListenerContext
Multiple items
type Uri =
new : uriString:string -> Uri + 5 overloads
member AbsolutePath : string
member AbsoluteUri : string
member Authority : string
member DnsSafeHost : string
member Equals : comparand:obj -> bool
member Fragment : string
member GetComponents : components:UriComponents * format:UriFormat -> string
member GetHashCode : unit -> int
member GetLeftPart : part:UriPartial -> string
...
Full name: System.Uri
--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
property HttpListenerContext.Request: HttpListenerRequest
property HttpListenerRequest.Url: Uri
property Uri.PathAndQuery: string
val asyncHandleError : ctx:HttpListenerContext -> e:exn -> Async<unit>
Full name: Script.asyncHandleError
val e : exn
type exn = Exception
Full name: Microsoft.FSharp.Core.exn
val async : AsyncBuilder
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val wr : StreamWriter
Multiple items
type StreamWriter =
inherit TextWriter
new : stream:Stream -> StreamWriter + 6 overloads
member AutoFlush : bool with get, set
member BaseStream : Stream
member Close : unit -> unit
member Encoding : Encoding
member Flush : unit -> unit
member Write : value:char -> unit + 3 overloads
static val Null : StreamWriter
Full name: System.IO.StreamWriter
--------------------
StreamWriter(stream: Stream) : unit
StreamWriter(path: string) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding) : unit
StreamWriter(path: string, append: bool) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding, bufferSize: int) : unit
StreamWriter(path: string, append: bool, encoding: Text.Encoding) : unit
StreamWriter(path: string, append: bool, encoding: Text.Encoding, bufferSize: int) : unit
property HttpListenerContext.Response: HttpListenerResponse
property HttpListenerResponse.OutputStream: Stream
TextWriter.Write(value: obj) : unit
(+0 other overloads)
TextWriter.Write(value: decimal) : unit
(+0 other overloads)
TextWriter.Write(value: float) : unit
(+0 other overloads)
TextWriter.Write(value: float32) : unit
(+0 other overloads)
TextWriter.Write(value: uint64) : unit
(+0 other overloads)
TextWriter.Write(value: int64) : unit
(+0 other overloads)
TextWriter.Write(value: uint32) : unit
(+0 other overloads)
TextWriter.Write(value: int) : unit
(+0 other overloads)
TextWriter.Write(value: bool) : unit
(+0 other overloads)
StreamWriter.Write(value: string) : unit
(+0 other overloads)
property Exception.Message: string
HttpListenerResponse.Close() : unit
HttpListenerResponse.Close(responseEntity: byte [], willBlock: bool) : unit
val asyncHandleRequest : ctx:HttpListenerContext -> Async<unit>
Full name: Script.Chunked.asyncHandleRequest
Handles request using asynchronous workflows - The content
is downloaded and sent to the caller in chunks, so the proxy
is more efficient and doesn't read entire file in memory
val request : WebRequest
type HttpWebRequest =
inherit WebRequest
member Abort : unit -> unit
member Accept : string with get, set
member AddRange : range:int -> unit + 7 overloads
member Address : Uri
member AllowAutoRedirect : bool with get, set
member AllowWriteStreamBuffering : bool with get, set
member AutomaticDecompression : DecompressionMethods with get, set
member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
member ClientCertificates : X509CertificateCollection with get, set
...
Full name: System.Net.HttpWebRequest
WebRequest.Create(requestUri: Uri) : WebRequest
WebRequest.Create(requestUriString: string) : WebRequest
val response : WebResponse
member WebRequest.AsyncGetResponse : unit -> Async<WebResponse>
val stream : Stream
WebResponse.GetResponseStream() : Stream
property HttpListenerResponse.SendChunked: bool
val count : int ref
Multiple items
val ref : value:'T -> 'T ref
Full name: Microsoft.FSharp.Core.Operators.ref
--------------------
type 'T ref = Ref<'T>
Full name: Microsoft.FSharp.Core.ref<_>
val buffer : byte []
type Array =
member Clone : unit -> obj
member CopyTo : array:Array * index:int -> unit + 1 overload
member GetEnumerator : unit -> IEnumerator
member GetLength : dimension:int -> int
member GetLongLength : dimension:int -> int64
member GetLowerBound : dimension:int -> int
member GetUpperBound : dimension:int -> int
member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
member Initialize : unit -> unit
member IsFixedSize : bool
...
Full name: System.Array
val zeroCreate : count:int -> 'T []
Full name: Microsoft.FSharp.Collections.Array.zeroCreate
property Ref.Value: int
val read : int
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
property Array.Length: int
member Stream.AsyncWrite : buffer:byte [] * ?offset:int * ?count:int -> Async<unit>
val token : CancellationTokenSource
Full name: Script.Chunked.token
Multiple items
type HttpListener =
new : unit -> HttpListener
member Abort : unit -> unit
member AuthenticationSchemeSelectorDelegate : AuthenticationSchemeSelector with get, set
member AuthenticationSchemes : AuthenticationSchemes with get, set
member BeginGetContext : callback:AsyncCallback * state:obj -> IAsyncResult
member Close : unit -> unit
member DefaultServiceNames : ServiceNameCollection
member EndGetContext : asyncResult:IAsyncResult -> HttpListenerContext
member ExtendedProtectionPolicy : ExtendedProtectionPolicy with get, set
member ExtendedProtectionSelectorDelegate : ExtendedProtectionSelector with get, set
...
nested type ExtendedProtectionSelector
Full name: System.Net.HttpListener
--------------------
HttpListener() : unit
static member HttpListener.Start : url:string * f:(HttpListenerContext -> Async<unit>) -> CancellationTokenSource
Starts an HTTP server on the specified URL with the
specified asynchronous function for handling requests
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
type CacheMessage =
| TryGet of Uri * AsyncReplyChannel<byte [] option>
| Add of Uri * byte []
Full name: Script.Cached.CacheMessage
The cache supports messages for retrieving and adding content
union case CacheMessage.TryGet: Uri * AsyncReplyChannel<byte [] option> -> CacheMessage
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit
Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type 'T option = Option<'T>
Full name: Microsoft.FSharp.Core.option<_>
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = Byte
Full name: Microsoft.FSharp.Core.byte
union case CacheMessage.Add: Uri * byte [] -> CacheMessage
val cache : MailboxProcessor<CacheMessage>
Full name: Script.Cached.cache
Represents a thread-safe in-memory cache for web pages
type Agent<'T> = MailboxProcessor<'T>
Full name: Script.Agent<_>
Type alias for the MailboxProcessor type
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
val agent : MailboxProcessor<CacheMessage>
val pages : Dictionary<Uri,byte []>
Multiple items
type Dictionary<'TKey,'TValue> =
new : unit -> Dictionary<'TKey, 'TValue> + 5 overloads
member Add : key:'TKey * value:'TValue -> unit
member Clear : unit -> unit
member Comparer : IEqualityComparer<'TKey>
member ContainsKey : key:'TKey -> bool
member ContainsValue : value:'TValue -> bool
member Count : int
member GetEnumerator : unit -> Enumerator<'TKey, 'TValue>
member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
member Item : 'TKey -> 'TValue with get, set
...
nested type Enumerator
nested type KeyCollection
nested type ValueCollection
Full name: System.Collections.Generic.Dictionary<_,_>
--------------------
Dictionary() : unit
Dictionary(capacity: int) : unit
Dictionary(comparer: IEqualityComparer<'TKey>) : unit
Dictionary(dictionary: IDictionary<'TKey,'TValue>) : unit
Dictionary(capacity: int, comparer: IEqualityComparer<'TKey>) : unit
Dictionary(dictionary: IDictionary<'TKey,'TValue>, comparer: IEqualityComparer<'TKey>) : unit
val msg : CacheMessage
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val url : Uri
val repl : AsyncReplyChannel<byte [] option>
Dictionary.TryGetValue(key: Uri, value: byref<byte []>) : bool
val data : byte []
member AsyncReplyChannel.Reply : value:'Reply -> unit
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
val downloadUsingCache : url:Uri -> Async<byte []>
Full name: Script.Cached.downloadUsingCache
Attempts to retrieve page from cache first. If the page
isn't cached already, download it and add it to the cache.
val cached : byte [] option
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val ch : AsyncReplyChannel<byte [] option>
val wc : WebClient
Multiple items
type WebClient =
inherit Component
new : unit -> WebClient
member BaseAddress : string with get, set
member CachePolicy : RequestCachePolicy with get, set
member CancelAsync : unit -> unit
member Credentials : ICredentials with get, set
member DownloadData : address:string -> byte[] + 1 overload
member DownloadDataAsync : address:Uri -> unit + 1 overload
member DownloadFile : address:string * fileName:string -> unit + 1 overload
member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
member DownloadString : address:string -> string + 1 overload
...
Full name: System.Net.WebClient
--------------------
WebClient() : unit
member WebClient.AsyncDownloadData : uri:Uri -> Async<byte []>
Asynchronously downloads data from the
member MailboxProcessor.Post : message:'Msg -> unit
val asyncHandleRequest : ctx:HttpListenerContext -> Async<unit>
Full name: Script.Cached.asyncHandleRequest
Handle proxy request using cache - Obtain the page content
using helper function and send it to the output stream.
Exceptions can still be handled easily using try ... with.
val err : exn
val token : CancellationTokenSource
Full name: Script.Cached.token
More information