Commit 9cba5a7ad54e3d77a3b63bdd31d167d88f89493f

Authored by James McMinn
0 parents
Exists in master

Intitial commit.

Showing 3 changed files with 110 additions and 0 deletions Side-by-side Diff

... ... @@ -0,0 +1,2 @@
  1 +*.sublime*
  2 +run.sh
... ... @@ -0,0 +1,9 @@
  1 +# Twitter Stream Reader
  2 +Opens a file or connects to the Twitter streaming API, and converts it to a
  3 +standard JSON format, similar to the standard Twitter API JSON format.
  4 +
  5 +## Requirements
  6 +This requires the following packages:
  7 + - github.com/mrjones/oauth
  8 + - github.com/araddon/httpstream
  9 + - github.com/jamesmcminn/twitter
0 10 \ No newline at end of file
... ... @@ -0,0 +1,99 @@
  1 +package main
  2 +
  3 +import (
  4 + "flag"
  5 + "github.com/araddon/httpstream"
  6 + "github.com/jamesmcminn/twitter"
  7 + "github.com/mrjones/oauth"
  8 + "log"
  9 + "net"
  10 + "runtime"
  11 +)
  12 +
  13 +const RECV_BUF_LEN = 1024 * 1024
  14 +const MAX_CHAN_LEN = 1000000
  15 +
  16 +var (
  17 + consumerKey *string = flag.String("ck", "", "Consumer Key")
  18 + consumerSecret *string = flag.String("cs", "", "Consumer Secret")
  19 + ot *string = flag.String("ot", "", "Oauth Token")
  20 + osec *string = flag.String("os", "", "OAuthTokenSecret")
  21 + firehose chan []byte = make(chan []byte, MAX_CHAN_LEN)
  22 + aliveStreams map[chan []byte]bool = make(map[chan []byte]bool)
  23 +)
  24 +
  25 +func main() {
  26 + runtime.GOMAXPROCS(runtime.NumCPU() * 2)
  27 + flag.Parse()
  28 + done := make(chan bool)
  29 +
  30 + httpstream.OauthCon = oauth.NewConsumer(
  31 + *consumerKey,
  32 + *consumerSecret,
  33 + oauth.ServiceProvider{
  34 + RequestTokenUrl: "http://api.twitter.com/oauth/request_token",
  35 + AuthorizeTokenUrl: "https://api.twitter.com/oauth/authorize",
  36 + AccessTokenUrl: "https://api.twitter.com/oauth/access_token",
  37 + })
  38 +
  39 + at := oauth.AccessToken{
  40 + Token: *ot,
  41 + Secret: *osec,
  42 + }
  43 +
  44 + client := httpstream.NewOAuthClient(&at, httpstream.OnlyTweetsFilter(func(line []byte) {
  45 + if len(firehose) == MAX_CHAN_LEN {
  46 + <-firehose
  47 + }
  48 + firehose <- line
  49 + }))
  50 +
  51 + err := client.Sample(done)
  52 + if err != nil {
  53 + httpstream.Log(httpstream.ERROR, err.Error())
  54 + }
  55 +
  56 + ln, err := net.Listen("tcp", ":8053")
  57 + if err != nil {
  58 + log.Fatal(err)
  59 + }
  60 +
  61 + go fillOutgoingStreams(aliveStreams)
  62 +
  63 + for {
  64 + conn, err := ln.Accept()
  65 + if err != nil {
  66 + log.Println(err)
  67 + }
  68 + go handleConnection(conn)
  69 + }
  70 +}
  71 +
  72 +func handleConnection(conn net.Conn) {
  73 + stream := make(chan []byte, MAX_CHAN_LEN)
  74 + aliveStreams[stream] = true
  75 + log.Println("Current Connections:", len(aliveStreams))
  76 +
  77 + for {
  78 + _, err := conn.Write(<-stream)
  79 + if err != nil {
  80 + println("Closing connection: ", err.Error())
  81 + break
  82 + }
  83 + }
  84 +
  85 + delete(aliveStreams, stream)
  86 + log.Println("Current Connections:", len(aliveStreams))
  87 +}
  88 +
  89 +func fillOutgoingStreams(streams map[chan []byte]bool) {
  90 + for {
  91 + item := <-firehose
  92 + for r := range streams {
  93 + if len(r) == MAX_CHAN_LEN {
  94 + <-r
  95 + }
  96 + r <- append(item, []byte("\n")...)
  97 + }
  98 + }
  99 +}