Commit c1462299955e1901e4757e47cf142af14d2f46f2

Authored by James McMinn
1 parent dd5e23fe5d
Exists in master

Moved dealing with Twitter streams to twitter package. Updated readme.

Showing 2 changed files with 16 additions and 43 deletions Side-by-side Diff

1   -# Twitter Utilities
  1 +# Twitter Stream Reader
2 2 Opens a file or connects to the Twitter streaming API, and converts it to a
3 3 standard JSON format, similar to the standard Twitter API JSON format.
4 4  
5 5 ## Requirements
6 6 This requires the following packages:
7   - - github.com/mrjones/oauth
8   - - github.com/araddon/httpstream
9 7 - github.com/jamesmcminn/twitter
10 8 \ No newline at end of file
... ... @@ -2,9 +2,7 @@ package main
2 2  
3 3 import (
4 4 "flag"
5   - "github.com/araddon/httpstream"
6 5 "github.com/jamesmcminn/twitter"
7   - "github.com/mrjones/oauth"
8 6 "log"
9 7 "net"
10 8 "runtime"
... ... @@ -14,50 +12,24 @@ const RECV_BUF_LEN = 1024 * 1024
14 12 const MAX_CHAN_LEN = 1000000
15 13  
16 14 var (
17   - consumerKey *string = flag.String("ck", "", "Consumer Key")
18   - consumerSecret *string = flag.String("cs", "", "Consumer Secret")
19   - ot *string = flag.String("ot", "", "Oauth Token")
20   - osec *string = flag.String("os", "", "OAuthTokenSecret")
21   - firehose chan []byte = make(chan []byte, MAX_CHAN_LEN)
22   - aliveStreams map[chan []byte]bool = make(map[chan []byte]bool)
  15 + consumerKey *string = flag.String("ck", "", "Consumer Key")
  16 + consumerSecret *string = flag.String("cs", "", "Consumer Secret")
  17 + ot *string = flag.String("ot", "", "Oauth Token")
  18 + osec *string = flag.String("os", "", "OAuthTokenSecret")
  19 + firehose chan twitter.Tweet = make(chan twitter.Tweet, MAX_CHAN_LEN)
  20 + aliveStreams map[chan *[]byte]bool = make(map[chan *[]byte]bool)
23 21 )
24 22  
25 23 func main() {
26 24 runtime.GOMAXPROCS(runtime.NumCPU() * 2)
27 25 flag.Parse()
28   - done := make(chan bool)
29   -
30   - httpstream.OauthCon = oauth.NewConsumer(
31   - *consumerKey,
32   - *consumerSecret,
33   - oauth.ServiceProvider{
34   - RequestTokenUrl: "http://api.twitter.com/oauth/request_token",
35   - AuthorizeTokenUrl: "https://api.twitter.com/oauth/authorize",
36   - AccessTokenUrl: "https://api.twitter.com/oauth/access_token",
37   - })
38   -
39   - at := oauth.AccessToken{
40   - Token: *ot,
41   - Secret: *osec,
42   - }
43   -
44   - client := httpstream.NewOAuthClient(&at, httpstream.OnlyTweetsFilter(func(line []byte) {
45   - if len(firehose) == MAX_CHAN_LEN {
46   - <-firehose
47   - }
48   - firehose <- line
49   - }))
50   -
51   - err := client.Sample(done)
52   - if err != nil {
53   - httpstream.Log(httpstream.ERROR, err.Error())
54   - }
55 26  
56 27 ln, err := net.Listen("tcp", ":8053")
57 28 if err != nil {
58 29 log.Fatal(err)
59 30 }
60 31  
  32 + go twitter.FillStream(firehose, *consumerKey, *consumerSecret, *ot, *osec)
61 33 go fillOutgoingStreams(aliveStreams)
62 34  
63 35 for {
... ... @@ -70,12 +42,13 @@ func main() {
70 42 }
71 43  
72 44 func handleConnection(conn net.Conn) {
73   - stream := make(chan []byte, MAX_CHAN_LEN)
  45 + stream := make(chan *[]byte, MAX_CHAN_LEN)
74 46 aliveStreams[stream] = true
75 47 log.Println("Current Connections:", len(aliveStreams))
76 48  
77 49 for {
78   - _, err := conn.Write(<-stream)
  50 + t := <-stream
  51 + _, err := conn.Write(*t)
79 52 if err != nil {
80 53 println("Closing connection: ", err.Error())
81 54 break
... ... @@ -86,14 +59,16 @@ func handleConnection(conn net.Conn) {
86 59 log.Println("Current Connections:", len(aliveStreams))
87 60 }
88 61  
89   -func fillOutgoingStreams(streams map[chan []byte]bool) {
  62 +func fillOutgoingStreams(streams map[chan *[]byte]bool) {
90 63 for {
91   - item := <-firehose
  64 + tweet := <-firehose
92 65 for r := range streams {
93 66 if len(r) == MAX_CHAN_LEN {
94 67 <-r
95 68 }
96   - r <- append(item, []byte("\n")...)
  69 + json, _ := twitter.TweetToJSON(tweet)
  70 + json = append(json, []byte("\n")...)
  71 + r <- &json
97 72 }
98 73 }
99 74 }