12 people like it.

Twitter Stream API

Twitter now provides a streaming API which can be used to obtain a continuous stream of tweets on any set of topics, locations, etc., in real time. Read the full details here. It would be nice to convert this stream into an F# sequence so that it can be treated just like any other sequence in F#. This provides “composability”: separation of the generation of a sequence from its consumption. Here is a snippet that does that.

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
50: 
51: 
52: 
53: 
54: 
55: 
56: 
57: 
58: 
59: 
60: 
61: 
62: 
63: 
64: 
//#load "AppId.fsx"
#r "System.Web.Extensions.dll"
#r "System.Runtime.Serialization.dll"
;;
open System
open System.Net
open System.Text
open System.IO
open System.Web.Script.Serialization
open System.Runtime.Serialization
;;
/// Author of a tweet. Instances are produced by the data-contract JSON
/// serializer when it reads the "user" member of a Tweet.
[<DataContract>]
type TweetUser =
    {
        /// Bound to the JSON member "followers_count".
        [<field:DataMember(Name="followers_count")>]
        Followers : int

        /// Bound to the JSON member "screen_name".
        [<field:DataMember(Name="screen_name")>]
        Name : string

        /// Bound to the JSON member "id_str".
        /// NOTE(review): the member name suggests a string-encoded id; confirm
        /// that every value the feed sends fits in a 32-bit int.
        [<field:DataMember(Name="id_str")>]
        Id : int

        /// Bound to the JSON member "location".
        [<field:DataMember(Name="location")>]
        Location : string
    }

/// One status message from the stream, as read by the data-contract JSON
/// serializer. Only the members listed here are mapped.
[<DataContract>]
type Tweet =
    {
        /// Bound to the JSON member "text".
        [<field:DataMember(Name="text")>]
        Text : string

        /// Bound to the JSON member "retweeted".
        [<field:DataMember(Name="retweeted")>]
        IsRetweeted : bool

        /// Bound to the JSON member "created_at"; kept as the raw string, not parsed.
        [<field:DataMember(Name="created_at")>]
        DateStr : string

        /// Bound to the JSON member "user".
        [<field:DataMember(Name="user")>]
        User : TweetUser

        /// Bound to the JSON member "geo".
        /// NOTE(review): declared as a plain string; confirm the feed never
        /// sends a structured value for this member.
        [<field:DataMember(Name="geo")>]
        Geo : string
    }

/// Serializer shared by all streams; it reads a JSON document into a Tweet.
let dser = new Json.DataContractJsonSerializer(typeof<Tweet>)

/// Requests opened by `tweets`, newest first, kept so that `stopAll` can abort them.
let mutable connections : HttpWebRequest list = []

/// Opens a streaming connection to the Twitter filter endpoint for the given
/// tracking term and exposes the incoming tweets as a lazy sequence.
///
/// The HTTP request is issued immediately when this function is called (not
/// when the sequence is first enumerated) and is recorded in `connections`
/// so that `stopAll` can abort it. Each step of the sequence blocks on one
/// line from the response stream.
///
/// tracking: value inserted verbatim after "track=" in the request URL.
///           NOTE(review): it is not escaped here, so callers must pass a
///           value that is already safe inside a query string.
/// Returns:  seq<Tweet> of every line that starts with {"text and can be
///           deserialized; other lines and undeserializable lines are skipped.
///           The sequence ends when the server closes the stream or when
///           reading fails (for example after `stopAll` aborts the request);
///           a failure message is printed and is not re-raised.
let tweets(tracking) =
    let template = sprintf "https://stream.twitter.com/1/statuses/filter.json?track=%s"
    let http = WebRequest.Create(template tracking) :?> HttpWebRequest
    // http.Credentials <- NetworkCredential(AppId.twitterId, AppId.twitterPwd)
    http.Credentials <- NetworkCredential("<twitter id>", "<twitter password>")
    // -1 (System.Threading.Timeout.Infinite) disables the request timeout for this long-lived stream.
    http.Timeout <- -1
    let resp = http.GetResponse()
    let str = resp.GetResponseStream()
    connections <- http::connections
    let readStream = new StreamReader(str, Encoding.UTF8)

    // Releases the reader (and the response stream under it) and the response.
    // Both calls are safe to repeat, so re-enumerating a finished sequence is harmless.
    let release() =
        readStream.Dispose()
        resp.Close()

    // Deserializes one JSON line; a line that cannot be read as a Tweet is
    // deliberately skipped rather than ending the whole stream.
    let parse (line:string) =
        use oStr = new MemoryStream(Encoding.UTF8.GetBytes(line))
        try dser.ReadObject(oStr) :?> Tweet |> Some with _ -> None

    readStream |> Seq.unfold(fun rs ->
            try
                match rs.ReadLine() with
                | null ->
                    // ReadLine returns null at end of stream: finish cleanly
                    // instead of tripping a NullReferenceException on the line.
                    release()
                    None
                | line when line.StartsWith("{\"text", StringComparison.Ordinal) ->
                    let tweet = parse line

                    if tweet.IsSome then printfn "%A" tweet.Value

                    Some(tweet, rs)
                | _ ->
                    // Not a status line; emit a placeholder that Seq.choose drops.
                    Some(None, rs)
            with
            | ex ->
                printfn "%s" ex.Message
                // The connection is unusable after a read failure; free it.
                release()
                None)
    |> Seq.choose id

