Based off of a simpler bot by other people found on the web. Wrote it to be able to allow irc users to add themselves to a Apache blacklist. Some of the features are os dependant ( needs to be run on linux).
Just fill out the sections for server, server password, nickserv password, port, and channel.
#!/usr/bin/python
"""
ircbot.py - An IRC Bot Basis in Python
License:
W3C Open Source License; share and enjoy!
http://www.w3.org/Consortium/Legal/copyright-software-19980720
Original:
http://dev.w3.org/cvsweb/2000/scribe-bot/ircAsync.py
Dan Connolly and Tim Berners-Lee
Copyright (c) 2001 W3C (MIT, INRIA, Keio)
Augmentations' Author:
Sean B. Palmer, inamidst.com
This version updated by: Serinox
Features added:
network oper abilities
actions
kill users
kick users
ip to domain lookup
whois lookup
nickserv signin
works with a modified apache ip whitelist
other random quirks.
"""
import sys, os, re, socket, asyncore, asynchat, re, commands
protect = True
filename = "irc.log"
FILE = open(filename,"a")
proxyacl = "/usr/local/proxy/conf/access.conf"
PACL = open(filename,"a")
class Origin(object):
def __init__(self, origin):
self.origin = origin
self.split(origin)
def __str__(self):
return self.origin
def split(self, origin):
if origin and '!' in origin:
self.nick, userHost = origin.split('!', 1)
if '@' in userHost:
self.user, self.host = userHost.split('@', 1)
else: self.user, self.host = userHost, None
else: self.nick, self.user, self.host = origin, None, None
def replyTo(self, nickname, args):
if (not args) or (len(args) < 2): return
target = args[1]
if target == nickname:
self.sender = self.nick
else: self.sender = target
def ctcp(s): return '\x01%s\x01' % s
def me(s): return ctcp('ACTION %s' % s)
class Bot(asynchat.async_chat):
def __init__(self, nick=None, userid=None, name=None, channels=None):
asynchat.async_chat.__init__(self)
self.bufIn = ''
self.set_terminator('\r\n')
self.nick = nick or 'Ted'
self.userid = userid or 'Ted'
self.name = name or 'Ted'
self.documentation = {}
self.info = {}
self.dispatch = []
self.msgstack = []
self.channels = channels or ['#test']
self.rule(self.welcome, 'welcome', cmd='001')
self.rule(self.help, 'help', '%s[:,] help (\w+)\??' % self.nick)
def run(self, host, port):
port = port or 6667
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
debug("connecting to...", host, port)
self.connect((host, port))
self.bufIn = ''
asyncore.loop()
def welcome(self, m, origin, args, text):
for chan in self.channels:
self.todo(['JOIN', chan])
self.msg("nickserv","identify NICKSERV PASSWORD")
self.todo(['OPER',"Phil Poper123!"])
def help(self, m, origin, args, text):
command = m.group(1)
if self.documentation.has_key(command):
doc = self.documentation[command]
self.msg(origin.sender, '%s: %s' % (command, doc))
else: self.msg(origin.sender, 'No help available for %s.' % command)
def todo(self, args, *text):
command = ' '.join(args)
if text: command = command + ' :' + ' '.join(text)
self.push(command + '\r\n')
debug("sent/pushed command:", command)
def handle_connect(self):
debug("connected")
self.todo(['PASS',"IRC SERVER PASSWORD"])
self.todo(['NICK', self.nick])
self.todo(['USER', self.userid, "+iw", self.nick], self.name)
def collect_incoming_data(self, bytes):
self.bufIn = self.bufIn + bytes
FILE = open(filename,"a")
FILE.writelines(self.bufIn+"\n")
FILE.close()
def found_terminator(self):
line = self.bufIn
self.bufIn = ''
if line.startswith(':'):
origin, line = line[1:].split(' ', 1)
origin = Origin(origin)
else: origin = None
try: args, text = line.split(' :', 1)
except ValueError:
args, text = line, ''
args = args.split()
if origin: origin.replyTo(self.nick, args)
debug("from::", origin, "|message::", args, "|text::", text)
self.rxdMsg(args, text, origin)
def rule(self, func, name, regexp=None, doc=None, cmd=None):
if isinstance(regexp, basestring):
regexp = re.compile(regexp)
if self.documentation.has_key(name):
raise "DispatchError", "Function %s already added" % name
doc = doc or func.__doc__
if doc and doc.strip():
self.documentation[name] = doc
self.dispatch.append((cmd or 'PRIVMSG', regexp, func))
def bind(self, func, cmd, regexp):
self.dispatch.append((cmd, re.compile(regexp), func))
def rxdMsg(self, args, text, origin):
if args[0] == 'PING':
self.todo(['PONG', text])
for cmd, pat, thunk in self.dispatch:
if args[0] == cmd:
if pat:
m = pat.search(text)
if not m: continue
else: m = None
if protect:
try: thunk(m, origin, args, text)
except Exception, e:
try: self.msg(origin.sender, "%s: %s" % (e.__class__, e))
except: print >> sys.stderr, "%s: %s" % (e.__class__, e)
else: thunk(m, origin, args, text)
def pushMsg(self, msg):
self.msgstack = self.msgstack[-9:]
self.msgstack.append(msg)
def msg(self, dest, text):
# Flood protection
if self.msgstack.count(text) >= 50:
text = '...'
if self.msgstack.count('...') >= 20:
if text == '...': return
if len(''.join(self.msgstack[:-3])) > 200:
import time
time.sleep(2)
self.pushMsg(text)
self.todo(['PRIVMSG', dest], text)
def kick(self, dest, text):
self.todo(['KICK', dest], text)
#def join(self, chan):
# self.todo(['JOIN', chan])
def safeMsg(self, channel, lines):
import time
for line in lines:
if line: self.msg(channel, line)
time.sleep(1)
if len(line) > 50: time.sleep(0.7)
def notice(self, dest, text):
self.todo(['NOTICE', dest], text)
def debug(*args):
sys.stderr.write("DEBUG: ")
for a in args:
sys.stderr.write(str(a))
sys.stderr.write("\n")
def doubleFork():
# http://swhack.com/logs/2004-05-12#T10-20-11
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
try:
pid = os.fork()
if pid > 0: sys.exit(0)
except OSError, e:
print >> sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
os.chdir("/")
os.setsid()
os.umask(0)
try:
pid = os.fork()
if pid > 0:
print "Daemon PID %d" % pid
sys.exit(0)
except OSError, e:
print >> sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
def test(host, port, channels):
bot = Bot(nick='Ted', channels=channels)
def hi(m, origin, args, text, bot=bot):
bot.msg(origin.sender, "hi %s" % origin.nick)
bot.rule(hi, 'hi', r'hi %s' % bot.nick)
def trout(m, origin, args, text, bot=bot):
bot.kick(origin.sender, origin.nick)
bot.rule(trout, 'trout', p)
def action(m,origin,args,text,bot=bot):
cmd = text.split()
chan = cmd[1]
cmd[0] = ""
cmd[1] = ""
cmd.remove("")
cmd = " ".join(cmd)
bot.msg(chan,"ACTION "+cmd+"")
bot.rule(action,'action',r'!action')
# def up(m,origin,args,text,bot=bot):
# bot.todo(['OPER',"OperName Operpass"])
# bot.rule(up,'up',r'up')
def kill(m,origin,args,text,bot=bot):
cmd = text.split()
cmd = "%s killed_you_dead" % cmd[1]
bot.todo(['KILL',cmd])
bot.rule(kill,'kill',r'!!!kill')
def ip(m, origin, args, text, bot=bot):
bot.msg(origin.sender, "Getting IP informantion")
cmd = text.split()
cmd = "nslookup %s" % cmd[1]
result = commands.getoutput(cmd)
print cmd
fixed = result.splitlines()
found = 0
for i in fixed:
if (i.find(r"Address:") != -1 and i.find(r"#") == -1):
bot.msg(origin.sender, i)
found = 1;
if found <> 1:
bot.msg(origin.sender, "Not Found")
bot.rule(ip, 'ip', r'!ip')
def whois(m, origin, args, text, bot=bot):
bot.msg(origin.sender, "Getting WhoIs informantion")
cmd = text.split()
cmd = "whois %s" % cmd[1]
result = commands.getoutput(cmd)
print cmd
fixed = result.splitlines()
found = 0
for i in fixed:
if i.find(r"Name Server:") != -1:
bot.msg(origin.sender, i)
found = 1;
if found <> 1:
bot.msg(origin.sender, "Not Found")
bot.rule(whois, 'whois', r'!whois')
# def access(m, origin, args, text, bot=bot):
# PACL = open(proxyacl,"r")
# text = PACL.readlines()
# PACL.close()
# for i in range(len(text)):
# bot.msg(origin.nick,text[i])
# bot.rule(access, 'access', r'!access')
# def terminate(m, origin, args, text, bot=bot):
# bot.msg(origin.nick,"terminating!")
# text = text.lower()
# cmd = text.split()
# if (len(cmd) == 2):
# PACL = open(proxyacl,"r")
# text = PACL.readlines()
# PACL.close()
# print text
# remove = cmd[1]
# print "Terminating %s" % cmd[1]
# removed = -1
# for i in range(len(text)):
# l = text[i]
# if (l.find(str(remove)) <> -1):
# removed = i
# if (l.find(str(remove)) <> -1):
# break
# print remove
# if (removed <> -1):
# del text[removed]
# print "\n\n\n\n\n\n\n\n"
# print text
# PACL = open(proxyacl,"w")
# for i in range(len(text)):
# PACL.write(text[i])
# PACL.close()
# bot.msg(origin.sender,"You have terminated " + cmd[1])
# bot.rule(terminate, 'terminate', r'!terminate')
## def proxy(m, origin, args, text, bot=bot):
## text = text.lower()
## cmd = text.split()
## if (len(cmd) == 3):
## errTest = cmd[2]
## errName = cmd[1]
## err = 0
## if (errName.count("#") <> 0):
## bot.msg(origin.sender,"INVALID NAME TRY AGAIN")
## err = 1
## if (errTest.count("#") <> 0):
## bot.msg(origin.sender,"INVALID IP TRY IT AGAIN")
## err = 1
## if (errTest.find("169.254") <> -1):
## bot.msg(origin.sender,"INVALID IP GO BACK AND TRY AGAIN ")
## if (errTest.find("169.254") <> -1):
## err = 1
## if (errTest.find("192.168") <>-1):
## bot.msg(origin.sender,"INVALID IP GO BACK AND TRY AGAIN")
## if (errTest.find("192.168") <>-1):
## err = 1
## if (errTest.count(".") <> 3):
## bot.msg(origin.sender,"INVALID IP GO BACK AND TRY AGAIN")
## if (errTest.count(".") <> 3):
## err = 1
## if (err <> 1):
## PACL = open(proxyacl,"r")
## text = PACL.readlines()
## PACL.close()
## checked = 1
## for i in range(len(text)):
## l = text[i]
## print "IP::::" + errTest
## print "LINE::"+l
## if (str(l.find(str(errTest))) <> '-1'):
## checked = 0
##
## if (checked == 1):
## PACL = open(proxyacl,"r")
## text = PACL.readlines()
## PACL.close()
## print text
## remove = cmd[1]
## print "you are looking for"
## removed = -1
## for i in range(len(text)):
## l = text[i]
## if (l.find(str(remove)) <> -1):
## removed = i
## if (l.find(str(remove)) <> -1):
## break
## print remove
## if (removed <> -1):
## del text[removed]
## print "\n\n\n\n\n\n\n\n"
## print text
## PACL = open(proxyacl,"w")
## for i in range(len(text)):
## PACL.write(text[i])
## PACL.close()
## cmd = "allow from %s #%s" % (cmd[2],cmd[1])
## bot.msg(origin.sender, cmd)
## PACL = open(proxyacl,"a")
## PACL.writelines(cmd+"\n")
## PACL.close()
## bot.msg(origin.sender,"your ip has been added, if the proxy does not work for you check your ip and try again.")
## else:
## bot.msg(origin.sender,"your IP is already in this list")
## else:
## bot.msg(origin.sender,"Your command was not formatted correctly the correct format is '!proxy <name> <ipaddress>' please try again")
## result = commands.getoutput("/usr/local/proxy/bin/apachectl stop")
## print result
## result = commands.getoutput("/usr/local/proxy/bin/apachectl start")
## print result
## bot.rule(proxy, 'proxy', r'!proxy')
## def hmm(m, origin, (cmd, chan), text, bot=bot):
## """.test - Do some crazy test thing."""
## raise Exception, "blargh"
## bot.msg(origin.sender, 'Test')
## bot.rule(hmm, 'test', r'^\.test$')
bot.run(host, port)
if __name__=='__main__':
test('SERVERIP', PORTNUMBER, ['#CHANNEL'])
April 1st, 2009 at 1:43 pm
[...] most people come across the site after searching for a python irc bot I thought I would clean mine up and make it a little more generic (the previous version had triggers that only worked on my system) [...]