Building an Enterprise Grade OpenSource Web Analytics System – Part 4: Data Processing

This is the fourth part of a seven-part-series explaining how to build an Enterprise Grade OpenSource Web Analytics System. In this post we are building the processing layer to work with our raw log lines. In the last post we used Nginx and Filebeat to write our tracking events to Kafka. If you are new to this series it might help to start with the first post.

At this part of the series, we have a lot of raw tracking events in our Kafka topic. We could already use this topic to store the raw loglines to our Hadoop cluster or a database. But it would be much easier later on to do some additional processing to make our life a litte easier. Since Python is the data science language today we will be using that language. The result will then be written to another Kafka topic for further processing and storing.

Getting and processing data from Kafka with Python

Python has a nice extension helping us to handle Kafka very easily. Once we imported it, we can just setup our Kafka client and consume messages like this:

from kafka import KafkaConsumer,KafkaProducer
consumer = KafkaConsumer('tracking_raw',bootstrap_servers='[Your Kafka instance]',group_id='python_parser')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]',acks=1,value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for msg in consumer:
    print("Received "+msg.key.decode())

This will read from our tracking_raw topic and print the key of the event once it is received, which is very helpful for debugging. Let’s define a variable for the result that we are going to put together:

processed_result = {}

processed_result["kafka_raw"] = []
processed_result["kafka_raw"].append({"name":"topic" ,"value": msg.topic})
processed_result["kafka_raw"].append({"name":"offset" ,"value": msg.offset})
processed_result["kafka_raw"].append({"name":"timestamp" ,"value": msg.timestamp})
processed_result["kafka_raw"].append({"name":"key" ,"value": msg.key.decode()})
processed_result["kafka_raw"].append({"name":"serialized_value_size" ,"value": msg.serialized_value_size})

The first part of our result is a list of metadata in the “kafka_raw” namespace. Next, let’s decode the log line we received from Nginx and put the Nginx information in the “custom_webserver” namespace of our result:

msg_payload = json.loads(msg.value)

processed_result["custom_webserver"] = []
processed_result["custom_webserver"].append({"name":"timestamp","value":msg_payload["@timestamp"]})
processed_result["custom_webserver"].append({"name":"filebeat_version","value":msg_payload["@metadata"]["version"]})
processed_result["custom_webserver"].append({"name":"log_offset","value":msg_payload["log"]["offset"]})
processed_result["custom_webserver"].append({"name":"agent_id","value":msg_payload["agent"]["id"]})
processed_result["custom_webserver"].append({"name":"ephemeral_id","value":msg_payload["agent"]["ephemeral_id"]})
processed_result["custom_webserver"].append({"name":"host","value":msg_payload["json"]["host"]})
processed_result["custom_webserver"].append({"name":"server_name","value":msg_payload["json"]["server_name"]})
processed_result["custom_webserver"].append({"name":"uri","value":msg_payload["json"]["uri"]})
processed_result["custom_webserver"].append({"name":"status","value":msg_payload["json"]["status"]})
processed_result["custom_webserver"].append({"name":"remote_addr","value":msg_payload["json"]["remote_addr"]})
processed_result["custom_webserver"].append({"name":"proxy_ip","value":msg_payload["json"]["proxy_ip"]})
processed_result["custom_webserver"].append({"name":"user_agent","value":msg_payload["json"]["user_agent"]})
processed_result["custom_webserver"].append({"name":"referer","value":msg_payload["json"]["referer"]})
processed_result["custom_webserver"].append({"name":"scheme","value":msg_payload["json"]["scheme"]})
processed_result["custom_webserver"].append({"name":"size","value":msg_payload["json"]["size"]})
processed_result["custom_webserver"].append({"name":"msec","value":msg_payload["json"]["msec"]})
processed_result["custom_webserver"].append({"name":"time","value":msg_payload["json"]["time"]})

So far, we just made our life a little easier. But now we are starting to actually enrich our data. A standard datapoint is the information about the user location based on the IP address, which we already have. So let’s lookup the user position and put it in the “custom_geoip” namespace. See the complete code for the required imports:

reader = geolite2.reader()
geoip_lookup = reader.get(msg_payload["json"]["remote_addr"])
if geoip_lookup != None:
    processed_result["custom_geoip"] = []
    processed_result["custom_geoip"].append({"name":"city_name","value":geoip_lookup["city"]["names"]["en"]})
    processed_result["custom_geoip"].append({"name":"continent_name","value":geoip_lookup["continent"]["names"]["en"]})
    processed_result["custom_geoip"].append({"name":"country_name","value":geoip_lookup["country"]["names"]["en"]})
    processed_result["custom_geoip"].append({"name":"country_iso","value":geoip_lookup["country"]["iso_code"]})
    processed_result["custom_geoip"].append({"name":"country_eu","value":geoip_lookup["country"]["is_in_european_union"]})
    processed_result["custom_geoip"].append({"name":"accuracy_radius","value":geoip_lookup["location"]["accuracy_radius"]})
    processed_result["custom_geoip"].append({"name":"latitude","value":geoip_lookup["location"]["latitude"]})
    processed_result["custom_geoip"].append({"name":"longitude","value":geoip_lookup["location"]["longitude"]})
    processed_result["custom_geoip"].append({"name":"time_zone","value":geoip_lookup["location"]["time_zone"]})
    processed_result["custom_geoip"].append({"name":"postal","value":geoip_lookup["postal"]["code"]})

The next interesting thing to know is about the device our visitors use, called the useragent. We will be using a lib to get some information about that:

if (len(msg_payload["json"]["user_agent"]) > 0):
    user_agent = user_agents.parse(msg_payload["json"]["user_agent"])
    processed_result["custom_useragent"] = []
    processed_result["custom_useragent"].append({"name":"os_family" ,"value": user_agent.os.family})
    processed_result["custom_useragent"].append({"name":"os_version_string" ,"value": user_agent.os.version_string})
    processed_result["custom_useragent"].append({"name":"browser_family" ,"value": user_agent.browser.family})
    processed_result["custom_useragent"].append({"name":"browser_version_string" ,"value": user_agent.browser.version_string})
    processed_result["custom_useragent"].append({"name":"device_family" ,"value": user_agent.device.family})
    processed_result["custom_useragent"].append({"name":"device_brand" ,"value": user_agent.device.brand})
    processed_result["custom_useragent"].append({"name":"device_model" ,"value": user_agent.device.model})
    processed_result["custom_useragent"].append({"name":"simple" ,"value": str(user_agent)})

Now it’s time for the most interesting part. We are taking the query parameters from our logfiles, parse it and iterate through them to make them more understandable. First, let’s parse it and start the iteration:

request_query_string = msg_payload["json"]["query_string"]
if (len (request_query_string) > 0):
    query_dict = query_string.parse(request_query_string)

if "query_dict" in locals() and query_dict != None:
    processed_result["query"] = []
    for element in query_dict:

Depending on what the query parameter currently is during our iteration, we want to take different action. So we are going to start with the context data, which contains our custom variables and page information:

if(element=="co"):
    for context in json.loads(query_dict["co"])["data"]:
        if ("schema" in context):
            schema = re.findall(r'\/([^\/]+)\/', context["schema"])[0]
            for name in context["data"]:
                processed_result["query"].append({"name":"context_"+schema+"_"+name,"value":context["data"][name]})
        else:
            for name in context["data"]:
                processed_result["query"].append({"name":"context_"+name,"value":context["data"][name]})

Depending on whether we have schema information for the context variable, we will put it in the “query” namespace in our result with a different name. The next case is to handle referrer information, if there is any:

elif(element=="refr"):
    r = Referer(query_dict[element],query_dict["url"])
    processed_result["query"].append({"name":"referrer_"+"referrer","value":element})
    processed_result["query"].append({"name":"referrer_"+"known","value":r.known})
    processed_result["query"].append({"name":"referrer_"+"medium","value":r.medium})
    processed_result["query"].append({"name":"referrer_"+"search_parameter","value":r.search_parameter})
    processed_result["query"].append({"name":"referrer_"+"search_term","value":r.search_term})
    processed_result["query"].append({"name":"referrer_uri_"+"scheme","value":r.uri.scheme})
    processed_result["query"].append({"name":"referrer_uri_"+"netloc","value":r.uri.netloc})
    processed_result["query"].append({"name":"referrer_uri_"+"path","value":r.uri.path})
    processed_result["query"].append({"name":"referrer_uri_"+"params","value":r.uri.params})
    processed_result["query"].append({"name":"referrer_uri_"+"query","value":r.uri.query})
    processed_result["query"].append({"name":"referrer_uri_"+"fragment","value":r.uri.fragment})

