Termux IRC Client GTV
Jump to navigation
Jump to search
python irc_gtv.py
import socket import ssl import threading import sys import time import queue from datetime import datetime import readline import hashlib class IRCClient: def __init__(self): self.server = "irc.rizon.net" self.port = 6697 self.channels = ["#/g/tv"] # Only #/g/tv channel self.active_channel = self.channels[0] self.nickname = "LullSac" self.realname = "Termux IRC Client for #/g/tv" self.irc = None self.context = ssl.create_default_context() self.running = True self.message_queue = queue.Queue() self.lock = threading.Lock() self.reconnect_delay = 5 self.max_reconnect_delay = 300 # Initialize nicklists: dictionary mapping channels to sets of nicks self.nicklists = {channel: set() for channel in self.channels} # Define ANSI color codes for nicknames self.nick_colors = [ "\033[31m", # Red "\033[32m", # Green "\033[33m", # Yellow "\033[34m", # Blue "\033[35m", # Magenta "\033[36m", # Cyan "\033[91m", # Bright Red "\033[92m", # Bright Green ] self.color_reset = "\033[0m" def get_colored_nick(self, nick): """Return the nickname wrapped in a consistent ANSI color based on its hash.""" if not nick: return nick # Hash the nickname to get a consistent index hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) color_index = hash_value % len(self.nick_colors) color = self.nick_colors[color_index] return f"{color}{nick}{self.color_reset}" def connect(self): try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...") self.irc.connect((self.server, self.port)) self.send(f"USER {self.nickname} 0 * :{self.realname}") self.send(f"NICK {self.nickname}") return True except Exception as e: print(f"{self.get_timestamp()} Connection error: {e}") return False def send(self, message): try: self.irc.send(f"{message}\r\n".encode('utf-8')) except Exception as e: print(f"{self.get_timestamp()} Error sending message: {e}") def get_timestamp(self): timestamp = datetime.now().strftime("%I:%M %p") return f"\033[34m[{timestamp}]\033[0m" def complete_nick(self, text, state): """Tab completion function for nicknames.""" with self.lock: # Get the nicklist for the active channel nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set() # Find matches: nicks starting with the input text (case-insensitive) matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] # Return the state-th match, or None if no more matches return matches[state] if state < len(matches) else None def handle_input(self): # Set up readline tab completion readline.set_completer(self.complete_nick) readline.parse_and_bind("tab: complete") while self.running: try: with self.lock: prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " sys.stdout.write(prompt) sys.stdout.flush() message = input().strip() if not self.running: break if message: self.message_queue.put(message) except KeyboardInterrupt: self.message_queue.put("QUIT :Goodbye") break except Exception as e: print(f"{self.get_timestamp()} Input error: {e}") def handle_server(self): while self.running: try: data = self.irc.recv(2048).decode('utf-8', errors='ignore') if not data: print(f"{self.get_timestamp()} Disconnected from server.") self.reconnect() break for line in data.strip().split('\r\n'): if not line: continue # Define messages to handle specially handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352", "QUIT"] # Check if the message is handled by examining the command field parts = line.split() is_handled = False command = None if len(parts) >= 2: command = parts[1] if parts[0].startswith(':') else parts[0] is_handled = command in handled_messages # Only print raw line if not handled if not is_handled: print(f"{self.get_timestamp()} {line}") # Handle PING silently if line.startswith("PING"): pong_response = f"PONG {line.split()[1]}" self.send(pong_response) continue # Skip further processing to avoid printing # Join channels after MOTD and request WHO if "376" in line or "422" in line: with self.lock: for channel in self.channels: self.send(f"JOIN {channel}") print(f"{self.get_timestamp()} Joined {channel}") # Request WHO to populate nicklist self.send(f"WHO {channel}") self.active_channel = self.channels[0] if self.channels else None # Handle nick in use if "433" in line: self.nickname = f"{self.nickname}_" self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") # Parse PRIVMSG if "PRIVMSG" in line: try: sender = line[1:line.index('!')] target = line.split()[2] msg = line[line.index(':', 1) + 1:] colored_sender = self.get_colored_nick(sender) if msg.startswith('\x01ACTION '): msg = msg[8:-1] print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}") elif msg.startswith('\x01'): print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") else: print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}") except ValueError: print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") # Handle channel join if "JOIN" in line: try: sender = line[1:line.index('!')] if '!' in line else None channel = line.split(':', 2)[-1] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Added {channel} to active channels.") if not self.active_channel: self.active_channel = channel # Request WHO to populate nicklist self.send(f"WHO {channel}") else: # Add joining user to nicklist if channel in self.nicklists: self.nicklists[channel].add(sender) print(f"{self.get_timestamp()} {colored_sender} joined {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed JOIN: {line}") # Handle parting if "PART" in line: try: sender = line[1:line.index('!')] if '!' in line else None channel = line.split()[2] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel in self.channels: self.channels.remove(channel) self.nicklists.pop(channel, None) print(f"{self.get_timestamp()} Left {channel}") if self.active_channel == channel: self.active_channel = self.channels[0] if self.channels else None print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}") else: # Remove parting user from nicklist if channel in self.nicklists and sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} left {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed PART: {line}") # Handle QUIT if "QUIT" in line: try: sender = line[1:line.index('!')] if '!' in line else None colored_sender = self.get_colored_nick(sender) with self.lock: # Remove user from all channel nicklists for channel in self.nicklists: if sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} quit") except ValueError: print(f"{self.get_timestamp()} Malformed QUIT: {line}") # Handle topic if "332" in line: try: channel = line.split()[3] topic = line[line.index(':', 1) + 1:] print(f"{self.get_timestamp()} Topic for {channel}: {topic}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}") # Handle WHO response if "352" in line: try: parts = line.split() if len(parts) >= 8: channel, user, host, nick = parts[3], parts[4], parts[5], parts[7] colored_nick = self.get_colored_nick(nick) with self.lock: if channel in self.nicklists: self.nicklists[channel].add(nick) print(f"{self.get_timestamp()} {colored_nick} ({user}@{host}) in {channel}") else: print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except Exception as e: print(f"{self.get_timestamp()} Server error: {e}") self.reconnect() break def reconnect(self): self.running = False if self.irc: try: self.irc.close() except: pass self.irc = None attempts = 0 while not self.running and attempts < 5: delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") time.sleep(delay) self.running = True if self.connect(): print(f"{self.get_timestamp()} Reconnected successfully.") return attempts += 1 print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") self.running = False def process_commands(self): while self.running: try: message = self.message_queue.get(timeout=1) if message.lower() == "quit": self.send("QUIT :Goodbye") self.running = False elif message.startswith('/'): self.handle_command(message) else: with self.lock: if self.active_channel: self.send(f"PRIVMSG {self.active_channel} :{message}") else: print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.") except queue.Empty: continue except Exception as e: print(f"{self.get_timestamp()} Command error: {e}") def handle_command(self, command): parts = command.split(maxsplit=2) cmd = parts[0].lower() if cmd == "/nick" and len(parts) > 1: self.nickname = parts[1] self.send(f"NICK {self.nickname}") elif cmd == "/join" and len(parts) > 1: channel = parts[1] self.send(f"JOIN {channel}") with self.lock: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Requested to join {channel}") elif cmd == "/msg" and len(parts) > 2: target = parts[1] msg = parts[2] self.send(f"PRIVMSG {target} :{msg}") elif cmd == "/me" and len(parts) > 1: with self.lock: if self.active_channel: msg = parts[1] self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") else: print(f"{self.get_timestamp()} No active channel.") elif cmd == "/list": with self.lock: if self.channels: print(f"{self.get_timestamp()} Active channels:") for i, channel in enumerate(self.channels): mark = "*" if channel == self.active_channel else " " print(f"{self.get_timestamp()} {i}: {channel}{mark}") else: print(f"{self.get_timestamp()} No channels joined.") elif cmd == "/part" and len(parts) > 1: channel = parts[1] self.send(f"PART {channel}") elif cmd == "/switch" and len(parts) > 1: try: index = int(parts[1]) with self.lock: if 0 <= index < len(self.channels): self.active_channel = self.channels[index] print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") else: print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") except ValueError: print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") elif cmd == "/who" and len(parts) > 1: self.send(f"WHO {parts[1]}") elif cmd == "/topic" and len(parts) > 1: channel = parts[1] if len(parts) > 2: self.send(f"TOPIC {channel} :{parts[2]}") else: self.send(f"TOPIC {channel}") elif cmd == "/clear": print("\033[H\033[2J", end="") elif cmd == "/help": print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") else: print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") def run(self): if not self.connect(): return server_thread = threading.Thread(target=self.handle_server) input_thread = threading.Thread(target=self.handle_input) command_thread = threading.Thread(target=self.process_commands) server_thread.start() input_thread.start() command_thread.start() try: server_thread.join() input_thread.join() command_thread.join() except KeyboardInterrupt: self.running = False if self.irc: try: self.irc.close() except: pass print(f"{self.get_timestamp()} Connection closed.") def main(): client = IRCClient() client.run() if __name__ == "__main__": main()