Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- from sqlalchemy.future import select
- from starlette.endpoints import HTTPEndpoint
- from starlette.requests import Request
- from starlette.responses import JSONResponse
- from starlette.status import HTTP_200_OK
- from starlette.status import HTTP_201_CREATED
- from starlette.status import HTTP_400_BAD_REQUEST
- from starlette.status import HTTP_404_NOT_FOUND
- from starlette.status import HTTP_422_UNPROCESSABLE_ENTITY
- from mixins import RequestIdentificationMixin
- from services import ExceptionResponse
- from services import ParserController
- from services import Serializer
- from services import TelegramController
- from services import ValidateSchema
- from settings.config import caches
- from src.models import Applications
- from src.schemas import ApplicationSchema
- from src.schemas import ApplicationVerificationSchema
- from src.schemas import ReverifyApplication
- class Application(HTTPEndpoint):
- @RequestIdentificationMixin.protected_request
- async def post(self, request: Request) -> JSONResponse:
- """
- {
- "responses": {
- 201: {
- "description": "A note was create",
- },
- 400: {
- "description": "Bad request",
- }
- },
- "parameters": [
- {
- "name": "app_id",
- "in": "query",
- "description": "Application id",
- "required": true,
- "schema": {
- "type": "integer",
- "items": {
- "type": "integer",
- "format": "int64"
- }
- },
- "style": "simple"
- },
- {
- "name": "app_hash",
- "in": "query",
- "description": "Hash string",
- "required": true,
- "schema": {
- "type": "string",
- "items": {
- "type": "string",
- "format": "str"
- }
- },
- "style": "simple"
- },
- {
- "name": "phone",
- "in": "query",
- "description": "id applications or list if parameters was null",
- "required": true,
- "schema": {
- "type": "string",
- "items": {
- "type": "string",
- "format": "str"
- }
- },
- "style": "simple"
- },
- ],
- }
- """
- query_params = await request.json()
- if query_params:
- applications_data = ValidateSchema(
- schema=ApplicationSchema,
- query_params=query_params,
- )
- if applications_data.is_valid():
- applications_data = applications_data.get_data()
- (
- is_valid,
- errors,
- application,
- ) = await Applications.get_object_from_field(
- select_value=applications_data.app_id,
- session=self.session,
- field="app_id",
- )
- if is_valid:
- return JSONResponse(
- content={
- "message": "Applications is already exists!",
- },
- status_code=HTTP_400_BAD_REQUEST,
- )
- application = Applications(**dict(applications_data))
- telegram_client = TelegramController(
- api_id=application.app_id,
- api_hash=application.app_hash,
- phone=application.phone,
- )
- await telegram_client.client.connect()
- connection = await caches.set_connection(
- application.app_id,
- telegram_client,
- )
- if connection:
- send_code = await telegram_client.send_code()
- if send_code:
- try:
- self.session.add(application)
- application.phone_code_hash = send_code
- await caches.set_send_code(application.app_id, True)
- application.user_id = self.user.id
- await self.session.commit()
- await self.session.close()
- return JSONResponse(
- content=json.dumps(
- {
- "application_id": application.id,
- "status": HTTP_201_CREATED,
- }
- ),
- status_code=HTTP_201_CREATED,
- )
- except Exception as exception:
- print(exception)
- await self.session.close()
- return JSONResponse(
- content="Code is already send!",
- status_code=HTTP_400_BAD_REQUEST,
- )
- else:
- await self.session.close()
- return JSONResponse(
- content=applications_data.error,
- status_code=HTTP_400_BAD_REQUEST
- )
- await self.session.close()
- return JSONResponse(HTTP_400_BAD_REQUEST)
- @RequestIdentificationMixin.protected_request
- async def get(self, request: Request) -> JSONResponse:
- """
- {
- "responses": {
- 200: {
- "description": "Get list applications",
- },
- 400: {
- "description": "Bad request",
- }
- },
- "parameters": [],
- }
- """
- query = await self.session.execute(select(Applications).options())
- json_data = Serializer(
- data=query.scalars().all(),
- )
- await self.session.close()
- return JSONResponse(content=json_data.data)
- class ApplicationRestoreAccount(HTTPEndpoint):
- @RequestIdentificationMixin.protected_request
- async def post(self, request: Request) -> JSONResponse:
- """
- {
- "responses": {
- 200: {
- "description": "A note was reverify",
- },
- 400: {
- "description": "A note not found",
- },
- 422: {
- "description": "Code is already send!",
- },
- },
- "parameters": [
- {
- "name": "app_id",
- "in": "body",
- "description": "app_id application field was passed",
- "required": true,
- "schema": {
- "type": "integer",
- "items": {
- "type": "integer",
- "format": "int64"
- }
- },
- "style": "simple"
- },
- ],
- }
- """
- query_params = await request.json()
- if query_params:
- applications_data_verify = ValidateSchema(
- schema=ReverifyApplication,
- query_params=query_params,
- )
- if applications_data_verify.is_valid():
- applications_data = applications_data_verify.get_data()
- (
- is_valid,
- errors,
- application,
- ) = await Applications.get_object_from_field(
- select_value=applications_data.app_id,
- session=self.session,
- field="app_id",
- )
- if application:
- if (application.state == Applications.ApplicationStateType.READY.name
- and application.string_authentication
- ):
- is_valid, errors, application_ = await Applications.dynamic_update(
- column=application,
- dynamic_column={
- "state": Applications.ApplicationStateType.WAIT_AUTHORIZATION_CODE.name,
- },
- session=self.session,
- )
- telegram_client = TelegramController(
- api_id=application_.app_id,
- api_hash=application_.app_hash,
- phone=application_.phone,
- )
- await telegram_client.client.connect()
- await telegram_client.client.log_out() # Logout session
- await caches.remove_connections(key=application.app_id)
- new_telegram_client = TelegramController(
- api_id=application.app_id,
- api_hash=application.app_hash,
- phone=application.phone,
- )
- await new_telegram_client.client.connect()
- connection = await caches.set_connection(
- application.app_id,
- new_telegram_client,
- )
- if connection:
- send_code = await new_telegram_client.send_code()
- if send_code:
- try:
- self.session.add(application)
- application.phone_code_hash = send_code
- application.user_id = self.user.id
- await caches.set_send_code(application.app_id, True)
- await caches.display_cashes()
- await self.session.commit()
- await self.session.close()
- return JSONResponse(
- content=json.dumps(
- {
- "application_id": application.id,
- "status": HTTP_200_OK,
- }
- ),
- status_code=HTTP_200_OK,
- )
- except Exception as exception:
- print(exception)
- await self.session.close()
- error = ExceptionResponse(
- message="Account is blocked !",
- error=ExceptionResponse.ExceptionResponseType.VALIDATION_ERROR.name,
- ).to_json()
- return JSONResponse(
- content=error,
- status_code=HTTP_422_UNPROCESSABLE_ENTITY,
- )
- await self.session.close()
- error = ExceptionResponse(
- message="Code is already send!",
- error=ExceptionResponse.ExceptionResponseType.VALIDATION_ERROR.name,
- ).to_json()
- return JSONResponse(
- content=error,
- status_code=HTTP_422_UNPROCESSABLE_ENTITY,
- )
- else:
- await self.session.close()
- return JSONResponse(
- content=applications_data.error,
- status_code=HTTP_400_BAD_REQUEST
- )
- await self.session.close()
- return JSONResponse(
- content="Bad request!",
- status_code=HTTP_400_BAD_REQUEST,
- )
- class ApplicationDelete(HTTPEndpoint):
- @RequestIdentificationMixin.protected_request
- async def delete(self, request: Request) -> JSONResponse:
- """
- {
- "responses": {
- 200: {
- "description": "A note was delete",
- },
- 404: {
- "description": "A note not found",
- }
- },
- "parameters": [
- {
- "name": "id",
- "in": "path",
- "description": "id application for delete",
- "required": true,
- "schema": {
- "type": "integer",
- "items": {
- "type": "integer",
- "format": "int64"
- }
- },
- "style": "simple"
- },
- ],
- }
- """
- application_id = request.path_params.get("id")
- result = await Applications.delete(
- column_id=application_id,
- session=self.session,
- )
- if result:
- await self.session.close()
- return JSONResponse(
- content="OK",
- status_code=HTTP_200_OK
- )
- await self.session.close()
- return JSONResponse(
- content=ExceptionResponse(
- message="Record missing",
- error=ExceptionResponse.ExceptionResponseType.NOT_FOUND.name,
- ).to_json(),
- status_code=HTTP_400_BAD_REQUEST,
- )
- class ApplicationVerification(HTTPEndpoint):
- @RequestIdentificationMixin.protected_request
- async def post(self, request) -> JSONResponse:
- """
- {
- "responses": {
- 200: {
- "description": "A note was verify",
- },
- 404: {
- "description": "A note not found",
- }
- },
- "parameters": [
- {
- "name": "code",
- "in": "path",
- "description": "Code from message",
- "required": true,
- "schema": {
- "type": "integer",
- "items": {
- "type": "integer",
- "format": "int64"
- }
- },
- "style": "simple"
- },
- {
- "name": "app_id",
- "in": "path",
- "description": "id application",
- "required": true,
- "schema": {
- "type": "integer",
- "items": {
- "type": "integer",
- "format": "int64"
- }
- },
- "style": "simple"
- },
- ],
- }
- """
- query_params = await request.json()
- if query_params:
- applications_data_verify = ValidateSchema(
- schema=ApplicationVerificationSchema,
- query_params=query_params,
- )
- if applications_data_verify.is_valid():
- applications_data = applications_data_verify.get_data()
- (
- is_valid,
- errors,
- application,
- ) = await Applications.get_object_from_field(
- select_value=applications_data.app_id,
- session=self.session,
- field="app_id",
- )
- if (
- application.state == Applications.ApplicationStateType.READY.name
- and application.string_authentication
- ):
- await self.session.close()
- return JSONResponse(
- content={
- "message": "Application is verify",
- },
- status_code=HTTP_400_BAD_REQUEST,
- )
- if not is_valid:
- await self.session.close()
- return JSONResponse(
- content=errors,
- status_code=HTTP_404_NOT_FOUND,
- )
- telegram_client = await caches.get_connection(
- key=application.app_id,
- )
- if telegram_client.get("code_send"):
- telegram_client = telegram_client.get("_object")
- await telegram_client.client.sign_in(
- application.phone,
- )
- telegram_client_is_authorized = await telegram_client.verify_session(
- code=applications_data.code,
- phone_code_hash=application.phone_code_hash,
- )
- if telegram_client_is_authorized:
- session_string = await telegram_client.create_string_session()
- is_valid, errors, application_ = await Applications.dynamic_update(
- column=application,
- dynamic_column={
- "state": Applications.ApplicationStateType.READY.name,
- "string_authentication": session_string,
- },
- session=self.session,
- )
- await self.session.close()
- if is_valid:
- await caches.remove_connections(key=application.app_id)
- await ParserController.sync_sources_applications()
- await self.session.close()
- return JSONResponse(
- content=json.dumps(
- {
- "application_id": application_.id,
- "status": HTTP_200_OK,
- }
- ),
- status_code=HTTP_200_OK,
- )
- return JSONResponse(
- content=errors,
- status_code=HTTP_404_NOT_FOUND,
- )
- else:
- await self.session.close()
- return JSONResponse(
- content="Application is not send code",
- status_code=HTTP_404_NOT_FOUND,
- )
- await self.session.close()
- return JSONResponse(
- content="Not found",
- status_code=HTTP_404_NOT_FOUND,
- )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement