283 lines
9.2 KiB
Python
283 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import datetime
|
|
import ipaddress
|
|
import hashlib
|
|
import struct
|
|
import os
|
|
from scapy.all import *
|
|
# Handles into the BF Runtime object tree. `bfrt` is neither defined nor
# imported in this file; presumably it is injected by the shell that executes
# this script (TODO confirm).

# Pipeline object of the pure_cms_rdma program; tables and registers are
# reached through it (e.g. p4.SwitchIngress.tbl_forward).
p4 = bfrt.pure_cms_rdma.pipe

mirror = bfrt.mirror

# Used in this file to create multicast nodes and multicast groups.
pre = bfrt.pre
|
|
|
|
# File written by log() and directory holding the RDMA metadata files.
logfile = "/root/wly_experiment/pure_cms_rdma/log_results/sketch.log"
rdma_dir = "/root/wly_experiment/pure_cms_rdma/rdma_metadata"

# Tells whether the RDMA metadata sent by the Collector has been stored
# successfully in the corresponding files.
store_flag = False

# NOTE: the keyword arguments of the add_with_XXX() functions must be written
# in lowercase, otherwise they are not recognized and the call does not work.

# Static forwarding rules (destination Ethernet address, egress port), added
# according to how the testbed is wired.
forwardingRules = [
    ("6c:ec:5a:62:a8:00", 66),   # Tofino CPU 1
    ("08:c0:eb:24:7b:8b", 148),  # Collector
    ("08:c0:eb:24:68:6b", 180),  # Generator
]

# Maps each collector's Ethernet address to the corresponding Tofino port
# (make sure every one of these ports also appears in mcRules).
collectorEthertoPorts = [
    ("08:c0:eb:24:7b:8b", 148),
]

# Size of one CMS slot (bucket) in bytes; 8 bytes (64 bits) by default.
bucket_size_B = 8

# Multicast rules: map an egress port and a number of hash functions
# (duplicate_num) to a multicast group ID.
mcRules = [
    {"mgid": 1, "egressPort": 148, "duplicate_num": 1},
    {"mgid": 2, "egressPort": 148, "duplicate_num": 2},
    {"mgid": 3, "egressPort": 148, "duplicate_num": 3},
]
|
|
|
|
|
|
def log(text):
    """Print a timestamped log line and write it to the log file.

    The file named by ``logfile`` is opened in ``"w+"`` mode, so it is
    truncated on every call and only ever holds the most recent line. The
    original code marks this overwrite as intentional.

    Args:
        text: Message to log; it is converted with ``str()``.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global logfile, datetime

    line = "%s \t DigProc: %s" % (str(datetime.datetime.now()), str(text))
    print(line)

    # Overwriting write. The context manager closes the handle even when the
    # write itself raises.
    with open(logfile, "w+") as f:
        f.write(line + "\n")
|
|
|
|
# 获取 RDMA 连接的元数据信息函数已验证无误
|
|
def getRDMAMetadata():
    """Read the RDMA connection metadata from the files in ``rdma_dir``.

    Each value is stored as a decimal integer in its own file: ``tmp_psn``
    (starting packet sequence number), ``tmp_qpnum`` (queue pair),
    ``tmp_memaddr`` (start memory address), ``tmp_memlen`` (length of the
    memory usable for data) and ``tmp_rkey`` (remote key).

    Returns:
        The tuple ``(queue_pair, start_psn, memory_start, memory_length,
        remote_key)``.

    Raises:
        OSError: If one of the metadata files cannot be opened or read.
        ValueError: If a file does not contain a valid integer.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global log, os, rdma_dir

    log("Reading collector RDMA metadata from disk...")

    values = {}
    try:
        for name in ("tmp_psn", "tmp_qpnum", "tmp_memaddr", "tmp_memlen", "tmp_rkey"):
            with open("%s/%s" % (rdma_dir, name), "r") as f:
                values[name] = int(f.read())
    except (OSError, ValueError):
        # Previously the error was swallowed here and the function then failed
        # on unbound locals at the return statement; report the real cause.
        log(" !!! !!! Failed to read RDMA metadata !!! !!! ")
        raise

    log("Collector RDMA metadata has extracted from disk!!! ")

    return (
        values["tmp_qpnum"],
        values["tmp_psn"],
        values["tmp_memaddr"],
        values["tmp_memlen"],
        values["tmp_rkey"],
    )
