Tuesday, July 14, 2015

Distributed Data and Query Processing

This blog is going to talk about SQL optimization techniques in distributed environment and then general data processing products in Hadoop world.
SQL Optimization
Four stages exist in query execution:
Query decomposition
Data Localization
Global Optimization
Distributed execution
Query Decomposition: decomposition into logical operators. Validation of the query. Eliminate redundant predicates. Calculus query is restructured as algebraic query
Data Localization: This layer determines which fragments are involved in the query and transforms the distributed query into a query on fragments.
Global query Optimization
In both centralized and distributed systems, a common heuristic is to minimize the size of intermediate relations. This can be done by performing unary operators first, and ordering the binary operators by the increasing sizes of their intermediate relations. An important heuristic in distributed systems is to replace join operators by combinations of semijoins to minimize data communication. [1]
Optimization can be done statically before executing the query or dynamically as the query is executed. Static query optimization is done at query compilation time. Thus the cost of optimization may be amortized over multiple query executions. Therefore, this timing is appropriate for use with the exhaustive search method. Since the sizes of the intermediate relations of a strategy are not known until run time, they must be estimated using database statistics. [1]
Hybrid query optimization attempts to provide the advantages of static query optimization while avoiding the issues generated by inaccurate estimates. The approach is basically static, but dynamic query optimization may take place at run time when a high difference between predicted sizes and actual size of intermediate relations is detected. [1]
Query optimization refers to the process of producing a query execution plan which represents an execution strategy for the query. This QEP minimizes an objective cost function. A query optimizer, the software module that performs query optimization is usually seen as consisting of three components: a search space, a cost model and a search strategy. The search space is the set of alternative execution plans that represent input query. These plans differ in the execution order of operations and the way these operations are implemented and therefore in their performance. The cost model predicts the cost of a given execution plan. The search strategy explores the search space and selects the best plan using the cost model.[1]
An execution strategy for a distributed query can be described with relational algebra operators and communication primitives for transferring data between sites. Query optimization consists of finding the “best” ordering of operators in the query, including communication operators that minimize a cost function. The cost function, often defined in terms of time units, refers to computing resources such as disk space, disk I/Os, buffer space, CPU cost, communication cost, and so on. Generally, it is a weighted combination of I/O, CPU, and communication costs.[1]
To select the ordering of operators it is necessary to predict execution costs of alternative candidate orderings. Determining execution costs before query execution (i.e., static optimization) is based on fragment statistics and the formulas for estimating the cardinalities of results of relational operators. Thus the optimization decisions depend on the allocation of fragments and available statistics on fragments which are recorder in the allocation schema.[1]
An important aspect of query optimization is join ordering, since permutations of the joins within the query may lead to improvements of orders of magnitude. One basic technique for optimizing a sequence of distributed join operators is through the semijoin operator. The main value of the semijoin in a distributed system is to reduce the size of the join operands and then the communication cost.[1]

Distributed Query Execution: Each subquery executing at one site, called a local query, is then optimized using the local schema of the site and executed. At this time, the algorithms to perform the relational operators may be chosen.
The general formula for response time
Response time=TCPU seq #insts+TI/Oseq #I/Os +TMSG seq #msgs+TTR seq #bytes
Join trees can be produced by applying the commutativity and associativity rules is O(N!) for N relations. Investigating a large search space may make optimization time prohibitive, sometimes much more expensive than the actual execution time. Therefore query optimizers typically restrict the size of the search space they consider. The first restriction is to use heuristics. The most common heuristic is to perform selection and projection when accessing base relations. Another common heuristic is to avoid Cartesian products that are not required by the query. Another important restriction is with respect to the shape of the join tree. Two kinds of join trees are usually distinguished: linear vs bushy trees. By considering only linear trees, the size of the search space is reduced to O(2N ). However, in a distributed environment, bushy trees are useful in exhibiting parallelism[1]



