diff options
| author | Timo Dritschler <timo.dritschler@kit.edu> | 2014-12-10 16:23:49 +0100 | 
|---|---|---|
| committer | Timo Dritschler <timo.dritschler@kit.edu> | 2014-12-10 16:23:49 +0100 | 
| commit | 8d50558755b93f26108313e40e92a8ae457864ab (patch) | |
| tree | 7190a92e2bc2f13e3b20fc85caea10f349cfee37 | |
| parent | 9f0b6da7cf20f085d2729e5433f85ffa60a6fd94 (diff) | |
| parent | 0dc8c19937b52dfb793672226183697b6987b9fe (diff) | |
Merge pull request #14 from ufo-kit/communicationHandling
Release Version 2 (0.2.0)
Added kiro_client_ping_server to KIRO client
| -rw-r--r-- | CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/kiro-client.c | 274 | ||||
| -rw-r--r-- | src/kiro-client.h | 13 | ||||
| -rw-r--r-- | src/kiro-rdma.h | 12 | ||||
| -rw-r--r-- | src/kiro-server.c | 163 | ||||
| -rw-r--r-- | test/test-client-latency.c | 32 | 
6 files changed, 402 insertions, 96 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index a2491de..31c74f9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,9 +5,9 @@ set(CMAKE_INCLUDE_CURRENT_DIR TRUE)  set(TARNAME "kiro")  set(LIBKIRO_VERSION_MAJOR "0") -set(LIBKIRO_VERSION_MINOR "1") +set(LIBKIRO_VERSION_MINOR "2")  set(LIBKIRO_VERSION_PATCH "0") -set(LIBKIRO_VERSION_RELEASE "1") +set(LIBKIRO_VERSION_RELEASE "2")  set(LIBKIRO_VERSION_STRING "${LIBKIRO_VERSION_MAJOR}.${LIBKIRO_VERSION_MINOR}.${LIBKIRO_VERSION_PATCH}")  set(VERSION "${LIBKIRO_VERSION_STRING}")  set(LIBKIRO_DESCRIPTION "Small InfiniBand communication Server and Client") diff --git a/src/kiro-client.c b/src/kiro-client.c index 714a003..6e140b5 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -56,7 +56,8 @@ struct _KiroClientPrivate {      gboolean                    close_signal; // Flag used to signal event listening to stop for connection tear-down      GMainLoop                   *main_loop;   // Main loop of the server for event polling and handling -    GIOChannel                  *g_io_ec;     // GLib IO Channel encapsulation for the connection manager event channel +    GIOChannel                  *conn_ec;     // GLib IO Channel encapsulation for the connection manager event channel +    GIOChannel                  *rdma_ec;     // GLib IO Channel encapsulation for the communication event channel      GThread                     *main_thread; // Main KIRO client thread  }; @@ -64,6 +65,11 @@ struct _KiroClientPrivate {  G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT); +// Temporary storage and lock for PING timing +G_LOCK_DEFINE (ping_time); +volatile struct timeval ping_time; + +  KiroClient *  kiro_client_new (void)  { @@ -89,9 +95,10 @@ kiro_client_init (KiroClient *self)  {      KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self);      memset (priv, 0, sizeof (&priv)); -      //Hack to make the 'unused function' from the kiro-rdma include go away...      kiro_attach_qp (NULL); +    ping_time.tv_sec = -1; +    ping_time.tv_usec = -1;  } @@ -147,6 +154,87 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)  } + +static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ +    // Right now, we don't need 'source' and 'condition' +    // Tell the compiler to ignore them by (void)-ing them +    (void) source; +    //(void) condition; +    g_debug ("Message condidition: %i", condition); + +    KiroClientPrivate *priv = (KiroClientPrivate *)data; +    struct ibv_wc wc; + +    if (ibv_poll_cq (priv->conn->recv_cq, 1, &wc) < 0) { +        g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); +        return FALSE; +    } +    void *cq_ctx; +    struct ibv_cq *cq; +    int err = ibv_get_cq_event (priv->conn->recv_cq_channel, &cq, &cq_ctx); +    if (!err) +        ibv_ack_cq_events (cq, 1); + +    struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; +    guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; +    g_debug ("Received a message from the Server of type: %u", type); + +    if (type == KIRO_ACK_RDMA) { +        g_debug ("Got RDMI Access information from Server"); +        ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); +        g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); +        ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); + +        if (!ctx->rdma_mr) { +            //TODO: Connection teardown in an event handler routine? Not a good +            //idea... +            g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); +            rdma_disconnect (priv->conn); +            kiro_destroy_connection_context (&ctx); +            rdma_destroy_ep (priv->conn); +            return FALSE; +        } +    } +    if (type == KIRO_PONG) { +        G_LOCK (ping_time); +        struct timeval local_time; +        gettimeofday (&local_time, NULL); + +        if (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) { +            g_debug ("Received PONG message from server"); +            ping_time.tv_sec = local_time.tv_sec; +            ping_time.tv_usec = local_time.tv_usec; +        } +        else { +            g_debug ("Received unexpected PONG message from server"); +        } + +        G_UNLOCK (ping_time); +    } + +    //Post a generic receive in order to stay responsive to any messages from +    //the server +    if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { +        //TODO: Connection teardown in an event handler routine? Not a good +        //idea... +        g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); +        kiro_destroy_connection_context (&ctx); +        rdma_destroy_ep (priv->conn); +        return FALSE; +    } + +    // make sure the next incoming work completion causes an event on the +    // receive completion channel. We will poll() the channels file descriptor +    // for this in the kiro client main loop. +    ibv_req_notify_cq (priv->conn->recv_cq, 0); + +    g_debug ("Finished RDMA event handling"); +    return TRUE; +} + +  gpointer  start_client_main_loop (gpointer data)  { @@ -210,64 +298,52 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port)      if (!ctx->cf_mr_recv || !ctx->cf_mr_send) {          g_critical ("Failed to register control message memory (Out of memory?)"); -        kiro_destroy_connection_context (&ctx); -        rdma_destroy_ep (priv->conn); -        return -1; +        goto fail;      }      ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg);      priv->conn->context = ctx; +    //Post an preemtive receive for the servers welcome message      if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) {          g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno)); -        kiro_destroy_connection_context (&ctx); -        rdma_destroy_ep (priv->conn); -        return -1; +        goto fail;      }      if (rdma_connect (priv->conn, NULL)) {          g_critical ("Failed to establish connection to the server: %s", strerror (errno)); -        kiro_destroy_connection_context (&ctx); -        rdma_destroy_ep (priv->conn); -        return -1; +        goto fail;      } -    g_message ("Connection to server established"); -    priv->ec = priv->conn->channel; //For easy access -    struct ibv_wc wc; - -    if (rdma_get_recv_comp (priv->conn, &wc) < 0) { -        g_critical ("Failure waiting for POST from server: %s", strerror (errno)); -        rdma_disconnect (priv->conn); -        kiro_destroy_connection_context (&ctx); -        rdma_destroy_ep (priv->conn); -        return -1; +    g_message ("Connection to server established. Waiting for response."); +    ibv_req_notify_cq (priv->conn->recv_cq, 0); // Make the respective Queue push events onto the channel +    if (!process_rdma_event (NULL, 0, (gpointer)priv)) { +        g_critical ("No RDMA access information received from the server. Failed to connect."); +        goto fail;      } -    g_debug ("Got RDMI Access information from Server"); -    ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); -    g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); -    ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); - -    if (!ctx->rdma_mr) { -        g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); -        rdma_disconnect (priv->conn); -        kiro_destroy_connection_context (&ctx); -        rdma_destroy_ep (priv->conn); -        return -1; -    } +    g_message ("Connected to %s:%s", address, port); +    priv->ec = priv->conn->channel; //For easy access      priv->main_loop = g_main_loop_new (NULL, FALSE); -    priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); -    g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); +    priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); +    priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); +    g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv); +    g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)priv);      priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop);      // We gave control to the main_loop (with add_watch) and don't need our ref      // any longer -    g_io_channel_unref (priv->g_io_ec); +    g_io_channel_unref (priv->conn_ec); +    g_io_channel_unref (priv->rdma_ec); -    g_message ("Connected to %s:%s", address, port);      return 0; + +fail: +    kiro_destroy_connection_context (&ctx); +    rdma_destroy_ep (priv->conn); +    priv->conn = NULL; +    return -1;  } @@ -309,11 +385,119 @@ kiro_client_sync (KiroClient *self)      }  fail: -    kiro_destroy_connection (&(priv->conn));  +    kiro_destroy_connection (&(priv->conn));      return -1;  } +gboolean +ping_timeout (gpointer data) { + +    //Not needed. Void it to prevent 'unused variable' warning +    (void) data; + +    G_LOCK (ping_time); + +    // Maybe the server did answer while dispatching the timeout? +    if (ping_time.tv_sec != 0 || ping_time.tv_usec != 0) { +        goto done; +    } + +    ping_time.tv_usec = -1; +    ping_time.tv_sec = -1; + + +done: +    G_UNLOCK (ping_time); + +    // Return FALSE to automtically stop the timeout from reoccuring +    return FALSE; +} + + +gint +kiro_client_ping_server (KiroClient *self) +{ +    // Will be returned. -1 for error. +    gint t_usec = 0; + +    KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); +    if (!priv->conn) { +        g_warning ("Client not connected"); +        return -1; +    } + +    struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + +    struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *)(ctx->cf_mr_send->mem); +    msg->msg_type = KIRO_PING; + +    G_LOCK (ping_time); +    ping_time.tv_sec = 0; +    ping_time.tv_usec = 0; +    struct timeval local_time; +    gettimeofday (&local_time, NULL); + +    if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { +        g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno)); +        t_usec = -1; +        goto end; +    } +    G_UNLOCK (ping_time); + +    struct ibv_wc wc; +    if (rdma_get_send_comp (priv->conn, &wc) < 0) { +        g_warning ("Failure during PING send: %s", strerror (errno)); +        t_usec = -1; +        goto end; +    } + +    // Set a two-second timeout for the ping +    guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL); + +    //Wait for ping response +    while (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) {}; + + +    G_LOCK (ping_time); +    // No response from the server. Timeout kicked in +    // (Note: The timeout callback has already deregistered itself. We don't +    // need to do that here again) +    if (ping_time.tv_sec == -1 && ping_time.tv_usec == -1) { +        g_message ("PING timed out."); +        G_UNLOCK (ping_time); +        t_usec = -1; +        goto end; +    } + +    // Remove the timeout +    GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout); +    if (timeout_source) { +        g_source_destroy (timeout_source); +    } + +    gint secs = ping_time.tv_sec - local_time.tv_sec; + +    // tv_usecs wraps back to 0 at 1000000us (1s). +    // This might cause our calculation to produce negative numbers when time > 1s. +    for (int i = 0; i < secs; i++) { +        ping_time.tv_usec += 1000 * 1000; +    } +    t_usec = ping_time.tv_usec - local_time.tv_usec; +    gint millis = (gint)(t_usec/1000.); +    G_UNLOCK (ping_time); + +    g_debug ("Server responded to PING in: %is, %ims, %ius", secs, millis, t_usec); + +end: +    G_LOCK (ping_time); +    ping_time.tv_sec = -1; +    ping_time.tv_usec = -1; +    G_UNLOCK (ping_time); +    return t_usec; +} + +  void *  kiro_client_get_memory (KiroClient *self)  { @@ -331,7 +515,7 @@ kiro_client_get_memory (KiroClient *self)  } -size_t  +size_t  kiro_client_get_memory_size (KiroClient *self)  {      KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); @@ -375,19 +559,23 @@ kiro_client_disconnect (KiroClient *self)      // We don't need the connection management IO channel container any more.      // Unref and thus free it. -    g_io_channel_unref (priv->g_io_ec); -    priv->g_io_ec = NULL; +    g_io_channel_unref (priv->conn_ec); +    priv->conn_ec = NULL; + +    // The same goes for the cp channels +    g_io_channel_unref (priv->rdma_ec); +    priv->rdma_ec = NULL;      priv->close_signal = FALSE;      //kiro_destroy_connection does not free RDMA memory. Therefore, we need to -    //cache the memory pointer and free the memory afterwards manually  +    //cache the memory pointer and free the memory afterwards manually      struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->conn->context);      void *rdma_mem = ctx->rdma_mr->mem;      kiro_destroy_connection (&(priv->conn));      free (rdma_mem); -    // priv->ec is just an easy-access pointer. Don't free it. Just NULL it  +    // priv->ec is just an easy-access pointer. Don't free it. Just NULL it      priv->ec = NULL;      g_message ("Client disconnected from server");  } diff --git a/src/kiro-client.h b/src/kiro-client.h index 9c6036d..3be2621 100644 --- a/src/kiro-client.h +++ b/src/kiro-client.h @@ -160,6 +160,19 @@ void        kiro_client_disconnect             (KiroClient *client);  int         kiro_client_sync                (KiroClient *client);  /** + * kiro_client_ping_server - Sends a PING to the server + * @client: (transfer none): The #KiroServer to send the PING from + * Returns: + *   A #guint telling the time (in microseconds) how long it took for the + *   connected #KiroServer to reply + * Description: + *   Sends a PING package to the connected #KiroServer and waits for a PONG + *   package from that server. The time between sending the PING and receiving + *   the PONG (in microseconds) is measured and returned by this function. + */ +gint        kiro_client_ping_server         (KiroClient *client); + +/**   * kiro_client_get_memory - Return a pointer to the current client memory   * @client: (transfer none): The #KiroClient to get the memory from   * Returns: (transfer none): diff --git a/src/kiro-rdma.h b/src/kiro-rdma.h index af502ec..5b4895f 100644 --- a/src/kiro-rdma.h +++ b/src/kiro-rdma.h @@ -19,6 +19,7 @@  #include <stdio.h>  #include <stdlib.h>  #include <unistd.h> +#include <sys/time.h>  #ifndef __KIRO_RDMA_H__  #define __KIRO_RDMA_H__ @@ -36,6 +37,8 @@ struct kiro_connection_context {      struct ibv_mr           peer_mr;                // RDMA Memory Region Information of the peer +    void                    *container;             // Make the connection aware of its container (if any) +      enum {          KIRO_IDLE,          KIRO_MRI_REQUESTED,                         // Memory Region Information Requested @@ -51,11 +54,12 @@ struct kiro_ctrl_msg {      enum {          KIRO_REQ_RDMA,                              // Requesting RDMA Access to/from the peer          KIRO_ACK_RDMA,                              // acknowledge RDMA Request and provide Memory Region Information -        KIRO_REJ_RDMA                               // RDMA Request rejected :(  (peer_mri will be invalid) +        KIRO_REJ_RDMA,                              // RDMA Request rejected :(  (peer_mri will be invalid) +        KIRO_PING,                                  // PING Message +        KIRO_PONG                                   // PONG Message (PING reply)      } msg_type;      struct ibv_mr peer_mri; -  }; @@ -85,8 +89,8 @@ kiro_attach_qp (struct rdma_cm_id *id)      qp_attr.send_cq = id->send_cq;      qp_attr.recv_cq = id->recv_cq;      qp_attr.qp_type = IBV_QPT_RC; -    qp_attr.cap.max_send_wr = 1; -    qp_attr.cap.max_recv_wr = 1; +    qp_attr.cap.max_send_wr = 10; +    qp_attr.cap.max_recv_wr = 10;      qp_attr.cap.max_send_sge = 1;      qp_attr.cap.max_recv_sge = 1;      return rdma_create_qp (id, id->pd, &qp_attr); diff --git a/src/kiro-server.c b/src/kiro-server.c index bedba95..ff6c0f8 100644 --- a/src/kiro-server.c +++ b/src/kiro-server.c @@ -59,7 +59,7 @@ struct _KiroServerPrivate {      gboolean                    close_signal;    // Flag used to signal event listening to stop for server shutdown      GMainLoop                   *main_loop;      // Main loop of the server for event polling and handling -    GIOChannel                  *g_io_ec;        // GLib IO Channel encapsulation for the connection manager event channel +    GIOChannel                  *conn_ec;        // GLib IO Channel encapsulation for the connection manager event channel      GThread                     *main_thread;    // Main KIRO server thread  }; @@ -67,6 +67,15 @@ struct _KiroServerPrivate {  G_DEFINE_TYPE (KiroServer, kiro_server, G_TYPE_OBJECT); +struct kiro_client_connection { + +    guint                       id;              // Client identification (Easy access) +    GIOChannel                  *rcv_ec;         // GLib IO Channel encapsulation for receive completions for the client +    guint                       source_id;       // ID of the source created by g_io_add_watch, needed to remove it again +    struct rdma_cm_id           *conn;           // Connection Manager ID of the client +}; + +  KiroServer *  kiro_server_new (void)  { @@ -213,6 +222,65 @@ welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)  static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ +    // Right now, we don't need 'source' and 'condition' +    // Tell the compiler to ignore them by (void)-ing them +    (void) source; +    //(void) condition; +    g_debug ("Message condition: %i", condition); + +    struct kiro_client_connection *cc = (struct kiro_client_connection *)data; +    struct ibv_wc wc; + +    if (ibv_poll_cq (cc->conn->recv_cq, 1, &wc) < 0) { +        g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); +        return FALSE; +    } +    void *cq_ctx; +    struct ibv_cq *cq; +    int err = ibv_get_cq_event (cc->conn->recv_cq_channel, &cq, &cq_ctx); +    if (!err) +        ibv_ack_cq_events (cq, 1); + +    struct kiro_connection_context *ctx = (struct kiro_connection_context *)cc->conn->context; +    guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; +    g_debug ("Received a message from Client %u of type %u", cc->id, type); + +    if (type == KIRO_PING) { +        struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *) (ctx->cf_mr_send->mem); +        msg->msg_type = KIRO_PONG; + +        if (rdma_post_send (cc->conn, cc->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { +            g_warning ("Failure while trying to post PONG send: %s", strerror (errno)); +            goto done; +        } + +        if (rdma_get_send_comp (cc->conn, &wc) < 0) { +            g_warning ("An error occured while sending PONG: %s", strerror (errno)); +        } +    } + +done: +    //Post a generic receive in order to stay responsive to any messages from +    //the client +    if (rdma_post_recv (cc->conn, cc->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { +        //TODO: Connection teardown in an event handler routine? Not a good +        //idea... +        g_critical ("Posting generic receive for event handling failed: %s", strerror (errno)); +        kiro_destroy_connection_context (&ctx); +        rdma_destroy_ep (cc->conn); +        return FALSE; +    } + +    ibv_req_notify_cq (cc->conn->recv_cq, 0); // Make the respective Queue push events onto the channel + +    g_debug ("Finished RDMA event handling"); +    return TRUE; +} + + +static gboolean  process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)  {      // Right now, we don't need 'source' and 'condition' @@ -245,29 +313,65 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)                  return TRUE;              } -            g_debug ("Got connection request from client"); +            do { +                g_debug ("Got connection request from client"); +                struct kiro_client_connection *cc = (struct kiro_client_connection *)g_try_malloc (sizeof (struct kiro_client_connection)); +                if (!cc) { +                    errno = ENOMEM; +                    rdma_reject (ev->id, NULL, 0); +                    goto fail; +                } + +                if (connect_client (ev->id)) +                    goto fail; -            if (0 == connect_client (ev->id)) {                  // Post a welcoming "Receive" for handshaking -                if (0 == welcome_client (ev->id, priv->mem, priv->mem_size)) { -                    // Connection set-up successfully! (Server) -                    struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); -                    ctx->identifier = priv->next_client_id++; -                    priv->clients = g_list_append (priv->clients, (gpointer)ev->id); -                    g_debug ("Client connection assigned with ID %u", ctx->identifier); -                    g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); -                } -            } -            else -                g_warning ("Failed to accept client connection: %s", strerror (errno)); +                if (welcome_client (ev->id, priv->mem, priv->mem_size)) +                    goto fail; + +                ibv_req_notify_cq (ev->id->recv_cq, 0); // Make the respective Queue push events onto the channel + +                // Connection set-up successfully! (Server) +                // ctx was created by 'welcome_client' +                struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); +                ctx->identifier = priv->next_client_id++; +                ctx->container = cc; // Make the connection aware of its container + +                // Fill the client connection container. Also create a +                // g_io_channel wrapper for the new clients receive queue event +                // channel and add a main_loop watch to it. +                cc->id = ctx->identifier; +                cc->conn = ev->id; +                cc->rcv_ec = g_io_channel_unix_new (ev->id->recv_cq_channel->fd); +                cc->source_id = g_io_add_watch (cc->rcv_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)cc); +                g_io_channel_unref (cc->rcv_ec); // main_loop now holds a reference. We don't need ours any more + +                priv->clients = g_list_append (priv->clients, (gpointer)cc); +                g_debug ("Client connection assigned with ID %u", ctx->identifier); +                g_debug ("Currently %u clients in total are connected", g_list_length (priv->clients)); +                break; + +                fail: +                    g_warning ("Failed to accept client connection: %s", strerror (errno)); + +            } while(0);          }          else if (ev->event == RDMA_CM_EVENT_DISCONNECTED) { -            GList *client = g_list_find (priv->clients, (gconstpointer) ev->id); +            struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context); +            if (!ctx->container) { +                g_debug ("Got disconnect request from unknown client"); +                return FALSE; +            } + +            GList *client = g_list_find (priv->clients, (gconstpointer) ctx->container);              if (client) { -                struct kiro_connection_context *ctx = (struct kiro_connection_context *) (ev->id->context);                  g_debug ("Got disconnect request from client ID %u", ctx->identifier); +                struct kiro_client_connection *cc = (struct kiro_client_connection *)ctx->container; +                g_source_remove (cc->source_id); // this also unrefs the GIOChannel of the source. Nice.                  priv->clients = g_list_delete_link (priv->clients, client); +                g_free (cc); +                ctx->container = NULL;              }              else                  g_debug ("Got disconnect request from unknown client"); @@ -281,6 +385,7 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data)              struct ibv_pd *pd = ev->id->pd;              kiro_destroy_connection (& (ev->id));              g_free (pd); +              g_debug ("Connection closed successfully. %u connected clients remaining", g_list_length (priv->clients));          } @@ -324,7 +429,7 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void      int rtn = rdma_getaddrinfo (addr_c, port_c, &hints, &res_addrinfo);      g_free (addr_c);      g_free (port_c); -     +      if (rtn) {          g_critical ("Failed to create address information: %s", strerror (errno));          return -1; @@ -382,13 +487,13 @@ kiro_server_start (KiroServer *self, const char *address, const char *port, void      }      priv->main_loop = g_main_loop_new (NULL, FALSE); -    priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); -    g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); +    priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); +    g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv);      priv->main_thread = g_thread_new ("KIRO Server main loop", start_server_main_loop, priv->main_loop);      // We gave control to the main_loop (with add_watch) and don't need our ref      // any longer -    g_io_channel_unref (priv->g_io_ec); +    g_io_channel_unref (priv->conn_ec);      g_message ("Enpoint listening"); @@ -400,19 +505,23 @@ static void  disconnect_client (gpointer data, gpointer user_data)  {      (void)user_data; -     +      if (data) { -        struct rdma_cm_id *id = (struct rdma_cm_id *)data; +        struct kiro_client_connection *cc = (struct kiro_client_connection *)data; +        struct rdma_cm_id *id = cc->conn;          struct kiro_connection_context *ctx = (struct kiro_connection_context *) (id->context);          g_debug ("Disconnecting client: %u", ctx->identifier); +        g_source_remove (cc->source_id); +          // Note:          // The ProtectionDomain needs to be buffered and freed manually.          // Each connecting client is attached with its own pd, which we          // create manually. So we also need to clean it up manually.          // This needs to be done AFTER the connection is brought down, so we          // buffer the pointer to the pd and clean it up afterwards. -        struct ibv_pd *pd = id->pd; -        kiro_destroy_connection (&id); +        struct ibv_pd *pd = cc->conn->pd; +        kiro_destroy_connection (&(cc->conn)); +        g_free (cc);          g_free (pd);      }  } @@ -432,7 +541,7 @@ kiro_server_stop (KiroServer *self)      //Shut down event listening      priv->close_signal = TRUE;      g_debug ("Event handling stopped"); -     +      g_list_foreach (priv->clients, disconnect_client, NULL);      g_list_free (priv->clients); @@ -448,8 +557,8 @@ kiro_server_stop (KiroServer *self)      // We don't need the connection management IO channel container any more.      // Unref and thus free it. -    g_io_channel_unref (priv->g_io_ec); -    priv->g_io_ec = NULL; +    g_io_channel_unref (priv->conn_ec); +    priv->conn_ec = NULL;      priv->close_signal = FALSE;      // kiro_destroy_connection would try to call rdma_disconnect on the given diff --git a/test/test-client-latency.c b/test/test-client-latency.c index d05747d..208c37c 100644 --- a/test/test-client-latency.c +++ b/test/test-client-latency.c @@ -6,7 +6,7 @@  #include <assert.h> -int  +int  main ( int argc, char *argv[] )  {      if (argc < 3) { @@ -15,38 +15,30 @@ main ( int argc, char *argv[] )      }      KiroClient *client = kiro_client_new (); -    KiroTrb *trb = kiro_trb_new ();      if (-1 == kiro_client_connect (client, argv[1], argv[2])) {          kiro_client_free (client);          return -1;      } -    kiro_client_sync (client); -    kiro_trb_adopt (trb, kiro_client_get_memory (client)); +    int iterations = 10000; -    GTimer *timer = g_timer_new (); -while (1) {  -    g_timer_reset (timer); +while (1) {      int i = 0; -    while(i < 50000) { -        kiro_client_sync (client); +    float ping_us = 0; +    int fail_count = 0; +    while(i < iterations) { +        float tmp = kiro_client_ping_server (client); +        if (tmp < 0) +            fail_count++; +        else +            ping_us += tmp;          i++;      } -    double elapsed = g_timer_elapsed (timer, NULL); -    printf ("Average Latency: %fus\n", (elapsed/50000.)*1000*1000); +    printf ("Average Latency: %fus\n", ping_us/(float)(iterations - fail_count));  } -    g_timer_stop (timer);      kiro_client_free (client); -    kiro_trb_free (trb);      return 0;  } - - - - - - -  | 
