Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/bin/bash
# Launcher for the RabbitMQ -> Telegram worker.
#
# Usage: <this script> [SUFFIX]
#
# The worker's process name is "rabbitmq_bottel_recv" followed by the optional
# SUFFIX. If a process with that name is already running nothing is started;
# otherwise the PHP worker is launched in the foreground with the process name
# as its only argument.

# Script name without the ".sh" extension, used as a prefix for log lines.
CMD="$(basename "${BASH_SOURCE[0]}" .sh)"

# All expansions are quoted so that paths or suffixes containing whitespace or
# glob characters are passed through unchanged.
PROC="rabbitmq_bottel_recv${1:-}"

if pidof -s "$PROC" >/dev/null; then
    echo "$CMD $PROC already run"
else
    echo "$CMD $PROC not run"
    php rabbitmq_bottel_recv.php "$PROC"
fi
- <?php
// Long-running worker: keep going after a client disconnect and lift the
// execution-time and memory limits.
ignore_user_abort(true);
set_time_limit(0);
ini_set('memory_limit', -1);

// The first CLI argument is the worker's process name; without it we cannot run.
if (empty($argv[1])) {
    exit(1);
}

define('PROC_NAME', $argv[1]);
define('LOCK_FILE', sprintf('/var/run/%s.lock', basename(PROC_NAME, '.php')));

// Single-instance guard: bail out if another live process holds the lock.
if (!tryLock()) {
    exit("Already running.\n");
}

// Remove the lock when the script exits.
// NOTE(review): shutdown functions are not expected to run when the process is
// killed by a signal (e.g. Ctrl+C); in that case the link is left behind and
// tryLock() has to recognise it as stale on the next start.
register_shutdown_function('unlink', LOCK_FILE);

// Rename the process to PROC_NAME; the launcher script looks the worker up by
// that name with pidof.
$pid = getmypid();
if (cli_set_process_title(PROC_NAME) === false) {
    echo "ERROR set_process_title::{$pid}...\n";
    exit(1);
}

require_once __DIR__ . '/vendor/autoload.php';
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Message\AMQPMessage;
- use Psr\Http\Message\ResponseInterface;
- use Psr\Http\Message\ServerRequestInterface;
- use \Aws\S3\S3Client;
- use \Aws\Exception\AwsException;
- use \Aws\S3\Exception\S3Exception;
// Load the MadelineProto loader script, downloading it on first run.
// NOTE(review): this downloads and executes remote code without any integrity
// check; consider installing MadelineProto through Composer or shipping a
// verified copy instead.
$madelineLoader = __DIR__ . '/madeline.php';
if (!file_exists($madelineLoader)) {
    // Download under a temporary name and rename afterwards, so an interrupted
    // transfer never leaves a truncated loader that later runs would include.
    $partialLoader = $madelineLoader . '.part';
    $downloaded = copy('https://phar.madelineproto.xyz/madeline.php', $partialLoader)
        && rename($partialLoader, $madelineLoader);
    if (!$downloaded) {
        if (file_exists($partialLoader)) {
            unlink($partialLoader);
        }
        fwrite(STDERR, "ERROR cannot download madeline.php\n");
        exit(1);
    }
}
// require (not include): without the loader nothing below can work.
require_once $madelineLoader;

// Telegram application credentials for the MadelineProto session.
$settings['app_info'] = [
    'api_id' => '1111*',
    'api_hash' => '0v**',
];
$MadelineProto = new \danog\MadelineProto\API('session.madeline', $settings);
$MadelineProto->start();
$me = $MadelineProto->get_self();

#######
// Database connection; errors raise exceptions, rows are fetched as
// associative arrays.
$pdo = new PDO(
    "mysql:host=localhost;port=3307;dbname=bottelegram;charset=utf8",
    'bottelegram1',
    '%'
);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);

// Chat-bot client. $bot and $client_jid are handed to the consumer callback.
$token = 'bot-telegram:3c9e5145';
$client_jid = 'bot-telegram@localhost';
$bot = new CmmntBot\Bot($token);

###########
// RabbitMQ: declare the task queue (arguments after the name: passive=false,
// durable=true, exclusive=false, auto_delete=false).
$queue_name = '%:bottelegram';
$connection = new AMQPStreamConnection('%', 5672, 'u', 'p');
$channel = $connection->channel();
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";

// Prefetch count 1: the broker hands this consumer one unacknowledged message
// at a time.
$channel->basic_qos(null, 1, null);
/**
 * Consumer callback: forwards new posts of one Telegram channel to a chat room.
 *
 * The message body is JSON with the keys `id` (row id in the `log` table),
 * `telegram_channel` (channel name whose first character is stripped before
 * use) and `jid_group` (target room, passed to sendMessage()).
 *
 * Steps:
 *  1. read the last forwarded message id for `id` from `log`.`offset_id`;
 *  2. ask Telegram for the newest message id of the channel;
 *  3. walk the id range between the two in windows of 90 ids, forwarding every
 *     message that has a text field, oldest first;
 *  4. store the new offset (after every window, so that a crash half-way
 *     through a long backlog does not replay what was already forwarded);
 *  5. acknowledge the AMQP message.
 *
 * Media attachments are not forwarded: only the text plus a link to the post.
 */
$callback = function (AMQPMessage $msg) use ($bot, $client_jid, $pdo, $MadelineProto) {
    // Number of message ids covered by one getHistory request.
    $pageSize = 90;

    $acknowledge = function () use ($msg) {
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    echo ' [x] Received ';

    // Validate the task first. A payload that cannot be processed is dropped
    // (acknowledged): leaving it unacknowledged would make the broker redeliver
    // it and fail again, forever.
    $json = json_decode($msg->body, true);
    $to = is_array($json) && isset($json['jid_group']) ? $json['jid_group'] : null;
    $rawChannel = is_array($json) && isset($json['telegram_channel']) ? $json['telegram_channel'] : null;
    if (!is_string($to) || $to === '' || !is_string($rawChannel) || strlen($rawChannel) < 2) {
        echo 'malformed task, dropped' . PHP_EOL;
        $acknowledge();
        return;
    }
    $id = (int) ($json['id'] ?? 0);
    echo $id . ':' . $rawChannel . PHP_EOL;

    // The first character of the configured name is dropped
    // (presumably a leading '@' - TODO confirm against the producer).
    $telegram_channel = substr($rawChannel, 1);

    // Last message id that was already forwarded; 0 when there is no row yet.
    $select = $pdo->prepare('SELECT offset_id FROM `log` WHERE id = ? LIMIT 1');
    $select->execute([$id]);
    $lastSeenId = (int) $select->fetchColumn();
    echo PHP_EOL . '->' . $lastSeenId . PHP_EOL;

    $fetchHistory = function ($offsetId, $limit, $maxId, $minId) use ($MadelineProto, $telegram_channel) {
        return $MadelineProto->messages->getHistory([
            'peer' => $telegram_channel,
            'offset_id' => $offsetId,
            'offset_date' => 0,
            'add_offset' => 0,
            'limit' => $limit,
            'max_id' => $maxId,
            'min_id' => $minId,
            'hash' => 0,
        ]);
    };

    $saveOffset = function ($offsetId) use ($pdo, $id, $telegram_channel) {
        $upsert = $pdo->prepare(
            'INSERT INTO `log` (`id`, `telegram_channel`, `offset_id`) VALUES (?, ?, ?)'
            . ' ON DUPLICATE KEY UPDATE offset_id = VALUES(offset_id), tdate = NOW()'
        );
        $upsert->execute([$id, $telegram_channel, $offsetId]);
    };

    // Determine the newest message id of the channel (a one-message request).
    $newest = $fetchHistory(0, 1, 0, 0);
    $newestMessages = isset($newest['messages']) && is_array($newest['messages']) ? $newest['messages'] : [];
    $firstMessage = $newestMessages ? reset($newestMessages) : null;
    if (!isset($firstMessage['id'])) {
        // Empty channel: nothing to forward and no offset to store.
        echo 'end_offset: no messages' . PHP_EOL;
        $acknowledge();
        return;
    }
    $latestId = (int) $firstMessage['id'];
    echo 'end_offset:' . $latestId . PHP_EOL;

    if ($lastSeenId !== $latestId) {
        $windows = planHistoryWindows($lastSeenId, $latestId, $pageSize);
        echo 'windows:' . count($windows) . PHP_EOL;

        foreach ($windows as $window) {
            $history = $fetchHistory(
                $window['offset_id'],
                $window['limit'],
                $window['max_id'],
                $window['min_id']
            );
            echo '::offset_id:' . $window['offset_id'] . ':min_id:' . $window['min_id'] . ':' . $latestId . ':' . PHP_EOL;

            $batch = isset($history['messages']) && is_array($history['messages']) ? $history['messages'] : [];
            // Forward in the reverse of the order the API returned them in
            // (Telegram returns history newest first), i.e. oldest first.
            foreach (array_reverse($batch) as $message) {
                // Entries without a text field are skipped.
                if (!isset($message['message'])) {
                    continue;
                }
                echo 'send:' . $message['id'] . PHP_EOL;
                $payload = [
                    'body' => $message['message'] . ' https://t.me/' . $telegram_channel . '/' . $message['id'],
                ];
                // Delivery is best effort: a failed send is logged and skipped,
                // it does not stop the run.
                try {
                    $sent = sendMessage($to, $payload);
                    if ($sent === false || $sent === null) {
                        echo 'send failed:' . $message['id'] . PHP_EOL;
                    }
                } catch (Exception $sendError) {
                    echo 'send failed:' . $message['id'] . ':' . $sendError->getMessage() . PHP_EOL;
                }
            }

            // Checkpoint: everything up to the top of this window is done.
            $saveOffset($window['max_id'] - 1);
        }

        if (empty($windows)) {
            // The stored offset is ahead of the channel's newest id. Resync the
            // offset without replaying anything.
            $saveOffset($latestId);
        }
    }

    echo 'end' . $latestId . PHP_EOL;
    echo "\n";
    $acknowledge();
};

/**
 * Splits the id range (lastSeenId, latestId] into consecutive getHistory windows.
 *
 * Each window describes one request returning the messages with
 * min_id < id < max_id; offset_id equals max_id, and limit is one more than
 * the number of ids in the window.
 *
 * Example: planHistoryWindows(0, 91, 90) yields two windows, ids 1..90 and
 * id 91. A single trailing message and a backlog shorter than one page are
 * both covered, and no window ever reaches below $lastSeenId.
 *
 * @param int $lastSeenId highest message id already processed (0 for none)
 * @param int $latestId   newest message id in the channel
 * @param int $pageSize   maximum number of ids per window; must be positive
 *
 * @return array list of ['offset_id' => int, 'limit' => int, 'max_id' => int, 'min_id' => int];
 *               empty when $latestId <= $lastSeenId
 *
 * @throws InvalidArgumentException when $pageSize is not positive
 */
function planHistoryWindows(int $lastSeenId, int $latestId, int $pageSize = 90): array
{
    if ($pageSize < 1) {
        throw new InvalidArgumentException('pageSize must be positive');
    }

    $windows = [];
    for ($low = $lastSeenId; $low < $latestId; $low += $pageSize) {
        $high = min($low + $pageSize, $latestId);
        $windows[] = [
            'offset_id' => $high + 1,
            'limit' => $high - $low + 1,
            'max_id' => $high + 1,
            'min_id' => $low,
        ];
    }

    return $windows;
}
// Register $callback as the consumer of the task queue.
$channel->basic_consume(
    $queue_name,
    '',     // consumer tag: empty
    false,  // no_local
    false,  // no_ack: off, messages are acknowledged explicitly by the callback
    false,  // exclusive
    false,  // nowait
    $callback
);

// Process deliveries for as long as the channel has an active consumer.
while ($channel->is_consuming()) {
    $channel->wait();
}

// Orderly teardown once consuming has stopped.
$channel->close();
$connection->close();
- #######################################
/**
 * Tries to take the single-instance lock for this process.
 *
 * The lock is a symbolic link at LOCK_FILE pointing to /proc/<pid> of its
 * owner. Creating the link either succeeds or fails because the name already
 * exists. An existing link whose target directory is gone belongs to a process
 * that no longer runs; such a stale link is removed and the lock is tried once
 * more.
 *
 * @return bool true when the lock was acquired by this process; false when a
 *              live process holds it, or when the lock file cannot be created
 *              or a stale one cannot be removed
 */
function tryLock(): bool
{
    $target = '/proc/' . getmypid();

    // At most two attempts: the initial one and one retry after removing a
    // stale link. Bounding the retries avoids endless recursion when the stale
    // link cannot be deleted.
    for ($attempt = 0; $attempt < 2; $attempt++) {
        // '@' silences the warning symlink() raises when LOCK_FILE exists.
        if (@symlink($target, LOCK_FILE) !== false) {
            return true;
        }

        clearstatcache(true, LOCK_FILE);
        // is_dir() follows the link: no directory means the owner is gone.
        $isStale = is_link(LOCK_FILE) && !is_dir(LOCK_FILE);
        if (!$isStale || !@unlink(LOCK_FILE)) {
            return false;
        }
    }

    return false;
}
/**
 * Uploads a local file to the S3-compatible store and returns its URL.
 *
 * The object key is derived from the file's MD5: the first four hex digits
 * become four directory levels, followed by the rest of the hash and the file
 * name. If an object with that key already exists it is not uploaded again.
 *
 * @param mixed  $sender       not used by this function; kept for call-site compatibility
 * @param string $FileName     name to store the object under (last key segment)
 * @param string $uploadedFile path of the local file to upload
 *
 * @return array ['url' => string], plus 'exist' => true when the object was already stored
 *
 * @throws \Exception when the file cannot be read or the upload yields no result object
 */
function uploadS3($sender, $FileName, $uploadedFile)
{
    // One bucket for both the existence check and the upload. The original
    // checked "app" but uploaded to "bot", so the check could never see a
    // previous upload.
    // NOTE(review): "bot" (the upload target) is assumed to be the intended
    // bucket - confirm.
    $bucket = 'bot';

    $s3 = new S3Client([
        'region' => "eu-west-1",
        'version' => 'latest',
        'endpoint' => "http://**:8000",
        'credentials' => [
            'key' => "*",
            'secret' => "*",
        ],
        'use_path_style_endpoint' => true,
    ]);

    $md5 = md5_file($uploadedFile);
    if ($md5 === false) {
        throw new \RuntimeException('Cannot read file for S3 upload: ' . $uploadedFile);
    }
    $Key = "{$md5[0]}/{$md5[1]}/{$md5[2]}/{$md5[3]}/" . substr($md5, 4) . '/' . $FileName;

    if ($s3->doesObjectExist($bucket, $Key)) {
        return ['exist' => true, 'url' => $s3->getObjectUrl($bucket, $Key)];
    }

    $result = $s3->putObject([
        'Bucket' => $bucket,
        'Key' => $Key,
        'SourceFile' => $uploadedFile,
        'ACL' => 'public-read',
    ]);
    if (!($result instanceof \Aws\Result)) {
        throw new \Exception('S3 not upload', 504);
    }

    return ['url' => $result->get('ObjectURL')];
}
/**
 * Posts a message to a chat room through the HTTP API.
 *
 * The room id is the part of $jid before the first '@'. $param is sent as the
 * JSON request body.
 *
 * @param string $jid   room address, "<room id>@<domain>"
 * @param array  $param message fields, e.g. ['body' => 'text']
 *
 * @return mixed the `id` field of the API response, or false when the payload
 *               cannot be encoded, the request fails, or the response carries no id
 */
function sendMessage($jid, $param)
{
    try {
        $roomId = explode('@', (string) $jid)[0];

        $payload = json_encode($param);
        if ($payload === false) {
            // E.g. invalid UTF-8 in the text: nothing sensible can be sent.
            return false;
        }

        // NOTE(review): credentials are hard-coded and TLS peer verification is
        // switched off; both are kept as found, but should be moved to
        // configuration / a trusted certificate.
        $context = stream_context_create([
            'http' => [
                'method' => 'POST',
                'header' => "Content-Type: application/json\r\n"
                    . 'Authorization: Basic ' . base64_encode("user@localhost:pass"),
                'content' => $payload,
            ],
            'ssl' => [
                'verify_peer' => false,
                'verify_peer_name' => false,
            ],
        ]);

        // The room id is a URL path segment and is encoded as such.
        $url = 'https://*:8089/api/rooms/' . rawurlencode($roomId) . '/messages';
        $response = file_get_contents($url, false, $context);
        if ($response === false) {
            return false;
        }

        $decoded = json_decode($response, true);
        if (!is_array($decoded) || !isset($decoded['id'])) {
            return false;
        }

        return $decoded['id'];
    } catch (Exception $e) {
        // Reached when an installed error handler turns warnings (e.g. a failed
        // HTTP request) into exceptions.
        return false;
    }
}
Add Comment
Please, Sign In to add comment