Posted on Wed, 16 May 2012 16:55:00 +0000
Last Friday at our Apache Pig Hackathon, we open-sourced Twitter Ambrose, a tool which helps authors of large-scale data workflows keep track of the overall status of a workflow and visualize its progress.
Ambrose was hatched by Bill Graham (@billonahill) and Andy Schlaikjer (@sagemintblue) at our last Hack Week, which focused on internal tools and developer efficiency. At Twitter, we develop complex workflows to analyze massive data sets generated by our platform. Our engineers create these workflows using a variety of tools and languages, including Pig and Scalding. One difficulty many of us face when using these tools is observability: when a Pig script is executed, multiple MapReduce jobs might be launched, either in parallel or in a serial fashion if one job depends on the output of another. As these jobs run, the status of individual jobs can be monitored with the Hadoop Job Tracker UI, but overall progress of the script can be difficult to keep track of. With Ambrose, the real-time status of a complex series of MapReduce jobs can be visualized succinctly, so that we can quickly understand how far computation has progressed and diagnose failures in context.

In this screenshot, we see the Ambrose UI for a workflow compiled from a single Pig script. The circular chord diagram in the upper left highlights dependencies between jobs. As a job’s status changes, the color of its arc in the diagram changes. Statistics for the job most recently started are displayed to the right of the chord diagram. Summary information and status of all jobs is displayed in the table beneath these two views.
At the moment it only works with Pig; however, the framework is extensible and allows support for other runtimes. We plan to support Cascading and Scalding, but we welcome patches for other runtimes as well. Ambrose also relies on a number of other great open-source projects including Jetty, D3.js, and Twitter Bootstrap.
In its current form Ambrose is still early in development and has a growing list of features we’d love to add, but we’ve open sourced it to develop Ambrose in the open and get community feedback. We encourage you to download it and let us know what you think. If you’re interested in working on and evolving data visualization tools like Ambrose, join the flock. In the end, we’d love to hear your feedback — Tweet us at @Ambrose or file an issue.
- Chris Aniszczyk, Manager of Open Source (@cra)
Details:
http://engineering.twitter.com/2012/05/visualize-data-workflows-with-ambrose.html
|
Posted on Fri, 11 May 2012 18:10:00 +0000
As you may have noticed, searches on twitter.com, Twitter for iOS, and Twitter for Android now have spelling corrections and related queries next to the search results.
At the core of our related queries and spelling correction service is a simple mechanism: if we see query A in some context, and then see query B in the same context, we think they're related. If A and B are similar, B may be a spell-corrected version of A; if they're not, it may be interesting to searchers who find A interesting. We use both query sessions and tweets for context; if we observe a user typing [justin beiber] and then, within the same session, typing [justin bieber], we'll consider the second query as a possible spelling correction to the first, and if the same session also contains [selena gomez], we may consider it a related query to the previous queries. The data we process is anonymized: we don’t track which queries are issued by a given user, only that the same (unknown) user has issued several queries in a row, or continuously tweeted.
To measure the similarity between queries, we use a variant of Edit Distance tailored to Twitter queries; for example, in our variant we treat the beginning and end characters of a query differently from the inner characters, as spelling mistakes tend to be concentrated in those. Our variant also treats special Twitter characters (such as @ and #) differently from other characters, and has other differences from the vanilla Edit Distance. To measure the quality of the suggestions, we use a variety of signals including query frequencies (of the original query and the suggestion), statistical correlation measures such as log-likelihood, the quality of the search results for the suggestion, and others.
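As a rough illustration of the idea, here is a minimal Python sketch of an edit distance with position- and character-dependent costs. It is not Twitter's actual variant, whose details are not given here: the function name, the three cost parameters, and the choice to make edits at the first and last character more expensive than inner edits are all assumptions made for the example.

```python
def weighted_edit_distance(a, b, edge_cost=2.0, inner_cost=1.0, special_cost=3.0):
    """Levenshtein-style distance with per-character edit costs.

    All three cost values are hypothetical; they only demonstrate the mechanism.
    """
    special = {"@", "#"}

    def cost(s, i):
        # Cost of inserting, deleting, or substituting character s[i].
        if s[i] in special:
            return special_cost
        if i == 0 or i == len(s) - 1:
            return edge_cost
        return inner_cost

    # dp[i][j] holds the cheapest way to turn a[:i] into b[:j].
    dp = [[0.0] * (len(b) + 1) for _ in range(len(a) + 1)]
    for i in range(1, len(a) + 1):
        dp[i][0] = dp[i - 1][0] + cost(a, i - 1)  # delete a[i-1]
    for j in range(1, len(b) + 1):
        dp[0][j] = dp[0][j - 1] + cost(b, j - 1)  # insert b[j-1]

    for i in range(1, len(a) + 1):
        for j in range(1, len(b) + 1):
            if a[i - 1] == b[j - 1]:
                substitute = dp[i - 1][j - 1]
            else:
                # Charge the more expensive of the two characters involved.
                substitute = dp[i - 1][j - 1] + max(cost(a, i - 1), cost(b, j - 1))
            dp[i][j] = min(
                substitute,
                dp[i - 1][j] + cost(a, i - 1),  # deletion
                dp[i][j - 1] + cost(b, j - 1),  # insertion
            )
    return dp[len(a)][len(b)]
```

With these example weights, [justin beiber] and [justin bieber] are two cheap inner edits apart, while dropping a leading # costs more than a typical inner typo.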
Twitter’s spelling correction has a number of unique challenges: searchers frequently type in usernames or hashtags that are not well-formed English words; our own users supply a constant, real-time stream of new lingo and terms; and we want to help people find those in order to join in the conversation. To address all of these issues, on top of our context-based mechanism, we also index dictionaries of trending queries and popular users that are likely to be misspelled, and use Lucene's built-in spelling correction library (tweaked to better serve our needs) to identify misspellings and retrieve corrections for queries.

Initially, we computed related queries and spelling corrections in a batch service, periodically updating our user-facing service with the latest data. But we noticed that the lag this process introduced resulted in a less-than-optimal experience: it would take several hours for the models to adapt to new search trends. We then rewrote the entire service, this time as an online, real-time one. Queries and tweets are tracked as they come, and our models are continuously updated, just like the search results themselves. To account for the long tail of queries that have less context from recent hours, we combine the real-time, up-to-date model with a background model computed in the same manner, but over several months of data (and updated daily).
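One plausible way to combine a real-time model with a background model is linear interpolation, with the real-time weight growing as recent evidence accumulates. The Python sketch below assumes exactly that; the function name, the count-table inputs, and the smoothing constant k are hypothetical, and the formula actually used is not described here.

```python
def blended_scores(realtime, background, k=50.0):
    """Blend two suggestion-count tables for a single query.

    realtime, background: dicts mapping suggestion -> observed count.
    k: hypothetical constant; larger values lean on the background model longer.
    """
    n_rt = sum(realtime.values())
    n_bg = sum(background.values())
    # Trust the real-time model more as it gathers evidence for this query.
    lam = n_rt / (n_rt + k)
    scores = {}
    for suggestion in set(realtime) | set(background):
        p_rt = realtime.get(suggestion, 0) / n_rt if n_rt else 0.0
        p_bg = background.get(suggestion, 0) / n_bg if n_bg else 0.0
        scores[suggestion] = lam * p_rt + (1 - lam) * p_bg
    return scores
```

A long-tail query with no recent observations gets a weight of zero on the real-time side, so its suggestions come entirely from the background table.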

Within the first two weeks of launching our related queries and spelling corrections in late April, we've corrected 5 million queries and provided suggestions to 100 million more. We're very encouraged by the high engagement rates we're seeing so far on both features.
We're working on more ways to help you find and discover the most relevant and engaging content in real time, so stay tuned. There are other big improvements we’ll be rolling out to Twitter search over the coming weeks and months.
Acknowledgments
The system was built by Gilad Mishne (@gilad), Zhenghua Li (@zhenghuali) and Tian Wang (@wangtian) with help from the entire Twitter Search team. Thanks also to Jeff Dalton (@jeffd) for initial explorations and to Aneesh Sharma (@aneeshs) for help with the design.
Details:
http://engineering.twitter.com/2012/05/related-queries-and-spelling.html
|
Posted on Thu, 10 May 2012 16:27:00 +0000
At Twitter, Apache Mesos runs on hundreds of production machines and makes it easier to execute jobs that do everything from running services to handling our analytics workload. For those not familiar with it, the Mesos project originally started as a UC Berkeley research effort. It is now being developed at the Apache Software Foundation (ASF), where it just reached its first release inside the Apache Incubator.
Mesos aims to make it easier to build distributed applications and frameworks that share clustered resources like CPU, RAM or hard disk space. There are Java, Python and C++ APIs for developing new parallel applications. Specifically, you can use Mesos to:
- Run Hadoop, Spark and other frameworks concurrently on a shared pool of nodes
- Run multiple instances of Hadoop on the same cluster to isolate production and experimental jobs, or even multiple versions of Hadoop
- Scale to 10,000s of nodes using a fast, event-driven C++ implementation
- Run long-lived services (e.g., Hypertable and HBase) on the same nodes as batch applications and share resources between them
- Build new cluster computing frameworks without reinventing low-level facilities for farming out tasks, and have them coexist with existing ones
- View cluster status and information using a web user interface
Mesos is being used at Conviva, UC Berkeley and UC San Francisco, as well as here. Some of our runtime systems engineers, specifically Benjamin Hindman ( @benh), Bill Farner ( @wfarner), Vinod Kone ( @vinodkone), John Sirois ( @johnsirois), Brian Wickman ( @wickman), and Sathya Hariesh ( @sathya) have worked hard to evolve Mesos and make it useful for our scalable engineering challenges. If you’re interested in Mesos, we invite you to try it out, follow @ApacheMesos, join the mailing list and help us develop a Mesos community within the ASF.
- Chris Aniszczyk, Manager of Open Source (@cra)
Details:
http://engineering.twitter.com/2012/05/incubating-apache-mesos.html
|
Posted on Wed, 02 May 2012 17:05:00 +0000
This year, we are participating in the Google Summer of Code (GSoC) program for the first time. If you’re not familiar with it, GSoC is a global program for aspiring student developers interested in learning and developing open source technology. It functions like a typical engineering internship where Google offers stipends for students to write code for various open source software projects and mentoring organizations.
Unlike most mentoring organizations participating in GSoC, Twitter contributes to a number of open source projects that span domains and organizations and involve a variety of programming languages. We received many applications this year for GSoC and, in the end, had to make some tough decisions. We’ve chosen three students to hack with us over the summer. Here they are:
Federico Brubacher ( @fbru02)
Federico has been programming since he was 6; he’s currently finishing up his MS in Computer Science at ORT Uruguay. He will be building scalable, online machine learning algorithms on top of Storm, which powers portions of our Analytics platform. The project will be analogous to Mahout for Hadoop, except it’s focused on real-time machine learning. He will be mentored by Nathan Marz ( @nathanmarz), the creator of Storm.
Kirill Lashuk ( @KL_7)
Kirill is studying Math and Computer Science at the Belarusian State University in Minsk. Beyond that, he’s currently learning how to snowboard. He likes to go swimming and hack on open source projects. This summer, he will be adding more localization capabilities to TwitterCLDR. He will be mentored by one of our internationalization engineers, Cameron Dutro (@camertron).
Ruben Oanta ( @rubeydoo)
Ruben is currently finishing his MS in Computer Science at DePaul University and in his spare time enjoys cycling and ultimate frisbee. He’ll be working on adding MySQL support to Finagle, a protocol-agnostic library that abstracts the complicated details of asynchronous RPC communication, and is used widely within Twitter. He will be mentored by Marius Eriksen ( @marius) with an additional special guest mentor from Tumblr, Blake Matheny ( @bmatheny).
We are excited about working with these three students over the summer, and will report back on their progress at the end of the program.
- Chris Aniszczyk, Manager of Open Source (@cra)
Details:
http://engineering.twitter.com/2012/05/summer-of-code-at-twitter.html
|
Posted on Tue, 01 May 2012 16:20:00 +0000
We are beginning to roll out a new version of the Discover tab that is even more personalized for you. We’ve improved our personalization algorithms to incorporate several new signals including the accounts you follow and whom they follow. All of this social data is used to understand your interests and display stories that are relevant to you in real time.
Behind the scenes, the new Discover tab is powered by Earlybird, Twitter's real-time search technology. When a user tweets, that Tweet is indexed and becomes searchable in seconds. Every Tweet with a link also goes through some additional processing: we extract and expand any URLs available in Tweets, and then fetch the contents of those URLs via SpiderDuck, our real-time URL fetcher.
To generate the stories that are based on your social graph and that we believe are most interesting to you, we first use Cassovary, our graph processing library, to identify your connections and rank them according to how strong and important those connections are to you.
Once we have that network, we use Twitter's flexible search engine to find URLs that have been shared by that circle of people. Those links are converted into stories that we’ll display, alongside other stories, in the Discover tab. Before displaying them, a final ranking pass re-ranks stories according to how many people have tweeted about them and how important those people are in relation to you. All of this happens in near real time, which means breaking and relevant stories appear in the new Discover tab almost as soon as people start talking about them.
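To make the final ranking pass concrete, here is a small Python sketch that scores each story by summing the connection strengths of the people who shared it, so that both the number of sharers and their importance to you raise the score. The function name, the input shapes, and the additive scoring rule are assumptions for illustration; the production ranking function is not described here.

```python
def rank_stories(shares, connection_strength):
    """Order story URLs by the total strength of the connections who shared them.

    shares: dict mapping url -> iterable of user ids who tweeted the link.
    connection_strength: dict mapping user id -> weight of that connection
        (hypothetical values, e.g. produced by a graph-ranking step).
    """
    scored = []
    for url, users in shares.items():
        # Count each sharer once; unknown users contribute nothing.
        score = sum(connection_strength.get(user, 0.0) for user in set(users))
        scored.append((score, url))
    # Highest score first; ties are broken by URL for a deterministic order.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return [url for _, url in scored]
```

Under this rule, a story shared by one very strong connection can outrank a story shared by two weak ones.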
Our NYC engineering team, led by Daniel Loreto (@DanielLoreto), along with Julian Marinus (@fooljulian), Alec Thomas (@alecthomas), Dave Landau (@landau), and Ugo Di Girolamo (@ugodiggi), is working hard on Discover to create new ways to bring you instantly closer to the things you care about. This update is just the beginning of this ongoing effort.
- Ori Allon, Director of Engineering (@oriallon)
Details:
http://engineering.twitter.com/2012/05/discover-improved-personalization.html
|
Posted on Thu, 19 Apr 2012 18:00:00 +0000
Open source is a pervasive part of our culture. Many projects at Twitter rely on open source technologies, and as we evolve as a company, our commitment to open source continues to increase. Today, we are becoming an official sponsor of the Apache Software Foundation (ASF), a non-profit and volunteer-run open source foundation.
The ASF provides organizational, legal, and financial support for a broad range of open source software projects that Twitter consumes and contributes to. One example is the Mesos project, which is now being developed inside the ASF Incubator and is nearing its first official release. Within Twitter, Mesos runs on hundreds of production machines and makes it easier to execute clustered jobs that do everything from running services to handling our analytics workload.
Sponsoring the ASF is not only the right thing to do, it will help us sustain our existing projects at the ASF by supporting the foundation’s infrastructure. We have a long history of contributing to Apache projects, including not only Mesos, but also Cassandra, Hadoop, Mahout, Pig and more. As Twitter grows, we look to further our commitment to the success of the ASF and other open source organizations.
On behalf of the Twitter Open Source Office,
- Chris Aniszczyk (@cra)
Details:
http://engineering.twitter.com/2012/04/sponsoring-apache-foundation.html
|
Posted on Tue, 17 Apr 2012 17:00:00 +0000
Cross-posted on the Twitter Blog.
One of the great things about Twitter is working with so many talented folks who dream up and build incredible products day in and day out. Like many companies, we apply for patents on a bunch of these inventions. However, we also think a lot about how those patents may be used in the future; we sometimes worry that they may be used to impede the innovation of others. For that reason, we are publishing a draft of the Innovator’s Patent Agreement, which we informally call the “IPA”.
The IPA is a new way to do patent assignment that keeps control in the hands of engineers and designers. It is a commitment from Twitter to our employees that patents can only be used for defensive purposes. We will not use the patents from employees’ inventions in offensive litigation without their permission. What’s more, this control flows with the patents, so if we sold them to others, they could only use them as the inventor intended.
This is a significant departure from the current state of affairs in the industry. Typically, engineers and designers sign an agreement with their company that irrevocably gives that company any patents filed related to the employee’s work. The company then has control over the patents and can use them however they want, which may include selling them to others who can also use them however they want. With the IPA, employees can be assured that their patents will be used only as a shield rather than as a weapon.
We will implement the IPA later this year, and it will apply to all patents issued to our engineers, both past and present. We are still in early stages, and have just started to reach out to other companies to discuss the IPA and whether it might make sense for them too. In the meantime, we’ve posted the IPA on GitHub with the hope that you will take a look, share your feedback and discuss with your companies. And, of course, you can #jointheflock and have the IPA apply to you.
Today is the second day of our quarterly Hack Week, which means employees – engineers, designers, and folks all across the company – are working on projects and tools outside their regular day-to-day work. The goal of this week is to give rise to the most audacious and creative ideas. These ideas will have the greatest impact in a world that fosters innovation, rather than dampening it, and we hope the IPA will play an important part in making that vision a reality.
- Adam Messinger, VP of Engineering (@adam_messinger)
Details:
http://engineering.twitter.com/2012/04/introducing-innovators-patent-agreement.html
|
Posted on Mon, 09 Apr 2012 17:00:00 +0000
MySQL is the persistent storage technology behind most Twitter data: the interest graph, timelines, user data and the Tweets themselves. Due to our scale, we push MySQL a lot further than most companies. Of course, MySQL is open source software, so we have the ability to change it to suit our needs. Since we believe in sharing knowledge and that open source software facilitates innovation, we have decided to open source our MySQL work on GitHub under the BSD New license.
The objectives of our work thus far have primarily been to improve the predictability of our services and make our lives easier. Some of the work we’ve done includes:
- Add additional status variables, particularly from the internals of InnoDB. This allows us to monitor our systems more effectively and understand their behavior better when handling production workloads.
- Optimize memory allocation on large NUMA systems: Allocate InnoDB's buffer pool fully on startup, fail fast if memory is not available, ensure performance over time even when server is under memory pressure.
- Reduce unnecessary work through improved server-side statement timeout support. This allows the server to proactively cancel queries that run longer than a millisecond-granularity timeout.
- Export and restore InnoDB buffer pool using a safe and lightweight method. This enables us to build tools to support rolling restarts of our services with minimal pain.
- Optimize MySQL for SSD-based machines, including page-flushing behavior and reduction in writes to disk to improve lifespan.
We look forward to sharing our work with upstream and other downstream MySQL vendors, with a goal to improve the MySQL community. For a more complete look at our work, please see the change history and documentation.
If you want to learn more about our usage of MySQL, we will be speaking about Gizzard, our sharding and replication framework on top of MySQL, at the Percona Live MySQL Conference and Expo on April 12th. Finally, contact us on GitHub or file an issue if you have questions.
On behalf of the Twitter DBA and DB development teams,
- Jeremy Cole (@jeremycole)
- Davi Arnaut (@darnaut)
Details:
http://engineering.twitter.com/2012/04/mysql-at-twitter.html
|
Posted on Thu, 22 Mar 2012 21:40:00 +0000
The past few months have been busy for the Twitter security team: we’ve turned on HTTPS by default for everyone, added great engineers from Whisper Systems and Dasient, and had some stimulating internal discussions about how we can continue to better protect users. We want to share what we’ve been up to and discuss the world of online security, so we’ll be hosting a Security Open House on March 29 here at Twitter HQ. We’ve got a great lineup of speakers to get the conversations going:
Neil Daswani (@neildaswani): Online fraud and mobile application abuse
Jason Wiley (@capnwiley) & Dino Fekaris (@dino): Twitter phishing vectors and the fallout
Neil Matatall (@nilematotle): Brakeman: detecting security vulnerabilities in Ruby on Rails applications via static analysis
Come by to meet our Security team, hear about some of our work, and learn about opportunities to join the flock at the first #TwitterSec. Here’s what you need to know to get yourself signed up:
When: Thursday, March 29, 2012; 5:30pm - 9:00pm
Where: Twitter HQ - 795 Folsom Street, San Francisco, CA
Who: Security and privacy engineers
RSVP: Space is limited, so reserve your spot now. Hope to see you here!
Details:
http://engineering.twitter.com/2012/03/past-few-months-have-been-busy-for.html
|
Posted on Thu, 08 Mar 2012 19:18:00 +0000
We are open sourcing Cassovary, a big graph-processing library for the Java Virtual Machine (JVM) written in Scala. Cassovary is designed from the ground up to efficiently handle graphs with billions of edges. It comes with some common node and graph data structures and traversal algorithms. A typical usage is to do large-scale graph mining and analysis.
At Twitter, Cassovary forms the bottom layer of a stack that we use to power many of our graph-based features, including “Who to Follow” and “Similar to.” We also use it for relevance in Twitter Search and the algorithms that determine which Promoted Products users will see. Over time, we hope to bring more non-proprietary logic from some of those product features into Cassovary.
Please use, fork, and contribute to Cassovary if you can. If you have any questions, ask on the mailing list or file issues on GitHub. Also, follow @cassovary for updates.
-Pankaj Gupta (@pankaj)
Details:
http://engineering.twitter.com/2012/03/cassovary-big-graph-processing-library.html
|
Posted on Fri, 02 Mar 2012 21:54:00 +0000
Scalding is an in-house MapReduce framework that Twitter recently open-sourced. Like Pig, it provides an abstraction on top of MapReduce that makes it easy to write big data jobs in a syntax that's simple and concise. Unlike Pig, Scalding is written in pure Scala -- which means all the power of Scala and the JVM is already built-in. No more UDFs, folks!
At Twitter, our mission is to instantly connect people everywhere to what's most meaningful to them. With over a hundred million active users creating more than 250 million tweets every day, this means we need to quickly analyze massive amounts of data at scale.
That's why we recently open-sourced Scalding, an in-house MapReduce framework built on top of Scala and Cascading.
In 140: Instead of forcing you to write raw map and reduce functions, Scalding allows you to write natural code like:
Simple to read, and just as easily run over a 10-line test file as a 10-terabyte data source in Hadoop!
Like Twitter, Scalding has a powerful simplicity that we love, and in this post we'll use the example of building a basic recommendation engine to show you why. A couple of notes before we begin:
- Scalding is open-source and lives here on GitHub.
- For a longer, tutorial-based version of this post (which goes more in-depth into the code and mathematics), see the original blog entry.
We use Scalding hard and we use it often, for everything from custom ad targeting algorithms to PageRank on the Twitter graph, and we hope you will too. Let's dive in!
Movie similarities
Imagine you run an online movie business. You have a rating system in place (people can rate movies with 1 to 5 stars) and you want to calculate similarities between pairs of movies, so that if someone watches The Lion King, you can recommend films like Toy Story.
One way to define the similarity between two movies is to use their correlation:
- For every pair of movies A and B, find all the people who rated both A and B.
- Use these ratings to form a Movie A vector and a Movie B vector.
- Calculate the correlation between these two vectors.
- Whenever someone watches a movie, you can then recommend the movies most correlated with it.
Here's a snippet illustrating the code.
Notice that Scalding provides higher-level functions like group for you (and many others, too, like join and filter), so that you don't have to continually rewrite these patterns yourself. What's more, if there are other abstractions you'd like to add, go ahead! It's easy to add new functions.
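For readers who want to try the four correlation steps listed above without a Hadoop cluster, here is a plain-Python sketch of the same computation. It is not Scalding code and not the snippet from the original post; the function names, the (user, movie, stars) input format, and the min_raters threshold are assumptions for illustration.

```python
from collections import defaultdict
from itertools import combinations
from math import sqrt


def movie_correlations(ratings, min_raters=2):
    """ratings: iterable of (user, movie, stars). Returns {(movie_a, movie_b): r}."""
    by_user = defaultdict(dict)
    for user, movie, stars in ratings:
        by_user[user][movie] = stars

    # Steps 1-2: for every pair of movies, collect the ratings of people
    # who rated both, which gives two aligned rating vectors.
    pairs = defaultdict(list)
    for movies in by_user.values():
        for a, b in combinations(sorted(movies), 2):
            pairs[(a, b)].append((movies[a], movies[b]))

    # Step 3: Pearson correlation between the two vectors.
    result = {}
    for pair, rows in pairs.items():
        n = len(rows)
        if n < min_raters:
            continue
        sx = sum(x for x, _ in rows)
        sy = sum(y for _, y in rows)
        sxx = sum(x * x for x, _ in rows)
        syy = sum(y * y for _, y in rows)
        sxy = sum(x * y for x, y in rows)
        denom = sqrt(n * sxx - sx * sx) * sqrt(n * syy - sy * sy)
        if denom > 0:  # undefined when one movie received identical ratings
            result[pair] = (n * sxy - sx * sy) / denom
    return result


def most_similar(correlations, movie, top=3):
    """Step 4: the movies most correlated with the given one."""
    scored = []
    for (a, b), r in correlations.items():
        if movie == a:
            scored.append((r, b))
        elif movie == b:
            scored.append((r, a))
    scored.sort(key=lambda item: (-item[0], item[1]))
    return [name for _, name in scored[:top]]
```

The grouping by user and the pairing of movies are the parts a MapReduce framework would distribute; this sketch simply holds everything in memory.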
Rotten Tomatoes
Let's run this code over some real data. What dataset of movie ratings should we use?
People love to tweet whenever they rate a movie on Rotten Tomatoes, so let's use these ratings to generate our recommendations!
After grabbing and parsing these tweets, we can run a quick command using the handy scald.rb script that Scalding provides.
And minutes later, we're done!
As we'd expect, we see that
- Lord of the Rings, Harry Potter, and Star Wars movies are similar to other Lord of the Rings, Harry Potter, and Star Wars movies
- Big science fiction blockbusters (Avatar) are similar to big science fiction blockbusters (Inception)
- People who like one Justin Timberlake movie (Bad Teacher) also like other Justin Timberlake movies (In Time). Similarly with Michael Fassbender (A Dangerous Method, Shame)
- Art house movies (The Tree of Life) stick together (Tinker Tailor Soldier Spy)
Just for fun, let's also look at the movies with the most negative correlation:
The more you like loud and dirty popcorn movies (Thor) and vamp romance (Twilight), the less you like arthouse? Sounds good to me.
Check-in similarities with Foursquare
Scalding also makes it easy to abstract away our input format, so that we can grab data from wherever we want. Tweets, TSVs, MySQL tables, HDFS -- no problem! And there's no reason our code needs to be tied to movie recommendations in particular, so let's switch it up.
For example, let's say we want to generate restaurant or tourist recommendations, and we have a bunch of information on who visits each location.
Here, we simply create a new class that scrapes tweets for Foursquare check-in information...
...and bam! Here are locations similar to the Empire State Building:
Here are places you might want to check out, if you check in at Bergdorf Goodman:
And here's where to go after the Statue of Liberty:
Learn more about Scalding
Hopefully this post gave you a taste of the awesomeness of Scalding. To learn more:
Acknowledgements
-Edwin Chen (@edchedch). A huge shoutout to Argyris Zymnis (@argyris), Avi Bryant (@avibryant), and Oscar Boykin (@posco), the mastermind hackers who have spent (and continue spending) unimaginable hours making Scalding a joy to use.
Details:
http://engineering.twitter.com/2012/03/generating-recommendations-with.html
|
Posted on Tue, 21 Feb 2012 18:32:00 +0000
The iPhone was revolutionary for its use of direct manipulation – the feeling that you’re really holding content in your hands and manipulating it with your fingertips. While many mobile platforms have touch, it is the realistic physics and fluid animation of the iPhone that sets it apart from its competitors.
However, jerky scrolling ruins the experience. The new UI of Twitter for iPhone 4.0 contains many details that could impact performance, so we had to treat 60 frame-per-second animation as a priority. If you are troubleshooting animation performance, this post should provide some useful pointers.
A review of layers
Animation on iOS is powered by Core Animation layers. Layers are a simple abstraction for working with the GPU. When animating layers, the GPU just transforms surfaces as an extended function of the hardware itself. However, the GPU is not optimized for drawing. Everything in your view’s drawRect: is handled by the CPU, then handed off to the GPU as a texture.
Animation problems fall into one of those two phases in the pipeline. Either the GPU is being taxed by expensive operations, or the CPU is spending too much time preparing the cell before handing it off to the GPU. The following sections contain simple directives, based on how we addressed each of these challenges.
GPU bottlenecks
When the GPU is overburdened, it manifests with low, but consistent, framerates. The most common reasons may be excessive compositing, blending, or pixel misalignment. Consider the following Tweet:
Use direct drawing
A naive implementation of a Tweet cell might include a UILabel for the username, a UILabel for the tweet text, a UIImageView for the avatar, and so on.
Unfortunately, each view burdens Core Animation with extra compositing.
Instead, our Tweet cells contain a single view with no subviews; a single drawRect: draws everything.
We institutionalized direct drawing by creating a generic table view cell class that accepts a block for its drawRect: method. This is, by far, the most commonly used cell in the app.
Avoid blending
You’ll notice that Tweets in Twitter for iPhone 4.0 have a drop shadow on top of a subtle textured background. This presented a challenge, as blending is expensive.
We solved this by reducing the area Core Animation has to consider non-opaque, splitting the shadow areas from the content area of the cell.
To quickly spot blending, select the Color Blended Layers option in the Core Animation instrument in Instruments. Green areas indicate opaque surfaces; red areas point to blended surfaces.
Check pixel alignment
Spot the danger in the following code:
CGRect subframe = CGRectMake(x, y, width / 2.0, height / 2.0);
If width is an odd number, then subframe will have a fractional width. Core Animation will accept this, but it will require anti-aliasing, which is expensive. Instead, run floor or ceil on computed values.
In Instruments, check Color Misaligned Images to hunt for accidental anti-aliasing.
Cell preparation bottlenecks
The second class of animation problem is called a “pop” and occurs when new cells scroll into view. At 60 frames per second, a cell that is about to appear on screen has only about 17 milliseconds (1000 ms / 60 ≈ 16.7 ms) to provide content before a frame is dropped.
Recycle cells
As described in the table view documentation, instead of creating and destroying cell objects whenever they appear or disappear, you should recycle cells with the help of dequeueReusableCellWithIdentifier:
Optimize your drawRect:
If you are direct drawing and recycling cells, and you still see a pop, measure the time spent in your drawRect: with the Core Animation instrument in Instruments. If needed, eliminate “nice to have” details, like subtle gradients.
Pre-render if necessary
Sometimes, you can’t simplify drawing. The new #Discover tab in Twitter for iPhone 4.0 displays large images in cells. No matter how simple the treatment, scaling and cropping a large image is expensive.
We knew #Discover had an upper bound of ten stories, so we decided to trade memory for CPU. When we receive a trending story image we pre-render the cell on a low-priority background queue, and store it in a cache. When the cell scrolls into view, we set the cell’s layer.contents to the prebaked CGImage, which requires no drawing.
Conclusion
All of these optimizations come at the cost of code complexity and developer productivity. So long as you don’t paint yourself into a corner in architecture, you can always apply these optimizations after you’ve written the simplest thing that works and collected actual measurements on hardware.
Remember: Premature optimization is the root of all evil.
Acknowledgements
-Ben Sandofsky (@sandofsky), Ryan Perry (@ryfar) for technical review, and the Twitter mobile team for their input.
Details:
http://engineering.twitter.com/2012/02/simple-strategies-for-smooth-animation.html
|
Posted on Wed, 08 Feb 2012 15:00:00 +0000
When we opened an office in New York City last October, we were excited to become a part of the city’s growing tech community, with all of its energy and innovation. Since then, we’ve been building out an engineering team in New York. Focused on search and discovery, the team works to find ways to extract value out of the more than 250 million Tweets people send every day.
We want to share some of the exciting projects that we’ve been working on, so we’re holding the first #TwitterNYC Engineering Open House. Come by to meet our engineering team, see some of our work, and learn about opportunities to #jointheflock!
When Thursday, February 16, 2012; 7 pm - 9 pm
Where Twitter NYC
Speakers
If you’re interested in attending, please send your name and company affiliation to openhouse@twitter.com. Space is very limited. If we have space for you, you’ll get a confirmation with more details, including the office address. If you don’t get in this time, we’ll notify you about future events. ..
Details:
http://engineering.twitter.com/2012/02/twitter-nyc-open-house.html
|
Posted on Fri, 27 Jan 2012 00:18:00 +0000
Next week, we will host our first Open House to present the achievements of the Twitter Translation Center, a community platform for translating Twitter's products. Since its launch in early 2011, we have released Twitter in 22 languages, up from just six in 2010.
The amazing level of activity in the Twitter Translation Center has driven us to explore new avenues to scale and deliver quality translation to our international users. For instance, we’ve learned how to work with a community of 425,000 translators, who have collectively produced more than one million separate translations and voted close to five million times on those translations. Recognizing this growth, we’ve added a lot of features to make the Translation Center a better platform for community translation; these innovations include forums, translation search, translation memory, glossary management, moderation tools, and spam and cross-site scripting prevention tools.
We’ve also learnt that introducing a new language requires more than just translations. For each language, we had to ensure that we supported the appropriate date and number formats, that hashtags and URLs could be properly extracted from Tweets in those languages, and that we correctly counted the number of characters in each Tweet.
In order to deliver quality, we built tools to test localized versions of our applications before a launch. Given that all of our translators are volunteers, we wanted to give our translator community a chance to review and test their output before they released it to our users.
What’s next?
Twitter’s impact on the world inspires us to look for new ways to connect and interact with our international users. We continue to address the challenges and lessons on how best to serve community localization at Twitter. To hear more about these tools and topics, join us for an evening of technical discussions and networking during our first International Engineering Open House.
When Thursday, February 2, 2012; 7 pm - 9 pm
Where Twitter HQ, 795 Folsom St, Suite 600, San Francisco, CA 94107
Speakers
- Nico Sallembien (@nsallembien): A look at Unicode character distribution in Tweets
- Laura Gomez (@laura): Community Localization and Twitter: Experience, Engagement, and Scalability
- Yoshimasa Niwa (@niw): Software development for Japanese mobile phones
-@international ..
Details:
http://engineering.twitter.com/2012/01/join-flock-twitters-international.html
|
Posted on Mon, 14 Nov 2011 18:44:00 +0000
Tweets often contain URLs or links to a variety of content on the web, including images, videos, news articles and blog posts. SpiderDuck is a service at Twitter that fetches all URLs shared in Tweets in real-time, parses the downloaded content to extract metadata of interest and makes that metadata available for other Twitter services to consume within seconds. Several teams at Twitter need to access the linked content, typically in real-time, to improve Twitter products. For example:
- Search to index resolved URLs and improve relevance
- Clients to display certain types of media, such as photos, next to the Tweet
- Tweet Button to count how many times each URL has been shared on Twitter
- Trust & Safety to aid in detecting malware and spam
- Analytics to surface a variety of aggregated statistics about links shared on Twitter
Background
Prior to SpiderDuck, Twitter had a service that resolved all URLs shared in Tweets by issuing HEAD requests and following redirects. While this service was simple and met the needs of the company at the time, it had a few limitations:
- It resolved the URLs but did not actually download the content.
- The resolution information was stored in an in-memory cache but not persisted durably to disk. This meant that if the in-memory cache instance was restarted, data would be lost.
- It did not implement politeness rules typical of modern bots, for example, rate limiting and following robots.txt directives.
Clearly, we needed to build a real URL fetcher that overcame the above limitations and would meet the company’s needs in the long term. Our first thought was to use or build on top of an existing open source URL crawler. We realized though that almost all of the available crawlers have two properties that we didn't need:
- They are recursive crawlers. That is, they are designed to fetch pages and then recursively crawl the links extracted from those pages. Recursive crawling involves significant complexity in crawl scheduling and long term queuing, which isn’t relevant to our use case.
- They are optimized for large batch crawls. What we needed was a fast, real-time URL fetcher.
Therefore, we decided to design a new system that could meet Twitter’s real-time needs and scale horizontally with its growth. Rather than reinvent the wheel, we built the new system largely on top of open source building blocks, thus still leveraging the contributions of the open source community. This is typical of many engineering problems at Twitter – while they resemble problems at other large Internet companies, the requirement that everything work in real-time introduces unique and interesting challenges.
System Overview
Here’s an overview of how SpiderDuck works. The following diagram illustrates its main components.
The SpiderDuck architecture
- Kestrel: This is a message queuing system widely used at Twitter for queuing incoming Tweets.
- Schedulers: These jobs determine whether to fetch a URL, schedule the fetch, and follow redirect hops, if any. After the fetch, they parse the downloaded content, extract metadata, and write the metadata to the Metadata Store and the raw content to the Content Store. Each scheduler performs its work independently of the others; that is, any number of schedulers can be added to horizontally scale the system as Tweet and URL volume grows.
- Fetchers: These are Thrift servers that maintain short-term fetch queues of URLs, issue the actual HTTP fetch requests and implement rate limiting and robots.txt processing. Like the Schedulers, Fetchers scale horizontally with fetch rate.
- Memcached: This is a distributed cache used by the fetchers to temporarily store robots.txt files.
- Metadata Store: This is a Cassandra-based distributed hash table that stores page metadata and resolution information keyed by URL, as well as fetch status for every URL recently encountered by the system. This store serves clients across Twitter that need real-time access to URL metadata.
- Content Store: This is an HDFS cluster for archiving downloaded content and all fetch information.
We will now describe the two main components of SpiderDuck -- the URL Scheduler and the URL Fetcher -- in more detail.
The URL Scheduler
The following diagram illustrates the various stages of processing in the SpiderDuck Scheduler.
The URL Scheduler
Like most of SpiderDuck, the Scheduler is built on top of an open source asynchronous RPC framework developed at Twitter called Finagle. (In fact, this was one of the earliest projects to utilize Finagle.) Each box in the diagram above, except for the Kestrel Reader, is a Finagle Filter – an abstraction that allows a sequence of processing stages to be easily composed into a fully asynchronous pipeline. Being fully asynchronous allows SpiderDuck to handle high throughput with a small, fixed number of threads.
The Kestrel Reader continuously polls for new Tweets. As Tweets come in, they are sent to the Tweet Processor, which extracts URLs from them. Each URL is then sent to the Crawl Decider stage. This stage reads the Fetch Status of the URL from the Metadata Store to check if and when SpiderDuck has seen the URL before. The Crawl Decider then decides whether the URL should be fetched based on a pre-defined fetch policy (that is, do not fetch if SpiderDuck has fetched it in the past X days). If the Decider determines to not fetch the URL, it logs the status to indicate that processing is complete. If it determines to fetch the URL, it sends the URL to the Fetcher Client stage.
The Fetcher Client stage uses a client library to talk to the Fetchers. The client library implements the logic that determines which Fetcher will fetch a given URL; it also handles the processing of redirect hops. (It is typical to have a chain of redirects because URLs posted on Twitter are often shortened.) A context object is associated with each URL flowing through the Scheduler. The Fetcher Client adds all fetch information including status, downloaded headers, and content into the context object and passes it on to the Post Processor.
The Post Processor runs the extracted page content through a metadata extractor library, which detects page encoding and parses the page with an open-source HTML5 parser. The extractor library implements a set of heuristics to retrieve page metadata such as title, description, and representative image. The Post Processor then writes all the metadata and fetch information into the Metadata Store. If necessary, the Post Processor can also schedule a set of dependent fetches. An example of dependent fetches is embedded media, such as images.
After post-processing is complete, the URL context object is forwarded to the next stage that logs all the information, including full content, to the Content Store (HDFS) using an open source log aggregator called Scribe. This stage also notifies interested listeners that the URL processing is complete. The notification uses a simple Publish-Subscribe model, which is implemented using Kestrel’s fanout queues.
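The Crawl Decider's fetch policy can be illustrated with a minimal Python sketch. This is not SpiderDuck's code: the function names, the dictionary standing in for the Metadata Store's fetch status, and the seven-day window are all assumptions made for illustration (the post only says "the past X days").

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical policy window; the post does not give the real value of X.
REFETCH_WINDOW = timedelta(days=7)


def should_fetch(last_fetched: Optional[datetime], now: datetime,
                 window: timedelta = REFETCH_WINDOW) -> bool:
    """Fetch if the URL was never fetched, or was last fetched outside the window."""
    if last_fetched is None:
        return True
    return now - last_fetched >= window


def decide(urls, fetch_status, now):
    """Split URLs into (to_fetch, skipped).

    fetch_status is a plain dict of url -> last fetch time, standing in for
    the fetch status that the real system reads from its Metadata Store.
    """
    to_fetch, skipped = [], []
    for url in urls:
        if should_fetch(fetch_status.get(url), now):
            to_fetch.append(url)
        else:
            skipped.append(url)
    return to_fetch, skipped
```

Keeping the policy in one pure function makes it easy to test and to change the window without touching the rest of the pipeline.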
All processing steps are executed asynchronously – no thread ever waits for a step to complete. All state related to each URL in flight is stored in the context object associated with it, which makes the threading model very simple. The asynchronous implementation also benefits from the convenient abstractions and constructs provided by Finagle and the Twitter Util libraries.
The URL Fetcher
Let’s take a look at how a Fetcher processes a URL.
The URL Fetcher
The Fetcher receives the URL through its Thrift interface. After basic validation, the Thrift handler passes the URL to a Request Queue Manager, which assigns it to the appropriate Request Queue. A scheduled task drains each Request Queue at a fixed rate. Once the URL is pulled off of its queue, it is sent to the HTTP Service for processing. The HTTP service, built on top of Finagle, first checks if the host associated with the URL is already in its cache. If not, it creates a Finagle client for it and schedules a robots.txt fetch. After the robots.txt is downloaded, the HTTP service fetches the permitted URL. The robots.txt file itself is cached, both in the in-process Host Cache as well as in Memcached to prevent its re-fetch for every new URL that the Fetcher encounters from that host.
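As a rough illustration of the per-host robots.txt caching described above, here is a small Python sketch built on the standard library's urllib.robotparser. The HostCache class, the injected fetch_robots_txt callable, and the ExampleBot user agent are hypothetical; the sketch covers only an in-process cache and leaves out the Memcached layer and the Finagle client setup.

```python
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser

USER_AGENT = "ExampleBot"  # hypothetical user agent name


class HostCache:
    """In-process cache of parsed robots.txt rules, keyed by host."""

    def __init__(self, fetch_robots_txt):
        # fetch_robots_txt(host) -> str is injected so the sketch needs no network.
        self._fetch_robots_txt = fetch_robots_txt
        self._rules = {}
        self.robots_fetches = 0  # how many times robots.txt was downloaded

    def is_allowed(self, url: str) -> bool:
        host = urlsplit(url).netloc
        rules = self._rules.get(host)
        if rules is None:
            # First URL seen for this host: download and parse robots.txt once.
            rules = RobotFileParser()
            rules.parse(self._fetch_robots_txt(host).splitlines())
            self._rules[host] = rules
            self.robots_fetches += 1
        return rules.can_fetch(USER_AGENT, url)
```

Later URLs from the same host reuse the parsed rules, so robots.txt is downloaded once per host rather than once per URL.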
Tasks called Vultures periodically examine the Request Queues and Host Cache to find queues and hosts that haven’t been used for a period of time; when found, they are deleted. The Vultures also report useful stats through logs and the Twitter Commons stats exporting library.
The Fetcher’s Request Queue serves an important purpose: rate limiting. SpiderDuck rate limits outgoing HTTP fetch requests per-domain so as not to overload web servers receiving requests. For accurate rate limiting, SpiderDuck ensures each Request Queue is assigned to exactly one Fetcher at any point of time, with automatic failover to a different Fetcher in case the assigned Fetcher fails. A cluster suite called Pacemaker assigns Request Queues to Fetchers and manages failover. URLs are assigned to Request Queues based on their domains by a Fetcher client library. The default rate limit used for all web sites can be overridden on a per-domain basis, as needed.
The Fetchers also implement queue backoff logic. That is, if URLs are coming in faster than they can be drained, the Fetchers reject requests to signal the client to back off or take other suitable action.
For security purposes, the Fetchers are deployed in a special zone in Twitter data centers called a DMZ. This means that the Fetchers cannot access Twitter’s production clusters and services. Hence, it is all the more important to keep them lightweight and self-contained, a principle which guided many aspects of the design.
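The Request Queue behavior described above (one queue per domain, draining at a fixed rate, per-domain overrides, and rejecting requests when a queue is full) can be sketched in Python as follows. The class names, the tick-based drain, and the numeric limits are assumptions for illustration. The sketch uses the URL's host as the "domain" and does not model Pacemaker's queue assignment or failover.

```python
from collections import deque
from urllib.parse import urlsplit


class QueueFull(Exception):
    """Raised to tell the caller to back off."""


class RequestQueue:
    def __init__(self, max_pending: int, urls_per_tick: int):
        self._pending = deque()
        self._max_pending = max_pending
        self._urls_per_tick = urls_per_tick

    def enqueue(self, url: str) -> None:
        # Backoff: reject instead of growing without bound.
        if len(self._pending) >= self._max_pending:
            raise QueueFull(url)
        self._pending.append(url)

    def drain(self):
        """Release at most urls_per_tick URLs; meant to be called once per tick."""
        batch = []
        while self._pending and len(batch) < self._urls_per_tick:
            batch.append(self._pending.popleft())
        return batch


class RequestQueueManager:
    def __init__(self, default_rate=1, overrides=None, max_pending=100):
        self._default_rate = default_rate
        self._overrides = overrides or {}  # per-domain rate overrides
        self._max_pending = max_pending
        self._queues = {}

    def submit(self, url: str) -> None:
        domain = urlsplit(url).netloc
        queue = self._queues.get(domain)
        if queue is None:
            rate = self._overrides.get(domain, self._default_rate)
            queue = self._queues[domain] = RequestQueue(self._max_pending, rate)
        queue.enqueue(url)

    def tick(self):
        """Drain every queue once, returning the URLs that may be fetched now."""
        released = []
        for queue in self._queues.values():
            released.extend(queue.drain())
        return released
```

Because each domain has its own queue, a burst of links to one site delays only that site's fetches, while URLs for other domains keep flowing.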
How Twitter uses SpiderDuck
Twitter services consume SpiderDuck data in a number of ways. Most query the Metadata Store directly to retrieve URL metadata (for example, page title) and resolution information (that is, the canonical URL after redirects). The Metadata Store is populated in real-time, typically seconds after the URL is tweeted. These services do not talk directly to Cassandra, but instead to SpiderDuck Thrift servers that proxy the requests. This intermediate layer provides SpiderDuck the flexibility to transparently switch storage systems, if necessary. It also supports an avenue for higher level API abstractions than what would be possible if the services interacted directly with Cassandra.
Other services periodically process SpiderDuck logs in HDFS to generate aggregate stats for Twitter’s internal metrics dashboards or conduct other types of batch analyses. The dashboards help us answer questions like “How many images are shared on Twitter each day?” “What news sites do Twitter users most often link to?” and “How many URLs did we fetch yesterday from this specific website?”
Note that services don’t typically tell SpiderDuck what to fetch; SpiderDuck fetches all URLs from incoming Tweets. Instead, services query information related to URLs after it becomes available. SpiderDuck also allows services to make requests directly to the Fetchers to fetch arbitrary content via HTTP (thus benefiting from our data center setup, rate limiting, robots.txt support and so on), but this use case is not common.
Performance numbers
SpiderDuck processes several hundred URLs every second. A majority of these are unique over the time window defined by SpiderDuck’s fetch policy, and hence get fetched. For URLs that get fetched, SpiderDuck’s median processing latency is under two seconds, and the 99th percentile processing latency is under five seconds. This latency is measured from Tweet creation time, which means that in under five seconds after a user clicked “Tweet,” the URL in that Tweet is extracted, prepared for fetch, all redirect hops are retrieved, the content is downloaded and parsed, and the metadata is extracted and made available to clients via the Metadata Store. Most of that time is spent either in the Fetcher Request Queues (due to rate limiting) or in actually fetching from the external web server. SpiderDuck itself adds no more than a few hundred milliseconds of processing overhead, most of which is spent in HTML parsing.
SpiderDuck’s Cassandra-based Metadata Store handles close to 10,000 requests per second. Each request is typically for a single URL or a small batch (around 20 URLs), but it also processes large batch requests (200-300 URLs). The store’s median latency for reads is 4-5 milliseconds, and its 99th percentile is 50-60 milliseconds.
Acknowledgements
The SpiderDuck core team consisted of the following folks: Abhi Khune, Michael Busch, Paul Burstein, Raghavendra Prabhu, Tian Wang and Yi Zhuang. In addition, we’d like to acknowledge the following folks, spanning many teams across the company, who contributed to the project either directly, by helping with components SpiderDuck relies on (for example, Cassandra, Finagle, Pacemaker and Scribe) or with its unique data center setup: Alan Liang, Brady Catherman, Chris Goffinet, Dmitriy Ryaboy, Gilad Mishne, John Corwin, John Sirois, Jonathan Boulle, Jonathan Reichhold, Marius Eriksen, Nick Kallen, Ryan King, Samuel Luckenbill, Steve Jiang, Stu Hood and Travis Crawford. Thanks also to the entire Twitter Search team for their invaluable design feedback and support.
If you want to work on projects like this, join the flock! ..
Details:
http://engineering.twitter.com/2011/11/spiderduck-twitters-real-time-url.html
|
Posted on Wed, 14 Sep 2011 17:43:00 +0000
As the number of people using Twitter has grown, we've wanted to make sure that we deliver the best possible experience to users, regardless of platform or device. Since twitter.com is not optimized for smaller screens or the touch interactions familiar to many smartphone users, we decided to build a cross-platform web application that felt native in its responsiveness and speed for those who prefer accessing Twitter in their phone’s or tablet’s browser.
A better mobile user experience
When building mobile.twitter.com as a web client, we used many of the tools offered in HTML5, CSS3, and JavaScript to develop an application that has the same look, feel, and performance of a native mobile application. This post focuses on four primary areas of the mobile app architecture that enabled us to meet our performance and usability goals:
- event listeners
- scroll views
- templates
- storage
Twitter's mobile app architecture
Event listener
For the Twitter application to feel native, responses have to be immediate. The web application delivers this experience by using event listeners in its code.
Traditionally, JavaScript uses DOM-only events such as onclick, mouseover, mouseout, focus, and blur to render a page. However, because Twitter has so many unique points of interaction, we decided to optimize the resources presented to us with mobile devices. The web application we developed uses event listeners throughout the code. These syntactic events, loaded with the JavaScript on the client, listen for unique triggers that are fired, following the users’ interactions. When users retweet or favorite Tweets, the JavaScript listens for those events and responds accordingly throughout the application, updating screen views where necessary.
The client-side JavaScript on the mobile application handles communication with Twitter through the Twitter API. To illustrate the use of event listeners, let’s look at how a Retweet works. When a user clicks the Retweet button on the UI, the system fires a click event that fires a Retweet request through the API.
The web client application listens for an event like a Retweet and updates the rest of the application when it receives it.
When that Retweet event is successful, a return event fires off a signal and the web app listens for a successful Retweet notification. When it receives the notification, the rest of the application updates appropriately.
The web app’s architecture ensures that while the user-facing layer for the various web apps may differ, the app reuses the custom event listeners throughout, thus making it possible to scale across all devices. For instance, both the iPhone and the iPad use the same views and modules, but in different navigation contexts, while the event architecture drives the rest of the application.
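The Retweet flow described above is a publish/subscribe pattern. The following Python sketch shows its shape; it is not the web app's JavaScript. The EventBus class, the "retweet:success" event name, and the injected api_call function are hypothetical.

```python
class EventBus:
    """Minimal custom-event registry: listeners subscribe by event name."""

    def __init__(self):
        self._listeners = {}

    def on(self, event_name, listener):
        self._listeners.setdefault(event_name, []).append(listener)

    def fire(self, event_name, payload):
        # Copy the list so a listener may subscribe others while we iterate.
        for listener in list(self._listeners.get(event_name, [])):
            listener(payload)


def retweet(bus, api_call, tweet_id):
    """Send the Retweet through the injected API call; announce success on the bus."""
    if api_call(tweet_id):
        bus.fire("retweet:success", {"tweet_id": tweet_id})
```

For example, a timeline view and a counter view can each register with bus.on("retweet:success", ...) and update themselves independently. The code that sends the Retweet does not need to know which views exist, which is what allows the same listeners to be reused across device-specific layouts.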
ScrollViews
Mobile browsers use a viewport on top of the browser's window to allow the user to zoom and scroll the content of an entire page. As helpful as this is, the viewport prevents the web pages from using fixed positioned elements and scrolling gestures. Both of these provide a better user experience because the app header is fixed and you can fit more content in a smaller area.
We worked around the limitations of native scrolling by writing a ScrollView component that allows users to scroll the content using JavaScript and CSS Transforms and Transitions. CSS Transforms use the device's GPU to mimic the browser's viewport.
ScrollView adds a scrolling element and a wrapper container to the element that you wish to scroll. The wrapper container has a fixed width and height so that the inner contents can overflow. The JavaScript calculates the number of pixels that overflow and moves the scroll element, using CSS Transforms.
ScrollView listens for three events, onTouchStart, onTouchMove, and onTouchEnd to render a smooth animation:
onTouchStart
The mobile site stores the initial touch position, timestamp and other variables that it will use later to calculate the distance and velocity of the scroll.
onTouchMove
Next, the web app simply moves the scroll element by the delta between the start and the current positions.
onTouchEnd
Finally, the web app confirms if the scroll element has moved. If there was no movement, the application fires a click event that stops the scrolling action. If the scroll element moved, it calculates the distance and the speed to generate inertial scrolling, which fires a timer.
When the timer fires, the application uses CSS Transforms to move the scroll element to the new position while it decreases the velocity logarithmically. Once the velocity reaches a minimum speed, the application cancels the timer and completes the animation. During this process, it takes into account important coordinates to calculate the elasticity when the user scrolls past the lower or the upper boundary of the scroll element.
ScrollView is used to specify which content is scrollable. It can also be used to fix the navigation header to the top of the window to implement Pull-To-Refresh and infinite Tweet timelines.
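The inertial phase that follows onTouchEnd can be simulated with a short Python sketch. This is a simplification under stated assumptions: the function names, the 16 ms frame interval, and the constant per-tick friction factor are hypothetical, the decay curve is not necessarily the one the real ScrollView uses, and the elastic behavior at the boundaries is not modeled.

```python
def release_velocity(start_pos, end_pos, start_ms, end_ms):
    """Velocity in pixels per millisecond between touch start and touch end."""
    elapsed = end_ms - start_ms
    if elapsed <= 0:
        return 0.0
    return (end_pos - start_pos) / elapsed


def inertial_positions(start, velocity, friction=0.9, min_speed=0.05, frame_ms=16):
    """Return the scroll position after each timer tick.

    Each tick moves the element by velocity * frame_ms and then scales the
    velocity by an assumed friction factor; the animation ends once the speed
    drops below min_speed.
    """
    positions = []
    position = start
    while abs(velocity) >= min_speed:
        position += velocity * frame_ms
        positions.append(position)
        velocity *= friction
    return positions
```

In a browser, each returned position would be applied to the scroll element with a CSS Transform on successive timer ticks. A release velocity of zero produces no ticks, which corresponds to the "no movement" case handled as a click.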
Templates
One of the many customized solutions unique to Twitter and its user experience is a templating system. Templating is a two-pass process. During the first pass, the app expands the templates and marks the places in those resulting strings where dynamic data needs to go. The app then caches the results of the first pass. When it does a second pass to add dynamic data, the app references the cache, delivering a substantial performance benefit.
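A two-pass scheme of this kind might look like the following Python sketch. The template syntax ({{> name}} to include another template, {{name}} to mark dynamic data) and the TemplateEngine class are assumptions for illustration, not the syntax or code of Twitter's templating system. The sketch does no HTML escaping and does not guard against templates that include each other in a cycle.

```python
import re

# Hypothetical syntax: {{> name}} includes a template, {{name}} marks dynamic data.
_INCLUDE = re.compile(r"\{\{>\s*(\w+)\s*\}\}")
_SLOT = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class TemplateEngine:
    def __init__(self, templates):
        self._templates = templates
        self._compiled = {}       # cache of first-pass results
        self.first_pass_runs = 0  # counts cache misses, for demonstration

    def _expand(self, name):
        """Inline nested templates recursively."""
        return _INCLUDE.sub(lambda m: self._expand(m.group(1)), self._templates[name])

    def _compile(self, name):
        """First pass: expand includes, then split into static text and slot names.

        re.split with one capturing group alternates text, slot, text, ..., text.
        """
        self.first_pass_runs += 1
        return _SLOT.split(self._expand(name))

    def render(self, name, data):
        """Second pass: fill the marked slots with dynamic data."""
        parts = self._compiled.get(name)
        if parts is None:
            parts = self._compiled[name] = self._compile(name)
        return "".join(
            part if i % 2 == 0 else str(data[part]) for i, part in enumerate(parts)
        )
```

Rendering the same template for many Tweets then repeats only the inexpensive second pass; the expansion and parsing work is done once per template.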
Efficient storage
In addition to custom events, we reexamined the use of storage available to the web app from the native browser. Since 15 percent of all mobile applications are launched when the device is offline, the solution needed to cover both online and offline instances. Twitter’s new mobile web app makes use of HTML5’s app cache, which allows you to specify which files the browser should cache and make available to offline users. Using app cache also helps limit the amount of network activity. You can specify in your manifest file what to store; these include items such as the master index file, sprites, and other assets. When a user loads a page, the web app shows the assets from app cache; it stores the new assets when the manifest gets updated. This ensures the web app can be used even when it is offline, since an updated manifest is always waiting in the cache.
The web app also uses local storage for simple items, such as user settings, user information, and strings, that are persistent throughout the application for immediate access. It uses a SQL database to handle Tweets and Profiles. Within the schema, each storage database gets a name based on the user, allowing for very quick joins between tables. Separate user tables allow for encapsulation and provide the ability to bundle data securely, by user. Given the growing use cases for devices like an iPad, especially in a multilingual setting, this innovation allows for two people using separate languages to receive all the translated strings cached per user on the same device.
In addition to using storage elements of the HTML5 spec, Twitter’s mobile application also makes use of some of the best tools of CSS3. This list includes
- Flex box model
- Gradients
- Shadows
- 3D transforms
- Transitions
- Animations
Future direction
The event framework gives us a scalable way to grow this product over time. Our goal is to add support for new devices as well as build new user-facing features and elements. We will invest in both native applications and the web. In cases where we can or should go native, we will, but in many cases we believe our web app provides an optimal approach for serving a broad set of users.
Acknowledgements
Twitter’s HTML5 mobile application was developed by Manuel Deschamps (@manuel) and designed by Bryan Haggerty (@bhaggs). Mark Percival (@mdp) contributed to the coding of the mobile architecture. ..
Details:
http://engineering.twitter.com/2011/09/twitters-mobile-web-app-delivers_14.html
|
Posted on Fri, 19 Aug 2011 22:55:00 +0000
Finagle is a protocol-agnostic, asynchronous RPC system for the JVM that makes it easy to build robust clients and servers in Java, Scala, or any JVM-hosted language.
Rendering even the simplest web page on twitter.com requires the collaboration of dozens of network services speaking many different protocols. For example, in order to render the home page, the application issues requests to the Social Graph Service, Memcached, databases, and many other network services. Each of these speaks a different protocol: Thrift, Memcached, MySQL, and so on. Additionally, many of these services speak to other services -- they are both servers and clients. The Social Graph Service, for instance, provides a Thrift interface but consumes from a cluster of MySQL databases.
In such systems, a frequent cause of outages is poor interaction between components in the presence of failures; common failures include crashed hosts and extreme latency variance. These failures can cascade through the system by causing work queues to back up, TCP connections to churn, or memory and file descriptors to become exhausted. In the worst case, the user sees a Fail Whale.
Challenges of building a stable distributed system
Sophisticated network servers and clients have many moving parts: failure detectors, load-balancers, failover strategies, and so on. These parts need to work together in a delicate balance to be resilient to the varieties of failure that occur in a large production system.
This is made especially difficult by the many different implementations of failure detectors, load-balancers, and so on, per protocol. For example, the implementation of the back-pressure strategies for Thrift differs from that for HTTP. Ensuring that heterogeneous systems converge to a stable state during an incident is extremely challenging.
Our approach
We set out to develop a single implementation of the basic components of network servers and clients that could be used for all of our protocols. Finagle is a protocol-agnostic, asynchronous Remote Procedure Call (RPC) system for the Java Virtual Machine (JVM) that makes it easy to build robust clients and servers in Java, Scala, or any JVM-hosted language. Finagle supports a wide variety of request/response-oriented RPC protocols and many classes of streaming protocols.
Finagle provides a robust implementation of:
- connection pools, with throttling to avoid TCP connection churn;
- failure detectors, to identify slow or crashed hosts;
- failover strategies, to direct traffic away from unhealthy hosts;
- load-balancers, including “least-connections” and other strategies; and
- back-pressure techniques, to defend servers against abusive clients and dogpiling.
Additionally, Finagle makes it easier to build and deploy a service that
- publishes standard statistics, logs, and exception reports;
- supports distributed tracing (a la Dapper) across protocols;
- optionally uses ZooKeeper for cluster management; and
- supports common sharding strategies.
We believe our work has paid off -- we can now write and deploy a network service with much greater ease and safety.
Finagle at Twitter
Today, Finagle is deployed in production at Twitter in several front- and back-end serving systems, including our URL crawler and HTTP Proxy. We plan to continue deploying Finagle more widely.
A Finagle-based architecture (under development)
The diagram illustrates a future architecture that uses Finagle pervasively. For example, the User Service is a Finagle server that uses a Finagle memcached client, and speaks to a Finagle Kestrel service.
How Finagle works
Finagle is flexible and easy to use because it is designed around a few simple, composable primitives: Futures, Services, and Filters.
Future objects
In Finagle, Future objects are the unifying abstraction for all asynchronous computation. A Future represents a computation that may not yet have completed and that can either succeed or fail. The two most basic ways to use a Future are to:
Composing Futures
Futures can be combined and transformed in interesting ways, leading to the kind of compositional behavior commonly seen in functional programming languages. For instance, you can convert a Future[String] to a Future[Int] by using map:
Similarly, you can use flatMap to easily pipeline a sequence of Futures:
In this example, User.authenticate() is performed asynchronously; Tweet.findAllByUser() is invoked on its eventual result. This is alternatively expressed in Scala, using the for statement:
Handling errors and exceptions is very easy when Futures are pipelined using flatMap or the for statement. In the above example, if User.authenticate() asynchronously raises an exception, the subsequent call to Tweet.findAllByUser() never happens. Instead, the result of the pipelined expression is still of the type Future[Seq[Tweet]], but it contains the exceptional value rather than tweets. You can respond to the exception using the onFailure callback or other compositional techniques.
A nice property of Futures, as compared to other asynchronous programming techniques (such as the continuation passing style), is that you can easily write clear and robust asynchronous code, even with more sophisticated operations such as scatter/gather:
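As a rough analogue of these ideas in Python (this is not Finagle's Scala API), the sketch below uses the standard asyncio module. The authenticate and find_all_by_user coroutines are hypothetical stand-ins for User.authenticate() and Tweet.findAllByUser(). Here await plays the role that flatMap and the for expression play for Futures, and asyncio.gather performs the scatter/gather step.

```python
import asyncio


class AuthError(Exception):
    pass


# Hypothetical stand-ins for User.authenticate() and Tweet.findAllByUser().
async def authenticate(name, password):
    await asyncio.sleep(0)  # pretend to wait on a remote service
    if password != "secret":
        raise AuthError(name)
    return {"name": name}


async def find_all_by_user(user):
    await asyncio.sleep(0)
    return [f"{user['name']}: tweet {i}" for i in range(2)]


async def tweets_for(name, password):
    # Sequential composition: the second call runs only if the first succeeds;
    # otherwise the exception becomes the result of the whole pipeline.
    user = await authenticate(name, password)
    return await find_all_by_user(user)


async def tweet_count(name, password):
    # Transforming an eventual result, in the spirit of map.
    return len(await tweets_for(name, password))


async def gather_counts(credentials):
    # Scatter/gather: start every lookup, then collect results or exceptions.
    return await asyncio.gather(
        *(tweet_count(n, p) for n, p in credentials), return_exceptions=True
    )
```

With one valid and one invalid login, gather_counts yields a count for the first and an AuthError instance for the second, without find_all_by_user ever being called for the failed login.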
Service objects
A Service is a function that receives a request and returns a Future object as a response. Note that both clients and servers are represented as Service objects.
To create a Server, you extend the abstract Service class and listen on a port. Here is a simple HTTP server listening on port 10000:
Building an HTTP client is even easier:
Filter objects
Filters are a useful way to isolate distinct phases of your application into a pipeline. For example, you may need to handle exceptions, authorization, and so forth before your Service responds to a request.
A Filter wraps a Service and, potentially, converts the input and output types of the Service to other types. In other words, a Filter is a Service transformer. Here is a filter that ensures an HTTP request has valid OAuth credentials that uses an asynchronous authenticator service:
A Filter then decorates a Service, as in this example:
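The relationship between Services and Filters can be sketched in Python with plain coroutines; this mirrors the idea only and is not Finagle's API. All names here (hello_service, require_token, handle_errors, filtered) and the token check are hypothetical. A real OAuth filter would call an asynchronous authenticator service instead of comparing against a fixed string.

```python
import asyncio


async def hello_service(request):
    """A service: takes a request and asynchronously produces a response."""
    return {"status": 200, "body": f"hello {request['user']}"}


async def failing_service(request):
    raise RuntimeError("backend unavailable")


def filtered(filter_fn, service):
    """Decorate a service with a filter, yielding a new service."""
    async def wrapped(request):
        return await filter_fn(request, service)
    return wrapped


async def require_token(request, service):
    # Hypothetical credential check standing in for an authenticator call.
    if request.get("token") != "valid-token":
        return {"status": 401, "body": "unauthorized"}
    # The filter may also change the request type seen by the inner service.
    return await service(dict(request, user="ann"))


async def handle_errors(request, service):
    try:
        return await service(request)
    except Exception:
        return {"status": 500, "body": "internal error"}


# Filters compose from the outside in: errors are handled around authentication.
app = filtered(handle_errors, filtered(require_token, hello_service))
```

Because a filtered service is itself a service, filters can be stacked in any order, and the same filter can be reused in front of clients and servers alike.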
Finagle is an open source project, available under the Apache License, Version 2.0. Source code and documentation are available on GitHub.
Acknowledgements
Finagle was originally conceived by Marius Eriksen and Nick Kallen. Other key contributors are Arya Asemanfar, David Helder, Evan Meagher, Gary McCue, Glen Sanford, Grant Monroe, Ian Ownbey, Jake Donham, James Waldrop, Jeremy Cloud, Johan Oskarsson, Justin Zhu, Raghavendra Prabhu, Robey Pointer, Ryan King, Sam Whitlock, Steve Jenson, Wanli Yang, Wilhelm Bierbaum, William Morgan, Abhi Khune, and Srini Rajagopal.
..
Details:
http://engineering.twitter.com/2011/08/finagle-protocol-agnostic-rpc-system.html
|
Posted on Thu, 04 Aug 2011 19:40:00 +0000
We've received a lot of questions about what's going to happen to Storm now that BackType has been acquired by Twitter. I'm pleased to announce that I will be releasing Storm at Strange Loop on September 19th! Check out the session info for more details.
In my preview post about Storm, I discussed how Storm can be applied to a huge variety of realtime computation problems. In this post, I'll give more details on Storm and what it's like to use.
Here's a recap of the three broad use cases for Storm:
- Stream processing: Storm can be used to process a stream of new data and update databases in realtime. Unlike the standard approach of doing stream processing with a network of queues and workers, Storm is fault-tolerant and scalable.
- Continuous computation: Storm can do a continuous query and stream the results to clients in realtime. An example is streaming trending topics on Twitter into browsers. The browsers will have a realtime view on what the trending topics are as they happen.
- Distributed RPC: Storm can be used to parallelize an intense query on the fly. The idea is that your Storm topology is a distributed function that waits for invocation messages. When it receives an invocation, it computes the query and sends back the results. Examples of Distributed RPC are parallelizing search queries or doing set operations on large numbers of large sets.
The beauty of Storm is that it's able to solve such a wide variety of use cases with just a simple set of primitives.
Components of a Storm cluster
A Storm cluster is superficially similar to a Hadoop cluster. Whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" themselves are very different -- one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).
There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.
Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.
All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster. Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and stateless; all state is kept in Zookeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and they'll start back up like nothing happened. This design leads to Storm clusters being incredibly stable. We've had topologies running for months without requiring any maintenance.
Running a Storm topology
Running a topology is straightforward. First, you package all your code and dependencies into a single jar. Then, you run a command like the following:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
This runs the class backtype.storm.MyTopology with the arguments arg1 and arg2. The main function of the class defines the topology and submits it to Nimbus. The storm jar part takes care of connecting to Nimbus and uploading the jar.
Since topology definitions are just Thrift structs, and Nimbus is a Thrift service, you can create and submit topologies using any programming language. The above example is the easiest way to do it from a JVM-based language.
Streams and Topologies
Let's dig into the abstractions Storm exposes for doing scalable realtime computation. After I go over the main abstractions, I'll tie everything together with a concrete example of a Storm topology.
The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.
The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.
A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.
A bolt does single-step stream transformations. It creates new streams based on its input streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts.
Multi-step stream transformations are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.
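To make the graph idea concrete, here is a toy, single-process sketch in Python. It is not Storm's API: the `Topology` class, its method names, and the component ids are all hypothetical, and nothing here is distributed or fault-tolerant. It only shows that an emitted tuple reaches every subscribed bolt.

```python
from collections import defaultdict

class Topology:
    """Toy in-process stand-in for a topology graph (not Storm's API)."""

    def __init__(self):
        self.handlers = {}                    # bolt id -> function(tuple) -> list of outputs
        self.subscribers = defaultdict(list)  # component id -> ids of subscribed bolts

    def add_bolt(self, bolt_id, handler, source_id):
        self.handlers[bolt_id] = handler
        self.subscribers[source_id].append(bolt_id)

    def emit(self, source_id, tup):
        # Every bolt subscribed to the emitting component receives the tuple.
        for bolt_id in self.subscribers[source_id]:
            for out in self.handlers[bolt_id](tup):
                self.emit(bolt_id, out)

received_a, received_b = [], []

def bolt_a(tup):
    received_a.append(tup)
    return []  # emits nothing downstream

def bolt_b(tup):
    received_b.append(tup.upper())
    return []

topology = Topology()
topology.add_bolt("a", bolt_a, source_id="spout")
topology.add_bolt("b", bolt_b, source_id="spout")
topology.emit("spout", "tweet")
```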
Everything in Storm runs in parallel in a distributed way. Spouts and bolts execute as many threads across the cluster, and they pass messages to each other in a distributed way. Messages never pass through any sort of central router, and there are no intermediate queues. A tuple is passed directly from the thread that created it to the threads that need to consume it.
Storm guarantees that every message flowing through a topology will be processed, even if a machine goes down and the messages it was processing get dropped. How Storm accomplishes this without any intermediate queuing is the key to how it works and what makes it so fast.
Let's look at a concrete example of spouts, bolts, and topologies to solidify the concepts.
A simple example topology
The example topology I'm going to show is "streaming word count". The topology contains a spout that emits sentences, and the final bolt emits the number of times each word has appeared across all sentences. Every time the count for a word is updated, a new count is emitted for it. The topology looks like this:
Here's how you define this topology in Java:
The spout for this topology reads sentences off of the "sentence_queue" on a Kestrel server located at kestrel.backtype.com on port 22133.
The spout is inserted into the topology with a unique id using the setSpout method. Every node in the topology must be given an id, and the id is used by other bolts to subscribe to that node's output streams. The KestrelSpout is given the id "1" in this topology.
setBolt is used to insert bolts in the topology. The first bolt defined in this topology is the SplitSentence bolt. This bolt transforms a stream of sentences into a stream of words. Let's take a look at the implementation of SplitSentence:
The key method is the execute method. As you can see, it splits the sentence into words and emits each word as a new tuple. Another important method is declareOutputFields, which declares the schema for the bolt's output tuples. Here it declares that it emits 1-tuples with a field called "word".
Bolts can be implemented in any language. Here is the same bolt implemented in Python:
The last parameter to setBolt is the amount of parallelism you want for the bolt. The SplitSentence bolt is given a parallelism of 10 which will result in 10 threads executing the bolt in parallel across the Storm cluster. To scale a topology, all you have to do is increase the parallelism for the bolts at the bottleneck of the topology.
The setBolt method returns an object that you use to declare the inputs for the bolt. Continuing with the example, the SplitSentence bolt subscribes to the output stream of component "1" using a shuffle grouping. "1" refers to the KestrelSpout that was already defined. I'll explain the shuffle grouping part in a moment. What matters so far is that the SplitSentence bolt will consume every tuple emitted by the KestrelSpout.
A bolt can subscribe to multiple input streams by chaining input declarations, like so:
You would use this functionality to implement a streaming join, for instance.
The final bolt in the streaming word count topology, WordCount, reads in the words emitted by SplitSentence and emits updated counts for each word. Here's the implementation of WordCount:
WordCount maintains a map in memory from word to count. Whenever it sees a word, it updates the count for the word in its internal map and then emits the updated count as a new tuple. Finally, in declareOutputFields the bolt declares that it emits a stream of 2-tuples named "word" and "count".
The internal map kept in memory will be lost if the task dies. If it's important that the bolt's state persist even if a task dies, you can use an external database like Riak, Cassandra, or Memcached to store the state for the word counts. An in-memory HashMap is used here for simplicity.
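The logic of the two bolts can be sketched in plain Python, without Storm. This is a single-process illustration under stated assumptions: `split_sentence` and the `WordCount` class below are hypothetical stand-ins that mirror the described behaviour, not the Storm bolt interfaces.

```python
from collections import defaultdict

def split_sentence(sentence):
    """Emit one 1-tuple ("word",) per word in the sentence."""
    for word in sentence.split(" "):
        yield (word,)

class WordCount:
    def __init__(self):
        # In-memory state: lost if the process dies, as noted above.
        self.counts = defaultdict(int)

    def execute(self, tup):
        word = tup[0]
        self.counts[word] += 1
        # Emit the updated ("word", "count") 2-tuple.
        return (word, self.counts[word])

counter = WordCount()
emitted = [counter.execute(t)
           for sentence in ["the cow", "the dog"]
           for t in split_sentence(sentence)]
```

Each incoming word produces a new count tuple, so "the" is emitted twice, first with a count of 1 and then with a count of 2.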
Finally, the WordCount bolt declares its input as coming from component 2, the SplitSentence bolt. It consumes that stream using a "fields grouping" on the "word" field.
"Fields grouping", like the "shuffle grouping" that I glossed over before, is a type of "stream grouping". "Stream groupings" are the final piece that ties topologies together.
Stream groupings
A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, it looks something like this:
When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?
A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks. There are a few different kinds of stream groupings.
The simplest kind of grouping is called a "shuffle grouping" which sends the tuple to a random task. A shuffle grouping is used in the streaming word count topology to send tuples from KestrelSpout to the SplitSentence bolt. It has the effect of evenly distributing the work of processing the tuples across all of SplitSentence bolt's tasks.
A more interesting kind of grouping is the "fields grouping". A fields grouping is used between the SplitSentence bolt and the WordCount bolt. It is critical for the functioning of the WordCount bolt that the same word always go to the same task. Otherwise, more than one task will see the same word, and they'll each emit incorrect values for the count since each has incomplete information. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task. Since WordCount subscribes to SplitSentence's output stream using a fields grouping on the "word" field, the same word always goes to the same task and the bolt produces the correct output.
Fields groupings are the basis of implementing streaming joins and streaming aggregations as well as a plethora of other use cases. Underneath the hood, fields groupings are implemented using consistent hashing.
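The difference between the two groupings can be sketched as two task-selection functions. This is an assumption-laden illustration rather than Storm's implementation: it uses a plain hash-modulo scheme instead of the consistent hashing mentioned above, and the function names and the CRC32 choice are hypothetical.

```python
import random
import zlib

def shuffle_grouping(num_tasks, rng=random):
    """Pick a random task index, spreading tuples evenly on average."""
    return rng.randrange(num_tasks)

def fields_grouping(tup, field_indexes, num_tasks):
    """Pick a task from a stable hash of the selected fields."""
    key = "\x00".join(str(tup[i]) for i in field_indexes)
    # crc32 is stable across processes, unlike Python's built-in str hash.
    return zlib.crc32(key.encode("utf-8")) % num_tasks

# Tuples with the same "word" field always map to the same task,
# regardless of what the other fields contain.
task_1 = fields_grouping(("storm", 1), [0], 20)
task_2 = fields_grouping(("storm", 7), [0], 20)
```

With the fields grouping, every occurrence of a given word lands on one task, which is what lets a per-task counter hold the complete count for that word.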
There are a few other kinds of groupings, but talking about those is beyond the scope of this post.
With that, you should now have everything you need to understand the streaming word count topology. The topology doesn't require that much code, and it's completely scalable and fault-tolerant. Whether you're processing 10 messages per second or 100K messages per second, this topology can scale up or down as necessary by just tweaking the amount of parallelism for each component.
The complexity that Storm hides
The abstractions that Storm provides are ultimately pretty simple. A topology is composed of spouts and bolts that you connect together with stream groupings to get data flowing. You specify how much parallelism you want for each component, package everything into a jar, submit the topology and code to Nimbus, and Storm keeps your topology running forever. Here's a glimpse at what Storm does underneath the hood to implement these abstractions in an extremely robust way.
- Guaranteed message processing: Storm guarantees that each tuple coming off a spout will be fully processed by the topology. To do this, Storm tracks the tree of messages that a tuple triggers. If a tuple fails to be fully processed, Storm will replay the tuple from the spout. Storm incorporates some clever tricks to track the tree of messages in an efficient way.
- Robust process management: One of Storm's main tasks is managing processes around the cluster. When a new worker is assigned to a supervisor, that worker should be started as quickly as possible. When that worker is no longer assigned to that supervisor, it should be killed and cleaned up.
An example of a system that does this poorly is Hadoop. When Hadoop launches a task, the burden for the task to exit is on the task itself. Unfortunately, tasks sometimes fail to exit and become orphan processes, sucking up memory and resources from other tasks.
In Storm, the burden of killing a worker process is on the supervisor that launched it. Orphaned tasks simply cannot happen with Storm, no matter how much you stress the machine or how many errors there are. Accomplishing this is tricky because Storm needs to track not just the worker processes it launches, but also subprocesses launched by the workers (a subprocess is launched when a bolt is written in another language).
The Nimbus daemon and Supervisor daemons are stateless and fail-fast. If they die, the running topologies aren't affected. The daemons just start back up like nothing happened. This is again in contrast to how Hadoop works.
- Fault detection and automatic reassignment: Tasks in a running topology heartbeat to Nimbus to indicate that they are running smoothly. Nimbus monitors heartbeats and will reassign tasks that have timed out. Additionally, all the tasks throughout the cluster that were sending messages to the failed tasks quickly reconnect to the new location of the tasks.
- Efficient message passing: No intermediate queuing is used for message passing between tasks. Instead, messages are passed directly between tasks using ZeroMQ. This is simpler and way more efficient than using intermediate queuing. ZeroMQ is a clever "super-socket" library that employs a number of tricks for maximizing the throughput of messages. For example, it will detect if the network is busy and automatically batch messages to the destination.
Another important part of message passing between processes is serializing and deserializing messages in an efficient way. Again, Storm automates this for you. By default, you can use any primitive type, strings, or binary records within tuples. If you want to be able to use another type, you just need to implement a simple interface to tell Storm how to serialize it. Then, whenever Storm encounters that type, it will automatically use that serializer.
- Local mode and distributed mode: Storm has a "local mode" where it simulates a Storm cluster completely in-process. This lets you iterate on your topologies quickly and write unit tests for your topologies. You can run the same code in local mode as you run on the cluster.
Storm is easy to use, configure, and operate. It is accessible for everyone from the individual developer processing a few hundred messages per second to the large company processing hundreds of thousands of messages per second.
Relation to “Complex Event Processing”
Storm exists in the same space as “Complex Event Processing” systems like Esper, Streambase, and S4. Among these, the most closely comparable system is S4. The biggest difference between Storm and S4 is that Storm guarantees messages will be processed even in the face of failures whereas S4 will sometimes lose messages.
Some CEP systems have a built-in data storage layer. With Storm, you would use an external database like Cassandra or Riak alongside your topologies. It’s impossible for one data storage system to satisfy all applications since different applications have different data models and access patterns. Storm is a computation system and not a storage system. However, Storm does have some powerful facilities for achieving data locality even when using an external database.
Summary
I've only scratched the surface on Storm. The "stream" concept at the core of Storm can be taken so much further than what I've shown here -- I didn't talk about things like multi-streams, implicit streams, or direct groupings. I showed two of Storm's main abstractions, spouts and bolts, but I didn't talk about Storm's third, and possibly most powerful, abstraction: the "state spout". I didn't show how you do distributed RPC over Storm, and I didn't discuss Storm's awesome automated deploy that lets you create a Storm cluster on EC2 with just the click of a button.
For all that, you're going to have to wait until September 19th. Until then, I will be working on adding documentation to Storm so that you can get up and running with it quickly once it's released. We're excited to release Storm, and I hope to see you there at Strange Loop when it happens.
- Nathan Marz (@nathanmarz)
Details:
http://engineering.twitter.com/2011/08/storm-is-coming-more-details-and-plans.html
|
Posted on Fri, 01 Jul 2011 16:55:00 +0000
Starting today, Twitter is offering TwUI as an open-source framework (https://github.com/twitter/twui) for developing interfaces on the Mac.
Until now, there was not a simple and effective way to design interactive, hardware-accelerated interfaces on the Mac. Core Animation can create hardware-accelerated drawings, but doesn't provide interaction mechanisms. AppKit and NSView have excellent interaction mechanisms, but the drawing operations are CPU-bound, which makes fluid scrolling, animations, and other effects difficult, if not impossible, to accomplish.
UIKit on Apple’s iOS platform has offered developers a fresh start. While UIKit borrows many ideas from AppKit regarding interaction, it can offload compositing to the GPU because it is built on top of Core Animation. This architecture has enabled developers to create many applications that were, until this time, impossible to build.
TwUI as a solution
TwUI brings the philosophy of UIKit to the desktop. It is built on top of Core Animation, and it borrows interaction ideas from AppKit. It allows for all the things Mac users expect, including drag & drop, mouse events, tooltips, Mac-like text selection, and so on. And, since TwUI isn’t bound by the constraints of an existing API, developers can experiment with new features like block-based drawRect and layout.
How TwUI works
You will recognize the fundamentals of TwUI if you are familiar with UIKit. For example, a "TUIView" is a simple, lightweight wrapper around a Core Animation layer, much like UIView on iOS. TUIView offers useful subclasses for operations such as scroll views, table views, buttons, and so on. More importantly, TwUI makes it easy to build your own custom interface components. And because all of these views are backed by layers, composited by Core Animation, your UI is rendered at optimal speed.
(Image: Xcode running the TwUI example project)
Ongoing development
Since TwUI forms the basis of Twitter for the Mac, it is an integral part of our shipping code. Going forward, we need to stress test it in several implementations. We’ll continue to develop additional features and make improvements. And, we encourage you to experiment, as that will help us build a robust and exciting UI framework for the Mac.
Acknowledgements
The following engineers were mainly responsible for the TwUI development: Loren Brichter (@lorenb), Ben Sandofsky (@sandofsky)
Details:
http://engineering.twitter.com/2011/07/starting-today-twitter-is-offering-twui.html
|
Posted on Wed, 22 Jun 2011 20:30:00 +0000
Engineering Open House
Twitter’s engineering team is growing quickly. Two-thirds of our engineers were hired in the last 12 months. Those engineers joined us from cities and countries around the world and from companies of various sizes. As part of our effort to find and hire great people to build great products and solve complicated problems, last Thursday we invited several dozen engineers to Twitter HQ for our first engineering open house.
Presentations from @wfarner, @michibusch, @mracus and @esbie showcased the depth and range of the effort required to present twitter.com to the world. The topics covered some of these key areas for development:
Dynamic deployment and resource management with Mesos - @wfarner
Using Mesos as a platform, we have built a private cloud system on which we hope to eventually run most, if not all, of our services. We expect this to simplify deployment and improve the reliability of our systems, while making more efficient use of our compute resources.
Real-time search at Twitter - @michibusch
Since 2008, Twitter has made dramatic enhancements to our real-time search engine, scaling it from 200 QPS to 18,000 QPS. At the core of our infrastructure is Earlybird, a version of Lucene modified for real-time search. This work, combined with other key infrastructure components, led to our recent revamp of the search experience and will enable future innovation in real-time search.
The client-side architecture of #NewTwitter - @mracus and @esbie
Client-side applications for desktop and mobile environments have access to a class of well-rounded tools and framework components that aren't as yet widely available for the browser. Therefore, a fully in-browser app like #NewTwitter requires investment in solid architecture in order to remain clean and extensible as it grows. At Twitter, we're constantly iterating on the in-house and open source JavaScript tools we use to address this need.
This was Twitter's first engineering open house, but it certainly won’t be our last. We plan to hold these regularly, every couple of months or so. In the meantime, if you’re interested in keeping up with our engineering team, you can follow @twittereng or check out our jobs page.
- Mike Abbott (@mabb0tt), VP Engineering
Details:
http://engineering.twitter.com/2011/06/join-flock.html
|
Posted on Tue, 31 May 2011 21:57:00 +0000
Today, Twitter launched a personalized search experience to help our users find the most relevant Tweets, images, and videos. To build this product, our infrastructure needed to support two major features: relevance-filtering of search results and the identification of relevant images and videos. Both features leverage a ground-up rewrite of the search infrastructure, with Blender and Earlybird at the core.
Investment in Search
Since the acquisition of Summize in 2008, Twitter has invested heavily in search. We've grown our search team from three to 15 engineers and scaled our real-time search engine by two orders of magnitude, all while replacing the search infrastructure in flight with no major service interruptions.
The engineering story behind the evolution of search is compelling. The Summize infrastructure used Ruby on Rails for the front-end and MySQL for the back-end (the same architecture as the one used by Twitter and many other start-ups). At the time, Lucene and other open-source search technology did not support real-time search. As a result, we constructed our reverse indexes in MySQL, leveraging its concurrent transactions and B-tree data structures to support concurrent indexing and searching. We were able to scale our MySQL-based solution surprisingly far by partitioning the index across multiple databases and replicating the Rails front-end.
In 2008, Twitter search handled an average of 20 TPS and 200 QPS. By October 2010, when we replaced MySQL with Earlybird, the system was handling 1,000 TPS and 12,000 QPS on average. Earlybird, a real-time reverse index based on Lucene, not only gave us an order of magnitude better performance than MySQL for real-time search, it also doubled our memory efficiency and provided the flexibility to add relevance filtering.
However, we still needed to replace the Ruby on Rails front-end, which was only capable of synchronous calls to Earlybird and had accrued significant technical debt through years of scaling and transition to Earlybird. In April 2011, we launched a replacement, called Blender, which improved our search latencies by 3x, gave us 10x throughput, and allowed us to remove Ruby on Rails from the search infrastructure. Today, we are indexing an average of 2,200 TPS while serving 18,000 QPS (1.6B queries per day!). More importantly, Blender completed the infrastructure necessary to make the most significant user-facing change to Twitter search since the acquisition of Summize.
From Hack-Week Project to Production
When the team launched Earlybird, we were all excited by its potential: it was fast and the code was clean and easy to extend. While on vacation in Germany, Michael Busch, one of our search engineers, implemented a demo of image and video search. A few weeks later, during Twitter's first Hack Week, the search team, along with some members of other teams, completed the first demo of our new search experience. Feedback from the company was so positive that the demo became part of our product roadmap.
Surfacing Relevant Tweets
There is a lot of information on Twitter: on average, more than 2,200 new Tweets every second! During large events, for example the #tsunami in Japan, this rate can increase by 3 to 4x. Often, users are interested in only the most memorable Tweets or those that other users engage with. In our new search experience, we show search results that are most relevant to a particular user. So search results are personalized, and we filter out the Tweets that do not resonate with other users.
To support relevance filtering and personalization, we needed three types of signals:
- Static signals, added at indexing time
- Resonance signals, dynamically updated over time
- Information about the searcher, provided at search time
Getting all of these signals into our index required changes to our ingestion pipeline, Earlybird (our reverse index), and Blender (our front-ends). We also created a new updater component that continually pushes resonance signals to Earlybird.
In the ingestion pipeline, we added a pipeline stage that annotates Tweets with static information, for example, information about the user and the language of the Tweet's text. The Tweets are then replicated to the Earlybird indexes (in real time), where we have extended Lucene’s internal data structures to support dynamic updates to arbitrary annotations. Dynamic updates, for example, the users' interactions with Tweets, arrive over time from the updater. Together, Earlybird and the updater support a high and irregular rate of updates without requiring locks or slowing down searches.
At query time, a Blender server parses the user’s query and passes it along with the user’s social graph to multiple Earlybird servers. These servers use a specialized ranking function that combines relevance signals and the social graph to compute a personalized relevance score for each Tweet. The highest-ranking, most-recent Tweets are returned to the Blender, which merges and re-ranks the results before returning them to the user.
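The query-time flow described above (fan out to several index servers, score locally, then merge and re-rank) can be sketched as follows. This is a simplified illustration, not the production ranking function: the scoring rule, the boost for followed authors, the field names, and the functions `search_shard` and `blend` are all hypothetical.

```python
import heapq

def search_shard(index, query, user_follows, k):
    """Score matching Tweets on one shard and return its top k."""
    scored = []
    for tweet in index:
        if query in tweet["text"]:
            score = tweet["resonance"]
            if tweet["author"] in user_follows:
                score += 1.0  # hypothetical boost for followed authors
            scored.append((score, tweet["id"]))
    return heapq.nlargest(k, scored)

def blend(shards, query, user_follows, k):
    """Query every shard, then merge and re-rank the partial results."""
    partial = [hit for shard in shards
               for hit in search_shard(shard, query, user_follows, k)]
    return [tweet_id for _, tweet_id in heapq.nlargest(k, partial)]

shards = [
    [{"id": 1, "text": "storm release", "author": "nathanmarz", "resonance": 0.5},
     {"id": 2, "text": "lunch", "author": "x", "resonance": 0.9}],
    [{"id": 3, "text": "storm is coming", "author": "y", "resonance": 0.8}],
]
top = blend(shards, "storm", {"nathanmarz"}, k=2)
```

Each shard only returns its own top k results, so the merging step handles at most k results per shard instead of every match.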
(Image: Twitter search architecture with support for relevance)
Removing Duplicates
Duplicate and near-duplicate Tweets are often not particularly helpful in Twitter search results. During popular and important events, when search should be most helpful to our users, nearly identical Tweets increase in number. Even when the quality of the duplicates is high, the searcher would benefit from a more diverse set of results. To remove duplicates we use a technique based on MinHashing, where several signatures are computed per Tweet and two Tweets sharing the same set of signatures are considered duplicates. The twist? Like everything at Twitter, brevity is key: We have a very small memory budget to store the signatures. Our algorithm compresses each Tweet to just 4 bytes while still identifying the vast majority of duplicates with very low computational requirements.
Personalization
Twitter is most powerful when you personalize it by choosing interesting accounts to follow, so why shouldn’t your search results be more personalized too? They are now! Our ranking function accesses the social graph and uses knowledge about the relationship between the searcher and the author of a Tweet during ranking. Although the social graph is very large, we compress the meaningful part for each user into a Bloom filter, which gives us space-efficient constant-time set membership operations. As Earlybird scans candidate search results, it uses the presence of the Tweet's author in the user's social graph as a relevance signal in its ranking function. Even users that follow few or no accounts will benefit from other personalization mechanisms; for example, we now automatically detect the searcher's preferred language and location.
Images and Videos in Search
Images and videos have an amazing ability to describe people, places, and real-time events as they unfold. Take for example @jkrums' Twitpic of US Airways Flight 1549 Hudson river landing, and @stefmara's photos and videos of space shuttle Endeavour's final launch.
There is a fundamental difference between searching for Tweets and searching for entities in Tweets, such as images and videos. In the former case, the decision about whether a Tweet matches a query can be made by looking at the text of the Tweet, with no other outside information. Additionally, per-Tweet relevance signals can be used to rank and compare matching Tweets to find the best ones. The situation is different when searching for images or videos. For example, the same image may be tweeted many times, with each Tweet containing different keywords that all describe the image. Consider the following Tweets:
One possible description of the image is formed from the union of keywords in the Tweets' text; that is, "dog", "Australian", and "shepherd" all describe the image.
If an image is repeatedly described by a term in the Tweet's text, it is likely to be about that term. So what makes this a difficult problem? Twitter allows you to search Tweets within seconds; images and videos in Tweets should be available in real time too! Earlybird uses inverted indexes for search. While these data structures are extremely efficient, they do not support inline updates, which makes it nearly impossible to append additional keywords to indexed documents.
If timeliness were not important, we could use MapReduce jobs that periodically aggregate keyword unions and produce inverted indexes. In these offline indexes, each link to an image or video would be a document, with the aggregated keywords as the document’s text. However, to meet our indexing latency goals, we would have to run these MapReduce jobs every few seconds, an impractical solution.
Instead, we extended Earlybird's data structures to support efficient lookups of entities contained in Tweets. At query time, we look up the images and videos for matching Tweets and store them in a custom hash map. The keys of the map are URLs and the values are score counters. Each time the same URL is added to the map, its corresponding score counter is incremented. After this aggregation is complete, the map is sorted and the best images and videos are returned for rendering.
What’s next?
The search team is excited to build innovative search products that drive discovery and help our users. While the new search experience is a huge improvement over pure real-time search, we are just getting started. In the coming months, we will improve quality, scale our infrastructure, expand our indexes, and bring relevance to mobile. If you are a talented engineer and want to work on the largest real-time search engine in the world, Twitter search is hiring for search quality and search infrastructure!
Acknowledgements
The following people contributed to the launch: Abhi Khune, Abdur Chowdhury, Aneesh Sharma, Ashok Banerjee, Ben Cherry, Brian Larson, Coleen Baik, David Chen, Frost Li, Gilad Mishne, Isaac Hepworth, Jon Boulle, Josh Brewer, Krishna Gade, Michael Busch, Mike Hayes, Nate Agrin, Patrick Lok, Raghavendra Prabhu, Sarah Brown, Sam Luckenbill, Stephen Fedele, Tian Wang, Yi Zhuang, Zhenghua Li. We would also like to thank the original Summize team, former team members, hack-week contributors, and management for their contributions and support.
- @twittersearch
Details:
http://engineering.twitter.com/2011/05/engineering-behind-twitters-new-search.html
|
Posted on Thu, 19 May 2011 20:24:00 +0000
In March 2011, we shared Kiji, an improved Ruby runtime. The initial performance gains were relatively modest, but laid the foundation for future improvements. We continued the work and now have some excellent results.
FASTER REMEMBERED SET CALCULATIONS
In Kiji 0.10, every change to the longlife heap required full recalculation of the "remembered set," the boundary objects referenced from the longlife to the eden heap. For Kiji 0.11, we changed the calculation to an incremental model that only includes newly-allocated objects.
We made this easier by disabling garbage collection during source code parsing, which has a tendency to mutate references in place. Now, if the parser needs more memory, it merely allocates a new heap chunk. This lets us allocate all AST nodes, including those created in instance_eval, on the longlife heap. The result is a big performance boost for applications like template engines that use lots of instance_eval.
MORE OBJECTS IN LONGLIFE
For Kiji 0.11, we now allocate non-transient strings in the longlife heap, along with the AST nodes. This includes strings allocated during parsing, assigned to constants (or members of a constant hash or array), and those that are members of frozen objects. With Ruby's Kernel.freeze method, big parts of frozen objects are now evicted from the ordinary heap and moved to the longlife heap.
This change is significant. When the twitter.com web application ran Kiji 0.10, it had 450,000 live objects after garbage collection in its ordinary heap. Kiji 0.11 places over 300,000 string objects in the longlife heap, reducing the number of live objects in the ordinary heap to under 150,000. This reduction of roughly two-thirds allows the heap to be collected much less frequently.
SIMPLIFIED HEAP GROWTH STRATEGY
Ruby Enterprise Edition has a set of environment variables that govern when to run the garbage collector and how to grow and shrink the heaps. After evaluating Ruby’s heap growth strategy, we replaced it with one that is much simpler to configure and works better for server workloads.
As a first step, we eliminated GC_MALLOC_LIMIT. This environment variable prescribes when to force a garbage collection, following a set of C-level malloc() calls. We found this setting to be capricious; it performed best when it was set so high as to be effectively off. By eliminating the malloc limit entirely, the Kiji 0.11 garbage collector runs only when heaps are full, or when no more memory can be allocated from the operating system. This also means that under UNIX-like systems, you can more effectively size the process with ulimit -v.
Kiji 0.11 now has only these three GC-tuning environment variables:
- The first parameter is RUBY_GC_HEAP_SIZE. This parameter determines the size of a heap slab, specified as a number of objects. Its default value is 32768.
- The next parameter is RUBY_GC_EDEN_HEAPS. This parameter specifies the target number of heap slabs for the ordinary heap. Its default value is 24.
The runtime starts out with a single heap slab, and when it fills up, it collects the garbage and allocates a new slab until it reaches the target number. This gradual strategy keeps fragmentation in the heaps low, as it tends to concentrate longer-lived objects in the earlier heap slabs. If the heap is forced to grow beyond the target number of slabs, the runtime releases vacated slabs after each garbage collection in order to restore the target size. Once the application reaches the target size of the ordinary heap, it does not go below it.
Since performance is tightly bound to the rate of eden collections (a classic memory-for-speed tradeoff), this makes the behavior of a long-lived process very predictable. We have had very good results with settings as high as 64.
- The final parameter is RUBY_GC_LONGLIFE_LAZINESS, a decimal between 0 and 1, with a default of 0.05. This parameter governs a different heap growth strategy for longlife heap slabs. The runtime releases vacant longlife heap slabs when the ratio of free longlife heap slots to all longlife heap slots after the collection is higher than this parameter. Also, if the ratio is lower after collection, a new heap slab is allocated.
The default value is well-tuned for our typical workload and prevents memory bloat.
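To make the three parameters concrete, here is a minimal Python sketch of the arithmetic and the longlife decision rule described above. It is only a model for illustration, not Kiji's implementation: the function names are hypothetical, and the behavior when the free-slot ratio exactly equals the threshold is an assumption, since the text does not specify it.

```python
# Illustrative model of the Kiji 0.11 tuning parameters (not Kiji source code).
RUBY_GC_HEAP_SIZE = 32768          # objects per heap slab (default from the text)
RUBY_GC_EDEN_HEAPS = 24            # target number of ordinary (eden) heap slabs
RUBY_GC_LONGLIFE_LAZINESS = 0.05   # free-slot ratio threshold for the longlife heap


def eden_target_slots(heap_size=RUBY_GC_HEAP_SIZE, eden_heaps=RUBY_GC_EDEN_HEAPS):
    """Object slots available once the eden heap has grown to its target size."""
    return heap_size * eden_heaps


def longlife_action(free_slots, total_slots, laziness=RUBY_GC_LONGLIFE_LAZINESS):
    """Decide what happens to longlife slabs after a collection."""
    ratio = free_slots / total_slots
    if ratio > laziness:
        return "release vacant slabs"
    if ratio < laziness:
        return "allocate new slab"
    return "no change"  # assumption: the text does not cover the equal case


print(eden_target_slots())               # slots at the default target
print(eden_target_slots(eden_heaps=64))  # slots with the larger setting mentioned above
print(longlife_action(free_slots=10, total_slots=100))
```

With the defaults, the eden heap tops out at 24 slabs of 32768 slots each; raising RUBY_GC_EDEN_HEAPS trades memory for fewer eden collections.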
We also reversed the order of adding the freed slots onto the free list. Now, new allocations are fulfilled with free slots from older (presumably, more densely-populated) heap slabs first, allowing recently allocated heap slabs to become completely vacant in a subsequent GC run. This may slightly impact locality of reference, but works well for us.
ADDITIONAL CHANGES
We replaced the old profiling methods that no longer applied with our improved memory debugging.
We also removed the “fastmarktable” mode, where the collector used a mark bit in the object slots. Kiji 0.11 uses only the copy-on-write friendly mark table. This lets us reset the mark bits after collection by zeroing out the entire mark table, instead of flipping a bit in every live object.
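The difference between the two marking modes can be sketched with a toy model. The class below is hypothetical and uses one byte per slot for readability, whereas a real collector would pack bits; it only illustrates why a side table can be cleared in one bulk operation without writing to any object.

```python
class MarkTableHeap:
    """Toy heap that keeps mark state in a side table instead of in each object."""

    def __init__(self, slot_count):
        # One byte per slot for readability; a real collector would pack bits.
        self.mark_table = bytearray(slot_count)

    def mark(self, slot):
        self.mark_table[slot] = 1

    def is_marked(self, slot):
        return self.mark_table[slot] == 1

    def reset_marks(self):
        # One bulk clear of the table; no per-object writes are needed.
        self.mark_table[:] = bytes(len(self.mark_table))


heap = MarkTableHeap(8)
for slot in (1, 4, 6):
    heap.mark(slot)
live_before = [s for s in range(8) if heap.is_marked(s)]
heap.reset_marks()
live_after = [s for s in range(8) if heap.is_marked(s)]
```

Because marking never touches the object slots themselves, memory pages holding objects stay unmodified during collection, which is what makes this layout friendly to copy-on-write.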
IT’S IN THE NUMBERS
We updated the performance chart from the first blog post about Kiji with the 0.11 data. As you can see, the new data shows a dramatic improvement for our example intensive workload. While Kiji 0.9 responded to all requests until 90 requests/sec and peaked at 95 responses out of 100 requests/sec, Kiji 0.11 responds to all requests until 120 requests/sec. This is a 30% improvement in throughput across the board, and 2.7x the speed of standard Ruby 1.8.
FULL ALLOCATION TRACING
We found that in order to effectively develop Kiji 0.11, we needed to add more sophisticated memory instrumentation than is currently available for Ruby. As a result, we ended up with some really useful debugging additions that you can turn on as well.
The first tool is a summary of memory stats after GC. It lets you cheaply measure the impact of memory-related changes:
The second tool is an allocation tracer (a replacement for BleakHouse and similar tools). After each GC, the runtime writes files containing full stack traces for the allocation points of all freed and surviving objects. You can easily parse this with AWK to list common object types, allocation sites, and number of objects allocated. This makes it easy to identify allocation hotspots, memory leaks, or objects that persist on the eden and should be manually moved to the longlife.
A sample output for allocation tracing, obtained by running RubySpec under Kiji:
For more information, refer to the README-kiji file in the distribution.
FUTURE DIRECTIONS
Kiji 0.11 is a much more performant and operable runtime than Kiji 0.10. However, through this work we identified a practical strategy for making an even better, fully-generational version that would apply well to Ruby 1.9. Time will tell if we get to implement it. We would also like to investigate the relative performance of JRuby.
TRY IT!
We have released the Kiji REE branch on GitHub.
ACKNOWLEDGEMENTS
The following engineers at Twitter contributed to the REE improvements: Rob Benson, Brandon Mitchell, Attila Szegedi, and Evan Weaver. If you want to work on projects like this, join the flock!
— Attila (@asz)
Details:
http://engineering.twitter.com/2011/05/faster-ruby-kiji-update.html
|
Posted on Wed, 06 Apr 2011 18:30:00 +0000
In the spring of 2010, the search team at Twitter started to rewrite our search engine in order to serve our ever-growing traffic, improve the end-user latency and availability of our service, and enable rapid development of new search features. As part of the effort, we launched a new real-time search engine, changing our back-end from MySQL to a real-time version of Lucene. Last week, we launched a replacement for our Ruby-on-Rails front-end: a Java server we call Blender. We are pleased to announce that this change has produced a 3x drop in search latencies and will enable us to rapidly iterate on search features in the coming months.
PERFORMANCE GAINS
Twitter search is one of the most heavily-trafficked search engines in the world, serving over one billion queries per day. The week before we deployed Blender, the #tsunami in Japan contributed to a significant increase in query load and a related spike in search latencies. Following the launch of Blender, our 95th percentile latencies were reduced by 3x from 800ms to 250ms and CPU load on our front-end servers was cut in half. We now have the capacity to serve 10x the number of requests per machine. This means we can support the same number of requests with fewer servers, reducing our front-end service costs.
95th Percentile Search API Latencies Before and After Blender Launch
TWITTER’S IMPROVED SEARCH ARCHITECTURE
In order to understand the performance gains, you must first understand the inefficiencies of our former Ruby-on-Rails front-end servers. The front ends ran a fixed number of single-threaded Rails worker processes, each of which did the following:
- parsed queries
- queried index servers synchronously
- aggregated and rendered results
We have long known that the model of synchronous request processing uses our CPUs inefficiently. Over time, we had also accrued significant technical debt in our Ruby code base, making it hard to add features and improve the reliability of our search engine. Blender addresses these issues by:
- Creating a fully asynchronous aggregation service. No thread waits on network I/O to complete.
- Aggregating results from back-end services, for example, the real-time, top tweet, and geo indices.
- Elegantly dealing with dependencies between services. Workflows automatically handle transitive dependencies between back-end services.
The following diagram shows the architecture of Twitter’s search engine. Queries from the website, API, or internal clients at Twitter are issued to Blender via a hardware load balancer. Blender parses the query and then issues it to back-end services, using workflows to handle dependencies between the services. Finally, results from the services are merged and rendered in the appropriate language for the client.
Twitter Search Architecture with Blender
BLENDER OVERVIEW
Blender is a Thrift and HTTP service built on Netty, a highly-scalable NIO client-server library written in Java that enables the development of a variety of protocol servers and clients quickly and easily. We chose Netty over some of its competitors, like Mina and Jetty, because it has a cleaner API, better documentation and, more importantly, because several other projects at Twitter are using this framework. To make Netty work with Thrift, we wrote a simple Thrift codec that decodes the incoming Thrift request from Netty’s channel buffer when it is read from the socket, and encodes the outgoing Thrift response when it is written to the socket.
Netty defines a key abstraction, called a Channel, to encapsulate a connection to a network socket that provides an interface to do a set of I/O operations like read, write, connect, and bind. All channel I/O operations are asynchronous in nature. This means any I/O call returns immediately with a ChannelFuture instance that notifies whether the requested I/O operations succeed, fail, or are canceled.
When a Netty server accepts a new connection, it creates a new channel pipeline to process it. A channel pipeline is nothing but a sequence of channel handlers that implements the business logic needed to process the request. In the next section, we show how Blender maps these pipelines to query processing workflows.
WORKFLOW FRAMEWORK
In Blender, a workflow is a set of back-end services with dependencies between them, which must be processed to serve an incoming request. Blender automatically resolves dependencies between services. For example, if service A depends on service B, B is queried first and its results are passed to A. It is convenient to represent workflows as directed acyclic graphs (see below).
Sample Blender Workflow with 6 Back-end Services
In the sample workflow above, we have 6 services {s1, s2, s3, s4, s5, s6} with dependencies between them. The directed edge from s3 to s1 means that s3 must be called before calling s1 because s1 needs the results from s3. Given such a workflow, the Blender framework performs a topological sort on the DAG to determine the total ordering of services, which is the order in which they must be called. The execution order of the above workflow would be {(s3, s4), (s1, s5, s6), (s2)}. This means s3 and s4 can be called in parallel in the first batch, and once their responses are returned, s1, s5, and s6 can be called in parallel in the next batch, before finally calling s2.
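The batching described above can be sketched as a level-by-level topological sort. This is a minimal Python illustration, not Blender's Java code. Only the s3-to-s1 dependency is stated in the text; the remaining edges in WORKFLOW are assumptions chosen so that the result matches the execution order given above, and the function name is hypothetical.

```python
def execution_batches(depends_on):
    """Group services into batches; each batch depends only on earlier batches."""
    remaining = {svc: set(deps) for svc, deps in depends_on.items()}
    batches = []
    while remaining:
        # Services whose dependencies have all been satisfied can run together.
        ready = sorted(svc for svc, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("cycle detected; a workflow must be a DAG")
        batches.append(ready)
        for svc in ready:
            del remaining[svc]
        for deps in remaining.values():
            deps.difference_update(ready)
    return batches


# Maps each service to the services whose results it needs.
# Only "s1 needs s3" comes from the text; the other edges are assumed.
WORKFLOW = {
    "s1": {"s3"},
    "s2": {"s1", "s5"},
    "s3": set(),
    "s4": set(),
    "s5": {"s4"},
    "s6": {"s4"},
}

print(execution_batches(WORKFLOW))
# [['s3', 's4'], ['s1', 's5', 's6'], ['s2']]
```

Grouping by level rather than producing a single linear order is what exposes the parallelism: everything inside one batch can be dispatched at the same time.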
Once Blender determines the execution order of a workflow, it is mapped to a Netty pipeline. This pipeline is a sequence of handlers that the request needs to pass through for processing.
MULTIPLEXING INCOMING REQUESTS
Because workflows are mapped to Netty pipelines in Blender, we needed to route incoming client requests to the appropriate pipeline. For this, we built a proxy layer that multiplexes and routes client requests to pipelines as follows:
- When a remote Thrift client opens a persistent connection to Blender, the proxy layer creates a map of local clients, one for each of the local workflow servers. Note that all local workflow servers are running inside Blender’s JVM process and are instantiated when the Blender process starts.
- When the request arrives at the socket, the proxy layer reads it, figures out which workflow is requested, and routes it to the appropriate workflow server.
- Similarly, when the response arrives from the local workflow server, the proxy reads it and writes the response back to the remote client.
We made use of Netty’s event-driven model to accomplish all the above tasks asynchronously so that no thread waits on I/O.
DISPATCHING BACK-END REQUESTS
Once the query arrives at a workflow pipeline, it passes through the sequence of service handlers as defined by the workflow. Each service handler constructs the appropriate back-end request for that query and issues it to the remote server. For example, the real-time service handler constructs a real-time search request and issues it to one or more real-time index servers asynchronously. We are using the Twitter Commons library (recently open-sourced!) to provide connection-pool management, load-balancing, and dead host detection.
The I/O thread that is processing the query is freed when all the back-end requests have been dispatched. A timer thread checks every few milliseconds to see if any of the back-end responses have returned from remote servers and sets a flag indicating if the request succeeded, timed out, or failed. We maintain one object over the lifetime of the search query to manage this type of data.
Successful responses are aggregated and passed to the next batch of service handlers in the workflow pipeline. When all responses from the first batch have arrived, the second batch of asynchronous requests is made. This process is repeated until we have completed the workflow or the workflow’s timeout has elapsed.
As you can see, throughout the execution of a workflow, no thread busy-waits on I/O. This allows us to efficiently use the CPU on our Blender machines and handle a large number of concurrent requests. We also save on latency as we can execute most requests to back-end services in parallel.
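The batch-by-batch dispatch can be illustrated with a short Python sketch. Note the substitution: Blender uses Netty handlers and a timer thread, while this sketch uses Python's asyncio event loop to get the same effect of never blocking on I/O. The call_service function is a hypothetical stand-in for a back-end request, and the batches reuse the sample execution order from the workflow section.

```python
import asyncio


async def call_service(name, earlier_results):
    """Hypothetical stand-in for a non-blocking back-end request."""
    await asyncio.sleep(0.01)            # simulated network latency
    return f"{name} saw {sorted(earlier_results)}"


async def run_workflow(batches, timeout=1.0):
    """Run batches in order; requests inside a batch run concurrently."""
    results = {}

    async def run_batches():
        for batch in batches:
            snapshot = dict(results)     # responses from earlier batches
            responses = await asyncio.gather(
                *(call_service(name, snapshot) for name in batch)
            )
            results.update(zip(batch, responses))

    try:
        await asyncio.wait_for(run_batches(), timeout)
    except asyncio.TimeoutError:
        pass                             # return whatever finished in time
    return results


batches = [["s3", "s4"], ["s1", "s5", "s6"], ["s2"]]
results = asyncio.run(run_workflow(batches))
print(results["s1"])
```

Each batch costs roughly one round trip regardless of how many services it contains, which is where the latency saving from parallel back-end calls comes from.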
BLENDER DEPLOYMENT AND FUTURE WORK
To ensure a high quality of service while introducing Blender into our system, we are using the old Ruby on Rails front-end servers as proxies for routing Thrift requests to our Blender cluster. Using the old front-end servers as proxies allows us to provide a consistent user experience while making significant changes to the underlying technology. In the next phase of our deploy, we will eliminate Ruby on Rails entirely from the search stack, connecting users directly to Blender and potentially reducing latencies even further.
—@twittersearch
ACKNOWLEDGEMENTS
The following Twitter engineers worked on Blender: Abhi Khune, Aneesh Sharma, Brian Larson, Frost Li, Gilad Mishne, Krishna Gade, Michael Busch, Mike Hayes, Patrick Lok, Raghavendra Prabhu, Sam Luckenbill, Tian Wang, Yi Zhuang, Zhenghua Li.
Details:
http://engineering.twitter.com/2011/04/twitter-search-is-now-3x-faster_1656.html
|
Posted on Tue, 22 Mar 2011 17:11:00 +0000
If you are using Firefox 4, you now have an extra layer of security when accessing mobile.twitter.com.
Over the past few weeks we've been testing a new security feature for our mobile site. It is called a Content Security Policy, or CSP. This policy is a standard developed by Mozilla that aims to thwart cross-site scripting (XSS) attacks at their point of execution, the browser. The upcoming release of Firefox 4 implements CSP, and while the mobile site may not get a high volume of desktop browser traffic (the desktop users hitting that site typically have low bandwidth connections), it has given us an opportunity to test out a potentially powerful anti-XSS tool in a controlled setting.
CSP IN A NUTSHELL
In a typical XSS attack, the attacker injects arbitrary Javascript into a page, which is then executed by an end-user. When a website enables CSP, the browser ignores inline Javascript and only loads external assets from a set of whitelisted sites. Enabling CSP on our site was simply a matter of including the policy in the returned headers under the CSP defined key, 'X-Content-Security-Policy'.
The policy also contains a 'reporting URI' to which the browser sends JSON reports of any violations. This feature not only assists debugging of the CSP rules, it also has the potential to alert a site’s owner to emerging threats.
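As a rough illustration of the two pieces described above, the Python sketch below builds a policy header and reads a violation report. Only the header key 'X-Content-Security-Policy' comes from the text. The directive names and report fields follow the later standardized CSP syntax and may differ from what Firefox 4 accepted; the hosts, the report path, and the function names are all hypothetical.

```python
import json


def build_csp_header(directives, report_uri):
    """Return a (name, value) header pair for a whitelist-style policy."""
    parts = [f"{name} {' '.join(sources)}" for name, sources in directives.items()]
    parts.append(f"report-uri {report_uri}")
    return ("X-Content-Security-Policy", "; ".join(parts))


# Hypothetical whitelist: own origin plus example asset hosts.
POLICY = {
    "default-src": ["'self'"],
    "img-src": ["'self'", "https://images.example.com"],
    "style-src": ["https://cdn.example.com"],
}
header = build_csp_header(POLICY, "/csp_report")


def summarize_report(body):
    """Extract the violated directive and blocked URI from a JSON report body."""
    report = json.loads(body).get("csp-report", {})
    return report.get("violated-directive"), report.get("blocked-uri")


print(header[1])
```

Logging the pair returned by summarize_report for every incoming report is one simple way to spot which rule is being tripped and by what source.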
IMPLEMENTING THE FEATURE
Although activating CSP is easy, in order for it to work correctly you may need to modify your site. In our case it meant removing all inline Javascript. While it is good practice to keep inline Javascript out of your HTML, it is sometimes necessary to speed up the load times on slower high-latency mobile phones.
We began our explorations by restricting the changes to browsers that support CSP (currently only Firefox 4) in order to lessen the impact on users. Next, we identified all the possible locations of our assets and built a rule set to encompass those; for example, things such as user profile images and stylesheets from our content delivery network.
Our initial trials revealed that some libraries were evaluating strings of Javascript and triggering a violation, most notably jQuery 1.4, which tests the 'eval' function after load. This wasn’t totally unexpected and we modified some of the libraries to get them to pass. Since jQuery fixed this in 1.5, it is no longer an issue.
INITIAL RESULTS
After a soft launch, we ran into some unexpected issues. Several common Firefox extensions insert Javascript on page load, thereby triggering a report. However, even more surprising was the number of ISPs who were inadvertently inserting Javascript or altering image tags to point to their caching servers. It was the first example of how CSP gave us visibility into what was happening on the user’s end. We addressed this problem by mandating SSL for Firefox 4 users, which prevents any alteration of our content.
Today CSP is one hundred percent live on mobile.twitter.com and we are logging and evaluating incoming violation reports.
FINAL THOUGHTS
Allowing sites like Twitter to disable inline Javascript and whitelist external assets is a huge step towards neutralizing XSS attacks. However, for many sites it is not going to be as simple as flipping a switch. Most sites will require some work and you may need to alter a few third-party Javascript libraries. Depending on how complex your site is, this could entail the bulk of your effort.
We hope other browsers will adopt the CSP standard, especially as more sites depend on client-side code and user-generated content. The simple option of being able to disable inline Javascript and limit external sources gives sites the ability to stop the vast majority of today's attacks with minimal effort.
Over the next couple of months we plan to implement a Content Security Policy across more of Twitter, and we encourage you to request support for this standard in your preferred browser.
ACKNOWLEDGEMENTS
The following people at Twitter contributed to the CSP effort: John Adams, Jacob Hoffman-Andrews, Kevin Lingerfelt, Bob Lord, Mark Percival, and Marcus Philips.
FURTHER READING
—Mark (@mdp)
Details:
http://engineering.twitter.com/2011/03/improving-browser-security-with-csp.html
|
Posted on Mon, 21 Mar 2011 16:36:00 +0000
If you look back at the history of Twitter, our rate of growth has largely outpaced the capacity of our hardware, software, and the company itself. Indeed, in our first five years, Twitter's biggest challenge was coping with our unprecedented growth and sightings of the infamous Fail Whale.
These issues came to a head last June when Twitter experienced more than ten hours of downtime. However, unlike past instances of significant failure, we said at the time that we had a long-term plan.
Last September, we began executing on this plan and undertook the most significant engineering challenge in the history of Twitter. We hope it will have a significant impact on the service’s success for many years to come. During this time, the engineering and operations teams moved Twitter’s infrastructure to a new home while making changes to our infrastructure and our organization that will ensure that we can constantly stay abreast of our capacity needs; give users and developers greater reliability; and allow for new product offerings.
This was our season of migration.
Redesigning and Rebuilding the Bird Mid-flight
Under the hood, Twitter is a complex yet elegant distributed network of queues, daemons, caches, and databases. Today, the care and feeding of Twitter requires more than 200 engineers to keep the site growing and running smoothly. What did moving the entirety of Twitter while improving up-time entail? Here’s a simplified version of what we did.
First, our engineers extended many of Twitter’s core systems to replicate Tweets to multiple data centers. Simultaneously, our operations engineers divided into new teams and built new processes and software to allow us to qualify, burn-in, deploy, tear-down and monitor the thousands of servers, routers, and switches that are required to build out and operate Twitter. With hardware at a second data center in place, we moved some of our non-runtime systems there – giving us headroom to stay ahead of tweet growth. This second data center also served as a staging laboratory for our replication and migration strategies. Simultaneously, we prepped a third, larger data center as our final nesting ground.
Next, we set out rewiring the rocket mid-flight by writing Tweets to both our primary data center and the second data center. Once we proved our replication strategy worked, we built out the full Twitter stack, and copied all 20TB of Tweets, from @jack’s first to @honeybadger’s latest Tweet, to the second data center. Once all the data was in place, we began serving live traffic from the second data center for end-to-end testing and to continue to shed load from our primary data center. Confident that our strategy for replicating Twitter was solid, we moved on to the final leg of the migration, building out and moving all of Twitter from the first and second data centers to the final nesting grounds. This essentially required us to move much of Twitter two times.
What’s more, during the migration we set a new Tweets-per-second record, continued to grow, and launched new products, all while improving the security and up-time of our service.
A Flock
The effort and planning behind this move were huge. Vacations were put off, weekends were worked, and more than a few strategic midnight oil reserves were burned in this two-stage move. The technical accomplishments by the operations and engineering teams that made this move possible were immense. Equally great was the organization and alignment of the engineering and operations teams, and their ability to create lightweight, robust processes where none had existed before. Without this cohesion, this flocking of sorts, none of this would have been possible.
Though spring is here, and this particular season of migration is over, it represents more of a beginning than an ending. This move gives us the capacity to deliver Tweets with greater reliability and speed, and creates more runway to focus on the most interesting operations and engineering problems. It’s an immense opportunity to innovate and build the products and technologies that our users request and our talented engineers love to develop.
—The Twitter Engineering Team
P.S. Twitter is hiring across engineering and operations. If you want to develop novel systems that scale on the order of billions, join the flock...
Details:
http://engineering.twitter.com/2011/03/great-migration-winter-of-2011.html
|