Blame view

main.go 3.35 KB
9cba5a7ad   James McMinn   Intitial commit.
1
2
3
  package main
  
  import (
3e670f429   James McMinn   Can now read both...
4
  	"bufio"
9cba5a7ad   James McMinn   Intitial commit.
5
  	"flag"
3e670f429   James McMinn   Can now read both...
6
  	"fmt"
3e670f429   James McMinn   Can now read both...
7
  	"io"
9cba5a7ad   James McMinn   Intitial commit.
8
  	"log"
c50aa0455   James McMinn   Updated paths for...
9
  	"mirgit.dcs.gla.ac.uk/JamesMcMinn/twitter"
9cba5a7ad   James McMinn   Intitial commit.
10
  	"net"
3e670f429   James McMinn   Can now read both...
11
  	"os"
9cba5a7ad   James McMinn   Intitial commit.
12
  	"runtime"
3e670f429   James McMinn   Can now read both...
13
  	"strconv"
9cba5a7ad   James McMinn   Intitial commit.
14
15
16
  )
  
  // Sizing limits and operating modes for the server.
  const (
  	// RECV_BUF_LEN is a buffer size in bytes (1 MiB). It is not referenced
  	// by the visible part of this file.
  	RECV_BUF_LEN = 1024 * 1024

  	// MAX_CHAN_LEN is the buffer capacity used for the firehose channel and
  	// for every per-connection outgoing channel.
  	MAX_CHAN_LEN = 10000

  	// MODE_STREAM is the value of mode when tweets come from the live
  	// Twitter stream.
  	MODE_STREAM = 0
  	// MODE_FILE is the value of mode when tweets are read from the file
  	// given with -if.
  	MODE_FILE = 1
  )
9cba5a7ad   James McMinn   Intitial commit.
21
  var (
c14622999   James McMinn   Moved dealing wit...
22
23
  	consumerKey    *string               = flag.String("ck", "", "Consumer Key")
  	consumerSecret *string               = flag.String("cs", "", "Consumer Secret")
3e670f429   James McMinn   Can now read both...
24
  	ot             *string               = flag.String("ot", "", "OAuth Token")
c14622999   James McMinn   Moved dealing wit...
25
  	osec           *string               = flag.String("os", "", "OAuthTokenSecret")
3e670f429   James McMinn   Can now read both...
26
  	inputFile      *string               = flag.String("if", "", "Input File")
3e670f429   James McMinn   Can now read both...
27
  	port           *int                  = flag.Int("port", 8053, "Port to listen on. Default: 8053")
c14622999   James McMinn   Moved dealing wit...
28
29
  	firehose       chan twitter.Tweet    = make(chan twitter.Tweet, MAX_CHAN_LEN)
  	aliveStreams   map[chan *[]byte]bool = make(map[chan *[]byte]bool)
3e670f429   James McMinn   Can now read both...
30
31
  	mode           int                   = -1
  	fileFormat     int                   = -1
9cba5a7ad   James McMinn   Intitial commit.
32
33
34
35
36
  )
  
  func main() {
  	runtime.GOMAXPROCS(runtime.NumCPU() * 2)
  	flag.Parse()
9cba5a7ad   James McMinn   Intitial commit.
37

3e670f429   James McMinn   Can now read both...
38
39
  	if *inputFile != "" {
  		mode = MODE_FILE
3e670f429   James McMinn   Can now read both...
40
41
42
43
44
  	} else if *consumerKey != "" || *consumerSecret != "" || *ot != "" || *osec != "" {
  		if *consumerKey == "" || *consumerSecret == "" || *ot == "" || *osec == "" {
  			fmt.Println("Must specify all of -ck, -cs, -ot and -os. See -help for details.")
  			return
  		}
960d3a922   James McMinn   Removed the forma...
45
  		mode = MODE_STREAM
3e670f429   James McMinn   Can now read both...
46
47
48
49
50
51
52
  	} else {
  		fmt.Println("Must specify either Twitter OAuth details or file location and format. See -help for details.")
  		return
  	}
  
  	// Listen on whatever port was specified
  	ln, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
9cba5a7ad   James McMinn   Intitial commit.
53
54
55
  	if err != nil {
  		log.Fatal(err)
  	}
3e670f429   James McMinn   Can now read both...
56
  	log.Println("Listening on port", *port)
9cba5a7ad   James McMinn   Intitial commit.
57

3e670f429   James McMinn   Can now read both...
58
59
60
61
62
  	if mode == MODE_STREAM {
  		// Open a connection the the firehose and fill output streams
  		go twitter.FillStream(firehose, *consumerKey, *consumerSecret, *ot, *osec)
  		go fillOutgoingStreams(aliveStreams)
  	}
9cba5a7ad   James McMinn   Intitial commit.
63
64
65
66
67
68
  
  	for {
  		conn, err := ln.Accept()
  		if err != nil {
  			log.Println(err)
  		}
7390f8d5d   James McMinn   Added new connect...
69
  		log.Println("New connection attempt from " + conn.RemoteAddr().String())
9cba5a7ad   James McMinn   Intitial commit.
70
71
72
  		go handleConnection(conn)
  	}
  }
