[    [    [    [    [    [    [    b l o g g i n   s p a c e    wiki   ]    ]    ]    ]    ]    ]    ]

Termux IRC Client SP

From blogginpedia
Jump to navigation Jump to search

first version

irc_sp.py

https://x.com/i/grok?conversation=1918094079356731464


import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
import readline
import hashlib

class IRCClient:
    def __init__(self):
        self.server = "irc.rizon.net"
        self.port = 6697
        self.channels = ["#/sp/"]  # Only #/sp/ channel
        self.active_channel = self.channels[0]
        self.nickname = "LullSac"
        self.realname = "Termux IRC Client for #/sp/"
        self.irc = None
        self.context = ssl.create_default_context()
        self.running = True
        self.message_queue = queue.Queue()
        self.lock = threading.Lock()
        self.reconnect_delay = 5
        self.max_reconnect_delay = 300
        # Initialize nicklists: dictionary mapping channels to sets of nicks
        self.nicklists = {channel: set() for channel in self.channels}
        # Define ANSI color codes for nicknames
        self.nick_colors = [
            "\033[31m",  # Red
            "\033[32m",  # Green
            "\033[33m",  # Yellow
            "\033[34m",  # Blue
            "\033[35m",  # Magenta
            "\033[36m",  # Cyan
            "\033[91m",  # Bright Red
            "\033[92m",  # Bright Green
        ]
        self.color_reset = "\033[0m"

    def get_colored_nick(self, nick):
        """Return the nickname wrapped in a consistent ANSI color based on its hash."""
        if not nick:
            return nick
        # Hash the nickname to get a consistent index
        hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16)
        color_index = hash_value % len(self.nick_colors)
        color = self.nick_colors[color_index]
        return f"{color}{nick}{self.color_reset}"

    def connect(self):
        try:
            raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
            print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...")
            self.irc.connect((self.server, self.port))
            self.send(f"USER {self.nickname} 0 * :{self.realname}")
            self.send(f"NICK {self.nickname}")
            return True
        except Exception as e:
            print(f"{self.get_timestamp()} Connection error: {e}")
            return False

    def send(self, message):
        try:
            self.irc.send(f"{message}\r\n".encode('utf-8'))
        except Exception as e:
            print(f"{self.get_timestamp()} Error sending message: {e}")

    def get_timestamp(self):
        timestamp = datetime.now().strftime("%I:%M %p")
        return f"\033[34m[{timestamp}]\033[0m"

    def complete_nick(self, text, state):
        """Tab completion function for nicknames."""
        with self.lock:
            # Get the nicklist for the active channel
            nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set()
        
        # Find matches: nicks starting with the input text (case-insensitive)
        matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())]
        
        # Return the state-th match, or None if no more matches
        return matches[state] if state < len(matches) else None

    def handle_input(self):
        # Set up readline tab completion
        readline.set_completer(self.complete_nick)
        readline.parse_and_bind("tab: complete")
        
        while self.running:
            try:
                with self.lock:
                    prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
                sys.stdout.write(prompt)
                sys.stdout.flush()
                message = input().strip()
                if not self.running:
                    break
                if message:
                    self.message_queue.put(message)
            except KeyboardInterrupt:
                self.message_queue.put("QUIT :Goodbye")
                break
            except Exception as e:
                print(f"{self.get_timestamp()} Input error: {e}")

    def handle_server(self):
        while self.running:
            try:
                data = self.irc.recv(2048).decode('utf-8', errors='ignore')
                if not data:
                    print(f"{self.get_timestamp()} Disconnected from server.")
                    self.reconnect()
                    break

                for line in data.strip().split('\r\n'):
                    if not line:
                        continue

                    # Define messages to handle specially
                    handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352", "QUIT"]

                    # Check if the message is handled by examining the command field
                    parts = line.split()
                    is_handled = False
                    command = None
                    if len(parts) >= 2:
                        command = parts[1] if parts[0].startswith(':') else parts[0]
                        is_handled = command in handled_messages

                    # Only print raw line if not handled
                    if not is_handled:
                        print(f"{self.get_timestamp()} {line}")

                    # Handle PING silently
                    if line.startswith("PING"):
                        pong_response = f"PONG {line.split()[1]}"
                        self.send(pong_response)
                        continue  # Skip further processing to avoid printing

                    # Join channels after MOTD and request WHO
                    if "376" in line or "422" in line:
                        with self.lock:
                            for channel in self.channels:
                                self.send(f"JOIN {channel}")
                                print(f"{self.get_timestamp()} Joined {channel}")
                                # Request WHO to populate nicklist
                                self.send(f"WHO {channel}")
                            self.active_channel = self.channels[0] if self.channels else None

                    # Handle nick in use
                    if "433" in line:
                        self.nickname = f"{self.nickname}_"
                        self.send(f"NICK {self.nickname}")
                        print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}")

                    # Parse PRIVMSG
                    if "PRIVMSG" in line:
                        try:
                            sender = line[1:line.index('!')]
                            target = line.split()[2]
                            msg = line[line.index(':', 1) + 1:]
                            colored_sender = self.get_colored_nick(sender)
                            if msg.startswith('\x01ACTION '):
                                msg = msg[8:-1]
                                print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}")
                            elif msg.startswith('\x01'):
                                print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}")
                            else:
                                print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}")
                        except ValueError:
                            print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")

                    # Handle channel join
                    if "JOIN" in line:
                        try:
                            sender = line[1:line.index('!')] if '!' in line else None
                            channel = line.split(':', 2)[-1]
                            colored_sender = self.get_colored_nick(sender)
                            with self.lock:
                                if sender == self.nickname:
                                    if channel not in self.channels:
                                        self.channels.append(channel)
                                        self.nicklists[channel] = set()
                                        print(f"{self.get_timestamp()} Added {channel} to active channels.")
                                        if not self.active_channel:
                                            self.active_channel = channel
                                    # Request WHO to populate nicklist
                                    self.send(f"WHO {channel}")
                                else:
                                    # Add joining user to nicklist
                                    if channel in self.nicklists:
                                        self.nicklists[channel].add(sender)
                                        print(f"{self.get_timestamp()} {colored_sender} joined {channel}")
                        except ValueError:
                            print(f"{self.get_timestamp()} Malformed JOIN: {line}")

                    # Handle parting
                    if "PART" in line:
                        try:
                            sender = line[1:line.index('!')] if '!' in line else None
                            channel = line.split()[2]
                            colored_sender = self.get_colored_nick(sender)
                            with self.lock:
                                if sender == self.nickname:
                                    if channel in self.channels:
                                        self.channels.remove(channel)
                                        self.nicklists.pop(channel, None)
                                        print(f"{self.get_timestamp()} Left {channel}")
                                        if self.active_channel == channel:
                                            self.active_channel = self.channels[0] if self.channels else None
                                            print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}")
                                else:
                                    # Remove parting user from nicklist
                                    if channel in self.nicklists and sender in self.nicklists[channel]:
                                        self.nicklists[channel].remove(sender)
                                        print(f"{self.get_timestamp()} {colored_sender} left {channel}")
                        except ValueError:
                            print(f"{self.get_timestamp()} Malformed PART: {line}")

                    # Handle QUIT
                    if "QUIT" in line:
                        try:
                            sender = line[1:line.index('!')] if '!' in line else None
                            colored_sender = self.get_colored_nick(sender)
                            with self.lock:
                                # Remove user from all channel nicklists
                                for channel in self.nicklists:
                                    if sender in self.nicklists[channel]:
                                        self.nicklists[channel].remove(sender)
                                print(f"{self.get_timestamp()} {colored_sender} quit")
                        except ValueError:
                            print(f"{self.get_timestamp()} Malformed QUIT: {line}")

                    # Handle topic
                    if "332" in line:
                        try:
                            channel = line.split()[3]
                            topic = line[line.index(':', 1) + 1:]
                            print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
                        except (IndexError, ValueError):
                            print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}")

                    # Handle WHO response
                    if "352" in line:
                        try:
                            parts = line.split()
                            if len(parts) >= 8:
                                channel, user, host, nick = parts[3], parts[4], parts[5], parts[7]
                                colored_nick = self.get_colored_nick(nick)
                                with self.lock:
                                    if channel in self.nicklists:
                                        self.nicklists[channel].add(nick)
                                        print(f"{self.get_timestamp()} {colored_nick} ({user}@{host}) in {channel}")
                            else:
                                print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")
                        except (IndexError, ValueError):
                            print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")

            except Exception as e:
                print(f"{self.get_timestamp()} Server error: {e}")
                self.reconnect()
                break

    def reconnect(self):
        self.running = False
        if self.irc:
            try:
                self.irc.close()
            except:
                pass
        self.irc = None
        attempts = 0
        while not self.running and attempts < 5:
            delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
            print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
            time.sleep(delay)
            self.running = True
            if self.connect():
                print(f"{self.get_timestamp()} Reconnected successfully.")
                return
            attempts += 1
        print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
        self.running = False

    def process_commands(self):
        while self.running:
            try:
                message = self.message_queue.get(timeout=1)
                if message.lower() == "quit":
                    self.send("QUIT :Goodbye")
                    self.running = False
                elif message.startswith('/'):
                    self.handle_command(message)
                else:
                    with self.lock:
                        if self.active_channel:
                            self.send(f"PRIVMSG {self.active_channel} :{message}")
                        else:
                            print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"{self.get_timestamp()} Command error: {e}")

    def handle_command(self, command):
        parts = command.split(maxsplit=2)
        cmd = parts[0].lower()
        if cmd == "/nick" and len(parts) > 1:
            self.nickname = parts[1]
            self.send(f"NICK {self.nickname}")
        elif cmd == "/join" and len(parts) > 1:
            channel = parts[1]
            self.send(f"JOIN {channel}")
            with self.lock:
                if channel not in self.channels:
                    self.channels.append(channel)
                    self.nicklists[channel] = set()
                    print(f"{self.get_timestamp()} Requested to join {channel}")
        elif cmd == "/msg" and len(parts) > 2:
            target = parts[1]
            msg = parts[2]
            self.send(f"PRIVMSG {target} :{msg}")
        elif cmd == "/me" and len(parts) > 1:
            with self.lock:
                if self.active_channel:
                    msg = parts[1]
                    self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
                else:
                    print(f"{self.get_timestamp()} No active channel.")
        elif cmd == "/list":
            with self.lock:
                if self.channels:
                    print(f"{self.get_timestamp()} Active channels:")
                    for i, channel in enumerate(self.channels):
                        mark = "*" if channel == self.active_channel else " "
                        print(f"{self.get_timestamp()} {i}: {channel}{mark}")
                else:
                    print(f"{self.get_timestamp()} No channels joined.")
        elif cmd == "/part" and len(parts) > 1:
            channel = parts[1]
            self.send(f"PART {channel}")
        elif cmd == "/switch" and len(parts) > 1:
            try:
                index = int(parts[1])
                with self.lock:
                    if 0 <= index < len(self.channels):
                        self.active_channel = self.channels[index]
                        print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
                    else:
                        print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
            except ValueError:
                print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
        elif cmd == "/who" and len(parts) > 1:
            self.send(f"WHO {parts[1]}")
        elif cmd == "/topic" and len(parts) > 1:
            channel = parts[1]
            if len(parts) > 2:
                self.send(f"TOPIC {channel} :{parts[2]}")
            else:
                self.send(f"TOPIC {channel}")
        elif cmd == "/clear":
            print("\033[H\033[2J", end="")
        elif cmd == "/help":
            print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")
        else:
            print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")

    def run(self):
        if not self.connect():
            return

        server_thread = threading.Thread(target=self.handle_server)
        input_thread = threading.Thread(target=self.handle_input)
        command_thread = threading.Thread(target=self.process_commands)

        server_thread.start()
        input_thread.start()
        command_thread.start()

        try:
            server_thread.join()
            input_thread.join()
            command_thread.join()
        except KeyboardInterrupt:
            self.running = False

        if self.irc:
            try:
                self.irc.close()
            except:
                pass
        print(f"{self.get_timestamp()} Connection closed.")

def main():
    """Build the client and hand control to its run loop."""
    IRCClient().run()

if __name__ == "__main__":
    main()

second version

The first version worked for a day and then started having nicklist and buffer issues. This version should take care of those problems.


import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
import readline
import hashlib

class IRCClient:
    def __init__(self):
        self.server = "irc.rizon.net"
        self.port = 6697
        self.channels = ["#/sp/"]  # Only #/sp/ channel
        self.active_channel = self.channels[0]
        self.nickname = "LullSac"
        self.realname = "Termux IRC Client for #/sp/"
        self.irc = None
        self.context = ssl.create_default_context()
        self.running = True
        self.message_queue = queue.Queue()
        self.lock = threading.Lock()
        self.reconnect_delay = 5
        self.max_reconnect_delay = 300
        
        # Initialize nicklists with proper synchronization
        self.nicklists = {channel: set() for channel in self.channels}
        
        # ANSI color codes for nicknames
        self.nick_colors = [
            "\033[31m", "\033[32m", "\033[33m", "\033[34m", 
            "\033[35m", "\033[36m", "\033[91m", "\033[92m"
        ]
        self.color_reset = "\033[0m"

    def get_colored_nick(self, nick):
        """Return colored nickname based on consistent hash."""
        if not nick:
            return nick
        hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16)
        color_index = hash_value % len(self.nick_colors)
        return f"{self.nick_colors[color_index]}{nick}{self.color_reset}"

    def connect(self):
        try:
            raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
            self.irc.connect((self.server, self.port))
            self.send(f"USER {self.nickname} 0 * :{self.realname}")
            self.send(f"NICK {self.nickname}")
            return True
        except Exception as e:
            print(f"{self.get_timestamp()} Connection error: {e}")
            return False

    def send(self, message):
        try:
            self.irc.send(f"{message}\r\n".encode('utf-8'))
        except Exception as e:
            print(f"{self.get_timestamp()} Error sending message: {e}")

    def get_timestamp(self):
        return f"\033[34m[{datetime.now().strftime('%I:%M %p')}]\033[0m"

    def handle_server(self):
        buffer = ""
        while self.running:
            try:
                data = self.irc.recv(2048).decode('utf-8', errors='ignore')
                if not data:
                    raise Exception("Connection closed by server")
                
                buffer += data
                lines = buffer.split('\r\n')
                buffer = lines.pop()  # Keep incomplete line in buffer
                
                for line in lines:
                    if not line.strip():
                        continue

                    # Handle PING immediately
                    if line.startswith("PING"):
                        self.send(f"PONG {line.split()[1]}")
                        continue

                    # Handle server responses
                    if "PRIVMSG" in line:
                        self.handle_privmsg(line)
                    elif "JOIN" in line:
                        self.handle_join(line)
                    elif "PART" in line:
                        self.handle_part(line)
                    elif "QUIT" in line:
                        self.handle_quit(line)
                    elif "352" in line:  # WHO response
                        self.handle_who(line)
                    elif "376" in line or "422" in line:  # End of MOTD
                        self.join_channels()
                    elif "433" in line:  # Nick in use
                        self.handle_nick_in_use()
                    elif "332" in line:  # TOPIC
                        self.handle_topic(line)
                    else:
                        print(f"{self.get_timestamp()} {line}")

            except Exception as e:
                print(f"{self.get_timestamp()} Server error: {e}")
                self.reconnect()
                break

    def handle_privmsg(self, line):
        try:
            sender = line[1:line.index('!')]
            target = line.split()[2]
            msg = line[line.index(':', 1) + 1:]
            colored_sender = self.get_colored_nick(sender)
            
            if msg.startswith('\x01ACTION '):
                print(f"{self.get_timestamp()} * {colored_sender} {msg[8:-1]}")
            elif msg.startswith('\x01'):
                print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}")
            else:
                print(f"{self.get_timestamp()} <{colored_sender}> {msg}")
        except ValueError:
            print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")

    def handle_join(self, line):
        try:
            sender = line[1:line.index('!')]
            channel = line.split(':', 2)[-1].strip()
            colored_sender = self.get_colored_nick(sender)
            
            with self.lock:
                if sender == self.nickname:
                    if channel not in self.channels:
                        self.channels.append(channel)
                        self.nicklists[channel] = set()
                        print(f"{self.get_timestamp()} Joined {channel}")
                        self.send(f"WHO {channel}")
                else:
                    if channel in self.nicklists:
                        self.nicklists[channel].add(sender)
                        print(f"{self.get_timestamp()} {colored_sender} joined {channel}")
        except ValueError:
            print(f"{self.get_timestamp()} Malformed JOIN: {line}")

    def handle_part(self, line):
        try:
            sender = line[1:line.index('!')]
            channel = line.split()[2]
            colored_sender = self.get_colored_nick(sender)
            
            with self.lock:
                if sender == self.nickname:
                    if channel in self.channels:
                        self.channels.remove(channel)
                        self.nicklists.pop(channel, None)
                        print(f"{self.get_timestamp()} Left {channel}")
                        if self.active_channel == channel:
                            self.active_channel = self.channels[0] if self.channels else None
                else:
                    if channel in self.nicklists and sender in self.nicklists[channel]:
                        self.nicklists[channel].remove(sender)
                        print(f"{self.get_timestamp()} {colored_sender} left {channel}")
        except ValueError:
            print(f"{self.get_timestamp()} Malformed PART: {line}")

    def handle_quit(self, line):
        try:
            sender = line[1:line.index('!')]
            colored_sender = self.get_colored_nick(sender)
            
            with self.lock:
                for channel in self.nicklists:
                    if sender in self.nicklists[channel]:
                        self.nicklists[channel].remove(sender)
                        print(f"{self.get_timestamp()} {colored_sender} quit")
        except ValueError:
            print(f"{self.get_timestamp()} Malformed QUIT: {line}")

    def handle_who(self, line):
        try:
            parts = line.split()
            if len(parts) >= 8:
                channel = parts[3]
                nick = parts[7]
                
                with self.lock:
                    if channel in self.nicklists:
                        self.nicklists[channel].add(nick)
        except (IndexError, ValueError):
            print(f"{self.get_timestamp()} Malformed WHO reply: {line}")

    def join_channels(self):
        with self.lock:
            for channel in self.channels:
                self.send(f"JOIN {channel}")
                self.send(f"WHO {channel}")

    def handle_nick_in_use(self):
        self.nickname = f"{self.nickname}_"
        self.send(f"NICK {self.nickname}")
        print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}")

    def handle_topic(self, line):
        try:
            channel = line.split()[3]
            topic = line[line.index(':', 1) + 1:]
            print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
        except (IndexError, ValueError):
            print(f"{self.get_timestamp()} Malformed TOPIC: {line}")

    def reconnect(self):
        self.running = False
        if self.irc:
            try:
                self.irc.close()
            except:
                pass
        self.irc = None
        
        attempts = 0
        while attempts < 5:
            delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
            print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
            time.sleep(delay)
            self.running = True
            if self.connect():
                print(f"{self.get_timestamp()} Reconnected successfully.")
                return
            attempts += 1
        
        print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
        self.running = False

    def handle_input(self):
        readline.set_completer(self.complete_nick)
        readline.parse_and_bind("tab: complete")
        
        while self.running:
            try:
                with self.lock:
                    prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
                sys.stdout.write(prompt)
                sys.stdout.flush()
                message = input().strip()
                if not self.running:
                    break
                if message:
                    self.message_queue.put(message)
            except KeyboardInterrupt:
                self.message_queue.put("QUIT :Goodbye")
                break
            except Exception as e:
                print(f"{self.get_timestamp()} Input error: {e}")

    def complete_nick(self, text, state):
        """Tab completion for nicknames in active channel."""
        with self.lock:
            if not self.active_channel:
                return None
            nicklist = self.nicklists.get(self.active_channel, set())
            matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())]
            return matches[state] if state < len(matches) else None

    def process_commands(self):
        while self.running:
            try:
                message = self.message_queue.get(timeout=1)
                if message.lower() == "quit":
                    self.send("QUIT :Goodbye")
                    self.running = False
                elif message.startswith('/'):
                    self.handle_command(message)
                else:
                    with self.lock:
                        if self.active_channel:
                            self.send(f"PRIVMSG {self.active_channel} :{message}")
                        else:
                            print(f"{self.get_timestamp()} No active channel. Use /join <channel>")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"{self.get_timestamp()} Command error: {e}")

    def handle_command(self, command):
        parts = command.split(maxsplit=2)
        cmd = parts[0].lower()
        
        if cmd == "/nick" and len(parts) > 1:
            self.nickname = parts[1]
            self.send(f"NICK {self.nickname}")
        elif cmd == "/join" and len(parts) > 1:
            channel = parts[1]
            self.send(f"JOIN {channel}")
            with self.lock:
                if channel not in self.channels:
                    self.channels.append(channel)
                    self.nicklists[channel] = set()
                    print(f"{self.get_timestamp()} Requested to join {channel}")
        elif cmd == "/msg" and len(parts) > 2:
            target = parts[1]
            msg = parts[2]
            self.send(f"PRIVMSG {target} :{msg}")
        elif cmd == "/me" and len(parts) > 1:
            with self.lock:
                if self.active_channel:
                    msg = parts[1]
                    self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
                else:
                    print(f"{self.get_timestamp()} No active channel.")
        elif cmd == "/list":
            with self.lock:
                if self.channels:
                    print(f"{self.get_timestamp()} Active channels:")
                    for i, channel in enumerate(self.channels):
                        mark = "*" if channel == self.active_channel else " "
                        print(f"{self.get_timestamp()} {i}: {channel}{mark}")
                else:
                    print(f"{self.get_timestamp()} No channels joined.")
        elif cmd == "/part" and len(parts) > 1:
            channel = parts[1]
            self.send(f"PART {channel}")
        elif cmd == "/switch" and len(parts) > 1:
            try:
                index = int(parts[1])
                with self.lock:
                    if 0 <= index < len(self.channels):
                        self.active_channel = self.channels[index]
                        print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
                    else:
                        print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
            except ValueError:
                print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
        elif cmd == "/who" and len(parts) > 1:
            self.send(f"WHO {parts[1]}")
        elif cmd == "/topic" and len(parts) > 1:
            channel = parts[1]
            if len(parts) > 2:
                self.send(f"TOPIC {channel} :{parts[2]}")
            else:
                self.send(f"TOPIC {channel}")
        elif cmd == "/clear":
            print("\033[H\033[2J", end="")
        elif cmd == "/help":
            print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")
        else:
            print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")

    def run(self):
        if not self.connect():
            return

        server_thread = threading.Thread(target=self.handle_server)
        input_thread = threading.Thread(target=self.handle_input)
        command_thread = threading.Thread(target=self.process_commands)

        server_thread.start()
        input_thread.start()
        command_thread.start()

        try:
            server_thread.join()
            input_thread.join()
            command_thread.join()
        except KeyboardInterrupt:
            self.running = False

        if self.irc:
            try:
                self.irc.close()
            except:
                pass
        print(f"{self.get_timestamp()} Connection closed.")

