Create a NoSQL database in 1 hour

Talk given at the Open World Forum, 5 October 2013. How to create a key-value NoSQL database in less than an hour, building on the Nanomsg and LightningDB libraries.

Create a NoSQL server in one hour

NoSQL databases

Libraries used

Introducing AngstromDB

Code walkthrough

Benchmark & future work

Create a NoSQL server

NoSQL databases

Going further:

Horizontal scalability

High availability

Big Data

Breaking free from:

The relational model

ACID transactions

General principle

Types of NoSQL databases

Key-value, Document, Column, Graph

Dynamo

Riak

Redis

Voldemort

Tokyo Tyrant

MemcacheDB

FoundationDB

MongoDB

CouchDB

Couchbase

HyperDex

RethinkDB

Cassandra

Hadoop / HBase

Accumulo

Neo4J

Allegro

Virtuoso

InfoGrid

Programming libraries

The successor to ZeroMQ

A networking library with superpowers

Nanomsg

Message queue management

Specific protocol

14 supported languages

Network concepts redefined

Nanomsg

Transport methods

Inter-thread: inproc

Inter-process: ipc

Inter-machine: tcp
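
The transport is selected by the scheme at the start of the endpoint string, as in the "inproc://threads_socket" endpoint used later in the code. The sketch below only illustrates that naming convention. The `transport_t` type and the `endpoint_transport` function are hypothetical helpers written for this example, not part of Nanomsg.

```c
#include <assert.h>
#include <string.h>

/* Transport families named on the slide.
 * TRANSPORT_UNKNOWN is a hypothetical fallback for this example. */
typedef enum {
    TRANSPORT_INPROC,   /* between threads of one process */
    TRANSPORT_IPC,      /* between processes on one machine */
    TRANSPORT_TCP,      /* between machines */
    TRANSPORT_UNKNOWN
} transport_t;

/* Classify an endpoint string by its scheme prefix. */
transport_t endpoint_transport(const char *endpoint) {
    assert(endpoint != NULL);
    if (strncmp(endpoint, "inproc://", 9) == 0)
        return TRANSPORT_INPROC;
    if (strncmp(endpoint, "ipc://", 6) == 0)
        return TRANSPORT_IPC;
    if (strncmp(endpoint, "tcp://", 6) == 0)
        return TRANSPORT_TCP;
    return TRANSPORT_UNKNOWN;
}
```

Because only the endpoint string changes, the same socket code can in principle move from threads to processes to machines.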

Decoupling the connection direction (diagrams: a server and a client)

Multi-connection socket (diagram: a server and three clients, port A and port B)

REQ/REP connection (diagrams: a server and a client, labelled REQ and REP)

PUSH/PULL connection (diagrams: a server and a client, labelled PUSH and PULL)

PUB/SUB connection (diagrams: a server and three clients, one PUB label and three SUB labels)

Load-balancing (diagrams: a server and three clients, labelled PUSH and PULL)

Bus (diagrams: four clients)

Multi-valued key-value pair storage

On-disk persistence, memory-mapped into RAM

Transactional (a single writer thread)

At the heart of OpenLDAP

LMDB (LightningDB)

HyperDex: LMDB vs LevelDB (charts: throughput in ops/sec, latency in microseconds)

Introducing AngstromDB

Key-value pair server

Written in C

Built on well-established libraries

Binary protocol

No error checking

Naive implementation

Very simple API

PUT: add or update a key

GET: return a value

DELETE: erase a key

Protocol: DELETE

(values in parentheses are the ones shown on the slide)

Request
    Field        Size
    cmd (2)      1 byte
    key size     1 byte
    key          1 to 255 bytes

Response
    Field        Size
    status (1)   1 byte
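
As an illustration of the layout above, here is a minimal sketch of how a client might build a key-only request. The command codes come from the slides and the `PROTO_*` names from the server code shown later. The function `encode_key_request` and its buffer-based interface are assumptions made for this example, not part of AngstromDB.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Command codes as shown on the protocol slides. */
#define PROTO_PUT    1
#define PROTO_DELETE 2
#define PROTO_GET    3

/* Encode a key-only request (DELETE or GET): cmd, key size, key bytes.
 * Returns the number of bytes written to out, or 0 if the key length is
 * outside 1..255 or the output buffer is too small. */
size_t encode_key_request(uint8_t cmd, const void *key, size_t key_len,
                          uint8_t *out, size_t out_cap) {
    assert(key != NULL && out != NULL);
    if (key_len < 1 || key_len > 255 || out_cap < 2 + key_len)
        return 0;
    out[0] = cmd;                 /* 1 byte: command */
    out[1] = (uint8_t)key_len;    /* 1 byte: key size */
    memcpy(out + 2, key, key_len);
    return 2 + key_len;
}
```

The one-byte size field is what limits keys to 255 bytes. The same helper would serve GET, which uses the same request layout with command code 3.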

Protocol: GET

(values in parentheses are the ones shown on the slide)

Request
    Field        Size
    cmd (3)      1 byte
    key size     1 byte
    key          1 to 255 bytes

Response
    Field        Size
    status (1)   1 byte
    data size    4 bytes
    data         0 to 4 GB
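
A client has to read the fixed 5-byte header of the response before it knows how much data follows. The sketch below parses that header, assuming the size is sent in network byte order (the server code shown later applies `htonl` to it). The name `decode_get_header` is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Parse the header of a GET response: status (1 byte), then the data
 * size (4 bytes, big-endian). Returns 0 on success, -1 if fewer than
 * 5 bytes are available. */
int decode_get_header(const uint8_t *buf, size_t len,
                      uint8_t *status, uint32_t *data_size) {
    assert(buf != NULL && status != NULL && data_size != NULL);
    if (len < 5)
        return -1;
    *status = buf[0];
    *data_size = ((uint32_t)buf[1] << 24) | ((uint32_t)buf[2] << 16) |
                 ((uint32_t)buf[3] << 8)  |  (uint32_t)buf[4];
    return 0;
}
```

After a successful call the client would read exactly `data_size` more bytes to obtain the value.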

Protocol: PUT

(values in parentheses are the ones shown on the slide)

Request
    Field        Size
    cmd (1)      1 byte
    key size     1 byte
    key          1 to 255 bytes
    data size    4 bytes
    data         0 to 4 GB

Response
    Field        Size
    status (1)   1 byte
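
A PUT request appends a 4-byte size and the value to the key. The following sketch builds such a request in memory, assuming the data size travels in network byte order (the server code shown later applies `ntohl` to it). The function `encode_put_request` is a hypothetical client-side helper, not part of AngstromDB.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Encode a PUT request: cmd (1), key size, key, data size (big-endian),
 * data. Returns the number of bytes written to out, or 0 if the key
 * length is outside 1..255 or the output buffer is too small. */
size_t encode_put_request(const void *key, size_t key_len,
                          const void *value, uint32_t value_len,
                          uint8_t *out, size_t out_cap) {
    size_t total = 2 + key_len + 4 + (size_t)value_len;
    uint8_t *p = out;

    assert(key != NULL && out != NULL);
    if (key_len < 1 || key_len > 255 || out_cap < total)
        return 0;
    *p++ = 1;                          /* command code for PUT */
    *p++ = (uint8_t)key_len;
    memcpy(p, key, key_len);
    p += key_len;
    *p++ = (uint8_t)(value_len >> 24); /* data size, most significant byte first */
    *p++ = (uint8_t)(value_len >> 16);
    *p++ = (uint8_t)(value_len >> 8);
    *p++ = (uint8_t)value_len;
    if (value_len > 0)
        memcpy(p, value, value_len);
    return total;
}
```

Building the whole request in one buffer also lets the client send it with a single system call.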

Multi-threaded

Main thread: waits for new connections

Communication threads: one thread per connected client

Writer thread: no concurrent writes

(diagram: main thread, communication threads, writer thread, LMDB)

Code walkthrough

github.com/Amaury/AngstromDB/tree/master/src

Startup

/*
 * db              Pointer to the database environment.
 * socket          Socket descriptor for incoming connections.
 * threads_socket  Nanomsg socket for threads communication.
 * writer_tid      ID of the writer thread.
 * comm_threads    Array of communication threads.
 */
typedef struct angstrom_s {
    MDB_env *db;
    int socket;
    int threads_socket;
    pthread_t writer_tid;
    struct comm_thread_s *comm_threads;
} angstrom_t;

/*
 * tid          Thread's identifier.
 * angstrom     Pointer to the server's structure.
 * client_sock  Socket used to communicate with the client.
 * writer_sock  Nanomsg socket to send data to the writer.
 */
typedef struct comm_thread_s {
    pthread_t tid;
    angstrom_t *angstrom;
    int client_sock;
    int writer_sock;
} comm_thread_t;

angstrom.h

Startup

int main() {
    angstrom_t *angstrom;
    int i;

    // server init
    angstrom = calloc(1, sizeof(angstrom_t));
    angstrom->socket = angstrom->threads_socket = -1;
    angstrom->comm_threads = calloc(NBR_THREADS,
                                    sizeof(comm_thread_t));
    // open the database
    angstrom->db = database_open(DEFAULT_DB_PATH,
                                 DEFAULT_MAPSIZE, NBR_THREADS);
    // create the nanomsg socket for threads communication
    angstrom->threads_socket = nn_socket(AF_SP, NN_PUSH);
    nn_bind(angstrom->threads_socket, ENDPOINT_THREADS_SOCKET);
    // create the writer thread
    pthread_create(&angstrom->writer_tid, NULL,
                   thread_writer_loop, angstrom);

main.c

ENDPOINT_THREADS_SOCKET: "inproc://threads_socket"

Startup

    // create communication threads
    for (i = 0; i < NBR_THREADS; i++) {
        comm_thread_t *thread = &(angstrom->comm_threads[i]);

        thread->client_sock = -1;
        thread->angstrom = angstrom;
        pthread_create(&thread->tid, 0, thread_comm_loop,
                       thread);
        pthread_detach(thread->tid);
    }
    // create listening socket
    angstrom->socket = _create_listening_socket(DEFAULT_PORT);
    // server loop
    _main_thread_loop(angstrom);
    return (0);
}

main.c

Startup

MDB_env *database_open(const char *path, size_t mapsize,
                       unsigned int nbr_threads) {
    MDB_env *env = NULL;

    mdb_env_create(&env);
    mdb_env_set_mapsize(env, mapsize);
    mdb_env_set_maxreaders(env, nbr_threads);
    mdb_env_open(env, path, 0, 0664);
    return (env);
}

database.c

Startup

static int _create_listening_socket(unsigned short port) {
    int sock;
    struct sockaddr_in addr;
    unsigned int addr_size;
    const int on = 1;

    // create the socket
    sock = socket(AF_INET, SOCK_STREAM, 0);
    // some options
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*)&on,
               sizeof(on));
    setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void*)&on,
               sizeof(on));
    // binding to any interface
    addr_size = sizeof(addr);
    bzero(&addr, addr_size);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    bind(sock, (struct sockaddr*)&addr, addr_size);
    listen(sock, SOMAXCONN);
    return (sock);
}

main.c

New connection

static void _main_thread_loop(angstrom_t *angstrom) {
    int fd;
    struct sockaddr_in addr;
    unsigned int addr_size;
    const int on = 1;

    addr_size = sizeof(addr);
    for (; ; ) {
        bzero(&addr, addr_size);
        // accept a new connection
        if ((fd = accept(angstrom->socket,
                         (struct sockaddr*)&addr,
                         &addr_size)) < 0) {
            continue;
        }
        setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on,
                   sizeof(on));
        // send the file descriptor number to comm threads
        nn_send(angstrom->threads_socket, &fd, sizeof(fd), 0);
    }
}

main.c

New connection

void *thread_comm_loop(void *param) {
    comm_thread_t *thread = param;
    int in_sock;

    // opening a connection to the writer thread
    thread->writer_sock = nn_socket(AF_SP, NN_PUSH);
    nn_connect(thread->writer_sock, ENDPOINT_WRITER_SOCKET);
    // opening a connection to the main thread
    in_sock = nn_socket(AF_SP, NN_PULL);
    nn_connect(in_sock, ENDPOINT_THREADS_SOCKET);
    // loop to process new connections
    for (; ; ) {
        // waiting for a new connection to handle
        nn_recv(in_sock, &thread->client_sock, sizeof(int), 0);
        // process connection
        _process_connection(thread);
    }
    return (NULL);
}

thread_communication.c

ENDPOINT_WRITER_SOCKET: "inproc://writer_socket"

ENDPOINT_THREADS_SOCKET: "inproc://threads_socket"

New connection

static void _process_connection(comm_thread_t *thread) {
    uint8_t cmd;

    // loop on incoming requests
    for (; ; ) {
        // read command byte; stop when the client disconnects
        if (read(thread->client_sock, &cmd, sizeof(cmd)) <= 0) {
            close(thread->client_sock);
            break;
        }
        // interpret command
        switch (cmd) {
        case PROTO_PUT:
            command_put(thread);
            break;
        case PROTO_DELETE:
            command_delete(thread);
            break;
        case PROTO_GET:
            command_get(thread);
            break;
        }
    }
}

thread_communication.c
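
The command handlers that follow call `read()` once per field and assume it returns every requested byte. On a TCP socket a read can return fewer bytes than asked for, so a more defensive version would loop until the field is complete. The helper below is a sketch of that idea. `read_full` is a hypothetical name and is not part of the AngstromDB sources.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/* Read exactly len bytes from fd into buf.
 * Returns 0 on success, -1 on error or if the peer closes the
 * connection before len bytes have arrived. */
int read_full(int fd, void *buf, size_t len) {
    char *p = buf;

    assert(buf != NULL || len == 0);
    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n < 0) {
            if (errno == EINTR)
                continue;       /* interrupted by a signal: retry */
            return -1;          /* real error */
        }
        if (n == 0)
            return -1;          /* end of stream before len bytes */
        p += n;
        len -= (size_t)n;
    }
    return 0;
}
```

A handler could then replace each `read(sock, &field, sizeof(field))` with a checked `read_full(sock, &field, sizeof(field))` and drop the connection when it fails.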

Reading data

void command_get(comm_thread_t *thread) {
    uint8_t key_size, response = PROTO_OK;
    uint32_t value_size;
    MDB_val key, value;

    // read key length
    read(thread->client_sock, &key_size, sizeof(key_size));
    // read key data
    key.mv_data = malloc(key_size);
    read(thread->client_sock, key.mv_data, key_size);
    // get data
    key.mv_size = (size_t)key_size;
    database_get(thread->angstrom->db, &key, &value);
    // send response to the client
    write(thread->client_sock, &response, sizeof(response));
    value_size = htonl((uint32_t)value.mv_size);
    write(thread->client_sock, &value_size, sizeof(value_size));
    if (value_size)
        write(thread->client_sock, value.mv_data, value.mv_size);
    // release the key buffer allocated above
    free(key.mv_data);
}

command_get.c

Reading data

void database_get(MDB_env *db, MDB_val *key, MDB_val *value) {
    MDB_dbi dbi;
    MDB_txn *txn;

    // transaction init (read-only)
    mdb_txn_begin(db, NULL, MDB_RDONLY, &txn);
    // open the default (unnamed) database
    mdb_dbi_open(txn, NULL, 0, &dbi);
    // get data; on failure, return an empty value
    if (mdb_get(txn, dbi, key, value))
        bzero(value, sizeof(*value));
    // end of transaction
    // note: LMDB only guarantees that the returned value stays valid
    // until the transaction ends, yet the caller uses it afterwards
    mdb_txn_abort(txn);
    // close database
    mdb_dbi_close(db, dbi);
}

database.c

Writer thread

/*
 * type   Type of action (WRITE_PUT, WRITE_DEL).
 * key    Size and content of the key.
 * value  Size and content of the value.
 */
typedef struct writer_msg_s {
    enum {
        WRITE_PUT,
        WRITE_DEL
    } type;
    MDB_val key;
    MDB_val value;
} writer_msg_t;

angstrom.h

Writer thread

void *thread_writer_loop(void *param) {
    angstrom_t *angstrom = param;
    int socket;

    // create the nanomsg socket for threads communication
    socket = nn_socket(AF_SP, NN_PULL);
    nn_bind(socket, ENDPOINT_WRITER_SOCKET);
    // loop to process incoming write messages
    for (; ; ) {
        writer_msg_t *msg;

        // waiting for a message (a pointer to a writer_msg_t)
        if (nn_recv(socket, &msg, sizeof(writer_msg_t*), 0) < 0)
            continue;
        // processing
        switch (msg->type) {
        case WRITE_PUT:
            database_put(angstrom->db, &msg->key, &msg->value);
            break;
        case WRITE_DEL:
            database_del(angstrom->db, &msg->key);
            break;
        }

thread_writer.c

ENDPOINT_WRITER_SOCKET: "inproc://writer_socket"

Writer thread

        // free data
        if (msg->key.mv_data)
            free(msg->key.mv_data);
        if (msg->value.mv_data)
            free(msg->value.mv_data);
        free(msg);
    }
    return (NULL);
}

thread_writer.c

Key deletion

void command_delete(comm_thread_t *thread) {
    uint8_t key_size, response = PROTO_OK;
    void *key;
    writer_msg_t *msg;

    // read key length
    read(thread->client_sock, &key_size, sizeof(key_size));
    // read key data
    key = malloc(key_size);
    read(thread->client_sock, key, key_size);
    // create message
    msg = calloc(1, sizeof(writer_msg_t));
    msg->type = WRITE_DEL;
    msg->key.mv_size = (size_t)key_size;
    msg->key.mv_data = key;
    // send the message to the writer thread
    nn_send(thread->writer_sock, &msg, sizeof(msg), 0);
    // send response to the client
    write(thread->client_sock, &response, sizeof(response));
}

command_delete.c

Key deletion

void database_del(MDB_env *db, MDB_val *key) {
    MDB_dbi dbi;
    MDB_txn *txn;

    // transaction init
    mdb_txn_begin(db, NULL, 0, &txn);
    // open database in read-write mode
    mdb_dbi_open(txn, NULL, 0, &dbi);
    // delete key
    mdb_del(txn, dbi, key, NULL);
    // close database
    mdb_dbi_close(db, dbi);
    // transaction commit
    mdb_txn_commit(txn);
}

database.c

Adding / updating a key

void command_put(comm_thread_t *thread) {
    uint8_t key_size, response = PROTO_OK;
    void *key, *value = NULL;
    uint32_t value_size;
    writer_msg_t *msg;

    // read key length
    read(thread->client_sock, &key_size, sizeof(key_size));
    // read key data
    key = malloc(key_size);
    read(thread->client_sock, key, key_size);
    // read value length
    read(thread->client_sock, &value_size, sizeof(value_size));
    value_size = ntohl(value_size);
    if (value_size > 0) {
        // read value data
        value = malloc(value_size);
        read(thread->client_sock, value, value_size);
    }

command_put.c

Adding / updating a key

    // create message
    msg = malloc(sizeof(writer_msg_t));
    msg->type = WRITE_PUT;
    msg->key.mv_size = (size_t)key_size;
    msg->key.mv_data = key;
    msg->value.mv_size = (size_t)value_size;
    msg->value.mv_data = value;
    // send the message to the writer thread
    nn_send(thread->writer_sock, &msg, sizeof(msg), 0);
    // send response to the client
    write(thread->client_sock, &response, sizeof(response));
}

command_put.c

Adding / updating a key

void database_put(MDB_env *db, MDB_val *key, MDB_val *value) {
    MDB_dbi dbi;
    MDB_txn *txn;

    // transaction init
    mdb_txn_begin(db, NULL, 0, &txn);
    // open database in read-write mode
    mdb_dbi_open(txn, NULL, 0, &dbi);
    // put data
    mdb_put(txn, dbi, key, value, 0);
    // close database
    mdb_dbi_close(db, dbi);
    // transaction commit
    mdb_txn_commit(txn);
}

database.c

cloc

Language    files   comment   code
C               7       132    212
C Header        1       111     55
make            1        13     27
SUM:            9       256    294

sloccount

Schedule Estimate, Years (Months) = 0.17 (2.06)

Total Estimated Cost to Develop = $ 6,753

Benchmark

Database      Writes   Reads
AngstromDB    22 ms    55 ms
Couchbase     25 ms    22 ms
Redis         29 ms    30 ms
MongoDB       39 ms    27 ms

45 sequential writes / reads of representative data

Possible improvements

Reduce system calls!

Buffer network reads

Group network writes

Easy to test (sendmsg vs write)

To begin with
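
To make the "sendmsg vs write" idea concrete, here is a minimal sketch of a GET response sent with one system call instead of three. It assumes the response layout from the protocol slides (status, 4-byte size in network byte order, data). The function name `send_get_response` is hypothetical and does not come from the AngstromDB sources.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Send status, data size and data with a single sendmsg() call.
 * Returns the number of bytes sent, or -1 on error. */
ssize_t send_get_response(int sock, uint8_t status,
                          const void *data, uint32_t data_len) {
    uint32_t net_len = htonl(data_len);
    struct iovec iov[3];
    struct msghdr msg;

    assert(data != NULL || data_len == 0);
    iov[0].iov_base = &status;
    iov[0].iov_len = sizeof(status);
    iov[1].iov_base = &net_len;
    iov[1].iov_len = sizeof(net_len);
    iov[2].iov_base = (void *)data;
    iov[2].iov_len = data_len;

    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    /* skip the data buffer when there is nothing to send */
    msg.msg_iovlen = data_len > 0 ? 3 : 2;
    return sendmsg(sock, &msg, 0);
}
```

On a stream socket `sendmsg()` may still send fewer bytes than requested for large values, so a complete version would check the return value and send the remainder.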

Benchmark

Database      Writes   Reads
AngstromDB    22 ms    55 ms
AngstromDB    22 ms    26 ms
Couchbase     25 ms    22 ms
Redis         29 ms    30 ms
MongoDB       39 ms    27 ms

On-the-fly compression (Zippy)

Data serialization (MsgPack)

Read transactions

… Single-process asynchronous?

Going further

geek-directeur-technique.com

github.com/Amaury/AngstromDB

amaury@amaury.net

@geekcto
