7 people like it.

Asynchronous HTTP proxy with chunking and caching

This snippet shows two improvements to the asynchronous HTTP proxy from http://fssnip.net/6e. The first extension processes the page in chunks (instead of downloading the entire content first). The second extension uses a simple agent-based in-memory cache for previously visited pages.

Common functions and declarations

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
// NOTE: This snippet uses System.Net extensions from: http://fssnip.net/6d
// (such as HttpListener.Start and WebClient.AsyncDownloadData)

// Location where the proxy copies content from.
// getProxyUrl concatenates this string with the local request's
// PathAndQuery, so it is written without a trailing slash.
let root = "http://msdn.microsoft.com"

// Translates the URL of an incoming local request into the URL of the
// same resource on the target site: the path and query of the local
// request are appended to 'root'.
let getProxyUrl (ctx:HttpListenerContext) =
  let localUrl = ctx.Request.Url
  let target = root + localUrl.PathAndQuery
  Uri(target)

// Handle exception asynchronously - generate a small HTML page that
// reports the failure to the client and then close the response.
//
//   ctx - listener context of the request that failed
//   e   - exception whose message is shown on the page
//
// The writer is flushed and disposed *before* the response is closed;
// otherwise the buffered page would be written to an already closed
// response and never reach the client. The response is closed in a
// 'finally' so that it is released even if writing the page fails.
let asyncHandleError (ctx:HttpListenerContext) (e:exn) = async {
  try
    use wr = new StreamWriter(ctx.Response.OutputStream)
    wr.Write("<h1>Request Failed</h1>")
    // The message may echo parts of the request (e.g. the URL), so
    // encode it before embedding it in HTML.
    wr.Write("<p>" + System.Net.WebUtility.HtmlEncode(e.Message) + "</p>")
    wr.Flush()
  finally
    ctx.Response.Close() }

Extension #1: Chunked proxy server

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
  /// Handles request using asynchronous workflows - The content
  /// is downloaded and sent to the caller in chunks, so the proxy
  /// is more efficient and doesn't read entire file in memory.
  ///
  /// The response is closed in a 'finally' block, so the client
  /// connection is released even when contacting the target server
  /// or copying the data fails; the exception itself still propagates
  /// to the caller of the workflow.
  let asyncHandleRequest (ctx:HttpListenerContext) = async {
    try
      // Initialize HTTP connection to the server
      let request = HttpWebRequest.Create(getProxyUrl(ctx))
      use! response = request.AsyncGetResponse()
      use stream = response.GetResponseStream()
      ctx.Response.SendChunked <- true

      // Asynchronous loop to copy data: read up to one buffer from the
      // target server and forward exactly the bytes that were read.
      // A read of 0 bytes marks the end of the stream and ends the loop
      // (nothing is written for it).
      let buffer : byte[] = Array.zeroCreate 4096
      let rec copy () = async {
        let! read = stream.AsyncRead(buffer, 0, buffer.Length)
        if read > 0 then
          do! ctx.Response.OutputStream.AsyncWrite(buffer, 0, read)
          return! copy () }
      do! copy ()
    finally
      ctx.Response.Close() }

  // Start HTTP proxy that handles requests asynchronously using chunking.
  // HttpListener.Start returns a CancellationTokenSource for the server.
  let token = HttpListener.Start("http://localhost:8080/", asyncHandleRequest)
  // Cancel the token to stop the proxy.
  // NOTE(review): presumably meant to be evaluated separately (e.g. in
  // F# Interactive) once testing is done - confirm; run back-to-back it
  // requests cancellation immediately after the server was started.
  token.Cancel()

Extension #2: Agent-based in-memory cache (Part 1)

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
  /// Messages understood by the cache agent:
  ///  - TryGet asks for the content stored under a URL; the answer
  ///    (an optional byte array) is sent back over the reply channel
  ///  - Add stores the content that belongs to a URL
  type CacheMessage =
    | TryGet of Uri * AsyncReplyChannel<byte[] option>
    | Add of Uri * byte[]

  /// In-memory cache for web pages implemented as an agent. All access
  /// to the underlying dictionary happens inside the agent's message
  /// loop, one message at a time.
  let cache = Agent.Start(fun inbox -> async {
    // Downloaded pages, keyed by their URL
    let store = new Dictionary<Uri, byte[]>()

    // Process one message and then continue with the next one
    let rec loop () = async {
      let! message = inbox.Receive()
      match message with
      | Add(key, content) ->
          // Remember (or overwrite) the content for this URL
          store.[key] <- content
      | TryGet(key, channel) ->
          // Answer with the cached content, or None when it is missing
          let found, content = store.TryGetValue(key)
          channel.Reply(if found then Some(content) else None)
      return! loop () }

    return! loop () })

