12 people like it.
Like the snippet!
Twitter Stream API
Twitter now provides a streaming API which can be used to obtain a continuous stream of tweets on any set of topics, locations, etc., in real time. Read the full details here.
It would be nice to convert this stream into an F# sequence so that it can be treated just like any other sequence in F#. This provides “composability”: separation of the generation of a sequence from its consumption.
Here is a snippet that does that.
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
|
//#load "AppId.fsx"
#r "System.Web.Extensions.dll"
#r "System.Runtime.Serialization.dll"
;;
open System
open System.Net
open System.Text
open System.IO
open System.Web.Script.Serialization
open System.Runtime.Serialization
open System.Globalization
;;
/// Author of a tweet, as read from the JSON object bound to `Tweet.User`.
/// Each `DataMember` name is the JSON key the record field is populated from;
/// the `field:` target places the attribute on the field behind the record label.
[<DataContract>]
type TweetUser = {
    /// Value of the `followers_count` key.
    [<field:DataMember(Name="followers_count")>] Followers:int
    /// Value of the `screen_name` key.
    [<field:DataMember(Name="screen_name")>] Name:string
    /// Value of the `id_str` key.
    /// NOTE(review): the key name suggests a string payload while the field is an
    /// int - confirm the serializer converts it and that ids fit in 32 bits.
    [<field:DataMember(Name="id_str")>] Id:int
    /// Value of the `location` key.
    [<field:DataMember(Name="location")>] Location:string}
/// One status message from the stream; this is the type `dser` deserializes.
/// Each `DataMember` name is the JSON key the record field is populated from.
[<DataContract>]
type Tweet = {
    /// Value of the `text` key.
    [<field:DataMember(Name="text")>] Text:string
    /// Value of the `retweeted` key.
    [<field:DataMember(Name="retweeted")>] IsRetweeted:bool
    /// Raw `created_at` string; `toDateTime` turns it into a DateTime.
    [<field:DataMember(Name="created_at")>] DateStr:string
    /// Nested `user` object.
    [<field:DataMember(Name="user")>] User:TweetUser
    /// Value of the `geo` key.
    /// NOTE(review): declared as a string - confirm the payload is never a nested object.
    [<field:DataMember(Name="geo")>] Geo:string}
/// Shared JSON serializer used to turn one line of the stream into a `Tweet`.
let dser =
    new Json.DataContractJsonSerializer(typeof<Tweet>)
/// Wraps a byte array in an in-memory stream positioned at its first byte.
let toStream (b : byte[]) : MemoryStream =
    new MemoryStream(b)
/// Custom date/time pattern of the `created_at` field,
/// e.g. "Fri Oct 07 18:41:13 +0000 2011".
let twitterDateFormat = "ddd MMM dd HH:mm:ss zzz yyyy"

/// Parses a `created_at` string using `twitterDateFormat` and the invariant culture.
/// Raises FormatException when the text does not match the pattern exactly.
let toDateTime (s : string) : DateTime =
    DateTime.ParseExact(s, twitterDateFormat, CultureInfo.InvariantCulture)
/// Requests opened by `tweets`, most recent first; `stopAll` walks this list to abort them.
let mutable connections = []
/// Connects to the Twitter streaming filter endpoint and exposes the matching
/// tweets as a lazy sequence.
///
/// tracking: the value placed after `track=` in the request URL. It is inserted
///           as-is (no escaping), so callers must pass a URL-safe value.
/// returns:  seq<Tweet>; each element is produced by reading one more line from
///           the open response. The sequence ends when the server closes the
///           stream or when reading fails (for example after `stopAll`).
///
/// The HTTP request is issued immediately, before the sequence is enumerated,
/// and is registered in `connections`. The sequence is single-pass: once it has
/// ended the underlying reader and response are released.
let tweets tracking =
    let template = sprintf "https://stream.twitter.com/1/statuses/filter.json?track=%s"
    let http = WebRequest.Create(template tracking) :?> HttpWebRequest
    // http.Credentials <- NetworkCredential(AppId.twitterId, AppId.twitterPwd)
    http.Credentials <- NetworkCredential("<twitter id>", "<twitter password>")
    // -1 disables the timeout: the response is an open-ended stream.
    http.Timeout <- -1
    let resp = http.GetResponse()
    let str = resp.GetResponseStream()
    connections <- http::connections
    let readStream = new StreamReader(str, Encoding.UTF8)

    // Releases the reader (and with it the response stream) and the response.
    let release () =
        readStream.Dispose()
        resp.Close()

    // Deserializes one JSON line; a malformed line is reported and yields None
    // instead of tearing down the whole stream.
    let parse (line : string) : Tweet option =
        try
            use buffer = line |> Encoding.UTF8.GetBytes |> toStream
            dser.ReadObject(buffer) :?> Tweet |> Some
        with
        | ex ->
            printfn "Error %A" ex.Message
            None

    readStream
    |> Seq.unfold (fun rs ->
        try
            match rs.ReadLine() with
            | null ->
                // ReadLine returns null once the server has closed the stream.
                release ()
                None
            | line when line.StartsWith("{\"text", StringComparison.Ordinal) ->
                let tweet = parse line
                // Echo only tweets that were actually parsed.
                tweet |> Option.iter (printfn "%A")
                Some(tweet, rs)
            | _ ->
                // Lines that are not tweets (keep-alives, notices) are skipped.
                Some(None, rs)
        with
        | ex ->
            printfn "%s" ex.Message
            release ()
            None)
    |> Seq.choose id
/// Aborts every request registered in `connections`, printing "closing..." for
/// each one, then empties the registry so aborted requests are not kept alive
/// and are not aborted again by a later call.
let stopAll() =
    connections
    |> List.iter (fun request ->
        printfn "closing..."
        request.Abort())
    connections <- []
;;
/// Turns the tweet stream for `tracking` into a stream of rates (tweets per minute).
///
/// Each value is computed over a sliding window of three consecutive tweets:
/// the number of tweets in the window divided by the minutes between the
/// earliest and latest timestamp in it. When all timestamps in a window are
/// equal the interval is zero and the floating-point division yields infinity.
let tweetsPerMinute tracking =
    // Rate for a single window of timestamps.
    let rate (stamps : DateTime[]) =
        let span = Seq.max stamps - Seq.min stamps
        float stamps.Length / span.TotalMinutes
    tweets tracking
    |> Seq.map (fun t -> toDateTime t.DateStr)
    |> Seq.windowed 3
    |> Seq.map rate
(* Usage:
async {tweets("soccer") |> Seq.iter(fun t -> printfn "%A" t)} |> Async.Start
async {tweetsPerMinute "obama" |> Seq.iter (fun m -> printfn "[%0.0f per/min]" m) } |> Async.Start
stopAll()
*)
|
namespace System
namespace System.Net
namespace System.Text
namespace System.IO
namespace System.Web
namespace System.Web.Script
namespace System.Web.Script.Serialization
namespace System.Runtime
namespace System.Runtime.Serialization
namespace System.Globalization
Multiple items
type DataContractAttribute =
inherit Attribute
new : unit -> DataContractAttribute
member IsReference : bool with get, set
member Name : string with get, set
member Namespace : string with get, set
Full name: System.Runtime.Serialization.DataContractAttribute
--------------------
DataContractAttribute() : unit
type TweetUser =
{Followers: int;
Name: string;
Id: int;
Location: string;}
Full name: Script.TweetUser
Multiple items
type DataMemberAttribute =
inherit Attribute
new : unit -> DataMemberAttribute
member EmitDefaultValue : bool with get, set
member IsRequired : bool with get, set
member Name : string with get, set
member Order : int with get, set
Full name: System.Runtime.Serialization.DataMemberAttribute
--------------------
DataMemberAttribute() : unit
TweetUser.Followers: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.int
--------------------
type int = int32
Full name: Microsoft.FSharp.Core.int
--------------------
type int<'Measure> = int
Full name: Microsoft.FSharp.Core.int<_>
TweetUser.Name: string
Multiple items
val string : value:'T -> string
Full name: Microsoft.FSharp.Core.Operators.string
--------------------
type string = String
Full name: Microsoft.FSharp.Core.string
TweetUser.Id: int
TweetUser.Location: string
type Tweet =
{Text: string;
IsRetweeted: bool;
DateStr: string;
User: TweetUser;
Geo: string;}
Full name: Script.Tweet
Multiple items
Tweet.Text: string
--------------------
namespace System.Text
Tweet.IsRetweeted: bool
type bool = Boolean
Full name: Microsoft.FSharp.Core.bool
Tweet.DateStr: string
Tweet.User: TweetUser
Tweet.Geo: string
val dser : Json.DataContractJsonSerializer
Full name: Script.dser
namespace System.Runtime.Serialization.Json
Multiple items
type DataContractJsonSerializer =
inherit XmlObjectSerializer
new : type:Type -> DataContractJsonSerializer + 8 overloads
member DataContractSurrogate : IDataContractSurrogate
member IgnoreExtensionDataObject : bool
member IsStartObject : reader:XmlReader -> bool + 1 overload
member KnownTypes : ReadOnlyCollection<Type>
member MaxItemsInObjectGraph : int
member ReadObject : stream:Stream -> obj + 4 overloads
member WriteEndObject : writer:XmlWriter -> unit + 1 overload
member WriteObject : stream:Stream * graph:obj -> unit + 2 overloads
member WriteObjectContent : writer:XmlWriter * graph:obj -> unit + 1 overload
...
Full name: System.Runtime.Serialization.Json.DataContractJsonSerializer
--------------------
Json.DataContractJsonSerializer(type: Type) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString) : unit
Json.DataContractJsonSerializer(type: Type, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
val typeof<'T> : Type
Full name: Microsoft.FSharp.Core.Operators.typeof
val toStream : b:byte array -> MemoryStream
Full name: Script.toStream
val b : byte array
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.byte
--------------------
type byte = Byte
Full name: Microsoft.FSharp.Core.byte
type 'T array = 'T []
Full name: Microsoft.FSharp.Core.array<_>
Multiple items
type MemoryStream =
inherit Stream
new : unit -> MemoryStream + 6 overloads
member CanRead : bool
member CanSeek : bool
member CanWrite : bool
member Capacity : int with get, set
member Flush : unit -> unit
member GetBuffer : unit -> byte[]
member Length : int64
member Position : int64 with get, set
member Read : buffer:byte[] * offset:int * count:int -> int
...
Full name: System.IO.MemoryStream
--------------------
MemoryStream() : unit
MemoryStream(capacity: int) : unit
MemoryStream(buffer: byte []) : unit
MemoryStream(buffer: byte [], writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool, publiclyVisible: bool) : unit
val twitterDateFormat : string
Full name: Script.twitterDateFormat
val toDateTime : s:string -> DateTime
Full name: Script.toDateTime
val s : string
Multiple items
type DateTime =
struct
new : ticks:int64 -> DateTime + 10 overloads
member Add : value:TimeSpan -> DateTime
member AddDays : value:float -> DateTime
member AddHours : value:float -> DateTime
member AddMilliseconds : value:float -> DateTime
member AddMinutes : value:float -> DateTime
member AddMonths : months:int -> DateTime
member AddSeconds : value:float -> DateTime
member AddTicks : value:int64 -> DateTime
member AddYears : value:int -> DateTime
...
end
Full name: System.DateTime
--------------------
DateTime()
(+0 other overloads)
DateTime(ticks: int64) : unit
(+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Calendar) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Calendar) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : unit
(+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : unit
(+0 other overloads)
DateTime.ParseExact(s: string, format: string, provider: IFormatProvider) : DateTime
DateTime.ParseExact(s: string, formats: string [], provider: IFormatProvider, style: DateTimeStyles) : DateTime
DateTime.ParseExact(s: string, format: string, provider: IFormatProvider, style: DateTimeStyles) : DateTime
Multiple items
type CultureInfo =
new : name:string -> CultureInfo + 3 overloads
member Calendar : Calendar
member ClearCachedData : unit -> unit
member Clone : unit -> obj
member CompareInfo : CompareInfo
member CultureTypes : CultureTypes
member DateTimeFormat : DateTimeFormatInfo with get, set
member DisplayName : string
member EnglishName : string
member Equals : value:obj -> bool
...
Full name: System.Globalization.CultureInfo
--------------------
CultureInfo(name: string) : unit
CultureInfo(culture: int) : unit
CultureInfo(name: string, useUserOverride: bool) : unit
CultureInfo(culture: int, useUserOverride: bool) : unit
property CultureInfo.InvariantCulture: CultureInfo
val mutable connections : HttpWebRequest list
Full name: Script.connections
val tweets : tracking:string -> seq<Tweet>
Full name: Script.tweets
val tracking : string
val template : (string -> string)
val sprintf : format:Printf.StringFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val http : HttpWebRequest
type WebRequest =
inherit MarshalByRefObject
member Abort : unit -> unit
member AuthenticationLevel : AuthenticationLevel with get, set
member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
member CachePolicy : RequestCachePolicy with get, set
member ConnectionGroupName : string with get, set
member ContentLength : int64 with get, set
member ContentType : string with get, set
member Credentials : ICredentials with get, set
member EndGetRequestStream : asyncResult:IAsyncResult -> Stream
...
Full name: System.Net.WebRequest
WebRequest.Create(requestUri: Uri) : WebRequest
WebRequest.Create(requestUriString: string) : WebRequest
type HttpWebRequest =
inherit WebRequest
member Abort : unit -> unit
member Accept : string with get, set
member AddRange : range:int -> unit + 7 overloads
member Address : Uri
member AllowAutoRedirect : bool with get, set
member AllowWriteStreamBuffering : bool with get, set
member AutomaticDecompression : DecompressionMethods with get, set
member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
member ClientCertificates : X509CertificateCollection with get, set
...
Full name: System.Net.HttpWebRequest
property HttpWebRequest.Credentials: ICredentials
Multiple items
type NetworkCredential =
new : unit -> NetworkCredential + 4 overloads
member Domain : string with get, set
member GetCredential : uri:Uri * authType:string -> NetworkCredential + 1 overload
member Password : string with get, set
member SecurePassword : SecureString with get, set
member UserName : string with get, set
Full name: System.Net.NetworkCredential
--------------------
NetworkCredential() : unit
NetworkCredential(userName: string, password: string) : unit
NetworkCredential(userName: string, password: Security.SecureString) : unit
NetworkCredential(userName: string, password: string, domain: string) : unit
NetworkCredential(userName: string, password: Security.SecureString, domain: string) : unit
property HttpWebRequest.Timeout: int
val resp : WebResponse
HttpWebRequest.GetResponse() : WebResponse
val str : Stream
WebResponse.GetResponseStream() : Stream
val readStream : StreamReader
Multiple items
type StreamReader =
inherit TextReader
new : stream:Stream -> StreamReader + 9 overloads
member BaseStream : Stream
member Close : unit -> unit
member CurrentEncoding : Encoding
member DiscardBufferedData : unit -> unit
member EndOfStream : bool
member Peek : unit -> int
member Read : unit -> int + 1 overload
member ReadLine : unit -> string
member ReadToEnd : unit -> string
...
Full name: System.IO.StreamReader
--------------------
StreamReader(stream: Stream) : unit
StreamReader(path: string) : unit
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: Encoding) : unit
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: Encoding) : unit
StreamReader(stream: Stream, encoding: Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
StreamReader(path: string, encoding: Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
type Encoding =
member BodyName : string
member Clone : unit -> obj
member CodePage : int
member DecoderFallback : DecoderFallback with get, set
member EncoderFallback : EncoderFallback with get, set
member EncodingName : string
member Equals : value:obj -> bool
member GetByteCount : chars:char[] -> int + 3 overloads
member GetBytes : chars:char[] -> byte[] + 5 overloads
member GetCharCount : bytes:byte[] -> int + 2 overloads
...
Full name: System.Text.Encoding
property Encoding.UTF8: Encoding
module Seq
from Microsoft.FSharp.Collections
val unfold : generator:('State -> ('T * 'State) option) -> state:'State -> seq<'T>
Full name: Microsoft.FSharp.Collections.Seq.unfold
val rs : StreamReader
val line : string
StreamReader.ReadLine() : string
String.StartsWith(value: string) : bool
String.StartsWith(value: string, comparisonType: StringComparison) : bool
String.StartsWith(value: string, ignoreCase: bool, culture: CultureInfo) : bool
val tweet : Tweet option
Encoding.GetBytes(s: string) : byte []
Encoding.GetBytes(chars: char []) : byte []
Encoding.GetBytes(chars: char [], index: int, count: int) : byte []
Encoding.GetBytes(chars: nativeptr<char>, charCount: int, bytes: nativeptr<byte>, byteCount: int) : int
Encoding.GetBytes(s: string, charIndex: int, charCount: int, bytes: byte [], byteIndex: int) : int
Encoding.GetBytes(chars: char [], charIndex: int, charCount: int, bytes: byte [], byteIndex: int) : int
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlDictionaryReader) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlReader) : obj
Json.DataContractJsonSerializer.ReadObject(stream: Stream) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlDictionaryReader, verifyObjectName: bool) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlReader, verifyObjectName: bool) : obj
union case Option.Some: Value: 'T -> Option<'T>
val ex : exn
val printfn : format:Printf.TextWriterFormat<'T> -> 'T
Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Exception.Message: string
union case Option.None: Option<'T>
property Option.Value: Tweet
val choose : chooser:('T -> 'U option) -> source:seq<'T> -> seq<'U>
Full name: Microsoft.FSharp.Collections.Seq.choose
val t : Tweet option
val stopAll : unit -> unit
Full name: Script.stopAll
val iter : action:('T -> unit) -> source:seq<'T> -> unit
Full name: Microsoft.FSharp.Collections.Seq.iter
val s : HttpWebRequest
HttpWebRequest.Abort() : unit
val tweetsPerMinute : tracking:string -> seq<float>
Full name: Script.tweetsPerMinute
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>
Full name: Microsoft.FSharp.Collections.Seq.map
val t : Tweet
val windowed : windowSize:int -> source:seq<'T> -> seq<'T []>
Full name: Microsoft.FSharp.Collections.Seq.windowed
val ts : DateTime []
val max : source:seq<'T> -> 'T (requires comparison)
Full name: Microsoft.FSharp.Collections.Seq.max
val min : source:seq<'T> -> 'T (requires comparison)
Full name: Microsoft.FSharp.Collections.Seq.min
property Array.Length: int
val interval : TimeSpan
val count : int
Multiple items
val float : value:'T -> float (requires member op_Explicit)
Full name: Microsoft.FSharp.Core.Operators.float
--------------------
type float = Double
Full name: Microsoft.FSharp.Core.float
--------------------
type float<'Measure> = float
Full name: Microsoft.FSharp.Core.float<_>
property TimeSpan.TotalMinutes: float
More information