Example pubsub code cleanup

This commit is contained in:
CORNIERE Rémi 2020-01-14 23:13:13 +01:00
parent 20e02cc9ad
commit 1d1adb0c48

View file

@ -32,10 +32,10 @@ func main() {
router.NewRoute().Packet("message"). router.NewRoute().Packet("message").
HandlerFunc(func(s xmpp.Sender, p stanza.Packet) { HandlerFunc(func(s xmpp.Sender, p stanza.Packet) {
data, _ := xml.Marshal(p) data, _ := xml.Marshal(p)
fmt.Println("Received a publication ! => \n" + string(data)) log.Println("Received a message ! => \n" + string(data))
}) })
client, err := xmpp.NewClient(config, router, func(err error) { fmt.Println(err) }) client, err := xmpp.NewClient(config, router, func(err error) { log.Println(err) })
if err != nil { if err != nil {
log.Fatalf("%+v", err) log.Fatalf("%+v", err)
} }
@ -49,11 +49,45 @@ func main() {
// ========================== // ==========================
// Create a node // Create a node
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
createNode(ctx, cancel, client)
// =============================
// Configure the node. This can also be done in a single message with the creation
configureNode(ctx, cancel, client)
// ====================================
// Subscribe to this node :
subToNode(ctx, cancel, client)
// ==========================
// Publish to that node
pubToNode(ctx, cancel, client)
// =============================
// Let's purge the node :
purgeRq, _ := stanza.NewPurgeAllItems(serviceName, nodeName)
purgeCh, err := client.SendIQ(ctx, purgeRq)
select {
case purgeResp := <-purgeCh:
if purgeResp.Error != nil {
cancel()
log.Fatalf("error while purging node : %s", purgeResp.Error.Text)
}
log.Println("node successfully purged")
case <-time.After(1000 * time.Millisecond):
cancel()
log.Fatal("No iq response was received in time while purging node")
}
cancel()
}
func createNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) {
rqCreate, err := stanza.NewCreateNode(serviceName, nodeName) rqCreate, err := stanza.NewCreateNode(serviceName, nodeName)
if err != nil { if err != nil {
log.Fatalf("%+v", err) log.Fatalf("%+v", err)
} }
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
createCh, err := client.SendIQ(ctx, rqCreate) createCh, err := client.SendIQ(ctx, rqCreate)
if err != nil { if err != nil {
log.Fatalf("%+v", err) log.Fatalf("%+v", err)
@ -67,20 +101,73 @@ func main() {
if respCr.Error.Reason != "conflict" { if respCr.Error.Reason != "conflict" {
log.Fatalf("%+v", respCr.Error.Text) log.Fatalf("%+v", respCr.Error.Text)
} }
fmt.Println(respCr.Error.Text) log.Println(respCr.Error.Text)
} else { } else {
fmt.Print("successfully created channel") fmt.Print("successfully created channel")
} }
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
cancel() cancel()
log.Fatal("No iq response was received in time") log.Fatal("No iq response was received in time while creating node")
}
} }
} }
} }
// ==================================== func configureNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) {
// Now let's subscribe to this node : // First, ask for a form with the config options
rqSubscribe, _ := stanza.NewSubRq(serviceName, stanza.SubInfo{ confRq, _ := stanza.NewConfigureNode(serviceName, nodeName)
confReqCh, err := client.SendIQ(ctx, confRq)
if err != nil {
log.Fatalf("could not send iq : %v", err)
}
select {
case confForm := <-confReqCh:
// If the request was successful, we now have a form with configuration options to update
fields, err := confForm.GetFormFields()
if err != nil {
log.Fatal("No config fields found !")
}
// These are some common fields expected to be present. Change processing to your liking
if fields["pubsub#max_payload_size"] != nil {
fields["pubsub#max_payload_size"].ValuesList[0] = "100000"
}
if fields["pubsub#notification_type"] != nil {
fields["pubsub#notification_type"].ValuesList[0] = "headline"
}
// Send the modified fields as a form
submitConf, err := stanza.NewFormSubmissionOwner(serviceName,
nodeName,
[]*stanza.Field{
fields["pubsub#max_payload_size"],
fields["pubsub#notification_type"],
})
c, _ := client.SendIQ(ctx, submitConf)
select {
case confResp := <-c:
if confResp.Error != nil {
cancel()
log.Fatalf("node configuration failed : %s", confResp.Error.Text)
}
log.Println("node configuration was successful")
return
case <-time.After(300 * time.Millisecond):
cancel()
log.Fatal("No iq response was received in time while configuring the node")
}
case <-time.After(300 * time.Millisecond):
cancel()
log.Fatal("No iq response was received in time while asking for the config form")
}
}
func subToNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) {
rqSubscribe, err := stanza.NewSubRq(serviceName, stanza.SubInfo{
Node: nodeName, Node: nodeName,
Jid: userJID, Jid: userJID,
}) })
@ -91,15 +178,15 @@ func main() {
if subRespCh != nil { if subRespCh != nil {
select { select {
case <-subRespCh: case <-subRespCh:
fmt.Println("Subscribed to the service") log.Println("Subscribed to the service")
case <-time.After(100 * time.Millisecond): case <-time.After(300 * time.Millisecond):
cancel() cancel()
log.Fatal("No iq response was received in time") log.Fatal("No iq response was received in time while subscribing")
}
} }
} }
// ========================== func pubToNode(ctx context.Context, cancel context.CancelFunc, client *xmpp.Client) {
// Publish to that node
pub, err := stanza.NewPublishItemRq(serviceName, nodeName, "", stanza.Item{ pub, err := stanza.NewPublishItemRq(serviceName, nodeName, "", stanza.Item{
Publisher: "testuser2", Publisher: "testuser2",
Any: &stanza.Node{ Any: &stanza.Node{
@ -166,59 +253,10 @@ func main() {
if pubRespCh != nil { if pubRespCh != nil {
select { select {
case <-pubRespCh: case <-pubRespCh:
fmt.Println("Published item to the service") log.Println("Published item to the service")
case <-time.After(100 * time.Millisecond):
cancel()
log.Fatal("No iq response was received in time")
}
}
// =============================
// Let's purge the node now :
purgeRq, _ := stanza.NewPurgeAllItems(serviceName, nodeName)
client.SendIQ(ctx, purgeRq)
// =============================
// Configure the node :
confRq, _ := stanza.NewConfigureNode(serviceName, nodeName)
confReqCh, err := client.SendIQ(ctx, confRq)
select {
case confForm := <-confReqCh:
fields, err := confForm.GetFormFields()
if err != nil {
log.Fatal("No config fields found !")
}
// These are some common fields expected to be present. Change processing to your liking
if fields["pubsub#max_payload_size"] != nil {
fields["pubsub#max_payload_size"].ValuesList[0] = "100000"
}
if fields["pubsub#notification_type"] != nil {
fields["pubsub#notification_type"].ValuesList[0] = "headline"
}
submitConf, err := stanza.NewFormSubmissionOwner(serviceName,
nodeName,
[]*stanza.Field{
fields["pubsub#max_payload_size"],
fields["pubsub#notification_type"],
})
c, _ := client.SendIQ(ctx, submitConf)
select {
case <-c:
fmt.Println("node configuration was successful")
case <-time.After(300 * time.Millisecond): case <-time.After(300 * time.Millisecond):
cancel() cancel()
log.Fatal("No iq response was received in time") log.Fatal("No iq response was received in time while publishing")
} }
case <-time.After(300 * time.Millisecond):
cancel()
log.Fatal("No iq response was received in time")
} }
cancel()
} }