3 people like it.
Like the snippet!
Request throttling agent
Request throttling based on a time span and a request count. The counters are kept in an `IDistributedCache` implementation, which integrates nicely with ASP.NET Core.
For example, you can throttle requests from a specific IP address so that at most fifty requests are allowed within a period of thirty seconds.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
|
open Microsoft.Extensions.Options
open Microsoft.Extensions.Caching.Distributed
open Microsoft.Extensions.Caching.Memory
open MBrace.FsPickler
/// Extension members that layer caller-supplied (de)serialization on top of
/// the raw byte-array API of `IDistributedCache`.
type IDistributedCache with

    /// Serializes `value` with `serializer` and stores the bytes under `key`.
    member self.SetValue(serializer, key, value, options) =
        let payload = serializer value
        self.Set(key, payload, options)

    /// Reads the bytes stored under `key` and deserializes them.
    /// Returns `None` when the cache yields `null` for the key.
    member self.GetValue<'T>(deserializer: byte[] -> 'T, key) =
        self.Get(key)
        |> Option.ofObj
        |> Option.map deserializer

    /// Task-based counterpart of `SetValue`; `token` is forwarded to `SetAsync`.
    member self.SetValueAsync(serializer, key, value, options, token) =
        let payload = serializer value
        self.SetAsync(key, payload, options, token)

    /// Task-based counterpart of `GetValue`; `token` is forwarded to `GetAsync`.
    member self.GetValueAsync<'T>(deserializer: byte[] -> 'T, key, token) = task {
        let! payload = self.GetAsync(key, token)
        return payload |> Option.ofObj |> Option.map deserializer
    }

    /// `Async` wrapper around `SetValueAsync` that passes along the ambient
    /// cancellation token of the running async workflow.
    member self.SetAsyncValue(serializer, key, value, options) = async {
        let! token = Async.CancellationToken
        let pending = self.SetValueAsync(serializer, key, value, options, token)
        return! Async.AwaitTask pending
    }

    /// `Async` wrapper around `GetValueAsync` that passes along the ambient
    /// cancellation token of the running async workflow.
    member self.GetAsyncValue<'T>(deserializer, key) = async {
        let! token = Async.CancellationToken
        let pending = self.GetValueAsync<'T>(deserializer, key, token)
        return! Async.AwaitTask pending
    }
/// Counts requests per cache key inside a fixed time window and reports
/// whether the caller has exceeded `maxRequests` within that window.
///
/// - `cache`: store for the per-key `(windowStart, count)` pair.
/// - `cacheDuration`: length of one counting window.
/// - `maxRequests`: number of requests allowed per window before `Get`
///   starts returning `true`.
///
/// All requests go through a single `MailboxProcessor`, so within this
/// instance the read-modify-write on the cache is processed one message at a
/// time. The cache `Get` and `Set` are still separate calls, so other
/// writers to the same cache are not excluded.
type Throttler (cache: IDistributedCache, cacheDuration: System.TimeSpan, maxRequests: int) =
    let serializer = FsPickler.CreateBinarySerializer()

    /// Registers one request for `cacheKey` and returns `true` when the
    /// request count of the current window exceeds `maxRequests`.
    let registerRequest (cacheKey: string) = async {
        let! value = cache.GetAsyncValue(serializer.UnPickle, cacheKey)
        let now = System.DateTimeOffset.UtcNow
        let count, start, expiration =
            match value with
            // Only continue a window that is still open. A window that has
            // already elapsed would otherwise be re-written with an absolute
            // expiration in the past.
            | Some (start: System.DateTimeOffset, count) when start.Add(cacheDuration) > now ->
                count + 1, start, start.Add(cacheDuration)
            // No entry, or the stored window is over: start a fresh window.
            | _ ->
                1, now, now.Add(cacheDuration)
        let entryOptions = DistributedCacheEntryOptions(AbsoluteExpiration = expiration)
        do! cache.SetAsyncValue(serializer.Pickle, cacheKey, (start, count), entryOptions)
        return count > maxRequests
    }

    let agent = MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! (cacheKey, reply: AsyncReplyChannel<Result<bool, exn>>) = inbox.Receive()
            // Catch failures per message: an exception escaping the loop would
            // stop the agent and leave every later caller waiting for a reply.
            let! outcome = registerRequest cacheKey |> Async.Catch
            match outcome with
            | Choice1Of2 throttled -> reply.Reply(Ok throttled)
            | Choice2Of2 error -> reply.Reply(Error error)
            return! loop ()
        }
        loop ()
    )

    /// Returns the throttling decision, or re-raises in the caller the
    /// exception that occurred while the agent handled the request.
    let unwrap (result: Result<bool, exn>) =
        match result with
        | Ok throttled -> throttled
        | Error error -> raise error

    /// Registers a request for `cacheKey` and blocks until the agent replies.
    /// Returns `true` when the request should be throttled. Raises the
    /// underlying exception if the cache or serializer failed.
    member _.Get(cacheKey) =
        agent.PostAndReply(fun channel -> (cacheKey, channel))
        |> unwrap

    /// Asynchronous variant of `Get`.
    member _.GetAsync(cacheKey) = async {
        let! result = agent.PostAndAsyncReply(fun channel -> (cacheKey, channel))
        return unwrap result
    }
