An Example Module

The following example module connects to Nexxus, retrieves the configuration for the system, and then goes idle. It implements a few IPC commands. The 'PING' command will cause the module to attempt to ping the remote node. The 'AUTO_PING_SET' command will enable or disable asynchronous event notification. If enabled, an unsolicited message with a job id of 0 is sent to the client with the results of the ping. It is a very simple module, and is here only to demonstrate how to use libloose.a to simplify the process of writing VACM modules. For an example of a more complex module, examine the EMP module source code.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <libloose.h>

/*
 * Forward declarations.
 *
 * config_cb, deinit_cb, discon_cb and ipc_cb are the four callbacks handed
 * to lm_main_loop() in main().  timer_cb is the handler registered with
 * lm_timer_add() when a node gets its first subscriber, and do_ping runs a
 * single ping against a node and reports the result to Nexxus.
 */
static void  config_cb(__uint32_t job_id, int client_fd, char *msg);
static void  deinit_cb(__uint32_t job_id, int client_fd, char *msg);
static void  discon_cb(__uint32_t job_id, int client_fd, char *msg);
static void  ipc_cb(char *node_id, __uint32_t job_id, int client_fd, char *msg);
static int  do_ping(__uint32_t job_id, int client_fd, char *node_id);
static int  timer_cb(void *arg);

/*
 * Boolean constants.  FALSE is parenthesized so that it expands to a single
 * operand no matter which operators surround it at the point of use.
 */
#define TRUE  1
#define FALSE  (!TRUE)

/*
 * Marks a parameter as deliberately unused.
 *
 * The argument is evaluated once and discarded with a void cast, which works
 * for any type (the previous int cast truncated pointers on LP64 targets and
 * used a reserved "__" identifier).  The do/while(0) wrapper makes the macro
 * behave as one statement, so "if (c) KNOWN_UNUSED_PARAM(x); else ..." parses.
 */
#define KNOWN_UNUSED_PARAM(x) do { (void) (x); } while (0)

/*
 * One monitored node.  Nodes are kept in a doubly linked list anchored by
 * nodelist_head / nodelist_tail.
 */
typedef struct  nodelist
  {
  char            node_id[64];        /* NUL-terminated id; the key lookup_node() compares against */
  unsigned int    ip_address;         /* value returned by inet_addr(); copied into in_addr.s_addr for pinging */
  int              subscribers[255];  /* client fds subscribed to automatic pings; -1 marks a free slot */
  int              timer_tag;         /* tag returned by lm_timer_add(); -1 while no timer is registered */

  struct nodelist  *next;             /* next node in the list, NULL at the tail */
  struct nodelist  *prev;             /* previous node in the list, NULL at the head */
  } NODELIST;

/* Node list management helpers (defined at the end of this file). */
static NODELIST  *node_create(char *node_id);
static NODELIST  *lookup_node(char *node_id);
static void      node_destroy(NODELIST  *node);
/* Per-node subscriber bookkeeping for automatic pings. */
static void     node_remove_client(NODELIST *node, int client_fd);
static int      node_add_client(NODELIST *node, int client_fd);

/* First and last element of the node list; both NULL while the list is empty. */
static NODELIST  *nodelist_head = NULL;
static NODELIST  *nodelist_tail = NULL;

/*
 * Program entry point.
 *
 * Initializes libloose, connects to Nexxus, registers this module under the
 * name "ICMP ECHO" and then hands control to lm_main_loop() with the four
 * module callbacks.  Exits with status -1 if registration fails.
 *
 * The definition uses a prototype-style parameter list; the former K&R
 * identifier-list form is obsolescent and no longer valid as of C23.
 */
int main(int argc, char **argv)
{
  KNOWN_UNUSED_PARAM(argc);
  KNOWN_UNUSED_PARAM(argv);

  lm_init();
  lm_nexxus_connect();
  if (lm_register("ICMP ECHO",
                  "ICMP Echo Tester",
                  "Sends ICMP ECHO messages to a remote host to test connectivity",
                  "San Mehat (nettwerk@valinux.com)",
                  "ICMP_ECHO",
                  1,
                  0) < 0)
    {
    printf("Unable to register module with Nexxus\n");
    exit(-1);
    }

  lm_main_loop(config_cb, deinit_cb, discon_cb, ipc_cb);
  exit(0);
}

/*
 * Timer handler registered through lm_timer_add() in node_add_client().
 *
 * arg is the NODELIST the timer was created for.  The node is pinged with a
 * job id of 0, which makes do_ping() report the result to the node's
 * subscribers rather than to a single requesting client.
 *
 * Always returns TRUE.
 * NOTE(review): presumably TRUE tells libloose to keep the timer armed --
 * confirm against the lm_timer_add() documentation.
 */
static int timer_cb(void *arg)
{
  NODELIST  *target = arg;
  int        status = do_ping(0, 0, target->node_id);

#if 0
  printf("[PING] Node '%s' -> %s\n",
          target->node_id,
          (status == 0 ? "OK" : "FAIL"));
#endif
  return(TRUE);
}

/*
 * Configuration callback passed to lm_main_loop().
 *
 * msg is a ':'-separated record that is tokenized in place with strtok():
 * the first field selects the event, the second is the node id, and any
 * further fields depend on the event:
 *
 *   NODE / ADDITION   create a list entry for the node
 *   END_NODE          accepted, nothing to do
 *   DELETION          destroy the node's list entry
 *   RENAME:<new>      replace the node's id
 *   GLOBAL:<var>:<val>  only IP_ADDRESS is acted on; it stores the address
 *                       and logs a "ready" line through lm_log()
 *   ICMP_ECHO:<var>   module-local variables; none are known, so it is logged
 *
 * job_id and client_fd are unused.
 *
 * Every branch validates what it dereferences: an empty message, a missing
 * node id, an event for a node that is not in the list, or an over-long new
 * name is reported on stdout and ignored instead of crashing the module.
 */
static void config_cb(__uint32_t job_id, int client_fd, char *msg)
{
  NODELIST  *node;
  char      *cmd_module;
  char      *node_id;

  KNOWN_UNUSED_PARAM(job_id);
  KNOWN_UNUSED_PARAM(client_fd);

  if (!msg || !(cmd_module = strtok(msg, ":")))
    {
    printf("[ICMP ECHO] Malformatted cfg msg\n");
    return;
    }
  node_id = strtok(NULL, ":");

  /* lookup_node() accepts a NULL id and returns NULL for unknown nodes. */
  node = lookup_node(node_id);

  if (!strcasecmp(cmd_module, "NODE") || !strcasecmp(cmd_module, "ADDITION"))
    {
    if (!node_id)
      {
      printf("[ICMP ECHO] Malformatted cfg msg\n");
      return;
      }
    if (!node_create(node_id))
      printf("[ICMP ECHO] Unable to create node '%s'\n", node_id);
    }
  else if (!strcasecmp(cmd_module, "END_NODE"))
    {
    /* Nothing to do. */
    }
  else if (!strcasecmp(cmd_module, "DELETION"))
    {
    if (!node)
      {
      printf("[ICMP ECHO] Config event <%s> for unknown node\n", cmd_module);
      return;
      }
    node_destroy(node);
    }
  else if (!strcasecmp(cmd_module, "RENAME"))
    {
    char    *new_name;
    size_t  new_len;

    if (!(new_name = strtok(NULL,":")))
      {
      printf("[ICMP ECHO] Malformatted cfg msg\n");
      return;
      }
    if (!node)
      {
      printf("[ICMP ECHO] Config event <%s> for unknown node\n", cmd_module);
      return;
      }
    /* The id buffer is fixed-size; refuse names that would not fit. */
    new_len = strlen(new_name);
    if (new_len >= sizeof(node->node_id))
      {
      printf("[ICMP ECHO] New node name too long\n");
      return;
      }
    memcpy(node->node_id, new_name, new_len + 1);
    }
  else if (!strcasecmp(cmd_module, "GLOBAL"))
    {
    char  *variable;
    char  *value;

    if (!(variable = strtok(NULL,":")))
      {
      printf("[ICMP ECHO] Malformatted cfg msg\n");
      return;
      }
    if (!(value = strtok(NULL,":")))
      {
      printf("[ICMP ECHO] Malformatted cfg msg\n");
      return;
      }
    if (!strcasecmp(variable, "IP_ADDRESS"))
      {
      struct  in_addr  addr;
      char    log_line[255];

      if (!node)
        {
        printf("[ICMP ECHO] Config event <%s> for unknown node\n", cmd_module);
        return;
        }
      node->ip_address = inet_addr(value);
      addr.s_addr = node->ip_address;
      snprintf(log_line,sizeof(log_line),"Node '%s' (ip %s) ready",
            node_id,inet_ntoa(addr));
      lm_log(log_line);
      }
    }
  else if (!strcasecmp(cmd_module, "ICMP_ECHO"))
    {
    char  *variable;

    if (!(variable = strtok(NULL,":")))
      {
      printf("[ICMP ECHO] Malformatted cfg msg\n");
      return;
      }
    printf("[ICMP ECHO] Received unknown local module variable '%s'\n",
          variable);
    }
  else
    printf("[ICMP ECHO] Received unknown config event <%s>\n",cmd_module);
}

/*
 * Shutdown callback passed to lm_main_loop().
 *
 * Announces the request on stdout, disconnects from Nexxus and terminates
 * the process with status 0; it never returns to the caller.  None of the
 * callback arguments are used.
 */
static void deinit_cb(__uint32_t job_id, int client_fd, char *msg)
{
  (void) job_id;
  (void) client_fd;
  (void) msg;

  printf("[ICMP ECHO] Received request to shutdown\n");

  lm_nexxus_disconnect();
  exit(0);
}

/*
 * Disconnect callback passed to lm_main_loop().
 *
 * Walks the whole node list and drops client_fd from every node's subscriber
 * table, so no automatic ping result is addressed to that fd afterwards.
 * job_id and msg are unused.
 */
static void discon_cb(__uint32_t job_id, int client_fd, char *msg)
{
  NODELIST  *cursor;

  (void) job_id;
  (void) msg;

  for (cursor = nodelist_head; cursor != NULL; cursor = cursor->next)
    node_remove_client(cursor, client_fd);
}

/*
 * IPC callback passed to lm_main_loop(); handles a client request.
 *
 * node_id    id of the node the request is about (looked up in the node list)
 * job_id     job id echoed back in every reply
 * client_fd  client the replies are addressed to
 * msg        ':'-separated request, tokenized in place:
 *              <command>:<node field>[:<argument>]
 *
 * Supported commands:
 *   PING                  ping the node once via do_ping()
 *   AUTO_PING_SET:ON|OFF  subscribe / unsubscribe client_fd for periodic pings
 *
 * Replies are sent through lm_send_to_nexxus() in the form
 * "<job_id>:<client_fd>:FOR:<payload>".
 *
 * The node field is skipped with strtok(NULL, ...).  Passing msg again (as the
 * code used to) restarts tokenizing at the command, which left no tokens
 * behind it and made every AUTO_PING_SET request fail as malformatted.
 */
static void ipc_cb(char *node_id, __uint32_t job_id, int client_fd, char *msg)
{
  char      *cmd;
  char      response_msg[255];
  NODELIST  *node;

  if (!msg || !(cmd = strtok(msg, ":")))
    goto out_malformatted;

  /* The node field duplicates node_id; it only has to be present. */
  if (!strtok(NULL, ":"))
    goto out_malformatted;

  if (!(node = lookup_node(node_id)))
    {
    snprintf(response_msg, sizeof(response_msg),
             "%u:%d:FOR:JOB_ERROR:NODE_NOT_FOUND",
             (unsigned int) job_id,
             client_fd);
    lm_send_to_nexxus(response_msg);
    return;
    }
  if (!strcasecmp(cmd, "PING"))
    {
    do_ping(job_id, client_fd, node_id);
    }
  else if (!strcasecmp(cmd, "AUTO_PING_SET"))
    {
    char      *mode;

    if (!(mode = strtok(NULL, ":")))
      goto out_malformatted;

    /*
     * NOTE(review): node_add_client() returns -1 when the subscriber table
     * is full; the job is still reported as completed, as before.
     */
    if (!strcasecmp(mode, "ON"))
      node_add_client(node, client_fd);
    else if (!strcasecmp(mode, "OFF"))
      node_remove_client(node, client_fd);
    else
      goto out_malformatted;
    snprintf(response_msg, sizeof(response_msg),
             "%u:%d:FOR:JOB_COMPLETED",
             (unsigned int) job_id,
             client_fd);
    lm_send_to_nexxus(response_msg);
    }
  else
    {
    snprintf(response_msg, sizeof(response_msg),
             "%u:%d:FOR:JOB_ERROR:UNSUPPORTED_MESSAGE",
             (unsigned int) job_id,
             client_fd);
    lm_send_to_nexxus(response_msg);
    }
  return;

out_malformatted:
  snprintf(response_msg, sizeof(response_msg),
           "%u:%d:FOR:JOB_ERROR:MALFORMATTED_MESSAGE",
           (unsigned int) job_id,
           client_fd);
  lm_send_to_nexxus(response_msg);
}

/*
 * Subscribes client_fd to automatic ping results for node.
 *
 * Returns 0 when the client is subscribed afterwards (including the case
 * where it already was, which is only reported on stdout) and -1 when all
 * subscriber slots are taken.
 *
 * If the node has no timer yet (timer_tag == -1), one is registered with
 * lm_timer_add() before the slot is claimed.
 * NOTE(review): the interval argument 15 is presumably in seconds -- confirm
 * against libloose.
 */
static int node_add_client(NODELIST *node, int client_fd)
{
  int  slot;
  int  free_slot = -1;

  /* One pass: detect an existing subscription and remember the first gap. */
  for (slot = 0; slot < 255; slot++)
    {
    if (node->subscribers[slot] == client_fd)
      {
      printf("[ICMP ECHO] client fd %d already subscribed!\n",client_fd);
      return(0);
      }
    if (free_slot < 0 && node->subscribers[slot] == -1)
      free_slot = slot;
    }

  /* New subscription */
  if (node->timer_tag == -1)
    node->timer_tag = lm_timer_add(15, timer_cb, node);

  if (free_slot < 0)
    {
    printf("[ICMP ECHO] No client slots available\n");
    return(-1);
    }

  node->subscribers[free_slot] = client_fd;
  return(0);
}

/*
 * Unsubscribes client_fd from automatic ping results for node.
 *
 * Does nothing if the client is not subscribed.  When the last subscriber
 * is removed, the node's timer is cancelled with lm_timer_remove() and
 * timer_tag is reset to -1.
 */
static void node_remove_client(NODELIST *node, int client_fd)
{
  int  slot = 0;

  /* Locate the client's slot. */
  while (slot < 255 && node->subscribers[slot] != client_fd)
    slot++;
  if (slot == 255)
    return; /* Not subscribed */

  node->subscribers[slot] = -1;

  /* Any remaining subscriber keeps the timer alive. */
  for (slot = 0; slot < 255; slot++)
    {
    if (node->subscribers[slot] != -1)
      return;
    }

  lm_timer_remove(node->timer_tag);
  node->timer_tag = -1;
}

/*
 * Pings a node once with "/bin/ping -c 1 <ip>" and reports the outcome.
 *
 * job_id     non-zero: a client request; replies go to client_fd and are
 *            followed by JOB_COMPLETED.
 *            zero: an unsolicited (timer driven) ping; the result is sent
 *            with job id 0 to every fd in the node's subscriber table and
 *            client_fd is ignored.
 * client_fd  requesting client (only meaningful when job_id is non-zero)
 * node_id    id of the node to ping
 *
 * The round-trip time is taken from the first output line containing
 * "bytes from": the text after "time=" up to the last space on that line.
 * If no such line appears the node is reported as TIMED_OUT.
 *
 * Returns 0 if a reply time was found, -1 otherwise (unknown node, popen
 * failure, unparsable output or no reply).
 *
 * Error replies are only sent for client requests; an unsolicited ping has
 * no requesting client to address them to.
 */
static int do_ping(__uint32_t job_id, int client_fd, char *node_id)
{
  char            response_msg[255];
  char            command[255];
  char            line[255];
  NODELIST        *node;
  FILE            *ping_out;
  struct in_addr  addr;
  char            *rtt = NULL;
  int              slot;

  if (!(node = lookup_node(node_id)))
    {
    if (job_id)
      {
      snprintf(response_msg, sizeof(response_msg),
               "%u:%d:FOR:JOB_ERROR:NODE_NOT_FOUND",
               (unsigned int) job_id,
               client_fd);
      lm_send_to_nexxus(response_msg);
      }
    return(-1);
    }
  addr.s_addr = node->ip_address;

  snprintf(command, sizeof(command), "/bin/ping -c 1 %s", inet_ntoa(addr));

  if (!(ping_out = popen(command,"r")))
    {
    if (job_id)
      {
      /* %m (glibc extension) expands to the errno text left by popen(). */
      snprintf(response_msg, sizeof(response_msg),
               "%u:%d:FOR:JOB_ERROR:INTERNAL_ERROR (%m)",
               (unsigned int) job_id,
               client_fd);
      lm_send_to_nexxus(response_msg);
      }
    return(-1);
    }

  while(fgets(line, sizeof(line), ping_out))
    {
    char  *last_space;

    /* Strip the newline only if there is one; never index before the buffer. */
    line[strcspn(line, "\n")] = '\0';

    if (!strstr(line,"bytes from"))
      continue;

    rtt = strstr(line, "time=");
    last_space = strrchr(line, ' ');
    if (!rtt || !last_space)
      {
      if (job_id)
        {
        snprintf(response_msg, sizeof(response_msg),
                 "%u:%d:FOR:JOB_ERROR:INTERNAL_ERROR (NO TIME)",
                 (unsigned int) job_id,
                 client_fd);
        lm_send_to_nexxus(response_msg);
        }
      pclose(ping_out);
      return(-1);
      }
    rtt += 5;            /* skip over "time=" */
    *last_space = '\0';  /* cut the trailing unit off the line */
    break;
    }

  /* rtt points into line[], so the pipe can be closed before replying. */
  pclose(ping_out);

  if (job_id)
    {
    if (!rtt)
      snprintf(response_msg, sizeof(response_msg),
               "%u:%d:FOR:%s:TIMED_OUT",
               (unsigned int) job_id,
               client_fd,
               node->node_id);
    else
      snprintf(response_msg, sizeof(response_msg),
               "%u:%d:FOR:%s",
               (unsigned int) job_id,
               client_fd,
               rtt);
    lm_send_to_nexxus(response_msg);

    snprintf(response_msg, sizeof(response_msg),
             "%u:%d:FOR:JOB_COMPLETED",
             (unsigned int) job_id,
             client_fd);
    lm_send_to_nexxus(response_msg);
    }
  else
    {
    for (slot = 0; slot < 255; slot ++)
      {
      if (node->subscribers[slot] == -1)
        continue;

      /* Address each notification to the subscriber, with job id 0. */
      if (!rtt)
        snprintf(response_msg, sizeof(response_msg),
                 "0:%d:FOR:%s:TIMED_OUT",
                 node->subscribers[slot],
                 node->node_id);
      else
        snprintf(response_msg, sizeof(response_msg),
                 "0:%d:FOR:%s",
                 node->subscribers[slot],
                 rtt);
      lm_send_to_nexxus(response_msg);
      }
    }

  return(rtt ? 0 : -1);
}

