12 people like it.

Twitter Stream API

Twitter now provides a streaming API which can be used to obtain a continuous stream of tweets on any set of topics, locations, etc., in real time. Read the full details here. It would be nice to convert this stream into an F# sequence so that it can be treated just as any other sequence if F#. This provides “composability”; separation of the generation of a sequence from its consumption. Here is a snippet that does that.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
65: 
66: 
67: 
68: 
69: 
70: 
71: 
72: 
73: 
74: 
75: 
76: 
77: 
78: 
79: 
80: 
81: 
82: 
83: 
84: 
//#load "AppId.fsx"
#r "System.Web.Extensions.dll"
#r "System.Runtime.Serialization.dll"
;;
open System
open System.Net
open System.Text
open System.IO
open System.Web.Script.Serialization
open System.Runtime.Serialization
open System.Globalization
;;
[<DataContract>]
type TweetUser = {
    [<field:DataMember(Name="followers_count")>] Followers:int
    [<field:DataMember(Name="screen_name")>] Name:string
    [<field:DataMember(Name="id_str")>] Id:int
    [<field:DataMember(Name="location")>] Location:string}

[<DataContract>]
type Tweet = {
     [<field:DataMember(Name="text")>] Text:string
     [<field:DataMember(Name="retweeted")>] IsRetweeted:bool
     [<field:DataMember(Name="created_at")>] DateStr:string
     [<field:DataMember(Name="user")>] User:TweetUser
     [<field:DataMember(Name="geo")>] Geo:string}

let dser = Json.DataContractJsonSerializer(typeof<Tweet>)
let toStream (b:byte array) = new MemoryStream(b)
let twitterDateFormat = "ddd MMM dd HH:mm:ss zzz yyyy"; //"Fri Oct 07 18:41:13 +0000 2011"
let toDateTime s = DateTime.ParseExact(s, twitterDateFormat, CultureInfo.InvariantCulture)
let mutable connections = []


let tweets tracking =
    let template = sprintf "https://stream.twitter.com/1/statuses/filter.json?track=%s"
    let http = WebRequest.Create(template tracking) :?> HttpWebRequest
//    http.Credentials <- NetworkCredential(AppId.twitterId, AppId.twitterPwd)
    http.Credentials <- NetworkCredential("<twitter id>", "<twitter password>")
    http.Timeout <- -1
    let resp = http.GetResponse()
    let str = resp.GetResponseStream()
    connections <- http::connections
    let readStream = new StreamReader(str, Encoding.UTF8)
    readStream |> Seq.unfold(fun rs ->
            try
                let line = rs.ReadLine() 
                if line.StartsWith("{\"text")  then 
                    let tweet =
                        try
                            line
                            |> Encoding.UTF8.GetBytes
                            |> toStream
                            |> dser.ReadObject :?> Tweet
                            |> Some
                        with
                        | ex -> 
                            printfn "Error %A" ex.Message
                            None
                    printfn "%A" tweet.Value
                    Some(tweet, rs)
                else
                    Some(None, rs)
            with
            | ex -> 
                printfn "%s" ex.Message
                None)
    |> Seq.choose (fun t -> t)

let stopAll() =
    connections |> Seq.iter (fun s -> printfn "closing..."; s.Abort())
;;
let tweetsPerMinute tracking =
    tweets tracking
    |> Seq.map (fun t -> toDateTime t.DateStr)
    |> Seq.windowed 3
    |> Seq.map (fun ts -> (Seq.max ts) - (Seq.min ts), ts.Length)
    |> Seq.map (fun (interval,count) -> (float)count / interval.TotalMinutes)

(* Usage:
async {tweets("soccer") |> Seq.iter(fun t -> printfn "%A" t)} |> Async.Start
async {tweetsPerMinute "obama" |> Seq.iter (fun m -> printfn "[%0.0f per/min]" m) } |> Async.Start
stopAll()
*)
namespace System
namespace System.Net
namespace System.Text
namespace System.IO
namespace System.Web
namespace System.Web.Script
namespace System.Web.Script.Serialization
namespace System.Runtime
namespace System.Runtime.Serialization
namespace System.Globalization
Multiple items
type DataContractAttribute =
  inherit Attribute
  new : unit -> DataContractAttribute
  member IsReference : bool with get, set
  member Name : string with get, set
  member Namespace : string with get, set

