网站域名续费,泰安房产信息网网签查询,phpcms网站打开空白,泉州市网站制作企业Python套接字综合应用(UDP篇)
1、 主要功能
UDP客户端实现UDP服务端实现输出字体颜色控制响应捕获键盘CtrlC信号套接字异常捕获及处理通信报文16进制格式化输出
2、 Python UDP套接字应用
Windows程序在WinServer2022上验证运行#xff0c;Linux程序在银河麒麟V10上验证运…Python套接字综合应用(UDP篇)
1、 主要功能
UDP客户端实现UDP服务端实现输出字体颜色控制响应捕获键盘CtrlC信号套接字异常捕获及处理通信报文16进制格式化输出
2、 Python UDP套接字应用
Windows程序在WinServer2022上验证运行Linux程序在银河麒麟V10上验证运行。
通过《网络调试助手》进行调试验证工具运行界面如下
①、 Linux服务端
server.py
# -*- coding: gbk -*-import socket
import datetime#服务端参数设置
listen_addr 192.168.58.145
listen_port 1281#输出字体颜色控制
TEXT_COLOR_WHITE \033[97m
TEXT_COLOR_MAGENTA \033[95m
TEXT_COLOR_BLUE \033[94m
TEXT_COLOR_YELLOW \033[93m
TEXT_COLOR_GREEN \033[92m
TEXT_COLOR_RED \033[91m
TXET_COLOR_DEFAULT \033[0m#输出字体大小控制
TEXT_FONT_WEIGHT_DEFAULT \033[0m
TEXT_FONT_WEIGHT_BOLD \033[1m\033[5m
TEXT_FONT_WEIGHT_THIN \033[2m\033[3m#1. 创建套接字
sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定地址
sock.bind((listen_addr, listen_port))
#3. 接收数据
while True:rx_msg, peer_endpoint sock.recvfrom(2048) print(\n)# 获取当前时间now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]#打印客户地址str_peer [{}].format(peer_endpoint)str_rx [RX]#打印头部信息print(f%s % str_rx str_now str_peer)#接收内容格式化16进制line [%02X % i for i in rx_msg]str_rx_msg .join(line)print(f{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT} % str_rx_msg)#发送信息给对方tx_msg rx_msgsock.sendto(tx_msg,peer_endpoint)str_tx [TX]now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]print(f%s % str_tx str_now str_peer)#发送内容格式化line [%02X % i for i in tx_msg]str_tx_msg .join(line)print(f{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT} % str_tx_msg)sock.close()运行效果
②、 Linux客户端
client.py
# -*- coding: gbk -*-import socket
import datetime
import time#本地网络参数设置
local_bind_addr 192.168.58.145
local_bind_port 1281#对方网络参数设置
remote_ep_addr 192.168.58.1
remote_ep_port 50001#交互模式
#[0]: 固定次数、固定间隔发送用户只输入一次
#[1]: 每次要求用户输入用户输入后才发送默认方式
tx_mode 0#只有tx_mode为[0]时生效
tx_count 10
tx_interval_second 2#输出字体颜色控制
TEXT_COLOR_WHITE \033[97m
TEXT_COLOR_MAGENTA \033[95m
TEXT_COLOR_BLUE \033[94m
TEXT_COLOR_YELLOW \033[93m
TEXT_COLOR_GREEN \033[92m
TEXT_COLOR_RED \033[91m
TXET_COLOR_DEFAULT \033[0m#1. 创建套接字
sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定本地地址
sock.bind((local_bind_addr, local_bind_port))
#设置接收超时时间单位为秒
timeout_seconds 2
sock.settimeout(timeout_seconds)#每次要求用户输入用户输入后才发送默认方式
if tx_mode 1 :while True:#3. 接收用户输入input_stream input(请输入16进制发送报文数据以空格分隔:)#4. 发送数据sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))str_tx [TX]now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]str_remote_addr [{}].format((remote_ep_addr,remote_ep_port))print(f%s % str_tx str_now str_remote_addr)#发送内容格式化print(f{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT} % input_stream.upper())#5.接收数据try:rx_msg, peer_endpoint sock.recvfrom(2048) print(\n)# 获取当前时间now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]#打印对端地址str_peer [{}].format(peer_endpoint)str_rx [RX]#打印头部信息print(f%s % str_rx str_now str_peer)#接收内容格式化16进制line [%02X % i for i in rx_msg]str_rx_msg .join(line)print(f{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT} % str_rx_msg.upper())except socket.timeout:#超时继续continue#固定次数、固定间隔发送用户只输入一次
elif tx_mode 0 :#3. 接收用户输入input_stream input(请输入16进制发送报文数据以空格分隔:)while tx_count 0:#4. 发送数据sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))str_tx [TX]now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]str_remote_addr [{}].format((remote_ep_addr,remote_ep_port))print(f%s % str_tx str_now str_remote_addr)#发送内容格式化print(f{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT} % input_stream.upper())#5.接收数据try:rx_msg, peer_endpoint sock.recvfrom(2048) print(\n)# 获取当前时间now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]#打印对端地址str_peer [{}].format(peer_endpoint)str_rx [RX]#打印头部信息print(f%s % str_rx str_now str_peer)#接收内容格式化16进制line [%02X % i for i in rx_msg]str_rx_msg .join(line)print(f{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT} % str_rx_msg.upper())except socket.timeout:#接收超时continueexcept Exception as e:print(f[处理异常]: {e})print(f{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT})sock.close()sys.exit(0)tx_count - 1time.sleep(tx_interval_second)sock.close()运行效果
③、 Windows服务端
server.py
# -*- coding: gbk -*-import socket
import datetime
import sys#服务端参数设置
listen_addr 192.168.58.151
listen_port 1281#输出字体颜色控制
TEXT_COLOR_WHITE \033[1;30m
TEXT_COLOR_GRAY \033[1;37m
TEXT_COLOR_MAGENTA \033[1;35m
TEXT_COLOR_BLUE \033[1;34m
TEXT_COLOR_YELLOW \033[1;33m
TEXT_COLOR_GREEN \033[1;32m
TEXT_COLOR_RED \033[1;31m
TXET_COLOR_DEFAULT \033[0m#输出字体大小控制
TEXT_FONT_WEIGHT_DEFAULT \033[0m
TEXT_FONT_WEIGHT_BOLD \033[1m\033[5m
TEXT_FONT_WEIGHT_THIN \033[2m\033[3m# \033是转义序列的开始后面跟着一个或多个字符来指定具体的样式。
# [0m表示默认样式[1m表示加粗[2m表示常规[5m表示放大[3m表示缩小。#仅限Windows系统
#print颜色控制失败时调用如下语句
import os
if os.name nt:os.system()#捕获键盘CtrlC信号
import signal
def signal_handler(signal,code):print(f{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT})sys.exit(0)signal.signal(signal.SIGINT,signal_handler)
signal.signal(signal.SIGTERM,signal_handler)#16进制格式化输出
def print_hex(bytes):line [%02X % i for i in bytes] print( .join(line))#1. 创建套接字
sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定地址
sock.bind((listen_addr, listen_port))
#3. 设置接收超时时间单位为秒
timeout_seconds 2
sock.settimeout(timeout_seconds)
#4. 接收数据
while True:try:rx_msg, peer_endpoint sock.recvfrom(2048) print(\n)# 获取当前时间now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]#打印客户地址str_peer [{}].format(peer_endpoint)str_rx [RX]#打印头部信息print(f{TEXT_COLOR_GRAY}%s{TXET_COLOR_DEFAULT} % str_rx str_now str_peer)#接收内容格式化16进制line [%02X % i for i in rx_msg]str_rx_msg .join(line)print(f{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT} % str_rx_msg)#发送信息给对方tx_msg rx_msgsock.sendto(tx_msg,peer_endpoint)str_tx [TX]now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]print(f{TEXT_COLOR_GRAY}%s{TXET_COLOR_DEFAULT} % str_tx str_now str_peer)#发送内容格式化line [%02X % i for i in tx_msg]str_tx_msg .join(line)print(f{TEXT_COLOR_GREEN}%s{TXET_COLOR_DEFAULT} % str_tx_msg)except socket.timeout:#超时继续continue
sock.close()运行效果
④、 Windows客户端
client.py
# -*- coding: gbk -*-import socket
import datetime
import time#本地网络参数设置
local_bind_addr 192.168.58.131
local_bind_port 1281#对方网络参数设置
remote_ep_addr 192.168.58.1
remote_ep_port 50001#交互模式
#[0]: 固定次数、固定间隔发送用户只输入一次
#[1]: 每次要求用户输入用户输入后才发送默认方式
tx_mode 0#只有tx_mode为[0]时生效
tx_count 10
tx_interval_second 2#仅限Windows系统
#print颜色控制失败时调用如下语句
import os
if os.name nt:os.system()#捕获键盘CtrlC信号
import signal
def signal_handler(signal,code):print(f{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT})sys.exit(0)signal.signal(signal.SIGINT,signal_handler)
signal.signal(signal.SIGTERM,signal_handler)#输出字体颜色控制
TEXT_COLOR_WHITE \033[1;30m
TEXT_COLOR_GRAY \033[1;37m
TEXT_COLOR_MAGENTA \033[1;35m
TEXT_COLOR_BLUE \033[1;34m
TEXT_COLOR_YELLOW \033[1;33m
TEXT_COLOR_GREEN \033[1;32m
TEXT_COLOR_RED \033[1;31m
TXET_COLOR_DEFAULT \033[0m#1. 创建套接字
sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#2. 绑定本地地址
sock.bind((local_bind_addr, local_bind_port))
#设置接收超时时间单位为秒
timeout_seconds 2
sock.settimeout(timeout_seconds)#每次要求用户输入用户输入后才发送默认方式
if tx_mode 1 :while True:#3. 接收用户输入input_stream input(请输入16进制发送报文数据以空格分隔:)#4. 发送数据sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))str_tx [TX]now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]str_remote_addr [{}].format((remote_ep_addr,remote_ep_port))print(f%s % str_tx str_now str_remote_addr)#发送内容格式化print(f{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT} % input_stream.upper())#5.接收数据try:rx_msg, peer_endpoint sock.recvfrom(2048) print(\n)# 获取当前时间now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]#打印对端地址str_peer [{}].format(peer_endpoint)str_rx [RX]#打印头部信息print(f%s % str_rx str_now str_peer)#接收内容格式化16进制line [%02X % i for i in rx_msg]str_rx_msg .join(line)print(f{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT} % str_rx_msg.upper())except socket.timeout:#超时继续continue#固定次数、固定间隔发送用户只输入一次
elif tx_mode 0 :#3. 接收用户输入input_stream input(请输入16进制发送报文数据以空格分隔:)while tx_count 0:#4. 发送数据sock.sendto(bytearray.fromhex(input_stream),(remote_ep_addr,remote_ep_port))str_tx [TX]now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]str_remote_addr [{}].format((remote_ep_addr,remote_ep_port))print(f%s % str_tx str_now str_remote_addr)#发送内容格式化print(f{TEXT_COLOR_BLUE}%s{TXET_COLOR_DEFAULT} % input_stream.upper())#5.接收数据try:rx_msg, peer_endpoint sock.recvfrom(2048) print(\n)# 获取当前时间now datetime.datetime.now()str_date now.strftime(%Y-%m-%d %H:%M:%S)str_ms f{now.strftime(%f)[:3]}.zfill(3)str_now f[{str_date}.{str_ms}]#打印对端地址str_peer [{}].format(peer_endpoint)str_rx [RX]#打印头部信息print(f%s % str_rx str_now str_peer)#接收内容格式化16进制line [%02X % i for i in rx_msg]str_rx_msg .join(line)print(f{TEXT_COLOR_RED}%s{TXET_COLOR_DEFAULT} % str_rx_msg.upper())except socket.timeout:#接收超时continueexcept Exception as e:print(f[处理异常]: {e})print(f{TEXT_COLOR_RED}terminating...{TXET_COLOR_DEFAULT})sock.close()sys.exit(0)tx_count - 1time.sleep(tx_interval_second)sock.close()运行效果