We are using a lib provided by Snowplow (here they are again!) to get information about the referrer and decide if the referral comes from inside our own domain. This also helps us to identify search engines! Next, let’s take apart the events we are receiving from our implementation:

elif(element=="ue_pr"):
    schema = re.findall(r'\/([^\/]+)\/',json.loads(query_dict[element])["data"]["schema"])[0]
    for key,value in json.loads(query_dict[element])["data"]["data"].items():
        processed_result["query"].append({"name":"event_unstructured_"+schema+"_"+key,"value":value})
        processed_result["query"].append({"name":"event_unstructured_schema","value":schema})

Now we only need to handle other cases, like event types and internal Snowplow parameters. To help understand this there is a lookup defined in the complete snippet below. The case is handled like this:

else:
    if element in parameter_lookup:
        processed_result["query"].append({"name":"other_"+parameter_lookup[element],"value":query_dict[element]})
    else:
        processed_result["query"].append({"name":"other_"+element,"value":query_dict[element]})
    if element=="e":
        processed_result["query"].append({"name":"other_eventtype","value":event_lookup[query_dict[element]]})

Last thing, we only need to send our data structure off to Kafka again. I am using a topic called “tracking_processed” here. For the next message, we need to reset our variables and are done!

print("Send "+msg.key.decode()+"\n")
producer.send("tracking_processed",value=processed_result,key=msg.key)
processed_result = None
query_dict = None
geoip_lookup = None
msg_payload = None
request_query_string = None

We are using the same message key as in the original here. In the console, we see a line for every event we receive:

Received 127.0.0.1-95d3aec1e146872d0e766698b7a64638
Send 127.0.0.1-95d3aec1e146872d0e766698b7a64638

Received 127.0.0.1-e28544ed81f62d06162137eaf11a0a44
Send 127.0.0.1-e28544ed81f62d06162137eaf11a0a44

The document we send to Kafka will then look like this:

