Handle webhooks
Use the code examples below to handle webhooks on a single endpoint.
The webhooks used in the examples:
- Python
- Ruby
- Node.js
- Go
# Use this sample code to handle webhook events in your integration.
#
# 1. Paste this code into a new file `server.py`.
#
# 2. Install dependencies:
# python -m pip install fastapi[all]
#
# 3. Run the server on http://localhost:8000
# python server.py
import fastapi, hashlib, hmac, json, typing
app = fastapi.FastAPI()
@app.post("/webhook")
async def webhook(request: fastapi.Request) -> dict[str, typing.Any]:
    """Receive a webhook, verify its signature, and parse the event.

    Reads the raw request body and the ``x-aghanim-signature-timestamp`` /
    ``x-aghanim-signature`` headers, checks the signature with
    ``verify_signature``, then extracts ``event_type`` and ``event_data``
    from the JSON body.

    Raises:
        fastapi.HTTPException: 403 when a signature header is missing or the
            signature does not match; 400 when the body cannot be decoded or
            parsed, or when the event type is not handled.
    """
    secret_key = "<YOUR_S2S_KEY>"  # Replace with your actual webhook secret key

    raw_payload = await request.body()
    try:
        payload = raw_payload.decode()
    except UnicodeDecodeError:
        # A body that is not valid UTF-8 is a client error, not a server crash.
        raise fastapi.HTTPException(status_code=400, detail="Invalid payload")

    # Use .get(): a request without the signature headers must be rejected
    # like any other unsigned request instead of failing with a KeyError.
    timestamp = request.headers.get("x-aghanim-signature-timestamp")
    received_signature = request.headers.get("x-aghanim-signature")
    if timestamp is None or received_signature is None:
        raise fastapi.HTTPException(status_code=403, detail="Invalid signature")

    if not verify_signature(secret_key, payload, timestamp, received_signature):
        raise fastapi.HTTPException(status_code=403, detail="Invalid signature")

    try:
        data = json.loads(payload)
        event_type = data["event_type"]
        event_data = data["event_data"]
    except (json.JSONDecodeError, KeyError, TypeError):
        raise fastapi.HTTPException(status_code=400, detail="Invalid payload")

    # No event type is handled in this sample, so every verified event ends
    # here. Add your handlers for `event_type` / `event_data` above this line
    # and return a response from them.
    raise fastapi.HTTPException(status_code=400, detail="Unknown event type")
def verify_signature(secret_key: str, payload: str, timestamp: str, received_signature: str) -> bool:
    """Check a webhook signature against the HMAC-SHA256 of the request.

    The signed message is ``"<timestamp>.<payload>"``; the expected signature
    is its hex-encoded HMAC-SHA256 keyed with ``secret_key``.

    Args:
        secret_key: Secret used as the HMAC key.
        payload: Request body, decoded to text.
        timestamp: Value of the signature timestamp header.
        received_signature: Hex signature sent with the request.

    Returns:
        True when the received signature equals the computed one, else False.
    """
    signature_data = f"{timestamp}.{payload}"
    computed_signature = hmac.new(
        secret_key.encode(), signature_data.encode(), hashlib.sha256
    ).hexdigest()
    # Compare bytes, not str: hmac.compare_digest raises TypeError for str
    # arguments containing non-ASCII characters, and the received value comes
    # straight from the request.
    return hmac.compare_digest(computed_signature.encode(), received_signature.encode())
if __name__ == "__main__":
    # Only start the server when this file is run directly (`python server.py`);
    # uvicorn is imported here so that importing the module does not need it.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")
# Use this sample code to handle webhook events in your integration.
#
# 1. Paste this code into a new file `server.rb`.
#
# 2. Install dependencies:
# gem install sinatra json hmac
#
# 3. Run the server on http://localhost:8000
# ruby server.rb
require 'sinatra'
require 'json'
require 'openssl'
# Receives a webhook, verifies its signature, and parses the event.
# Responds 403 when the signature headers are missing or do not match,
# and 400 when the body is not a JSON object or the event is not handled.
post '/webhook' do
  secret_key = "<YOUR_S2S_KEY>" # Replace with your actual webhook secret key

  payload = request.body.read
  timestamp = request.env["HTTP_X_AGHANIM_SIGNATURE_TIMESTAMP"]
  received_signature = request.env["HTTP_X_AGHANIM_SIGNATURE"]

  # Absent headers are nil here; reject them up front instead of passing nil
  # into the signature comparison.
  halt 403, "Invalid signature" if timestamp.nil? || received_signature.nil?

  unless verify_signature(secret_key, payload, timestamp, received_signature)
    halt 403, "Invalid signature"
  end

  begin
    data = JSON.parse(payload)
  rescue JSON::ParserError
    halt 400, "Invalid payload"
  end
  halt 400, "Invalid payload" unless data.is_a?(Hash)

  event_type = data["event_type"]
  event_data = data["event_data"]

  # No event type is handled in this sample, so every verified event ends
  # here. Add your handlers for event_type / event_data above this line.
  halt 400, "Unknown event type"
