#!/usr/bin/env python3
import asyncio
import json
import logging
import os
import random
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Set
from urllib.parse import urlparse, urlunparse

import aiofiles
import aiohttp
import mysql.connector
import redis as redis_lib
# Retry policy shared by all outbound HTTP calls (WooCommerce + CJ APIs).
MAX_RETRIES = 10
RETRY_BASE_DELAY = 2  # seconds; multiplied by the attempt number (linear backoff)
# Per-category tracking file (relative to CWD) of PIDs already imported.
IMPORTED_PRODUCTS_FILE = "imported_products_current_category.json"
SAVE_INTERVAL = 50  # Save every 50 products for performance
async def _request_with_retries(session, method, url, **kwargs):
    """Issue an HTTP request, retrying 5xx responses and transport errors.

    Returns the decoded JSON body on 200/201/204 (raw text if the body is
    not JSON).  5xx responses and exceptions are retried up to MAX_RETRIES
    times with linear backoff.  Other failures are logged and yield None,
    except 400, whose JSON error payload is returned for the caller to
    inspect when it parses.
    """
    attempt = 0
    while attempt < MAX_RETRIES:
        attempt += 1
        try:
            async with session.request(method, url, **kwargs) as resp:
                body = await resp.text()
                if resp.status in (200, 201, 204):
                    # Prefer parsed JSON; fall back to the raw body.
                    try:
                        return await resp.json()
                    except Exception:
                        return body
                if 500 <= resp.status < 600:
                    logger.warning(f"⚠️ {method} {url} failed with {resp.status}, attempt {attempt}/{MAX_RETRIES}. Retrying...")
                    if attempt < MAX_RETRIES:
                        await asyncio.sleep(RETRY_BASE_DELAY * attempt)
                        continue
                logger.error(f"❌ {method} {url} failed, status {resp.status}, headers={dict(resp.headers)}, body={body[:500]}")
                if resp.status == 400:
                    # Surface validation errors (e.g. WooCommerce) to the caller.
                    try:
                        return await resp.json()
                    except Exception:
                        return None
                return None
        except Exception as exc:
            logger.error(f"❌ Exception in {method} {url} (attempt {attempt}/{MAX_RETRIES}): {exc}")
            if attempt < MAX_RETRIES:
                await asyncio.sleep(RETRY_BASE_DELAY * attempt)
                continue
            return None
    return None