{
  "kafka_raw": [
    {
      "name": "topic",
      "value": "tracking_raw"
    },
    {
      "name": "offset",
      "value": 992
    },
    {
      "name": "timestamp",
      "value": 1586767595151
    },
    {
      "name": "key",
      "value": "127.0.0.1-95d3aec1e146872d0e766698b7a64638"
    },
    {
      "name": "serialized_value_size",
      "value": 2900
    }
  ],
  "custom_webserver": [
    {
      "name": "timestamp",
      "value": "2020-04-13T08:46:35.151Z"
    },
    {
      "name": "filebeat_version",
      "value": "7.5.1"
    },
    {
      "name": "log_offset",
      "value": 1935365
    },
    {
      "name": "agent_id",
      "value": "cfe78877-0f7d-4077-ba67-d0113a2f8911"
    },
    {
      "name": "ephemeral_id",
      "value": "796897a5-bbf5-44f8-b42c-61189a83da00"
    },
    {
      "name": "host",
      "value": "127.0.0.1"
    },
    {
      "name": "server_name",
      "value": "localhost"
    },
    {
      "name": "uri",
      "value": "/analytics/i"
    },
    {
      "name": "status",
      "value": "200"
    },
    {
      "name": "remote_addr",
      "value": "127.0.0.1"
    },
    {
      "name": "proxy_ip",
      "value": ""
    },
    {
      "name": "user_agent",
      "value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.163 Safari/537.36"
    },
    {
      "name": "referer",
      "value": "http://127.0.0.1/sp/index.html"
    },
    {
      "name": "scheme",
      "value": "http"
    },
    {
      "name": "size",
      "value": "43"
    },
    {
      "name": "msec",
      "value": "1586767588.225"
    },
    {
      "name": "time",
      "value": "2020-04-13T10:46:28+02:00"
    }
  ],
  "custom_useragent": [
    {
      "name": "os_family",
      "value": "Windows"
    },
    {
      "name": "os_version_string",
      "value": "10"
    },
    {
      "name": "browser_family",
      "value": "Chrome"
    },
    {
      "name": "browser_version_string",
      "value": "80.0.3987"
    },
    {
      "name": "device_family",
      "value": "Other"
    },
    {
      "name": "device_brand",
      "value": "None"
    },
    {
      "name": "device_model",
      "value": "None"
    },
    {
      "name": "simple",
      "value": "PC / Windows 10 / Chrome 80.0.3987"
    }
  ],
  "query": [
    {
      "name": "other_dvce_sent_tstamp",
      "value": "1586767588224"
    },
    {
      "name": "other_event",
      "value": "pp"
    },
    {
      "name": "other_eventtype",
      "value": "Page ping"
    },
    {
      "name": "other_page_url",
      "value": "http://127.0.0.1/sp/index.html#"
    },
    {
      "name": "other_page_title",
      "value": "my custom page title"
    },
    {
      "name": "referrer_referrer",
      "value": "refr"
    },
    {
      "name": "referrer_known",
      "value": "True"
    },
    {
      "name": "referrer_medium",
      "value": "internal"
    },
    {
      "name": "referrer_search_parameter",
      "value": "None"
    },
    {
      "name": "referrer_search_term",
      "value": "None"
    },
    {
      "name": "referrer_uri_scheme",
      "value": "http"
    },
    {
      "name": "referrer_uri_netloc",
      "value": "127.0.0.1"
    },
    {
      "name": "referrer_uri_path",
      "value": "/sp/index.html"
    },
    {
      "name": "referrer_uri_params",
      "value": ""
    },
    {
      "name": "referrer_uri_query",
      "value": ""
    },
    {
      "name": "referrer_uri_fragment",
      "value": ""
    },
    {
      "name": "other_pp_xoffset_min",
      "value": "0"
    },
    {
      "name": "other_pp_xoffset_max",
      "value": "0"
    },
    {
      "name": "other_pp_yoffset_min",
      "value": "0"
    },
    {
      "name": "other_pp_yoffset_max",
      "value": "0"
    },
    {
      "name": "other_v_tracker",
      "value": "js-2.12.0"
    },
    {
      "name": "other_name_tracker",
      "value": "webtracking"
    },
    {
      "name": "other_app_id",
      "value": "testpage"
    },
    {
      "name": "other_platform",
      "value": "web"
    },
    {
      "name": "other_os_timezone",
      "value": "Europe/Berlin"
    },
    {
      "name": "other_br_lang",
      "value": "de-DE"
    },
    {
      "name": "other_doc_charset",
      "value": "windows-1252"
    },
    {
      "name": "other_br_features_pdf",
      "value": "1"
    },
    {
      "name": "other_br_features_quicktime",
      "value": "0"
    },
    {
      "name": "other_br_features_realplayer",
      "value": "0"
    },
    {
      "name": "other_br_features_windowsmedia",
      "value": "0"
    },
    {
      "name": "other_br_features_director",
      "value": "0"
    },
    {
      "name": "other_br_features_flash",
      "value": "0"
    },
    {
      "name": "other_br_features_java",
      "value": "0"
    },
    {
      "name": "other_br_features_gears",
      "value": "0"
    },
    {
      "name": "other_br_features_silverlight",
      "value": "0"
    },
    {
      "name": "other_dvce_screensize",
      "value": "1680x1050"
    },
    {
      "name": "other_br_colordepth",
      "value": "24"
    },
    {
      "name": "other_br_cookies",
      "value": "1"
    },
    {
      "name": "other_event_id",
      "value": "3d8b6bed-7f43-4659-9f12-dc75699e3feb"
    },
    {
      "name": "other_dvce_created_tstamp",
      "value": "1586767588223"
    },
    {
      "name": "other_br_viewsize",
      "value": "1125x957"
    },
    {
      "name": "other_doc_size",
      "value": "1125x957"
    },
    {
      "name": "other_domain_sessionidx",
      "value": "9"
    },
    {
      "name": "other_domain_sessionid",
      "value": "ad5f99b5-9aeb-4a24-af79-1095a4237d9f"
    },
    {
      "name": "other_domain_userid",
      "value": "6ad7e729-3392-4b9e-9663-f3ef9e49831d"
    },
    {
      "name": "other_user_fingerprint",
      "value": "557407330"
    },
    {
      "name": "context_a Date",
      "value": "Mon Apr 13 2020 10:46:08 GMT+0200 (MitteleuropΓ€ische Sommerzeit)"
    },
    {
      "name": "context_a String",
      "value": "Hello there!"
    },
    {
      "name": "context_web_page_id",
      "value": "a664552f-9cac-4228-b25f-d23ecb87ebd3"
    },
    {
      "name": "context_PerformanceTiming_navigationStart",
      "value": 1586767568195
    },
    {
      "name": "context_PerformanceTiming_unloadEventStart",
      "value": 1586767568202
    },
    {
      "name": "context_PerformanceTiming_unloadEventEnd",
      "value": 1586767568202
    },
    {
      "name": "context_PerformanceTiming_redirectStart",
      "value": 0
    },
    {
      "name": "context_PerformanceTiming_redirectEnd",
      "value": 0
    },
    {
      "name": "context_PerformanceTiming_fetchStart",
      "value": 1586767568195
    },
    {
      "name": "context_PerformanceTiming_domainLookupStart",
      "value": 1586767568195
    },
    {
      "name": "context_PerformanceTiming_domainLookupEnd",
      "value": 1586767568195
    },
    {
      "name": "context_PerformanceTiming_connectStart",
      "value": 1586767568195
    },
    {
      "name": "context_PerformanceTiming_connectEnd",
      "value": 1586767568195
    },
    {
      "name": "context_PerformanceTiming_secureConnectionStart",
      "value": 0
    },
    {
      "name": "context_PerformanceTiming_requestStart",
      "value": 1586767568197
    },
    {
      "name": "context_PerformanceTiming_responseStart",
      "value": 1586767568199
    },
    {
      "name": "context_PerformanceTiming_responseEnd",
      "value": 1586767568199
    },
    {
      "name": "context_PerformanceTiming_domLoading",
      "value": 1586767568204
    },
    {
      "name": "context_PerformanceTiming_domInteractive",
      "value": 1586767568211
    },
    {
      "name": "context_PerformanceTiming_domContentLoadedEventStart",
      "value": 1586767568211
    },
    {
      "name": "context_PerformanceTiming_domContentLoadedEventEnd",
      "value": 1586767568211
    },
    {
      "name": "context_PerformanceTiming_domComplete",
      "value": 1586767568226
    },
    {
      "name": "context_PerformanceTiming_loadEventStart",
      "value": 1586767568226
    },
    {
      "name": "context_PerformanceTiming_loadEventEnd",
      "value": 1586767568227
    }
  ]
}