|
|
|
|
# 存储 RDMA 连接的元数据信息函数已验证无误
|
|
def storeRDMAMetadata(packet):
    """Parse a received packet and store its RDMA metadata on disk.

    The UDP payload is unpacked with the format ``"!IIQII"`` into, in order,
    the packet sequence number, queue pair, memory start address, memory
    length and remote key. Each value is written as decimal text to its own
    file in ``rdma_dir``. On success the module-level ``store_flag`` is set
    to ``True``.

    Args:
        packet: Received packet; ``packet["UDP"].load`` must be the payload
            bytes.

    Raises:
        struct.error: If the payload size does not match the format.
        OSError: If one of the files cannot be written.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing). `store_flag` is assigned below.
    global log, store_flag, struct, rdma_dir

    # UDP is used to carry the information of the RDMA connection.
    log("Receive and store RDMA connection metadata to %s" % rdma_dir)

    udp_payload = packet["UDP"].load
    psn, queue_pair, memory_start, memory_length, remote_key = struct.unpack("!IIQII", udp_payload)

    # Write every parsed value to its file; the context manager closes the
    # handle even when a write fails.
    fields = (
        ("tmp_psn", psn),
        ("tmp_qpnum", queue_pair),
        ("tmp_memaddr", memory_start),
        ("tmp_memlen", memory_length),
        ("tmp_rkey", remote_key),
    )
    for name, value in fields:
        with open("%s/%s" % (rdma_dir, name), "w") as f:
            f.write(str(value))

    store_flag = True
|
|
|
|
# (入口阶段) 下发转发表项函数已验证无误
|
|
def insertForwardingRules():
    """Install the static forwarding entries from ``forwardingRules``.

    Each entry maps a destination Ethernet address to an egress port and is
    added to ``SwitchIngress.tbl_forward`` with the ``forward`` action.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global p4, log, ipaddress, forwardingRules

    log("Inserting Forwarding rules...")

    for rule in forwardingRules:
        mac, port = rule
        log("DstAddr: %s -> EgressPort: %i" % (mac, port))
        forward_table = p4.SwitchIngress.tbl_forward
        forward_table.add_with_forward(dstaddr=mac, port=port)
|
|
|
|
# (入口阶段) 下发 Key-Write 表项函数已验证无误
|
|
def insertKeyWriteRules(duplicate_num):
    """Install the Key-Write entries of the ingress control.

    For every collector in ``collectorEthertoPorts`` the multicast group whose
    ``mcRules`` entry matches both the collector's egress port and
    ``duplicate_num`` is looked up, and an entry mapping the collector's
    Ethernet address to that group is added to
    ``SwitchIngress.tbl_prep_multicast``.

    Args:
        duplicate_num: Value matched against the ``duplicate_num`` field of
            the ``mcRules`` entries.

    Raises:
        IndexError: If no ``mcRules`` entry matches a collector's egress port
            and ``duplicate_num``.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global p4, log, collectorEthertoPorts, mcRules

    log("Inserting KeyWrite rules...")

    # Walk over every collector's Ethernet address and egress port.
    for dstAddr, egrPort in collectorEthertoPorts:
        log("%s, %i, %i" % (dstAddr, egrPort, duplicate_num))

        # Find the multicast group matching both duplicate_num and the port.
        matches = [
            r for r in mcRules
            if r["duplicate_num"] == duplicate_num and r["egressPort"] == egrPort
        ]
        if not matches:
            # Same exception type as the former bare `rule[0]`, but with a
            # message that names the missing configuration.
            raise IndexError(
                "no mcRules entry for egressPort=%s, duplicate_num=%s"
                % (egrPort, duplicate_num)
            )
        multicastGroupID = matches[0]["mgid"]

        log("Adding multiwrite rule %s, N = %i - %i" % (dstAddr, duplicate_num, multicastGroupID))

        p4.SwitchIngress.tbl_prep_multicast.add_with_prep_multiwrite(dstaddr=dstAddr, mcast_grp=multicastGroupID)
|
|
|
|
|
|
# (出口阶段) 下发 Prep-MemoryAddress 表项函数已验证无误
|
|
def insertPrepMemoryAddressRules(duplicate_num):
    """Install the egress entries needed for the KeyWrite RDMA connection.

    The RDMA metadata is read once with ``getRDMAMetadata()``. Then, for each
    collector in ``collectorEthertoPorts``:

    * the collector memory is divided into ``memory_length / bucket_size_B``
      slots, split evenly over ``duplicate_num + 1`` rows, and one start-slot
      entry per row is added to ``tbl_get_start_slot``;
    * the starting packet sequence number is written to
      ``reg_rdma_sequence_number`` at the collector's register index;
    * the collector's RDMA metadata entry is added to ``tbl_getRDMAMetadata``.

    Args:
        duplicate_num: Number of copies; ``duplicate_num + 1`` rows are
            configured.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global p4, log, ipaddress, collectorEthertoPorts, getRDMAMetadata, bucket_size_B

    log("Inserting PrepKeyWrite rules...")

    # Fetch the metadata of the RDMA connection.
    queue_pair, start_psn, memory_start, memory_length, remote_key = getRDMAMetadata()

    # The enumerate index is the register index holding the packet sequence
    # number; every collector gets its own index.
    for psn_reg_index, (dstAddr, _) in enumerate(collectorEthertoPorts):
        log("Inserting memory slot rules for collector ip %s" % dstAddr)

        # Total slots allocated in the collector: memory_length / bucket size.
        total_slots = int(memory_length / bucket_size_B)

        # Slots per CMS row (truncated to an integer).
        cms_rank_slots = int(total_slots / (duplicate_num + 1))

        # NOTE(review): these entries do not depend on the collector, yet they
        # are added again for every collector; with more than one collector
        # this may conflict with the entries already present - confirm.
        for pkt_num in range(duplicate_num + 1):
            first_slot = pkt_num * cms_rank_slots
            log("multicast_pkt_num is: %d" % pkt_num)
            log("start_slot is: %d" % first_slot)
            p4.SwitchEgress.PrepareMemoryAddress.tbl_get_start_slot.add_with_get_start_slot(
                multicast_pkt_num=pkt_num,
                start_slot=first_slot,
            )

        # Fill the register that stores the packet sequence number.
        p4.SwitchEgress.ConvertToRDMA.reg_rdma_sequence_number.mod(
            f1=start_psn,
            register_index=psn_reg_index,
        )

        log("Inserting KeyWrite RDMA Metadata lookup rule for collector ip %s" % dstAddr)

        # Add the entry carrying the collector's RDMA metadata.
        p4.SwitchEgress.PrepareMemoryAddress.tbl_getRDMAMetadata.add_with_set_server_info(
            dstaddr=dstAddr,
            remote_key=remote_key,
            queue_pair=queue_pair,
            memory_address_start=memory_start,
            rank_num_slots=cms_rank_slots,
            qp_reg_index=psn_reg_index,
        )
