Skip to content

Commit b4ce401

Browse files
authored
Merge pull request #1904 from nplanel/flow-metrics-check-upperlayers
flow : metrics : check upper layers
2 parents effd652 + f0d0cf7 commit b4ce401

4 files changed

Lines changed: 127 additions & 17 deletions

File tree

flow/flow.go

Lines changed: 95 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ type flowState struct {
6262
lastMetric FlowMetric
6363
rtt1stPacket int64
6464
updateVersion int64
65+
ipv4 *layers.IPv4
66+
ipv6 *layers.IPv6
6567
}
6668

6769
// Packet describes one packet
@@ -725,6 +727,82 @@ func (f *Flow) updateRTT(packet *Packet) {
725727
}
726728
}
727729

730+
func (f *Flow) getNetworkLayer(packet *Packet) (*layers.IPv4, *layers.IPv6) {
731+
if f.XXX_state.ipv4 != nil {
732+
return f.XXX_state.ipv4, nil
733+
}
734+
if f.XXX_state.ipv6 != nil {
735+
return nil, f.XXX_state.ipv6
736+
}
737+
738+
ipv4Layer := packet.Layer(layers.LayerTypeIPv4)
739+
if ipv4Packet, ok := ipv4Layer.(*layers.IPv4); ok {
740+
f.XXX_state.ipv4 = ipv4Packet
741+
return f.XXX_state.ipv4, nil
742+
}
743+
744+
ipv6Layer := packet.Layer(layers.LayerTypeIPv6)
745+
if ipv6Packet, ok := ipv6Layer.(*layers.IPv6); ok {
746+
f.XXX_state.ipv6 = ipv6Packet
747+
return nil, f.XXX_state.ipv6
748+
}
749+
750+
return nil, nil
751+
}
752+
753+
func (f *Flow) isABPacket(packet *Packet) bool {
754+
if f.Network == nil {
755+
return false
756+
}
757+
cmp := false
758+
759+
ipv4Packet, ipv6Packet := f.getNetworkLayer(packet)
760+
if ipv4Packet != nil {
761+
cmp = f.Network.A == ipv4Packet.SrcIP.String()
762+
if bytes.Compare(ipv4Packet.SrcIP, ipv4Packet.DstIP) == 0 && f.Transport != nil {
763+
tcpLayer := packet.Layer(layers.LayerTypeTCP)
764+
if tcpPacket, ok := tcpLayer.(*layers.TCP); ok {
765+
return f.Transport.A > int64(tcpPacket.SrcPort)
766+
}
767+
udpLayer := packet.Layer(layers.LayerTypeUDP)
768+
if udpPacket, ok := udpLayer.(*layers.UDP); ok {
769+
return f.Transport.A > int64(udpPacket.SrcPort)
770+
}
771+
sctpLayer := packet.Layer(layers.LayerTypeSCTP)
772+
if sctpPacket, ok := sctpLayer.(*layers.SCTP); ok {
773+
return f.Transport.A > int64(sctpPacket.SrcPort)
774+
}
775+
icmpLayer := packet.Layer(layers.LayerTypeICMPv4)
776+
if icmpPacket, ok := icmpLayer.(*layers.ICMPv4); ok {
777+
return f.ICMP.Type > ICMPv4TypeToFlowICMPType(icmpPacket.TypeCode.Type())
778+
}
779+
}
780+
}
781+
if ipv6Packet != nil {
782+
cmp = f.Network.A == ipv6Packet.SrcIP.String()
783+
if bytes.Compare(ipv6Packet.SrcIP, ipv6Packet.DstIP) == 0 && f.Transport != nil {
784+
tcpLayer := packet.Layer(layers.LayerTypeTCP)
785+
if tcpPacket, ok := tcpLayer.(*layers.TCP); ok {
786+
return f.Transport.A > int64(tcpPacket.SrcPort)
787+
}
788+
udpLayer := packet.Layer(layers.LayerTypeUDP)
789+
if udpPacket, ok := udpLayer.(*layers.UDP); ok {
790+
return f.Transport.A > int64(udpPacket.SrcPort)
791+
}
792+
sctpLayer := packet.Layer(layers.LayerTypeSCTP)
793+
if sctpPacket, ok := sctpLayer.(*layers.SCTP); ok {
794+
return f.Transport.A > int64(sctpPacket.SrcPort)
795+
}
796+
icmpLayer := packet.Layer(layers.LayerTypeICMPv6)
797+
if icmpPacket, ok := icmpLayer.(*layers.ICMPv6); ok {
798+
return f.ICMP.Type > ICMPv6TypeToFlowICMPType(icmpPacket.TypeCode.Type())
799+
}
800+
}
801+
}
802+
803+
return cmp
804+
}
805+
728806
func (f *Flow) updateMetricsWithLinkLayer(packet *Packet) bool {
729807
ethernetPacket := getLinkLayer(packet)
730808
if ethernetPacket == nil || f.Link == nil {
@@ -736,7 +814,12 @@ func (f *Flow) updateMetricsWithLinkLayer(packet *Packet) bool {
736814
length = getLinkLayerLength(ethernetPacket)
737815
}
738816

739-
if f.Link.A == ethernetPacket.SrcMAC.String() {
817+
/* found a layer that can identify the connection way */
818+
cmp := f.Link.A == ethernetPacket.SrcMAC.String()
819+
if bytes.Compare(ethernetPacket.SrcMAC, ethernetPacket.DstMAC) == 0 {
820+
cmp = f.isABPacket(packet)
821+
}
822+
if cmp {
740823
f.Metric.ABPackets++
741824
f.Metric.ABBytes += length
742825
} else {
@@ -748,8 +831,8 @@ func (f *Flow) updateMetricsWithLinkLayer(packet *Packet) bool {
748831
}
749832

750833
func (f *Flow) newNetworkLayer(packet *Packet) error {
751-
ipv4Layer := packet.Layer(layers.LayerTypeIPv4)
752-
if ipv4Packet, ok := ipv4Layer.(*layers.IPv4); ok {
834+
ipv4Packet, ipv6Packet := f.getNetworkLayer(packet)
835+
if ipv4Packet != nil {
753836
f.Network = &FlowLayer{
754837
Protocol: FlowProtocol_IPV4,
755838
A: ipv4Packet.SrcIP.String(),
@@ -769,8 +852,7 @@ func (f *Flow) newNetworkLayer(packet *Packet) error {
769852
return nil
770853
}
771854

772-
ipv6Layer := packet.Layer(layers.LayerTypeIPv6)
773-
if ipv6Packet, ok := ipv6Layer.(*layers.IPv6); ok {
855+
if ipv6Packet != nil {
774856
f.Network = &FlowLayer{
775857
Protocol: FlowProtocol_IPV6,
776858
A: ipv6Packet.SrcIP.String(),
@@ -800,12 +882,12 @@ func (f *Flow) newNetworkLayer(packet *Packet) error {
800882
}
801883

802884
func (f *Flow) updateMetricsWithNetworkLayer(packet *Packet, length int64) error {
803-
ipv4Layer := packet.Layer(layers.LayerTypeIPv4)
804-
if ipv4Packet, ok := ipv4Layer.(*layers.IPv4); ok {
885+
ipv4Packet, ipv6Packet := f.getNetworkLayer(packet)
886+
if ipv4Packet != nil {
805887
if length == 0 {
806888
length = int64(ipv4Packet.Length)
807889
}
808-
if f.Network.A == ipv4Packet.SrcIP.String() {
890+
if f.isABPacket(packet) {
809891
f.Metric.ABPackets++
810892
f.Metric.ABBytes += length
811893
} else {
@@ -815,12 +897,11 @@ func (f *Flow) updateMetricsWithNetworkLayer(packet *Packet, length int64) error
815897

816898
return nil
817899
}
818-
ipv6Layer := packet.Layer(layers.LayerTypeIPv6)
819-
if ipv6Packet, ok := ipv6Layer.(*layers.IPv6); ok {
900+
if ipv6Packet != nil {
820901
if length == 0 {
821902
length = int64(ipv6Packet.Length)
822903
}
823-
if f.Network.A == ipv6Packet.SrcIP.String() {
904+
if f.isABPacket(packet) {
824905
f.Metric.ABPackets++
825906
f.Metric.ABBytes += length
826907
} else {
@@ -860,19 +941,16 @@ func (f *Flow) updateTCPMetrics(packet *Packet) error {
860941

861942
var srcIP string
862943
var timeToLive uint32
944+
ipv4Packet, ipv6Packet := f.getNetworkLayer(packet)
863945
switch f.Network.Protocol {
864946
case FlowProtocol_IPV4:
865-
ipv4Layer := packet.Layer(layers.LayerTypeIPv4)
866-
ipv4Packet, ok := ipv4Layer.(*layers.IPv4)
867-
if !ok {
947+
if ipv4Packet == nil {
868948
return ErrLayerNotFound
869949
}
870950
srcIP = ipv4Packet.SrcIP.String()
871951
timeToLive = uint32(ipv4Packet.TTL)
872952
case FlowProtocol_IPV6:
873-
ipv6Layer := packet.Layer(layers.LayerTypeIPv6)
874-
ipv6Packet, ok := ipv6Layer.(*layers.IPv6)
875-
if !ok {
953+
if ipv6Packet == nil {
876954
return ErrLayerNotFound
877955
}
878956
srcIP = ipv6Packet.SrcIP.String()

flow/flow_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,6 +1736,35 @@ func TestGREEthernet(t *testing.T) {
17361736
validatePCAP(t, "pcaptraces/gre-gre-icmpv4.pcap", layers.LinkTypeEthernet, nil, expected)
17371737
}
17381738

1739+
func TestL2L3EqualSrcDst(t *testing.T) {
1740+
expected := []*Flow{
1741+
{
1742+
LayersPath: "Ethernet/IPv4/TCP",
1743+
Application: "TCP",
1744+
Link: &FlowLayer{
1745+
Protocol: FlowProtocol_ETHERNET,
1746+
A: "00:00:00:00:00:00",
1747+
B: "00:00:00:00:00:00",
1748+
ID: 0,
1749+
},
1750+
Network: &FlowLayer{
1751+
Protocol: FlowProtocol_IPV4,
1752+
A: "127.0.0.1",
1753+
B: "127.0.0.1",
1754+
ID: 0,
1755+
},
1756+
Metric: &FlowMetric{
1757+
ABPackets: 10,
1758+
ABBytes: 668,
1759+
BAPackets: 13,
1760+
BABytes: 263010,
1761+
},
1762+
},
1763+
}
1764+
1765+
validatePCAP(t, "pcaptraces/iperf-same-L2L3.pcap", layers.LinkTypeEthernet, nil, expected)
1766+
}
1767+
17391768
func benchmarkPacketParsing(b *testing.B, filename string, linkType layers.LinkType) {
17401769
handleRead, err := pcap.OpenOffline(filename)
17411770
if err != nil {
258 KB
Binary file not shown.

flow/table.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,9 @@ func (ft *Table) packetToFlow(packet *Packet, parentUUID string) *Flow {
389389
flow.Update(packet, &ft.opts)
390390
}
391391

392+
/* we need to reset state here to avoid re-using underlayer in tunnel */
393+
flow.XXX_state.ipv4 = nil
394+
flow.XXX_state.ipv6 = nil
392395
flow.XXX_state.updateVersion = ft.updateVersion + 1
393396

394397
if ft.Opts.RawPacketLimit != 0 && flow.RawPacketsCaptured < ft.Opts.RawPacketLimit {

0 commit comments

Comments
 (0)