Full name: System.Runtime.Serialization.DataContractAttribute

--------------------
DataContractAttribute() : unit
type TweetUser =
  {Followers: int;
   Name: string;
   Id: int;
   Location: string;}

Full name: Script.TweetUser
Multiple items
type DataMemberAttribute =
  inherit Attribute
  new : unit -> DataMemberAttribute
  member EmitDefaultValue : bool with get, set
  member IsRequired : bool with get, set
  member Name : string with get, set
  member Order : int with get, set

Full name: System.Runtime.Serialization.DataMemberAttribute

--------------------
DataMemberAttribute() : unit
TweetUser.Followers: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
TweetUser.Name: string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
TweetUser.Id: int
TweetUser.Location: string
type Tweet =
  {Text: string;
   IsRetweeted: bool;
   DateStr: string;
   User: TweetUser;
   Geo: string;}

Full name: Script.Tweet
Multiple items
Tweet.Text: string

--------------------
namespace System.Text
Tweet.IsRetweeted: bool
type bool = Boolean

Full name: Microsoft.FSharp.Core.bool
Tweet.DateStr: string
Tweet.User: TweetUser
Tweet.Geo: string
val dser : Json.DataContractJsonSerializer

Full name: Script.dser
namespace System.Runtime.Serialization.Json
Multiple items
type DataContractJsonSerializer =
  inherit XmlObjectSerializer
  new : type:Type -> DataContractJsonSerializer + 8 overloads
  member DataContractSurrogate : IDataContractSurrogate
  member IgnoreExtensionDataObject : bool
  member IsStartObject : reader:XmlReader -> bool + 1 overload
  member KnownTypes : ReadOnlyCollection<Type>
  member MaxItemsInObjectGraph : int
  member ReadObject : stream:Stream -> obj + 4 overloads
  member WriteEndObject : writer:XmlWriter -> unit + 1 overload
  member WriteObject : stream:Stream * graph:obj -> unit + 2 overloads
  member WriteObjectContent : writer:XmlWriter * graph:obj -> unit + 1 overload
  ...

Full name: System.Runtime.Serialization.Json.DataContractJsonSerializer

--------------------
Json.DataContractJsonSerializer(type: Type) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString) : unit
Json.DataContractJsonSerializer(type: Type, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
val typeof<'T> : Type

Full name: Microsoft.FSharp.Core.Operators.typeof
val toStream : b:byte array -> MemoryStream

Full name: Script.toStream
val b : byte array
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.byte

--------------------
type byte = Byte

Full name: Microsoft.FSharp.Core.byte
type 'T array = 'T []

Full name: Microsoft.FSharp.Core.array<_>
Multiple items
type MemoryStream =
  inherit Stream
  new : unit -> MemoryStream + 6 overloads
  member CanRead : bool
  member CanSeek : bool
  member CanWrite : bool
  member Capacity : int with get, set
  member Flush : unit -> unit
  member GetBuffer : unit -> byte[]
  member Length : int64
  member Position : int64 with get, set
  member Read : buffer:byte[] * offset:int * count:int -> int
  ...

Full name: System.IO.MemoryStream

--------------------
MemoryStream() : unit
MemoryStream(capacity: int) : unit
MemoryStream(buffer: byte []) : unit
MemoryStream(buffer: byte [], writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool, publiclyVisible: bool) : unit
val twitterDateFormat : string

Full name: Script.twitterDateFormat
val toDateTime : s:string -> DateTime

Full name: Script.toDateTime
val s : string
Multiple items
type DateTime =
  struct
    new : ticks:int64 -> DateTime + 10 overloads
    member Add : value:TimeSpan -> DateTime
    member AddDays : value:float -> DateTime
    member AddHours : value:float -> DateTime
    member AddMilliseconds : value:float -> DateTime
    member AddMinutes : value:float -> DateTime
    member AddMonths : months:int -> DateTime
    member AddSeconds : value:float -> DateTime
    member AddTicks : value:int64 -> DateTime
    member AddYears : value:int -> DateTime
    ...
  end

Full name: System.DateTime

--------------------
DateTime()
   (+0 other overloads)
DateTime(ticks: int64) : unit
   (+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Calendar) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Calendar) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : unit
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : unit
   (+0 other overloads)
