libceph: support for sending notifies
Implement ceph_osdc_notify() for sending notifies.
Due to the fact that the current messenger can't do read-in into
pagelists (it can only do write-out from them), I had to go with a page
vector for a NOTIFY_COMPLETE payload, for now.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ca0a7b5..e6e3ab4 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -334,6 +334,10 @@
case CEPH_OSD_OP_NOTIFY_ACK:
ceph_osd_data_release(&op->notify_ack.request_data);
break;
+ case CEPH_OSD_OP_NOTIFY:
+ ceph_osd_data_release(&op->notify.request_data);
+ ceph_osd_data_release(&op->notify.response_data);
+ break;
default:
break;
}
@@ -845,6 +849,9 @@
break;
case CEPH_OSD_OP_NOTIFY_ACK:
break;
+ case CEPH_OSD_OP_NOTIFY:
+ dst->notify.cookie = cpu_to_le64(src->notify.cookie);
+ break;
case CEPH_OSD_OP_SETALLOCHINT:
dst->alloc_hint.expected_object_size =
cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1439,6 +1446,12 @@
ceph_osdc_msg_data_add(req->r_reply,
&op->cls.response_data);
break;
+ case CEPH_OSD_OP_NOTIFY:
+ ceph_osdc_msg_data_add(msg,
+ &op->notify.request_data);
+ ceph_osdc_msg_data_add(req->r_reply,
+ &op->notify.response_data);
+ break;
}
data_len += op->indata_len;
@@ -1771,6 +1784,7 @@
RB_CLEAR_NODE(&lreq->osdc_node);
INIT_LIST_HEAD(&lreq->scan_item);
init_completion(&lreq->reg_commit_wait);
+ init_completion(&lreq->notify_finish_wait);
lreq->osdc = osdc;
target_init(&lreq->t);
@@ -1934,6 +1948,7 @@
goto out;
}
+ WARN_ON(!lreq->is_watch);
dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
__func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
lwork->notify.payload_len);
@@ -1997,6 +2012,24 @@
linger_reg_commit_complete(lreq, req->r_result);
lreq->committed = true;
+ if (!lreq->is_watch) {
+ struct ceph_osd_data *osd_data =
+ osd_req_op_data(req, 0, notify, response_data);
+ void *p = page_address(osd_data->pages[0]);
+
+ WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
+ osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+
+ /* make note of the notify_id */
+ if (req->r_ops[0].outdata_len >= sizeof(u64)) {
+ lreq->notify_id = ceph_decode_64(&p);
+ dout("lreq %p notify_id %llu\n", lreq,
+ lreq->notify_id);
+ } else {
+ dout("lreq %p no notify_id\n", lreq);
+ }
+ }
+
mutex_unlock(&lreq->lock);
linger_put(lreq);
}
@@ -2050,7 +2083,7 @@
req->r_mtime = lreq->mtime;
mutex_lock(&lreq->lock);
- if (lreq->committed) {
+ if (lreq->is_watch && lreq->committed) {
WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
op->watch.cookie != lreq->linger_id);
op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
@@ -2059,7 +2092,10 @@
op->watch.gen);
req->r_callback = linger_reconnect_cb;
} else {
- WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
+ if (!lreq->is_watch)
+ lreq->notify_id = 0;
+ else
+ WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
dout("lreq %p register\n", lreq);
req->r_callback = linger_commit_cb;
}
@@ -2147,7 +2183,7 @@
*/
static void __linger_cancel(struct ceph_osd_linger_request *lreq)
{
- if (lreq->ping_req->r_osd)
+ if (lreq->is_watch && lreq->ping_req->r_osd)
cancel_linger_request(lreq->ping_req);
if (lreq->reg_req->r_osd)
cancel_linger_request(lreq->reg_req);
@@ -2174,6 +2210,15 @@
return ret ?: lreq->reg_commit_error;
}
+/*
+ * Sleep (interruptibly) until lreq->notify_finish_wait is completed.
+ *
+ * Returns the non-zero result of the wait itself if there is one,
+ * otherwise lreq->notify_finish_error.
+ */
+static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
+{
+	int wait_err;
+
+	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
+	wait_err = wait_for_completion_interruptible(&lreq->notify_finish_wait);
+	if (wait_err)
+		return wait_err;
+
+	return lreq->notify_finish_error;
+}
+
/*
* Timeout callback, called every N seconds. When 1 or more OSD
* requests has been active for more than N seconds, we send a keepalive
@@ -2220,7 +2265,7 @@
found = true;
mutex_lock(&lreq->lock);
- if (lreq->committed && !lreq->last_error)
+ if (lreq->is_watch && lreq->committed && !lreq->last_error)
send_linger_ping(lreq);
mutex_unlock(&lreq->lock);
}
@@ -3032,6 +3077,7 @@
u8 proto_ver, opcode;
u64 cookie, notify_id;
u64 notifier_id = 0;
+ s32 return_code = 0;
void *payload = NULL;
u32 payload_len = 0;
@@ -3049,7 +3095,7 @@
}
if (le16_to_cpu(msg->hdr.version) >= 2)
- p += 4; /* skip return_code */
+ ceph_decode_32_safe(&p, end, return_code, bad);
if (le16_to_cpu(msg->hdr.version) >= 3)
ceph_decode_64_safe(&p, end, notifier_id, bad);
@@ -3063,13 +3109,38 @@
}
mutex_lock(&lreq->lock);
- dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie,
- lreq);
+ dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
+ opcode, cookie, lreq, lreq->is_watch);
if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
if (!lreq->last_error) {
lreq->last_error = -ENOTCONN;
queue_watch_error(lreq);
}
+ } else if (!lreq->is_watch) {
+ /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
+ if (lreq->notify_id && lreq->notify_id != notify_id) {
+ dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
+ lreq->notify_id, notify_id);
+ } else if (!completion_done(&lreq->notify_finish_wait)) {
+ struct ceph_msg_data *data =
+ list_first_entry_or_null(&msg->data,
+ struct ceph_msg_data,
+ links);
+
+ if (data) {
+ if (lreq->preply_pages) {
+ WARN_ON(data->type !=
+ CEPH_MSG_DATA_PAGES);
+ *lreq->preply_pages = data->pages;
+ *lreq->preply_len = data->length;
+ } else {
+ ceph_release_page_vector(data->pages,
+ calc_pages_for(0, data->length));
+ }
+ }
+ lreq->notify_finish_error = return_code;
+ complete_all(&lreq->notify_finish_wait);
+ }
} else {
/* CEPH_WATCH_EVENT_NOTIFY */
lwork = lwork_alloc(lreq, do_watch_notify);
@@ -3241,6 +3312,7 @@
if (!lreq)
return ERR_PTR(-ENOMEM);
+ lreq->is_watch = true;
lreq->wcb = wcb;
lreq->errcb = errcb;
lreq->data = data;
@@ -3395,6 +3467,116 @@
}
EXPORT_SYMBOL(ceph_osdc_notify_ack);
+/*
+ * Initialize op @which of @req as a CEPH_OSD_OP_NOTIFY with @cookie.
+ *
+ * The request data is a freshly allocated pagelist holding, in order:
+ * @prot_ver, @timeout and @payload_len (each encoded as 32 bits),
+ * followed by @payload_len bytes copied from @payload.
+ *
+ * Returns 0 on success or -ENOMEM if the pagelist could not be
+ * allocated or filled.  On failure the pagelist is released and no
+ * request data is attached to the op.
+ */
+static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
+				  u64 cookie, u32 prot_ver, u32 timeout,
+				  void *payload, size_t payload_len)
+{
+	struct ceph_osd_req_op *op;
+	struct ceph_pagelist *pl;
+	int ret;
+
+	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
+	op->notify.cookie = cookie;
+
+	pl = kmalloc(sizeof(*pl), GFP_NOIO);
+	if (!pl)
+		return -ENOMEM;
+
+	ceph_pagelist_init(pl);
+	/* encode the version the caller asked for, not a hard-coded 1 */
+	ret = ceph_pagelist_encode_32(pl, prot_ver);
+	ret |= ceph_pagelist_encode_32(pl, timeout);
+	ret |= ceph_pagelist_encode_32(pl, payload_len);
+	ret |= ceph_pagelist_append(pl, payload, payload_len);
+	if (ret) {
+		/* results were OR-ed together, so only zero/non-zero is meaningful */
+		ceph_pagelist_release(pl);
+		return -ENOMEM;
+	}
+
+	ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
+	op->indata_len = pl->length;
+	return 0;
+}
+
+/*
+ * Send a notify on @oid/@oloc carrying @payload and wait for it to
+ * finish.
+ *
+ * @timeout: in seconds
+ *
+ * If @preply_pages is not NULL, *@preply_pages and *@preply_len are
+ * reset up front, so they are initialized both on success and error.
+ * The caller is responsible for:
+ *
+ *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
+ *
+ * Returns the result of the final wait, or the error of the first
+ * step that failed.
+ */
+int ceph_osdc_notify(struct ceph_osd_client *osdc,
+		     struct ceph_object_id *oid,
+		     struct ceph_object_locator *oloc,
+		     void *payload,
+		     size_t payload_len,
+		     u32 timeout,
+		     struct page ***preply_pages,
+		     size_t *preply_len)
+{
+	struct ceph_osd_linger_request *lreq;
+	struct ceph_osd_data *resp_data;
+	struct page **id_pages;
+	int err;
+
+	WARN_ON(!timeout);
+	if (preply_pages) {
+		*preply_pages = NULL;
+		*preply_len = 0;
+	}
+
+	lreq = linger_alloc(osdc);
+	if (!lreq)
+		return -ENOMEM;
+
+	lreq->preply_pages = preply_pages;
+	lreq->preply_len = preply_len;
+
+	ceph_oid_copy(&lreq->t.base_oid, oid);
+	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
+	lreq->t.flags = CEPH_OSD_FLAG_READ;
+
+	lreq->reg_req = alloc_linger_request(lreq);
+	if (!lreq->reg_req) {
+		err = -ENOMEM;
+		goto out_put_lreq;
+	}
+
+	/* one page for the NOTIFY op's response data (for notify_id) */
+	id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(id_pages)) {
+		err = PTR_ERR(id_pages);
+		goto out_put_lreq;
+	}
+
+	down_write(&osdc->lock);
+	linger_register(lreq); /* before osd_req_op_* */
+	err = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
+				     timeout, payload, payload_len);
+	if (err) {
+		/* undo the registration and drop the unused page vector */
+		linger_unregister(lreq);
+		up_write(&osdc->lock);
+		ceph_release_page_vector(id_pages, 1);
+		goto out_put_lreq;
+	}
+
+	resp_data = osd_req_op_data(lreq->reg_req, 0, notify, response_data);
+	ceph_osd_data_pages_init(resp_data, id_pages, PAGE_SIZE, 0, false,
+				 true);
+	linger_submit(lreq);
+	up_write(&osdc->lock);
+
+	err = linger_reg_commit_wait(lreq);
+	if (err)
+		dout("lreq %p failed to initiate notify %d\n", lreq, err);
+	else
+		err = linger_notify_finish_wait(lreq);
+
+	linger_cancel(lreq);
+out_put_lreq:
+	linger_put(lreq);
+	return err;
+}
+EXPORT_SYMBOL(ceph_osdc_notify);
+
/*
* Call all pending notify callbacks - for use after a watch is
* unregistered, to make sure no more callbacks for it will be invoked
@@ -3693,19 +3875,51 @@
return m;
}
+/*
+ * Allocate a message for the incoming header @hdr and, if the header
+ * announces a data payload, attach a page vector sized for
+ * hdr->data_len bytes.
+ *
+ * Returns the new message, or NULL if the message or the page vector
+ * could not be allocated.
+ *
+ * NOTE(review): the page vector is attached with the last argument of
+ * ceph_osd_data_pages_init() set to false, so presumably the message
+ * does not free it - confirm that every consumer releases the pages.
+ *
+ * TODO: switch to a msg-owned pagelist
+ */
+static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
+{
+	struct ceph_msg *m;
+	int type = le16_to_cpu(hdr->type);
+	u32 front_len = le32_to_cpu(hdr->front_len);
+	u32 data_len = le32_to_cpu(hdr->data_len);
+
+	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+	if (!m)
+		return NULL;
+
+	if (data_len) {
+		struct page **pages;
+		struct ceph_osd_data osd_data;
+
+		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
+					       GFP_NOIO);
+		/*
+		 * Failure is reported as an error pointer (see the
+		 * IS_ERR()/PTR_ERR() check in ceph_osdc_notify()), so a
+		 * NULL test would let it through as a page vector.
+		 */
+		if (IS_ERR(pages)) {
+			ceph_msg_put(m);
+			return NULL;
+		}
+
+		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
+					 false);
+		ceph_osdc_msg_data_add(m, &osd_data);
+	}
+
+	return m;
+}
+
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip)
{
struct ceph_osd *osd = con->private;
int type = le16_to_cpu(hdr->type);
- int front = le32_to_cpu(hdr->front_len);
*skip = 0;
switch (type) {
case CEPH_MSG_OSD_MAP:
case CEPH_MSG_WATCH_NOTIFY:
- return ceph_msg_new(type, front, GFP_NOFS, false);
+ return alloc_msg_with_page_vector(hdr);
case CEPH_MSG_OSD_OPREPLY:
return get_reply(con, hdr, skip);
default: