1(* 2 * Copyright (C) 2006-2007 XenSource Ltd. 3 * Copyright (C) 2008 Citrix Ltd. 4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com> 5 * Author Thomas Gazagnaire <thomas.gazagnaire@citrix.com> 6 * 7 * This program is free software; you can redistribute it and/or modify 8 * it under the terms of the GNU Lesser General Public License as published 9 * by the Free Software Foundation; version 2.1 only. with the special 10 * exception on linking described in file LICENSE. 11 * 12 * This program is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU Lesser General Public License for more details. 16 *) 17let error fmt = Logging.error "transaction" fmt 18 19open Stdext 20 21let none = 0 22let test_eagain = ref false 23let do_coalesce = ref true 24 25let check_parents_perms_identical root1 root2 path = 26 let hierarch = Store.Path.get_hierarchy path in 27 let permdiff = List.fold_left (fun acc path -> 28 let n1 = Store.Path.get_node root1 path 29 and n2 = Store.Path.get_node root2 path in 30 match n1, n2 with 31 | Some n1, Some n2 -> 32 not (Perms.equiv (Store.Node.get_perms n1) (Store.Node.get_perms n2)) || acc 33 | _ -> 34 true || acc 35 ) false hierarch in 36 (not permdiff) 37 38let get_lowest path1 path2 = 39 match path2 with 40 | None -> Some path1 41 | Some path2 -> Some (Store.Path.get_common_prefix path1 path2) 42 43let test_coalesce oldroot currentroot optpath = 44 match optpath with 45 | None -> true 46 | Some path -> 47 let oldnode = Store.Path.get_node oldroot path 48 and currentnode = Store.Path.get_node currentroot path in 49 50 match oldnode, currentnode with 51 | (Some oldnode), (Some currentnode) -> 52 if oldnode == currentnode then ( 53 check_parents_perms_identical oldroot currentroot path 54 ) else ( 55 false 56 ) 57 | None, None -> ( 58 (* ok then it doesn't exists in the old version and the current version, 59 just sneak it in as a child of the parent node if it exists, or else fail *) 60 let pnode = Store.Path.get_node currentroot (Store.Path.get_parent path) in 61 match pnode with 62 | None -> false (* ok it doesn't exists, just bail out. *) 63 | Some _ -> true 64 ) 65 | _ -> 66 false 67 68let can_coalesce oldroot currentroot path = 69 if !do_coalesce then 70 try test_coalesce oldroot currentroot path with _ -> false 71 else 72 false 73 74type ty = No | Full of ( 75 int * (* Transaction id *) 76 Store.t * (* Original store *) 77 Store.t (* A pointer to the canonical store: its root changes on each transaction-commit *) 78) 79 80type t = { 81 ty: ty; 82 start_count: int64; 83 store: Store.t; (* This is the store that we change in write operations. *) 84 quota: Quota.t; 85 oldroot: Store.Node.t; 86 mutable paths: (Xenbus.Xb.Op.operation * Store.Path.t) list; 87 mutable operations: (Packet.request * Packet.response) list; 88 mutable read_lowpath: Store.Path.t option; 89 mutable write_lowpath: Store.Path.t option; 90} 91let get_id t = match t.ty with No -> none | Full (id, _, _) -> id 92 93let counter = ref 0L 94let failed_commits = ref 0L 95let failed_commits_no_culprit = ref 0L 96let reset_conflict_stats () = 97 failed_commits := 0L; 98 failed_commits_no_culprit := 0L 99 100(* Scope for optimisation: different data-structure and functions to search/filter it *) 101let short_running_txns = ref [] 102 103let oldest_short_running_transaction () = 104 let rec last = function 105 | [] -> None 106 | [x] -> Some x 107 | _ :: xs -> last xs 108 in last !short_running_txns 109 110let trim_short_running_transactions txn = 111 let cutoff = Unix.gettimeofday () -. !Define.conflict_max_history_seconds in 112 let keep = match txn with 113 | None -> (function (start_time, _) -> start_time >= cutoff) 114 | Some t -> (function (start_time, tx) -> start_time >= cutoff && tx != t) 115 in 116 short_running_txns := List.filter 117 keep 118 !short_running_txns 119 120let make ?(internal=false) id store = 121 let ty = if id = none then No else Full(id, Store.copy store, store) in 122 let txn = { 123 ty = ty; 124 start_count = !counter; 125 store = if id = none then store else Store.copy store; 126 quota = Quota.copy store.Store.quota; 127 oldroot = Store.get_root store; 128 paths = []; 129 operations = []; 130 read_lowpath = None; 131 write_lowpath = None; 132 } in 133 if id <> none && not internal then ( 134 let now = Unix.gettimeofday () in 135 short_running_txns := (now, txn) :: !short_running_txns 136 ); 137 txn 138 139let get_store t = t.store 140let get_paths t = t.paths 141 142let get_root t = Store.get_root t.store 143 144let is_read_only t = t.paths = [] 145let add_wop t ty path = t.paths <- (ty, path) :: t.paths 146let add_operation ~perm t request response = 147 if !Define.maxrequests >= 0 148 && not (Perms.Connection.is_dom0 perm) 149 && List.length t.operations >= !Define.maxrequests 150 then raise Quota.Limit_reached; 151 t.operations <- (request, response) :: t.operations 152let get_operations t = List.rev t.operations 153let set_read_lowpath t path = t.read_lowpath <- get_lowest path t.read_lowpath 154let set_write_lowpath t path = t.write_lowpath <- get_lowest path t.write_lowpath 155 156let path_exists t path = Store.path_exists t.store path 157 158let write t perm path value = 159 let path_exists = path_exists t path in 160 Store.write t.store perm path value; 161 if path_exists 162 then set_write_lowpath t path 163 else set_write_lowpath t (Store.Path.get_parent path); 164 add_wop t Xenbus.Xb.Op.Write path 165 166let mkdir ?(with_watch=true) t perm path = 167 Store.mkdir t.store perm path; 168 set_write_lowpath t path; 169 if with_watch then 170 add_wop t Xenbus.Xb.Op.Mkdir path 171 172let setperms t perm path perms = 173 Store.setperms t.store perm path perms; 174 set_write_lowpath t path; 175 add_wop t Xenbus.Xb.Op.Setperms path 176 177let rm t perm path = 178 Store.rm t.store perm path; 179 set_write_lowpath t (Store.Path.get_parent path); 180 add_wop t Xenbus.Xb.Op.Rm path 181 182let ls t perm path = 183 let r = Store.ls t.store perm path in 184 set_read_lowpath t path; 185 r 186 187let read t perm path = 188 let r = Store.read t.store perm path in 189 set_read_lowpath t path; 190 r 191 192let getperms t perm path = 193 let r = Store.getperms t.store perm path in 194 set_read_lowpath t path; 195 r 196 197let commit ~con t = 198 let has_write_ops = List.length t.paths > 0 in 199 let has_coalesced = ref false in 200 let has_commited = 201 match t.ty with 202 | No -> true 203 | Full (_id, oldstore, cstore) -> (* "cstore" meaning current canonical store *) 204 let commit_partial oldroot cstore store = 205 (* get the lowest path of the query and verify that it hasn't 206 been modified by others transactions. *) 207 if can_coalesce oldroot (Store.get_root cstore) t.read_lowpath 208 && can_coalesce oldroot (Store.get_root cstore) t.write_lowpath then ( 209 maybe (fun p -> 210 let n = Store.get_node store p in 211 212 (* it has to be in the store, otherwise it means bugs 213 in the lowpath registration. we don't need to handle none. *) 214 maybe (fun n -> Store.set_node cstore p n t.quota store.Store.quota) n; 215 Logging.write_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p); 216 ) t.write_lowpath; 217 maybe (fun p -> 218 Logging.read_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p) 219 ) t.read_lowpath; 220 has_coalesced := true; 221 Store.incr_transaction_coalesce cstore; 222 true 223 ) else ( 224 (* cannot do anything simple, just discard the queries, 225 and the client need to redo it later *) 226 Store.incr_transaction_abort cstore; 227 false 228 ) 229 in 230 let try_commit oldroot cstore store = 231 if oldroot == Store.get_root cstore then ( 232 (* move the new root to the current store, if the oldroot 233 has not been modified *) 234 if has_write_ops then ( 235 Store.set_root cstore (Store.get_root store); 236 Store.set_quota cstore (Store.get_quota store) 237 ); 238 true 239 ) else 240 (* we try a partial commit if possible *) 241 commit_partial oldroot cstore store 242 in 243 if !test_eagain && Random.int 3 = 0 then 244 false 245 else 246 try_commit (Store.get_root oldstore) cstore t.store 247 in 248 if has_commited && has_write_ops then 249 Disk.write t.store; 250 if not has_commited 251 then Logging.conflict ~tid:(get_id t) ~con 252 else if not !has_coalesced 253 then Logging.commit ~tid:(get_id t) ~con; 254 has_commited 255