Python IRC bot.

January 30th, 2009
by Serinox

Based off of a simpler bot by other people found on the web. Wrote it to be able to allow irc users to add themselves to a Apache blacklist. Some of the features are os dependant ( needs to be run on linux).

Just fill out the sections for server, server password, nickserv password, port, and channel.

#!/usr/bin/python
"""
ircbot.py - An IRC Bot Basis in Python

License:
   W3C Open Source License; share and enjoy!

http://www.w3.org/Consortium/Legal/copyright-software-19980720

Original:

http://dev.w3.org/cvsweb/2000/scribe-bot/ircAsync.py

   Dan Connolly and Tim Berners-Lee
   Copyright (c) 2001 W3C (MIT, INRIA, Keio)

Augmentations' Author:
   Sean B. Palmer, inamidst.com

This version updated by: Serinox
Features added:
    network oper abilities
    actions
    kill users
    kick users
    ip to domain lookup
    whois lookup
    nickserv signin
    works with a modified apache ip whitelist
    other random quirks.
"""

import sys, os, re, socket, asyncore, asynchat, re, commands

protect = True
filename = "irc.log"
FILE = open(filename,"a")

proxyacl = "/usr/local/proxy/conf/access.conf"

PACL = open(filename,"a")

class Origin(object):
   def __init__(self, origin):
      self.origin = origin
      self.split(origin)

   def __str__(self):
      return self.origin

   def split(self, origin):
      if origin and '!' in origin:
         self.nick, userHost = origin.split('!', 1)
         if '@' in userHost:
            self.user, self.host = userHost.split('@', 1)
         else: self.user, self.host = userHost, None
      else: self.nick, self.user, self.host = origin, None, None

   def replyTo(self, nickname, args):
      if (not args) or (len(args) < 2): return
      target = args[1]
      if target == nickname:
         self.sender = self.nick
      else: self.sender = target

def ctcp(s): return '\x01%s\x01' % s
def me(s): return ctcp('ACTION %s' % s)

class Bot(asynchat.async_chat):
   def __init__(self, nick=None, userid=None, name=None, channels=None):
      asynchat.async_chat.__init__(self)
      self.bufIn = ''
      self.set_terminator('\r\n')

      self.nick = nick or 'Ted'
      self.userid = userid or 'Ted'
      self.name = name or 'Ted'

      self.documentation = {}
      self.info = {}
      self.dispatch = []
      self.msgstack = []

      self.channels = channels or ['#test']
      self.rule(self.welcome, 'welcome', cmd='001')
      self.rule(self.help, 'help', '%s[:,] help (\w+)\??' % self.nick)

   def run(self, host, port):
      port = port or 6667
      self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
      debug("connecting to...", host, port)
      self.connect((host, port))
      self.bufIn = ''
      asyncore.loop()

   def welcome(self, m, origin, args, text):
      for chan in self.channels:
         self.todo(['JOIN', chan])
      self.msg("nickserv","identify NICKSERV PASSWORD")
      self.todo(['OPER',"Phil Poper123!"])

   def help(self, m, origin, args, text):
      command = m.group(1)
      if self.documentation.has_key(command):
         doc = self.documentation[command]
         self.msg(origin.sender, '%s: %s' % (command, doc))
      else: self.msg(origin.sender, 'No help available for %s.' % command)

   def todo(self, args, *text):
      command = ' '.join(args)
      if text: command = command + ' :' + ' '.join(text)

      self.push(command + '\r\n')
      debug("sent/pushed command:", command)

   def handle_connect(self):
      debug("connected")

      self.todo(['PASS',"IRC SERVER PASSWORD"])
      self.todo(['NICK', self.nick])
      self.todo(['USER', self.userid, "+iw", self.nick], self.name)

   def collect_incoming_data(self, bytes):
      self.bufIn = self.bufIn + bytes
      FILE = open(filename,"a")
      FILE.writelines(self.bufIn+"\n")
      FILE.close()
   def found_terminator(self):
      line = self.bufIn
      self.bufIn = ''

      if line.startswith(':'):
         origin, line = line[1:].split(' ', 1)
         origin = Origin(origin)
      else: origin = None

      try: args, text = line.split(' :', 1)
      except ValueError:
         args, text = line, ''
      args = args.split()

      if origin: origin.replyTo(self.nick, args)
      debug("from::", origin, "|message::", args, "|text::", text)
      self.rxdMsg(args, text, origin)

   def rule(self, func, name, regexp=None, doc=None, cmd=None):
      if isinstance(regexp, basestring):
         regexp = re.compile(regexp)
      if self.documentation.has_key(name):
         raise "DispatchError", "Function %s already added" % name
      doc = doc or func.__doc__
      if doc and doc.strip():
         self.documentation[name] = doc
      self.dispatch.append((cmd or 'PRIVMSG', regexp, func))

   def bind(self, func, cmd, regexp):
      self.dispatch.append((cmd, re.compile(regexp), func))

   def rxdMsg(self, args, text, origin):
      if args[0] == 'PING':
         self.todo(['PONG', text])

      for cmd, pat, thunk in self.dispatch:
         if args[0] == cmd:
            if pat:
               m = pat.search(text)
               if not m: continue
            else: m = None

            if protect:
               try: thunk(m, origin, args, text)
               except Exception, e:
                  try: self.msg(origin.sender, "%s: %s" % (e.__class__, e))
                  except: print >> sys.stderr, "%s: %s" % (e.__class__, e)
            else: thunk(m, origin, args, text)

   def pushMsg(self, msg):
      self.msgstack = self.msgstack[-9:]
      self.msgstack.append(msg)

   def msg(self, dest, text):
      # Flood protection
      if self.msgstack.count(text) >= 50:
         text = '...'
      if self.msgstack.count('...') >= 20:
         if text == '...': return

      if len(''.join(self.msgstack[:-3])) > 200:
         import time
         time.sleep(2)

      self.pushMsg(text)
      self.todo(['PRIVMSG', dest], text)

   def kick(self, dest, text):
      self.todo(['KICK', dest], text)

   #def join(self, chan):
   #   self.todo(['JOIN', chan])

   def safeMsg(self, channel, lines):
      import time
      for line in lines:
         if line: self.msg(channel, line)
         time.sleep(1)
         if len(line) > 50: time.sleep(0.7)

   def notice(self, dest, text):
      self.todo(['NOTICE', dest], text)

