aziros/src/app/Console/Commands/ProcessMailQueue.php

197 lines
6.3 KiB
PHP

<?php
namespace App\Console\Commands;
use App\Models\MailQueue;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Mail;
class ProcessMailQueue extends Command
{
protected $signature = 'app:process-mail-queue';
protected $description = 'Process mail queue';
public function handle()
{
while (true) {
$jobs = MailQueue::where(function ($q) {
$q->where('status', 'pending')
->orWhere(function ($q) {
$q->where('status', 'processing')
->where('updated_at', '<', now()->subMinutes(2));
});
})
->where(function ($q) {
$q->whereNull('available_at')
->orWhere('available_at', '<=', now());
})
->orderBy('created_at')
->limit(10)
->get();
foreach ($jobs as $mail) {
// 🔥 ATOMIC LOCK
$updated = MailQueue::where('id', $mail->id)
->where(function ($q) {
$q->where('status', 'pending')
->orWhere(function ($q) {
$q->where('status', 'processing')
->where('updated_at', '<', now()->subMinutes(2));
});
})
->limit(1)
->update([
'status' => 'processing',
'updated_at' => now(),
]);
if (!$updated) {
continue;
}
try {
app()->setLocale($mail->locale);
$html = view('emails.' . $mail->template, $mail->meta)->render();
Mail::html($html, function ($message) use ($mail) {
$message->to($mail->to)
->subject($mail->subject ?? 'Mail');
});
$mail->update([
'sent_at' => now(),
'status' => 'sent',
'error' => null
]);
usleep(500000);
} catch (\Throwable $e) {
$tries = $mail->tries + 1;
$delay = 30 * $tries;
$status = $tries >= $mail->max_tries ? 'failed' : 'pending';
$mail->update([
'tries' => $tries,
'status' => $status,
'error' => $e->getMessage(),
'available_at' => $status === 'pending'
? now()->addSeconds($delay)
: null,
]);
}
}
// 🔥 dynamisches sleep
if ($jobs->isEmpty()) {
usleep(1000000); // idle
} else {
usleep(200000); // busy
}
}
}
}
//
//namespace App\Console\Commands;
//
//use App\Models\MailQueue;
//use Illuminate\Console\Attributes\Description;
//use Illuminate\Console\Attributes\Signature;
//use Illuminate\Console\Command;
//use Illuminate\Support\Facades\Mail;
//
//#[Signature('app:process-mail-queue')]
//#[Description('Command description')]
//class ProcessMailQueue extends Command
//{
// /**
// * Execute the console command.
// */
// public function handle()
// {
// while (true) {
//
// $jobs = MailQueue::where(function ($q) {
// $q->where('status', 'pending')
// ->orWhere(function ($q) {
// $q->where('status', 'processing')
// ->where('updated_at', '<', now()->subMinutes(2));
// });
// })
// ->limit(10)
// ->orderBy('created_at')
// ->get();
//
// $jobs->each(function ($mail) {
//
// // 🔥 LOCK setzen
// $updated = MailQueue::where('id', $mail->id)
// ->where(function ($q) {
// $q->where('status', 'pending')
// ->orWhere(function ($q) {
// $q->where('status', 'processing')
// ->where('updated_at', '<', now()->subMinutes(2));
// });
// })
// ->update([
// 'status' => 'processing',
// 'updated_at' => now(),
// ]);
//
// // wenn schon verarbeitet → skip
// if (!$updated) return;
//
// $mail->refresh();
//
// try {
//
// app()->setLocale($mail->locale);
//
// $html = view('emails.' . $mail->template, $mail->meta)->render();
//
// Mail::html($html, function ($message) use ($mail) {
// $message->to($mail->to)
// ->subject($mail->subject ?? 'Mail');
// });
//
// $mail->update([
// 'sent_at' => now(),
// 'status' => 'sent',
// 'error' => null
// ]);
//
// } catch (\Throwable $e) {
//
// $tries = $mail->tries + 1;
// $delay = 30 * $tries;
//
// $status = $tries >= $mail->max_tries ? 'failed' : 'pending';
//
// $mail->update([
// 'tries' => $tries,
// 'status' => $status,
// 'error' => $e->getMessage(),
// 'available_at' => $status === 'pending'
// ? now()->addSeconds($delay)
// : null,
// ]);
// }
//
// });
//
// if ($jobs->isEmpty()) {
// usleep(1000000); // 1s
// } else {
// usleep(200000); // 0.2s
// }
// }
// }
//}