Difference between revisions of "Termux IRC Client"
Line 285: | Line 285: | ||
try: | try: | ||
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
raw_socket.settimeout( | raw_socket.settimeout(120) # Increased to 120 seconds | ||
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) | self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server) | ||
print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...") | print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...") | ||
Line 332: | Line 332: | ||
self.reconnect() | self.reconnect() | ||
break | break | ||
# Debug: Log raw data | |||
print(f"{self.get_timestamp()} Raw data: {repr(data)}") | |||
for line in data.strip().split('\r\n'): | for line in data.strip().split('\r\n'): | ||
Line 340: | Line 343: | ||
handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352"] | handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352"] | ||
# Only print raw line if | # Only print raw line if not handled | ||
if not any(msg in line for msg in handled_messages): | if not any(msg in line for msg in handled_messages): | ||
print(f"{self.get_timestamp()} {line}") | print(f"{self.get_timestamp()} {line}") | ||
Line 346: | Line 349: | ||
# Handle PING | # Handle PING | ||
if line.startswith("PING"): | if line.startswith("PING"): | ||
pong_response = f"PONG {line.split()[1]}" | |||
print(f"{self.get_timestamp()} Received PING, sending: {pong_response}") | |||
self.send(pong_response) | |||
# Join channels after MOTD | # Join channels after MOTD | ||
Line 368: | Line 373: | ||
target = line.split()[2] | target = line.split()[2] | ||
msg = line[line.index(':', 1) + 1:] | msg = line[line.index(':', 1) + 1:] | ||
if msg.startswith('\x01ACTION '): | if msg.startswith('\x01ACTION '): | ||
msg = msg[8:-1] | msg = msg[8:-1] | ||
print(f"{self.get_timestamp()} * {sender}@{target} {msg}") | print(f"{self.get_timestamp()} * {sender}@{target} {msg}") | ||
elif msg.startswith('\x01'): | elif msg.startswith('\x01'): | ||
print(f"{self.get_timestamp()} CTCP from {sender}: {msg}") | print(f"{self.get_timestamp()} CTCP from {sender}: {msg}") | ||
else: | else: | ||
Line 422: | Line 427: | ||
def reconnect(self): | def reconnect(self): | ||
self.running = False | self.running = False | ||
self.irc.close() | if self.irc: | ||
try: | |||
self.irc.close() | |||
except: | |||
pass | |||
self.irc = None | |||
attempts = 0 | attempts = 0 | ||
while not self.running and attempts < 5: | while not self.running and attempts < 5: | ||
Line 430: | Line 440: | ||
self.running = True | self.running = True | ||
if self.connect(): | if self.connect(): | ||
print(f"{self.get_timestamp()} Reconnected successfully.") | print(f"{self.get_timestamp()} Reconnected successfully.") | ||
return | return | ||
attempts += 1 | attempts += 1 | ||
print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") | print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.") | ||
self.running = False | |||
def process_commands(self): | def process_commands(self): | ||
Line 537: | Line 547: | ||
self.running = False | self.running = False | ||
self.irc.close() | if self.irc: | ||
try: | |||
self.irc.close() | |||
except: | |||
pass | |||
print(f"{self.get_timestamp()} Connection closed.") | print(f"{self.get_timestamp()} Connection closed.") | ||
Revision as of 15:53, 30 April 2025
A page to document the various attempts at creating an irc client in Termux.
termux IRC client first try
The client will:
- Connect to irc.rizon.net as LullSac.
- Attempt to join #/g/tv and #/sp/.
- Set #/g/tv as the active channel.
- Show a prompt like [#/g/tv] > .
- Display all messages with blue timestamps (e.g., [3:45 PM] <LullSac@#/g/tv> Hello).
Use the UI:
- Type messages (e.g., Hello) to send to the active channel (e.g., #/g/tv).
- /list: See channels (e.g., 0: #/g/tv *, 1: #/sp/).
- /switch 1: Switch to #/sp/ (prompt changes to [#/sp/] > ).
- /join #newchannel: Join another channel.
- /part #/sp/: Leave a channel.
- /msg #/g/tv Hi: Send to a specific channel.
- /nick NewName: Change nickname.
- /msg SomeUser Hi: Send a private message.
- quit: Exit the client.
irc_client_advanced.py
import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
class IRCClient:
def __init__(self):
self.server = "irc.rizon.net"
self.port = 6697 # SSL port for Rizon
self.channels = ["#/g/tv", "#/sp/"] # Default channels to join
self.active_channel = self.channels[0] if self.channels else None # Default active channel
self.nickname = "LullSac"
self.realname = "Termux IRC Client"
self.irc = None
self.context = ssl.create_default_context()
self.running = True
self.message_queue = queue.Queue()
self.lock = threading.Lock() # For thread-safe channel list updates
def connect(self):
try:
# Create socket and wrap with SSL
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
print(f"Connecting to {self.server}:{self.port}...")
self.irc.connect((self.server, self.port))
# Send user and nick information
self.send(f"USER {self.nickname} 0 * :{self.realname}")
self.send(f"NICK {self.nickname}")
return True
except Exception as e:
print(f"Connection error: {e}")
return False
def send(self, message):
try:
self.irc.send(f"{message}\r\n".encode('utf-8'))
except Exception as e:
print(f"Error sending message: {e}")
def get_timestamp(self):
# Return current time in 12-hour format (e.g., 3:45 PM) with blue ANSI color
timestamp = datetime.now().strftime("%I:%M %p")
return f"\033[34m[{timestamp}]\033[0m"
def handle_input(self):
while self.running:
try:
# Show prompt with active channel
with self.lock:
prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
sys.stdout.write(prompt)
sys.stdout.flush()
message = input().strip()
if not self.running:
break
if message:
self.message_queue.put(message)
except KeyboardInterrupt:
self.message_queue.put("QUIT :Goodbye")
break
def handle_server(self):
while self.running:
try:
data = self.irc.recv(2048).decode('utf-8')
if not data:
print(f"{self.get_timestamp()} Disconnected from server.")
self.running = False
break
for line in data.strip().split('\r\n'):
if not line:
continue
# Add blue timestamp to all server lines
print(f"{self.get_timestamp()} {line}")
# Handle PING
if line.startswith("PING"):
self.send(f"PONG {line.split()[1]}")
# Join channels after MOTD
if "376" in line or "422" in line:
with self.lock:
for channel in self.channels:
self.send(f"JOIN {channel}")
print(f"{self.get_timestamp()} Joined {channel}")
self.active_channel = self.channels[0] if self.channels else None
# Parse messages for display
if "PRIVMSG" in line:
sender = line[1:line.index('!')]
target = line.split()[2]
msg = line[line.index(':', 1) + 1:]
# Timestamp already added above, just format the message
print(f"{self.get_timestamp()} <{sender}@{target}> {msg}")
# Handle channel join confirmation
if "JOIN" in line and self.nickname in line:
channel = line.split(':', 2)[-1]
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
print(f"{self.get_timestamp()} Added {channel} to active channels.")
if not self.active_channel:
self.active_channel = channel
# Handle parting a channel
if "PART" in line and self.nickname in line:
channel = line.split()[2]
with self.lock:
if channel in self.channels:
self.channels.remove(channel)
print(f"{self.get_timestamp()} Left {channel}")
if self.active_channel == channel:
self.active_channel = self.channels[0] if self.channels else None
print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}")
except Exception as e:
print(f"{self.get_timestamp()} Server error: {e}")
self.running = False
break
def process_commands(self):
while self.running:
try:
message = self.message_queue.get(timeout=1)
if message.lower() == "quit":
self.send("QUIT :Goodbye")
self.running = False
elif message.startswith('/'):
self.handle_command(message)
else:
# Send to active channel
with self.lock:
if self.active_channel:
self.send(f"PRIVMSG {self.active_channel} :{message}")
else:
print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.")
except queue.Empty:
continue
except Exception as e:
print(f"{self.get_timestamp()} Command error: {e}")
def handle_command(self, command):
parts = command.split(maxsplit=2)
cmd = parts[0].lower()
if cmd == "/nick" and len(parts) > 1:
self.nickname = parts[1]
self.send(f"NICK {self.nickname}")
elif cmd == "/join" and len(parts) > 1:
channel = parts[1]
self.send(f"JOIN {channel}")
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
print(f"{self.get_timestamp()} Requested to join {channel}")
elif cmd == "/msg" and len(parts) > 2:
target = parts[1]
msg = parts[2]
self.send(f"PRIVMSG {target} :{msg}")
elif cmd == "/list":
with self.lock:
if self.channels:
print(f"{self.get_timestamp()} Active channels:")
for i, channel in enumerate(self.channels):
mark = "*" if channel == self.active_channel else " "
print(f"{self.get_timestamp()} {i}: {channel}{mark}")
else:
print(f"{self.get_timestamp()} No channels joined.")
elif cmd == "/part" and len(parts) > 1:
channel = parts[1]
self.send(f"PART {channel}")
elif cmd == "/switch" and len(parts) > 1:
try:
index = int(parts[1])
with self.lock:
if 0 <= index < len(self.channels):
self.active_channel = self.channels[index]
print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
else:
print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
except ValueError:
print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
else:
print(f"{self.get_timestamp()} Unknown command or invalid syntax. Try: /nick, /join, /msg, /list, /part, /switch")
def run(self):
if not self.connect():
return
# Start threads
server_thread = threading.Thread(target=self.handle_server)
input_thread = threading.Thread(target=self.handle_input)
command_thread = threading.Thread(target=self.process_commands)
server_thread.start()
input_thread.start()
command_thread.start()
# Wait for threads to finish
try:
server_thread.join()
input_thread.join()
command_thread.join()
except KeyboardInterrupt:
self.running = False
# Clean up
self.irc.close()
print(f"{self.get_timestamp()} Connection closed.")
def main():
client = IRCClient()
client.run()
if __name__ == "__main__":
main()
termux IRC client second try
import socket
import ssl
import threading
import sys
import time
import queue
from datetime import datetime
class IRCClient:
def __init__(self):
self.server = "irc.rizon.net"
self.port = 6697
self.channels = ["#/g/tv", "#/sp/"]
self.active_channel = self.channels[0] if self.channels else None
self.nickname = "LullSac"
self.realname = "Termux IRC Client"
self.irc = None
self.context = ssl.create_default_context()
self.running = True
self.message_queue = queue.Queue()
self.lock = threading.Lock()
self.reconnect_delay = 5
self.max_reconnect_delay = 300
def connect(self):
try:
raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
raw_socket.settimeout(120) # Increased to 120 seconds
self.irc = self.context.wrap_socket(raw_socket, server_hostname=self.server)
print(f"{self.get_timestamp()} Connecting to {self.server}:{self.port}...")
self.irc.connect((self.server, self.port))
self.send(f"USER {self.nickname} 0 * :{self.realname}")
self.send(f"NICK {self.nickname}")
return True
except Exception as e:
print(f"{self.get_timestamp()} Connection error: {e}")
return False
def send(self, message):
try:
self.irc.send(f"{message}\r\n".encode('utf-8'))
except Exception as e:
print(f"{self.get_timestamp()} Error sending message: {e}")
def get_timestamp(self):
timestamp = datetime.now().strftime("%I:%M %p")
return f"\033[34m[{timestamp}]\033[0m"
def handle_input(self):
while self.running:
try:
with self.lock:
prompt = f"[{self.active_channel}] > " if self.active_channel else "[No channel] > "
sys.stdout.write(prompt)
sys.stdout.flush()
message = input().strip()
if not self.running:
break
if message:
self.message_queue.put(message)
except KeyboardInterrupt:
self.message_queue.put("QUIT :Goodbye")
break
except Exception as e:
print(f"{self.get_timestamp()} Input error: {e}")
def handle_server(self):
while self.running:
try:
data = self.irc.recv(2048).decode('utf-8', errors='ignore')
if not data:
print(f"{self.get_timestamp()} Disconnected from server.")
self.reconnect()
break
# Debug: Log raw data
print(f"{self.get_timestamp()} Raw data: {repr(data)}")
for line in data.strip().split('\r\n'):
if not line:
continue
# Define messages to handle specially (no raw print)
handled_messages = ["PRIVMSG", "JOIN", "PART", "PING", "376", "422", "433", "332", "352"]
# Only print raw line if not handled
if not any(msg in line for msg in handled_messages):
print(f"{self.get_timestamp()} {line}")
# Handle PING
if line.startswith("PING"):
pong_response = f"PONG {line.split()[1]}"
print(f"{self.get_timestamp()} Received PING, sending: {pong_response}")
self.send(pong_response)
# Join channels after MOTD
if "376" in line or "422" in line:
with self.lock:
for channel in self.channels:
self.send(f"JOIN {channel}")
print(f"{self.get_timestamp()} Joined {channel}")
self.active_channel = self.channels[0] if self.channels else None
# Handle nick in use
if "433" in line:
self.nickname = f"{self.nickname}_"
self.send(f"NICK {self.nickname}")
print(f"{self.get_timestamp()} Nick in use, trying {self.nickname}")
# Parse PRIVMSG
if "PRIVMSG" in line:
try:
sender = line[1:line.index('!')]
target = line.split()[2]
msg = line[line.index(':', 1) + 1:]
if msg.startswith('\x01ACTION '):
msg = msg[8:-1]
print(f"{self.get_timestamp()} * {sender}@{target} {msg}")
elif msg.startswith('\x01'):
print(f"{self.get_timestamp()} CTCP from {sender}: {msg}")
else:
print(f"{self.get_timestamp()} <{sender}@{target}> {msg}")
except ValueError:
print(f"{self.get_timestamp()} Malformed PRIVMSG: {line}")
# Handle channel join
if "JOIN" in line and self.nickname in line:
channel = line.split(':', 2)[-1]
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
print(f"{self.get_timestamp()} Added {channel} to active channels.")
if not self.active_channel:
self.active_channel = channel
# Handle parting
if "PART" in line and self.nickname in line:
channel = line.split()[2]
with self.lock:
if channel in self.channels:
self.channels.remove(channel)
print(f"{self.get_timestamp()} Left {channel}")
if self.active_channel == channel:
self.active_channel = self.channels[0] if self.channels else None
print(f"{self.get_timestamp()} Active channel switched to: {self.active_channel or 'None'}")
# Handle topic
if "332" in line:
channel = line.split()[3]
topic = line[line.index(':', 1) + 1:]
print(f"{self.get_timestamp()} Topic for {channel}: {topic}")
# Handle WHO response
if "352" in line:
parts = line.split()
channel, user, host, nick = parts[3], parts[4], parts[5], parts[7]
print(f"{self.get_timestamp()} {nick} ({user}@{host}) in {channel}")
except socket.timeout:
print(f"{self.get_timestamp()} Connection timed out.")
self.reconnect()
break
except Exception as e:
print(f"{self.get_timestamp()} Server error: {e}")
self.reconnect()
break
def reconnect(self):
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
self.irc = None
attempts = 0
while not self.running and attempts < 5:
delay = min(self.reconnect_delay * (2 ** attempts), self.max_reconnect_delay)
print(f"{self.get_timestamp()} Reconnecting in {delay} seconds...")
time.sleep(delay)
self.running = True
if self.connect():
print(f"{self.get_timestamp()} Reconnected successfully.")
return
attempts += 1
print(f"{self.get_timestamp()} Failed to reconnect after {attempts} attempts.")
self.running = False
def process_commands(self):
while self.running:
try:
message = self.message_queue.get(timeout=1)
if message.lower() == "quit":
self.send("QUIT :Goodbye")
self.running = False
elif message.startswith('/'):
self.handle_command(message)
else:
with self.lock:
if self.active_channel:
self.send(f"PRIVMSG {self.active_channel} :{message}")
else:
print(f"{self.get_timestamp()} No active channel. Use /join <channel> or /switch <index>.")
except queue.Empty:
continue
except Exception as e:
print(f"{self.get_timestamp()} Command error: {e}")
def handle_command(self, command):
parts = command.split(maxsplit=2)
cmd = parts[0].lower()
if cmd == "/nick" and len(parts) > 1:
self.nickname = parts[1]
self.send(f"NICK {self.nickname}")
elif cmd == "/join" and len(parts) > 1:
channel = parts[1]
self.send(f"JOIN {channel}")
with self.lock:
if channel not in self.channels:
self.channels.append(channel)
print(f"{self.get_timestamp()} Requested to join {channel}")
elif cmd == "/msg" and len(parts) > 2:
target = parts[1]
msg = parts[2]
self.send(f"PRIVMSG {target} :{msg}")
elif cmd == "/me" and len(parts) > 1:
with self.lock:
if self.active_channel:
msg = parts[1]
self.send(f"PRIVMSG {self.active_channel} :\x01ACTION {msg}\x01")
else:
print(f"{self.get_timestamp()} No active channel.")
elif cmd == "/list":
with self.lock:
if self.channels:
print(f"{self.get_timestamp()} Active channels:")
for i, channel in enumerate(self.channels):
mark = "*" if channel == self.active_channel else " "
print(f"{self.get_timestamp()} {i}: {channel}{mark}")
else:
print(f"{self.get_timestamp()} No channels joined.")
elif cmd == "/part" and len(parts) > 1:
channel = parts[1]
self.send(f"PART {channel}")
elif cmd == "/switch" and len(parts) > 1:
try:
index = int(parts[1])
with self.lock:
if 0 <= index < len(self.channels):
self.active_channel = self.channels[index]
print(f"{self.get_timestamp()} Switched to active channel: {self.active_channel}")
else:
print(f"{self.get_timestamp()} Invalid index. Use /list to see channels.")
except ValueError:
print(f"{self.get_timestamp()} Invalid index. Use a number (e.g., /switch 0).")
elif cmd == "/who" and len(parts) > 1:
self.send(f"WHO {parts[1]}")
elif cmd == "/topic" and len(parts) > 1:
channel = parts[1]
if len(parts) > 2:
self.send(f"TOPIC {channel} :{parts[2]}")
else:
self.send(f"TOPIC {channel}")
elif cmd == "/clear":
print("\033[H\033[2J", end="")
elif cmd == "/help":
print(f"{self.get_timestamp()} Commands: /nick <nick>, /join <channel>, /msg <target> <msg>, /me <action>, /list, /part <channel>, /switch <index>, /who <channel>, /topic <channel> [new topic], /clear, /help, quit")
else:
print(f"{self.get_timestamp()} Unknown command or invalid syntax. Use /help.")
def run(self):
if not self.connect():
return
server_thread = threading.Thread(target=self.handle_server)
input_thread = threading.Thread(target=self.handle_input)
command_thread = threading.Thread(target=self.process_commands)
server_thread.start()
input_thread.start()
command_thread.start()
try:
server_thread.join()
input_thread.join()
command_thread.join()
except KeyboardInterrupt:
self.running = False
if self.irc:
try:
self.irc.close()
except:
pass
print(f"{self.get_timestamp()} Connection closed.")
def main():
client = IRCClient()
client.run()
if __name__ == "__main__":
main()
Moose TOOL | Donics | Taking a penecks | Dan | Manlet | Ballsac's Jokes | Murasa | Chit Chats | Monkt's Birthday | BallSac | Smells Like BallSac | StoneToss | DEAFTONEGOES | Erotica's New Hobby | Martian's Girlfriend | Diarrhea From Oral Sex | GEEGEEGEE | Ryu77 | Ents | FAST on Monday Morning | Macbot | Leeloo | Christistheway | Lego Monkt | IRC | Greentea | Reefer Banana Bread | Register On Rizon | Monkt's Playlist | Termux IRC Client