/// Aborts every request currently recorded in `connections`, printing
/// "closing..." once per request, and then empties the list.
let stopAll() =
    for request in connections do
        printfn "closing..."
        request.Abort()
    // Forget the aborted requests so they can be collected and so a later
    // call does not abort (and report) the same dead requests again.
    connections <- []
;;
(*
tweets("soccer") |> Seq.iter(fun t -> ())
stopAll()
*)
namespace System
namespace System.Net
namespace System.Text
namespace System.IO
namespace System.Web
namespace System.Web.Script
namespace System.Web.Script.Serialization
namespace System.Runtime
namespace System.Runtime.Serialization
Multiple items
type DataContractAttribute =
  inherit Attribute
  new : unit -> DataContractAttribute
  member IsReference : bool with get, set
  member Name : string with get, set
  member Namespace : string with get, set

Full name: System.Runtime.Serialization.DataContractAttribute

--------------------
DataContractAttribute() : unit
type TweetUser =
  {Followers: int;
   Name: string;
   Id: int;
   Location: string;}

Full name: Script.TweetUser
Multiple items
type DataMemberAttribute =
  inherit Attribute
  new : unit -> DataMemberAttribute
  member EmitDefaultValue : bool with get, set
  member IsRequired : bool with get, set
  member Name : string with get, set
  member Order : int with get, set

Full name: System.Runtime.Serialization.DataMemberAttribute

--------------------
DataMemberAttribute() : unit
TweetUser.Followers: int
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
TweetUser.Name: string
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
TweetUser.Id: int
TweetUser.Location: string
type Tweet =
  {Text: string;
   IsRetweeted: bool;
   DateStr: string;
   User: TweetUser;
   Geo: string;}

Full name: Script.Tweet
Multiple items
Tweet.Text: string

--------------------
namespace System.Text
Tweet.IsRetweeted: bool
type bool = Boolean

Full name: Microsoft.FSharp.Core.bool
Tweet.DateStr: string
Tweet.User: TweetUser
Tweet.Geo: string
val dser : Json.DataContractJsonSerializer

Full name: Script.dser
namespace System.Runtime.Serialization.Json
Multiple items
type DataContractJsonSerializer =
  inherit XmlObjectSerializer
  new : type:Type -> DataContractJsonSerializer + 8 overloads
  member DataContractSurrogate : IDataContractSurrogate
  member IgnoreExtensionDataObject : bool
  member IsStartObject : reader:XmlReader -> bool + 1 overload
  member KnownTypes : ReadOnlyCollection<Type>
  member MaxItemsInObjectGraph : int
  member ReadObject : stream:Stream -> obj + 4 overloads
  member WriteEndObject : writer:XmlWriter -> unit + 1 overload
  member WriteObject : stream:Stream * graph:obj -> unit + 2 overloads
  member WriteObjectContent : writer:XmlWriter * graph:obj -> unit + 1 overload
  ...

