Building an Enterprise Grade OpenSource Web Analytics System – Part 5: Visitor Profile
This is the fifth part of a seven-part series explaining how to build an Enterprise Grade OpenSource Web Analytics System. In this post we are going to use Python and Redis to build a visitor profile that persists some of the data we track. In the last post we processed the raw data using Python and wrote it back to Kafka. If you are new to this series, it might help to start with the first post.
Now that we have a nice processed version of our events, we want to remember certain things about our users. To do this, we are going to create a Visitor Profile, using Redis as high-performance storage. The process for persisting values has four steps: flatten the processed events, look up the visitor in Redis, update the stored profile, and send the event, enriched with the profile, back to Kafka.
Building our Visitor Profile
First, we set up a little helper script that takes our processed tracking events and flattens them. It reads events from the processed Kafka topic and writes them to the flattened topic:
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('tracking_processed', bootstrap_servers='[Your Kafka instance]', group_id='python_flattener')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]', acks=1, value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for msg in consumer:
    print("Received "+msg.key.decode())
    msg_payload = json.loads(msg.value)
    processed_result = {}
    # Each top-level key holds a list of name/value pairs; namespace them as "key-name"
    for key, value in msg_payload.items():
        for element in value:
            processed_result[key+"-"+element["name"]] = element["value"]
    print(processed_result)
    print("Send "+msg.key.decode()+"\n")
    producer.send("tracking_flattened", value=processed_result, key=msg.key)
    # Reset per-message state before the next event
    processed_result = None
    msg_payload = None
Now we have a new topic that contains a flat JSON with namespaced keys and their value. That will help with some storage destinations and our visitor profile!
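To make the flattening step concrete, here is a minimal sketch of the same transformation as a standalone function that runs without Kafka. The function name `flatten_event` and the sample payload are assumptions for illustration only; the real field names depend on what the processing step from the last post emits.

```python
def flatten_event(payload):
    """Flatten {"group": [{"name": ..., "value": ...}]} into {"group-name": value}."""
    flat = {}
    for group, elements in payload.items():
        for element in elements:
            flat[group + "-" + element["name"]] = element["value"]
    return flat


# Hypothetical processed event, shaped the way the flattener script expects it.
sample_event = {
    "query": [
        {"name": "other_domain_userid", "value": "user-123"},
        {"name": "other_eventtype", "value": "Pageview"},
    ],
}

flat_event = flatten_event(sample_event)
print(flat_event)
```

The namespaced keys, such as "query-other_domain_userid", are the ones the profile script looks up later.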
With that new topic, let’s build the script that will persist values to Redis for us. The first step is to import the Redis library in Python and connect to our Redis instance:
import json
import redis
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('tracking_flattened', bootstrap_servers='[Your Kafka instance]', group_id='python_profiler')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]', acks=1, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# decode_responses=True returns str instead of bytes; "encoding" replaces the deprecated "charset" argument
r = redis.Redis(host='[Your Redis instance]', encoding="utf-8", decode_responses=True)

for msg in consumer:
    print("Received "+msg.key.decode())
    msg_payload = json.loads(msg.value)
    processed_result = {}
This connects to Redis, decoding binary responses into strings so they are easier to handle later on, and reads items from our flattened topic. Next, we check whether Redis already knows the user ID of our visitor:
    if r.exists(msg_payload["query-other_domain_userid"]):
        new_visitor = False
        visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])
    else:
        new_visitor = True
        new_visit = True
If we find our current user in Redis, we load their persisted profile. If not, we set some flags for later on. Depending on those flags, we decide how to proceed:
    if new_visitor:
        r.hset(msg_payload["query-other_domain_userid"], "visitor_id", msg_payload["query-other_domain_userid"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])
    else:
        if msg_payload["query-other_domain_sessionid"] == visitor_profile["visit_id"]:
            new_visit = False
        else:
            new_visit = True
            r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])
When our visitor is new to us, we store the visitor and visit IDs in Redis with the visitor ID as key. If we already know the visitor, we have set the visit ID in the past, so we can compare it with our current visit ID. If it is the same ID, we are processing an event from a running visit. If not, we have a new visit and store its ID in Redis.
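The decision logic described above can be sketched without a running Redis instance. This is a simplified illustration, not the script itself: the function name `classify_event` is hypothetical, and a plain dict stands in for the stored hash (redis-py's `hgetall` returns an empty dict for a key that does not exist).

```python
def classify_event(stored_profile, session_id):
    """Return (new_visitor, new_visit) for an incoming event.

    stored_profile: the visitor's hash as a dict, empty if the visitor is unknown.
    session_id: the visit ID carried by the current event.
    """
    if not stored_profile:
        # Unknown visitor: by definition this is also a new visit.
        return True, True
    # Known visitor: a changed visit ID means a new visit has started.
    new_visit = stored_profile.get("visit_id") != session_id
    return False, new_visit


unknown = classify_event({}, "visit-1")
running = classify_event({"visitor_id": "user-123", "visit_id": "visit-1"}, "visit-1")
returning = classify_event({"visitor_id": "user-123", "visit_id": "visit-1"}, "visit-2")
print(unknown, running, returning)
```

Keeping the decision separate from the Redis calls makes the three cases (new visitor, running visit, returning visitor with a new visit) easy to check in isolation.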
One cool application for our project is determining entry pages. So we are using our flags and profile like this:
    if "query-other_eventtype" in msg_payload and msg_payload["query-other_eventtype"]=="Pageview":
        if new_visitor:
            r.hset(msg_payload["query-other_domain_userid"], "visitor_first_page", msg_payload["query-other_page_title"])
            r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
        else:
            if new_visit:
                r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
        # The last page is updated on every Pageview, for new and known visitors alike
        r.hset(msg_payload["query-other_domain_userid"], "visitor_last_page", msg_payload["query-other_page_title"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_last_page", msg_payload["query-other_page_title"])
This code checks if the current event is a Pageview. If it is, we set some fields in Redis to remember the first page of a visit or visitor as well as the last page. That way we can easily analyze our users' behavior with regard to how they started their journey! There are still many more use cases to explore, but they all come down to logic like the above.
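As a rough sketch of the entry-page rules, the function below collects the fields a Pageview should write instead of calling Redis directly. The name `page_updates` is hypothetical, and the sketch assumes the last-page fields are refreshed on every Pageview, including a visitor's very first one.

```python
def page_updates(new_visitor, new_visit, page_title):
    """Return the profile fields to write for a Pageview event."""
    updates = {}
    if new_visitor:
        # First page this visitor has ever seen.
        updates["visitor_first_page"] = page_title
    if new_visit:
        # Entry page of the current visit (a new visitor always starts a new visit).
        updates["visit_first_page"] = page_title
    # Assumption: the last page is refreshed on every Pageview.
    updates["visitor_last_page"] = page_title
    updates["visit_last_page"] = page_title
    return updates


first_ever = page_updates(True, True, "Home")
new_session = page_updates(False, True, "Blog")
same_session = page_updates(False, False, "Contact")
print(first_ever, new_session, same_session)
```

With redis-py 3.5 or later, a dict like this could be written in a single call via `r.hset(key, mapping=updates)` rather than one `hset` per field.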
Now all we need to do is add the profile information to the record and send it off to Kafka again, this time to the “tracking_persisted” topic:
    processed_result = msg_payload
    # Re-read the profile so the values written above are included
    visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])
    for key, value in visitor_profile.items():
        processed_result["profile_"+key] = value
    print("Send "+msg.key.decode()+"\n")
    producer.send("tracking_persisted", value=processed_result, key=msg.key)
    # Reset per-message state before the next event
    processed_result = None
    msg_payload = None
    visitor_profile = None
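The enrichment step above boils down to copying every profile field into the event under a "profile_" prefix. A minimal sketch with made-up sample data follows; the function name `enrich_event` is an assumption, and unlike the script it works on a copy instead of modifying the event in place.

```python
def enrich_event(event, profile, prefix="profile_"):
    """Return a copy of the event with all profile fields added under a prefix."""
    enriched = dict(event)  # copy so the incoming event is left untouched
    for field, value in profile.items():
        enriched[prefix + field] = value
    return enriched


# Hypothetical flattened event and stored profile.
event = {"query-other_eventtype": "Pageview", "query-other_page_title": "Blog"}
profile = {"visitor_id": "user-123", "visit_first_page": "Home"}

enriched = enrich_event(event, profile)
print(enriched)
```

The prefix keeps profile fields from colliding with the namespaced keys that come from the tracking event itself.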
Now our complete code looks like this:
import json
import redis
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('tracking_flattened', bootstrap_servers='[Your Kafka instance]', group_id='python_profiler')
producer = KafkaProducer(bootstrap_servers='[Your Kafka instance]', acks=1, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# decode_responses=True returns str instead of bytes; "encoding" replaces the deprecated "charset" argument
r = redis.Redis(host='[Your Redis instance]', encoding="utf-8", decode_responses=True)

for msg in consumer:
    print("Received "+msg.key.decode())
    msg_payload = json.loads(msg.value)
    processed_result = {}

    # Step 1: do we already know this visitor?
    if r.exists(msg_payload["query-other_domain_userid"]):
        new_visitor = False
        visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])
    else:
        new_visitor = True
        new_visit = True

    # Step 2: store the IDs for new visitors, detect new visits for known ones
    if new_visitor:
        r.hset(msg_payload["query-other_domain_userid"], "visitor_id", msg_payload["query-other_domain_userid"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])
    else:
        if msg_payload["query-other_domain_sessionid"] == visitor_profile["visit_id"]:
            new_visit = False
        else:
            new_visit = True
            r.hset(msg_payload["query-other_domain_userid"], "visit_id", msg_payload["query-other_domain_sessionid"])

    # Step 3: remember first and last pages on Pageview events
    if "query-other_eventtype" in msg_payload and msg_payload["query-other_eventtype"]=="Pageview":
        if new_visitor:
            r.hset(msg_payload["query-other_domain_userid"], "visitor_first_page", msg_payload["query-other_page_title"])
            r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
        else:
            if new_visit:
                r.hset(msg_payload["query-other_domain_userid"], "visit_first_page", msg_payload["query-other_page_title"])
        # The last page is updated on every Pageview, for new and known visitors alike
        r.hset(msg_payload["query-other_domain_userid"], "visitor_last_page", msg_payload["query-other_page_title"])
        r.hset(msg_payload["query-other_domain_userid"], "visit_last_page", msg_payload["query-other_page_title"])

    # Step 4: add the stored profile to the event and send it on
    processed_result = msg_payload
    visitor_profile = r.hgetall(msg_payload["query-other_domain_userid"])
    for key, value in visitor_profile.items():
        processed_result["profile_"+key] = value
    print("Send "+msg.key.decode()+"\n")
    producer.send("tracking_persisted", value=processed_result, key=msg.key)
    # Reset per-message state before the next event
    processed_result = None
    msg_payload = None
    visitor_profile = None
That’s it for this post! In the next one we are going to look at some possibilities to store our data.

German Analyst and Data Scientist working in and writing about (Web) Analytics and Online Marketing Tech.