Automating Twilio Recording Exports for Quality Purposes: Python Implementation Guidelines

Discover how to use Python to download recordings from Twilio and transcribe them for sentiment analysis, quality, and audit purposes.

By Sandeep Kakani · Jan. 07, 25 · Tutorial

Call recordings are pivotal for business operations, compliance, and quality assurance. Twilio is a call management platform with excellent call recording capabilities, but organizations often need to automatically download these recordings and store them locally or in their preferred cloud storage. Downloading large numbers of recordings from Twilio can be challenging, however. In this article, we'll explore how to build an efficient Python solution for bulk-downloading Twilio recordings while handling pagination, parallel downloads, and queue filtering.

Use Cases

When working with call management systems like Twilio, we might need to:

  • Download thousands of call recordings for quality assurance.
  • Export call recordings while excluding specific queues.
  • Process or download recordings within specific date ranges.
  • Handle the download process efficiently without overwhelming system resources.

Solution Overview

Using Python, we will create a class that handles the bulk download of recordings with the following key features: 

  • Parallel downloads using ThreadPoolExecutor
  • Pagination handling for large datasets
  • Queue filtering capabilities
  • Progress tracking with tqdm
  • Error handling and retry logic

Prerequisites

  • Python 3.8+
  • Twilio account with recordings

Required Python packages:

  • twilio
  • boto3
  • python-dotenv
  • requests
  • tqdm
Python
 
import os
from datetime import datetime, timedelta
from twilio.rest import Client
import requests
from pathlib import Path
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time


Implementation

The complete Python class is shown here:

Python
 
import os
from datetime import datetime, timedelta
from twilio.rest import Client
import requests
from pathlib import Path
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time

class TwilioRecordingExporter:
    def __init__(self, account_sid, auth_token, output_dir="random_recordings"):
        """
        Initialize the exporter with Twilio credentials
        """
        self.client = Client(account_sid, auth_token)
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        # Task queue SIDs to exclude from the export (values masked)
        self.excluded_queue_sids = {
            'WQ65xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 1 
            'WQ3xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 2 
            'WQexxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',  # Example SID 3 
            'WQ0xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'   # Example SID 4 
        }
        
        self.max_workers = 10  # Number of parallel downloads

    def download_recording(self, recording):
        """
        Download a single recording
        """
        try:
            date_str = recording.date_created.strftime('%Y%m%d_')
            filename = f"{date_str}{recording.sid}.wav"
            filepath = self.output_dir / filename

            if filepath.exists():
                return filepath

            wav_url = f"{recording.media_url}.wav"
            response = requests.get(wav_url, auth=(self.account_sid, self.auth_token))
            
            if response.status_code == 200:
                filepath.write_bytes(response.content)
                return filepath
            else:
                print(f"\nFailed to download {recording.sid}: {response.status_code}")
                return None
        except Exception as e:
            print(f"\nError downloading recording {recording.sid}: {str(e)}")
            return None

    def download_batch(self, recordings):
        """
        Download a batch of recordings in parallel
        """
        successful_downloads = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_recording = {
                executor.submit(self.download_recording, recording): recording 
                for recording in recordings
            }
            
            for future in as_completed(future_to_recording):
                filepath = future.result()
                if filepath:
                    successful_downloads.append(filepath)
                
        return successful_downloads

    def export_random_recordings(self, num_recordings=10000, days_back=180, batch_size=100):
        """
        Export random recordings while excluding specific queues
        """
        downloaded_files = []
        
        try:
            # Calculate date range
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=days_back)
            
            print(f"Fetching recordings from {start_date} to {end_date}")
            print("Excluded queues SIDs:", ", ".join(self.excluded_queue_sids))
            
            # Fetch recordings with pagination
            all_recordings = []
            page = self.client.recordings.page(
                date_created_after=start_date,
                date_created_before=end_date,
                page_size=100  # Records per request
            )
            
            with tqdm(desc="Fetching recordings", unit="page") as pbar:
                while page is not None:
                    all_recordings.extend(page)
                    pbar.update(1)
                    if len(all_recordings) >= num_recordings * 2:  # Fetch extra to account for excluded queues
                        break
                    page = page.next_page()

            print(f"\nFound {len(all_recordings)} recordings")
            
            # Shuffle recordings
            random.shuffle(all_recordings)
            
            # Process in batches
            processed_count = 0
            
            with tqdm(total=num_recordings, desc="Downloading recordings") as pbar:
                for i in range(0, len(all_recordings), batch_size):
                    if processed_count >= num_recordings:
                        break
                        
                    batch = all_recordings[i:i + batch_size]
                    
                    # Filter out recordings associated with excluded queues
                    filtered_batch = [
                        recording for recording in batch 
                        if not self.is_recording_in_excluded_queue(recording)
                    ]
                    
                    downloaded_batch = self.download_batch(filtered_batch)
                    downloaded_files.extend(downloaded_batch)
                    
                    new_count = min(len(downloaded_batch), num_recordings - processed_count)
                    processed_count += new_count
                    pbar.update(new_count)
                    
                    if processed_count >= num_recordings:
                        break
            
        except Exception as e:
            print(f"\nError in export process: {str(e)}")
        
        return downloaded_files[:num_recordings]

    def is_recording_in_excluded_queue(self, recording):
        """
        Check if the recording is associated with an excluded queue based on task queue SID
        """
        task_queue_sid = recording.queue_sid if hasattr(recording, 'queue_sid') else None
        return task_queue_sid in self.excluded_queue_sids

def main():
    # Your Twilio credentials (prefer environment variables; see Considerations for Security)
    ACCOUNT_SID = "AC738a9a46c65dxxxxxxxxxxxxxxxxx"
    AUTH_TOKEN = "xxxxxxxxxx9ae2e4572xxxxxxxxxxxx"
    
    try:
        start_time = time.time()
        
        # Create exporter instance
        exporter = TwilioRecordingExporter(ACCOUNT_SID, AUTH_TOKEN)
        
        # Download random recordings
        print("Starting random recording export...")
        downloaded_files = exporter.export_random_recordings(
            num_recordings=10000,
            days_back=180,
            batch_size=100
        )
        
        duration = time.time() - start_time
        print(f"\nExport complete:")
        print(f"- Downloaded: {len(downloaded_files)} files")
        print(f"- Location: {exporter.output_dir}")
        print(f"- Time taken: {duration:.2f} seconds")
        
    except Exception as e:
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()
    main()

    print('success')


Let's break down the above code into manageable components:

1. Basic Setup

First, we create a Python class to handle Twilio client initialization and configuration: 

Python
 
class TwilioRecordingExporter:
    def __init__(self, account_sid, auth_token, output_dir="random_recordings"):
        self.client = Client(account_sid, auth_token)
        self.account_sid = account_sid  # Kept for authenticating media downloads
        self.auth_token = auth_token
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.max_workers = 10  # Number of parallel downloads


2. Single Recording Download Implementation

The method below will handle individual recording downloads: 

Python
 
def download_recording(self, recording):
    try:
        date_str = recording.date_created.strftime('%Y%m%d_')
        filename = f"{date_str}{recording.sid}.wav"
        filepath = self.output_dir / filename

        # Skip files that were already downloaded
        if filepath.exists():
            return filepath

        wav_url = f"{recording.media_url}.wav"
        response = requests.get(wav_url,
                                auth=(self.account_sid, self.auth_token))

        if response.status_code == 200:
            filepath.write_bytes(response.content)
            return filepath

        print(f"\nFailed to download {recording.sid}: {response.status_code}")
        return None
    except Exception as e:
        print(f"\nError downloading recording {recording.sid}: {str(e)}")
        return None


3. Parallel Downloads

Implementing the code below will improve performance when downloading a large number of recordings.

Python
 
def download_batch(self, recordings):
    successful_downloads = []
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_recording = {
            executor.submit(self.download_recording, recording): recording 
            for recording in recordings
        }
        
        for future in as_completed(future_to_recording):
            filepath = future.result()
            if filepath:
                successful_downloads.append(filepath)
            
    return successful_downloads


4. Queue Filtering

For queue filtering, we exclude recordings that belong to queues not required for QA; the hasattr guard keeps the check safe when a recording does not expose a queue SID.

Python
 
def is_recording_in_excluded_queue(self, recording):
    task_queue_sid = recording.queue_sid if hasattr(recording, 'queue_sid') else None
    return task_queue_sid in self.excluded_queue_sids


Best Practices and Optimizations

Batch Processing

To manage resources efficiently and process recordings in batches, use the following:

Python
 
for i in range(0, len(all_recordings), batch_size):
    batch = all_recordings[i:i + batch_size]


Tracking Progress

Implement tqdm for tracking progress visually:

Python
 
with tqdm(total=num_recordings, desc="Downloading recordings") as pbar:
    # Download process
    pbar.update(new_count)


Error Handling

Error handling can be implemented at multiple levels, such as the three below; a retry sketch follows the list:

  1. Download failures
  2. Batch processing errors
  3. API communication issues
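
The class above logs failures but does not retry them. As a minimal sketch (an addition, not part of the original class), a wrapper with exponential backoff could be layered over download_recording; the retry count and delays are illustrative assumptions:

Python
 
def download_with_retry(self, recording, max_retries=3, base_delay=1.0):
    """
    Retry wrapper around download_recording with exponential backoff.
    max_retries and base_delay are illustrative defaults, not tuned values.
    """
    for attempt in range(max_retries):
        filepath = self.download_recording(recording)
        if filepath is not None:
            return filepath
        time.sleep(base_delay * (2 ** attempt))  # Back off 1s, 2s, 4s, ...
    return None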

Resource Management

The degree of parallelism is controlled by max_workers, as shown below; because downloads are I/O-bound, a thread pool is a good fit:

Python
 
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
    # Parallel download logic
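
The value of 10 used in the class is a sensible starting point. A common heuristic for I/O-bound work, which mirrors ThreadPoolExecutor's own default in Python 3.8+, is sketched below; treat it as a tuning suggestion bounded by Twilio's concurrency limits, not a prescription:

Python
 
import os

# Heuristic for I/O-bound downloads; tune against Twilio's rate limits
max_workers = min(32, (os.cpu_count() or 1) + 4)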


Example Usage

Use the exporter as shown below to download 10,000 recordings from the last 180 days, in batches of 100:

Python
 
exporter = TwilioRecordingExporter(ACCOUNT_SID, AUTH_TOKEN)
downloaded_files = exporter.export_random_recordings(
    num_recordings=10000,
    days_back=180,
    batch_size=100
)


Considerations for Security

  • File safety: Use pathlib for safe file operations: 
Python
 
filepath = Path(output_dir) / filename


  • Credential management: Load credentials from environment variables rather than hard-coding them:
Python
 
ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID')
AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN')
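
Since python-dotenv is already in the package list, one hedged variant is to keep credentials in a local .env file (excluded from version control) and load them at startup; the variable names follow the snippet above:

Python
 
import os
from dotenv import load_dotenv

load_dotenv()  # Reads TWILIO_ACCOUNT_SID / TWILIO_AUTH_TOKEN from a local .env file
ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID')
AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN')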


  • Further improvement considerations: 
    • Call metadata (caller name, agent name, call duration, etc.) can be captured alongside each recording.
    • Automate export to AWS S3 buckets (a boto3 sketch follows).
    • Transcribe the downloaded recordings for sentiment analysis (see the sketch further below).
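
boto3 is listed in the prerequisites but not used in the article's code; a minimal sketch of an S3 export might look like the following, where the bucket name is a placeholder and AWS credentials are assumed to come from the standard configuration chain:

Python
 
import boto3

def upload_to_s3(filepaths, bucket_name="my-recordings-bucket"):
    """
    Upload downloaded recordings to S3. bucket_name is a placeholder;
    AWS credentials are resolved from the environment or AWS config.
    """
    s3 = boto3.client("s3")
    for filepath in filepaths:
        s3.upload_file(str(filepath), bucket_name, filepath.name)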

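For the transcription step mentioned in the article's summary, one possibility (an assumption, not among the original dependencies) is the open-source openai-whisper package, which also requires ffmpeg:

Python
 
import whisper  # pip install openai-whisper (assumed dependency; requires ffmpeg)

model = whisper.load_model("base")  # Smallest general-purpose Whisper model

def transcribe_recording(filepath):
    """Return the transcript text for one downloaded .wav recording."""
    result = model.transcribe(str(filepath))
    return result["text"]
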
Conclusion

Using the solution above, you can efficiently bulk-download recordings from Twilio while maintaining best practices around error handling, performance, and resource management. The implementation can be easily extended to further use cases and scaled according to need.
