Termux IRC Client SP
Jump to navigation
Jump to search
first version
irc_sp.py
https://x.com/i/grok?conversation=1918094079356731464
import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
import readline
import hashlib
class IRCClient:
def __init__(self):
self.server = "irc.rizon.net"
self.port = 6697
self.channels = ["#/sp/"] # Only #/sp/ channel
self.active_channel = self.channels[0]
self.nickname = "LullSac"
self.realname = "Termux IRC Client for #/sp/"
self.irc = None
self.context = ssl.create_default_context()
self.running = True
self.message_queue = queue.Queue()
self.lock = threading.Lock()
self.reconnect_delay = 5
self.max_reconnect_delay = 300
# Initialize nicklists: dictionary mapping channels to sets of nicks
self.nicklists = {channel: set() for channel in self.channels}
# Define ANSI color codes for nicknames
self.nick_colors = [
"\033[31m", # Red
"\033[32m", # Green
"\033[33m", # Yellow
"\033[34m", # Blue
"\033[35m", # Magenta
"\033[36m", # Cyan
"\033[91m", # Bright Red
"\033[92m", # Bright Green
]
self.color_reset = "\033[0m"
def get_colored_nick(self, nick):
"""Return the nickname wrapped in a consistent ANSI color based on its hash."""
if not nick:
return nick
# Hash the nickname to get a consistent index
hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16)
color_index = hash_value % len(self.nick_colors)
color = self.nick_colors[color_index]
return f"{color}{nick}{self.color_reset}"
def connect(self):
try:
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...")
self.irc.connect((self.server, self.port))
self.send(f"USER {self.nickname} 0 * :{self.realname}")
self.send(f"NICK {self.nickname}")
return True
except Exception as e:
print(f"{self.get_timestamp()} Connection error: {e}")
return False
def send(self, message):
try:
self.irc.send(f"{message}\r\n".encode('utf-8'))
except Exception as e:
print(f"{self.get_timestamp()} Error sending message: {e}")
def get_timestamp(self):
timestamp = datetime.now().strftime("%I:%M %p")
return f"\033[34m[{timestamp}]\033[0m"
def complete_nick(self, text, state):
"""Tab completion function for nicknames."""
with self.lock:
# Get the nicklist for the active channel
nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set()
# Find matches: nicks starting with the input text (case-insensitive)
matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())]
# Return the state-th match, or None if no more matches
return matches[state] if state < len(matches) else None
def handle_input(self):
# Set up readline tab completion
readline.set_completer(self.complete_nick)
readline.parse_and_bind("tab: complete")
while self.running:
try:
with self.lock:
prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
sys.stdout.write(prompt)
sys.stdout.flush()
message = input().strip()
if not self.running:
break
if message:
self.message_queue.put(message)
except KeyboardInterrupt:
self.message_queue.put("QUIT :Goodbye")
break
except Exception as e:
print(f"{self.get_timestamp()} Input error: {e}")
def handle_server(self):
while self.running:
try:
data = self.irc.recv(2048).decode('utf-8', errors='ignore')
if not data:
print(f"{self.get_timestamp()} Disconnected from server.")
self.reconnect()
break
for line in data.strip().split('\r\n'):
if not line:
continue
# Define messages to handle specially
handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352", "QUIT"]
# Check if the message is handled by examining the command field
parts = line.split()
is_handled = False
command = None
if len(parts) >= 2:
command = parts[1] if parts[0].startswith(':') else parts[0]
is_handled = command in handled_messages
# Only print raw line if not handled
if not is_handled:
print(f"{self.get_timestamp()} {line}")
# Handle PING silently
if line.startswith("PING"):
pong_response = f"PONG {line.split()[1]}"
self.send(pong_response)
continue # Skip further processing to avoid printing
# Join channels after MOTD and request WHO
if "376" in line or "422" in line:
with self.lock:
for channel in self.channels:
self.send(f"JOIN {channel}")
print(f"{self.get_timestamp()} Joined {channel}")
# Request WHO to populate nicklist
self.send(f"WHO {channel}")
self.active_channel = self.channels[0] if self.channels else None
# Handle nick in use
if "433" in line:
self.nickname = f"{self.nickname}_"
self.send(f"NICK {self.nickname}")
print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}")
# Parse PRIVMSG
if "PRIVMSG" in line:
try:
sender = line[1:line.index('!')]
target = line.split()[2]
msg = line[line.index(':', 1) + 1:]
colored_sender = self.get_colored_nick(sender)
if msg.startswith('\x01ACTION '):
msg = msg[8:-1]
print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}")
elif msg.startswith('\x01'):
print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}")
else:
print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")
# Handle channel join
if "JOIN" in line:
try:
sender = line[1:line.index('!')] if '!' in line else None
channel = line.split(':', 2)[-1]
colored_sender = self.get_colored_nick(sender)
with self.lock:
if sender == self.nickname:
if channel not in self.channels:
self.channels.append(channel)
self.nicklists[channel] = set()
print(f"{self.get_timestamp()} Added {channel} to active channels.")
if not self.active_channel:
self.active_channel = channel
# Request WHO to populate nicklist
self.send(f"WHO {channel}")
else:
# Add joining user to nicklist
if channel in self.nicklists:
self.nicklists[channel].add(sender)
print(f"{self.get_timestamp()} {colored_sender} joined {channel}")
except ValueError:
print(f"{self.get_timestamp()} Malformed JOIN: {line}")
# Handle parting
if "PART" in line:
try:
sender = line[1:line.index('!')] if '!' in line else None
channel = line.split()[2]
colored_sender = self.get_colored_nick(sender)
with self.lock:
if sender == self.nickname:
if channel in self.channels:
self.channels.remove(channel)
self.nicklists.pop(channel, None)
print(f"{self.get_timestamp()} Left {channel}")
if self.active_channel == channel:
self.active_channel = self.channels[0] if self.channels else None
print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}")
else:
# Remove parting user from nicklist
if channel in self.nicklists and sender in self.nicklists[channel]:
self.nicklists[channel].remove(sender)
print(f"{self.get_timestamp()} {colored_sender} left {channel}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PART: {line}")
# Handle QUIT
if "QUIT" in line:
try:
sender = line[1:line.index('!')] if '!' in line else None
colored_sender = self.get_colored_nick(sender)
with self.lock:
# Remove user from all channel nicklists
for channel in self.nicklists:
if sender in self.nicklists[channel]:
self.nicklists[channel].remove(sender)
print(f"{self.get_timestamp()} {colored_sender} quit")
except ValueError:
print(f"{self.get_timestamp()} Malformed QUIT: {line}")
# Handle topic
if "332" in line:
try:
channel = line.split()[3]
topic = line[line.index(':', 1) + 1:]
print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
except (IndexError, ValueError):
print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}")
# Handle WHO response
if "352" in line:
try:
parts = line.split()
if len(parts) >= 8:
channel, user, host, nick = parts[3], parts[4], parts[5], parts[7]
colored_nick = self.get_colored_nick(nick)
with self.lock:
if channel in self.nicklists:
self.nicklists[channel].add(nick)
print(f"{self.get_timestamp()} {colored_nick} ({user}@{host}) in {channel}")
else:
print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")
except (IndexError, ValueError):
print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")
except Exception as e:
print(f"{self.get_timestamp()} Server error: {e}")
self.reconnect()
break
def reconnect(self):
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
self.irc = None
attempts = 0
while not self.running and attempts < 5:
delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
time.sleep(delay)
self.running = True
if self.connect():
print(f"{self.get_timestamp()} Reconnected successfully.")
return
attempts += 1
print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
self.running = False
def process_commands(self):
while self.running:
try:
message = self.message_queue.get(timeout=1)
if message.lower() == "quit":
self.send("QUIT :Goodbye")
self.running = False
elif message.startswith('/'):
self.handle_command(message)
else:
with self.lock:
if self.active_channel:
self.send(f"PRIVMSG {self.active_channel} :{message}")
else:
print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.")
except queue.Empty:
continue
except Exception as e:
print(f"{self.get_timestamp()} Command error: {e}")
def handle_command(self, command):
parts = command.split(maxsplit=2)
cmd = parts[0].lower()
if cmd == "/nick" and len(parts) > 1:
self.nickname = parts[1]
self.send(f"NICK {self.nickname}")
elif cmd == "/join" and len(parts) > 1:
channel = parts[1]
self.send(f"JOIN {channel}")
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
self.nicklists[channel] = set()
print(f"{self.get_timestamp()} Requested to join {channel}")
elif cmd == "/msg" and len(parts) > 2:
target = parts[1]
msg = parts[2]
self.send(f"PRIVMSG {target} :{msg}")
elif cmd == "/me" and len(parts) > 1:
with self.lock:
if self.active_channel:
msg = parts[1]
self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
else:
print(f"{self.get_timestamp()} No active channel.")
elif cmd == "/list":
with self.lock:
if self.channels:
print(f"{self.get_timestamp()} Active channels:")
for i, channel in enumerate(self.channels):
mark = "*" if channel == self.active_channel else " "
print(f"{self.get_timestamp()} {i}: {channel}{mark}")
else:
print(f"{self.get_timestamp()} No channels joined.")
elif cmd == "/part" and len(parts) > 1:
channel = parts[1]
self.send(f"PART {channel}")
elif cmd == "/switch" and len(parts) > 1:
try:
index = int(parts[1])
with self.lock:
if 0 <= index < len(self.channels):
self.active_channel = self.channels[index]
print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
else:
print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
except ValueError:
print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
elif cmd == "/who" and len(parts) > 1:
self.send(f"WHO {parts[1]}")
elif cmd == "/topic" and len(parts) > 1:
channel = parts[1]
if len(parts) > 2:
self.send(f"TOPIC {channel} :{parts[2]}")
else:
self.send(f"TOPIC {channel}")
elif cmd == "/clear":
print("\033[H\033[2J", end="")
elif cmd == "/help":
print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")
else:
print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")
def run(self):
if not self.connect():
return
server_thread = threading.Thread(target=self.handle_server)
input_thread = threading.Thread(target=self.handle_input)
command_thread = threading.Thread(target=self.process_commands)
server_thread.start()
input_thread.start()
command_thread.start()
try:
server_thread.join()
input_thread.join()
command_thread.join()
except KeyboardInterrupt:
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
print(f"{self.get_timestamp()} Connection closed.")
def main():
client = IRCClient()
client.run()
if __name__ == "__main__":
main()
second version
That first version worked for a day and then started having nicklist and buffer issues. This new version should have taken care of those problems.
import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
import readline
import hashlib
class IRCClient:
def __init__(self):
self.server = "irc.rizon.net"
self.port = 6697
self.channels = ["#/sp/"] # Only #/sp/ channel
self.active_channel = self.channels[0]
self.nickname = "LullSac"
self.realname = "Termux IRC Client for #/sp/"
self.irc = None
self.context = ssl.create_default_context()
self.running = True
self.message_queue = queue.Queue()
self.lock = threading.Lock()
self.reconnect_delay = 5
self.max_reconnect_delay = 300
# Initialize nicklists with proper synchronization
self.nicklists = {channel: set() for channel in self.channels}
# ANSI color codes for nicknames
self.nick_colors = [
"\033[31m", "\033[32m", "\033[33m", "\033[34m",
"\033[35m", "\033[36m", "\033[91m", "\033[92m"
]
self.color_reset = "\033[0m"
def get_colored_nick(self, nick):
"""Return colored nickname based on consistent hash."""
if not nick:
return nick
hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16)
color_index = hash_value % len(self.nick_colors)
return f"{self.nick_colors[color_index]}{nick}{self.color_reset}"
def connect(self):
try:
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
self.irc.connect((self.server, self.port))
self.send(f"USER {self.nickname} 0 * :{self.realname}")
self.send(f"NICK {self.nickname}")
return True
except Exception as e:
print(f"{self.get_timestamp()} Connection error: {e}")
return False
def send(self, message):
try:
self.irc.send(f"{message}\r\n".encode('utf-8'))
except Exception as e:
print(f"{self.get_timestamp()} Error sending message: {e}")
def get_timestamp(self):
return f"\033[34m[{datetime.now().strftime('%I:%M %p')}]\033[0m"
def handle_server(self):
buffer = ""
while self.running:
try:
data = self.irc.recv(2048).decode('utf-8', errors='ignore')
if not data:
raise Exception("Connection closed by server")
buffer += data
lines = buffer.split('\r\n')
buffer = lines.pop() # Keep incomplete line in buffer
for line in lines:
if not line.strip():
continue
# Handle PING immediately
if line.startswith("PING"):
self.send(f"PONG {line.split()[1]}")
continue
# Handle server responses
if "PRIVMSG" in line:
self.handle_privmsg(line)
elif "JOIN" in line:
self.handle_join(line)
elif "PART" in line:
self.handle_part(line)
elif "QUIT" in line:
self.handle_quit(line)
elif "352" in line: # WHO response
self.handle_who(line)
elif "376" in line or "422" in line: # End of MOTD
self.join_channels()
elif "433" in line: # Nick in use
self.handle_nick_in_use()
elif "332" in line: # TOPIC
self.handle_topic(line)
else:
print(f"{self.get_timestamp()} {line}")
except Exception as e:
print(f"{self.get_timestamp()} Server error: {e}")
self.reconnect()
break
def handle_privmsg(self, line):
try:
sender = line[1:line.index('!')]
target = line.split()[2]
msg = line[line.index(':', 1) + 1:]
colored_sender = self.get_colored_nick(sender)
if msg.startswith('\x01ACTION '):
print(f"{self.get_timestamp()} * {colored_sender} {msg[8:-1]}")
elif msg.startswith('\x01'):
print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}")
else:
print(f"{self.get_timestamp()} <{colored_sender}> {msg}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")
def handle_join(self, line):
try:
sender = line[1:line.index('!')]
channel = line.split(':', 2)[-1].strip()
colored_sender = self.get_colored_nick(sender)
with self.lock:
if sender == self.nickname:
if channel not in self.channels:
self.channels.append(channel)
self.nicklists[channel] = set()
print(f"{self.get_timestamp()} Joined {channel}")
self.send(f"WHO {channel}")
else:
if channel in self.nicklists:
self.nicklists[channel].add(sender)
print(f"{self.get_timestamp()} {colored_sender} joined {channel}")
except ValueError:
print(f"{self.get_timestamp()} Malformed JOIN: {line}")
def handle_part(self, line):
try:
sender = line[1:line.index('!')]
channel = line.split()[2]
colored_sender = self.get_colored_nick(sender)
with self.lock:
if sender == self.nickname:
if channel in self.channels:
self.channels.remove(channel)
self.nicklists.pop(channel, None)
print(f"{self.get_timestamp()} Left {channel}")
if self.active_channel == channel:
self.active_channel = self.channels[0] if self.channels else None
else:
if channel in self.nicklists and sender in self.nicklists[channel]:
self.nicklists[channel].remove(sender)
print(f"{self.get_timestamp()} {colored_sender} left {channel}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PART: {line}")
def handle_quit(self, line):
try:
sender = line[1:line.index('!')]
colored_sender = self.get_colored_nick(sender)
with self.lock:
for channel in self.nicklists:
if sender in self.nicklists[channel]:
self.nicklists[channel].remove(sender)
print(f"{self.get_timestamp()} {colored_sender} quit")
except ValueError:
print(f"{self.get_timestamp()} Malformed QUIT: {line}")
def handle_who(self, line):
try:
parts = line.split()
if len(parts) >= 8:
channel = parts[3]
nick = parts[7]
with self.lock:
if channel in self.nicklists:
self.nicklists[channel].add(nick)
except (IndexError, ValueError):
print(f"{self.get_timestamp()} Malformed WHO reply: {line}")
def join_channels(self):
with self.lock:
for channel in self.channels:
self.send(f"JOIN {channel}")
self.send(f"WHO {channel}")
def handle_nick_in_use(self):
self.nickname = f"{self.nickname}_"
self.send(f"NICK {self.nickname}")
print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}")
def handle_topic(self, line):
try:
channel = line.split()[3]
topic = line[line.index(':', 1) + 1:]
print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
except (IndexError, ValueError):
print(f"{self.get_timestamp()} Malformed TOPIC: {line}")
def reconnect(self):
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
self.irc = None
attempts = 0
while attempts < 5:
delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
time.sleep(delay)
self.running = True
if self.connect():
print(f"{self.get_timestamp()} Reconnected successfully.")
return
attempts += 1
print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
self.running = False
def handle_input(self):
readline.set_completer(self.complete_nick)
readline.parse_and_bind("tab: complete")
while self.running:
try:
with self.lock:
prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
sys.stdout.write(prompt)
sys.stdout.flush()
message = input().strip()
if not self.running:
break
if message:
self.message_queue.put(message)
except KeyboardInterrupt:
self.message_queue.put("QUIT :Goodbye")
break
except Exception as e:
print(f"{self.get_timestamp()} Input error: {e}")
def complete_nick(self, text, state):
"""Tab completion for nicknames in active channel."""
with self.lock:
if not self.active_channel:
return None
nicklist = self.nicklists.get(self.active_channel, set())
matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())]
return matches[state] if state < len(matches) else None
def process_commands(self):
while self.running:
try:
message = self.message_queue.get(timeout=1)
if message.lower() == "quit":
self.send("QUIT :Goodbye")
self.running = False
elif message.startswith('/'):
self.handle_command(message)
else:
with self.lock:
if self.active_channel:
self.send(f"PRIVMSG {self.active_channel} :{message}")
else:
print(f"{self.get_timestamp()} No active channel. Use /join <channel>")
except queue.Empty:
continue
except Exception as e:
print(f"{self.get_timestamp()} Command error: {e}")
def handle_command(self, command):
parts = command.split(maxsplit=2)
cmd = parts[0].lower()
if cmd == "/nick" and len(parts) > 1:
self.nickname = parts[1]
self.send(f"NICK {self.nickname}")
elif cmd == "/join" and len(parts) > 1:
channel = parts[1]
self.send(f"JOIN {channel}")
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
self.nicklists[channel] = set()
print(f"{self.get_timestamp()} Requested to join {channel}")
elif cmd == "/msg" and len(parts) > 2:
target = parts[1]
msg = parts[2]
self.send(f"PRIVMSG {target} :{msg}")
elif cmd == "/me" and len(parts) > 1:
with self.lock:
if self.active_channel:
msg = parts[1]
self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
else:
print(f"{self.get_timestamp()} No active channel.")
elif cmd == "/list":
with self.lock:
if self.channels:
print(f"{self.get_timestamp()} Active channels:")
for i, channel in enumerate(self.channels):
mark = "*" if channel == self.active_channel else " "
print(f"{self.get_timestamp()} {i}: {channel}{mark}")
else:
print(f"{self.get_timestamp()} No channels joined.")
elif cmd == "/part" and len(parts) > 1:
channel = parts[1]
self.send(f"PART {channel}")
elif cmd == "/switch" and len(parts) > 1:
try:
index = int(parts[1])
with self.lock:
if 0 <= index < len(self.channels):
self.active_channel = self.channels[index]
print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
else:
print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
except ValueError:
print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
elif cmd == "/who" and len(parts) > 1:
self.send(f"WHO {parts[1]}")
elif cmd == "/topic" and len(parts) > 1:
channel = parts[1]
if len(parts) > 2:
self.send(f"TOPIC {channel} :{parts[2]}")
else:
self.send(f"TOPIC {channel}")
elif cmd == "/clear":
print("\033[H\033[2J", end="")
elif cmd == "/help":
print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")
else:
print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")
def run(self):
if not self.connect():
return
server_thread = threading.Thread(target=self.handle_server)
input_thread = threading.Thread(target=self.handle_input)
command_thread = threading.Thread(target=self.process_commands)
server_thread.start()
input_thread.start()
command_thread.start()
try:
server_thread.join()
input_thread.join()
command_thread.join()
except KeyboardInterrupt:
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
print(f"{self.get_timestamp()} Connection closed.")
def main():
client = IRCClient()
client.run()
if __name__ == "__main__":
main()
third version
This version added timestamps to chat, action, and private messages for the user. Do not use. This version does add timestamps, but it doubles up your chats in the buffer. Pick this up another time.
https://deepsite.site/chat/m37mtoma7frlmc
https://x.com/i/grok?conversation=1918464626129277043
import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
import readline
import hashlib
class IRCClient:
def __init__(self):
self.server = "irc.rizon.net"
self.port = 6697
self.channels = ["#/sp/"] # Only #/sp/ channel
self.active_channel = self.channels[0]
self.nickname = "LullSac"
self.realname = "Termux IRC Client for #/sp/"
self.irc = None
self.context = ssl.create_default_context()
self.running = True
self.message_queue = queue.Queue()
self.lock = threading.Lock()
self.reconnect_delay = 5
self.max_reconnect_delay = 300
# Initialize nicklists: dictionary mapping channels to sets of nicks
self.nicklists = {channel: set() for channel in self.channels}
# Define ANSI color codes for nicknames
self.nick_colors = [
"\033[31m", # Red
"\033[32m", # Green
"\033[33m", # Yellow
"\033[34m", # Blue
"\033[35m", # Magenta
"\033[36m", # Cyan
"\033[91m", # Bright Red
"\033[92m", # Bright Green
]
self.color_reset = "\033[0m"
def get_colored_nick(self, nick):
"""Return the nickname wrapped in a consistent ANSI color based on its hash."""
if not nick:
return nick
# Hash the nickname to get a consistent index
hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16)
color_index = hash_value % len(self.nick_colors)
color = self.nick_colors[color_index]
return f"{color}{nick}{self.color_reset}"
def connect(self):
try:
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...")
self.irc.connect((self.server, self.port))
self.send(f"USER {self.nickname} 0 * :{self.realname}")
self.send(f"NICK {self.nickname}")
return True
except Exception as e:
print(f"{self.get_timestamp()} Connection error: {e}")
return False
def send(self, message):
try:
self.irc.send(f"{message}\r\n".encode('utf-8'))
except Exception as e:
print(f"{self.get_timestamp()} Error sending message: {e}")
def get_timestamp(self):
"""Return blue timestamp in format [HH:MM AM/PM]"""
timestamp = datetime.now().strftime("%I:%M %p")
return f"\033[34m[{timestamp}]\033[0m"
def complete_nick(self, text, state):
"""Tab completion function for nicknames."""
with self.lock:
# Get the nicklist for the active channel
nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set()
# Find matches: nicks starting with the input text (case-insensitive)
matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())]
# Return the state-th match, or None if no more matches
return matches[state] if state < len(matches) else None
def handle_input(self):
# Set up readline tab completion
readline.set_completer(self.complete_nick)
readline.parse_and_bind("tab: complete")
while self.running:
try:
with self.lock:
prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
sys.stdout.write(prompt)
sys.stdout.flush()
message = input().strip()
if not self.running:
break
if message:
self.message_queue.put(message)
except KeyboardInterrupt:
self.message_queue.put("QUIT :Goodbye")
break
except Exception as e:
print(f"{self.get_timestamp()} Input error: {e}")
def handle_server(self):
buffer = ""
while self.running:
try:
data = self.irc.recv(2048).decode('utf-8', errors='ignore')
if not data:
print(f"{self.get_timestamp()} Disconnected from server.")
self.reconnect()
break
buffer += data
lines = buffer.split('\r\n')
buffer = lines.pop() # Keep incomplete line in buffer
for line in lines:
if not line.strip():
continue
# Handle PING immediately
if line.startswith("PING"):
self.send(f"PONG {line.split()[1]}")
continue
# Handle server responses
if "PRIVMSG" in line:
self.handle_privmsg(line)
elif "JOIN" in line:
self.handle_join(line)
elif "PART" in line:
self.handle_part(line)
elif "QUIT" in line:
self.handle_quit(line)
elif "352" in line: # WHO response
self.handle_who(line)
elif "376" in line or "422" in line: # End of MOTD
self.join_channels()
elif "433" in line: # Nick in use
self.handle_nick_in_use()
elif "332" in line: # TOPIC
self.handle_topic(line)
else:
print(f"{self.get_timestamp()} {line}")
except Exception as e:
print(f"{self.get_timestamp()} Server error: {e}")
self.reconnect()
break
def handle_privmsg(self, line):
try:
sender = line[1:line.index('!')]
# Skip if this is our own message (we already printed it locally)
if sender == self.nickname:
return
target = line.split()[2]
msg = line[line.index(':', 1) + 1:]
colored_sender = self.get_colored_nick(sender)
if msg.startswith('\x01ACTION '):
msg = msg[8:-1]
print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}")
elif msg.startswith('\x01'):
print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}")
else:
print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")
def handle_join(self, line):
try:
sender = line[1:line.index('!')]
channel = line.split(':', 2)[-1].strip()
colored_sender = self.get_colored_nick(sender)
with self.lock:
if sender == self.nickname:
if channel not in self.channels:
self.channels.append(channel)
self.nicklists[channel] = set()
print(f"{self.get_timestamp()} Joined {channel}")
self.send(f"WHO {channel}")
else:
if channel in self.nicklists:
self.nicklists[channel].add(sender)
print(f"{self.get_timestamp()} {colored_sender} joined {channel}")
except ValueError:
print(f"{self.get_timestamp()} Malformed JOIN: {line}")
def handle_part(self, line):
try:
sender = line[1:line.index('!')]
channel = line.split()[2]
colored_sender = self.get_colored_nick(sender)
with self.lock:
if sender == self.nickname:
if channel in self.channels:
self.channels.remove(channel)
self.nicklists.pop(channel, None)
print(f"{self.get_timestamp()} Left {channel}")
if self.active_channel == channel:
self.active_channel = self.channels[0] if self.channels else None
print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}")
else:
if channel in self.nicklists and sender in self.nicklists[channel]:
self.nicklists[channel].remove(sender)
print(f"{self.get_timestamp()} {colored_sender} left {channel}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PART: {line}")
def handle_quit(self, line):
try:
sender = line[1:line.index('!')] if '!' in line else None
colored_sender = self.get_colored_nick(sender)
with self.lock:
# Remove user from all channel nicklists
for channel in self.nicklists:
if sender in self.nicklists[channel]:
self.nicklists[channel].remove(sender)
print(f"{self.get_timestamp()} {colored_sender} quit")
except ValueError:
print(f"{self.get_timestamp()} Malformed QUIT: {line}")
def handle_who(self, line):
try:
parts = line.split()
if len(parts) >= 8:
channel = parts[3]
nick = parts[7]
with self.lock:
if channel in self.nicklists:
self.nicklists[channel].add(nick)
print(f"{self.get_timestamp()} {self.get_colored_nick(nick)} joined {channel}")
else:
print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")
except (IndexError, ValueError):
print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}")
def handle_topic(self, line):
try:
channel = line.split()[3]
topic = line[line.index(':', 1) + 1:]
print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
except (IndexError, ValueError):
print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}")
def handle_nick_in_use(self):
self.nickname = f"{self.nickname}_"
self.send(f"NICK {self.nickname}")
print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}")
def join_channels(self):
with self.lock:
for channel in self.channels:
self.send(f"JOIN {channel}")
print(f"{self.get_timestamp()} Joined {channel}")
# Request WHO to populate nicklist
self.send(f"WHO {channel}")
self.active_channel = self.channels[0] if self.channels else None
def reconnect(self):
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
self.irc = None
attempts = 0
while not self.running and attempts < 5:
delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
time.sleep(delay)
self.running = True
if self.connect():
print(f"{self.get_timestamp()} Reconnected successfully.")
return
attempts += 1
print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
self.running = False
def process_commands(self):
while self.running:
try:
message = self.message_queue.get(timeout=1)
if message.lower() == "quit":
self.send("QUIT :Goodbye")
self.running = False
elif message.startswith('/'):
self.handle_command(message)
else:
with self.lock:
if self.active_channel:
# Only print our message locally (with timestamp)
colored_nick = self.get_colored_nick(self.nickname)
print(f"{self.get_timestamp()} <{colored_nick}> {message}")
# Send to server but don't print the echo
self.send(f"PRIVMSG {self.active_channel} :{message}")
else:
print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.")
except queue.Empty:
continue
except Exception as e:
print(f"{self.get_timestamp()} Command error: {e}")
def handle_command(self, command):
parts = command.split(maxsplit=2)
cmd = parts[0].lower()
if cmd == "/nick" and len(parts) > 1:
self.nickname = parts[1]
self.send(f"NICK {self.nickname}")
print(f"{self.get_timestamp()} Changed nick to {self.get_colored_nick(self.nickname)}")
elif cmd == "/join" and len(parts) > 1:
channel = parts[1]
self.send(f"JOIN {channel}")
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
self.nicklists[channel] = set()
print(f"{self.get_timestamp()} Joined {channel}")
if not self.active_channel:
self.active_channel = channel
self.send(f"WHO {channel}")
else:
print(f"{self.get_timestamp()} Already in {channel}")
elif cmd == "/msg" and len(parts) > 2:
target = parts[1]
msg = parts[2]
colored_nick = self.get_colored_nick(self.nickname)
print(f"{self.get_timestamp()} -> {colored_nick}@{target}> {msg}")
self.send(f"PRIVMSG {target} :{msg}")
elif cmd == "/me" and len(parts) > 1:
with self.lock:
if self.active_channel:
msg = parts[1]
colored_nick = self.get_colored_nick(self.nickname)
print(f"{self.get_timestamp()} * {colored_nick} {msg}")
self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
else:
print(f"{self.get_timestamp()} No active channel.")
elif cmd == "/list":
with self.lock:
if self.channels:
print(f"{self.get_timestamp()} Active channels:")
for i, channel in enumerate(self.channels):
mark = "*" if channel == self.active_channel else " "
print(f"{self.get_timestamp()} {i}: {channel}{mark}")
else:
print(f"{self.get_timestamp()} No channels joined.")
elif cmd == "/part" and len(parts) > 1:
channel = parts[1]
self.send(f"PART {channel}")
elif cmd == "/switch" and len(parts) > 1:
try:
index = int(parts[1])
with self.lock:
if 0 <= index < len(self.channels):
self.active_channel = self.channels[index]
print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
else:
print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
except ValueError:
print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
elif cmd == "/who" and len(parts) > 1:
self.send(f"WHO {parts[1]}")
elif cmd == "/topic" and len(parts) > 1:
channel = parts[1]
if len(parts) > 2:
self.send(f"TOPIC {channel} :{parts[2]}")
else:
self.send(f"TOPIC {channel}")
elif cmd == "/clear":
print("\033[H\033[2J", end="")
elif cmd == "/help":
print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")
else:
print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")
def run(self):
if not self.connect():
return
server_thread = threading.Thread(target=self.handle_server)
input_thread = threading.Thread(target=self.handle_input)
command_thread = threading.Thread(target=self.process_commands)
server_thread.start()
input_thread.start()
command_thread.start()
try:
server_thread.join()
input_thread.join()
command_thread.join()
except KeyboardInterrupt:
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
print(f"{self.get_timestamp()} Connection closed.")
def main():
client = IRCClient()
client.run()
if __name__ == "__main__":
main()