Building an Enterprise Grade OpenSource Web Analytics System – Part 5: Visitor Profile

This is the fifth part of a seven-part series explaining how to build an Enterprise Grade OpenSource Web Analytics System. In this post, we are going to build a visitor profile with Python and Redis to persist some of the data we track. In the last post, we processed the raw data using Python and wrote it back to Kafka. If you are new to this series, it might help to start with the first post.

Now that we have a nicely processed version of our events, we want to remember certain things about our users. To do this, we are going to create a Visitor Profile, using Redis as high-performance storage. The process for persisting values will look like this:

Building our Visitor Profile

First, we set up a little helper script that flattens our processed tracking events. It takes events from the processed Kafka topic and writes them to the flattened topic:

import json
from kafka import KafkaConsumer, KafkaProducer

# Read processed events and publish the flattened version as JSON
consumer = KafkaConsumer('tracking_processed', bootstrap_servers='[Your Kafka instance]', group_id='python_flattener')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]', acks=1, value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for msg in consumer:
    print("Received "+msg.key.decode())

    msg_payload = json.loads(msg.value)
    processed_result = {}

    # Every top-level key holds a list of {"name": ..., "value": ...} elements;
    # turn each element into a single "<key>-<name>" entry
    for key, value in msg_payload.items():
        for element in value:
            processed_result[key+"-"+element["name"]] = element["value"]
    print(processed_result)

    print("Send "+msg.key.decode()+"\n")
    producer.send("tracking_flattened", value=processed_result, key=msg.key)

Now we have a new topic that contains flat JSON objects with namespaced keys and their values. That will help with some storage destinations and with our visitor profile!
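To make the flattening step easier to picture, here is a minimal sketch of the same logic as a standalone function, without Kafka. The function name flatten_event and the sample values are made up for illustration; only the shape of the input (lists of name/value elements) and the resulting key format follow the script above.

```python
def flatten_event(event):
    """Turn {"group": [{"name": ..., "value": ...}]} into {"group-name": value}."""
    flat = {}
    for group, elements in event.items():
        for element in elements:
            flat[group + "-" + element["name"]] = element["value"]
    return flat


# Hypothetical sample event, shaped like the processed events the script expects
sample_event = {
    "query": [
        {"name": "other_domain_userid", "value": "user-123"},
        {"name": "other_eventtype", "value": "Pageview"},
    ],
}

flat_event = flatten_event(sample_event)
print(flat_event)
```

Each nested element ends up as one top-level entry, which is why the later scripts can read keys like "query-other_domain_userid" directly.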

With that new topic, let’s build the script that will persist values to Redis for us. The first step is to import the Redis library in Python and connect to our Redis instance:

import json
import redis
from kafka import KafkaConsumer,KafkaProducer
consumer = KafkaConsumer('tracking_flattened',bootstrap_servers='[Your Kafka instance]',group_id='python_profiler')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]',acks=1,value_serializer=lambda v: json.dumps(v).encode('utf-8'))
r = redis.Redis(host='[Your Redis instance]', encoding="utf-8", decode_responses=True)

for msg in consumer:
    print("Received "+msg.key.decode())
    msg_payload = json.loads(msg.value)
    processed_result = {}

This will take items from our flattened topic and connect to Redis. With decode_responses enabled, Redis returns strings instead of binary values, which makes them easier to handle later on. Next up, we check whether Redis already knows the user ID of our visitor:

if r.exists(msg_payload["query-other_domain_userid"]):
    new_visitor = False
    visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])
else:
    new_visitor = True
    new_visit = True

If we find our current user in Redis, we retrieve their persisted profile. If not, we set some flags for later on. Depending on those flags, we decide how to proceed:

if new_visitor:
    r.hset(msg_payload["query-other_domain_userid"], "visitor_id", msg_payload["query-other_domain_userid"])
    r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])
else:
    if msg_payload["query-other_domain_sessionid"] == visitor_profile["visit_id"]:
        new_visit = False
    else:
        new_visit = True
        r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])

When our visitor is new to us, we store the visitor and visit IDs in Redis with the visitor ID as key. If we already know the visitor, we have set the visit ID in the past, so we can compare it with our current visit ID. If it is the same ID, we are processing an event from a running visit. If not, we have a new visit and store its ID in Redis.
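The decision above boils down to two flags, so it can be sketched as a small function that works on a plain dictionary instead of a live Redis connection. This is only an illustration: classify_event is a made-up name, and the sketch assumes that a missing profile shows up as an empty dictionary, which is what hgetall returns in redis-py for a key that does not exist.

```python
def classify_event(profile, session_id):
    """Return (new_visitor, new_visit) for one event.

    profile: the stored profile as a dict (empty if the visitor is unknown)
    session_id: the visit ID carried by the current event
    """
    if not profile:
        # No profile yet: first event of a new visitor, so also a new visit
        return True, True
    # Known visitor: a new visit starts when the stored visit ID differs
    return False, profile.get("visit_id") != session_id


# Hypothetical IDs for illustration
unknown = classify_event({}, "session-1")
running = classify_event({"visitor_id": "user-123", "visit_id": "session-1"}, "session-1")
returning = classify_event({"visitor_id": "user-123", "visit_id": "session-1"}, "session-2")
print(unknown, running, returning)
```

Keeping this logic free of Redis calls makes it easy to test the three cases (unknown visitor, running visit, returning visitor) on their own.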

One cool application for our project is determining entry pages. So we are using our flags and profile like this:

if "query-other_eventtype" in msg_payload and msg_payload["query-other_eventtype"]=="Pageview":
    if new_visitor:
        r.hset(msg_payload["query-other_domain_userid"], "visitor_first_page", msg_payload["query-other_page_title"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
    else:
        if new_visit:
            r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
    r.hset(msg_payload["query-other_domain_userid"], "visitor_last_page", msg_payload["query-other_page_title"])
    r.hset(msg_payload["query-other_domain_userid"], "visit_last_page", msg_payload["query-other_page_title"])

This code checks if the current event is a Pageview. If it is, we set some fields in Redis to remember the first page of a visit or visitor as well as the last page. That way we can easily analyze our users’ behavior with regard to how they started their journey! There are still many more use cases to explore, but they all come down to logic like the one above.
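As a sketch of how this logic could be tested without Redis, the page fields to write can be computed by a small helper first. The name page_fields is hypothetical, and the helper assumes that a new visitor always implies a new visit, as in the script above.

```python
def page_fields(new_visitor, new_visit, page_title):
    """Return the profile fields a Pageview event should write."""
    # The last page is updated on every Pageview
    fields = {
        "visitor_last_page": page_title,
        "visit_last_page": page_title,
    }
    # The first page of a visit is only set when a visit starts
    if new_visitor or new_visit:
        fields["visit_first_page"] = page_title
    # The first page of a visitor is only set once, on their first Pageview
    if new_visitor:
        fields["visitor_first_page"] = page_title
    return fields


# Hypothetical page titles for illustration
first_event = page_fields(True, True, "Home")
same_visit = page_fields(False, False, "Pricing")
next_visit = page_fields(False, True, "Blog")
print(first_event, same_visit, next_visit)
```

The resulting dictionary could then be written field by field with hset as shown above. Recent redis-py versions also accept a mapping argument on hset to write several fields in one call, but check the version you have installed before relying on that.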

Now all we need to do is add the profile information to the record and send it off to Kafka again, this time to the “tracking_persisted” topic:

processed_result = msg_payload
visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])

for key,value in visitor_profile.items():
    processed_result["profile_"+key] = value

print("Send "+msg.key.decode()+"\n")
producer.send("tracking_persisted",value=processed_result,key=msg.key)
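To show what the enriched record looks like, here is a minimal sketch of the merge step on plain dictionaries. The helper name attach_profile and the sample values are assumptions for illustration; the "profile_" prefix is the one used above. Unlike the script, this sketch copies the event instead of modifying it in place.

```python
def attach_profile(event, profile, prefix="profile_"):
    """Return a copy of the event with all profile fields added under a prefix."""
    enriched = dict(event)  # copy, so the original event stays untouched
    for key, value in profile.items():
        enriched[prefix + key] = value
    return enriched


# Hypothetical flattened event and stored profile
event = {"query-other_domain_userid": "user-123", "query-other_page_title": "Pricing"}
profile = {"visitor_id": "user-123", "visit_first_page": "Home"}

enriched = attach_profile(event, profile)
print(enriched)
```

The prefix keeps profile fields from colliding with the namespaced keys that come from the tracking event itself.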

Now our complete code looks like this:

import json
import redis
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('tracking_flattened', bootstrap_servers='[Your Kafka instance]', group_id='python_profiler')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]', acks=1, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# decode_responses=True makes Redis return strings instead of bytes
r = redis.Redis(host='[Your Redis instance]', encoding="utf-8", decode_responses=True)

for msg in consumer:
    print("Received "+msg.key.decode())

    msg_payload = json.loads(msg.value)
    processed_result = {}

    # Do we already have a profile for this visitor ID?
    if r.exists(msg_payload["query-other_domain_userid"]):
        new_visitor = False
        visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])
    else:
        new_visitor = True
        new_visit = True

    # Store the IDs for new visitors, otherwise check whether a new visit has started
    if new_visitor:
        r.hset(msg_payload["query-other_domain_userid"], "visitor_id", msg_payload["query-other_domain_userid"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])
    else:
        if msg_payload["query-other_domain_sessionid"] == visitor_profile["visit_id"]:
            new_visit = False
        else:
            new_visit = True
            r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])

    # Remember the first and last page per visitor and per visit
    if "query-other_eventtype" in msg_payload and msg_payload["query-other_eventtype"]=="Pageview":
        if new_visitor:
            r.hset(msg_payload["query-other_domain_userid"], "visitor_first_page", msg_payload["query-other_page_title"])
            r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
        else:
            if new_visit:
                r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
        r.hset(msg_payload["query-other_domain_userid"], "visitor_last_page", msg_payload["query-other_page_title"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_last_page", msg_payload["query-other_page_title"])

    # Add the updated profile to the record, prefixed with "profile_"
    processed_result = msg_payload
    visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])

    for key, value in visitor_profile.items():
        processed_result["profile_"+key] = value

    print("Send "+msg.key.decode()+"\n")
    producer.send("tracking_persisted", value=processed_result, key=msg.key)

That’s it for this post! In the next one we are going to look at some possibilities to store our data.