960d3a922   James McMinn   Removed the forma...
73
  // Reads a file into a channel
3e670f429   James McMinn   Can now read both...
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
  func readFileInto(into chan *[]byte) {
  	f, err := os.Open(*inputFile)
  	if err != nil {
  		log.Fatal(err)
  	}
  
  	bf := bufio.NewReaderSize(f, 20000)
  	for {
  		line, isPrefix, err := bf.ReadLine()
  		switch {
  		case err == io.EOF:
  			break
  		case err != nil:
  			log.Fatal(err)
  		case isPrefix:
  			log.Fatal("Error: Unexpected long line reading", f.Name())
  		}
  
  		// Check the connection is still active
  		if aliveStreams[into] != true {
  			break
  		}
960d3a922   James McMinn   Removed the forma...
96
  		t := twitter.JSONtoTweet(line)
3e670f429   James McMinn   Can now read both...
97
98
99
100
101
102
103
104
105
106
  
  		j, err := twitter.TweetToJSON(t)
  		if err != nil {
  			log.Println(err)
  		}
  		j = append(j, []byte("
  ")...)
  		into <- &j
  	}
  }
9cba5a7ad   James McMinn   Intitial commit.
107
  func handleConnection(conn net.Conn) {
c14622999   James McMinn   Moved dealing wit...
108
  	stream := make(chan *[]byte, MAX_CHAN_LEN)
3e670f429   James McMinn   Can now read both...
109

9cba5a7ad   James McMinn   Intitial commit.
110
111
  	aliveStreams[stream] = true
  	log.Println("Current Connections:", len(aliveStreams))
3e670f429   James McMinn   Can now read both...
112
113
114
  	if mode == MODE_FILE {
  		go readFileInto(stream)
  	}
9cba5a7ad   James McMinn   Intitial commit.
115
  	for {
c14622999   James McMinn   Moved dealing wit...
116
117
  		t := <-stream
  		_, err := conn.Write(*t)
9cba5a7ad   James McMinn   Intitial commit.
118
  		if err != nil {
960d3a922   James McMinn   Removed the forma...
119
  			log.Println("Closing connection: ", err.Error())
9cba5a7ad   James McMinn   Intitial commit.
120
121
122
123
124
125
126
  			break
  		}
  	}
  
  	delete(aliveStreams, stream)
  	log.Println("Current Connections:", len(aliveStreams))
  }
c14622999   James McMinn   Moved dealing wit...
127
  func fillOutgoingStreams(streams map[chan *[]byte]bool) {
9cba5a7ad   James McMinn   Intitial commit.
128
  	for {
c14622999   James McMinn   Moved dealing wit...
129
  		tweet := <-firehose
9cba5a7ad   James McMinn   Intitial commit.
130
131
132
133
  		for r := range streams {
  			if len(r) == MAX_CHAN_LEN {
  				<-r
  			}
c14622999   James McMinn   Moved dealing wit...
134
135
136
137
  			json, _ := twitter.TweetToJSON(tweet)
  			json = append(json, []byte("
  ")...)
  			r <- &json
9cba5a7ad   James McMinn   Intitial commit.
138
139
140
  		}
  	}
  }