(* Example *)
// In ASP.NET Core the cache is normally supplied by dependency injection;
// here an in-memory implementation is created by hand.
let options = Options.Create(MemoryDistributedCacheOptions())
let cache = new MemoryDistributedCache(options)
let cacheDuration = System.TimeSpan.FromSeconds(1)
let maxRequests = 4
let throttler = Throttler(cache, cacheDuration, maxRequests)

// The cache key is whatever should act as the limiting factor for a request,
// e.g. the IP address or the username.
// Eleven requests are issued for the same key, pausing 120 ms after each one.
for _attempt = 0 to 10 do
    let throttled = throttler.Get("hello")
    printfn "%A" throttled
    System.Threading.Thread.Sleep 120

// Result:
//false
//false
//false
//false
//true
//true
//true
//false
//false
//false
//false
|
namespace Microsoft
namespace Microsoft.Extensions
namespace Microsoft.Extensions.Caching
namespace Microsoft.Extensions.Caching.Distributed
namespace Microsoft.Extensions.Caching.Memory
namespace MBrace
namespace MBrace.FsPickler
type IDistributedCache =
member Get : key:string -> byte[]
member GetAsync : key:string * ?token:CancellationToken -> Task<byte[]>
member Refresh : key:string -> unit
member RefreshAsync : key:string * ?token:CancellationToken -> Task
member Remove : key:string -> unit
member RemoveAsync : key:string * ?token:CancellationToken -> Task
member Set : key:string * value:byte[] * options:DistributedCacheEntryOptions -> unit
member SetAsync : key:string * value:byte[] * options:DistributedCacheEntryOptions * ?token:CancellationToken -> Task
val self : IDistributedCache
val serializer : ('a -> byte [])
val key : string
val value : 'a
val options : DistributedCacheEntryOptions
(extension) IDistributedCache.Set(key: string, value: byte []) : unit
IDistributedCache.Set(key: string, value: byte [], options: DistributedCacheEntryOptions) : unit
val deserializer : (byte [] -> 'T)
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
--------------------
type byte = System.Byte
IDistributedCache.Get(key: string) : byte []
union case Option.None: Option<'T>
Multiple items
val obj : byte []
--------------------
type obj = System.Object
union case Option.Some: Value: 'T -> Option<'T>
val token : System.Threading.CancellationToken
(extension) IDistributedCache.SetAsync(key: string, value: byte [],?token: System.Threading.CancellationToken) : System.Threading.Tasks.Task
IDistributedCache.SetAsync(key: string, value: byte [], options: DistributedCacheEntryOptions,?token: System.Threading.CancellationToken) : System.Threading.Tasks.Task
IDistributedCache.GetAsync(key: string,?token: System.Threading.CancellationToken) : System.Threading.Tasks.Task<byte []>
type obj = System.Object
val async : AsyncBuilder
Multiple items
type Async =
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task -> Async<unit>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member Choice : computations:seq<Async<'T option>> -> Async<'T option>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
...
--------------------
type Async<'T> =
property Async.CancellationToken: Async<System.Threading.CancellationToken> with get
static member Async.AwaitTask : task:System.Threading.Tasks.Task -> Async<unit>
static member Async.AwaitTask : task:System.Threading.Tasks.Task<'T> -> Async<'T>
member IDistributedCache.SetValueAsync : serializer:('a -> byte []) * key:string * value:'a * options:DistributedCacheEntryOptions * token:System.Threading.CancellationToken -> System.Threading.Tasks.Task
member IDistributedCache.GetValueAsync : deserializer:(byte [] -> 'T) * key:string * token:System.Threading.CancellationToken -> System.Threading.Tasks.Task<(System.DateTimeOffset * int) option>
Multiple items
type Throttler =
new : cache:IDistributedCache * cacheDuration:TimeSpan * maxRequests:int -> Throttler
member Get : cacheKey:string -> bool
member GetAsync : cacheKey:string -> Async<bool>
--------------------
new : cache:IDistributedCache * cacheDuration:System.TimeSpan * maxRequests:int -> Throttler
val cache : IDistributedCache
val cacheDuration : System.TimeSpan
val maxRequests : int
val serializer : BinarySerializer
type FsPickler =
private new : unit -> FsPickler
static member Clone : value:'T * ?pickler:Pickler<'T> * ?streamingContext:StreamingContext -> 'T
static member ComputeHash : value:'T * ?hashFactory:IHashStreamFactory -> HashResult
static member ComputeSize : value:'T * ?pickler:Pickler<'T> -> int64
static member CreateBinarySerializer : ?forceLittleEndian:bool * ?typeConverter:ITypeNameConverter * ?picklerResolver:IPicklerResolver -> BinarySerializer
static member CreateObjectSizeCounter : ?encoding:Encoding * ?resetInterval:int64 -> ObjectSizeCounter
static member CreateXmlSerializer : ?typeConverter:ITypeNameConverter * ?indent:bool * ?picklerResolver:IPicklerResolver -> XmlSerializer
static member EnsureSerializable : graph:'T * ?failOnCloneableOnlyTypes:bool -> unit
static member GatherObjectsInGraph : graph:obj -> obj []
static member GatherTypesInObjectGraph : graph:obj -> Type []
...
static member FsPickler.CreateBinarySerializer : ?forceLittleEndian:bool * ?typeConverter:ITypeNameConverter * ?picklerResolver:IPicklerResolver -> BinarySerializer
val agent : MailboxProcessor<string * AsyncReplyChannel<bool>>
Multiple items
type MailboxProcessor<'Msg> =
interface IDisposable
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
member Post : message:'Msg -> unit
member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
member Receive : ?timeout:int -> Async<'Msg>
member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
member Start : unit -> unit
member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
...
--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
static member MailboxProcessor.Start : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:System.Threading.CancellationToken -> MailboxProcessor<'Msg>
val loop : (unit -> Async<'a>)
val cacheKey : string
val reply : AsyncReplyChannel<bool>
type AsyncReplyChannel<'Reply> =
member Reply : value:'Reply -> unit
type bool = System.Boolean
member MailboxProcessor.Receive : ?timeout:int -> Async<'Msg>
val value : (System.DateTimeOffset * int) option
member IDistributedCache.GetAsyncValue : deserializer:(byte [] -> 'T) * key:string -> Async<(System.DateTimeOffset * int) option>
member FsPicklerSerializer.UnPickle : data:byte [] * ?pickler:Pickler<'T> * ?streamingContext:System.Runtime.Serialization.StreamingContext * ?encoding:System.Text.Encoding -> 'T
val count : int
val start : System.DateTimeOffset
val expiration : System.DateTimeOffset
namespace System
Multiple items
type DateTimeOffset =
struct
new : dateTime:DateTime -> DateTimeOffset + 5 overloads
member Add : timeSpan:TimeSpan -> DateTimeOffset
member AddDays : days:float -> DateTimeOffset
member AddHours : hours:float -> DateTimeOffset
member AddMilliseconds : milliseconds:float -> DateTimeOffset
member AddMinutes : minutes:float -> DateTimeOffset
member AddMonths : months:int -> DateTimeOffset
member AddSeconds : seconds:float -> DateTimeOffset
member AddTicks : ticks:int64 -> DateTimeOffset
member AddYears : years:int -> DateTimeOffset
...
end
--------------------
System.DateTimeOffset ()
System.DateTimeOffset(dateTime: System.DateTime) : System.DateTimeOffset
System.DateTimeOffset(ticks: int64, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(dateTime: System.DateTime, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, offset: System.TimeSpan) : System.DateTimeOffset
System.DateTimeOffset(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, calendar: System.Globalization.Calendar, offset: System.TimeSpan) : System.DateTimeOffset
val count' : int
System.DateTimeOffset.Add(timeSpan: System.TimeSpan) : System.DateTimeOffset
val now : System.DateTimeOffset
property System.DateTimeOffset.UtcNow: System.DateTimeOffset with get
member IDistributedCache.SetAsyncValue : serializer:('a -> byte []) * key:string * value:'a * options:DistributedCacheEntryOptions -> Async<unit>
member FsPicklerSerializer.Pickle : value:'T * ?pickler:Pickler<'T> * ?streamingContext:System.Runtime.Serialization.StreamingContext * ?encoding:System.Text.Encoding -> byte []
Multiple items
type DistributedCacheEntryOptions =
new : unit -> DistributedCacheEntryOptions
member AbsoluteExpiration : Nullable<DateTimeOffset> with get, set
member AbsoluteExpirationRelativeToNow : Nullable<TimeSpan> with get, set
member SlidingExpiration : Nullable<TimeSpan> with get, set
--------------------
DistributedCacheEntryOptions() : DistributedCacheEntryOptions
member AsyncReplyChannel.Reply : value:'Reply -> unit
member MailboxProcessor.PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
val channel : AsyncReplyChannel<bool>
member MailboxProcessor.PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
val options : obj
Multiple items
type MemoryDistributedCacheOptions =
inherit MemoryCacheOptions
new : unit -> MemoryDistributedCacheOptions
--------------------
MemoryDistributedCacheOptions() : MemoryDistributedCacheOptions
val cache : MemoryDistributedCache
Multiple items
type MemoryDistributedCache =
new : optionsAccessor:IOptions<MemoryDistributedCacheOptions> -> MemoryDistributedCache + 1 overload
member Get : key:string -> byte[]
member GetAsync : key:string * ?token:CancellationToken -> Task<byte[]>
member Refresh : key:string -> unit
member RefreshAsync : key:string * ?token:CancellationToken -> Task
member Remove : key:string -> unit
member RemoveAsync : key:string * ?token:CancellationToken -> Task
member Set : key:string * value:byte[] * options:DistributedCacheEntryOptions -> unit
member SetAsync : key:string * value:byte[] * options:DistributedCacheEntryOptions * ?token:CancellationToken -> Task
--------------------
Multiple items
type TimeSpan =
struct
new : ticks:int64 -> TimeSpan + 3 overloads
member Add : ts:TimeSpan -> TimeSpan
member CompareTo : value:obj -> int + 1 overload
member Days : int
member Divide : divisor:float -> TimeSpan + 1 overload
member Duration : unit -> TimeSpan
member Equals : value:obj -> bool + 1 overload
member GetHashCode : unit -> int
member Hours : int
member Milliseconds : int
...
end
--------------------
System.TimeSpan ()
System.TimeSpan(ticks: int64) : System.TimeSpan
System.TimeSpan(hours: int, minutes: int, seconds: int) : System.TimeSpan
System.TimeSpan(days: int, hours: int, minutes: int, seconds: int) : System.TimeSpan
System.TimeSpan(days: int, hours: int, minutes: int, seconds: int, milliseconds: int) : System.TimeSpan
System.TimeSpan.FromSeconds(value: float) : System.TimeSpan
val throttler : Throttler
member Throttler.Get : cacheKey:string -> bool
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
namespace System.Threading
Multiple items
type Thread =
inherit CriticalFinalizerObject
new : start:ThreadStart -> Thread + 3 overloads
member Abort : unit -> unit + 1 overload
member ApartmentState : ApartmentState with get, set
member CurrentCulture : CultureInfo with get, set
member CurrentUICulture : CultureInfo with get, set
member DisableComObjectEagerCleanup : unit -> unit
member ExecutionContext : ExecutionContext
member GetApartmentState : unit -> ApartmentState
member GetCompressedStack : unit -> CompressedStack
member GetHashCode : unit -> int
...
--------------------
System.Threading.Thread(start: System.Threading.ThreadStart) : System.Threading.Thread
System.Threading.Thread(start: System.Threading.ParameterizedThreadStart) : System.Threading.Thread
System.Threading.Thread(start: System.Threading.ThreadStart, maxStackSize: int) : System.Threading.Thread
System.Threading.Thread(start: System.Threading.ParameterizedThreadStart, maxStackSize: int) : System.Threading.Thread
System.Threading.Thread.Sleep(timeout: System.TimeSpan) : unit
System.Threading.Thread.Sleep(millisecondsTimeout: int) : unit
More information