/*
 * Allocates a node for node_id and appends it to the node list.
 *
 * The new node starts with ip_address 0, every subscriber slot free (-1)
 * and no timer (timer_tag == -1).
 *
 * Returns the new node, or NULL if node_id is NULL, does not fit into the
 * fixed-size id buffer (including its terminating NUL), or memory could not
 * be allocated.  The node is owned by the list; release it with
 * node_destroy().
 */
static NODELIST  *node_create(char *node_id)
{
  NODELIST  *created;
  size_t    id_len;
  size_t    slot;

  if (!node_id)
    return(NULL);

  /* Refuse ids that would overflow the buffer instead of truncating them. */
  id_len = strlen(node_id);
  if (id_len >= sizeof(created->node_id))
    return(NULL);

  if (!(created = calloc(1, sizeof(*created))))
    return(NULL);

  memcpy(created->node_id, node_id, id_len + 1);
  for (slot = 0;
       slot < sizeof(created->subscribers) / sizeof(created->subscribers[0]);
       slot++)
    created->subscribers[slot] = -1;
  created->timer_tag = -1;

  /* Link the node only once it is fully initialized. */
  if (!nodelist_head)
    nodelist_head = nodelist_tail = created;
  else
    {
    nodelist_tail->next = created;
    created->prev = nodelist_tail;
    nodelist_tail = created;
    }
  return(created);
}

/*
 * Finds a node by id.
 *
 * The comparison is an exact, case-sensitive strcmp() against each node's
 * node_id, walking the list from the head.  Returns the first match, or
 * NULL if node_id is NULL or no node carries that id.
 */
static NODELIST  *lookup_node(char *node_id)
{
  NODELIST  *candidate;

  if (node_id == NULL)
    return(NULL);

  for (candidate = nodelist_head; candidate != NULL; candidate = candidate->next)
    {
    if (strcmp(node_id, candidate->node_id) == 0)
      return(candidate);
    }
  return(NULL);
}

/*
 * Unlinks node from the node list and frees it.
 *
 * A NULL node is ignored.  If the node still has a timer registered
 * (timer_tag != -1) the timer is cancelled first; otherwise timer_cb()
 * would later be invoked with a pointer to the freed node.
 *
 * node must be an element of the list anchored by nodelist_head /
 * nodelist_tail.  The pointer is invalid after the call.
 */
static void  node_destroy(NODELIST  *node)
{
  if (!node)
    return;

  if (node->timer_tag != -1)
    {
    lm_timer_remove(node->timer_tag);
    node->timer_tag = -1;
    }

  /* Detach from the predecessor, or advance the head if there is none. */
  if (node->prev)
    node->prev->next = node->next;
  else
    nodelist_head = node->next;

  /* Detach from the successor, or pull back the tail if there is none. */
  if (node->next)
    node->next->prev = node->prev;
  else
    nodelist_tail = node->prev;

  free(node);
}