31
Messaging temps réel avec Go Mickaël Rémond ProcessOne

Messaging temps réel avec Go

Embed Size (px)

Citation preview

Messaging temps réel avec GoMickaël Rémond

ProcessOne

Retour d'expérience

Construction d'objets connectés en Go

Introduction: Pourquoi utiliser Go pour lemessaging ?

Pertinence

Go est pertinent pour le messaging temps réel:

sur le client: objets connectés

sur le serveur: plate-forme de messaging

Pour les clients / objets connectés

Channels: Le passage de messages est au coeur du language Go.

Cross compilation: Il est possible de compiler et déployer le code client sur unearchitecture différente.

Accès système: La création d'objet est simplifiée par les capacités systèmes de Go.

Pour les serveurs

Performant sur deux axes:

Go routines: Connections simultanés

Les serveurs de messagerie gèrent de nombreux processus en parallèle

Performance: Débit

Permet de développer des systèmes ayant un fort débit en terme de nombre demessages par seconde.

Latence:

Permet de réduire fortement la latence dans la transmission des messages.

Exemple: NATS

gnatsd, implementation du protocole Nats.io

NATS: Capacité de traitement

Source: Brave New Geek

NATS: Latence

Source: Brave New Geek

Illustration du messaging avec divers protocoles

À connaître:

XMPP: eXtensible Messaging and Presence Protocol

xmpp.org (http://xmpp.org)

MQTT: Message Queuing Telemetry Transport

mqtt.org (http://mqtt.org)

NATS: Protocole et serveur

nats.io (http://nats.io)

Cette présentation se concentre sur l'utilisation des serveurs de messaging depuis unclient Go.

La mise en œuvre de serveurs de messaging en pur Go viendra dans une autreprésentation.

Code

XMPP

XMPP est idéal pour le contrôle des objets.

Le protocole est extensible et propose des spécifications adaptées à l'écriture deséquences d'interaction pour le contrôle:

Découverte (XEP-0347: Internet of Things - Discovery)

Contrôle: Requête / Réponse (XEP-0325: Internet of Things - Control)

Développement d'un Jukebox XMPP en Go

Technologies utilisées:

Protocole XMPP

ejabberd: serveur XMPP

Bibliothèque XMPP en Go, Gox: écriture du Jukebox

Raspberry Pi 2 avec image Linux custom et haut parleur connecté à la sortie sonanalogique

Connexion à SoundCloud

Architecture

Contrôle XMPP du jukebox: Chat bot

Le jukebox joue les liens Soundcloud envoyés dans le chat:

<message type="chat" to="test@localhost" id="aac9a"> <body>https://soundcloud.com/radiohead/spectre</body> </message>

Il stoppe la musique en cours de lecture avec la commande stop:

<message type="chat" to="test@localhost" id="aacaa"> <body>stop</body> </message>

Contrôle du jukebox: IoT XMPP Control

Notre jukebox peut interpréter les requêtes de commande IoT (XEP-0325):

<iq type='set'     to='test@localhost/jukebox'     id='2'>    <set xmlns='urn:xmpp:iot:control' xml:lang='en'>      <string name='action' value='play'/>      <string name='url' value='https://soundcloud.com/radiohead/spectre'/>    </set> </iq>

Code du jukebox: Main receive loop

Déclaration de la connection XMPP:

var client *xmpp.Client var err error if client, err = connectXmpp(*jid, *password, *address); err != nil { log.Fatal("Could not connect to XMPP: ", err) }

Boucle de traitement des paquets XMPP:

for packet := range client.Recv() { switch packet := packet.(type) { case *xmpp.ClientMessage: processMessage(client, p, packet) case *xmpp.ClientIQ: processIq(client, p, packet) case *xmpp.ClientPresence: // Do nothing with received presence default: fmt.Printf("Ignoring packet: %T\n", packet) } }

Code du jukebox: Connexion XMPP

func connectXmpp(jid string, password string, address string) (client *xmpp.Client, err error) {

xmppOptions := xmpp.Options{Address: address,

Jid: jid, Password: password, PacketLogger: os.Stdout,

Retry: 10}

if client, err = xmpp.NewClient(xmppOptions); err != nil {

return

}

if _, err = client.Connect(); err != nil {

return

}

return

}

Traitement des messsages

func processMessage(client *xmpp.Client, p *mpg123.Player, packet *xmpp.ClientMessage) { command := strings.Trim(packet.Body, " ") if command == "stop" { p.Stop() } else { playSCURL(p, command) } }

Traitement des commandes

func processIq(client *xmpp.Client, p *mpg123.Player, packet *xmpp.ClientIQ) {

switch payload := packet.Payload.(type) {

// We support IOT Control IQ

case *iot.ControlSet:

var url string

for _, element := range payload.Fields {

if element.XMLName.Local == "string" && element.Name == "url" {

url = strings.Trim(element.Value, " ")

break

}

}

playSCURL(p, url)

setResponse := new(iot.ControlSetResponse)

reply := xmpp.ClientIQ{Packet: xmpp.Packet{To: packet.From, Type: "result", Id: packet.Id}, Payload: setResponse}

client.Send(reply.XMPPFormat())

default:

fmt.Printf("Other IQ Payload: %T\n", packet.Payload)

}

}

Jouer le morceau SoundCloud

func playSCURL(p *mpg123.Player, rawURL string) { songID, _ := soundcloud.GetSongID(rawURL) url := soundcloud.FormatStreamURL(songID) p.Play(url) }

MQTT

MQTT est idéal pour remonter l'information venant de capteurs.

Reporting d'un capteur de température

Technologies utilisées:

Capteur: Températeur CPU OSX

sysctl -n machdep.xcpm.cpu_thermal_level

Serveur MQTT de test: Mosquitto

Bibliothèque MQTT en Go: ProcessOne MQTT

Publisher: Mise en place de la connexion

func main() { client := mqtt.New("localhost:1883", nil) client.ClientID = "mremond-osx" if err := client.Connect(); err != nil { log.Fatal("Connection error: ", err) } ticker := time.NewTicker(5 * time.Second) stop := make(chan bool) go publishLoop(client, ticker, stop) runtime.Goexit() }

Publisher: La boucle de publication

func publishLoop(client *mqtt.Client, ticker *time.Ticker, stop <-chan bool) { for done := false; !done; { select { case <-ticker.C: payload := make([]byte, 1, 1) payload[0] = getTemp() client.Publish(getTopic(client.ClientID), payload) case <-stop: done = true break } } }

Publisher: La lecture de la température

func getTemp() byte {

out, err := exec.Command("sysctl", "-n", "machdep.xcpm.cpu_thermal_level").Output()

if err != nil {

log.Println("Cannot read CPU temperature: ", err)

return byte(0)

}

s := string(out)

if temp, err := strconv.ParseInt(strings.Trim(s, "\n"), 10, 32); err != nil {

return byte(temp)

}

return byte(0)

}

Subscriber

func main() {

messages := make(chan *mqtt.Message)

client := mqtt.New("localhost:1883", messages)

client.ClientID = "MQTT-Sub"

if err := client.Connect(); err != nil {

fmt.Printf("Connection error: %q\n", err)

return

}

name := "mremond-osx/cputemp"

topic := packet.Topic{Name: name, QOS: 1}

client.Subscribe(topic)

for m := range messages {

fmt.Printf("Received message on topic %s: %+v\n", m.Topic, m.Payload)

}

}

Prochaines étapes

Publication du code client MQTT:

Publication pour la fin de semaine, après nettoyage et documentation, sur le

compte Github de ProcessOne (https://github.com/processone/)

Présentation de ce que j'ai appris lors du design de l'API de la bibliothèque ?

Mise en oeuvre de serveurs de messaging en Go:

Présentation du serveur NATS ?: Illustration de NATS qui est un protocol non

standard mais également très performant.

Autres serveurs de messaging en Go.

Liens

Code des slides

github.com/processone/talks/tree/master/2016/go-paris-meetup(https://github.com/processone/talks/tree/master/2016/go-paris-meetup)

Go XMPP

github.com/processone/gox (https://github.com/processone/gox)

Go MQTT

github.com/processone/mqtt (https://github.com/processone/mqtt)

Brave New Geek: Dissecting Message Queues

bravenewgeek.com/dissecting-message-queues/ (http://bravenewgeek.com/dissecting-message-queues/)

Thank you

Mickaël Rémond

ProcessOne

[email protected] (mailto:[email protected])

http://www.process-one.net/ (http://www.process-one.net/)

@mickael (http://twitter.com/mickael)