Difference between revisions of "Termux IRC Client SP"
Jump to navigation
Jump to search
Line 314: | Line 314: | ||
parts = command.split(maxsplit=2) | parts = command.split(maxsplit=2) | ||
cmd = parts[0].lower() | cmd = parts[0].lower() | ||
if cmd == "/nick" and len(parts) > 1: | |||
self.nickname = parts[1] | |||
self.send(f"NICK {self.nickname}") | |||
elif cmd == "/join" and len(parts) > 1: | |||
channel = parts[1] | |||
self.send(f"JOIN {channel}") | |||
with self.lock: | |||
if channel not in self.channels: | |||
self.channels.append(channel) | |||
self.nicklists[channel] = set() | |||
print(f"{self.get_timestamp()} Requested to join {channel}") | |||
elif cmd == "/msg" and len(parts) > 2: | |||
target = parts[1] | |||
msg = parts[2] | |||
self.send(f"PRIVMSG {target} :{msg}") | |||
elif cmd == "/me" and len(parts) > 1: | |||
with self.lock: | |||
if self.active_channel: | |||
msg = parts[1] | |||
self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") | |||
else: | |||
print(f"{self.get_timestamp()} No active channel.") | |||
elif cmd == "/list": | |||
with self.lock: | |||
if self.channels: | |||
print(f"{self.get_timestamp()} Active channels:") | |||
for i, channel in enumerate(self.channels): | |||
mark = "*" if channel == self.active_channel else " " | |||
print(f"{self.get_timestamp()} {i}: {channel}{mark}") | |||
else: | |||
print(f"{self.get_timestamp()} No channels joined.") | |||
elif cmd == "/part" and len(parts) > 1: | |||
channel = parts[1] | |||
self.send(f"PART {channel}") | |||
elif cmd == "/switch" and len(parts) > 1: | |||
try: | |||
index = int(parts[1]) | |||
with self.lock: | |||
if 0 <= index < len(self.channels): | |||
self.active_channel = self.channels[index] | |||
print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") | |||
else: | |||
print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") | |||
except ValueError: | |||
print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") | |||
elif cmd == "/who" and len(parts) > 1: | |||
self.send(f"WHO {parts[1]}") | |||
elif cmd == "/topic" and len(parts) > 1: | |||
channel = parts[1] | |||
if len(parts) > 2: | |||
self.send(f"TOPIC {channel} :{parts[2]}") | |||
else: | |||
self.send(f"TOPIC {channel}") | |||
elif cmd == "/clear": | |||
print("\033[H\033[2J", end="") | |||
elif cmd == "/help": | |||
print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") | |||
else: | |||
print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") | |||
def run(self): | |||
if not self.connect(): | |||
return | |||
server_thread = threading.Thread(target=self.handle_server) | |||
input_thread = threading.Thread(target=self.handle_input) | |||
command_thread = threading.Thread(target=self.process_commands) | |||
server_thread.start() | |||
input_thread.start() | |||
command_thread.start() | |||
try: | |||
server_thread.join() | |||
input_thread.join() | |||
command_thread.join() | |||
except KeyboardInterrupt: | |||
self.running = False | |||
if self.irc: | |||
try: | |||
self.irc.close() | |||
except: | |||
pass | |||
print(f"{self.get_timestamp()} Connection closed.") | |||
def main(): | |||
client = IRCClient() | |||
client.run() | |||
if __name__ == "__main__": | |||
main() | |||
</pre> | |||
==second version== | |||
That first version worked for a day and then started having nicklist and buffer issues. This new version should have taken care of those problems. | |||
<pre> | |||
import socket | |||
import ssl | |||
import threading | |||
import sys | |||
import time | |||
import queue | |||
from datetime import datetime | |||
import readline | |||
import hashlib | |||
class IRCClient: | |||
def __init__(self): | |||
self.server = "irc.rizon.net" | |||
self.port = 6697 | |||
self.channels = ["#/sp/"] # Only #/sp/ channel | |||
self.active_channel = self.channels[0] | |||
self.nickname = "LullSac" | |||
self.realname = "Termux IRC Client for #/sp/" | |||
self.irc = None | |||
self.context = ssl.create_default_context() | |||
self.running = True | |||
self.message_queue = queue.Queue() | |||
self.lock = threading.Lock() | |||
self.reconnect_delay = 5 | |||
self.max_reconnect_delay = 300 | |||
# Initialize nicklists with proper synchronization | |||
self.nicklists = {channel: set() for channel in self.channels} | |||
# ANSI color codes for nicknames | |||
self.nick_colors = [ | |||
"\033[31m", "\033[32m", "\033[33m", "\033[34m", | |||
"\033[35m", "\033[36m", "\033[91m", "\033[92m" | |||
] | |||
self.color_reset = "\033[0m" | |||
def get_colored_nick(self, nick): | |||
"""Return colored nickname based on consistent hash.""" | |||
if not nick: | |||
return nick | |||
hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) | |||
color_index = hash_value % len(self.nick_colors) | |||
return f"{self.nick_colors[color_index]}{nick}{self.color_reset}" | |||
def connect(self): | |||
try: | |||
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |||
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) | |||
self.irc.connect((self.server, self.port)) | |||
self.send(f"USER {self.nickname} 0 * :{self.realname}") | |||
self.send(f"NICK {self.nickname}") | |||
return True | |||
except Exception as e: | |||
print(f"{self.get_timestamp()} Connection error: {e}") | |||
return False | |||
def send(self, message): | |||
try: | |||
self.irc.send(f"{message}\r\n".encode('utf-8')) | |||
except Exception as e: | |||
print(f"{self.get_timestamp()} Error sending message: {e}") | |||
def get_timestamp(self): | |||
return f"\033[34m[{datetime.now().strftime('%I:%M %p')}]\033[0m" | |||
def handle_server(self): | |||
buffer = "" | |||
while self.running: | |||
try: | |||
data = self.irc.recv(2048).decode('utf-8', errors='ignore') | |||
if not data: | |||
raise Exception("Connection closed by server") | |||
buffer += data | |||
lines = buffer.split('\r\n') | |||
buffer = lines.pop() # Keep incomplete line in buffer | |||
for line in lines: | |||
if not line.strip(): | |||
continue | |||
# Handle PING immediately | |||
if line.startswith("PING"): | |||
self.send(f"PONG {line.split()[1]}") | |||
continue | |||
# Handle server responses | |||
if "PRIVMSG" in line: | |||
self.handle_privmsg(line) | |||
elif "JOIN" in line: | |||
self.handle_join(line) | |||
elif "PART" in line: | |||
self.handle_part(line) | |||
elif "QUIT" in line: | |||
self.handle_quit(line) | |||
elif "352" in line: # WHO response | |||
self.handle_who(line) | |||
elif "376" in line or "422" in line: # End of MOTD | |||
self.join_channels() | |||
elif "433" in line: # Nick in use | |||
self.handle_nick_in_use() | |||
elif "332" in line: # TOPIC | |||
self.handle_topic(line) | |||
else: | |||
print(f"{self.get_timestamp()} {line}") | |||
except Exception as e: | |||
print(f"{self.get_timestamp()} Server error: {e}") | |||
self.reconnect() | |||
break | |||
def handle_privmsg(self, line): | |||
try: | |||
sender = line[1:line.index('!')] | |||
target = line.split()[2] | |||
msg = line[line.index(':', 1) + 1:] | |||
colored_sender = self.get_colored_nick(sender) | |||
if msg.startswith('\x01ACTION '): | |||
print(f"{self.get_timestamp()} * {colored_sender} {msg[8:-1]}") | |||
elif msg.startswith('\x01'): | |||
print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") | |||
else: | |||
print(f"{self.get_timestamp()} <{colored_sender}> {msg}") | |||
except ValueError: | |||
print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") | |||
def handle_join(self, line): | |||
try: | |||
sender = line[1:line.index('!')] | |||
channel = line.split(':', 2)[-1].strip() | |||
colored_sender = self.get_colored_nick(sender) | |||
with self.lock: | |||
if sender == self.nickname: | |||
if channel not in self.channels: | |||
self.channels.append(channel) | |||
self.nicklists[channel] = set() | |||
print(f"{self.get_timestamp()} Joined {channel}") | |||
self.send(f"WHO {channel}") | |||
else: | |||
if channel in self.nicklists: | |||
self.nicklists[channel].add(sender) | |||
print(f"{self.get_timestamp()} {colored_sender} joined {channel}") | |||
except ValueError: | |||
print(f"{self.get_timestamp()} Malformed JOIN: {line}") | |||
def handle_part(self, line): | |||
try: | |||
sender = line[1:line.index('!')] | |||
channel = line.split()[2] | |||
colored_sender = self.get_colored_nick(sender) | |||
with self.lock: | |||
if sender == self.nickname: | |||
if channel in self.channels: | |||
self.channels.remove(channel) | |||
self.nicklists.pop(channel, None) | |||
print(f"{self.get_timestamp()} Left {channel}") | |||
if self.active_channel == channel: | |||
self.active_channel = self.channels[0] if self.channels else None | |||
else: | |||
if channel in self.nicklists and sender in self.nicklists[channel]: | |||
self.nicklists[channel].remove(sender) | |||
print(f"{self.get_timestamp()} {colored_sender} left {channel}") | |||
except ValueError: | |||
print(f"{self.get_timestamp()} Malformed PART: {line}") | |||
def handle_quit(self, line): | |||
try: | |||
sender = line[1:line.index('!')] | |||
colored_sender = self.get_colored_nick(sender) | |||
with self.lock: | |||
for channel in self.nicklists: | |||
if sender in self.nicklists[channel]: | |||
self.nicklists[channel].remove(sender) | |||
print(f"{self.get_timestamp()} {colored_sender} quit") | |||
except ValueError: | |||
print(f"{self.get_timestamp()} Malformed QUIT: {line}") | |||
def handle_who(self, line): | |||
try: | |||
parts = line.split() | |||
if len(parts) >= 8: | |||
channel = parts[3] | |||
nick = parts[7] | |||
with self.lock: | |||
if channel in self.nicklists: | |||
self.nicklists[channel].add(nick) | |||
except (IndexError, ValueError): | |||
print(f"{self.get_timestamp()} Malformed WHO reply: {line}") | |||
def join_channels(self): | |||
with self.lock: | |||
for channel in self.channels: | |||
self.send(f"JOIN {channel}") | |||
self.send(f"WHO {channel}") | |||
def handle_nick_in_use(self): | |||
self.nickname = f"{self.nickname}_" | |||
self.send(f"NICK {self.nickname}") | |||
print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") | |||
def handle_topic(self, line): | |||
try: | |||
channel = line.split()[3] | |||
topic = line[line.index(':', 1) + 1:] | |||
print(f"{self.get_timestamp()} Topic for {channel}: {topic}") | |||
except (IndexError, ValueError): | |||
print(f"{self.get_timestamp()} Malformed TOPIC: {line}") | |||
def reconnect(self): | |||
self.running = False | |||
if self.irc: | |||
try: | |||
self.irc.close() | |||
except: | |||
pass | |||
self.irc = None | |||
attempts = 0 | |||
while attempts < 5: | |||
delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) | |||
print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") | |||
time.sleep(delay) | |||
self.running = True | |||
if self.connect(): | |||
print(f"{self.get_timestamp()} Reconnected successfully.") | |||
return | |||
attempts += 1 | |||
print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") | |||
self.running = False | |||
def handle_input(self): | |||
readline.set_completer(self.complete_nick) | |||
readline.parse_and_bind("tab: complete") | |||
while self.running: | |||
try: | |||
with self.lock: | |||
prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " | |||
sys.stdout.write(prompt) | |||
sys.stdout.flush() | |||
message = input().strip() | |||
if not self.running: | |||
break | |||
if message: | |||
self.message_queue.put(message) | |||
except KeyboardInterrupt: | |||
self.message_queue.put("QUIT :Goodbye") | |||
break | |||
except Exception as e: | |||
print(f"{self.get_timestamp()} Input error: {e}") | |||
def complete_nick(self, text, state): | |||
"""Tab completion for nicknames in active channel.""" | |||
with self.lock: | |||
if not self.active_channel: | |||
return None | |||
nicklist = self.nicklists.get(self.active_channel, set()) | |||
matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] | |||
return matches[state] if state < len(matches) else None | |||
def process_commands(self): | |||
while self.running: | |||
try: | |||
message = self.message_queue.get(timeout=1) | |||
if message.lower() == "quit": | |||
self.send("QUIT :Goodbye") | |||
self.running = False | |||
elif message.startswith('/'): | |||
self.handle_command(message) | |||
else: | |||
with self.lock: | |||
if self.active_channel: | |||
self.send(f"PRIVMSG {self.active_channel} :{message}") | |||
else: | |||
print(f"{self.get_timestamp()} No active channel. Use /join <channel>") | |||
except queue.Empty: | |||
continue | |||
except Exception as e: | |||
print(f"{self.get_timestamp()} Command error: {e}") | |||
def handle_command(self, command): | |||
parts = command.split(maxsplit=2) | |||
cmd = parts[0].lower() | |||
if cmd == "/nick" and len(parts) > 1: | if cmd == "/nick" and len(parts) > 1: | ||
self.nickname = parts[1] | self.nickname = parts[1] |
Revision as of 23:48, 2 May 2025
irc_sp.py
https://x.com/i/grok?conversation=1918094079356731464
import socket import ssl import threading import sys import time import queue from datetime import datetime import readline import hashlib class IRCClient: def __init__(self): self.server = "irc.rizon.net" self.port = 6697 self.channels = ["#/sp/"] # Only #/sp/ channel self.active_channel = self.channels[0] self.nickname = "LullSac" self.realname = "Termux IRC Client for #/sp/" self.irc = None self.context = ssl.create_default_context() self.running = True self.message_queue = queue.Queue() self.lock = threading.Lock() self.reconnect_delay = 5 self.max_reconnect_delay = 300 # Initialize nicklists: dictionary mapping channels to sets of nicks self.nicklists = {channel: set() for channel in self.channels} # Define ANSI color codes for nicknames self.nick_colors = [ "\033[31m", # Red "\033[32m", # Green "\033[33m", # Yellow "\033[34m", # Blue "\033[35m", # Magenta "\033[36m", # Cyan "\033[91m", # Bright Red "\033[92m", # Bright Green ] self.color_reset = "\033[0m" def get_colored_nick(self, nick): """Return the nickname wrapped in a consistent ANSI color based on its hash.""" if not nick: return nick # Hash the nickname to get a consistent index hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) color_index = hash_value % len(self.nick_colors) color = self.nick_colors[color_index] return f"{color}{nick}{self.color_reset}" def connect(self): try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...") self.irc.connect((self.server, self.port)) self.send(f"USER {self.nickname} 0 * :{self.realname}") self.send(f"NICK {self.nickname}") return True except Exception as e: print(f"{self.get_timestamp()} Connection error: {e}") return False def send(self, message): try: self.irc.send(f"{message}\r\n".encode('utf-8')) except Exception as e: print(f"{self.get_timestamp()} Error sending message: {e}") def get_timestamp(self): timestamp = datetime.now().strftime("%I:%M %p") return f"\033[34m[{timestamp}]\033[0m" def complete_nick(self, text, state): """Tab completion function for nicknames.""" with self.lock: # Get the nicklist for the active channel nicklist = self.nicklists.get(self.active_channel, set()) if self.active_channel else set() # Find matches: nicks starting with the input text (case-insensitive) matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] # Return the state-th match, or None if no more matches return matches[state] if state < len(matches) else None def handle_input(self): # Set up readline tab completion readline.set_completer(self.complete_nick) readline.parse_and_bind("tab: complete") while self.running: try: with self.lock: prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " sys.stdout.write(prompt) sys.stdout.flush() message = input().strip() if not self.running: break if message: self.message_queue.put(message) except KeyboardInterrupt: self.message_queue.put("QUIT :Goodbye") break except Exception as e: print(f"{self.get_timestamp()} Input error: {e}") def handle_server(self): while self.running: try: data = self.irc.recv(2048).decode('utf-8', errors='ignore') if not data: print(f"{self.get_timestamp()} Disconnected from server.") self.reconnect() break for line in data.strip().split('\r\n'): if not line: continue # Define messages to handle specially handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352", "QUIT"] # Check if the message is handled by examining the command field parts = line.split() is_handled = False command = None if len(parts) >= 2: command = parts[1] if parts[0].startswith(':') else parts[0] is_handled = command in handled_messages # Only print raw line if not handled if not is_handled: print(f"{self.get_timestamp()} {line}") # Handle PING silently if line.startswith("PING"): pong_response = f"PONG {line.split()[1]}" self.send(pong_response) continue # Skip further processing to avoid printing # Join channels after MOTD and request WHO if "376" in line or "422" in line: with self.lock: for channel in self.channels: self.send(f"JOIN {channel}") print(f"{self.get_timestamp()} Joined {channel}") # Request WHO to populate nicklist self.send(f"WHO {channel}") self.active_channel = self.channels[0] if self.channels else None # Handle nick in use if "433" in line: self.nickname = f"{self.nickname}_" self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") # Parse PRIVMSG if "PRIVMSG" in line: try: sender = line[1:line.index('!')] target = line.split()[2] msg = line[line.index(':', 1) + 1:] colored_sender = self.get_colored_nick(sender) if msg.startswith('\x01ACTION '): msg = msg[8:-1] print(f"{self.get_timestamp()} * {colored_sender}@{target} {msg}") elif msg.startswith('\x01'): print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") else: print(f"{self.get_timestamp()} <{colored_sender}@{target}> {msg}") except ValueError: print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") # Handle channel join if "JOIN" in line: try: sender = line[1:line.index('!')] if '!' in line else None channel = line.split(':', 2)[-1] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Added {channel} to active channels.") if not self.active_channel: self.active_channel = channel # Request WHO to populate nicklist self.send(f"WHO {channel}") else: # Add joining user to nicklist if channel in self.nicklists: self.nicklists[channel].add(sender) print(f"{self.get_timestamp()} {colored_sender} joined {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed JOIN: {line}") # Handle parting if "PART" in line: try: sender = line[1:line.index('!')] if '!' in line else None channel = line.split()[2] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel in self.channels: self.channels.remove(channel) self.nicklists.pop(channel, None) print(f"{self.get_timestamp()} Left {channel}") if self.active_channel == channel: self.active_channel = self.channels[0] if self.channels else None print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}") else: # Remove parting user from nicklist if channel in self.nicklists and sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} left {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed PART: {line}") # Handle QUIT if "QUIT" in line: try: sender = line[1:line.index('!')] if '!' in line else None colored_sender = self.get_colored_nick(sender) with self.lock: # Remove user from all channel nicklists for channel in self.nicklists: if sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} quit") except ValueError: print(f"{self.get_timestamp()} Malformed QUIT: {line}") # Handle topic if "332" in line: try: channel = line.split()[3] topic = line[line.index(':', 1) + 1:] print(f"{self.get_timestamp()} Topic for {channel}: {topic}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed TOPIC (332): {line}") # Handle WHO response if "352" in line: try: parts = line.split() if len(parts) >= 8: channel, user, host, nick = parts[3], parts[4], parts[5], parts[7] colored_nick = self.get_colored_nick(nick) with self.lock: if channel in self.nicklists: self.nicklists[channel].add(nick) print(f"{self.get_timestamp()} {colored_nick} ({user}@{host}) in {channel}") else: print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed WHO reply (352): {line}") except Exception as e: print(f"{self.get_timestamp()} Server error: {e}") self.reconnect() break def reconnect(self): self.running = False if self.irc: try: self.irc.close() except: pass self.irc = None attempts = 0 while not self.running and attempts < 5: delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") time.sleep(delay) self.running = True if self.connect(): print(f"{self.get_timestamp()} Reconnected successfully.") return attempts += 1 print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") self.running = False def process_commands(self): while self.running: try: message = self.message_queue.get(timeout=1) if message.lower() == "quit": self.send("QUIT :Goodbye") self.running = False elif message.startswith('/'): self.handle_command(message) else: with self.lock: if self.active_channel: self.send(f"PRIVMSG {self.active_channel} :{message}") else: print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.") except queue.Empty: continue except Exception as e: print(f"{self.get_timestamp()} Command error: {e}") def handle_command(self, command): parts = command.split(maxsplit=2) cmd = parts[0].lower() if cmd == "/nick" and len(parts) > 1: self.nickname = parts[1] self.send(f"NICK {self.nickname}") elif cmd == "/join" and len(parts) > 1: channel = parts[1] self.send(f"JOIN {channel}") with self.lock: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Requested to join {channel}") elif cmd == "/msg" and len(parts) > 2: target = parts[1] msg = parts[2] self.send(f"PRIVMSG {target} :{msg}") elif cmd == "/me" and len(parts) > 1: with self.lock: if self.active_channel: msg = parts[1] self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") else: print(f"{self.get_timestamp()} No active channel.") elif cmd == "/list": with self.lock: if self.channels: print(f"{self.get_timestamp()} Active channels:") for i, channel in enumerate(self.channels): mark = "*" if channel == self.active_channel else " " print(f"{self.get_timestamp()} {i}: {channel}{mark}") else: print(f"{self.get_timestamp()} No channels joined.") elif cmd == "/part" and len(parts) > 1: channel = parts[1] self.send(f"PART {channel}") elif cmd == "/switch" and len(parts) > 1: try: index = int(parts[1]) with self.lock: if 0 <= index < len(self.channels): self.active_channel = self.channels[index] print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") else: print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") except ValueError: print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") elif cmd == "/who" and len(parts) > 1: self.send(f"WHO {parts[1]}") elif cmd == "/topic" and len(parts) > 1: channel = parts[1] if len(parts) > 2: self.send(f"TOPIC {channel} :{parts[2]}") else: self.send(f"TOPIC {channel}") elif cmd == "/clear": print("\033[H\033[2J", end="") elif cmd == "/help": print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") else: print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") def run(self): if not self.connect(): return server_thread = threading.Thread(target=self.handle_server) input_thread = threading.Thread(target=self.handle_input) command_thread = threading.Thread(target=self.process_commands) server_thread.start() input_thread.start() command_thread.start() try: server_thread.join() input_thread.join() command_thread.join() except KeyboardInterrupt: self.running = False if self.irc: try: self.irc.close() except: pass print(f"{self.get_timestamp()} Connection closed.") def main(): client = IRCClient() client.run() if __name__ == "__main__": main()
second version
That first version worked for a day and then started having nicklist and buffer issues. This new version should have taken care of those problems.
import socket import ssl import threading import sys import time import queue from datetime import datetime import readline import hashlib class IRCClient: def __init__(self): self.server = "irc.rizon.net" self.port = 6697 self.channels = ["#/sp/"] # Only #/sp/ channel self.active_channel = self.channels[0] self.nickname = "LullSac" self.realname = "Termux IRC Client for #/sp/" self.irc = None self.context = ssl.create_default_context() self.running = True self.message_queue = queue.Queue() self.lock = threading.Lock() self.reconnect_delay = 5 self.max_reconnect_delay = 300 # Initialize nicklists with proper synchronization self.nicklists = {channel: set() for channel in self.channels} # ANSI color codes for nicknames self.nick_colors = [ "\033[31m", "\033[32m", "\033[33m", "\033[34m", "\033[35m", "\033[36m", "\033[91m", "\033[92m" ] self.color_reset = "\033[0m" def get_colored_nick(self, nick): """Return colored nickname based on consistent hash.""" if not nick: return nick hash_value = int(hashlib.md5(nick.encode('utf-8')).hexdigest(), 16) color_index = hash_value % len(self.nick_colors) return f"{self.nick_colors[color_index]}{nick}{self.color_reset}" def connect(self): try: raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) self.irc.connect((self.server, self.port)) self.send(f"USER {self.nickname} 0 * :{self.realname}") self.send(f"NICK {self.nickname}") return True except Exception as e: print(f"{self.get_timestamp()} Connection error: {e}") return False def send(self, message): try: self.irc.send(f"{message}\r\n".encode('utf-8')) except Exception as e: print(f"{self.get_timestamp()} Error sending message: {e}") def get_timestamp(self): return f"\033[34m[{datetime.now().strftime('%I:%M %p')}]\033[0m" def handle_server(self): buffer = "" while self.running: try: data = self.irc.recv(2048).decode('utf-8', errors='ignore') if not data: raise Exception("Connection closed by server") buffer += data lines = buffer.split('\r\n') buffer = lines.pop() # Keep incomplete line in buffer for line in lines: if not line.strip(): continue # Handle PING immediately if line.startswith("PING"): self.send(f"PONG {line.split()[1]}") continue # Handle server responses if "PRIVMSG" in line: self.handle_privmsg(line) elif "JOIN" in line: self.handle_join(line) elif "PART" in line: self.handle_part(line) elif "QUIT" in line: self.handle_quit(line) elif "352" in line: # WHO response self.handle_who(line) elif "376" in line or "422" in line: # End of MOTD self.join_channels() elif "433" in line: # Nick in use self.handle_nick_in_use() elif "332" in line: # TOPIC self.handle_topic(line) else: print(f"{self.get_timestamp()} {line}") except Exception as e: print(f"{self.get_timestamp()} Server error: {e}") self.reconnect() break def handle_privmsg(self, line): try: sender = line[1:line.index('!')] target = line.split()[2] msg = line[line.index(':', 1) + 1:] colored_sender = self.get_colored_nick(sender) if msg.startswith('\x01ACTION '): print(f"{self.get_timestamp()} * {colored_sender} {msg[8:-1]}") elif msg.startswith('\x01'): print(f"{self.get_timestamp()} CTCP from {colored_sender}: {msg}") else: print(f"{self.get_timestamp()} <{colored_sender}> {msg}") except ValueError: print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}") def handle_join(self, line): try: sender = line[1:line.index('!')] channel = line.split(':', 2)[-1].strip() colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Joined {channel}") self.send(f"WHO {channel}") else: if channel in self.nicklists: self.nicklists[channel].add(sender) print(f"{self.get_timestamp()} {colored_sender} joined {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed JOIN: {line}") def handle_part(self, line): try: sender = line[1:line.index('!')] channel = line.split()[2] colored_sender = self.get_colored_nick(sender) with self.lock: if sender == self.nickname: if channel in self.channels: self.channels.remove(channel) self.nicklists.pop(channel, None) print(f"{self.get_timestamp()} Left {channel}") if self.active_channel == channel: self.active_channel = self.channels[0] if self.channels else None else: if channel in self.nicklists and sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} left {channel}") except ValueError: print(f"{self.get_timestamp()} Malformed PART: {line}") def handle_quit(self, line): try: sender = line[1:line.index('!')] colored_sender = self.get_colored_nick(sender) with self.lock: for channel in self.nicklists: if sender in self.nicklists[channel]: self.nicklists[channel].remove(sender) print(f"{self.get_timestamp()} {colored_sender} quit") except ValueError: print(f"{self.get_timestamp()} Malformed QUIT: {line}") def handle_who(self, line): try: parts = line.split() if len(parts) >= 8: channel = parts[3] nick = parts[7] with self.lock: if channel in self.nicklists: self.nicklists[channel].add(nick) except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed WHO reply: {line}") def join_channels(self): with self.lock: for channel in self.channels: self.send(f"JOIN {channel}") self.send(f"WHO {channel}") def handle_nick_in_use(self): self.nickname = f"{self.nickname}_" self.send(f"NICK {self.nickname}") print(f"{self.get_timestamp()} Nick in use, trying {self.get_colored_nick(self.nickname)}") def handle_topic(self, line): try: channel = line.split()[3] topic = line[line.index(':', 1) + 1:] print(f"{self.get_timestamp()} Topic for {channel}: {topic}") except (IndexError, ValueError): print(f"{self.get_timestamp()} Malformed TOPIC: {line}") def reconnect(self): self.running = False if self.irc: try: self.irc.close() except: pass self.irc = None attempts = 0 while attempts < 5: delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay) print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...") time.sleep(delay) self.running = True if self.connect(): print(f"{self.get_timestamp()} Reconnected successfully.") return attempts += 1 print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") self.running = False def handle_input(self): readline.set_completer(self.complete_nick) readline.parse_and_bind("tab: complete") while self.running: try: with self.lock: prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > " sys.stdout.write(prompt) sys.stdout.flush() message = input().strip() if not self.running: break if message: self.message_queue.put(message) except KeyboardInterrupt: self.message_queue.put("QUIT :Goodbye") break except Exception as e: print(f"{self.get_timestamp()} Input error: {e}") def complete_nick(self, text, state): """Tab completion for nicknames in active channel.""" with self.lock: if not self.active_channel: return None nicklist = self.nicklists.get(self.active_channel, set()) matches = [nick for nick in nicklist if nick.lower().startswith(text.lower())] return matches[state] if state < len(matches) else None def process_commands(self): while self.running: try: message = self.message_queue.get(timeout=1) if message.lower() == "quit": self.send("QUIT :Goodbye") self.running = False elif message.startswith('/'): self.handle_command(message) else: with self.lock: if self.active_channel: self.send(f"PRIVMSG {self.active_channel} :{message}") else: print(f"{self.get_timestamp()} No active channel. Use /join <channel>") except queue.Empty: continue except Exception as e: print(f"{self.get_timestamp()} Command error: {e}") def handle_command(self, command): parts = command.split(maxsplit=2) cmd = parts[0].lower() if cmd == "/nick" and len(parts) > 1: self.nickname = parts[1] self.send(f"NICK {self.nickname}") elif cmd == "/join" and len(parts) > 1: channel = parts[1] self.send(f"JOIN {channel}") with self.lock: if channel not in self.channels: self.channels.append(channel) self.nicklists[channel] = set() print(f"{self.get_timestamp()} Requested to join {channel}") elif cmd == "/msg" and len(parts) > 2: target = parts[1] msg = parts[2] self.send(f"PRIVMSG {target} :{msg}") elif cmd == "/me" and len(parts) > 1: with self.lock: if self.active_channel: msg = parts[1] self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01") else: print(f"{self.get_timestamp()} No active channel.") elif cmd == "/list": with self.lock: if self.channels: print(f"{self.get_timestamp()} Active channels:") for i, channel in enumerate(self.channels): mark = "*" if channel == self.active_channel else " " print(f"{self.get_timestamp()} {i}: {channel}{mark}") else: print(f"{self.get_timestamp()} No channels joined.") elif cmd == "/part" and len(parts) > 1: channel = parts[1] self.send(f"PART {channel}") elif cmd == "/switch" and len(parts) > 1: try: index = int(parts[1]) with self.lock: if 0 <= index < len(self.channels): self.active_channel = self.channels[index] print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}") else: print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.") except ValueError: print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).") elif cmd == "/who" and len(parts) > 1: self.send(f"WHO {parts[1]}") elif cmd == "/topic" and len(parts) > 1: channel = parts[1] if len(parts) > 2: self.send(f"TOPIC {channel} :{parts[2]}") else: self.send(f"TOPIC {channel}") elif cmd == "/clear": print("\033[H\033[2J", end="") elif cmd == "/help": print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit") else: print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.") def run(self): if not self.connect(): return server_thread = threading.Thread(target=self.handle_server) input_thread = threading.Thread(target=self.handle_input) command_thread = threading.Thread(target=self.process_commands) server_thread.start() input_thread.start() command_thread.start() try: server_thread.join() input_thread.join() command_thread.join() except KeyboardInterrupt: self.running = False if self.irc: try: self.irc.close() except: pass print(f"{self.get_timestamp()} Connection closed.") def main(): client = IRCClient() client.run() if __name__ == "__main__": main()