DateTime.ParseExact(s: string, format: string, provider: IFormatProvider) : DateTime
DateTime.ParseExact(s: string, formats: string [], provider: IFormatProvider, style: DateTimeStyles) : DateTime
DateTime.ParseExact(s: string, format: string, provider: IFormatProvider, style: DateTimeStyles) : DateTime
Multiple items
type CultureInfo =
  new : name:string -> CultureInfo + 3 overloads
  member Calendar : Calendar
  member ClearCachedData : unit -> unit
  member Clone : unit -> obj
  member CompareInfo : CompareInfo
  member CultureTypes : CultureTypes
  member DateTimeFormat : DateTimeFormatInfo with get, set
  member DisplayName : string
  member EnglishName : string
  member Equals : value:obj -> bool
  ...

Full name: System.Globalization.CultureInfo

--------------------
CultureInfo(name: string) : unit
CultureInfo(culture: int) : unit
CultureInfo(name: string, useUserOverride: bool) : unit
CultureInfo(culture: int, useUserOverride: bool) : unit
property CultureInfo.InvariantCulture: CultureInfo
val mutable connections : HttpWebRequest list

Full name: Script.connections
val tweets : tracking:string -> seq<Tweet>

Full name: Script.tweets
val tracking : string
val template : (string -> string)
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val http : HttpWebRequest
type WebRequest =
  inherit MarshalByRefObject
  member Abort : unit -> unit
  member AuthenticationLevel : AuthenticationLevel with get, set
  member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
  member CachePolicy : RequestCachePolicy with get, set
  member ConnectionGroupName : string with get, set
  member ContentLength : int64 with get, set
  member ContentType : string with get, set
  member Credentials : ICredentials with get, set
  member EndGetRequestStream : asyncResult:IAsyncResult -> Stream
  ...

Full name: System.Net.WebRequest
WebRequest.Create(requestUri: Uri) : WebRequest
WebRequest.Create(requestUriString: string) : WebRequest
type HttpWebRequest =
  inherit WebRequest
  member Abort : unit -> unit
  member Accept : string with get, set
  member AddRange : range:int -> unit + 7 overloads
  member Address : Uri
  member AllowAutoRedirect : bool with get, set
  member AllowWriteStreamBuffering : bool with get, set
  member AutomaticDecompression : DecompressionMethods with get, set
  member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
  member ClientCertificates : X509CertificateCollection with get, set
  ...

Full name: System.Net.HttpWebRequest
property HttpWebRequest.Credentials: ICredentials
Multiple items
type NetworkCredential =
  new : unit -> NetworkCredential + 4 overloads
  member Domain : string with get, set
  member GetCredential : uri:Uri * authType:string -> NetworkCredential + 1 overload
  member Password : string with get, set
  member SecurePassword : SecureString with get, set
  member UserName : string with get, set

Full name: System.Net.NetworkCredential

--------------------
NetworkCredential() : unit
NetworkCredential(userName: string, password: string) : unit
NetworkCredential(userName: string, password: Security.SecureString) : unit
NetworkCredential(userName: string, password: string, domain: string) : unit
NetworkCredential(userName: string, password: Security.SecureString, domain: string) : unit
property HttpWebRequest.Timeout: int
val resp : WebResponse
HttpWebRequest.GetResponse() : WebResponse
val str : Stream
WebResponse.GetResponseStream() : Stream
val readStream : StreamReader
Multiple items
type StreamReader =
  inherit TextReader
  new : stream:Stream -> StreamReader + 9 overloads
  member BaseStream : Stream
  member Close : unit -> unit
  member CurrentEncoding : Encoding
  member DiscardBufferedData : unit -> unit
  member EndOfStream : bool
  member Peek : unit -> int
  member Read : unit -> int + 1 overload
  member ReadLine : unit -> string
  member ReadToEnd : unit -> string
  ...

