requestqueue.c

/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "member.h"
#include "lock.h"
#include "dir.h"
#include "config.h"
#include "requestqueue.h"

struct rq_entry {
	struct list_head list;
	uint32_t recover_seq;
	int nodeid;
	struct dlm_message request;
};
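
/*
 * Note: "request" must remain the final member above; entries are
 * over-allocated so that the variable-length tail of the saved message
 * (beyond the fixed struct dlm_message header) is copied in place
 * directly after it.
 */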

/*
 * Requests received while the lockspace is in recovery get added to the
 * request queue and processed when recovery is complete.  This happens when
 * the lockspace is suspended on some nodes before it is on others, or the
 * lockspace is enabled on some while still suspended on others.
 */

void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
{
	struct rq_entry *e;
	int length = ms->m_header.h_length - sizeof(struct dlm_message);

	e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
	if (!e) {
		log_print("dlm_add_requestqueue: out of memory len %d", length);
		return;
	}

	e->recover_seq = ls->ls_recover_seq & 0xFFFFFFFF;
	e->nodeid = nodeid;
	memcpy(&e->request, ms, ms->m_header.h_length);

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_add_tail(&e->list, &ls->ls_requestqueue);
	mutex_unlock(&ls->ls_requestqueue_mutex);
}
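
/*
 * For context, a minimal sketch of the assumed receive path: while locking
 * is stopped for recovery, incoming messages are diverted to
 * dlm_add_requestqueue() instead of being processed directly.  The function
 * below is illustrative only (the example_* name is not part of the DLM
 * API) and is compiled out.
 */
#if 0
static void example_receive(struct dlm_ls *ls, struct dlm_message *ms,
			    int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		/* recovery in progress: save the message for later replay */
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		/* drain saved messages first so ordering is preserved */
		dlm_wait_requestqueue(ls);
		/* ...then process ms normally... */
	}
}
#endif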

/*
 * Called by dlm_recoverd to process normal messages saved while recovery was
 * happening.  Normal locking has been enabled before this is called.
 * dlm_recv, upon receiving a message, will wait for all saved messages to be
 * drained here before processing the message it got.  If a new dlm_ls_stop()
 * arrives while we're processing these saved messages, it may block trying
 * to suspend dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.
 * In that case, we don't abort since locking_stopped is still 0.  If dlm_recv
 * is not waiting for us, then this processing may be aborted due to
 * locking_stopped.
 */

int dlm_process_requestqueue(struct dlm_ls *ls)
{
	struct rq_entry *e;
	struct dlm_message *ms;
	int error = 0;

	mutex_lock(&ls->ls_requestqueue_mutex);

	for (;;) {
		if (list_empty(&ls->ls_requestqueue)) {
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = 0;
			break;
		}
		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
		mutex_unlock(&ls->ls_requestqueue_mutex);

		ms = &e->request;

		log_limit(ls, "dlm_process_requestqueue msg %d from %d "
			  "lkid %x remid %x result %d seq %u",
			  ms->m_type, ms->m_header.h_nodeid,
			  ms->m_lkid, ms->m_remid, ms->m_result,
			  e->recover_seq);

		dlm_receive_message_saved(ls, &e->request, e->recover_seq);

		mutex_lock(&ls->ls_requestqueue_mutex);
		list_del(&e->list);
		kfree(e);

		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "process_requestqueue abort running");
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = -EINTR;
			break;
		}
		schedule();
	}

	return error;
}
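
/*
 * A sketch of the assumed caller side: dlm_recoverd is assumed to run this
 * once locking has been re-enabled at the end of recovery, treating -EINTR
 * as "a new recovery began, retry":
 *
 *	error = dlm_process_requestqueue(ls);
 *	if (error == -EINTR)
 *		... restart recovery ...
 */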

/*
 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 * saved requests and processes them as they would have been by dlm_recv.  At
 * the same time, dlm_recv will start receiving new requests from remote
 * nodes.  We want to delay dlm_recv processing new requests until
 * dlm_recoverd has finished processing the old saved requests.  We don't
 * check for locking stopped here because dlm_ls_stop won't stop locking
 * until it's suspended us (dlm_recv).
 */

void dlm_wait_requestqueue(struct dlm_ls *ls)
{
	for (;;) {
		mutex_lock(&ls->ls_requestqueue_mutex);
		if (list_empty(&ls->ls_requestqueue))
			break;
		mutex_unlock(&ls->ls_requestqueue_mutex);
		schedule();
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}
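
/*
 * Decide whether a saved message should be dropped rather than replayed:
 * returns 1 to purge the entry, 0 to keep it.
 */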
static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
	uint32_t type = ms->m_type;

	/* the ls is being cleaned up and freed by release_lockspace */
	if (!ls->ls_count)
		return 1;

	if (dlm_is_removed(ls, nodeid))
		return 1;

	/* directory operations are always purged because the directory is
	   always rebuilt during recovery and the lookups resent */

	if (type == DLM_MSG_REMOVE ||
	    type == DLM_MSG_LOOKUP ||
	    type == DLM_MSG_LOOKUP_REPLY)
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	return 1;
}

void dlm_purge_requestqueue(struct dlm_ls *ls)
{
	struct dlm_message *ms;
	struct rq_entry *e, *safe;

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
		ms = &e->request;

		if (purge_request(ls, ms, e->nodeid)) {
			list_del(&e->list);
			kfree(e);
		}
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}
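
/*
 * Summary of the assumed lifecycle of a saved request, pieced together from
 * the comments above: dlm_add_requestqueue() queues messages while locking
 * is stopped; dlm_purge_requestqueue() drops stale entries during recovery;
 * dlm_process_requestqueue() replays the survivors once locking resumes;
 * and dlm_wait_requestqueue() holds off dlm_recv until the replay is done.
 */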