Difference between revisions of "Splunk: Python Lookup"
Jump to navigation
Jump to search
Rafahsolis (talk | contribs) (Created page with "Copy requirements to /opt/splunk/lib/python2.7/site-packages Including SplunkLookup.py:<syntaxhighlight lang="python"> import csv import sys from abc import ABCMeta, abstract...") Tag: visualeditor |
Rafahsolis (talk | contribs) m Tag: visualeditor |
||
| (6 intermediate revisions by the same user not shown) | |||
| Line 1: | Line 1: | ||
Copy requirements to /opt/splunk/lib/python2.7/site-packages | Copy requirements to /opt/splunk/lib/python2.7/site-packages | ||
| − | Including | + | Including splunk_lookup.py:<syntaxhighlight lang="python"> |
import csv | import csv | ||
import sys | import sys | ||
| Line 9: | Line 9: | ||
class SplunkLookup: | class SplunkLookup: | ||
__metaclass__ = ABCMeta | __metaclass__ = ABCMeta | ||
| − | usage = "Usage: python {} [arg1] [arg2]" | + | usage = "Usage: python {} [arg1] [arg2]".format(__file__) |
def __init__(self): | def __init__(self): | ||
| Line 24: | Line 24: | ||
@staticmethod | @staticmethod | ||
def read_arguments(): | def read_arguments(): | ||
| − | + | arg1 = sys.argv[1] | |
| − | + | arg2 = sys.argv[2] | |
| − | return | + | return arg1, arg2 |
@staticmethod | @staticmethod | ||
| Line 65: | Line 65: | ||
class SplunkLookupError(object): | class SplunkLookupError(object): | ||
pass | pass | ||
| + | </syntaxhighlight>Create your own lookup in: /opt/splunk/etc/system/bin | ||
| + | Example : geoip.py<syntaxhighlight lang="python"> | ||
| + | from splunk_lookup import SplunkLookup | ||
| + | from geoip2 import database | ||
| + | from geoip2.errors import AddressNotFoundError | ||
| + | |||
| + | DB_PATH = '/usr/share/geoip/GeoIP2-City.mmdb' | ||
| + | |||
| + | |||
| + | class Geolocator(object): | ||
| + | def __init__(self, ip): | ||
| + | self.ip = ip | ||
| + | self.city = self.read_city() | ||
| + | |||
| + | def read_city(self): | ||
| + | reader = database.Reader(DB_PATH) | ||
| + | city = reader.city(self.ip) | ||
| + | reader.close() | ||
| + | return city | ||
| + | |||
| + | @property | ||
| + | def location(self): | ||
| + | return "{city} ({country})".format(city=unknown_if_none(self.city.city.name), | ||
| + | country=unknown_if_none(self.city.country.name)) | ||
| + | |||
| + | |||
| + | def unknown_if_none(text): | ||
| + | if text is None: | ||
| + | return 'Unknown' | ||
| + | return text | ||
| + | |||
| + | |||
| + | class SplunkLookupGeoIP(SplunkLookup): | ||
| + | def lookup_arg1(self, argument_value2): | ||
| + | return 'Unknown' | ||
| + | |||
| + | def lookup_arg2(self, argument_value1): | ||
| + | try: | ||
| + | locator = Geolocator(argument_value1) | ||
| + | return locator.location | ||
| + | except (AddressNotFoundError, ValueError, TypeError): | ||
| + | return 'Unknown' | ||
| + | |||
| + | |||
| + | if __name__ == '__main__': | ||
| + | SplunkLookupGeoIP() | ||
| + | |||
| + | </syntaxhighlight>Define your lookup At Splunk (Settings > Lookups > Lookup definitions) | ||
| + | [[File:Splunk Lookup Definition.png|alt=Splunk Lookup Creation|left|frameless|800x800px|Splunk Lookup Creation]] | ||
| + | <br /> | ||
| + | ===Query Usage Example=== | ||
| + | <syntaxhighlight lang="text"> | ||
| + | sourcetype="pfsense:filterlog" host="pfsenseoperacionesinternet.rra.lan" dest_int=pppoe0 direction=inbound vendor_action=block | lookup GeoIP ipaddr as src_ip OUTPUT location | stats count by src_ip, location, dest_port, vendor_action | sort -num(count), sort num(src_ip), sort str(location), sort num(dest_port) | ||
</syntaxhighlight> | </syntaxhighlight> | ||
Latest revision as of 08:04, 15 November 2019
Copy requirements to /opt/splunk/lib/python2.7/site-packages
Including splunk_lookup.py:
import csv
import sys
from abc import ABCMeta, abstractmethod
class SplunkLookup:
__metaclass__ = ABCMeta
usage = "Usage: python {} [arg1] [arg2]".format(__file__)
def __init__(self):
self.validate_args()
self.arg1, self.arg2 = self.read_arguments()
self.header, self.stdin = self.read_input()
self.writer = self.write_header()
self.process_stdin()
def validate_args(self):
if len(sys.argv) != 3:
print(self.usage)
@staticmethod
def read_arguments():
arg1 = sys.argv[1]
arg2 = sys.argv[2]
return arg1, arg2
@staticmethod
def read_input():
infile = sys.stdin
reader = csv.DictReader(infile)
header = reader.fieldnames
return header, reader
def write_header(self):
stdout = sys.stdout
writer = csv.DictWriter(stdout, fieldnames=self.header)
writer.writeheader()
return writer
def process_stdin(self):
for result in self.stdin:
self.lookup_missing(result)
self.writer.writerow(result)
def lookup_missing(self, result):
if result[self.arg1] and result[self.arg2]:
pass
elif result[self.arg1]:
result.update({self.arg2: self.lookup_arg2(result[self.arg1])})
elif result[self.arg2]:
result.update({self.arg1: self.lookup_arg1(result[self.arg2])})
@abstractmethod
def lookup_arg2(self, argument_value1):
pass
@abstractmethod
def lookup_arg1(self, argument_value2):
pass
class SplunkLookupError(object):
pass
Create your own lookup in: /opt/splunk/etc/system/bin Example : geoip.py
from splunk_lookup import SplunkLookup
from geoip2 import database
from geoip2.errors import AddressNotFoundError
DB_PATH = '/usr/share/geoip/GeoIP2-City.mmdb'
class Geolocator(object):
def __init__(self, ip):
self.ip = ip
self.city = self.read_city()
def read_city(self):
reader = database.Reader(DB_PATH)
city = reader.city(self.ip)
reader.close()
return city
@property
def location(self):
return "{city} ({country})".format(city=unknown_if_none(self.city.city.name),
country=unknown_if_none(self.city.country.name))
def unknown_if_none(text):
if text is None:
return 'Unknown'
return text
class SplunkLookupGeoIP(SplunkLookup):
def lookup_arg1(self, argument_value2):
return 'Unknown'
def lookup_arg2(self, argument_value1):
try:
locator = Geolocator(argument_value1)
return locator.location
except (AddressNotFoundError, ValueError, TypeError):
return 'Unknown'
if __name__ == '__main__':
SplunkLookupGeoIP()
Define your lookup At Splunk (Settings > Lookups > Lookup definitions)
Query Usage Example[edit]
sourcetype="pfsense:filterlog" host="pfsenseoperacionesinternet.rra.lan" dest_int=pppoe0 direction=inbound vendor_action=block | lookup GeoIP ipaddr as src_ip OUTPUT location | stats count by src_ip, location, dest_port, vendor_action | sort -num(count), sort num(src_ip), sort str(location), sort num(dest_port)