websocketman不注册老是自动跳到关于页面,实在受不了了,自己写了一个。 有需要的网盘自取
源码部分
https://wwtg.lanzouo.com/b026xsrlxc
密码:8n2fimport websocket
import threading
import tkinter as tk
from tkinter import scrolledtext, ttk, font
import json- class WebSocketApp:
- def __init__(self, root):
- self.root = root
- self.root.title("WebSocket 客户端")
- self.ws = None
- # 设置窗口大小
- self.root.geometry("800x600")
- self.custom_font = font.Font(family="Courier New", size=12)
- # 1. WebSocket 地址输入框和连接按钮
- self.url_frame = ttk.LabelFrame(root, text="连接地址", padding=10)
- self.url_frame.pack(fill="x", padx=10, pady=10)
- self.url_entry = tk.Entry(self.url_frame, font=self.custom_font)
- self.url_entry.insert(0, "ws://127.0.0.1:8000/ws")
- self.url_entry.pack(side="left", fill="x", expand=True, padx=5)
- self.connect_btn = tk.Button(self.url_frame, text="开始连接", command=self.connect, font=self.custom_font)
- self.connect_btn.pack(side="left", padx=5)
- self.close_btn = tk.Button(self.url_frame, text="关闭连接", command=self.close, font=self.custom_font)
- self.close_btn.pack(side="left", padx=5)
- # 2. 发送区:文本域 + 按钮
- self.send_frame = ttk.LabelFrame(root, text="发送区", padding=10)
- self.send_frame.pack(fill="both", padx=10, pady=10, expand=True)
- self.send_text = scrolledtext.ScrolledText(
- self.send_frame,
- width=60,
- height=8,
- font=self.custom_font,
- wrap=tk.WORD,
- spacing1=2,
- spacing2=2,
- spacing3=2,
- undo=True # 启用撤销功能
- )
- self.send_text.pack(side="left", fill="both", expand=True, padx=(0, 5))
- # 绑定 Ctrl+Z 和 Ctrl+Y 快捷键
- self.send_text.bind('', lambda event: self.send_text.edit_undo())
- self.send_text.bind('', lambda event: self.send_text.edit_redo())
- self.send_text.bind('', lambda event: self.send_text.tag_add(tk.SEL, "1.0", tk.END) or "break")
- self.send_btn_frame = tk.Frame(self.send_frame)
- self.send_btn_frame.pack(side="right", fill="y")
- self.format_btn = tk.Button(self.send_btn_frame, text="格式化", command=self.format_text, font=self.custom_font)
- self.format_btn.pack(pady=5)
- self.send_btn = tk.Button(self.send_btn_frame, text="发送", command=self.send_message, font=self.custom_font)
- self.send_btn.pack(pady=5)
- self.clear_send_btn = tk.Button(self.send_btn_frame, text="清空", command=self.clear_send, font=self.custom_font)
- self.clear_send_btn.pack(pady=5)
- # 3. 显示区:内容展示框 + 清空按钮
- self.display_frame = ttk.LabelFrame(root, text="显示区", padding=10)
- self.display_frame.pack(fill="both", padx=10, pady=10, expand=True)
- self.recv_text = scrolledtext.ScrolledText(
- self.display_frame,
- width=60,
- height=15,
- font=self.custom_font,
- wrap=tk.WORD
- )
- self.recv_text.pack(side="left", fill="both", expand=True, padx=(0, 5))
- self.clear_display_btn = tk.Button(self.display_frame, text="清空", command=self.clear_display, font=self.custom_font)
- self.clear_display_btn.pack(side="right", pady=5)
- def on_message(self, ws, message):
- self.recv_text.insert(tk.END, f"收到消息: {message}\n")
- self.recv_text.see(tk.END)
- def on_error(self, ws, error):
- self.recv_text.insert(tk.END, f"发生错误: {error}\n")
- self.recv_text.see(tk.END)
- def on_close(self, ws, close_status_code, close_msg):
- self.recv_text.insert(tk.END, "连接已关闭\n")
- self.recv_text.see(tk.END)
- def on_open(self, ws):
- self.recv_text.insert(tk.END, "连接已建立\n")
- self.recv_text.see(tk.END)
- def connect(self):
- try:
- ws_url = self.url_entry.get()
- if not ws_url:
- self.recv_text.insert(tk.END, "请输入 WebSocket 地址\n")
- return
- self.ws = websocket.WebSocketApp(ws_url,
- on_open=self.on_open,
- on_message=self.on_message,
- on_error=self.on_error,
- on_close=self.on_close)
- ws_thread = threading.Thread(target=self.ws.run_forever)
- ws_thread.daemon = True
- ws_thread.start()
- except Exception as e:
- self.recv_text.insert(tk.END, f"连接失败: {e}\n")
- def format_text(self):
- try:
- # self.send_text.edit_undo(False) # 禁用撤销
- text = self.send_text.get("1.0", tk.END).strip()
- if not text:
- return
- try:
- json_obj = json.loads(text)
- formatted_text = json.dumps(json_obj, indent=2, ensure_ascii=False)
- self.send_text.delete("1.0", tk.END)
- self.send_text.insert("1.0", formatted_text)
- except json.JSONDecodeError:
- cleaned_text = " ".join(text.split())
- self.send_text.delete("1.0", tk.END)
- self.send_text.insert("1.0", cleaned_text)
- except Exception as e:
- self.recv_text.insert(tk.END, f"格式化失败: {e}\n")
- def send_message(self):
- if self.ws:
- message = self.send_text.get("1.0", tk.END).strip()
- if message:
- self.ws.send(message)
- self.recv_text.insert(tk.END, f"已发送消息: {message}\n")
- self.recv_text.see(tk.END)
- else:
- self.recv_text.insert(tk.END, "发送内容不能为空\n")
- def close(self):
- if self.ws:
- self.ws.close()
- self.ws = None
- def clear_send(self):
- self.send_text.delete("1.0", tk.END)
- def clear_display(self):
- self.recv_text.delete("1.0", tk.END)
- if __name__ == "__main__":
- root = tk.Tk()
- app = WebSocketApp(root)
- root.mainloop()
复制代码 |