def debug(*args):
   sys.stderr.write("DEBUG: ")
   for a in args:
      sys.stderr.write(str(a))
   sys.stderr.write("\n")
def doubleFork():
   # http://swhack.com/logs/2004-05-12#T10-20-11
   # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/66012
   try:
      pid = os.fork()
      if pid > 0: sys.exit(0)
   except OSError, e:
      print >> sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
      sys.exit(1)
   os.chdir("/")
   os.setsid()
   os.umask(0)
   try:
      pid = os.fork()
      if pid > 0:
         print "Daemon PID %d" % pid
         sys.exit(0)
   except OSError, e:
      print >> sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
      sys.exit(1)

def test(host, port, channels):
   bot = Bot(nick='Ted', channels=channels)

   def hi(m, origin, args, text, bot=bot):
      bot.msg(origin.sender, "hi %s" % origin.nick)
   bot.rule(hi, 'hi', r'hi %s' % bot.nick)

   def trout(m, origin, args, text, bot=bot):
      bot.kick(origin.sender, origin.nick)
   bot.rule(trout, 'trout', p)

   def action(m,origin,args,text,bot=bot):
       cmd = text.split()
       chan = cmd[1]
       cmd[0] = ""
       cmd[1] = ""
       cmd.remove("")
       cmd = " ".join(cmd)
       bot.msg(chan,"ACTION "+cmd+"")
   bot.rule(action,'action',r'!action')

#   def up(m,origin,args,text,bot=bot):
#       bot.todo(['OPER',"OperName Operpass"])
#   bot.rule(up,'up',r'up')

   def kill(m,origin,args,text,bot=bot):
       cmd = text.split()
       cmd = "%s killed_you_dead" % cmd[1]
       bot.todo(['KILL',cmd])
   bot.rule(kill,'kill',r'!!!kill')

   def ip(m, origin, args, text, bot=bot):
      bot.msg(origin.sender, "Getting IP informantion")
      cmd = text.split()
      cmd = "nslookup %s" % cmd[1]
      result = commands.getoutput(cmd)
      print cmd
      fixed = result.splitlines()
      found = 0
      for i in fixed:
           if (i.find(r"Address:") != -1 and i.find(r"#") == -1):
                 bot.msg(origin.sender, i)
                 found = 1;
      if found <> 1:
            bot.msg(origin.sender, "Not Found")
   bot.rule(ip, 'ip', r'!ip')

   def whois(m, origin, args, text, bot=bot):
      bot.msg(origin.sender, "Getting WhoIs informantion")
      cmd = text.split()
      cmd = "whois %s" % cmd[1]
      result = commands.getoutput(cmd)
      print cmd
      fixed = result.splitlines()
      found = 0
      for i in fixed:
           if i.find(r"Name Server:") != -1:
                 bot.msg(origin.sender, i)
                 found = 1;
      if found <> 1:
            bot.msg(origin.sender, "Not Found")
   bot.rule(whois, 'whois', r'!whois')

#   def access(m, origin, args, text, bot=bot):
#      PACL = open(proxyacl,"r")
#      text = PACL.readlines()
#      PACL.close()
#      for i in range(len(text)):
#         bot.msg(origin.nick,text[i])
#   bot.rule(access, 'access', r'!access')

