Termux IRC Client SP
Jump to navigation
Jump to search
first version
irc_sp.py
https://x.com/i/grok?conversation=1918094079356731464
import socket import ssl import threading import sys import time import queue from datetime import datetime import readline import hashlib class IRCClient: def __init__(self): self.server = "irc.rizon.net" self.port = 6697 self.channels = ["#/sp/"] # Only #/sp/ channel self.active_channel = self.channels[0] self.nickname = "LullSac" self.realname = "Termux IRC Client for #/sp/" self.irc = None self.context = ssl.create_default_context() self.running = True self.message_queue = queue.Queue() self.lock = threading.Lock() self.reconnect_delay = 5 self.max_reconnect_delay = 300 # Initialize nicklists: dictionary mapping channels to sets of nicks self.nicklists = {channel: set() for channel in self.channels} # Define ANSI color codes for nicknames self.nick_colors = [ "\033[31m", # Red "\033[32m", # Green "\033[33m", # Yellow "\033[34m", # Blue "\033[35m", # Magenta "\033[36m", # Cyan "\033[91m", # Bright Red "\033[92m", # Bright Green ] self.color_reset = "\033[0m" def get_colored_nick(self, nick): """Return the nickname wrapped in a consistent ANSI color based on its hash.""" if not nick: return nick # Hash the nickname to get a consistent index hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) color_index = hash_value % len(self.nick_colors) color = self.nick_colors[color_index] return f"{color}{nick}{self.color_reset}" def connect(self): try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...") self.irc.connect((self.server, self.port)) self.send(f"USER {self.nickname} 0 * :{self.realname}") self.send(f"NICK {self.nickname}") return True except Exception as e: print(f"{self.get_timestamp()} Connection error: {e}") return False def send(self, message): try: self.irc.send(f"{message}\r\n".encode('utf-8')) except Exception as e: print(f"{self.get_timestamp()} Error sending message: {e}") def get_timestamp(self): timestamp = datetime.now().strftime("%I:%M %p") return f"\033[34m[{timestamp}]\033[0m" def complete_nick(self, text, state): """Tab completion function for nicknames.""" with self.lock: # Get the nicklist for the active channel nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set() # Find matches: nicks starting with the input text (case-insensitive) matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] # Return the state-th match, or None if no more matches return matches[state] if state < len(matches) else None def handle_input(self): # Set up readline tab completion readline.set_completer(self.complete_nick) readline.parse_and_bind("tab: complete") while self.running: try: with self.lock: prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " sys.stdout.write(prompt) sys.stdout.flush() message = input().strip() if not self.running: break if message: self.message_queue.put(message) except KeyboardInterrupt: self.message_queue.put("QUIT :Goodbye") break except Exception as e: print(f"{self.get_timestamp()} Input error: {e}") def handle_server(self): while self.running: try: data = self.irc.recv(2048).decode('utf-8', errors='ignore') if not data: print(f"{self.get_timestamp()} Disconnected from server.") self.reconnect() break for line in data.strip().split('\r\n'): if not line: continue # Define messages to handle specially handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352", "QUIT"] # Check if the message is handled by examining the command field parts = line.split() is_handled = False command = None if len(parts) >= 2: command = parts[1] if parts[0].startswith(':') else parts[0] is_handled = command in handled_messages # Only print raw line if not handled if not is_handled: print(f"{self.get_timestamp()} {line}") # Handle PING silently if line.startswith("PING"): pong_response = f"PONG {line.split()[1]}" self.send(pong_response) continue # Skip further processing to avoid printing # Join channels after MOTD and request WHO if "376" in line or "422" in line: with self.lock: for channel in self.channels: self.send(f"JOIN {channel}") print(f"{self.get_timestamp()} Joined {channel}") # Request WHO to populate nicklist self.send(f"WHO {channel}") self.active_channel = self.channels[0] if self.channels else None # Handle nick in use if "433" in line: self.nickname = f"{self.nickname}_" self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") # Parse PRIVMSG if "PRIVMSG" in line: try: sender = line[1:line.index('!')] target = line.split()[2] msg = line[line.index(':', 1) + 1:] colored_sender = self.get_colored_nick(sender) if msg.startswith('\x01ACTION '): msg = msg[8:-1] print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}") elif msg.startswith('\x01'): print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") else: print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}") except ValueError: print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") # Handle channel join if "JOIN" in line: try: sender = line[1:line.index('!')] if '!' in line else None channel = line.split(':', 2)[-1] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Added {channel} to active channels.") if not self.active_channel: self.active_channel = channel # Request WHO to populate nicklist self.send(f"WHO {channel}") else: # Add joining user to nicklist if channel in self.nicklists: self.nicklists[channel].add(sender) print(f"{self.get_timestamp()} {colored_sender} joined {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed JOIN: {line}") # Handle parting if "PART" in line: try: sender = line[1:line.index('!')] if '!' in line else None channel = line.split()[2] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel in self.channels: self.channels.remove(channel) self.nicklists.pop(channel, None) print(f"{self.get_timestamp()} Left {channel}") if self.active_channel == channel: self.active_channel = self.channels[0] if self.channels else None print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}") else: # Remove parting user from nicklist if channel in self.nicklists and sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} left {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed PART: {line}") # Handle QUIT if "QUIT" in line: try: sender = line[1:line.index('!')] if '!' in line else None colored_sender = self.get_colored_nick(sender) with self.lock: # Remove user from all channel nicklists for channel in self.nicklists: if sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} quit") except ValueError: print(f"{self.get_timestamp()} Malformed QUIT: {line}") # Handle topic if "332" in line: try: channel = line.split()[3] topic = line[line.index(':', 1) + 1:] print(f"{self.get_timestamp()} Topic for {channel}: {topic}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}") # Handle WHO response if "352" in line: try: parts = line.split() if len(parts) >= 8: channel, user, host, nick = parts[3], parts[4], parts[5], parts[7] colored_nick = self.get_colored_nick(nick) with self.lock: if channel in self.nicklists: self.nicklists[channel].add(nick) print(f"{self.get_timestamp()} {colored_nick} ({user}@{host}) in {channel}") else: print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except Exception as e: print(f"{self.get_timestamp()} Server error: {e}") self.reconnect() break def reconnect(self): self.running = False if self.irc: try: self.irc.close() except: pass self.irc = None attempts = 0 while not self.running and attempts < 5: delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") time.sleep(delay) self.running = True if self.connect(): print(f"{self.get_timestamp()} Reconnected successfully.") return attempts += 1 print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") self.running = False def process_commands(self): while self.running: try: message = self.message_queue.get(timeout=1) if message.lower() == "quit": self.send("QUIT :Goodbye") self.running = False elif message.startswith('/'): self.handle_command(message) else: with self.lock: if self.active_channel: self.send(f"PRIVMSG {self.active_channel} :{message}") else: print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.") except queue.Empty: continue except Exception as e: print(f"{self.get_timestamp()} Command error: {e}") def handle_command(self, command): parts = command.split(maxsplit=2) cmd = parts[0].lower() if cmd == "/nick" and len(parts) > 1: self.nickname = parts[1] self.send(f"NICK {self.nickname}") elif cmd == "/join" and len(parts) > 1: channel = parts[1] self.send(f"JOIN {channel}") with self.lock: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Requested to join {channel}") elif cmd == "/msg" and len(parts) > 2: target = parts[1] msg = parts[2] self.send(f"PRIVMSG {target} :{msg}") elif cmd == "/me" and len(parts) > 1: with self.lock: if self.active_channel: msg = parts[1] self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") else: print(f"{self.get_timestamp()} No active channel.") elif cmd == "/list": with self.lock: if self.channels: print(f"{self.get_timestamp()} Active channels:") for i, channel in enumerate(self.channels): mark = "*" if channel == self.active_channel else " " print(f"{self.get_timestamp()} {i}: {channel}{mark}") else: print(f"{self.get_timestamp()} No channels joined.") elif cmd == "/part" and len(parts) > 1: channel = parts[1] self.send(f"PART {channel}") elif cmd == "/switch" and len(parts) > 1: try: index = int(parts[1]) with self.lock: if 0 <= index < len(self.channels): self.active_channel = self.channels[index] print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") else: print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") except ValueError: print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") elif cmd == "/who" and len(parts) > 1: self.send(f"WHO {parts[1]}") elif cmd == "/topic" and len(parts) > 1: channel = parts[1] if len(parts) > 2: self.send(f"TOPIC {channel} :{parts[2]}") else: self.send(f"TOPIC {channel}") elif cmd == "/clear": print("\033[H\033[2J", end="") elif cmd == "/help": print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") else: print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") def run(self): if not self.connect(): return server_thread = threading.Thread(target=self.handle_server) input_thread = threading.Thread(target=self.handle_input) command_thread = threading.Thread(target=self.process_commands) server_thread.start() input_thread.start() command_thread.start() try: server_thread.join() input_thread.join() command_thread.join() except KeyboardInterrupt: self.running = False if self.irc: try: self.irc.close() except: pass print(f"{self.get_timestamp()} Connection closed.") def main(): client = IRCClient() client.run() if __name__ == "__main__": main()
second version
That first version worked for a day and then started having nicklist and buffer issues. This new version should have taken care of those problems.
import socket import ssl import threading import sys import time import queue from datetime import datetime import readline import hashlib class IRCClient: def __init__(self): self.server = "irc.rizon.net" self.port = 6697 self.channels = ["#/sp/"] # Only #/sp/ channel self.active_channel = self.channels[0] self.nickname = "LullSac" self.realname = "Termux IRC Client for #/sp/" self.irc = None self.context = ssl.create_default_context() self.running = True self.message_queue = queue.Queue() self.lock = threading.Lock() self.reconnect_delay = 5 self.max_reconnect_delay = 300 # Initialize nicklists with proper synchronization self.nicklists = {channel: set() for channel in self.channels} # ANSI color codes for nicknames self.nick_colors = [ "\033[31m", "\033[32m", "\033[33m", "\033[34m", "\033[35m", "\033[36m", "\033[91m", "\033[92m" ] self.color_reset = "\033[0m" def get_colored_nick(self, nick): """Return colored nickname based on consistent hash.""" if not nick: return nick hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) color_index = hash_value % len(self.nick_colors) return f"{self.nick_colors[color_index]}{nick}{self.color_reset}" def connect(self): try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) self.irc.connect((self.server, self.port)) self.send(f"USER {self.nickname} 0 * :{self.realname}") self.send(f"NICK {self.nickname}") return True except Exception as e: print(f"{self.get_timestamp()} Connection error: {e}") return False def send(self, message): try: self.irc.send(f"{message}\r\n".encode('utf-8')) except Exception as e: print(f"{self.get_timestamp()} Error sending message: {e}") def get_timestamp(self): return f"\033[34m[{datetime.now().strftime('%I:%M %p')}]\033[0m" def handle_server(self): buffer = "" while self.running: try: data = self.irc.recv(2048).decode('utf-8', errors='ignore') if not data: raise Exception("Connection closed by server") buffer += data lines = buffer.split('\r\n') buffer = lines.pop() # Keep incomplete line in buffer for line in lines: if not line.strip(): continue # Handle PING immediately if line.startswith("PING"): self.send(f"PONG {line.split()[1]}") continue # Handle server responses if "PRIVMSG" in line: self.handle_privmsg(line) elif "JOIN" in line: self.handle_join(line) elif "PART" in line: self.handle_part(line) elif "QUIT" in line: self.handle_quit(line) elif "352" in line: # WHO response self.handle_who(line) elif "376" in line or "422" in line: # End of MOTD self.join_channels() elif "433" in line: # Nick in use self.handle_nick_in_use() elif "332" in line: # TOPIC self.handle_topic(line) else: print(f"{self.get_timestamp()} {line}") except Exception as e: print(f"{self.get_timestamp()} Server error: {e}") self.reconnect() break def handle_privmsg(self, line): try: sender = line[1:line.index('!')] target = line.split()[2] msg = line[line.index(':', 1) + 1:] colored_sender = self.get_colored_nick(sender) if msg.startswith('\x01ACTION '): print(f"{self.get_timestamp()} * {colored_sender} {msg[8:-1]}") elif msg.startswith('\x01'): print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") else: print(f"{self.get_timestamp()} <{colored_sender}> {msg}") except ValueError: print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") def handle_join(self, line): try: sender = line[1:line.index('!')] channel = line.split(':', 2)[-1].strip() colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Joined {channel}") self.send(f"WHO {channel}") else: if channel in self.nicklists: self.nicklists[channel].add(sender) print(f"{self.get_timestamp()} {colored_sender} joined {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed JOIN: {line}") def handle_part(self, line): try: sender = line[1:line.index('!')] channel = line.split()[2] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel in self.channels: self.channels.remove(channel) self.nicklists.pop(channel, None) print(f"{self.get_timestamp()} Left {channel}") if self.active_channel == channel: self.active_channel = self.channels[0] if self.channels else None else: if channel in self.nicklists and sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} left {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed PART: {line}") def handle_quit(self, line): try: sender = line[1:line.index('!')] colored_sender = self.get_colored_nick(sender) with self.lock: for channel in self.nicklists: if sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} quit") except ValueError: print(f"{self.get_timestamp()} Malformed QUIT: {line}") def handle_who(self, line): try: parts = line.split() if len(parts) >= 8: channel = parts[3] nick = parts[7] with self.lock: if channel in self.nicklists: self.nicklists[channel].add(nick) except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed WHO reply: {line}") def join_channels(self): with self.lock: for channel in self.channels: self.send(f"JOIN {channel}") self.send(f"WHO {channel}") def handle_nick_in_use(self): self.nickname = f"{self.nickname}_" self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") def handle_topic(self, line): try: channel = line.split()[3] topic = line[line.index(':', 1) + 1:] print(f"{self.get_timestamp()} Topic for {channel}: {topic}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed TOPIC: {line}") def reconnect(self): self.running = False if self.irc: try: self.irc.close() except: pass self.irc = None attempts = 0 while attempts < 5: delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") time.sleep(delay) self.running = True if self.connect(): print(f"{self.get_timestamp()} Reconnected successfully.") return attempts += 1 print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") self.running = False def handle_input(self): readline.set_completer(self.complete_nick) readline.parse_and_bind("tab: complete") while self.running: try: with self.lock: prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " sys.stdout.write(prompt) sys.stdout.flush() message = input().strip() if not self.running: break if message: self.message_queue.put(message) except KeyboardInterrupt: self.message_queue.put("QUIT :Goodbye") break except Exception as e: print(f"{self.get_timestamp()} Input error: {e}") def complete_nick(self, text, state): """Tab completion for nicknames in active channel.""" with self.lock: if not self.active_channel: return None nicklist = self.nicklists.get(self.active_channel, set()) matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] return matches[state] if state < len(matches) else None def process_commands(self): while self.running: try: message = self.message_queue.get(timeout=1) if message.lower() == "quit": self.send("QUIT :Goodbye") self.running = False elif message.startswith('/'): self.handle_command(message) else: with self.lock: if self.active_channel: self.send(f"PRIVMSG {self.active_channel} :{message}") else: print(f"{self.get_timestamp()} No active channel. Use /join <channel>") except queue.Empty: continue except Exception as e: print(f"{self.get_timestamp()} Command error: {e}") def handle_command(self, command): parts = command.split(maxsplit=2) cmd = parts[0].lower() if cmd == "/nick" and len(parts) > 1: self.nickname = parts[1] self.send(f"NICK {self.nickname}") elif cmd == "/join" and len(parts) > 1: channel = parts[1] self.send(f"JOIN {channel}") with self.lock: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Requested to join {channel}") elif cmd == "/msg" and len(parts) > 2: target = parts[1] msg = parts[2] self.send(f"PRIVMSG {target} :{msg}") elif cmd == "/me" and len(parts) > 1: with self.lock: if self.active_channel: msg = parts[1] self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") else: print(f"{self.get_timestamp()} No active channel.") elif cmd == "/list": with self.lock: if self.channels: print(f"{self.get_timestamp()} Active channels:") for i, channel in enumerate(self.channels): mark = "*" if channel == self.active_channel else " " print(f"{self.get_timestamp()} {i}: {channel}{mark}") else: print(f"{self.get_timestamp()} No channels joined.") elif cmd == "/part" and len(parts) > 1: channel = parts[1] self.send(f"PART {channel}") elif cmd == "/switch" and len(parts) > 1: try: index = int(parts[1]) with self.lock: if 0 <= index < len(self.channels): self.active_channel = self.channels[index] print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") else: print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") except ValueError: print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") elif cmd == "/who" and len(parts) > 1: self.send(f"WHO {parts[1]}") elif cmd == "/topic" and len(parts) > 1: channel = parts[1] if len(parts) > 2: self.send(f"TOPIC {channel} :{parts[2]}") else: self.send(f"TOPIC {channel}") elif cmd == "/clear": print("\033[H\033[2J", end="") elif cmd == "/help": print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") else: print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") def run(self): if not self.connect(): return server_thread = threading.Thread(target=self.handle_server) input_thread = threading.Thread(target=self.handle_input) command_thread = threading.Thread(target=self.process_commands) server_thread.start() input_thread.start() command_thread.start() try: server_thread.join() input_thread.join() command_thread.join() except KeyboardInterrupt: self.running = False if self.irc: try: self.irc.close() except: pass print(f"{self.get_timestamp()} Connection closed.") def main(): client = IRCClient() client.run() if __name__ == "__main__": main()
third version
This version added timestamps to chat, action, and private messages for the user. Do not use. This version does add timestamps, but it doubles up your chats in the buffer. Pick this up another time.
https://deepsite.site/chat/m37mtoma7frlmc
https://x.com/i/grok?conversation=1918464626129277043
import socket import ssl import threading import sys import time import queue from datetime import datetime import readline import hashlib class IRCClient: def __init__(self): self.server = "irc.rizon.net" self.port = 6697 self.channels = ["#/sp/"] # Only #/sp/ channel self.active_channel = self.channels[0] self.nickname = "LullSac" self.realname = "Termux IRC Client for #/sp/" self.irc = None self.context = ssl.create_default_context() self.running = True self.message_queue = queue.Queue() self.lock = threading.Lock() self.reconnect_delay = 5 self.max_reconnect_delay = 300 # Initialize nicklists: dictionary mapping channels to sets of nicks self.nicklists = {channel: set() for channel in self.channels} # Define ANSI color codes for nicknames self.nick_colors = [ "\033[31m", # Red "\033[32m", # Green "\033[33m", # Yellow "\033[34m", # Blue "\033[35m", # Magenta "\033[36m", # Cyan "\033[91m", # Bright Red "\033[92m", # Bright Green ] self.color_reset = "\033[0m" def get_colored_nick(self, nick): """Return the nickname wrapped in a consistent ANSI color based on its hash.""" if not nick: return nick # Hash the nickname to get a consistent index hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) color_index = hash_value % len(self.nick_colors) color = self.nick_colors[color_index] return f"{color}{nick}{self.color_reset}" def connect(self): try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...") self.irc.connect((self.server, self.port)) self.send(f"USER {self.nickname} 0 * :{self.realname}") self.send(f"NICK {self.nickname}") return True except Exception as e: print(f"{self.get_timestamp()} Connection error: {e}") return False def send(self, message): try: self.irc.send(f"{message}\r\n".encode('utf-8')) except Exception as e: print(f"{self.get_timestamp()} Error sending message: {e}") def get_timestamp(self): """Return blue timestamp in format [HH:MM AM/PM]""" timestamp = datetime.now().strftime("%I:%M %p") return f"\033[34m[{timestamp}]\033[0m" def complete_nick(self, text, state): """Tab completion function for nicknames.""" with self.lock: # Get the nicklist for the active channel nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set() # Find matches: nicks starting with the input text (case-insensitive) matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] # Return the state-th match, or None if no more matches return matches[state] if state < len(matches) else None def handle_input(self): # Set up readline tab completion readline.set_completer(self.complete_nick) readline.parse_and_bind("tab: complete") while self.running: try: with self.lock: prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " sys.stdout.write(prompt) sys.stdout.flush() message = input().strip() if not self.running: break if message: self.message_queue.put(message) except KeyboardInterrupt: self.message_queue.put("QUIT :Goodbye") break except Exception as e: print(f"{self.get_timestamp()} Input error: {e}") def handle_server(self): buffer = "" while self.running: try: data = self.irc.recv(2048).decode('utf-8', errors='ignore') if not data: print(f"{self.get_timestamp()} Disconnected from server.") self.reconnect() break buffer += data lines = buffer.split('\r\n') buffer = lines.pop() # Keep incomplete line in buffer for line in lines: if not line.strip(): continue # Handle PING immediately if line.startswith("PING"): self.send(f"PONG {line.split()[1]}") continue # Handle server responses if "PRIVMSG" in line: self.handle_privmsg(line) elif "JOIN" in line: self.handle_join(line) elif "PART" in line: self.handle_part(line) elif "QUIT" in line: self.handle_quit(line) elif "352" in line: # WHO response self.handle_who(line) elif "376" in line or "422" in line: # End of MOTD self.join_channels() elif "433" in line: # Nick in use self.handle_nick_in_use() elif "332" in line: # TOPIC self.handle_topic(line) else: print(f"{self.get_timestamp()} {line}") except Exception as e: print(f"{self.get_timestamp()} Server error: {e}") self.reconnect() break def handle_privmsg(self, line): try: sender = line[1:line.index('!')] # Skip if this is our own message (we already printed it locally) if sender == self.nickname: return target = line.split()[2] msg = line[line.index(':', 1) + 1:] colored_sender = self.get_colored_nick(sender) if msg.startswith('\x01ACTION '): msg = msg[8:-1] print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}") elif msg.startswith('\x01'): print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") else: print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}") except ValueError: print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") def handle_join(self, line): try: sender = line[1:line.index('!')] channel = line.split(':', 2)[-1].strip() colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Joined {channel}") self.send(f"WHO {channel}") else: if channel in self.nicklists: self.nicklists[channel].add(sender) print(f"{self.get_timestamp()} {colored_sender} joined {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed JOIN: {line}") def handle_part(self, line): try: sender = line[1:line.index('!')] channel = line.split()[2] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel in self.channels: self.channels.remove(channel) self.nicklists.pop(channel, None) print(f"{self.get_timestamp()} Left {channel}") if self.active_channel == channel: self.active_channel = self.channels[0] if self.channels else None print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}") else: if channel in self.nicklists and sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} left {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed PART: {line}") def handle_quit(self, line): try: sender = line[1:line.index('!')] if '!' in line else None colored_sender = self.get_colored_nick(sender) with self.lock: # Remove user from all channel nicklists for channel in self.nicklists: if sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} quit") except ValueError: print(f"{self.get_timestamp()} Malformed QUIT: {line}") def handle_who(self, line): try: parts = line.split() if len(parts) >= 8: channel = parts[3] nick = parts[7] with self.lock: if channel in self.nicklists: self.nicklists[channel].add(nick) print(f"{self.get_timestamp()} {self.get_colored_nick(nick)} joined {channel}") else: print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") def handle_topic(self, line): try: channel = line.split()[3] topic = line[line.index(':', 1) + 1:] print(f"{self.get_timestamp()} Topic for {channel}: {topic}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}") def handle_nick_in_use(self): self.nickname = f"{self.nickname}_" self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") def join_channels(self): with self.lock: for channel in self.channels: self.send(f"JOIN {channel}") print(f"{self.get_timestamp()} Joined {channel}") # Request WHO to populate nicklist self.send(f"WHO {channel}") self.active_channel = self.channels[0] if self.channels else None def reconnect(self): self.running = False if self.irc: try: self.irc.close() except: pass self.irc = None attempts = 0 while not self.running and attempts < 5: delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") time.sleep(delay) self.running = True if self.connect(): print(f"{self.get_timestamp()} Reconnected successfully.") return attempts += 1 print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") self.running = False def process_commands(self): while self.running: try: message = self.message_queue.get(timeout=1) if message.lower() == "quit": self.send("QUIT :Goodbye") self.running = False elif message.startswith('/'): self.handle_command(message) else: with self.lock: if self.active_channel: # Only print our message locally (with timestamp) colored_nick = self.get_colored_nick(self.nickname) print(f"{self.get_timestamp()} <{colored_nick}> {message}") # Send to server but don't print the echo self.send(f"PRIVMSG {self.active_channel} :{message}") else: print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.") except queue.Empty: continue except Exception as e: print(f"{self.get_timestamp()} Command error: {e}") def handle_command(self, command): parts = command.split(maxsplit=2) cmd = parts[0].lower() if cmd == "/nick" and len(parts) > 1: self.nickname = parts[1] self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Changed nick to {self.get_colored_nick(self.nickname)}") elif cmd == "/join" and len(parts) > 1: channel = parts[1] self.send(f"JOIN {channel}") with self.lock: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Joined {channel}") if not self.active_channel: self.active_channel = channel self.send(f"WHO {channel}") else: print(f"{self.get_timestamp()} Already in {channel}") elif cmd == "/msg" and len(parts) > 2: target = parts[1] msg = parts[2] colored_nick = self.get_colored_nick(self.nickname) print(f"{self.get_timestamp()} -> {colored_nick}@{target}> {msg}") self.send(f"PRIVMSG {target} :{msg}") elif cmd == "/me" and len(parts) > 1: with self.lock: if self.active_channel: msg = parts[1] colored_nick = self.get_colored_nick(self.nickname) print(f"{self.get_timestamp()} * {colored_nick} {msg}") self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") else: print(f"{self.get_timestamp()} No active channel.") elif cmd == "/list": with self.lock: if self.channels: print(f"{self.get_timestamp()} Active channels:") for i, channel in enumerate(self.channels): mark = "*" if channel == self.active_channel else " " print(f"{self.get_timestamp()} {i}: {channel}{mark}") else: print(f"{self.get_timestamp()} No channels joined.") elif cmd == "/part" and len(parts) > 1: channel = parts[1] self.send(f"PART {channel}") elif cmd == "/switch" and len(parts) > 1: try: index = int(parts[1]) with self.lock: if 0 <= index < len(self.channels): self.active_channel = self.channels[index] print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") else: print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") except ValueError: print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") elif cmd == "/who" and len(parts) > 1: self.send(f"WHO {parts[1]}") elif cmd == "/topic" and len(parts) > 1: channel = parts[1] if len(parts) > 2: self.send(f"TOPIC {channel} :{parts[2]}") else: self.send(f"TOPIC {channel}") elif cmd == "/clear": print("\033[H\033[2J", end="") elif cmd == "/help": print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") else: print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") def run(self): if not self.connect(): return server_thread = threading.Thread(target=self.handle_server) input_thread = threading.Thread(target=self.handle_input) command_thread = threading.Thread(target=self.process_commands) server_thread.start() input_thread.start() command_thread.start() try: server_thread.join() input_thread.join() command_thread.join() except KeyboardInterrupt: self.running = False if self.irc: try: self.irc.close() except: pass print(f"{self.get_timestamp()} Connection closed.") def main(): client = IRCClient() client.run() if __name__ == "__main__": main()