InboxIssue

Code Examples

SDK examples for Python, JavaScript, Ruby, and cURL

Code Examples

Complete code examples for integrating with the InboxIssue API.

Python

import requests
import time
from typing import Optional, Dict, List
 
class InboxIssueClient:
    def __init__(
        self,
        api_token: str,
        base_url: str = "https://app.inboxissue.com/api/v1"
    ):
        self.api_token = api_token
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json"
        }
 
    def get_providers(
        self,
        provider_types: Optional[List[str]] = None
    ) -> Dict:
        """Get available test email addresses."""
        params = {}
        if provider_types:
            params["provider_types[]"] = provider_types
 
        response = requests.get(
            f"{self.base_url}/external_tests/providers",
            headers=self.headers,
            params=params
        )
        response.raise_for_status()
        return response.json()
 
    def create_test(
        self,
        name: str,
        source: str,
        metadata: Optional[Dict] = None,
        provider_types: Optional[List[str]] = None
    ) -> Dict:
        """Create a new deliverability test."""
        payload = {
            "external_test": {
                "name": name,
                "source": source,
                "metadata": metadata or {}
            }
        }
        if provider_types:
            payload["provider_types"] = provider_types
 
        response = requests.post(
            f"{self.base_url}/external_tests",
            headers=self.headers,
            json=payload
        )
        response.raise_for_status()
        return response.json()
 
    def get_test(self, test_id: int) -> Dict:
        """Get test status and results."""
        response = requests.get(
            f"{self.base_url}/external_tests/{test_id}",
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()
 
    def list_tests(
        self,
        source: Optional[str] = None,
        status: Optional[str] = None,
        page: int = 1,
        per_page: int = 10
    ) -> Dict:
        """List external tests with filtering."""
        params = {"page": page, "per_page": per_page}
        if source:
            params["source"] = source
        if status:
            params["status"] = status
 
        response = requests.get(
            f"{self.base_url}/external_tests",
            headers=self.headers,
            params=params
        )
        response.raise_for_status()
        return response.json()
 
    def wait_for_completion(
        self,
        test_id: int,
        timeout_seconds: int = 3600,
        poll_interval: int = 30
    ) -> Dict:
        """Poll until test completes or times out."""
        start = time.time()
 
        while time.time() - start < timeout_seconds:
            result = self.get_test(test_id)
            status = result["test"]["status"]
 
            if status in ["complete", "expired"]:
                return result
 
            time.sleep(poll_interval)
 
        raise TimeoutError(
            f"Test {test_id} did not complete within {timeout_seconds}s"
        )
 
 
# Usage example
if __name__ == "__main__":
    client = InboxIssueClient("your_api_token_here")
 
    # Create a test
    test = client.create_test(
        name="My Campaign Test",
        source="my_app",
        metadata={"campaign_id": "123"},
        provider_types=["consumer", "business"]
    )
 
    print(f"Created test {test['test']['id']}")
    print(f"Send emails to: {test['send_to']}")
    print(f"Include tracking ID: {test['test']['tracking_id']}")
 
    # Wait for results
    result = client.wait_for_completion(test['test']['id'])
    print(f"Inbox rate: {result['summary']['inbox_rate']}%")

JavaScript / Node.js

const axios = require('axios');
 
class InboxIssueClient {
  constructor(apiToken, baseUrl = 'https://app.inboxissue.com/api/v1') {
    this.apiToken = apiToken;
    this.baseUrl = baseUrl;
    this.client = axios.create({
      baseURL: baseUrl,
      headers: {
        'Authorization': `Bearer ${apiToken}`,
        'Content-Type': 'application/json'
      }
    });
  }
 
  async getProviders(providerTypes = null) {
    const params = providerTypes
      ? { 'provider_types[]': providerTypes }
      : {};
    const response = await this.client.get(
      '/external_tests/providers',
      { params }
    );
    return response.data;
  }
 
  async createTest(name, source, metadata = {}, providerTypes = null) {
    const payload = {
      external_test: { name, source, metadata }
    };
    if (providerTypes) {
      payload.provider_types = providerTypes;
    }
    const response = await this.client.post('/external_tests', payload);
    return response.data;
  }
 
  async getTest(testId) {
    const response = await this.client.get(`/external_tests/${testId}`);
    return response.data;
  }
 
  async listTests({ source, status, page = 1, perPage = 10 } = {}) {
    const params = { page, per_page: perPage };
    if (source) params.source = source;
    if (status) params.status = status;
    const response = await this.client.get('/external_tests', { params });
    return response.data;
  }
 
  async waitForCompletion(
    testId,
    timeoutMs = 3600000,
    pollIntervalMs = 30000
  ) {
    const startTime = Date.now();
 
    while (Date.now() - startTime < timeoutMs) {
      const result = await this.getTest(testId);
      const status = result.test.status;
 
      if (status === 'complete' || status === 'expired') {
        return result;
      }
 
      await new Promise(resolve =>
        setTimeout(resolve, pollIntervalMs)
      );
    }
 
    throw new Error(
      `Test ${testId} did not complete within ${timeoutMs}ms`
    );
  }
}
 
// Usage example
async function main() {
  const client = new InboxIssueClient('your_api_token_here');
 
  try {
    // Create a test
    const test = await client.createTest(
      'My Campaign Test',
      'my_app',
      { campaign_id: '123' },
      ['consumer', 'business']
    );
 
    console.log(`Created test ${test.test.id}`);
    console.log('Send emails to:', test.send_to);
    console.log(`Include tracking ID: ${test.test.tracking_id}`);
 
    // Wait for results
    const result = await client.waitForCompletion(test.test.id);
    console.log(`Inbox rate: ${result.summary.inbox_rate}%`);
 
  } catch (error) {
    console.error('Error:', error.message);
  }
}
 
main();

Ruby

require 'httparty'
require 'json'
 
class InboxIssueClient
  include HTTParty
  base_uri 'https://app.inboxissue.com/api/v1'
 
  def initialize(api_token)
    @headers = {
      'Authorization' => "Bearer #{api_token}",
      'Content-Type' => 'application/json'
    }
  end
 
  def get_providers(provider_types: nil)
    options = { headers: @headers }
    if provider_types
      options[:query] = { 'provider_types[]' => provider_types }
    end
    self.class.get('/external_tests/providers', options)
  end
 
  def create_test(name:, source:, metadata: {}, provider_types: nil)
    body = {
      external_test: {
        name: name,
        source: source,
        metadata: metadata
      }
    }
    body[:provider_types] = provider_types if provider_types
 
    self.class.post('/external_tests',
      headers: @headers,
      body: body.to_json
    )
  end
 
  def get_test(test_id)
    self.class.get(
      "/external_tests/#{test_id}",
      headers: @headers
    )
  end
 
  def list_tests(source: nil, status: nil, page: 1, per_page: 10)
    query = { page: page, per_page: per_page }
    query[:source] = source if source
    query[:status] = status if status
 
    self.class.get('/external_tests',
      headers: @headers,
      query: query
    )
  end
 
  def wait_for_completion(
    test_id,
    timeout_seconds: 3600,
    poll_interval: 30
  )
    start_time = Time.now
 
    loop do
      result = get_test(test_id)
      status = result['test']['status']
 
      return result if %w[complete expired].include?(status)
 
      if Time.now - start_time > timeout_seconds
        raise "Test #{test_id} did not complete"
      end
 
      sleep(poll_interval)
    end
  end
end
 
# Usage example
client = InboxIssueClient.new('your_api_token_here')
 
# Create a test
test = client.create_test(
  name: 'My Campaign Test',
  source: 'my_app',
  metadata: { campaign_id: '123' },
  provider_types: %w[consumer business]
)
 
puts "Created test #{test['test']['id']}"
puts "Send emails to: #{test['send_to']}"
puts "Include tracking ID: #{test['test']['tracking_id']}"
 
# Wait for results
result = client.wait_for_completion(test['test']['id'])
puts "Inbox rate: #{result['summary']['inbox_rate']}%"

cURL

#!/bin/bash
API_TOKEN="your_api_token_here"
BASE_URL="https://app.inboxissue.com/api/v1"
 
# Get providers
echo "=== Getting Providers ==="
curl -s -X GET "$BASE_URL/external_tests/providers" \
  -H "Authorization: Bearer $API_TOKEN" | jq
 
# Create test
echo "=== Creating Test ==="
TEST_RESPONSE=$(curl -s -X POST "$BASE_URL/external_tests" \
  -H "Authorization: Bearer $API_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "external_test": {
      "name": "My Campaign Test",
      "source": "shell_script",
      "metadata": {"campaign_id": "123"}
    },
    "provider_types": ["consumer"]
  }')
 
TEST_ID=$(echo $TEST_RESPONSE | jq -r '.test.id')
TRACKING_ID=$(echo $TEST_RESPONSE | jq -r '.test.tracking_id')
 
echo "Created test $TEST_ID"
echo "Tracking ID: $TRACKING_ID"
echo "Send to:"
echo $TEST_RESPONSE | jq '.send_to'
 
# Poll for results
echo "=== Polling for Results ==="
while true; do
  RESULT=$(curl -s -X GET "$BASE_URL/external_tests/$TEST_ID" \
    -H "Authorization: Bearer $API_TOKEN")
 
  STATUS=$(echo $RESULT | jq -r '.test.status')
  COMPLETED=$(echo $RESULT | jq -r '.summary.completed')
  TOTAL=$(echo $RESULT | jq -r '.summary.total_providers')
 
  echo "Status: $STATUS ($COMPLETED/$TOTAL)"
 
  if [ "$STATUS" = "complete" ] || [ "$STATUS" = "expired" ]; then
    echo "Final inbox rate: $(echo $RESULT | jq '.summary.inbox_rate')%"
    break
  fi
 
  sleep 30
done

Webhook Handler Examples

Python (Flask)

import hmac
import hashlib
from flask import Flask, request, jsonify
 
app = Flask(__name__)
WEBHOOK_SECRET = 'your_webhook_secret'
 
def verify_signature(payload: bytes, signature: str) -> bool:
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)
 
@app.route('/webhook', methods=['POST'])
def webhook():
    payload = request.get_data()
    signature = request.headers.get('X-InboxIssue-Signature')
 
    if not verify_signature(payload, signature):
        return jsonify({'error': 'Invalid signature'}), 401
 
    data = request.get_json()
 
    # Process result
    event_type = data['event_type']
    test_result = data['test_result']
 
    print(f"Received {event_type}")
    print(f"Provider: {test_result['provider_name']}")
    print(f"Inbox: {test_result['delivery']['is_inbox']}")
 
    return jsonify({'status': 'ok'}), 200

Node.js (Express)

const crypto = require('crypto');
const express = require('express');
 
const app = express();
const WEBHOOK_SECRET = 'your_webhook_secret';
 
app.use(express.raw({ type: 'application/json' }));
 
function verifySignature(payload, signature) {
  const expected = crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(payload)
    .digest('hex');
  return crypto.timingSafeEqual(
    Buffer.from(expected),
    Buffer.from(signature)
  );
}
 
app.post('/webhook', (req, res) => {
  const payload = req.body;
  const signature = req.headers['x-inboxissue-signature'];
 
  if (!verifySignature(payload, signature)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }
 
  const data = JSON.parse(payload.toString());
 
  // Process result
  console.log(`Received ${data.event_type}`);
  console.log(`Provider: ${data.test_result.provider_name}`);
  console.log(`Inbox: ${data.test_result.delivery.is_inbox}`);
 
  res.status(200).json({ status: 'ok' });
});
 
app.listen(3000);

Error Handling Example

import requests
from requests.exceptions import RequestException
 
class APIError(Exception):
    def __init__(self, message, status_code=None):
        self.message = message
        self.status_code = status_code
        super().__init__(message)
 
def safe_api_call(func):
    """Decorator for safe API calls with retry."""
    def wrapper(*args, **kwargs):
        max_retries = 3
        delay = 1
 
        for attempt in range(max_retries):
            try:
                response = func(*args, **kwargs)
 
                if response.status_code == 401:
                    raise APIError("Invalid API token", 401)
                elif response.status_code == 403:
                    raise APIError("API access not available", 403)
                elif response.status_code == 429:
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                        delay *= 2
                        continue
                    raise APIError("Rate limited", 429)
                elif response.status_code >= 500:
                    if attempt < max_retries - 1:
                        time.sleep(delay)
                        delay *= 2
                        continue
                    raise APIError("Server error", response.status_code)
 
                return response.json()
 
            except RequestException as e:
                if attempt < max_retries - 1:
                    time.sleep(delay)
                    delay *= 2
                    continue
                raise APIError(f"Request failed: {e}")
 
    return wrapper

On this page