// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * dlmconvert.c
 *
 * underlying calls for lock conversion
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/highmem.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>


#include "../cluster/heartbeat.h"
#include "../cluster/nodemanager.h"
#include "../cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"

#include "dlmconvert.h"

#define MLOG_MASK_PREFIX ML_DLM
#include "../cluster/masklog.h"

/* NOTE: __dlmconvert_master is the only function in here that
 * needs a spinlock held on entry (res->spinlock) and it is the
 * only one that holds a lock on exit (res->spinlock).
 * All other functions in here need no locks and drop all of
 * the locks that they acquire. */
static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
					   struct dlm_lock_resource *res,
					   struct dlm_lock *lock, int flags,
					   int type, int *call_ast,
					   int *kick_thread);
static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
					   struct dlm_lock_resource *res,
					   struct dlm_lock *lock, int flags, int type);

/*
 * this is only called directly by dlmlock(), and only when the
 * local node is the owner of the lockres
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: see __dlmconvert_master
 */
enum dlm_status dlmconvert_master(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  struct dlm_lock *lock, int flags, int type)
{
	int call_ast = 0, kick_thread = 0;
	enum dlm_status status;

	spin_lock(&res->spinlock);
	/* we are not in a network handler, this is fine */
	__dlm_wait_on_lockres(res);
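	/* reserve an ast slot up front: it is either handed to
	 * dlm_queue_ast() below (when the convert produces an ast) or
	 * released again via dlm_lockres_release_ast() */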
	__dlm_lockres_reserve_ast(res);
	res->state |= DLM_LOCK_RES_IN_PROGRESS;

	status = __dlmconvert_master(dlm, res, lock, flags, type,
				     &call_ast, &kick_thread);

	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);
	if (status != DLM_NORMAL && status != DLM_NOTQUEUED)
		dlm_error(status);

	/* either queue the ast or release it */
	if (call_ast)
		dlm_queue_ast(dlm, lock);
	else
		dlm_lockres_release_ast(dlm, res);

	if (kick_thread)
		dlm_kick_thread(dlm, res);

	return status;
}
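
/*
 * Illustrative sketch (assumed caller behavior, not taken from this file):
 * a caller such as dlmlock() is expected to pick the master or remote
 * convert path based on lockres ownership, roughly:
 *
 *	if (res->owner == dlm->node_num)
 *		status = dlmconvert_master(dlm, res, lock, flags, mode);
 *	else
 *		status = dlmconvert_remote(dlm, res, lock, flags, mode);
 */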

/* performs lock conversion at the lockres master site
 * locking:
 *   caller needs:  res->spinlock
 *   taken:         takes and drops lock->spinlock
 *   held on exit:  res->spinlock
 * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
 *   call_ast: whether ast should be called for this lock
 *   kick_thread: whether dlm_kick_thread should be called
 */
static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
					   struct dlm_lock_resource *res,
					   struct dlm_lock *lock, int flags,
					   int type, int *call_ast,
					   int *kick_thread)
{
	enum dlm_status status = DLM_NORMAL;
	struct dlm_lock *tmplock = NULL;

	assert_spin_locked(&res->spinlock);

	mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
	     lock->ml.type, lock->ml.convert_type, type);

	spin_lock(&lock->spinlock);

	/* already converting? */
	if (lock->ml.convert_type != LKM_IVMODE) {
		mlog(ML_ERROR, "attempted to convert a lock with a lock "
		     "conversion pending\n");
		status = DLM_DENIED;
		goto unlock_exit;
	}

	/* must be on grant queue to convert */
	if (!dlm_lock_on_list(&res->granted, lock)) {
		mlog(ML_ERROR, "attempted to convert a lock not on grant "
		     "queue\n");
		status = DLM_DENIED;
		goto unlock_exit;
	}

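	/* LKM_VALBLK handling: only an EX holder may push a new lvb to the
	 * lockres on convert; any other mode fetches the master's copy,
	 * unless the new mode is NL, in which case no lvb travels at all */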
	if (flags & LKM_VALBLK) {
		switch (lock->ml.type) {
			case LKM_EXMODE:
				/* EX + LKM_VALBLK + convert == set lvb */
				mlog(0, "will set lvb: converting %s->%s\n",
				     dlm_lock_mode_name(lock->ml.type),
				     dlm_lock_mode_name(type));
				lock->lksb->flags |= DLM_LKSB_PUT_LVB;
				break;
			case LKM_PRMODE:
			case LKM_NLMODE:
				/* refetch if new level is not NL */
				if (type > LKM_NLMODE) {
					mlog(0, "will fetch new value into "
					     "lvb: converting %s->%s\n",
					     dlm_lock_mode_name(lock->ml.type),
					     dlm_lock_mode_name(type));
					lock->lksb->flags |= DLM_LKSB_GET_LVB;
				} else {
					mlog(0, "will NOT fetch new value "
					     "into lvb: converting %s->%s\n",
					     dlm_lock_mode_name(lock->ml.type),
					     dlm_lock_mode_name(type));
					flags &= ~(LKM_VALBLK);
				}
				break;
		}
	}


	/* in-place downconvert? */
	if (type <= lock->ml.type)
		goto grant;

	/* upconvert from here on */
	status = DLM_NORMAL;
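	/* the requested mode must be compatible with every other lock on the
	 * granted and converting queues, or this lock has to wait its turn */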
	list_for_each_entry(tmplock, &res->granted, list) {
		if (tmplock == lock)
			continue;
		if (!dlm_lock_compatible(tmplock->ml.type, type))
			goto switch_queues;
	}

	list_for_each_entry(tmplock, &res->converting, list) {
		if (!dlm_lock_compatible(tmplock->ml.type, type))
			goto switch_queues;
		/* existing conversion requests take precedence */
		if (!dlm_lock_compatible(tmplock->ml.convert_type, type))
			goto switch_queues;
	}

	/* fall thru to grant */

grant:
	mlog(0, "res %.*s, granting %s lock\n", res->lockname.len,
	     res->lockname.name, dlm_lock_mode_name(type));
	/* immediately grant the new lock type */
	lock->lksb->status = DLM_NORMAL;
	if (lock->ml.node == dlm->node_num)
		mlog(0, "doing in-place convert for local lock\n");
	lock->ml.type = type;
	if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
		memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);

	/*
	 * Move the lock to the tail because it may be the only lock which has
	 * an invalid lvb.
	 */
	list_move_tail(&lock->list, &res->granted);

	status = DLM_NORMAL;
	*call_ast = 1;
	goto unlock_exit;

switch_queues:
	if (flags & LKM_NOQUEUE) {
		mlog(0, "failed to convert NOQUEUE lock %.*s from "
		     "%d to %d...\n", res->lockname.len, res->lockname.name,
		     lock->ml.type, type);
		status = DLM_NOTQUEUED;
		goto unlock_exit;
	}
	mlog(0, "res %.*s, queueing...\n", res->lockname.len,
	     res->lockname.name);

	lock->ml.convert_type = type;
	/* do not alter lock refcount.  switching lists. */
	list_move_tail(&lock->list, &res->converting);

unlock_exit:
	spin_unlock(&lock->spinlock);
	if (status == DLM_DENIED) {
		__dlm_print_one_lock_resource(res);
	}
	if (status == DLM_NORMAL)
		*kick_thread = 1;
	return status;
}

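/* undo the queue switch and lvb flags set up for a pending convert.
 * Assumed locking (matching the dlmconvert_remote caller below): the list
 * manipulation happens while the caller holds res->spinlock. */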
void dlm_revert_pending_convert(struct dlm_lock_resource *res,
				struct dlm_lock *lock)
{
	/* do not alter lock refcount.  switching lists. */
	list_move_tail(&lock->list, &res->granted);
	lock->ml.convert_type = LKM_IVMODE;
	lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
}

/* messages the master site to do lock conversion
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
 */
enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  struct dlm_lock *lock, int flags, int type)
{
	enum dlm_status status;

	mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
	     lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);

	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		mlog(0, "bailing out early since res is RECOVERING "
		     "on secondary queue\n");
		/* __dlm_print_one_lock_resource(res); */
		status = DLM_RECOVERING;
		goto bail;
	}
	/* will exit this call with spinlock held */
	__dlm_wait_on_lockres(res);

	if (lock->ml.convert_type != LKM_IVMODE) {
		__dlm_print_one_lock_resource(res);
		mlog(ML_ERROR, "converting a remote lock that is already "
		     "converting! (cookie=%u:%llu, conv=%d)\n",
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     lock->ml.convert_type);
		status = DLM_DENIED;
		goto bail;
	}

	if (lock->ml.type == type && lock->ml.convert_type == LKM_IVMODE) {
		mlog(0, "last convert request returned DLM_RECOVERING, but "
		     "owner has already queued and sent ast to me. res %.*s, "
		     "(cookie=%u:%llu, type=%d, conv=%d)\n",
		     res->lockname.len, res->lockname.name,
		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
		     lock->ml.type, lock->ml.convert_type);
		status = DLM_NORMAL;
		goto bail;
	}

	res->state |= DLM_LOCK_RES_IN_PROGRESS;
	/* move lock to local convert queue */
	/* do not alter lock refcount.  switching lists. */
	list_move_tail(&lock->list, &res->converting);
	lock->convert_pending = 1;
	lock->ml.convert_type = type;

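	/* decide how the lvb travels with the request: an EX holder pushes
	 * its lvb to the master, a downconvert to NL sends nothing, and any
	 * other convert asks the master to return its copy of the lvb */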
	if (flags & LKM_VALBLK) {
		if (lock->ml.type == LKM_EXMODE) {
			flags |= LKM_PUT_LVB;
			lock->lksb->flags |= DLM_LKSB_PUT_LVB;
		} else {
			if (lock->ml.convert_type == LKM_NLMODE)
				flags &= ~LKM_VALBLK;
			else {
				flags |= LKM_GET_LVB;
				lock->lksb->flags |= DLM_LKSB_GET_LVB;
			}
		}
	}
	spin_unlock(&res->spinlock);

	/* no locks held here.
	 * need to wait for a reply as to whether it got queued or not. */
	status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);

	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	/* if the request failed, move the lock back to the granted queue.
	 * if the master returned DLM_NORMAL but died before sending the ast,
	 * recovery may already have moved the lock back to the granted queue;
	 * in that case return DLM_RECOVERING so the convert is retried */
	if (status != DLM_NORMAL) {
		if (status != DLM_NOTQUEUED)
			dlm_error(status);
		dlm_revert_pending_convert(res, lock);
	} else if (!lock->convert_pending) {
		mlog(0, "%s: res %.*s, owner died and lock has been moved back "
				"to granted list, retry convert.\n",
				dlm->name, res->lockname.len, res->lockname.name);
		status = DLM_RECOVERING;
	}

	lock->convert_pending = 0;
bail:
	spin_unlock(&res->spinlock);

	/* TODO: should this be a wake_one? */
	/* wake up any IN_PROGRESS waiters */
	wake_up(&res->wq);

	return status;
}

/* sends DLM_CONVERT_LOCK_MSG to master site
 * locking:
 *   caller needs:  none
 *   taken:         none
 *   held on exit:  none
 * returns: DLM_NOLOCKMGR, status from remote node
 */
static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
					   struct dlm_lock_resource *res,
					   struct dlm_lock *lock, int flags, int type)
{
	struct dlm_convert_lock convert;
	int tmpret;
	enum dlm_status ret;
	int status = 0;
	struct kvec vec[2];
	size_t veclen = 1;

	mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);

	memset(&convert, 0, sizeof(struct dlm_convert_lock));
	convert.node_idx = dlm->node_num;
	convert.requested_type = type;
	convert.cookie = lock->ml.cookie;
	convert.namelen = res->lockname.len;
	convert.flags = cpu_to_be32(flags);
	memcpy(convert.name, res->lockname.name, convert.namelen);

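	/* the request goes out as one or two kvecs: the fixed-size convert
	 * message itself, plus the lvb payload when a new value is being
	 * pushed to the master */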
	vec[0].iov_len = sizeof(struct dlm_convert_lock);
	vec[0].iov_base = &convert;

	if (flags & LKM_PUT_LVB) {
		/* extra data to send if we are updating lvb */
		vec[1].iov_len = DLM_LVB_LEN;
		vec[1].iov_base = lock->lksb->lvb;
		veclen++;
	}

	tmpret = o2net_send_message_vec(DLM_CONVERT_LOCK_MSG, dlm->key,
					vec, veclen, res->owner, &status);
	if (tmpret >= 0) {
		// successfully sent and received
		ret = status;  // this is already a dlm_status
		if (ret == DLM_RECOVERING) {
			mlog(0, "node %u returned DLM_RECOVERING from convert "
			     "message!\n", res->owner);
		} else if (ret == DLM_MIGRATING) {
			mlog(0, "node %u returned DLM_MIGRATING from convert "
			     "message!\n", res->owner);
		} else if (ret == DLM_FORWARD) {
			mlog(0, "node %u returned DLM_FORWARD from convert "
			     "message!\n", res->owner);
		} else if (ret != DLM_NORMAL && ret != DLM_NOTQUEUED)
			dlm_error(ret);
	} else {
		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
		     "node %u\n", tmpret, DLM_CONVERT_LOCK_MSG, dlm->key,
		     res->owner);
		if (dlm_is_host_down(tmpret)) {
			/* instead of logging the same network error over
			 * and over, sleep here and wait for the heartbeat
			 * to notice the node is dead.  times out after 5s. */
			dlm_wait_for_node_death(dlm, res->owner,
						DLM_NODE_DEATH_WAIT_MAX);
			ret = DLM_RECOVERING;
			mlog(0, "node %u died so returning DLM_RECOVERING "
			     "from convert message!\n", res->owner);
		} else {
			ret = dlm_err_to_dlm_status(tmpret);
		}
	}

	return ret;
}

/* handler for DLM_CONVERT_LOCK_MSG on master site
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
 *          status from __dlmconvert_master
 */
int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
			     void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	struct dlm_lock *lock = NULL;
	struct dlm_lock *tmp_lock;
	struct dlm_lockstatus *lksb;
	enum dlm_status status = DLM_NORMAL;
	u32 flags;
	int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;

	if (!dlm_grab(dlm)) {
		dlm_error(DLM_REJECTED);
		return DLM_REJECTED;
	}

	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
			"Domain %s not fully joined!\n", dlm->name);

	if (cnv->namelen > DLM_LOCKID_NAME_MAX) {
		status = DLM_IVBUFLEN;
		dlm_error(status);
		goto leave;
	}

	flags = be32_to_cpu(cnv->flags);

	if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
	     (LKM_PUT_LVB|LKM_GET_LVB)) {
		mlog(ML_ERROR, "both PUT and GET lvb specified\n");
		status = DLM_BADARGS;
		goto leave;
	}

	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
	     (flags & LKM_GET_LVB ? "get lvb" : "none"));

	status = DLM_IVLOCKID;
	res = dlm_lookup_lockres(dlm, cnv->name, cnv->namelen);
	if (!res) {
		dlm_error(status);
		goto leave;
	}

	spin_lock(&res->spinlock);
	status = __dlm_lockres_state_to_status(res);
	if (status != DLM_NORMAL) {
		spin_unlock(&res->spinlock);
		dlm_error(status);
		goto leave;
	}
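	/* the lock being converted must still be on the granted queue;
	 * match it by cookie and owning node */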
	list_for_each_entry(tmp_lock, &res->granted, list) {
		if (tmp_lock->ml.cookie == cnv->cookie &&
		    tmp_lock->ml.node == cnv->node_idx) {
			lock = tmp_lock;
			dlm_lock_get(lock);
			break;
		}
	}
	spin_unlock(&res->spinlock);
	if (!lock) {
		status = DLM_IVLOCKID;
		mlog(ML_ERROR, "did not find lock to convert on grant queue! "
			       "cookie=%u:%llu\n",
		     dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
		     dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
		dlm_print_one_lock_resource(res);
		goto leave;
	}

	/* found the lock */
	lksb = lock->lksb;

	/* see if caller needed to get/put lvb */
	if (flags & LKM_PUT_LVB) {
		BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
		lksb->flags |= DLM_LKSB_PUT_LVB;
		memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
	} else if (flags & LKM_GET_LVB) {
		BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
		lksb->flags |= DLM_LKSB_GET_LVB;
	}

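	/* recheck the lockres state now that the spinlock is held again;
	 * recovery or migration may have started while it was dropped */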
	spin_lock(&res->spinlock);
	status = __dlm_lockres_state_to_status(res);
	if (status == DLM_NORMAL) {
		__dlm_lockres_reserve_ast(res);
		ast_reserved = 1;
		res->state |= DLM_LOCK_RES_IN_PROGRESS;
		status = __dlmconvert_master(dlm, res, lock, flags,
					     cnv->requested_type,
					     &call_ast, &kick_thread);
		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
		wake = 1;
	}
	spin_unlock(&res->spinlock);
	if (wake)
		wake_up(&res->wq);

	if (status != DLM_NORMAL) {
		if (status != DLM_NOTQUEUED)
			dlm_error(status);
		lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
	}

leave:
	if (lock)
		dlm_lock_put(lock);

	/* either queue the ast or release it, if reserved */
	if (call_ast)
		dlm_queue_ast(dlm, lock);
	else if (ast_reserved)
		dlm_lockres_release_ast(dlm, res);

	if (kick_thread)
		dlm_kick_thread(dlm, res);

	if (res)
		dlm_lockres_put(res);

	dlm_put(dlm);

	return status;
}