Víctor Cuevas Vicenttí[email protected]
DATA STREAM QUERY PROCESSING THROUGH SERVICES COORDINATION
http://optimacs.imag.fr
OUTLINE
Context and motivation
Stream data services and queries
Service coordination for query evaluation
Service-based query processor
Experimentation
QUERYING STREAM DATA SERVICES
stream data services
products bids persons
QA
STREAM DATA SERVICES
Timestamp value added at arrival time
{ "person_id":0, "name":"Luitpold Martucci", "phone":"+56(52)3418151", "email":"[email protected]", "profile": { "interests":[{"category":282}], "income":59178.78, "age":35, "gender":"male", "education":"High School" } }
{ "category":202, "interval":{ "start":1886, "end":53879 }, "seller_person":21, "quantity":9, "type":"Regular", "itemref":9, "open_auction_id":9}
{ "person_ref":7, "time":3306, "bid":221.00, "open_auction_id":8}
15/30sec
3/30sec
2/30sec
DATA MODEL (JSON)
subscribe(t) → { tuple( att1:val1, att2:val2,… ) }
stream { tuple1, tuple2, tuple3, tuple4, …}
• Types: atomic values, nested tuples and lists
τ ::= c | (A: τ, . . . ,B: τ’ ) | [τ, . . . , τ’ ]
{ "product_id": "749437-37", "name": "Pac-man arcade", "base_price": 421.00,
  "tags": [ "games", "retro", "electronics" ],
  "details": { "seller_id": 9735, "auction_date": "12-01-2010" }
}
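The type grammar above can be read as a recursive check. The following is a minimal sketch, assuming tuples are represented as JSON objects (Python dicts) and atomic values as JSON scalars; the function name `conforms` is hypothetical, and the product identifier is written as a string so that the sample parses as JSON.

```python
import json
from typing import Any

# Atomic constants "c" of the grammar, assumed here to be JSON scalars.
ATOMIC = (str, int, float, bool, type(None))

def conforms(value: Any) -> bool:
    """Return True if value is built only from atomic values,
    nested tuples (dicts with string attribute names) and lists."""
    if isinstance(value, ATOMIC):
        return True
    if isinstance(value, dict):   # (A: τ, ..., B: τ')
        return all(isinstance(k, str) and conforms(v) for k, v in value.items())
    if isinstance(value, list):   # [τ, ..., τ']
        return all(conforms(v) for v in value)
    return False

# Sample tuple modeled on the product example of this slide.
product = json.loads("""
{ "product_id": "749437-37", "name": "Pac-man arcade", "base_price": 421.00,
  "tags": ["games", "retro", "electronics"],
  "details": { "seller_id": 9735, "auction_date": "12-01-2010" } }
""")

if __name__ == "__main__":
    print(conforms(product))
```

A value containing anything outside the grammar, such as a Python set, is rejected.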
EXAMPLE QUERY
For the last 30 persons and 30 products offered, retrieve the bids of the last 20 seconds greater than 15 euros
• Data processing
  Correlation
  Filtering
  Temporality
DATA PROCESSING TASKS
Name : Bob Id : 1
Name : Mike Id : 2
Name : Alice Id : 3
Name : Jane Id : 4
Bidder : 3 Amount : 12
Bidder : 1 Amount : 29
Bidder : 4 Amount : 38
Bidder : 2 Amount : 10
Correlation
Filtering: > 15
Temporality: last n
QUERY EXPRESSION
For the last 30 persons and 30 products offered, retrieve the bids of the last 20 seconds greater than 15 euros
SELECT bidstream.person_ref, bidstream.open_auction_id, bidstream.bid
FROM bidstream [RANGE 20], auctionstream [ROWS 30], personstream [ROWS 30]
WHERE bidstream.open_auction_id = auctionstream.open_auction_id AND auctionstream.seller_person = personstream.person_id AND bidstream.bid > 15;
• Declarative query language
• SQL-like + streams (≈CQL)
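To make the window, join and filter semantics of the example query concrete, here is a minimal one-shot sketch in Python. It is not the query processor described in these slides: the function `evaluate`, the `(timestamp, tuple)` input representation and the reading of `RANGE 20` as the half-open interval `(now - 20, now]` are assumptions, and each auction identifier is assumed to occur at most once per window.

```python
from collections import deque

RANGE = 20  # time-based window over bids, in the stream's time units
ROWS = 30   # tuple-based window over auctions and persons

def evaluate(bids, auctions, persons, now):
    """Each input is a list of (timestamp, tuple) pairs in arrival order."""
    # [RANGE 20]: bids whose timestamp falls in (now - RANGE, now].
    bid_win = [b for ts, b in bids if now - RANGE < ts <= now]
    # [ROWS 30]: a bounded deque keeps only the last ROWS tuples.
    auction_win = deque((a for _, a in auctions), maxlen=ROWS)
    person_win = deque((p for _, p in persons), maxlen=ROWS)

    person_ids = {p["person_id"] for p in person_win}
    sellers = {a["open_auction_id"]: a["seller_person"] for a in auction_win}

    result = []
    for b in bid_win:
        seller = sellers.get(b["open_auction_id"])             # bid joins auction
        if seller is not None and seller in person_ids and b["bid"] > 15:
            # SELECT person_ref, open_auction_id, bid
            result.append((b["person_ref"], b["open_auction_id"], b["bid"]))
    return result

if __name__ == "__main__":
    bids = [(1, {"person_ref": 7, "open_auction_id": 8, "bid": 221.0}),
            (25, {"person_ref": 3, "open_auction_id": 8, "bid": 10.0}),
            (30, {"person_ref": 5, "open_auction_id": 8, "bid": 16.0})]
    auctions = [(0, {"open_auction_id": 8, "seller_person": 21})]
    persons = [(0, {"person_id": 21})]
    print(evaluate(bids, auctions, persons, now=30))
```

A continuous query would re-evaluate this as new tuples arrive; the sketch only shows the result at a single instant.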
QUERY COORDINATION
[Figure: query plan. bid [time win], product [tuple win] and person [tuple win] feed two joins (⋈), followed by selection (σ) and projection (π).]
For the last 30 persons and 30 products offered, retrieve the bids of the last 20 seconds greater than 15 euros
SELECT bidstream.person_ref, bidstream.open_auction_id, bidstream.bid
FROM bidstream [RANGE 20], auctionstream [ROWS 30], personstream [ROWS 30]
WHERE bidstream.open_auction_id = auctionstream.open_auction_id AND auctionstream.seller_person = personstream.person_id AND bidstream.bid > 15;
COORDINATION-BASED EVALUATION
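[Figure: the query plan (bid [time win], product [tuple win], person [tuple win], two joins ⋈, selection σ, projection π) mapped to a service coordination. Data items: person1, product1, bid1, person1', product1', bid1', joinPr1B1, joinPrBP, selPrBP, projPrBP. Legend: Data access, Communication, Computation.]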
QUERY COORDINATION
• Workflow of activities
  Data access
  Data processing
  Parallel and sequential composition
• Activity → subworkflow of activities → service coordination
  Calls to computing services
  Queue-based communication
  Access/modify local data
• Computing services
  Data processing (e.g. indexation)
  Calculations (e.g. average)
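A workflow of activities with queue-based communication could look roughly like the sketch below. It is only an illustration in Python (the slides state that the actual coordination is written in Java): the helper `activity`, the sentinel `STOP` and the two-step select/project pipeline are assumptions, and the lambdas stand in for calls to computing services.

```python
import queue
import threading

STOP = object()  # hypothetical end-of-stream marker

def activity(fn, inbox, outbox):
    """Start a thread that applies fn to each item of inbox and
    forwards every non-None result to outbox."""
    def run():
        while True:
            item = inbox.get()
            if item is STOP:
                outbox.put(STOP)  # propagate termination downstream
                return
            out = fn(item)
            if out is not None:
                outbox.put(out)
    thread = threading.Thread(target=run)
    thread.start()
    return thread

def run_workflow(tuples):
    """Sequential composition: data access -> selection -> projection."""
    q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
    select = activity(lambda t: t if t["bid"] > 15 else None, q1, q2)
    project = activity(
        lambda t: {"person_ref": t["person_ref"], "bid": t["bid"]}, q2, q3)
    for t in tuples:          # data access activity feeding the first queue
        q1.put(t)
    q1.put(STOP)
    select.join()
    project.join()
    results = []
    while True:
        item = q3.get()
        if item is STOP:
            break
        results.append(item)
    return results

if __name__ == "__main__":
    print(run_workflow([{"person_ref": 1, "bid": 29, "time": 3},
                        {"person_ref": 3, "bid": 12, "time": 4}]))
```

Parallel composition would attach several activities to different input queues and merge their outputs, as a join activity does with its two inputs.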
COORDINATION-BASED EVALUATION
[Figure: the join (⋈) over bid [time win] and product [tuple win] decomposed into activities Act 1 to Act 4, which use Index service 1 and Index service 2. Data items: product1, bid1, prod_bid1. Legend: Data access, Communication, Computation.]
DEMO
EXPERIMENT ARCHITECTURE
Query processor
CompServices
stream 1 stream 2 stream n
MultiStream Server
<<SOAP access>>
EXPERIMENT ARCHITECTURE
person product bid
Multi Stream Server
Gateway Gateway Gateway
DataStream DataStream
Stream Operator Stream Operator Stream Operator
Query Processor
subscribe (SOAP)
notify
data (SOAP)
buffer
DataStream
DATA AND COMPUTATION SERVICES
NEXMark person, product (auction), and bid data stream services
Query operators supported by computation services

Query operator        Computation service
Tuple-based window    Simple queue service
Time-based window     Calendar queue service
Join                  Hash index service
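The two window operators can be illustrated with queue-like structures. The sketch below is an assumption-laden stand-in, not the services used in the experiment: `TupleWindow` and `TimeWindow` are hypothetical names, timestamps are assumed to arrive in non-decreasing order, and a plain timestamp-ordered deque replaces a real calendar queue.

```python
from collections import deque

class TupleWindow:
    """Tuple-based window [ROWS n]: a bounded FIFO queue."""
    def __init__(self, rows):
        self._q = deque(maxlen=rows)   # oldest tuple is dropped automatically

    def insert(self, tup):
        self._q.append(tup)

    def contents(self):
        return list(self._q)

class TimeWindow:
    """Time-based window [RANGE r] over tuples with non-decreasing timestamps."""
    def __init__(self, range_):
        self._range = range_
        self._q = deque()              # (timestamp, tuple), oldest first

    def insert(self, ts, tup):
        self._q.append((ts, tup))
        self._expire(ts)

    def _expire(self, now):
        # Drop tuples whose timestamp is not in (now - range, now].
        while self._q and self._q[0][0] <= now - self._range:
            self._q.popleft()

    def contents(self, now):
        self._expire(now)
        return [tup for _, tup in self._q]

if __name__ == "__main__":
    tw = TimeWindow(20)
    for ts, name in [(1, "x"), (15, "y"), (25, "z")]:
        tw.insert(ts, name)
    print(tw.contents(25))
```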
QUERY PROCESSOR IMPLEMENTATION
Selection, projection, join, tuple and time-based windows
Query language similar to CQL*
Service coordination specified through standard Java code
Domain specific language for service coordination under implementation
*Arasu, A., Babu, S., and Widom, J. 2006. The CQL continuous query language: semantic foundations and query execution. The VLDB Journal 15, 2 (Jun. 2006), 121-142.
QUERY PROCESSOR ARCHITECTURE
StreamOp
[Join]Op
Query Parser
Evaluation Plan Constructor
Query Executor
Scheduler
CompServices
stream 1 stream 2 stream n
<<uses>>
<<SOAP access>>
<<AST>>
<<Eval. Plan>>
<<SOAP access>>
EXPERIMENT: SUMMARY
• Proof of concept for practical applications
• Modification of NEXMark benchmark
  Allow control over data streams
  Data rates modifiable through code
  Synchronization mechanism
• Initial results
  Created a testbed of 6 queries
  Measured latency (time elapsed from arrival to output)
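Latency as defined here (time elapsed from a tuple's arrival to its output) can be averaged as in the sketch below. This is a hedged illustration, not the measurement harness used in the experiment: `run_with_latency`, `average_latency_ms` and the `process` callback are hypothetical names, and processing is assumed to be synchronous per tuple.

```python
import time
from statistics import mean

def run_with_latency(process, tuples):
    """Stamp each tuple at arrival and after process(tup) returns.
    Returns a list of (arrival_s, output_s) pairs."""
    records = []
    for tup in tuples:
        arrival = time.perf_counter()
        process(tup)                    # stand-in for query evaluation
        records.append((arrival, time.perf_counter()))
    return records

def average_latency_ms(records):
    """Average of (output - arrival) over all records, in milliseconds."""
    return mean((out - arr) * 1000.0 for arr, out in records)

if __name__ == "__main__":
    recs = run_with_latency(lambda t: sum(range(1000)), range(100))
    print(f"average tuple latency: {average_latency_ms(recs):.3f} msec")
```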
LATENCY MEASUREMENTS
[Figure: average tuple latency (msec, axis from 0 to 70) for NEXMark queries 1 to 6. Series: JAX-WS on Tomcat, Local Java VM.]
LESSONS LEARNED
• Possible to implement a query processor largely through service coordination
• Interfaces respecting service-oriented architecture principles are essential
• Operators must be congruent to maintain query semantics
• Performance penalties can be significant
Thanks
COMPUTATION SERVICE
Computation
S1 S2 S3
⋈
• Data management and calculation tasks
• Operations with function-like interfaces (f: X → Y)
stateful (e.g. hash storage)
HASH INDEX SERVICE
‘Bob’ ‘Don’ ‘Alice’
‘Mike’ ‘Mary’
‘Alice’ ‘Alex’
‘Sarah’
key:‘Alice’id:‘2AF3D28’ obj:10110101…01
0 1 2 …
SYMMETRIC HASH JOIN
person
(Mike, mike7@gmail)
(Alice, alice@hotmail)
(Bob, [email protected])
bid
(Camera, Mike, 36…)
(Painting, Alice, 3570)
(Arcade, Bob, 175, …)
stateful
HashIndex 1
stateful
HashIndex 2
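A symmetric hash join over two stateful hash indexes can be sketched as follows. The classes `HashIndex` and `SymmetricHashJoin` and their method names are assumptions made for illustration; they are in-process stand-ins for the hash index services, and window expiry of old tuples is left out.

```python
from collections import defaultdict

class HashIndex:
    """Stateful index mapping a key to all objects stored under it."""
    def __init__(self):
        self._buckets = defaultdict(list)

    def put(self, key, obj):
        self._buckets[key].append(obj)

    def get(self, key):
        return list(self._buckets.get(key, []))

class SymmetricHashJoin:
    """Each arriving tuple is inserted into its own side's index and
    probed against the other side's index, so results are produced
    incrementally whichever stream delivers first."""
    def __init__(self):
        self.left = HashIndex()    # e.g. person tuples (HashIndex 1)
        self.right = HashIndex()   # e.g. bid tuples (HashIndex 2)

    def on_left(self, key, tup):
        self.left.put(key, tup)
        return [(tup, r) for r in self.right.get(key)]

    def on_right(self, key, tup):
        self.right.put(key, tup)
        return [(l, tup) for l in self.left.get(key)]

if __name__ == "__main__":
    join = SymmetricHashJoin()
    print(join.on_left("Mike", ("Mike", "mike7@gmail")))
    print(join.on_right("Mike", ("Camera", "Mike")))
    print(join.on_right("Alice", ("Painting", "Alice")))
    print(join.on_left("Alice", ("Alice", "alice@hotmail")))
```

Because both inputs are indexed, neither stream has to be fully available before the join starts emitting results, which is what makes the operator suitable for streams.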