1(*
2 * Copyright (C) 2006-2007 XenSource Ltd.
3 * Copyright (C) 2008      Citrix Ltd.
4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
5 * Author Thomas Gazagnaire <thomas.gazagnaire@citrix.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published
9 * by the Free Software Foundation; version 2.1 only. with the special
10 * exception on linking described in file LICENSE.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU Lesser General Public License for more details.
16 *)
(* Forwards [fmt] to [Logging.error] under the tag "transaction".
   The explicit [fmt] parameter is kept so that the binding stays a function
   (presumably [fmt] is a printf-style format -- confirm against [Logging]). *)
let error fmt = Logging.error "transaction" fmt
18
19open Stdext
20
(* Transaction id meaning "no transaction": [make] builds a [No] transaction
   for this id and [get_id] returns it for [No] transactions. *)
let none = 0
(* When set, [commit] of a [Full] transaction reports failure whenever
   [Random.int 3 = 0], without attempting the commit. *)
let test_eagain = ref false
(* When cleared, [can_coalesce] always answers [false]. *)
let do_coalesce = ref true
24
(* [check_parents_perms_identical root1 root2 path] is [true] iff every path
   listed by [Store.Path.get_hierarchy path] resolves to a node under both
   [root1] and [root2], and the permissions of the two nodes are equivalent
   according to [Perms.equiv].

   A path that is missing from either tree counts as a difference.
   The scan stops at the first difference found. *)
let check_parents_perms_identical root1 root2 path =
	let same_perms p =
		match Store.Path.get_node root1 p, Store.Path.get_node root2 p with
		| Some n1, Some n2 ->
			Perms.equiv (Store.Node.get_perms n1) (Store.Node.get_perms n2)
		| None, _ | _, None ->
			(* present on one side only, or on neither: treat as different *)
			false
	in
	List.for_all same_perms (Store.Path.get_hierarchy path)
37
(* [get_lowest path1 path2] merges [path1] into the optional low path [path2]:
   with no previous low path the result is [path1] itself, otherwise it is the
   common prefix of the two as computed by [Store.Path.get_common_prefix].
   The result is always [Some _]. *)
let get_lowest path1 path2 =
	let merged =
		match path2 with
		| Some previous -> Store.Path.get_common_prefix path1 previous
		| None -> path1
	in
	Some merged
42
(* [test_coalesce oldroot currentroot optpath] decides whether the subtree at
   [optpath] may be merged although the root has changed.

   - [None]: nothing was touched, so the answer is [true].
   - node present in both trees: it must be the very same node (physical
     equality) and the permissions along its hierarchy must be identical.
   - node absent from both trees: accepted only if its parent exists in the
     current tree, so that it can be added as a child.
   - node present in only one tree: [false]. *)
let test_coalesce oldroot currentroot optpath =
	match optpath with
	| None -> true
	| Some path ->
		let lookup root = Store.Path.get_node root path in
		begin match lookup oldroot, lookup currentroot with
		| Some before, Some after ->
			(* [==] on purpose: an untouched subtree is shared, not copied *)
			before == after
			&& check_parents_perms_identical oldroot currentroot path
		| None, None ->
			let parent = Store.Path.get_parent path in
			begin match Store.Path.get_node currentroot parent with
			| Some _ -> true
			| None -> false (* no parent to attach to: give up *)
			end
		| Some _, None | None, Some _ ->
			false
		end
67
(* [can_coalesce oldroot currentroot path] is [test_coalesce] guarded by the
   [do_coalesce] switch. Any exception raised by the test is treated as
   "cannot coalesce" and yields [false]. *)
let can_coalesce oldroot currentroot path =
	!do_coalesce
	&& (try test_coalesce oldroot currentroot path with _ -> false)
73
(* Kind of transaction: [No] is used for the id [none] (see [make]);
   [Full] carries the state needed to commit a real transaction. *)
