Computer cluster

A computer cluster is a set of computers (nodes) that work together, so it can be viewed as a single system in which each computer acts as a processor. All of the computers work on the same task and are connected in one network. One node can share data with another node, and a master node can share data with slave nodes.

A computer cluster is usually used to get better performance and availability than a single computer can offer. It can be used in many cases, from web-service support to high-performance computing. Usually each node runs its own instance of the operating system, but the nodes can all use the same operating system and a cluster setup toolkit (like OSCAR). To increase computing performance, clusters also use parallel computing tools such as MPI.

Beowulf cluster

One of the most popular kinds of cluster is the Beowulf cluster. A Beowulf cluster is an open source, scalable performance cluster on a small LAN. Beowulf is a kind of software recipe: it tells you what software you need to build a cluster. That software includes OpenMPI or MPICH, PVM, the Linux kernel and many others. It can be interpreted as ‘build a computer cluster at home’, because it allows building a high-performance cluster on cheap PC hardware.

Our ‘cluster’

Simple cluster

In this post I will show you a simple version of cluster computing, without sharing information between nodes and without any cluster OS or cluster software. To keep it simple, just imagine you have a lot of different remote computers in different regions, and every computer computes some function f(x). The function f and the parameter x can be the same on every computer or can depend on the computer.

It will be simple C++ code that runs on each computer, and for communication we will use the UDP protocol. That greatly simplifies integrating a computation cluster: we just need to include a header, implement the response to a message (receive a new parameter for the computation, return intermediate results, etc.) and implement our computation function.

#include "sluster.h"

long long val = 1000;
void on_message(int sockfd, char* msg, struct sockaddr *their_addr, socklen_t addr_len){
    sendto(sockfd, &val, sizeof(val), 0, their_addr, addr_len);
}
void calcFunc(void*){
    while(true){
        val++;
        val%=10;
    }
}
int main(){
    CalculationSocketManager calculationSocketManager("4950",on_message); //Run on port 4950

    calculationSocketManager.start(calcFunc);
}

Implementation

Instead of TCP we will use UDP, because we do not need a continuous connection. For the implementation we will use the <sys/socket.h> header, which is available on *nix machines; I run it on a macOS machine. If you are using a Windows machine, please include the <winsock2.h> header instead, as described here.

First, we need to create and bind a UDP socket.

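int sockfd;
struct addrinfo hints, *servinfo, *p;

// Describe the socket we want: UDP, any address family, bound to our own IP
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_PASSIVE;

int rv;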
//Convert port to address info
if ((rv = getaddrinfo(NULL, this->port, &hints, &servinfo)) != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
    return 1;
}

// loop through all the results and bind to the first we can
for(p = servinfo; p != NULL; p = p->ai_next) {
    //Create
    if ((sockfd = socket(p->ai_family, p->ai_socktype,
                         p->ai_protocol)) == -1) {
        continue;
    }
    //Bind
    if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
        close(sockfd);
        continue;
    }
    break;
}
if (p == NULL) {
    fprintf(stderr, "Cannot create/bind socket!\n");
    return 2;
}

After that, we need to receive a message from the socket.

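while(true) {
    // recvfrom overwrites addr_len, so reset it before every call
    addr_len = sizeof their_addr;
    if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN - 1, 0,
                             (struct sockaddr *) &their_addr, &addr_len)) == -1) {
        perror("recvfrom");
        exit(1);
    }

    // Null-terminate the datagram before the callback reads it as a string
    buf[numbytes] = '\0';

    //Function pointer
    onmessage(sockfd,buf,(struct sockaddr *) &their_addr, addr_len);
}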

onmessage is a function pointer, aka a callback, which we need to implement. In our case it will be the response to a master node request, for example sending the current intermediate result, and it will depend on the message that was received.

typedef void(*onmessage_callback)(int, char*, struct sockaddr *, socklen_t);

onmessage_callback onmessage;

...

char buf[2048];

void on_message(int sockfd, char* msg, struct sockaddr *their_addr, socklen_t addr_len){
    float progress = 0.12;
    int val = 1011;
    int n = sprintf(buf,"%d,%f",val, progress);
    sendto(sockfd, buf, n, 0, their_addr, addr_len);
}

onmessage = on_message;

Here we just send a message with 2 values each time we receive a request. In the working example we will send the current calculated value and the current progress.

As we can see above, we wait for the incoming message, and while we are waiting the current thread is blocked! So we need to create a thread and receive messages in that other thread. But first we need to create a class with all the code above.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

#include <pthread.h>

#ifndef SERVER_SOCKET_H
#define SERVER_SOCKET_H


typedef void(*onmessage_callback)(int, char*, struct sockaddr *, socklen_t);

class ServerSocket{
    char* port;
public:
    onmessage_callback onmessage;
    ServerSocket(const char* port, onmessage_callback onmessage){
        this->port = new char[strlen(port) + 1]; // +1 for the terminating '\0'
        strcpy(this->port, port);

        this->onmessage = onmessage;
    }
    ~ServerSocket(){
        delete[] this->port; // allocated with new[], so release with delete[]
    }

    int start(){
        int sockfd;
        struct addrinfo hints, *servinfo, *p;
        int rv;
        int numbytes;
        struct sockaddr_storage their_addr;
        const int MAXBUFLEN = 128; // const, so buf below is a regular array

        char buf[MAXBUFLEN];
        socklen_t addr_len;
        char s[INET6_ADDRSTRLEN];

        memset(&hints, 0, sizeof hints);
        hints.ai_family = AF_UNSPEC; // set to AF_INET to force IPv4
        hints.ai_socktype = SOCK_DGRAM;
        hints.ai_flags = AI_PASSIVE; // use my IP

        if ((rv = getaddrinfo(NULL, this->port, &hints, &servinfo)) != 0) {
            fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
            return 1;
        }

        // loop through all the results and bind to the first we can
        for(p = servinfo; p != NULL; p = p->ai_next) {
            if ((sockfd = socket(p->ai_family, p->ai_socktype,
                                 p->ai_protocol)) == -1) {
                perror("listener: socket");
                continue;
            }

            if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
                close(sockfd);
                perror("listener: bind");
                continue;
            }

            break;
        }

        if (p == NULL) {
            fprintf(stderr, "listener: failed to bind socket\n");
            return 2;
        }

        freeaddrinfo(servinfo);


        addr_len = sizeof their_addr;

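        while(true) {
            // recvfrom overwrites addr_len, so reset it before every call
            addr_len = sizeof their_addr;
            if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN - 1, 0,
                                     (struct sockaddr *) &their_addr, &addr_len)) == -1) {
                perror("recvfrom");
                exit(1);
            }

            // Null-terminate the datagram before the callback reads it as a string
            buf[numbytes] = '\0';

            onmessage(sockfd,buf,(struct sockaddr *) &their_addr, addr_len);
        }
        // Not reached while the loop runs forever; kept for completeness
        close(sockfd);
        return 0;
    }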
};
#endif

File sluster.h

To create a new thread we will use pthreads. It is really simple to implement a thread with pthreads; you can also use C++11 std::thread.

void* socket_func(void* server_socket){
    ServerSocket* serverSocket = (ServerSocket*)server_socket;
    serverSocket->start();
    return NULL; // a pthread start routine must return a void*
}


pthread_t socket_thread;
ServerSocket serverSocket("4950", &onMessage);

// pthread_create takes a void* argument, so pass the object's address
if(pthread_create(&socket_thread, NULL, socket_func, &serverSocket)) {
    fprintf(stderr, "Error creating socket thread\n");
    return 1;
}

The thread starts right after creation, and you need to implement the onMessage function. Let’s wrap it into a class.

...


class CalculationSocketManager{
    ServerSocket* serverSocket;
    pthread_t socket_thread;

public:
    CalculationSocketManager(const char* port, onmessage_callback onmessage){
        serverSocket = new ServerSocket(port, onmessage);
    }
    ~CalculationSocketManager(){
        delete serverSocket;
    }
    int start(void(*calc_func)(void*) ){


        if(pthread_create(&socket_thread, NULL,socket_func, this->serverSocket)) {
            fprintf(stderr, "Error creating socket thread\n");
            return 1;
        }
        //Start calculation
        calc_func(NULL);
        return 0;
    }
};

File sluster.h
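As mentioned above, C++11 std::thread can be used instead of pthreads. The sketch below is only an illustration of that alternative, not code from sluster.h: listen_loop and run_with_listener are hypothetical names, and the listener just polls a stop flag where the real one would block in recvfrom.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical stand-in for the blocking receive loop (ServerSocket::start).
// It only polls a stop flag, so the sketch runs without a network.
void listen_loop(std::atomic<bool>& stop, std::atomic<int>& polls) {
    while (!stop.load()) {
        polls.fetch_add(1);  // a real loop would block in recvfrom() here
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

// Runs the listener on a background thread while the calling thread does
// the calculation, then stops the listener and returns the computed sum.
long long run_with_listener(int iterations, std::atomic<int>& polls) {
    std::atomic<bool> stop(false);
    std::thread listener(listen_loop, std::ref(stop), std::ref(polls));

    long long sum = 0;
    for (int i = 1; i <= iterations; ++i) sum += i;  // the "calculation"

    // Let the listener run at least once, then shut it down cleanly
    while (polls.load() == 0) std::this_thread::yield();
    stop.store(true);
    listener.join();
    return sum;
}
```

Unlike the pthread version, the thread function here takes typed references instead of a void*, and join() gives a clean shutdown point.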

Now we can try to test it :) We will calculate Fibonacci numbers, without any performance optimization, starting from the number 38.

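#include "sluster.h"
#include <mutex>
#include <vector>
#include <utility>

std::vector<std::pair<int,long long>> results;
std::mutex results_mutex; // results is shared by the socket thread and the calculation
int value = 38;
char buf[16394];
void on_message(int sockfd, char* msg, struct sockaddr *their_addr, socklen_t addr_len){
    // Serialize every result computed so far as "n,fib(n)" lines
    int offset = 0;
    {
        std::lock_guard<std::mutex> lock(results_mutex);
        for(auto& p: results){
            int n = snprintf(buf+offset, sizeof(buf)-offset, "%d,%lld\n", p.first, p.second);
            if(n < 0 || n >= (int)(sizeof(buf)-offset)) break; // buffer is full
            offset += n;
        }
    }

    sendto(sockfd, buf, offset, 0, their_addr, addr_len);
}

long long fib(int n){
    // Deliberately naive recursion; note that fib(0) = fib(1) = 1 here
    if(n < 2){
        return 1;
    }
    return fib(n - 1) + fib(n - 2);
}

void calcFunc(void*){
    while(true) {
        long long result = fib(value); // compute outside the lock
        {
            std::lock_guard<std::mutex> lock(results_mutex);
            results.push_back({value, result});
        }
        value++;
    }
}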


int main(){
    CalculationSocketManager calculationSocketManager("4950",on_message);

    calculationSocketManager.start(calcFunc);
}

File main.cpp

For sending requests we can write a client or use Netcat. Here we use netcat. Woohoo, it works great and it is ready to use. Let’s try to deploy it.

Deploying

Usually I use AWS, but this time I will use Google Cloud. You can see a tutorial on how to create a Google Cloud instance here. After logging into the remote machine, clone the sluster repo, compile and run.

> git clone https://github.com/albertaleksieiev/sluster.git
> cd sluster/example 
> g++ fib.cpp -pthread -o main.o
> ./main.o
starting UDP server on port 4950

After that, you need to open port 4950 in your firewall. In Google Cloud, go to your app dashboard, type Networking in the search input and select it. Go to Firewall rules, click create firewall rule and fill in the fields:

  • Name: any name for this rule
  • Source IP ranges: 0.0.0.0/0 for any IP, or type your own IP so that traffic is only allowed from that IP.
  • Protocols and Ports: udp:4950 to allow port 4950 for UDP.

Now I can connect to the remote machine and get the intermediate results of the Fibonacci calculation. We just need to run netcat from the local machine, but with the remote machine IP address instead of localhost. We also need to run our server as a daemon, because if we close the ssh connection our server will be closed too. I use tmux, but you can use screen.

OK, first we shut down our server if it is running on the remote machine. After that we start a new tmux session and run the server from that session.

Run server on remote machine

> tmux
> g++ fib.cpp -pthread -o main.o
> ./main.o
starting UDP server on port 4950

Get results, run from the local machine

> nc -u 11.111.11.111 4950
> 
38,63245986
39,102334155
40,165580141
41,267914296
42,433494437
43,701408733
44,1134903170
45,1836311903
46,2971215073
47,4807526976
48,7778742049

Change 11.111.11.111 to your remote machine IP address.
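On the master side, the reply shown above can be turned back into numbers. This is a small sketch assuming the "n,value" line format printed by the server; parse_results is a hypothetical helper, not part of sluster.

```cpp
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Parses a reply made of "n,value" lines into (n, value) pairs.
// Lines that do not match the expected shape are skipped.
std::vector<std::pair<int, long long>> parse_results(const std::string& reply) {
    std::vector<std::pair<int, long long>> out;
    std::istringstream lines(reply);
    std::string line;
    while (std::getline(lines, line)) {
        std::istringstream fields(line);
        int n;
        long long value;
        char comma;
        if (fields >> n >> comma >> value && comma == ',') {
            out.push_back({n, value});
        }
    }
    return out;
}
```

Because UDP replies arrive as whole datagrams, one call to parse_results per reply is enough; no reassembly across packets is attempted here.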

MD5 passwords

Cool, it’s really easy to integrate, and it works! Just clone the repo -> #include "sluster.h" -> implement 2 functions -> open ports. But now let’s try to implement something more interesting, like a brute force attack on md5 passwords :).

To keep it simple, we will have a bunch of passwords hashed with md5. But first we need to handle this in our message handler function. All data will be passed in JSON format, parsed with Boost Property Tree, and we will have 3 commands.

  • Add encoded passwords: {"command": "add", "passwords" : ["32CFE6C19200B67AFB7C3D0E1C43EADB","332FE6C19200B67AFB7C3D0E1C43EADB"]}
  • Start brute-forcing; we pass the allowed range for every position of the password: {"command":"start-bruteforce", "ranges":[{"begin":"a","end":"z"},{"begin":"1","end":"9"}]}. This message tries every lowercase letter in the first position with every digit from 1 to 9 in the second position, so the total number of combinations is 26 * 9 = 234.
  • Status returns the cracked passwords and the current progress. Request message: {"command":"status"}, response message:
{
	"current-progress": 10,
	"max-progress": "234",
	"cracked-passwords": [
		{
			"pass": "a5", "md5": "32CFE6C19200B67AFB7C3D0E1C43EADB"
		}
	]
}
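The combination count from the start-bruteforce example (26 * 9 = 234) is simply the product of the range sizes. A minimal sketch, where count_candidates is a hypothetical helper and every range is assumed to be inclusive with begin <= end:

```cpp
#include <utility>
#include <vector>

// Each range is an inclusive (begin, end) pair of characters for one position.
// The number of candidate passwords is the product of the range sizes.
long long count_candidates(const std::vector<std::pair<char, char>>& ranges) {
    long long total = 1;
    for (const auto& r : ranges) {
        total *= (r.second - r.first + 1);  // assumes r.first <= r.second
    }
    return total;
}
```

For the two ranges a-z and 1-9 this gives 234, which is the value the status response reports as max-progress.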

Implementation

First we need an md5 function. There is a really good source here: copy and paste it into a header file and include that header with #include "md5.h". The working code you can find here.

After that we need the Boost.PropertyTree package.

I already have Boost installed on my PC, so you need to install it too. I use CMake in my project, so I need to add the dependency to CMakeLists.txt

...
set(Boost_USE_STATIC_LIBS OFF)
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME OFF)
find_package(Boost 1.61.0)
if(Boost_FOUND)
    include_directories(${Boost_INCLUDE_DIRS})
    add_executable(SocketClient socket/client.cpp)
    target_link_libraries(SocketClient ${Boost_LIBRARIES})
endif()
...

CMakeLists.txt

After that I need to implement the message handler function:

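#include <iostream>
#include <sstream>
#include <string>
#include <unordered_set>
#include <vector>

std::unordered_set<std::string> md5Passwords;
std::vector<std::string> crackedPasswords;
long long curr_progress = 0;
long long max_progress = 0;
long long keyLenght = 0;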


#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>

#include <boost/foreach.hpp>
template <typename T>
std::vector<T> as_vector(boost::property_tree::ptree const& pt, boost::property_tree::ptree::key_type const& key)
{
    std::vector<T> r;
    for (auto& item : pt.get_child(key))
        r.push_back(item.second.get_value<T>());
    return r;
}

void on_message(int sockfd, char* msg, struct sockaddr *their_addr, socklen_t addr_len){

    using boost::property_tree::ptree;
    using boost::property_tree::read_json;
    using boost::property_tree::write_json;

    ptree messageObj;
    std::istringstream is (msg);
    read_json (is, messageObj);


    std::string command = messageObj.get<std::string> ("command");
    if(command == "add"){
        for(const auto& hash: as_vector<std::string>(messageObj,"passwords")){
            md5Passwords.insert(hash); // unordered_set has insert(), not push_back()
        }
        sprintf(buf, "added %d passwords", (int)md5Passwords.size());
    }else if(command == "start-bruteforce"){
        BOOST_FOREACH(const ptree::value_type& child,
                      messageObj.get_child("ranges")) {
                        char begin = child.second.get<char>("begin");

                        char end = child.second.get<char>("end");
                        std::cout<<begin<<' '<<end<<std::endl;
                        //Add logic
                    }
        sprintf(buf, "starting");
    }else if(command == "status"){
        ptree pt;
        pt.put("current-progress",curr_progress);
        pt.put("max-progress", max_progress);
        ptree cracked;
        for(auto& crackedPass: crackedPasswords ){
            ptree pass;
            pass.put("pass",crackedPass);
            pass.put("md5",md5(crackedPass));
            cracked.push_back(std::make_pair("",pass));
        }

        pt.add_child("cracked-passwords",cracked);

        std::ostringstream osbuf;
        write_json (osbuf, pt, false);

        strcpy(buf, osbuf.str().c_str());
    }


    sendto(sockfd, buf, strlen(buf), 0, their_addr, addr_len);
}

on_message implementation

As you can see, I still need to implement the brute force logic. I wrote a blog post on how to easily create a multifunctional enumerator, so we just need to copy the last source code from that page and include it in the project.
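To show what such an enumeration does, here is a small stand-in sketch. It is not the Enumerations::Enumerator from that blog post: enumerate_candidates is a hypothetical function that walks the ranges like an odometer, with the last position changing fastest.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Calls `visit` once for every string whose i-th character lies in the
// inclusive range ranges[i]. The last position changes fastest.
void enumerate_candidates(const std::vector<std::pair<char, char>>& ranges,
                          const std::function<void(const std::string&)>& visit) {
    if (ranges.empty()) return;
    for (const auto& r : ranges) {
        if (r.first > r.second) return;  // an empty range means no candidates
    }

    // Start with the first character of every range
    std::string current;
    for (const auto& r : ranges) current.push_back(r.first);

    while (true) {
        visit(current);
        // Advance like an odometer: reset exhausted positions, carry leftwards
        int pos = static_cast<int>(ranges.size()) - 1;
        while (pos >= 0 && current[pos] == ranges[pos].second) {
            current[pos] = ranges[pos].first;
            --pos;
        }
        if (pos < 0) break;  // every position wrapped around, so we are done
        ++current[pos];
    }
}
```

In the brute force server, the visit callback is where each candidate would be hashed and compared against the set of md5 passwords.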

After that we need to add logic for handling command start-bruteforce:


auto enumerator = Enumerations::Enumerator();

...
else if(command == "start-bruteforce"){
    max_progress = 1;
    int counter = 0;
    BOOST_FOREACH(const ptree::value_type& child,
                  messageObj.get_child("ranges")) {
                    char begin = child.second.get<char>("begin");

                    char end = child.second.get<char>("end");

                    enumerator.addEnumeration(std::to_string(keyLenght),Enumerations::Enumeration<char>(end, begin, 1));

                    max_progress *= (end - begin + 1);
                    keyLenght++;
                }
    sprintf(buf, "starting, total count : %lld", max_progress);
    start = true; // release calcFunc, which waits on this flag
}
...

And at the end we need to add logic into computation function:

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> start(false); // written by the socket thread, read by the calculation
void calcFunc(void*){

    while(!start){
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // The string needs a real size of keyLenght; reserve() only allocates capacity
    std::string pass(keyLenght, ' ');
    enumerator.enumerate([&pass](Enumerations::EnumerationMap enumerationMap){

        for(int i=0;i<keyLenght;i++){
            pass[i] = enumerationMap.get<char>(std::to_string(i));
        }
        if(md5Passwords.find(md5(pass)) != md5Passwords.end()){
            crackedPasswords.push_back(pass);
            md5Passwords.erase(md5(pass));
        }
        curr_progress++;

    });
}

The full working code you can find here. As the last step we need to add the encoded passwords to the brute force attack. In this case I use passwords of length 6 made only of the characters ‘a’ - ‘z’. It is a really simple case; in a real project you will have more complicated passwords. In our case we need to enumerate 26^6 = 308.9M passwords. After launching the server in a tmux session (cmake ./;make;./BruteForceServer) we need to add the passwords and start the brute force!

Adding passwords
{"command": "add", "passwords" :  ["c7ded9f87b961a4da4cde26f8c9b5573", "3d4ae0ea237537d3dfced250f221b5a8", "6f2d656486e9d69ae7832841d5f46068", "15c2e9950c8774af70928f34e4ba6f77", "dc9cd480f121ce65df38724756e59c58", "ffa4a945137da6625e38190ae30a226d", "b3fe4e3dc342719fd616fc047032a264", "fbfbc08ff787f8664898003f60744374", "d45a132eb640f69cfca5239b8141fe25", "eeed4a2691eb491c84f34f6c8d164cbb", "4f97e25fc286605b7a1ee21ca2ed2e9c", "89cc49cec6e4a66ee0b472da34cbe601", "da1f66098f1c95680d6001140aebf0fc", "02eba1c2c6f3b5f8cd41ad17446558e9", "c1f41ffb2e427d8ee2ab7f1707776abb", "4e1f1bfe5c486e88b74abd5a7309eb2a", "5b8aa9f637c4afb1c49fb2fb3d1e89e6", "6cb175ed60218e7654a927edf37a28fe", "7262d17af60d2670c2052fb5e54f69c1", "d43745fc5287f325833052931cbe0133", "b8e18becbd0642d96472cd91127bc213", "cccf18341e6b165baeab43c5451fdcc3", "ecdcae459a040da1447555fe558054cc", "9f8a85fbb2bb262b5f9d6ec49129c803", "f5dcd3e3676df8eee99534db3c312853", "cdccb7f7a14e8614787b14732095e03b", "39d6fa8e45c910c3f30a2ad65abb4edd", "be140e890924f86d1168aba9d372786c", "eca36a815e5c47c80967cad37c6fecd5", "b57b8de9eab75a9b8c189f477379c428", "e467e62cf7e9d28de95aa8801be649a4", "d24d12e45da8d598db31dbbfd9133251", "0e77ba4bc050eeb5a7fb4cead3c3e9ba", "a81aa845275423a39ba0a549fa51cda4", "847e8dc147c28fffac9f79f637626283", "698d9a6a37bafb02cbcb4a42d16a647b", "bc706bbbcd05b18791353c345febeb1f", "39e8802d9511eb72c01bdced2873bbe2", "8a618abcd652a005cd3d3d3423979248", "d13fb0206784235405dd30c331c979ce", "50dc4f40caa99044fcb2b15d35937f01", "9b6e6c56f6cc83ce4c43ed325050f6d7", "4b84d7da40337d8ff4a028714258ddca", "f021758f2787c4d94f41c97fc256999d", "2e0f67cd2598f15a936278947eefd851", "ebce9a90a3a81efb15574ccf7e894420", "feebbfd618dc8ee2003da2bed6144d17", "4d1030ee7ebc0416d3aa25958658ee2e", "9c06d0f94c69e80a719b883472cf7aa1", "e11da07c8a2845e4ff437cedf1f511fe"] }
Start bruteforce
{"command":"start-bruteforce", "ranges":[{"begin":"a","end":"z"},{"begin":"a","end":"z"},{"begin":"a","end":"z"},{"begin":"a","end":"z"},{"begin":"a","end":"z"},{"begin":"a","end":"z"}]}
Get status
{"command":"status"}

If you have a more complicated case, you can split either the encoded passwords or the enumeration across nodes. For example, if we have 26 nodes (for simplicity), we send each node a different first letter; each node then enumerates all strings of length - 1 and prepends the letter it received.
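One way to sketch this split with the existing message format is to pin the first position to a single letter in the ranges, so each node gets its own start-bruteforce message. This is a variation on the idea above, not code from the repo: start_message_for is a hypothetical helper, and lowercase a-z is assumed for the remaining positions.

```cpp
#include <string>

// Builds the "start-bruteforce" message for one node: position 0 is pinned to
// `first`, and the remaining `length - 1` positions cover 'a'..'z'.
std::string start_message_for(char first, int length) {
    std::string msg = "{\"command\":\"start-bruteforce\", \"ranges\":[";
    msg += std::string("{\"begin\":\"") + first + "\",\"end\":\"" + first + "\"}";
    for (int i = 1; i < length; ++i) {
        msg += ",{\"begin\":\"a\",\"end\":\"z\"}";
    }
    msg += "]}";
    return msg;
}
```

The master would send start_message_for('a', 6) to the first node, start_message_for('b', 6) to the second, and so on, so each node enumerates 26^5 candidates instead of 26^6.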

After 30 minutes I had cracked 26 of 50 passwords using only 1 node.

Thanks for reading! All the code is available here. I hope it will be helpful for you!