|
|
|
|
|
|
# (数据包复制引擎, PRE) 配置多播函数的处理逻辑已验证无误
|
|
def ConfigMulticast(duplicate_num):
    """Create the multicast nodes and groups for the matching ``mcRules``.

    Only the ``mcRules`` entries whose ``duplicate_num`` equals the argument
    are processed. For each of them, ``duplicate_num`` multicast nodes are
    created, all pointing at the entry's egress port and each with a unique
    node ID, and a multicast group containing those nodes is added.

    Args:
        duplicate_num: Selects the ``mcRules`` entries to configure.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global p4, pre, log, mcRules

    log("Configuring mirroring sessions...")

    lastNodeID = 0

    for mcastGroup in mcRules:
        # Only handle the entries configured for the requested duplicate_num.
        if mcastGroup["duplicate_num"] == duplicate_num:
            mgid = mcastGroup["mgid"]              # multicast group ID
            egressPort = mcastGroup["egressPort"]  # egress port of the group
            copies = mcastGroup["duplicate_num"]   # number of hash functions
            log("Setting up multicast %i, egress port: %i, duplicate_num: %i" % (mgid, egressPort, copies))

            # Every multicast node (RID) is unique and points at egressPort.
            nodeIDs = []
            log("Adding multicast nodes...")
            for node_id in range(lastNodeID + 1, lastNodeID + copies + 1):
                log("Creating node %i" % node_id)
                pre.node.add(dev_port=[egressPort], multicast_node_id=node_id)
                nodeIDs.append(node_id)
            lastNodeID += len(nodeIDs)

            log("Creating the multicast group")
            # All exclusion IDs are marked invalid.
            pre.mgid.add(
                mgid=mgid,
                multicast_node_id=nodeIDs,
                multicast_node_l1_xid=[0] * copies,
                multicast_node_l1_xid_valid=[False] * copies,
            )
|
|
|
|
|
|
# 入口表项下发函数已验证无误
|
|
def SetIngressTableRules(duplicate_num):
    """Install the table entries of the ingress control stage.

    Adds the static forwarding rules and then the Key-Write rules.

    Args:
        duplicate_num: Passed on to ``insertKeyWriteRules``.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global p4, log, insertForwardingRules, insertKeyWriteRules

    log("--------------- Ingress Pipeline ---------------")

    insertForwardingRules()
    insertKeyWriteRules(duplicate_num=duplicate_num)
|
|
|
|
# 出口表项下发函数已验证无误
|
|
def SetEgressTableRules(duplicate_num):
    """Install the table entries of the egress control stage.

    Args:
        duplicate_num: Passed on to ``insertPrepMemoryAddressRules``.
    """
    # The explicit `global` declaration of read-only names is kept from the
    # original; presumably the environment that runs this script needs it
    # (TODO confirm before removing).
    global p4, log, insertPrepMemoryAddressRules

    log("--------------- Egress Pipeline ---------------")

    insertPrepMemoryAddressRules(duplicate_num)
|
|
|
|
|
|
|
|
log("Starting configure Tofino Switch...")

# Configure the multicast group forwarding rules in the PRE, then the ingress
# table entries.
ConfigMulticast(duplicate_num=3)
SetIngressTableRules(duplicate_num=3)

# Wait for the RDMA metadata sent over by the Generator side: capture a single
# matching UDP packet, which storeRDMAMetadata writes to disk, and only then
# configure the egress table entries.
filter_expr = "udp and (src port 1111) and (dst port 5555)"
sniff(filter=filter_expr, iface="enp2s0f0", prn=storeRDMAMetadata, count=1)

if not store_flag:
    log("*** Cannot receive and process RDMA metadata correctly! ***")
else:
    SetEgressTableRules(duplicate_num=3)

log("Bootstrap complete")
|