Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- import asyncio
- import os
- import argparse
- import logging
- import xml.etree.ElementTree as ET
- from bs4 import BeautifulSoup
- from aiofiles import open as aio_open
- import re
- import json
- logging.basicConfig(level=logging.INFO)
- MATHJAX_SCRIPT = """
- <script type="text/javascript" async
- src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.7/MathJax.js?config=TeX-MML-AM_CHTML">
- </script>
- """
async def read_file(file_path):
    """Asynchronously read a UTF-8 text file.

    Returns the file's full contents as a string, or ``None`` when the
    read fails for any reason (the error is logged, not raised).
    """
    try:
        async with aio_open(file_path, 'r', encoding='utf-8') as handle:
            contents = await handle.read()
    except Exception as exc:
        logging.error(f"Error reading {file_path}: {exc}")
        return None
    return contents
async def write_file(file_path, content):
    """Asynchronously write *content* to *file_path* as UTF-8 text.

    Logs a confirmation on success; on failure the error is logged and
    swallowed (nothing is raised and nothing is returned).
    """
    try:
        async with aio_open(file_path, 'w', encoding='utf-8') as handle:
            await handle.write(content)
    except Exception as exc:
        logging.error(f"Error writing to {file_path}: {exc}")
    else:
        logging.info(f"Content saved to {file_path}")
def clean_html(html_content):
    """Strip scripts, anchors, and childless tags from an HTML string.

    Every ``<script>`` and ``<a>`` element is removed entirely, then any
    remaining tag with no children at all is dropped.  Returns the
    cleaned document serialized back to a string.

    NOTE(review): void elements such as ``<br>`` and ``<img>`` also have
    no ``.contents`` and are therefore removed too — presumably intended
    for this pipeline, but worth confirming.
    """
    soup = BeautifulSoup(html_content, 'html.parser')

    for unwanted in soup.find_all(['script', 'a']):
        unwanted.decompose()

    # Materialize the list first so removal does not disturb iteration.
    childless = [tag for tag in soup.find_all(True) if not tag.contents]
    for tag in childless:
        tag.extract()

    return str(soup)
async def create_json_from_html(html_path, json_path):
    """Extract questions and options from an HTML table into a JSON file.

    Scans every ``<td>`` cell of every ``<tr>`` row for text shaped like
    ``"1. question text"`` or ``"a) option text"``, groups each question
    with the options that follow it, and writes the result to
    *json_path* as a JSON list of ``{"Question": ..., "Options": {...}}``
    objects.  A question that never accumulates options is dropped.
    """
    html_content = await read_file(html_path)
    if not html_content:
        print("No HTML content found")
        return

    soup = BeautifulSoup(html_content, 'html.parser')
    rows = soup.find_all('tr')
    if not rows:
        print("No <tr> tags found")
        return

    # Hoisted out of the loop: compiled once, matched per cell.
    question_pattern = re.compile(r"(\d+\.)\s*(.*)")
    option_pattern = re.compile(r"(a\)|b\)|c\)|d\))\s*(.*)")

    questions_json = []
    current_question = ''
    current_options = {}

    def flush():
        # Emit the question being accumulated, if it is complete.
        if current_question and current_options:
            questions_json.append({
                "Question": current_question,
                "Options": current_options,
            })

    for row in rows:
        for cell in row.find_all('td'):
            text = cell.get_text().strip()
            q_match = question_pattern.match(text)
            o_match = option_pattern.match(text)
            if q_match:
                flush()  # a new question closes out the previous one
                current_question = q_match.group(2).strip()
                current_options = {}
            elif o_match:
                current_options[o_match.group(1)] = o_match.group(2).strip()

    flush()  # the last question has no successor to trigger the append
    await write_file(json_path, json.dumps(questions_json, indent=4))
