-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.go
More file actions
119 lines (100 loc) · 2.7 KB
/
main.go
File metadata and controls
119 lines (100 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/sirupsen/logrus"
)
// debugmode is bound to the -debug command-line flag by Init; when true, Init
// switches logrus to debug level and enables caller reporting.
var debugmode bool
// Init registers and parses the -debug flag, configures the global logrus
// logger (stdout output, full-timestamp text formatter, level chosen from the
// flag) and finally calls configuration.init().
func Init() {
	flag.BoolVar(&debugmode, "debug", false, "Run as debug mode, read settings file to override task configuration if existsed.")
	flag.Parse()

	textFormat := &logrus.TextFormatter{
		FullTimestamp:    true,
		TimestampFormat:  "2006-01-02 15:04:05",
		CallerPrettyfier: callerPrettyfier,
	}
	logrus.SetOutput(os.Stdout)
	logrus.SetFormatter(textFormat)

	if !debugmode {
		logrus.SetLevel(logrus.InfoLevel)
	} else {
		// Debug runs log more verbosely and annotate entries with the caller.
		logrus.SetLevel(logrus.DebugLevel)
		logrus.SetReportCaller(true)
		logrus.Infof("<---------debug mode--------->")
	}

	configuration.init()
}
// main opens a live capture on the configured device, restricts it with a BPF
// filter built from the configured port and IPv4 address, and feeds every
// captured TCP segment to a reassembly.Assembler until SIGINT is received or
// the packet source is closed. On exit it flushes all streams and waits for
// the stream consumers to finish.
func main() {
	Init()

	handle, err := pcap.OpenLive(configuration.Device, SnapshotLen, false, pcap.BlockForever)
	if err != nil {
		logrus.Error(err)
		logrus.Fatal("Try sudo.")
	}
	defer handle.Close()

	// Only capture the traffic that belongs to the monitored service.
	filter := fmt.Sprintf(
		"(src port %s and src host %s) or (dst port %s and dst host %s)",
		configuration.Port, configuration.DeviceIPv4,
		configuration.Port, configuration.DeviceIPv4,
	)
	logrus.Debugf("Set bpf filter as: %s", filter)
	if err := handle.SetBPFFilter(filter); err != nil {
		logrus.Fatal(err)
	}

	source := gopacket.NewPacketSource(handle, handle.LinkType())
	source.NoCopy = true
	// Fetch the packet channel once instead of on every loop iteration.
	packets := source.Packets()

	streamFactory := &tcpStreamFactory{}
	streamPool := reassembly.NewStreamPool(streamFactory)
	assembler := reassembly.NewAssembler(streamPool)

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)

	ticker := time.NewTicker(time.Second * 30)
	defer ticker.Stop()

capture:
	for {
		// Wait for the diff task to start; while it is not running, do not
		// consume packets so that no CPU is spent on reassembly.
		if !configuration.getIsTaskRunning() {
			logrus.Infof("TaskID=%d is not running, self sleeping...", configuration.Taskid)
			select {
			case <-signalChan:
				logrus.Info("Caught SIGINT: aborting")
				break capture
			case <-ticker.C:
				// Stop tracking connections that carried no data for 30 seconds.
				assembler.FlushCloseOlderThan(time.Now().Add(time.Second * -30))
			case <-time.After(time.Second):
				// Re-check the task state after at most one second.
			}
			continue
		}

		// Block until something happens. The previous non-blocking selects
		// (with default branches) made this loop spin whenever no packet
		// was pending.
		select {
		case <-signalChan:
			logrus.Info("Caught SIGINT: aborting")
			break capture
		case <-ticker.C:
			// Stop tracking connections that carried no data for 30 seconds.
			assembler.FlushCloseOlderThan(time.Now().Add(time.Second * -30))
		case packet, ok := <-packets:
			if !ok {
				// A closed channel yields nil packets; leave the loop
				// instead of dereferencing them.
				logrus.Warn("Packet source closed: aborting")
				break capture
			}
			tcp, isTCP := packet.Layer(layers.LayerTypeTCP).(*layers.TCP)
			network := packet.NetworkLayer()
			if !isTCP || network == nil {
				// Not a TCP segment, or no decoded network layer to take
				// the flow from: nothing to reassemble.
				continue
			}
			assembler.Assemble(network.NetworkFlow(), tcp)
		}
	}

	// Important! Flush all connections before waiting for the consumers.
	closed := assembler.FlushAll()
	logrus.Debugf("Final flush: %d closed", closed)
	streamFactory.WaitConsumers()
	logrus.Info("Bye~")
}