mirror of
https://github.com/adulau/aha.git
synced 2024-12-27 11:16:11 +00:00
RDS: Add TCP transport to RDS
This code allows RDS to be tunneled over a TCP connection. RDMA operations are disabled when using TCP transport, but this frees RDS from the IB/RDMA stack dependency, and allows it to be used with standard Ethernet adapters, or in a VM. Signed-off-by: Andy Grover <andy.grover@oracle.com> Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
parent
7d6fd5e7e9
commit
70041088e3
8 changed files with 1469 additions and 0 deletions
|
@ -147,6 +147,18 @@ struct rds_info_socket {
|
|||
u_int64_t inum;
|
||||
} __attribute__((packed));
|
||||
|
||||
struct rds_info_tcp_socket {
|
||||
__be32 local_addr;
|
||||
__be16 local_port;
|
||||
__be32 peer_addr;
|
||||
__be16 peer_port;
|
||||
u_int64_t hdr_rem;
|
||||
u_int64_t data_rem;
|
||||
u_int32_t last_sent_nxt;
|
||||
u_int32_t last_expected_una;
|
||||
u_int32_t last_seen_una;
|
||||
} __attribute__((packed));
|
||||
|
||||
#define RDS_IB_GID_LEN 16
|
||||
struct rds_info_rdma_connection {
|
||||
__be32 src_addr;
|
||||
|
|
319
net/rds/tcp.c
Normal file
319
net/rds/tcp.c
Normal file
|
@ -0,0 +1,319 @@
|
|||
/*
|
||||
* Copyright (c) 2006 Oracle. All rights reserved.
|
||||
*
|
||||
* This software is available to you under a choice of one of two
|
||||
* licenses. You may choose to be licensed under the terms of the GNU
|
||||
* General Public License (GPL) Version 2, available from the file
|
||||
* COPYING in the main directory of this source tree, or the
|
||||
* OpenIB.org BSD license below:
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer in the documentation and/or other materials
|
||||
* provided with the distribution.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/in.h>
|
||||
#include <net/tcp.h>
|
||||
|
||||
#include "rds.h"
|
||||
#include "tcp.h"
|
||||
|
||||
/* only for info exporting */
|
||||
static DEFINE_SPINLOCK(rds_tcp_tc_list_lock);
|
||||
static LIST_HEAD(rds_tcp_tc_list);
|
||||
unsigned int rds_tcp_tc_count;
|
||||
|
||||
/* Track rds_tcp_connection structs so they can be cleaned up */
|
||||
static DEFINE_SPINLOCK(rds_tcp_conn_lock);
|
||||
static LIST_HEAD(rds_tcp_conn_list);
|
||||
|
||||
static struct kmem_cache *rds_tcp_conn_slab;
|
||||
|
||||
#define RDS_TCP_DEFAULT_BUFSIZE (128 * 1024)
|
||||
|
||||
/* doing it this way avoids calling tcp_sk() */
|
||||
void rds_tcp_nonagle(struct socket *sock)
|
||||
{
|
||||
mm_segment_t oldfs = get_fs();
|
||||
int val = 1;
|
||||
|
||||
set_fs(KERNEL_DS);
|
||||
sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY, (char __user *)&val,
|
||||
sizeof(val));
|
||||
set_fs(oldfs);
|
||||
}
|
||||
|
||||
void rds_tcp_tune(struct socket *sock)
|
||||
{
|
||||
struct sock *sk = sock->sk;
|
||||
|
||||
rds_tcp_nonagle(sock);
|
||||
|
||||
/*
|
||||
* We're trying to saturate gigabit with the default,
|
||||
* see svc_sock_setbufsize().
|
||||
*/
|
||||
lock_sock(sk);
|
||||
sk->sk_sndbuf = RDS_TCP_DEFAULT_BUFSIZE;
|
||||
sk->sk_rcvbuf = RDS_TCP_DEFAULT_BUFSIZE;
|
||||
sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
|
||||
release_sock(sk);
|
||||
}
|
||||
|
||||
u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc)
|
||||
{
|
||||
return tcp_sk(tc->t_sock->sk)->snd_nxt;
|
||||
}
|
||||
|
||||
u32 rds_tcp_snd_una(struct rds_tcp_connection *tc)
|
||||
{
|
||||
return tcp_sk(tc->t_sock->sk)->snd_una;
|
||||
}
|
||||
|
||||
void rds_tcp_restore_callbacks(struct socket *sock,
|
||||
struct rds_tcp_connection *tc)
|
||||
{
|
||||
rdsdebug("restoring sock %p callbacks from tc %p\n", sock, tc);
|
||||
write_lock_bh(&sock->sk->sk_callback_lock);
|
||||
|
||||
/* done under the callback_lock to serialize with write_space */
|
||||
spin_lock(&rds_tcp_tc_list_lock);
|
||||
list_del_init(&tc->t_list_item);
|
||||
rds_tcp_tc_count--;
|
||||
spin_unlock(&rds_tcp_tc_list_lock);
|
||||
|
||||
tc->t_sock = NULL;
|
||||
|
||||
sock->sk->sk_write_space = tc->t_orig_write_space;
|
||||
sock->sk->sk_data_ready = tc->t_orig_data_ready;
|
||||
sock->sk->sk_state_change = tc->t_orig_state_change;
|
||||
sock->sk->sk_user_data = NULL;
|
||||
|
||||
write_unlock_bh(&sock->sk->sk_callback_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* This is the only path that sets tc->t_sock. Send and receive trust that
|
||||
* it is set. The RDS_CONN_CONNECTED bit protects those paths from being
|
||||
* called while it isn't set.
|
||||
*/
|
||||
void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
|
||||
rdsdebug("setting sock %p callbacks to tc %p\n", sock, tc);
|
||||
write_lock_bh(&sock->sk->sk_callback_lock);
|
||||
|
||||
/* done under the callback_lock to serialize with write_space */
|
||||
spin_lock(&rds_tcp_tc_list_lock);
|
||||
list_add_tail(&tc->t_list_item, &rds_tcp_tc_list);
|
||||
rds_tcp_tc_count++;
|
||||
spin_unlock(&rds_tcp_tc_list_lock);
|
||||
|
||||
/* accepted sockets need our listen data ready undone */
|
||||
if (sock->sk->sk_data_ready == rds_tcp_listen_data_ready)
|
||||
sock->sk->sk_data_ready = sock->sk->sk_user_data;
|
||||
|
||||
tc->t_sock = sock;
|
||||
tc->conn = conn;
|
||||
tc->t_orig_data_ready = sock->sk->sk_data_ready;
|
||||
tc->t_orig_write_space = sock->sk->sk_write_space;
|
||||
tc->t_orig_state_change = sock->sk->sk_state_change;
|
||||
|
||||
sock->sk->sk_user_data = conn;
|
||||
sock->sk->sk_data_ready = rds_tcp_data_ready;
|
||||
sock->sk->sk_write_space = rds_tcp_write_space;
|
||||
sock->sk->sk_state_change = rds_tcp_state_change;
|
||||
|
||||
write_unlock_bh(&sock->sk->sk_callback_lock);
|
||||
}
|
||||
|
||||
static void rds_tcp_tc_info(struct socket *sock, unsigned int len,
|
||||
struct rds_info_iterator *iter,
|
||||
struct rds_info_lengths *lens)
|
||||
{
|
||||
struct rds_info_tcp_socket tsinfo;
|
||||
struct rds_tcp_connection *tc;
|
||||
unsigned long flags;
|
||||
struct sockaddr_in sin;
|
||||
int sinlen;
|
||||
|
||||
spin_lock_irqsave(&rds_tcp_tc_list_lock, flags);
|
||||
|
||||
if (len / sizeof(tsinfo) < rds_tcp_tc_count)
|
||||
goto out;
|
||||
|
||||
list_for_each_entry(tc, &rds_tcp_tc_list, t_list_item) {
|
||||
|
||||
sock->ops->getname(sock, (struct sockaddr *)&sin, &sinlen, 0);
|
||||
tsinfo.local_addr = sin.sin_addr.s_addr;
|
||||
tsinfo.local_port = sin.sin_port;
|
||||
sock->ops->getname(sock, (struct sockaddr *)&sin, &sinlen, 1);
|
||||
tsinfo.peer_addr = sin.sin_addr.s_addr;
|
||||
tsinfo.peer_port = sin.sin_port;
|
||||
|
||||
tsinfo.hdr_rem = tc->t_tinc_hdr_rem;
|
||||
tsinfo.data_rem = tc->t_tinc_data_rem;
|
||||
tsinfo.last_sent_nxt = tc->t_last_sent_nxt;
|
||||
tsinfo.last_expected_una = tc->t_last_expected_una;
|
||||
tsinfo.last_seen_una = tc->t_last_seen_una;
|
||||
|
||||
rds_info_copy(iter, &tsinfo, sizeof(tsinfo));
|
||||
}
|
||||
|
||||
out:
|
||||
lens->nr = rds_tcp_tc_count;
|
||||
lens->each = sizeof(tsinfo);
|
||||
|
||||
spin_unlock_irqrestore(&rds_tcp_tc_list_lock, flags);
|
||||
}
|
||||
|
||||
static int rds_tcp_laddr_check(__be32 addr)
|
||||
{
|
||||
if (inet_addr_type(&init_net, addr) == RTN_LOCAL)
|
||||
return 0;
|
||||
return -EADDRNOTAVAIL;
|
||||
}
|
||||
|
||||
static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
|
||||
{
|
||||
struct rds_tcp_connection *tc;
|
||||
|
||||
tc = kmem_cache_alloc(rds_tcp_conn_slab, gfp);
|
||||
if (tc == NULL)
|
||||
return -ENOMEM;
|
||||
|
||||
tc->t_sock = NULL;
|
||||
tc->t_tinc = NULL;
|
||||
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
|
||||
tc->t_tinc_data_rem = 0;
|
||||
|
||||
conn->c_transport_data = tc;
|
||||
|
||||
spin_lock_irq(&rds_tcp_conn_lock);
|
||||
list_add_tail(&tc->t_tcp_node, &rds_tcp_conn_list);
|
||||
spin_unlock_irq(&rds_tcp_conn_lock);
|
||||
|
||||
rdsdebug("alloced tc %p\n", conn->c_transport_data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void rds_tcp_conn_free(void *arg)
|
||||
{
|
||||
struct rds_tcp_connection *tc = arg;
|
||||
rdsdebug("freeing tc %p\n", tc);
|
||||
kmem_cache_free(rds_tcp_conn_slab, tc);
|
||||
}
|
||||
|
||||
static void rds_tcp_destroy_conns(void)
|
||||
{
|
||||
struct rds_tcp_connection *tc, *_tc;
|
||||
LIST_HEAD(tmp_list);
|
||||
|
||||
/* avoid calling conn_destroy with irqs off */
|
||||
spin_lock_irq(&rds_tcp_conn_lock);
|
||||
list_splice(&rds_tcp_conn_list, &tmp_list);
|
||||
INIT_LIST_HEAD(&rds_tcp_conn_list);
|
||||
spin_unlock_irq(&rds_tcp_conn_lock);
|
||||
|
||||
list_for_each_entry_safe(tc, _tc, &tmp_list, t_tcp_node) {
|
||||
if (tc->conn->c_passive)
|
||||
rds_conn_destroy(tc->conn->c_passive);
|
||||
rds_conn_destroy(tc->conn);
|
||||
}
|
||||
}
|
||||
|
||||
void rds_tcp_exit(void)
|
||||
{
|
||||
rds_info_deregister_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info);
|
||||
rds_tcp_listen_stop();
|
||||
rds_tcp_destroy_conns();
|
||||
rds_trans_unregister(&rds_tcp_transport);
|
||||
rds_tcp_recv_exit();
|
||||
kmem_cache_destroy(rds_tcp_conn_slab);
|
||||
}
|
||||
module_exit(rds_tcp_exit);
|
||||
|
||||
struct rds_transport rds_tcp_transport = {
|
||||
.laddr_check = rds_tcp_laddr_check,
|
||||
.xmit_prepare = rds_tcp_xmit_prepare,
|
||||
.xmit_complete = rds_tcp_xmit_complete,
|
||||
.xmit_cong_map = rds_tcp_xmit_cong_map,
|
||||
.xmit = rds_tcp_xmit,
|
||||
.recv = rds_tcp_recv,
|
||||
.conn_alloc = rds_tcp_conn_alloc,
|
||||
.conn_free = rds_tcp_conn_free,
|
||||
.conn_connect = rds_tcp_conn_connect,
|
||||
.conn_shutdown = rds_tcp_conn_shutdown,
|
||||
.inc_copy_to_user = rds_tcp_inc_copy_to_user,
|
||||
.inc_purge = rds_tcp_inc_purge,
|
||||
.inc_free = rds_tcp_inc_free,
|
||||
.stats_info_copy = rds_tcp_stats_info_copy,
|
||||
.exit = rds_tcp_exit,
|
||||
.t_owner = THIS_MODULE,
|
||||
.t_name = "tcp",
|
||||
.t_prefer_loopback = 1,
|
||||
};
|
||||
|
||||
int __init rds_tcp_init(void)
|
||||
{
|
||||
int ret;
|
||||
|
||||
rds_tcp_conn_slab = kmem_cache_create("rds_tcp_connection",
|
||||
sizeof(struct rds_tcp_connection),
|
||||
0, 0, NULL);
|
||||
if (rds_tcp_conn_slab == NULL) {
|
||||
ret = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = rds_tcp_recv_init();
|
||||
if (ret)
|
||||
goto out_slab;
|
||||
|
||||
ret = rds_trans_register(&rds_tcp_transport);
|
||||
if (ret)
|
||||
goto out_recv;
|
||||
|
||||
ret = rds_tcp_listen_init();
|
||||
if (ret)
|
||||
goto out_register;
|
||||
|
||||
rds_info_register_func(RDS_INFO_TCP_SOCKETS, rds_tcp_tc_info);
|
||||
|
||||
goto out;
|
||||
|
||||
out_register:
|
||||
rds_trans_unregister(&rds_tcp_transport);
|
||||
out_recv:
|
||||
rds_tcp_recv_exit();
|
||||
out_slab:
|
||||
kmem_cache_destroy(rds_tcp_conn_slab);
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
module_init(rds_tcp_init);
|
||||
|
||||
MODULE_AUTHOR("Oracle Corporation <rds-devel@oss.oracle.com>");
|
||||
MODULE_DESCRIPTION("RDS: TCP transport");
|
||||
MODULE_LICENSE("Dual BSD/GPL");
|
||||
|
93
net/rds/tcp.h
Normal file
93
net/rds/tcp.h
Normal file
|
@ -0,0 +1,93 @@
|
|||
#ifndef _RDS_TCP_H
|
||||
#define _RDS_TCP_H
|
||||
|
||||
#define RDS_TCP_PORT 16385
|
||||
|
||||
struct rds_tcp_incoming {
|
||||
struct rds_incoming ti_inc;
|
||||
struct sk_buff_head ti_skb_list;
|
||||
};
|
||||
|
||||
struct rds_tcp_connection {
|
||||
|
||||
struct list_head t_tcp_node;
|
||||
struct rds_connection *conn;
|
||||
struct socket *t_sock;
|
||||
void *t_orig_write_space;
|
||||
void *t_orig_data_ready;
|
||||
void *t_orig_state_change;
|
||||
|
||||
struct rds_tcp_incoming *t_tinc;
|
||||
size_t t_tinc_hdr_rem;
|
||||
size_t t_tinc_data_rem;
|
||||
|
||||
/* XXX error report? */
|
||||
struct work_struct t_conn_w;
|
||||
struct work_struct t_send_w;
|
||||
struct work_struct t_down_w;
|
||||
struct work_struct t_recv_w;
|
||||
|
||||
/* for info exporting only */
|
||||
struct list_head t_list_item;
|
||||
u32 t_last_sent_nxt;
|
||||
u32 t_last_expected_una;
|
||||
u32 t_last_seen_una;
|
||||
};
|
||||
|
||||
struct rds_tcp_statistics {
|
||||
uint64_t s_tcp_data_ready_calls;
|
||||
uint64_t s_tcp_write_space_calls;
|
||||
uint64_t s_tcp_sndbuf_full;
|
||||
uint64_t s_tcp_connect_raced;
|
||||
uint64_t s_tcp_listen_closed_stale;
|
||||
};
|
||||
|
||||
/* tcp.c */
|
||||
int __init rds_tcp_init(void);
|
||||
void rds_tcp_exit(void);
|
||||
void rds_tcp_tune(struct socket *sock);
|
||||
void rds_tcp_nonagle(struct socket *sock);
|
||||
void rds_tcp_set_callbacks(struct socket *sock, struct rds_connection *conn);
|
||||
void rds_tcp_restore_callbacks(struct socket *sock,
|
||||
struct rds_tcp_connection *tc);
|
||||
u32 rds_tcp_snd_nxt(struct rds_tcp_connection *tc);
|
||||
u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
|
||||
u64 rds_tcp_map_seq(struct rds_tcp_connection *tc, u32 seq);
|
||||
extern struct rds_transport rds_tcp_transport;
|
||||
|
||||
/* tcp_connect.c */
|
||||
int rds_tcp_conn_connect(struct rds_connection *conn);
|
||||
void rds_tcp_conn_shutdown(struct rds_connection *conn);
|
||||
void rds_tcp_state_change(struct sock *sk);
|
||||
|
||||
/* tcp_listen.c */
|
||||
int __init rds_tcp_listen_init(void);
|
||||
void rds_tcp_listen_stop(void);
|
||||
void rds_tcp_listen_data_ready(struct sock *sk, int bytes);
|
||||
|
||||
/* tcp_recv.c */
|
||||
int __init rds_tcp_recv_init(void);
|
||||
void rds_tcp_recv_exit(void);
|
||||
void rds_tcp_data_ready(struct sock *sk, int bytes);
|
||||
int rds_tcp_recv(struct rds_connection *conn);
|
||||
void rds_tcp_inc_purge(struct rds_incoming *inc);
|
||||
void rds_tcp_inc_free(struct rds_incoming *inc);
|
||||
int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
|
||||
size_t size);
|
||||
|
||||
/* tcp_send.c */
|
||||
void rds_tcp_xmit_prepare(struct rds_connection *conn);
|
||||
void rds_tcp_xmit_complete(struct rds_connection *conn);
|
||||
int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
|
||||
unsigned int hdr_off, unsigned int sg, unsigned int off);
|
||||
void rds_tcp_write_space(struct sock *sk);
|
||||
int rds_tcp_xmit_cong_map(struct rds_connection *conn,
|
||||
struct rds_cong_map *map, unsigned long offset);
|
||||
|
||||
/* tcp_stats.c */
|
||||
DECLARE_PER_CPU(struct rds_tcp_statistics, rds_tcp_stats);
|
||||
#define rds_tcp_stats_inc(member) rds_stats_inc_which(rds_tcp_stats, member)
|
||||
unsigned int rds_tcp_stats_info_copy(struct rds_info_iterator *iter,
|
||||
unsigned int avail);
|
||||
|
||||
#endif
|
153
net/rds/tcp_connect.c
Normal file
153
net/rds/tcp_connect.c
Normal file
|
@ -0,0 +1,153 @@
|
|||
/*
|
||||
* Copyright (c) 2006 Oracle. All rights reserved.
|
||||
*
|
||||
* This software is available to you under a choice of one of two
|
||||
* licenses. You may choose to be licensed under the terms of the GNU
|
||||
* General Public License (GPL) Version 2, available from the file
|
||||
* COPYING in the main directory of this source tree, or the
|
||||
* OpenIB.org BSD license below:
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer in the documentation and/or other materials
|
||||
* provided with the distribution.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/in.h>
|
||||
#include <net/tcp.h>
|
||||
|
||||
#include "rds.h"
|
||||
#include "tcp.h"
|
||||
|
||||
void rds_tcp_state_change(struct sock *sk)
|
||||
{
|
||||
void (*state_change)(struct sock *sk);
|
||||
struct rds_connection *conn;
|
||||
struct rds_tcp_connection *tc;
|
||||
|
||||
read_lock(&sk->sk_callback_lock);
|
||||
conn = sk->sk_user_data;
|
||||
if (conn == NULL) {
|
||||
state_change = sk->sk_state_change;
|
||||
goto out;
|
||||
}
|
||||
tc = conn->c_transport_data;
|
||||
state_change = tc->t_orig_state_change;
|
||||
|
||||
rdsdebug("sock %p state_change to %d\n", tc->t_sock, sk->sk_state);
|
||||
|
||||
switch(sk->sk_state) {
|
||||
/* ignore connecting sockets as they make progress */
|
||||
case TCP_SYN_SENT:
|
||||
case TCP_SYN_RECV:
|
||||
break;
|
||||
case TCP_ESTABLISHED:
|
||||
rds_connect_complete(conn);
|
||||
break;
|
||||
case TCP_CLOSE:
|
||||
rds_conn_drop(conn);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
out:
|
||||
read_unlock(&sk->sk_callback_lock);
|
||||
state_change(sk);
|
||||
}
|
||||
|
||||
int rds_tcp_conn_connect(struct rds_connection *conn)
|
||||
{
|
||||
struct socket *sock = NULL;
|
||||
struct sockaddr_in src, dest;
|
||||
int ret;
|
||||
|
||||
ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
rds_tcp_tune(sock);
|
||||
|
||||
src.sin_family = AF_INET;
|
||||
src.sin_addr.s_addr = (__force u32)conn->c_laddr;
|
||||
src.sin_port = (__force u16)htons(0);
|
||||
|
||||
ret = sock->ops->bind(sock, (struct sockaddr *)&src, sizeof(src));
|
||||
if (ret) {
|
||||
rdsdebug("bind failed with %d at address %u.%u.%u.%u\n",
|
||||
ret, NIPQUAD(conn->c_laddr));
|
||||
goto out;
|
||||
}
|
||||
|
||||
dest.sin_family = AF_INET;
|
||||
dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
|
||||
dest.sin_port = (__force u16)htons(RDS_TCP_PORT);
|
||||
|
||||
/*
|
||||
* once we call connect() we can start getting callbacks and they
|
||||
* own the socket
|
||||
*/
|
||||
rds_tcp_set_callbacks(sock, conn);
|
||||
ret = sock->ops->connect(sock, (struct sockaddr *)&dest, sizeof(dest),
|
||||
O_NONBLOCK);
|
||||
sock = NULL;
|
||||
|
||||
rdsdebug("connect to address %u.%u.%u.%u returned %d\n",
|
||||
NIPQUAD(conn->c_faddr), ret);
|
||||
if (ret == -EINPROGRESS)
|
||||
ret = 0;
|
||||
|
||||
out:
|
||||
if (sock)
|
||||
sock_release(sock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Before killing the tcp socket this needs to serialize with callbacks. The
|
||||
* caller has already grabbed the sending sem so we're serialized with other
|
||||
* senders.
|
||||
*
|
||||
* TCP calls the callbacks with the sock lock so we hold it while we reset the
|
||||
* callbacks to those set by TCP. Our callbacks won't execute again once we
|
||||
* hold the sock lock.
|
||||
*/
|
||||
void rds_tcp_conn_shutdown(struct rds_connection *conn)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
struct socket *sock = tc->t_sock;
|
||||
|
||||
rdsdebug("shutting down conn %p tc %p sock %p\n", conn, tc, sock);
|
||||
|
||||
if (sock) {
|
||||
sock->ops->shutdown(sock, RCV_SHUTDOWN | SEND_SHUTDOWN);
|
||||
lock_sock(sock->sk);
|
||||
rds_tcp_restore_callbacks(sock, tc); /* tc->tc_sock = NULL */
|
||||
|
||||
release_sock(sock->sk);
|
||||
sock_release(sock);
|
||||
};
|
||||
|
||||
if (tc->t_tinc) {
|
||||
rds_inc_put(&tc->t_tinc->ti_inc);
|
||||
tc->t_tinc = NULL;
|
||||
}
|
||||
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
|
||||
tc->t_tinc_data_rem = 0;
|
||||
}
|
199
net/rds/tcp_listen.c
Normal file
199
net/rds/tcp_listen.c
Normal file
|
@ -0,0 +1,199 @@
|
|||
/*
|
||||
* Copyright (c) 2006 Oracle. All rights reserved.
|
||||
*
|
||||
* This software is available to you under a choice of one of two
|
||||
* licenses. You may choose to be licensed under the terms of the GNU
|
||||
* General Public License (GPL) Version 2, available from the file
|
||||
* COPYING in the main directory of this source tree, or the
|
||||
* OpenIB.org BSD license below:
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer in the documentation and/or other materials
|
||||
* provided with the distribution.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/in.h>
|
||||
#include <net/tcp.h>
|
||||
|
||||
#include "rds.h"
|
||||
#include "tcp.h"
|
||||
|
||||
/*
|
||||
* cheesy, but simple..
|
||||
*/
|
||||
static void rds_tcp_accept_worker(struct work_struct *work);
|
||||
static DECLARE_WORK(rds_tcp_listen_work, rds_tcp_accept_worker);
|
||||
static struct socket *rds_tcp_listen_sock;
|
||||
|
||||
static int rds_tcp_accept_one(struct socket *sock)
|
||||
{
|
||||
struct socket *new_sock = NULL;
|
||||
struct rds_connection *conn;
|
||||
int ret;
|
||||
struct inet_sock *inet;
|
||||
|
||||
ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,
|
||||
sock->sk->sk_protocol, &new_sock);
|
||||
if (ret)
|
||||
goto out;
|
||||
|
||||
new_sock->type = sock->type;
|
||||
new_sock->ops = sock->ops;
|
||||
ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
rds_tcp_tune(new_sock);
|
||||
|
||||
inet = inet_sk(new_sock->sk);
|
||||
|
||||
rdsdebug("accepted tcp %u.%u.%u.%u:%u -> %u.%u.%u.%u:%u\n",
|
||||
NIPQUAD(inet->saddr), ntohs(inet->sport),
|
||||
NIPQUAD(inet->daddr), ntohs(inet->dport));
|
||||
|
||||
conn = rds_conn_create(inet->saddr, inet->daddr, &rds_tcp_transport,
|
||||
GFP_KERNEL);
|
||||
if (IS_ERR(conn)) {
|
||||
ret = PTR_ERR(conn);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/*
|
||||
* see the comment above rds_queue_delayed_reconnect()
|
||||
*/
|
||||
if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
|
||||
if (rds_conn_state(conn) == RDS_CONN_UP)
|
||||
rds_tcp_stats_inc(s_tcp_listen_closed_stale);
|
||||
else
|
||||
rds_tcp_stats_inc(s_tcp_connect_raced);
|
||||
rds_conn_drop(conn);
|
||||
ret = 0;
|
||||
goto out;
|
||||
}
|
||||
|
||||
rds_tcp_set_callbacks(new_sock, conn);
|
||||
rds_connect_complete(conn);
|
||||
new_sock = NULL;
|
||||
ret = 0;
|
||||
|
||||
out:
|
||||
if (new_sock)
|
||||
sock_release(new_sock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void rds_tcp_accept_worker(struct work_struct *work)
|
||||
{
|
||||
while (rds_tcp_accept_one(rds_tcp_listen_sock) == 0)
|
||||
cond_resched();
|
||||
}
|
||||
|
||||
void rds_tcp_listen_data_ready(struct sock *sk, int bytes)
|
||||
{
|
||||
void (*ready)(struct sock *sk, int bytes);
|
||||
|
||||
rdsdebug("listen data ready sk %p\n", sk);
|
||||
|
||||
read_lock(&sk->sk_callback_lock);
|
||||
ready = sk->sk_user_data;
|
||||
if (ready == NULL) { /* check for teardown race */
|
||||
ready = sk->sk_data_ready;
|
||||
goto out;
|
||||
}
|
||||
|
||||
/*
|
||||
* ->sk_data_ready is also called for a newly established child socket
|
||||
* before it has been accepted and the accepter has set up their
|
||||
* data_ready.. we only want to queue listen work for our listening
|
||||
* socket
|
||||
*/
|
||||
if (sk->sk_state == TCP_LISTEN)
|
||||
queue_work(rds_wq, &rds_tcp_listen_work);
|
||||
|
||||
out:
|
||||
read_unlock(&sk->sk_callback_lock);
|
||||
ready(sk, bytes);
|
||||
}
|
||||
|
||||
int __init rds_tcp_listen_init(void)
|
||||
{
|
||||
struct sockaddr_in sin;
|
||||
struct socket *sock = NULL;
|
||||
int ret;
|
||||
|
||||
ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
sock->sk->sk_reuse = 1;
|
||||
rds_tcp_nonagle(sock);
|
||||
|
||||
write_lock_bh(&sock->sk->sk_callback_lock);
|
||||
sock->sk->sk_user_data = sock->sk->sk_data_ready;
|
||||
sock->sk->sk_data_ready = rds_tcp_listen_data_ready;
|
||||
write_unlock_bh(&sock->sk->sk_callback_lock);
|
||||
|
||||
sin.sin_family = PF_INET,
|
||||
sin.sin_addr.s_addr = (__force u32)htonl(INADDR_ANY);
|
||||
sin.sin_port = (__force u16)htons(RDS_TCP_PORT);
|
||||
|
||||
ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
ret = sock->ops->listen(sock, 64);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
rds_tcp_listen_sock = sock;
|
||||
sock = NULL;
|
||||
out:
|
||||
if (sock)
|
||||
sock_release(sock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void rds_tcp_listen_stop(void)
|
||||
{
|
||||
struct socket *sock = rds_tcp_listen_sock;
|
||||
struct sock *sk;
|
||||
|
||||
if (sock == NULL)
|
||||
return;
|
||||
|
||||
sk = sock->sk;
|
||||
|
||||
/* serialize with and prevent further callbacks */
|
||||
lock_sock(sk);
|
||||
write_lock_bh(&sk->sk_callback_lock);
|
||||
if (sk->sk_user_data) {
|
||||
sk->sk_data_ready = sk->sk_user_data;
|
||||
sk->sk_user_data = NULL;
|
||||
}
|
||||
write_unlock_bh(&sk->sk_callback_lock);
|
||||
release_sock(sk);
|
||||
|
||||
/* wait for accepts to stop and close the socket */
|
||||
flush_workqueue(rds_wq);
|
||||
sock_release(sock);
|
||||
rds_tcp_listen_sock = NULL;
|
||||
}
|
356
net/rds/tcp_recv.c
Normal file
356
net/rds/tcp_recv.c
Normal file
|
@ -0,0 +1,356 @@
|
|||
/*
|
||||
* Copyright (c) 2006 Oracle. All rights reserved.
|
||||
*
|
||||
* This software is available to you under a choice of one of two
|
||||
* licenses. You may choose to be licensed under the terms of the GNU
|
||||
* General Public License (GPL) Version 2, available from the file
|
||||
* COPYING in the main directory of this source tree, or the
|
||||
* OpenIB.org BSD license below:
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer in the documentation and/or other materials
|
||||
* provided with the distribution.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <net/tcp.h>
|
||||
|
||||
#include "rds.h"
|
||||
#include "tcp.h"
|
||||
|
||||
static struct kmem_cache *rds_tcp_incoming_slab;
|
||||
|
||||
void rds_tcp_inc_purge(struct rds_incoming *inc)
|
||||
{
|
||||
struct rds_tcp_incoming *tinc;
|
||||
tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
|
||||
rdsdebug("purging tinc %p inc %p\n", tinc, inc);
|
||||
skb_queue_purge(&tinc->ti_skb_list);
|
||||
}
|
||||
|
||||
void rds_tcp_inc_free(struct rds_incoming *inc)
|
||||
{
|
||||
struct rds_tcp_incoming *tinc;
|
||||
tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
|
||||
rds_tcp_inc_purge(inc);
|
||||
rdsdebug("freeing tinc %p inc %p\n", tinc, inc);
|
||||
kmem_cache_free(rds_tcp_incoming_slab, tinc);
|
||||
}
|
||||
|
||||
/*
|
||||
* this is pretty lame, but, whatever.
|
||||
*/
|
||||
int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
|
||||
size_t size)
|
||||
{
|
||||
struct rds_tcp_incoming *tinc;
|
||||
struct iovec *iov, tmp;
|
||||
struct sk_buff *skb;
|
||||
unsigned long to_copy, skb_off;
|
||||
int ret = 0;
|
||||
|
||||
if (size == 0)
|
||||
goto out;
|
||||
|
||||
tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
|
||||
iov = first_iov;
|
||||
tmp = *iov;
|
||||
|
||||
skb_queue_walk(&tinc->ti_skb_list, skb) {
|
||||
skb_off = 0;
|
||||
while (skb_off < skb->len) {
|
||||
while (tmp.iov_len == 0) {
|
||||
iov++;
|
||||
tmp = *iov;
|
||||
}
|
||||
|
||||
to_copy = min(tmp.iov_len, size);
|
||||
to_copy = min(to_copy, skb->len - skb_off);
|
||||
|
||||
rdsdebug("ret %d size %zu skb %p skb_off %lu "
|
||||
"skblen %d iov_base %p iov_len %zu cpy %lu\n",
|
||||
ret, size, skb, skb_off, skb->len,
|
||||
tmp.iov_base, tmp.iov_len, to_copy);
|
||||
|
||||
/* modifies tmp as it copies */
|
||||
if (skb_copy_datagram_iovec(skb, skb_off, &tmp,
|
||||
to_copy)) {
|
||||
ret = -EFAULT;
|
||||
goto out;
|
||||
}
|
||||
|
||||
size -= to_copy;
|
||||
ret += to_copy;
|
||||
skb_off += to_copy;
|
||||
if (size == 0)
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
out:
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* We have a series of skbs that have fragmented pieces of the congestion
|
||||
* bitmap. They must add up to the exact size of the congestion bitmap. We
|
||||
* use the skb helpers to copy those into the pages that make up the in-memory
|
||||
* congestion bitmap for the remote address of this connection. We then tell
|
||||
* the congestion core that the bitmap has been changed so that it can wake up
|
||||
* sleepers.
|
||||
*
|
||||
* This is racing with sending paths which are using test_bit to see if the
|
||||
* bitmap indicates that their recipient is congested.
|
||||
*/
|
||||
|
||||
static void rds_tcp_cong_recv(struct rds_connection *conn,
|
||||
struct rds_tcp_incoming *tinc)
|
||||
{
|
||||
struct sk_buff *skb;
|
||||
unsigned int to_copy, skb_off;
|
||||
unsigned int map_off;
|
||||
unsigned int map_page;
|
||||
struct rds_cong_map *map;
|
||||
int ret;
|
||||
|
||||
/* catch completely corrupt packets */
|
||||
if (be32_to_cpu(tinc->ti_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
|
||||
return;
|
||||
|
||||
map_page = 0;
|
||||
map_off = 0;
|
||||
map = conn->c_fcong;
|
||||
|
||||
skb_queue_walk(&tinc->ti_skb_list, skb) {
|
||||
skb_off = 0;
|
||||
while (skb_off < skb->len) {
|
||||
to_copy = min_t(unsigned int, PAGE_SIZE - map_off,
|
||||
skb->len - skb_off);
|
||||
|
||||
BUG_ON(map_page >= RDS_CONG_MAP_PAGES);
|
||||
|
||||
/* only returns 0 or -error */
|
||||
ret = skb_copy_bits(skb, skb_off,
|
||||
(void *)map->m_page_addrs[map_page] + map_off,
|
||||
to_copy);
|
||||
BUG_ON(ret != 0);
|
||||
|
||||
skb_off += to_copy;
|
||||
map_off += to_copy;
|
||||
if (map_off == PAGE_SIZE) {
|
||||
map_off = 0;
|
||||
map_page++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rds_cong_map_updated(map, ~(u64) 0);
|
||||
}
|
||||
|
||||
struct rds_tcp_desc_arg {
|
||||
struct rds_connection *conn;
|
||||
gfp_t gfp;
|
||||
enum km_type km;
|
||||
};
|
||||
|
||||
static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
|
||||
unsigned int offset, size_t len)
|
||||
{
|
||||
struct rds_tcp_desc_arg *arg = desc->arg.data;
|
||||
struct rds_connection *conn = arg->conn;
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
struct rds_tcp_incoming *tinc = tc->t_tinc;
|
||||
struct sk_buff *clone;
|
||||
size_t left = len, to_copy;
|
||||
|
||||
rdsdebug("tcp data tc %p skb %p offset %u len %zu\n", tc, skb, offset,
|
||||
len);
|
||||
|
||||
/*
|
||||
* tcp_read_sock() interprets partial progress as an indication to stop
|
||||
* processing.
|
||||
*/
|
||||
while (left) {
|
||||
if (tinc == NULL) {
|
||||
tinc = kmem_cache_alloc(rds_tcp_incoming_slab,
|
||||
arg->gfp);
|
||||
if (tinc == NULL) {
|
||||
desc->error = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
tc->t_tinc = tinc;
|
||||
rdsdebug("alloced tinc %p\n", tinc);
|
||||
rds_inc_init(&tinc->ti_inc, conn, conn->c_faddr);
|
||||
/*
|
||||
* XXX * we might be able to use the __ variants when
|
||||
* we've already serialized at a higher level.
|
||||
*/
|
||||
skb_queue_head_init(&tinc->ti_skb_list);
|
||||
}
|
||||
|
||||
if (left && tc->t_tinc_hdr_rem) {
|
||||
to_copy = min(tc->t_tinc_hdr_rem, left);
|
||||
rdsdebug("copying %zu header from skb %p\n", to_copy,
|
||||
skb);
|
||||
skb_copy_bits(skb, offset,
|
||||
(char *)&tinc->ti_inc.i_hdr +
|
||||
sizeof(struct rds_header) -
|
||||
tc->t_tinc_hdr_rem,
|
||||
to_copy);
|
||||
tc->t_tinc_hdr_rem -= to_copy;
|
||||
left -= to_copy;
|
||||
offset += to_copy;
|
||||
|
||||
if (tc->t_tinc_hdr_rem == 0) {
|
||||
/* could be 0 for a 0 len message */
|
||||
tc->t_tinc_data_rem =
|
||||
be32_to_cpu(tinc->ti_inc.i_hdr.h_len);
|
||||
}
|
||||
}
|
||||
|
||||
if (left && tc->t_tinc_data_rem) {
|
||||
clone = skb_clone(skb, arg->gfp);
|
||||
if (clone == NULL) {
|
||||
desc->error = -ENOMEM;
|
||||
goto out;
|
||||
}
|
||||
|
||||
to_copy = min(tc->t_tinc_data_rem, left);
|
||||
pskb_pull(clone, offset);
|
||||
pskb_trim(clone, to_copy);
|
||||
skb_queue_tail(&tinc->ti_skb_list, clone);
|
||||
|
||||
rdsdebug("skb %p data %p len %d off %u to_copy %zu -> "
|
||||
"clone %p data %p len %d\n",
|
||||
skb, skb->data, skb->len, offset, to_copy,
|
||||
clone, clone->data, clone->len);
|
||||
|
||||
tc->t_tinc_data_rem -= to_copy;
|
||||
left -= to_copy;
|
||||
offset += to_copy;
|
||||
}
|
||||
|
||||
if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) {
|
||||
if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
|
||||
rds_tcp_cong_recv(conn, tinc);
|
||||
else
|
||||
rds_recv_incoming(conn, conn->c_faddr,
|
||||
conn->c_laddr, &tinc->ti_inc,
|
||||
arg->gfp, arg->km);
|
||||
|
||||
tc->t_tinc_hdr_rem = sizeof(struct rds_header);
|
||||
tc->t_tinc_data_rem = 0;
|
||||
tc->t_tinc = NULL;
|
||||
rds_inc_put(&tinc->ti_inc);
|
||||
tinc = NULL;
|
||||
}
|
||||
}
|
||||
out:
|
||||
rdsdebug("returning len %zu left %zu skb len %d rx queue depth %d\n",
|
||||
len, left, skb->len,
|
||||
skb_queue_len(&tc->t_sock->sk->sk_receive_queue));
|
||||
return len - left;
|
||||
}
|
||||
|
||||
/* the caller has to hold the sock lock */
|
||||
int rds_tcp_read_sock(struct rds_connection *conn, gfp_t gfp, enum km_type km)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
struct socket *sock = tc->t_sock;
|
||||
read_descriptor_t desc;
|
||||
struct rds_tcp_desc_arg arg;
|
||||
|
||||
/* It's like glib in the kernel! */
|
||||
arg.conn = conn;
|
||||
arg.gfp = gfp;
|
||||
arg.km = km;
|
||||
desc.arg.data = &arg;
|
||||
desc.error = 0;
|
||||
desc.count = 1; /* give more than one skb per call */
|
||||
|
||||
tcp_read_sock(sock->sk, &desc, rds_tcp_data_recv);
|
||||
rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp,
|
||||
desc.error);
|
||||
|
||||
return desc.error;
|
||||
}
|
||||
|
||||
/*
|
||||
* We hold the sock lock to serialize our rds_tcp_recv->tcp_read_sock from
|
||||
* data_ready.
|
||||
*
|
||||
* if we fail to allocate we're in trouble.. blindly wait some time before
|
||||
* trying again to see if the VM can free up something for us.
|
||||
*/
|
||||
int rds_tcp_recv(struct rds_connection *conn)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
struct socket *sock = tc->t_sock;
|
||||
int ret = 0;
|
||||
|
||||
rdsdebug("recv worker conn %p tc %p sock %p\n", conn, tc, sock);
|
||||
|
||||
lock_sock(sock->sk);
|
||||
ret = rds_tcp_read_sock(conn, GFP_KERNEL, KM_USER0);
|
||||
release_sock(sock->sk);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void rds_tcp_data_ready(struct sock *sk, int bytes)
|
||||
{
|
||||
void (*ready)(struct sock *sk, int bytes);
|
||||
struct rds_connection *conn;
|
||||
struct rds_tcp_connection *tc;
|
||||
|
||||
rdsdebug("data ready sk %p bytes %d\n", sk, bytes);
|
||||
|
||||
read_lock(&sk->sk_callback_lock);
|
||||
conn = sk->sk_user_data;
|
||||
if (conn == NULL) { /* check for teardown race */
|
||||
ready = sk->sk_data_ready;
|
||||
goto out;
|
||||
}
|
||||
|
||||
tc = conn->c_transport_data;
|
||||
ready = tc->t_orig_data_ready;
|
||||
rds_tcp_stats_inc(s_tcp_data_ready_calls);
|
||||
|
||||
if (rds_tcp_read_sock(conn, GFP_ATOMIC, KM_SOFTIRQ0) == -ENOMEM)
|
||||
queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
|
||||
out:
|
||||
read_unlock(&sk->sk_callback_lock);
|
||||
ready(sk, bytes);
|
||||
}
|
||||
|
||||
int __init rds_tcp_recv_init(void)
|
||||
{
|
||||
rds_tcp_incoming_slab = kmem_cache_create("rds_tcp_incoming",
|
||||
sizeof(struct rds_tcp_incoming),
|
||||
0, 0, NULL);
|
||||
if (rds_tcp_incoming_slab == NULL)
|
||||
return -ENOMEM;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void rds_tcp_recv_exit(void)
|
||||
{
|
||||
kmem_cache_destroy(rds_tcp_incoming_slab);
|
||||
}
|
263
net/rds/tcp_send.c
Normal file
263
net/rds/tcp_send.c
Normal file
|
@ -0,0 +1,263 @@
|
|||
/*
|
||||
* Copyright (c) 2006 Oracle. All rights reserved.
|
||||
*
|
||||
* This software is available to you under a choice of one of two
|
||||
* licenses. You may choose to be licensed under the terms of the GNU
|
||||
* General Public License (GPL) Version 2, available from the file
|
||||
* COPYING in the main directory of this source tree, or the
|
||||
* OpenIB.org BSD license below:
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer in the documentation and/or other materials
|
||||
* provided with the distribution.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*
|
||||
*/
|
||||
#include <linux/kernel.h>
|
||||
#include <linux/in.h>
|
||||
#include <net/tcp.h>
|
||||
|
||||
#include "rds.h"
|
||||
#include "tcp.h"
|
||||
|
||||
static void rds_tcp_cork(struct socket *sock, int val)
|
||||
{
|
||||
mm_segment_t oldfs;
|
||||
|
||||
oldfs = get_fs();
|
||||
set_fs(KERNEL_DS);
|
||||
sock->ops->setsockopt(sock, SOL_TCP, TCP_CORK, (char __user *)&val,
|
||||
sizeof(val));
|
||||
set_fs(oldfs);
|
||||
}
|
||||
|
||||
void rds_tcp_xmit_prepare(struct rds_connection *conn)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
|
||||
rds_tcp_cork(tc->t_sock, 1);
|
||||
}
|
||||
|
||||
void rds_tcp_xmit_complete(struct rds_connection *conn)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
|
||||
rds_tcp_cork(tc->t_sock, 0);
|
||||
}
|
||||
|
||||
/* the core send_sem serializes this with other xmit and shutdown */
|
||||
int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
|
||||
{
|
||||
struct kvec vec = {
|
||||
.iov_base = data,
|
||||
.iov_len = len,
|
||||
};
|
||||
struct msghdr msg = {
|
||||
.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
|
||||
};
|
||||
|
||||
return kernel_sendmsg(sock, &msg, &vec, 1, vec.iov_len);
|
||||
}
|
||||
|
||||
/* the core send_sem serializes this with other xmit and shutdown */
|
||||
int rds_tcp_xmit_cong_map(struct rds_connection *conn,
|
||||
struct rds_cong_map *map, unsigned long offset)
|
||||
{
|
||||
static struct rds_header rds_tcp_map_header = {
|
||||
.h_flags = RDS_FLAG_CONG_BITMAP,
|
||||
};
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
unsigned long i;
|
||||
int ret;
|
||||
int copied = 0;
|
||||
|
||||
/* Some problem claims cpu_to_be32(constant) isn't a constant. */
|
||||
rds_tcp_map_header.h_len = cpu_to_be32(RDS_CONG_MAP_BYTES);
|
||||
|
||||
if (offset < sizeof(struct rds_header)) {
|
||||
ret = rds_tcp_sendmsg(tc->t_sock,
|
||||
(void *)&rds_tcp_map_header + offset,
|
||||
sizeof(struct rds_header) - offset);
|
||||
if (ret <= 0)
|
||||
return ret;
|
||||
offset += ret;
|
||||
copied = ret;
|
||||
if (offset < sizeof(struct rds_header))
|
||||
return ret;
|
||||
}
|
||||
|
||||
offset -= sizeof(struct rds_header);
|
||||
i = offset / PAGE_SIZE;
|
||||
offset = offset % PAGE_SIZE;
|
||||
BUG_ON(i >= RDS_CONG_MAP_PAGES);
|
||||
|
||||
do {
|
||||
ret = tc->t_sock->ops->sendpage(tc->t_sock,
|
||||
virt_to_page(map->m_page_addrs[i]),
|
||||
offset, PAGE_SIZE - offset,
|
||||
MSG_DONTWAIT);
|
||||
if (ret <= 0)
|
||||
break;
|
||||
copied += ret;
|
||||
offset += ret;
|
||||
if (offset == PAGE_SIZE) {
|
||||
offset = 0;
|
||||
i++;
|
||||
}
|
||||
} while (i < RDS_CONG_MAP_PAGES);
|
||||
|
||||
return copied ? copied : ret;
|
||||
}
|
||||
|
||||
/* the core send_sem serializes this with other xmit and shutdown */
|
||||
int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
|
||||
unsigned int hdr_off, unsigned int sg, unsigned int off)
|
||||
{
|
||||
struct rds_tcp_connection *tc = conn->c_transport_data;
|
||||
int done = 0;
|
||||
int ret = 0;
|
||||
|
||||
if (hdr_off == 0) {
|
||||
/*
|
||||
* m_ack_seq is set to the sequence number of the last byte of
|
||||
* header and data. see rds_tcp_is_acked().
|
||||
*/
|
||||
tc->t_last_sent_nxt = rds_tcp_snd_nxt(tc);
|
||||
rm->m_ack_seq = tc->t_last_sent_nxt +
|
||||
sizeof(struct rds_header) +
|
||||
be32_to_cpu(rm->m_inc.i_hdr.h_len) - 1;
|
||||
smp_mb__before_clear_bit();
|
||||
set_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags);
|
||||
tc->t_last_expected_una = rm->m_ack_seq + 1;
|
||||
|
||||
rdsdebug("rm %p tcp nxt %u ack_seq %llu\n",
|
||||
rm, rds_tcp_snd_nxt(tc),
|
||||
(unsigned long long)rm->m_ack_seq);
|
||||
}
|
||||
|
||||
if (hdr_off < sizeof(struct rds_header)) {
|
||||
/* see rds_tcp_write_space() */
|
||||
set_bit(SOCK_NOSPACE, &tc->t_sock->sk->sk_socket->flags);
|
||||
|
||||
ret = rds_tcp_sendmsg(tc->t_sock,
|
||||
(void *)&rm->m_inc.i_hdr + hdr_off,
|
||||
sizeof(rm->m_inc.i_hdr) - hdr_off);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
done += ret;
|
||||
if (hdr_off + done != sizeof(struct rds_header))
|
||||
goto out;
|
||||
}
|
||||
|
||||
while (sg < rm->m_nents) {
|
||||
ret = tc->t_sock->ops->sendpage(tc->t_sock,
|
||||
sg_page(&rm->m_sg[sg]),
|
||||
rm->m_sg[sg].offset + off,
|
||||
rm->m_sg[sg].length - off,
|
||||
MSG_DONTWAIT|MSG_NOSIGNAL);
|
||||
rdsdebug("tcp sendpage %p:%u:%u ret %d\n", (void *)sg_page(&rm->m_sg[sg]),
|
||||
rm->m_sg[sg].offset + off, rm->m_sg[sg].length - off,
|
||||
ret);
|
||||
if (ret <= 0)
|
||||
break;
|
||||
|
||||
off += ret;
|
||||
done += ret;
|
||||
if (off == rm->m_sg[sg].length) {
|
||||
off = 0;
|
||||
sg++;
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
if (ret <= 0) {
|
||||
/* write_space will hit after EAGAIN, all else fatal */
|
||||
if (ret == -EAGAIN) {
|
||||
rds_tcp_stats_inc(s_tcp_sndbuf_full);
|
||||
ret = 0;
|
||||
} else {
|
||||
printk(KERN_WARNING "RDS/tcp: send to %u.%u.%u.%u "
|
||||
"returned %d, disconnecting and reconnecting\n",
|
||||
NIPQUAD(conn->c_faddr), ret);
|
||||
rds_conn_drop(conn);
|
||||
}
|
||||
}
|
||||
if (done == 0)
|
||||
done = ret;
|
||||
return done;
|
||||
}
|
||||
|
||||
/*
|
||||
* rm->m_ack_seq is set to the tcp sequence number that corresponds to the
|
||||
* last byte of the message, including the header. This means that the
|
||||
* entire message has been received if rm->m_ack_seq is "before" the next
|
||||
* unacked byte of the TCP sequence space. We have to do very careful
|
||||
* wrapping 32bit comparisons here.
|
||||
*/
|
||||
static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
|
||||
{
|
||||
if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
|
||||
return 0;
|
||||
return (__s32)((u32)rm->m_ack_seq - (u32)ack) < 0;
|
||||
}
|
||||
|
||||
void rds_tcp_write_space(struct sock *sk)
|
||||
{
|
||||
void (*write_space)(struct sock *sk);
|
||||
struct rds_connection *conn;
|
||||
struct rds_tcp_connection *tc;
|
||||
|
||||
read_lock(&sk->sk_callback_lock);
|
||||
conn = sk->sk_user_data;
|
||||
if (conn == NULL) {
|
||||
write_space = sk->sk_write_space;
|
||||
goto out;
|
||||
}
|
||||
|
||||
tc = conn->c_transport_data;
|
||||
rdsdebug("write_space for tc %p\n", tc);
|
||||
write_space = tc->t_orig_write_space;
|
||||
rds_tcp_stats_inc(s_tcp_write_space_calls);
|
||||
|
||||
rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc));
|
||||
tc->t_last_seen_una = rds_tcp_snd_una(tc);
|
||||
rds_send_drop_acked(conn, rds_tcp_snd_una(tc), rds_tcp_is_acked);
|
||||
|
||||
queue_delayed_work(rds_wq, &conn->c_send_w, 0);
|
||||
out:
|
||||
read_unlock(&sk->sk_callback_lock);
|
||||
|
||||
/*
|
||||
* write_space is only called when data leaves tcp's send queue if
|
||||
* SOCK_NOSPACE is set. We set SOCK_NOSPACE every time we put
|
||||
* data in tcp's send queue because we use write_space to parse the
|
||||
* sequence numbers and notice that rds messages have been fully
|
||||
* received.
|
||||
*
|
||||
* tcp's write_space clears SOCK_NOSPACE if the send queue has more
|
||||
* than a certain amount of space. So we need to set it again *after*
|
||||
* we call tcp's write_space or else we might only get called on the
|
||||
* first of a series of incoming tcp acks.
|
||||
*/
|
||||
write_space(sk);
|
||||
|
||||
if (sk->sk_socket)
|
||||
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
|
||||
}
|
74
net/rds/tcp_stats.c
Normal file
74
net/rds/tcp_stats.c
Normal file
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright (c) 2006 Oracle. All rights reserved.
|
||||
*
|
||||
* This software is available to you under a choice of one of two
|
||||
* licenses. You may choose to be licensed under the terms of the GNU
|
||||
* General Public License (GPL) Version 2, available from the file
|
||||
* COPYING in the main directory of this source tree, or the
|
||||
* OpenIB.org BSD license below:
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or
|
||||
* without modification, are permitted provided that the following
|
||||
* conditions are met:
|
||||
*
|
||||
* - Redistributions of source code must retain the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer.
|
||||
*
|
||||
* - Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following
|
||||
* disclaimer in the documentation and/or other materials
|
||||
* provided with the distribution.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*
|
||||
*/
|
||||
#include <linux/percpu.h>
|
||||
#include <linux/seq_file.h>
|
||||
#include <linux/proc_fs.h>
|
||||
|
||||
#include "rds.h"
|
||||
#include "tcp.h"
|
||||
|
||||
DEFINE_PER_CPU(struct rds_tcp_statistics, rds_tcp_stats)
|
||||
____cacheline_aligned;
|
||||
|
||||
static const char const *rds_tcp_stat_names[] = {
|
||||
"tcp_data_ready_calls",
|
||||
"tcp_write_space_calls",
|
||||
"tcp_sndbuf_full",
|
||||
"tcp_connect_raced",
|
||||
"tcp_listen_closed_stale",
|
||||
};
|
||||
|
||||
unsigned int rds_tcp_stats_info_copy(struct rds_info_iterator *iter,
|
||||
unsigned int avail)
|
||||
{
|
||||
struct rds_tcp_statistics stats = {0, };
|
||||
uint64_t *src;
|
||||
uint64_t *sum;
|
||||
size_t i;
|
||||
int cpu;
|
||||
|
||||
if (avail < ARRAY_SIZE(rds_tcp_stat_names))
|
||||
goto out;
|
||||
|
||||
for_each_online_cpu(cpu) {
|
||||
src = (uint64_t *)&(per_cpu(rds_tcp_stats, cpu));
|
||||
sum = (uint64_t *)&stats;
|
||||
for (i = 0; i < sizeof(stats) / sizeof(uint64_t); i++)
|
||||
*(sum++) += *(src++);
|
||||
}
|
||||
|
||||
rds_stats_info_copy(iter, (uint64_t *)&stats, rds_tcp_stat_names,
|
||||
ARRAY_SIZE(rds_tcp_stat_names));
|
||||
out:
|
||||
return ARRAY_SIZE(rds_tcp_stat_names);
|
||||
}
|
Loading…
Reference in a new issue