- # You'll have to define or import read_file and write_file functions yourself.
- # # Working version, but questions whose options are not found get dropped:
- # async def create_json_from_html(html_path, json_path):
- # html_content = await read_file(html_path)
- # if not html_content:
- # print("No HTML content found")
- # return
- # soup = BeautifulSoup(html_content, 'html.parser')
- # tr_tags = soup.find_all('tr')
- # if not tr_tags:
- # print("No <tr> tags found")
- # return
- # questions_json = []
- # question_text = ''
- # options = {}
- # for tr in tr_tags:
- # text_content = tr.get_text().strip()
- # # Identify questions
- # question_match = re.match(r"(\d+\.)\s*(.*)", text_content)
- # if question_match:
- # if question_text and options:
- # questions_json.append({
- # "Question": question_text,
- # "Options": options
- # })
- # question_text = question_match.group(2).strip()
- # options = {}
- # continue # Move on to next tr as the question has been processed
- # # Identify options
- # td_tags = tr.find_all('td')
- # for td in td_tags:
- # td_content = td.get_text().strip()
- # option_match = re.match(r"(a\)|b\)|c\)|d\))\s*(.*)", td_content)
- # if option_match:
- # option_key = option_match.group(1)
- # option_value = option_match.group(2).strip()
- # options[option_key] = option_value
- # # Handle the last question
- # if question_text and options:
- # questions_json.append({
- # "Question": question_text,
- # "Options": options
- # })
- # await write_file(json_path, json.dumps(questions_json, indent=4))
- # Working unorganised
- # async def create_json_from_html(html_path, json_path):
- # html_content = await read_file(html_path)
- # if not html_content:
- # return
- # soup = BeautifulSoup(html_content, 'html.parser')
- # tr_tags = soup.find_all('tr')
- # questions_json = []
- # question_text = ''
- # options = {}
- # for tr in tr_tags:
- # text_content = tr.get_text().strip()
- # question_match = re.match(r"(\d+\.)\s*(.*)", text_content)
- # option_match = re.match(r"(a\)|b\)|c\)|d\))\s*(.*)", text_content)
- # if question_match:
- # if question_text and options:
- # questions_json.append({
- # "Question": question_text,
- # "Options": options
- # })
- # question_text = question_match.group(2).strip()
- # options = {}
- # elif option_match:
- # option_key = option_match.group(1)
- # option_value = option_match.group(2).strip()
- # options[option_key] = option_value
- # if question_text and options:
- # questions_json.append({
- # "Question": question_text,
- # "Options": options
- # })
- # await write_file(json_path, json.dumps(questions_json, indent=4))
async def main(args):
    """Drive the DOCX -> HTML -> cleaned HTML -> JSON pipeline.

    Steps:
      1. Convert ``args.input_docx`` to ``args.output_html`` with pandoc.
      2. Clean the HTML and prepend the MathJax loader, saving the result
         to ``args.output_modified_html``.
      3. Parse questions/options out of the modified HTML into
         ``args.output_json``.

    Any failure logs an error and aborts the remaining steps.
    """
    try:
        # Pass an argv list (no shell) so paths containing spaces or
        # shell metacharacters cannot break or inject into the command.
        process = await asyncio.create_subprocess_exec(
            'pandoc', '-s', '--toc', '--section-divs',
            args.input_docx, '-o', args.output_html,
        )
        await process.communicate()
        # The original logged success unconditionally; pandoc signals
        # failure through its exit status, so check it before continuing.
        if process.returncode != 0:
            logging.error(f"Error executing Pandoc commands: pandoc exited with status {process.returncode}")
            return
        logging.info("HTML Pandoc command executed successfully.")
    except Exception as e:
        logging.error(f"Error executing Pandoc commands: {e}")
        return

    html_content = await read_file(args.output_html)
    if html_content is None:
        return

    cleaned_html = clean_html(html_content)
    cleaned_html_with_mathjax = MATHJAX_SCRIPT + cleaned_html
    await write_file(args.output_modified_html, cleaned_html_with_mathjax)
    await create_json_from_html(args.output_modified_html, args.output_json)
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Convert DOCX to HTML to JSON.')
- parser.add_argument('--input_docx', type=str, default='sdss.docx', help='Input DOCX file path')
- parser.add_argument('--output_html', type=str, default='soutput.html', help='Output HTML file path')
- parser.add_argument('--output_json', type=str, default='soutput.json', help='Output JSON file path')
- parser.add_argument('--output_modified_html', type=str, default='soutput_modified.html', help='Modified output HTML file path')
- args = parser.parse_args()
- asyncio.run(main(args))
- # import json
- # import re
- # import asyncio
- # import aiohttp
- # async def fetch_pdf_and_log_json():
- # url = "http://20.244.0.255:8080/api/transformer"
- # headers = {}
- # try:
- # async with aiohttp.ClientSession() as session:
- # form_data = aiohttp.FormData()
- # form_data.add_field('file', open('sdss.pdf', 'rb'), filename='sdss.pdf', content_type='application/pdf')
- # async with session.post(url, headers=headers, data=form_data) as response:
- # if response.status == 200:
- # json_response = await response.json()
- # with open('api_response_log.json', 'w') as json_file:
- # json.dump(json_response, json_file, indent=4)
- # print(f"Successfully logged the JSON response to 'api_response_log.json'")
- # return json_response # Assuming 'text' is a key in the returned JSON that holds the text data.
- # else:
- # print(f"Failed to make the API request. Status code: {response.status}")
- # return None
- # except Exception as e:
- # print(f"An error occurred: {e}")
- # return None
- # def extract_questions(text):
- # questions_json = []
- # pattern = r"\d+\.\s*([\w\s\W]+?)(?:a\)|b\)|c\)|d\))([\w\s\W]+?)(?:\d+\.\s*|$)"
- # matches = re.findall(pattern, text, re.DOTALL)
- # print(f"Matches: {matches}") # Debug line
- # for i, (question, options) in enumerate(matches):
- # question_dict = {"Question": question.strip(), "Options": {}}
- # for idx, option in enumerate(re.split(r"a\)|b\)|c\)|d\)", options.strip())[1:]):
- # question_dict["Options"][chr(ord('a') + idx)] = option.strip()
- # questions_json.append(question_dict)
- # return questions_json
- # async def main():
- # fetched_text = await fetch_pdf_and_log_json()
- # if fetched_text:
- # with open('debug_text.txt', 'w') as debug_file:
- # debug_file.write(json.dumps(fetched_text)) # Debug line
- # for text in fetched_text.items():
- # print(text[1])
- # extracted_questions = extract_questions(text[1])
- # with open('structured_questions.json', 'w') as f:
- # json.dump(extracted_questions, f, indent=4)
- # print(f"Successfully saved the structured questions to 'structured_questions.json'")
- # if __name__ == "__main__":
- # asyncio.run(main())
Add a comment
Please sign in to add a comment.