import asyncio
import websockets
import struct
import json
from io import BytesIO
class WSSBinaryParser:
    """Decode typed binary WebSocket frames into plain dictionaries.

    Every frame starts with a one-byte type tag that selects the decoder
    applied to the remaining bytes:

    * ``0x01`` text        -- 4-byte big-endian length, then UTF-8 bytes
    * ``0x02`` JSON        -- 4-byte big-endian length, then UTF-8 JSON text
    * ``0x03`` image       -- 4-byte big-endian format-name length, the
      UTF-8 format name, then the raw image bytes (rest of the frame)
    * ``0x04`` float array -- 4-byte big-endian element count, then that
      many little-endian 32-bit floats
    * ``0x05`` multipart   -- 4-byte big-endian part count, then for each
      part: a type byte, a 4-byte big-endian length and the part payload
    """

    def __init__(self):
        # Type tag -> bound decoder; each decoder receives the frame
        # without its leading tag byte.
        self.message_handlers = {
            0x01: self._parse_text_message,
            0x02: self._parse_json_message,
            0x03: self._parse_image_message,
            0x04: self._parse_float_array,
            0x05: self._parse_multipart_message,
        }

    async def parse_message(self, binary_data):
        """Parse one binary frame and return its decoded dictionary.

        Args:
            binary_data: The complete frame, including the type byte.

        Returns:
            A dict with a ``"type"`` key plus type-specific fields.

        Raises:
            ValueError: If the frame is empty or its type tag is unknown.
        """
        return self._dispatch(binary_data)

    def _dispatch(self, binary_data):
        """Synchronously route a frame to the decoder for its type tag.

        Kept separate from the ``async`` entry point so that nested parts
        of a multipart frame can be decoded without awaiting.
        """
        if not binary_data:
            raise ValueError("Empty binary data received")
        msg_type = binary_data[0]
        handler = self.message_handlers.get(msg_type)
        if not handler:
            raise ValueError(f"Unknown message type: 0x{msg_type:02x}")
        return handler(binary_data[1:])

    def _parse_text_message(self, data):
        """Decode a length-prefixed UTF-8 string."""
        text_length = struct.unpack('>I', data[:4])[0]
        text_data = data[4:4 + text_length].decode('utf-8')
        return {"type": "text", "content": text_data}

    def _parse_json_message(self, data):
        """Decode a length-prefixed UTF-8 JSON document."""
        json_length = struct.unpack('>I', data[:4])[0]
        json_data = json.loads(data[4:4 + json_length].decode('utf-8'))
        return {"type": "json", "content": json_data}

    def _parse_image_message(self, data):
        """Decode an image payload: a format name followed by raw bytes.

        The image bytes are wrapped in a ``BytesIO`` and are not inspected.
        """
        format_length = struct.unpack('>I', data[:4])[0]
        image_format = data[4:4 + format_length].decode('utf-8')
        image_data = data[4 + format_length:]
        return {
            "type": "image",
            "format": image_format,
            "size": len(image_data),
            "data": BytesIO(image_data),
        }

    def _parse_float_array(self, data):
        """Decode a counted array of 32-bit floats into a list."""
        array_length = struct.unpack('>I', data[:4])[0]
        # NOTE(review): the count is big-endian but the floats are read as
        # little-endian -- confirm this matches the sender's encoding.
        float_array = struct.unpack(
            f'<{array_length}f', data[4:4 + array_length * 4]
        )
        return {"type": "float_array", "values": list(float_array)}

    def _parse_multipart_message(self, data):
        """Decode a sequence of typed sub-messages.

        Each part is re-assembled as ``type byte + payload`` and decoded
        with the same dispatcher used for top-level frames, so the
        resulting ``"parts"`` list holds fully parsed dictionaries.
        """
        part_count = struct.unpack('>I', data[:4])[0]
        position = 4
        parts = []
        for _ in range(part_count):
            part_type = data[position]
            position += 1
            part_length = struct.unpack('>I', data[position:position + 4])[0]
            position += 4
            part_data = data[position:position + part_length]
            position += part_length
            # Use the synchronous dispatcher: calling the async
            # parse_message() here without awaiting would store coroutine
            # objects instead of parsed parts.
            parts.append(self._dispatch(bytes([part_type]) + part_data))
        return {"type": "multipart", "parts": parts}
class WSSClient:
    """WebSocket client that decodes binary frames and prints a summary."""

    def __init__(self, uri):
        """Store the endpoint URI and create the binary frame parser.

        Args:
            uri: WebSocket URI passed to ``websockets.connect``.
        """
        self.uri = uri
        self.parser = WSSBinaryParser()
        # Set by connect(); None means "not connected".
        self.connection = None

    async def connect(self):
        """Open the WebSocket connection to ``self.uri``."""
        self.connection = await websockets.connect(self.uri)
        print(f"Connected to {self.uri}")

    async def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.connection is not None:
            await self.connection.close()
            self.connection = None

    async def receive_messages(self):
        """Consume incoming messages until the connection closes.

        Binary messages are parsed and passed to ``handle_message``; a
        failure on one message is reported and the loop continues. Text
        messages are printed as received.

        Raises:
            RuntimeError: If called before ``connect()``.
        """
        # Checked outside the try block so the failure is reported as a
        # clear usage error rather than an attribute lookup failure.
        if self.connection is None:
            raise RuntimeError("connect() must be called before receive_messages()")
        try:
            async for message in self.connection:
                if isinstance(message, bytes):
                    try:
                        parsed = await self.parser.parse_message(message)
                        self.handle_message(parsed)
                    except Exception as e:
                        # One malformed frame must not end the receive loop.
                        print(f"Error parsing message: {e}")
                else:
                    print(f"Received text message: {message}")
        except websockets.exceptions.ConnectionClosed:
            print("Connection closed")

    def handle_message(self, parsed_message):
        """Print a one-line (or, for multipart, per-part) summary.

        Args:
            parsed_message: Dict with a ``"type"`` key and the fields read
                below for that type. Unrecognised types are ignored.
        """
        msg_type = parsed_message["type"]
        if msg_type == "text":
            print(f"Text message: {parsed_message['content']}")
        elif msg_type == "json":
            print(f"JSON message: {json.dumps(parsed_message['content'], indent=2)}")
        elif msg_type == "image":
            print(f"Image received: {parsed_message['format']}, size: {parsed_message['size']} bytes")
        elif msg_type == "float_array":
            # Only the first ten values are shown to keep the output short.
            print(f"Float array: {parsed_message['values'][:10]}... (total {len(parsed_message['values'])} values)")
        elif msg_type == "multipart":
            print(f"Multipart message with {len(parsed_message['parts'])} parts:")
            for i, part in enumerate(parsed_message['parts']):
                print(f" Part {i+1}: {part['type']}")
async def main():
    """Connect to the WebSocket endpoint and print messages until it closes."""
    # NOTE(review): "wss地址" looks like a placeholder for the real URI --
    # replace it before running.
    client = WSSClient("wss地址")
    await client.connect()
    try:
        await client.receive_messages()
    finally:
        # Release the socket even if the receive loop fails or is cancelled.
        await client.connection.close()


if __name__ == "__main__":
    asyncio.run(main())
|