type ty = No | Full of (
	int *          (* Transaction id *)
	Store.t *      (* Original store *)
	Store.t        (* A pointer to the canonical store: its root changes on each transaction-commit *)
)
79
(* State of one transaction, as built by [make]. *)
type t = {
	(* [No] or [Full], see [ty]. *)
	ty: ty;
	(* Value of the global [counter] when the transaction was created. *)
	start_count: int64;
	store: Store.t; (* This is the store that we change in write operations. *)
	(* Copy of the store's quota taken at creation. *)
	quota: Quota.t;
	(* Root of the store passed to [make], taken at creation. *)
	oldroot: Store.Node.t;
	(* Write operations recorded by [add_wop], most recent first. *)
	mutable paths: (Xenbus.Xb.Op.operation * Store.Path.t) list;
	(* Request/response pairs recorded by [add_operation], most recent first. *)
	mutable operations: (Packet.request * Packet.response) list;
	(* Common prefix of the paths registered through [set_read_lowpath]. *)
	mutable read_lowpath: Store.Path.t option;
	(* Common prefix of the paths registered through [set_write_lowpath]. *)
	mutable write_lowpath: Store.Path.t option;
}
(* Transaction id of [t]; a [No] transaction reports [none]. *)
let get_id t =
	match t.ty with
	| Full (id, _, _) -> id
	| No -> none
92
(* Global counter; its current value is stored in [start_count] by [make]. *)
let counter = ref Int64.zero
(* Conflict statistics, cleared by [reset_conflict_stats]. *)
let failed_commits = ref Int64.zero
let failed_commits_no_culprit = ref Int64.zero

(* Puts both conflict statistics back to zero; [counter] is left untouched. *)
let reset_conflict_stats () =
	List.iter
		(fun stat -> stat := Int64.zero)
		[failed_commits; failed_commits_no_culprit]
99
(* Scope for optimisation: different data-structure and functions to search/filter it *)
(* List of (start time, transaction) pairs; [make] adds new entries at the head. *)
let short_running_txns = ref []

(* Last entry of [short_running_txns], or [None] when the list is empty.
   New entries are added at the head, so the last one is the oldest. *)
let oldest_short_running_transaction () =
	List.fold_left (fun _ entry -> Some entry) None !short_running_txns
109
(* Drops from [short_running_txns] every entry whose start time is older than
   [!Define.conflict_max_history_seconds] before now. When [txn] is [Some t],
   the entry for that very transaction (physical identity) is dropped too. *)
let trim_short_running_transactions txn =
	let cutoff = Unix.gettimeofday () -. !Define.conflict_max_history_seconds in
	let is_other tx =
		match txn with
		| None -> true
		| Some t -> tx != t
	in
	let recent_and_other (start_time, tx) =
		start_time >= cutoff && is_other tx
	in
	short_running_txns := List.filter recent_and_other !short_running_txns
119
(* [make ?internal id store] creates a transaction over [store].

   For [id = none] the transaction is of kind [No] and works directly on
   [store]. Otherwise it is [Full]: one copy of [store] is kept as the
   original, a second copy becomes the working store, and [store] itself is
   remembered as the canonical store.

   Real (non-[none]) transactions are also recorded, with their start time,
   in [short_running_txns] unless [internal] is set. *)
let make ?(internal=false) id store =
	let transactional = id <> none in
	let ty =
		if transactional then Full (id, Store.copy store, store) else No
	in
	let working_store = if transactional then Store.copy store else store in
	let txn = {
		ty;
		start_count = !counter;
		store = working_store;
		quota = Quota.copy store.Store.quota;
		oldroot = Store.get_root store;
		paths = [];
		operations = [];
		read_lowpath = None;
		write_lowpath = None;
	} in
	if transactional && not internal then (
		let started_at = Unix.gettimeofday () in
		short_running_txns := (started_at, txn) :: !short_running_txns
	);
	txn
138
(* Working store of the transaction. *)
let get_store { store; _ } = store
(* Recorded write operations, most recent first. *)
let get_paths { paths; _ } = paths

(* Root node of the transaction's working store. *)
let get_root t = Store.get_root (get_store t)
143
(* A transaction is read-only while no write operation has been recorded. *)
let is_read_only t =
	match t.paths with
	| [] -> true
	| _ :: _ -> false
(* Records the write operation [ty] on [path] at the head of [t.paths]. *)
let add_wop t ty path =
	let entry = (ty, path) in
	t.paths <- entry :: t.paths
(* Records the [request]/[response] pair in [t.operations].

   Raises [Quota.Limit_reached] when [!Define.maxrequests] is non-negative,
   [perm] is not dom0, and the transaction already holds at least that many
   operations. A negative limit disables the check. *)
let add_operation ~perm t request response =
	let limit = !Define.maxrequests in
	let over_limit =
		limit >= 0
		&& not (Perms.Connection.is_dom0 perm)
		&& List.length t.operations >= limit
	in
	if over_limit then raise Quota.Limit_reached;
	t.operations <- (request, response) :: t.operations
(* Recorded operations in the order they were added (oldest first). *)
let get_operations { operations; _ } = List.rev operations
(* Folds [path] into the read low path (see [get_lowest]). *)
let set_read_lowpath t path =
	let lowest = get_lowest path t.read_lowpath in
	t.read_lowpath <- lowest
(* Folds [path] into the write low path (see [get_lowest]). *)
let set_write_lowpath t path =
	let lowest = get_lowest path t.write_lowpath in
	t.write_lowpath <- lowest
155
(* Whether [path] exists in the transaction's working store. *)
let path_exists t path =
	let store = get_store t in
	Store.path_exists store path
157
(* Writes [value] at [path] in the working store and records the operation.
   The write low path is extended with [path] when the node already existed
   before the write, and with its parent when the write created it. *)
let write t perm path value =
	let existed_before = path_exists t path in
	Store.write t.store perm path value;
	let touched =
		if existed_before then path else Store.Path.get_parent path
	in
	set_write_lowpath t touched;
	add_wop t Xenbus.Xb.Op.Write path
165
(* Creates the directory [path] in the working store and extends the write
   low path with it. The operation is recorded in [t.paths] only when
   [with_watch] is set (the default). *)
let mkdir ?(with_watch=true) t perm path =
	Store.mkdir t.store perm path;
	set_write_lowpath t path;
	match with_watch with
	| true -> add_wop t Xenbus.Xb.Op.Mkdir path
	| false -> ()
171
(* Sets the permissions of [path] in the working store, extends the write low
   path with [path] and records the operation. *)
let setperms t perm path perms =
	let () = Store.setperms t.store perm path perms in
	let () = set_write_lowpath t path in
	add_wop t Xenbus.Xb.Op.Setperms path
176
(* Removes [path] from the working store. The write low path is extended with
   the parent of [path], and the operation is recorded. *)
let rm t perm path =
	Store.rm t.store perm path;
	let parent = Store.Path.get_parent path in
	set_write_lowpath t parent;
	add_wop t Xenbus.Xb.Op.Rm path
181
(* Lists [path] in the working store; on success the read low path is
   extended with [path]. *)
let ls t perm path =
	let entries = Store.ls t.store perm path in
	let () = set_read_lowpath t path in
	entries
186
(* Reads [path] from the working store; on success the read low path is
   extended with [path]. *)
let read t perm path =
	let value = Store.read t.store perm path in
	let () = set_read_lowpath t path in
	value
191
(* Fetches the permissions of [path] from the working store; on success the
   read low path is extended with [path]. *)
let getperms t perm path =
	let perms = Store.getperms t.store perm path in
	let () = set_read_lowpath t path in
	perms
196
(* [commit ~con t] tries to apply transaction [t] and returns [true] when it
   was applied, [false] when it conflicted.

   - A [No] transaction always succeeds.
   - A [Full] transaction succeeds directly when the canonical store still
     has the root the transaction started from; the canonical root and quota
     are then replaced, but only if the transaction wrote something.
   - Otherwise a partial commit is attempted: if both the read and the write
     low paths can be coalesced, the node at the write low path is copied
     into the canonical store.
   - With [test_eagain] set, a [Full] commit is made to fail whenever
     [Random.int 3 = 0].

   On success with write operations, [Disk.write] is called on the
   transaction's store. A conflict is logged on failure; a plain commit is
   logged on success unless the transaction was coalesced (coalescing is
   logged by the partial commit). [con] is only passed on to the logging
   functions. *)
let commit ~con t =
	(* Same meaning as "at least one write operation was recorded", without
	   walking the whole [t.paths] list just to test for emptiness. *)
	let has_write_ops = not (is_read_only t) in
	let has_coalesced = ref false in
	let has_commited =
	match t.ty with
	| No                         -> true
	| Full (_id, oldstore, cstore) ->       (* "cstore" meaning current canonical store *)
		let commit_partial oldroot cstore store =
			(* get the lowest path of the query and verify that it hasn't
			   been modified by other transactions. *)
			if can_coalesce oldroot (Store.get_root cstore) t.read_lowpath
			&& can_coalesce oldroot (Store.get_root cstore) t.write_lowpath then (
				maybe (fun p ->
					let n = Store.get_node store p in

					(* it has to be in the store, otherwise it means bugs
					   in the lowpath registration. we don't need to handle none. *)
					maybe (fun n -> Store.set_node cstore p n t.quota store.Store.quota) n;
					Logging.write_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p);
				) t.write_lowpath;
				maybe (fun p ->
					Logging.read_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p)
					) t.read_lowpath;
				has_coalesced := true;
				Store.incr_transaction_coalesce cstore;
				true
			) else (
				(* cannot do anything simple, just discard the queries,
				   and the client needs to redo them later *)
				Store.incr_transaction_abort cstore;
				false
			)
			in
		let try_commit oldroot cstore store =
			(* physical equality: the canonical root is still the node the
			   transaction started from *)
			if oldroot == Store.get_root cstore then (
				(* move the new root to the current store, if the oldroot
				   has not been modified *)
				if has_write_ops then (
					Store.set_root cstore (Store.get_root store);
					Store.set_quota cstore (Store.get_quota store)
				);
				true
			) else
				(* we try a partial commit if possible *)
				commit_partial oldroot cstore store
			in
		(* test hook: randomly report a conflict without trying to commit *)
		if !test_eagain && Random.int 3 = 0 then
			false
		else
			try_commit (Store.get_root oldstore) cstore t.store
		in
	if has_commited && has_write_ops then
		Disk.write t.store;
	if not has_commited
	then Logging.conflict ~tid:(get_id t) ~con
	else if not !has_coalesced
	then Logging.commit ~tid:(get_id t) ~con;
	has_commited
255