Full name: System.Runtime.Serialization.Json.DataContractJsonSerializer

--------------------
Json.DataContractJsonSerializer(type: Type) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString) : unit
Json.DataContractJsonSerializer(type: Type, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString, knownTypes: Collections.Generic.IEnumerable<Type>) : unit
Json.DataContractJsonSerializer(type: Type, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
Json.DataContractJsonSerializer(type: Type, rootName: string, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
Json.DataContractJsonSerializer(type: Type, rootName: Xml.XmlDictionaryString, knownTypes: Collections.Generic.IEnumerable<Type>, maxItemsInObjectGraph: int, ignoreExtensionDataObject: bool, dataContractSurrogate: IDataContractSurrogate, alwaysEmitTypeInformation: bool) : unit
val typeof<'T> : Type

Full name: Microsoft.FSharp.Core.Operators.typeof
val mutable connections : HttpWebRequest list

Full name: Script.connections
val tweets : tracking:string -> seq<Tweet>

Full name: Script.tweets
val tracking : string
val template : (string -> string)
val sprintf : format:Printf.StringFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.sprintf
val http : HttpWebRequest
type WebRequest =
  inherit MarshalByRefObject
  member Abort : unit -> unit
  member AuthenticationLevel : AuthenticationLevel with get, set
  member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
  member CachePolicy : RequestCachePolicy with get, set
  member ConnectionGroupName : string with get, set
  member ContentLength : int64 with get, set
  member ContentType : string with get, set
  member Credentials : ICredentials with get, set
  member EndGetRequestStream : asyncResult:IAsyncResult -> Stream
  ...

Full name: System.Net.WebRequest
WebRequest.Create(requestUri: Uri) : WebRequest
WebRequest.Create(requestUriString: string) : WebRequest
type HttpWebRequest =
  inherit WebRequest
  member Abort : unit -> unit
  member Accept : string with get, set
  member AddRange : range:int -> unit + 7 overloads
  member Address : Uri
  member AllowAutoRedirect : bool with get, set
  member AllowWriteStreamBuffering : bool with get, set
  member AutomaticDecompression : DecompressionMethods with get, set
  member BeginGetRequestStream : callback:AsyncCallback * state:obj -> IAsyncResult
  member BeginGetResponse : callback:AsyncCallback * state:obj -> IAsyncResult
  member ClientCertificates : X509CertificateCollection with get, set
  ...

Full name: System.Net.HttpWebRequest
property HttpWebRequest.Credentials: ICredentials
Multiple items
type NetworkCredential =
  new : unit -> NetworkCredential + 4 overloads
  member Domain : string with get, set
  member GetCredential : uri:Uri * authType:string -> NetworkCredential + 1 overload
  member Password : string with get, set
  member SecurePassword : SecureString with get, set
  member UserName : string with get, set

Full name: System.Net.NetworkCredential

--------------------
NetworkCredential() : unit
NetworkCredential(userName: string, password: string) : unit
NetworkCredential(userName: string, password: Security.SecureString) : unit
NetworkCredential(userName: string, password: string, domain: string) : unit
NetworkCredential(userName: string, password: Security.SecureString, domain: string) : unit
property HttpWebRequest.Timeout: int
val resp : WebResponse
HttpWebRequest.GetResponse() : WebResponse
val str : Stream
WebResponse.GetResponseStream() : Stream
val readStream : StreamReader
Multiple items
type StreamReader =
  inherit TextReader
  new : stream:Stream -> StreamReader + 9 overloads
  member BaseStream : Stream
  member Close : unit -> unit
  member CurrentEncoding : Encoding
  member DiscardBufferedData : unit -> unit
  member EndOfStream : bool
  member Peek : unit -> int
  member Read : unit -> int + 1 overload
  member ReadLine : unit -> string
  member ReadToEnd : unit -> string
  ...

Full name: System.IO.StreamReader

--------------------
StreamReader(stream: Stream) : unit
StreamReader(path: string) : unit
StreamReader(stream: Stream, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: Encoding) : unit
StreamReader(path: string, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: Encoding) : unit
StreamReader(stream: Stream, encoding: Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(path: string, encoding: Encoding, detectEncodingFromByteOrderMarks: bool) : unit
StreamReader(stream: Stream, encoding: Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
StreamReader(path: string, encoding: Encoding, detectEncodingFromByteOrderMarks: bool, bufferSize: int) : unit
type Encoding =
  member BodyName : string
  member Clone : unit -> obj
  member CodePage : int
  member DecoderFallback : DecoderFallback with get, set
  member EncoderFallback : EncoderFallback with get, set
  member EncodingName : string
  member Equals : value:obj -> bool
  member GetByteCount : chars:char[] -> int + 3 overloads
  member GetBytes : chars:char[] -> byte[] + 5 overloads
  member GetCharCount : bytes:byte[] -> int + 2 overloads
  ...

Full name: System.Text.Encoding
property Encoding.UTF8: Encoding
module Seq

from Microsoft.FSharp.Collections
val unfold : generator:('State -> ('T * 'State) option) -> state:'State -> seq<'T>

Full name: Microsoft.FSharp.Collections.Seq.unfold
val rs : StreamReader
val line : string
StreamReader.ReadLine() : string
String.StartsWith(value: string) : bool
String.StartsWith(value: string, comparisonType: StringComparison) : bool
String.StartsWith(value: string, ignoreCase: bool, culture: Globalization.CultureInfo) : bool
val oStr : MemoryStream
Multiple items
type MemoryStream =
  inherit Stream
  new : unit -> MemoryStream + 6 overloads
  member CanRead : bool
  member CanSeek : bool
  member CanWrite : bool
  member Capacity : int with get, set
  member Flush : unit -> unit
  member GetBuffer : unit -> byte[]
  member Length : int64
  member Position : int64 with get, set
  member Read : buffer:byte[] * offset:int * count:int -> int
  ...

Full name: System.IO.MemoryStream

--------------------
MemoryStream() : unit
MemoryStream(capacity: int) : unit
MemoryStream(buffer: byte []) : unit
MemoryStream(buffer: byte [], writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool) : unit
MemoryStream(buffer: byte [], index: int, count: int, writable: bool, publiclyVisible: bool) : unit
Encoding.GetBytes(s: string) : byte []
Encoding.GetBytes(chars: char []) : byte []
Encoding.GetBytes(chars: char [], index: int, count: int) : byte []
Encoding.GetBytes(chars: nativeptr<char>, charCount: int, bytes: nativeptr<byte>, byteCount: int) : int
Encoding.GetBytes(s: string, charIndex: int, charCount: int, bytes: byte [], byteIndex: int) : int
Encoding.GetBytes(chars: char [], charIndex: int, charCount: int, bytes: byte [], byteIndex: int) : int
val tweet : Tweet option
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlDictionaryReader) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlReader) : obj
Json.DataContractJsonSerializer.ReadObject(stream: Stream) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlDictionaryReader, verifyObjectName: bool) : obj
Json.DataContractJsonSerializer.ReadObject(reader: Xml.XmlReader, verifyObjectName: bool) : obj
union case Option.Some: Value: 'T -> Option<'T>
union case Option.None: Option<'T>
property Option.IsSome: bool
val printfn : format:Printf.TextWriterFormat<'T> -> 'T

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.printfn
property Option.Value: Tweet
val ex : exn
property Exception.Message: string
val choose : chooser:('T -> 'U option) -> source:seq<'T> -> seq<'U>

Full name: Microsoft.FSharp.Collections.Seq.choose
val t : Tweet option
val stopAll : unit -> unit

Full name: Script.stopAll
val iter : action:('T -> unit) -> source:seq<'T> -> unit

Full name: Microsoft.FSharp.Collections.Seq.iter
val s : HttpWebRequest
HttpWebRequest.Abort() : unit
Next Version Raw view Test code New version

More information

Link:http://fssnip.net/8m
Posted:13 years ago
Author:Faisal Waris
Tags: twitter , stream api , seq.unfold