Upload
amaury-bouchard
View
25.623
Download
1
Embed Size (px)
DESCRIPTION
Conférence données à l'Open World Forum, 05 octobre 2013. Comment créer une base de données noSQL par paires clés-valeurs en moins d'une heure, en se basant sur le bibliothèques Nanomsg et LightningDB.
Citation preview
Créer un serveur noSQLen une heure
Bases noSQL
Bibliothèques utilisées
Présentation AngstromDB
Étude de code
Benchmark & évolutions
Créer un serveur noSQL
Base noSQL
Aller plus loin
Scalabilité horizontale
Haute disponibilité
Big Data
S'affranchir
Modèle relationnel
Transactions ACID
Principe général
Types de bases noSQL
Clé-valeur Document Colonne GraphDynamo
Riak
Redis
Voldemort
Tokyo Tyrant
MemcacheDB
FoundationDB
MongoDB
CouchDB
CouchBase
HyperDex
RethinkDB
Cassandra
Hadoop / Hbase
Accumulo
Neo4J
Allegro
Virtuoso
InfoGrid
Bibliothèquesde programmation
Le successeur de ZeroMQ
Bibliothèque réseau avec des super-pouvoirs
Nanomsg
Gestion de files de messages
Protocole spécifique
14 langages supportés
Redéfinition des concepts réseau
Nanomsg
Méthodes de transport
Inter-threads inproc
Inter-processus ipc
Inter-machines tcp
Découplage du sens de connexion
ServeurClient
Découplage du sens de connexion
ServeurClient
Socket multi-connexion
ServeurClient
Client
Client
port A
port B
Connexion REQ/REP
ServeurClientREQ
Connexion REQ/REP
ServeurClientREP
Connexion PUSH/PULL
ServeurClientPUSH
Connexion PUSH/PULL
ServeurClientPULL
Connexion PUB/SUB
ClientPUB
Serveur
Client
Client
Connexion PUB/SUB
ClientServeur
Client
Client
SUB
SUB
SUB
Load-balancing
ClientServeur
Client
Client
PULL
PUSH
Load-balancing
ClientServeur
Client
Client
PULLPUSH
Load-balancing
ClientServeur
Client
ClientPULL
PUSH
Bus
Client
Client
Client
Client
Bus
Client Client
Client
Client
Stockage paires clé-valeur multivaluées
Persistance sur disque, mapping RAM
Transactionnel (1 thread d'écriture)
Au cœur de OpenLDAP
LMDB (LightningDB)
Débit
Latence
ops / sec.
microsec.
HyperDex : LMDB vs LevelDB
PrésentationAngstromDB
Serveur paires clé-valeur
Codé en C
Basé sur des bibliothèques reconnues
Protocole binaire
Aucun contrôle d'erreur
Implémentation naïve
API très simple
PUT Ajout ou mise-à-jour de clé
GET Retourne une valeur
DELETE Efface une clé
Protocole : DELETE
2
cmd taille clé
clé
1 1 1 à 255 octets
Requête
Réponse 1
statut
1
Protocole : GET
3
cmd taille clé
clé
1 1 1 à 255 octets
Requête
Réponse 1
statut taille données
données
1 4 0 à 4 GO
Protocole : PUT
1
cmd taille clé clé
1 1 1 à 255 octets
Requête
Réponse 1
statut
taille données données
1
4 0 à 4 GO
Multi-threadé
Thread principal Attend les nouvelles connexions
Threads de communication Un thread par client connecté
Thread d'écriture Pas d'écriture concurrente
Thread principal
Threads de communication
LMDBThread
d'écriture
Étude de code
github.com/Amaury/AngstromDB ↩
↪ /tree/master/src
Démarrage
/*
* db Pointer to the database environment.
* socket Socket descriptor for incoming connections.
* threads_socket Nanomsg socket for threads communication.
* writer_tid ID of the writer thread.
* comm_threads Array of communication threads.
*/
typedef struct angstrom_s {
MDB_env *db;
int socket;
int threads_socket;
pthread_t writer_tid;
struct comm_thread_s *comm_threads;
} angstrom_t;
/*
* tid Thread's identifier.
* angstrom Pointer to the server's structure.
* client_sock Socket used to communicate with the client.
* writer_sock Nanomsg socket to send data to the writer. */
typedef struct comm_thread_s {
pthread_t tid;
angstrom_t *angstrom;
int client_sock;
int writer_sock;
} comm_thread_t;
angstrom.h
Démarrage
int main() {
angstrom_t *angstrom;
int i;
// server init
angstrom = calloc(1, sizeof(angstrom_t));
angstrom->socket = angstrom->threads_socket = -1;
angstrom->comm_threads = calloc(NBR_THREADS,
sizeof(comm_thread_t));
// open the database
angstrom->db = database_open(DEFAULT_DB_PATH,
DEFAULT_MAPSIZE, NBR_THREADS);
// create the nanomsg socket for threads communication
angstrom->threads_socket = nn_socket(AF_SP, NN_PUSH);
nn_bind(angstrom->threads_socket, ENDPOINT_THREADS_SOCKET);
// create the writer thread
pthread_create(&angstrom->writer_tid, NULL,
thread_writer_loop, angstrom);
main.c
Démarrage
int main() {
angstrom_t *angstrom;
int i;
// server init
angstrom = calloc(1, sizeof(angstrom_t));
angstrom->socket = angstrom->threads_socket = -1;
angstrom->comm_threads = calloc(NBR_THREADS,
sizeof(comm_thread_t));
// open the database
angstrom->db = database_open(DEFAULT_DB_PATH,
DEFAULT_MAPSIZE, NBR_THREADS);
// create the nanomsg socket for threads communication
angstrom->threads_socket = nn_socket(AF_SP, NN_PUSH);
nn_bind(angstrom->threads_socket, ENDPOINT_THREADS_SOCKET);
// create the writer thread
pthread_create(&angstrom->writer_tid, NULL,
thread_writer_loop, angstrom);
main.c
"inproc://threads_socket"
Démarrage
// create communication threads
for (i = 0; i < NBR_THREADS; i++) {
comm_thread_t *thread = &(angstrom->comm_threads[i]);
thread->client_sock = -1;
thread->angstrom = angstrom;
pthread_create(&thread->tid, 0, thread_comm_loop,
thread);
pthread_detach(thread->tid);
}
// create listening socket
angstrom->socket = _create_listening_socket(DEFAULT_PORT);
// server loop
_main_thread_loop(angstrom);
return (0);
}
main.c
Démarrage
MDB_env *database_open(const char *path, size_t mapsize,
unsigned int nbr_threads) {
MDB_env *env = NULL;
mdb_env_create(&env);
mdb_env_set_mapsize(env, mapsize);
mdb_env_set_maxreaders(env, nbr_threads);
mdb_env_open(env, path, 0, 0664);
return (env);
}
database.c
Démarrage
static int _create_listening_socket(unsigned short port) {
int sock;
struct sockaddr_in addr;
unsigned int addr_size;
const int on = 1;
// create the socket
sock = socket(AF_INET, SOCK_STREAM, 0);
// some options
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void*)&on,
sizeof(on));
setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void*)&on,
sizeof(on));
// binding to any interface
addr_size = sizeof(addr);
bzero(&addr, addr_size);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
bind(sock, (struct sockaddr*)&addr, addr_size);
listen(sock, SOMAXCONN);
return (sock);
}
main.c
Nouvelle connexion
static void _main_thread_loop(angstrom_t *angstrom) {
int fd;
struct sockaddr_in addr;
unsigned int addr_size;
const int on = 1;
addr_size = sizeof(addr);
for (; ; ) {
bzero(&addr, addr_size);
// accept a new connection
if ((fd = accept(angstrom->socket,
(struct sockaddr*)&addr,
&addr_size)) < 0) {
continue ;
}
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void*)&on,
sizeof(on));
// send the file descriptor number to comm threads
nn_send(angstrom->threads_socket, &fd, sizeof(fd), 0);
}
}
main.c
Nouvelle connexion
void *thread_comm_loop(void *param) {
comm_thread_t *thread = param;
int in_sock;
// opening a connection to the writer thread
thread->writer_sock = nn_socket(AF_SP, NN_PUSH);
nn_connect(thread->writer_sock, ENDPOINT_WRITER_SOCKET);
// opening a connection to the main thread
in_sock = nn_socket(AF_SP, NN_PULL);
nn_connect(in_sock, ENDPOINT_THREADS_SOCKET);
// loop to process new connections
for (; ; ) {
// waiting for a new connection to handle
nn_recv(in_sock, &thread->client_sock, sizeof(int), 0);
// process connection
_process_connection(thread);
}
return (NULL);
}
thread_communication.c
Nouvelle connexion
void *thread_comm_loop(void *param) {
comm_thread_t *thread = param;
int in_sock;
// opening a connection to the writer thread
thread->writer_sock = nn_socket(AF_SP, NN_PUSH);
nn_connect(thread->writer_sock, ENDPOINT_WRITER_SOCKET);
// opening a connection to the main thread
in_sock = nn_socket(AF_SP, NN_PULL);
nn_connect(in_sock, ENDPOINT_THREADS_SOCKET);
// loop to process new connections
for (; ; ) {
// waiting for a new connection to handle
nn_recv(in_sock, &thread->client_sock, sizeof(int), 0);
// process connection
_process_connection(thread);
}
return (NULL);
}
thread_communication.c
"inproc://writer_socket"
"inproc://threads_socket"
Nouvelle connexion
static void _process_connection(comm_thread_t *thread) {
uint8_t cmd;
// loop on incoming requests
for (; ; ) {
// read command byte
if (read(thread->client_sock, &cmd, sizeof(cmd)) <= 0) {
close(thread->client_sock);
break;
}
// interpret command
switch (cmd) {
case PROTO_PUT:
command_put(thread);
break;
case PROTO_DELETE:
command_del(thread);
break;
case PROTO_GET:
command_get(thread);
break;
}
}
}
thread_communication.c
Lecture de données
void command_get(comm_thread_t *thread) {
uint8_t key_size, response = PROTO_OK;
uint32_t value_size;
MDB_val key, value;
// read key length
read(thread->client_sock, &key_size, sizeof(key_size));
// read key data
key.mv_data = malloc(key_size);
read(thread->client_sock, key.mv_data, key_size);
// get data
key.mv_size = (size_t)key_size;
database_get(thread->angstrom->db, &key, &value);
// send response to the client
write(thread->client_sock, &response, sizeof(response));
value_size = htonl((uint32_t)value.mv_size);
write(thread->client_sock, &value_size, sizeof(value_size));
if (value_size)
write(thread->client_sock, value.mv_data, value.mv_size);
}
command_get.c
Lecture de données
void database_get(MDB_env *db, MDB_val *key, MDB_val *value) {
MDB_dbi dbi;
MDB_txn *txn;
// transaction init
mdb_txn_begin(db, NULL, MDB_RDONLY, &txn);
// open database in read-write mode
mdb_dbi_open(txn, NULL, 0, &dbi);
// get data
if (mdb_get(txn, dbi, key, value))
bzero(value, sizeof(*value));
// end of transaction
mdb_txn_abort(txn);
// close database
mdb_dbi_close(db, dbi);
}
database.c
Thread d'écriture
/*
* type Type of action (WRITE_PUT, WRITE_DEL).
* key Size and content of the key.
* value Size and content of the value.
*/
typedef struct writer_msg_s {
enum {
WRITE_PUT,
WRITE_DEL
} type;
MDB_val key;
MDB_val value;
} writer_msg_t;
angstrom.h
Thread d'écriture
void *thread_writer_loop(void *param) {
angstrom_t *angstrom = param;
int socket;
// create the nanomsg socket for threads communication
socket = nn_socket(AF_SP, NN_PULL);
nn_bind(socket, ENDPOINT_WRITER_SOCKET);
// loop to process new connections
for (; ; ) {
writer_msg_t *msg;
// waiting for a new connection to handle
if (nn_recv(socket, &msg, sizeof(writer_msg_t*), 0) < 0)
continue;
// processing
switch (msg->type) {
case WRITE_PUT:
database_put(angstrom->db, &msg->key, &msg->value);
break;
case WRITE_DEL:
database_del(angstrom->db, &msg->key);
break;
}
thread_writer.c
Thread d'écriture
void *thread_writer_loop(void *param) {
angstrom_t *angstrom = param;
int socket;
// create the nanomsg socket for threads communication
socket = nn_socket(AF_SP, NN_PULL);
nn_bind(socket, ENDPOINT_WRITER_SOCKET);
// loop to process new connections
for (; ; ) {
writer_msg_t *msg;
// waiting for a new connection to handle
if (nn_recv(socket, &msg, sizeof(writer_msg_t*), 0) < 0)
continue;
// processing
switch (msg->type) {
case WRITE_PUT:
database_put(angstrom->db, &msg->key, &msg->value);
break;
case WRITE_DEL:
database_del(angstrom->db, &msg->key);
break;
}
thread_writer.c
"inproc://writer_socket"
Thread d'écriture
// free data
if (msg->key.mv_data)
free(msg->key.mv_data);
if (msg->value.mv_data)
free(msg->value.mv_data);
free(msg);
}
return (NULL);
}
thread_writer.c
Effacement de clé
void command_delete(comm_thread_t *thread) {
uint8_t key_size, response = PROTO_OK;
void *key;
writer_msg_t *msg;
// read key length
read(thread->client_sock, &key_size, sizeof(key_size));
// read key data
key = malloc(key_size);
read(thread->client_sock, key, key_size);
// create message
msg = calloc(1, sizeof(writer_msg_t));
msg->type = WRITE_DEL;
msg->key.mv_size = (size_t)key_size;
msg->key.mv_data = key;
// send the message to the writer thread
nn_send(thread->writer_sock, &msg, sizeof(msg), 0);
// send response to the client
write(thread->client_sock, &response, sizeof(response));
}
command_delete.c
Effacement de clé
void database_del(MDB_env *db, MDB_val *key) {
MDB_dbi dbi;
MDB_txn *txn;
// transaction init
mdb_txn_begin(db, NULL, 0, &txn);
// open database in read-write mode
mdb_dbi_open(txn, NULL, 0, &dbi);
// delete key
mdb_del(txn, dbi, key, NULL);
// close database
mdb_dbi_close(db, dbi);
// transaction commit
mdb_txn_commit(txn);
}
database.c
Ajout / mise-à-jour de clé
void command_put(comm_thread_t *thread) {
uint8_t key_size, response = PROTO_OK;
void *key, *value = NULL;
uint32_t value_size;
writer_msg_t *msg;
// read key length
read(thread->client_sock, &key_size, sizeof(key_size));
// read key data
key = malloc(key_size);
read(thread->client_sock, key, key_size);
// read value length
read(thread->client_sock, &value_size, sizeof(value_size));
value_size = ntohl(value_size);
if (value_size > 0) {
// read value data
value = malloc(value_size);
read(thread->client_sock, value, value_size);
}
command_put.c
Ajout / mise-à-jour de clé
// create message
msg = malloc(sizeof(writer_msg_t));
msg->type = WRITE_PUT;
msg->key.mv_size = (size_t)key_size;
msg->key.mv_data = key;
msg->value.mv_size = (size_t)value_size;
msg->value.mv_data = value;
// send the message to the writer thread
nn_send(thread->writer_sock, &msg, sizeof(msg), 0);
// send response to the client
write(thread->client_sock, &response, sizeof(response));
}
command_put.c
Ajout / mise-à-jour de clé
void database_put(MDB_env *db, MDB_val *key, MDB_val *value) {
MDB_dbi dbi;
MDB_txn *txn;
// transaction init
mdb_txn_begin(db, NULL, 0, &txn);
// open database in read-write mode
mdb_dbi_open(txn, NULL, 0, &dbi);
// put data
mdb_put(txn, dbi, key, value, 0);
// close database
mdb_dbi_close(db, dbi);
// transaction commit
mdb_txn_commit(txn);
}
database.c
clocLanguage files comment code-------------------------------------------------C 7 132 212C Header 1 111 55make 1 13 27-------------------------------------------------SUM: 9 256 294
sloccountSchedule Estimate, Years (Months) = 0.17 (2.06)
Total Estimated Cost to Develop = $ 6,753
Benchmark
Base Écritures Lectures
AngstromDB 22 ms 55 ms
Couchbase 25 ms 22 ms
Redis 29 ms 30 ms
MongoDB 39 ms 27 ms
45 écritures / lectures séquentielles de données représentatives
Améliorationspossibles
Réduire les appels systèmes !
Bufferiser les lectures réseau
Regrouper les écritures réseau
Facile à tester (sendmsg vs write)
Pour commencer
Benchmark
Base Écritures Lectures
AngstromDB 22 ms 55 ms
AngstromDB 22 ms 26 ms
Couchbase 25 ms 22 ms
Redis 29 ms 30 ms
MongoDB 39 ms 27 ms
Compression à la volée (Zippy)
Sérialisation de données (MsgPack)
Transactions en lecture
… Mono-processus asynchrone ?
Pour aller plus loin
geek-directeur-technique.com
github.com/Amaury/AngstromDB
@geekcto