Extension #2: Proxy server using cache (Part 2)

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
  /// Attempts to retrieve page from cache first. If the page 
  /// isn't cached already, download it and add it to the cache.
  ///
  /// url - address of the page to obtain
  /// Returns the page content as a byte array. A failed download
  /// propagates its exception to the caller and nothing is cached.
  let downloadUsingCache (url:Uri) = async {
    let! cached = cache.PostAndAsyncReply(fun ch -> TryGet(url, ch))
    match cached with 
    | Some(data) -> 
        // Return page from the cache
        return data
    | None ->
        // Download page and add it to the cache. WebClient is
        // disposable, so bind it with 'use' to release it as soon
        // as the download has finished (or failed).
        use wc = new WebClient()
        let! data = wc.AsyncDownloadData(url)
        cache.Post(Add(url, data))
        return data }

  /// Serves a proxy request through the cache: the page content is
  /// obtained via downloadUsingCache, written to the output stream and
  /// the response is closed. If any of these steps raises an exception,
  /// the error page produced by asyncHandleError is sent instead.
  let asyncHandleRequest (ctx:HttpListenerContext) = async {
    // The normal path: fetch the page and send it to the client
    let serve = async {
      let! body = ctx |> getProxyUrl |> downloadUsingCache
      do! ctx.Response.OutputStream.AsyncWrite(body)
      ctx.Response.Close() }

    // Run it and turn a raised exception into a value we can match on
    let! outcome = Async.Catch serve
    match outcome with
    | Choice1Of2 () -> return ()
    | Choice2Of2 failure -> return! asyncHandleError ctx failure }

  // Start HTTP proxy that handles requests asynchronously using cache.
  // HttpListener.Start returns a CancellationTokenSource for the server.
  let token = HttpListener.Start("http://localhost:8080/", asyncHandleRequest)
  // Cancel the token to stop the proxy.
  // NOTE(review): presumably meant to be evaluated separately (e.g. in
  // F# Interactive) once testing is done - confirm; run back-to-back it
  // requests cancellation immediately after the server was started.
  token.Cancel()
val root : string

Full name: Script.root
val getProxyUrl : ctx:HttpListenerContext -> Uri

Full name: Script.getProxyUrl
val ctx : HttpListenerContext
type HttpListenerContext =
  member Request : HttpListenerRequest
  member Response : HttpListenerResponse
  member User : IPrincipal

Full name: System.Net.HttpListenerContext
Multiple items
type Uri =
  new : uriString:string -> Uri + 5 overloads
  member AbsolutePath : string
  member AbsoluteUri : string
  member Authority : string
  member DnsSafeHost : string
  member Equals : comparand:obj -> bool
  member Fragment : string
  member GetComponents : components:UriComponents * format:UriFormat -> string
  member GetHashCode : unit -> int
  member GetLeftPart : part:UriPartial -> string
  ...

Full name: System.Uri

--------------------
Uri(uriString: string) : unit
Uri(uriString: string, uriKind: UriKind) : unit
Uri(baseUri: Uri, relativeUri: string) : unit
Uri(baseUri: Uri, relativeUri: Uri) : unit
property HttpListenerContext.Request: HttpListenerRequest
property HttpListenerRequest.Url: Uri
property Uri.PathAndQuery: string
val asyncHandleError : ctx:HttpListenerContext -> e:exn -> Async<unit>

Full name: Script.asyncHandleError
val e : exn
type exn = Exception

Full name: Microsoft.FSharp.Core.exn
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
val wr : StreamWriter
Multiple items
type StreamWriter =
  inherit TextWriter
  new : stream:Stream -> StreamWriter + 6 overloads
  member AutoFlush : bool with get, set
  member BaseStream : Stream
  member Close : unit -> unit
  member Encoding : Encoding
  member Flush : unit -> unit
  member Write : value:char -> unit + 3 overloads
  static val Null : StreamWriter

Full name: System.IO.StreamWriter

--------------------
StreamWriter(stream: Stream) : unit
StreamWriter(path: string) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding) : unit
StreamWriter(path: string, append: bool) : unit
StreamWriter(stream: Stream, encoding: Text.Encoding, bufferSize: int) : unit
StreamWriter(path: string, append: bool, encoding: Text.Encoding) : unit
StreamWriter(path: string, append: bool, encoding: Text.Encoding, bufferSize: int) : unit
property HttpListenerContext.Response: HttpListenerResponse
property HttpListenerResponse.OutputStream: Stream
TextWriter.Write(value: obj) : unit
   (+0 other overloads)
TextWriter.Write(value: decimal) : unit
   (+0 other overloads)
TextWriter.Write(value: float) : unit
   (+0 other overloads)
TextWriter.Write(value: float32) : unit
   (+0 other overloads)
TextWriter.Write(value: uint64) : unit
   (+0 other overloads)
TextWriter.Write(value: int64) : unit
   (+0 other overloads)
TextWriter.Write(value: uint32) : unit
   (+0 other overloads)
TextWriter.Write(value: int) : unit
   (+0 other overloads)
TextWriter.Write(value: bool) : unit
   (+0 other overloads)
StreamWriter.Write(value: string) : unit
   (+0 other overloads)
property Exception.Message: string
HttpListenerResponse.Close() : unit
HttpListenerResponse.Close(responseEntity: byte [], willBlock: bool) : unit
val asyncHandleRequest : ctx:HttpListenerContext -> Async<unit>

Full name: Script.Chunked.asyncHandleRequest


 Handles request using asynchronous workflows - The content
 is downloaded and sent to the caller in chunks, so the proxy
 is more efficient and doesn't read entire file in memory
val request : WebRequest
type HttpWebRequest =
  inherit WebRequest
  member Abort : unit -> unit
  member Accept : string with get, set
  member AddRange : range:int -> unit + 7 overloads
  member Address : Uri
  member AllowAutoRedirect : bool with get, set
  member AllowWriteStreamBuffering : bool with get, set
  member AutomaticDecompression : DecompressionMethods with get, set
  member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
  member ClientCertificates : X509CertificateCollection with get, set
  ...

Full name: System.Net.HttpWebRequest
WebRequest.Create(requestUri: Uri) : WebRequest
WebRequest.Create(requestUriString: string) : WebRequest
val response : WebResponse
member WebRequest.AsyncGetResponse : unit -> Async<WebResponse>
val stream : Stream
WebResponse.GetResponseStream() : Stream
property HttpListenerResponse.SendChunked: bool
val count : int ref
Multiple items
val ref : value:'T -> 'T ref

Full name: Microsoft.FSharp.Core.Operators.ref

--------------------
type 'T ref = Ref<'T>

Full name: Microsoft.FSharp.Core.ref<_>
val buffer : byte []
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : [<ParamArray>] indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val zeroCreate : count:int -> 'T []

Full name: Microsoft.FSharp.Collections.Array.zeroCreate
property Ref.Value: int
val read : int
member Stream.AsyncRead : count:int -> Async<byte []>
member Stream.AsyncRead : buffer:byte [] * ?offset:int * ?count:int -> Async<int>
property Array.Length: int
member Stream.AsyncWrite : buffer:byte [] * ?offset:int * ?count:int -> Async<unit>
val token : CancellationTokenSource

Full name: Script.Chunked.token
Multiple items
type HttpListener =
  new : unit -> HttpListener
  member Abort : unit -> unit
  member AuthenticationSchemeSelectorDelegate : AuthenticationSchemeSelector with get, set
  member AuthenticationSchemes : AuthenticationSchemes with get, set
  member BeginGetContext : callback:AsyncCallback * state:obj -> IAsyncResult
  member Close : unit -> unit
  member DefaultServiceNames : ServiceNameCollection
  member EndGetContext : asyncResult:IAsyncResult -> HttpListenerContext
  member ExtendedProtectionPolicy : ExtendedProtectionPolicy with get, set
  member ExtendedProtectionSelectorDelegate : ExtendedProtectionSelector with get, set
  ...
  nested type ExtendedProtectionSelector

Full name: System.Net.HttpListener

--------------------
HttpListener() : unit
static member HttpListener.Start : url:string * f:(HttpListenerContext -> Async<unit>) -> CancellationTokenSource


 Starts an HTTP server on the specified URL with the
 specified asynchronous function for handling requests