end
# Checks a webhook signature against the HMAC-SHA256 of the request.
#
# The signed message is "<timestamp>.<payload>"; the expected signature is its
# hex-encoded HMAC-SHA256 keyed with secret_key.
#
# Returns true when received_signature equals the computed signature,
# false otherwise (including when no signature was supplied).
def verify_signature(secret_key, payload, timestamp, received_signature)
  # A missing header arrives as nil; the comparison below expects Strings and
  # would raise instead of returning false.
  return false unless received_signature.is_a?(String)

  signature_data = "#{timestamp}.#{payload}"
  computed_signature = OpenSSL::HMAC.hexdigest('sha256', secret_key, signature_data)
  OpenSSL.secure_compare(computed_signature, received_signature)
end
# Applies the listen address and port only when this file is the script
# being executed (`ruby server.rb`), not when it is loaded from another file.
if $PROGRAM_NAME == __FILE__
  require 'sinatra'

  set :bind, '0.0.0.0'
  set :port, 8000
end
// Use this sample code to handle webhook events in your integration.
//
// 1. Paste this code into a new file `server.js`.
//
// 2. Install dependencies:
// npm install express
//
// 3. Run the server on http://localhost:8000
// node server.js
const express = require('express');
const crypto = require('crypto');
const app = express();
/**
 * Receives a webhook, verifies its signature, and parses the event.
 *
 * The body is read with `express.raw` so the signature is checked against
 * the bytes that were actually sent. Responds 403 when the signature headers
 * are missing or do not match, and 400 when the body is not a JSON object or
 * the event type is not handled.
 */
app.post('/webhook', express.raw({ type: "*/*" }), async (req, res) => {
  const secretKey = '<YOUR_S2S_KEY>'; // Replace with your actual webhook secret key

  const rawPayload = req.body;
  const timestamp = req.headers['x-aghanim-signature-timestamp'];
  const receivedSignature = req.headers['x-aghanim-signature'];

  // An exception thrown inside this async handler would become an unhandled
  // promise rejection, so a request that cannot be verified is answered with
  // 403 instead of being allowed to throw.
  let isSignatureValid = false;
  try {
    isSignatureValid =
      typeof timestamp === 'string' &&
      typeof receivedSignature === 'string' &&
      verifySignature(secretKey, rawPayload, timestamp, receivedSignature);
  } catch (err) {
    console.error('Webhook signature verification failed:', err);
  }
  if (!isSignatureValid) {
    return res.status(403).send('Invalid signature');
  }

  let payload;
  try {
    payload = JSON.parse(rawPayload.toString());
  } catch (err) {
    return res.status(400).send('Invalid payload');
  }
  if (payload === null || typeof payload !== 'object') {
    return res.status(400).send('Invalid payload');
  }

  const { event_type, event_data } = payload;

  // No event type is handled in this sample, so every verified event ends
  // here. Add your handlers for event_type / event_data above this line.
  return res.status(400).send('Unknown event type');
});
/**
 * Checks a webhook signature against the HMAC-SHA256 of the request.
 *
 * The signed message is `<timestamp>.<payload>`; the expected signature is
 * its hex-encoded HMAC-SHA256 keyed with `secretKey`.
 *
 * @param {string} secretKey - Secret used as the HMAC key.
 * @param {Buffer|string} payload - Raw request body.
 * @param {string} timestamp - Value of the signature timestamp header.
 * @param {string} receivedSignature - Hex signature sent with the request.
 * @returns {boolean} `true` when the signatures match, otherwise `false`.
 */
