Commit c8a10ebd55abf655319001db750029f0ef3cbe06

Authored by James McMinn
1 parent d1aab98d38
Exists in master

Swtiched to work with the HTTP streaming model used by Twitter.

Showing 2 changed files with 43 additions and 25 deletions Side-by-side Diff

... ... @@ -20,16 +20,15 @@ the binary in your current directory.
20 20 The stream recorded has a number of parameters, all of which are optional depending on your setup.
21 21  
22 22 -compress=true Perform gzip compression before writing to disk.
23   - -dir="./" Path to store downloaded tweets.
24   - -host="localhost" The hostname to dial to.
  23 + -source="http://mir:8001/firehose.json" HTTP Stream to download from
25 24 -port=8053 Port to dial on.
26 25  
27 26 ### Example Usage
28 27 The following example connects to a machine called juvented on a non-default port, specifies a directory
29 28 to store the tweets and enables compression.
30 29  
31   - twitter-stream-recorder -host=juventud -port=56874 -dir=/local/jjnas01/Public/Commonwealth -compress=true
  30 + twitter-stream-recorder -source="http://mir:8001/firehose.json" -dir=/local/jjnas01/Public/Commonwealth -compress=true
32 31  
33 32 To following disables compression and writes files to the current directory:
34 33  
35   - twitter-stream-recorder -host=juventud -port=56874 -dir=./ -compress=false
36 34 \ No newline at end of file
  35 + twitter-stream-recorder -source="http://mir:8001/firehose.json" -dir=./ -compress=false
37 36 \ No newline at end of file
... ... @@ -4,20 +4,20 @@ import (
4 4 "bufio"
5 5 "bytes"
6 6 "compress/gzip"
  7 + "errors"
7 8 "flag"
8 9 "fmt"
  10 + "io/ioutil"
9 11 "log"
10   - "net"
  12 + "net/http"
11 13 "os"
12 14 "os/signal"
13 15 "runtime"
14   - "strconv"
15 16 "time"
16 17 )
17 18  
18 19 var (
19   - port *int = flag.Int("port", 8053, "Port to dial on. Default: 8053")
20   - host *string = flag.String("host", "localhost", "The hostname of the server reading from the Twitter gradenhose.")
  20 + host *string = flag.String("source", "http://mir:8001/firehose.json", "The HTTP stream URL to read from.")
21 21 outputDir *string = flag.String("dir", "./", "Path to store downloaded tweets.")
22 22 compression *bool = flag.Bool("compress", true, "Perform gzip compression before writing to disk.")
23 23  
... ... @@ -25,6 +25,27 @@ var (
25 25 outFile *os.File // The file we're currently writing to
26 26 )
27 27  
  28 +func NewHTTPStreamClient(destination string) (*http.Response, error) {
  29 + req, err := http.NewRequest("GET", destination, nil)
  30 + if err != nil {
  31 + fmt.Println(err)
  32 + }
  33 +
  34 + resp, err := http.DefaultClient.Do(req)
  35 + if err != nil {
  36 + return nil, err
  37 + }
  38 +
  39 + if resp.StatusCode != 200 {
  40 + body, _ := ioutil.ReadAll(resp.Body)
  41 + resp.Body.Close()
  42 +
  43 + return nil, errors.New(fmt.Sprintf("Connection returned %d\n%s", resp.StatusCode, body))
  44 + }
  45 +
  46 + return resp, nil
  47 +}
  48 +
28 49 func main() {
29 50 runtime.GOMAXPROCS(runtime.NumCPU() * 2)
30 51 flag.Parse()
... ... @@ -47,25 +68,15 @@ func main() {
47 68 fileExtension = ".json"
48 69 }
49 70  
50   - // Can be toggled by the program to stop repeated connection failed messages.
51   - // This should make debugging easier in the case that a connection drops.
52   - printConnectionError := true
53   -
54 71 for {
55   - // Atempt to connect to the server
56   - conn, err := net.Dial("tcp", *host+":"+strconv.Itoa(*port))
  72 + // Configure and connect to streaming server
  73 + resp, err := NewHTTPStreamClient(*host)
57 74 if err != nil {
58   - if printConnectionError {
59   - log.Println("Could not dial remote server on " + *host + ":" + strconv.Itoa(*port))
60   - printConnectionError = false
61   - }
62   - time.Sleep(1 * time.Second)
63   - continue
  75 + log.Fatal(err)
64 76 }
65   - printConnectionError = true
66   - log.Println("Connected to server " + *host + ":" + strconv.Itoa(*port))
  77 + log.Println("Connected to streaming server.")
67 78  
68   - reader := bufio.NewReader(conn)
  79 + reader := bufio.NewReader(resp.Body)
69 80  
70 81 // Start wrting the recorded data to a file
71 82 currentHour := -1
... ... @@ -83,15 +94,23 @@ func main() {
83 94 filename := fmt.Sprintf("%4d-%02d-%02d_%02d-%02d-%02d"+fileExtension, year, month, day, hour, min, sec)
84 95 outFile, err = os.Create(*outputDir + "/" + filename)
85 96 if err != nil {
86   - log.(err)
  97 + log.Println(err)
87 98 }
88 99 log.Println("Creating new output file..." + filename)
89 100 currentHour = hour
90 101 }
91 102  
92 103 line, err := reader.ReadString('\n')
  104 +
  105 + // Try to reconnect if something goes horribly wrong
93 106 if err != nil {
94   - break
  107 + resp, err := NewHTTPStreamClient(*host)
  108 + if err != nil {
  109 + log.Println("Could not connect to English NLP Stream server. Trying again in 5 seconds.")
  110 + time.Sleep(5 * time.Second)
  111 + }
  112 + reader = bufio.NewReader(resp.Body)
  113 + continue
95 114 }
96 115  
97 116 data := []byte(line)