Full name: System.IO.StreamReader

--------------------
StreamReader(stream: Stream) : unit
StreamReader(path: string) : unit
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: Encoding) : unit
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: Encoding) : unit
StreamReader(stream: Stream, encoding: Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
StreamReader(path: string, encoding: Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
type Encoding =
  member BodyName : string
  member Clone : unit -> obj
  member CodePage : int
  member DecoderFallback : DecoderFallback with get, set
  member EncoderFallback : EncoderFallback with get, set
  member EncodingName : string
  member Equals : value:obj -> bool
  member GetByteCount : chars:char[] -> int + 3 overloads
  member GetBytes : chars:char[] -> byte[] + 5 overloads
  member GetCharCount : bytes:byte[] -> int + 2 overloads
  ...

Full name: System.Text.Encoding
property Encoding.UTF8: Encoding
module Seq

from Microsoft.FSharp.Collections
val unfold : generator:('State -> ('T * 'State) option) -> state:'State -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.unfold
val rs : StreamReader
val line : string
StreamReader.ReadLine() : string
String.StartsWith(value: string) : bool
String.StartsWith(value: string, comparisonType: StringComparison) : bool
String.StartsWith(value: string, ignoreCase: bool, culture: CultureInfo) : bool
val tweet : Tweet option
Encoding.GetBytes(s: string) : byte []
Encoding.GetBytes(chars: char []) : byte []
Encoding.GetBytes(chars: char [], index: int, count: int) : byte []
Encoding.GetBytes(chars: nativeptr<char>, charCount: int, bytes: nativeptr<byte>, byteCount: int) : int
Encoding.GetBytes(s: string, charIndex: int, charCount: int, bytes: byte [], byteIndex: int) : int
Encoding.GetBytes(chars: char [], charIndex: int, charCount: int, bytes: byte [], byteIndex: int) : int
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlDictionaryReader) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlReader) : obj
Json.DataContractJsonSerializer.ReadObject(stream: Stream) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlDictionaryReader, verifyObjectName: bool) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlReader, verifyObjectName: bool) : obj
union case Option.Some: Value: 'T -> Option<'T>
val ex : exn
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Exception.Message: string
union case Option.None: Option<'T>
property Option.Value: Tweet
val choose : chooser:('T -> 'U option) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.choose
val t : Tweet option
val stopAll : unit -> unit

Full name: Script.stopAll
val iter : action:('T -> unit) -> source:seq<'T> -> unit

Full name: Microsoft.FSharp.Collections.Seq.iter
val s : HttpWebRequest
HttpWebRequest.Abort() : unit
val tweetsPerMinute : tracking:string -> seq<float>

Full name: Script.tweetsPerMinute
val map : mapping:('T -> 'U) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.map
val t : Tweet
val windowed : windowSize:int -> source:seq<'T> -> seq<'T []>

Full name: Microsoft.FSharp.Collections.Seq.windowed
val ts : DateTime []
val max : source:seq<'T> -> 'T (requires comparison)

Full name: Microsoft.FSharp.Collections.Seq.max
val min : source:seq<'T> -> 'T (requires comparison)

Full name: Microsoft.FSharp.Collections.Seq.min
property Array.Length: int
val interval : TimeSpan
val count : int
Multiple items
val float : value:'T -> float (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.float

--------------------
type float = Double

Full name: Microsoft.FSharp.Core.float

--------------------
type float<'Measure> = float

Full name: Microsoft.FSharp.Core.float<_>
property TimeSpan.TotalMinutes: float

More information

Link:http://fssnip.net/8m
Posted:13 years ago
Author:Faisal Waris
Tags: twitter , stream api , seq.unfold