CancellationTokenSource.Cancel() : unit
CancellationTokenSource.Cancel(throwOnFirstException: bool) : unit
type CacheMessage =
  | TryGet of Uri * AsyncReplyChannel<byte [] option>
  | Add of Uri * byte []

Full name: Script.Cached.CacheMessage


 The cache supports messages for retrieving and adding content
union case CacheMessage.TryGet: Uri * AsyncReplyChannel<byte [] option> -> CacheMessage
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type 'T option = Option<'T>

Full name: Microsoft.FSharp.Core.option<_>
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = Byte

Full name: Microsoft.FSharp.Core.byte
union case CacheMessage.Add: Uri * byte [] -> CacheMessage
val cache : MailboxProcessor<CacheMessage>

Full name: Script.Cached.cache


 Represents a thread-safe in-memory cache for web pages
type Agent<'T> = MailboxProcessor<'T>

Full name: Script.Agent<_>


 Type alias for the MailboxProcessor type
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
val agent : MailboxProcessor<CacheMessage>
val pages : Dictionary<Uri,byte []>
Multiple items
type Dictionary<'TKey,'TValue> =
  new : unit -> Dictionary<'TKey, 'TValue> + 5 overloads
  member Add : key:'TKey * value:'TValue -> unit
  member Clear : unit -> unit
  member Comparer : IEqualityComparer<'TKey>
  member ContainsKey : key:'TKey -> bool
  member ContainsValue : value:'TValue -> bool
  member Count : int
  member GetEnumerator : unit -> Enumerator<'TKey, 'TValue>
  member GetObjectData : info:SerializationInfo * context:StreamingContext -> unit
  member Item : 'TKey -> 'TValue with get, set
  ...
  nested type Enumerator
  nested type KeyCollection
  nested type ValueCollection

Full name: System.Collections.Generic.Dictionary<_,_>

--------------------
Dictionary() : unit
Dictionary(capacity: int) : unit
Dictionary(comparer: IEqualityComparer<'TKey>) : unit
Dictionary(dictionary: IDictionary<'TKey,'TValue>) : unit
Dictionary(capacity: int, comparer: IEqualityComparer<'TKey>) : unit
Dictionary(dictionary: IDictionary<'TKey,'TValue>, comparer: IEqualityComparer<'TKey>) : unit
val msg : CacheMessage
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val url : Uri
val repl : AsyncReplyChannel<byte [] option>
Dictionary.TryGetValue(key: Uri, value: byref<byte []>) : bool
val data : byte []
member AsyncReplyChannel.Reply : value:'Reply -> unit
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
val downloadUsingCache : url:Uri -> Async<byte []>

Full name: Script.Cached.downloadUsingCache


 Attempts to retrieve page from cache first. If the page
 isn't cached already, download it and add it to the cache.
val cached : byte [] option
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val ch : AsyncReplyChannel<byte [] option>
val wc : WebClient
Multiple items
type WebClient =
  inherit Component
  new : unit -> WebClient
  member BaseAddress : string with get, set
  member CachePolicy : RequestCachePolicy with get, set
  member CancelAsync : unit -> unit
  member Credentials : ICredentials with get, set
  member DownloadData : address:string -> byte[] + 1 overload
  member DownloadDataAsync : address:Uri -> unit + 1 overload
  member DownloadFile : address:string * fileName:string -> unit + 1 overload
  member DownloadFileAsync : address:Uri * fileName:string -> unit + 1 overload
  member DownloadString : address:string -> string + 1 overload
  ...

Full name: System.Net.WebClient

--------------------
WebClient() : unit
member WebClient.AsyncDownloadData : uri:Uri -> Async<byte []>


 Asynchronously downloads data from the specified URI
member MailboxProcessor.Post : message:'Msg -> unit
val asyncHandleRequest : ctx:HttpListenerContext -> Async<unit>

Full name: Script.Cached.asyncHandleRequest


 Handle proxy request using cache - Obtain the page content
 using helper function and send it to the output stream.
 Exceptions can still be handled easily using try ... with.
val err : exn
val token : CancellationTokenSource

Full name: Script.Cached.token
Raw view Test code New version

More information

Link:http://fssnip.net/6f
Posted:12 years ago
Author:Tomas Petricek
Tags: async , asynchronous , proxy server , http