#   def terminate(m, origin, args, text, bot=bot):
#       bot.msg(origin.nick,"terminating!")
#       text = text.lower()
#       cmd = text.split()
#       if (len(cmd) == 2):
#         PACL = open(proxyacl,"r")
#         text = PACL.readlines()
#         PACL.close()
#         print text
#         remove = cmd[1]
#         print "Terminating %s" % cmd[1]
#         removed = -1
#         for i in range(len(text)):
#                 l = text[i]
#                 if (l.find(str(remove)) <> -1):
#                         removed = i
#                 if (l.find(str(remove)) <> -1):
#                         break
#         print remove
#         if (removed <> -1):
#                 del text[removed]
#         print "\n\n\n\n\n\n\n\n"
#         print text
#         PACL = open(proxyacl,"w")
#         for i in range(len(text)):
#                 PACL.write(text[i])
#         PACL.close()
#         bot.msg(origin.sender,"You have terminated " + cmd[1])
#   bot.rule(terminate, 'terminate', r'!terminate')

##   def proxy(m, origin, args, text, bot=bot):
##      text = text.lower()
##      cmd = text.split()
##      if (len(cmd) == 3):
##        errTest = cmd[2]
##        errName = cmd[1]
##        err = 0
##        if (errName.count("#") <> 0):
##           bot.msg(origin.sender,"INVALID NAME TRY AGAIN")
##           err = 1
##        if (errTest.count("#") <> 0):
##           bot.msg(origin.sender,"INVALID IP TRY IT AGAIN")
##           err = 1
##        if (errTest.find("169.254") <> -1):
##           bot.msg(origin.sender,"INVALID IP GO BACK AND TRY AGAIN ")
##        if (errTest.find("169.254") <> -1):
##           err = 1
##        if (errTest.find("192.168") <>-1):
##           bot.msg(origin.sender,"INVALID IP GO BACK AND TRY AGAIN")
##        if (errTest.find("192.168") <>-1):
##           err = 1
##        if (errTest.count(".") <> 3):
##           bot.msg(origin.sender,"INVALID IP GO BACK AND TRY AGAIN")
##        if (errTest.count(".") <> 3):
##           err = 1
##        if (err <> 1):
##          PACL = open(proxyacl,"r")
##          text = PACL.readlines()
##          PACL.close()
##          checked = 1
##          for i in range(len(text)):
##              l = text[i]
##              print "IP::::" + errTest
##              print "LINE::"+l
##              if (str(l.find(str(errTest))) <> '-1'):
##                  checked = 0
##
##          if (checked == 1):
##            PACL = open(proxyacl,"r")
##            text = PACL.readlines()
##            PACL.close()
##            print text
##            remove = cmd[1]
##            print "you are looking for"
##            removed = -1
##            for i in range(len(text)):
##                  l = text[i]
##                  if (l.find(str(remove)) <> -1):
##                          removed = i
##                  if (l.find(str(remove)) <> -1):
##                          break
##            print remove
##            if (removed <> -1):
##                  del text[removed]
##            print "\n\n\n\n\n\n\n\n"
##            print text
##            PACL = open(proxyacl,"w")
##            for i in range(len(text)):
##                  PACL.write(text[i])
##            PACL.close()
##            cmd = "allow from %s  #%s" % (cmd[2],cmd[1])
##            bot.msg(origin.sender, cmd)
##            PACL = open(proxyacl,"a")
##            PACL.writelines(cmd+"\n")
##            PACL.close()
##            bot.msg(origin.sender,"your ip has been added, if the proxy does not work for you check your ip and try again.")
##          else:
##              bot.msg(origin.sender,"your IP is already in this list")
##        else:
##          bot.msg(origin.sender,"Your command was not formatted correctly the correct format is '!proxy <name> <ipaddress>' please try again")
##      result = commands.getoutput("/usr/local/proxy/bin/apachectl stop")
##      print result
##      result = commands.getoutput("/usr/local/proxy/bin/apachectl start")
##      print result
##   bot.rule(proxy, 'proxy', r'!proxy')

##   def hmm(m, origin, (cmd, chan), text, bot=bot):
##      """.test - Do some crazy test thing."""
##      raise Exception, "blargh"
##      bot.msg(origin.sender, 'Test')
##   bot.rule(hmm, 'test', r'^\.test$')

   bot.run(host, port)

if __name__=='__main__':
   test('SERVERIP', PORTNUMBER, ['#CHANNEL'])

Posted in General, Software | Comments (1)

One Response to “Python IRC bot.”

  1. Free Lancers Unite » Cleaned up Python IRC bot. Says:

    [...] most people come across the site after searching for a python irc bot I thought I would clean mine up and make it a little more generic (the previous version had triggers that only worked on my system) [...]

Leave a Reply

You must be logged in to post a comment.