This blog post covers SQL optimization techniques in distributed environments, and then surveys general-purpose data processing frameworks in the Hadoop world.
SQL Optimization
Four stages exist in query execution:
1. Query decomposition
2. Data localization
3. Global optimization
4. Distributed execution
Query Decomposition: The query is validated and decomposed into logical operators, redundant predicates are eliminated, and the calculus query is restructured as an algebraic query.
Data Localization: This layer determines which fragments are involved in the query and transforms the distributed query into a query on fragments.
Global Query Optimization:
In both centralized and distributed systems, a common heuristic is to minimize the size of intermediate relations. This can be done by performing unary operators first, and ordering the binary operators by the increasing sizes of their intermediate relations. An important heuristic in distributed systems is to replace join operators by combinations of semijoins to minimize data communication. [1]
Optimization can be done statically before executing the query or dynamically as the query is executed. Static query optimization is done at query compilation time. Thus the cost of optimization may be amortized over multiple query executions. Therefore, this timing is appropriate for use with the exhaustive search method. Since the sizes of the intermediate relations of a strategy are not known until run time, they must be estimated using database statistics. [1]
Hybrid query optimization attempts to provide the advantages of static query optimization while avoiding the issues generated by inaccurate estimates. The approach is basically static, but dynamic query optimization may take place at run time when a high difference between predicted sizes and actual size of intermediate relations is detected. [1]
Query optimization refers to the process of producing a query execution plan which represents an execution strategy for the query. This QEP minimizes an objective cost function. A query optimizer, the software module that performs query optimization is usually seen as consisting of three components: a search space, a cost model and a search strategy. The search space is the set of alternative execution plans that represent input query. These plans differ in the execution order of operations and the way these operations are implemented and therefore in their performance. The cost model predicts the cost of a given execution plan. The search strategy explores the search space and selects the best plan using the cost model.[1]
An execution strategy for a distributed query can be described with relational algebra operators and communication primitives for transferring data between sites. Query optimization consists of finding the “best” ordering of operators in the query, including communication operators that minimize a cost function. The cost function, often defined in terms of time units, refers to computing resources such as disk space, disk I/Os, buffer space, CPU cost, communication cost, and so on. Generally, it is a weighted combination of I/O, CPU, and communication costs.[1]
To select the ordering of operators it is necessary to predict execution costs of alternative candidate orderings. Determining execution costs before query execution (i.e., static optimization) is based on fragment statistics and the formulas for estimating the cardinalities of results of relational operators. Thus the optimization decisions depend on the allocation of fragments and available statistics on fragments, which are recorded in the allocation schema. [1]
An important aspect of query optimization is join ordering, since permutations of the joins within the query may lead to improvements of orders of magnitude. One basic technique for optimizing a sequence of distributed join operators is through the semijoin operator. The main value of the semijoin in a distributed system is to reduce the size of the join operands and then the communication cost.[1]
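The semijoin reduction described above can be sketched in a few lines. This is a toy illustration, not an actual distributed execution: the relation names, columns, and values are invented, and the two "sites" are just Python lists.

```python
# Sketch: using a semijoin to cut communication in a distributed join.
# Pretend site 1 holds EMP and site 2 holds PROJ; data is made up.

emp = [  # at site 1: (eno, ename, pno)
    (1, "Ada", "P1"), (2, "Bob", "P2"), (3, "Cyd", "P9"), (4, "Dee", "P1"),
]
proj = [  # at site 2: (pno, pname)
    ("P1", "CAD"), ("P2", "DB"),
]

# Step 1: ship only the join-key column of PROJ to site 1 (small transfer).
proj_keys = {pno for pno, _ in proj}

# Step 2: semijoin at site 1 -- keep only the EMP tuples that will match,
# so the operand shipped back is smaller.
emp_reduced = [t for t in emp if t[2] in proj_keys]

# Step 3: ship the reduced EMP to site 2 and perform the full join there.
joined = [(eno, ename, pno, pname)
          for eno, ename, pno in emp_reduced
          for p, pname in proj if p == pno]

print(len(emp), len(emp_reduced), len(joined))
```

Only three of the four EMP tuples survive the semijoin, so one tuple's worth of communication is saved; on real fragment sizes the reduction is what makes the extra key-shipping round trip worthwhile.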
Distributed Query Execution: Each subquery executing at one
site, called a local query, is then optimized using the local schema of
the site and executed. At this time, the algorithms to perform the relational
operators may be chosen.
The general formula for response time:

Response time = T_CPU * seq#insts + T_I/O * seq#I/Os + T_MSG * seq#msgs + T_TR * seq#bytes

where seq#x denotes the maximum number of x that must be done sequentially (work done in parallel is counted only once along the critical path).
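To make the formula concrete, here is a back-of-the-envelope evaluation. Every constant and count below is a made-up illustrative value, not a measurement from any real system.

```python
# Illustrative evaluation of the response-time formula.
# All unit costs and sequential counts are hypothetical example numbers.

T_CPU = 1e-9   # seconds per instruction
T_IO  = 5e-3   # seconds per disk I/O
T_MSG = 1e-4   # fixed cost per message
T_TR  = 1e-8   # seconds per byte transferred

seq_insts = 2_000_000   # instructions on the sequential (critical) path
seq_ios   = 40
seq_msgs  = 10
seq_bytes = 1_000_000

response_time = (T_CPU * seq_insts + T_IO * seq_ios
                 + T_MSG * seq_msgs + T_TR * seq_bytes)
print(f"{response_time:.3f} s")
```

With these numbers disk I/O dominates (0.2 s of 0.213 s total); shifting the weights toward T_MSG and T_TR models a network-bound plan instead, which is exactly the knob a distributed cost model turns.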
The number of join trees that can be produced by applying the
commutativity and associativity rules is O(N!) for N relations. Investigating a
large search space may make optimization time prohibitive, sometimes much more
expensive than the actual execution time. Therefore query optimizers typically
restrict the size of the search space they consider. The first restriction is
to use heuristics. The most common heuristic is to perform selection and
projection when accessing base relations. Another common heuristic is to avoid
Cartesian products that are not required by the query. Another important
restriction is with respect to the shape of the join tree. Two kinds of join
trees are usually distinguished: linear versus bushy trees. By considering only linear
trees, the size of the search space is reduced to O(2^N). However, in a
distributed environment, bushy trees are useful in exhibiting parallelism. [1]
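The two complexity figures above can be checked numerically. The sketch below (with an arbitrary four-relation query) contrasts the O(N!) orderings an exhaustive search faces with the O(2^N) subsets a dynamic-programming optimizer over linear trees has to tabulate.

```python
# Sketch: size of the join-ordering search space for N relations.
# Relation names are arbitrary placeholders.
from itertools import combinations
from math import factorial

relations = ("R", "S", "T", "U")
N = len(relations)

# Orderings an exhaustive search over join trees must consider: O(N!).
orderings = factorial(N)

# States a dynamic-programming optimizer restricted to linear (left-deep)
# trees must tabulate: one best plan per subset of relations -> O(2^N).
dp_states = [c for k in range(1, N + 1) for c in combinations(relations, k)]

print(orderings, len(dp_states) + 1, 2 ** N)  # +1 counts the empty subset
```

Even at N = 4 the gap is visible (24 orderings vs 16 subsets), and it widens factorially as N grows, which is why the linear-tree restriction matters.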
Broadcast join: If one side of the join is a small table, a broadcast join is used. This means that the small table is replicated to all nodes.
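A broadcast join can be mimicked in-process. The sketch below is purely illustrative: the "nodes" are slices of a list, and the tables and node count are invented.

```python
# Sketch of a broadcast (replicated) join: the small table is copied to
# every node, so the large table never has to move across the network.

small = {"P1": "CAD", "P2": "DB"}             # small dimension table
large = [("e1", "P1"), ("e2", "P2"), ("e3", "P1"), ("e4", "P9")]

nodes = 3
# "Broadcast": every node receives its own full copy of the small table.
replicas = [dict(small) for _ in range(nodes)]

# Each node joins only its local slice of the large table against its copy.
result = []
for node in range(nodes):
    local_slice = large[node::nodes]
    replica = replicas[node]
    result += [(k, pno, replica[pno]) for k, pno in local_slice if pno in replica]

print(sorted(result))
```

The trade-off is replication cost versus shuffle cost: copying the small table nodes times is cheap compared with repartitioning the large one.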
Shuffle join:
A shuffle join is used to join two large tables. Both tables are repartitioned on the join key so that matching rows end up on the same node; bucketing may be involved to co-locate foreign keys ahead of time.
This does not apply to non-relational systems.
Hive, for example, implements several join variants: common join, map join, auto map join, bucket map join, sort-merge bucket map join, and skew join. [3]
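The repartitioning step of a shuffle join can be sketched with a hash partitioner. Again this is a toy model: the "nodes" are list partitions and the data is invented.

```python
# Sketch of a shuffle join: both tables are repartitioned by a hash of the
# join key, so matching keys are co-located and each node joins locally.

def partition(rows, key_index, n_parts):
    """Route each row to a partition by hashing its join key."""
    parts = [[] for _ in range(n_parts)]
    for row in rows:
        parts[hash(row[key_index]) % n_parts].append(row)
    return parts

r = [("a", 1), ("b", 2), ("c", 1), ("d", 3)]   # join key in column 1
s = [(1, "x"), (2, "y"), (3, "z")]             # join key in column 0

n = 2
r_parts = partition(r, 1, n)   # "shuffle" R across n nodes
s_parts = partition(s, 0, n)   # "shuffle" S across n nodes

# Each node joins only its own pair of partitions -- keys are co-located.
joined = [(name, k, v)
          for i in range(n)
          for name, k in r_parts[i]
          for k2, v in s_parts[i] if k == k2]

print(sorted(joined))
```

Pre-bucketing both tables on the join key (as Hive's bucket map join does) amounts to performing this partitioning once at load time, so the expensive shuffle is skipped at query time.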
It is possible to run complex processing, such as R-like functions, in an MPP environment; however, MPP architectures cannot store data in unstructured formats, which has made those databases less flexible.
Hadoop:
Schema on read, not schema on write.
This changed the industry by allowing users to store data first and worry about how to analyze it later.
MapReduce
MapReduce is a general-purpose processing framework, originally proposed in a paper from Google. A very wide range of processing is possible with the map/reduce paradigm.
MapReduce is
a programming model and an associated
implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.
A MapReduce program is composed of a Map() procedure that performs filtering and
sorting (such as sorting students by first name into queues, one queue for each
name) and a Reduce() procedure that performs a summary operation (such as
counting the number of students in each queue, yielding name frequencies). The
"MapReduce System" (also called "infrastructure" or
"framework") orchestrates the processing by marshaling the
distributed servers, running the various tasks in parallel, managing all
communications and data transfers between the various parts of the system, and
providing for redundancy and fault tolerance. [4]
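The student-sorting example above maps directly onto code. Here is a minimal in-process sketch of the model's three phases (map, shuffle/group, reduce) using the classic word count; it is not a distributed implementation, and the input documents are invented.

```python
# Minimal in-process sketch of the MapReduce model: map -> shuffle -> reduce.
from collections import defaultdict

docs = ["the quick brown fox", "the lazy dog", "the fox"]

# Map: emit (key, value) pairs from each input record.
def map_fn(doc):
    for word in doc.split():
        yield (word, 1)

# Shuffle: group all emitted values by key. In a real cluster the framework
# does this between the phases, moving data across the network.
groups = defaultdict(list)
for doc in docs:
    for key, value in map_fn(doc):
        groups[key].append(value)

# Reduce: apply a summary operation to each key's group of values.
def reduce_fn(key, values):
    return (key, sum(values))

counts = dict(reduce_fn(k, vs) for k, vs in groups.items())
print(counts)
```

The user writes only map_fn and reduce_fn; everything in the middle (grouping, distribution, fault tolerance) is the framework's job, which is what makes the model so broadly applicable.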
Dryad
This is a
general-purpose processing framework as well.
Dryad is an execution engine that is
optimized for certain types of distributed computation. It relies on the fact
that its inputs are immutable and finite, and the outputs of a job are not
generally available until the entire computation completes (i.e. it doesn’t support “infinite”
streaming computations) [5]
Dryad
is optimized for medium to large clusters (say, between 100 computers and
10,000 computers), and for long-running batch jobs rather than sub-second
interactive lookups.
It has mostly been used for data-intensive
computations which can benefit from streaming data to or from many disks at
once, and which can be broken into parallel stages that do not require very
frequent communications. If a program can be broken into pieces that can do
more than 20 or 30 seconds of IO or computation before needing to communicate
then it can probably be run efficiently on Dryad. The basic computational
model that Dryad uses is a directed-acyclic graph (DAG)
[5]
The approach is to create DAG of operations with data
flowing between them.
A Dryad programmer writes several sequential programs and
connects them using one-way channels. The computation is structured as a
directed graph: programs are graph vertices, while the channels are graph
edges. A Dryad job is a graph generator, which can synthesize any directed
acyclic graph. These graphs can even change during execution, in response to
important events in the computation.[5]
Dryad is notable for allowing graph vertices and
computations in general to use an arbitrary number of inputs and outputs.
MapReduce restricts all computations to take a single input set and generate a
single output set. SQL and shader languages allow multiple inputs but generate
a single output from the user’s perspective, though SQL query plans internally
use multiple-output vertices.[6]
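The Dryad model itself is easy to sketch: a job is a DAG whose vertices are programs and whose edges are one-way channels, and vertices may take several inputs. The example below is a toy interpretation of that idea, with invented vertex names, not Dryad's actual API.

```python
# Toy sketch of a Dryad-style job: vertices are programs, edges are one-way
# channels, and a vertex may consume multiple input channels.
from graphlib import TopologicalSorter

# Each vertex is a function of the values on its incoming channels.
vertices = {
    "read_a": lambda: [1, 2, 3],
    "read_b": lambda: [10, 20],
    "merge":  lambda a, b: a + b,   # two inputs, one output
    "total":  lambda m: sum(m),
}
# Map each vertex to its predecessors (its incoming edges).
edges = {"merge": ["read_a", "read_b"], "total": ["merge"]}

# Execute vertices in topological order, wiring outputs to downstream inputs.
results = {}
for v in TopologicalSorter(edges).static_order():
    inputs = [results[u] for u in edges.get(v, [])]
    results[v] = vertices[v](*inputs)

print(results["total"])
```

The "merge" vertex having two inputs is precisely the generality the paragraph above contrasts with MapReduce's single-input, single-output restriction.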
Different implementations of the Dryad model exist, with Tez being the most prominent in the Hadoop world.
Resilient Distributed Datasets: This is a general-purpose processing framework as well.
Spark is the product that implements resilient distributed datasets (RDDs). Spark is able to use
memory for intermediate results as well as share data across DAGs. Iteration, in particular, is a specialty of Spark, and ML algorithms are a
perfect fit for the underlying architecture.
RDDs keep a delta of the operations that lead from one RDD to another, instead of storing each RDD. This facilitates iterative machine-learning algorithms on top of Spark, as well as interactive operations. These capabilities are further enhanced by the effective use of memory to store intermediate results.
What differentiates Spark from other data processing engines is its use of memory and its ability to recover intermediate transformations by replaying the deltas between RDDs. Hence there is no need to save intermediate state or to checkpoint in order to recover; in Dryad or MapReduce, by contrast, a failure means reprocessing the entire operation. [7]
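The lineage idea can be captured in a toy class. This is a deliberately simplified model of the RDD concept, not Spark's implementation: each RDD records only the transformation (the "delta") and its parent, and recovery is just recomputation from that chain.

```python
# Toy model of RDD lineage: record the chain of transformations instead of
# materializing every intermediate dataset; recover by replaying the chain.

class RDD:
    def __init__(self, source=None, op=None, parent=None):
        self.source, self.op, self.parent = source, op, parent

    def map(self, fn):
        return RDD(op=("map", fn), parent=self)

    def filter(self, pred):
        return RDD(op=("filter", pred), parent=self)

    def compute(self):
        # Recompute from lineage. This same path serves as failure recovery:
        # a lost result is rebuilt by replaying the deltas, no checkpoint.
        if self.parent is None:
            return list(self.source)
        data = self.parent.compute()
        kind, fn = self.op
        if kind == "map":
            return [fn(x) for x in data]
        return [x for x in data if fn(x)]

base = RDD(source=range(10))
result = base.map(lambda x: x * x).filter(lambda x: x % 2 == 0)

print(result.compute())
```

In real Spark, frequently reused RDDs are additionally cached in memory, so the lineage is replayed only for partitions that were actually lost.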
[1] M. Tamer Özsu, Patrick Valduriez, Principles of Distributed Database Systems
[2] http://cs.nyu.edu/courses/Fall12/CSCI-GA.2433-001/lecture4.pdf
[3] https://cwiki.apache.org/confluence/download/attachments/27362054/Hive%2BSummit%2B2011-join.pdf
[4] https://en.wikipedia.org/wiki/MapReduce
[5] http://blogs.msdn.com/b/dryad/archive/2009/11/24/what-is-dryad.aspx
[6] http://research.microsoft.com/en-us/projects/dryad/eurosys07.pdf
[7] https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf