Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import graphviz # Required for 'graphviz' format type
- import os
- from io import StringIO # To read string data into DataFrame for example
- # ==============================================================
- # PASTE THE FULL CODE FOR THE FOLLOWING FUNCTIONS HERE:
- # - _generate_graphviz_code
- # - _generate_mermaid_code
- # - generate_hierarchy_visualization
- # - expand_nested_groups (if you need the expanded table)
- # - add_super_masters (if you use super masters)
- # ==============================================================
- # --- Start of Function Definitions (Paste from previous answers) ---
def expand_nested_groups(df, master_col, soon_col):
    """Expand nested group membership into one row per membership path.

    Every (master, member) pair is a starting path.  Whenever the last
    element of a path is itself a master, the path is extended with each of
    that master's members, level by level, until the last element is not a
    master (or the iteration cap is hit, which guards against cycles).

    Args:
        df: DataFrame holding the relationships.
        master_col: Name of the column containing the group (master).
        soon_col: Name of the column containing the member of that group.

    Returns:
        ``None`` if either column is missing from ``df``.  Otherwise a
        de-duplicated DataFrame whose first column is ``Initial_Master``,
        whose intermediate columns are ``Level_1`` ... ``Level_n`` and whose
        last column (the deepest level reached by any path) is named
        ``Ultimate_Member``.  Paths that ended at a shallower level hold NaN
        in the deeper columns.  An empty DataFrame is returned when there
        are no usable rows.
    """
    if master_col not in df.columns or soon_col not in df.columns:
        return None

    # Work on a clean, string-typed copy of just the two relevant columns.
    pairs = df[[master_col, soon_col]].copy().dropna(subset=[master_col, soon_col])
    pairs[master_col] = pairs[master_col].astype(str)
    pairs[soon_col] = pairs[soon_col].astype(str)
    all_masters = set(pairs[master_col].unique())

    frontier = pairs.rename(columns={master_col: 'Level_0', soon_col: 'Level_1'})
    # Finished paths are collected here and concatenated once at the end,
    # instead of repeatedly concatenating onto an initially empty DataFrame.
    finished = []
    current_level = 1
    max_iterations = 20
    iterations = 0

    while not frontier.empty and iterations < max_iterations:
        iterations += 1
        level_col = f'Level_{current_level}'
        if level_col not in frontier.columns:
            finished.append(frontier)
            break

        is_master = frontier[level_col].isin(all_masters)
        terminal = frontier[~is_master]
        to_expand = frontier[is_master]
        if not terminal.empty:
            finished.append(terminal)
        if to_expand.empty:
            break

        current_level += 1
        next_level_col = f'Level_{current_level}'
        expanded = pd.merge(
            to_expand,
            pairs,
            left_on=level_col,
            right_on=master_col,
            how='left',
            suffixes=('', '_lookup'),
        )
        expanded = expanded.rename(columns={soon_col: next_level_col})
        if master_col in expanded.columns:
            expanded = expanded.drop(columns=[master_col])

        # Rows for which the lookup found no member are final as they stand.
        no_members = expanded[next_level_col].isna()
        if no_members.any():
            finished.append(expanded[no_members])
            expanded = expanded[~no_members]
        frontier = expanded
    else:
        # Reached only when the loop condition itself failed (no ``break``).
        # A non-empty frontier then means the iteration cap cut expansion
        # short; a loop that ended naturally on the last allowed pass must
        # not trigger the warning or re-append rows already stored.
        if not frontier.empty:
            print(f"Warning: Max iterations ({max_iterations}) reached.")
            finished.append(frontier)

    if not finished:
        return pd.DataFrame()
    paths = pd.concat(finished, ignore_index=True)

    # Keep the contiguous run Level_0, Level_1, ... that actually exists.
    final_cols = []
    for i in range(current_level + 2):
        col_name = f'Level_{i}'
        if col_name not in paths.columns:
            break
        final_cols.append(col_name)
    if not final_cols:
        final_cols = paths.columns.tolist()
    if not final_cols:
        return pd.DataFrame()

    max_level_found = len(final_cols) - 1
    paths = paths[final_cols].copy()
    paths.rename(
        columns={f'Level_{max_level_found}': 'Ultimate_Member', 'Level_0': 'Initial_Master'},
        inplace=True,
    )
    return paths.drop_duplicates().reset_index(drop=True)
def add_super_masters(df_original, master_col, soon_col, super_master_map):
    """Prepend "super master" relationships to a master/member table.

    Args:
        df_original: DataFrame containing at least ``master_col`` and
            ``soon_col``.
        master_col: Name of the group (master) column.
        soon_col: Name of the member column.
        super_master_map: Dict mapping each new super master to a list or
            tuple of existing masters that should become its members.
            Entries whose value is not a list/tuple are skipped with a
            warning.

    Returns:
        ``None`` if an argument fails validation.  An unmodified copy of
        ``df_original`` if the map produced no new rows.  Otherwise a new
        DataFrame with the generated rows first, followed by the original
        rows converted to strings.  Missing values in the original stay
        missing (they are not turned into the text ``'nan'``/``'None'``), so
        a later ``dropna`` can still remove them.
    """
    if not isinstance(df_original, pd.DataFrame):
        return None
    if master_col not in df_original.columns or soon_col not in df_original.columns:
        return None
    if not isinstance(super_master_map, dict):
        return None

    new_rows = []
    for super_master, original_masters in super_master_map.items():
        if not isinstance(original_masters, (list, tuple)):
            print(f"Warning: Value for super master '{super_master}' not list/tuple.")
            continue
        new_rows.extend(
            {master_col: str(super_master), soon_col: str(original_master)}
            for original_master in original_masters
        )

    if not new_rows:
        print("Warning: No new relationships generated.")
        return df_original.copy()

    df_new_relations = pd.DataFrame(new_rows)
    # Stringify the original data, but mask the cells that were missing so
    # they do not become literal 'nan' / 'None' strings.
    df_original_str = df_original.astype(str).where(df_original.notna())
    return pd.concat([df_new_relations, df_original_str], ignore_index=True)
def _generate_graphviz_code(df_plot, master_col, soon_col, **kwargs):
    """Build (and optionally render) a Graphviz digraph of the hierarchy.

    Masters are drawn as filled light-blue boxes, all other items as
    ellipses, with one edge per (master, member) row.

    Args:
        df_plot: DataFrame whose ``master_col`` / ``soon_col`` values are
            strings.
        master_col: Name of the group (master) column.
        soon_col: Name of the member column.
        **kwargs: Optional settings:
            rankdir (default ``'TB'``), filename (default
            ``'graphviz_output'``), render_format (default ``'png'``),
            view (default ``False``), render (default ``False``).

    Returns:
        The ``graphviz.Digraph`` object when ``render`` is falsy; otherwise
        the path returned by ``Digraph.render`` or ``None`` if rendering
        failed.
    """
    rankdir = kwargs.get('rankdir', 'TB')
    filename = kwargs.get('filename', 'graphviz_output')
    render_format = kwargs.get('render_format', 'png')
    view = kwargs.get('view', False)
    render_graph = kwargs.get('render', False)

    all_masters = set(df_plot[master_col].unique())
    all_items = all_masters.union(df_plot[soon_col].unique())

    dot = graphviz.Digraph(
        comment='Group Hierarchy',
        format=render_format,
        graph_attr={'rankdir': rankdir},
        strict=True,
    )

    node_map = {}
    used_ids = set()
    # Sorted so IDs and node order are reproducible between runs.
    for index, item in enumerate(sorted(all_items)):
        # Same sanitised ID as before for ordinary names; the fallback is an
        # index rather than hash(), which is not stable across runs.
        base_id = ''.join(filter(str.isalnum, item)) or f"node_{index}"
        node_id = base_id
        suffix = 1
        # Different names can sanitise to the same ID (e.g. "A B" and "AB");
        # disambiguate so they do not silently merge into a single node.
        while node_id in used_ids:
            node_id = f"{base_id}_{suffix}"
            suffix += 1
        used_ids.add(node_id)
        node_map[item] = node_id

        if item in all_masters:
            dot.node(node_id, item, shape='box', style='filled', fillcolor='lightblue')
        else:
            dot.node(node_id, item, shape='ellipse')

    for master_name, soon_name in zip(df_plot[master_col], df_plot[soon_col]):
        master_node_id = node_map.get(master_name)
        soon_node_id = node_map.get(soon_name)
        if master_node_id and soon_node_id:
            dot.edge(master_node_id, soon_node_id)
        else:
            print(f"Warning: Missing node ID for edge {master_name} -> {soon_name}")

    if not render_graph:
        return dot

    try:
        rendered_path = dot.render(filename, view=view, cleanup=True, format=render_format)
    except graphviz.ExecutableNotFound:
        print("\nError: graphviz.ExecutableNotFound...")
        return None
    except Exception as e:  # best-effort: report any rendering failure, do not crash
        print(f"\nAn error occurred during graph rendering: {e}")
        return None
    print(f"Graph saved to: {rendered_path}")
    return rendered_path
- def _generate_mermaid_code(df_plot, master_col, soon_col, **kwargs):
- # --- (Full code from previous answer) ---
- rankdir = kwargs.get('rankdir', 'TB'); save_path = kwargs.get('save_to_file')
- mermaid_code = [f"graph {rankdir};"]; all_masters = set(df_plot[master_col].unique()); all_items = set(all_masters).union(set(df_plot[soon_col].unique()))
- node_definitions = []; style_definitions = []; node_map = {}
- for i, item in enumerate(all_items):
- safe_chars = ''.join(filter(str.isalnum, item)); mermaid_id = f"id_{i}_{safe_chars[:20]}";
- if not safe_chars: mermaid_id = f"id_empty_{i}"
- node_map[item] = mermaid_id; node_label = item.replace('"', '#quot;')
- node_definitions.append(f' {mermaid_id}["{node_label}"]')
- if item in all_masters: style_definitions.append(f' style {mermaid_id} fill:#lightblue,stroke:#333,stroke-width:1px')
- mermaid_code.extend(node_definitions); mermaid_code.extend(style_definitions); edge_definitions = []
- for _, row in df_plot.iterrows():
- master_name = row[master_col]; soon_name = row[soon_col]
- if master_name in node_map and soon_name in node_map: master_id = node_map[master_name]; soon_id = node_map[soon_name]; edge_definitions.append(f" {master_id} --> {soon_id};")
- else: print(f"Warning: Could not find map ID for edge: {master_name} -> {soon_name}")
- mermaid_code.extend(edge_definitions); final_code = "\n".join(mermaid_code)
- if save_path:
- try:
- with open(save_path, 'w', encoding='utf-8') as f: f.write(final_code); print(f"Mermaid code saved to: {save_path}")
- except Exception as e: print(f"Error saving Mermaid code to {save_path}: {e}")
- return final_code
def generate_hierarchy_visualization(df_input, master_col, soon_col, format_type='graphviz', **kwargs):
    """Generate hierarchy visualization code or render a graph.

    Validates the input, reduces it to the two relationship columns (rows
    with a missing value in either are dropped, values are converted to
    strings) and dispatches to the Graphviz or Mermaid generator.

    Args:
        df_input: DataFrame holding the relationships.
        master_col: Name of the group (master) column.
        soon_col: Name of the member column.
        format_type: ``'graphviz'`` or ``'mermaid'`` (case-insensitive).
        **kwargs: Passed through unchanged to the selected generator.

    Returns:
        Whatever the selected generator returns, or ``None`` (after printing
        an error) if the input is not a DataFrame, a column is missing, or
        ``format_type`` is not a supported value.
    """
    if not isinstance(df_input, pd.DataFrame):
        print("Error: df_input must be a pandas DataFrame.")
        return None
    if master_col not in df_input.columns or soon_col not in df_input.columns:
        print(f"Error: Columns '{master_col}' or '{soon_col}' not found.")
        return None
    # A non-string format (e.g. None) used to crash on .lower(); report it
    # through the same path as any other unsupported format instead.
    if not isinstance(format_type, str):
        print(f"Error: Unsupported format_type '{format_type}'.")
        return None
    format_type = format_type.lower()
    if format_type not in ('graphviz', 'mermaid'):
        print(f"Error: Unsupported format_type '{format_type}'.")
        return None

    df_plot = df_input[[master_col, soon_col]].copy().dropna(subset=[master_col, soon_col])
    try:
        df_plot[master_col] = df_plot[master_col].astype(str)
        df_plot[soon_col] = df_plot[soon_col].astype(str)
    except Exception as e:  # best-effort conversion; continue with the data as-is
        print(f"Warning: Could not convert columns to string. Error: {e}")

    if format_type == 'graphviz':
        print("--- Generating Graphviz data ---")
        return _generate_graphviz_code(df_plot, master_col, soon_col, **kwargs)
    print("--- Generating Mermaid code ---")
    return _generate_mermaid_code(df_plot, master_col, soon_col, **kwargs)
- # --- End of Function Definitions ---
# ==============================================================
# EXAMPLE USAGE
# ==============================================================

# 1. --- Define Input Data ---
# Each row is "group, member"; a member may itself be a group.
data = """
Group Name,Member
Core Dev Team,Alice
Core Dev Team,Bob
Core Dev Team,Charlie
UI/UX Crew,David
UI/UX Crew,Eve
Backend Brigade,Core Dev Team
Backend Brigade,Frank
Frontend Force,UI/UX Crew
Frontend Force,Grace
Project Dragon,Backend Brigade
Project Dragon,Frontend Force
Project Dragon,Hannah
QA Squad,Ian
QA Squad,Judy
Operations Unit,Project Dragon
Operations Unit,QA Squad
Operations Unit,Kevin
"""
data_io = StringIO(data)
df_input_original = pd.read_csv(data_io)
master_column_name = 'Group Name'
soon_column_name = 'Member'
print("--- Original Input DataFrame ---")
print(df_input_original.head())  # Display first few rows

# 2. --- Optional: Define and Add Super Masters ---
super_masters = {
    'Overall Company': ['Operations Unit', 'Frontend Force'],
    'Special Projects': ['Backend Brigade'],
}
# Create the augmented DataFrame (original rows + super master rows).
df_augmented = add_super_masters(
    df_input_original,
    master_column_name,
    soon_column_name,
    super_masters,
)

# --- Save the Augmented DataFrame ---
if df_augmented is None:
    print("Failed to create augmented DataFrame. Skipping subsequent steps.")
    # Every later step needs df_augmented, so stop here. SystemExit is used
    # instead of the site-provided exit() helper, and the non-zero status
    # signals the failure to the caller.
    raise SystemExit(1)

augmented_filename_csv = 'augmented_hierarchy_input.csv'
# augmented_filename_excel = 'augmented_hierarchy_input.xlsx'  # Optional Excel
try:
    df_augmented.to_csv(augmented_filename_csv, index=False, encoding='utf-8')
    print(f"\n--- Augmented DataFrame saved to: {augmented_filename_csv} ---")
    # To save as Excel (requires openpyxl: pip install openpyxl):
    # df_augmented.to_excel(augmented_filename_excel, index=False)
    # print(f"--- Augmented DataFrame saved to: {augmented_filename_excel} ---")
except Exception as e:  # saving is best-effort; the script continues
    print(f"Error saving augmented DataFrame: {e}")
# Display head of augmented data (new rows come first, then original rows).
print(df_augmented.head(10).to_string())
print("...")

# 3. --- Optional: Expand the Augmented Hierarchy ---
df_expanded_result = expand_nested_groups(
    df_augmented,  # Use the DataFrame with super masters
    master_column_name,
    soon_column_name,
)

# --- Save the Expanded DataFrame ---
if df_expanded_result is not None:
    expanded_filename_csv = 'expanded_hierarchy_paths.csv'
    # expanded_filename_excel = 'expanded_hierarchy_paths.xlsx'  # Optional Excel
    try:
        df_expanded_result.to_csv(expanded_filename_csv, index=False, encoding='utf-8')
        print(f"\n--- Expanded Paths DataFrame saved to: {expanded_filename_csv} ---")
        # To save as Excel (requires openpyxl: pip install openpyxl):
        # df_expanded_result.to_excel(expanded_filename_excel, index=False)
        # print(f"--- Expanded Paths DataFrame saved to: {expanded_filename_excel} ---")
    except Exception as e:  # saving is best-effort; the script continues
        print(f"Error saving expanded DataFrame: {e}")
    # Display the expanded result
    print("\n--- Fully Expanded DataFrame (from Augmented Data) ---")
    print(df_expanded_result.to_string())
else:
    print("Failed to expand the augmented DataFrame.")

# 4. --- Generate Visualizations (using Augmented Data) ---
print("\n======= Generating Visualizations (from Augmented Data) =======")

# Example 4a: Generate Mermaid code string and save to file
mermaid_output = generate_hierarchy_visualization(
    df_input=df_augmented,  # Use the DataFrame with super masters
    master_col=master_column_name,
    soon_col=soon_column_name,
    format_type='mermaid',
    rankdir='TB',
    save_to_file='final_hierarchy.mmd',  # Save Mermaid code
)
if mermaid_output:
    print("\n--- Mermaid Code Snippet: ---")
    print("```mermaid")
    # Print only the first few lines as a preview if the code is long.
    mermaid_lines = mermaid_output.splitlines()
    print('\n'.join(mermaid_lines[:15]))
    if len(mermaid_lines) > 15:
        print("...")
    print("```")
    print("(Full code saved to final_hierarchy.mmd)")

# Example 4b: Render Graphviz to PNG and view
gv_rendered_path = generate_hierarchy_visualization(
    df_input=df_augmented,  # Use the DataFrame with super masters
    master_col=master_column_name,
    soon_col=soon_column_name,
    format_type='graphviz',
    render=True,  # Render the image
    filename='final_hierarchy_gv',  # Base filename for the rendered output
    render_format='png',
    view=True,  # Try to open the PNG
    rankdir='LR',
)
if gv_rendered_path:
    print(f"Graphviz rendering process initiated for {gv_rendered_path}.")

print("\n--- Script finished ---")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement