Code Examples
SDK examples for Python, JavaScript, Ruby, and cURL
Code Examples
Complete code examples for integrating with the InboxIssue API.
Python
import requests
import time
from typing import Optional, Dict, List
class InboxIssueClient:
def __init__(
self,
api_token: str,
base_url: str = "https://app.inboxissue.com/api/v1"
):
self.api_token = api_token
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json"
}
def get_providers(
self,
provider_types: Optional[List[str]] = None
) -> Dict:
"""Get available test email addresses."""
params = {}
if provider_types:
params["provider_types[]"] = provider_types
response = requests.get(
f"{self.base_url}/external_tests/providers",
headers=self.headers,
params=params
)
response.raise_for_status()
return response.json()
def create_test(
self,
name: str,
source: str,
metadata: Optional[Dict] = None,
provider_types: Optional[List[str]] = None
) -> Dict:
"""Create a new deliverability test."""
payload = {
"external_test": {
"name": name,
"source": source,
"metadata": metadata or {}
}
}
if provider_types:
payload["provider_types"] = provider_types
response = requests.post(
f"{self.base_url}/external_tests",
headers=self.headers,
json=payload
)
response.raise_for_status()
return response.json()
def get_test(self, test_id: int) -> Dict:
"""Get test status and results."""
response = requests.get(
f"{self.base_url}/external_tests/{test_id}",
headers=self.headers
)
response.raise_for_status()
return response.json()
def list_tests(
self,
source: Optional[str] = None,
status: Optional[str] = None,
page: int = 1,
per_page: int = 10
) -> Dict:
"""List external tests with filtering."""
params = {"page": page, "per_page": per_page}
if source:
params["source"] = source
if status:
params["status"] = status
response = requests.get(
f"{self.base_url}/external_tests",
headers=self.headers,
params=params
)
response.raise_for_status()
return response.json()
def wait_for_completion(
self,
test_id: int,
timeout_seconds: int = 3600,
poll_interval: int = 30
) -> Dict:
"""Poll until test completes or times out."""
start = time.time()
while time.time() - start < timeout_seconds:
result = self.get_test(test_id)
status = result["test"]["status"]
if status in ["complete", "expired"]:
return result
time.sleep(poll_interval)
raise TimeoutError(
f"Test {test_id} did not complete within {timeout_seconds}s"
)
# Usage example
if __name__ == "__main__":
client = InboxIssueClient("your_api_token_here")
# Create a test
test = client.create_test(
name="My Campaign Test",
source="my_app",
metadata={"campaign_id": "123"},
provider_types=["consumer", "business"]
)
print(f"Created test {test['test']['id']}")
print(f"Send emails to: {test['send_to']}")
print(f"Include tracking ID: {test['test']['tracking_id']}")
# Wait for results
result = client.wait_for_completion(test['test']['id'])
print(f"Inbox rate: {result['summary']['inbox_rate']}%")JavaScript / Node.js
const axios = require('axios');
class InboxIssueClient {
constructor(apiToken, baseUrl = 'https://app.inboxissue.com/api/v1') {
this.apiToken = apiToken;
this.baseUrl = baseUrl;
this.client = axios.create({
baseURL: baseUrl,
headers: {
'Authorization': `Bearer ${apiToken}`,
'Content-Type': 'application/json'
}
});
}
async getProviders(providerTypes = null) {
const params = providerTypes
? { 'provider_types[]': providerTypes }
: {};
const response = await this.client.get(
'/external_tests/providers',
{ params }
);
return response.data;
}
async createTest(name, source, metadata = {}, providerTypes = null) {
const payload = {
external_test: { name, source, metadata }
};
if (providerTypes) {
payload.provider_types = providerTypes;
}
const response = await this.client.post('/external_tests', payload);
return response.data;
}
async getTest(testId) {
const response = await this.client.get(`/external_tests/${testId}`);
return response.data;
}
async listTests({ source, status, page = 1, perPage = 10 } = {}) {
const params = { page, per_page: perPage };
if (source) params.source = source;
if (status) params.status = status;
const response = await this.client.get('/external_tests', { params });
return response.data;
}
async waitForCompletion(
testId,
timeoutMs = 3600000,
pollIntervalMs = 30000
) {
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
const result = await this.getTest(testId);
const status = result.test.status;
if (status === 'complete' || status === 'expired') {
return result;
}
await new Promise(resolve =>
setTimeout(resolve, pollIntervalMs)
);
}
throw new Error(
`Test ${testId} did not complete within ${timeoutMs}ms`
);
}
}
// Usage example
async function main() {
const client = new InboxIssueClient('your_api_token_here');
try {
// Create a test
const test = await client.createTest(
'My Campaign Test',
'my_app',
{ campaign_id: '123' },
['consumer', 'business']
);
console.log(`Created test ${test.test.id}`);
console.log('Send emails to:', test.send_to);
console.log(`Include tracking ID: ${test.test.tracking_id}`);
// Wait for results
const result = await client.waitForCompletion(test.test.id);
console.log(`Inbox rate: ${result.summary.inbox_rate}%`);
} catch (error) {
console.error('Error:', error.message);
}
}
main();Ruby
require 'httparty'
require 'json'
class InboxIssueClient
include HTTParty
base_uri 'https://app.inboxissue.com/api/v1'
def initialize(api_token)
@headers = {
'Authorization' => "Bearer #{api_token}",
'Content-Type' => 'application/json'
}
end
def get_providers(provider_types: nil)
options = { headers: @headers }
if provider_types
options[:query] = { 'provider_types[]' => provider_types }
end
self.class.get('/external_tests/providers', options)
end
def create_test(name:, source:, metadata: {}, provider_types: nil)
body = {
external_test: {
name: name,
source: source,
metadata: metadata
}
}
body[:provider_types] = provider_types if provider_types
self.class.post('/external_tests',
headers: @headers,
body: body.to_json
)
end
def get_test(test_id)
self.class.get(
"/external_tests/#{test_id}",
headers: @headers
)
end
def list_tests(source: nil, status: nil, page: 1, per_page: 10)
query = { page: page, per_page: per_page }
query[:source] = source if source
query[:status] = status if status
self.class.get('/external_tests',
headers: @headers,
query: query
)
end
def wait_for_completion(
test_id,
timeout_seconds: 3600,
poll_interval: 30
)
start_time = Time.now
loop do
result = get_test(test_id)
status = result['test']['status']
return result if %w[complete expired].include?(status)
if Time.now - start_time > timeout_seconds
raise "Test #{test_id} did not complete"
end
sleep(poll_interval)
end
end
end
# Usage example
client = InboxIssueClient.new('your_api_token_here')
# Create a test
test = client.create_test(
name: 'My Campaign Test',
source: 'my_app',
metadata: { campaign_id: '123' },
provider_types: %w[consumer business]
)
puts "Created test #{test['test']['id']}"
puts "Send emails to: #{test['send_to']}"
puts "Include tracking ID: #{test['test']['tracking_id']}"
# Wait for results
result = client.wait_for_completion(test['test']['id'])
puts "Inbox rate: #{result['summary']['inbox_rate']}%"cURL
#!/bin/bash
API_TOKEN="your_api_token_here"
BASE_URL="https://app.inboxissue.com/api/v1"
# Get providers
echo "=== Getting Providers ==="
curl -s -X GET "$BASE_URL/external_tests/providers" \
-H "Authorization: Bearer $API_TOKEN" | jq
# Create test
echo "=== Creating Test ==="
TEST_RESPONSE=$(curl -s -X POST "$BASE_URL/external_tests" \
-H "Authorization: Bearer $API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"external_test": {
"name": "My Campaign Test",
"source": "shell_script",
"metadata": {"campaign_id": "123"}
},
"provider_types": ["consumer"]
}')
TEST_ID=$(echo $TEST_RESPONSE | jq -r '.test.id')
TRACKING_ID=$(echo $TEST_RESPONSE | jq -r '.test.tracking_id')
echo "Created test $TEST_ID"
echo "Tracking ID: $TRACKING_ID"
echo "Send to:"
echo $TEST_RESPONSE | jq '.send_to'
# Poll for results
echo "=== Polling for Results ==="
while true; do
RESULT=$(curl -s -X GET "$BASE_URL/external_tests/$TEST_ID" \
-H "Authorization: Bearer $API_TOKEN")
STATUS=$(echo $RESULT | jq -r '.test.status')
COMPLETED=$(echo $RESULT | jq -r '.summary.completed')
TOTAL=$(echo $RESULT | jq -r '.summary.total_providers')
echo "Status: $STATUS ($COMPLETED/$TOTAL)"
if [ "$STATUS" = "complete" ] || [ "$STATUS" = "expired" ]; then
echo "Final inbox rate: $(echo $RESULT | jq '.summary.inbox_rate')%"
break
fi
sleep 30
doneWebhook Handler Examples
Python (Flask)
import hmac
import hashlib
from flask import Flask, request, jsonify
app = Flask(__name__)
WEBHOOK_SECRET = 'your_webhook_secret'
def verify_signature(payload: bytes, signature: str) -> bool:
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
@app.route('/webhook', methods=['POST'])
def webhook():
payload = request.get_data()
signature = request.headers.get('X-InboxIssue-Signature')
if not verify_signature(payload, signature):
return jsonify({'error': 'Invalid signature'}), 401
data = request.get_json()
# Process result
event_type = data['event_type']
test_result = data['test_result']
print(f"Received {event_type}")
print(f"Provider: {test_result['provider_name']}")
print(f"Inbox: {test_result['delivery']['is_inbox']}")
return jsonify({'status': 'ok'}), 200Node.js (Express)
const crypto = require('crypto');
const express = require('express');
const app = express();
const WEBHOOK_SECRET = 'your_webhook_secret';
app.use(express.raw({ type: 'application/json' }));
function verifySignature(payload, signature) {
const expected = crypto
.createHmac('sha256', WEBHOOK_SECRET)
.update(payload)
.digest('hex');
return crypto.timingSafeEqual(
Buffer.from(expected),
Buffer.from(signature)
);
}
app.post('/webhook', (req, res) => {
const payload = req.body;
const signature = req.headers['x-inboxissue-signature'];
if (!verifySignature(payload, signature)) {
return res.status(401).json({ error: 'Invalid signature' });
}
const data = JSON.parse(payload.toString());
// Process result
console.log(`Received ${data.event_type}`);
console.log(`Provider: ${data.test_result.provider_name}`);
console.log(`Inbox: ${data.test_result.delivery.is_inbox}`);
res.status(200).json({ status: 'ok' });
});
app.listen(3000);Error Handling Example
import requests
from requests.exceptions import RequestException
class APIError(Exception):
def __init__(self, message, status_code=None):
self.message = message
self.status_code = status_code
super().__init__(message)
def safe_api_call(func):
"""Decorator for safe API calls with retry."""
def wrapper(*args, **kwargs):
max_retries = 3
delay = 1
for attempt in range(max_retries):
try:
response = func(*args, **kwargs)
if response.status_code == 401:
raise APIError("Invalid API token", 401)
elif response.status_code == 403:
raise APIError("API access not available", 403)
elif response.status_code == 429:
if attempt < max_retries - 1:
time.sleep(delay)
delay *= 2
continue
raise APIError("Rate limited", 429)
elif response.status_code >= 500:
if attempt < max_retries - 1:
time.sleep(delay)
delay *= 2
continue
raise APIError("Server error", response.status_code)
return response.json()
except RequestException as e:
if attempt < max_retries - 1:
time.sleep(delay)
delay *= 2
continue
raise APIError(f"Request failed: {e}")
return wrapperRelated
- Authentication - API token setup
- External Tests API - Endpoint reference
- Webhooks - Webhook integration