免费网站整站模板源码门户网站建设目标

当前位置: 首页 > news >正文

免费网站整站模板源码,门户网站建设目标,房产网站建设公司,咸宁网网站前言 需求为#xff0c;统计linux上得上下行公网流量#xff0c;常规得命令如iftop 、sar、ifstat、nload等只能获取流量得大小#xff0c;不能区分公私网#xff0c;所以需要通过抓取网络包并排除私网段才能拿到公网流量。下面提供了一些有效得解决思路#xff0c;提供了…前言 需求为统计linux上得上下行公网流量常规得命令如iftop 、sar、ifstat、nload等只能获取流量得大小不能区分公私网所以需要通过抓取网络包并排除私网段才能拿到公网流量。下面提供了一些有效得解决思路提供了部分得代码片段但不提供整个代码内容。 方案 方案一在Linux服务器上安装服务抓取到数据包后发送到中央计算服务器 优点①由于计算是在中央并不会占用linux业务服务器得资源性能 缺点①通过scp或其他传输手段会占用一定得网络资源特别当数据量大时 ②中央计算服务器得资源易达到瓶颈需要开多个计算节点 方案二在Linux服务器上安装服务抓取到数据包后本地处理直接将结果发至中央 优点①中央节点压力减小数据实时性提升 缺点①每个linux服务器都需要抓包并且处理需要占用一定的计算资源且单位包越大消耗越高 方案三在同层交换机上放置旁路监控服务器抓取广播域内所有包逐个分析每个linux服务器上的流量情况 优点 ①独立部署不会占用linux服务器资源 ②维护相对容易linux业务服务器与监控节点解耦只需要维护监控节点而不需要维护linux服务器 缺点 ①需要跟linux服务器处在一个广播域并且要求性能足够当linux业务服务器数据过多时易有瓶颈 ②可能存在漏抓包的情况 ③需要监听所有得vlan并分别部署监控节点 方案四加入流控设备旁路到核心网关 优点 ①无需自研有成熟的抓取功能且有更多丰富的功能 缺点 ①成本较高 ②缺乏控制能力比如流量限速阻断等 下面主要介绍第一和第二种方法得实现 方案一技术实现 先来看技术拓扑图流程为 linux服务器开启tcpdump抓包linux服务器将pcap包发送到流量分析服务器流量服务器进行公私网流量拆分公网白名单过滤获取步骤三得结果并传输到中央 下面讲解每一步得实现细节 1.linux服务器开启tcpdump抓包 核心命令为tcpdump, -i, interface , fhost {target_ip}, -s 96,-w, output_file ,-p -i 接网卡名host接希望抓取得ip可不填-s 96 表示只抓取包头部可能有得包头超过这个长度但96依然是个比较适中得值-w 保存为pcap包-p 关闭混杂模式避免监听到广播得其他包使流量过多 #执行tcpdump抓包 def run_tcpdump(target_ip):INTERVAL 60interface getInterface() #获取网卡名timestamp datetime.now().strftime(%Y-%m-%dT%H:%M:%S) now datetime.now().strftime(%Y-%m-%d %H:%M:%S) #获取开始时间outputfile f{SAVEPATH}/xxx{timestamp}.pcaptcpdump_command [tcpdump, -i, interface , fhost {target_ip}, -s 96,-w, output_file ,-p]try:process subprocess.Popen(tcpdump_command) #使用subprocess执行shell命令time.sleep(INTERVAL) #定义休眠时间process.terminate() #结束shell命令process.wait() #等待子进程退出确保子进程清理完成except Exception as e:logger.error(fxxx) 2.linux服务器将pcap包发送到流量分析服务器 这一个步骤没有太多可讲得使用socket 或requests 或 subprocess(scp)都可以目的就是将抓取到得包发送到流量分析服务器。 3.流量服务器进行公私网流量拆分公网白名单过滤 这一步是关键步骤公私网流量公网白名单过滤拆分需要一定得计算性能所以核心代码推荐用C语言编写博主测试过同代码情况下python C java得表现能力C或C性能要远超其他语言如同体积包假设占用30%得单核心性能C可以做到1%以内。 代码要实现两点 1.公私网彻底拆分 2.过滤公网白名单内的不希望统计得数据 公私网流量拆分参考以下代码片段首先剔除三类子网地址0xFF000000 是掩码255.0.0.0 的二进制形式只保留 IP 地址的最高 8 位。0x0A000000 是 10.0.0.0 的二进制形式。 注意现在得大型网络多采用overlay得方式有些ip虽然在私网段但由于走了vxlan隧道占用了公网带宽其实也是数据公网得。后续得处理需读者自行处理。 bool is_private_ip(const std::string ip) {struct in_addr addr;inet_pton(AF_INET, ip.c_str(), addr);uint32_t ip_addr ntohl(addr.s_addr);// 10.0.0.0/8if ((ip_addr 0xFF000000) 0x0A000000) return true;// 172.16.0.0/12if ((ip_addr 0xFFF00000) 0xAC100000) return true;// 192.168.0.0/16if ((ip_addr 0xFFFF0000) 0xC0A80000) return true;return false; } 定义4个变量分别存公网下行公网上行私网下行私网上行 uint64_t public_in 0;uint64_t public_out 0;uint64_t private_in 0;uint64_t private_out 0; pcap_t *handle pcap_open_offline(pcap_file.c_str(), errbuf); #获取pcap包pcap_next_ex(handle, header, data) #调用libpcap 捕获库 const struct ip *ip_header (struct ip *)(data sizeof(struct ether_header)); #获取包得headstd::string src_ip inet_ntoa(ip_header-ip_src); #获取src ipstd::string dst_ip inet_ntoa(ip_header-ip_dst); #获取dst ip#判断是私网还是公网if (dst_ip target_ip) {if (is_private_ip(src_ip)) {private_in pkt_len;} else {public_in pkt_len;}} else if (src_ip target_ip) {if (is_private_ip(dst_ip)) {private_out pkt_len;} else {public_out pkt_len;}} 接下来是过滤不希望统计的ip名单在上面代码的基础上再做一层判断即可 #判断是私网还是公网if (dst_ip target_ip) {if (is_private_ip(src_ip)) {private_in pkt_len;} else {if (filter_write(src_ip)){ #return true则公网ip不在白名单内需要加和public_in pkt_len;}}} else if (src_ip target_ip) {if (is_private_ip(dst_ip)) {private_out pkt_len;} else {if (filter_write(src_ip)){public_out pkt_len;}}} 4.获取步骤三得结果上报并持久化 拿到公网流量后传输给用于持久化的程序进行入库操作对于此类数据建议用clickhouse进行建库对体量小的用mysql也可以接受.关于传输方式可以选择使用生产消费者方式使用消息中间件缓冲数据如Kafka。数据量小通过http/https协议传输后插入也可以。 kafka模板 def push_kafka(data, retries5, backoff_factor1):logger.info(f上传 {data})# 配置 Kafka 生产者kafka_config {bootstrap.servers: KAFKA_URL, # 替换为你的 Kafka 地址client.id: my-producer,}producer Producer(kafka_config)topic KAFKA_TOPIC # 替换为你的 Kafka 主题名# 重试逻辑for attempt in range(retries):try:# 将数据转换为 JSON 格式并发送到 Kafkaproducer.produce(topictopic, valuejson.dumps(data), callbackdelivery_report)producer.flush() # 强制刷新缓冲区logger.info(数据上传成功)return # 成功后返回except KafkaException as e:logger.error(fKafka error occurred: {e})except Exception as e:logger.error(fUnexpected error: {e})# 等待后重试wait_time backoff_factor * (2 ** attempt)logger.info(f等待 {wait_time} 秒后重试…)time.sleep(wait_time)logger.error(所有重试均失败) http/https直传 def post_with_retries_center(data, retries5, backoff_factor1):URL_CENTER 192.168.10.10 #填自己的地址logger.info(f上传{data})session requests.Session()headers {Content-Type: application/json,}# 定义重试策略retry_strategy Retry(totalretries,status_forcelist[404, 500, 502, 503, 504],allowed_methods[POST],backoff_factorbackoff_factor)# 创建适配器并将其安装到会话对象中adapter HTTPAdapter(max_retriesretry_strategy)session.mount(http://, adapter)session.mount(https://, adapter)try:response requests.post(URL_CENTER, jsondata, headersheaders)response.raise_for_status() # 如果响应状态码不是200则抛出异常except requests.exceptions.ConnectionError as conn_err:logger.error(fConnection error occurred: {conn_err})except requests.exceptions.Timeout as timeout_err:logger.error(fTimeout error occurred: {timeout_err})except requests.exceptions.RequestException as req_err:logger.error(fAn error occurred: {req_err})except Exception as e:logger.error(fUnexpected error: {e})return None 方案二技术实现 在Linux服务器上安装服务抓取到数据包后本地处理直接将结果发至中央 流程为 linux服务器开启tcpdump抓包linux服务器分析包获取私网、公网流量结果linux服务器推送消息中间件中央消费者对消息消费并入库 方案二与方案一代码几乎相同只是职权不同原linux不需要处理包现在需要处理所以处理程序应该在linux服务器上处理后推送到消息中间件由中央消费者进行消费即可。代码片段这里就不贴了 结尾 本文章内容适用于基于linux系统的流量包分析方案本身是完整闭环的。其中对包的处理快慢以及对公、私网的ip段判断、消息推送的方式这些是可以持续优化的点。由于方案本身具备一定商用价值故没有贴上源码若读者需要可联系博主提供。