The complete code with all imports and the Snowplow lookup can be seen below:

import json
import re
import query_string
import user_agents
from geolite2 import geolite2
from referer_parser import Referer
from kafka import KafkaConsumer,KafkaProducer
consumer = KafkaConsumer('tracking_raw',bootstrap_servers='[Your Kafka instance]',group_id='python_parser')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]',acks=1,value_serializer=lambda v: json.dumps(v).encode('utf-8'))

parameter_lookup = {
    "tna":"name_tracker",
    "evn":"event_vendor",
    "aid":"app_id",
    "p":"platform",
    "dtm":"dvce_created_tstamp",
    "stm":"dvce_sent_tstamp",
    "ttm":"true_tstamp",
    "tz":"os_timezone",
    "e":"event",
    "tid":"txn_id",
    "eid":"event_id",
    "tv":"v_tracker",
    "duid":"domain_userid",
    "nuid":"network_userid",
    "tnuid":"network_userid",
    "uid":"user_id",
    "vid":"domain_sessionidx",
    "sid":"domain_sessionid",
    "ip":"user_ipaddress",
    "res":"dvce_screensize",
    "url":"page_url",
    "ua":"useragent",
    "page":"page_title",
    "refr":"page_referrer",
    "fp":"user_fingerprint",
    "ctype":"connection_type",
    "cookie":"br_cookies",
    "lang":"br_lang",
    "f_pdf":"br_features_pdf",
    "f_qt":"br_features_quicktime",
    "f_realp":"br_features_realplayer",
    "f_wma":"br_features_windowsmedia",
    "f_dir":"br_features_director",
    "f_fla":"br_features_flash",
    "f_java":"br_features_java",
    "f_gears":"br_features_gears",
    "f_ag":"br_features_silverlight",
    "cd":"br_colordepth",
    "ds":"doc_size",
    "cs":"doc_charset",
    "vp":"br_viewsize",
    "mac":"mac_address",
    "pp_mix":"pp_xoffset_min",
    "pp_max":"pp_xoffset_max",
    "pp_miy":"pp_yoffset_min",
    "pp_may":"pp_yoffset_max"
}

event_lookup = {
    "pv":"Pageview",
    "pp":"Page ping",
    "ue":"Link click",
    "ue":"Ad impression",
    "tr":"Ecommerce transaction",
    "ti":"Ecommerce transaction",
    "se":"Custom structured event",
    "ue":"Custom unstructured event"
}

for msg in consumer:
    print("Recieved "+msg.key.decode())
    processed_result = {}

    processed_result["kafka_raw"] = []
    processed_result["kafka_raw"].append({"name":"topic" ,"value": msg.topic})
    processed_result["kafka_raw"].append({"name":"offset" ,"value": msg.offset})
    processed_result["kafka_raw"].append({"name":"timestamp" ,"value": msg.timestamp})
    processed_result["kafka_raw"].append({"name":"key" ,"value": msg.key.decode()})
    processed_result["kafka_raw"].append({"name":"serialized_value_size" ,"value": msg.serialized_value_size})

    msg_payload = json.loads(msg.value)

    processed_result["custom_webserver"] = []
    processed_result["custom_webserver"].append({"name":"timestamp","value":msg_payload["@timestamp"]})
    processed_result["custom_webserver"].append({"name":"filebeat_version","value":msg_payload["@metadata"]["version"]})
    processed_result["custom_webserver"].append({"name":"log_offset","value":msg_payload["log"]["offset"]})
    processed_result["custom_webserver"].append({"name":"agent_id","value":msg_payload["agent"]["id"]})
    processed_result["custom_webserver"].append({"name":"ephemeral_id","value":msg_payload["agent"]["ephemeral_id"]})
    processed_result["custom_webserver"].append({"name":"host","value":msg_payload["json"]["host"]})
    processed_result["custom_webserver"].append({"name":"server_name","value":msg_payload["json"]["server_name"]})
    processed_result["custom_webserver"].append({"name":"uri","value":msg_payload["json"]["uri"]})
    processed_result["custom_webserver"].append({"name":"status","value":msg_payload["json"]["status"]})
    processed_result["custom_webserver"].append({"name":"remote_addr","value":msg_payload["json"]["remote_addr"]})
    processed_result["custom_webserver"].append({"name":"proxy_ip","value":msg_payload["json"]["proxy_ip"]})
    processed_result["custom_webserver"].append({"name":"user_agent","value":msg_payload["json"]["user_agent"]})
    processed_result["custom_webserver"].append({"name":"referer","value":msg_payload["json"]["referer"]})
    processed_result["custom_webserver"].append({"name":"scheme","value":msg_payload["json"]["scheme"]})
    processed_result["custom_webserver"].append({"name":"size","value":msg_payload["json"]["size"]})
    processed_result["custom_webserver"].append({"name":"msec","value":msg_payload["json"]["msec"]})
    processed_result["custom_webserver"].append({"name":"time","value":msg_payload["json"]["time"]})

    reader = geolite2.reader()
    geoip_lookup = reader.get(msg_payload["json"]["remote_addr"])
    if geoip_lookup != None:
        processed_result["custom_geoip"] = []
        processed_result["custom_geoip"].append({"name":"city_name","value":geoip_lookup["city"]["names"]["en"]})
        processed_result["custom_geoip"].append({"name":"continent_name","value":geoip_lookup["continent"]["names"]["en"]})
        processed_result["custom_geoip"].append({"name":"country_name","value":geoip_lookup["country"]["names"]["en"]})
        processed_result["custom_geoip"].append({"name":"country_iso","value":geoip_lookup["country"]["iso_code"]})
        processed_result["custom_geoip"].append({"name":"country_eu","value":geoip_lookup["country"]["is_in_european_union"]})
        processed_result["custom_geoip"].append({"name":"accuracy_radius","value":geoip_lookup["location"]["accuracy_radius"]})
        processed_result["custom_geoip"].append({"name":"latitude","value":geoip_lookup["location"]["latitude"]})
        processed_result["custom_geoip"].append({"name":"longitude","value":geoip_lookup["location"]["longitude"]})
        processed_result["custom_geoip"].append({"name":"time_zone","value":geoip_lookup["location"]["time_zone"]})
        processed_result["custom_geoip"].append({"name":"postal","value":geoip_lookup["postal"]["code"]})

    if (len(msg_payload["json"]["user_agent"]) > 0):
        user_agent = user_agents.parse(msg_payload["json"]["user_agent"])
        processed_result["custom_useragent"] = []
        processed_result["custom_useragent"].append({"name":"os_family" ,"value": user_agent.os.family})
        processed_result["custom_useragent"].append({"name":"os_version_string" ,"value": user_agent.os.version_string})
        processed_result["custom_useragent"].append({"name":"browser_family" ,"value": user_agent.browser.family})
        processed_result["custom_useragent"].append({"name":"browser_version_string" ,"value": user_agent.browser.version_string})
        processed_result["custom_useragent"].append({"name":"device_family" ,"value": user_agent.device.family})
        processed_result["custom_useragent"].append({"name":"device_brand" ,"value": user_agent.device.brand})
        processed_result["custom_useragent"].append({"name":"device_model" ,"value": user_agent.device.model})
        processed_result["custom_useragent"].append({"name":"simple" ,"value": str(user_agent)})

    request_query_string = msg_payload["json"]["query_string"]
    if (len (request_query_string) > 0):
        query_dict = query_string.parse(request_query_string)

    if "query_dict" in locals() and query_dict != None:
        processed_result["query"] = []
        for element in query_dict:
            if(element=="co"):
                for context in json.loads(query_dict["co"])["data"]:
                    if ("schema" in context):
                        schema = re.findall(r'\/([^\/]+)\/', context["schema"])[0]
                        for name in context["data"]:
                            processed_result["query"].append({"name":"context_"+schema+"_"+name,"value":context["data"][name]})
                    else:
                        for name in context["data"]:
                            processed_result["query"].append({"name":"context_"+name,"value":context["data"][name]})

            elif(element=="refr"):
                r = Referer(query_dict[element],query_dict["url"])
                processed_result["query"].append({"name":"referrer_"+"referrer","value":element})
                processed_result["query"].append({"name":"referrer_"+"known","value":r.known})
                processed_result["query"].append({"name":"referrer_"+"medium","value":r.medium})
                processed_result["query"].append({"name":"referrer_"+"search_parameter","value":r.search_parameter})
                processed_result["query"].append({"name":"referrer_"+"search_term","value":r.search_term})
                processed_result["query"].append({"name":"referrer_uri_"+"scheme","value":r.uri.scheme})
                processed_result["query"].append({"name":"referrer_uri_"+"netloc","value":r.uri.netloc})
                processed_result["query"].append({"name":"referrer_uri_"+"path","value":r.uri.path})
                processed_result["query"].append({"name":"referrer_uri_"+"params","value":r.uri.params})
                processed_result["query"].append({"name":"referrer_uri_"+"query","value":r.uri.query})
                processed_result["query"].append({"name":"referrer_uri_"+"fragment","value":r.uri.fragment})

            elif(element=="ue_pr"):
                schema = re.findall(r'\/([^\/]+)\/',json.loads(query_dict[element])["data"]["schema"])[0]
                for key,value in json.loads(query_dict[element])["data"]["data"].items():
                    processed_result["query"].append({"name":"event_unstructured_"+schema+"_"+key,"value":value})
                    processed_result["query"].append({"name":"event_unstructured_schema","value":schema})

            else:
                if element in parameter_lookup:
                    processed_result["query"].append({"name":"other_"+parameter_lookup[element],"value":query_dict[element]})
                else:
                    processed_result["query"].append({"name":"other_"+element,"value":query_dict[element]})
                if element=="e":
                    processed_result["query"].append({"name":"other_eventtype","value":event_lookup[query_dict[element]]})

    print("Send "+msg.key.decode()+"\n")
    producer.send("tracking_processed",value=processed_result,key=msg.key)
    processed_result = None
    query_dict = None
    geoip_lookup = None
    msg_payload = None
    request_query_string = None

Now we have a new Kafka topic “tracking_processed” which contains our processed and enriched events. In the next post, we are looking at persisting some profile data!