def main():
    """Build an IRCClient and hand control to its run() method."""
    IRCClient().run()

# Start the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

third version

This version adds timestamps to the user's own chat, action, and private messages. Do not use it: the timestamps do appear, but every message you send shows up twice in the buffer. Pick this up another time.

https://deepsite.site/chat/m37mtoma7frlmc

https://x.com/i/grok?conversation=1918464626129277043


import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
import readline
import hashlib

class IRCClient:
    """Terminal IRC client that talks to one server over TLS.

    run() starts three threads: handle_server reads and dispatches server
    lines, handle_input reads user input into message_queue, and
    process_commands drains that queue. Shared channel state (channels,
    active_channel, nicklists) is guarded by self.lock, a non-reentrant
    threading.Lock, so methods must not re-acquire it while holding it.
    """

    def __init__(self):
        self.server = "irc.rizon.net"
        self.port = 6697
        self.channels = ["#/sp/"]  # Only #/sp/ channel
        self.active_channel = self.channels[0]
        self.nickname = "LullSac"
        self.realname = "Termux IRC Client for #/sp/"
        self.irc = None  # TLS socket; None while disconnected
        self.context = ssl.create_default_context()
        self.running = True  # cleared to stop every worker loop
        self.message_queue = queue.Queue()
        self.lock = threading.Lock()
        self.reconnect_delay = 5  # seconds before the first reconnect attempt
        self.max_reconnect_delay = 300  # upper bound for the backoff delay
        # Initialize nicklists: dictionary mapping channels to sets of nicks
        self.nicklists = {channel: set() for channel in self.channels}
        # Define ANSI color codes for nicknames
        self.nick_colors = [
            "\033[31m",  # Red
            "\033[32m",  # Green
            "\033[33m",  # Yellow
            "\033[34m",  # Blue
            "\033[35m",  # Magenta
            "\033[36m",  # Cyan
            "\033[91m",  # Bright Red
            "\033[92m",  # Bright Green
        ]
        self.color_reset = "\033[0m"

    @staticmethod
    def _sender_nick(line):
        """Return the nick from a ':nick!user@host ...' line.

        Only the prefix (text before the first space) is inspected, so a '!'
        inside the message text is never mistaken for the prefix separator.
        Raises ValueError when the prefix has no '!'.
        """
        prefix = line.split(' ', 1)[0]
        return prefix[1:prefix.index('!')]

    def get_colored_nick(self, nick):
        """Return the nickname wrapped in a consistent ANSI color based on its hash."""
        if not nick:
            return nick
        # Hash the nickname to get a consistent index
        hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16)
        color_index = hash_value % len(self.nick_colors)
        color = self.nick_colors[color_index]
        return f"{color}{nick}{self.color_reset}"

    def connect(self):
        """Open the TLS connection and send USER/NICK.

        Returns True on success. On any failure the error is printed, the
        partially opened socket is closed, self.irc is reset to None and
        False is returned.
        """
        raw_socket = None
        try:
            raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
            print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...")
            self.irc.connect((self.server, self.port))
            self.send(f"USER {self.nickname} 0 * :{self.realname}")
            self.send(f"NICK {self.nickname}")
            return True
        except Exception as e:
            print(f"{self.get_timestamp()} Connection error: {e}")
            # Do not leak the descriptor of a socket that never connected.
            for sock in (self.irc, raw_socket):
                if sock is not None:
                    try:
                        sock.close()
                    except OSError:
                        pass
            self.irc = None
            return False

    def send(self, message):
        """Send one IRC line (CRLF is appended); errors are printed, not raised."""
        sock = self.irc
        if sock is None:
            print(f"{self.get_timestamp()} Error sending message: not connected")
            return
        try:
            # sendall keeps writing until the whole line is out; send() may
            # transmit only part of the buffer.
            sock.sendall(f"{message}\r\n".encode('utf-8'))
        except Exception as e:
            print(f"{self.get_timestamp()} Error sending message: {e}")

    def get_timestamp(self):
        """Return blue timestamp in format [HH:MM AM/PM]"""
        timestamp = datetime.now().strftime("%I:%M %p")
        return f"\033[34m[{timestamp}]\033[0m"

    def complete_nick(self, text, state):
        """Tab completion function for nicknames.

        readline calls this with state = 0, 1, 2, ... until None is returned.
        Matching is a case-insensitive prefix match against the active
        channel's nicklist.
        """
        with self.lock:
            nicklist = self.nicklists.get(self.active_channel, ()) if self.active_channel else ()
            # Snapshot under the lock and sort: the live set may be mutated by
            # the server thread, and successive state calls need a stable order.
            candidates = sorted(nicklist, key=lambda n: (n.lower(), n))

        prefix = text.lower()
        matches = [nick for nick in candidates if nick.lower().startswith(prefix)]

        # Return the state-th match, or None if no more matches
        return matches[state] if state < len(matches) else None

    def handle_input(self):
        """Read lines from the user and queue them for process_commands."""
        # Set up readline tab completion
        readline.set_completer(self.complete_nick)
        readline.parse_and_bind("tab: complete")

        while self.running:
            try:
                with self.lock:
                    prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
                # Pass the prompt to input() so readline knows about it and
                # can redraw the line correctly while editing.
                message = input(prompt).strip()
                if not self.running:
                    break
                if message:
                    self.message_queue.put(message)
            except (KeyboardInterrupt, EOFError):
                # Queue the literal "quit" command; any other text would be
                # sent to the channel as a chat message. EOF must end the
                # loop, otherwise input() fails again immediately forever.
                self.message_queue.put("quit")
                break
            except Exception as e:
                print(f"{self.get_timestamp()} Input error: {e}")

    def handle_server(self):
        """Read from the socket, split into lines and dispatch each one.

        When the connection drops while the client is still running,
        reconnect() is called and reading resumes on the new socket. The loop
        ends once self.running is False (user quit or reconnect gave up).
        """
        buffer = b""
        while self.running:
            try:
                data = self.irc.recv(2048)
            except Exception as e:
                if not self.running:
                    break  # socket closed as part of a deliberate shutdown
                print(f"{self.get_timestamp()} Server error: {e}")
                data = b""
            else:
                if not data:
                    if not self.running:
                        break
                    print(f"{self.get_timestamp()} Disconnected from server.")

            if not data:
                self.reconnect()
                buffer = b""  # a partial line from the old connection is useless
                continue

            # Buffer raw bytes and decode per complete line, so a multi-byte
            # UTF-8 character split across two recv() calls is not lost.
            buffer += data
            *lines, buffer = buffer.split(b'\r\n')  # keep incomplete line in buffer

            for raw in lines:
                line = raw.decode('utf-8', errors='ignore')
                if not line.strip():
                    continue
                try:
                    self._dispatch_line(line)
                except Exception as e:
                    # One bad line must not tear down the connection.
                    print(f"{self.get_timestamp()} Error handling line: {e}")

    def _dispatch_line(self, line):
        """Route one complete server line to its handler by IRC command token."""
        # Handle PING immediately
        if line.startswith("PING"):
            pieces = line.split(None, 1)
            self.send(f"PONG {pieces[1]}" if len(pieces) > 1 else "PONG")
            return

        # The command is the second token when the line carries a ':prefix'.
        # Matching it exactly avoids reacting to "JOIN", "376", ... that
        # merely appear inside message text.
        tokens = line.split()
        if line.startswith(':'):
            command = tokens[1] if len(tokens) > 1 else ""
        else:
            command = tokens[0]

        if command == "PRIVMSG":
            self.handle_privmsg(line)
        elif command == "JOIN":
            self.handle_join(line)
        elif command == "PART":
            self.handle_part(line)
        elif command == "QUIT":
            self.handle_quit(line)
        elif command == "352":  # WHO response
            self.handle_who(line)
        elif command in ("376", "422"):  # End of MOTD
            self.join_channels()
        elif command == "433":  # Nick in use
            self.handle_nick_in_use()
        elif command == "332":  # TOPIC
            self.handle_topic(line)
        else:
            print(f"{self.get_timestamp()} {line}")

    def handle_privmsg(self, line):
        """Print an incoming PRIVMSG (chat, ACTION or other CTCP)."""
        try:
            sender = self._sender_nick(line)
            # Skip if this is our own message (we already printed it locally)
            if sender == self.nickname:
                return

            # ':prefix PRIVMSG target :text' -> at most four fields
            fields = line.split(' ', 3)
            if len(fields) < 4:
                raise ValueError("missing message text")
            target = fields[2]
            msg = fields[3][1:] if fields[3].startswith(':') else fields[3]
            colored_sender = self.get_colored_nick(sender)

            if msg.startswith('\x01ACTION '):
                # Drop the CTCP framing; the closing \x01 may be absent.
                msg = msg[8:].rstrip('\x01')
                print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}")
            elif msg.startswith('\x01'):
                print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}")
            else:
                print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}")
        except ValueError:
            print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")

    def handle_join(self, line):
        """Track a JOIN: our own joins add the channel, others update its nicklist."""
        try:
            sender = self._sender_nick(line)
            # The channel is the third token, with or without a leading ':'.
            channel = line.split()[2].lstrip(':')
            colored_sender = self.get_colored_nick(sender)

            with self.lock:
                if sender == self.nickname:
                    if channel not in self.channels:
                        self.channels.append(channel)
                        self.nicklists[channel] = set()
                        print(f"{self.get_timestamp()} Joined {channel}")
                        self.send(f"WHO {channel}")
                else:
                    if channel in self.nicklists:
                        self.nicklists[channel].add(sender)
                        print(f"{self.get_timestamp()} {colored_sender} joined {channel}")
        except (IndexError, ValueError):
            print(f"{self.get_timestamp()} Malformed JOIN: {line}")

    def handle_part(self, line):
        """Track a PART: our own parts drop the channel, others leave its nicklist."""
        try:
            sender = self._sender_nick(line)
            channel = line.split()[2].lstrip(':')
            colored_sender = self.get_colored_nick(sender)

            with self.lock:
                if sender == self.nickname:
                    if channel in self.channels:
                        self.channels.remove(channel)
                        self.nicklists.pop(channel, None)
                        print(f"{self.get_timestamp()} Left {channel}")
                        if self.active_channel == channel:
                            self.active_channel = self.channels[0] if self.channels else None
                            print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}")
                else:
                    if channel in self.nicklists and sender in self.nicklists[channel]:
                        self.nicklists[channel].remove(sender)
                        print(f"{self.get_timestamp()} {colored_sender} left {channel}")
        except (IndexError, ValueError):
            print(f"{self.get_timestamp()} Malformed PART: {line}")

    def handle_quit(self, line):
        """Remove a user who quit from every channel nicklist and announce it."""
        try:
            sender = self._sender_nick(line)
        except ValueError:
            print(f"{self.get_timestamp()} Malformed QUIT: {line}")
            return
        colored_sender = self.get_colored_nick(sender)
        with self.lock:
            # Remove user from all channel nicklists
            for members in self.nicklists.values():
                members.discard(sender)
            print(f"{self.get_timestamp()} {colored_sender} quit")

    def handle_who(self, line):
        """Add the nick from a WHO reply (352) to the channel's nicklist."""
        try:
            parts = line.split()
            if len(parts) >= 8:
                # ':server 352 <me> <channel> <user> <host> <server> <nick> ...'
                channel = parts[3]
                nick = parts[7]
                with self.lock:
                    if channel in self.nicklists:
                        self.nicklists[channel].add(nick)
                        print(f"{self.get_timestamp()} {self.get_colored_nick(nick)} joined {channel}")
            else:
                print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")
        except (IndexError, ValueError):
            print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")

    def handle_topic(self, line):
        """Print the channel topic carried by a 332 reply."""
        try:
            # ':server 332 <me> <channel> :<topic>'
            fields = line.split(' ', 4)
            channel = fields[3]
            topic = fields[4][1:] if fields[4].startswith(':') else fields[4]
            print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
        except (IndexError, ValueError):
            print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}")

    def handle_nick_in_use(self):
        """React to 433 by appending '_' to the nickname and retrying."""
        self.nickname = f"{self.nickname}_"
        self.send(f"NICK {self.nickname}")
        print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}")

    def join_channels(self):
        """Send JOIN and WHO for every configured channel; activate the first."""
        with self.lock:
            for channel in self.channels:
                self.send(f"JOIN {channel}")
                print(f"{self.get_timestamp()} Joined {channel}")
                # Request WHO to populate nicklist
                self.send(f"WHO {channel}")
            self.active_channel = self.channels[0] if self.channels else None

    def reconnect(self, max_attempts=5):
        """Close the current socket and retry connect() with exponential backoff.

        The delay before attempt k is reconnect_delay * 2**k seconds, capped
        at max_reconnect_delay. self.running stays True while retrying so the
        input and command threads keep working; it is cleared only after
        max_attempts consecutive failures. Returns early, without further
        attempts, if the client is stopped in the meantime.
        """
        if self.irc:
            try:
                self.irc.close()
            except OSError:
                pass
        self.irc = None

        attempts = 0
        while attempts < max_attempts:
            if not self.running:
                return  # the user quit while we were waiting
            delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
            print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
            time.sleep(delay)
            if self.connect():
                print(f"{self.get_timestamp()} Reconnected successfully.")
                return
            attempts += 1
        print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
        self.running = False

    def process_commands(self):
        """Drain message_queue: run '/commands', handle 'quit', send chat text."""
        while self.running:
            try:
                # The timeout lets the loop re-check self.running regularly.
                message = self.message_queue.get(timeout=1)
                if message.lower() == "quit":
                    self.send("QUIT :Goodbye")
                    self.running = False
                elif message.startswith('/'):
                    self.handle_command(message)
                else:
                    with self.lock:
                        if self.active_channel:
                            # Only print our message locally (with timestamp)
                            colored_nick = self.get_colored_nick(self.nickname)
                            print(f"{self.get_timestamp()} <{colored_nick}> {message}")
                            # Send to server but don't print the echo
                            self.send(f"PRIVMSG {self.active_channel} :{message}")
                        else:
                            print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.")
            except queue.Empty:
                continue
            except Exception as e:
                print(f"{self.get_timestamp()} Command error: {e}")

    def handle_command(self, command):
        """Execute one '/command' line typed by the user.

        The line is split into at most three parts: the command, its first
        argument, and the remaining text.
        """
        parts = command.split(maxsplit=2)
        cmd = parts[0].lower()

        if cmd == "/nick" and len(parts) > 1:
            self.nickname = parts[1]
            self.send(f"NICK {self.nickname}")
            print(f"{self.get_timestamp()} Changed nick to {self.get_colored_nick(self.nickname)}")

        elif cmd == "/join" and len(parts) > 1:
            channel = parts[1]
            self.send(f"JOIN {channel}")
            with self.lock:
                if channel not in self.channels:
                    self.channels.append(channel)
                    self.nicklists[channel] = set()
                    print(f"{self.get_timestamp()} Joined {channel}")
                    if not self.active_channel:
                        self.active_channel = channel
                    self.send(f"WHO {channel}")
                else:
                    print(f"{self.get_timestamp()} Already in {channel}")

        elif cmd == "/msg" and len(parts) > 2:
            target = parts[1]
            msg = parts[2]
            colored_nick = self.get_colored_nick(self.nickname)
            print(f"{self.get_timestamp()} -> {colored_nick}@{target}> {msg}")
            self.send(f"PRIVMSG {target} :{msg}")

        elif cmd == "/me" and len(parts) > 1:
            with self.lock:
                if self.active_channel:
                    # Everything after "/me" is the action. parts[1] alone
                    # would be only its first word because of maxsplit=2.
                    msg = command.split(maxsplit=1)[1]
                    colored_nick = self.get_colored_nick(self.nickname)
                    print(f"{self.get_timestamp()} * {colored_nick} {msg}")
                    self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
                else:
                    print(f"{self.get_timestamp()} No active channel.")

        elif cmd == "/list":
            with self.lock:
                if self.channels:
                    print(f"{self.get_timestamp()} Active channels:")
                    for i, channel in enumerate(self.channels):
                        mark = "*" if channel == self.active_channel else " "
                        print(f"{self.get_timestamp()} {i}: {channel}{mark}")
                else:
                    print(f"{self.get_timestamp()} No channels joined.")

        elif cmd == "/part" and len(parts) > 1:
            channel = parts[1]
            self.send(f"PART {channel}")

        elif cmd == "/switch" and len(parts) > 1:
            try:
                index = int(parts[1])
                with self.lock:
                    if 0 <= index < len(self.channels):
                        self.active_channel = self.channels[index]
                        print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
                    else:
                        print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
            except ValueError:
                print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")

        elif cmd == "/who" and len(parts) > 1:
            self.send(f"WHO {parts[1]}")

        elif cmd == "/topic" and len(parts) > 1:
            channel = parts[1]
            if len(parts) > 2:
                self.send(f"TOPIC {channel} :{parts[2]}")
            else:
                self.send(f"TOPIC {channel}")

        elif cmd == "/clear":
            print("\033[H\033[2J", end="")

        elif cmd == "/help":
            print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")

        else:
            print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")

    def run(self):
        """Connect, run the three worker threads, then close the connection.

        Returns immediately when the initial connect() fails. Otherwise this
        blocks until all worker threads finish or the main thread gets Ctrl-C.
        """
        if not self.connect():
            return

        server_thread = threading.Thread(target=self.handle_server)
        input_thread = threading.Thread(target=self.handle_input)
        command_thread = threading.Thread(target=self.process_commands)

        server_thread.start()
        input_thread.start()
        command_thread.start()

        try:
            server_thread.join()
            input_thread.join()
            command_thread.join()
        except KeyboardInterrupt:
            self.running = False

        if self.irc:
            try:
                self.irc.close()
            except OSError:
                # Best-effort close; only socket errors are ignored.
                pass
        print(f"{self.get_timestamp()} Connection closed.")

def main():
    """Build an IRCClient and hand control to its run() method."""
    IRCClient().run()

# Start the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


¤ Termux IRC Client SP ¤ Termux IRC Client GTV ¤ Termux IRC Client ¤ Termux ¤ original link ¤