Commit 3e670f4295fe8a1ed29aade82fd0186dcf452786

Authored by James McMinn
1 parent c146229995
Exists in master

Can now read both stream and snow formatted files.

Showing 1 changed file with 111 additions and 5 deletions Side-by-side Diff

1 1 package main
2 2  
3 3 import (
  4 + "bufio"
4 5 "flag"
  6 + "fmt"
5 7 "github.com/jamesmcminn/twitter"
  8 + "io"
6 9 "log"
7 10 "net"
  11 + "os"
8 12 "runtime"
  13 + "strconv"
  14 + "strings"
9 15 )
10 16  
11 17 const RECV_BUF_LEN = 1024 * 1024
12   -const MAX_CHAN_LEN = 1000000
  18 +const MAX_CHAN_LEN = 10000
  19 +
  20 +const MODE_STREAM = 0
  21 +const MODE_FILE = 1
  22 +
  23 +const FORMAT_SNOW = 0
  24 +const FORMAT_STREAM = 1
13 25  
14 26 var (
15 27 consumerKey *string = flag.String("ck", "", "Consumer Key")
16 28 consumerSecret *string = flag.String("cs", "", "Consumer Secret")
17   - ot *string = flag.String("ot", "", "Oauth Token")
  29 + ot *string = flag.String("ot", "", "OAuth Token")
18 30 osec *string = flag.String("os", "", "OAuthTokenSecret")
  31 + inputFile *string = flag.String("if", "", "Input File")
  32 + format *string = flag.String("format", "", "File Format")
  33 + port *int = flag.Int("port", 8053, "Port to listen on. Default: 8053")
19 34 firehose chan twitter.Tweet = make(chan twitter.Tweet, MAX_CHAN_LEN)
20 35 aliveStreams map[chan *[]byte]bool = make(map[chan *[]byte]bool)
  36 + mode int = -1
  37 + fileFormat int = -1
21 38 )
22 39  
23 40 func main() {
24 41 runtime.GOMAXPROCS(runtime.NumCPU() * 2)
25 42 flag.Parse()
26 43  
27   - ln, err := net.Listen("tcp", ":8053")
  44 + if *inputFile != "" {
  45 + mode = MODE_FILE
  46 + if *format == "snow" {
  47 + fileFormat = FORMAT_SNOW
  48 + } else if *format == "stream" {
  49 + fileFormat = FORMAT_STREAM
  50 + } else {
  51 + fmt.Println("Must specify file type as either -snow or -stream. See -help for details.")
  52 + return
  53 + }
  54 + } else if *consumerKey != "" || *consumerSecret != "" || *ot != "" || *osec != "" {
  55 + if *consumerKey == "" || *consumerSecret == "" || *ot == "" || *osec == "" {
  56 + fmt.Println("Must specify all of -ck, -cs, -ot and -os. See -help for details.")
  57 + return
  58 + }
  59 + } else {
  60 + fmt.Println("Must specify either Twitter OAuth details or file location and format. See -help for details.")
  61 + return
  62 + }
  63 +
  64 + // Listen on whatever port was specified
  65 + ln, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
28 66 if err != nil {
29 67 log.Fatal(err)
30 68 }
  69 + log.Println("Listening on port", *port)
31 70  
32   - go twitter.FillStream(firehose, *consumerKey, *consumerSecret, *ot, *osec)
33   - go fillOutgoingStreams(aliveStreams)
  71 + if mode == MODE_STREAM {
  72 + // Open a connection to the firehose and fill output streams
  73 + go twitter.FillStream(firehose, *consumerKey, *consumerSecret, *ot, *osec)
  74 + go fillOutgoingStreams(aliveStreams)
  75 + }
34 76  
35 77 for {
36 78 conn, err := ln.Accept()
... ... @@ -41,11 +83,75 @@ func main() {
41 83 }
42 84 }
43 85  
  86 +func readFileInto(into chan *[]byte) {
  87 + f, err := os.Open(*inputFile)
  88 + if err != nil {
  89 + log.Fatal(err)
  90 + }
  91 +
  92 + bf := bufio.NewReaderSize(f, 20000)
  93 + for {
  94 + line, isPrefix, err := bf.ReadLine()
  95 + switch {
  96 + case err == io.EOF:
  97 + break
  98 + case err != nil:
  99 + log.Fatal(err)
  100 + case isPrefix:
  101 + log.Fatal("Error: Unexpected long line reading", f.Name())
  102 + }
  103 +
  104 + // Check the connection is still active
  105 + if aliveStreams[into] != true {
  106 + break
  107 + }
  108 +
  109 + var t twitter.Tweet
  110 + if fileFormat == FORMAT_STREAM {
  111 + t = twitter.JSONtoTweet(line)
  112 + } else {
  113 + t = parseSNOW(line)
  114 + }
  115 +
  116 + j, err := twitter.TweetToJSON(t)
  117 + if err != nil {
  118 + log.Println(err)
  119 + }
  120 + j = append(j, []byte("\n")...)
  121 + into <- &j
  122 + }
  123 +}
  124 +
  125 +func parseSNOW(line []byte) twitter.Tweet {
  126 + // TODO: Handle ParseInt errors
  127 + t := new(twitter.Tweet)
  128 + parts := strings.SplitN(string(line), "\t", 11)
  129 + code := parts[5]
  130 + if code == "200" {
  131 + id, _ := strconv.ParseInt(parts[6], 10, 64)
  132 + username := parts[7]
  133 + text := parts[8]
  134 + time, _ := strconv.ParseUint(parts[9], 10, 64)
  135 + t.Id = id
  136 + t.User.Name = username
  137 + t.Text = text
  138 + t.Timestamp = time
  139 + return *t
  140 + } else {
  141 + return nil
  142 + }
  143 +}
  144 +
44 145 func handleConnection(conn net.Conn) {
45 146 stream := make(chan *[]byte, MAX_CHAN_LEN)
  147 +
46 148 aliveStreams[stream] = true
47 149 log.Println("Current Connections:", len(aliveStreams))
48 150  
  151 + if mode == MODE_FILE {
  152 + go readFileInto(stream)
  153 + }
  154 +
49 155 for {
50 156 t := <-stream
51 157 _, err := conn.Write(*t)