Enable Dark Mode!
how-to-create-automated-importexport-scripts-in-odoo-19.jpg
By: Sayed Mahir Abdulla KK

How to Create Automated Import/Export Scripts in Odoo 19

Technical Odoo 19 Odoo Enterprises Odoo Community

Data shouldn’t be stuck in one place. Whether you’re getting daily sales from a partner’s system or sending invoices to your accountant’s software, manually uploading CSV files becomes slow and inefficient over time.

Automated scripts help connect your systems. Instead of manually importing data every day, you can set everything to run automatically—daily, hourly, or whenever something happens. This not only saves time but also reduces human errors that often occur during manual work.

Usage

For Imports: You typically use Python methods with Odoo's ORM to read data from external sources (JSON, XML, API responses) and create or update records in Odoo. The script can be triggered by scheduled actions (cron jobs) or called manually.

For Exports: You extract data from Odoo models using search and read operations, format it into the desired structure (CSV, Excel, JSON, XML), and send it to external systems via file export, email, FTP, or API calls.

Examples:

1. Scheduled Product Export to CSV

Set up a scheduled action that runs daily, extracts product data, writes it to a CSV file, and saves it to a specific directory or emails it to stakeholders.

File: your_module/models/product_export.py

import csv
import logging
import os
from datetime import datetime
from odoo import models, api
_logger = logging.getLogger(__name__)
EXPORT_DIR = '/home/odoo/exports/'

class ProductExport(models.Model):
   _name = 'product.export'
   _description = 'Scheduled Product Export'
   @api.model
   def export_products_to_csv(self):
       """
       Exports all active storable/consumable products to a dated CSV file.
       Designed to be called by a Scheduled Action (cron).
       """
       # Ensure the export directory exists
       os.makedirs(EXPORT_DIR, exist_ok=True)
       timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
       filename = f"products_export_{timestamp}.csv"
       filepath = os.path.join(EXPORT_DIR, filename)
       # Fetch products - adjust domain as needed
       products = self.env['product.template'].search([
           ('active', '=', True),
           ('type', 'in', ['product', 'consu']),
       ])
       fieldnames = [
           'id', 'name', 'default_code', 'type',
           'list_price', 'standard_price', 'categ_id',
           'qty_available', 'uom_id'
       ]
       rows_written = 0
       with open(filepath, mode='w', newline='', encoding='utf-8') as csvfile:
           writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
           writer.writeheader()
           for product in products:
               writer.writerow({
                   'id':             product.id,
                   'name':           product.name,
                   'default_code':   product.default_code or '',
                   'type':           product.type,
                   'list_price':     product.list_price,
                   'standard_price': product.standard_price,
                   'categ_id':       product.categ_id.name if product.categ_id else '',
                   'qty_available':  product.qty_available,
                   'uom_id':         product.uom_id.name if product.uom_id else '',
               })
               rows_written += 1
       _logger.info("Product export complete: %d rows written to %s", rows_written, filepath)
       # Optional: Email the file to stakeholders
       self._email_export_file(filepath, filename, rows_written)
       return filepath
   def _email_export_file(self, filepath, filename, row_count):
       """Attaches the CSV to an email and sends it."""
       recipient_email = self.env['ir.config_parameter'].sudo().get_param(
           'product_export.recipient_email', default='manager@yourcompany.com'
       )
       with open(filepath, 'rb') as f:
           file_content = f.read()
       # Create an ir.attachment
       attachment = self.env['ir.attachment'].create({
           'name':       filename,
           'type':       'binary',
           'datas':      file_content,
           'res_model':  'product.export',
           'mimetype':   'text/csv',
       })
       mail_values = {
           'subject':        f'Daily Product Export — {datetime.now().strftime("%Y-%m-%d")}',
           'body_html':      f'<p>Please find attached the daily product export report '
                             f'containing <strong>{row_count}</strong> products.</p>',
           'email_to':       recipient_email,
           'attachment_ids': [(4, attachment.id)],
       }
       mail = self.env['mail.mail'].create(mail_values)
       mail.send()
       _logger.info("Product export email sent to %s", recipient_email)

Registering the Scheduled Action (XML):

<odoo>
   <record id="cron_export_products" model="ir.cron">
       <field name="name">Daily Product Export to CSV</field>
       <field name="model_id" ref="your_module.model_product_export"/>
       <field name="state">code</field>
       <field name="code">model.export_products_to_csv()</field>
       <field name="interval_number">1</field>
       <field name="interval_type">days</field>
       <field name="numbercall">-1</field>
       <field name="active">True</field>
   </record>
</odoo>

2. Import from External API

Use Python's requests library to fetch data from a REST API, parse the JSON response, and map it to Odoo model fields for import.

File: your_module/models/api_import.py

import logging
import requests
from odoo import models, api
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
# Store these in Odoo System Parameters (Settings > Technical > Parameters)
API_BASE_URL  = 'https://api.partnerstore.com/v2'
API_KEY       = 'your_api_key_here'

class ApiImport(models.Model):
   _name = 'api.import'
   _description = 'External API Import'
   @api.model
   def import_orders_from_api(self):
       """
       Fetches new orders from an external REST API and creates sale.orders in Odoo.
       """
       # Retrieve API key securely from system parameters
       api_key = self.env['ir.config_parameter'].sudo().get_param(
           'api_import.api_key', default=API_KEY
       )
       headers = {
           'Authorization': f'Bearer {api_key}',
           'Content-Type':  'application/json',
           'Accept':        'application/json',
       }
       try:
           response = requests.get(
               f'{API_BASE_URL}/orders',
               headers=headers,
               params={'status': 'new', 'per_page': 100},
               timeout=30,  # Always set a timeout
           )
           response.raise_for_status()  # Raises HTTPError for 4xx/5xx
       except requests.exceptions.Timeout:
           _logger.error("API request timed out after 30 seconds.")
           return
       except requests.exceptions.ConnectionError as e:
           _logger.error("Cannot connect to API: %s", str(e))
           return
       except requests.exceptions.HTTPError as e:
           _logger.error("API returned error: %s", str(e))
           return
       orders_data = response.json()
       if not isinstance(orders_data, list):
           _logger.error("Unexpected API response format: %s", type(orders_data))
           return
       created_count = 0
       skipped_count = 0
       for order in orders_data:
           try:
               external_ref = str(order.get('id', ''))
               # Skip if already imported (check for duplicate external reference)
               existing = self.env['sale.order'].search(
                   [('client_order_ref', '=', f'EXT-{external_ref}')], limit=1
               )
               if existing:
                   skipped_count += 1
                   continue
               # Resolve or create the customer
               partner = self._get_or_create_partner(order.get('customer', {}))
               if not partner:
                   _logger.warning("Could not resolve customer for order %s", external_ref)
                   continue
               # Build order lines
               order_lines = []
               for item in order.get('line_items', []):
                   product = self.env['product.product'].search(
                       [('default_code', '=', item.get('sku'))], limit=1
                   )
                   if not product:
                       _logger.warning("Product SKU not found: %s", item.get('sku'))
                       continue
                   order_lines.append((0, 0, {
                       'product_id':    product.id,
                       'product_uom_qty': float(item.get('quantity', 1)),
                       'price_unit':    float(item.get('price', 0.0)),
                       'name':          item.get('name', product.name),
                   }))
               if not order_lines:
                   _logger.warning("No valid order lines for order %s", external_ref)
                   continue
               # Create the sale order
               self.env['sale.order'].create({
                   'partner_id':       partner.id,
                   'client_order_ref': f'EXT-{external_ref}',
                   'order_line':       order_lines,
                   'note':             order.get('customer_note', ''),
               })
               created_count += 1
           except Exception as e:
               _logger.error("Failed to import order %s: %s", order.get('id'), str(e))
               continue
       _logger.info(
           "API Import done — Created: %d | Skipped (duplicate): %d",
           created_count, skipped_count
       )
   def _get_or_create_partner(self, customer_data):
       """Finds an existing partner by email or creates a new one."""
       email = customer_data.get('email', '').strip()
       if not email:
           return None
       partner = self.env['res.partner'].search([('email', '=', email)], limit=1)
       if not partner:
           partner = self.env['res.partner'].create({
               'name':  f"{customer_data.get('first_name', '')} {customer_data.get('last_name', '')}".strip(),
               'email': email,
               'phone': customer_data.get('phone', ''),
           })
           _logger.info("Created new partner: %s", partner.name)
       return partner

3. Export Invoice Data to Excel

Generate Excel reports using libraries like openpyxl or xlsxwriter, format them with headers and styling, and attach them to email or save to file server.

Install dependency: pip install openpyxl (add to requirements.txt)

File: your_module/models/invoice_export.py

import io
import base64
import logging
from datetime import datetime, date
from odoo import models, api
_logger = logging.getLogger(__name__)

class InvoiceExportExcel(models.Model):
   _name = 'invoice.export.excel'
   _description = 'Invoice Excel Export'
   @api.model
   def export_invoices_to_excel(self, date_from=None, date_to=None):
       """
       Generates an Excel report of posted invoices within a date range.
       Returns the file as a base64-encoded ir.attachment.
       """
       try:
           import openpyxl
           from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
           from openpyxl.utils import get_column_letter
       except ImportError:
           _logger.error("openpyxl is not installed. Run: pip install openpyxl")
           return False
       # Default to current month if no dates provided
       today = date.today()
       if not date_from:
           date_from = today.replace(day=1)
       if not date_to:
           date_to = today
       # Fetch posted (validated) customer invoices
       invoices = self.env['account.move'].search([
           ('move_type',      '=',  'out_invoice'),
           ('state',          '=',  'posted'),
           ('invoice_date',   '>=', date_from),
           ('invoice_date',   '<=', date_to),
       ], order='invoice_date asc')
       # -- Build Excel workbook ----------------------------------------------
       wb = openpyxl.Workbook()
       ws = wb.active
       ws.title = 'Invoices'
       # Styles
       header_font    = Font(name='Calibri', bold=True, color='FFFFFF', size=11)
       header_fill    = PatternFill(start_color='1F4E79', end_color='1F4E79', fill_type='solid')
       header_align   = Alignment(horizontal='center', vertical='center', wrap_text=True)
       currency_fmt   = '#,##0.00'
       date_fmt       = 'YYYY-MM-DD'
       thin_border    = Border(
           left=Side(style='thin', color='CCCCCC'),
           right=Side(style='thin', color='CCCCCC'),
           bottom=Side(style='thin', color='CCCCCC'),
       )
       alt_fill       = PatternFill(start_color='EBF3FB', end_color='EBF3FB', fill_type='solid')
       # Headers
       headers = [
           'Invoice Number', 'Invoice Date', 'Due Date',
           'Customer', 'Salesperson', 'Currency',
           'Amount Untaxed', 'Tax Amount', 'Amount Total',
           'Amount Due', 'Status'
       ]
       ws.row_dimensions[1].height = 30
       for col_idx, header in enumerate(headers, start=1):
           cell = ws.cell(row=1, column=col_idx, value=header)
           cell.font      = header_font
           cell.fill      = header_fill
           cell.alignment = header_align
           cell.border    = thin_border
       # Data rows
       for row_idx, inv in enumerate(invoices, start=2):
           is_alt = (row_idx % 2 == 0)
           row_data = [
               inv.name,
               inv.invoice_date,
               inv.invoice_date_due,
               inv.partner_id.name if inv.partner_id else '',
               inv.invoice_user_id.name if inv.invoice_user_id else '',
               inv.currency_id.name if inv.currency_id else '',
               inv.amount_untaxed,
               inv.amount_tax,
               inv.amount_total,
               inv.amount_residual,
               dict(inv._fields['payment_state'].selection).get(inv.payment_state, inv.payment_state),
           ]
           for col_idx, value in enumerate(row_data, start=1):
               cell = ws.cell(row=row_idx, column=col_idx, value=value)
               cell.border = thin_border
               if is_alt:
                   cell.fill = alt_fill
               # Apply specific formatting per column
               if col_idx in (2, 3) and value:          # Date columns
                   cell.number_format = date_fmt
                   cell.alignment = Alignment(horizontal='center')
               elif col_idx in (7, 8, 9, 10):           # Currency columns
                   cell.number_format = currency_fmt
                   cell.alignment = Alignment(horizontal='right')
       # Auto-size columns
       for col_idx, _ in enumerate(headers, start=1):
           col_letter = get_column_letter(col_idx)
           max_length = max(
               len(str(ws.cell(row=r, column=col_idx).value or ''))
               for r in range(1, len(invoices) + 2)
           )
           ws.column_dimensions[col_letter].width = min(max_length + 4, 40)
       # Freeze the header row
       ws.freeze_panes = 'A2'
       # -- Save and attach 
       buffer = io.BytesIO()
       wb.save(buffer)
       buffer.seek(0)
       filename = f"invoices_{date_from}_to_{date_to}.xlsx"
       attachment = self.env['ir.attachment'].create({
           'name':      filename,
           'type':      'binary',
           'datas':     base64.b64encode(buffer.read()),
           'res_model': 'invoice.export.excel',
           'mimetype':  'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
       })
       _logger.info("Invoice Excel export created: %s (%d rows)", filename, len(invoices))
       return attachment.id

4. Automated FTP File Import

Connect to an FTP server, download new files, parse them, import data to Odoo, then archive or delete processed files.

File: your_module/models/ftp_import.py

import csv
import io
import ftplib
import logging
from odoo import models, api
_logger = logging.getLogger(__name__)
# Store these in Odoo System Parameters for security
FTP_HOST     = 'ftp.yourpartner.com'
FTP_PORT     = 21
FTP_USER     = 'odoo_user'
FTP_PASSWORD = 'your_password'
FTP_INBOX    = '/incoming/orders/'
FTP_ARCHIVE  = '/processed/orders/'

class FtpImport(models.Model):
   _name = 'ftp.import'
   _description = 'FTP File Import'
   @api.model
   def run_ftp_import(self):
       """
       Connects to an FTP server, downloads CSV files from the inbox,
       imports the records to Odoo, then archives the processed files.
       """
       ftp = None
       try:
           ftp = ftplib.FTP()
           ftp.connect(host=FTP_HOST, port=FTP_PORT, timeout=30)
           ftp.login(user=FTP_USER, passwd=FTP_PASSWORD)
           _logger.info("Connected to FTP server: %s", FTP_HOST)
           # List files in the inbox directory
           ftp.cwd(FTP_INBOX)
           filenames = ftp.nlst()
           csv_files = [f for f in filenames if f.lower().endswith('.csv')]
           if not csv_files:
               _logger.info("No new CSV files found in FTP inbox.")
               return
           for filename in csv_files:
               _logger.info("Processing FTP file: %s", filename)
               try:
                   # Download the file into memory (no disk I/O needed)
                   buffer = io.BytesIO()
                   ftp.retrbinary(f'RETR {filename}', buffer.write)
                   buffer.seek(0)
                   # Decode and parse the CSV
                   content   = buffer.read().decode('utf-8')
                   reader    = csv.DictReader(io.StringIO(content))
                   imported  = self._process_csv_rows(reader, filename)
                   archive_path = f'{FTP_ARCHIVE}{filename}'
                   ftp.rename(f'{FTP_INBOX}{filename}', archive_path)
                   _logger.info(
                       "Archived %s ? %s | Records imported: %d",
                       filename, archive_path, imported
                   )
               except Exception as e:
                   _logger.error("Failed to process file %s: %s", filename, str(e))
                   continue  # Move on to the next file
       except ftplib.all_errors as e:
           _logger.error("FTP connection error: %s", str(e))
       finally:
           if ftp:
               try:
                   ftp.quit()
               except Exception:
                   pass
   def _process_csv_rows(self, reader, source_filename):
       """Processes rows from a parsed CSV DictReader and creates purchase orders."""
       count = 0
       for row in reader:
           try:
               vendor_name = row.get('vendor_name', '').strip()
               product_sku = row.get('product_sku', '').strip()
               quantity    = float(row.get('quantity', 0))
               unit_price  = float(row.get('unit_price', 0.0))
               if not all([vendor_name, product_sku, quantity]):
                   continue
               vendor = self.env['res.partner'].search(
                   [('name', 'ilike', vendor_name), ('supplier_rank', '>', 0)],
                   limit=1
               )
               if not vendor:
                   _logger.warning("Vendor not found: %s", vendor_name)
                   continue
               product = self.env['product.product'].search(
                   [('default_code', '=', product_sku)], limit=1
               )
               if not product:
                   _logger.warning("Product SKU not found: %s", product_sku)
                   continue
               # Find or create a draft PO for this vendor
               po = self.env['purchase.order'].search([
                   ('partner_id', '=', vendor.id),
                   ('state',      '=', 'draft'),
                   ('origin',     '=', f'FTP:{source_filename}'),
               ], limit=1)
               if not po:
                   po = self.env['purchase.order'].create({
                       'partner_id': vendor.id,
                       'origin':     f'FTP:{source_filename}',
                   })
               self.env['purchase.order.line'].create({
                   'order_id':          po.id,
                   'product_id':        product.id,
                   'product_qty':       quantity,
                   'price_unit':        unit_price,
                   'product_uom':       product.uom_po_id.id,
                   'date_planned':      fields.Datetime.now(),
                   'name':              product.name,
               })
               count += 1
           except Exception as e:
               _logger.error("Error on row %s: %s", row, str(e))
       return count

5. Real-time Data Sync Using Webhooks

Set up a controller endpoint that receives webhook notifications from external systems and immediately imports the data.

File: your_module/controllers/webhook.py

import json
import logging
import hmac
import hashlib
from odoo import http
from odoo.http import request, Response
_logger = logging.getLogger(__name__)
# Shared secret for verifying the webhook signature
WEBHOOK_SECRET = 'your_shared_webhook_secret'

class WebhookController(http.Controller):
   @http.route(
       '/webhook/orders/receive',
       type='http',
       auth='none',      # Public endpoint — we validate by signature, not session
       methods=['POST'],
       csrf=False,       # CSRF protection doesn't apply to webhooks
   )
   def receive_order_webhook(self, **kwargs):
       """
       Receives a webhook POST from an external system and creates a sale.order.
       Expected JSON payload:
       {
           "order_id":    "EXT-78910",
           "customer_email": "alice@example.com",
           "customer_name":  "Alice Johnson",
           "items": [
               { "sku": "PROD-001", "quantity": 2, "unit_price": 49.99 }
           ],
           "total": 99.98
       }
       """
       try:
           # -- 1. Verify the request signature -----------------------------
           raw_body  = request.httprequest.get_data()
           signature = request.httprequest.headers.get('X-Webhook-Signature', '')
           if not self._verify_signature(raw_body, signature):
               _logger.warning("Webhook received with invalid signature — rejected.")
               return Response(
                   json.dumps({'error': 'Invalid signature'}),
                   status=401, content_type='application/json'
               )
           # -- 2. Parse the JSON payload ------------------------------------
           try:
               payload = json.loads(raw_body.decode('utf-8'))
           except (json.JSONDecodeError, UnicodeDecodeError) as e:
               _logger.error("Webhook: malformed JSON body — %s", str(e))
               return Response(
                   json.dumps({'error': 'Invalid JSON'}),
                   status=400, content_type='application/json'
               )
           # -- 3. Prevent duplicate processing -----------------------------
           external_ref = payload.get('order_id', '')
           existing = request.env['sale.order'].sudo().search(
               [('client_order_ref', '=', external_ref)], limit=1
           )
           if existing:
               _logger.info("Webhook: duplicate order %s — skipped.", external_ref)
               return Response(
                   json.dumps({'status': 'skipped', 'reason': 'duplicate'}),
                   status=200, content_type='application/json'
               )
           # -- 4. Resolve or create the customer ---------------------------
           partner = request.env['res.partner'].sudo().search(
               [('email', '=', payload.get('customer_email'))], limit=1
           )
           if not partner:
               partner = request.env['res.partner'].sudo().create({
                   'name':  payload.get('customer_name', 'Unknown'),
                   'email': payload.get('customer_email', ''),
               })
           # -- 5. Build and create the sale order ---------------------------
           order_lines = []
           for item in payload.get('items', []):
               product = request.env['product.product'].sudo().search(
                   [('default_code', '=', item.get('sku'))], limit=1
               )
               if not product:
                   _logger.warning("Webhook: SKU not found: %s", item.get('sku'))
                   continue
               order_lines.append((0, 0, {
                   'product_id':      product.id,
                   'product_uom_qty': float(item.get('quantity', 1)),
                   'price_unit':      float(item.get('unit_price', 0.0)),
                   'name':            product.name,
               }))
           if not order_lines:
               return Response(
                   json.dumps({'error': 'No valid products found in payload'}),
                   status=422, content_type='application/json'
               )
           order = request.env['sale.order'].sudo().create({
               'partner_id':       partner.id,
               'client_order_ref': external_ref,
               'order_line':       order_lines,
           })
           _logger.info("Webhook: Created sale.order %s from external ref %s", order.name, external_ref)
           return Response(
               json.dumps({'status': 'ok', 'odoo_order': order.name}),
               status=200, content_type='application/json'
           )
       except Exception as e:
           _logger.exception("Webhook processing error: %s", str(e))
           return Response(
               json.dumps({'error': 'Internal server error'}),
               status=500, content_type='application/json'
           )
   def _verify_signature(self, raw_body, provided_signature):
       """
       Validates the HMAC-SHA256 signature of the incoming webhook.
       The external system signs the raw request body with the shared secret.
       """
       if not provided_signature:
           return False
       expected = hmac.new(
           WEBHOOK_SECRET.encode('utf-8'),
           raw_body,
           hashlib.sha256
       ).hexdigest()
       return hmac.compare_digest(expected, provided_signature)

Mastering automated import/export scripts transforms how you manage data in Odoo 19. Instead of spending hours on manual data entry or exports, you can set up reliable, scheduled processes that run in the background. Whether you're syncing with warehouses, accounting systems, or customer portals, automation ensures data consistency and frees your team to focus on higher-value work. Start with simple scheduled CSV exports, then gradually build more sophisticated integrations as your needs grow. The key is proper error handling, logging, and testing to ensure your automated scripts run reliably without manual intervention.

To read more about How to Transfer Data From Odoo 18 to Odoo 19 Using XML-RPC, refer to our blog How to Transfer Data From Odoo 18 to Odoo 19 Using XML-RPC.


Frequently Asked Questions

Do I need to create a custom module just to run import/export scripts?

Not always. For quick, one-off tasks, you can use Odoo's built-in shell (odoo-bin shell) to run Python code directly against the database. However, for anything that needs to run on a schedule, be triggered from the UI, or be maintained long-term, wrapping the logic inside a custom module is strongly recommended. It keeps your code version-controlled, upgradeable, and properly integrated with Odoo's permission system.

Can I trigger an import/export script from a button in the Odoo UI?

Yes. Define the method on a TransientModel (wizard), create a simple form view with a button, and map the button to your method. This is the cleanest way to give users a manual "Run Now" option without giving them server access.

Is it possible to import data into Odoo from another Odoo instance?

Yes — and it's one of the cleanest integration scenarios. Use Odoo's JSON-RPC API (the same API the web client uses) from your script to authenticate and read records from the source instance, then write them into the destination instance via the ORM. The xmlrpc.client library (built into Python) works without any extra dependencies:

How do I send an export file via email automatically after the cron runs?

Create an ir.attachment from the file content, then create and send a mail.mail record. Both steps are shown in full in Example 2 above. The key points are: encode the file bytes in base64 for the attachment datas field, link the attachment to the mail via (4, attachment.id) in the Many2many write syntax, and call mail.send() at the end. Make sure the recipient email address is stored in a System Parameter so it can be changed without a code deployment.

If you need any assistance in odoo, we are online, please chat with us.



0
Comments



Leave a comment



whatsapp_icon
location

Calicut

Cybrosys Technologies Pvt. Ltd.
Neospace, KINFRA Techno Park
Kakkanchery, Calicut
Kerala, India - 673635

location

Kochi

Cybrosys Technologies Pvt. Ltd.
1st Floor, Thapasya Building,
Infopark, Kakkanad,
Kochi, India - 682030.

location

Bangalore

Cybrosys Techno Solutions
The Estate, 8th Floor,
Dickenson Road,
Bangalore, India - 560042

Send Us A Message