What makes Zeitgeist tick
is a prototype developed by 91Èȱ¬ Research & Development to discover and track the most shared 91Èȱ¬ webpages on Twitter. An overview of the project has already covered in our previous post.
Today we're publishing the full source code of this system under the GNU GPLv3 licence on github at .
This post will discuss the technical architecture of the system, how we approached various problems, and our technical learnings from building the system.
From a research point of view, we were particularly interested in two things:
- The Twitter Streaming API and how it worked in practice
- Whether a messaging pipeline architecture would be a good fit for this problem domain
The system consists of an interface to the which passes tweets to a processing pipeline. The pipeline finds and extracts links to the 91Èȱ¬, resolving shortened and redirected urls. Continuously-running background jobs extract page metadata and handle retweets and deletions. Finally, there's a web interface to present the results to end users, which was written using the well-tested software stack of , and , all fronted by a proxy. The Zeitgeist database is , the modelling done using .
The Twitter Streaming API
The
provides an HTTP stream of JSON data containing individual tweets
separated by carriage returns ("\\r"
). The stream is
each of which may contain one or more complete or partial tweets in
any combination.
The is "best effort, unordered and generally at least once". We may get retweets and deletes before the tweets they refer to, as well as duplicates. The API documentation states that "the Streaming API may disconnect your connection at any time, perhaps multiple times per day", so our client needed to recover from this without breaking any subsequent processing.
Twitter's API is really a set of several APIs that perform similar functions, but with different data volumes and searching capabilities. There are two APIs of interest to us for Zeitgeist: the Sample API ('Gardenhose') which provides a subset of the full Twitter stream ('Firehose') and the Filter API which we use to track all tweets containing the word 'bbc'. You can find full details on these APIs .
The Sample API
The Gardenhose delivers tweets at the rate of about 100/second, which is well within the capabilities of our Twitter client. This is written in , using the networking library, helped by and with used to talk to the message queues.
We didn't use an existing Twitter client because we couldn't find one that fit our requirements: there are Twitter clients that use to connect to Twitter and work behind a proxy but they all do too much.
We wanted a client that did the bare minimum: interface with the Gardenhouse API, read the stream, chop it up into individual tweets and put them on a message queue. We didn't want this process to parse JSON, create objects or handle deletes - that's all left until later.
Tracking 'bbc' using the Filter API
We use the same client to handle the Filter API, using a different url. Because the data received is in the same format, we can process the tweets received in the same way. The rate is much lower however: about one every 2 seconds.
Example tweet in JSON
A complete tweet as it arrives from Twitter's API looks like this (in JSON):
{
"contributors": null,
"coordinates": null,
"created_at": "Wed Jul 14 10:34:08 +0000 2010",
"favorited": false,
"geo": null,
"id": 18510029779,
"in_reply_to_screen_name": null,
"in_reply_to_status_id": null,
"in_reply_to_user_id": null,
"place": null,
"source": "<a href=\"https://www.echofon.com/\"
rel=\"nofollow\">Echofon</a>",
"text": "Latest R&D prototype; Zeitgeist - the most shared 91Èȱ¬
links on Twitter https://bit.ly/cbChTL (and read more at
https://bit.ly/bg9Z4Q)",
"truncated": false,
"user": {
"contributors_enabled": false,
"created_at": "Fri Feb 16 12:31:57 +0000 2007",
"description": "Dm7 E7 C?7",
"favourites_count": 17,
"follow_request_sent": null,
"followers_count": 249,
"following": null,
"friends_count": 214,
"geo_enabled": false,
"id": 775474,
"lang": "en",
"location": "London/Surrey/UK",
"name": "Tristan",
"notifications": null,
"profile_background_color": "9ae4e8",
"profile_background_image_url": "https://a1.twimg.com/profile_background_images/905112/PICT2270.JPG",
"profile_background_tile": false,
"profile_image_url": "https://a1.twimg.com/profile_images/23830162/cookinrelaxin_normal.jpg",
"profile_link_color": "0000ff",
"profile_sidebar_border_color": "87bc44",
"profile_sidebar_fill_color": "e0ff92",
"profile_text_color": "000000",
"profile_use_background_image": true,
"protected": false,
"screen_name": "tristanf",
"statuses_count": 1998,
"time_zone": "London",
"url": "https://www.cookinrelaxin.com",
"utc_offset": 0,
"verified": false
}
}
Processing pipeline
Overview
To process the tweets, we use a ) implemented as a set of individual processes reading from and writing to message queues. Like ), this allows us to chain simple processes together, each doing one job. This enables us to split work across multiple identical processes running on different machines, which allows the system to .
We used as the message broker. The message processors are all written in using our own library to abstract the messaging interface. We use the library to talk AMQP with rabbitmq. The data format used throughout the pipeline is .
Here's a simple diagram representing the main pipeline processes and how they interact with the message queues:
When considering the best architecture with which to process the incoming data, it was important to consider the eventual data volumes we were expecting the system to cope with. Twitter state that the Gardenhose sample is of the full Firehose data set. Given that the current average number of tweets/sec is 600, we were expecting about 30/sec on average, with occasional spikes.
In practice, we have seen the number of messages per second range from about 6/sec on a personal ('Spritzer') account to 150/sec on a Gardenhose account during . The average on the Gardenhose is currently about 100-120 messages/second.
We planned to be able to handle \~80 messages/second, which proved to be inadequate during the World Cup spike. Luckily, the messaging architecture meant we were able to quickly provision another host on which to run additional worker processes to balance the load.
Link resolution
A key part of the pipeline is link resolution. By 'resolution' we mean to determine a canonical version of a url. We're not interested in all urls - only those that link to a 91Èȱ¬ page. We want a canonical version so we don't have duplicate entries for the same page. A lot of 91Èȱ¬ links are redirected to a canonical url - for example, News, iPlayer, pages on /programmes. Link shortening services such as bit.ly, which are very popular on Twitter due to the short message length, complicate this by adding yet another level of indirection that Zeitgeist must cope with to determine which tweets are linking to 91Èȱ¬ pages.
So, for example, the text from the example above:
Latest R&D prototype; Zeitgeist - the most shared 91Èȱ¬ links
on Twitter https://bit.ly/cbChTL (and read more at https://bit.ly/bg9Z4Q)
contains two links, "" and "". After extracting and resolving them, we add the following two elements to our data structure for a tweet:
"links": [
"none",
"none"
],
"resolved_links": [
"https://zeitgeist.prototyping.bbc.co.uk/zeitgeist",
"/blogs/researchanddevelopment/2010/07/zeitgeist-the-most-shared-bbc.shtml"
]
representing the original urls as included in the tweet (links
)
and what they really point to (resolved_links
). In this case,
they both point to 91Èȱ¬ pages (we're only interested in these), so a
link entry is made for each in our database.
For each link found:
- if it's a bit.ly link, we look it up using the
- if it's a link to a site we're not interested in for this application (e.g. twitpic), we discard it
- otherwise, we resolve the url to its canonical form and, if it's a 91Èȱ¬ link, save it to the database
To resolve links, in most cases, we can simply request the url
using a HTTP HEAD request and see where it takes us by examining
the HTTP status code we receive back. If we get a 200, then we've
found what we're looking for and can stop. A code in the 30x range
(redirection) means the real page resides at another address
indicated by the accompanying Location
header. In this case, we
read the Location
and call resolve_link
recursively, bumping up
the level count (so we can put an upper limit on how many redirects
we follow - otherwise we could end up in a neverending loop). If we
receive a 40x or 50x range code, then there's something wrong at
the other end or the link was badly formed, so we discard it.
Due to its need to call out to external services, link resolution is by far the most expensive operation in the pipeline and requires the greatest number of worker processes. Most other operations require just one or two processes each to cope with the throughput. Twitter estimate that about 25% of tweets contain links, therefore the system would have to resolve between 8 to 20 links per second. We found that on average resolving a link would take about 0.2 seconds, meaning that we needed 10 workers to handle the anticipated load.
Background jobs
There are two additional processes that need to run on the incoming data after it has been saved to the database: one to determine whether a tweet is a retweet (because we often get a retweet before the tweet it retweets) and one to extract metadata from 91Èȱ¬ pages.
Finding retweets
Not all retweets are equal. Those created by the Twitter website
and compliant clients contain a retweet_status
key and are
handled by the splitter
process. However, many retweets use the
informal convention of RT @<em>user</em> <em>text</em>
. We detect
this using the routine is_retweet?
and some regular expression
magic:
RX_RETWEET = /\bRT\b.*@\s*([a-zA-Z0-9_]+)?(?::)?(.*)/idef is_retweet?
text.match(RX_RETWEET) ? true : false
end
The text is often cut short so we can't do a direct comparison. Instead, we use a heuristic to find the most similar tweet by this user:
def calc_retweet_of
if is_retweet? && u = retweet_user
possibles = u.tweets.map{ |tweet| [tweet, Similarity.similarity(tweet.text, self.text)]}.sort_by{ |t, s| -s}
t = possibles.first # if similarity score better than half, use it
if t[1] > 0.5
t[0]
else
nil
end
else
nil
end
end
where Similarity.similarity
is a routine for ranking variable
length strings by similarity (derived from
).
retweet_user
extracts the @username
from the text of the tweet
and returns the corresponding User
object (if one exists).
Extract metadata
To give the links some context in our prototype, we attempt to extract metadata from the page itself (using for HTML parsing). Most 91Èȱ¬ pages have pretty good metadata in the page headers, including a title, short description of the page and which section it belongs to. We use these in our link descriptions and to categorize links. Like all exercises in HTML page scraping, a fair amount of the code is dedicated to cleaning up data, compensating for missing data and dealing with inconsistent character encodings.
What we learnt
Data sanitisation
There's a lot ofÌý data in the Twitter stream which needs to be sanitised as early as possible before being passed on within the system. Apart from the usual and attacks, there are even . If you are dealing with any data from an external source, be careful before displaying it on a website, saving it to a database or even tailing a log file.
Dealing with load
We know now that we need to measure performance over an extended period of time - a couple of days is not enough. The number of tweets spikes along with external events, such as news stories or television shows. A system which is finely tuned to normal rates of flow can break down catastrophically in the face of a sudden surge of data.
The most important variable to watch is throughput. We need to ensure that we are maximizing the flow of messages through the system. For this, we need application-level monitoring to find out how many messages each process is consuming and how many each is producing. With this data, we can easily pinpoint bottlenecks.
We found that our messaging pipeline can exhibit great sensitivity to variability of input volume and network conditions. Once a queue starts to back up, there are a number of possible outcomes:
- When things return to normal, the system will quickly handle the backlog
- It can take hours to reduce the backlog to zero (which means messages wait in the queue that long - not very real-time)
- The messages will continue to back up until the message broker runs out of memory and crashes
Input volume can be handled by adding more worker processes spread across additional host machines as described above. Then network latency can become the issue. When messages cannot be sent and acknowledged fast enough, the backlog at the head of the queue starts to increase and adding more processes will not help. The lesson is to reduce the amount of machine-to-machine communication over a shared network: a locally networked cluster on its own isolated subnet or back-network is ideal for maintaining low-latency communications between essential services.
Pipelining works
We are pleased with the performance of Ruby with eventmachine: it easily handles the Gardenhose API volumes (approximately 100K/second).
The message processing framework (smqueue) proved to be very flexible, enabling us to easily reconfigure across hosts for load-balancing. The throughput reporting it provides is invaluable for monitoring the health of the system and alerting us to problems. One thing is clear when doing any throughput-critical processing - always avoid writing to disk. Even logging a single line of text to disk once per message can slow a process down enough to cause serious backlogs.
There are certainly improvements we could make: the link resolver spends most of its time waiting on IO, so a more efficient design would be to handle multiple requests at once using eventmachine to handle the IO multiplexing. This would cut down on the number of processes we need. Also, the link resolver does too much - we should split it into simpler processes: one to detect urls; one to detect bit.ly urls; one to resolve links; one to handle redirections, etc. Then we could load balance just the functions that require it.
What's next
Based on our work so far, there's a number of things that we may take forward into further research.
Instead of just throwing away tweets we're not interested in, we could store them all. We could then retrospectively track the appearance of trends and study the propagation of information. For example, we could trace the development of a piece of celebrity gossip or a breaking news story. Storing all this data in real-time poses some interesting challenges. With tweets being roughly 1K in size and arriving at the rate of about 600/sec, we would need to store about 50GB a day. We'd also want to query this data in a useful way.
As load balancing for spikes becomes more important, we will need to temporarily increase our CPU capacity. Using cloud services like Amazon EC2 or Heroku is a possibility. Extending our architecture to seamlessly use diskless cloud-based CPUs would give us this capability. Simulating load on pipelines using this architecture would in itself be an interesting line of research with applications in capacity planning and pipeline design.
The Firehose API generates tweets at the rate of about 600/sec. This is 20 times what we've been dealing with. We think our architecture would scale with some tweaks to improve throughput. We could simulate this using our existing data but the acid test would be to connect to the real thing.
Comment number 1.
At 5th Oct 2010, joeharris76 wrote:This is a *great* post. Thanks.
You mention needing to store and analyse large(ish) data volumes in the last paragraph. I would suggest that you evaluate Infobright (a MySQL storage engine) and LucidDB (java based, more mature SQL support) for this purpose.
They are both columnar analytic databases that will highly compress the data and respond quickly to analytic style queries (e.g. aggregates and calculations). I've seen compression rates with Infobright of up 90%.
Neither database responds well to incremental updates (although LucidDB will cope better). That should not be a problem in your scenario though as the data will be static once your pipeline is complete.
Let me know if you want any help. ;)
Joe
Complain about this comment (Comment number 1)