diff options
Diffstat (limited to 'src/kiro-client.c')
-rw-r--r-- | src/kiro-client.c | 274 |
1 files changed, 231 insertions, 43 deletions
diff --git a/src/kiro-client.c b/src/kiro-client.c index 714a003..6e140b5 100644 --- a/src/kiro-client.c +++ b/src/kiro-client.c @@ -56,7 +56,8 @@ struct _KiroClientPrivate { gboolean close_signal; // Flag used to signal event listening to stop for connection tear-down GMainLoop *main_loop; // Main loop of the server for event polling and handling - GIOChannel *g_io_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *conn_ec; // GLib IO Channel encapsulation for the connection manager event channel + GIOChannel *rdma_ec; // GLib IO Channel encapsulation for the communication event channel GThread *main_thread; // Main KIRO client thread }; @@ -64,6 +65,11 @@ struct _KiroClientPrivate { G_DEFINE_TYPE (KiroClient, kiro_client, G_TYPE_OBJECT); +// Temporary storage and lock for PING timing +G_LOCK_DEFINE (ping_time); +volatile struct timeval ping_time; + + KiroClient * kiro_client_new (void) { @@ -89,9 +95,10 @@ kiro_client_init (KiroClient *self) { KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); memset (priv, 0, sizeof (&priv)); - //Hack to make the 'unused function' from the kiro-rdma include go away... kiro_attach_qp (NULL); + ping_time.tv_sec = -1; + ping_time.tv_usec = -1; } @@ -147,6 +154,87 @@ process_cm_event (GIOChannel *source, GIOCondition condition, gpointer data) } + +static gboolean +process_rdma_event (GIOChannel *source, GIOCondition condition, gpointer data) +{ + // Right now, we don't need 'source' and 'condition' + // Tell the compiler to ignore them by (void)-ing them + (void) source; + //(void) condition; + g_debug ("Message condidition: %i", condition); + + KiroClientPrivate *priv = (KiroClientPrivate *)data; + struct ibv_wc wc; + + if (ibv_poll_cq (priv->conn->recv_cq, 1, &wc) < 0) { + g_critical ("Failure getting receive completion event from the queue: %s", strerror (errno)); + return FALSE; + } + void *cq_ctx; + struct ibv_cq *cq; + int err = ibv_get_cq_event (priv->conn->recv_cq_channel, &cq, &cq_ctx); + if (!err) + ibv_ack_cq_events (cq, 1); + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + guint type = ((struct kiro_ctrl_msg *)ctx->cf_mr_recv->mem)->msg_type; + g_debug ("Received a message from the Server of type: %u", type); + + if (type == KIRO_ACK_RDMA) { + g_debug ("Got RDMI Access information from Server"); + ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); + g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); + ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); + + if (!ctx->rdma_mr) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); + rdma_disconnect (priv->conn); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + return FALSE; + } + } + if (type == KIRO_PONG) { + G_LOCK (ping_time); + struct timeval local_time; + gettimeofday (&local_time, NULL); + + if (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) { + g_debug ("Received PONG message from server"); + ping_time.tv_sec = local_time.tv_sec; + ping_time.tv_usec = local_time.tv_usec; + } + else { + g_debug ("Received unexpected PONG message from server"); + } + + G_UNLOCK (ping_time); + } + + //Post a generic receive in order to stay responsive to any messages from + //the server + if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { + //TODO: Connection teardown in an event handler routine? Not a good + //idea... + g_critical ("Posting generic receive for connection failed: %s", strerror (errno)); + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + return FALSE; + } + + // make sure the next incoming work completion causes an event on the + // receive completion channel. We will poll() the channels file descriptor + // for this in the kiro client main loop. + ibv_req_notify_cq (priv->conn->recv_cq, 0); + + g_debug ("Finished RDMA event handling"); + return TRUE; +} + + gpointer start_client_main_loop (gpointer data) { @@ -210,64 +298,52 @@ kiro_client_connect (KiroClient *self, const char *address, const char *port) if (!ctx->cf_mr_recv || !ctx->cf_mr_send) { g_critical ("Failed to register control message memory (Out of memory?)"); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + goto fail; } ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof (struct kiro_ctrl_msg); priv->conn->context = ctx; + //Post an preemtive receive for the servers welcome message if (rdma_post_recv (priv->conn, priv->conn, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) { g_critical ("Posting preemtive receive for connection failed: %s", strerror (errno)); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + goto fail; } if (rdma_connect (priv->conn, NULL)) { g_critical ("Failed to establish connection to the server: %s", strerror (errno)); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + goto fail; } - g_message ("Connection to server established"); - priv->ec = priv->conn->channel; //For easy access - struct ibv_wc wc; - - if (rdma_get_recv_comp (priv->conn, &wc) < 0) { - g_critical ("Failure waiting for POST from server: %s", strerror (errno)); - rdma_disconnect (priv->conn); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; + g_message ("Connection to server established. Waiting for response."); + ibv_req_notify_cq (priv->conn->recv_cq, 0); // Make the respective Queue push events onto the channel + if (!process_rdma_event (NULL, 0, (gpointer)priv)) { + g_critical ("No RDMA access information received from the server. Failed to connect."); + goto fail; } - g_debug ("Got RDMI Access information from Server"); - ctx->peer_mr = (((struct kiro_ctrl_msg *) (ctx->cf_mr_recv->mem))->peer_mri); - g_debug ("Expected Memory Size is: %zu", ctx->peer_mr.length); - ctx->rdma_mr = kiro_create_rdma_memory (priv->conn->pd, ctx->peer_mr.length, IBV_ACCESS_LOCAL_WRITE); - - if (!ctx->rdma_mr) { - g_critical ("Failed to allocate memory for receive buffer (Out of memory?)"); - rdma_disconnect (priv->conn); - kiro_destroy_connection_context (&ctx); - rdma_destroy_ep (priv->conn); - return -1; - } + g_message ("Connected to %s:%s", address, port); + priv->ec = priv->conn->channel; //For easy access priv->main_loop = g_main_loop_new (NULL, FALSE); - priv->g_io_ec = g_io_channel_unix_new (priv->ec->fd); - g_io_add_watch (priv->g_io_ec, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, process_cm_event, (gpointer)priv); + priv->conn_ec = g_io_channel_unix_new (priv->ec->fd); + priv->rdma_ec = g_io_channel_unix_new (priv->conn->recv_cq_channel->fd); + g_io_add_watch (priv->conn_ec, G_IO_IN | G_IO_PRI, process_cm_event, (gpointer)priv); + g_io_add_watch (priv->rdma_ec, G_IO_IN | G_IO_PRI, process_rdma_event, (gpointer)priv); priv->main_thread = g_thread_new ("KIRO Client main loop", start_client_main_loop, priv->main_loop); // We gave control to the main_loop (with add_watch) and don't need our ref // any longer - g_io_channel_unref (priv->g_io_ec); + g_io_channel_unref (priv->conn_ec); + g_io_channel_unref (priv->rdma_ec); - g_message ("Connected to %s:%s", address, port); return 0; + +fail: + kiro_destroy_connection_context (&ctx); + rdma_destroy_ep (priv->conn); + priv->conn = NULL; + return -1; } @@ -309,11 +385,119 @@ kiro_client_sync (KiroClient *self) } fail: - kiro_destroy_connection (&(priv->conn)); + kiro_destroy_connection (&(priv->conn)); return -1; } +gboolean +ping_timeout (gpointer data) { + + //Not needed. Void it to prevent 'unused variable' warning + (void) data; + + G_LOCK (ping_time); + + // Maybe the server did answer while dispatching the timeout? + if (ping_time.tv_sec != 0 || ping_time.tv_usec != 0) { + goto done; + } + + ping_time.tv_usec = -1; + ping_time.tv_sec = -1; + + +done: + G_UNLOCK (ping_time); + + // Return FALSE to automtically stop the timeout from reoccuring + return FALSE; +} + + +gint +kiro_client_ping_server (KiroClient *self) +{ + // Will be returned. -1 for error. + gint t_usec = 0; + + KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); + if (!priv->conn) { + g_warning ("Client not connected"); + return -1; + } + + struct kiro_connection_context *ctx = (struct kiro_connection_context *)priv->conn->context; + + struct kiro_ctrl_msg *msg = (struct kiro_ctrl_msg *)(ctx->cf_mr_send->mem); + msg->msg_type = KIRO_PING; + + G_LOCK (ping_time); + ping_time.tv_sec = 0; + ping_time.tv_usec = 0; + struct timeval local_time; + gettimeofday (&local_time, NULL); + + if (rdma_post_send (priv->conn, priv->conn, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) { + g_warning ("Failure while trying to post SEND for PING: %s", strerror (errno)); + t_usec = -1; + goto end; + } + G_UNLOCK (ping_time); + + struct ibv_wc wc; + if (rdma_get_send_comp (priv->conn, &wc) < 0) { + g_warning ("Failure during PING send: %s", strerror (errno)); + t_usec = -1; + goto end; + } + + // Set a two-second timeout for the ping + guint timeout = g_timeout_add_seconds (2, ping_timeout, NULL); + + //Wait for ping response + while (ping_time.tv_sec == 0 && ping_time.tv_usec == 0) {}; + + + G_LOCK (ping_time); + // No response from the server. Timeout kicked in + // (Note: The timeout callback has already deregistered itself. We don't + // need to do that here again) + if (ping_time.tv_sec == -1 && ping_time.tv_usec == -1) { + g_message ("PING timed out."); + G_UNLOCK (ping_time); + t_usec = -1; + goto end; + } + + // Remove the timeout + GSource *timeout_source = g_main_context_find_source_by_id (NULL, timeout); + if (timeout_source) { + g_source_destroy (timeout_source); + } + + gint secs = ping_time.tv_sec - local_time.tv_sec; + + // tv_usecs wraps back to 0 at 1000000us (1s). + // This might cause our calculation to produce negative numbers when time > 1s. + for (int i = 0; i < secs; i++) { + ping_time.tv_usec += 1000 * 1000; + } + t_usec = ping_time.tv_usec - local_time.tv_usec; + gint millis = (gint)(t_usec/1000.); + G_UNLOCK (ping_time); + + g_debug ("Server responded to PING in: %is, %ims, %ius", secs, millis, t_usec); + +end: + G_LOCK (ping_time); + ping_time.tv_sec = -1; + ping_time.tv_usec = -1; + G_UNLOCK (ping_time); + return t_usec; +} + + void * kiro_client_get_memory (KiroClient *self) { @@ -331,7 +515,7 @@ kiro_client_get_memory (KiroClient *self) } -size_t +size_t kiro_client_get_memory_size (KiroClient *self) { KiroClientPrivate *priv = KIRO_CLIENT_GET_PRIVATE (self); @@ -375,19 +559,23 @@ kiro_client_disconnect (KiroClient *self) // We don't need the connection management IO channel container any more. // Unref and thus free it. - g_io_channel_unref (priv->g_io_ec); - priv->g_io_ec = NULL; + g_io_channel_unref (priv->conn_ec); + priv->conn_ec = NULL; + + // The same goes for the cp channels + g_io_channel_unref (priv->rdma_ec); + priv->rdma_ec = NULL; priv->close_signal = FALSE; //kiro_destroy_connection does not free RDMA memory. Therefore, we need to - //cache the memory pointer and free the memory afterwards manually + //cache the memory pointer and free the memory afterwards manually struct kiro_connection_context *ctx = (struct kiro_connection_context *) (priv->conn->context); void *rdma_mem = ctx->rdma_mr->mem; kiro_destroy_connection (&(priv->conn)); free (rdma_mem); - // priv->ec is just an easy-access pointer. Don't free it. Just NULL it + // priv->ec is just an easy-access pointer. Don't free it. Just NULL it priv->ec = NULL; g_message ("Client disconnected from server"); } |