Broadcast join: If one part of the join is a small table, broadcast join is used. This means that the small table is replicated to all nodes.
Shuffle join:   
Shuffle join involves joining 2 large tables. 
This does not apply to non relational systems.
There will be bucketing involved to co-locate foreign keys
Common join, Map join, auto mapjoin, Bucket map join, sort merge bucket map join, skew join[3]

It is possible to run complex processing like R like functions in MPP environment however, it is impossible to store data in unstructured format in MPP architectures which has resulted in those databases being not so flexible.

Hadooop:
Schema on read not schema on write
This changed the industry in the way that allowed users to store data and then worry about how to analyze data.

Map Reduce
Map Reduce is a general-purpose processing framework. It was proposed by a paper from Google. Very wide range of processing possible with map/reduce paradigm.
MapReduce is programming model and an associated implementation for processing and generating large data sets with paralleldistributed algorithm on a cluster.
A MapReduce program is composed of a Map() procedure that performs filtering and sorting (such as sorting students by first name into queues, one queue for each name) and a Reduce() procedure that performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). The "MapReduce System" (also called "infrastructure" or "framework") orchestrates the processing by marshaling the distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance
[4]           

Dryad 
This is a general-purpose processing framework as well.
Dryad is an execution engine that is optimized for certain types of distributed computation. It relies on the fact that its inputs are immutable and finite, and the outputs of a job are not generally available until the entire computation completes  (i.e. it doesn’t support “infinite” streaming computations) [5]
Dryad is optimized for medium to large clusters (say, between 100 computers and 10,000 computers), and for long-running batch jobs rather than sub-second interactive lookups.
It has mostly been used for data-intensive computations which can benefit from streaming data to or from many disks at once, and which can be broken into parallel stages that do not require very frequent communications. If a program can be broken into pieces that can do more than 20 or 30 seconds of IO or computation before needing to communicate then it can probably be run efficiently on Dryad. The basic computational model that Dryad uses is a directed-acyclic graph (DAG)
[5] 
The approach is to create DAG of operations with data flowing between them.


A Dryad programmer writes several sequential programs and connects them using one-way channels. The computation is structured as a directed graph: programs are graph vertices, while the channels are graph edges. A Dryad job is a graph generator, which can synthesize any directed acyclic graph. These graphs can even change during execution, in response to important events in the computation.[5]

Dryad is notable for allowing graph vertices and computations in general to use an arbitrary number of inputs and outputs. Mapreduce restricts all computations to take a single input set and generate a single output set. SQL and shader languages allow multiple inputs but generate a single output from the user’s perspective, though SQL query plans internally use multiple-output vertices.[6]

Different implementations exist for Dryad with Tez being prominent among those in Hadoop world.

Resilient Distributed Data Sets: This is a general purpose processing framework as well.
Spark is a product which implement resilient distributed data sets. Spark is able to utilize memory for intermediate results as well as share data across DAGs. Spark allows cyclic graphs. Also iterations is a specialty with Spark ML algorithms are a perfect fit to the underlying architecture.
RDDs keep a delta of the operations, which lead from one RDD to another instead of storing each RDD. This facilitates Iterative machine learning algorithms on top of spark as well as interactive operations. However, the capabilities are enhanced by effective use of memory to store intermediate results.
The difference between spark and other data processing engines are the use of memory and recovery capabilities to recover intermediate transformations using delta processing between RDDs. Hence no need to save intermediate state and checkpoint in order to do recovery processing. Or in cases of Dryad/ map reduce you need to reprocess the entire operation. [7]


Principles of Distributed systems by M. Tamer Ozsu, Patrick Valduriez
2 http://cs.nyu.edu/courses/Fall12/CSCI-GA.2433-001/lecture4.pdf
3 https://cwiki.apache.org/confluence/download/attachments/27362054/Hive%2BSummit%2B2011-join.pdf
4 https://en.wikipedia.org/wiki/MapReduce 
5 http://blogs.msdn.com/b/dryad/archive/2009/11/24/what-is-dryad.aspx
6 http://research.microsoft.com/en-us/projects/dryad/eurosys07.pdf
7 https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf

No comments: