From: David Teigland <teigland@redhat.com> Subject: [RHEL5.1 PATCH] dlm: consolidate transport protocols Date: Thu, 31 May 2007 09:34:57 -0500 Bugzilla: 219799 Message-Id: <20070531143457.GA2642@redhat.com> Changelog: [dlm] consolidate transport protocols bz 219799 This patch consolidates the TCP & SCTP protocols for the DLM into a single file and makes it switchable at run-time (well, at least before the DLM actually starts up!) The patch adds a new lowcomms.c file that replaces the existing lowcomms-sctp.c & lowcomms-tcp.c files. upstream: gfs2-2.6-nmw.git and -rc Index: linux-rhel51-quilt/fs/dlm/Kconfig =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/Kconfig 2007-06-08 09:01:03.000000000 -0500 +++ linux-rhel51-quilt/fs/dlm/Kconfig 2007-06-08 09:24:08.000000000 -0500 @@ -1,32 +1,15 @@ menu "Distributed Lock Manager" - depends on INET && EXPERIMENTAL + depends on EXPERIMENTAL && INET config DLM tristate "Distributed Lock Manager (DLM)" depends on IPV6 || IPV6=n - depends on IP_SCTP select CONFIGFS_FS + select IP_SCTP help A general purpose distributed lock manager for kernel or userspace applications. -choice - prompt "Select DLM communications protocol" - depends on DLM - default DLM_TCP - help - The DLM Can use TCP or SCTP for it's network communications. - SCTP supports multi-homed operations whereas TCP doesn't. - However, SCTP seems to have stability problems at the moment. - -config DLM_TCP - bool "TCP/IP" - -config DLM_SCTP - bool "SCTP" - -endchoice - config DLM_DEBUG bool "DLM debugging" depends on DLM Index: linux-rhel51-quilt/fs/dlm/Makefile =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/Makefile 2007-06-08 09:01:03.000000000 -0500 +++ linux-rhel51-quilt/fs/dlm/Makefile 2007-06-08 09:24:08.000000000 -0500 @@ -8,6 +8,7 @@ member.o \ memory.o \ midcomms.o \ + lowcomms.o \ rcom.o \ recover.o \ recoverd.o \ @@ -16,6 +17,3 @@ util.o dlm-$(CONFIG_DLM_DEBUG) += debug_fs.o -dlm-$(CONFIG_DLM_TCP) += lowcomms-tcp.o - -dlm-$(CONFIG_DLM_SCTP) += lowcomms-sctp.o \ No newline at end of file Index: linux-rhel51-quilt/fs/dlm/config.c =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/config.c 2007-06-08 09:01:14.000000000 -0500 +++ linux-rhel51-quilt/fs/dlm/config.c 2007-06-08 09:24:08.000000000 -0500 @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -89,6 +89,7 @@ unsigned int cl_toss_secs; unsigned int cl_scan_secs; unsigned int cl_log_debug; + unsigned int cl_protocol; }; enum { @@ -101,6 +102,7 @@ CLUSTER_ATTR_TOSS_SECS, CLUSTER_ATTR_SCAN_SECS, CLUSTER_ATTR_LOG_DEBUG, + CLUSTER_ATTR_PROTOCOL, }; struct cluster_attribute { @@ -159,6 +161,7 @@ CLUSTER_ATTR(toss_secs, 1); CLUSTER_ATTR(scan_secs, 1); CLUSTER_ATTR(log_debug, 0); +CLUSTER_ATTR(protocol, 0); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr, @@ -170,6 +173,7 @@ [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr, [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr, [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr, + [CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol.attr, NULL, }; @@ -425,6 +429,7 @@ cl->cl_toss_secs = dlm_config.ci_toss_secs; cl->cl_scan_secs = dlm_config.ci_scan_secs; cl->cl_log_debug = dlm_config.ci_log_debug; + cl->cl_protocol = dlm_config.ci_protocol; space_list = &sps->ss_group; comm_list = &cms->cs_group; @@ -904,6 +909,7 @@ #define DEFAULT_TOSS_SECS 10 #define DEFAULT_SCAN_SECS 5 #define DEFAULT_LOG_DEBUG 0 +#define DEFAULT_PROTOCOL 0 struct dlm_config_info dlm_config = { .ci_tcp_port = DEFAULT_TCP_PORT, @@ -914,6 +920,7 @@ .ci_recover_timer = DEFAULT_RECOVER_TIMER, .ci_toss_secs = DEFAULT_TOSS_SECS, .ci_scan_secs = DEFAULT_SCAN_SECS, - .ci_log_debug = DEFAULT_LOG_DEBUG + .ci_log_debug = DEFAULT_LOG_DEBUG, + .ci_protocol = DEFAULT_PROTOCOL }; Index: linux-rhel51-quilt/fs/dlm/config.h =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/config.h 2007-06-08 09:01:14.000000000 -0500 +++ linux-rhel51-quilt/fs/dlm/config.h 2007-06-08 09:24:08.000000000 -0500 @@ -2,7 +2,7 @@ ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -26,6 +26,7 @@ int ci_toss_secs; int ci_scan_secs; int ci_log_debug; + int ci_protocol; }; extern struct dlm_config_info dlm_config; Index: linux-rhel51-quilt/fs/dlm/lowcomms-sctp.c =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/lowcomms-sctp.c 2007-06-08 09:01:14.000000000 -0500 +++ /dev/null 1970-01-01 00:00:00.000000000 +0000 @@ -1,1238 +0,0 @@ -/****************************************************************************** -******************************************************************************* -** -** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. -** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. -** -******************************************************************************* -******************************************************************************/ - -/* - * lowcomms.c - * - * This is the "low-level" comms layer. - * - * It is responsible for sending/receiving messages - * from other nodes in the cluster. - * - * Cluster nodes are referred to by their nodeids. nodeids are - * simply 32 bit numbers to the locking module - if they need to - * be expanded for the cluster infrastructure then that is it's - * responsibility. It is this layer's - * responsibility to resolve these into IP address or - * whatever it needs for inter-node communication. - * - * The comms level is two kernel threads that deal mainly with - * the receiving of messages from other nodes and passing them - * up to the mid-level comms layer (which understands the - * message format) for execution by the locking core, and - * a send thread which does all the setting up of connections - * to remote nodes and the sending of data. Threads are not allowed - * to send their own data because it may cause them to wait in times - * of high load. Also, this way, the sending thread can collect together - * messages bound for one node and send them in one block. - * - * I don't see any problem with the recv thread executing the locking - * code on behalf of remote processes as the locking code is - * short, efficient and never (well, hardly ever) waits. - * - */ - -#include <asm/ioctls.h> -#include <net/sock.h> -#include <net/tcp.h> -#include <net/sctp/user.h> -#include <linux/pagemap.h> -#include <linux/socket.h> -#include <linux/idr.h> - -#include "dlm_internal.h" -#include "lowcomms.h" -#include "config.h" -#include "midcomms.h" - -static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; -static int dlm_local_count; -static int dlm_local_nodeid; - -/* One of these per connected node */ - -#define NI_INIT_PENDING 1 -#define NI_WRITE_PENDING 2 - -struct nodeinfo { - spinlock_t lock; - sctp_assoc_t assoc_id; - unsigned long flags; - struct list_head write_list; /* nodes with pending writes */ - struct list_head writequeue; /* outgoing writequeue_entries */ - spinlock_t writequeue_lock; - int nodeid; -}; - -static DEFINE_IDR(nodeinfo_idr); -static struct rw_semaphore nodeinfo_lock; -static int max_nodeid; - -struct cbuf { - unsigned base; - unsigned len; - unsigned mask; -}; - -/* Just the one of these, now. But this struct keeps - the connection-specific variables together */ - -#define CF_READ_PENDING 1 - -struct connection { - struct socket *sock; - unsigned long flags; - struct page *rx_page; - atomic_t waiting_requests; - struct cbuf cb; - int eagain_flag; -}; - -/* An entry waiting to be sent */ - -struct writequeue_entry { - struct list_head list; - struct page *page; - int offset; - int len; - int end; - int users; - struct nodeinfo *ni; -}; - -#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) -#define CBUF_EMPTY(cb) ((cb)->len == 0) -#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) -#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) - -#define CBUF_INIT(cb, size) \ -do { \ - (cb)->base = (cb)->len = 0; \ - (cb)->mask = ((size)-1); \ -} while(0) - -#define CBUF_EAT(cb, n) \ -do { \ - (cb)->len -= (n); \ - (cb)->base += (n); \ - (cb)->base &= (cb)->mask; \ -} while(0) - - -/* List of nodes which have writes pending */ -static struct list_head write_nodes; -static spinlock_t write_nodes_lock; - -/* Maximum number of incoming messages to process before - * doing a schedule() - */ -#define MAX_RX_MSG_COUNT 25 - -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; -static wait_queue_head_t lowcomms_recv_wait; -static atomic_t accepting; - -/* The SCTP connection */ -static struct connection sctp_con; - - -static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) -{ - struct sockaddr_storage addr; - int error; - - if (!dlm_local_count) - return -1; - - error = dlm_nodeid_to_addr(nodeid, &addr); - if (error) - return error; - - if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; - struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; - ret4->sin_addr.s_addr = in4->sin_addr.s_addr; - } else { - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; - struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; - memcpy(&ret6->sin6_addr, &in6->sin6_addr, - sizeof(in6->sin6_addr)); - } - - return 0; -} - -static struct nodeinfo *nodeid2nodeinfo(int nodeid, int alloc) -{ - struct nodeinfo *ni; - int r; - int n; - - down_read(&nodeinfo_lock); - ni = idr_find(&nodeinfo_idr, nodeid); - up_read(&nodeinfo_lock); - - if (!ni && alloc) { - down_write(&nodeinfo_lock); - - ni = idr_find(&nodeinfo_idr, nodeid); - if (ni) - goto out_up; - - r = idr_pre_get(&nodeinfo_idr, alloc); - if (!r) - goto out_up; - - ni = kmalloc(sizeof(struct nodeinfo), alloc); - if (!ni) - goto out_up; - - r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n); - if (r) { - kfree(ni); - ni = NULL; - goto out_up; - } - if (n != nodeid) { - idr_remove(&nodeinfo_idr, n); - kfree(ni); - ni = NULL; - goto out_up; - } - memset(ni, 0, sizeof(struct nodeinfo)); - spin_lock_init(&ni->lock); - INIT_LIST_HEAD(&ni->writequeue); - spin_lock_init(&ni->writequeue_lock); - ni->nodeid = nodeid; - - if (nodeid > max_nodeid) - max_nodeid = nodeid; - out_up: - up_write(&nodeinfo_lock); - } - - return ni; -} - -/* Don't call this too often... */ -static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc) -{ - int i; - struct nodeinfo *ni; - - for (i=1; i<=max_nodeid; i++) { - ni = nodeid2nodeinfo(i, 0); - if (ni && ni->assoc_id == assoc) - return ni; - } - return NULL; -} - -/* Data or notification available on socket */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) -{ - atomic_inc(&sctp_con.waiting_requests); - if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags)) - return; - - wake_up_interruptible(&lowcomms_recv_wait); -} - - -/* Add the port number to an IP6 or 4 sockaddr and return the address length. - Also padd out the struct with zeros to make comparisons meaningful */ - -static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, - int *addr_len) -{ - struct sockaddr_in *local4_addr; - struct sockaddr_in6 *local6_addr; - - if (!dlm_local_count) - return; - - if (!port) { - if (dlm_local_addr[0]->ss_family == AF_INET) { - local4_addr = (struct sockaddr_in *)dlm_local_addr[0]; - port = be16_to_cpu(local4_addr->sin_port); - } else { - local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0]; - port = be16_to_cpu(local6_addr->sin6_port); - } - } - - saddr->ss_family = dlm_local_addr[0]->ss_family; - if (dlm_local_addr[0]->ss_family == AF_INET) { - struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; - in4_addr->sin_port = cpu_to_be16(port); - memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); - memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) - - sizeof(struct sockaddr_in)); - *addr_len = sizeof(struct sockaddr_in); - } else { - struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; - in6_addr->sin6_port = cpu_to_be16(port); - memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) - - sizeof(struct sockaddr_in6)); - *addr_len = sizeof(struct sockaddr_in6); - } -} - -/* Close the connection and tidy up */ -static void close_connection(void) -{ - if (sctp_con.sock) { - sock_release(sctp_con.sock); - sctp_con.sock = NULL; - } - - if (sctp_con.rx_page) { - __free_page(sctp_con.rx_page); - sctp_con.rx_page = NULL; - } -} - -/* We only send shutdown messages to nodes that are not part of the cluster */ -static void send_shutdown(sctp_assoc_t associd) -{ - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct msghdr outmessage; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - int ret; - - outmessage.msg_name = NULL; - outmessage.msg_namelen = 0; - outmessage.msg_control = outcmsg; - outmessage.msg_controllen = sizeof(outcmsg); - outmessage.msg_flags = MSG_EOR; - - cmsg = CMSG_FIRSTHDR(&outmessage); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - outmessage.msg_controllen = cmsg->cmsg_len; - sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - - sinfo->sinfo_flags |= MSG_EOF; - sinfo->sinfo_assoc_id = associd; - - ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0); - - if (ret != 0) - log_print("send EOF to node failed: %d", ret); -} - - -/* INIT failed but we don't know which node... - restart INIT on all pending nodes */ -static void init_failed(void) -{ - int i; - struct nodeinfo *ni; - - for (i=1; i<=max_nodeid; i++) { - ni = nodeid2nodeinfo(i, 0); - if (!ni) - continue; - - if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { - ni->assoc_id = 0; - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - } - } - } - wake_up_process(send_task); -} - -/* Something happened to an association */ -static void process_sctp_notification(struct msghdr *msg, char *buf) -{ - union sctp_notification *sn = (union sctp_notification *)buf; - - if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { - switch (sn->sn_assoc_change.sac_state) { - - case SCTP_COMM_UP: - case SCTP_RESTART: - { - /* Check that the new node is in the lockspace */ - struct sctp_prim prim; - mm_segment_t fs; - int nodeid; - int prim_len, ret; - int addr_len; - struct nodeinfo *ni; - - /* This seems to happen when we received a connection - * too early... or something... anyway, it happens but - * we always seem to get a real message too, see - * receive_from_sock */ - - if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { - log_print("COMM_UP for invalid assoc ID %d", - (int)sn->sn_assoc_change.sac_assoc_id); - init_failed(); - return; - } - memset(&prim, 0, sizeof(struct sctp_prim)); - prim_len = sizeof(struct sctp_prim); - prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; - - fs = get_fs(); - set_fs(get_ds()); - ret = sctp_con.sock->ops->getsockopt(sctp_con.sock, - IPPROTO_SCTP, SCTP_PRIMARY_ADDR, - (char*)&prim, &prim_len); - set_fs(fs); - if (ret < 0) { - struct nodeinfo *ni; - - log_print("getsockopt/sctp_primary_addr on " - "new assoc %d failed : %d", - (int)sn->sn_assoc_change.sac_assoc_id, ret); - - /* Retry INIT later */ - ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); - if (ni) - clear_bit(NI_INIT_PENDING, &ni->flags); - return; - } - make_sockaddr(&prim.ssp_addr, 0, &addr_len); - if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { - log_print("reject connect from unknown addr"); - send_shutdown(prim.ssp_assoc_id); - return; - } - - ni = nodeid2nodeinfo(nodeid, GFP_KERNEL); - if (!ni) - return; - - /* Save the assoc ID */ - spin_lock(&ni->lock); - ni->assoc_id = sn->sn_assoc_change.sac_assoc_id; - spin_unlock(&ni->lock); - - log_print("got new/restarted association %d nodeid %d", - (int)sn->sn_assoc_change.sac_assoc_id, nodeid); - - /* Send any pending writes */ - clear_bit(NI_INIT_PENDING, &ni->flags); - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - } - wake_up_process(send_task); - } - break; - - case SCTP_COMM_LOST: - case SCTP_SHUTDOWN_COMP: - { - struct nodeinfo *ni; - - ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id); - if (ni) { - spin_lock(&ni->lock); - ni->assoc_id = 0; - spin_unlock(&ni->lock); - } - } - break; - - /* We don't know which INIT failed, so clear the PENDING flags - * on them all. if assoc_id is zero then it will then try - * again */ - - case SCTP_CANT_STR_ASSOC: - { - log_print("Can't start SCTP association - retrying"); - init_failed(); - } - break; - - default: - log_print("unexpected SCTP assoc change id=%d state=%d", - (int)sn->sn_assoc_change.sac_assoc_id, - sn->sn_assoc_change.sac_state); - } - } -} - -/* Data received from remote end */ -static int receive_from_sock(void) -{ - int ret = 0; - struct msghdr msg; - struct kvec iov[2]; - unsigned len; - int r; - struct sctp_sndrcvinfo *sinfo; - struct cmsghdr *cmsg; - struct nodeinfo *ni; - - /* These two are marginally too big for stack allocation, but this - * function is (currently) only called by dlm_recvd so static should be - * OK. - */ - static struct sockaddr_storage msgname; - static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - - if (sctp_con.sock == NULL) - goto out; - - if (sctp_con.rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - sctp_con.rx_page = alloc_page(GFP_ATOMIC); - if (sctp_con.rx_page == NULL) - goto out_resched; - CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE); - } - - memset(&incmsg, 0, sizeof(incmsg)); - memset(&msgname, 0, sizeof(msgname)); - - memset(incmsg, 0, sizeof(incmsg)); - msg.msg_name = &msgname; - msg.msg_namelen = sizeof(msgname); - msg.msg_flags = 0; - msg.msg_control = incmsg; - msg.msg_controllen = sizeof(incmsg); - - /* I don't see why this circular buffer stuff is necessary for SCTP - * which is a packet-based protocol, but the whole thing breaks under - * load without it! The overhead is minimal (and is in the TCP lowcomms - * anyway, of course) so I'll leave it in until I can figure out what's - * really happening. - */ - - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. - */ - iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb); - iov[0].iov_base = page_address(sctp_con.rx_page) + - CBUF_DATA(&sctp_con.cb); - iov[1].iov_len = 0; - - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) { - iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb); - iov[1].iov_len = sctp_con.cb.base; - iov[1].iov_base = page_address(sctp_con.rx_page); - msg.msg_iovlen = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - - r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, 1, len, - MSG_NOSIGNAL | MSG_DONTWAIT); - if (ret <= 0) - goto out_close; - - msg.msg_control = incmsg; - msg.msg_controllen = sizeof(incmsg); - cmsg = CMSG_FIRSTHDR(&msg); - sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); - - if (msg.msg_flags & MSG_NOTIFICATION) { - process_sctp_notification(&msg, page_address(sctp_con.rx_page)); - return 0; - } - - /* Is this a new association ? */ - ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL); - if (ni) { - ni->assoc_id = sinfo->sinfo_assoc_id; - if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) { - - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - } - wake_up_process(send_task); - } - } - - /* INIT sends a message with length of 1 - ignore it */ - if (r == 1) - return 0; - - CBUF_ADD(&sctp_con.cb, ret); - ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid), - page_address(sctp_con.rx_page), - sctp_con.cb.base, sctp_con.cb.len, - PAGE_CACHE_SIZE); - if (ret < 0) - goto out_close; - CBUF_EAT(&sctp_con.cb, ret); - - out: - ret = 0; - goto out_ret; - - out_resched: - lowcomms_data_ready(sctp_con.sock->sk, 0); - ret = 0; - schedule(); - goto out_ret; - - out_close: - if (ret != -EAGAIN) - log_print("error reading from sctp socket: %d", ret); - out_ret: - return ret; -} - -/* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */ -static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num) -{ - mm_segment_t fs; - int result = 0; - - fs = get_fs(); - set_fs(get_ds()); - if (num == 1) - result = sctp_con.sock->ops->bind(sctp_con.sock, - (struct sockaddr *) addr, addr_len); - else - result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP, - SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len); - set_fs(fs); - - if (result < 0) - log_print("Can't bind to port %d addr number %d", - dlm_config.ci_tcp_port, num); - - return result; -} - -static void init_local(void) -{ - struct sockaddr_storage sas, *addr; - int i; - - dlm_local_nodeid = dlm_our_nodeid(); - - for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { - if (dlm_our_addr(&sas, i)) - break; - - addr = kmalloc(sizeof(*addr), GFP_KERNEL); - if (!addr) - break; - memcpy(addr, &sas, sizeof(*addr)); - dlm_local_addr[dlm_local_count++] = addr; - } -} - -/* Initialise SCTP socket and bind to all interfaces */ -static int init_sock(void) -{ - mm_segment_t fs; - struct socket *sock = NULL; - struct sockaddr_storage localaddr; - struct sctp_event_subscribe subscribe; - int result = -EINVAL, num = 1, i, addr_len; - - if (!dlm_local_count) { - init_local(); - if (!dlm_local_count) { - log_print("no local IP address has been set"); - goto out; - } - } - - result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, - IPPROTO_SCTP, &sock); - if (result < 0) { - log_print("Can't create comms socket, check SCTP is loaded"); - goto out; - } - - /* Listen for events */ - memset(&subscribe, 0, sizeof(subscribe)); - subscribe.sctp_data_io_event = 1; - subscribe.sctp_association_event = 1; - subscribe.sctp_send_failure_event = 1; - subscribe.sctp_shutdown_event = 1; - subscribe.sctp_partial_delivery_event = 1; - - fs = get_fs(); - set_fs(get_ds()); - result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS, - (char *)&subscribe, sizeof(subscribe)); - set_fs(fs); - - if (result < 0) { - log_print("Failed to set SCTP_EVENTS on socket: result=%d", - result); - goto create_delsock; - } - - /* Init con struct */ - sock->sk->sk_user_data = &sctp_con; - sctp_con.sock = sock; - sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready; - - /* Bind to all interfaces. */ - for (i = 0; i < dlm_local_count; i++) { - memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); - make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); - - result = add_bind_addr(&localaddr, addr_len, num); - if (result) - goto create_delsock; - ++num; - } - - result = sock->ops->listen(sock, 5); - if (result < 0) { - log_print("Can't set socket listening"); - goto create_delsock; - } - - return 0; - - create_delsock: - sock_release(sock); - sctp_con.sock = NULL; - out: - return result; -} - - -static struct writequeue_entry *new_writequeue_entry(int allocation) -{ - struct writequeue_entry *entry; - - entry = kmalloc(sizeof(struct writequeue_entry), allocation); - if (!entry) - return NULL; - - entry->page = alloc_page(allocation); - if (!entry->page) { - kfree(entry); - return NULL; - } - - entry->offset = 0; - entry->len = 0; - entry->end = 0; - entry->users = 0; - - return entry; -} - -void *dlm_lowcomms_get_buffer(int nodeid, int len, int allocation, char **ppc) -{ - struct writequeue_entry *e; - int offset = 0; - int users = 0; - struct nodeinfo *ni; - - if (!atomic_read(&accepting)) - return NULL; - - ni = nodeid2nodeinfo(nodeid, allocation); - if (!ni) - return NULL; - - spin_lock(&ni->writequeue_lock); - e = list_entry(ni->writequeue.prev, struct writequeue_entry, list); - if (((struct list_head *) e == &ni->writequeue) || - (PAGE_CACHE_SIZE - e->end < len)) { - e = NULL; - } else { - offset = e->end; - e->end += len; - users = e->users++; - } - spin_unlock(&ni->writequeue_lock); - - if (e) { - got_one: - if (users == 0) - kmap(e->page); - *ppc = page_address(e->page) + offset; - return e; - } - - e = new_writequeue_entry(allocation); - if (e) { - spin_lock(&ni->writequeue_lock); - offset = e->end; - e->end += len; - e->ni = ni; - users = e->users++; - list_add_tail(&e->list, &ni->writequeue); - spin_unlock(&ni->writequeue_lock); - goto got_one; - } - return NULL; -} - -void dlm_lowcomms_commit_buffer(void *arg) -{ - struct writequeue_entry *e = (struct writequeue_entry *) arg; - int users; - struct nodeinfo *ni = e->ni; - - if (!atomic_read(&accepting)) - return; - - spin_lock(&ni->writequeue_lock); - users = --e->users; - if (users) - goto out; - e->len = e->end - e->offset; - kunmap(e->page); - spin_unlock(&ni->writequeue_lock); - - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - wake_up_process(send_task); - } - return; - - out: - spin_unlock(&ni->writequeue_lock); - return; -} - -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - -/* Initiate an SCTP association. In theory we could just use sendmsg() on - the first IP address and it should work, but this allows us to set up the - association before sending any valuable data that we can't afford to lose. - It also keeps the send path clean as it can now always use the association ID */ -static void initiate_association(int nodeid) -{ - struct sockaddr_storage rem_addr; - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct msghdr outmessage; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - int ret; - int addrlen; - char buf[1]; - struct kvec iov[1]; - struct nodeinfo *ni; - - log_print("Initiating association with node %d", nodeid); - - ni = nodeid2nodeinfo(nodeid, GFP_KERNEL); - if (!ni) - return; - - if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) { - log_print("no address for nodeid %d", nodeid); - return; - } - - make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); - - outmessage.msg_name = &rem_addr; - outmessage.msg_namelen = addrlen; - outmessage.msg_control = outcmsg; - outmessage.msg_controllen = sizeof(outcmsg); - outmessage.msg_flags = MSG_EOR; - - iov[0].iov_base = buf; - iov[0].iov_len = 1; - - /* Real INIT messages seem to cause trouble. Just send a 1 byte message - we can afford to lose */ - cmsg = CMSG_FIRSTHDR(&outmessage); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); - - outmessage.msg_controllen = cmsg->cmsg_len; - ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1); - if (ret < 0) { - log_print("send INIT to node failed: %d", ret); - /* Try again later */ - clear_bit(NI_INIT_PENDING, &ni->flags); - } -} - -/* Send a message */ -static int send_to_sock(struct nodeinfo *ni) -{ - int ret = 0; - struct writequeue_entry *e; - int len, offset; - struct msghdr outmsg; - static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; - struct cmsghdr *cmsg; - struct sctp_sndrcvinfo *sinfo; - struct kvec iov; - - /* See if we need to init an association before we start - sending precious messages */ - spin_lock(&ni->lock); - if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { - spin_unlock(&ni->lock); - initiate_association(ni->nodeid); - return 0; - } - spin_unlock(&ni->lock); - - outmsg.msg_name = NULL; /* We use assoc_id */ - outmsg.msg_namelen = 0; - outmsg.msg_control = outcmsg; - outmsg.msg_controllen = sizeof(outcmsg); - outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR; - - cmsg = CMSG_FIRSTHDR(&outmsg); - cmsg->cmsg_level = IPPROTO_SCTP; - cmsg->cmsg_type = SCTP_SNDRCV; - cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); - sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg); - memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); - sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid); - sinfo->sinfo_assoc_id = ni->assoc_id; - outmsg.msg_controllen = cmsg->cmsg_len; - - spin_lock(&ni->writequeue_lock); - for (;;) { - if (list_empty(&ni->writequeue)) - break; - e = list_entry(ni->writequeue.next, struct writequeue_entry, - list); - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&ni->writequeue_lock); - kmap(e->page); - - ret = 0; - if (len) { - iov.iov_base = page_address(e->page)+offset; - iov.iov_len = len; - - ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1, - len); - if (ret == -EAGAIN) { - sctp_con.eagain_flag = 1; - goto out; - } else if (ret < 0) - goto send_error; - } else { - /* Don't starve people filling buffers */ - schedule(); - } - - spin_lock(&ni->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - free_entry(e); - continue; - } - } - spin_unlock(&ni->writequeue_lock); - out: - return ret; - - send_error: - log_print("Error sending to node %d %d", ni->nodeid, ret); - spin_lock(&ni->lock); - if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) { - ni->assoc_id = 0; - spin_unlock(&ni->lock); - initiate_association(ni->nodeid); - } else - spin_unlock(&ni->lock); - - return ret; -} - -/* Try to send any messages that are pending */ -static void process_output_queue(void) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock_bh(&write_nodes_lock); - list_for_each_safe(list, temp, &write_nodes) { - struct nodeinfo *ni = - list_entry(list, struct nodeinfo, write_list); - clear_bit(NI_WRITE_PENDING, &ni->flags); - list_del(&ni->write_list); - - spin_unlock_bh(&write_nodes_lock); - - send_to_sock(ni); - spin_lock_bh(&write_nodes_lock); - } - spin_unlock_bh(&write_nodes_lock); -} - -/* Called after we've had -EAGAIN and been woken up */ -static void refill_write_queue(void) -{ - int i; - - for (i=1; i<=max_nodeid; i++) { - struct nodeinfo *ni = nodeid2nodeinfo(i, 0); - - if (ni) { - if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) { - spin_lock_bh(&write_nodes_lock); - list_add_tail(&ni->write_list, &write_nodes); - spin_unlock_bh(&write_nodes_lock); - } - } - } -} - -static void clean_one_writequeue(struct nodeinfo *ni) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock(&ni->writequeue_lock); - list_for_each_safe(list, temp, &ni->writequeue) { - struct writequeue_entry *e = - list_entry(list, struct writequeue_entry, list); - list_del(&e->list); - free_entry(e); - } - spin_unlock(&ni->writequeue_lock); -} - -static void clean_writequeues(void) -{ - int i; - - for (i=1; i<=max_nodeid; i++) { - struct nodeinfo *ni = nodeid2nodeinfo(i, 0); - if (ni) - clean_one_writequeue(ni); - } -} - - -static void dealloc_nodeinfo(void) -{ - int i; - - for (i=1; i<=max_nodeid; i++) { - struct nodeinfo *ni = nodeid2nodeinfo(i, 0); - if (ni) { - idr_remove(&nodeinfo_idr, i); - kfree(ni); - } - } -} - -int dlm_lowcomms_close(int nodeid) -{ - struct nodeinfo *ni; - - ni = nodeid2nodeinfo(nodeid, 0); - if (!ni) - return -1; - - spin_lock(&ni->lock); - if (ni->assoc_id) { - ni->assoc_id = 0; - /* Don't send shutdown here, sctp will just queue it - till the node comes back up! */ - } - spin_unlock(&ni->lock); - - clean_one_writequeue(ni); - clear_bit(NI_INIT_PENDING, &ni->flags); - return 0; -} - -static int write_list_empty(void) -{ - int status; - - spin_lock_bh(&write_nodes_lock); - status = list_empty(&write_nodes); - spin_unlock_bh(&write_nodes_lock); - - return status; -} - -static int dlm_recvd(void *data) -{ - DECLARE_WAITQUEUE(wait, current); - - while (!kthread_should_stop()) { - int count = 0; - - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue(&lowcomms_recv_wait, &wait); - if (!test_bit(CF_READ_PENDING, &sctp_con.flags)) - schedule(); - remove_wait_queue(&lowcomms_recv_wait, &wait); - set_current_state(TASK_RUNNING); - - if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) { - int ret; - - do { - ret = receive_from_sock(); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - schedule(); - count = 0; - } - } while (!kthread_should_stop() && ret >=0); - } - schedule(); - } - - return 0; -} - -static int dlm_sendd(void *data) -{ - DECLARE_WAITQUEUE(wait, current); - - add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_list_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - if (sctp_con.eagain_flag) { - sctp_con.eagain_flag = 0; - refill_write_queue(); - } - process_output_queue(); - } - - remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait); - - return 0; -} - -static void daemons_stop(void) -{ - kthread_stop(recv_task); - kthread_stop(send_task); -} - -static int daemons_start(void) -{ - struct task_struct *p; - int error; - - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); - if (error) { - log_print("can't start dlm_recvd %d", error); - return error; - } - recv_task = p; - - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); - if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); - return error; - } - send_task = p; - - return 0; -} - -/* - * This is quite likely to sleep... - */ -int dlm_lowcomms_start(void) -{ - int error; - - error = init_sock(); - if (error) - goto fail_sock; - error = daemons_start(); - if (error) - goto fail_sock; - atomic_set(&accepting, 1); - return 0; - - fail_sock: - close_connection(); - return error; -} - -/* Set all the activity flags to prevent any socket activity. */ - -void dlm_lowcomms_stop(void) -{ - atomic_set(&accepting, 0); - sctp_con.flags = 0x7; - daemons_stop(); - clean_writequeues(); - close_connection(); - dealloc_nodeinfo(); - max_nodeid = 0; -} - -int dlm_lowcomms_init(void) -{ - init_waitqueue_head(&lowcomms_recv_wait); - spin_lock_init(&write_nodes_lock); - INIT_LIST_HEAD(&write_nodes); - init_rwsem(&nodeinfo_lock); - return 0; -} - -void dlm_lowcomms_exit(void) -{ - int i; - - for (i = 0; i < dlm_local_count; i++) - kfree(dlm_local_addr[i]); - dlm_local_count = 0; - dlm_local_nodeid = 0; -} - Index: linux-rhel51-quilt/fs/dlm/lowcomms-tcp.c =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/lowcomms-tcp.c 2007-06-08 09:01:14.000000000 -0500 +++ /dev/null 1970-01-01 00:00:00.000000000 +0000 @@ -1,1263 +0,0 @@ -/****************************************************************************** -******************************************************************************* -** -** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. -** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved. -** -** This copyrighted material is made available to anyone wishing to use, -** modify, copy, or redistribute it subject to the terms and conditions -** of the GNU General Public License v.2. -** -******************************************************************************* -******************************************************************************/ - -/* - * lowcomms.c - * - * This is the "low-level" comms layer. - * - * It is responsible for sending/receiving messages - * from other nodes in the cluster. - * - * Cluster nodes are referred to by their nodeids. nodeids are - * simply 32 bit numbers to the locking module - if they need to - * be expanded for the cluster infrastructure then that is it's - * responsibility. It is this layer's - * responsibility to resolve these into IP address or - * whatever it needs for inter-node communication. - * - * The comms level is two kernel threads that deal mainly with - * the receiving of messages from other nodes and passing them - * up to the mid-level comms layer (which understands the - * message format) for execution by the locking core, and - * a send thread which does all the setting up of connections - * to remote nodes and the sending of data. Threads are not allowed - * to send their own data because it may cause them to wait in times - * of high load. Also, this way, the sending thread can collect together - * messages bound for one node and send them in one block. - * - * I don't see any problem with the recv thread executing the locking - * code on behalf of remote processes as the locking code is - * short, efficient and never waits. - * - */ - - -#include <asm/ioctls.h> -#include <net/sock.h> -#include <net/tcp.h> -#include <linux/pagemap.h> - -#include "dlm_internal.h" -#include "lowcomms.h" -#include "midcomms.h" -#include "config.h" - -struct cbuf { - unsigned base; - unsigned len; - unsigned mask; -}; - -#ifndef FALSE -#define FALSE 0 -#define TRUE 1 -#endif -#define NODE_INCREMENT 32 - -#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0) -#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0) -#define CBUF_EMPTY(cb) ((cb)->len == 0) -#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1)) -#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \ - (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0) -#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask) - -/* Maximum number of incoming messages to process before - doing a schedule() -*/ -#define MAX_RX_MSG_COUNT 25 - -struct connection { - struct socket *sock; /* NULL if not connected */ - uint32_t nodeid; /* So we know who we are in the list */ - struct rw_semaphore sock_sem; /* Stop connect races */ - struct list_head read_list; /* On this list when ready for reading */ - struct list_head write_list; /* On this list when ready for writing */ - struct list_head state_list; /* On this list when ready to connect */ - unsigned long flags; /* bit 1,2 = We are on the read/write lists */ -#define CF_READ_PENDING 1 -#define CF_WRITE_PENDING 2 -#define CF_CONNECT_PENDING 3 -#define CF_IS_OTHERCON 4 - struct list_head writequeue; /* List of outgoing writequeue_entries */ - struct list_head listenlist; /* List of allocated listening sockets */ - spinlock_t writequeue_lock; - int (*rx_action) (struct connection *); /* What to do when active */ - struct page *rx_page; - struct cbuf cb; - int retries; - atomic_t waiting_requests; -#define MAX_CONNECT_RETRIES 3 - struct connection *othercon; -}; -#define sock2con(x) ((struct connection *)(x)->sk_user_data) - -/* An entry waiting to be sent */ -struct writequeue_entry { - struct list_head list; - struct page *page; - int offset; - int len; - int end; - int users; - struct connection *con; -}; - -static struct sockaddr_storage dlm_local_addr; - -/* Manage daemons */ -static struct task_struct *recv_task; -static struct task_struct *send_task; - -static wait_queue_t lowcomms_send_waitq_head; -static wait_queue_head_t lowcomms_send_waitq; -static wait_queue_t lowcomms_recv_waitq_head; -static wait_queue_head_t lowcomms_recv_waitq; - -/* An array of pointers to connections, indexed by NODEID */ -static struct connection **connections; -static struct semaphore connections_lock; -static kmem_cache_t *con_cache; -static int conn_array_size; -static atomic_t accepting; - -/* List of sockets that have reads pending */ -static struct list_head read_sockets; -static spinlock_t read_sockets_lock; - -/* List of sockets which have writes pending */ -static struct list_head write_sockets; -static spinlock_t write_sockets_lock; - -/* List of sockets which have connects pending */ -static struct list_head state_sockets; -static spinlock_t state_sockets_lock; - -static struct connection *nodeid2con(int nodeid, int allocation) -{ - struct connection *con = NULL; - - down(&connections_lock); - if (nodeid >= conn_array_size) { - int new_size = nodeid + NODE_INCREMENT; - struct connection **new_conns; - - new_conns = kmalloc(sizeof(struct connection *) * - new_size, allocation); - if (!new_conns) - goto finish; - - memset(new_conns, 0, sizeof(struct connection *) * new_size); - memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size); - conn_array_size = new_size; - kfree(connections); - connections = new_conns; - - } - - con = connections[nodeid]; - if (con == NULL && allocation) { - con = kmem_cache_alloc(con_cache, allocation); - if (!con) - goto finish; - - memset(con, 0, sizeof(*con)); - con->nodeid = nodeid; - init_rwsem(&con->sock_sem); - INIT_LIST_HEAD(&con->writequeue); - spin_lock_init(&con->writequeue_lock); - - connections[nodeid] = con; - } - - finish: - up(&connections_lock); - return con; -} - -/* Data available on socket or listen socket received a connect */ -static void lowcomms_data_ready(struct sock *sk, int count_unused) -{ - struct connection *con = sock2con(sk); - - atomic_inc(&con->waiting_requests); - if (test_and_set_bit(CF_READ_PENDING, &con->flags)) - return; - - spin_lock_bh(&read_sockets_lock); - list_add_tail(&con->read_list, &read_sockets); - spin_unlock_bh(&read_sockets_lock); - - wake_up_interruptible(&lowcomms_recv_waitq); -} - -static void lowcomms_write_space(struct sock *sk) -{ - struct connection *con = sock2con(sk); - - if (test_and_set_bit(CF_WRITE_PENDING, &con->flags)) - return; - - spin_lock_bh(&write_sockets_lock); - list_add_tail(&con->write_list, &write_sockets); - spin_unlock_bh(&write_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); -} - -static inline void lowcomms_connect_sock(struct connection *con) -{ - if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) - return; - if (!atomic_read(&accepting)) - return; - - spin_lock_bh(&state_sockets_lock); - list_add_tail(&con->state_list, &state_sockets); - spin_unlock_bh(&state_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); -} - -static void lowcomms_state_change(struct sock *sk) -{ -/* struct connection *con = sock2con(sk); */ - - switch (sk->sk_state) { - case TCP_ESTABLISHED: - lowcomms_write_space(sk); - break; - - case TCP_FIN_WAIT1: - case TCP_FIN_WAIT2: - case TCP_TIME_WAIT: - case TCP_CLOSE: - case TCP_CLOSE_WAIT: - case TCP_LAST_ACK: - case TCP_CLOSING: - /* FIXME: I think this causes more trouble than it solves. - lowcomms wil reconnect anyway when there is something to - send. This just attempts reconnection if a node goes down! - */ - /* lowcomms_connect_sock(con); */ - break; - - default: - printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state); - break; - } -} - -/* Make a socket active */ -static int add_sock(struct socket *sock, struct connection *con) -{ - con->sock = sock; - - /* Install a data_ready callback */ - con->sock->sk->sk_data_ready = lowcomms_data_ready; - con->sock->sk->sk_write_space = lowcomms_write_space; - con->sock->sk->sk_state_change = lowcomms_state_change; - - return 0; -} - -/* Add the port number to an IP6 or 4 sockaddr and return the address - length */ -static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, - int *addr_len) -{ - saddr->ss_family = dlm_local_addr.ss_family; - if (saddr->ss_family == AF_INET) { - struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; - in4_addr->sin_port = cpu_to_be16(port); - *addr_len = sizeof(struct sockaddr_in); - } - else { - struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; - in6_addr->sin6_port = cpu_to_be16(port); - *addr_len = sizeof(struct sockaddr_in6); - } -} - -/* Close a remote connection and tidy up */ -static void close_connection(struct connection *con, int and_other) -{ - down_write(&con->sock_sem); - - if (con->sock) { - sock_release(con->sock); - con->sock = NULL; - } - if (con->othercon && and_other) { - /* Argh! recursion in kernel code! - Actually, this isn't a list so it - will only re-enter once. - */ - close_connection(con->othercon, FALSE); - } - if (con->rx_page) { - __free_page(con->rx_page); - con->rx_page = NULL; - } - con->retries = 0; - up_write(&con->sock_sem); -} - -/* Data received from remote end */ -static int receive_from_sock(struct connection *con) -{ - int ret = 0; - struct msghdr msg; - struct iovec iov[2]; - mm_segment_t fs; - unsigned len; - int r; - int call_again_soon = 0; - - down_read(&con->sock_sem); - - if (con->sock == NULL) - goto out; - if (con->rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - con->rx_page = alloc_page(GFP_ATOMIC); - if (con->rx_page == NULL) - goto out_resched; - CBUF_INIT(&con->cb, PAGE_CACHE_SIZE); - } - - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_iovlen = 1; - msg.msg_iov = iov; - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_flags = 0; - - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. - */ - iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb); - iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb); - iov[1].iov_len = 0; - - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (CBUF_DATA(&con->cb) >= con->cb.base) { - iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb); - iov[1].iov_len = con->cb.base; - iov[1].iov_base = page_address(con->rx_page); - msg.msg_iovlen = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - - fs = get_fs(); - set_fs(get_ds()); - r = ret = sock_recvmsg(con->sock, &msg, len, - MSG_DONTWAIT | MSG_NOSIGNAL); - set_fs(fs); - - if (ret <= 0) - goto out_close; - if (ret == len) - call_again_soon = 1; - CBUF_ADD(&con->cb, ret); - ret = dlm_process_incoming_buffer(con->nodeid, - page_address(con->rx_page), - con->cb.base, con->cb.len, - PAGE_CACHE_SIZE); - if (ret == -EBADMSG) { - printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, " - "iov_len=%u, iov_base[0]=%p, read=%d\n", - page_address(con->rx_page), con->cb.base, con->cb.len, - len, iov[0].iov_base, r); - } - if (ret < 0) - goto out_close; - CBUF_EAT(&con->cb, ret); - - if (CBUF_EMPTY(&con->cb) && !call_again_soon) { - __free_page(con->rx_page); - con->rx_page = NULL; - } - - out: - if (call_again_soon) - goto out_resched; - up_read(&con->sock_sem); - ret = 0; - goto out_ret; - - out_resched: - lowcomms_data_ready(con->sock->sk, 0); - up_read(&con->sock_sem); - ret = 0; - schedule(); - goto out_ret; - - out_close: - up_read(&con->sock_sem); - if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { - close_connection(con, FALSE); - /* Reconnect when there is something to send */ - } - - out_ret: - return ret; -} - -/* Listening socket is busy, accept a connection */ -static int accept_from_sock(struct connection *con) -{ - int result; - struct sockaddr_storage peeraddr; - struct socket *newsock; - int len; - int nodeid; - struct connection *newcon; - - memset(&peeraddr, 0, sizeof(peeraddr)); - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock); - if (result < 0) - return -ENOMEM; - - down_read(&con->sock_sem); - - result = -ENOTCONN; - if (con->sock == NULL) - goto accept_err; - - newsock->type = con->sock->type; - newsock->ops = con->sock->ops; - - result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); - if (result < 0) - goto accept_err; - - /* Get the connected socket's peer */ - memset(&peeraddr, 0, sizeof(peeraddr)); - if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, - &len, 2)) { - result = -ECONNABORTED; - goto accept_err; - } - - /* Get the new node's NODEID */ - make_sockaddr(&peeraddr, 0, &len); - if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { - printk("dlm: connect from non cluster node\n"); - sock_release(newsock); - up_read(&con->sock_sem); - return -1; - } - - log_print("got connection from %d", nodeid); - - /* Check to see if we already have a connection to this node. This - * could happen if the two nodes initiate a connection at roughly - * the same time and the connections cross on the wire. - * TEMPORARY FIX: - * In this case we store the incoming one in "othercon" - */ - newcon = nodeid2con(nodeid, GFP_KERNEL); - if (!newcon) { - result = -ENOMEM; - goto accept_err; - } - down_write(&newcon->sock_sem); - if (newcon->sock) { - struct connection *othercon = newcon->othercon; - - if (!othercon) { - othercon = kmem_cache_alloc(con_cache, GFP_KERNEL); - if (!othercon) { - printk("dlm: failed to allocate incoming socket\n"); - up_write(&newcon->sock_sem); - result = -ENOMEM; - goto accept_err; - } - memset(othercon, 0, sizeof(*othercon)); - othercon->nodeid = nodeid; - othercon->rx_action = receive_from_sock; - init_rwsem(&othercon->sock_sem); - set_bit(CF_IS_OTHERCON, &othercon->flags); - newcon->othercon = othercon; - } - othercon->sock = newsock; - newsock->sk->sk_user_data = othercon; - add_sock(newsock, othercon); - } - else { - newsock->sk->sk_user_data = newcon; - newcon->rx_action = receive_from_sock; - add_sock(newsock, newcon); - - } - - up_write(&newcon->sock_sem); - - /* - * Add it to the active queue in case we got data - * beween processing the accept adding the socket - * to the read_sockets list - */ - lowcomms_data_ready(newsock->sk, 0); - up_read(&con->sock_sem); - - return 0; - - accept_err: - up_read(&con->sock_sem); - sock_release(newsock); - - if (result != -EAGAIN) - printk("dlm: error accepting connection from node: %d\n", result); - return result; -} - -/* Connect a new socket to its peer */ -static int connect_to_sock(struct connection *con) -{ - int result = -EHOSTUNREACH; - struct sockaddr_storage saddr; - int addr_len; - struct socket *sock; - - if (con->nodeid == 0) { - log_print("attempt to connect sock 0 foiled"); - return 0; - } - - down_write(&con->sock_sem); - if (con->retries++ > MAX_CONNECT_RETRIES) - goto out; - - /* Some odd races can cause double-connects, ignore them */ - if (con->sock) { - result = 0; - goto out; - } - - /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); - if (result < 0) - goto out_err; - - memset(&saddr, 0, sizeof(saddr)); - if (dlm_nodeid_to_addr(con->nodeid, &saddr)) - goto out_err; - - sock->sk->sk_user_data = con; - con->rx_action = receive_from_sock; - - make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); - - add_sock(sock, con); - - log_print("connecting to %d", con->nodeid); - result = - sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, - O_NONBLOCK); - if (result == -EINPROGRESS) - result = 0; - if (result != 0) - goto out_err; - - out: - up_write(&con->sock_sem); - /* - * Returning an error here means we've given up trying to connect to - * a remote node, otherwise we return 0 and reschedule the connetion - * attempt - */ - return result; - - out_err: - if (con->sock) { - sock_release(con->sock); - con->sock = NULL; - } - /* - * Some errors are fatal and this list might need adjusting. For other - * errors we try again until the max number of retries is reached. - */ - if (result != -EHOSTUNREACH && result != -ENETUNREACH && - result != -ENETDOWN && result != EINVAL - && result != -EPROTONOSUPPORT) { - lowcomms_connect_sock(con); - result = 0; - } - goto out; -} - -static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr) -{ - struct socket *sock = NULL; - mm_segment_t fs; - int result = 0; - int one = 1; - int addr_len; - - if (dlm_local_addr.ss_family == AF_INET) - addr_len = sizeof(struct sockaddr_in); - else - addr_len = sizeof(struct sockaddr_in6); - - /* Create a socket to communicate with */ - result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock); - if (result < 0) { - printk("dlm: Can't create listening comms socket\n"); - goto create_out; - } - - fs = get_fs(); - set_fs(get_ds()); - result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); - set_fs(fs); - if (result < 0) { - printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result); - } - sock->sk->sk_user_data = con; - con->rx_action = accept_from_sock; - con->sock = sock; - - /* Bind to our port */ - make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); - result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); - if (result < 0) { - printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port); - sock_release(sock); - sock = NULL; - con->sock = NULL; - goto create_out; - } - - fs = get_fs(); - set_fs(get_ds()); - - result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one)); - set_fs(fs); - if (result < 0) { - printk("dlm: Set keepalive failed: %d\n", result); - } - - result = sock->ops->listen(sock, 5); - if (result < 0) { - printk("dlm: Can't listen on port %d\n", - dlm_config.ci_tcp_port); - sock_release(sock); - sock = NULL; - goto create_out; - } - - create_out: - return sock; -} - - -/* Listen on all interfaces */ -static int listen_for_all(void) -{ - struct socket *sock = NULL; - struct connection *con = nodeid2con(0, GFP_KERNEL); - int result = -EINVAL; - - /* We don't support multi-homed hosts */ - memset(con, 0, sizeof(*con)); - init_rwsem(&con->sock_sem); - spin_lock_init(&con->writequeue_lock); - INIT_LIST_HEAD(&con->writequeue); - set_bit(CF_IS_OTHERCON, &con->flags); - - sock = create_listen_sock(con, &dlm_local_addr); - if (sock) { - add_sock(sock, con); - result = 0; - } - else { - result = -EADDRINUSE; - } - - return result; -} - - - -static struct writequeue_entry *new_writequeue_entry(struct connection *con, - int allocation) -{ - struct writequeue_entry *entry; - - entry = kmalloc(sizeof(struct writequeue_entry), allocation); - if (!entry) - return NULL; - - entry->page = alloc_page(allocation); - if (!entry->page) { - kfree(entry); - return NULL; - } - - entry->offset = 0; - entry->len = 0; - entry->end = 0; - entry->users = 0; - entry->con = con; - - return entry; -} - -void *dlm_lowcomms_get_buffer(int nodeid, int len, - int allocation, char **ppc) -{ - struct connection *con; - struct writequeue_entry *e; - int offset = 0; - int users = 0; - - if (!atomic_read(&accepting)) - return NULL; - - con = nodeid2con(nodeid, allocation); - if (!con) - return NULL; - - spin_lock(&con->writequeue_lock); - e = list_entry(con->writequeue.prev, struct writequeue_entry, list); - if (((struct list_head *) e == &con->writequeue) || - (PAGE_CACHE_SIZE - e->end < len)) { - e = NULL; - } else { - offset = e->end; - e->end += len; - users = e->users++; - } - spin_unlock(&con->writequeue_lock); - - if (e) { - got_one: - if (users == 0) - kmap(e->page); - *ppc = page_address(e->page) + offset; - return e; - } - - e = new_writequeue_entry(con, allocation); - if (e) { - spin_lock(&con->writequeue_lock); - offset = e->end; - e->end += len; - users = e->users++; - list_add_tail(&e->list, &con->writequeue); - spin_unlock(&con->writequeue_lock); - goto got_one; - } - return NULL; -} - -void dlm_lowcomms_commit_buffer(void *mh) -{ - struct writequeue_entry *e = (struct writequeue_entry *)mh; - struct connection *con = e->con; - int users; - - if (!atomic_read(&accepting)) - return; - - spin_lock(&con->writequeue_lock); - users = --e->users; - if (users) - goto out; - e->len = e->end - e->offset; - kunmap(e->page); - spin_unlock(&con->writequeue_lock); - - if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) { - spin_lock_bh(&write_sockets_lock); - list_add_tail(&con->write_list, &write_sockets); - spin_unlock_bh(&write_sockets_lock); - - wake_up_interruptible(&lowcomms_send_waitq); - } - return; - - out: - spin_unlock(&con->writequeue_lock); - return; -} - -static void free_entry(struct writequeue_entry *e) -{ - __free_page(e->page); - kfree(e); -} - -/* Send a message */ -static int send_to_sock(struct connection *con) -{ - int ret = 0; - ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); - const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; - struct writequeue_entry *e; - int len, offset; - - down_read(&con->sock_sem); - if (con->sock == NULL) - goto out_connect; - - sendpage = con->sock->ops->sendpage; - - spin_lock(&con->writequeue_lock); - for (;;) { - e = list_entry(con->writequeue.next, struct writequeue_entry, - list); - if ((struct list_head *) e == &con->writequeue) - break; - - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&con->writequeue_lock); - - ret = 0; - if (len) { - ret = sendpage(con->sock, e->page, offset, len, - msg_flags); - if (ret == -EAGAIN || ret == 0) - goto out; - if (ret <= 0) - goto send_error; - } - else { - /* Don't starve people filling buffers */ - schedule(); - } - - spin_lock(&con->writequeue_lock); - e->offset += ret; - e->len -= ret; - - if (e->len == 0 && e->users == 0) { - list_del(&e->list); - free_entry(e); - continue; - } - } - spin_unlock(&con->writequeue_lock); - out: - up_read(&con->sock_sem); - return ret; - - send_error: - up_read(&con->sock_sem); - close_connection(con, FALSE); - lowcomms_connect_sock(con); - return ret; - - out_connect: - up_read(&con->sock_sem); - lowcomms_connect_sock(con); - return 0; -} - -static void clean_one_writequeue(struct connection *con) -{ - struct list_head *list; - struct list_head *temp; - - spin_lock(&con->writequeue_lock); - list_for_each_safe(list, temp, &con->writequeue) { - struct writequeue_entry *e = - list_entry(list, struct writequeue_entry, list); - list_del(&e->list); - free_entry(e); - } - spin_unlock(&con->writequeue_lock); -} - -/* Called from recovery when it knows that a node has - left the cluster */ -int dlm_lowcomms_close(int nodeid) -{ - struct connection *con; - - if (!connections) - goto out; - - log_print("closing connection to node %d", nodeid); - con = nodeid2con(nodeid, 0); - if (con) { - clean_one_writequeue(con); - close_connection(con, TRUE); - atomic_set(&con->waiting_requests, 0); - } - return 0; - - out: - return -1; -} - -/* API send message call, may queue the request */ -/* N.B. This is the old interface - use the new one for new calls */ -int lowcomms_send_message(int nodeid, char *buf, int len, int allocation) -{ - struct writequeue_entry *e; - char *b; - - e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b); - if (e) { - memcpy(b, buf, len); - dlm_lowcomms_commit_buffer(e); - return 0; - } - return -ENOBUFS; -} - -/* Look for activity on active sockets */ -static void process_sockets(void) -{ - struct list_head *list; - struct list_head *temp; - int count = 0; - - spin_lock_bh(&read_sockets_lock); - list_for_each_safe(list, temp, &read_sockets) { - - struct connection *con = - list_entry(list, struct connection, read_list); - list_del(&con->read_list); - clear_bit(CF_READ_PENDING, &con->flags); - - spin_unlock_bh(&read_sockets_lock); - - /* This can reach zero if we are processing requests - * as they come in. - */ - if (atomic_read(&con->waiting_requests) == 0) { - spin_lock_bh(&read_sockets_lock); - continue; - } - - do { - con->rx_action(con); - - /* Don't starve out everyone else */ - if (++count >= MAX_RX_MSG_COUNT) { - schedule(); - count = 0; - } - - } while (!atomic_dec_and_test(&con->waiting_requests) && - !kthread_should_stop()); - - spin_lock_bh(&read_sockets_lock); - } - spin_unlock_bh(&read_sockets_lock); -} - -/* Try to send any messages that are pending - */ -static void process_output_queue(void) -{ - struct list_head *list; - struct list_head *temp; - int ret; - - spin_lock_bh(&write_sockets_lock); - list_for_each_safe(list, temp, &write_sockets) { - struct connection *con = - list_entry(list, struct connection, write_list); - clear_bit(CF_WRITE_PENDING, &con->flags); - list_del(&con->write_list); - - spin_unlock_bh(&write_sockets_lock); - - ret = send_to_sock(con); - if (ret < 0) { - } - spin_lock_bh(&write_sockets_lock); - } - spin_unlock_bh(&write_sockets_lock); -} - -static void process_state_queue(void) -{ - struct list_head *list; - struct list_head *temp; - int ret; - - spin_lock_bh(&state_sockets_lock); - list_for_each_safe(list, temp, &state_sockets) { - struct connection *con = - list_entry(list, struct connection, state_list); - list_del(&con->state_list); - clear_bit(CF_CONNECT_PENDING, &con->flags); - spin_unlock_bh(&state_sockets_lock); - - ret = connect_to_sock(con); - if (ret < 0) { - } - spin_lock_bh(&state_sockets_lock); - } - spin_unlock_bh(&state_sockets_lock); -} - - -/* Discard all entries on the write queues */ -static void clean_writequeues(void) -{ - int nodeid; - - for (nodeid = 1; nodeid < conn_array_size; nodeid++) { - struct connection *con = nodeid2con(nodeid, 0); - - if (con) - clean_one_writequeue(con); - } -} - -static int read_list_empty(void) -{ - int status; - - spin_lock_bh(&read_sockets_lock); - status = list_empty(&read_sockets); - spin_unlock_bh(&read_sockets_lock); - - return status; -} - -/* DLM Transport comms receive daemon */ -static int dlm_recvd(void *data) -{ - init_waitqueue_head(&lowcomms_recv_waitq); - init_waitqueue_entry(&lowcomms_recv_waitq_head, current); - add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (read_list_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - process_sockets(); - } - - return 0; -} - -static int write_and_state_lists_empty(void) -{ - int status; - - spin_lock_bh(&write_sockets_lock); - status = list_empty(&write_sockets); - spin_unlock_bh(&write_sockets_lock); - - spin_lock_bh(&state_sockets_lock); - if (list_empty(&state_sockets) == 0) - status = 0; - spin_unlock_bh(&state_sockets_lock); - - return status; -} - -/* DLM Transport send daemon */ -static int dlm_sendd(void *data) -{ - init_waitqueue_head(&lowcomms_send_waitq); - init_waitqueue_entry(&lowcomms_send_waitq_head, current); - add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head); - - while (!kthread_should_stop()) { - set_current_state(TASK_INTERRUPTIBLE); - if (write_and_state_lists_empty()) - schedule(); - set_current_state(TASK_RUNNING); - - process_state_queue(); - process_output_queue(); - } - - return 0; -} - -static void daemons_stop(void) -{ - kthread_stop(recv_task); - kthread_stop(send_task); -} - -static int daemons_start(void) -{ - struct task_struct *p; - int error; - - p = kthread_run(dlm_recvd, NULL, "dlm_recvd"); - error = IS_ERR(p); - if (error) { - log_print("can't start dlm_recvd %d", error); - return error; - } - recv_task = p; - - p = kthread_run(dlm_sendd, NULL, "dlm_sendd"); - error = IS_ERR(p); - if (error) { - log_print("can't start dlm_sendd %d", error); - kthread_stop(recv_task); - return error; - } - send_task = p; - - return 0; -} - -/* - * Return the largest buffer size we can cope with. - */ -int lowcomms_max_buffer_size(void) -{ - return PAGE_CACHE_SIZE; -} - -void dlm_lowcomms_stop(void) -{ - int i; - - atomic_set(&accepting, 0); - - /* Set all the activity flags to prevent any - socket activity. - */ - for (i = 0; i < conn_array_size; i++) { - if (connections[i]) - connections[i]->flags |= 0x7; - } - daemons_stop(); - clean_writequeues(); - - for (i = 0; i < conn_array_size; i++) { - if (connections[i]) { - close_connection(connections[i], TRUE); - if (connections[i]->othercon) - kmem_cache_free(con_cache, connections[i]->othercon); - kmem_cache_free(con_cache, connections[i]); - } - } - - kfree(connections); - connections = NULL; - - kmem_cache_destroy(con_cache); -} - -/* This is quite likely to sleep... */ -int dlm_lowcomms_start(void) -{ - int error = 0; - - error = -ENOTCONN; - - /* - * Temporarily initialise the waitq head so that lowcomms_send_message - * doesn't crash if it gets called before the thread is fully - * initialised - */ - init_waitqueue_head(&lowcomms_send_waitq); - - error = -ENOMEM; - connections = kmalloc(sizeof(struct connection *) * - NODE_INCREMENT, GFP_KERNEL); - if (!connections) - goto out; - - memset(connections, 0, - sizeof(struct connection *) * NODE_INCREMENT); - - conn_array_size = NODE_INCREMENT; - - if (dlm_our_addr(&dlm_local_addr, 0)) { - log_print("no local IP address has been set"); - goto fail_free_conn; - } - if (!dlm_our_addr(&dlm_local_addr, 1)) { - log_print("This dlm comms module does not support multi-homed clustering"); - goto fail_free_conn; - } - - con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), - __alignof__(struct connection), 0, NULL, NULL); - if (!con_cache) - goto fail_free_conn; - - - /* Start listening */ - error = listen_for_all(); - if (error) - goto fail_unlisten; - - error = daemons_start(); - if (error) - goto fail_unlisten; - - atomic_set(&accepting, 1); - - return 0; - - fail_unlisten: - close_connection(connections[0], 0); - kmem_cache_free(con_cache, connections[0]); - kmem_cache_destroy(con_cache); - - fail_free_conn: - kfree(connections); - - out: - return error; -} - -int dlm_lowcomms_init(void) -{ - INIT_LIST_HEAD(&read_sockets); - INIT_LIST_HEAD(&write_sockets); - INIT_LIST_HEAD(&state_sockets); - - spin_lock_init(&read_sockets_lock); - spin_lock_init(&write_sockets_lock); - spin_lock_init(&state_sockets_lock); - init_MUTEX(&connections_lock); - - return 0; -} - -void dlm_lowcomms_exit(void) -{ - /* Nothing to do here, - needed for compatibility with the sctp version - */ -} - -/* Don't accept any more outgoing work */ -void lowcomms_stop_accept(void) -{ - atomic_set(&accepting, 0); -} - Index: linux-rhel51-quilt/fs/dlm/lowcomms.c =================================================================== --- /dev/null 1970-01-01 00:00:00.000000000 +0000 +++ linux-rhel51-quilt/fs/dlm/lowcomms.c 2007-06-08 09:55:54.000000000 -0500 @@ -0,0 +1,1474 @@ +/****************************************************************************** +******************************************************************************* +** +** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. +** Copyright (C) 2004-2007 Red Hat, Inc. All rights reserved. +** +** This copyrighted material is made available to anyone wishing to use, +** modify, copy, or redistribute it subject to the terms and conditions +** of the GNU General Public License v.2. +** +******************************************************************************* +******************************************************************************/ + +/* + * lowcomms.c + * + * This is the "low-level" comms layer. + * + * It is responsible for sending/receiving messages + * from other nodes in the cluster. + * + * Cluster nodes are referred to by their nodeids. nodeids are + * simply 32 bit numbers to the locking module - if they need to + * be expanded for the cluster infrastructure then that is it's + * responsibility. It is this layer's + * responsibility to resolve these into IP address or + * whatever it needs for inter-node communication. + * + * The comms level is two kernel threads that deal mainly with + * the receiving of messages from other nodes and passing them + * up to the mid-level comms layer (which understands the + * message format) for execution by the locking core, and + * a send thread which does all the setting up of connections + * to remote nodes and the sending of data. Threads are not allowed + * to send their own data because it may cause them to wait in times + * of high load. Also, this way, the sending thread can collect together + * messages bound for one node and send them in one block. + * + * lowcomms will choose to use wither TCP or SCTP as its transport layer + * depending on the configuration variable 'protocol'. This should be set + * to 0 (default) for TCP or 1 for SCTP. It shouldbe configured using a + * cluster-wide mechanism as it must be the same on all nodes of the cluster + * for the DLM to function. + * + */ + +#include <asm/ioctls.h> +#include <net/sock.h> +#include <net/tcp.h> +#include <linux/pagemap.h> +#include <linux/idr.h> +#include <linux/file.h> +#include <linux/sctp.h> +#include <net/sctp/user.h> + +#include "dlm_internal.h" +#include "lowcomms.h" +#include "midcomms.h" +#include "config.h" + +#define NEEDED_RMEM (4*1024*1024) + +struct cbuf { + unsigned int base; + unsigned int len; + unsigned int mask; +}; + +static void cbuf_add(struct cbuf *cb, int n) +{ + cb->len += n; +} + +static int cbuf_data(struct cbuf *cb) +{ + return ((cb->base + cb->len) & cb->mask); +} + +static void cbuf_init(struct cbuf *cb, int size) +{ + cb->base = cb->len = 0; + cb->mask = size-1; +} + +static void cbuf_eat(struct cbuf *cb, int n) +{ + cb->len -= n; + cb->base += n; + cb->base &= cb->mask; +} + +static int cbuf_empty(struct cbuf *cb) +{ + return cb->len == 0; +} + +struct connection { + struct socket *sock; /* NULL if not connected */ + uint32_t nodeid; /* So we know who we are in the list */ + struct mutex sock_mutex; + unsigned long flags; +#define CF_READ_PENDING 1 +#define CF_WRITE_PENDING 2 +#define CF_CONNECT_PENDING 3 +#define CF_INIT_PENDING 4 +#define CF_IS_OTHERCON 5 + struct list_head writequeue; /* List of outgoing writequeue_entries */ + spinlock_t writequeue_lock; + int (*rx_action) (struct connection *); /* What to do when active */ + void (*connect_action) (struct connection *); /* What to do to connect */ + struct page *rx_page; + struct cbuf cb; + int retries; +#define MAX_CONNECT_RETRIES 3 + int sctp_assoc; + struct connection *othercon; + struct work_struct rwork; /* Receive workqueue */ + struct work_struct swork; /* Send workqueue */ +}; +#define sock2con(x) ((struct connection *)(x)->sk_user_data) + +/* An entry waiting to be sent */ +struct writequeue_entry { + struct list_head list; + struct page *page; + int offset; + int len; + int end; + int users; + struct connection *con; +}; + +static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; +static int dlm_local_count; + +/* Work queues */ +static struct workqueue_struct *recv_workqueue; +static struct workqueue_struct *send_workqueue; + +static DEFINE_IDR(connections_idr); +static DECLARE_MUTEX(connections_lock); +static int max_nodeid; +static struct kmem_cache *con_cache; + +static void process_recv_sockets(void *data); +static void process_send_sockets(void *data); + +/* + * If 'allocation' is zero then we don't attempt to create a new + * connection structure for this node. + */ +static struct connection *__nodeid2con(int nodeid, gfp_t alloc) +{ + struct connection *con = NULL; + int r; + int n; + + con = idr_find(&connections_idr, nodeid); + if (con || !alloc) + return con; + + r = idr_pre_get(&connections_idr, alloc); + if (!r) + return NULL; + + con = kmem_cache_zalloc(con_cache, alloc); + if (!con) + return NULL; + + r = idr_get_new_above(&connections_idr, con, nodeid, &n); + if (r) { + kmem_cache_free(con_cache, con); + return NULL; + } + + if (n != nodeid) { + idr_remove(&connections_idr, n); + kmem_cache_free(con_cache, con); + return NULL; + } + + con->nodeid = nodeid; + mutex_init(&con->sock_mutex); + INIT_LIST_HEAD(&con->writequeue); + spin_lock_init(&con->writequeue_lock); + INIT_WORK(&con->swork, process_send_sockets, con); + INIT_WORK(&con->rwork, process_recv_sockets, con); + + /* Setup action pointers for child sockets */ + if (con->nodeid) { + struct connection *zerocon = idr_find(&connections_idr, 0); + + con->connect_action = zerocon->connect_action; + if (!con->rx_action) + con->rx_action = zerocon->rx_action; + } + + if (nodeid > max_nodeid) + max_nodeid = nodeid; + + return con; +} + +static struct connection *nodeid2con(int nodeid, gfp_t allocation) +{ + struct connection *con; + + down(&connections_lock); + con = __nodeid2con(nodeid, allocation); + up(&connections_lock); + + return con; +} + +/* This is a bit drastic, but only called when things go wrong */ +static struct connection *assoc2con(int assoc_id) +{ + int i; + struct connection *con; + + down(&connections_lock); + for (i=0; i<=max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con && con->sctp_assoc == assoc_id) { + up(&connections_lock); + return con; + } + } + up(&connections_lock); + return NULL; +} + +static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) +{ + struct sockaddr_storage addr; + int error; + + if (!dlm_local_count) + return -1; + + error = dlm_nodeid_to_addr(nodeid, &addr); + if (error) + return error; + + if (dlm_local_addr[0]->ss_family == AF_INET) { + struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; + struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; + ret4->sin_addr.s_addr = in4->sin_addr.s_addr; + } else { + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; + struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; + memcpy(&ret6->sin6_addr, &in6->sin6_addr, + sizeof(in6->sin6_addr)); + } + + return 0; +} + +/* Data available on socket or listen socket received a connect */ +static void lowcomms_data_ready(struct sock *sk, int count_unused) +{ + struct connection *con = sock2con(sk); + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); +} + +static void lowcomms_write_space(struct sock *sk) +{ + struct connection *con = sock2con(sk); + + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); +} + +static inline void lowcomms_connect_sock(struct connection *con) +{ + if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) + queue_work(send_workqueue, &con->swork); +} + +static void lowcomms_state_change(struct sock *sk) +{ + if (sk->sk_state == TCP_ESTABLISHED) + lowcomms_write_space(sk); +} + +/* Make a socket active */ +static int add_sock(struct socket *sock, struct connection *con) +{ + con->sock = sock; + + /* Install a data_ready callback */ + con->sock->sk->sk_data_ready = lowcomms_data_ready; + con->sock->sk->sk_write_space = lowcomms_write_space; + con->sock->sk->sk_state_change = lowcomms_state_change; + con->sock->sk->sk_user_data = con; + return 0; +} + +/* Add the port number to an IPv6 or 4 sockaddr and return the address + length */ +static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, + int *addr_len) +{ + saddr->ss_family = dlm_local_addr[0]->ss_family; + if (saddr->ss_family == AF_INET) { + struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; + in4_addr->sin_port = cpu_to_be16(port); + *addr_len = sizeof(struct sockaddr_in); + memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); + } else { + struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; + in6_addr->sin6_port = cpu_to_be16(port); + *addr_len = sizeof(struct sockaddr_in6); + } +} + +/* Close a remote connection and tidy up */ +static void close_connection(struct connection *con, int and_other) +{ + mutex_lock(&con->sock_mutex); + + if (con->sock) { + sock_release(con->sock); + con->sock = NULL; + } + if (con->othercon && and_other) { + /* Will only re-enter once. */ + close_connection(con->othercon, 0); + } + if (con->rx_page) { + __free_page(con->rx_page); + con->rx_page = NULL; + } + con->retries = 0; + mutex_unlock(&con->sock_mutex); +} + +/* We only send shutdown messages to nodes that are not part of the cluster */ +static void sctp_send_shutdown(sctp_assoc_t associd) +{ + static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct msghdr outmessage; + struct cmsghdr *cmsg; + struct sctp_sndrcvinfo *sinfo; + int ret; + struct connection *con; + + con = nodeid2con(0,0); + BUG_ON(con == NULL); + + outmessage.msg_name = NULL; + outmessage.msg_namelen = 0; + outmessage.msg_control = outcmsg; + outmessage.msg_controllen = sizeof(outcmsg); + outmessage.msg_flags = MSG_EOR; + + cmsg = CMSG_FIRSTHDR(&outmessage); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + outmessage.msg_controllen = cmsg->cmsg_len; + sinfo = CMSG_DATA(cmsg); + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); + + sinfo->sinfo_flags |= MSG_EOF; + sinfo->sinfo_assoc_id = associd; + + ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); + + if (ret != 0) + log_print("send EOF to node failed: %d", ret); +} + +/* INIT failed but we don't know which node... + restart INIT on all pending nodes */ +static void sctp_init_failed(void) +{ + int i; + struct connection *con; + + down(&connections_lock); + for (i=1; i<=max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (!con) + continue; + con->sctp_assoc = 0; + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); + } + } + } + up(&connections_lock); +} + +/* Something happened to an association */ +static void process_sctp_notification(struct connection *con, + struct msghdr *msg, char *buf) +{ + union sctp_notification *sn = (union sctp_notification *)buf; + + if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { + switch (sn->sn_assoc_change.sac_state) { + + case SCTP_COMM_UP: + case SCTP_RESTART: + { + /* Check that the new node is in the lockspace */ + struct sctp_prim prim; + int nodeid; + int prim_len, ret; + int addr_len; + struct connection *new_con; + struct file *file; + sctp_peeloff_arg_t parg; + int parglen = sizeof(parg); + + /* + * We get this before any data for an association. + * We verify that the node is in the cluster and + * then peel off a socket for it. + */ + if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { + log_print("COMM_UP for invalid assoc ID %d", + (int)sn->sn_assoc_change.sac_assoc_id); + sctp_init_failed(); + return; + } + memset(&prim, 0, sizeof(struct sctp_prim)); + prim_len = sizeof(struct sctp_prim); + prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; + + ret = kernel_getsockopt(con->sock, + IPPROTO_SCTP, + SCTP_PRIMARY_ADDR, + (char*)&prim, + &prim_len); + if (ret < 0) { + log_print("getsockopt/sctp_primary_addr on " + "new assoc %d failed : %d", + (int)sn->sn_assoc_change.sac_assoc_id, + ret); + + /* Retry INIT later */ + new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); + if (new_con) + clear_bit(CF_CONNECT_PENDING, &con->flags); + return; + } + make_sockaddr(&prim.ssp_addr, 0, &addr_len); + if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { + int i; + unsigned char *b=(unsigned char *)&prim.ssp_addr; + log_print("reject connect from unknown addr"); + for (i=0; i<sizeof(struct sockaddr_storage);i++) + printk("%02x ", b[i]); + printk("\n"); + sctp_send_shutdown(prim.ssp_assoc_id); + return; + } + + new_con = nodeid2con(nodeid, GFP_KERNEL); + if (!new_con) + return; + + /* Peel off a new sock */ + parg.associd = sn->sn_assoc_change.sac_assoc_id; + ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, + SCTP_SOCKOPT_PEELOFF, + (void *)&parg, &parglen); + if (ret) { + log_print("Can't peel off a socket for " + "connection %d to node %d: err=%d\n", + parg.associd, nodeid, ret); + } + file = fget(parg.sd); + new_con->sock = SOCKET_I(file->f_dentry->d_inode); + add_sock(new_con->sock, new_con); + fput(file); + put_unused_fd(parg.sd); + + log_print("got new/restarted association %d nodeid %d", + (int)sn->sn_assoc_change.sac_assoc_id, nodeid); + + /* Send any pending writes */ + clear_bit(CF_CONNECT_PENDING, &new_con->flags); + clear_bit(CF_INIT_PENDING, &con->flags); + if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { + queue_work(send_workqueue, &new_con->swork); + } + if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) + queue_work(recv_workqueue, &new_con->rwork); + } + break; + + case SCTP_COMM_LOST: + case SCTP_SHUTDOWN_COMP: + { + con = assoc2con(sn->sn_assoc_change.sac_assoc_id); + if (con) { + con->sctp_assoc = 0; + } + } + break; + + /* We don't know which INIT failed, so clear the PENDING flags + * on them all. if assoc_id is zero then it will then try + * again */ + + case SCTP_CANT_STR_ASSOC: + { + log_print("Can't start SCTP association - retrying"); + sctp_init_failed(); + } + break; + + default: + log_print("unexpected SCTP assoc change id=%d state=%d", + (int)sn->sn_assoc_change.sac_assoc_id, + sn->sn_assoc_change.sac_state); + } + } +} + +/* Data received from remote end */ +static int receive_from_sock(struct connection *con) +{ + int ret = 0; + struct msghdr msg = {}; + struct kvec iov[2]; + unsigned len; + int r; + int call_again_soon = 0; + int nvec; + char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + + mutex_lock(&con->sock_mutex); + + if (con->sock == NULL) { + ret = -EAGAIN; + goto out_close; + } + + if (con->rx_page == NULL) { + /* + * This doesn't need to be atomic, but I think it should + * improve performance if it is. + */ + con->rx_page = alloc_page(GFP_ATOMIC); + if (con->rx_page == NULL) + goto out_resched; + cbuf_init(&con->cb, PAGE_CACHE_SIZE); + } + + /* Only SCTP needs these really */ + memset(&incmsg, 0, sizeof(incmsg)); + msg.msg_control = incmsg; + msg.msg_controllen = sizeof(incmsg); + + /* + * iov[0] is the bit of the circular buffer between the current end + * point (cb.base + cb.len) and the end of the buffer. + */ + iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); + iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); + iov[1].iov_len = 0; + nvec = 1; + + /* + * iov[1] is the bit of the circular buffer between the start of the + * buffer and the start of the currently used section (cb.base) + */ + if (cbuf_data(&con->cb) >= con->cb.base) { + iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); + iov[1].iov_len = con->cb.base; + iov[1].iov_base = page_address(con->rx_page); + nvec = 2; + } + len = iov[0].iov_len + iov[1].iov_len; + + r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, + MSG_DONTWAIT | MSG_NOSIGNAL); + if (ret <= 0) + goto out_close; + + /* Process SCTP notifications */ + if (msg.msg_flags & MSG_NOTIFICATION) { + msg.msg_control = incmsg; + msg.msg_controllen = sizeof(incmsg); + + process_sctp_notification(con, &msg, + page_address(con->rx_page) + con->cb.base); + mutex_unlock(&con->sock_mutex); + return 0; + } + BUG_ON(con->nodeid == 0); + + if (ret == len) + call_again_soon = 1; + cbuf_add(&con->cb, ret); + ret = dlm_process_incoming_buffer(con->nodeid, + page_address(con->rx_page), + con->cb.base, con->cb.len, + PAGE_CACHE_SIZE); + if (ret == -EBADMSG) { + log_print("lowcomms: addr=%p, base=%u, len=%u, " + "iov_len=%u, iov_base[0]=%p, read=%d", + page_address(con->rx_page), con->cb.base, con->cb.len, + len, iov[0].iov_base, r); + } + if (ret < 0) + goto out_close; + cbuf_eat(&con->cb, ret); + + if (cbuf_empty(&con->cb) && !call_again_soon) { + __free_page(con->rx_page); + con->rx_page = NULL; + } + + if (call_again_soon) + goto out_resched; + mutex_unlock(&con->sock_mutex); + return 0; + +out_resched: + if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) + queue_work(recv_workqueue, &con->rwork); + mutex_unlock(&con->sock_mutex); + return -EAGAIN; + +out_close: + mutex_unlock(&con->sock_mutex); + if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) { + close_connection(con, 0); + /* Reconnect when there is something to send */ + } + /* Don't return success if we really got EOF */ + if (ret == 0) + ret = -EAGAIN; + + return ret; +} + +/* Listening socket is busy, accept a connection */ +static int tcp_accept_from_sock(struct connection *con) +{ + int result; + struct sockaddr_storage peeraddr; + struct socket *newsock; + int len; + int nodeid; + struct connection *newcon; + struct connection *addcon; + + memset(&peeraddr, 0, sizeof(peeraddr)); + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &newsock); + if (result < 0) + return -ENOMEM; + + mutex_lock_nested(&con->sock_mutex, 0); + + result = -ENOTCONN; + if (con->sock == NULL) + goto accept_err; + + newsock->type = con->sock->type; + newsock->ops = con->sock->ops; + + result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); + if (result < 0) + goto accept_err; + + /* Get the connected socket's peer */ + memset(&peeraddr, 0, sizeof(peeraddr)); + if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, + &len, 2)) { + result = -ECONNABORTED; + goto accept_err; + } + + /* Get the new node's NODEID */ + make_sockaddr(&peeraddr, 0, &len); + if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { + log_print("connect from non cluster node"); + sock_release(newsock); + mutex_unlock(&con->sock_mutex); + return -1; + } + + log_print("got connection from %d", nodeid); + + /* Check to see if we already have a connection to this node. This + * could happen if the two nodes initiate a connection at roughly + * the same time and the connections cross on the wire. + * In this case we store the incoming one in "othercon" + */ + newcon = nodeid2con(nodeid, GFP_KERNEL); + if (!newcon) { + result = -ENOMEM; + goto accept_err; + } + mutex_lock_nested(&newcon->sock_mutex, 1); + if (newcon->sock) { + struct connection *othercon = newcon->othercon; + + if (!othercon) { + othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); + if (!othercon) { + log_print("failed to allocate incoming socket"); + mutex_unlock(&newcon->sock_mutex); + result = -ENOMEM; + goto accept_err; + } + othercon->nodeid = nodeid; + othercon->rx_action = receive_from_sock; + mutex_init(&othercon->sock_mutex); + INIT_WORK(&othercon->swork, process_send_sockets, othercon); + INIT_WORK(&othercon->rwork, process_recv_sockets, othercon); + set_bit(CF_IS_OTHERCON, &othercon->flags); + newcon->othercon = othercon; + } + othercon->sock = newsock; + newsock->sk->sk_user_data = othercon; + add_sock(newsock, othercon); + addcon = othercon; + } + else { + newsock->sk->sk_user_data = newcon; + newcon->rx_action = receive_from_sock; + add_sock(newsock, newcon); + addcon = newcon; + } + + mutex_unlock(&newcon->sock_mutex); + + /* + * Add it to the active queue in case we got data + * beween processing the accept adding the socket + * to the read_sockets list + */ + if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) + queue_work(recv_workqueue, &addcon->rwork); + mutex_unlock(&con->sock_mutex); + + return 0; + +accept_err: + mutex_unlock(&con->sock_mutex); + sock_release(newsock); + + if (result != -EAGAIN) + log_print("error accepting connection from node: %d", result); + return result; +} + +static void free_entry(struct writequeue_entry *e) +{ + __free_page(e->page); + kfree(e); +} + +/* Initiate an SCTP association. + This is a special case of send_to_sock() in that we don't yet have a + peeled-off socket for this association, so we use the listening socket + and add the primary IP address of the remote node. + */ +static void sctp_init_assoc(struct connection *con) +{ + struct sockaddr_storage rem_addr; + char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; + struct msghdr outmessage; + struct cmsghdr *cmsg; + struct sctp_sndrcvinfo *sinfo; + struct connection *base_con; + struct writequeue_entry *e; + int len, offset; + int ret; + int addrlen; + struct kvec iov[1]; + + if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) + return; + + if (con->retries++ > MAX_CONNECT_RETRIES) + return; + + log_print("Initiating association with node %d", con->nodeid); + + if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { + log_print("no address for nodeid %d", con->nodeid); + return; + } + base_con = nodeid2con(0, 0); + BUG_ON(base_con == NULL); + + make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); + + outmessage.msg_name = &rem_addr; + outmessage.msg_namelen = addrlen; + outmessage.msg_control = outcmsg; + outmessage.msg_controllen = sizeof(outcmsg); + outmessage.msg_flags = MSG_EOR; + + spin_lock(&con->writequeue_lock); + e = list_entry(con->writequeue.next, struct writequeue_entry, + list); + + BUG_ON((struct list_head *) e == &con->writequeue); + + len = e->len; + offset = e->offset; + spin_unlock(&con->writequeue_lock); + kmap(e->page); + + /* Send the first block off the write queue */ + iov[0].iov_base = page_address(e->page)+offset; + iov[0].iov_len = len; + + cmsg = CMSG_FIRSTHDR(&outmessage); + cmsg->cmsg_level = IPPROTO_SCTP; + cmsg->cmsg_type = SCTP_SNDRCV; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); + sinfo = CMSG_DATA(cmsg); + memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); + sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); + outmessage.msg_controllen = cmsg->cmsg_len; + + ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); + if (ret < 0) { + log_print("Send first packet to node %d failed: %d", + con->nodeid, ret); + + /* Try again later */ + clear_bit(CF_CONNECT_PENDING, &con->flags); + clear_bit(CF_INIT_PENDING, &con->flags); + } + else { + spin_lock(&con->writequeue_lock); + e->offset += ret; + e->len -= ret; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + kunmap(e->page); + free_entry(e); + } + spin_unlock(&con->writequeue_lock); + } +} + +/* Connect a new socket to its peer */ +static void tcp_connect_to_sock(struct connection *con) +{ + int result = -EHOSTUNREACH; + struct sockaddr_storage saddr; + int addr_len; + struct socket *sock; + + if (con->nodeid == 0) { + log_print("attempt to connect sock 0 foiled"); + return; + } + + mutex_lock(&con->sock_mutex); + if (con->retries++ > MAX_CONNECT_RETRIES) + goto out; + + /* Some odd races can cause double-connects, ignore them */ + if (con->sock) { + result = 0; + goto out; + } + + /* Create a socket to communicate with */ + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &sock); + if (result < 0) + goto out_err; + + memset(&saddr, 0, sizeof(saddr)); + if (dlm_nodeid_to_addr(con->nodeid, &saddr)) + goto out_err; + + sock->sk->sk_user_data = con; + con->rx_action = receive_from_sock; + con->connect_action = tcp_connect_to_sock; + add_sock(sock, con); + + make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); + + log_print("connecting to %d", con->nodeid); + result = + sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, + O_NONBLOCK); + if (result == -EINPROGRESS) + result = 0; + if (result == 0) + goto out; + +out_err: + if (con->sock) { + sock_release(con->sock); + con->sock = NULL; + } + /* + * Some errors are fatal and this list might need adjusting. For other + * errors we try again until the max number of retries is reached. + */ + if (result != -EHOSTUNREACH && result != -ENETUNREACH && + result != -ENETDOWN && result != EINVAL + && result != -EPROTONOSUPPORT) { + lowcomms_connect_sock(con); + result = 0; + } +out: + mutex_unlock(&con->sock_mutex); + return; +} + +static struct socket *tcp_create_listen_sock(struct connection *con, + struct sockaddr_storage *saddr) +{ + struct socket *sock = NULL; + int result = 0; + int one = 1; + int addr_len; + + if (dlm_local_addr[0]->ss_family == AF_INET) + addr_len = sizeof(struct sockaddr_in); + else + addr_len = sizeof(struct sockaddr_in6); + + /* Create a socket to communicate with */ + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, + IPPROTO_TCP, &sock); + if (result < 0) { + log_print("Can't create listening comms socket"); + goto create_out; + } + + result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&one, sizeof(one)); + + if (result < 0) { + log_print("Failed to set SO_REUSEADDR on socket: %d", result); + } + sock->sk->sk_user_data = con; + con->rx_action = tcp_accept_from_sock; + con->connect_action = tcp_connect_to_sock; + con->sock = sock; + + /* Bind to our port */ + make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); + result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); + if (result < 0) { + log_print("Can't bind to port %d", dlm_config.ci_tcp_port); + sock_release(sock); + sock = NULL; + con->sock = NULL; + goto create_out; + } + result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, + (char *)&one, sizeof(one)); + if (result < 0) { + log_print("Set keepalive failed: %d", result); + } + + result = sock->ops->listen(sock, 5); + if (result < 0) { + log_print("Can't listen on port %d", dlm_config.ci_tcp_port); + sock_release(sock); + sock = NULL; + goto create_out; + } + +create_out: + return sock; +} + +/* Get local addresses */ +static void init_local(void) +{ + struct sockaddr_storage sas, *addr; + int i; + + dlm_local_count = 0; + for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { + if (dlm_our_addr(&sas, i)) + break; + + addr = kmalloc(sizeof(*addr), GFP_KERNEL); + if (!addr) + break; + memcpy(addr, &sas, sizeof(*addr)); + dlm_local_addr[dlm_local_count++] = addr; + } +} + +/* Bind to an IP address. SCTP allows multiple address so it can do + multi-homing */ +static int add_sctp_bind_addr(struct connection *sctp_con, + struct sockaddr_storage *addr, + int addr_len, int num) +{ + int result = 0; + + if (num == 1) + result = kernel_bind(sctp_con->sock, + (struct sockaddr *) addr, + addr_len); + else + result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, + SCTP_SOCKOPT_BINDX_ADD, + (char *)addr, addr_len); + + if (result < 0) + log_print("Can't bind to port %d addr number %d", + dlm_config.ci_tcp_port, num); + + return result; +} + +/* Initialise SCTP socket and bind to all interfaces */ +static int sctp_listen_for_all(void) +{ + struct socket *sock = NULL; + struct sockaddr_storage localaddr; + struct sctp_event_subscribe subscribe; + int result = -EINVAL, num = 1, i, addr_len; + struct connection *con = nodeid2con(0, GFP_KERNEL); + int bufsize = NEEDED_RMEM; + + if (!con) + return -ENOMEM; + + log_print("Using SCTP for communications"); + + result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, + IPPROTO_SCTP, &sock); + if (result < 0) { + log_print("Can't create comms socket, check SCTP is loaded"); + goto out; + } + + /* Listen for events */ + memset(&subscribe, 0, sizeof(subscribe)); + subscribe.sctp_data_io_event = 1; + subscribe.sctp_association_event = 1; + subscribe.sctp_send_failure_event = 1; + subscribe.sctp_shutdown_event = 1; + subscribe.sctp_partial_delivery_event = 1; + + result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUF, + (char *)&bufsize, sizeof(bufsize)); + if (result) + log_print("Error increasing buffer space on socket %d", result); + + result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, + (char *)&subscribe, sizeof(subscribe)); + if (result < 0) { + log_print("Failed to set SCTP_EVENTS on socket: result=%d", + result); + goto create_delsock; + } + + /* Init con struct */ + sock->sk->sk_user_data = con; + con->sock = sock; + con->sock->sk->sk_data_ready = lowcomms_data_ready; + con->rx_action = receive_from_sock; + con->connect_action = sctp_init_assoc; + + /* Bind to all interfaces. */ + for (i = 0; i < dlm_local_count; i++) { + memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); + make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); + + result = add_sctp_bind_addr(con, &localaddr, addr_len, num); + if (result) + goto create_delsock; + ++num; + } + + result = sock->ops->listen(sock, 5); + if (result < 0) { + log_print("Can't set socket listening"); + goto create_delsock; + } + + return 0; + +create_delsock: + sock_release(sock); + con->sock = NULL; +out: + return result; +} + +static int tcp_listen_for_all(void) +{ + struct socket *sock = NULL; + struct connection *con = nodeid2con(0, GFP_KERNEL); + int result = -EINVAL; + + if (!con) + return -ENOMEM; + + /* We don't support multi-homed hosts */ + if (dlm_local_addr[1] != NULL) { + log_print("TCP protocol can't handle multi-homed hosts, " + "try SCTP"); + return -EINVAL; + } + + log_print("Using TCP for communications"); + + set_bit(CF_IS_OTHERCON, &con->flags); + + sock = tcp_create_listen_sock(con, dlm_local_addr[0]); + if (sock) { + add_sock(sock, con); + result = 0; + } + else { + result = -EADDRINUSE; + } + + return result; +} + + + +static struct writequeue_entry *new_writequeue_entry(struct connection *con, + gfp_t allocation) +{ + struct writequeue_entry *entry; + + entry = kmalloc(sizeof(struct writequeue_entry), allocation); + if (!entry) + return NULL; + + entry->page = alloc_page(allocation); + if (!entry->page) { + kfree(entry); + return NULL; + } + + entry->offset = 0; + entry->len = 0; + entry->end = 0; + entry->users = 0; + entry->con = con; + + return entry; +} + +void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) +{ + struct connection *con; + struct writequeue_entry *e; + int offset = 0; + int users = 0; + + con = nodeid2con(nodeid, allocation); + if (!con) + return NULL; + + spin_lock(&con->writequeue_lock); + e = list_entry(con->writequeue.prev, struct writequeue_entry, list); + if ((&e->list == &con->writequeue) || + (PAGE_CACHE_SIZE - e->end < len)) { + e = NULL; + } else { + offset = e->end; + e->end += len; + users = e->users++; + } + spin_unlock(&con->writequeue_lock); + + if (e) { + got_one: + if (users == 0) + kmap(e->page); + *ppc = page_address(e->page) + offset; + return e; + } + + e = new_writequeue_entry(con, allocation); + if (e) { + spin_lock(&con->writequeue_lock); + offset = e->end; + e->end += len; + users = e->users++; + list_add_tail(&e->list, &con->writequeue); + spin_unlock(&con->writequeue_lock); + goto got_one; + } + return NULL; +} + +void dlm_lowcomms_commit_buffer(void *mh) +{ + struct writequeue_entry *e = (struct writequeue_entry *)mh; + struct connection *con = e->con; + int users; + + spin_lock(&con->writequeue_lock); + users = --e->users; + if (users) + goto out; + e->len = e->end - e->offset; + kunmap(e->page); + spin_unlock(&con->writequeue_lock); + + if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { + queue_work(send_workqueue, &con->swork); + } + return; + +out: + spin_unlock(&con->writequeue_lock); + return; +} + +/* Send a message */ +static void send_to_sock(struct connection *con) +{ + int ret = 0; + ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); + const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + struct writequeue_entry *e; + int len, offset; + + mutex_lock(&con->sock_mutex); + if (con->sock == NULL) + goto out_connect; + + sendpage = con->sock->ops->sendpage; + + spin_lock(&con->writequeue_lock); + for (;;) { + e = list_entry(con->writequeue.next, struct writequeue_entry, + list); + if ((struct list_head *) e == &con->writequeue) + break; + + len = e->len; + offset = e->offset; + BUG_ON(len == 0 && e->users == 0); + spin_unlock(&con->writequeue_lock); + kmap(e->page); + + ret = 0; + if (len) { + ret = sendpage(con->sock, e->page, offset, len, + msg_flags); + if (ret == -EAGAIN || ret == 0) + goto out; + if (ret <= 0) + goto send_error; + } else { + /* Don't starve people filling buffers */ + cond_resched(); + } + + spin_lock(&con->writequeue_lock); + e->offset += ret; + e->len -= ret; + + if (e->len == 0 && e->users == 0) { + list_del(&e->list); + kunmap(e->page); + free_entry(e); + continue; + } + } + spin_unlock(&con->writequeue_lock); +out: + mutex_unlock(&con->sock_mutex); + return; + +send_error: + mutex_unlock(&con->sock_mutex); + close_connection(con, 0); + lowcomms_connect_sock(con); + return; + +out_connect: + mutex_unlock(&con->sock_mutex); + if (!test_bit(CF_INIT_PENDING, &con->flags)) + lowcomms_connect_sock(con); + return; +} + +static void clean_one_writequeue(struct connection *con) +{ + struct list_head *list; + struct list_head *temp; + + spin_lock(&con->writequeue_lock); + list_for_each_safe(list, temp, &con->writequeue) { + struct writequeue_entry *e = + list_entry(list, struct writequeue_entry, list); + list_del(&e->list); + free_entry(e); + } + spin_unlock(&con->writequeue_lock); +} + +/* Called from recovery when it knows that a node has + left the cluster */ +int dlm_lowcomms_close(int nodeid) +{ + struct connection *con; + + log_print("closing connection to node %d", nodeid); + con = nodeid2con(nodeid, 0); + if (con) { + clean_one_writequeue(con); + close_connection(con, 1); + } + return 0; +} + +/* Receive workqueue function */ +static void process_recv_sockets(void *data) +{ + struct connection *con = data; + int err; + + clear_bit(CF_READ_PENDING, &con->flags); + do { + err = con->rx_action(con); + } while (!err); +} + +/* Send workqueue function */ +static void process_send_sockets(void *data) +{ + struct connection *con = data; + if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { + con->connect_action(con); + } + clear_bit(CF_WRITE_PENDING, &con->flags); + send_to_sock(con); +} + + +/* Discard all entries on the write queues */ +static void clean_writequeues(void) +{ + int nodeid; + + for (nodeid = 1; nodeid <= max_nodeid; nodeid++) { + struct connection *con = __nodeid2con(nodeid, 0); + + if (con) + clean_one_writequeue(con); + } +} + +static void work_stop(void) +{ + destroy_workqueue(recv_workqueue); + destroy_workqueue(send_workqueue); +} + +static int work_start(void) +{ + int error; + recv_workqueue = create_singlethread_workqueue("dlm_recv"); + error = IS_ERR(recv_workqueue); + if (error) { + log_print("can't start dlm_recv %d", error); + return error; + } + + send_workqueue = create_singlethread_workqueue("dlm_send"); + error = IS_ERR(send_workqueue); + if (error) { + log_print("can't start dlm_send %d", error); + destroy_workqueue(recv_workqueue); + return error; + } + + return 0; +} + +void dlm_lowcomms_stop(void) +{ + int i; + struct connection *con; + + /* Set all the flags to prevent any + socket activity. + */ + down(&connections_lock); + for (i = 0; i <= max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con) + con->flags |= 0xFF; + } + up(&connections_lock); + + work_stop(); + + down(&connections_lock); + clean_writequeues(); + + for (i = 0; i <= max_nodeid; i++) { + con = __nodeid2con(i, 0); + if (con) { + close_connection(con, 1); + if (con->othercon) + kmem_cache_free(con_cache, con->othercon); + kmem_cache_free(con_cache, con); + } + } + max_nodeid = 0; + up(&connections_lock); + kmem_cache_destroy(con_cache); + idr_init(&connections_idr); +} + +int dlm_lowcomms_start(void) +{ + int error = -EINVAL; + struct connection *con; + + init_local(); + if (!dlm_local_count) { + error = -ENOTCONN; + log_print("no local IP address has been set"); + goto out; + } + + error = -ENOMEM; + con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), + __alignof__(struct connection), 0, + NULL, NULL); + if (!con_cache) + goto out; + + /* Set some sysctl minima */ + if (sysctl_rmem_max < NEEDED_RMEM) + sysctl_rmem_max = NEEDED_RMEM; + + /* Start listening */ + if (dlm_config.ci_protocol == 0) + error = tcp_listen_for_all(); + else + error = sctp_listen_for_all(); + if (error) + goto fail_unlisten; + + error = work_start(); + if (error) + goto fail_unlisten; + + return 0; + +fail_unlisten: + con = nodeid2con(0,0); + if (con) { + close_connection(con, 0); + kmem_cache_free(con_cache, con); + } + kmem_cache_destroy(con_cache); + +out: + return error; +} Index: linux-rhel51-quilt/fs/dlm/lowcomms.h =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/lowcomms.h 2007-06-08 09:01:02.000000000 -0500 +++ linux-rhel51-quilt/fs/dlm/lowcomms.h 2007-06-08 09:24:08.000000000 -0500 @@ -14,12 +14,10 @@ #ifndef __LOWCOMMS_DOT_H__ #define __LOWCOMMS_DOT_H__ -int dlm_lowcomms_init(void); -void dlm_lowcomms_exit(void); int dlm_lowcomms_start(void); void dlm_lowcomms_stop(void); int dlm_lowcomms_close(int nodeid); -void *dlm_lowcomms_get_buffer(int nodeid, int len, int allocation, char **ppc); +void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc); void dlm_lowcomms_commit_buffer(void *mh); #endif /* __LOWCOMMS_DOT_H__ */ Index: linux-rhel51-quilt/fs/dlm/main.c =================================================================== --- linux-rhel51-quilt.orig/fs/dlm/main.c 2007-06-08 09:01:02.000000000 -0500 +++ linux-rhel51-quilt/fs/dlm/main.c 2007-06-08 09:24:08.000000000 -0500 @@ -16,7 +16,6 @@ #include "lock.h" #include "user.h" #include "memory.h" -#include "lowcomms.h" #include "config.h" #ifdef CONFIG_DLM_DEBUG @@ -47,20 +46,14 @@ if (error) goto out_config; - error = dlm_lowcomms_init(); - if (error) - goto out_debug; - error = dlm_user_init(); if (error) - goto out_lowcomms; + goto out_debug; printk("DLM (built %s %s) installed\n", __DATE__, __TIME__); return 0; - out_lowcomms: - dlm_lowcomms_exit(); out_debug: dlm_unregister_debugfs(); out_config: @@ -76,7 +69,6 @@ static void __exit exit_dlm(void) { dlm_user_exit(); - dlm_lowcomms_exit(); dlm_config_exit(); dlm_memory_exit(); dlm_lockspace_exit();