NAME

libslack(agent) - agent module


SYNOPSIS

    #include <slack/std.h>
    #include <slack/agent.h>

    typedef struct Agent Agent;
    typedef int agent_action_t(Agent *agent, void *arg);
    typedef int agent_reaction_t(Agent *agent, int fd, int revents, void *arg);

    Agent *agent_create(void);
    Agent *agent_create_with_locker(Locker *locker);
    Agent *agent_create_measured(void);
    Agent *agent_create_measured_with_locker(Locker *locker);
    Agent *agent_create_using_select(void);
    Agent *agent_create_using_select_with_locker(Locker *locker);
    void agent_release(Agent *agent);
    void *agent_destroy(Agent **agent);
    int agent_rdlock(const Agent *agent);
    int agent_wrlock(const Agent *agent);
    int agent_unlock(const Agent *agent);
    int agent_connect(Agent *agent, int fd, int events, agent_reaction_t *reaction, void *arg);
    int agent_connect_unlocked(Agent *agent, int fd, int events, agent_reaction_t *reaction, void *arg);
    int agent_disconnect(Agent *agent, int fd);
    int agent_disconnect_unlocked(Agent *agent, int fd);
    int agent_transfer(Agent *agent, int fd, Agent *dst);
    int agent_transfer_unlocked(Agent *agent, int fd, Agent *dst);
    int agent_send(Agent *agent, int fd, int sockfd);
    int agent_send_unlocked(Agent *agent, int fd, int sockfd);
    int agent_recv(Agent *agent, int sockfd, agent_reaction_t *reaction, void *arg);
    int agent_recv_unlocked(Agent *agent, int sockfd, agent_reaction_t *reaction, void *arg);
    int agent_detail(Agent *agent, int fd);
    int agent_detail_unlocked(Agent *agent, int fd);
    const struct timeval * const agent_last(Agent *agent, int fd);
    const struct timeval * const agent_last_unlocked(Agent *agent, int fd);
    int agent_velocity(Agent *agent, int fd);
    int agent_velocity_unlocked(Agent *agent, int fd);
    int agent_acceleration(Agent *agent, int fd);
    int agent_acceleration_unlocked(Agent *agent, int fd);
    int agent_dadt(Agent *agent, int fd);
    int agent_dadt_unlocked(Agent *agent, int fd);
    void *agent_schedule(Agent *agent, long sec, long usec, agent_action_t *action, void *arg);
    void *agent_schedule_unlocked(Agent *agent, long sec, long usec, agent_action_t *action, void *arg);
    int agent_cancel(Agent *agent, void *action_id);
    int agent_cancel_unlocked(Agent *agent, void *action_id);
    int agent_start(Agent *agent);
    int agent_stop(Agent *agent);


DESCRIPTION

This module provides support for a generic agent programming model. Agents are like event loops except that while event loops only react to input events, agents can also take independent actions at specific times.

Unlike event loops, which are typically GUI-specific and receive input events by calling some concrete event retrieval function, input events for agents take the form of data transfers across file descriptors. This means that input events can come from any source and have any semantics. For example, to implement an event loop for a specific GUI using an agent, you'd have a separate thread or process that calls the GUI's event retrieval function and then sends each event to the agent across a pipe or socket.

Agents multiplex input sources using poll(2) (or select(2) if unavoidable) and multiplex timers for scheduled actions over poll(2)'s timeout facility using hierarchical timing wheels. If timers are not used, agents are just an alternate interface to poll(2). If input sources are not used, agents are just a multipurpose timer that doesn't use any signals.

Multiple agents can be connected to each other via pipes and sockets in arbitrary networks (in multiple threads or multiple processes on the same host or multiple hosts) and these connections may change over time.

It is expected that agents will generally be used to build highly scalable internet servers because connecting and disconnecting file descriptors and scheduling and cancelling timed actions are all O(1) operations and managing timers has constant average time. If two or more agents cooperate (on a system that has poll(2)), responding to input events can also be highly scalable (See the SCALABILITY section for details).

Agent *agent_create(void)

Creates an Agent object. On error, returns null with errno set appropriately. It is the caller's responsibility to deallocate the new agent with agent_release(3) or agent_destroy(3).

Agent *agent_create_with_locker(Locker *locker)

Equivalent to agent_create(3) except that multiple threads accessing the new agent will be synchronised by locker.

Agent *agent_create_measured(void)

Creates an Agent object that measures I/O activity. Such agents can be passed to the following functions to determine the level of I/O activity handled by the agent: agent_detail(3) which returns the level of detail available (this determines which of the subsequent functions may be called); agent_last(3) which returns the time that the most recent I/O event occurred; agent_velocity(3) which returns the rate of I/O events; agent_acceleration(3) which returns the rate of change of the I/O event rate; and agent_dadt(3) which returns the rate of change of the rate of change of the I/O event rate. These functions can be applied to individual descriptors or to the agent as a whole. These agents can be combined to produce a fast/slow lane structure that improves scalability of I/O with respect to the number of connected descriptors. See the SCALABILITY section below for more details. On error, returns null with errno set appropriately. It is the caller's responsibility to deallocate the new agent with agent_release(3) or agent_destroy(3). Note, if this system does not have poll(2), this function is not very useful.

Agent *agent_create_measured_with_locker(Locker *locker)

Equivalent to agent_create_measured(3) except that multiple threads accessing the new agent will be synchronised by locker.

Agent *agent_create_using_select(void)

Equivalent to agent_create(3) except that the agent created will use select(2) instead of poll(2). This should only be used under Linux when accurate 10ms timers are required (see the BUGS section for details). It should not be used for I/O (see the SCALABILITY section for details).

Agent *agent_create_using_select_with_locker(Locker *locker)

Equivalent to agent_create_using_select(3) except that multiple threads accessing the new agent will be synchronised by locker.

void agent_release(Agent *agent)

Releases (deallocates) agent.

void *agent_destroy(Agent **agent)

Destroys (deallocates and sets to null) *agent. Returns null. Note: an agent shared by multiple threads must not be destroyed until after all threads have finished with it.

int agent_rdlock(const Agent *agent)

Claims a read lock on agent (if agent was created with a Locker). This is needed when multiple read only agent(3) module functions need to be called atomically. It is the caller's responsibility to call agent_unlock(3) after the atomic operation. The only functions that may be called on agent between calls to agent_rdlock(3) and agent_unlock(3) are any read only agent(3) module functions whose name ends with _unlocked. On success, returns 0. On error, returns an error code.

int agent_wrlock(const Agent *agent)

Claims a write lock on agent (if agent was created with a Locker). This is needed when multiple read/write agent(3) module functions need to be called atomically. It is the caller's responsibility to call agent_unlock(3) after the atomic operation. The only functions that may be called on agent between calls to agent_wrlock(3) and agent_unlock(3) are any agent(3) module functions whose name ends with _unlocked. On success, returns 0. On error, returns an error code.

int agent_unlock(const Agent *agent)

Unlocks a read or write lock on agent obtained with agent_rdlock(3) or agent_wrlock(3) (if agent was created with a locker). On success, returns 0. On error, returns an error code.

int agent_connect(Agent *agent, int fd, int events, agent_reaction_t *reaction, void *arg)

Connect the descriptor, fd, to agent. events specifies the input/output events of interest. It is a bitmask of the following values: R_OK, W_OK and X_OK indicating, respectively, readability, writability and exceptional condition (i.e. arrival of out of band data). When any of the specified events occur, the function, reaction, will be called with four arguments: agent, fd, revents (the bitmask of the events that occurred), and arg. If fd is already connected, the existing events, reaction and arg are replaced with the new values. On success, returns 0. On error, returns -1 with errno set appropriately.

int agent_connect_unlocked(Agent *agent, int fd, int events, agent_reaction_t *reaction, void *arg)

Equivalent to agent_connect(3) except that agent is not write locked.

int agent_disconnect(Agent *agent, int fd)

Disconnect the descriptor, fd, from agent. agent will no longer respond to input/output events that occur on fd. On success, returns 0. On error, returns -1 with errno set appropriately.

int agent_disconnect_unlocked(Agent *agent, int fd)

Equivalent to agent_disconnect(3) except that agent is not write locked.

int agent_transfer(Agent *agent, int fd, Agent *dst)

Transfers the connected descriptor, fd, from agent to dst. The activity data for fd (i.e. time of last event, velocity, acceleration and dadt) are transferred as well. Both agent and dst must be agents created using agent_create_measured(3). On success, returns 0. On error, returns -1 with errno set appropriately. Note this only works for agents in separate threads. To transfer a descriptor to another agent in another process on the same host, use agent_send(3) and agent_recv(3).

int agent_transfer_unlocked(Agent *agent, int fd, Agent *dst)

Equivalent to agent_transfer(3) except that agent is not write locked. Note that dst is still write locked.

int agent_send(Agent *agent, int fd, int sockfd)

Transfers the connected descriptor, fd, from agent to a receiving agent on the other end of the UNIX domain stream socket, sockfd. Both the sending and the receiving agent must have been created using agent_create_measured(3). The receiving agent must call agent_recv(3) to receive the descriptor. The activity data for fd (i.e. time of last event, velocity, acceleration and dadt) are transferred as well. The events to react to and the reaction function and its argument are also sent to the receiving agent but note that the reaction function and its argument will be meaningless if the receiving agent exists in an unrelated process. They are passed just in case the processes are related and the reaction function's argument points to shared memory. If not, the receiving agent must specify a new reaction function and argument in the call to agent_recv(3). If the receiving agent exists in a separate thread, agent_transfer(3) should be used instead. It is much faster. On success, returns 0. On error, returns -1 with errno set appropriately. Note that this function does not close fd. The caller must do this. Note that there is no provision for returning errors encountered by the receiving process to the sending process. If this is a problem, use threads instead and call agent_transfer(3).

int agent_send_unlocked(Agent *agent, int fd, int sockfd)

Equivalent to agent_send(3) except that agent is not write locked.

int agent_recv(Agent *agent, int sockfd, agent_reaction_t *reaction, void *arg)

Receives a descriptor from the UNIX domain stream socket, sockfd, and connects it to agent. reaction and arg are used when connecting the descriptor to agent; all other data is received along with the descriptor. On success, returns the descriptor received. On error, returns -1 with errno set appropriately.
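
As a sketch of how agent_send(3) and agent_recv(3) fit together across a fork(2) (the function names give_away and take_over are illustrative, not part of the library, and sv is assumed to have been created with socketpair(2) before forking):

```c
#include <slack/std.h>
#include <slack/agent.h>
#include <sys/socket.h>

/* Both agents must have been created with agent_create_measured(3).
   Error handling is abbreviated for brevity. */

int give_away(Agent *agent, int fd, int sv[2]) /* in the sending process */
{
    if (agent_send(agent, fd, sv[0]) == -1)
        return -1;

    return close(fd); /* agent_send(3) does not close fd; the sender must */
}

int take_over(Agent *agent, int sv[2], agent_reaction_t *reaction) /* in the receiving process */
{
    /* The sender's reaction pointer is meaningless in an unrelated
       process, so supply a fresh reaction function and argument */
    return agent_recv(agent, sv[1], reaction, NULL);
}
```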

int agent_recv_unlocked(Agent *agent, int sockfd, agent_reaction_t *reaction, void *arg)

Equivalent to agent_recv(3) except that agent is not write locked.

int agent_detail(Agent *agent, int fd)

Returns the level of detail in the activity data available for the descriptor fd handled by agent. If fd is -1, returns the level of detail available for agent itself. On error, returns -1 with errno set appropriately.

If 0 is returned, there have been no I/O events for fd (or agent if fd is -1), so no activity data is available. If 1 is returned, there has been one I/O event so only agent_last(3) may be called with the same fd argument. If 2 is returned, there have been two I/O events, so agent_last(3) and agent_velocity(3) may be called with the same fd argument. If 3 is returned, there have been 3 I/O events so agent_last(3), agent_velocity(3) and agent_acceleration(3) may be called with the same fd argument. If 4 is returned, there have been at least 4 I/O events so agent_last(3), agent_velocity(3), agent_acceleration(3) and agent_dadt(3) may be called with the same fd argument.

These functions may be used to implement algorithms that determine whether or not a given descriptor should remain with a given agent, or be transferred to another agent using agent_transfer(3) or agent_send(3) and agent_recv(3). See the SCALABILITY section.
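
A minimal sketch of such a policy (the function name fd_is_idle and the IDLE_MS threshold are assumptions of this sketch, not part of the library):

```c
#include <slack/std.h>
#include <slack/agent.h>

/* Treat a descriptor as idle if its last two I/O events were more
   than IDLE_MS milliseconds apart. The threshold is arbitrary and
   would be tuned for a real server. */
#define IDLE_MS 2000

int fd_is_idle(Agent *agent, int fd)
{
    int detail = agent_detail(agent, fd);

    if (detail == -1)
        return -1;

    if (detail < 2)          /* fewer than two events: velocity is undefined */
        return detail == 0;  /* no events at all: call it idle */

    return agent_velocity(agent, fd) > IDLE_MS;
}
```

A fast-lane agent might call this from its reaction functions and hand idle descriptors to a slow-lane agent with agent_transfer(3).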

int agent_detail_unlocked(Agent *agent, int fd)

Equivalent to agent_detail(3) except that agent is not read locked.

const struct timeval * const agent_last(Agent *agent, int fd)

Returns the time of the most recent I/O event for the descriptor fd handled by agent. If fd is -1, returns the time of the most recent event handled by agent for any descriptor. On error, returns null with errno set appropriately. Note: This function may only be called after agent_detail(3) has returned a value greater than 0 for the same fd argument.

const struct timeval * const agent_last_unlocked(Agent *agent, int fd)

Equivalent to agent_last(3) except that agent is not read locked.

int agent_velocity(Agent *agent, int fd)

Returns the number of milliseconds that elapsed between the last two I/O events for the descriptor fd handled by agent. If fd is -1, returns the number of milliseconds that elapsed between the last two events handled by agent for any descriptor. Large return values indicate less I/O activity. On error, returns -1 with errno set appropriately. Note: This function may only be called after agent_detail(3) has returned a value greater than 1 for the same fd argument.

int agent_velocity_unlocked(Agent *agent, int fd)

Equivalent to agent_velocity(3) except that agent is not read locked.

int agent_acceleration(Agent *agent, int fd)

Returns the rate of change of the velocity of I/O events for the descriptor fd handled by agent. If fd is -1, returns the rate of change of I/O events for agent for any descriptor. Negative return values indicate acceleration. Positive return values indicate deceleration. A zero return value indicates no acceleration. The larger the magnitude of the return value, the greater the acceleration or deceleration. On error, returns -1 with errno set appropriately. Note: This function may only be called after agent_detail(3) has returned a value greater than 2 for the same fd argument.

int agent_acceleration_unlocked(Agent *agent, int fd)

Equivalent to agent_acceleration(3) except that agent is not read locked.

int agent_dadt(Agent *agent, int fd)

Returns the rate of change of the rate of change of I/O events for the descriptor fd handled by agent. If fd is -1, returns the rate of change of the rate of change of I/O events for agent for any descriptor. Negative return values indicate that acceleration or deceleration is increasing. Positive return values indicate that acceleration or deceleration is decreasing. A zero return value indicates that acceleration or deceleration is constant. The larger the magnitude of the return value, the greater the increase or decrease in acceleration or deceleration. On error, returns -1 with errno set appropriately. Note: This function may only be called after agent_detail(3) has returned a value greater than 3 for the same fd argument.

int agent_dadt_unlocked(Agent *agent, int fd)

Equivalent to agent_dadt(3) except that agent is not read locked.

void *agent_schedule(Agent *agent, long sec, long usec, agent_action_t *action, void *arg)

Schedule agent to invoke action in sec seconds and usec microseconds. Note, however, that timer precision is in 10ms units. When the timer expires, action is invoked. It is passed two arguments: agent and arg. On success, returns an action identifier that may be used to cancel the action with agent_cancel(3). On error, returns null with errno set appropriately.
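
Since an action runs only once, a periodic timer is built by having the action reschedule itself. A sketch (the name tick is illustrative, not part of the library):

```c
#include <slack/std.h>
#include <slack/agent.h>

/* Illustrative periodic action: do the work, then reschedule.
   While any action remains scheduled, agent_start(3) keeps the
   agent running. */
int tick(Agent *agent, void *arg)
{
    printf("tick\n");

    /* Schedule the next tick one second from now */
    return agent_schedule(agent, 1, 0, tick, arg) ? 0 : -1;
}
```

Seed it once with agent_schedule(agent, 1, 0, tick, NULL) before calling agent_start(3).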

void *agent_schedule_unlocked(Agent *agent, long sec, long usec, agent_action_t *action, void *arg)

Equivalent to agent_schedule(3) except that agent is not write locked.

int agent_cancel(Agent *agent, void *action_id)

Cancel an action that was scheduled with agent_schedule(3). action_id is the value returned by agent_schedule(3). It is the caller's responsibility to ensure that this function is not passed an action_id that corresponds to an action that has already executed (since the action will have been deallocated). On success, returns 0. On error, returns -1 with errno set appropriately.

int agent_cancel_unlocked(Agent *agent, void *action_id)

Equivalent to agent_cancel(3) except that agent is not write locked.

int agent_start(Agent *agent)

Starts agent. The agent will react to events on connected descriptors and execute scheduled actions until there are no connected file descriptors and no scheduled actions or until agent_stop(3) is called. It is the caller's responsibility to ensure that action and reaction functions will not take too long to execute. If they are going to take more than a few milliseconds, consider having them execute in their own detached thread. Otherwise, actions scheduled for the near future (e.g. 10ms) will not execute until they have finished. Of course, when there are no scheduled actions, this doesn't matter. On success, returns 0. On error, returns -1 with errno set appropriately. If any action or reaction function returns -1, returns -1. Note that you cannot call agent_start(3) on agent in one of its action or reaction functions.

int agent_stop(Agent *agent)

Stops agent. All connected descriptors and scheduled actions remain intact and agent can be started again with agent_start(3). Note that any actions scheduled to occur while agent is stopped will be executed when agent_start(3) is next called. On success, returns 0. On error, returns -1 with errno set appropriately.


ERRORS

On error, errno is set either by an underlying function, or as follows:

EINVAL

When an argument to any of the functions is invalid.

When agent_start(3) is called on an agent that isn't idle.

When agent_stop(3) is called on an agent that isn't started.


MT-Level

MT-Disciplined


SCALABILITY

There are two aspects to the scalability of agents: scalability with respect to the number of scheduled actions and scalability with respect to the number of connected file descriptors.

The timers for scheduled actions are multiplexed over the timeout facility provided by poll(2) using a state of the art data structure for timing facilities (hierarchical timing wheels) which guarantees constant time to start and stop timers and constant average time to maintain timers so that thousands of timers may be outstanding without performance penalty.

Adding and removing connected file descriptors take constant time but maintaining them is O(n) where n is the number of connected file descriptors. That wouldn't be a problem if all of the file descriptors were active since work would have to be done reacting to all the events anyway, but if only a few file descriptors are active, both the kernel and the application waste significant effort examining the elements of the pollfd array that correspond to the inactive file descriptors. Over a WAN such as the internet, inactive file descriptors typically far outnumber active file descriptors since many connections can be waiting for lost packets to be retransmitted.

To implement a portable internet service that scales well with respect to the number of inactive file descriptors, use two agents, each running in its own thread. The first only deals with active file descriptors. The second only deals with inactive file descriptors. These agents swap file descriptors between themselves as their activity changes. Agents measure the activity of each file descriptor to facilitate this. The result is one thread being woken up frequently but only dealing with a small number of active file descriptors each time, and another thread being woken up infrequently and dealing with a large number of file descriptors each time. The second thread still wastes effort but it does so less often. Credit goes to Richard Gooch for this "fast/slow lane" approach. To reduce overhead further, more agents could be created to deal with the inactive file descriptors (multiple slow lanes) but it's unlikely to be worthwhile on hosts with a single processor. Note that one process can pass an open file descriptor to another process, so these agents could exist in separate processes but it's not as fast.
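
The fast lane's side of the swap can be sketched as a reaction function that handles the event and then demotes the descriptor if it has gone quiet (slow_lane, handle_event and IDLE_MS are assumptions of this sketch, not part of the library):

```c
#include <slack/std.h>
#include <slack/agent.h>

extern Agent *slow_lane;  /* the slow-lane agent, running in another thread */
extern int handle_event(int fd, int revents, void *arg); /* application logic */

#define IDLE_MS 2000      /* arbitrary demotion threshold for this sketch */

int fast_reaction(Agent *agent, int fd, int revents, void *arg)
{
    if (handle_event(fd, revents, arg) == -1)
        return -1;

    /* Demote quiet descriptors. Both agents must have been created
       with agent_create_measured(3) for agent_transfer(3) to work. */
    if (agent_detail(agent, fd) >= 2 && agent_velocity(agent, fd) > IDLE_MS)
        return agent_transfer(agent, fd, slow_lane);

    return 0;
}
```

The slow lane's reactions would do the reverse: on any event, transfer the now-active descriptor back to the fast lane.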

The simpler, traditional approach is to just have multiple pre-forked servers, each accept(2)ing connections. The set of connections will then be split between the servers. Experiments indicate that the connections are split evenly between the servers but if the active connections are split between multiple servers, then the context switching overhead of multiple threads waking up could outweigh the savings gained by splitting up the connections into smaller sets. In the worst case, all of the threads might be woken up at the same time resulting in the entire set of connections being processed. This is precisely the problem we are trying to avoid but we've added context switching overhead as well. Another thing to note is that since this method is usually implemented with select(2), rather than poll(2), the effort wasted is far greater. Consider 1000 connections split between 10 pre-forked servers using select(2). Assume for simplicity that the first 100 connections are handled by the first server, the next 100 connections are handled by the second server, and so on. Due to the fact that select(2) uses bitsets to record the file descriptors of interest, and has to check every bit up to the one corresponding to the highest numbered file descriptor, the total number of bits checked would be 1000 + 900 + 800 + 700 + 600 + 500 + 400 + 300 + 200 + 100 = 5500. In the worst case it would be 1000 + 999 + 998 + 997 + 996 + 995 + 994 + 993 + 992 + 991 = 9955. That's an order of magnitude more work than the obvious single threaded approach.


EXAMPLES

Trivial example: Read from stdin and timeout after 5 seconds with no input

    #include <slack/std.h>
    #include <slack/agent.h>

    void *timeout;

    int action(Agent *agent, void *arg)
    {
        return agent_stop(agent);
    }

    int reaction(Agent *agent, int fd, int revents, void *arg)
    {
        char buf[BUFSIZ];
        ssize_t bytes;

        // Reschedule timeout for 5 seconds into the future
        // Note: action hasn't executed or we wouldn't be here

        if (agent_cancel(agent, timeout) == -1)
            return -1;

        if (!(timeout = agent_schedule(agent, 5, 0, action, NULL)))
            return -1;

        // Read from fd and write to stdout

        if ((bytes = read(fd, buf, BUFSIZ)) == -1)
            return -1;

        if (bytes && write(STDOUT_FILENO, buf, bytes) == -1)
            return -1;

        // Disconnect fd upon EOF

        if (bytes == 0 && agent_disconnect(agent, fd) == -1)
            return -1;

        return 0;
    }

    int main(int ac, char **av)
    {
        Agent *agent;
        int rc;

        // Create an agent

        if (!(agent = agent_create()))
            return EXIT_FAILURE;

        // Schedule an action

        if (!(timeout = agent_schedule(agent, 5, 0, action, NULL)))
            return EXIT_FAILURE;

        // Connect standard input

        if (agent_connect(agent, STDIN_FILENO, R_OK, reaction, NULL) == -1)
            return EXIT_FAILURE;

        // Start the agent

        while ((rc = agent_start(agent)) == -1 && errno == EINTR)
        {}

        return (rc == -1) ? EXIT_FAILURE : EXIT_SUCCESS;
    }


BUGS

Linux (at least 2.2.x and 2.4.x) has a bug in poll(2) that can wreak havoc with timers. If you specify a timeout of between 10n-9 and 10n ms (where n >= 1) under Linux, poll(2) will timeout after 10(n+1) ms instead of 10n ms like select(2). This means that if you ask poll(2) for a 10ms timeout, you get a 20ms timeout. If you ask for 20ms, you get 30ms and so on. As a workaround, the agent module subtracts 10ms from timeouts greater than 10ms under Linux. This means that (under Linux) you can't have a 10ms timer but you can have 20ms, 30ms and so on. It also means that if two actions are scheduled to occur 10ms apart, the second action will execute 20ms after the first. If you need accurate 10ms timers under Linux, use agent_create_using_select(3) instead of agent_create(3). This will create an agent that uses select(2) instead of poll(2). Note, however, that select(2) is unscalable with respect to the number of connections and hence can't be used in a fast/slow lane server (See the SCALABILITY section for details). If accurate 10ms timers and scalable I/O are both required under Linux, use agent_create(3) for all agents that will handle I/O and use agent_create_using_select(3) for a separate agent that will handle actions. Note that on systems whose poll(2) does not have this bug (e.g. Solaris), this isn't necessary. Also note that on systems that don't have poll(2) (e.g. Mac OS X), agents will always use select(2) and hence can't be used in a fast/slow lane server.

It is an error to call agent_cancel(3) for an action that has already happened (because the memory associated with the action is deallocated when it is executed). Unfortunately, there is no guaranteed atomic way to tell if an action has already occurred. If it is necessary to be able to safely cancel scheduled actions, the client must provide the necessary safeguards itself. This could prove difficult. The simplest safe way to cancel is to do so from another action that was scheduled at least 10ms before the action being cancelled. Alternatively, you could disable, rather than cancel, an action by modifying a global variable that it checks before doing anything.
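
The disable-rather-than-cancel pattern mentioned above can be sketched as follows (the flag name and action are illustrative, not part of the library):

```c
#include <slack/std.h>
#include <slack/agent.h>

/* Instead of calling agent_cancel(3), which is unsafe once the action
   may already have executed, set a flag that the action checks. The
   action still runs on schedule, but does nothing. */
static int disabled = 0;

int guarded_action(Agent *agent, void *arg)
{
    if (disabled)
        return 0; /* effectively "cancelled": skip the work */

    /* ... the real work ... */
    return 0;
}
```

If the flag is shared between threads, it would need the same synchronisation as any other shared state (e.g. a Locker or mutex).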

If an action or reaction takes a long time to run and an action scheduled for the near future misses its schedule, the agent will catch up, executing any missed actions (better late than never). Unfortunately, there is no way to distinguish between an action or reaction taking a long time to run and the system's clock being set forward. So, if the system's clock is set forward, the agent will execute all actions scheduled for the missing time. The solution is to run an NTP daemon on the system to maintain accurate system time. Then, there would never be a large enough change to the system time to cause problems.


SEE ALSO

libslack(3), poll(2), select(2)


AUTHOR

20100612 raf <raf@raf.org>