Skip to content

Commit

Permalink
added redis sub
Browse files Browse the repository at this point in the history
  • Loading branch information
danmia committed Jul 4, 2016
1 parent 23e71be commit 7e5f60b
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 12 deletions.
7 changes: 5 additions & 2 deletions capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)


func captureToBuffer() bytes.Buffer {
func captureToBuffer(req Capmsg) {

var (
deviceName string = "eth0"
Expand All @@ -26,6 +26,8 @@ func captureToBuffer() bytes.Buffer {
packetCount int = 0
)

fmt.Println("Capturing on interface: " + req.Interface)

var f bytes.Buffer
w := pcapgo.NewWriter(&f)
w.WriteFileHeader(uint32(snapshotLen), layers.LinkTypeEthernet)
Expand Down Expand Up @@ -53,5 +55,6 @@ func captureToBuffer() bytes.Buffer {
}

fmt.Println("Returning from capture")
return f
postBufferCloudshark(*csschemePtr, *cshostPtr, *cstokenPtr, f)
return
}
25 changes: 25 additions & 0 deletions chanBuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

// BUFFER FOR CAPTURE MESSAGES

type RingBufferCapmsg struct {
inputChannel <-chan Capmsg
outputChannel chan Capmsg
}


func NewRingBufferCapmsg(inputChannel <-chan Capmsg, outputChannel chan Capmsg) *RingBufferCapmsg {
return &RingBufferCapmsg{inputChannel, outputChannel}
}

func (r *RingBufferCapmsg) Run() {
for v := range r.inputChannel {
select {
case r.outputChannel <- v:
default:
<-r.outputChannel
r.outputChannel <- v
}
}
close(r.outputChannel)
}
33 changes: 23 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,35 @@ package main

import (
"flag"
"fmt"
)

var cshostPtr *string
var cstokenPtr *string
var csschemePtr *string
var upPtr *bool

func main() {

cshostPtr := flag.String("cshost", "localhost", "cloushark host")
cstokenPtr := flag.String("cstoken", "xxxxxxx", "cloushark api token")
csschemePtr := flag.String("csscheme", "https", "cloushark scheme http|https")
upPtr := flag.Bool("upload", false, "Upload pcap")
flag.Parse()
cshostPtr = flag.String("cshost", "localhost", "cloushark host")
cstokenPtr = flag.String("cstoken", "xxxxxxx", "cloushark api token")
csschemePtr = flag.String("csscheme", "https", "cloushark scheme http|https")
redisnode := flag.String("redisnode", "127.0.0.1", "Hostname|IP of redis server. Default localhost")
redisport := flag.Int("redisport", 6379, "Port of redis server. Default 6379")
redischannel := flag.String("redischannel", "capture", "Redis channel to subscribe to. Default capture")

if(*upPtr) {
pbuf := captureToBuffer();
postBufferCloudshark(*csschemePtr, *cshostPtr, *cstokenPtr, pbuf)
}
upPtr = flag.Bool("upload", false, "Upload pcap")
flag.Parse()


// Channel for thread sync
done := make(chan bool)

go func() {
fmt.Println("Starting Redis Thread")
subToRedis(*redisnode, *redisport, *redischannel)
done <- true
}()

fmt.Println("Exiting?")
<- done
}
11 changes: 11 additions & 0 deletions msg.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"node": "builder1.lab5",
"interface": "bond1",
"tags": "blah,tagme,stuff",
"bpf": "dst ip 10.0.0.1",
"customer": "paypal",
"snap": 1500,
"packets": 50,
"alertid": 655443,
"duration": 15
}
2 changes: 2 additions & 0 deletions publish.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
PUBLISH capture '{"duration":15,"node":"builder1.lab5","interface":"bond1","tags":"blah,tagme,stuff","bpf":"dst ip 10.0.0.1","customer":"paypal","snap":1500,"packets":50,"alertid":655443}'

46 changes: 46 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import (
"fmt"
"strconv"
"encoding/json"
"github.com/garyburd/redigo/redis"
)

var c redis.Conn
var gerr error

func subToRedis(server string, port int, subchannel string) {

fmt.Println("Attempting connect to " + server)
c, gerr = redis.Dial("tcp", server + ":" + strconv.Itoa(port))
if gerr != nil {
fmt.Printf("Error connecting: %s\n", gerr)
}
fmt.Println("Connected to " + server)
psc := redis.PubSubConn{c}
psc.Subscribe(subchannel)

for {
var msg Capmsg
switch v := psc.Receive().(type) {
case redis.Message:
fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
if err := json.Unmarshal(v.Data, &msg); err != nil {
fmt.Println(err)
} else {
if(*upPtr) {
go func() {
captureToBuffer(msg);
}()
}
}
case redis.Subscription:
fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
case error:
fmt.Printf("Error: %s\n", v)
}

}

}
19 changes: 19 additions & 0 deletions structs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"time"
)

type Capmsg struct {
Node string `json:"node,omitempty"`
Interface string `json:"interface,omitempty"`
Tags string `json:"tags,omitempty"`
Bpf string `json:"bpf,omitempty"`
Customer string `json:"customer,omitempty"`
Snap int `json:"snap"`
Packets int `json:"packets"`
Alertid int `json:"alertid,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
}

type Capmsgs []Capmsg

0 comments on commit 7e5f60b

Please sign in to comment.