Distributed Data Management

Database systems have been addressing the challenges of distributed computing since the very early days. The distributed computing environment has changed significantly over the years: client/server systems are still common, but cluster-based parallel environments and very wide-scale distributed systems, in particular peer-to-peer (P2P) systems, have become prevalent, spurred by the expansion of the Internet. Our group focuses on a number of topics that arise in managing data in each of these environments.

One project focuses on building scalable database systems using replication. To scale a system up, data are replicated and the growing workload is distributed over the replicas. However, unless the replicas are somehow synchronized, applications may not see a consistent view of the database, or they may see a stale view. Ideally, the replicas should be synchronized so that, from the point of view of the database application programs, the database system behaves as if there is only one copy of the database. This spares the applications from having to handle problems associated with inconsistent or stale data. One kind of problem that can occur is a transaction inversion. An inversion means that two transactions appear to have executed in the wrong order because the later transaction "sees" a stale replica; for example, a customer who has just placed an order may, in a later transaction, see a database state that does not yet include that order. Unfortunately, the illusion of a single fresh database copy can be very expensive to maintain. Thus, the challenge is to provide a single-copy illusion to the applications without losing the scalability that motivated the replication in the first place.

Our approach to the replica synchronization problem is based on the notion of application sessions, which are sequences of related database requests. For example, in a web services environment, a session might include all of the database requests that result from a single customer's use of the service. Our approach provides strong consistency and freshness guarantees within a session, but weaker guarantees across sessions. In particular, our approach ensures that inversions will never occur among the transactions in any given session. However, they may occur among transactions from different sessions. This flexibility allows the database system to provide useful guarantees to the applications while still ensuring that the system can scale.
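As a rough illustration of how a session-level guarantee can rule out inversions, the Python sketch below assumes a lazily propagated primary copy that numbers its commits, and a session that remembers the highest commit number it has observed. A read is served only by a replica that has applied at least that commit, so a session never sees a state older than one it has already seen or written. The classes `Primary`, `Replica`, and `Session` and the helper `propagate` are hypothetical; this is a simplified sketch, not the protocols of [1], [2], [3].

```python
from dataclasses import dataclass, field


@dataclass
class Replica:
    name: str
    applied_seq: int = 0                       # last commit number applied here
    data: dict = field(default_factory=dict)


class Primary:
    """Primary copy: assigns increasing commit numbers and keeps an update log
    that is propagated to replicas lazily (after commit)."""

    def __init__(self):
        self.seq = 0
        self.log = []                          # (seq, key, value), in commit order

    def commit(self, key, value):
        self.seq += 1
        self.log.append((self.seq, key, value))
        return self.seq


def propagate(primary, replica, upto):
    """Apply logged updates to `replica` in commit order, up to commit `upto`."""
    for seq, key, value in primary.log:
        if replica.applied_seq < seq <= upto:
            replica.data[key] = value
            replica.applied_seq = seq


class Session:
    def __init__(self, primary, replicas):
        self.primary = primary
        self.replicas = replicas
        self.last_seen = 0                     # newest commit this session has observed

    def write(self, key, value):
        self.last_seen = self.primary.commit(key, value)

    def read(self, key):
        # Only replicas that have applied everything this session has seen are
        # eligible; reading from any other replica could cause an inversion.
        fresh = [r for r in self.replicas if r.applied_seq >= self.last_seen]
        if fresh:
            replica = fresh[0]
        else:
            # Stand-in for waiting: bring one replica up to date before reading.
            replica = self.replicas[0]
            propagate(self.primary, replica, self.last_seen)
        self.last_seen = max(self.last_seen, replica.applied_seq)
        return replica.data.get(key)
```

A session that wrote a value always reads it back, while a different session that never observed the write may still read a stale replica: inversions are excluded within a session but tolerated across sessions, which is what lets reads be spread over replicas that lag behind.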

We have developed several session-based versions of serializability, snapshot isolation, and other standard and widely used correctness criteria for concurrent access to databases. We have also developed a collection of synchronization and concurrency control protocols that guarantee these criteria ([1], [2], [3]).

A second project studies methods to optimize database client/server interactions. Relational database applications establish database server connections through which they issue streams of query and update requests and fetch the results of those requests. Most work on database query optimization focuses on optimizing the processing of individual requests at the database server. In contrast, we focus on optimizing the stream of client/server interactions that occurs over a connection. While it is common practice to hand-tune application programs to optimize their request streams, our approach optimizes automatically. Furthermore, optimization is transparent to the application program.

It is common for application request streams to include many small requests. For each request, there is significant latency and overhead associated with the client/server interconnection network and the layers of system interface and communications software at both ends of the connection. In many cases this overhead dominates the total cost of the request. Our optimization technique improves client/server performance by reducing this overhead. Specifically, we have developed a technique called semantic prefetching, which works by combining many small application database queries into fewer, larger queries ([4], [5]). This improves performance by reducing latency and per-request overhead. Semantic prefetching can be implemented in an optimizer that is external to both the application and the DBMS. In this way, application performance can be tuned automatically to different environments, without changing the application code.
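To make the idea concrete, the following sketch uses Python's built-in `sqlite3` module and a hypothetical two-table schema. The first function issues one small query per customer, the kind of stream an application typically generates; the second shows the kind of single combined query a semantic prefetcher could substitute. The rewrite is written by hand here and only illustrates the effect (fewer requests, same answers); it is not the rewrite procedure of [4], [5].

```python
import sqlite3


def make_demo_db():
    """Hypothetical schema and data used only for this illustration."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE orders (id INTEGER PRIMARY KEY, cust_id INTEGER, total REAL);
        INSERT INTO customer VALUES (1, 'ann'), (2, 'bob'), (3, 'cai');
        INSERT INTO orders VALUES (10, 1, 5.0), (11, 1, 7.5), (12, 3, 2.0);
    """)
    return conn


def per_row_requests(conn):
    """Original stream: one outer query, then one small query per customer."""
    requests = 1
    customers = conn.execute("SELECT id, name FROM customer ORDER BY id").fetchall()
    result = {}
    for cust_id, name in customers:
        result[name] = conn.execute(
            "SELECT id, total FROM orders WHERE cust_id = ? ORDER BY id", (cust_id,)
        ).fetchall()
        requests += 1
    return result, requests


def combined_request(conn):
    """Rewritten stream: the correlated per-customer queries folded into one
    outer join (LEFT JOIN keeps customers that have no orders)."""
    rows = conn.execute("""
        SELECT c.name, o.id, o.total
        FROM customer c LEFT JOIN orders o ON o.cust_id = c.id
        ORDER BY c.id, o.id
    """).fetchall()
    result = {}
    for name, order_id, total in rows:
        orders = result.setdefault(name, [])
        if order_id is not None:
            orders.append((order_id, total))
    return result, 1
```

Both functions return the same answers, but the first needs one request per customer plus one, while the second needs a single request. The saving per eliminated request is roughly one round trip plus the per-request software overhead, so it grows with interconnect latency.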

Our stream optimizer is located between a client application and the database server. It works by monitoring the stream of requests issued by the client, as well as the responses supplied by the server. As it monitors requests, the stream optimizer learns to identify recurring patterns in the stream. For example, one type of pattern is a sequence of correlated queries, in which later queries are parameterized by values returned by earlier ones. Once it has detected a pattern, the stream optimizer considers whether it can combine the requests in that pattern into a single, larger query. It uses a cost model to estimate whether this larger query will be less expensive to execute than the original query pattern issued by the application. If the rewrite appears to be beneficial, the optimizer records it in a pattern rewrite database for future use.
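The first step, recognizing recurring patterns, can be approximated by mapping each query to a template (its text with literals replaced by placeholders) and counting template sequences. The sketch below, with hypothetical names `template` and `recurring_pairs`, counts consecutive template pairs in a request trace. It deliberately omits what makes a pattern correlated (checking that the inner query's parameters come from the outer query's results) as well as the cost-model test, so it is only a starting point under those simplifying assumptions.

```python
import re
from collections import Counter

# String literals ('...' with '' escapes) or standalone numeric literals.
LITERAL = re.compile(r"'(?:[^']|'')*'|\b\d+(?:\.\d+)?\b")


def template(sql):
    """Normalize whitespace and replace literals with '?' so that queries that
    differ only in their parameter values map to the same template."""
    return LITERAL.sub("?", " ".join(sql.split()))


def recurring_pairs(trace, min_count=2):
    """Count consecutive (template, template) pairs in a request trace and
    keep those seen at least `min_count` times."""
    templates = [template(q) for q in trace]
    pairs = Counter(zip(templates, templates[1:]))
    return {pair: n for pair, n in pairs.items() if n >= min_count}
```

For a trace consisting of one customer query followed by several `SELECT total FROM orders WHERE cust_id = ...` queries, the repeated orders template is reported as a recurring pair, which is the kind of candidate a cost model would then evaluate.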

As the optimizer continues to monitor the application's requests, it may observe a series of requests that appear to match the beginning of a pattern for which the optimizer has identified an optimized rewrite. When this occurs, the optimizer issues the larger optimized query to the server in anticipation of the application's future requests. In effect, this query efficiently prefetches the results of the sequence of queries that the optimizer expects the application to issue. If the application behaves as expected, the optimizer uses the prefetched results to provide results for the application's queries. In this case, there is no need to send the application's queries to the server. If the application does not behave as predicted, the optimizer can simply ignore the prefetched results and instead forward the application's actual queries to the server. In this case, the prefetch represents wasted work. However, the application will always receive correct query results.
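The run-time behaviour described above (prefetch on a partial match, answer from the prefetched rows, and fall back to the server when the application deviates) can be sketched as a small proxy. The sketch assumes a single hypothetical pattern whose inner query is `INNER` and a hypothetical prefetch query `PREFETCH` that answers every instance of it; it uses Python's `sqlite3` with a toy schema. It also discards prefetched rows when the application issues a write, since they could otherwise be stale; that rule is our simplification, not a description of the optimizer in [4], [5].

```python
import sqlite3

# Hypothetical pattern: the application repeatedly issues INNER, once per customer.
INNER = "SELECT id, total FROM orders WHERE cust_id = ? ORDER BY id"
# Hypothetical rewrite: one query that answers every instance of INNER.
PREFETCH = "SELECT cust_id, id, total FROM orders ORDER BY cust_id, id"


class PrefetchingProxy:
    def __init__(self, conn):
        self.conn = conn
        self.cache = None        # {cust_id: [(id, total), ...]} after a prefetch
        self.round_trips = 0

    def _send(self, sql, params=()):
        self.round_trips += 1
        return self.conn.execute(sql, params).fetchall()

    def query(self, sql, params=()):
        if sql == INNER:
            if self.cache is None:
                # Start of the pattern seen: fetch all expected answers at once.
                self.cache = {}
                for cust_id, oid, total in self._send(PREFETCH):
                    self.cache.setdefault(cust_id, []).append((oid, total))
            return list(self.cache.get(params[0], []))
        if not sql.lstrip().upper().startswith("SELECT"):
            self.cache = None    # a write may make prefetched rows stale
        # Anything outside the pattern is forwarded to the server unchanged.
        return self._send(sql, params)


def make_demo_db():
    """Hypothetical schema and data used only for this illustration."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE orders (id INTEGER PRIMARY KEY, cust_id INTEGER, total REAL);
        INSERT INTO customer VALUES (1, 'ann'), (2, 'bob'), (3, 'cai');
        INSERT INTO orders VALUES (10, 1, 5.0), (11, 1, 7.5), (12, 3, 2.0);
    """)
    return conn
```

Three instances of `INNER` cost one round trip instead of three, and the answers are identical to those the server would return. If the application never issues the remaining instances, the prefetch is wasted work, but correctness is unaffected because unmatched queries still go to the server.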

We have studied a variety of database applications, and have demonstrated empirically that semantic prefetching can reduce overhead on both the client side and the server side. Semantic prefetching also reduces request latencies by reducing the number of client/server round trips. This is particularly beneficial for high-latency interconnects. The stream optimizer effectively tunes a database application to its execution environment without changing the application code.

In a number of projects we are investigating data management issues in Internet-scale (i.e., very large scale) distributed environments. One of these projects focuses on the management of stream data. A growing list of emerging applications receive and process data as a sequence (stream) of items. Examples include sensor networks, financial tickers and other on-line Web information sources, transaction log analysis, and Internet traffic measurement. A data stream is a real-time, continuous, ordered sequence of items. The requirements of these applications differ significantly from those of traditional DBMSs, which manage persistent data and support complex querying. Stream data, in contrast, usually need to be processed in real time as they arrive (i.e., they are transient), and queries over data streams are expected to run continuously and keep their answers up to date as new data arrive. As a result, continuous query processing involves many challenges, such as adaptive re-optimization of query plans in response to changing stream arrival rates, sharing resources among similar queries, and generating approximate answers in limited space. Additionally, some continuous queries are not computable in bounded memory (e.g., the Cartesian product of two infinite streams), some relational operators are blocking, i.e., they must consume their entire input before producing any results (e.g., join and aggregation), and some users may be interested only in the most recent data. These problems may be solved by restricting the range of continuous queries to a sliding window of manageable size. For example, an Internet traffic monitoring system may calculate the average Round Trip Time (RTT) over a one-minute sliding window to determine an appropriate value for the TCP timeout. A sliding window RTT average is appropriate because it emphasizes recent measurements and smooths out the effects of sudden changes in network conditions.
Consequently, the objective of this project is to study query languages over windowed data streams and their efficient management.
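The RTT example amounts to maintaining an average over a time-based window. A minimal Python sketch, assuming samples arrive with non-decreasing timestamps in seconds, keeps a running sum and drops samples as they fall out of the window, so each update touches only the arriving and expiring samples rather than rescanning the window. The class name `SlidingWindowAverage` and the sample values are hypothetical.

```python
from collections import deque


class SlidingWindowAverage:
    """Average of the samples whose timestamps lie in (now - window, now]."""

    def __init__(self, window):
        self.window = window
        self.samples = deque()   # (timestamp, value), oldest first
        self.total = 0.0

    def add(self, ts, value):
        self.samples.append((ts, value))
        self.total += value
        self._expire(ts)

    def _expire(self, now):
        # Remove samples that have slid out of the window, oldest first.
        while self.samples and self.samples[0][0] <= now - self.window:
            _, old = self.samples.popleft()
            self.total -= old

    def average(self):
        return self.total / len(self.samples) if self.samples else None
```

With a 60-second window, RTT samples of 100 ms at t = 0 and 200 ms at t = 30 average to 150 ms; a 300 ms sample at t = 70 pushes the first sample out, and the average becomes 250 ms.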

An important characteristic of continuous queries is that their inputs and outputs evolve over time. New answers are produced in response to the arrival of new data and older data expire as the windows slide forward. Furthermore, some previously reported answers might cease to satisfy the query at some point. We define an update pattern of a continuous query as the order in which its results are produced and deleted over time [6]. We introduced the notion of update-pattern-aware modeling and processing of continuous queries. In particular, we showed that update-pattern awareness leads to a better understanding of continuous query semantics and a more efficient implementation of a continuous query engine.
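A toy calculation shows why update patterns matter. Assume a time-based window of length `W` and say a result expires when any input it depends on leaves the window. A windowed selection's results then expire in the order they were produced, so a FIFO queue suffices to retire them; a windowed join's results need not, because a result produced late may depend on an old tuple that expires early. The functions below are our own illustration under these assumptions, not the model or terminology of [6].

```python
W = 10  # window length in time units (illustrative)


def selection_results(stream, pred):
    """stream: (timestamp, value) pairs with non-decreasing timestamps. Each
    result is (generated_at, expires_at) and inherits its input's expiration."""
    return [(ts, ts + W) for ts, value in stream if pred(value)]


def join_results(r, s):
    """r, s: tuple timestamps. A pair is produced when its later tuple arrives
    (the earlier one must still be in its window) and expires when the
    earlier tuple leaves its window."""
    out = [(max(tr, ts), min(tr, ts) + W)
           for tr in r for ts in s if abs(tr - ts) < W]
    return sorted(out)  # generation order


def expires_in_fifo_order(results):
    """True if results expire in the same order in which they were produced."""
    expirations = [exp for _, exp in results]
    return expirations == sorted(expirations)
```

For the selection over `[(0, "a"), (3, "b"), (7, "a")]` keeping value `"a"`, the results expire at 10 and 17, in production order. For the join of timestamps `[0, 5]` and `[6, 9]`, the result produced at time 9 from the tuple at time 0 expires at 10, before the result produced at time 6 from the tuple at time 5, so a simple FIFO expiration queue would be incorrect for the join.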

Implementation of streaming operators raises issues such as non-blocking behaviour and potentially unbounded memory requirements. Furthermore, sliding window operators must deal with the fact that, as the window slides forward, old results must be removed in addition to appending new results generated by newly arrived tuples. We have developed non-blocking algorithms for sliding window joins, as well as a join ordering heuristic that considers stream arrival rates, window sizes, and frequency of query re-evaluation [7]. We also investigated various window-specific issues that arise during join execution, such as immediate or periodic generation of new results, immediate or periodic expiration of stale tuples from the windows and hash tables, and expiration techniques for time-based and count-based windows. We have also considered the issue of incremental evaluation of sliding window aggregates without scanning the entire window during updates. Distributive aggregates such as SUM and COUNT can be updated easily: when a new item arrives in the window, COUNT is incremented by one and SUM is increased by the item's value; when an item expires, both are decreased accordingly. However, complex aggregates, such as finding the k most frequent items or all items that occur with a frequency above some threshold, may need access to the entire window in the worst case. We have developed an algorithm for incrementally finding frequently occurring items in sliding windows, provided that the distribution of item types follows a power law (at least approximately) [8]. We also presented algorithms for incrementally finding frequent items in sliding windows with multinomially-distributed item frequencies [9].
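A common way to make a window join non-blocking is a symmetric hash join: each side keeps a hash table over its window, and every arrival probes the other side's table, so results appear immediately. The Python sketch below assumes time-based windows, a single global clock (timestamps non-decreasing across both streams), and immediate expiration before each probe, which is only one of the strategies mentioned above. `WindowHashJoin` is a hypothetical name, and the sketch is not the algorithm of [7].

```python
from collections import defaultdict, deque


class WindowHashJoin:
    """Symmetric hash join of two streams over time-based windows of length
    `window`. Every arrival probes the other stream's hash table, so results
    are produced as soon as they exist (non-blocking), and tuples that have
    left the window are expired before each probe."""

    def __init__(self, window):
        self.window = window
        self.tables = (defaultdict(deque), defaultdict(deque))  # key -> (ts, tuple)
        self.arrivals = (deque(), deque())                     # (ts, key), oldest first

    def _expire(self, now):
        for table, arrivals in zip(self.tables, self.arrivals):
            while arrivals and arrivals[0][0] <= now - self.window:
                _, key = arrivals.popleft()
                bucket = table[key]
                bucket.popleft()         # oldest tuple overall is oldest in its bucket
                if not bucket:
                    del table[key]

    def insert(self, side, ts, key, tup):
        """Add `tup` (stream 0 or 1) at time `ts`; return the new join results
        as (stream-0 tuple, stream-1 tuple) pairs."""
        self._expire(ts)
        matches = [m for _, m in self.tables[1 - side].get(key, ())]
        self.tables[side][key].append((ts, tup))
        self.arrivals[side].append((ts, key))
        return [(tup, m) if side == 0 else (m, tup) for m in matches]
```

Keeping a per-stream arrival queue lets expiration proceed oldest-first without scanning the hash tables, since the oldest tuple in the window is also the oldest tuple in its hash bucket.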

An important issue in indexing data streams and sliding windows is that the overhead of continuous insertions (and possibly deletions) may be too high. We developed a main-memory index for sliding windows, where updates are performed in bulk in order to minimize maintenance costs [10]. We also considered indexing the results of sliding window queries on disk, which is useful in applications that intercept massive amounts of streaming data, perform some lightweight processing on the fly, and periodically append newly arrived data to a disk-based archive. The archive maintains a sliding window of recent results and facilitates more complex off-line processing. Examples include network traffic analysis, where the archive is mined by an Internet Service Provider (ISP) in order to discover usage patterns and plan changes to the network infrastructure; transaction logging, where point-of-sale purchase data are streamed into a data warehouse to examine customer behaviour and identify credit card fraud; and networks of sensors that measure physical phenomena such as air temperature, where the recent data could be used to discover trends and make predictions.
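One simple way to batch index maintenance, assumed here for illustration rather than taken from [10], is to split a count-based window into sub-windows, give each its own hash index, and expire a whole sub-window at once when the window slides. The Python sketch below uses dictionaries as the per-sub-window indexes; `PartitionedWindowIndex` and its parameters are hypothetical. The price of bulk expiration is coarser granularity: once the window has filled up, the index holds between `n_parts * part_size` and `n_parts * part_size + part_size - 1` of the most recent tuples.

```python
from collections import defaultdict, deque


class PartitionedWindowIndex:
    """Count-based sliding window indexed on a key. The window is split into
    sub-windows of `part_size` tuples, each with its own hash index; when the
    window slides, the oldest sub-window is dropped as a unit instead of
    deleting its tuples one at a time."""

    def __init__(self, n_parts, part_size):
        self.n_parts = n_parts
        self.part_size = part_size
        self.parts = deque()              # sealed sub-windows, oldest first
        self.current = defaultdict(list)  # sub-window currently being filled
        self.current_count = 0

    def insert(self, key, tup):
        self.current[key].append(tup)
        self.current_count += 1
        if self.current_count == self.part_size:
            self.parts.append(self.current)       # seal the sub-window
            if len(self.parts) > self.n_parts:
                self.parts.popleft()              # bulk expiration
            self.current = defaultdict(list)
            self.current_count = 0

    def lookup(self, key):
        """All indexed tuples with this key, oldest first."""
        hits = []
        for part in self.parts:
            hits.extend(part.get(key, []))
        hits.extend(self.current.get(key, []))
        return hits
```

Each insertion touches only the current sub-window, and deletions cost one operation per sub-window rather than one per tuple; lookups pay for this by probing several small indexes.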

Our current work in this area focuses on the concurrent processing of queries (reads) over windows while data are added to and removed from these windows (writes) [11]. This is required to allow flexible, prioritized query scheduling and to improve the freshness of answers. In this environment, the traditional notion of conflict serializability is insufficient as a correctness criterion, and stronger isolation levels are needed that restrict the allowed serialization orders. We are also designing a transaction scheduler that efficiently enforces the new isolation levels by taking advantage of the access patterns of sliding window queries. A related problem under investigation is the scheduling of multiple similar continuous queries so that they can share resources. The fundamental complication, and what separates this case from earlier work on multi-query evaluation, is that the queries may be similar but may be re-evaluated with different frequencies and, therefore, scheduled at different times. The solutions we are investigating are based on a model that treats queries as periodic tasks, which must be scheduled with desired frequencies. Using this model, it is possible to apply modified versions of scheduling algorithms such as earliest-deadline-first to determine when queries should be executed, where the modifications allow similar queries to be scheduled concurrently.
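The periodic-task view can be made concrete with a plain earliest-deadline-first simulation. The sketch below assumes each continuous query is re-evaluated every `period` time units, each evaluation needs `cost` units of processing, and its deadline is its next release; in every unit slot the scheduler runs the released evaluation with the earliest deadline. It does not include the modifications for scheduling similar queries together that the paragraph refers to, and `edf_schedule` is a hypothetical name.

```python
import heapq


def edf_schedule(queries, horizon):
    """Preemptive earliest-deadline-first over unit time slots.

    queries: {name: (period, cost)} in integer time units. A new evaluation of
    each query is released every `period` units and must finish before the next
    release. Returns (query run in each slot or None if idle, missed evaluations).
    """
    ready = []                   # heap of (deadline, name, remaining cost)
    timeline, missed = [], []
    for t in range(horizon):
        for name, (period, cost) in sorted(queries.items()):
            if t % period == 0:
                heapq.heappush(ready, (t + period, name, cost))
        # Evaluations whose deadline has been reached without finishing are missed.
        while ready and ready[0][0] <= t:
            missed.append(heapq.heappop(ready)[1])
        if not ready:
            timeline.append(None)
            continue
        deadline, name, remaining = heapq.heappop(ready)
        timeline.append(name)    # run the earliest-deadline evaluation for one slot
        if remaining > 1:
            heapq.heappush(ready, (deadline, name, remaining - 1))
    return timeline, missed
```

For example, a query re-evaluated every 2 units at cost 1 and another every 4 units at cost 2 fully use the processor without missing deadlines, and the two alternate slot by slot; if the total demand per unit time exceeds one, some evaluations are reported as missed.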

Peer-to-peer (P2P) networks are emerging as a next-generation Internet infrastructure, distinguished by their scale, dynamism, and heterogeneity. These characteristics force the reconsideration of many data management problems and their solutions. Within this context we are working on an interesting and important problem, namely rank-aware (top-k) query processing in P2P systems. Rank-aware query processing is a general problem, but casting it in the context of P2P distributed systems poses new issues. First, P2P networks are highly distributed, requiring the distribution of indexes as well as data (there is no centralized index to assist in the computation). Second, P2P networks are large and usually involve heterogeneous sub-networks and peers with differing network bandwidths and processing capacities, so efficient coordination among peers is hard to achieve and communication needs to be minimized. Third, peers join and leave the network dynamically (which may cause dynamic addition or deletion of data), so top-k answers are subject to change before the computation completes, which requires special attention to the correctness and availability of top-k answers in real time.

There have been some recent studies of this problem in P2P networks. The distinguishing characteristic of our work is its focus on structured, distributed hash table (DHT)-based P2P networks. DHTs have the advantage of proven routing performance over large-scale P2P networks. We are currently addressing the following research issues:

  1. Multicast-based processing. Consider a top-k selection query over a relational table T under a uniform hashing-based DHT (e.g., Chord), where tuples belonging to T are allocated to peers all over the network. Top-k query processing in this setting requires a multicast strategy to deliver the query to all peers that contain tuples from T. Although most uniform hashing-based DHT protocols provide spanning tree-based multicast (e.g., a binomial spanning tree for Chord and a k-ary spanning tree for CAN), the routing of the query cannot avoid irrelevant peers, which leads to O(N) messages in the worst case, where N is the number of peers in the network. Strategies exist that decrease communication cost by attaching uniform hash values of table names as prefixes to the original tuple hash values (so that tuples belonging to the same table are allocated to neighboring peers), but these need sophisticated load balancing mechanisms to avoid skewed data allocation. We are studying techniques to improve multicast performance in a DHT setting without requiring sophisticated load balancing mechanisms. With improved multicast, top-k queries can be delivered to all peers holding relevant tuples, and the top-ranked tuples can be collected in a straightforward way.
  2. Building ordering indexes. Although multicast provides a basic way to deal with top-k query processing, query processing can be made more efficient if unnecessary data and peers are pruned by exploiting partial ordering information over the scoring attributes. Many pruning strategies have been studied in the context of middleware (e.g., Web services), but they assume that the ordering of object (e.g., tuple) ids is available over all or some of the scoring attributes. In a self-organizing P2P environment, this ordering does not come for free: such indexes must be built explicitly and deployed in a decentralized way. Furthermore, it is nontrivial to develop pruning strategies based on distributed indexes, since network communication and coordination in P2P networks are expensive. Thus we are investigating indexing and pruning techniques that satisfy three conditions: first, for load balancing, the partial ordering indexes should be deployed and accessed in a decentralized way; second, for good performance, network communication and coordination should be kept to a minimum; and third, for generality, the pruning method needs to work effectively for all commonly used scoring functions.
  3. Exploiting locality information. Locality-preserving DHT protocols allocate tuples to "neighboring" peers following the order of their attribute values. Since scoring functions in top-k query processing involve more than one attribute, locality must be preserved on multiple attributes. Existing proposals that address locality over multiple attributes implicitly provide a tuple-wise ordering index, which differs from the column-wise indexes discussed above, so the pruning method must differ as well. Developing pruning techniques that balance pruning power against coordination and communication cost is a nontrivial research problem.
  4. Range query-based processing. As discussed above, it may be possible to use the ordering indexes provided by locality-preserving DHTs to prune irrelevant peers. However, not all locality-preserving DHTs provide such an ordering index directly. Nevertheless, all of these DHTs support range queries. Therefore, an interesting approach is to transform top-k queries into range queries. Existing approaches in this vein (e.g., the Cutoff Parameter approach) rely on global statistical information about the data distribution, which is hard to obtain in P2P networks. In this setting, a primary problem is how to diffuse statistical information so as to help transform top-k queries into range queries in a distributed environment without incurring high communication cost. Once the transformation is done, query processing can be performed using range queries.
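The middleware pruning strategies mentioned in item 2 typically assume sorted access to each scoring attribute plus random access by object id. A well-known example is Fagin's Threshold Algorithm (TA); the centralized Python sketch below shows how sorted order lets it stop before reading all the data. It is not the group's P2P technique: in a DHT each sorted list would be a distributed index and every access a network message, which is precisely the cost that items 2-4 aim to control. The function name `threshold_topk` and the input layout are our own assumptions.

```python
import heapq


def threshold_topk(lists, k, agg=sum):
    """Fagin's Threshold Algorithm (TA), centralized version.

    lists: one non-empty list per scoring attribute, each a list of
           (object_id, score) pairs sorted by descending score; every object
           appears in every list.
    agg:   monotone aggregation function over the per-attribute scores.
    Returns (top-k as (object_id, score) pairs, number of sorted-access rounds).
    """
    by_id = [dict(lst) for lst in lists]        # random access: id -> score
    seen = {}                                   # id -> aggregated score
    n = len(lists[0])
    for depth in range(n):
        last_scores = []
        for lst in lists:                       # one sorted access per list
            oid, score = lst[depth]
            last_scores.append(score)
            if oid not in seen:                 # random accesses to all lists
                seen[oid] = agg(scores[oid] for scores in by_id)
        top = heapq.nlargest(k, seen.items(), key=lambda item: item[1])
        # No unseen object can score above the aggregate of the last scores read.
        threshold = agg(last_scores)
        if len(top) == k and top[-1][1] >= threshold:
            return top, depth + 1
    return heapq.nlargest(k, seen.items(), key=lambda item: item[1]), n
```

The stopping rule relies on monotonicity of `agg`: every object not yet seen has, in each list, a score no higher than the last one read there, so its aggregate cannot exceed the threshold.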

References

  1. K. Daudjee and K. Salem. "Lazy Database Replication with Ordering Guarantees," In Proc. 20th International Conference on Data Engineering, 2004, pages 424-435.
  2. K. Daudjee and K. Salem. "A Pure Lazy Technique for Scalable Transaction Processing in Replicated Databases," In Proc. International Conference on Parallel and Distributed Systems, 2005.
  3. K. Daudjee and K. Salem. "Inferring a Serialization Order for Distributed Transactions," In Proc. 22nd International Conference on Data Engineering, 2006.
  4. I. T. Bowman and K. Salem. "Optimization of Query Streams Using Semantic Prefetching," In Proc. ACM SIGMOD International Conference on Management of Data, 2004, pages 179-190.
  5. I. T. Bowman and K. Salem. "Optimization of Query Streams Using Semantic Prefetching," ACM Transactions on Database Systems, 30(4): 1056-1101, December 2005.
  6. L. Golab and M. T. Özsu. "Update-Pattern-Aware Modeling and Processing of Continuous Queries," In Proceedings of the ACM SIGMOD Conference on Management of Data, June 2005, pages 658-669.
  7. L. Golab and M. T. Özsu. "Processing Sliding Window Multi-Joins in Continuous Queries over Data Streams," In Proceedings of the 29th International Conference on Very Large Data Bases, September 2003, pages 500-511.
  8. L. Golab, D. DeHaan, E. D. Demaine, A. Lopez-Ortiz, and J. I. Munro. "Identifying Frequent Items in Sliding Windows over On-line Packet Streams," In Proceedings of the 3rd ACM SIGCOMM Conference on Internet Measurement, October 2003, pages 173-178.
  9. L. Golab, D. DeHaan, A. Lopez-Ortiz, and E. D. Demaine. "Finding Frequent Items in Sliding Windows with Multinomially-Distributed Item Frequencies," In Proceedings of the 16th International Conference on Scientific and Statistical Database Management, June 2004, pages 425-426.
  10. L. Golab, S. Garg, and M. T. Özsu. "On Indexing Sliding Windows over Online Data Streams," In Proceedings of the 9th International Conference on Extending Database Technology, March 2004, pages 712-729.
  11. L. Golab, K. G. Bijay, and M. T. Özsu. "On Concurrency Control in Sliding Window Queries over Data Streams," In Proc. 10th Int. Conf. on Extending Database Technology, 2006, pages 608-626.

Related Links

Ihab Ilyas's research

M. Tamer Özsu's research

Ken Salem's research




Database Research Group
David R. Cheriton School of Computer Science
University of Waterloo
Waterloo, Ontario, Canada N2L 3G1
Tel: 519-888-4567
Fax: 519-885-1208

Contact | Feedback: db-webmaster@cs.uwaterloo.ca | Database Research Group


Last modified: Wednesday, 14-Jun-2006 03:15:46 EDT