function verifySignature(secretKey, payload, timestamp, receivedSignature) {
  // A missing header is `undefined`, which Buffer.from() rejects with a throw.
  if (typeof receivedSignature !== 'string') {
    return false;
  }

  const signatureData = `${timestamp}.${payload}`;
  const computedSignature = crypto
    .createHmac('sha256', secretKey)
    .update(signatureData)
    .digest('hex');

  const computed = Buffer.from(computedSignature);
  const received = Buffer.from(receivedSignature);

  // crypto.timingSafeEqual throws a RangeError when the lengths differ, so a
  // wrong-length signature is rejected before the constant-time comparison.
  return computed.length === received.length && crypto.timingSafeEqual(computed, received);
}
// Start the HTTP server on port 8000 and log a message once it is listening.
app.listen(8000, function onListening() {
  console.log('Server is running on http://localhost:8000');
});
// Use this sample code to handle webhook events in your integration.
//
// 1. Paste this code into a new file `server.go`.
//
// 2. Run the server on http://localhost:8000
// go run server.go
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
// webhookHandler receives a webhook, verifies its signature, and parses the
// event. It responds 403 when the signature does not match and 400 when the
// body cannot be read or parsed, lacks a string "event_type" or an object
// "event_data", or names an event type that is not handled.
func webhookHandler(w http.ResponseWriter, r *http.Request) {
	secretKey := "<YOUR_S2S_KEY>" // Replace with your actual webhook secret key

	rawPayload, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}
	payload := string(rawPayload)

	timestamp := r.Header.Get("X-Aghanim-Signature-Timestamp")
	receivedSignature := r.Header.Get("X-Aghanim-Signature")

	if !verifySignature(secretKey, payload, timestamp, receivedSignature) {
		http.Error(w, "Invalid signature", http.StatusForbidden)
		return
	}

	var data map[string]interface{}
	if err := json.Unmarshal(rawPayload, &data); err != nil {
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}

	// Use the two-value form of the type assertions: the single-value form
	// panics when a field is missing or has an unexpected type.
	eventType, ok := data["event_type"].(string)
	if !ok {
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}
	eventData, ok := data["event_data"].(map[string]interface{})
	if !ok {
		http.Error(w, "Invalid payload", http.StatusBadRequest)
		return
	}

	switch eventType {
	// Add one case per event type your integration handles, reading the
	// event-specific fields from eventData.
	default:
		// Referenced so the sample compiles before any case uses it; Go
		// rejects local variables that are declared and not used.
		_ = eventData
		http.Error(w, "Unknown event type", http.StatusBadRequest)
	}
}
// verifySignature reports whether receivedSignature equals the hex-encoded
// HMAC-SHA256 of "<timestamp>.<payload>" keyed with secretKey.
func verifySignature(secretKey, payload, timestamp, receivedSignature string) bool {
	signer := hmac.New(sha256.New, []byte(secretKey))
	// Stream the signed message straight into the hash.
	fmt.Fprintf(signer, "%s.%s", timestamp, payload)

	expected := make([]byte, hex.EncodedLen(signer.Size()))
	hex.Encode(expected, signer.Sum(nil))

	return hmac.Equal(expected, []byte(receivedSignature))
}
// main registers the webhook handler on /webhook and serves HTTP on port 8000.
func main() {
	http.HandleFunc("/webhook", webhookHandler)
	fmt.Println("Server is running on http://localhost:8000")

	// ListenAndServe only returns when the server fails (for example, when the
	// port is already in use); report the reason instead of exiting silently.
	if err := http.ListenAndServe(":8000", nil); err != nil {
		fmt.Println("Server stopped:", err)
	}
}
Testing
To test your webhook implementation locally, send one of the requests below.
- cURL
- Python
# Send a sample `player.verify` event to your webhook endpoint.
# Replace the <...> placeholders before running.
curl --request POST "https://<YOUR_WEBHOOK_ENDPOINT_DOMAIN>/<YOUR_WEBHOOK_ENDPOINT_URI>" \
  --header 'x-aghanim-signature-timestamp: <EVENT_TIMESTAMP>' \
  --header 'x-aghanim-signature: <HMAC-SHA256_SIGNATURE>' \
  --header 'user-agent: Aghanim/0.1.0' \
  --header 'content-type: application/json' \
  --header 'accept: */*' \
  --header 'host: <YOUR_WEBHOOK_ENDPOINT_DOMAIN>' \
  --data '{
"event_id": "whevt_eAeXhOxLwxy",
"event_type": "player.verify",
"idempotency_key": null,
"event_data": {
"player_id": "testplayer"
},
"context": null
}'
# Sends a sample `player.verify` event to your webhook endpoint.
# Replace the <...> placeholders before running.
import requests

# Endpoint URL
url = "https://<YOUR_WEBHOOK_ENDPOINT_DOMAIN>/<YOUR_WEBHOOK_ENDPOINT_URI>"

# Headers
headers = {
    'x-aghanim-signature-timestamp': '<EVENT_TIMESTAMP>',
    'x-aghanim-signature': '<HMAC-SHA256_SIGNATURE>',
    'user-agent': 'Aghanim/0.1.0',
    'content-type': 'application/json',
    'accept': '*/*',
    'host': '<YOUR_WEBHOOK_ENDPOINT_DOMAIN>'
}

# JSON payload. JSON `null` is written as `None` in Python; `requests`
# serializes it back to `null` when sending the body.
data = {
    "event_id": "whevt_eAeXhOxLwxy",
    "event_type": "player.verify",
    "idempotency_key": None,
    "event_data": {
        "player_id": "testplayer"
    },
    "context": None
}

# Make the POST request
response = requests.post(url, headers=headers, json=data)

# Print the response from the server
print(response.text)
Need help?
Contact our integration team at integration@aghanim.com