# =========================
# Configs and constants
# =========================
# NOTE(review): DB / WooCommerce / CJ credentials are hard-coded in source —
# consider moving them to environment variables or a secrets store.
db_config = {
    'host': 'localhost',
    'user': 'ceoquantritidozone',
    'password': 'london1986',
    'database': 'tidozone_vn',
    'port': 3306
}
LOG_FILE = "/var/www/sites/vn.tidozzon.com/public/wp-content/cjdropshipping-project/cj-to-tidozone-product-import.log"
# Log to the file above and to stdout simultaneously.
logging.basicConfig(
    # level=logging.DEBUG,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# WooCommerce REST API endpoints and keys.
WC_API_URL = "https://tidozone.com:444/wp-json/wc/v3/products"
WC_REVIEW_API_URL = "https://tidozone.com:444/wp-json/wc/v3/products/reviews"
WC_CONSUMER_KEY = "ck_19865916e67b26d824606a0cc248e02a43bbe1c5"
WC_CONSUMER_SECRET = "cs_78133f37c7eef8a963644c06e0c190ae89170859"
# CJ Dropshipping API endpoints.
CJ_AUTH_URL = "https://developers.cjdropshipping.com/api2.0/v1/authentication/getAccessToken"
CJ_PRODUCT_LIST_URL = "https://developers.cjdropshipping.com/api2.0/v1/product/list"
CJ_PRODUCT_DETAIL_URL = "https://developers.cjdropshipping.com/api2.0/v1/product/query"
CJ_CATEGORY_URL = "https://developers.cjdropshipping.com/api2.0/v1/product/getCategory"
# CJ account pool — add more accounts here, they will be used in rotation on 429
CJ_ACCOUNTS = [
    {"email": "dinhsonwork@gmail.com", "api_key": "CJ4474766@api@32aecb0d71004592802cb8e58d3908ff"},
    {"email": "dinhsonkt@gmail.com", "api_key": "CJ270417@api@dc5650313632410588d5c093f245a37c"},
    # {"email": "account3@gmail.com", "api_key": "CJxxxxxxx@api@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"},
]
_cj_account_index = 0  # current active account
# Redis config — shared with PHP plugin (DB#10 for tokens)
REDIS_HOST = "192.168.1.141"
REDIS_PORT = 6379
REDIS_DB = 10
# Data / state files on the web host.
REVIEW_COMMENTS_FILE = "/var/www/sites/vn.tidozzon.com/public/wp-content/woo-product-comment/woo-product-comment-tidozone.com.txt"
CATEGORY_LIST_FILE = "/var/www/sites/vn.tidozzon.com/public/wp-content/cjdropshipping-project/category-list.txt"
SITEMAP_FILE = "/var/www/sites/vn.tidozzon.com/public/sitemap/new-post"
PROCESSED_PID_FILE = "/var/www/sites/vn.tidozzon.com/public/cj_processed_pids.json"
CATEGORY_PAGE_FILE = "/var/www/sites/vn.tidozzon.com/public/cj_category_page.json"
CATEGORY_COMPLETED_FILE = "/var/www/sites/vn.tidozzon.com/public/cj_completed_categories.json"
CONCURRENCY = 1  # products processed concurrently
PRICE_FACTOR = 1.5  # markup multiplier applied to CJ prices
SLEEP_BETWEEN_PRODUCTS = 1  # seconds of pause between product imports
# =========================
# Reviewer name pools
# =========================
# Pool of given names used by random_reviewer() to fabricate review authors.
ENGLISH_FIRST_NAMES = [
    "John","Jane","Michael","Emily","Robert","Linda","David","Susan","James","Karen","William","Patricia",
    "Thomas","Barbara","Christopher","Jessica","Daniel","Sarah","Matthew","Lisa","Anthony","Nancy","Mark",
    "Betty","Paul","Margaret","Steven","Sandra","Andrew","Ashley","Kenneth","Kimberly","Joshua","Donna","Kevin",
    "Carol","Brian","Michelle","George","Dorothy","Edward","Amanda","Ronald","Melissa","Timothy","Deborah",
    "Jason","Stephanie","Jeffrey","Rebecca","Ryan","Laura","Jacob","Sharon","Gary","Cynthia","Nicholas","Kathleen",
    "Eric","Amy","Stephen","Shirley","Jonathan","Angela","Larry","Helen","Justin","Anna","Scott","Brenda",
    "Brandon","Pamela","Frank","Nicole","Benjamin","Emma","Gregory","Samantha","Raymond","Katherine","Samuel",
    "Christine","Patrick","Debra","Alexander","Rachel","Jack","Catherine","Dennis","Carolyn","Jerry","Janet",
    "Tyler","Ruth","Aaron","Maria","Jose","Heather","Adam","Diane","Henry","Virginia","Nathan","Julie","Douglas",
    "Joyce","Zachary","Victoria","Peter","Olivia","Kyle","Kelly","Walter","Christina","Ethan","Lauren","Jeremy",
    "Joan","Harold","Evelyn","Keith","Judith","Christian","Megan","Roger","Cheryl","Noah","Andrea","Gerald",
    "Hannah","Carl","Martha","Terry","Jacqueline","Sean","Frances","Austin","Gloria","Arthur","Ann","Lawrence",
    "Teresa","Jesse","Kathryn","Dylan","Sara","Bryan","Janice","Joe","Jean","Jordan","Alice","Billy","Madison",
    "Bruce","Doris","Albert","Abigail","Willie","Julia","Gabriel","Judy","Logan","Grace","Alan","Denise","Juan",
    "Amber","Wayne","Marilyn","Roy","Beverly","Ralph","Danielle","Randy","Theresa","Eugene","Sophia","Vincent",
    "Marie","Russell","Diana","Elijah","Brittany","Louis","Natalie","Bobby","Isabella","Philip","Charlotte",
    "Johnny","Rose","Bradley","Alexis","Kayla","Martin","Lori","Phillip","Annette","Craig","Tiffany","Curtis",
    "Bonnie","Shawn","Beth","Clarence","Leslie","Danny","Jill","Todd","Vicki","Frederick","Sheila","Harry",
    "Erin","Howard","Melanie","Tina","Paula","Victor","Marsha","Robyn","Jimmy","Kathy","Ernest","Monica",
    "Stanley","Carrie","Leonard","Charlene","Dale","Kristin","Ray","Lois","Francis","Connie","Wendy","Andre",
    "Annie","Clifford","Shannon","Alex","Christy","Glen","Karla","Jeffery","Tracy","Neil","Gina","Terrence",
    "Lynn","Ricky","Eva","Joanne","Troy","Kristina","Ronnie","Peggy","Wesley","Esther","Randall","Molly",
    "Allen","Teri","Calvin","Ellen","Norman","Tammy","Tony","Melinda","Marvin","Valerie","Glenn","Yvonne",
    "Barry","Dawn","Derrick","Rosa","Leon","Sherry","Harvey","Tara","Milton","Carla","Edwin","Tonya","Lance",
    "Tricia","Arnold","Nina","Max","Nora","Gilbert","Patty","Erik","Alicia","Gene","Michele","Maurice","Kristy",
    "Reginald","Jeanette","Chad","Lillian","Antonio","Shelly","Raul","Becky","Oscar","Yolanda","Clinton","Pat",
    "Isaac","Belinda"
]
# Pool of family names used by random_reviewer() to fabricate review authors.
ENGLISH_LAST_NAMES = [
    "Smith","Johnson","Williams","Brown","Jones","Garcia","Miller","Davis","Rodriguez","Martinez","Hernandez",
    "Lopez","Gonzalez","Wilson","Anderson","Thomas","Taylor","Moore","Jackson","Martin","Lee","Perez","Thompson",
    "White","Harris","Sanchez","Clark","Ramirez","Lewis","Robinson","Walker","Young","Allen","King","Wright",
    "Scott","Torres","Nguyen","Hill","Flores","Green","Adams","Nelson","Baker","Hall","Rivera","Campbell",
    "Mitchell","Carter","Roberts","Gomez","Phillips","Evans","Turner","Diaz","Parker","Cruz","Edwards","Collins",
    "Reyes","Stewart","Morris","Morales","Murphy","Cook","Rogers","Gutierrez","Ortiz","Morgan","Cooper","Peterson",
    "Bailey","Reed","Kelly","Howard","Ramos","Kim","Cox","Ward","Richardson","Watson","Brooks","Chavez","Wood",
    "James","Bennett","Gray","Mendoza","Ruiz","Hughes","Price","Alvarez","Castillo","Sanders","Patel","Myers",
    "Long","Ross","Foster","Jimenez","Powell","Jenkins","Perry","Russell","Sullivan","Bell","Coleman","Butler",
    "Henderson","Barnes","Gonzales","Fisher","Vasquez","Simmons","Romero","Jordan","Patterson","Alexander","Hamilton",
    "Graham","Reynolds","Griffin","Wallace","Moreno","West","Cole","Hayes","Bryant","Herrera","Gibson","Ellis",
    "Tran","Medina","Aguilar","Stevens","Murray","Ford","Castro","Marshall","Owens","Harrison","Fernandez",
    "Mcdonald","Woods","Washington","Kennedy","Wells","Vargas","Henry","Chen","Freeman","Webb","Tucker","Guzman",
    "Burns","Crawford","Olson","Simpson","Porter","Hunter","Gordon","Mendez","Silva","Shaw","Snyder","Mason",
    "Dixon","Munoz","Hunt","Hicks","Holmes","Palmer","Wagner","Black","Robertson","Boyd","Rose","Stone","Salazar",
    "Fox","Warren","Mills","Meyer","Rice","Schmidt","Garza","Daniels","Ferguson","Nichols","Stephens","Soto",
    "Weaver","Ryan","Gardner","Payne","Grant","Dunn","Kelley","Spencer","Hawkins","Arnold","Pierce","Vazquez",
    "Hansen","Peters","Santos","Hart","Bradley","Knight","Elliott","Cunningham","Duncan","Armstrong","Hudson",
    "Carroll","Lane","Riley","Andrews","Alvarado","Ray","Delgado","Berry","Perkins","Hoffman","Johnston","Matthews",
    "Pena","Richards","Contreras","Willis","Carpenter","Lawrence","Sandoval","Guerrero","George","Chapman","Rios",
    "Estrada","Ortega","Watkins","Greene","Nunez","Wheeler","Valdez","Harper","Burke","Larson","Santiago",
    "Maldonado","Morrison","Franklin","Carlson","Austin","Dominguez","Carr","Lawson","Jacobs","Obrien","Lynch","Singh"
]
# =========================
# Helpers
# =========================
def get_current_cj_account():
    """Return the CJ account dict selected by the current rotation index."""
    active = _cj_account_index
    return CJ_ACCOUNTS[active]
def rotate_cj_account():
    """Advance to the next CJ account in the pool.

    Returns True when a different account became active, False when only
    one account is configured.  On success the new account's cached Redis
    tokens are purged so fresh ones get fetched.
    """
    global _cj_account_index
    candidate = (_cj_account_index + 1) % len(CJ_ACCOUNTS)
    if candidate == _cj_account_index:
        logger.warning("⚠️ Only one CJ account configured, cannot rotate")
        return False
    _cj_account_index = candidate
    # Drop stale tokens for the account we just switched to.
    redis_conn = get_redis()
    if redis_conn:
        for key in (f"tido_cj_access_token_{candidate}", f"tido_cj_refresh_token_{candidate}"):
            redis_conn.delete(key)
    logger.warning(f"🔄 Rotated to CJ account: {get_current_cj_account()['email']}")
    return True
def load_review_comments():
    """Load the pool of canned review comments, one per non-blank line.

    Returns an empty list on any error (missing file, bad permissions)
    so the importer can still run without reviews.
    """
    try:
        # Explicit encoding: the comment file may contain non-ASCII text;
        # the original relied on the platform default encoding.
        with open(REVIEW_COMMENTS_FILE, 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]
    except Exception as e:
        logger.error(f"Failed to load review comments: {e}")
        return []

# Loaded once at import time; used when fabricating product reviews.
REVIEW_COMMENTS = load_review_comments()
def random_reviewer():
    """Pick a random English reviewer name plus a matching fake email."""
    given = random.choice(ENGLISH_FIRST_NAMES)
    family = random.choice(ENGLISH_LAST_NAMES)
    suffix = random.randint(100, 999)
    email = f"{given.lower()}.{family.lower()}{suffix}@example.com"
    return f"{given} {family}", email
def cleanup_wc_sku_lookup():
    """Remove orphaned SKU rows from lookup and postmeta tables.

    Deletes lookup / '_sku' postmeta rows whose owning post no longer
    exists, is trashed, or is not a product/variation.  Errors are
    logged, never raised.

    Fix: in the original `finally`, a failing `cursor.close()` skipped
    `conn.close()` (connection leak), and both names were unbound when
    connect() itself failed.
    """
    cleanup_lookup_sql = """
    DELETE pl
    FROM wp_514_wc_product_meta_lookup pl
    LEFT JOIN wp_514_posts p ON pl.product_id = p.ID
    WHERE p.ID IS NULL OR p.post_type NOT IN ('product', 'product_variation') OR p.post_status = 'trash';
    """
    cleanup_postmeta_sql = """
    DELETE pm
    FROM wp_514_postmeta pm
    LEFT JOIN wp_514_posts p ON pm.post_id = p.ID
    WHERE pm.meta_key = '_sku'
    AND (p.ID IS NULL OR p.post_type NOT IN ('product','product_variation') OR p.post_status = 'trash');
    """
    conn = cursor = None  # bound up-front so `finally` is always safe
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute(cleanup_lookup_sql)
        affected_lookup = cursor.rowcount
        cursor.execute(cleanup_postmeta_sql)
        affected_meta = cursor.rowcount
        conn.commit()
        logger.info(f"MySQL cleanup: removed {affected_lookup} lookup rows, {affected_meta} postmeta rows.")
    except Exception as e:
        logger.error(f"MySQL cleanup error: {e}")
    finally:
        if cursor is not None:
            try:
                cursor.close()
            except Exception:
                pass
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def cleanup_duplicate_meta():
    """Remove duplicate postmeta rows, keeping only the newest meta_id per
    (post_id, meta_key) for the CJ-specific keys the WC REST API tends to
    duplicate on re-import.

    Fix: in the original `finally`, a failing `cursor.close()` skipped
    `conn.close()` (connection leak), and both names were unbound when
    connect() itself failed.
    """
    sql = """
    DELETE pm1
    FROM wp_514_postmeta pm1
    INNER JOIN wp_514_postmeta pm2
    WHERE pm1.meta_id < pm2.meta_id
    AND pm1.post_id = pm2.post_id
    AND pm1.meta_key = pm2.meta_key
    AND pm1.meta_key IN (
    'vid','pid','variantKey','variantSku',
    'startCountryCode','srcAreaCode','variantStandard'
    );
    """
    conn = cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute(sql)
        affected = cursor.rowcount
        conn.commit()
        logger.info(f"Duplicate meta cleanup: removed {affected} duplicate rows.")
    except Exception as e:
        logger.error(f"Duplicate meta cleanup error: {e}")
    finally:
        if cursor is not None:
            try:
                cursor.close()
            except Exception:
                pass
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def repair_missing_parent_meta():
    """Backfill `_sku`, `_regular_price` and `_stock_status` postmeta on
    published parent products.

    Missing values are rescued from the WC lookup table or, failing that,
    from the product's variations.  Commits periodically; all errors are
    logged rather than raised.
    """
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        # All published parent products, with their lookup-table SKU ('' if absent).
        cursor.execute("""
        SELECT DISTINCT p.ID,
        COALESCE(NULLIF(lk.sku, ''), '') AS sku
        FROM wp_514_posts p
        LEFT JOIN wp_514_wc_product_meta_lookup lk ON lk.product_id = p.ID
        WHERE p.post_type = 'product'
        AND p.post_status = 'publish'
        """)
        products = cursor.fetchall()
        logger.info(f"[REPAIR] Checking {len(products)} published products...")
        fixed_sku = fixed_price = fixed_stock = 0
        for product_id, sku in products:
            # --- _sku: backfill from lookup table, else first variation SKU ---
            cursor.execute(
                "SELECT meta_value FROM wp_514_postmeta WHERE post_id = %s AND meta_key = '_sku' LIMIT 1",
                (product_id,)
            )
            row = cursor.fetchone()
            if not row or not row[0]:
                rescue_sku = sku
                if not rescue_sku:
                    cursor.execute("""
                    SELECT pm.meta_value
                    FROM wp_514_posts var
                    INNER JOIN wp_514_postmeta pm ON pm.post_id = var.ID
                    WHERE var.post_parent = %s
                    AND var.post_type = 'product_variation'
                    AND var.post_status != 'trash'
                    AND pm.meta_key = '_sku'
                    AND pm.meta_value != ''
                    ORDER BY var.ID ASC LIMIT 1
                    """, (product_id,))
                    sku_row = cursor.fetchone()
                    if sku_row and sku_row[0]:
                        rescue_sku = sku_row[0]
                if rescue_sku:
                    # NOTE(review): ON DUPLICATE KEY relies on a unique index over
                    # (post_id, meta_key), which stock WordPress does not create —
                    # confirm this schema customization exists.
                    cursor.execute("""
                    INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value)
                    VALUES (%s, '_sku', %s)
                    ON DUPLICATE KEY UPDATE meta_value = VALUES(meta_value)
                    """, (product_id, rescue_sku))
                    fixed_sku += 1
            # --- _regular_price: backfill with MIN() over variation prices ---
            cursor.execute(
                "SELECT meta_value FROM wp_514_postmeta WHERE post_id = %s AND meta_key = '_regular_price' LIMIT 1",
                (product_id,)
            )
            row = cursor.fetchone()
            if not row or not row[0] or row[0] in ('0', '0.00'):
                # NOTE(review): MIN() compares meta_value as a string here, so
                # lexicographic order may differ from numeric order for prices
                # of different widths — verify intended.
                cursor.execute("""
                SELECT MIN(pm.meta_value)
                FROM wp_514_posts var
                INNER JOIN wp_514_postmeta pm ON pm.post_id = var.ID
                WHERE var.post_parent = %s
                AND var.post_type = 'product_variation'
                AND var.post_status != 'trash'
                AND pm.meta_key = '_regular_price'
                AND pm.meta_value != ''
                AND pm.meta_value != '0'
                AND pm.meta_value != '0.00'
                """, (product_id,))
                price_row = cursor.fetchone()
                price = price_row[0] if price_row and price_row[0] else None
                if price:
                    cursor.execute("""
                    INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value)
                    VALUES (%s, '_regular_price', %s)
                    ON DUPLICATE KEY UPDATE meta_value = VALUES(meta_value)
                    """, (product_id, price))
                    fixed_price += 1
            # --- _stock_status: enforce exactly one 'instock' row ---
            cursor.execute(
                "SELECT COUNT(*) FROM wp_514_postmeta WHERE post_id = %s AND meta_key = '_stock_status'",
                (product_id,)
            )
            count = cursor.fetchone()[0]
            if count != 1:
                cursor.execute(
                    "DELETE FROM wp_514_postmeta WHERE post_id = %s AND meta_key = '_stock_status'",
                    (product_id,)
                )
                cursor.execute(
                    "INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value) VALUES (%s, '_stock_status', 'instock')",
                    (product_id,)
                )
                fixed_stock += 1
            # Periodic commit every 100 fixes. NOTE(review): this also fires on
            # every iteration while the running total is still 0 (0 % 100 == 0).
            if (fixed_sku + fixed_price + fixed_stock) % 100 == 0:
                conn.commit()
        conn.commit()
        logger.info(f"[REPAIR] ✅ Fixed _sku for {fixed_sku}, _regular_price for {fixed_price}, _stock_status for {fixed_stock} products.")
    except Exception as e:
        logger.error(f"[REPAIR] ❌ failed: {e}")
    finally:
        # Best-effort cleanup; swallows NameError when connect() failed.
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
# =========================
# PID tracking
# =========================
def load_processed_pids() -> Set[str]:
    """Read the set of already-processed CJ PIDs from disk.

    A missing, empty, or corrupt file yields an empty set.
    """
    if not os.path.exists(PROCESSED_PID_FILE):
        return set()
    try:
        with open(PROCESSED_PID_FILE, "r") as fh:
            raw = fh.read().strip()
        return set(json.loads(raw)) if raw else set()
    except Exception:
        return set()
def save_processed_pids(pids: Set[str]):
    """Persist the processed-PID set to disk as a JSON array."""
    with open(PROCESSED_PID_FILE, "w") as fh:
        json.dump(list(pids), fh)

# In-memory cache of processed PIDs, loaded once at startup.
processed_pids = load_processed_pids()

def is_pid_processed(pid: str) -> bool:
    """True when `pid` was already imported in a previous run."""
    return pid in processed_pids

def mark_pid_processed(pid: str):
    """Record `pid` as done, both in memory and on disk."""
    processed_pids.add(pid)
    save_processed_pids(processed_pids)
# =========================
# Category page tracking
# =========================
def read_current_page(category_id: str) -> int:
    """Return the last saved CJ list page for `category_id` (defaults to 1)."""
    try:
        if os.path.exists(CATEGORY_PAGE_FILE):
            with open(CATEGORY_PAGE_FILE, "r") as fh:
                raw = fh.read().strip()
            if raw:
                return json.loads(raw).get(category_id, 1)
    except Exception:
        pass
    return 1
def write_current_page(category_id: str, page: int):
    """Persist `page` for `category_id`, merging with other categories' state."""
    state = {}
    if os.path.exists(CATEGORY_PAGE_FILE):
        try:
            with open(CATEGORY_PAGE_FILE, "r") as fh:
                state = json.load(fh)
        except Exception:
            pass  # corrupt state file: start over
    state[category_id] = page
    with open(CATEGORY_PAGE_FILE, "w") as fh:
        json.dump(state, fh)
# =========================
# Category completion tracking
# =========================
def get_last_completed_category() -> Optional[str]:
    """Return the ID of the last fully imported category, or None."""
    if not os.path.exists(CATEGORY_COMPLETED_FILE):
        return None
    try:
        with open(CATEGORY_COMPLETED_FILE, "r") as fh:
            raw = fh.read().strip()
        return json.loads(raw).get("last_completed") if raw else None
    except Exception:
        return None
def mark_category_completed(category_id: str):
    """Record `category_id` as the most recently completed category."""
    payload = {"last_completed": category_id}
    with open(CATEGORY_COMPLETED_FILE, "w") as fh:
        json.dump(payload, fh)
# =========================
# Per-category import tracking
# =========================
def load_imported_products_for_category(category_id: str) -> Set[str]:
    """Return PIDs already imported for `category_id`.

    The tracking file stores a single category at a time; when it belongs
    to another category, or is missing/unreadable, an empty set is
    returned.
    """
    try:
        if os.path.exists(IMPORTED_PRODUCTS_FILE):
            with open(IMPORTED_PRODUCTS_FILE, 'r') as fh:
                payload = json.load(fh)
            if payload.get("category_id") == category_id:
                return set(payload.get("products", []))
    except Exception as exc:
        logger.warning(f"Failed to load imported products: {exc}")
    return set()
def save_imported_products_for_category(category_id: str, products: Set[str]):
    """Persist per-category import progress (products sorted for stable diffs)."""
    payload = {"category_id": category_id, "products": sorted(list(products))}
    try:
        with open(IMPORTED_PRODUCTS_FILE, 'w') as fh:
            json.dump(payload, fh, indent=2)
    except Exception as exc:
        logger.error(f"Failed to save imported products: {exc}")
def clear_category_product_tracking():
    """Delete the per-category tracking file; failures are logged, not raised."""
    if not os.path.exists(IMPORTED_PRODUCTS_FILE):
        return
    try:
        os.remove(IMPORTED_PRODUCTS_FILE)
    except Exception as exc:
        logger.warning(f"Failed to clear product tracking: {exc}")
# =========================
# CJ auth & fetch
# =========================
# -------------------------
# Redis connection (shared with PHP plugin, DB#10)
# -------------------------
def get_redis():
    """Connect to the shared Redis token store (DB#10) and verify with PING.

    Returns a live client, or None when Redis is unreachable.
    """
    try:
        client = redis_lib.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            db=REDIS_DB,
            socket_connect_timeout=2,
            decode_responses=True,
        )
        client.ping()
        return client
    except Exception as exc:
        logger.warning(f"Redis unavailable: {exc}")
        return None
def cj_token_ttl_seconds(raw: str, fallback: int) -> int:
    """Seconds until the ISO-8601 expiry `raw`, minus a 5-minute safety margin.

    CJ returns offset-aware timestamps (e.g. '...+08:00'); naive values are
    assumed to be CJ server time (UTC+8) — TODO confirm against the API.
    Returns `fallback` when the value is unparseable or already (nearly)
    expired.
    """
    try:
        exp = datetime.fromisoformat(raw)
        if exp.tzinfo is None:
            exp = exp.replace(tzinfo=timezone(timedelta(hours=8)))
        ttl = int((exp - datetime.now(timezone.utc)).total_seconds()) - 300
        return ttl if ttl > 60 else fallback
    except Exception:
        return fallback

def cj_store_tokens(r, index, email, data):
    """Store access + refresh tokens to Redis with TTLs from expiry dates.

    Bug fix: the original rewrote '+08:00' to '+00:00' (silently shifting
    the instant by 8 hours) and then subtracted naive `utcnow()` from an
    aware datetime — a TypeError that forced the fallback TTLs every time.
    TTLs are now computed with timezone-aware arithmetic.
    """
    access_token = data.get("accessToken", "")
    refresh_token = data.get("refreshToken", "")
    access_ttl = 12 * 3600  # 12h fallback
    refresh_ttl = 150 * 24 * 3600  # 150 days fallback
    if data.get("accessTokenExpiryDate"):
        access_ttl = cj_token_ttl_seconds(data["accessTokenExpiryDate"], access_ttl)
    if data.get("refreshTokenExpiryDate"):
        refresh_ttl = cj_token_ttl_seconds(data["refreshTokenExpiryDate"], refresh_ttl)
    if access_token:
        r.setex(f"tido_cj_access_token_{index}", access_ttl, access_token)
        logger.info(f"CJ AUTH: access token stored index={index} [{email}] ttl={access_ttl}s")
    if refresh_token:
        r.setex(f"tido_cj_refresh_token_{index}", refresh_ttl, refresh_token)
        logger.info(f"CJ AUTH: refresh token stored index={index} [{email}] ttl={refresh_ttl}s")
async def refresh_cj_token(index):
    """Exchange the cached refresh token for a new CJ access token.

    Returns the new access token, or None when Redis is down, no refresh
    token is cached, or CJ rejects the refresh.
    """
    r = get_redis()
    if not r:
        return None
    refresh_token = r.get(f"tido_cj_refresh_token_{index}")
    if not refresh_token:
        logger.info(f"CJ REFRESH: no refresh token in Redis for index={index}")
        return None
    email = CJ_ACCOUNTS[index]["email"]
    logger.info(f"CJ REFRESH: attempting refresh for [{email}]")
    url = "https://developers.cjdropshipping.com/api2.0/v1/authentication/refreshAccessToken"
    payload = {"refreshToken": refresh_token}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url, json=payload, timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                data = await resp.json()
                logger.info(f"CJ REFRESH RESPONSE [{email}]: {json.dumps(data)}")
                body = data.get("data", {})
                token = body.get("accessToken")
                if token:
                    # Persist both tokens with fresh TTLs before returning.
                    cj_store_tokens(r, index, email, body)
                    return token
                logger.warning(f"CJ REFRESH: failed for [{email}], will fetch fresh token")
    except Exception as exc:
        logger.error(f"CJ REFRESH ERROR [{email}]: {exc}")
    return None
async def fetch_cj_token(account=None, _rotations_left=None):
    """Fetch a fresh access token from CJ using the account's API key.

    On a rate-limit response the account pool is rotated and the fetch is
    retried, visiting each remaining account at most once.  The internal
    `_rotations_left` parameter bounds the recursion — the original could
    recurse forever when every account in the pool was rate limited.
    Returns the token string, or None on failure.
    """
    if account is None:
        account = get_current_cj_account()
    if _rotations_left is None:
        _rotations_left = len(CJ_ACCOUNTS) - 1
    index = CJ_ACCOUNTS.index(account)
    email = account["email"]
    api_key = account.get("api_key", "")
    if not api_key:
        logger.error(f"CJ AUTH: no api_key for [{email}]")
        return None
    try:
        async with aiohttp.ClientSession() as s:
            async with s.post(
                CJ_AUTH_URL, json={"apiKey": api_key},
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                data = await resp.json()
                logger.info(f"CJ AUTH RESPONSE [{email}]: {json.dumps(data)}")
                # Rate limited — rotate account (bounded by pool size).
                if data.get("code") == 1600200 or "Too Many Requests" in (data.get("message") or ""):
                    logger.warning(f"CJ AUTH: rate limited for [{email}], rotating account")
                    if _rotations_left > 0 and rotate_cj_account():
                        return await fetch_cj_token(get_current_cj_account(), _rotations_left - 1)
                    return None
                token = data.get("data", {}).get("accessToken")
                if token:
                    r = get_redis()
                    if r:
                        cj_store_tokens(r, index, email, data.get("data", {}))
                    return token
                logger.error(f"CJ AUTH: accessToken missing for [{email}]")
    except Exception as e:
        logger.error(f"CJ AUTH ERROR [{email}]: {e}")
    return None
async def get_cj_access_token(account=None):
    """Resolve a CJ access token: Redis cache → refresh token → fresh auth."""
    if account is None:
        account = get_current_cj_account()
    index = CJ_ACCOUNTS.index(account)
    email = account["email"]
    # Step 1: cached access token in Redis.
    r = get_redis()
    if r:
        cached = r.get(f"tido_cj_access_token_{index}")
        if cached:
            logger.info(f"CJ AUTH: using cached access token index={index} [{email}]")
            return cached
        logger.info(f"CJ AUTH: access token expired/missing for index={index}, trying refresh...")
    # Step 2: refresh-token exchange.
    refreshed = await refresh_cj_token(index)
    if refreshed:
        logger.info(f"CJ AUTH: token refreshed successfully for index={index}")
        return refreshed
    # Step 3: full re-authentication with the API key.
    logger.info(f"CJ AUTH: refresh failed, fetching fresh token for index={index}")
    return await fetch_cj_token(account)
async def fetch_cj_product_list(session: aiohttp.ClientSession, token: str, page_num: int, category_id: str):
    """Fetch one page (pageSize=200) of product IDs for a CJ category.

    Handles 429 by rotating accounts, or by backing off when rotation is
    impossible.  Fix: when re-auth after a rotation fails, the previous
    token is kept — the original would send 'CJ-Access-Token: None'.
    Returns a list of PIDs; [] after MAX_RETRIES failures.
    """
    await asyncio.sleep(0.3)  # gentle pacing between list calls
    headers = {"CJ-Access-Token": token}
    params = {"pageSize": 200, "categoryId": category_id, "pageNum": page_num}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            async with session.get(CJ_PRODUCT_LIST_URL, headers=headers, params=params) as resp:
                if resp.status == 429:
                    wait = RETRY_BASE_DELAY * attempt * 2
                    logger.warning(f"⚠️ CJ rate limit 429 page {page_num}, attempt {attempt}/{MAX_RETRIES}. Trying account rotation...")
                    if rotate_cj_account():
                        new_token = await get_cj_access_token()
                        if new_token:  # keep the old token if re-auth failed
                            token = new_token
                            headers = {"CJ-Access-Token": token}
                    else:
                        logger.warning(f"Waiting {wait}s...")
                        await asyncio.sleep(wait)
                    continue
                resp.raise_for_status()
                data = await resp.json()
                lst = data.get("data", {}).get("list", []) or []
                return [item.get("pid") for item in lst if item.get("pid")]
        except Exception as e:
            logger.error(f"Error fetching CJ product list page {page_num} attempt {attempt}: {e}")
            if attempt < MAX_RETRIES:
                await asyncio.sleep(RETRY_BASE_DELAY * attempt)
                continue
            return []
    return []
async def fetch_cj_product_detail(session: aiohttp.ClientSession, pid: str, token: str):
    """Fetch full product details for one CJ PID.

    Handles 429 by rotating accounts, or by backing off when rotation is
    impossible.  Fix: when re-auth after a rotation fails, the previous
    token is kept — the original would send 'CJ-Access-Token: None'.
    Returns the detail dict, or None after MAX_RETRIES failures.
    """
    await asyncio.sleep(0.5)  # gentle pacing between detail calls
    headers = {"CJ-Access-Token": token}
    params = {"pid": pid}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            async with session.get(CJ_PRODUCT_DETAIL_URL, headers=headers, params=params) as resp:
                if resp.status == 429:
                    wait = RETRY_BASE_DELAY * attempt * 2
                    logger.warning(f"⚠️ CJ rate limit 429 for PID {pid}, attempt {attempt}/{MAX_RETRIES}. Trying account rotation...")
                    if rotate_cj_account():
                        new_token = await get_cj_access_token()
                        if new_token:  # keep the old token if re-auth failed
                            token = new_token
                            headers = {"CJ-Access-Token": token}
                    else:
                        logger.warning(f"Waiting {wait}s...")
                        await asyncio.sleep(wait)
                    continue
                resp.raise_for_status()
                data = await resp.json()
                return data.get("data")
        except Exception as e:
            logger.error(f"Error fetching product detail for PID {pid} attempt {attempt}/{MAX_RETRIES}: {e}")
            if attempt < MAX_RETRIES:
                await asyncio.sleep(RETRY_BASE_DELAY * attempt)
                continue
    logger.error(f"❌ Giving up on PID {pid} after {MAX_RETRIES} attempts")
    return None
# =========================
# Category list handling
# =========================
async def fetch_and_save_categories(token):
    """Download CJ's 3-level category tree and persist the leaf rows.

    Each line of CATEGORY_LIST_FILE is 'leafId,firstLevelId,secondLevelId'.
    Errors are logged, never raised.
    """
    try:
        headers = {"CJ-Access-Token": token}
        async with aiohttp.ClientSession() as session:
            async with session.get(CJ_CATEGORY_URL, headers=headers) as resp:
                resp.raise_for_status()
                data = await resp.json()
        tree = data.get("data", []) or []
        rows = []
        # Walk first → second → leaf level, keeping only leaves with an ID.
        for level1 in tree:
            first_id = level1.get("categoryFirstId", "")
            for level2 in level1.get("categoryFirstList", []) or []:
                second_id = level2.get("categorySecondId", "")
                for leaf in level2.get("categorySecondList", []) or []:
                    leaf_id = leaf.get("categoryId", "")
                    if leaf_id:
                        rows.append(f"{leaf_id},{first_id},{second_id}")
        os.makedirs(os.path.dirname(CATEGORY_LIST_FILE), exist_ok=True)
        with open(CATEGORY_LIST_FILE, "w") as fh:
            fh.write("\n".join(rows))
        logger.info(f"Saved {len(rows)} leaf categories to {CATEGORY_LIST_FILE}")
    except Exception as exc:
        logger.error(f"Failed to fetch/save categories: {exc}")
def iterate_all_category_ids():
    """Return every leaf category ID (first CSV column) from CATEGORY_LIST_FILE."""
    if not os.path.exists(CATEGORY_LIST_FILE):
        logger.error(f"{CATEGORY_LIST_FILE} not found")
        return []
    ids = []
    with open(CATEGORY_LIST_FILE, "r") as fh:
        for raw in fh:
            first_field = raw.strip().split(",")[0]
            if first_field:
                ids.append(first_field)
    return ids
def upsert_variation_meta(conn, variation_id: int, meta_key: str, meta_value: str):
    """Write a single postmeta row directly to the DB, replacing duplicates.

    Deletes every existing (variation_id, meta_key) row, inserts exactly
    one fresh row, then commits on `conn`.
    """
    delete_sql = "DELETE FROM wp_514_postmeta WHERE post_id = %s AND meta_key = %s"
    insert_sql = "INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value) VALUES (%s, %s, %s)"
    with conn.cursor() as cursor:
        cursor.execute(delete_sql, (variation_id, meta_key))
        cursor.execute(insert_sql, (variation_id, meta_key, meta_value))
    conn.commit()
# =========================
# MySQL helpers
# =========================
def verify_product_has_vid(product_id: int) -> bool:
    """Check that at least one non-trashed variation of `product_id` carries
    a non-empty 'vid' meta.  Logs the outcome; DB errors yield False."""
    sql = """
    SELECT pm.meta_value
    FROM wp_514_postmeta pm
    INNER JOIN wp_514_posts p ON pm.post_id = p.ID
    WHERE p.post_parent = %s
    AND p.post_type = 'product_variation'
    AND p.post_status != 'trash'
    AND pm.meta_key = 'vid'
    AND pm.meta_value IS NOT NULL
    AND pm.meta_value != ''
    LIMIT 1
    """
    conn = cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute(sql, (product_id,))
        hit = cursor.fetchone()
        if hit is None:
            logger.warning(f"⚠️ VID verification FAILED for product {product_id}: no variation has 'vid'.")
            return False
        logger.info(f"✅ VID verified for product {product_id}: vid='{hit[0]}'")
        return True
    except Exception as exc:
        logger.error(f"MySQL VID verification error for product {product_id}: {exc}")
        return False
    finally:
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
def verify_product_complete(product_id: int, sku: str, parent_price: str, variation_data: list) -> tuple[bool, list]:
    """Validate that a freshly imported product is complete.

    Checks the supplied parent SKU/price, each variation's sku,
    regular_price and vid, and that `_sku` / `_regular_price` postmeta
    actually landed in the DB.  Returns (ok, list_of_issue_strings).
    """
    issues = []
    # --- in-memory payload checks ---
    if not sku:
        issues.append(f"Parent {product_id} missing sku")
    try:
        if not parent_price or float(parent_price or 0) <= 0:
            issues.append(f"Parent {product_id} missing or zero price")
    except Exception:
        issues.append(f"Parent {product_id} invalid price '{parent_price}'")
    if not variation_data:
        issues.append(f"Product {product_id} has no variations")
    for var in variation_data:
        var_id = var.get('id')
        if not var.get('sku'):
            issues.append(f"Variation {var_id} missing sku")
        if not var.get('regular_price'):
            issues.append(f"Variation {var_id} missing regular_price")
        if not var.get('vid'):
            issues.append(f"Variation {var_id} missing vid")
    # --- DB-side checks ---
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute("""
        SELECT meta_key, meta_value FROM wp_514_postmeta
        WHERE post_id = %s AND meta_key IN ('_sku', '_regular_price')
        """, (product_id,))
        db_meta = dict(cursor.fetchall())
        if not db_meta.get('_sku'):
            issues.append(f"Parent {product_id} _sku missing in DB")
        if not db_meta.get('_regular_price') or db_meta.get('_regular_price') in ('0', '0.00', ''):
            issues.append(f"Parent {product_id} _regular_price missing in DB")
    except Exception as e:
        logger.error(f"[VERIFY] DB check failed: {e}")
    finally:
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
    return len(issues) == 0, issues
def find_product_id_by_sku(sku: str) -> Optional[int]:
    """Look up a post ID by its '_sku' postmeta value; None if absent or on error."""
    query = "SELECT post_id FROM wp_514_postmeta WHERE meta_key = '_sku' AND meta_value = %s LIMIT 1"
    conn = cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute(query, (sku,))
        hit = cursor.fetchone()
        return int(hit[0]) if hit else None
    except Exception as exc:
        logger.error(f"MySQL lookup error for SKU {sku}: {exc}")
        return None
    finally:
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
def find_conflicting_product_id(unique_sku: str) -> Optional[int]:
    """Find any post already claiming `unique_sku`.

    Checks the postmeta table first, then the WC product lookup table.
    Returns the conflicting post/product ID, or None when no conflict
    exists (or on DB error).
    """
    queries = (
        "SELECT post_id FROM wp_514_postmeta WHERE meta_key = '_sku' AND meta_value = %s LIMIT 1",
        "SELECT product_id FROM wp_514_wc_product_meta_lookup WHERE sku = %s LIMIT 1",
    )
    conn = cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        for query in queries:
            cursor.execute(query, (unique_sku,))
            hit = cursor.fetchone()
            if hit:
                return int(hit[0])
        return None
    except Exception as exc:
        logger.error(f"MySQL lookup error for unique_sku {unique_sku}: {exc}")
        return None
    finally:
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
def force_delete_sku_from_db(unique_sku: str):
    """Hard-delete every trace of `unique_sku` from postmeta and the WC
    lookup table (used to clear SKU conflicts before a re-import)."""
    conn = cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute("DELETE FROM wp_514_postmeta WHERE meta_key = '_sku' AND meta_value = %s", (unique_sku,))
        affected_postmeta = cursor.rowcount
        cursor.execute("DELETE FROM wp_514_wc_product_meta_lookup WHERE sku = %s", (unique_sku,))
        affected_lookup = cursor.rowcount
        conn.commit()
        logger.warning(f"⚠️ Force-deleted SKU {unique_sku} (postmeta={affected_postmeta}, lookup={affected_lookup}).")
    except Exception as exc:
        logger.error(f"MySQL force delete error for SKU {unique_sku}: {exc}")
    finally:
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
# =========================
# WooCommerce helpers
# =========================
def ensure_fallback_category():
    """Ensure the 'cjdropshipping' product category exists.

    Returns its term_id (existing or newly created), or None on DB error.
    """
    conn = cursor = None
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute("""
        SELECT t.term_id FROM wp_514_terms t
        INNER JOIN wp_514_term_taxonomy tt ON tt.term_id = t.term_id
        WHERE t.slug = 'cjdropshipping' AND tt.taxonomy = 'product_cat'
        LIMIT 1
        """)
        existing = cursor.fetchone()
        if existing:
            logger.info(f"[CATEGORY] Fallback category 'cjdropshipping' exists (term_id={existing[0]})")
            return existing[0]
        # Create the term, then the taxonomy row that marks it as a product_cat.
        cursor.execute(
            "INSERT INTO wp_514_terms (name, slug, term_group) VALUES ('cjdropshipping', 'cjdropshipping', 0)"
        )
        term_id = cursor.lastrowid
        cursor.execute(
            "INSERT INTO wp_514_term_taxonomy (term_id, taxonomy, description, parent, count) VALUES (%s, 'product_cat', '', 0, 0)",
            (term_id,)
        )
        conn.commit()
        logger.info(f"[CATEGORY] Created fallback category 'cjdropshipping' (term_id={term_id})")
        return term_id
    except Exception as exc:
        logger.error(f"[CATEGORY] ensure_fallback_category failed: {exc}")
        return None
    finally:
        try:
            cursor.close()
            conn.close()
        except Exception:
            pass
def force_sync_product_meta(product_id: int, sku: str, regular_price: str, variation_data: list):
    """Write SKU/price/stock meta for a product and its variations straight to MySQL.

    Belt-and-braces pass after the REST import: removes blank/stale meta rows,
    re-inserts canonical values, and refreshes wp_514_wc_product_meta_lookup so
    the values are queryable immediately.

    variation_data: list of dicts with keys id, sku, regular_price, vid,
    weight, length, width, height (id is the variation's WP post ID).
    Errors are logged, never raised.
    """
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        # Drop empty _sku/_regular_price rows so the upserts below are not
        # shadowed by blanks left over from the REST import.
        cursor.execute("""
            DELETE FROM wp_514_postmeta
            WHERE post_id = %s
            AND meta_key IN ('_sku', '_regular_price')
            AND (meta_value = '' OR meta_value IS NULL)
        """, (product_id,))
        all_ids = [product_id] + [v.get('id') for v in variation_data if v.get('id')]
        if all_ids:
            placeholders = ','.join(['%s'] * len(all_ids))
            # Clear _stock_status on parent + variations; re-inserted as
            # 'instock' below.
            cursor.execute(f"""
                DELETE FROM wp_514_postmeta
                WHERE post_id IN ({placeholders})
                AND meta_key = '_stock_status'
            """, all_ids)
        def upsert_meta(post_id, key, value):
            # Insert-or-update one postmeta row; empty values are skipped on
            # purpose so good data is never overwritten with blanks.
            if value is None or value == '':
                return
            try:
                cursor.execute("""
                    INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value)
                    VALUES (%s, %s, %s)
                    ON DUPLICATE KEY UPDATE meta_value = VALUES(meta_value)
                """, (post_id, key, str(value)))
            except Exception as e:
                logger.error(f"[UPSERT] FAILED post_id={post_id} key={key} value='{value}': {e}")
        # Parent product meta.
        upsert_meta(product_id, '_sku', sku)
        upsert_meta(product_id, '_regular_price', regular_price)
        upsert_meta(product_id, '_price', regular_price)
        upsert_meta(product_id, '_stock_status', 'instock')
        # Per-variation meta.
        for var in variation_data:
            var_id = var.get('id')
            var_sku = var.get('sku', '')
            var_price = var.get('regular_price', '')
            var_vid = var.get('vid', '')
            var_weight = var.get('weight', '')
            var_length = var.get('length', '')
            var_width = var.get('width', '')
            var_height = var.get('height', '')
            if not var_id:
                continue
            upsert_meta(var_id, '_sku', var_sku)
            upsert_meta(var_id, '_regular_price', var_price)
            upsert_meta(var_id, '_price', var_price)
            upsert_meta(var_id, '_stock_status', 'instock')
            if var_vid:
                # delete+insert (not upsert): guarantees exactly one 'vid'
                # row per variation.
                cursor.execute(
                    "DELETE FROM wp_514_postmeta WHERE post_id = %s AND meta_key = 'vid'",
                    (var_id,)
                )
                cursor.execute(
                    "INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value) VALUES (%s, 'vid', %s)",
                    (var_id, var_vid)
                )
            if var_weight: upsert_meta(var_id, '_weight', var_weight)
            if var_length: upsert_meta(var_id, '_length', var_length)
            if var_width: upsert_meta(var_id, '_width', var_width)
            if var_height: upsert_meta(var_id, '_height', var_height)
        try:
            parent_price_float = float(regular_price) if regular_price else 0.0
        except Exception:
            parent_price_float = 0.0
        # Refresh the lookup table; IF(... > 0, ...) keeps an existing
        # non-zero price when our computed price is 0.
        cursor.execute("""
            INSERT INTO wp_514_wc_product_meta_lookup
            (product_id, sku, `virtual`, downloadable, min_price, max_price, onsale, stock_quantity, stock_status, rating_count, average_rating, total_sales)
            VALUES (%s, %s, 0, 0, %s, %s, 0, NULL, 'instock', 0, 0, 0)
            ON DUPLICATE KEY UPDATE
            sku = VALUES(sku),
            min_price = IF(VALUES(min_price) > 0, VALUES(min_price), min_price),
            max_price = IF(VALUES(max_price) > 0, VALUES(max_price), max_price),
            stock_status = 'instock'
        """, (product_id, sku, parent_price_float, parent_price_float))
        for var in variation_data:
            var_id = var.get('id')
            var_price = var.get('regular_price', '') or '0'
            var_sku = var.get('sku', '')
            if not var_id:
                continue
            try:
                price_float = float(var_price)
            except Exception:
                price_float = 0.0
            cursor.execute("""
                INSERT INTO wp_514_wc_product_meta_lookup
                (product_id, sku, `virtual`, downloadable, min_price, max_price, onsale, stock_quantity, stock_status, rating_count, average_rating, total_sales)
                VALUES (%s, %s, 0, 0, %s, %s, 0, NULL, 'instock', 0, 0, 0)
                ON DUPLICATE KEY UPDATE
                sku = VALUES(sku),
                min_price = IF(VALUES(min_price) > 0, VALUES(min_price), min_price),
                max_price = IF(VALUES(max_price) > 0, VALUES(max_price), max_price),
                stock_status = 'instock'
            """, (var_id, var_sku, price_float, price_float))
        conn.commit()
        logger.info(f"✅ Force-synced meta for product {product_id} (sku='{sku}', price='{regular_price}') and {len(variation_data)} variations.")
    except Exception as e:
        logger.error(f"MySQL force_sync_product_meta error for {product_id}: {e}")
    finally:
        try: cursor.close(); conn.close()
        except Exception: pass
async def flush_wc_caches(session):
    """Trigger WooCommerce's 'clear_transients' system-status tool (best effort).

    Fix 1: send the 'Host: tidozone.com' header like every other WC call in
    this file — without it a vhost-routed server may serve the wrong site.
    Fix 2: never raise; callers invoke this mid conflict-resolution and a
    transient network error must not abort the import.
    """
    url = f"{WC_API_URL}/system_status/tools"
    payload = {"id": "clear_transients"}
    try:
        async with session.post(
            url,
            auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
            json=payload,
            headers={"Host": "tidozone.com"}
        ) as resp:
            if resp.status == 200:
                logger.info("✅ Flushed WooCommerce transients/caches")
            else:
                logger.warning(f"⚠️ Failed to flush WooCommerce caches, status {resp.status}")
    except Exception as e:
        logger.warning(f"⚠️ Failed to flush WooCommerce caches: {e}")
async def get_reviews(session, product_id):
    """Fetch all reviews attached to a WooCommerce product; [] on any failure."""
    url = f"{WC_API_URL}/reviews?product={product_id}"
    try:
        data = await _request_with_retries(
            session, "GET", url,
            auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
            headers={"Host": "tidozone.com"}
        )
    except Exception as e:
        logger.error(f"❌ Exception fetching reviews for product {product_id}: {e}")
        return []
    if isinstance(data, list):
        return data
    logger.warning(f"Unexpected review response type for product {product_id}: {data}")
    return []
async def get_existing_wc_product_by_sku(session: aiohttp.ClientSession, sku: str):
    """Return the first WC product matching `sku`, or None when absent or on error."""
    if not sku:
        return None
    try:
        async with session.get(
            WC_API_URL,
            auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
            params={"sku": sku, "per_page": 1},
            headers={"Host": "tidozone.com"}
        ) as resp:
            resp.raise_for_status()
            matches = await resp.json()
            return matches[0] if matches else None
    except Exception as e:
        logger.error(f"Error checking existing product SKU {sku}: {e}")
        return None
async def find_variation_by_sku_global(session, sku):
    """Search the whole catalog for the first product carrying `sku`.

    Fix: pass the SKU via `params` so aiohttp URL-encodes it (SKUs can carry
    '&', '#', '+' or spaces, which would corrupt a hand-built query string) —
    consistent with get_existing_wc_product_by_sku. Returns the first hit or
    None (also on non-200 responses).
    """
    async with session.get(
        WC_API_URL,
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        params={"sku": sku, "per_page": 1},
        headers={"Host": "tidozone.com"}
    ) as resp:
        if resp.status == 200:
            products = await resp.json()
            if products:
                return products[0]
    return None
async def delete_all_by_sku(session, sku):
    """Force-delete every WC product whose SKU matches.

    Returns True when the search succeeded (including 'nothing found'),
    False on search failure or exception.
    """
    try:
        async with session.get(
            WC_API_URL,
            auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
            params={"sku": sku},
            headers={"Host": "tidozone.com"}
        ) as resp:
            if resp.status != 200:
                logger.error(f"❌ Failed searching products by SKU {sku}, status {resp.status}")
                return False
            matches = await resp.json()
        if not matches:
            return True
        for product in matches:
            product_id = product.get("id")
            if product_id:
                await delete_wc_product(session, product_id)
        return True
    except Exception as e:
        logger.error(f"Exception in delete_all_by_sku({sku}): {e}")
        return False
async def update_variation_meta(session, parent_id, variation_id, meta_key, meta_value, retries=3, delay=2):
    """PUT a single meta_data entry onto a variation, retrying with linear backoff.

    Returns the WC response JSON on success, None after exhausting retries.
    """
    url = f"{WC_API_URL}/{parent_id}/variations/{variation_id}"
    payload = {"meta_data": [{"key": meta_key, "value": meta_value}]}
    attempt = 0
    while attempt < retries:
        attempt += 1
        try:
            async with session.put(
                url,
                auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
                json=payload,
                headers={"Host": "tidozone.com"}
            ) as resp:
                if resp.status in (200, 201):
                    logger.info(f" Updated variation {variation_id} meta {meta_key} (attempt {attempt})")
                    return await resp.json()
                body = await resp.text()
                logger.warning(f" Variation meta update {variation_id} failed, status {resp.status}, attempt {attempt}/{retries}: {body}")
        except Exception as e:
            logger.warning(f" Exception updating variation meta {variation_id}, attempt {attempt}/{retries}: {e}")
        # Back off before the next try; skip the sleep after the last attempt.
        if attempt < retries:
            await asyncio.sleep(delay * attempt)
    logger.error(f" Giving up updating variation {variation_id} after {retries} attempts")
    return None
def diff_product(old: dict, new: dict) -> dict:
    """Build a minimal PUT payload with the fields that differ between old and new.

    Prices are always included (forced refresh); scalar fields and categories
    only when they changed; meta entries when they carry a price key or are
    missing/empty on the old product.
    """
    payload = {key: new.get(key) for key in ("regular_price", "sale_price")}
    for field in ("name", "type", "sku", "manage_stock", "stock_status",
                  "external_url", "description", "status", "post_author"):
        if old.get(field) != new.get(field):
            payload[field] = new.get(field)
    previous_cat_ids = sorted(c.get("id") for c in old.get("categories", []))
    incoming_cat_ids = sorted(c.get("id") for c in new.get("categories", []))
    if previous_cat_ids != incoming_cat_ids:
        payload["categories"] = new.get("categories", [])
    previous_meta = {entry["key"]: entry["value"] for entry in old.get("meta_data", [])}
    incoming_meta = {entry["key"]: entry["value"] for entry in new.get("meta_data", [])}
    forced_keys = {"_price", "_regular_price", "_sale_price"}
    changed_meta = [
        {"key": key, "value": value}
        for key, value in incoming_meta.items()
        if key in forced_keys or not previous_meta.get(key)
    ]
    if changed_meta:
        payload["meta_data"] = changed_meta
    return payload
async def update_wc_product(session, product_id, product_data):
    """PUT `product_data` onto an existing product.

    Returns the WC response dict, a {'sku_conflict': True, ...} marker dict on
    a duplicate-SKU error, or None on any other failure.
    """
    response = await _request_with_retries(
        session, "PUT", f"{WC_API_URL}/{product_id}",
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        json=product_data,
        headers={"Host": "tidozone.com"}
    )
    if not isinstance(response, dict):
        logger.error(f"PUT failed for product {product_id}, invalid response: {response}")
        return None
    error_code = response.get("code")
    if error_code:
        logger.error(f"WooCommerce error updating product {product_id}: {response}")
        details = response.get("data")
        if error_code == "product_invalid_sku" and isinstance(details, dict):
            return {
                "sku_conflict": True,
                "resource_id": details.get("resource_id"),
                "unique_sku": details.get("unique_sku"),
            }
        return None
    logger.info(f"✅ Updated product {response.get('id')} Name: {response.get('name')}")
    return response
async def create_wc_product(session, product_data):
    """POST a new product.

    Returns the WC response dict, a {'sku_conflict': True, ...} marker dict on
    a duplicate-SKU error, or None on any other failure.
    """
    response = await _request_with_retries(
        session, "POST", WC_API_URL,
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        json=product_data,
        headers={"Host": "tidozone.com"}
    )
    if not isinstance(response, dict):
        logger.error(f"POST failed for SKU={product_data.get('sku')}, invalid response: {response}")
        return None
    error_code = response.get("code")
    if error_code:
        logger.error(f"WooCommerce error creating product SKU={product_data.get('sku')}: {response}")
        details = response.get("data")
        if error_code == "product_invalid_sku" and isinstance(details, dict):
            return {"sku_conflict": True, "resource_id": details.get("resource_id")}
        return None
    logger.info(f"✅ Created product {response.get('id')} Name: {response.get('name')}")
    return response
async def delete_wc_product(session, product_id):
    """Force-delete a product (bypasses trash); None when the response is unusable."""
    response = await _request_with_retries(
        session, "DELETE", f"{WC_API_URL}/{product_id}",
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        params={"force": "true"},
        headers={"Host": "tidozone.com"}
    )
    if isinstance(response, dict):
        return response
    logger.error(f"DELETE failed for product {product_id}, invalid response: {response}")
    return None
async def get_existing_variations_for_parent(session, parent_id):
    """Page through the parent's variations endpoint and collect all of them.

    Stops on an empty/short page or a non-200 response; partial results are
    returned on error.
    """
    collected = []
    page_size = 100
    current_page = 1
    try:
        while True:
            async with session.get(
                f"{WC_API_URL}/{parent_id}/variations",
                auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
                params={"per_page": page_size, "page": current_page},
                headers={"Host": "tidozone.com"}
            ) as resp:
                if resp.status != 200:
                    txt = await resp.text()
                    logger.warning(f"Failed to fetch variations for {parent_id} page {current_page}: {txt}")
                    break
                batch = await resp.json()
            if not batch:
                break
            collected.extend(batch)
            if len(batch) < page_size:
                # Short page means we just read the last one.
                break
            current_page += 1
    except Exception as e:
        logger.error(f"Exception fetching variations for parent {parent_id}: {e}")
    return collected
def diff_variation(old: dict, new: dict) -> dict:
    """Build a minimal variation PUT payload.

    Prices are always included; other scalar fields and the attribute list
    only when they differ; meta entries when they carry a price key or are
    missing/empty on the old variation.
    """
    payload = {key: new.get(key) for key in ("regular_price", "sale_price")}
    for field in ("sku", "manage_stock", "stock_status", "description", "image"):
        if old.get(field) != new.get(field):
            payload[field] = new.get(field)
    def attr_sort_key(attr):
        return (attr.get("id"), attr.get("name"))
    if sorted(old.get("attributes", []), key=attr_sort_key) != \
       sorted(new.get("attributes", []), key=attr_sort_key):
        payload["attributes"] = new.get("attributes", [])
    previous_meta = {entry["key"]: entry["value"] for entry in old.get("meta_data", [])}
    incoming_meta = {entry["key"]: entry["value"] for entry in new.get("meta_data", [])}
    forced_keys = {"_price", "_regular_price", "_sale_price"}
    changed_meta = [
        {"key": key, "value": value}
        for key, value in incoming_meta.items()
        if key in forced_keys or not previous_meta.get(key)
    ]
    if changed_meta:
        payload["meta_data"] = changed_meta
    return payload
async def update_wc_variation(session, parent_id, variation_id, variation_payload):
    """PUT a variation payload (minus the local-only 'fifu_image_url' key).

    On success returns the WC response dict, with the payload's SKU injected
    when WooCommerce omits it from the response; None on failure.
    """
    body = {key: value for key, value in variation_payload.items() if key != "fifu_image_url"}
    response = await _request_with_retries(
        session, "PUT", f"{WC_API_URL}/{parent_id}/variations/{variation_id}",
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        json=body,
        headers={"Host": "tidozone.com"}
    )
    if not isinstance(response, dict):
        logger.error(f"Variation update failed for {variation_id}, invalid response: {response}")
        return None
    if response.get("code"):
        logger.error(f"WooCommerce error updating variation {variation_id}: {response}")
        return None
    if variation_payload.get("sku") and not response.get("sku"):
        response["sku"] = variation_payload["sku"]
        logger.warning(f" ⚠️ WC response missing SKU for variation {response.get('id')}, injected from payload: {variation_payload['sku']}")
    logger.info(f"✅ Updated variation {response.get('id')} for parent {parent_id} SKU: {response.get('sku')}")
    return response
async def create_wc_variation(session, parent_id, variation_payload):
    """POST a new variation under `parent_id`.

    Strips the local-only 'fifu_image_url' key before sending. Returns the WC
    response dict — including error bodies such as product_invalid_sku so the
    caller can resolve conflicts — or None when the response isn't usable.

    Fix: the response was previously awaited without `async with`, so the
    connection was never released back to the pool (resource leak, eventually
    exhausting the connector); every other call in this file uses the
    context-manager form.
    """
    url = f"{WC_API_URL}/{parent_id}/variations"
    post_payload = {k: v for k, v in variation_payload.items() if k != "fifu_image_url"}
    async with session.post(
        url,
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        json=post_payload,
        headers={"Host": "tidozone.com"}
    ) as resp:
        try:
            result = await resp.json(content_type=None)
        except Exception:
            result = None
        status = resp.status
    if status >= 400:
        # Hand the error body back so SKU-conflict handling can inspect it.
        logger.error(f"❌ POST {url} failed, status {status}, body={result}")
        return result
    if not isinstance(result, dict):
        logger.error(f"Variation creation failed for parent {parent_id}, invalid response: {result}")
        return None
    if not result.get("sku") and variation_payload.get("sku"):
        result["sku"] = variation_payload["sku"]
        logger.warning(f" ⚠️ WC response missing SKU for variation {result.get('id')}, injected from payload: {variation_payload['sku']}")
    logger.info(f"✅ Created variation {result.get('id')} for parent {parent_id} SKU: {result.get('sku')}")
    return result
async def delete_variation(session, parent_id, variation_id):
    """Force-delete a single variation from its parent product.

    Fix: the URL previously embedded an extra '/products' segment even though
    WC_API_URL already points at the products endpoint (every other call here
    uses f"{WC_API_URL}/{parent_id}/variations/..."), yielding
    .../products/products/... and a guaranteed 404. Also pass force via
    `params` and the standard Host header, matching delete_wc_product.
    """
    logger.info(f"Deleting variation {variation_id} from parent {parent_id}")
    return await _request_with_retries(
        session, "DELETE",
        f"{WC_API_URL}/{parent_id}/variations/{variation_id}",
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        params={"force": "true"},
        headers={"Host": "tidozone.com"}
    )
async def create_or_update_wc_variation(session, parent_id, variation_payload, existing_variations_cache, processed_skus):
    """Idempotently create or update one variation, resolving duplicate-SKU conflicts.

    existing_variations_cache: mutable list of the parent's known variations,
    kept in sync in place. processed_skus: set of lowercased SKUs already
    handled this run, used to short-circuit duplicates within one product.
    Returns the created/updated variation dict, or None on failure.
    """
    sku_raw = (variation_payload.get("sku") or "").strip()
    sku_lower = sku_raw.lower()
    if not sku_raw:
        logger.warning(" Skipping variation with empty SKU")
        return None
    if sku_lower in processed_skus:
        logger.info(f" SKU {sku_raw} already processed in this run, skipping duplicate.")
        return next((v for v in existing_variations_cache if (v.get("sku") or "").strip().lower() == sku_lower), None)
    # Prefer updating an existing variation that already carries this SKU.
    match = next((v for v in existing_variations_cache if (v.get("sku") or "").strip().lower() == sku_lower), None)
    if match:
        var_id = match.get("id")
        logger.info(f" Updating existing variation {var_id} for SKU {sku_raw}")
        updated = await update_wc_variation(session, parent_id, var_id, variation_payload)
        if updated:
            # Refresh the cache entry so later lookups see the new data.
            for i, v in enumerate(existing_variations_cache):
                if v.get("id") == var_id:
                    existing_variations_cache[i] = updated
                    break
            processed_skus.add(sku_lower)
            return updated
        logger.error(f" Failed to update variation {var_id} for SKU {sku_raw}.")
        return None
    logger.info(f" Creating new variation for SKU {sku_raw}")
    created = await create_wc_variation(session, parent_id, variation_payload)
    if not created:
        logger.error(f" Failed to create variation for SKU {sku_raw}.")
        return None
    if isinstance(created, dict) and created.get("code") == "product_invalid_sku":
        # WooCommerce rejected the SKU as a duplicate: locate the conflicting
        # post, remove it (REST first, raw SQL as fallback), flush caches,
        # then retry the creation once.
        unique_sku = None
        conflict_id = None
        data = created.get("data", {})
        if isinstance(data, dict):
            conflict_id = data.get("resource_id")
            unique_sku = data.get("unique_sku")
        if unique_sku:
            logger.warning(f"⚠️ Duplicate SKU conflict: {unique_sku}")
            db_id = find_conflicting_product_id(unique_sku)
            if db_id:
                await delete_wc_product(session, db_id)
                cleanup_wc_sku_lookup()
                await asyncio.sleep(2)
            else:
                logger.error(f"Could not find conflicting product for unique_sku={unique_sku}")
                if conflict_id:
                    await delete_wc_product(session, conflict_id)
                    cleanup_wc_sku_lookup()
                    await asyncio.sleep(2)
                # Last resort: purge the SKU rows straight from MySQL.
                force_delete_sku_from_db(unique_sku)
                cleanup_wc_sku_lookup()
                await asyncio.sleep(2)
        elif conflict_id:
            logger.warning(f"⚠️ Duplicate SKU {sku_raw}. Deleting conflict {conflict_id}.")
            await delete_variation(session, parent_id, conflict_id)
            cleanup_wc_sku_lookup()
            await asyncio.sleep(2)
        await flush_wc_caches(session)
        retry = await create_wc_variation(session, parent_id, variation_payload)
        if retry and not (isinstance(retry, dict) and retry.get("code")):
            existing_variations_cache.append(retry)
            processed_skus.add(sku_lower)
            return retry
        logger.error(f"❌ Retry failed for SKU {sku_raw} after conflict cleanup")
        return None
    existing_variations_cache.append(created)
    processed_skus.add(sku_lower)
    return created
async def find_variation_by_sku(session, parent_id, sku):
    """Scan `parent_id`'s variations for an exact SKU match.

    Fix: removed the duplicated '/products' path segment — WC_API_URL is
    already the products endpoint (see get_existing_variations_for_parent),
    so the old URL 404'd. Also adds the standard Host header used by every
    other call in this file. Note: only the first 100 variations are checked.
    """
    async with session.get(
        f"{WC_API_URL}/{parent_id}/variations?per_page=100",
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        headers={"Host": "tidozone.com"}
    ) as resp:
        if resp.status == 200:
            for var in await resp.json():
                if var.get("sku") == sku:
                    return var
        else:
            logger.error(f"Failed to fetch variations for parent {parent_id}: {await resp.text()}")
    return None
# =========================
# Transformation
# =========================
def safe_json_loads(s):
    """Lenient JSON parse: pass lists/dicts through, return None for None or bad input."""
    if s is None:
        return None
    if isinstance(s, (list, dict)):
        return s
    try:
        return json.loads(s)
    except Exception:
        return None
def _num(val, default=0.0):
try: return float(val)
except Exception: return default
def _as_list_images(imgs):
if imgs is None: return []
if isinstance(imgs, list): return [str(u) for u in imgs if str(u)]
if isinstance(imgs, str):
parsed = safe_json_loads(imgs)
if isinstance(parsed, list): return [str(u) for u in parsed if str(u)]
if "|" in imgs: return [p.strip() for p in imgs.split("|") if p.strip()]
return [imgs.strip()]
return []
def extract_variants_from_cj(detail):
    """Convert CJ variant entries into WC parent attributes + variation payloads.

    Returns (parent_attributes, variation_payloads); both empty when the CJ
    detail carries no variant list. Variants missing 'vid' or 'variantSku'
    are skipped.
    """
    variant_sources = []
    # CJ uses several different keys for the variant list depending on endpoint.
    for key in ("variantList", "variants", "productVariant", "productVariants", "variantListDTO"):
        if key in detail and detail.get(key):
            variant_sources = detail.get(key)
            break
    if not variant_sources:
        return [], []
    attr_names = []
    # productKeyEn (e.g. "Color-Size") names the attribute axes, in order.
    product_key_en = detail.get("productKeyEn", "")
    if isinstance(product_key_en, str) and product_key_en:
        attr_names = [k.strip() for k in product_key_en.split("-")]
    variation_payloads = []
    attr_values = {}  # attribute name -> set of option values seen
    for variant in variant_sources:
        # Some feeds deliver each variant as a JSON string; decode or skip.
        if isinstance(variant, str):
            parsed = safe_json_loads(variant)
            if parsed: variant = parsed
            else: continue
        if not variant.get("vid"):
            continue
        if not variant.get("variantSku"):
            logger.debug(f"Skipping variant with no variantSku (vid={variant.get('vid')})")
            continue
        properties = []
        # variantKey (e.g. "Red-XL") holds this variant's option values,
        # aligned positionally with attr_names.
        variant_key = variant.get("variantKey", "")
        if variant_key:
            parts = [p.strip() for p in variant_key.split("-")]
            for i, part in enumerate(parts):
                name = attr_names[i] if i < len(attr_names) else f"Option{i+1}"
                properties.append((name, part))
        else:
            name = attr_names[0] if attr_names else "Option1"
            properties.append((name, ""))
        attr_list = []
        for name, val in properties:
            attr_list.append({"name": name, "option": val})
            attr_values.setdefault(name, set()).add(val)
        try:
            regular_price = float(variant.get("variantSellPrice")) * PRICE_FACTOR
        except Exception:
            regular_price = 0.0
        try:
            # CJ weight is divided by 1000 (grams -> kg, as expected by WC).
            weight_kg = round(float(variant.get("variantWeight", 0)) / 1000, 2)
            weight_str = str(weight_kg)
        except Exception:
            weight_str = "0"
        length = str(variant.get("variantLength") or "")
        width = str(variant.get("variantWidth") or "")
        height = str(variant.get("variantHeight") or "")
        variant_standard = variant.get("variantStandard") or (
            f"long={length},width={width},height={height}" if (length and width and height) else ""
        )
        create_time = str(variant.get("createrTime") or variant.get("createTime") or "")
        variation_payload = {
            "attributes": attr_list,
            "productId": "",
            "regular_price": f"{regular_price:.2f}",
            "sku": variant.get("variantSku") or "",
            "weight": weight_str,
            "dimensions": {"length": length, "width": width, "height": height},
            "fifu_image_url": variant.get("variantImage") or "",
            "vid": variant.get("vid", ""),
            "meta_data": [
                # vid written directly via SQL after creation to avoid WC REST API duplication
                {"key": "pid", "value": str(variant.get("pid", ""))},
                {"key": "variantKey", "value": variant.get("variantKey", "")},
                {"key": "variantSku", "value": variant.get("variantSku", "")},
                {"key": "variantStandard", "value": variant_standard},
                {"key": "createTime", "value": create_time},
                {"key": "startCountryCode", "value": "CN"},
                {"key": "srcAreaCode", "value": "CN"},
            ]
        }
        variation_payloads.append(variation_payload)
    # Parent-level attribute definitions; position follows first-seen order.
    parent_attributes = [
        {
            "name": name, "options": sorted(list(vals)),
            "position": idx, "variation": True, "visible": True
        }
        for idx, (name, vals) in enumerate(attr_values.items())
    ]
    return parent_attributes, variation_payloads
def transform_product(detail):
    """Map a CJ product-detail dict to a WooCommerce product payload.

    Returns the payload dict, or None when transformation fails.

    Fix: the except branch used to call sys.exit(1), killing the whole
    importer on a single malformed product — even though the caller
    (process_product) already guards `if not wc_data` and expects a falsy
    value on failure. Return None instead so the run continues.
    """
    try:
        name = detail.get("productNameEn") or detail.get("productName") or "Untitled"
        sku = detail.get("productSku", "")
        images = _as_list_images(detail.get("productImageSet") or detail.get("productImage"))
        # First image is the featured image; the rest become the FIFU gallery.
        fifu_image_url = images[0] if images else ""
        fifu_list_url = "|".join(images[1:]) if len(images) > 1 else ""
        categories = []
        # Use only the deepest segment of CJ's " > "-joined category path.
        cat_name = (detail.get("categoryName") or "").split(" > ")[-1].strip()
        categories.append({"name": cat_name if cat_name else "cjdropshipping"})
        sell_price = _num(detail.get("sellPrice")) * PRICE_FACTOR
        price_str = str(sell_price) if sell_price > 0 else ""
        # Dimensions/weight: prefer the first variant's data, fall back to
        # product-level fields.
        vlist = None
        for k in ("variants","variantList","productVariant","productVariants","variantListDTO"):
            seq = detail.get(k)
            if isinstance(seq, list) and seq:
                vlist = seq; break
        if vlist:
            v0 = vlist[0]
            length = str(v0.get("variantLength") or "")
            width = str(v0.get("variantWidth") or "")
            height = str(v0.get("variantHeight") or "")
            weight_g = _num(v0.get("variantWeight", 0))
        else:
            length = str(detail.get("productLength") or "")
            width = str(detail.get("productWidth") or "")
            height = str(detail.get("productHeight") or "")
            # packingWeight may be a "min-max" range string; take the low end.
            weight_g = _num(detail.get("packingWeight", "0").split("-")[0] if isinstance(detail.get("packingWeight"), str) else detail.get("packingWeight", 0))
        weight_kg = round(weight_g / 1000.0, 2)
        weight_str = f"{weight_kg:.2f}" if weight_kg > 0 else ""
        tags = [{"name": t} for t in (detail.get("productProEnSet") or [])]
        create_time = str(detail.get("createrTime") or detail.get("createTime") or "")
        return {
            "name": name, "type": "simple", "sku": sku,
            "regular_price": price_str, "sale_price": "",
            "categories": categories,
            "description": detail.get("description", "") or detail.get("remark", ""),
            "short_description": detail.get("remark", "") or "",
            "status": "draft", "manage_stock": False, "stock_status": "instock",
            "weight": weight_str,
            "dimensions": {"length": length, "width": width, "height": height},
            "tags": tags,
            "meta_data": [
                {"key": "fifu_image_url", "value": fifu_image_url},
                {"key": "fifu_list_url", "value": fifu_list_url},
                {"key": "productType", "value": detail.get("productType", "")},
                {"key": "sourceFrom", "value": str(detail.get("sourceFrom", ""))},
                {"key": "shippingCountryCodes", "value": ",".join(detail.get("shippingCountryCodes", [])) if detail.get("shippingCountryCodes") else ""},
                {"key": "pid", "value": str(detail.get("pid", "") or "").strip()},
                {"key": "createTime", "value": create_time},
                {"key": "entryNameEn", "value": detail.get("entryNameEn", "") or detail.get("entryName", "")},
                {"key": "entryCode", "value": detail.get("entryCode", "")},
                {"key": "productUnit", "value": detail.get("productUnit", "") or ""},
                {"key": "materialNameEn", "value": json.dumps(detail.get("materialNameEnSet") or detail.get("materialNameEn") or "")},
                {"key": "materialKey", "value": json.dumps(detail.get("materialKeySet") or detail.get("materialKey") or "")},
                {"key": "packingNameEn", "value": json.dumps(detail.get("packingNameEnSet") or detail.get("packingNameEn") or "")},
                {"key": "packingKey", "value": json.dumps(detail.get("packingKeySet") or detail.get("packingKey") or "")},
                {"key": "productKeyEn", "value": detail.get("productKeyEn", "")},
                {"key": "listedNum", "value": str(detail.get("listedNum", ""))},
                {"key": "supplierName", "value": detail.get("supplierName", "") or ""},
                {"key": "supplierId", "value": str(detail.get("supplierId", "") or "")},
                {"key": "suggestSellPrice", "value": str(detail.get("suggestSellPrice", ""))},
                {"key": "startCountryCode", "value": "CN"},
                {"key": "srcAreaCode", "value": "CN"},
            ]
        }
    except Exception as e:
        logger.error(f"Error transforming product: {e}")
        return None
# =========================
# Reviews
# =========================
async def product_has_reviews(session, product_id):
    """Return the product's existing reviews; empty list on non-200 or any error."""
    reviews = []
    try:
        async with session.get(
            f"{WC_REVIEW_API_URL}?product={product_id}",
            auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
            headers={"Host": "tidozone.com"}
        ) as resp:
            if resp.status == 200:
                reviews = await resp.json()
    except Exception as e:
        logger.warning(f"Failed to check reviews for product {product_id}: {e}")
    return reviews
async def add_reviews(session, product_id, count=10):
    """Post up to `count` random canned reviews, skipping comments already present."""
    current = await product_has_reviews(session, product_id)
    seen = {entry.get('review') for entry in current if isinstance(entry, dict)}
    for _ in range(count):
        if not REVIEW_COMMENTS:
            break
        comment = random.choice(REVIEW_COMMENTS)
        if comment in seen:
            continue
        reviewer, email = random_reviewer()
        payload = {
            "product_id": product_id,
            "review": comment,
            "reviewer": reviewer,
            "reviewer_email": email,
            "rating": random.randint(4, 5),
        }
        try:
            async with session.post(
                WC_REVIEW_API_URL,
                auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
                json=payload,
                headers={"Host": "tidozone.com"}
            ) as resp:
                if resp.status in (200, 201):
                    logger.info(f"✅ Added review for product {product_id}")
                else:
                    logger.warning(f"⚠️ Review failed for product {product_id}, status {resp.status}")
        except Exception as e:
            logger.error(f"❌ Failed to add review to product {product_id}: {e}")
async def fetch_pretty_permalink(session, product_id):
    """Fetch the product's permalink, rejecting raw '?post_type=product&p=' URLs.

    Returns the stripped permalink string or None.
    """
    async with session.get(
        f"{WC_API_URL}/{product_id}",
        auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
        headers={"Accept": "application/json", "Host": "tidozone.com"}
    ) as resp:
        content_type = resp.headers.get("Content-Type", "")
        if resp.status == 200 and "application/json" in content_type:
            body = await resp.json()
            link = body.get("permalink")
            if link and "post_type=product&p=" not in link:
                return link.strip()
        else:
            raw = await resp.text()
            logger.error(f"fetch_pretty_permalink: unexpected response ({resp.status}, {content_type}): {raw[:200]}")
    return None
# =========================
# Orchestration
# =========================
async def process_product(pid, session, token, sem) -> bool:
async with sem:
logger.info(f"Processing PID: {pid}")
detail = await fetch_cj_product_detail(session, pid, token)
if not detail:
logger.error(f"❌ No detail for PID {pid}.")
return False
wc_data = transform_product(detail)
if not wc_data:
logger.warning(f"Skipping PID {pid}: transformation failed")
return False
sku = wc_data.get("sku")
if not sku:
logger.warning(f"Skipping PID {pid}: empty SKU")
return False
# Guard 1: source data must have at least one variant with vid
variant_seq = None
for key in ("variants","variantList","productVariant","productVariants","variantListDTO"):
seq = detail.get(key)
if isinstance(seq, list) and seq:
variant_seq = seq; break
if not variant_seq or not any(v.get("vid") for v in variant_seq if isinstance(v, dict)):
logger.info(f"⏩ Skipping PID {pid} (SKU {sku}): no CJ variants have 'vid'.")
return False
parent_attributes, variation_payloads = extract_variants_from_cj(detail)
# Guard 2: all variants may have been filtered out
if not variation_payloads:
logger.info(f"⏩ Skipping PID {pid} (SKU {sku}): all variants filtered (no vid).")
return False
wc_data["type"] = "variable"
wc_data["attributes"] = parent_attributes or []
wc_data["status"] = "draft"
existing = await get_existing_wc_product_by_sku(session, sku)
result = await update_wc_product(session, existing["id"], wc_data) if existing else await create_wc_product(session, wc_data)
if result and result.get("sku_conflict"):
conflict_id = result.get("resource_id")
unique_sku = result.get("unique_sku") or (sku + "-1")
logger.warning(f"SKU conflict for {sku} (WC ID {conflict_id} unique_sku={unique_sku}). Trying update first.")
result = await update_wc_product(session, conflict_id, wc_data)
if result and result.get("id"):
logger.info(f"✅ Resolved SKU conflict by updating existing product {conflict_id}")
else:
logger.warning(f"Update failed, deleting conflict {conflict_id} and recreating.")
await delete_wc_product(session, conflict_id)
await asyncio.sleep(1)
force_delete_sku_from_db(sku)
force_delete_sku_from_db(unique_sku)
cleanup_wc_sku_lookup()
await flush_wc_caches(session)
await asyncio.sleep(3)
result = await create_wc_product(session, wc_data)
if not result or result.get("sku_conflict"):
logger.error(f"❌ SKU conflict persists for {sku} after cleanup, skipping.")
result = None
if not result or not result.get("id"):
logger.error(f"❌ Failed to create/update parent for PID {pid} (SKU {sku}).")
return False
product_id = result["id"]
logger.info(f"Processing {len(variation_payloads)} variations for parent {product_id}")
existing_variations_cache = await get_existing_variations_for_parent(session, product_id)
# Delete stale variations no longer in CJ data
new_skus = {vp['sku'].strip().lower() for vp in variation_payloads if vp.get('sku')}
for existing_var in existing_variations_cache:
esku = (existing_var.get('sku') or '').strip().lower()
if esku and esku not in new_skus:
logger.info(f" Deleting stale variation {existing_var['id']} SKU={esku} (not in CJ data)")
await _request_with_retries(
session, 'DELETE',
f"{WC_API_URL}/{product_id}/variations/{existing_var['id']}",
auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
params={'force': 'true'},
headers={'Host': 'tidozone.com'}
)
processed_skus = set()
async def process_single_variation(var_payload):
var_payload["productId"] = str(product_id)
for a in var_payload.get("attributes", []):
a["name"] = str(a.get("name", "Option1"))
a["option"] = str(a.get("option", ""))
if not var_payload.get("regular_price"):
var_payload["regular_price"] = str(result.get("regular_price") or result.get("price") or "")
created_var = await create_or_update_wc_variation(
session, product_id, var_payload, existing_variations_cache, processed_skus
)
if created_var:
var_id = created_var["id"]
vid_val = var_payload.get("vid", "")
# Write vid directly to DB — WC REST API appends and creates duplicates
if vid_val:
conn = mysql.connector.connect(**db_config)
try:
with conn.cursor() as cursor:
cursor.execute(
"DELETE FROM wp_514_postmeta WHERE post_id = %s AND meta_key = %s",
(var_id, "vid")
)
cursor.execute(
"INSERT INTO wp_514_postmeta (post_id, meta_key, meta_value) VALUES (%s, %s, %s)",
(var_id, "vid", vid_val)
)
conn.commit()
except Exception as e:
logger.error(f" DB meta write failed for variation {var_id}: {e}")
finally:
conn.close()
fifu_url = var_payload.get("fifu_image_url", "")
if fifu_url:
await update_variation_meta(session, product_id, var_id, "fifu_image_url", fifu_url)
return created_var
all_variation_results = []
for i in range(0, len(variation_payloads), 10):
chunk_results = await asyncio.gather(*[process_single_variation(p) for p in variation_payloads[i:i+10]])
all_variation_results.extend(chunk_results)
variation_data = []
for var_payload in variation_payloads:
matched_result = next(
(v for v in all_variation_results
if v and isinstance(v, dict)
and (v.get('sku') or '').strip().lower() == (var_payload.get('sku') or '').strip().lower()),
None
)
if not matched_result:
payload_vid = var_payload.get('vid', '')
matched_result = next(
(v for v in all_variation_results
if v and isinstance(v, dict)
and any(m.get('key') == 'vid' and m.get('value') == payload_vid
for m in v.get('meta_data', []))),
None
)
if not matched_result:
continue
dims = var_payload.get('dimensions', {})
variation_data.append({
'id': matched_result.get('id'),
'sku': var_payload.get('sku', ''),
'regular_price': var_payload.get('regular_price', ''),
'vid': var_payload.get('vid', ''),
'weight': var_payload.get('weight', ''),
'length': dims.get('length', '') if isinstance(dims, dict) else '',
'width': dims.get('width', '') if isinstance(dims, dict) else '',
'height': dims.get('height', '') if isinstance(dims, dict) else '',
})
if len(variation_data) < len(variation_payloads):
missing = len(variation_payloads) - len(variation_data)
missed_skus = [
vp.get('sku') for vp in variation_payloads
if not any(vd['sku'] == vp.get('sku') for vd in variation_data)
]
logger.warning(
f"[VARIATION-MATCH] ⚠️ {missing} variation(s) not matched for product {product_id}. "
f"Missed SKUs: {missed_skus}"
)
parent_price = next((vp['regular_price'] for vp in variation_payloads if vp.get('regular_price')), '')
sync_sku = wc_data.get('sku', '')
logger.info(f"[FORCE-SYNC-INPUT] product_id={product_id} sku='{sync_sku}' price='{parent_price}' variation_payloads_count={len(variation_payloads)} variation_data_count={len(variation_data)}")
logger.info(f"[FORCE-SYNC-INPUT] first_vp={variation_payloads[0] if variation_payloads else 'EMPTY'}")
# Reviews
try:
existing_reviews = await get_reviews(session, product_id)
if existing_reviews:
logger.info(f"Skipping reviews for {product_id}: {len(existing_reviews)} already exist.")
else:
existing_comments = {r.get('review') for r in existing_reviews if isinstance(r, dict)}
review_payloads = []
for _ in range(10):
if not REVIEW_COMMENTS:
break
comment = random.choice(REVIEW_COMMENTS)
if comment in existing_comments:
continue
reviewer, email = random_reviewer()
review_payloads.append({
"product_id": product_id,
"review": comment,
"reviewer": reviewer,
"reviewer_email": email,
"rating": random.randint(4, 5)
})
existing_comments.add(comment)
async def post_single_review(payload):
try:
async with session.post(
WC_REVIEW_API_URL,
auth=aiohttp.BasicAuth(WC_CONSUMER_KEY, WC_CONSUMER_SECRET),
json=payload,
headers={"Host": "tidozone.com"}
) as resp:
if resp.status in (200, 201):
logger.info(f"✅ Added review for product {product_id}")
else:
logger.warning(f"⚠️ Review failed for product {product_id}, status {resp.status}")
except Exception as e:
logger.error(f"❌ Failed to add review to product {product_id}: {e}")
for i in range(0, len(review_payloads), 10):
await asyncio.gather(*[post_single_review(p) for p in review_payloads[i:i+10]])
except Exception as e:
logger.error(f"❌ Error handling reviews for product {product_id}: {e}")
# Force draft before publish
try:
await asyncio.wait_for(
update_wc_product(session, product_id, {"status": "draft"}),
timeout=30
)
except Exception:
pass
# Publish
publish_ok = False
for pub_attempt in range(1, 4):
try:
pub_result = await asyncio.wait_for(
update_wc_product(session, product_id, {"status": "publish"}),
timeout=30
)
if pub_result and pub_result.get("id"):
logger.info(f"[PUBLISH] ✅ Published product {product_id} (SKU {sku}) attempt {pub_attempt}.")
publish_ok = True
break
else:
logger.warning(f"[PUBLISH] ⚠️ Attempt {pub_attempt} returned no result for {product_id}.")
except asyncio.TimeoutError:
logger.warning(f"[PUBLISH] ⚠️ Timeout on attempt {pub_attempt} for {product_id}, checking DB...")
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("SELECT post_status FROM wp_514_posts WHERE ID = %s", (product_id,))
row = cursor.fetchone()
if row and row[0] == 'publish':
logger.info(f"[PUBLISH] ✅ Product {product_id} confirmed published in DB despite timeout.")
publish_ok = True
break
except Exception as dbe:
logger.warning(f"[PUBLISH] DB check failed: {dbe}")
finally:
try: cursor.close(); conn.close()
except Exception: pass
except Exception as e:
logger.warning(f"[PUBLISH] ⚠️ Attempt {pub_attempt} failed for {product_id}: {e}.")
await asyncio.sleep(2)
if not publish_ok:
logger.error(f"[PUBLISH] ❌ Failed to publish product {product_id} after 3 attempts.")
return False
# AFTER publish — WC publish hook overwrites postmeta so sync must run last
force_sync_product_meta(product_id, wc_data.get('sku', ''), parent_price, variation_data)
try:
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO wp_514_wc_product_meta_lookup
(product_id, sku, `virtual`, downloadable, min_price, max_price,
onsale, stock_quantity, stock_status, rating_count, average_rating, total_sales)
SELECT
p.ID,
COALESCE(MAX(CASE WHEN pm.meta_key = '_sku' THEN pm.meta_value END), ''),
0, 0,
COALESCE(CAST(MAX(CASE WHEN pm.meta_key = '_regular_price' THEN pm.meta_value END) AS DECIMAL(10,4)), 0),
COALESCE(CAST(MAX(CASE WHEN pm.meta_key = '_regular_price' THEN pm.meta_value END) AS DECIMAL(10,4)), 0),
0, NULL, 'instock', 0, 0, 0
FROM wp_514_posts p
LEFT JOIN wp_514_postmeta pm ON pm.post_id = p.ID
WHERE p.post_parent = %s
AND p.post_type = 'product_variation'
AND p.post_status != 'trash'
AND p.ID NOT IN (SELECT product_id FROM wp_514_wc_product_meta_lookup)
GROUP BY p.ID
ON DUPLICATE KEY UPDATE stock_status = 'instock'
""", (product_id,))
rescued = cursor.rowcount
conn.commit()
if rescued:
logger.warning(f"[LOOKUP-RESCUE] Inserted {rescued} missing lookup rows for product {product_id}")
except Exception as e:
logger.error(f"[LOOKUP-RESCUE] failed for product {product_id}: {e}")
finally:
try: cursor.close(); conn.close()
except Exception: pass
# Verify AFTER post-publish sync
is_complete, issues = verify_product_complete(product_id, sku, parent_price, variation_data)
if not is_complete:
logger.error(
f"[VERIFY-FAIL] ❌ Product {product_id} (SKU {sku}) failed completeness check after publish. "
f"Deleting. Issues: {issues}"
)
await delete_wc_product(session, product_id)
return False
logger.info(f"[VERIFY] ✅ Product {product_id} (SKU {sku}) passed completeness check.")
# Sitemap
try:
pretty_url = await fetch_pretty_permalink(session, product_id)
if pretty_url:
parsed = urlparse(pretty_url)
url_with_port = urlunparse(parsed._replace(netloc=f"{parsed.hostname}:444"))
old_content = ""
if Path(SITEMAP_FILE).exists():
async with aiofiles.open(SITEMAP_FILE, "r") as f:
old_content = await f.read()
async with aiofiles.open(SITEMAP_FILE, "w") as f:
await f.write(url_with_port + "\n" + old_content)
logger.info(f"📝 Prepended to sitemap: {url_with_port}")
except Exception as e:
logger.error(f"Failed to write sitemap for product {product_id}: {e}")
return True
async def process_product_with_tracking(
    pid: str,
    session: aiohttp.ClientSession,
    token: str,
    sem: asyncio.Semaphore,
    imported_products: Set[str],
    save_counter: dict,
    category_id: str
) -> bool:
    """Import a single product and maintain the per-category tracking set.

    Thin wrapper around process_product() that skips already-imported ids,
    records successes in ``imported_products``, and periodically persists the
    set to disk so a crash loses at most SAVE_INTERVAL products of progress.

    Returns True on success (or when the product was already imported),
    False on failure or unexpected exception.
    """
    # Fast path: seen in a previous run/page of this category.
    if pid in imported_products:
        logger.debug(f"Product {pid} already imported, skipping.")
        return True
    try:
        ok = await process_product(pid, session, token, sem)
        if not ok:
            logger.info(f"⏭️ Product {pid} not added to imported tracking (failed or skipped).")
            return ok
        imported_products.add(pid)
        save_counter['count'] += 1
        # Checkpoint the tracking set every SAVE_INTERVAL successful imports.
        if save_counter['count'] % SAVE_INTERVAL == 0:
            save_imported_products_for_category(category_id, imported_products)
            logger.info(f"💾 Progress saved: {len(imported_products)} products in category {category_id}")
        logger.info(f"✅ Product {pid} imported successfully")
        return ok
    except Exception as exc:
        # Never let one bad product abort the surrounding gather().
        logger.error(f"❌ Unexpected exception processing product {pid}: {exc}")
        return False
async def main():
    """Top-level driver for the CJ → WooCommerce import.

    Sequence: repair/clean MySQL metadata, obtain a CJ API token (falling
    back across all configured accounts), refresh the category list, then
    resume category-by-category / page-by-page product import using the
    persisted progress markers (last completed category, current page,
    per-category imported-product set).
    """
    try:
        # --- One-time DB hygiene before importing anything. ---
        repair_missing_parent_meta()
        logger.info("Parent meta repair completed.")
        ensure_fallback_category()
        cleanup_wc_sku_lookup()
        cleanup_duplicate_meta()
        logger.info("MySQL cleanup completed.")
        # --- CJ authentication: try the current account, then every configured account. ---
        token = await get_cj_access_token()
        if not token:
            logger.error(f"Cannot obtain CJ token for {get_current_cj_account()['email']}, trying other accounts...")
            for acc in CJ_ACCOUNTS:
                token = await get_cj_access_token(acc)
                if token:
                    logger.info(f"✅ Got token using account {acc['email']}")
                    break
            if not token:
                logger.error("All CJ accounts failed to get token, exiting.")
                return
        await fetch_and_save_categories(token)
        logger.info("Categories saved.")
        all_categories = iterate_all_category_ids()
        logger.info(f"Loaded {len(all_categories)} categories. First 5: {all_categories[:5]}")
        if not all_categories:
            logger.error("No categories found.")
            return
        # --- Resume point: start after the last fully completed category. ---
        last_completed = get_last_completed_category()
        if last_completed and last_completed not in all_categories:
            # Stale marker (category list changed) — restart from the beginning.
            logger.warning(f"Last completed category {last_completed} not in list. Resetting.")
            last_completed = None
        start_idx = all_categories.index(last_completed) + 1 if last_completed else 0
        remaining = len(all_categories[start_idx:])
        logger.info(f"Starting at index {start_idx}, {remaining} categories to process.")
        if remaining == 0:
            logger.info("No categories left. Exiting.")
            return
        async with aiohttp.ClientSession() as session:
            for category_id in all_categories[start_idx:]:
                logger.info("=" * 80)
                logger.info(f"📂 Processing category: {category_id}")
                # Per-category dedup set + counter driving periodic saves.
                imported_products = load_imported_products_for_category(category_id)
                save_counter = {'count': 0}
                if imported_products:
                    logger.info(f"📊 {len(imported_products)} already imported in this category")
                current_page = read_current_page(category_id)
                category_new_products = 0
                # Page through CJ's product list until an empty page is returned.
                while True:
                    logger.info(f"📄 Page {current_page} for category {category_id}")
                    pids = await fetch_cj_product_list(session, token, current_page, category_id)
                    if not pids:
                        logger.info(f"No products on page {current_page}. Category {category_id} done.")
                        break
                    new_pids = [p for p in pids if p not in imported_products]
                    skipped_count = len(pids) - len(new_pids)
                    if skipped_count:
                        logger.info(f"⏭️ Skipping {skipped_count} already imported on page {current_page}")
                    if new_pids:
                        logger.info(f"🔄 Processing {len(new_pids)} new products on page {current_page}")
                        # Bound concurrency per page; exceptions are captured
                        # by gather() so one failure cannot sink the page.
                        sem = asyncio.Semaphore(CONCURRENCY)
                        tasks = [
                            process_product_with_tracking(p, session, token, sem, imported_products, save_counter, category_id)
                            for p in new_pids
                        ]
                        results = await asyncio.gather(*tasks, return_exceptions=True)
                        # Exceptions / False / None all count as failures here.
                        successes = sum(1 for r in results if r is True)
                        failures = len(results) - successes
                        category_new_products += successes
                        logger.info(f"📊 Page {current_page}: ✅ {successes} success, ❌ {failures} failed")
                        if SLEEP_BETWEEN_PRODUCTS:
                            await asyncio.sleep(SLEEP_BETWEEN_PRODUCTS)
                    else:
                        logger.info(f"⏭️ All products on page {current_page} already imported")
                    # Persist progress after every page so a crash resumes here.
                    save_imported_products_for_category(category_id, imported_products)
                    write_current_page(category_id, current_page)
                    current_page += 1
                logger.info("=" * 80)
                logger.info(f"✅ Category {category_id} done. New: {category_new_products}, Total: {len(imported_products)}")
                logger.info("=" * 80)
                # Mark done, rewind the page marker, and clear the per-category
                # tracking file for the next category.
                mark_category_completed(category_id)
                write_current_page(category_id, 1)
                clear_category_product_tracking()
        logger.info("🎉 All categories processed.")
    except Exception as e:
        # Last-resort guard: log the full traceback instead of crashing silently.
        logger.exception(f"Fatal error in main: {e}")
# Script entry point: run the async import pipeline to completion.
if __name__ == "__main__":
    asyncio.run(main())
Đồ đá banh cho bé thun lạnh cao cấp, mặc hè mát mẻ, thấm hút mồ hôi, co giãn tốt, dễ vận động SV14 | ViKi Sport - Tidozone.vn
Trang chủ Uncategorized Đồ đá banh cho bé thun lạnh cao cấp, mặc hè mát mẻ, thấm hút mồ hôi, co giãn tốt, dễ vận động SV14 | ViKi Sport
Đồ đá banh cho bé thun lạnh cao cấp, mặc hè mát mẻ, thấm hút mồ hôi, co giãn tốt, dễ vận động SV14 | ViKi Sport
✨VIKI SPORTS – CHUYÊN CUNG CẤP THỜI TRANG THỂ THAO CHO BÉ
✔️ THÔNG TIN SẢN PHẨM
– Set gồm: 1 áo + 1 quần thể thao năng động
– Chất liệu: Thun lạnh cao cấp, co giãn 4 chiều, thoáng mát, thấm hút mồ hôi cực tốt
– Kiểu dáng: Tay ngắn, vải thun trơn mềm mịn, dễ mặc cho cả bé trai và bé gái
✔️ HƯỚNG DẪN CHỌN SIZE
Size 1: 10–13kg | Cao 85–95cm (Dài áo 44cm – Rộng 32cm)
Size 3: 14–16kg | Cao 95–125cm (Dài áo 47cm – Rộng 33cm)
Size 5: 17–20kg | Cao 125–130cm (Dài áo 50cm – Rộng 35cm)
Size 7: 21–24kg | Cao 130–135cm (Dài áo 53cm – Rộng 37cm)
Size 9: 25–28kg | Cao 135–140cm (Dài áo 55cm – Rộng 39cm)
Size 11: 29–32kg | Cao 135–140cm (Dài áo 58cm – Rộng 40cm)
Size 13: 33–35kg | Cao 140–145cm (Dài áo 61cm – Rộng 42cm)
Size 15: 36–40kg | Cao 145–150cm (Dài áo 64cm – Rộng 44cm)
👉 Mẹo chọn size:
– Nếu bé mũm mĩm hoặc thích mặc rộng, nên chọn lên 1 size.
– Nếu phân vân giữa 2 size → chọn size lớn hơn (có thể chênh lệch ±1cm).
✔️ HƯỚNG DẪN BẢO QUẢN
– Giặt tay hoặc giặt máy nhiệt độ < 30°C
– Lộn trái áo trước khi giặt, không ngâm lâu, không dùng chất tẩy mạnh
– Phơi nơi thoáng mát, tránh ánh nắng trực tiếp
– Lần đầu nên xả nhẹ với nước lạnh, sau 3 ngày hãy giặt lần đầu tiên để áo giữ form đẹp
✔️ CHÍNH SÁCH MUA HÀNG
– Cam kết chất lượng: Sản phẩm Viki Sports được kiểm tra kỹ lưỡng trước khi giao, đảm bảo đúng mẫu và chất liệu như mô tả.
– Chính sách đổi trả: Hỗ trợ đổi/trả trong 15 ngày nếu sản phẩm lỗi hoặc sai sót từ nhà sản xuất.
– Hỗ trợ khách hàng: Đội ngũ Viki Sports luôn sẵn sàng tư vấn và hỗ trợ nhanh chóng, mang đến trải nghiệm mua sắm hài lòng nhất.
#quanaobongdatreem #bobongdachobetrai #dodabanhchobe #boquanaobongdachotreem #bododabanhtreem #aodabong #aodabonginten #bothethaobetrai #bobongda #boquanaodabong #bodabongtreem #quanaobongda #quanaodabanh #bododabongtreem #quanaobongrotreem #bobongro #VikiSport
Vendor Information
Store Name:
Tiki.vn
Vendor:
Tiki.vn
Address:
TP Hồ Chí Minh
No ratings found yet!
So sánh với sản phẩm tương tự Current Product: Đồ đá banh cho bé thun lạnh cao cấp, mặc hè mát mẻ, thấm hút mồ hôi, co giãn tốt, dễ vận động SV14 | ViKi Sport
Price |
Đã bán:
Giấy Vệ Sinh Lency Cao Cấp Không Lõi 3 Lớp 10 Cuộn- 8936020760643
Price |
Đã bán:
Thùng 24 Chai Trà Ô Long Tea Plus 450ml/Chai – 8934588873560
Price |
Đã bán:
Thùng 24 Chai Nước Tinh Khiết Dasani 510ml- 38935049510049
Price |
Đã bán:
Sách Hackers IELTS Basic- Speaking
Price |
Đã bán:
Set Bộ Nữ, Set Bộ Thun Cotton Hình Gấu Vàng
Price |
Đã bán:
Phô Mai Mozzarella Bottega Zelachi 200G- 8935128710093
Price |
Đã bán:
Thêm sản phẩm cùng cửa hàng
Đồ đá banh cho bé thun lạnh cao cấp, mặc hè mát mẻ, thấm hút mồ hôi, co giãn tốt, dễ vận động SV14 | ViKi Sport
Server IP: 192.168.1.241
0
Bùi Thị Phương –
Tôi đánh giá cao sự chuyên nghiệp trong khâu giao hàng và chăm sóc khách hàng. Mọi thắc mắc đều được giải đáp rõ ràng và nhanh chóng. Giao hàng siêu nhanh, nhân viên hỗ trợ chu đáo. Chất lượng dịch vụ tuyệt vời, chắc chắn sẽ quay lại mua lần sau.
Đỗ Văn Minh –
Tôi thực sự hài lòng với trải nghiệm mua sắm lần này. Thời gian giao hàng hợp lý, dịch vụ khách hàng phản hồi nhanh. Rất đáng tin cậy. Sản phẩm có chất lượng tốt, đúng như mong đợi. Giao hàng nhanh chóng, đóng gói cẩn thận. Nhân viên hỗ trợ nhiệt tình, rất hài lòng với dịch vụ.
Nguyễn Văn An –
Đây là lần thứ hai tôi đặt hàng tại đây và vẫn giữ được chất lượng tốt. Đóng gói kỹ, nhân viên hỗ trợ rất thân thiện và chuyên nghiệp. Tôi rất bất ngờ vì sự tận tình của đội ngũ hỗ trợ. Giao hàng đúng thời gian, thông tin rõ ràng và chính xác.
Hoàng Thị Sang –
Tôi rất bất ngờ vì sự tận tình của đội ngũ hỗ trợ. Giao hàng đúng thời gian, thông tin rõ ràng và chính xác. Sản phẩm có chất lượng tốt, đúng như mong đợi. Giao hàng nhanh chóng, đóng gói cẩn thận. Nhân viên hỗ trợ nhiệt tình, rất hài lòng với dịch vụ.