Evaluating the Expressiveness and Simplicity of Our Approach

 In this experiment we are comparing the complexity of the Chord algorithm implementation in four platforms: Macedon, PlanetSim, PeerSim, and PyActive. PlanetSim and PeerSim are P2P event simulators implemented in the Java language. They are very popular and widely used in the P2P community and they provide a simple framework for developing decentralized scalable algorithms. Continuation Complexity: A Callback Hell for Distributed Systems 295 Macedon is a Domain Specific Language (DSL) that generates code for the Ns-2 simulator and C++ code with sockets for experimentation. The DSL is based on event-driven finite state machines and it claims a reduction in lines of code and simplicity of the implemented distributed algorithms.

Even if we are comparing different programming languages (Java in PeerSim and Planetsim, DSL in Macedon, Python in our Actor library) the comparison is useful to understand different approaches of implementing the same Chord algorithm. PlanetSim and PeerSim are good examples of the Callback Hell since they require complex callback handlers and message programming. They had to break the elegant Chord RPCs into different code fragments clearly showing our Continuation Complexity Problem.

Macedon is a different approach that uses a DSL and state machines to simplify message handling. But again, it resulted in a complex code that is far from the simplicity of Chord sequential code using RPCs.

Before comparing the code, it is important to outline that the different versions are not implementing the Chord algorithm exactly as stated in the original article. For example, Macedon only implements a successor list of size 2, and their fix finger protocol is using the simpler update others variant that is not recommended for real settings.

Regarding PeerSim, it is important to outline that their implementation is not completely event-based because they also use object invocation short-cuts to simplify the code. In this line, Nodes access the getProtocol() method that provides a clone of the desired node. Finally, PlanetSim provides a fully event-oriented implementation of Chord, but the implementation of the successor list does not consider all possible cases and errors in the protocol.

 

As we can see in the Fig. 4 our approach (PyActive, JPyActive) is providing the simplest solution. Note that we also implemented a Java version (JPyactive) to be fair in the comparison with other Java-based solutions like PlanetSim or Peersim. 

 Our implementation has less LoC. Furthermore, our implementation is simpler and easier to understand that any of the presented alternatives. Our object oriented model is straightforward and it does not need additional understanding of messages, handlers and states. Macedon, PeerSim, and PlanetSim will require an understanding of messages and transitions that is more intricate that the simplicity of sequence diagrams in an object oriented design.

It is worth comparing the different approaches in code complexity (i.e., Cyclomatic Complexity). Again, our implementation is beating the other proposals in the overall complexity and in the most complex method. One important reason is that our model avoids large message handling conditionals because messages are cleanly mapped to methods. Furthermore, synchronous calls that return results are naturally mapped to method invocations, whereas event-based approaches must split these invocations in requests and responses.

As we can see, even accepting that the Python language produces less lines of code compared to Java (e.g., PeerSim or PlanetSim), we have demonstrated that our additional reduction in terms of complexity and LoC is very meaningful. There are two main reasons: continuation complexity and message handling complexity. The continuation complexity is reduced in our model thanks to syn- chronous calls masked by proxies. The message handling complexity is reduced by the transparent mapping of messages to method calls in the Active Object pattern.

In particular, four methods in the Chord original algorithm present Continuation Complexity: find predecessor, fix finger, stabilize, and join. All of them should be changed in an asynchronous programming model, and thus requiring different code fragments (request, response, and timeout) for RPCs. In our case, we implemented Chord as a slight modification of Chord’s original OOP code. Just by annotating the remote abstractions in the class, we can run Chord in an Actor library that also permits remote Actors in a transparent way.

On the contrary, the rest of the implementations (Macedon, PlanetSim, Peer- Sim) suffer from the continuation complexity problem in different degrees. Each takes a different strategy but all of them need to break every RPC in two code fragments (request and response). Regarding Timeouts, some declare a time- out function for every method, or they can reuse the timeout handler for all methods. So for example, Macedon is breaking find predecessor in three code fragments (request, reply, timeout), but they are duplicated for different states of the protocol (joined, joining). Macedon does not add another function for the iteration but the additional code is found inside the response handler. PlanetSim and PeerSim also include handlers for requests, responses and timeouts inside their code implementing implicit continuations.

Description

PyActive is a novel object oriented implementation of the Actor model in Python. What it is novel in our approach is the tight integration with OO concepts that considerably simplifies the use of this library.

In particular, PyActive provides non-blocking synchronous invocations in Actors. Unlike traditional Actor frameworks that only provide asynchronous calls, our library offers synchronous non-blocking calls that respect and maintain the message-passing concurrency model. To this end, we have implemented a variant of the Active Object pattern using both Threads and micro-Threads (Stackless). We demonstrate in complex examples like the Chord overlay that our approach reduces substantially the complexity and Lines of Code (LoC) of the problem.

PyActive follows a pure object oriented approach for method invocation. Other actor frameworks use special notations (!) for sending messages and pattern matching or conditionals to receive them. Instead of that, Pyactive middleware transparently maps messages to methods and thus achieving better code expressiveness.

PyActive also includes advanced abstractions like remote reference passing, one-to-many invocation abstractions, and exception handling to ease the implementation of distributed algorithms. PyActive is also a distributed object middleware and it offers remote dispatchers enabling remote method invocation. Finally, PyActive's log mechanisms can generate UML sequence diagrams that help to understand the interactions among Actors using a OO aproach.

PyActive is now provided in two platforms: using cooperative microthreads on top of Stackless Python and on top of python threads using the standard threading library. We validated the performance and expressiveness of Pyactive to code distributed algorithms.

Basic method abstractions

  • async: It’s used to indicate the method can receive asynchronous remote calls.

  • sync: It’s used to indicate the method can receive synchronous remote calls. So it’s necessary to return something.

  • parallel: It guarantees that the current method will not be blocked in a synchronous call by launching an additional thread of control. Our library ensures that no concurrency conflicts arise by ensuring that only one thread at a time can access the Passive Object.

  • ref: It’s used to activate the remote reference layer in this method. This means that one parameter or result are Actors. So this annotation guarantees pass-by-reference.

Basic Functions

  • start_controller: It's used to choose the module. At this moment, we can choose between 'atom_thread' and 'tasklet'. Note that this decision can change the python version that you need. For example the 'tasklet' module needs Stackless Python.

  • launch: It's used to throw the main function which initializes the program. Once this function ends, the program will die.

  • serve_forever. It’s used like launch function but once the function ends, the program continues.

What do you need to run PyActive?

In this section we explain all you need to use this middleware. It's easy!

Into Pyactive_Project folder you can find how to install the middleware in INSTALL.txt.

Requirements

  • If you only use the threads module, you only need Python 2.7

  • If you need use the stackless version, you need Python 2.7 with Stackless Python

You can download Python in: http://www.python.org/download/

Once you have installed python, the next step is to install Stackless python. You can download Stackless python at: [http://www.stackless.com/]

PyActive contains some examples and tests. You can run the following tests:

    $> cd/pyactive
    $> python ./examples/hello_world/hello_world_sync.py
    $> python ./examples/hello_world/hello_world_async.py

Choose the module using the function: 'start_controller'. Nowadays, you can put either the parameter 'tasklet' or 'pyactive_thread' to choose the module. Note that you choose the tasklet module, you need the Stacklees Python.


Context

The Continuation Complexity problem leads us to a paradox in Actor models: synchronous blocking calls are needed to correctly implement RPCs in order to avoid breaking the code into asynchronous messages and handlers. But Actor models are purely asynchronous, and using synchronous calls would block the calling Actor unique thread of control until the response arrives. 

The aforementioned paradox also represents a burden for Object Oriented Actor libraries. If only asynchronous calls are allowed, the resulting code breaks with the traditional object oriented paradigm and complicates the resulting code.

To solve this problem, we present a novel method invocation abstraction called parallel, enabling synchronous invocations to other Actors, without stalling the current Actor’s thread of control. Our parallel calls enable concurrent interleavings of method executions within a single Actor.

Concurrency Control

The major challenge is to enable concurrent interleaving of method executions within a single Actor. We mainly want to allow the main Actor thread to continue processing incoming method requests while the parallel thread is blocked waiting for a response in a remote Actor due to a synchronous invocation.

We present two solutions: one based on threads and locks and another based on green threads and continuations, 

Solution 1 Threads: The Actor implementation will spawn normal threads in the Scheduler and Parallel Methods threads. To achieve consistency between Scheduler and Parallel threads, we use traditional Thread Lock mechanisms that prevent multiple threads to access the same object at the same time. Similar to the Monitor Object pattern, we use a Lock in the Actor object that is only activated when parallel threads are spawned. We aim to provide here a simple solution to demonstrate that parallel threads can coexist with the Actor main thread without conflicting with the servant object shared state. Our lock mechanism is completely transparent to the developer, so that the simplicity of the message passing concurrency model is not affected. 

 

 

In the figure above we can see how our solution handles the concurrency problem. In this flow diagram we can observe where it uses the Lock acquire and Lock release primitives in a call flow. In addition, it is necessary to know that a single Lock is shared between Actor and Parallel Threads wrappers for each Actor. The life cycle is the following: 

  1. Acquire: When a call is incoming, if it exists some parallel call in the object, we need to acquire the lock to be sure that only one thread at this time is using the object. In Fig. 2, we show an example with a parallel synchronous call. At this moment, a parallel thread takes the control of the object, and any other thread can not access to the object.
  2. Release: The parallel thread sends the synchronous invocation to a remote actor. In this moment, the parallel actor releases the lock so that the main thread or other parallel threads can continue working. This is the key point where the Actor is not blocked until the response arrives, and it can process incoming messages.
  3. Acquire: While the parallel thread is waiting the response, our main thread and other parallel threads can serve other petitions. When the response arrives to the Proxy, the Parallel thread will be able to continue execut- ing the method. Nonetheless, it must try to acquire the Lock, because it is possible that the Lock is now in possession of another Thread.
  4. Release: Finally, when the multicalls return object method ends, the parallel synchronous wrapper will release the Lock, and it will end its process.

As we can see, our solution ensures that only one thread access the shared state, but also that waiting for a response in a parallel thread will not preclude the main thread to process incoming messages. Obviously, we control whether the response will not return using a timeout. Using this system we guarantee that the main thread will not be blocked even if the response does not arrive.

Our solution permits to increase the Actor service time because it avoids blocking the Actor during synchronous invocations. We guarantee the correct interleavings of Actor parallel and main threads. Furthermore, we maintain the simplicity of message passing concurrency since the developer is still unaware of this concurrency control mechanisms. He must only tag the appropriate methods as parallel when necessary.

 

Solution 2: Micro Threads and Continuations: In this case, we can use the same figure used in the solution 1, but removing the acquire and release invocations. In that case, we don’t need to use a lock system. This is because in a single-threaded environment, two microthreads cannot modify the same state at the same time because their execution is sequential.

We assume here that the send and receive primitives in microthreads will execute a context-switch to other green thread processing the communication to other Actor. Following, we will try to explain better this process, step by step:

  1. send: When a proxy sends a synchronous message to another Actor, it auto- matically releases its control to other microthread.
  2. receive: Parallel microthread will be inactive until it will receive a message. At moment that it will receive a message it will wake up and wait for its turn to continue the method execution. The implicit continuation implies that the code continues from that point.

Since send and receive policies are executed inside the Proxy, the developer is completely unaware of context-switches and continuations. Furthermore, in this case there is no need to use locks to prevent concurrent access.

Example using parallel

Note: If you remove the start method from parallel list, you will be able to look another result, because the node 1 will be blocked.

from pyactive.controller import init_host, launch, start_controller, sleep

class Node(object):
    _sync = {'send_msg':'50', 'return_msg':'50'}
    _async = ['print_some', 'start', 'start_n3', "registry_node"]
    _parallel = []
    _ref = ["registry_node"]

    def registry_node(self, n2):
        self.remote = n2

    def send_msg(self):
        print self.remote
        msg = self.remote.return_msg()
        print msg
        return True

    def return_msg(self):
        print 'im here'
        sleep(10)
        print 'after sleep'
        return 'Hello World'

    def print_some(self):
        print 'hello print some'

    def start(self):
        print 'call ...'
        msg = self.remote.return_msg()
        print msg

    def start_n3(self):
        for i in range(6):
            self.remote.print_some()

def test1():
    host = init_host()
    n2 = host.spawn_id('2','parallel1','Node',[])
    n1 = host.spawn_id('1','parallel1','Node',[])
    n3 = host.spawn_id('3','parallel1','Node',[])
    n2.registry_node(n1)
    n1.registry_node(n2)
    n3.registry_node(n1)
    n1.start()
    sleep(1)
    n3.start_n3()
    sleep(10)


def main():
    start_controller('pyactive_thread')
    launch(test1)

if __name__ == "__main__":
    main()

 

Hello_world example

In this section you can see a simple Hello World synchronous and asynchronous. In Pyactive_Project you can find more complex examples into Examples folder.

Moreover, we design a turorial with a lot of examples, you can find this tutorial in examples/turorial.

Hello_World Synchronous

    from pyactive.controller import init_host, launch,start_controller, sleep
    class Server():
        _sync = {'hello_world':'1'}
        _async = []
       	_parallel = []
       	_ref = []
    
def hello_world(self): return 'hello world' def test(): host = init_host() # parameters 1 = 'id', 'test_sync' = module name, 'Server' = class name n1 = host.spawn_id('1', 'test_sync', 'Server', []) response = n1.hello_world() print response
if __name__ == '__main__': #you can change the parameter 'pyactive_thread' to 'tasklet' if you would like to run the Stackless model controller. start_controller('pyactive_thread') launch(test)

Hello_World Asynchronous

    from pyactive.controller import init_host, launch,start_controller, sleep
    
class Server(): _sync = {} _async = ['hello_world'] _parallel = [] _ref = []
def hello_world(self): print 'hello world' def test(): host = init_host() # parameters 1 = 'id', 'test_async' = module name, 'Server' = class name n1 = host.spawn_id('1', 'test_async', 'Server', []) n1.hello_world()
if __name__ == '__main__': #you can change the parameter 'pyactive_thread' to 'tasklet' if you would like to run the Stackless model controller. start_controller('pyactive_thread') launch(test)

A complex issue in our platform is how to debug distributed code involving complex one-to-one and one-to-many abstractions among Actors. One of the major drawbacks of the Actor model is the complexity of debugging the code. But it is also true that debugging distributed applications that communicate using asynchronous messages and callbacks are notoriously hard to debug and follow. Our approach here is to offer good event tracing capacities for Actors. Hence, we offer a log system that allows register all the generated events through the actors.

To achieve this log system, PyActive only needs to intercept all the messages between actors. Although, we need an actor that receives all the intercepted messages, where the developer can handle this information (e.g. save the intercepted messages in a file, print the messages through standard output). 

As seen in other sections of this guide, we can bind new actors using spawn_id function. Therefore, to register the log actor, we need to do the same as all the actors, but in this case we need to provide to this actor with all the intercepted messages. Then, we going to explain how to do that.

Firstly, we need to create a new object, which have to have a method called notify. This method must be asynchronous and receive one parameter which will be an intercepted message. Once we have the object with this asynchronous method, the object will be ready to join in the host as an actor. At this point, we have an actor running with an asynchronous method named notify. Now is the moment to tell to PyActive, that this actor will be a log actor who will handle the intercepted messages.So, to indicate to PyActive who is the actor that will act as a log actor, we need to call the set_trace function which receives as a parameter the reference of the actor which has the log task assigned. Once we have called the set_trace function, the log system starts, so the log Actor starts to receive messages through the notify method. 

We decided this system methodology because the developer can choose what to do with the messages (save, show, filter...). In this section we show  two different examples about how to use this system: (1) a simple Log which only prints the messages it received, (2) we provide an UML logger that generates sequence diagrams from interactions among Actors.

Example 1: Simple Log 

This example shows a simple log actor that we can make.

class Log():
    _sync = {}
    _async = ['notify']
    _ref = []
    _parallel = []
   
    def notify(self, msg):
        #print all messages that it receives.
        print 'log system:', msg

The code above shows you a simple log actor, that implements the notify method, and prints the msg through the standard output.

In the code bellow we show you a full example, with a Server class and the Log class that we showed above. 

class Server():
    _sync = {'add': '1'}
    _async = ['substract']
    _ref = []
    _parallel = []
   
    def add(self,x,y):
        return x+y
    def substract(self,x,y):
        print 'substract',x-y

#This method initialize the test.
#Registry Log class. Call set_trace method.
def test_log():
    host = init_host()
    log = host.spawn_id('log', 'log','Log',[])
    host.set_tracer(log)
    ref = host.spawn_id('1','log','Server',[])
    ref.substract(6,5)
    print ref.add(5,5)
    sleep(1)
As you can see in the above code, we have the test_log function, where we create the host and bind the actors. Note that we link the actor log using the spawn_id function, and after that, we call the set_trace function at host, to communicate that the Log actor will be the actor charged to receive the intercepted messages. 
 
You can find this example in Examples folder of PyActive project.
 

Example 2. UML Log

We provide an UML logger that generates sequence diagrams from interactions among Actors. You can view above the code of log class that it generate a specific output document:

class LogUML():
    _sync = {}
    _async = ['notify', 'to_uml']
    _ref = []
    _parallel = []
    
    def __init__(self):
        self.events = []
        self.nameNodes = {}
        self.cnt2 = 0
        self.cont = 0
        self.nodes = {}
        self.titles = {}
        self.list_nodes = []
        
 

    def notify(self,msg):  
        self.events.append(msg.copy())
        
   
    def to_uml(self, filename):
        uml = []
        uml_names = []
        uml.append('\n')
        for msg in self.events:
            fromref = urlparse(msg[FROM])
            toref = urlparse(msg[TO])
            if not self.nodes.has_key(fromref.path):
                self.list_nodes.append(fromref.path)
                self.nodes[fromref.path] = fromref.path
            if not self.nodes.has_key(toref.path):
                self.list_nodes.append(toref.path)
                self.nodes[toref.path] = toref.path
            self.titles[fromref.path] = msg[FROM]
            self.titles[toref.path] = msg[TO]   

        while(self.cnt2 < len(self.list_nodes)):
            evt = 'n'+str(self.cnt2)+':Process[p] "'+self.titles[self.list_nodes[self.cnt2]]+'"\n'
            self.nameNodes[self.titles[self.list_nodes[self.cnt2]]] = "n"+str(self.cnt2)
            uml_names.append(evt)
            self.cnt2 += 1
uml.append('\n') for msg in self.events: if msg[TYPE]==CALL: fromref = urlparse(msg[FROM]) toref = urlparse(msg[TO]) nfrom = 'n'+str(self.list_nodes.index(fromref.path)) nto = 'n'+str(self.list_nodes.index(toref.path)) if isinstance(msg[PARAMS], list): params = [] for node in msg[PARAMS]: if isinstance(node, Ref): params.append(self.nameNodes.get(str(node.get_aref()))) else: params.append(node) evt = nfrom+':'+nto+'.'+msg[METHOD]+'('+str(params)+')\n' else: evt = nfrom+':'+nto+'.'+msg[METHOD]+'('+str(msg[PARAMS])+')\n' else: fromref = urlparse(msg[FROM]) toref = urlparse(msg[TO]) nfrom = 'n'+str(self.list_nodes.index(fromref.path)) nto = 'n'+str(self.list_nodes.index(toref.path)) if isinstance(msg[RESULT], list): result = [] for node in msg[RESULT]: result.append(self.nameNodes.get(str(node.get_aref()))) evt = nfrom+':'+nto+'.'+msg[METHOD]+'()='+str(result)+'\n' elif isinstance(msg[RESULT], Ref): evt = nfrom+':'+nto+'.'+msg[METHOD]+'()='+self.nameNodes.get(str(msg[RESULT].get_aref()))+'\n' else: evt = nfrom+':'+nto+'.'+msg[METHOD]+'()='+str(msg[RESULT])+'\n' try: uml.append(evt) except: None self.events = []
# (!) save name write_uml(filename+'_names.sdx', uml_names)
# (2) save interactions among actors write_uml(filename+'.sdx', uml) def write_uml(filename, events): f3 = open(filename, 'a') f3.writelines(events) f3.close()
As seen in code, we need to call to_uml method periodically, that it transform the messages in UML format. Finally save the results in a file using write_uml method. We separate the uml code in two files: (1) save the names of actors using references, (2) save the interactions among actors. We take this decision because of it's better when exists the possibility to join nodes in a future. Using this solution, final step before generate the sequence diagram, is concatenate two files generated for UMLLog. 
 
To view the diagram, we use the sdedit software, which you can obtain in this url: http://sdedit.sourceforge.net. Once concatenated two files, only is necessary open file with sdedit. You obtain a beautiful sequence diagram easily. 
 
Next image show you a sequence diagram example with three nodes using chord algorithm:
 
 
 
 
 Sequence Diagram