Hadoop – Part 1

Posted on November 1, 2010

Hadoop is an extremely powerful and popular framework for distributing applications and data at large scale. Companies like Amazon, Twitter, Rakuten and Facebook deploy Hadoop across clusters of literally thousands of machines, crunching petabytes of data across thousands of processing cores. We’re going to take an in-depth look at Hadoop over a series of articles, but in this first stab let’s get comfy with some of the buzzwords needed to fully appreciate how Hadoop does all that it does. I’m going to put my own spin on each category, but each heading will hyperlink to a Wikipedia entry for further reading. At times, too, we’ll paraphrase key passages from some well-established sources – which we’ll of course link to for your reference. So off we go, let’s take the first steps on our journey into Hadoop!

Parallel Processing / Computing

Good old math!!! If it takes one person 10 days to do one job, how long will it take two people of the same working caliber to do that same job? This is the thrust of parallel processing. Strictly speaking, parallel processing is the simultaneous execution of parts of the same task on two or more processors in order to obtain results faster. The computing resources can be a single computer with multiple processors, a number of computers connected by a network, or a combination of both. Processors inside one machine typically share data through shared memory, while networked machines exchange messages. Some supercomputer parallel processing systems have hundreds of thousands of processors.

A successful implementation of parallel computing involves two things apart from having a strong distributed processing system:

  • Tasks should be structured in such a manner that they can be executed at the same time.
  • The sequence of tasks which must be executed one after the other should be maintained.

Parallel processing can substantially reduce the time required to complete a project, coming in particularly handy for projects requiring complex computations such as weather modeling and digital special effects.
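As a minimal sketch of the two requirements listed above (independent tasks that can run at the same time, plus a step that has to wait for them), here is a sum split into chunks in Python. The names `chunked` and `parallel_sum` are made up for this illustration, and a thread pool is used only to keep the example short; for CPU-heavy work in CPython you would likely swap in `ProcessPoolExecutor`.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(items, n_chunks):
    """Split items into up to n_chunks contiguous slices."""
    size = max(1, -(-len(items) // n_chunks))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def parallel_sum(items, workers=4):
    """Sum each chunk independently, then combine the partial results."""
    chunks = chunked(items, workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Independent tasks: every chunk can be summed at the same time.
        partial_sums = list(pool.map(sum, chunks))
    # Sequential step: this can only run once all partial sums are ready.
    return sum(partial_sums)


print(parallel_sum(list(range(1, 101))))  # prints 5050
```

The combining step at the end is the part that cannot be parallelized, which is why two workers rarely finish a job in exactly half the time.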

A few years ago people were remarking about how parallelism “is the future of computing,” and it’s safe to say this has now come to pass. Parallel processing has not just taken off among the geek fringe and supercomputing crowd. Commercial software engineers have begun taking advantage of the multiple processor cores increasingly found in consumer level machines, so while the average user may not be aware of it, parallel computing is quickly entering the mainstream.

Concurrent Computing

This is not so different from parallel processing. Essentially, a concurrent program is a collection of processes that can interact with each other and run in parallel. It is potentially more powerful than parallel processing, but the implementation can be rather more complicated. The biggest concern when writing concurrent programs is to make sure that the sequencing of, and the interaction between, the processes happen properly. By properly we mean 1) in the correct order and at the correct time, 2) with the appropriate concurrent processes, and 3) allowing appropriate access to shared resources.
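To make the point about shared resources concrete, here is a small Python sketch in which several threads update one counter. The function name `count_with_lock` is invented for this example; the lock is what guarantees that only one thread touches the counter at a time.

```python
import threading


def count_with_lock(n_threads, n_increments):
    """Have n_threads each add n_increments to one shared counter."""
    counter = 0
    lock = threading.Lock()

    def worker():
        nonlocal counter
        for _ in range(n_increments):
            with lock:  # appropriate access: one thread at a time
                counter += 1

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # correct order: read the result only after all finish
    return counter


print(count_with_lock(4, 10_000))  # prints 40000
```

Without the lock, two threads could read the same old value and one update could be lost, which is exactly the kind of interaction bug that makes concurrent code hard to get right.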

To maximize the computational power of parallelism, concurrent programs should be run on machines with multi-core processors or across networked machines, but concurrent programs can also be executed on machines with single-core CPUs. On a single core, the operating system interleaves the processes so quickly that they appear to run simultaneously and seamlessly. Single-core CPUs can therefore perform the same types of tasks as multi-core CPUs; they’ll just take (potentially much, much, much) more time.

Beyond single versus multiple core CPUs, several other factors play a vital role in determining computational power, such as whether concurrent threads / processes / pieces of code communicate through shared memory or by message passing.
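Message passing, the alternative to shared memory mentioned above, can be sketched with a queue between two threads. Everything here (`producer`, `consumer`, `square_via_messages`, the `None` end marker) is a naming choice for this example, not part of any particular framework.

```python
import queue
import threading


def producer(outbox, items):
    """Send every item as a message, then a None marker to say 'done'."""
    for item in items:
        outbox.put(item)
    outbox.put(None)


def consumer(inbox, results):
    """Receive messages until the end marker arrives."""
    while True:
        item = inbox.get()
        if item is None:
            break
        results.append(item * item)


def square_via_messages(items):
    channel = queue.Queue()  # the only thing the two threads share
    results = []
    threads = [
        threading.Thread(target=producer, args=(channel, items)),
        threading.Thread(target=consumer, args=(channel, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(square_via_messages([1, 2, 3]))  # prints [1, 4, 9]
```

The two sides never touch each other's data directly, so no lock is needed in user code; the queue handles the coordination.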

Application throughput and high responsiveness are the key pain points addressed when opting for concurrent computing. But while parallel and concurrent computing may be fascinating to study and offer incredibly powerful programming opportunities, they’re terrifically complex and hard to implement for the uninitiated. I would strongly suggest exploring parallelism only after determining that you can’t make do with traditional techniques.

Distributed Processing / Computing

Distributed processing / computing is much the same as parallel and concurrent processing; the main difference is that multiple computers, connected through a network, work together to achieve the goal.

Google File System (GFS)

Google File System (GFS) is a proprietary technology designed by Google to meet their ever-increasing data processing demands. Without GFS, Google wouldn’t be able to crawl the Internet, index it, do keyword/content analysis, run website analytics, perform search, all the while knowing who is searching for what and smart-linking to advertising. GFS is what lets us run a Google search for, say, “Google File System” and get about 57,000,000 results in 0.13 seconds!!!  (YMMV.)

In fact, GFS shares many of the goals of its predecessors (earlier distributed file systems): performance, scalability, reliability and of course data redundancy.

Imagine the scale of the job: getting an application to handle around 800 to 1000 requests per second means dealing with database design, caching, indexing, memory leaks, architectural constraints, horizontal and vertical scaling, code optimization, server tweaking, and for sure a few hundred other things. How Google can process its kind of load, all on the fly – nearly instantaneously – is a testament to the power of GFS. In terms of systems architecture, here is how they do it: Google File System, BigTable, and MapReduce.

In order to digest the rest of this explanatory section on GFS, you’ll need a basic familiarity with the Master-Slave computing concept, in-memory database structures, metadata, data replication, GC (Garbage Collection), balancing and fault tolerance.

A GFS cluster has a single master and multiple chunk servers, and is accessed by multiple clients. Generally these machines are made up of inexpensive commodity hardware, and a chunk server and a client can run on the same machine as long as its resources permit.

Below is an extract from Google’s own paper detailing the Google File System.

Files are divided into fixed-size chunks. The master assigns each chunk an immutable and globally unique 64-bit chunk handle at the time of chunk creation. Chunk servers store chunks on local disks as Linux files and read or write chunk data specified by a chunk handle and byte range. For reliability, each chunk is replicated on multiple chunk servers. By default, GFS stores three replicas, though users can designate different replication levels for different regions of the file namespace.
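Here is a toy Python model of the bookkeeping just described: a master hands out unique chunk handles and records three replica locations per chunk. `ToyMaster` and its methods are hypothetical names, and the round-robin placement is a simplification for illustration, not GFS's actual placement policy.

```python
class ToyMaster:
    """Tracks which chunks belong to a file and where replicas live."""

    def __init__(self, chunk_servers, replicas=3):
        if len(chunk_servers) < replicas:
            raise ValueError("need at least as many servers as replicas")
        self.chunk_servers = list(chunk_servers)
        self.replicas = replicas
        self.file_to_chunks = {}   # file name -> list of chunk handles
        self.chunk_locations = {}  # chunk handle -> list of server names
        self._next_handle = 0
        self._next_server = 0

    def create_chunk(self, file_name):
        # A counter gives unique handles; it stays well within 64 bits.
        handle = self._next_handle
        self._next_handle += 1
        # Assumed policy: consecutive servers, so replicas never collide.
        n = len(self.chunk_servers)
        servers = [self.chunk_servers[(self._next_server + i) % n]
                   for i in range(self.replicas)]
        self._next_server = (self._next_server + 1) % n
        self.file_to_chunks.setdefault(file_name, []).append(handle)
        self.chunk_locations[handle] = servers
        return handle


master = ToyMaster(["cs1", "cs2", "cs3", "cs4"])
first = master.create_chunk("/logs/today")
second = master.create_chunk("/logs/today")
print(master.chunk_locations[first])   # three distinct chunk servers
print(master.file_to_chunks["/logs/today"])
```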

The master will maintain all file system metadata. This includes the namespace, access control information, the mapping from files to chunks, and the current locations of chunks. It also controls system-wide activities such as chunk lease management, garbage collection of orphaned chunks, and chunk migration between chunk servers. The master periodically communicates with each chunk server in HeartBeat messages to give it instructions and collect its state.
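One way to picture garbage collection of orphaned chunks: a chunk server reports the chunks it holds in a HeartBeat, and the master answers with the ones it no longer knows about. The sketch below assumes that exchange; the function name `handle_heartbeat` and the shape of the reply are invented for illustration.

```python
def handle_heartbeat(live_handles, reported_handles):
    """Return instructions for a chunk server based on what it reported.

    live_handles: chunk handles still present in the master's metadata.
    reported_handles: chunk handles the chunk server says it is storing.
    """
    # Anything the server holds that the master has forgotten is orphaned.
    orphaned = sorted(set(reported_handles) - set(live_handles))
    return {"delete": orphaned}


reply = handle_heartbeat(live_handles={1, 2, 3}, reported_handles=[2, 3, 7, 9])
print(reply)  # prints {'delete': [7, 9]}
```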

GFS client code linked into each application implements the file system API and communicates with the master and chunk servers to read or write data on behalf of the application. Clients interact with the master for metadata operations, but all data-bearing communication goes directly to the chunk servers.
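Before a client can ask the master for metadata, it has to work out which chunk a byte offset falls into. Because chunks have a fixed size, that is plain arithmetic. The sketch below assumes the 64 MB chunk size given in the GFS paper; the helper name `locate` is made up.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, the chunk size cited in the GFS paper


def locate(offset, length):
    """Break a byte range into (chunk_index, start_in_chunk, n_bytes) pieces."""
    pieces = []
    end = offset + length
    while offset < end:
        index = offset // CHUNK_SIZE           # which chunk of the file
        start = offset % CHUNK_SIZE            # where inside that chunk
        n_bytes = min(CHUNK_SIZE - start, end - offset)
        pieces.append((index, start, n_bytes))
        offset += n_bytes
    return pieces


# A 10-byte read that straddles the boundary between chunk 0 and chunk 1.
print(locate(CHUNK_SIZE - 4, 10))
```

The client would then send the file name and chunk index to the master, get back the chunk handle and replica locations, and read the bytes from a chunk server directly.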

Neither the client nor the chunk server caches file data. Client caches offer little benefit because most applications stream through huge files or have working sets too large to be cached. Not having them simplifies the client and the overall system by eliminating cache coherence issues. (Clients do cache metadata, however.) Chunk servers need not cache file data because chunks are stored as local files and so Linux’s buffer cache already keeps frequently accessed data in memory.


Image Credit: gfs-sosp2003.pdf

GFS faces several challenges, such as supporting concurrent appends to files, large streaming reads, small random reads, and storing and processing huge amounts of data in a sequential manner.

The master keeps its metadata in in-memory data structures so that its operations are fast. This also lets the master periodically scan the state of each chunk server, which is how it takes care of garbage collection and load balancing. GFS’s large chunk size (64 MB in the paper) also reduces the amount of metadata that has to be stored.

Activities of the master include namespace management and locking, replica placement, re-replication and rebalancing, garbage collection, and stale replica detection.

To sum up, GFS offers:

  • Fast recovery – but how? Using checkpoints.
  • Chunk replications – to achieve high availability.
  • Master replications – for reliability.

But enough about GFS for now. For further details, please take the afternoon and study Google’s GFS overview: gfs-sosp2003.pdf


BigTable

BigTable is a distributed storage system for managing structured data. It’s been designed to scale to a very large size (call it petabytes of data across several thousands of commodity servers). It’s a database management system (DBMS), yet it does not follow the principles of a traditional relational DBMS; a table can have millions of columns, and the columns present can differ from row to row.

BigTable uses the GFS distributed File System to store data files. BigTable was designed with the thought in mind that even hundreds of thousands of servers might not be sufficient so it should be easy to scale horizontally by just adding more servers when required.

BigTable serves over 60 of Google’s own products and projects, including Analytics, Indexing, and Google Earth, each with its own demands in terms of data size, latency, and performance.

BigTable’s characteristics are detailed in the official BigTable document, but in a nutshell it’s a sparse, distributed, persistent multi-dimensional sorted map. The map is indexed by a row key, a column key and a timestamp.

It’s hard to think in terms of a column-oriented database after using traditional DBMS systems. New users will need to get familiar with row keys, column families and timestamps, and set aside some of their existing DBMS habits, before they start designing schemas for column-oriented databases.

Figure 2. (row:string, column:string, time:int64) → string

Image Credit: bigtable-osdi06.pdf
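The (row, column, time) → string mapping can be mimicked with a small in-memory Python class. `ToyTable` is a hypothetical name, and the sample row and column values are only loosely modeled on the web-page example in the BigTable paper; nothing here reflects how BigTable is actually implemented.

```python
class ToyTable:
    """A sparse map from (row key, column key, timestamp) to a string."""

    def __init__(self):
        self._cells = {}  # (row, column) -> {timestamp: value}

    def put(self, row, column, timestamp, value):
        self._cells.setdefault((row, column), {})[timestamp] = value

    def get(self, row, column, timestamp=None):
        """Return one version of a cell; the newest if no timestamp is given."""
        versions = self._cells.get((row, column), {})
        if not versions:
            return None
        if timestamp is None:
            timestamp = max(versions)
        return versions.get(timestamp)

    def scan(self):
        """Yield cells in sorted key order, newest version first."""
        for row, column in sorted(self._cells):
            versions = self._cells[(row, column)]
            for ts in sorted(versions, reverse=True):
                yield row, column, ts, versions[ts]


table = ToyTable()
table.put("com.cnn.www", "contents:", 5, "<html>v5")
table.put("com.cnn.www", "contents:", 6, "<html>v6")
table.put("com.cnn.www", "anchor:cnnsi.com", 9, "CNN")
print(table.get("com.cnn.www", "contents:"))               # newest version
print(table.get("com.cnn.www", "contents:", timestamp=5))  # older version
```

Only cells that were actually written take up space, which is what "sparse" means here: a row with three columns costs nothing for the millions of columns it does not use.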

Rows in a BigTable are uniquely identified by the row key, and BigTable maintains data in lexicographic order by row key. The row range of a table is dynamically partitioned, and each range is called a tablet (the unit used for load balancing). Tablets can be thought of as segments of the table. Each tablet is roughly 100 to 200 MB by default; when a tablet grows beyond the specified limit, it is split. A three-level hierarchy (analogous to a B+ tree) stores the tablet location information itself.

The first level is a file stored in Chubby (Google’s lock service), which contains the location of the Root tablet. The Root tablet contains the location of all tablets in a special METADATA table, and each METADATA tablet in turn contains the location of a set of user tablets. (Since the Root tablet is the first tablet in the METADATA table, it gets special treatment: it is never split.)
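Because rows are sorted and each tablet covers one contiguous range of row keys, finding the tablet for a row is a binary search over the range boundaries. The sketch below is an assumption-laden toy: `ToyTabletIndex`, `tablet_for` and `split_at` are invented names, and dividing a range at a chosen key stands in for whatever the real system does when a tablet gets too big.

```python
import bisect


class ToyTabletIndex:
    """Maps row keys to tablet numbers using sorted range end keys."""

    def __init__(self, end_keys):
        # end_keys[i] is the last row key tablet i may hold;
        # one extra, open-ended tablet holds everything after the last key.
        self.end_keys = sorted(end_keys)

    def tablet_for(self, row_key):
        """Binary search for the first range whose end key is >= row_key."""
        return bisect.bisect_left(self.end_keys, row_key)

    def split_at(self, middle_key):
        """Divide the range containing middle_key into two ranges."""
        bisect.insort(self.end_keys, middle_key)


index = ToyTabletIndex(["g", "p"])   # three tablets: ..g, g..p, p..
print(index.tablet_for("hadoop"))    # prints 1
index.split_at("k")                  # four tablets: ..g, g..k, k..p, p..
print(index.tablet_for("map"))       # prints 2
```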

Column keys are grouped into sets known as column families, forming the basic unit of access control. A column family has to be created before the data is stored into the family. Column keys are named as in ColumnFamily:Qualifier.

Each column key in the anchor family (as shown in Figure 3) represents a single anchor. The qualifier is the name of the referring site, and the cell content is the link text.

Timestamps are used for versioning the data in a specific table in BigTable. Each cell in BigTable can contain multiple versions of some specific data. These are distinguished by the timestamp.

Finally, BigTable also provides an API to create, delete and look up values.


MapReduce

MapReduce is a programming model for processing and generating large datasets in a distributed processing environment.

The underlying architecture in systems like GFS and Hadoop is capable of running this kind of program (MapReduce) in parallel on a large number of computers (known as a cluster). The framework takes care of splitting the input data, creating as many mapper and reducer tasks as required (configuring the number of mappers and reducers yourself is also possible), scheduling the program’s execution, handling machine failures, and managing inter-machine communication. The terms Map and Reduce come from the primitives present in Lisp (and other functional languages).

A MapReduce program takes a set of input key/value pairs and produces a set of output key/value pairs.

Map: a piece of a program written by the user that takes an input pair and produces intermediate key/value pairs as output. The MapReduce library takes care of splitting the huge data file into smaller chunks, sending them to different machines, receiving the intermediate outputs, and combining them with other intermediate outputs of the same key. The master node handles sending the data to worker nodes (until all the data is processed).

Reduce: another piece of a program written by the user, which takes the intermediate key and a set of values for that key and merges them together to form a smaller set of values. Aggregating all these output values (combined ones) will form the final output of the problem for the given data.

The big advantage of MapReduce is that it allows distributed processing of mappers and reducers. The MapReduce library (provided by GFS / Hadoop) will take care of the situation when a processing node (map or reduce) fails; it automatically reschedules the failed map or reduce task on another node.
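The rescheduling idea can be sketched in a few lines of Python. This is a simplified illustration, not Hadoop's or Google's scheduler: `run_with_retries`, `broken_worker` and `healthy_worker` are made-up names, and a worker "failing" is modeled as raising `RuntimeError`.

```python
def run_with_retries(task, workers, max_attempts=3):
    """Try the task on one worker after another until one succeeds."""
    last_error = None
    for attempt in range(max_attempts):
        worker = workers[attempt % len(workers)]
        try:
            return worker(task)
        except RuntimeError as exc:
            # The worker failed: remember why and reschedule elsewhere.
            last_error = exc
    raise RuntimeError(f"task {task!r} failed {max_attempts} times") from last_error


def broken_worker(task):
    raise RuntimeError("machine down")


def healthy_worker(task):
    return task.upper()


# The first worker fails, so the task is handed to the second one.
print(run_with_retries("split-0", [broken_worker, healthy_worker]))  # prints SPLIT-0
```

This works because map and reduce tasks are written as plain functions of their input, so running one again on a different machine gives the same result.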

Following is sample pseudo-code for MapReduce:

Let’s count the number of occurrences of each word in a large collection of documents.

The Map function emits each word and a number (the count). Reducer simply receives the word and adds the count to the total count for each word. In the case of Map output, key is the word and count is the value.

The Reducer simply gets the key and the value and keeps adding to the value associated with the key, as in:

map (k1, v1) -> list (k2, v2)

reduce(k2, list(v2)) -> list (v2)

Figure 4
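For readers who prefer something runnable, here is the word count written as plain Python that follows the map and reduce signatures above. It runs in a single process; `map_words`, `reduce_counts` and `run_word_count` are illustrative names, and the grouping loop stands in for the work a real MapReduce library would do across machines.

```python
from collections import defaultdict


def map_words(doc_name, text):
    """Map: (k1, v1) -> list of (k2, v2). Emit (word, 1) for every word."""
    return [(word.lower(), 1) for word in text.split()]


def reduce_counts(word, counts):
    """Reduce: (k2, list(v2)) -> v2. Add up the counts for one word."""
    return sum(counts)


def run_word_count(documents):
    """Single-process stand-in for the library: map, group by key, reduce."""
    grouped = defaultdict(list)
    for doc_name, text in documents.items():
        for word, count in map_words(doc_name, text):
            grouped[word].append(count)  # collect values of the same key
    return {word: reduce_counts(word, counts)
            for word, counts in sorted(grouped.items())}


docs = {"a.txt": "the quick fox", "b.txt": "The lazy dog the end"}
print(run_word_count(docs))
```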

Figure 5, showing the execution flow of a typical MapReduce program (in terms of GFS).

Step1: MR Library splits the huge data file into multiple chunks of 16 to 64 MB of data and starts as many copies of programs as required on the cluster.

Step2: One copy acts as the master. The rest are workers. The master keeps track of which machines are idle and which are busy processing, and assigns map and reduce tasks to idle workers.

Step3: A worker that is assigned a map task reads the input content allocated to it and runs the map function on it.

Step4: The map output is stored on the worker’s local disk.

Step5: The master notifies the reducers where the map output is located once it is available.

Step6: The reducer iterates the sorted intermediate data and produces an output that is appended to the final output file.

Step7: Master wakes up the user program when all the map/reduce programs are completed.
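The steps above can be walked through in a single Python process. This is only a sketch under obvious simplifications: lists stand in for input chunks and local disks, there is one reducer, and the names `split_input`, `map_task`, `reduce_task` and `run_job` are made up for the example.

```python
from itertools import groupby
from operator import itemgetter


def split_input(lines, split_size):
    """Step 1: cut the input into fixed-size splits."""
    return [lines[i:i + split_size] for i in range(0, len(lines), split_size)]


def map_task(split):
    """Steps 3-4: read one split and produce intermediate (word, 1) pairs."""
    return [(word, 1) for line in split for word in line.split()]


def reduce_task(intermediate_outputs):
    """Steps 5-6: gather all map output, sort it by key, reduce each group."""
    pairs = sorted(pair for output in intermediate_outputs for pair in output)
    final_output = []
    for word, group in groupby(pairs, key=itemgetter(0)):
        final_output.append((word, sum(count for _, count in group)))
    return final_output


def run_job(lines, split_size=2):
    """Plays the master: hands out splits, then starts the reducer."""
    splits = split_input(lines, split_size)
    local_disks = [map_task(split) for split in splits]  # one per map worker
    return reduce_task(local_disks)  # step 7: result goes back to the caller


print(run_job(["a b", "b c", "c a a"]))  # prints [('a', 3), ('b', 2), ('c', 2)]
```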

The Master node:

The Master node keeps track of each map and reducer and the tasks assigned to them, intermediate outputs, the states of tasks assigned to each map/reduce instance (forks) and the status of the worker machines.

The good thing about these MR libraries is that the master writes periodic checkpoints for a given job, so even if the master task dies, the next launch won’t have to start a fresh job but can rather pick up from the last checkpoint.

Phew!! All right, we’ve seen enough to lay the groundwork for a more in-depth discussion of Hadoop, so let’s take a breather before posting up the 2nd article in the Hadoop series. Until then, if you’d like more information be sure to check out the following links or post a question in the comments.

Kannan R.
Sourcebits Project Manager














